Broadleaf Microservices
  • v1.0.0-latest-prod

Messaging

Messaging in this context relates to asynchronous communication between the distributed components of the application. Messages are constructed by a component of the application and posted to a message broker. Other components are configured to listen for messages on the channel to which the message was posted and possibly consume the message in order to continue the lifecycle for the given flow.

Messaging Library Javadocs

Messaging Support

Broadleaf leverages Spring Cloud Stream as an abstraction layer for interacting with messages and brokers. Spring Cloud Stream supports a number of broker implementations, including popular support for Kafka, Amazon Kinesis, Google PubSub, and Azure Event Hubs. Broadleaf framework code is written against the Spring Cloud Stream API and is designed to support and interact with the different backing implementations supported by Spring Cloud Stream.

Characteristics

The characteristics of messaging in Broadleaf are fairly consistent with industry norms regarding message behavior in distributed architectures. This includes no guarantees messages are consumed in the order they were sent. This also includes an at-least-once guarantee of message delivery, but no guarantee that a duplicate message will not be received. For these reasons, it is important to consider additional measures on the consumption side regarding message idempotency to prevent the occasional irregularities mentioned from causing incorrect state in the application.

Consumption

Broadleaf tends to employ messages in 2 primary categories:

  1. Data Synchronization

  2. State Change Notification

Data Synchronization involves communicating the fairly comprehensive state of an entity for the purpose of replication at another persistence location (e.g. another database or schema). This is a common operation for cross-cutting-concern concepts like those employed by the Sandbox and Tenant services. Broadleaf also creates and utilizes the Persistence channel as a generic vehicle for communicating entity state upon change to the system at large, and other components in your enterprise stack can integrate at this layer to receive and synchronize data.

State Change Notification is more targeted than Data Synchronization and is used to communicate a specific change that can be reacted to. For example, a scheduled job can be triggered. Most commonly, components are configured to consume this type of message when they contain data in their own bounded context that is related by some factor. For example, the catalog service will react to a promoted message from the sandbox service when that message is tied to a Product (which is a catalog service maintained concept).

For Data Synchonization, it is generally most useful to include a temporal factor in the domain design to account for a timestamp denoting when the change took place. Consumers of a message can compare the reported timestamp against the timestamp of the currently persisted state, and only accept the incoming change if it’s newer. This addresses the lack of an ordered processing guarantee that this type of scenario is sensitive to.

For State Change Notification, it is generally important to not consume duplicate messages. Employing an idempotent receiver pattern is one way to handle this scenario. The message identifies a unique id, and a consumption log is checked to confirm a message with that id has not already been consumed previously. Broadleaf employs this pattern using com.broadleafcommerce.common.messaging.service.IdempotentMessageConsumptionService at an early phase in message consumption. Your custom consumer code (if applicable) may also inject and leverage IdempotentMessageConsumptionService, as long as the message contains a header with a key of MESSAGE_IDEMPOTENCY_KEY, and a unique id value.

Note
Native broker integrations may also provide more advanced features than what are supported by Spring Cloud Stream. While Broadleaf focuses on the Spring Cloud Stream API, your unique implementation may leverage whatever messaging approach is desired (including more direct broker integration) for new communication flows you create outside of what’s defined in the framework.

Durable Send

Broadleaf builds in a resiliency component for additional message sending guarantees. When a state change is persisted, Broadleaf can also atomically persists an unacknowledged NotificationState. Broadleaf will attempt to immediately send the message to the configured broker via the Spring Cloud Stream API. However, should that fail, Broadleaf employs a RetryHandler instance per supporting Spring Data Repository to look for unacknowledged notification states, and for any found, attempt to send the message again using a backoff algorithm. Using this approach, Broadleaf can handle message broker outages and will pick up where it left off when a failing broker comes back online.

Note
RetryHandler instances generally utilize a cluster singleton pattern via org.apache.camel.cluster.CamelClusterService. CamelClusterService supports a number of approaches for establishing a leader, including jgroups, zookeeper, and kubernetes. Using this approach, Broadleaf guarantees multiple RetryHandler instances are not running at the same time, while also providing HA should a leader node fail.

Examples

Integration With Data Synchronization

Figure 1.1 : PersistenceHandler Example

public class SegmentMemberPersistenceHandler implements PersistenceHandler {                    # (1)

    @Getter(AccessLevel.PROTECTED)
    private final UserService<User> userService;
    private final ObjectMapper objectMapper;

    @Override
    public ObjectMapper getObjectMapper() {
        return objectMapper;
    }

    @Override
    public void processStateChange(JsonNode entityJson) {                                       # (2)
        String userId = getUserId(entityJson);
        String segmentId = getSegmentId(entityJson);
        boolean isDelete = isDeleteOperation(entityJson);
        User user = getUser(userId, entityJson);
        Set<String> segments = getCustomerSegments(user);
        if (isDelete) {
            segments.remove(segmentId);
            log.debug("Removed segment {} from user {}", segmentId, user);
        } else {
            segments.add(segmentId);
            log.debug("Added segment {} to user {}", segmentId, user);
        }

        Instant timestamp = getChangeTimestamp(entityJson);                                     # (3)
        if (user.getLastUpdated().isBefore(timestamp)) {
            userService.replace(user.getId(), user);
        } else {
            log.debug(String.format(
                    "Found an existing user with service id %s and lastUpdated %s which is newer than the change, so will not update",
                    user.getServiceId(),
                    user.getLastUpdated()));
        }
    }

    @StreamListener(SegmentMemberPersistenceConsumer.CHANNEL)                                   # (4)
    @Override
    public void hook(String entityJson) {
        handle(entityJson);
    }

    ...

    protected Instant getChangeTimestamp(JsonNode persistenceMessage) {
        JsonNode changeTimestamp = Optional.ofNullable(persistenceMessage.get("timestamp"))
                .filter(node -> !isNodeNull(node))
                .orElseThrow(this::missingTimestampError);
        try {
            Instant result = getObjectMapper().readValue(changeTimestamp.toString(), Instant.class);
            if (result == null) {
                throw missingTimestampError();
            } else {
                return result;
            }
        } catch (IOException e) {
            throw new IllegalArgumentException(
                    "Problem processing timestamp field on a persistence state change message");
        }
    }

    private IllegalArgumentException missingTimestampError() {
        return new IllegalArgumentException(
                "No timestamp found in persistence message, aborting");
    }

    ...
}
  1. This is an example Spring Bean that extends from the PersistenceHandler interface. The PersistenceHandler interface performs some minor conversion duties and establishes the standard contract for working with messages in the persistence channel.

  2. #processStateChange is the main entry point and is where the custom logic for this consumption case is performed.

  3. #getChangeTimestamp acquires the timestamp from the incoming message. It is at this point that the incoming timestamp and currently persisted timestamp can be compared for consumption validity.

  4. @StreamListener binds this PersistenceHandler method to the Spring Cloud Stream lifecycle.

Figure 1.2 : Consumer Binding Example

public interface SegmentMemberPersistenceConsumer {                                             # (1)

    String CHANNEL = "persistenceInputSegmentMember";

    @Input(CHANNEL)                                                                             # (2)
    SubscribableChannel persistenceInputSegmentMember();
}
  1. This is a simple interface used to describe the Spring Cloud Stream bound component.

  2. Use @Input annotation to identify the channel for the incoming message.

Figure 1.3 : Spring Config Example

@Configuration
@EnableBinding({AdminPermissionPersistenceConsumer.class, AdminUserPersistenceConsumer.class,
        AuthCustomerPersistenceConsumer.class, SegmentMemberPersistenceConsumer.class,          # (1)
        AdminRolePersistenceConsumer.class})
public class AuthDataUpdatePersistenceHandlerAutoConfiguration {

    ...

    @Bean
    public SegmentMemberPersistenceHandler segmentMemberPersistenceHandler(
            UserService<User> userService,
            ObjectMapper objectMapper) {
        return new SegmentMemberPersistenceHandler(userService, objectMapper);                  # (2)
    }
}
  1. SegmentMemberPersistenceConsumer is identified to Spring via the @EnableBinding annotation.

  2. SegmentMemberPersistenceHandler is instantiated as a Spring Bean in this Spring Configuration class.

Figure 1.4 : Environment Config Example

spring:
  cloud:
    stream:
      bindings:
        persistenceInputSegmentMember:
          group: auth-segment-member                                                            # (1)
          destination: persistence                                                              # (2)
  1. In the property file for a service (in this case a yaml file), identify the group name. This is needed when a service has multiple replicas for HA and horizontal scale. In such a case, you do not want the same message consumed by all replicas. Naming a group prevents this and forces only one replica in the group to consume the message.

  2. Specify the name of the destination channel. This should be the same channel on which the message was originally sent.

Integration With State Change Notification

Figure 2.1 : Listener Example

@DataRouteByKey(INVENTORY_ROUTE_KEY)                                                            # (1)
public class FreeSoftInventoryScheduledJobListener {

    protected final static String JOB_TYPE = "FREE_SOFT_INVENTORY";
    protected final static String DETAIL_MIN_DURATION = "minReservationDuration";
    protected final static Duration FALLBACK_MIN_DURATION = Duration.of(5, ChronoUnit.MINUTES);

    @Getter(AccessLevel.PROTECTED)
    private final IdempotentMessageConsumptionService idempotentService;

    @Getter(AccessLevel.PROTECTED)
    private final FreeSoftInventoryJobService freeSoftInventoryJobService;

    @StreamListener(FreeSoftInventoryTriggeredJobEventConsumer.CHANNEL)                         # (2)
    public void listen(Message<ScheduledJobRef> message) {
        if (!JOB_TYPE.equals(message.getPayload().getType())) {
            log.trace("'{}' is not a '{}' job type, ignoring", message.getPayload().getType(),
                    JOB_TYPE);
            return;
        }

        idempotentService.consumeMessage(message,                                               # (3)
                FreeSoftInventoryScheduledJobListener.class.getName(),
                msg -> {
                    doListen(msg.getPayload());
                });
    }

    protected void doListen(ScheduledJobRef jobRef) {
        log.debug("Received scheduled job trigger: {}", jobRef);

        Duration minReservationDuration = findMinReservationDuration(jobRef)
                .orElse(FALLBACK_MIN_DURATION);

        freeSoftInventoryJobService.freeSoftInventory(minReservationDuration, null);
    }

    ...
}
  1. If using Data Routing, to ensure that the IdempotentMessageConsumptionService targets the correct database for obtaining & releasing locks, the data route must be specified using @DataRouteByKey(…​), @DataRouteByExample(…​), or DataRouteSupportUtil.processOnRoute(…​) within the listener

  2. @StreamListener binds this PersistenceHandler method to the Spring Cloud Stream lifecycle.

  3. IdempotentMessageConsumptionService qualifies the message as not a duplicate.

    1. Behind the scenes, the MESSAGE_IDEMPOTENCY_KEY is gathered from the message headers, and this value is used in combination with the listener’s name to produce a unique BLC_RESOURCE_LOCK record. Since there can be multiple listeners to the same message channel, including the listener name as part of the lock ensures that separate listeners don’t lock each other out of consuming the same message.

    2. If a lock cannot be obtained (i.e. the message is already locked), then the message is simply ignored by the listener

    3. If an exception occurs within MyMessageListener#processMessage(…​), then the lock will be removed & the exception is allowed to trickle up to SpringCloudStream, where retry mechanisms will be engaged

Figure 2.2 : Consumer Binding Example

public interface FreeSoftInventoryTriggeredJobEventConsumer {                                   # (1)

    String CHANNEL = "triggeredJobEventInputFreeSoftInventory";

    @Input(CHANNEL)                                                                             # (2)
    SubscribableChannel triggeredJobEventInputFreeSoftInventory();
}
  1. This is a simple interface used to describe the Spring Cloud Stream bound component.

  2. Use @Input annotation to identify the channel for the incoming message.

Figure 2.3 : Environment Config Example

spring:
  cloud:
    stream:
      bindings:
        triggeredJobEventInputFreeSoftInventory:
          group: inventory-free-soft-inventory                                                  # (1)
          destination: triggeredJobEvent                                                        # (2)
  1. In the property file for a service (in this case a yaml file), identify the group name. This is needed when a service has multiple replicas for HA and horizontal scale. In such a case, you do not want the same message consumed by all replicas. Naming a group prevents this and forces only one replica in the group to consume the message.

  2. Specify the name of the destination channel. This should be the same channel on which the message was originally sent.

Purging Old Message Locks

The BLC_RESOURCE_LOCK records used to lock a message are not given an expiration date - i.e. the locks are held indefinitely. To avoid an excessive buildup of BLC_RESOURCE_LOCK records, there is a Purge Message Locks ScheduledJob that will attempt to delete old message locks. This job is working under the assumption that given enough time, those messages will no longer be active, and therefore their locks no longer serve a purpose.

Note
the message lock’s time to live is configurable by the messageLockTTLInSeconds detail (BLC_SCHEDULED_JOB_DETAIL) related to the job.

By default, this deletes the locks with JpaResourceLock#getTypeAlias() equal to Message, Cart and Order. If there is any other lock type that has to be deleted, it can be specified via the messageLockTypesToPurge detail. The value should be the comma-separated lock types.

Other Data Routing Considerations

To introduce idempotent message consumption to a listener, the database schema must first be prepared for the relevant data route. This only applies if Data Routing is enabled for a new services not already configured for messaging.

Regardless of whether you’re using @JpaDataRoute or @MongoDataRoute, you’ll need to ensure that MessagingDataRouteSupporting.class is included in the supportingRouteTypes collection. Note: If your data route supports Data Tracking, then MessagingDataRouteSupporting.class should already be listed.

From there, if you’re using a JPA database provider, then you’ll need to update the schema to include the BLC_RESOURCE_LOCK table (if it’s not already there). Behind the scenes, this table will be used to hold locks on specific messages, not allowing them to be consumed twice.

Durable Send

Trackable

Trackable domain has built-in support for NotificationStates to hold send acknowledgments. In the same vein, TrackableRepository derives from NotificationStateRepository and contains the persistence behavior necessary to manage send acknowledgement state. Those concepts won’t be covered in detail here, but they represent the basic support required to marry persistence of entity state change with notification state. See Sandboxing In Detail and Repository for more information on Trackable domain design and Trackable repository design, respectively.

Broadleaf provides convenience concepts, such as CrudEntityHelper, to assist with simplification of boilerplate code required to handle mapping of projection to persistence state during standard CRUD request lifecycles. Adding support for new messages generally involves customizing the base CrudEntityHelper to include support for the new message type.

Figure 3.1 : Custom CrudEntityHelper Example

public class VendorCrudEntityHelper extends CrudEntityHelper {                                  # (1)

    public VendorCrudEntityHelper(DomainMapperManager mapper,
            List<SortTransformer> sortTransformers,
            EntityValidatorManager validator,
            TrackableBehaviorUtil trackableBehaviorUtil) {
        super(mapper, sortTransformers, validator, trackableBehaviorUtil);
    }

    @Override
    public void notify(Trackable domain,
            ContextInfo context,
            @NonNull NotificationStateRepository repository) {
        super.notify(domain, context, repository);
        NotificationManager notificationManager = getNotificationManager();                     # (2)
        if (notificationManager != null) {
            notificationManager
                    .handle(repository, domain, VendorNotificationEventProducer.TYPE);          # (3)
        }
    }
}
  1. Create a customization of CrudEntityHelper to include the additional support

  2. Retrieve the NotificationManager instance from the superclass.

  3. NotificationManager exposes a simple API for notifying the system of a state change event on the given Trackable entity via the requested channel. The actual state change is not described here. Rather, it will be harvested from the notification states included in the trackable information on the entity.

Figure 3.2 : Producer Binding Example

public interface VendorNotificationEventProducer {                                              # (1)

    String TYPE = "VENDOR_NOTIFICATION_REQUEST";
    String CHANNEL = "vendorNotificationOutput";

    @Output(CHANNEL)                                                                            # (2)
    MessageChannel vendorNotificationOutput();
}
  1. This is a simple interface used to describe the Spring Cloud Stream bound component.

  2. Use @Output annotation to identify the channel for the outgoing message.

Figure 3.3 : DomainMapperMember Example

public class VendorNotificationStateMapperMember
    extends NotificationStateInitializingDomainMapperMember {                                    # (1)

    public VendorNotificationStateMapperMember(TypeFactory typeFactory) {
        super(typeFactory);
    }

    @Override
    protected boolean isQualified(Object entity, ContextInfo contextInfo) {                      # (2)
        return entity instanceof JpaVendor;
    }

    @Override
    protected String messageType() {                                                            # (3)
        return VendorNotificationEventProducer.TYPE;
    }

    @Override
    public String getDataRoutePartition() {                                                     # (4)
        return VENDOR_ROUTE_PACKAGE;
    }

    @Override
    protected boolean applyOnCreate() {                                                         # (5)
        return true;
    }

    @Override
    protected boolean applyOnUpdate() {
        return true;
    }

    @Override
    protected boolean applyOnDelete() {
        return true;
    }
}
  1. DomainMapperMember instances participate in the mapping pipeline for conversion to/from projection to persistence domain. This special, "initializing" DomainMapperMember is used to setup required support for NotificationState upon entity creation.

  2. Generally, the NotificationState initialization should only take place for a specific entity type. The domain mapper pipeline is exposed to a number of different entities during CRUD flows, and the isQualified method provides the opportunity to detect which of those entities should receive attention.

  3. An identifier string unique to the particular type of message being sent

  4. #getDataRoutePartition is a special support concept for DomainMapperMember instances. It defines which route is required for this DomainMapperMember to be included in a given pipeline. Data routing is an advanced concept that supports collapsing multiple microservices under a single Spring runtime. See Data Routing for more information.

  5. Generally (for completeness), NotificationState initialization should be attempted in all CRUD flows. However, when more granular control over which flows should engage initialization is required, the applyOn** methods provide this level of influence.

Figure 3.4 : Notification Config Example

@ConfigurationProperties("broadleaf.vendor.notification")
@Data
public class VendorNotificationProperties
        implements DurableNotificationProperties {                                              # (1)

    @NestedConfigurationProperty
    private NotificationProperties notification = new NotificationProperties();

    @NestedConfigurationProperty
    private RetryProperties retry =
            new RetryProperties("notification-vendor");                                         # (2)

}
  1. Create a DurableNotificationProperties instance to cover the configuration requirements regarding circuit breaker and retry behavior.

  2. Specify the namespace in which the cluster singleton service is controlled when creating the RetryProperties instance.

Figure 3.5 : Spring Config Example

@Configuration
@RequiredArgsConstructor
@EnableBinding({VendorNotificationEventProducer.class})                                          # (1)
@EnableConfigurationProperties(VendorNotificationProperties.class)
public class VendorServiceAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public VendorService<Vendor> vendorService(VendorRepository<Trackable> vendorRepository,
            TypeFactory typeFactory,
            DomainMapperManager mapperManager,
            @Autowired(required = false) List<SortTransformer> sortTransformers,
            EntityValidatorManager entityValidatorManager,
            TrackableBehaviorUtil behaviorUtil,
            @Autowired(required = false) List<RsqlQueryTransformer> rsqlQueryTransformers,
            @Nullable NotificationManager notificationManager) {
        VendorCrudEntityHelper crudEntityHelper = new VendorCrudEntityHelper(mapperManager,     # (2)
                sortTransformers,
                entityValidatorManager,
                behaviorUtil);
        crudEntityHelper.setNotificationManager(notificationManager);
        RsqlCrudEntityHelper helper =
                new RsqlCrudEntityHelper(crudEntityHelper, rsqlQueryTransformers);
        return new DefaultVendorService<>(vendorRepository,
                typeFactory,
                helper);
    }

    @Bean
    @ConditionalOnMissingBean(name = "vendorNotificationHandler")
    public NotificationHandler vendorNotificationHandler(
            VendorNotificationEventProducer producer,
            VendorNotificationProperties properties,
            List<IgnoredNotificationStateRepository> ignoredRepositories,
            PersistenceMessageFactory messageFactory,
            MessageSerializationHelper helper) {
        return new DefaultNotificationHandler(producer::vendorNotificationOutput,                 # (3)
                properties,
                VendorNotificationEventProducer.TYPE,
                ignoredRepositories,
                messageFactory,
                helper);
    }

    @Bean
    @ConditionalOnMissingBean(name = "vendorNotificationRetryClusterService")
    public RetryClusterService vendorNotificationRetryClusterService(
            CamelClusterService camelClusterService,
            VendorNotificationProperties properties,
            VendorRepository<?> repository,
            @Qualifier("vendorNotificationHandler") NotificationHandler handler,
            List<IgnoredNotificationStateRepository> ignoredRepositories,
            DataRouteReference reference)
            throws Exception {
        return RetryServiceFactory.create(camelClusterService, properties,                      # (4)
                Collections.singletonList(repository), handler,
                VendorNotificationEventProducer.TYPE,
                ignoredRepositories, reference);
    }
}

@Configuration
@RequiredArgsConstructor
public class VendorJpaAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean(name = "vendorNotificationStateDomainMapperMember")
    public DomainMapperMember vendorNotificationStateDomainMapperMember(
            NotificationStateService notificationStateService) {
        return new VendorNotificationStateMapperMember(notificationStateService);                 # (5)
    }

}
  1. Use @EnableBinding to bind Spring Cloud Stream to the vendorNotificationOutput producer channel.

  2. Instantiate the customized entity helper instance and build the projection service bean instance.

  3. Build a new NotificationHandler instance to handle message construction (via the messageFactory param), and subsequent sending on the producer. Note, you may inject a custom com.broadleafcommerce.common.messaging.notification.MessageFactory instance for custom message creation.

  4. Establish a RetryClusterService instance for this case that will be responsible for message send resiliency. This is using CamelClusterService to guarantee only one instance of the RetryClusterService is running across multiple replicas at a given time in a given namespace.

  5. Instantiate the DomainMapperMember instance.

Figure 3.6 : Environment Config Example

spring:
  cloud:
    stream:
      bindings:
        vendorNotificationOutput:
          destination: vendorNotification                                                       # (1)
  1. Specify the name of the destination channel. Consumers of this message will listen to the same channel.

Non-Trackable

It is possible that an entity is not Trackable, but it is still desirable to notify the system regarding any state change. The additional requirements here primarily revolve around creating a repository mixin that is appropriately recognized by the system as a NotificationStateRepository. Spring Data’s flexible repository fragment approach helps here.

Figure 4.1 : Non-Trackable Domain Class Example

@Entity
@Table(name = "EXAMPLE")
@Inheritance(strategy = InheritanceType.JOINED)
@EntityListeners(MappableNotificationStateListener.class)                                       # (1)
@Data
public class JpaExample
        implements Identifiable, Serializable, ModelMapperMappable, BusinessTypeAware,
        NotificationStateAware {                                                                # (2)

    private static final long serialVersionUID = 1L;

    @Id
    @GeneratedValue(generator = "blcid")
    @GenericGenerator(name = "blcid", strategy = "blcid")
    @Type(type = "com.broadleafcommerce.data.tracking.jpa.hibernate.ULidType")
    @Column(name = "ID", nullable = false, length = CONTEXT_ID_LENGTH)
    private String id;

    ...

    @JsonIgnore
    @Transient
    private List<? super JpaNotificationState> notificationStates = null;

    @Override
    public void setNotificationStates(List<NotificationState> notificationStates) {
        this.notificationStates = notificationStates;
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<NotificationState> getNotificationStates() {                                    # (3)
        this.notificationStates = JpaNotificationStateUtility.get(this.notificationStates, this);
        return (List<NotificationState>) this.notificationStates;
    }

    ...
}
  1. Add the MappableNotificationStateListener entity listener to groom information related to notification state during persistence.

  2. Implement Identifiable and NotificationStateAware to allow the system to harvest notification state information from the entity.

  3. The notification state values are not persisted directly in this entity. Rather, use JpaNotificationStateUtility to delegate fetch of notification states on demand.

Figure 4.2 : Non-Trackable Repository Interface Example

public interface JpaExampleRepository<D extends Identifiable>
        extends PagingAndSortingRepository<D, String>,                                          # (1)
        CustomizedExampleRepository<D>, NotificationStateRepository { }                          # (2)
  1. This is a standard Spring Data repository interface definition. Notice it extends from Spring’s PagingAndSortingRepository, so this is not a Broadleaf Trackable repository.

  2. NotificationStateRepository is also specified here as a superclass to fulfill the repository type requirement for notifications.

Figure 4.3 : Non-Trackable Repository Fragment Implementation Example

public class JpaCustomizedExampleRepository<D extends JpaExample>
        implements CustomizedExampleRepository<D>, NotificationStateRepositoryFragment {         # (1)

    @NonNull
    @Getter(AccessLevel.PROTECTED)
    private final NotificationStateRepositoryFragment notificationFragment;                        # (2)

    @PersistenceContext
    @Getter(AccessLevel.PROTECTED)
    private EntityManager entityManager;

    @Transactional
    @SuppressWarnings("unchecked")
    public Object save(Object entity) {
        D result = (D) entity;
        if (StringUtils.isEmpty(result.getId())) {
            entityManager.persist(entity);
            JpaNotificationStateService.get().saveFrom((D) entity, entityManager);               # (3)
        } else {
            JpaNotificationStateService.get().saveFrom((D) entity, entityManager);
            result = entityManager.merge((D) entity);
        }
        return result;
    }

    @Transactional
    public List<Object> saveAll(Iterable<?> entities) {
        Assert.notNull(entities, "The given Iterable of entities not be null!");
        List<Object> result = new ArrayList<>();
        for (Object entity : entities) {
            result.add(save(entity));
        }
        return result;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Class<D> getDomainType() {
        return (Class<D>) GenericsUtils.getParameterType(getClass(),
                CustomizedBulkUpdateRepository.class, 0);
    }

    @Override
    public boolean setNotificationAcknowledged(@NonNull Object nativeId,                        # (4)
            @NonNull String messageType,
            int attemptCount,
            @NonNull Class<?> entityType) {
        return notificationFragment.setNotificationAcknowledged(nativeId, messageType, attemptCount,
                entityType);
    }

    @Override
    public boolean setFailedNotificationAttempt(@NonNull Object nativeId,
            @NonNull String messageType,
            int attemptCount,
            @NonNull Instant nextAttempt,
            @NonNull Class<?> entityType,
            boolean stopped) {
        return notificationFragment.setFailedNotificationAttempt(nativeId, messageType,
                attemptCount, nextAttempt, entityType, stopped);
    }

    @Override
    @NonNull
    public Stream<NotificationStateAware> findNotificationReadyMembers(
            Object lastProcessedNativeId,
            int pageSize,
            @NonNull String messageType,
            @NonNull Duration faultThreshold,
            @NonNull Class<?> entityType) {
        return notificationFragment.findNotificationReadyMembers(lastProcessedNativeId, pageSize,
                messageType, faultThreshold, entityType);
    }

    ...
}
  1. Create a repository fragment implementation that implements the required interfaces - especially NotificationStateRepositoryFragment.

  2. Inject an instance of NotificationStateRepositoryFragment to which NotificationStateRepository API calls will be delegated.

  3. Synchronize any instances compiled in the entity’s notification states list.

  4. Delegate any NotificationStateRepository API calls to the NotificationStateRepositoryFragment instance.

Figure 4.4. : Non-Trackable Spring Config Example

@EnableJpaRepositories(basePackageClasses = JpaExampleRepository.class)
@Configuration
static class NonTrackableConfig {

    @Bean
    @ConditionalOnMissingBean
    protected CustomizedExampleRepository<JpaExample> jpaExampleRepositoryImpl(                 # (1)
            NotificationStateValueUpdater notificationFragment) {
        return new JpaCustomizedExampleRepository<>(notificationFragment);
    }

}
  1. Use vanilla Spring Data repository fragment creation. The bean id is important here and ends with the Impl keyword, which Spring data will include as the behind-the-scenes implementation when it instantiates the JpaExampleRepository instance.

Note
Now that that domain and repository are decorated with the appropriate support for notification states, the remainder of the setup for durable send is similar to the original Trackable case, with the exception of the entity helper. Non-trackable domain leverages the MappableCrudEntityHelper concept instead. MappableCrudEntityHelper does not have a notify method to override, but you can still override and enhance create, update, replace, and delete methods directly to add NotificationManager#handle support.

Non-Trackable With Data Routing

If Data Routing is used in conjunction with the datasource supporting this non-trackable repository, it will be necessary to register the repository with the data routing architecture. This will facilitate the usage of the correct route during message send retry.

Figure 5.1 : Non-Trackable Repository Route Supplier Example

@Configuration
@ConditionalOnProperty(value = "broadleaf.common.data.route.enabled", matchIfMissing = true)
static class WithDataRouting {

    @Bean
    public RepositoryDataRouteSupplier<DataRouteSupporting> exampleRepositoryRouteSupplier(
            @Qualifier("exampleSource") DataRouteSupporting route) {
        return () -> new RepositoryDataRouteSupplier.RepositoryMapping<>(
                CustomizedExampleRepository.class,
                Collections.singletonList(route)).map(DataRouteSupporting.class);
    }

}

Simplified Durable Send (Since Release Train 1.8.1, 1.7.7)

Configuring durable send is simplied through the use of the @DurableProducer annotation. The annotation minimizes declaration and removes much of the boiler-plate bean creation mentioned above. If we revisit figure 3.5 with the simplified configuration with the use of @DurableProducer, we are left with this:

Figure 6.1 : DurableProducer Spring Config Example

@Configuration
@RequiredArgsConstructor
public class VendorServiceAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public VendorService<Vendor> vendorService(VendorRepository<Trackable> vendorRepository,
            TypeFactory typeFactory,
            DomainMapperManager mapperManager,
            @Autowired(required = false) List<SortTransformer> sortTransformers,
            EntityValidatorManager entityValidatorManager,
            TrackableBehaviorUtil behaviorUtil,
            @Autowired(required = false) List<RsqlQueryTransformer> rsqlQueryTransformers,
            @Nullable NotificationManager notificationManager) {
        VendorCrudEntityHelper crudEntityHelper = new VendorCrudEntityHelper(mapperManager,     # (1)
                sortTransformers,
                entityValidatorManager,
                behaviorUtil);
        crudEntityHelper.setNotificationManager(notificationManager);
        RsqlCrudEntityHelper helper =
                new RsqlCrudEntityHelper(crudEntityHelper, rsqlQueryTransformers);
        return new DefaultVendorService<>(vendorRepository,
                typeFactory,
                helper);
    }

    ...
}

@Configuration
@RequiredArgsConstructor
@DurableProducer(output = VendorNotificationEventProducer.class,
    name = VendorNotificationEventProducer.TYPE,
    messageFactory = "persistenceMessageFactory",
    configurationPrefix = "broadleaf.vendor.notification"
    entityType = JpaVendor.class)                                                               # (2)
public class VendorJpaAutoConfiguration {
    ...
}
  1. Instantiate the customized entity helper instance and build the projection service bean instance.

  2. Establish the @DurableProducer annotation on the Configuration class

This annotation allows you to exclude the following considerations:

  • You don’t need to create a @ConfigurationProperties class for the durable notification config, or include a bean for it.

  • You don’t need to create a NotificationHandler bean.

  • You don’t need to create a RetryClusterService bean.

  • You don’t need to create an initializing DomainMapperMember bean.

Note
The initializing DomainMapperMember instance is interesting as an approach to separate the NotificationState initialization from the standard entity persistence and can save on customization effort if a customized persistence flow is not otherwise required for the entity type in question. However, if a custom persistence flows is in play, you may forgo a mapper member in favor of initializing NotificationState during creation of the entity in the data store. Either approach is valid and both result in the NotificationState being initialized at the time of entity persist. If you wish to let the system generate an initializing DomainMapperMember for you (instead of creating your own), then assign the entityType parameter on the DurableProducer annotation.

Detached Durable Send (Since Release Train 1.8.1, 1.7.7)

The capability to produce a durable message without an explicit backing @NotificationStateAware entity is supported. The same setup is used (either verbose, or the newer @DurableProducer method) to establish the plumbing for durable message sending. However, instead of using the NotificationManager.handle(..) approach for engaging the send flow, the analogous DetachedMessageSender singleton bean is leveraged using its send(..) method. When sending using this approach, NotificationState instances are still established in the datastore, but there is no associated NotificationStateAware entity. While the standard durable messaging approach described previously is appropriate for most cases, in flows where there is no entity whose state change is tied to the message, this detached pattern is prescribed.

Note
Detached messages do not use a message factory (there is no entity from which to derive a message). As such, the fully actualized message is required at the time of send. Furthermore, no NotificationState initialization is required, and therefore an initializing DomainMapperMember (or the like) is not used.

Figure 7.1 : Detached Spring Config Example

@Configuration
@DurableProducer(output = MyProducer.class, name = "MyMessageType",
            configurationPrefix = "my.prefix")                                                     # (1)
public class MyConfiguration {
 ...
}
  1. Establish the @DurableProducer annotation on a Configuration class

Figure 7.2 : Detached Execution Example Snippet

...
@Autowired
private DetachedMessageSender sender;                                                           # (1)
...
MyMessagePojo message = new MyMessagePojo();                                                    # (2)
String messageId = generateIdempotentId(message);                                               # (3)
sender.send(message, "MyMessageType", messageId, "myRoute");
  1. Inject the DetachedMessageSender singleton into your business component.

  2. Construct a pojo for your message. This pojo will be converted to JSON during transit.

  3. Create an identifying String for your message. The value should be unique to the message and idempotent. For example, if you call send twice with the same message, the messageId String should be the same in both cases.

Channels

Binder Interface

Broadleaf comes pre-configured with several binding interfaces out-of-the-box, and more may be configured and utilized. Broadleaf generally employs the patterns and annotations set forth in the Spring Cloud Stream reference docs. An example binding would resemble:

public interface Persistence {

    String INPUT = "persistenceInput";

    @Input // (1)
    SubscribableChannel persistenceInput();

    @Output // (2)
    MessageChannel persistenceOutput();

}
  1. The Input annotation is the standard Spring Cloud Stream way to describe a specific input channel for messages

  2. The Output annotation is the standard Spring Cloud Stream way to describe a specific output channel for messages

Channel Configuration

Broadleaf channels are configured for groups by default to achieve consumption durability and only-once execution per group. Each service interested in a channel configures a group for that channel in its property file configuration. For example, a property configuration for a pricing service that is interested in entity persistence changes might configure the persistence channel as:

spring.cloud.stream.bindings.persistenceInput.group=pricing
spring.cloud.stream.bindings.persistenceInput.destination=persistence
spring.cloud.stream.bindings.persistenceOutput.destination=persistence

Where the format is spring.cloud.stream.bindings.<channel>. There are significant other settings available for both the general Spring Cloud Stream architecture, as well as the specific middleware binding being used. Please refer to the Spring Cloud Stream reference documentation on the subject for more detailed tweaking of behavior.

The goal will generally be to receive one message instance per group and to do so in a durable fashion. This is achieved by using the Consumer Groups concept in Spring Cloud Stream. Consumer groups allow you to configure a cluster of similar services to behave as a group so that only one member of the group will process the message. For example, there may be several instances of the product service running to achieve scalability requirements, but once a product promotion message is sent, only one member of that cluster should consume the message and perform the sandbox promotion. Furthermore, by default, subscriptions with a consumer group configured are durable and messages will still be received by the group, even if all members of the grouped were stopped at the time the messages were sent. Out-of-the-box channels in Broadleaf are already configured appropriately in regard to consumer groups in all concerned services.

Error Handling

Spring Cloud Stream uses the retry template to attempt successful processing of a message a configured number of times. There is flexible configuration around the retry behavior, including timing and count. However, once the retry count is exhausted, the exception is bubbled to the binder, at which point it is sent back to the broker, or is handled by and error handler at the application level. In Broadleaf’s case, the message will generally be handled by a global error handler. See the Spring Cloud Stream reference docs for more information on retry template.

In the micro common data library, Broadleaf provides a global error handler at the application level, primarily to facilitate any message transformations before passing off to an error queue at the broker level. By default, no message transformation is performed and the message is logged out to standard logging before passing through the the error queue. As with other handlers, the com.broadleafcommerce.micro.common.data.message.DefaultErrorHandler component can be replaced with another component implementing the ErrorHandler interface. Or, another component that declares its own @StreamHandler method can participate in addition to DefaultErrorHandler.

Security

If a message payload supports security assertions, the consumer implementation should support verification of the security related information. Here’s an example snippet from a message payload used for sandbox promotions:

@Data
@RequiredArgsConstructor
@EqualsAndHashCode
public class PromotionRequest<P> implements Serializable, Securable { // (1)

    ...

    private String token; // (2)
}
  1. The Securable interface identifies a payload that contains security related information

  2. A token paramater (which is generally a OAuth2 token in the form of a Base64 encoded, signed JWT token) includes user identification information and scope claims that can verified to make policy decision regarding suitability for processing of a message.

Sometimes, the payload is received as structured information in the form of a JSON string, in which case, the token field should still be included and available in the JSON.

It is not a formal requirement, but this token should be decoded, verified and inspected before allowing processing of the message to proceed. This validation step is generally performed as part of the message handler, and most of the Broadleaf out-of-the-box handlers include a call out to a validation method, passing the secured payload.

Testing

Spring Cloud Stream supports a testing configuration that is automatically enabled in the testing context (even if a legitimate binder is configured for normal operation). This facilitates manual insertion of messages into channels in order to test handler behavior.

Verify Wiring

@SpringBootTest
public class ChangeWiringIT {

    @Configuration
    @EnableAutoConfiguration
    @EnableBinding(PersistenceConsumer.class)
    static class WiringConfig {

        @Bean
        TestPersistenceHandler testPersistenceHandler(Writer writer) {
            return new TestPersistenceHandler(writer);
        }
    }

    @MockBean
    Writer writer; // (1)

    @Autowired
    PersistenceConsumer persistenceConsumer; // (2)

    @Test
    void promotionWiring() throws IOException {
        String payload = "{\"_class\":\"" + MongoCatalog.class.getName() + "\",\"name\":\"test\"}";
        persistenceConsumer.persistenceInput().send(MessageBuilder.withPayload(payload).build()); // (3)
        Mockito.verify(writer).write(payload);
    }

}
  1. A mock Writer instance is created to receive and verify the message processing

  2. A Persistence binder interface instance is injected to allow for channel interaction and testing

  3. A message is pushed into the input channel, which should be received and processed by the handler (see below)

Exercise the Handler

@RequiredArgsConstructor
public class TestPersistenceHandler implements PersistenceHandler {

    private final Writer writer;

    @StreamListener(TenantSyncPersistenceConsumer.CHANNEL)
    @Override
    public void hook(String entityJson) {
        handle(entityJson);
    }

    @Override
    public ObjectMapper getObjectMapper() {
        return new ObjectMapper();
    }

    @Override
    public void processStateChange(JsonNode entityJson) { // (2)
        try {
            writer.write(entityJson.toString());
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public String[] getSupportedSimpleTypeNames() {
        return new String[] {JpaCatalog.class.getSimpleName()};
    }

}
  1. @EnableBinding activates the Persistence interface channels (and makes Persistence available as an injectable bean)

  2. processStateChange consumes the message and interacts with the Writer mock

Additional discussion on testing is available here.