
Consider a situation where you urgently need to replicate a website page, but the replication queue is full of other pages. It would be great if urgent items went to the top of the queue automatically, but AEM doesn’t provide high-priority replication out of the box. The good news is that you can adjust AEM’s configuration manually to achieve this goal.
If the author instance has multiple replication agents configured and we replicate a batch of pages, each agent works through its own queue, so all the publish instances receive the updates gradually and in parallel. Likewise, if we add a separate replication agent dedicated to high-priority tasks, AEM processes its queue in parallel with the regular agent’s queue instead of after it.
For example, say there is a regular replication agent A with items A1 through A1000 in its queue, and a separate high-priority agent with items H1 through H5. AEM will then publish the pages roughly in the following sequence: A1, H1, A2, H2, …, A5, H5, A6, A7, …, A1000.
This is a significant improvement over a single default agent, which would publish all 1000 A pages before any of the urgent ones.
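To make the ordering above concrete, here is a simplified model in plain Java, not AEM code. It assumes each agent delivers exactly one item per step and that both agents advance in lockstep. Real delivery times depend on page size, the network, and the publish instances. The class and method names (`InterleaveDemo`, `interleave`, `items`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class InterleaveDemo {
    // Simplified model: two agents work in parallel and each delivers one item per step,
    // so in every step the regular item and the high-priority item both go out.
    static List<String> interleave(List<String> regular, List<String> high) {
        List<String> order = new ArrayList<>();
        int steps = Math.max(regular.size(), high.size());
        for (int i = 0; i < steps; i++) {
            if (i < regular.size()) {
                order.add(regular.get(i));
            }
            if (i < high.size()) {
                order.add(high.get(i));
            }
        }
        return order;
    }

    // Builds queue items named prefix1 .. prefixN, e.g. A1 .. A1000.
    static List<String> items(String prefix, int count) {
        List<String> list = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            list.add(prefix + i);
        }
        return list;
    }

    public static void main(String[] args) {
        List<String> order = interleave(items("A", 1000), items("H", 5));
        // prints [A1, H1, A2, H2, A3, H3, A4, H4, A5, H5, A6, A7]
        System.out.println(order.subList(0, 12));
        // prints: H5 is published at position 10 of 1005
        System.out.println("H5 is published at position " + (order.indexOf("H5") + 1) + " of " + order.size());
    }
}
```

In this model, H5 goes out at position 10. If all 1005 items shared a single queue, it would land at position 1005.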
So we need a separate replication agent that handles only our high-priority replication tasks and is excluded from normal replication. We will set up a workflow to send items to that agent. We can also create a scheduled service that pauses all the ‘regular’ agents while there are items in the ‘high-priority’ queue.
1. Creating a separate replication agent to serve high-priority tasks
An out-of-the-box AEM instance has a default ‘publish’ replication agent.
Let’s copy it in CRXDE and rename the copied node to ‘publish_high’. Then we’ll edit the new agent’s settings and enable the ‘Ignore default’ checkbox on the ‘Triggers’ tab.
We now have a separate replication agent that is excluded from default replication. Next, let’s configure a workflow to send replication jobs to that agent.
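As a rough illustration of what ‘Ignore default’ changes, the sketch below models agent selection with plain Java predicates. `AgentSelectionDemo`, `AgentInfo` and the two predicates are hypothetical stand-ins, not the `com.day.cq.replication` API. The real Replicator also checks other conditions, for example whether the agent is enabled.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class AgentSelectionDemo {
    // Hypothetical stand-in for a replication agent: only the ID and the
    // 'Ignore default' flag matter for this sketch.
    static final class AgentInfo {
        final String id;
        final boolean ignoreDefault;

        AgentInfo(String id, boolean ignoreDefault) {
            this.id = id;
            this.ignoreDefault = ignoreDefault;
        }
    }

    // Default replication (a regular publish action) skips agents with 'Ignore default' enabled.
    static final Predicate<AgentInfo> DEFAULT_REPLICATION = agent -> !agent.ignoreDefault;

    // Explicit selection, mirroring the filter in the high-priority workflow step:
    // agent -> agent.getId().endsWith("_high")
    static final Predicate<AgentInfo> HIGH_PRIORITY = agent -> agent.id.endsWith("_high");

    // Returns the IDs of the agents that the given filter selects.
    static List<String> targets(List<AgentInfo> agents, Predicate<AgentInfo> filter) {
        return agents.stream()
                .filter(filter)
                .map(agent -> agent.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<AgentInfo> agents = List.of(
                new AgentInfo("publish", false),
                new AgentInfo("publish_high", true));
        System.out.println(targets(agents, DEFAULT_REPLICATION)); // [publish]
        System.out.println(targets(agents, HIGH_PRIORITY));       // [publish_high]
    }
}
```

The key point is that the two selections are disjoint. Regular publishing never touches ‘publish_high’, and only code that explicitly asks for it, such as the workflow step, sends jobs there.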
2. Configuring a high-priority replication workflow
The workflow model is simple: a single process step backed by a Java class.
package com.acme.www.core.service.workflows;

import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.osgi.framework.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.day.cq.replication.ReplicationActionType;
import com.day.cq.replication.ReplicationException;
import com.day.cq.replication.ReplicationOptions;
import com.day.cq.replication.Replicator;
import com.day.cq.workflow.WorkflowException;
import com.day.cq.workflow.WorkflowSession;
import com.day.cq.workflow.exec.WorkItem;
import com.day.cq.workflow.exec.WorkflowProcess;
import com.day.cq.workflow.metadata.MetaDataMap;

@Component
@Service
@Properties({
        @Property(name = Constants.SERVICE_DESCRIPTION, value = "Publishes with high priority"),
        @Property(name = "process.label", value = "High Priority Publish Process")
})
public class HighPriorityPublish implements WorkflowProcess {

    private static final Logger LOG = LoggerFactory.getLogger(HighPriorityPublish.class);

    @Reference
    private Replicator replicator;

    @Override
    public void execute(WorkItem workItem, WorkflowSession workflowSession, MetaDataMap dataMap)
            throws WorkflowException {
        // The payload is the path of the page the workflow was started on
        String path = workItem.getWorkflowData().getPayload().toString();

        ReplicationOptions opts = new ReplicationOptions();
        // Picks up only agents whose ID ends with "_high", i.e. our 'publish_high' agent
        opts.setFilter(agent -> agent.getId().endsWith("_high"));

        try {
            replicator.replicate(workflowSession.getSession(), ReplicationActionType.ACTIVATE, path, opts);
        } catch (ReplicationException e) {
            LOG.error("Workflow failed for {}", path, e);
            // Rethrow so the workflow engine marks the step as failed instead of completing silently
            throw new WorkflowException("High-priority replication failed for " + path, e);
        }
    }
}
The process step passes the payload path to the Replicator service along with a ReplicationOptions filter that accepts only agents whose ID ends with ‘_high’, so the replication job is handled by our ‘publish_high’ agent. Now we can run the workflow on any high-priority page that needs to be replicated.
3. Suspending all regular agents if there are high-priority replication tasks in the backlog
The two steps we described above allow us to process high-priority replication jobs in parallel with the ‘regular’ replication jobs and thus speed up the publication process. But we can also postpone non-priority publication tasks altogether to replicate critical pages even faster.
For this, we need to implement a service that performs the following checks every 10 seconds for each regular replication agent:
- checks whether the matching high-priority agent’s queue is non-empty, which means something needs to be published with high priority
- if there are high-priority items and the regular agent is active, pauses the regular agent
- if there are no high-priority items and the regular agent is still paused, resumes it
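The decision in the list above boils down to comparing the desired state (paused while the high-priority queue is non-empty) with the current state, and acting only when they differ. Here is a minimal sketch of that rule as a pure Java function. It assumes the ‘X’/‘X_high’ naming convention used in this article, and `PauseDecisionDemo`, `Action` and `decide` are hypothetical names.

```java
class PauseDecisionDemo {
    // Naming convention assumed in this article: regular agent "X" has a
    // high-priority twin named "X_high".
    static final String HIGH_PRIORITY_SUFFIX = "_high";

    enum Action { PAUSE, RESUME, NONE }

    static String highPriorityAgentId(String regularAgentId) {
        return regularAgentId + HIGH_PRIORITY_SUFFIX;
    }

    // highQueueSize < 0 means the high-priority queue could not be read;
    // in that case the regular agent is left untouched.
    static Action decide(int highQueueSize, boolean regularPaused) {
        if (highQueueSize < 0) {
            return Action.NONE;
        }
        boolean wantPaused = highQueueSize > 0;
        if (wantPaused == regularPaused) {
            return Action.NONE; // already in the desired state
        }
        return wantPaused ? Action.PAUSE : Action.RESUME;
    }

    public static void main(String[] args) {
        System.out.println(highPriorityAgentId("publish")); // publish_high
        System.out.println(decide(3, false)); // PAUSE: urgent items waiting, regular agent running
        System.out.println(decide(3, true));  // NONE: already paused
        System.out.println(decide(0, true));  // RESUME: urgent queue drained
        System.out.println(decide(0, false)); // NONE: already running
        System.out.println(decide(-1, true)); // NONE: high-priority queue missing, skip
    }
}
```

Because `decide` returns NONE whenever the regular agent is already in the desired state, running the check every 10 seconds is harmless. The regular agent changes state only twice per burst of urgent items, once to pause and once to resume.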
This service pauses all replication jobs other than the high-priority ones and resumes the regular jobs once the high-priority jobs are done. The code snippet below is for illustrative purposes only; do not use it in production as is.
package com.acme.www.core.service;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.felix.scr.annotations.*;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.day.cq.replication.Agent;
import com.day.cq.replication.AgentManager;
import com.day.cq.replication.ReplicationQueue;

@Component(
        policy = ConfigurationPolicy.REQUIRE,
        label = "Replication Queue Suspend Scheduler",
        immediate = true,
        metatype = true
)
@Service(value = Runnable.class)
public class ReplicationQueueSuspendScheduler implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueSuspendScheduler.class);

    public static final String HIGH_PRIORITY_QUEUE_SUFFIX = "_high";

    protected static final String DEFAULT_SCHEDULER_EXPRESSION = "0/10 * * ? * * *"; // every 10 seconds

    @Property(
            value = DEFAULT_SCHEDULER_EXPRESSION,
            label = "Scheduler Expression",
            description = "Cron expression that defines how often the scheduler runs")
    protected static final String SCHEDULER_EXPRESSION = "scheduler.expression";

    // Do not start a new run while the previous one is still in progress
    @Property(boolValue = false, propertyPrivate = true)
    protected static final String SCHEDULER_CONCURRENT = "scheduler.concurrent";

    protected static final String DEFAULT_REPLICATION_AGENT = "publish";

    @Property(
            value = {DEFAULT_REPLICATION_AGENT},
            label = "Replication Agents",
            description = "List of replication agents to be suspended.",
            unbounded = PropertyUnbounded.ARRAY)
    protected static final String REPLICATION_AGENTS = "replication.agents";

    @Reference
    private AgentManager agentMgr;

    private String schedulerExpression;
    private String[] replicationAgents;

    @Activate
    public void activate(ComponentContext context) {
        schedulerExpression = PropertiesUtil.toString(
                context.getProperties().get(SCHEDULER_EXPRESSION), DEFAULT_SCHEDULER_EXPRESSION);
        replicationAgents = PropertiesUtil.toStringArray(
                context.getProperties().get(REPLICATION_AGENTS), new String[] {DEFAULT_REPLICATION_AGENT});
        LOG.debug("Scheduler cron expression - {}, replication agents - {}",
                schedulerExpression, Arrays.toString(replicationAgents));
    }

    @Override
    public void run() {
        try {
            for (String agentName : replicationAgents) {
                // Each regular agent "X" is paired with its high-priority twin "X_high"
                Agent highAgent = agentMgr.getAgents().get(agentName + HIGH_PRIORITY_QUEUE_SUFFIX);
                if (highAgent != null) {
                    ReplicationQueue highQueue = highAgent.getQueue();
                    int highQueueSize = getQueueSize(highQueue);
                    if (highQueueSize < 0) {
                        LOG.debug("High priority queue {} is not found or corrupted. Agent was skipped",
                                highAgent.getId());
                        continue;
                    }
                    Agent normAgent = agentMgr.getAgents().get(agentName);
                    if (normAgent != null) {
                        ReplicationQueue normQueue = normAgent.getQueue();
                        // Desired state: pause the regular queue while high-priority items are waiting
                        boolean pause = highQueueSize > 0;
                        // Act only if the current state and the desired state differ
                        if (normQueue != null && normQueue.isPaused() != pause) {
                            LOG.debug("Current state of {} is paused={}, switching it to paused={}",
                                    normAgent.getId(), normQueue.isPaused(), pause);
                            normQueue.setPaused(pause);
                        }
                    }
                }
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    // Returns the number of entries in the queue, or -1 if the queue is missing
    private static int getQueueSize(ReplicationQueue queue) {
        return Optional.ofNullable(queue)
                .map(ReplicationQueue::entries)
                .map(List::size)
                .orElse(-1);
    }
}
These three steps are all you need to prioritize your replication queue.
Author: Denis Glushkov