Broadleaf Microservices
  • v1.0.0-latest-prod

Adding a New Message Producer

In this tutorial, we’ll start by exploring the configuration that’s needed to set up a message channel via Spring Cloud Stream, and then we’ll look at how a message is produced and sent. Finally, we’ll create a simple message listener and discuss how to enforce idempotent message consumption.

Throughout this tutorial, we’ll specifically focus on the message that is sent when a checkout is successfully completed. That said, the patterns that we’ll look at are generally applicable throughout the Broadleaf Microservices ecosystem.

Building and Configuring a Message Output Channel

Much of building a message producer involves setting up simple Spring Cloud Stream configuration with the following components:

A producer interface that declares the @Output binding target:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface CheckoutCompletionProducer {
    String CHANNEL = "checkoutCompletionOutput";

    @Output(CHANNEL)
    MessageChannel checkoutCompletionOutput();
}

A configuration class that registers the binding targets via @EnableBinding:

...
import org.springframework.cloud.stream.annotation.EnableBinding;
...

@Configuration
@EnableBinding(CheckoutCompletionProducer.class)
public static class CartCheckoutMessagingAutoConfiguration {}

Application properties to define the destination of each binding:

spring:
  cloud:
    stream:
      bindings:
        checkoutCompletionOutput:
          destination: checkoutCompletion

Note
  • Spring Cloud Stream uses this configuration behind the scenes to create the corresponding destinations on the message broker (for example, Kafka topics).

  • Notice that the checkoutCompletionOutput binding key matches the value that we previously registered with the @Output annotation. This match is what ties the message channel named checkoutCompletionOutput to the checkoutCompletion destination.

  • By default, Broadleaf registers the out-of-the-box bindings in *-defaults.yml properties files (e.g. cartoperation-defaults.yml).

Building and Sending the CheckoutCompletionEvent

With the output binding in place, the CheckoutCompletionProducer interface can now be injected into any Spring component that needs to send a message via this channel.

Let’s take a look at the DefaultCheckoutService to understand how messages are built and sent.

package com.broadleafcommerce.cartoperation.service.checkout;

import static com.broadleafcommerce.common.messaging.service.DefaultMessageLockService.MESSAGE_IDEMPOTENCY_KEY;

import org.springframework.integration.support.MessageBuilder;

...

@RequiredArgsConstructor
public class DefaultCheckoutService implements CheckoutService {

    ...

    @Getter(AccessLevel.PROTECTED)
    private final CheckoutCompletionProducer checkoutCompletionProducer; // (1)

    ...

    /**
     * Send a message to notify external services (and internal listeners) of the completed checkout
     * so that they can react accordingly.
     *
     * @param cart The cart that has completed checkout. This should be the primary content of the
     *        out-going message.
     * @param requestId The id representing this request to checkout
     * @param contextInfo Context information around sandbox and multi-tenant state.
     */
    protected void sendCheckoutCompletionMessage(@lombok.NonNull Cart cart,
            @lombok.NonNull String requestId,
            @Nullable ContextInfo contextInfo) {
        try {
            Assert.notNull(checkoutCompletionProducer,
                    "CheckoutCompletionProducer was not available for sending a " +
                            "message for checkout completion. Likely a configuration issue in " +
                            "which messaging for this component is disabled.");
            CheckoutCompletionEvent completionEvent =
                    typeFactory.get(CheckoutCompletionEvent.class);
            completionEvent.setCart(cart);
            completionEvent.setRequestId(requestId);
            completionEvent.setContextInfo(contextInfo);

            checkoutCompletionProducer.checkoutCompletionOutput()
                    .send(MessageBuilder.withPayload(completionEvent) // (2) (3)
                            .setHeaderIfAbsent(MESSAGE_IDEMPOTENCY_KEY, cart.getId()) // (4)
                            .build());
        } catch (Exception e) {
            String errorMessage = String.format(
                    "Failed to send checkout completion message for cart (order number: %s).",
                    cart.getOrderNumber());
            throw new CheckoutCompletionMessageException(errorMessage, e, cart);
        }
    }
}
  1. Inject the CheckoutCompletionProducer

  2. Leverage the MessageBuilder to produce a Message (org.springframework.messaging.Message) with a CheckoutCompletionEvent payload and an idempotency header

  3. Send the message by calling checkoutCompletionProducer.checkoutCompletionOutput().send(…​)

  4. Include the MESSAGE_IDEMPOTENCY_KEY header with the cart id. Doing so allows consumers to ensure that the message for this cart is processed only once. See Idempotent Message Consumption in Detail.
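
To make the purpose of the idempotency header in step 4 more concrete, here is a minimal, framework-free sketch of how a consumer might use such a key to skip redelivered messages. This is not Broadleaf's DefaultMessageLockService: the class IdempotentConsumerSketch, the header name "idempotency-key", and the in-memory set of processed keys are all hypothetical stand-ins, and a real deployment would most likely need a durable store shared between service instances.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not part of the Broadleaf or Spring APIs.
final class IdempotentConsumerSketch {

    // Assumed header name for this sketch. In Broadleaf the header name comes
    // from the MESSAGE_IDEMPOTENCY_KEY constant.
    static final String IDEMPOTENCY_HEADER = "idempotency-key";

    // Keys of messages that were already handled. Kept in memory here, so the
    // history is lost on restart and is not shared across instances.
    private final Set<String> processedKeys = new HashSet<>();

    // Payloads that were actually handled, recorded to make the effect visible.
    private final List<String> handledPayloads = new ArrayList<>();

    /**
     * Handles the message unless a message with the same idempotency key was
     * handled before.
     *
     * @return true if the message was handled, false if it was skipped as a duplicate
     */
    synchronized boolean consume(Map<String, String> headers, String payload) {
        String key = headers.get(IDEMPOTENCY_HEADER);
        if (key == null) {
            throw new IllegalArgumentException("Missing header: " + IDEMPOTENCY_HEADER);
        }
        // Set.add returns false when the key is already present, i.e. a duplicate.
        if (!processedKeys.add(key)) {
            return false;
        }
        handledPayloads.add(payload);
        return true;
    }

    // Returns an immutable snapshot of the payloads handled so far.
    synchronized List<String> handledPayloads() {
        return List.copyOf(handledPayloads);
    }
}
```

In this sketch, calling consume twice with the same key (for example, the same cart id) handles the first delivery and skips the second, which is the behavior the producer enables by setting the header on the outgoing message.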

Durable sending