{
"name":"Delete Example Entities", (1)
"type":"DELETE_EXAMPLE_ENTITIES", (2)
"timingType":"CRON", (3)
"enabled":true,
"executed":false,
"cron": "0 0 1 * * ?", (4)
"manageInAdmin":true
}
This is a step-by-step guide demonstrating how to establish a scheduled job and its associated handler.
The example will be for a "data purge" job that periodically runs to delete MyExampleEntity
records that are managed by the Catalog microservice.
ScheduledJob
in the Scheduled Job MicroserviceUsing the admin application or by directly invoking the Scheduled Job service’s REST API, create a job that looks like this:
{
"name":"Delete Example Entities", (1)
"type":"DELETE_EXAMPLE_ENTITIES", (2)
"timingType":"CRON", (3)
"enabled":true,
"executed":false,
"cron": "0 0 1 * * ?", (4)
"manageInAdmin":true
}
A human-friendly name for the job.
The type of the job, which will be used by the custom event handler to determine whether or not to handle this job.
We want this job to recur rather than simply executing once.
A cron expression stating the job should run at 1:00am every day.
Once this is persisted, it will automatically be picked up and scheduled by the SchedulerService
as documented in Key Components.
For this example, we’re assuming the Catalog microservice is responsible for persistence/management of MyExampleEntity
.
Thus, the Catalog microservice will be the "resource service" that needs to listen for trigger events of the new job and execute the data purging behavior.
Add message channel bindings for the triggered event channel in the Catalog service’s properties
spring:
cloud:
stream:
bindings:
triggeredJobEventInputDeleteExampleEntities: (1)
group: catalog-delete-example-entities (2)
destination: triggeredJobEvent (3)
The input channel name should be distinct for each type of triggered job event that the microservice will listen for.
The Catalog service could be interested in listening for several different types of scheduled jobs, and each could have its own special handler.
If one handler for a channel fails for whatever reason, Spring will not deliver the message to other handlers listening to the same channel.
Thus, sharing channels between handlers (ex: having a single triggeredJobEventInput
channel that all handlers listen to) is not advisable.
By declaring different channels for each handler, we guarantee proper message delivery.
Establish a consumer group to prevent more than one instance of the catalog service from consuming a particular message.
The actual destination channel on the message broker from which messages will be sourced.
The scheduled job service publishes messages to the triggeredJobEvent
channel, and thus it is used here.
Define the message channel interface
public interface DeleteExampleEntitiesTriggeredJobConsumer {
String CHANNEL = "triggeredJobEventInputDeleteExampleEntities";
@Input(CHANNEL)
SubscribableChannel triggeredJobEventInputDeleteExampleEntities();
}
Note
|
Be sure to enable it via @EnableBinding in your configuration class.
|
Establish a listener that will listen for the new job’s triggered event messages
@RequiredArgsConstructor
public class DeleteExampleEntitiesTriggeredJobListener {
public static final String DELETE_EXAMPLE_ENTITIES_JOB_TYPE = "DELETE_EXAMPLE_ENTITIES"; (1)
@Getter(AccessLevel.PROTECTED)
private final IdempotentMessageConsumptionService idempotentConsumptionService;
@Getter(AccessLevel.PROTECTED)
private final MyExampleEntityService<MyExampleEntity> exampleEntityService;
@StreamListener(DeleteExampleEntitiesTriggeredJobConsumer.CHANNEL)
public void listen(Message<ScheduledJobRef> message) {
idempotentConsumptionService.consumeMessage(message, (2)
DeleteExampleEntitiesTriggeredJobListener.class.getSimpleName(),
this::processMessage);
}
protected void processMessage(Message<ScheduledJobRef> message) {
ScheduledJobRef scheduledJobRef = message.getPayload();
if (!StringUtils.equals(scheduledJobRef.getType(), DELETE_EXAMPLE_ENTITIES_JOB_TYPE)) {
return; (3)
}
exampleEntityService.deleteAllOlderThan( (4)
Instant.now().minus(100, ChronoUnit.DAYS));
}
}
This matches the type
we set for the job in the scheduled job service.
Utilize the IdempotentMessageConsumptionService
from the broadleaf-common-messaging
library to avoid processing a message more than once in the event of duplicate delivery.
If the job type of this message doesn’t match what we expect, simply discard it. This is necessary, as the scheduled job service publishes all trigger events to the same destination channel regardless of their type.
Invoke some method on the MyExampleEntityService
that will delete entities older than a given timestamp.
In this case, we use the current time minus 100 days to request deletion of all entities older than 100 days ago.
Note
|
Don’t forget to register this as a bean in your configuration! |
Once this is set up, the catalog service will be able to correctly receive the triggered job events for the new job, and purge data as necessary.
ScheduledJobDetail
What if we wanted to make the age cutoff for the example entities configurable rather than hard-coding it to 100 days?
For that, we can utilize ScheduledJobDetail
, which is essentially a mechanism for passing arguments to the handler.
ScheduledJob
in the Scheduled Job Service to have detailsUsing the admin application or by directly invoking the Scheduled Job service’s REST API, update the previously created scheduled job such that it has the following details
field:
ScheduledJob
"details":[
{
"name":"AGE_CRITERIA_IN_DAYS",
"value":"300"
}
]
Utilize the details in DeleteExampleEntitiesTriggeredJobListener
@RequiredArgsConstructor
public class DeleteExampleEntitiesTriggeredJobListener {
public static final String DELETE_EXAMPLE_ENTITIES_JOB_TYPE = "DELETE_EXAMPLE_ENTITIES";
public static final String AGE_CRITERIA_IN_DAYS_DETAIL_NAME = "AGE_CRITERIA_IN_DAYS"; (1)
@Getter(AccessLevel.PROTECTED)
private final IdempotentMessageConsumptionService idempotentConsumptionService;
@Getter(AccessLevel.PROTECTED)
private final MyExampleEntityService<MyExampleEntity> exampleEntityService;
@StreamListener(DeleteExampleEntitiesTriggeredJobConsumer.CHANNEL)
public void listen(Message<ScheduledJobRef> message) {
idempotentConsumptionService.consumeMessage(message,
DeleteExampleEntitiesTriggeredJobListener.class.getSimpleName(),
this::processMessage);
}
protected void processMessage(Message<ScheduledJobRef> message) {
ScheduledJobRef scheduledJobRef = message.getPayload();
if (!StringUtils.equals(scheduledJobRef.getType(), DELETE_EXAMPLE_ENTITIES_JOB_TYPE)) {
return;
}
int ageCutoffInDays = getAgeCutoffInDays(scheduledJobRef);
exampleEntityService.deleteAllOlderThan(
Instant.now().minus(ageCutoffInDays, ChronoUnit.DAYS)); (5)
}
protected int getAgeCutoffInDays(ScheduledJobRef scheduledJobRef) {
return ListUtils.emptyIfNull(scheduledJobRef.getDetails())
.stream()
.filter(detail -> StringUtils.equals(detail.getName(), AGE_CRITERIA_IN_DAYS_DETAIL_NAME)) (2)
.map(additionalCriteriaDetail -> (String) additionalCriteriaDetail.getValue())
.map(Integer::valueOf) (3)
.findFirst()
.orElse(100); (4)
}
}
A constant that matches the name of the detail we set on the scheduled job
Look through the details and find the one matching the expected name
Convert the value to an integer
Default the value to 100 if no age criteria detail was given
Supply the determined value to the method
With the above changes, we can now update the age cutoff whenever we please just by modifying the detail value in the ScheduledJob
!