Broadleaf Microservices
  • v1.0.0-latest-prod

Adding a New Message Listener

In this tutorial, we’ll start by exploring the configuration that’s needed to setup a message channel via Spring Cloud Stream, and we’ll look at how a message is produced and sent. Finally, we’ll create a simple message listener and discuss how enforce idempotent message consumption.

Throughout this tutorial, we’ll specifically focus on the message that is sent when a checkout is successfully completed. That said, the patterns that we’ll look at are generally applicable throughout the Broadleaf Microservices ecosystem.

Building and Configuring a Message Output Channel

Much of building a message producer involves setting up simple Spring Cloud Stream configuration with the following components:

A producer interface that declares the @Output binding target
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface CheckoutCompletionProducer {
    String CHANNEL = "checkoutCompletionOutput";

    @Output(CHANNEL)
    MessageChannel checkoutCompletionOutput();
}
A configuration class that registers the binding targets via @EnableBinding
...
import org.springframework.cloud.stream.annotation.EnableBinding;
...

@Configuration
@EnableBinding(CheckoutCompletionProducer.class)
public static class CartCheckoutMessagingAutoConfiguration {}
Application properties to define the destination of each binding
spring:
  cloud:
    stream:
      bindings:
        checkoutCompletionOutput:
          destination: checkoutCompletion
Note
  • Spring Cloud Stream uses this configuration behind the scenes to auto-generate Kafka queues/topics.

  • Notice that checkoutCompletionOutput matches the value that we previously registered with the @Output annotation. This causes a message channel to be created with the name checkoutCompletionOutput.

  • By default, Broadleaf registers the out-of-box bindings in *-defaults.yml (e.g. cartoperation-defaults.yml) properties files.

Building and Sending the CheckoutCompletionEvent

With the output binding in place, the CheckoutCompletionProducer interface can now be injected into any Spring component that needs to send a message via this channel.

Let’s take a look at the DefaultCheckoutService to understand how messages are built and sent.

package com.broadleafcommerce.cartoperation.service.checkout;

import static com.broadleafcommerce.common.messaging.service.DefaultMessageLockService.MESSAGE_IDEMPOTENCY_KEY;

import org.springframework.integration.support.MessageBuilder;

...

@RequiredArgsConstructor
public class DefaultCheckoutService implements CheckoutService {

    ...

    @Getter(AccessLevel.PROTECTED)
    private final CheckoutCompletionProducer checkoutCompletionProducer; // (1)

    ...

    /**
     * Send a message to notify external services (and internal listeners) of the completed checkout
     * so that they can react accordingly.
     *
     * @param cart The cart that has completed checkout. This should be the primary content of the
     *        out-going message.
     * @param requestId The id representing this request to checkout
     * @param contextInfo Context information around sandbox and multi-tenant state.
     */
    protected void sendCheckoutCompletionMessage(@lombok.NonNull Cart cart,
            @lombok.NonNull String requestId,
            @Nullable ContextInfo contextInfo) {
        try {
            Assert.notNull(checkoutCompletionProducer,
                    "CheckoutCompletionProducer was not available for sending a " +
                            "message for checkout completion. Likely a configuration issue in " +
                            "which messaging for this component is disabled.");
            CheckoutCompletionEvent completionEvent =
                    typeFactory.get(CheckoutCompletionEvent.class);
            completionEvent.setCart(cart);
            completionEvent.setRequestId(requestId);
            completionEvent.setContextInfo(contextInfo);

            checkoutCompletionProducer.checkoutCompletionOutput()
                    .send(MessageBuilder.withPayload(completionEvent) // (2) (3)
                            .setHeaderIfAbsent(MESSAGE_IDEMPOTENCY_KEY, cart.getId()) // (4)
                            .build());
        } catch (Exception e) {
            String errorMessage = String.format(
                    "Failed to send checkout completion message for cart (order number: %s).",
                    cart.getOrderNumber());
            throw new CheckoutCompletionMessageException(errorMessage, e, cart);
        }
    }
}
  1. Inject the CheckoutCompletionProducer

  2. Leverage the MessageBuilder to produce a Message (org.springframework.messaging.Message) with a CheckoutCompletionEvent payload and an idempotency header

  3. Send the message by calling checkoutCompletionProducer.checkoutCompletionOutput().send(…​)

  4. Include the MESSAGE_IDEMPOTENCY_KEY header with the cart id, doing so ensures the message for this cart only gets processed once. See Idempotent Message Consumption in Detail.

Building and Configuring a Message Input Channel

Similar to the creation of a message producer, creating a message listener also involves setting up a few similar components:

A consumer interface that declares the @Input binding target
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyCheckoutCompletionConsumer {

    String CHANNEL = "myCheckoutCompletionInput";

    @Input(CHANNEL)
    SubscribableChannel myCheckoutCompletionInput();

}
A configuration class that registers the binding targets via @EnableBinding
@Configuration
@EnableBinding(OfferCheckoutCompletionConsumer.class)
public class MyMessagingConfiguration {
    ...
}
Application properties to define the destination (i.e. the source) for the binding
spring:
  cloud:
    stream:
      bindings:
        myCheckoutCompletionInput:
          group: my-checkout-completion-group
          destination: checkoutCompletion
Note
  • Notice that myCheckoutCompletionInput matches the value that we previously registered with the @Input annotation.

  • For input channels, it’s important to define unique input channels for each listener. This causes Spring Cloud Stream to produce multiple subscribers, each with an independent delivery lifecycle.

  • If the channel is shared amongst several listeners in the same application, then it will try to deliver the message sequentially to each listener. If the first listener fails to process the message (i.e. it throws an exception), then the message will not be delivered to the other listeners on the same channel. Instead, if each listener has its own input channel, then the failed processing of one listener will not affect any other listeners.

  • For input channels, it’s important to declare a consumer group (i.e. my-checkout-completion-group in the example above) to ensure that the message is not processed multiple times if there are multiple instances of the application.

  • By default, Broadleaf registers the out-of-box bindings in *-defaults.yml (e.g. cartoperation-defaults.yml) properties files.

Building a Message Listener

Listening for messages on the input channel can be achieved using the following pattern:

import org.springframework.cloud.stream.annotation.StreamListener;
...

@RequiredArgsConstructor
@DataRouteByKey("MY_SERVICE_DATA_ROUTE_KEY") // (1)
public class MyCheckoutCompletionListener {

    @Getter(AccessLevel.PROTECTED)
    private final IdempotentMessageConsumptionService idempotentConsumptionService;

    @StreamListener(MyCheckoutCompletionConsumer.CHANNEL) // (2)
    public void listen(Message<CheckoutCompletionEvent> message) { // (3)
        idempotentConsumptionService.consumeMessage(message, // (4)
                CheckoutCompletionListener.class.getSimpleName(), this::processMessage); // (5)
    }

    protected void processMessage(Message<CheckoutCompletionEvent> message) {
        CheckoutCompletionEvent event = message.getPayload();

        // Your logic goes here
    }

}
  1. Leverage @DataRouteByKey or @DataRouteByExample to declare the data route that should be used by the IdempotentMessageConsumptionService.

  2. Use the @StreamListener annotation to declare the channel that you’d like to listen to.

  3. Declare that you expect to receive a message with the CheckoutCompletionEvent payload. This will cause the payload to be deserialized into the CheckoutCompletionEvent object.

  4. Leverage the IdempotentMessageConsumptionService to ensure that you don’t process the same message twice. Spring Cloud Stream will deliver messages at least once, so you need to be sure that the message is only processed once.

  5. If the idempotency check passes, then defer to another method for processing of the message.

    Note
    Throwing an exception while processing the message and allowing it to bubble up to the @StreamListener proxy will engage Spring Cloud Stream’s retry and dead letter queue mechanisms.

From there, we just need to register a bean for this class.

@Bean
public MyCheckoutCompletionListener myCheckoutCompletionEventListener(
        IdempotentMessageConsumptionService idempotentMessageConsumptionService) {
    return new MyCheckoutCompletionListener(idempotentMessageConsumptionService);
}

Idempotent Message Consumption

As previously mentioned, Spring Cloud Stream will deliver messages at least once, so you need to be mindful of not processing the same message twice. The primary mechanisms that we use to avoid duplicate processing are the IdempotentMessageConsumptionService and the MESSAGE_IDEMPOTENCY_KEY message header.

When each message is sent, it should include the MESSAGE_IDEMPOTENCY_KEY header with a value representing a unique identifier for the set of work that is to be completed. For example, the checkout completion messages use the cart id. For other messages a random UUID is sufficient.

When a message is passed to the IdempotentMessageConsumptionService, we gather the MESSAGE_IDEMPOTENCY_KEY value and check to see if the key has been persisted to the blc_resource_lock table (schema identified by the @DataRouteByKey annotation on the listener). If a lock is not present, then the idempotency check will pass and the message will be processed. Otherwise, the message will be ignored.

Note
If the same message payload is sent multiple times with different MESSAGE_IDEMPOTENCY_KEY values, then the message will be processed multiple times.