Broadleaf Microservices
  • v1.0.0-latest-prod

Broadleaf Workflow Services

since 2.1.4|2.2.0

This feature is still in Beta. Functionality and documentation are subject to change. Workflows will be more deeply leveraged in the framework in future releases, but the functionality is available for review now - allowing the creation of custom workflows.

Overview

Workflows are used to describe one or more steps in a business process with optional branching outcomes. Progress monitoring is included in the admin tool, as well as functions to pause, retry, and cancel (to name a few).

Key Features

  • Workflows are described as configuration, rather than code. This is one of the big differences between Cadence and Broadleaf Workflow Services. The latter allows for a more declarative approach to workflow definition, which is easier to understand and maintain. It also removes the need to worry about proper deterministic programming in the workflow, as this aspect is controlled by the engine instead.

  • Workflow configuration describes one or more steps with optional outcomes, leading to a possible decision tree of execution. This allows for complex workflows to be defined in a simple and understandable way.

  • Each step in the workflow is coded as an activity, which is a Java construct made available in the Broadleaf codebase.

  • In addition to basic business logic execution, activities can be coded for advanced functionality, including interruption for timely workflow pause and cancellation, as well as pausing the workflow altogether upon exit. The latter feature opens the door for "human-in-the-loop" workflows, where a human can interact with the process - possibly completing a necessary step - before the workflow is allowed to continue.

  • Activities can be coded in multiple ways with exceptions in mind. Exceptions may be bubbled, in which case, the workflow is updated with a failure status and becomes available for review and retry - usually via the admin tool. Alternatively, the activity may catch the exception and return a different response driving an alternate path in the workflow configuration.

  • Workflow Services supports idempotency so that a process responsible for requesting a workflow can be retried without fear of spawning a duplicate workflow.

  • Workflows are managed in the admin. Workflows can be filtered based on various criteria, including context. Progress can be monitored in the admin view. Workflows can also be configured to allow retry, pause, resume, and cancellation. Execution history is also visualized here.

  • Persistent logging is available to activity implementations. This allows important information to be exposed to business users in the admin tool - possibly allowing for additional remediation steps to be taken before workflow continuation.

  • Workflow execution scales horizontally with more replicas of the workflow service (or containing flexpackage). It is set up with high consumption concurrency for command messages via Spring Cloud Stream and a backing message broker.

Requirements

  • The workflow client library must be available on the classpath to implement activities, and to initiate workflows from other services. Whether or not if using a Broadleaf starter-based project, the framework component override module in your project should include a maven dependency on the client library. A version should not be required and should be auto-resolved via the inherited common dependencies BOM.

<dependency>
    <groupId>com.broadleafcommerce.microservices</groupId>
    <artifactId>broadleaf-workflow-client</artifactId>
</dependency>
  • WorkflowServices must be running as part of the stack. This is generally achieved by updating the Broadleaf starter manifest to include.

components:
- name: workflow
  routed: true
  domain:
    cloud: workflow
    docker: workflow
    local: localhost
  enabled: false
  ports:
    - port: 8488
      targetPort: 8488
    - debug: true
      port: 8088
      targetPort: 8088
flexPackages:
- name: processing
  domain:
    cloud: processing
    docker: processing
    local: localhost
  enabled: false
  flexUnits: import,indexer,scheduledjob,search,inventory,catalog,offer,pricing,customer,order,menu,content,bulkops,workflow
You may choose a different flexpackage than the example, or configure a new granular flexpackage to contain the workflow service.

Usage

Project Structure

Best practice is to house workflow configuration and activity implementations in a new maven module in your project structure. The library emitted by this module should be available on the classpath of WorkflowServices at runtime - usually by including it as a dependency in the pom.xml of the flexpackage containing the workflow service. An example pom.xml for this new module could potentially be as simple as this:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
		<groupId>com.broadleafcommerce.microservices</groupId>
		<artifactId>broadleaf-microservices-flex-parent</artifactId>
		<version>2.2.0-GA</version>
		<relativePath/>
	</parent>
    <artifactId>workflow-fulfillment</artifactId>
    <groupId>com.example.microservices</groupId>
    <name>Fulfillment Workflow Implementations</name>
    <description>Fulfillment Workflow Implementations</description>
    <version>1.0.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.broadleafcommerce.microservices</groupId>
            <artifactId>broadleaf-workflow-client</artifactId>
        </dependency>
        <dependency>
            <groupId>com.broadleafcommerce.microservices</groupId>
            <artifactId>broadleaf-common-extension-compatibility</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.broadleafcommerce.microservices</groupId>
            <artifactId>broadleaf-common-extension-compatibility-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
To execute a workflow in WorkflowServices, the workflow configuration and activity implementations must be visible to WorkflowServices at runtime.

Creating Activities

An activity represents a single step in an overall business process (represented here as a workflow). Activities are coded as Java classes implementing the SimpleActivity interface.

public class MyActivity implements SimpleActivity {

    @Override
    public ActivityResponse start(Map<String, Object> context) {
        // Do something interesting
        // Usually informed by the context
        // Return a response to the workflow engine
        return new ActivityResponse("ok"); (1)
    }

    @Override
    public String getName() {
        return "myBeanName"; (2)
    }

    @Override
    public String getDescription() {
        return "My Display Information"; (3)
    }

}
1 The primary job of the ActivityResponse is to return a result hint to the system upon completion. The value is arbitrary. We’ll discover later that this value is used to pick the next step in the workflow, if applicable. The response can also include error information (which will be included in the execution history) if the activity decides to complete execution, rather than bubble and exception. Furthermore, the response provides API to update the context values (this is persistent), should it be required to inform subsequent activities. There are several more features in the response that will be explored in subsequent examples.
2 This is the name of the activity as it will be referenced in the workflow configuration. It must be unique across all activities in the system. It must also match the bean name used for the activity in Spring configuration.
3 This is the display name of the activity as it will be shown in the admin tool.

Programmatic Pause

Activities can be coded to pause the workflow execution at the time of completion. This is useful for human-in-the-loop workflows, where a human must complete a step before the workflow can continue.

public class MyActivity implements SimpleActivity {

    @Override
    public ActivityResponse start(Map<String, Object> context) {
        return new ActivityResponse("ok").withPauseOnReturn(true);
    }

    @Override
    public String getName() {
        return "myBeanName";
    }

    @Override
    public String getDescription() {
        return "My Display Information";
    }

}

Interruptible Activities

Generally, activities are short running. However, some activities may be coded for a longer lifecycle - possibly processing a large batch of data. In this case, the activity may be coded to allow for interruption. This allows for a timely pause of the activity upon admin user request, or application shutdown. Without interruption support, a pause request or shutdown will not take effect until the natural completion of the activity, which could take longer than desired.

WorkflowServices is hooked into the JVM shutdown lifecycle. When the application is shutdown gracefully, workflows will be notified with a pause signal with auto restart. If the current activity is long-running and is coded with interrupt support, is can quit in a timely fashion and pick up where it left off when the workflow is automatically restarted in a different replica of WorkflowServices. This helps protect workflow continuity in the face of application events that might otherwise cut off the workflow’s successful completion.
@RequiredArgsConstructor
public class TestActivity implements SimpleActivity {

    private final RecordRepository recordRepository;

    @Override
    public ActivityResponse start(Map<String, Object> context) {
        Iterator<Integer> records = recordRepository.findUnprocessRecords();
        while (!isInterrupted() & records.hasNext()) {
            recordRepository.processItem(records.next());
        }
        return new ActivityResponse("ok").withCompleteOnReturn(!isInterrupted()); (1)(2)
    }

    @Override
    public String getName() {
        return "myBeanName";
    }

    @Override
    public String getDescription() {
        return "My Display Information";
    }

}
1 ActivityResponse.withCompleteOnReturn() is used to indicate whether the system should consider the activity completed. This is important for paused workflows with a started, but incomplete activity (such as the case with activity interruption). If the system considers the activity incomplete, it will be able to pick up where it left off when the workflow is resumed.
2 Call made to the SimpleActivity.isInterrupted() method to check if the current workflow thread of execution has been given an interrupt hint.

Persistent Logging

Activities can be coded to log important information to the workflow execution history. This is useful for providing business users with information about the workflow execution, as well as for debugging purposes.

public class MyActivity implements SimpleActivity {

    @Override
    public ActivityResponse start(Map<String, Object> context) {
        try {
            log("Integration details", getIntegrationDetails(context));(1)
            String response = externalIntegration();
            log("Integration response", getIntegrationResponseDetails(response));
        } catch (Exception e) {
            log("Integration failed", getFailureInformation(e, context));
            return new ActivityResponse("nok");
        }
        return new ActivityResponse("ok");
    }

    @Override
    public String getName() {
        return "myBeanName";
    }

    @Override
    public String getDescription() {
        return "My Display Information";
    }

}
1 When viewed in the admin, or via /history endpoint, the information entered via SimpleActivity.log() will be displayed in the execution history.

Inter-Service Communication

Activities will often orchestrate calls to multiple services, both internal and external to the Broadleaf stack. For internal inter-service communication, the generally recommended approach is similar to what is used elsewhere in the stack. Establishing a service client leveraging Spring WebFlux using the OAuth client credentials flow remains the best practice. The developer portal documentation goes into more detail.

The client credentials flow requires access to certain secrets to enable the process. These are generally exposed via config server as part of starter configuration. Out-of-the-box, WorkflowServices is set up with the workflowclient OAuth client credentials - exposed via config server. As a result, this client credential will also be available to your activities running in the engine. However, this has limited scope by default. To increase the types of inter-service communications you can make via web client, you can add more scopes and permissions to workflowclient. This is generally done via liquibase as part of an auth services customization.
<changeSet author="me" id="my-id-1" labels="auth,required">
    <preConditions onFail="MARK_RAN">
        <sqlCheck expectedResult="0">SELECT COUNT(*) FROM blc_client_scopes WHERE
            scope='ORDER_FULFILLMENT' AND id='workflowclient';</sqlCheck>
    </preConditions>
    <insert tableName="blc_client_scopes">
        <column name="id" value="workflowclient" />
        <column name="scope" value="ORDER_FULFILLMENT" />
    </insert>
</changeSet>
<changeSet author="me" id="my-id-2" labels="auth,required">
    <preConditions onFail="MARK_RAN">
        <sqlCheck expectedResult="0">SELECT COUNT(*) FROM blc_client_permissions WHERE
            permission='READ_ORDER_FULFILLMENT' AND id='workflowclient';</sqlCheck>
    </preConditions>
    <insert tableName="blc_client_permissions">
        <column name="id" value="workflowclient" />
        <column name="permission" value="READ_ORDER_FULFILLMENT" />
    </insert>
</changeSet>

Configuring Workflows

The configuration of workflows (a collection of one or more ordered activities) is primarily a Spring environment concern. The setup is achieved by add Spring environment configuration support to your maven module. You must also declare your activities as Spring beans.

workflow-defaults.yml
broadleaf:
  workflow:
    client:
      flows:
        myFlow: (1)
          description: My Custom Workflow
          historical-reset-enabled: false (2)
          retry-enabled: true (3)
      steps:
        myFlow:
          myActivity: (4)
            admin-selectable: true (5)
            decisions:
              ok: myFinalActivity (6)
              nok: myWaitActivity (7)
          myWaitActivity:
            isWait: true (8)
            waitDuration: 2000
            decisions:
              ok: myActivity (9)
          myFinalActivity:
            admin-selectable: true
1 The name of the workflow. This is the name used to initiate the workflow via API. It should be unique.
2 Whether retry of an earlier, already executed step should be allowed via the admin tool or API. This is generally false, but would need to be true if allowing a retry of the entire workflow is desired, for example.
3 A subset of what is allowed via historical reset. True in this case indicates that the most recent attempted activity may be retried. This is generally true and is important for retry of the last problem activity in a failed workflow.
4 The name of the activity as it is referenced in the workflow configuration. This must match the bean name used for the activity in Spring configuration.
5 Whether the activity is selectable in the admin tool. The admin tool allows a go-to-step feature, as well as other functions on steps in the flow. This controls whether a given step is selectable for action to be taken with these functions. Setting to false can be useful to remove an activity from these functions.
6 In this case, the arbitrary "ok" value is used to indicate a successful result by the activity. In this case, the workflow is configured to proceed to the final activity.
7 In this case, the arbitrary "nok" value is used to indicate a failure result by the activity. In this case, the workflow is configured to proceed to the wait activity.
8 This indicates that the activity is a wait activity. This means that the workflow will pause at this step until the wait duration has elapsed. The wait duration is in milliseconds.
9 The step to proceed to after the wait is complete. In this case, we are setting up a recursion loop with the first activity.

Additional Concerns

  • Add an environment post processor class to load your workflow config file into the Spring environment.

  • Add a Spring configuration class to declare singleton beans for all your activity classes.

  • Add META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports to set up autoconfiguration for your workflow configuration class.

  • Add META-INF/spring/spring.factories to set up autoconfiguration for your environment post processor class.

Initiating Workflows

Workflows may be initiated programmatically from anywhere via the workflow client library. In fact, nested workflows may be called from workflow activities. Make sure your caller has access to the client library via maven dependency:

<dependency>
    <groupId>com.broadleafcommerce.microservices</groupId>
    <artifactId>broadleaf-workflow-client</artifactId>
</dependency>
private final WorkflowProvider workflowProvider; (1)

public void launchWorkflow(Map<String, Object> parameters) {
    ContextInfo contextInfo = buildContextInfo(workflowContext, OperationType.CREATE); (2)
    workflowProvider.start(new WorkflowRequest(parameters, (3)
                "myFlow", (4)
                "myReferenceId", (5)
                contextInfo));
}
1 WorkflowProvider is a Spring bean available at runtime via autoconfiguration from the workflow client.
2 Building a context info that has at least tenant and OperationType information for the creation of the workflow entity.
3 Create a WorkflowRequest to pass to the provider.
4 The name of the flow to initiate. This must match the name of the workflow in the configuration.
5 A reference id for the workflow. This is used for idempotency so that repeat requests for the same reference will not spawn duplicate workflows. Using a business reference is appropriate when applicable. For example, if establishing a workflow related to an order fulfillment, it is appropriate to use the fulfillment id as the reference id.
The WorkflowProvider also allows programmatic resume. This is useful in callback scenarios where a human-in-the-loop workflow is paused and the user has completed the necessary step. The workflow can be resumed programmatically via the WorkflowProvider.

API Features

  • Read - reading workflows by id and RSQL (including RSQL against workflow context parameters)

  • History - reading workflow execution history

  • Orchestration Map - reading the orchestration map for a given workflow instance. This includes a list of steps, what is selectable and retryable, and what is currently executed, failed, or not executed.

  • Update - Either update workflow context for a given instance, or post a signal to a given instance. Signals include GOTO, PAUSE, RESUME, and CANCEL.

  • Start - initiate a workflow instance.

  • Delete - delete a workflow instance.

See the Open API reference in the sidebar for more details on the REST API.

Testing

Writing tests can be challenging given that the workflow library development is generally separated from the workflow engine itself. As a result, you won’t usually have a complete WorkflowServices spring environment to spin up and read/execute your workflow configuration as part of an integration test. To help with this, we have added support to the workflow client library for robust unit test coverage of your activities in the context of a workflow execution.

Here’s a fairly sophisticated example that demonstrates an interruptible workflow and asserts results, as well as workflow state (e.g. history elements).

@Test
public void testInterruption() throws Exception {
    WorkflowContext.reset();
    RecordRepository recordRepository = new RecordRepository();
    WorkflowContext.set(clazz -> null, bean -> new TestActivity(recordRepository));
    List<Step> steps = List.of(new Step("test", false));
    SimpleWorkflowExecution workflow = new SimpleWorkflowExecution() {
        @Override
        public String getName() {
            return "test";
        }

        @Override
        public List<Step> getSteps() {
            return steps;
        }
    };
    Thread thread = new Thread(
            () -> assertThrows(WorkflowPauseException.class, () -> workflow.start(Map.of())));
    thread.start();
    Thread.sleep(1000);
    workflow.shutdown();
    thread.join();
    assertThat(recordRepository.getProcessed()).isGreaterThan(0);
    assertThat(workflow.getHistory()).hasSize(2);
}

@Data
static class RecordRepository {

    private Integer count = 0;
    private Integer processed = 0;
    private boolean end = false;

    private void processItem(Integer record) {
        // Simulate processing
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        processed = record;
    }

    private Iterator<Integer> findUnprocessRecords() {
        // Simulate finding unprocessed records
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return !end;
            }

            @Override
            public Integer next() {
                count++;
                return count;
            }
        };
    }

}

@RequiredArgsConstructor
static class TestActivity implements SimpleActivity {

    private final RecordRepository recordRepository;

    @Override
    public ActivityResponse start(Map<String, Object> context) {
        Iterator<Integer> records = recordRepository.findUnprocessRecords();
        while (!isInterrupted() & records.hasNext()) {
            recordRepository.processItem(records.next());
        }
        return new ActivityResponse("ok").withCompleteOnReturn(!isInterrupted());
    }

    @Override
    public String getName() {
        return "test";
    }

    @Override
    public String getDescription() {
        return "Test Activity";
    }

}

Pruning

DefaultPruneService is an auto-starting lifecycle service that monitors the database for stale workflows and deletes them after a configurable retention period. See the javadocs for ExecutionProperties for more details, including retention period configuration.

Performance

  • WorkflowServices scales horizontally with each replica being capable of initiating and running workflows.

  • Workflow initiation commands are queued in the message broker

    • If using Kafka (the default), the out-of-the-box config uses 6 partitions. This is suitable for 3 replicas with a concurrency of 2.

    • If more replicas are desired, the partition count can be configured higher to accommodate.

  • WorkflowServices can be separated into a granular flexpackage for increased isolation and independent scale.

  • Activities support a current workflow execution cache (see SimpleActivity.getLocalCache()). This is a programmatic measure that can save on expensive repeated operations if the results can be cached. As a result, the cache can be inspected and leveraged across multiple activities in the same workflow.