Wednesday, March 10, 2021

Sling Jobs

One of the most underused yet powerful features of Sling is Sling Jobs. We usually handle events with OSGi events, but the OSGi event mechanism has no knowledge of the content of an event, so it cannot decide whether an event is important and must be processed by someone. Because it is a "fire and forget" mechanism, the Event Admin has no way to tell whether anyone has actually processed the event. Processing could fail, or the server or bundle could be stopped, and the event would be lost.

On the other hand, there are use cases where guaranteed processing is a must, usually together with the requirement that processing happens exactly once. Typical examples are sending notification emails (or SMS), post-processing of content (such as generating thumbnails for images or documents), and workflow steps.

Sling Event takes care of these use cases through the concept of a job. A job is a special event that is guaranteed to be processed at least once. The one exception to exactly-once processing is an instance that crashes after job processing has finished but before the state is persisted; in that case the job consumer processes the job twice.
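Because a job can, in that rare crash case, be delivered twice, it helps if the processing code tolerates a repeated delivery. The following is a minimal plain-Java sketch of that idea, not Sling code: the class IdempotentNotifier, its methods, and the job id "job-1" are hypothetical names chosen for illustration, and the in-memory set would have to be replaced by persistent storage to survive a real crash.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model, not the Sling API: every name here is hypothetical.
class IdempotentNotifier {

    // Ids of jobs already handled. Kept in memory only for this demo;
    // to survive a crash it would have to be stored persistently.
    private final Set<String> handledJobIds = new HashSet<>();

    // Stands in for the real side effect, e.g. sending a notification email.
    private final List<String> notifiedPaths = new ArrayList<>();

    /** Returns true if the work was done now, false if this was a repeated delivery. */
    boolean process(String jobId, String path) {
        // Set.add returns false when the id is already present.
        if (!handledJobIds.add(jobId)) {
            return false;
        }
        notifiedPaths.add(path);
        return true;
    }

    List<String> notifiedPaths() {
        return notifiedPaths;
    }

    public static void main(String[] args) {
        IdempotentNotifier notifier = new IdempotentNotifier();
        notifier.process("job-1", "/content/demo/page");
        // Simulate the same job being delivered again after a crash.
        notifier.process("job-1", "/content/demo/page");
        System.out.println("Notifications sent: " + notifier.notifiedPaths().size());
    }
}
```

The second call is recognised as a duplicate and skipped, so the side effect happens only once even though the job arrived twice.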


Job Fundamentals: 

1) A job has two parts: a topic, which describes the nature of the job, and a payload, which holds the data as key-value pairs.

2) A job consumer is a service consuming and processing a job. It registers itself as an OSGi service together with a property defining which topics this consumer can process.

3) A job executor is also a service that processes a job, with additional features such as reporting progress. It registers itself as an OSGi service together with a property defining which topics this executor can process.
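The relationship between topic, payload, and consumer described in the list above can be pictured with a small in-memory model. This is only a plain-Java sketch and not the Sling implementation: ToyJobDispatcher, register, and dispatch are hypothetical names, and a real consumer is matched through its OSGi service property rather than a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model, not the Sling API: every name here is hypothetical.
class ToyJobDispatcher {

    // topic -> consumer. In Sling this mapping comes from the topics
    // property the consumer declares when it registers as an OSGi service.
    private final Map<String, Function<Map<String, Object>, Boolean>> consumers = new HashMap<>();

    void register(String topic, Function<Map<String, Object>, Boolean> consumer) {
        consumers.put(topic, consumer);
    }

    /** Hands the payload to the consumer registered for the topic; false if none exists or it fails. */
    boolean dispatch(String topic, Map<String, Object> payload) {
        Function<Map<String, Object>, Boolean> consumer = consumers.get(topic);
        return consumer != null && consumer.apply(payload);
    }

    public static void main(String[] args) {
        ToyJobDispatcher dispatcher = new ToyJobDispatcher();
        // The consumer only accepts payloads that carry a "path" entry.
        dispatcher.register("aem/replication/job", payload -> payload.containsKey("path"));

        Map<String, Object> payload = new HashMap<>();
        payload.put("path", "/content/demo/page");
        System.out.println("Processed: " + dispatcher.dispatch("aem/replication/job", payload));
    }
}
```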

Now let's start by creating an event handler that reacts when content is published:

ActivationEventHandler

package aem.project.core.jobs.handlers;

import org.osgi.service.component.annotations.Component;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.osgi.framework.Constants;
import org.osgi.service.event.EventConstants;
import com.day.cq.replication.ReplicationAction;
import com.day.cq.replication.ReplicationActionType;

import java.util.HashMap;
import java.util.Map;

import org.apache.sling.event.jobs.JobManager;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Component(service = EventHandler.class,
        immediate = true,
        property = {
                Constants.SERVICE_DESCRIPTION + "=Demo to listen on changes in the replication",
                EventConstants.EVENT_TOPIC + "=" + ReplicationAction.EVENT_TOPIC
        })
public class ActivationEventHandler implements EventHandler {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Reference
    private JobManager jobManager;

    @Override
    public void handleEvent(Event event) {
        try {
            logger.debug("Replication event received on topic: {}", event.getTopic());

            // Convert the OSGi event once and guard against a missing replication action
            ReplicationAction action = ReplicationAction.fromEvent(event);
            if (action == null) {
                return;
            }
            logger.debug("Replication Event is {}", action.getType());

            if (ReplicationActionType.ACTIVATE.equals(action.getType())) {
                logger.debug("Triggered activate on {}", action.getPath());

                // Create a property map to pass to the JobConsumer service
                Map<String, Object> jobProperties = new HashMap<>();
                jobProperties.put("path", action.getPath());

                // If the job fails and should be retried, the JobConsumer has to
                // return a failed result. See the JobConsumer below.

                // The topic must match the topic property of the registered JobConsumer
                jobManager.addJob("aem/replication/job", jobProperties);

                logger.debug("The job has been added for: {}", jobProperties);
            }
        } catch (Exception e) {
            logger.error("Exception while handling the replication event", e);
        }
    }

}

ActivationEventConsumer

package aem.project.core.jobs.handlers;

import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.component.annotations.Component;
import org.osgi.framework.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(service = JobConsumer.class,
        immediate = true,
        property = {
                Constants.SERVICE_DESCRIPTION + "=Demo job consumer for replication jobs",
                JobConsumer.PROPERTY_TOPICS + "=aem/replication/job" // topic names like sample.replication.job will NOT WORK
        })
public class ActivationEventConsumer implements JobConsumer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public JobResult process(Job job) {
        try {
            logger.debug("Processing the job");

            // The property map set by the event handler is available on the job,
            // so the values needed to process the request can be read here.
            String path = (String) job.getProperty("path");
            logger.debug("Replication was triggered on path {}", path);
            return JobResult.OK;
        } catch (Exception e) {
            logger.error("Exception while processing the job", e);
            // FAILED asks for the job to be retried
            return JobResult.FAILED;
        }
    }

}
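The consumer returns JobResult.FAILED when something goes wrong, which asks for the job to be tried again, while JobResult.OK ends processing. The plain-Java sketch below imitates that retry loop to show the effect of a retry limit. It is not Sling code: RetryDemo, its Result enum, and the run method are hypothetical, the delay between retries is left out, and it assumes that the limit counts retries after the first attempt.

```java
import java.util.function.IntFunction;

// Toy model, not the Sling API: every name here is hypothetical.
class RetryDemo {

    // Mirrors the idea of OK (done), FAILED (try again) and CANCEL (give up).
    enum Result { OK, FAILED, CANCEL }

    /**
     * Calls the consumer until it stops returning FAILED or the retries are used up.
     * The consumer receives the attempt number, starting at 1.
     * Returns the total number of attempts made.
     */
    static int run(IntFunction<Result> consumer, int maxRetries) {
        int attempts = 0;
        while (true) {
            attempts++;
            Result result = consumer.apply(attempts);
            if (result != Result.FAILED) {
                return attempts; // OK or CANCEL: no further attempts
            }
            if (attempts > maxRetries) {
                return attempts; // first attempt plus maxRetries retries are spent
            }
        }
    }

    public static void main(String[] args) {
        // Fails on the first two attempts, then succeeds.
        int attempts = run(attempt -> attempt < 3 ? Result.FAILED : Result.OK, 5);
        System.out.println("Attempts needed: " + attempts);
    }
}
```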

Deploy the code to your AEM instance, open the OSGi configuration console, and go to Apache Sling Job Queue Configuration. Define the topic for the job along with the other settings such as the retry delay and the maximum number of retries.
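The same queue settings can also be kept in the code base as an OSGi configuration file. The fragment below is a sketch under a few assumptions: the file name suffix "replication", the queue name, and all values are illustrative only, and the file is assumed to live in a config folder of your project, for example as org.apache.sling.event.jobs.QueueConfiguration~replication.cfg.json. Check the property names and value types against the configuration shown in your instance before using it.

```json
{
  "queue.name": "Replication Demo Queue",
  "queue.topics": ["aem/replication/job"],
  "queue.type": "UNORDERED",
  "queue.maxparallel": 1,
  "queue.retries": 5,
  "queue.retrydelay": 5000
}
```

Here queue.topics ties the queue to the topic used in addJob, queue.retries limits how often a failed job is retried, and queue.retrydelay is the wait between retries in milliseconds.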





The jobs will be added under /var/eventing/jobs.



Reference: 

https://sling.apache.org/documentation/bundles/apache-sling-eventing-and-job-handling.html