Broadleaf Microservices
  • v1.0.0-latest-prod

Adding a New Message Listener

In this tutorial, we’ll start by exploring the configuration that’s needed to setup a message channel via Spring Cloud Stream, and we’ll look at how a message is produced and sent. Finally, we’ll create a simple message listener and discuss how enforce idempotent message consumption.

Throughout this tutorial, we’ll specifically focus on the message that is sent when a checkout is successfully completed. That said, the patterns that we’ll look at are generally applicable throughout the Broadleaf Microservices ecosystem.

Building and Configuring a Message Output Channel

Building and Configuring a Message Input Channel

Similar to the creation of a message producer, creating a message listener also involves setting up a few similar components:

A consumer interface that declares the @Input binding target
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyCheckoutCompletionConsumer {

    String CHANNEL = "myCheckoutCompletionInput";

    @Input(CHANNEL)
    SubscribableChannel myCheckoutCompletionInput();

}
A configuration class that registers the binding targets via @EnableBinding
@Configuration
@EnableBinding(OfferCheckoutCompletionConsumer.class)
public class MyMessagingConfiguration {
    ...
}
Application properties to define the destination (i.e. the source) for the binding
spring:
  cloud:
    stream:
      bindings:
        myCheckoutCompletionInput:
          group: my-checkout-completion-group
          destination: checkoutCompletion
Note
  • Notice that myCheckoutCompletionInput matches the value that we previously registered with the @Input annotation.

  • For input channels, it’s important to define unique input channels for each listener. This causes Spring Cloud Stream to produce multiple subscribers, each with an independent delivery lifecycle.

  • If the channel is shared amongst several listeners in the same application, then it will try to deliver the message sequentially to each listener. If the first listener fails to process the message (i.e. it throws an exception), then the message will not be delivered to the other listeners on the same channel. Instead, if each listener has its own input channel, then the failed processing of one listener will not affect any other listeners.

  • For input channels, it’s important to declare a consumer group (i.e. my-checkout-completion-group in the example above) to ensure that the message is not processed multiple times if there are multiple instances of the application.

  • By default, Broadleaf registers the out-of-box bindings in *-defaults.yml (e.g. cartoperation-defaults.yml) properties files.

Building a Message Listener

Listening for messages on the input channel can be achieved using the following pattern:

import org.springframework.cloud.stream.annotation.StreamListener;
...

@RequiredArgsConstructor
@DataRouteByKey("MY_SERVICE_DATA_ROUTE_KEY") // (1)
public class MyCheckoutCompletionListener {

    @Getter(AccessLevel.PROTECTED)
    private final IdempotentMessageConsumptionService idempotentConsumptionService;

    @StreamListener(MyCheckoutCompletionConsumer.CHANNEL) // (2)
    public void listen(Message<CheckoutCompletionEvent> message) { // (3)
        idempotentConsumptionService.consumeMessage(message, // (4)
                CheckoutCompletionListener.class.getSimpleName(), this::processMessage); // (5)
    }

    protected void processMessage(Message<CheckoutCompletionEvent> message) {
        CheckoutCompletionEvent event = message.getPayload();

        // Your logic goes here
    }

}
  1. Leverage @DataRouteByKey or @DataRouteByExample to declare the data route that should be used by the IdempotentMessageConsumptionService.

  2. Use the @StreamListener annotation to declare the channel that you’d like to listen to.

  3. Declare that you expect to receive a message with the CheckoutCompletionEvent payload. This will cause the payload to be deserialized into the CheckoutCompletionEvent object.

  4. Leverage the IdempotentMessageConsumptionService to ensure that you don’t process the same message twice. Spring Cloud Stream will deliver messages at least once, so you need to be sure that the message is only processed once.

  5. If the idempotency check passes, then defer to another method for processing of the message.

    Note
    Throwing an exception while processing the message and allowing it to bubble up to the @StreamListener proxy will engage Spring Cloud Stream’s retry and dead letter queue mechanisms.

From there, we just need to register a bean for this class.

@Bean
public MyCheckoutCompletionListener myCheckoutCompletionEventListener(
        IdempotentMessageConsumptionService idempotentMessageConsumptionService) {
    return new MyCheckoutCompletionListener(idempotentMessageConsumptionService);
}

Idempotent Message Consumption

As previously mentioned, Spring Cloud Stream will deliver messages at least once, so you need to be mindful of not processing the same message twice. The primary mechanisms that we use to avoid duplicate processing are the IdempotentMessageConsumptionService and the MESSAGE_IDEMPOTENCY_KEY message header.

When each message is sent, it should include the MESSAGE_IDEMPOTENCY_KEY header with a value representing a unique identifier for the set of work that is to be completed. For example, the checkout completion messages use the cart id. For other messages a random UUID is sufficient.

When a message is passed to the IdempotentMessageConsumptionService, we gather the MESSAGE_IDEMPOTENCY_KEY value and check to see if the key has been persisted to the blc_resource_lock table (schema identified by the @DataRouteByKey annotation on the listener). If a lock is not present, then the idempotency check will pass and the message will be processed. Otherwise, the message will be ignored.

Note
If the same message payload is sent multiple times with different MESSAGE_IDEMPOTENCY_KEY values, then the message will be processed multiple times.