Broadleaf Microservices
  • v1.0.0-latest-prod

Adding a New Bulk Operation

The Bulk Operations orchestration service is responsible for receiving bulk operation requests coming from the Unified Admin, and for processing them in the related microservices with the use of durable messaging.

Tip
Catalog Services offers a comprehensive implementation of bulk operations. Visit this documentation for a high-level explanation of its components

In this tutorial, we will go over the recommended implementation steps in the Catalog Service to delete products from a category in bulk. This example will touch on all the components needed to newly introduce the bulk operations flow to a microservice, please note that some of these components already exist in the Catalog Service out-of-box.

Expected Outcome

After following the implementation steps in this tutorial, the category products grid in the category view should allow selection of multiple products and display a "Bulk Operations" dropdown with the new "Delete Product" option.

Category Product Bulk Operations

Catalog Components

The CatalogBulkOperationConsumer Interface

The component needed to listen for messages in the processBulkOperationRequest channel.

public interface CatalogBulkOperationConsumer {
    String CHANNEL = "processBulkOperationRequestInput";

    @Input(CHANNEL)
    SubscribableChannel processBulkOperationRequestInput();
}

A binding for this channel should be declared in your configuration file:

spring:
  cloud:
    stream:
      bindings:
        processBulkOperationRequestInput:
          group: catalog-process-bulk-operation
          destination: processBulkOperationRequest
Important
This messaging configuration is included in CatalogServices out-of-box.

The BulkOperationHandler Component

Each microservice leveraging bulk operations functionality must have a BulkOperationHandler component to consume and process the Bulk Operation message emitted by the service. This component is also responsible for sending a message to trigger sandbox updates.

@Slf4j
@RequiredArgsConstructor
@DataRouteByKey(CATALOG_ROUTE_KEY)
public class CatalogBulkOperationHandler {

    @Getter(AccessLevel.PROTECTED)
    private final IdempotentMessageConsumptionService idempotentConsumptionService;

    @Getter(AccessLevel.PROTECTED)
    private final List<BulkUpdateProcessor> bulkUpdateProcessors;

    @Getter(AccessLevel.PROTECTED)
    private final BulkOperationService<BulkOperation> bulkOperationService;

    @Getter(value = AccessLevel.PROTECTED)
    private final DetachedDurableMessageSender sender;

    @StreamListener(CatalogBulkOperationConsumer.CHANNEL)
    public void listen(Message<BulkOpsProcessRequest> message) {
        idempotentConsumptionService.consumeMessage(message,
                CatalogBulkOperationHandler.class.getSimpleName(), this::processMessage);
    }

    protected void processMessage(@lombok.NonNull Message<BulkOpsProcessRequest> message) {
        BulkOpsProcessRequest payload = message.getPayload();

        log.info("Starter Bulk Operation {}:{}", payload.getOperationType(),
                payload.getBulkOperationId());

        for (BulkUpdateProcessor processor : bulkUpdateProcessors) {
            if (processor.canHandle(payload.getOperationType(), payload.getEntityType())) {

                // Read and update the bulk operation
                BulkOperation bulkOperation =
                        bulkOperationService.readById(payload.getBulkOperationId());
                bulkOperation = bulkOperationService.updateStatusAndSubStatus(null,
                        PROCESSING.name(), bulkOperation);

                // Call the processor able to handle operations of the given type
                processor.process(bulkOperation);

                UpdateSandboxRequest updateSandboxRequest =
                        new UpdateSandboxRequest(bulkOperation.getSandboxId(),
                                Strings.EMPTY,
                                Strings.EMPTY,
                                bulkOperation.getApplicationId(),
                                bulkOperation.getTenantId(),
                                bulkOperation.getCreatingUser(),
                                true);
                String idempotencyKey =
                        DigestUtils.md5Hex(updateSandboxRequest.toString()).toUpperCase();

                // Send the message to trigger a sandbox update
                sender.send(updateSandboxRequest, UpdateSandboxRequestProducer.TYPE,
                        idempotencyKey, CATALOG_ROUTE_KEY);
            }
        }
    }
}
Important
A more complex version of this component is included in CatalogServices out-of-box.

The BulkUpdateProcessor Interface

Microservices expecting to handle multiple types of Bulk Operations should include a BulkUpdateProcessor interface to be implemented per each operation type. The BulkOperationHandler described above will determine which implementation to use based on the type of the bulk operation to process.

public interface BulkUpdateProcessor {

    /**
     * Whether this processor can handle the requested bulk operation.
     */
    default boolean canHandle(String operationType, @Nullable String entityType) {
        return StringUtils.equalsAnyIgnoreCase(operationType, getOperationType());
    }

    /**
     * Processes the provided bulk operation.
     */
    void process(BulkOperation bulkOperation);

    /**
     * The operation type that this processor supports.
     */
    String getOperationType();
}
Important
A more complex version of this component is included in CatalogServices out-of-box.

Operation-Specific Components

The BulkUpdateProcessorUtil Class

This is a utility class to handle common operations in the bulk flow. These operations may vary depending on the context in which different entities may be targeted by a bulk update. For this reason, you may want to define separate utility classes for different update contexts.

In this example, this component will be responsible for creating the ContextInfo to pass along the Bulk Operations requests. Since we are updating entities within a sandboxable category, we will build a context request including a ContextChangeContainer with name CATEGORY.

@RequiredArgsConstructor
public class CategoryBulkUpdateProcessorUtil {

    @Nullable
    @Getter(AccessLevel.PROTECTED)
    private final DataRouteReference reference;

    @Getter(AccessLevel.PROTECTED)
    private final List<ContextRequestHydrator> contextRequestHydrators;

    @Getter(value = AccessLevel.PROTECTED)
    private final TypeFactory typeFactory;

    public ContextInfo buildBulkOperationContextInfo(BulkOperation bulkOperation,
            OperationType operationType) {
        ContextInfo processBulkOpContextInfo = new ContextInfo(operationType,
                buildBulkOperationContextRequest(bulkOperation),
                bulkOperation.getCreatingUser());

        // Disable filtering by active flag & dates since bulk ops is via an admin context
        processBulkOpContextInfo.setFilterByActiveDates(false);
        processBulkOpContextInfo.setFilterByActiveFlag(false);
        return processBulkOpContextInfo;
    }

    protected ContextRequest buildBulkOperationContextRequest(BulkOperation bulkOperation) {
        ContextRequest contextRequest = typeFactory.get(ContextRequest.class);
        contextRequest.setApplicationId(bulkOperation.getApplicationId());
        contextRequest.setCatalogId(bulkOperation.getCatalogId());
        contextRequest.setSandboxId(bulkOperation.getSandboxId());
        contextRequest.setTenantId(bulkOperation.getTenantId());

        ChangeContainer container = new ContextChangeContainer();
        // Since we are deleting category products, we want to target the "CATEGORY" container in order to properly sandbox our changes
        container.setName("CATEGORY");
        container.setRouteKey(DataRouteSupportUtil.getLookupKeyOnRoute().orElse(null));
        contextRequest.setChangeContainer(container);

        DataRouteSupportUtil.getBestMatchInRoute(
                contextRequestHydrators, reference, contextRequestHydrators.get(0))
                .hydrate(contextRequest);
        return contextRequest;
    }
}

The BulkUpdateProcessor Implementation

In order to handle different types of bulk updates, the BulkUpdateProcessor should have an implementation for each, for this example, we will introduce a CategoryProductDeleteBulkUpdateProcessor

@Slf4j
@RequiredArgsConstructor
public class DeleteCategoryProductBulkUpdateProcessor implements BulkUpdateProcessor {
    protected static final String OPERATION_TYPE = "DELETE_CATEGORY_PRODUCT";

    @Getter(value = AccessLevel.PROTECTED)
    private final CategoryBulkUpdateProcessorUtil processorUtil;

    @Getter(value = AccessLevel.PROTECTED)
    private final CategoryProductService<CategoryProduct> categoryProductService;

    @Getter(value = AccessLevel.PROTECTED)
    private final BulkUpdateProcessorHelper<BulkOperationItem> bulkUpdateProcessorHelper;

    @Override
    public String getOperationType() {
        return OPERATION_TYPE;
    }

    @Override
    public void process(BulkOperation bulkOperation) {
        bulkUpdateProcessorHelper.processBulkItemsInBatches(bulkOperation,
                this::processPageOfItems);
    }

    /**
     * For a page of bulk operation items, identifies the category products to delete from the bulk
     * operation data. Attempts to delete each of the category products, and updates the bulk
     * operation items to success or failure.
     */
    protected Page<BulkOperationItem> processPageOfItems(BulkOperation bulkOperation,
            Page<BulkOperationItem> bulkOperationItems) {

        // Create the context info using the designated CategoryBulkUpdateProcessorUtil
        ContextInfo contextInfo =
                processorUtil.buildBulkOperationContextInfo(bulkOperation, OperationType.DELETE);

        // Collect the IDs of the category products to process
        Map<String, BulkOperationItem> categoryProductIdsMap = bulkOperationItems.getContent()
                .stream()
                .collect(Collectors.toMap(BulkOperationItem::getEntityContextId,
                        Function.identity()));

        for (String categoryProductId : categoryProductIdsMap.keySet()) {
            try {
                BulkOperationContextProperties properties = bulkUpdateProcessorHelper
                        .getBulkOperationContextProperties(bulkOperation, OPERATION_TYPE);
                // Perform the delete operation on each category product
                categoryProductService.delete(categoryProductId, contextInfo);
                BulkOperationItem bulkOperationItem = categoryProductIdsMap.get(categoryProductId);
                // Update the state of the bulk operation item
                bulkOperationItem.setState(SUCCESS.name());
                return null;
            } catch (Exception e) {
                BulkOperationItem bulkOperationItem = categoryProductIdsMap.get(categoryProductId);
                // Update the state of the bulk operation item if errored.
                bulkOperationItem.setState(ERROR.name());
            }
        }
        return bulkOperationItems;
    }
}

Bulk Operations Metadata Extension

Metadata plays an important role in creating the bulk operation request from collection of selected entities in a component.

In this example, we will extend the category grid metadata to:

  • Enable the selection of multiple grid items

  • Add dropdown with a "Delete" operation targeting the selected items.

@Configuration
@RequiredArgsConstructor
@AutoConfigureAfter(CategoryMetadataAutoConfiguration.class)
public class MyCompanyCategoryMetadataAutoConfiguration {

    @Bean
    public ComponentSource addBulkOpsDeleteToCategoryProductGrid() {
        return registry -> {

            TreeView<?> categoryTree = (TreeView<?>) registry.get(CategoryIds.TREE);
            GridExternal<?> categoryProductGrid =
                    (GridExternal<?>) categoryTree.getComponent(CategoryIds.Forms.PRODUCTS).getComponent("category-products");

            categoryProductGrid
                    // Enable the selection of multiple grid items
                    .multipleSelectable()
                    // Add the Bulk Operations dropdown
                    .addGridActionGroup("Bulk Operations",
                            getCategoryProductBulkActions());
        };
    }

    private Action<?>[] getCategoryProductBulkActions() {
        DefaultBulkOperationAction removeProduct = Actions.bulkAction()
                //Define an action with a bulkOperationType matching that of our DeleteCategoryProductBulkUpdateProcessor
                .bulkOperationType("DELETE_CATEGORY_PRODUCT")
                .entityType("CATEGORY_PRODUCT")
                .generateFriendlyName()
                .label("Delete Product")
                .addSubmitEndpoint(Endpoints.post()
                        .uri(BulkOperationPaths.EXECUTE_BULK_OPERATION)
                        .scope(BulkOperationScopes.BULK_OPERATION));
        return new DefaultBulkOperationAction[] {
                removeProduct
        };
    }
}