The Bulk Operations orchestration service receives bulk operation requests from the Unified Admin and processes them in the related microservices using durable messaging.
Tip: Catalog Services offers a comprehensive implementation of bulk operations. Visit this documentation for a high-level explanation of its components.
In this tutorial, we walk through the recommended steps for implementing bulk deletion of products from a category in the Catalog Service. The example touches on every component needed to introduce the bulk operations flow to a microservice. Note that some of these components already exist in the Catalog Service out of the box.
After following the implementation steps in this tutorial, the category products grid in the category view should allow selection of multiple products and display a "Bulk Operations" dropdown with the new "Delete Product" option.
The first component needed is a consumer that listens for messages on the processBulkOperationRequest channel:
public interface CatalogBulkOperationConsumer {

    String CHANNEL = "processBulkOperationRequestInput";

    @Input(CHANNEL)
    SubscribableChannel processBulkOperationRequestInput();
}
A binding for this channel should be declared in your configuration file:
spring:
  cloud:
    stream:
      bindings:
        processBulkOperationRequestInput:
          group: catalog-process-bulk-operation
          destination: processBulkOperationRequest
Important: This messaging configuration is included in CatalogServices out of the box.
Each microservice leveraging bulk operations functionality must have a BulkOperationHandler component to consume and process the Bulk Operation message emitted by the service. This component is also responsible for sending a message to trigger sandbox updates.
@Slf4j
@RequiredArgsConstructor
@DataRouteByKey(CATALOG_ROUTE_KEY)
public class CatalogBulkOperationHandler {

    @Getter(AccessLevel.PROTECTED)
    private final IdempotentMessageConsumptionService idempotentConsumptionService;

    @Getter(AccessLevel.PROTECTED)
    private final List<BulkUpdateProcessor> bulkUpdateProcessors;

    @Getter(AccessLevel.PROTECTED)
    private final BulkOperationService<BulkOperation> bulkOperationService;

    @Getter(value = AccessLevel.PROTECTED)
    private final DetachedDurableMessageSender sender;

    @StreamListener(CatalogBulkOperationConsumer.CHANNEL)
    public void listen(Message<BulkOpsProcessRequest> message) {
        // Guard against processing the same message more than once
        idempotentConsumptionService.consumeMessage(message,
                CatalogBulkOperationHandler.class.getSimpleName(), this::processMessage);
    }

    protected void processMessage(@lombok.NonNull Message<BulkOpsProcessRequest> message) {
        BulkOpsProcessRequest payload = message.getPayload();
        log.info("Starting Bulk Operation {}:{}", payload.getOperationType(),
                payload.getBulkOperationId());

        for (BulkUpdateProcessor processor : bulkUpdateProcessors) {
            if (processor.canHandle(payload.getOperationType(), payload.getEntityType())) {
                // Read and update the bulk operation
                BulkOperation bulkOperation =
                        bulkOperationService.readById(payload.getBulkOperationId());
                bulkOperation = bulkOperationService.updateStatusAndSubStatus(null,
                        PROCESSING.name(), bulkOperation);

                // Call the processor able to handle operations of the given type
                processor.process(bulkOperation);

                UpdateSandboxRequest updateSandboxRequest =
                        new UpdateSandboxRequest(bulkOperation.getSandboxId(),
                                Strings.EMPTY,
                                Strings.EMPTY,
                                bulkOperation.getApplicationId(),
                                bulkOperation.getTenantId(),
                                bulkOperation.getCreatingUser(),
                                true);
                String idempotencyKey =
                        DigestUtils.md5Hex(updateSandboxRequest.toString()).toUpperCase();

                // Send the message to trigger a sandbox update
                sender.send(updateSandboxRequest, UpdateSandboxRequestProducer.TYPE,
                        idempotencyKey, CATALOG_ROUTE_KEY);
            }
        }
    }
}
Important: A more complex version of this component is included in CatalogServices out of the box.
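The idempotency key in the handler above is an uppercase MD5 hex digest of the request's string form, so sending an identical request again yields the same key. The following is a minimal standalone sketch of that derivation using only the JDK. `IdempotencyKeySketch` and the sample request string are hypothetical; the handler itself relies on Commons Codec's `DigestUtils.md5Hex`. The sketch also assumes the request's `toString()` is value-based, which the source does not state.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of the Broadleaf API
final class IdempotencyKeySketch {

    /** Returns the uppercase MD5 hex digest of the given text, encoded as UTF-8. */
    static String idempotencyKey(String requestText) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(requestText.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // %02X renders each byte as two uppercase hex digits
                hex.append(String.format("%02X", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed string form of a request, for illustration only
        String request = "UpdateSandboxRequest(sandboxId=sb-1, tenantId=t-1)";
        System.out.println(idempotencyKey(request));
    }
}
```

Because the key depends only on the request contents, two requests that differ in any field produce different keys, while a redelivered request can be recognized as a duplicate.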
Microservices that handle multiple types of bulk operations should define a BulkUpdateProcessor interface, with one implementation per operation type. The BulkOperationHandler described above determines which implementation to use based on the type of the bulk operation being processed.
public interface BulkUpdateProcessor {

    /**
     * Whether this processor can handle the requested bulk operation.
     */
    default boolean canHandle(String operationType, @Nullable String entityType) {
        return StringUtils.equalsAnyIgnoreCase(operationType, getOperationType());
    }

    /**
     * Processes the provided bulk operation.
     */
    void process(BulkOperation bulkOperation);

    /**
     * The operation type that this processor supports.
     */
    String getOperationType();
}
Important: A more complex version of this component is included in CatalogServices out of the box.
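To illustrate the selection step, here is a self-contained sketch of the dispatch using simplified stand-in types. `DispatchSketch`, `Processor`, and `RecordingProcessor` are hypothetical and exist only for this illustration; the one behavior taken from the interface above is the case-insensitive match on the operation type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not part of the Broadleaf API
final class DispatchSketch {

    /** Simplified stand-in for BulkUpdateProcessor. */
    interface Processor {
        String getOperationType();

        void process(String bulkOperationId);

        // Case-insensitive match on the operation type
        default boolean canHandle(String operationType) {
            return operationType != null
                    && operationType.equalsIgnoreCase(getOperationType());
        }
    }

    /** Records the IDs of the operations it was asked to process. */
    static final class RecordingProcessor implements Processor {
        private final String operationType;
        final List<String> processed = new ArrayList<>();

        RecordingProcessor(String operationType) {
            this.operationType = operationType;
        }

        @Override
        public String getOperationType() {
            return operationType;
        }

        @Override
        public void process(String bulkOperationId) {
            processed.add(bulkOperationId);
        }
    }

    /** Hands the operation to every processor that supports its type; returns the match count. */
    static int dispatch(List<? extends Processor> processors, String operationType,
            String bulkOperationId) {
        int handled = 0;
        for (Processor processor : processors) {
            if (processor.canHandle(operationType)) {
                processor.process(bulkOperationId);
                handled++;
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        RecordingProcessor delete = new RecordingProcessor("DELETE_CATEGORY_PRODUCT");
        RecordingProcessor other = new RecordingProcessor("SOME_OTHER_OPERATION");
        int handled = dispatch(Arrays.asList(delete, other), "delete_category_product", "op-1");
        System.out.println(handled + " processor(s) handled the request");
    }
}
```

Keeping the type check inside each processor means a new operation type only requires registering another implementation; the dispatch loop itself does not change.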
This is a utility class to handle common operations in the bulk flow. These operations may vary depending on the context in which different entities may be targeted by a bulk update. For this reason, you may want to define separate utility classes for different update contexts.
In this example, this component is responsible for creating the ContextInfo passed along with the bulk operation requests. Since we are updating entities within a sandboxable category, we build a context request that includes a ContextChangeContainer named CATEGORY.
@RequiredArgsConstructor
public class CategoryBulkUpdateProcessorUtil {

    @Nullable
    @Getter(AccessLevel.PROTECTED)
    private final DataRouteReference reference;

    @Getter(AccessLevel.PROTECTED)
    private final List<ContextRequestHydrator> contextRequestHydrators;

    @Getter(value = AccessLevel.PROTECTED)
    private final TypeFactory typeFactory;

    public ContextInfo buildBulkOperationContextInfo(BulkOperation bulkOperation,
            OperationType operationType) {
        ContextInfo processBulkOpContextInfo = new ContextInfo(operationType,
                buildBulkOperationContextRequest(bulkOperation),
                bulkOperation.getCreatingUser());
        // Disable filtering by active flag & dates since bulk ops is via an admin context
        processBulkOpContextInfo.setFilterByActiveDates(false);
        processBulkOpContextInfo.setFilterByActiveFlag(false);
        return processBulkOpContextInfo;
    }

    protected ContextRequest buildBulkOperationContextRequest(BulkOperation bulkOperation) {
        ContextRequest contextRequest = typeFactory.get(ContextRequest.class);
        contextRequest.setApplicationId(bulkOperation.getApplicationId());
        contextRequest.setCatalogId(bulkOperation.getCatalogId());
        contextRequest.setSandboxId(bulkOperation.getSandboxId());
        contextRequest.setTenantId(bulkOperation.getTenantId());

        ChangeContainer container = new ContextChangeContainer();
        // Since we are deleting category products, we want to target the "CATEGORY"
        // container in order to properly sandbox our changes
        container.setName("CATEGORY");
        container.setRouteKey(DataRouteSupportUtil.getLookupKeyOnRoute().orElse(null));
        contextRequest.setChangeContainer(container);

        DataRouteSupportUtil.getBestMatchInRoute(
                contextRequestHydrators, reference, contextRequestHydrators.get(0))
                .hydrate(contextRequest);
        return contextRequest;
    }
}
To handle different types of bulk updates, BulkUpdateProcessor should have one implementation per type. For this example, we introduce a DeleteCategoryProductBulkUpdateProcessor:
@Slf4j
@RequiredArgsConstructor
public class DeleteCategoryProductBulkUpdateProcessor implements BulkUpdateProcessor {

    protected static final String OPERATION_TYPE = "DELETE_CATEGORY_PRODUCT";

    @Getter(value = AccessLevel.PROTECTED)
    private final CategoryBulkUpdateProcessorUtil processorUtil;

    @Getter(value = AccessLevel.PROTECTED)
    private final CategoryProductService<CategoryProduct> categoryProductService;

    @Getter(value = AccessLevel.PROTECTED)
    private final BulkUpdateProcessorHelper<BulkOperationItem> bulkUpdateProcessorHelper;

    @Override
    public String getOperationType() {
        return OPERATION_TYPE;
    }

    @Override
    public void process(BulkOperation bulkOperation) {
        bulkUpdateProcessorHelper.processBulkItemsInBatches(bulkOperation,
                this::processPageOfItems);
    }

    /**
     * For a page of bulk operation items, identifies the category products to delete from the bulk
     * operation data. Attempts to delete each of the category products, and updates the bulk
     * operation items to success or failure.
     */
    protected Page<BulkOperationItem> processPageOfItems(BulkOperation bulkOperation,
            Page<BulkOperationItem> bulkOperationItems) {
        // Create the context info using the designated CategoryBulkUpdateProcessorUtil
        ContextInfo contextInfo =
                processorUtil.buildBulkOperationContextInfo(bulkOperation, OperationType.DELETE);

        // Collect the IDs of the category products to process
        Map<String, BulkOperationItem> categoryProductIdsMap = bulkOperationItems.getContent()
                .stream()
                .collect(Collectors.toMap(BulkOperationItem::getEntityContextId,
                        Function.identity()));

        for (String categoryProductId : categoryProductIdsMap.keySet()) {
            try {
                BulkOperationContextProperties properties = bulkUpdateProcessorHelper
                        .getBulkOperationContextProperties(bulkOperation, OPERATION_TYPE);

                // Perform the delete operation on each category product
                categoryProductService.delete(categoryProductId, contextInfo);

                BulkOperationItem bulkOperationItem = categoryProductIdsMap.get(categoryProductId);
                // Update the state of the bulk operation item and continue with the next one
                bulkOperationItem.setState(SUCCESS.name());
            } catch (Exception e) {
                BulkOperationItem bulkOperationItem = categoryProductIdsMap.get(categoryProductId);
                // Update the state of the bulk operation item if errored.
                bulkOperationItem.setState(ERROR.name());
            }
        }
        return bulkOperationItems;
    }
}
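The per-item bookkeeping in the processor can be sketched without any framework types. The following is a minimal illustration under stated assumptions: `PageProcessingSketch`, `Item`, and `State` are hypothetical stand-ins for the processor, `BulkOperationItem`, and its states, and the delete call is passed in as a plain `Consumer`. It shows each item being marked independently, so one failing delete does not stop the rest of the page.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration, not part of the Broadleaf API
final class PageProcessingSketch {

    enum State { PENDING, SUCCESS, ERROR }

    /** Simplified stand-in for BulkOperationItem. */
    static final class Item {
        final String entityContextId;
        State state = State.PENDING;

        Item(String entityContextId) {
            this.entityContextId = entityContextId;
        }
    }

    /** Applies the delete action to every item and records the outcome on each one. */
    static List<Item> processPage(List<Item> items, Consumer<String> deleteAction) {
        for (Item item : items) {
            try {
                deleteAction.accept(item.entityContextId);
                item.state = State.SUCCESS;
            } catch (RuntimeException e) {
                // A failure marks only this item; the loop moves on to the next one
                item.state = State.ERROR;
            }
        }
        return items;
    }

    public static void main(String[] args) {
        List<Item> page = Arrays.asList(new Item("a"), new Item("b"), new Item("c"));
        // Assumed delete action that fails for one ID, for illustration only
        processPage(page, id -> {
            if (id.equals("b")) {
                throw new IllegalStateException("cannot delete " + id);
            }
        });
        for (Item item : page) {
            System.out.println(item.entityContextId + " -> " + item.state);
        }
    }
}
```

Returning the same page after the loop lets the caller persist every item's final state in one step, whether the item succeeded or failed.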
Metadata plays an important role in creating the bulk operation request from the collection of entities selected in a component.
In this example, we will extend the category grid metadata to:
- Enable the selection of multiple grid items.
- Add a dropdown with a "Delete" operation targeting the selected items.
@Configuration
@RequiredArgsConstructor
@AutoConfigureAfter(CategoryMetadataAutoConfiguration.class)
public class MyCompanyCategoryMetadataAutoConfiguration {

    @Bean
    public ComponentSource addBulkOpsDeleteToCategoryProductGrid() {
        return registry -> {
            TreeView<?> categoryTree = (TreeView<?>) registry.get(CategoryIds.TREE);
            GridExternal<?> categoryProductGrid = (GridExternal<?>) categoryTree
                    .getComponent(CategoryIds.Forms.PRODUCTS)
                    .getComponent("category-products");

            categoryProductGrid
                    // Enable the selection of multiple grid items
                    .multipleSelectable()
                    // Add the Bulk Operations dropdown
                    .addGridActionGroup("Bulk Operations",
                            getCategoryProductBulkActions());
        };
    }

    private Action<?>[] getCategoryProductBulkActions() {
        DefaultBulkOperationAction removeProduct = Actions.bulkAction()
                // Define an action with a bulkOperationType matching that of our
                // DeleteCategoryProductBulkUpdateProcessor
                .bulkOperationType("DELETE_CATEGORY_PRODUCT")
                .entityType("CATEGORY_PRODUCT")
                .generateFriendlyName()
                .label("Delete Product")
                .addSubmitEndpoint(Endpoints.post()
                        .uri(BulkOperationPaths.EXECUTE_BULK_OPERATION)
                        .scope(BulkOperationScopes.BULK_OPERATION));

        return new DefaultBulkOperationAction[] {
                removeProduct
        };
    }
}