Broadleaf Microservices
  • v1.0.0-latest-prod

Example Configuration for a Scheduled Job

This is a step-by-step guide demonstrating how to establish a scheduled job and its associated handler.

Objective

The example will be for a "data purge" job that periodically runs to delete MyExampleEntity records that are managed by the Catalog microservice.

Steps

Establish the ScheduledJob in the Scheduled Job Microservice

  1. Using the admin application or by directly invoking the Scheduled Job service’s REST API, create a job that looks like this:

    Scheduled Job
    {
      "name":"Delete Example Entities", (1)
      "type":"DELETE_EXAMPLE_ENTITIES", (2)
      "timingType":"CRON", (3)
      "enabled":true,
      "executed":false,
      "cron": "0 0 1 * * ?", (4)
      "manageInAdmin":true
    }
    1. A human-friendly name for the job.

    2. The type of the job, which will be used by the custom event handler to determine whether or not to handle this job.

    3. We want this job to recur rather than simply executing once.

    4. A cron expression stating the job should run at 1:00am every day.

Once this is persisted, it will automatically be picked up and scheduled by the SchedulerService as documented in Key Components.

Configure the "Resource Service" to listen for and handle triggered job events

For this example, we’re assuming the Catalog microservice is responsible for persistence/management of MyExampleEntity. Thus, the Catalog microservice will be the "resource service" that needs to listen for trigger events of the new job and execute the data purging behavior.

  1. Add message channel bindings for the triggered event channel in the Catalog service’s properties

    spring:
      cloud:
        stream:
          bindings:
            triggeredJobEventInputDeleteExampleEntities: (1)
              group: catalog-delete-example-entities (2)
              destination: triggeredJobEvent (3)
    1. The input channel name should be distinct for each type of triggered job event that the microservice will listen for. The Catalog service could be interested in listening for several different types of scheduled jobs, and each could have its own special handler. If one handler for a channel fails for whatever reason, Spring will not deliver the message to other handlers listening to the same channel. Thus, sharing channels between handlers (ex: having a single triggeredJobEventInput channel that all handlers listen to) is not advisable. By declaring different channels for each handler, we guarantee proper message delivery.

    2. Establish a consumer group to prevent more than one instance of the catalog service from consuming a particular message.

    3. The actual destination channel on the message broker from which messages will be sourced. The scheduled job service publishes messages to the triggeredJobEvent channel, and thus it is used here.

  2. Define the message channel interface

    public interface DeleteExampleEntitiesTriggeredJobConsumer {
    
        String CHANNEL = "triggeredJobEventInputDeleteExampleEntities";
    
        @Input(CHANNEL)
        SubscribableChannel triggeredJobEventInputDeleteExampleEntities();
    }
    Note
    Be sure to enable it via @EnableBinding in your configuration class.
  3. Establish a listener that will listen for the new job’s triggered event messages

    @RequiredArgsConstructor
    public class DeleteExampleEntitiesTriggeredJobListener {
    
        public static final String DELETE_EXAMPLE_ENTITIES_JOB_TYPE = "DELETE_EXAMPLE_ENTITIES"; (1)
    
        @Getter(AccessLevel.PROTECTED)
        private final IdempotentMessageConsumptionService idempotentConsumptionService;
    
        @Getter(AccessLevel.PROTECTED)
        private final MyExampleEntityService<MyExampleEntity> exampleEntityService;
    
        @StreamListener(DeleteExampleEntitiesTriggeredJobConsumer.CHANNEL)
        public void listen(Message<ScheduledJobRef> message) {
            idempotentConsumptionService.consumeMessage(message, (2)
                    DeleteExampleEntitiesTriggeredJobListener.class.getSimpleName(),
                    this::processMessage);
        }
    
        protected void processMessage(Message<ScheduledJobRef> message) {
            ScheduledJobRef scheduledJobRef = message.getPayload();
            if (!StringUtils.equals(scheduledJobRef.getType(), DELETE_EXAMPLE_ENTITIES_JOB_TYPE)) {
                return; (3)
            }
    
            exampleEntityService.deleteAllOlderThan( (4)
                    Instant.now().minus(100, ChronoUnit.DAYS));
        }
    }
    1. This matches the type we set for the job in the scheduled job service.

    2. Utilize the IdempotentMessageConsumptionService from the broadleaf-common-messaging library to avoid processing a message more than once in the event of duplicate delivery.

    3. If the job type of this message doesn’t match what we expect, simply discard it. This is necessary, as the scheduled job service publishes all trigger events to the same destination channel regardless of their type.

    4. Invoke some method on the MyExampleEntityService that will delete entities older than a given timestamp. In this case, we use the current time minus 100 days to request deletion of all entities older than 100 days ago.

    Note
    Don’t forget to register this as a bean in your configuration!

Once this is set up, the catalog service will be able to correctly receive the triggered job events for the new job, and purge data as necessary.

Enhancing flexibility by using ScheduledJobDetail

What if we wanted to make the age cutoff for the example entities configurable rather than hard-coding it to 100 days? For that, we can utilize ScheduledJobDetail, which is essentially a mechanism for passing arguments to the handler.

Update the ScheduledJob in the Scheduled Job Service to have details

  1. Using the admin application or by directly invoking the Scheduled Job service’s REST API, update the previously created scheduled job such that it has the following details field:

    Details Field in ScheduledJob
    "details":[
      {
        "name":"AGE_CRITERIA_IN_DAYS",
        "value":"300"
      }
    ]

Update the listener in the resource service to utilize the details

  1. Utilize the details in DeleteExampleEntitiesTriggeredJobListener

    @RequiredArgsConstructor
    public class DeleteExampleEntitiesTriggeredJobListener {
    
        public static final String DELETE_EXAMPLE_ENTITIES_JOB_TYPE = "DELETE_EXAMPLE_ENTITIES";
        public static final String AGE_CRITERIA_IN_DAYS_DETAIL_NAME = "AGE_CRITERIA_IN_DAYS"; (1)
    
        @Getter(AccessLevel.PROTECTED)
        private final IdempotentMessageConsumptionService idempotentConsumptionService;
    
        @Getter(AccessLevel.PROTECTED)
        private final MyExampleEntityService<MyExampleEntity> exampleEntityService;
    
        @StreamListener(DeleteExampleEntitiesTriggeredJobConsumer.CHANNEL)
        public void listen(Message<ScheduledJobRef> message) {
            idempotentConsumptionService.consumeMessage(message,
                    DeleteExampleEntitiesTriggeredJobListener.class.getSimpleName(),
                    this::processMessage);
        }
    
        protected void processMessage(Message<ScheduledJobRef> message) {
            ScheduledJobRef scheduledJobRef = message.getPayload();
            if (!StringUtils.equals(scheduledJobRef.getType(), DELETE_EXAMPLE_ENTITIES_JOB_TYPE)) {
                return;
            }
    
            int ageCutoffInDays = getAgeCutoffInDays(scheduledJobRef);
            exampleEntityService.deleteAllOlderThan(
                    Instant.now().minus(ageCutoffInDays, ChronoUnit.DAYS)); (5)
        }
    
        protected int getAgeCutoffInDays(ScheduledJobRef scheduledJobRef) {
            return ListUtils.emptyIfNull(scheduledJobRef.getDetails())
                    .stream()
                    .filter(detail -> StringUtils.equals(detail.getName(), AGE_CRITERIA_IN_DAYS_DETAIL_NAME)) (2)
                    .map(additionalCriteriaDetail -> (String) additionalCriteriaDetail.getValue())
                    .map(Integer::valueOf) (3)
                    .findFirst()
                    .orElse(100); (4)
        }
    }
    1. A constant that matches the name of the detail we set on the scheduled job

    2. Look through the details and find the one matching the expected name

    3. Convert the value to an integer

    4. Default the value to 100 if no age criteria detail was given

    5. Supply the determined value to the method

With the above changes, we can now update the age cutoff whenever we please just by modifying the detail value in the ScheduledJob!