Wouldn't it be nice to have a Timer Service that runs inside a Coherence cluster and executes pre-defined Jobs at a given time of day? It is possible: I have uploaded a simple Quartz-based implementation that you can download and test, and this page explains the details.
How does it work?
It is difficult to have a concept of a single time in a distributed computing environment based on a peer-to-peer topology. Unless a centralized server acts as an arbiter or a global time service, it is hard to implement a timer service in P2P systems like Oracle Coherence. I am sure there are better ways to solve this, but here is one way out: forget about whether there is a global clock, as long as the task status is checked against a finite state machine at the time of execution. So instead of asking someone what the time is, each JVM (node) of the cluster maintains its own execution thread, but checks the job status in a single place and then decides whether the job should be executed. A Coherence distributed cache can serve as that single holder of the state. Interesting, huh? Scroll down:
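To make the state-machine idea concrete, here is a minimal sketch in plain Java. The states are the ones used later on this page (SCHEDULED, RUNNING, COMPLETED). The class name JobStateMachine, the methods next, canTransitionTo and mayExecute, and the set of allowed transitions (in particular COMPLETED back to SCHEDULED for repeating jobs) are assumptions made for illustration and are not taken from the uploaded source.

```java
import java.util.EnumSet;
import java.util.Set;

public class JobStateMachine {

    /** Job states; the allowed transitions below are an assumption for illustration. */
    public enum JobState {
        SCHEDULED, RUNNING, COMPLETED;

        /** Returns the states that may legally follow this one. */
        public Set<JobState> next() {
            switch (this) {
                case SCHEDULED: return EnumSet.of(RUNNING);
                case RUNNING:   return EnumSet.of(COMPLETED);
                case COMPLETED: return EnumSet.of(SCHEDULED); // assumed: repeating jobs get rescheduled
                default:        return EnumSet.noneOf(JobState.class);
            }
        }

        public boolean canTransitionTo(JobState target) {
            return next().contains(target);
        }
    }

    /** A node may execute a job only if the shared state can still move to RUNNING. */
    public static boolean mayExecute(JobState current) {
        return current != null && current.canTransitionTo(JobState.RUNNING);
    }

    public static void main(String[] args) {
        System.out.println(mayExecute(JobState.SCHEDULED)); // true
        System.out.println(mayExecute(JobState.RUNNING));   // false
    }
}
```

Each node would read the current state from the shared cache and call something like mayExecute before running the job; a node that finds the job already RUNNING or COMPLETED simply skips it.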
First thing first:
Let's find a place where Jobs and their execution times can be defined so that the Coherence service can pick them up and schedule them. Instead of complicating the configuration, let's reuse the Coherence cache configuration. We need to add a new XML element, timer-scheme, to the cache configuration to define Jobs and their execution times. The following is a sample cache configuration:
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
<caching-scheme-mapping>
<cache-mapping>
<cache-name>TimerCache</cache-name>
<scheme-name>distributed-scheme</scheme-name>
</cache-mapping>
</caching-scheme-mapping>
<caching-schemes>
<distributed-scheme>
<scheme-name>distributed-scheme</scheme-name>
<service-name>DistributedCache</service-name>
<backing-map-scheme>
<local-scheme></local-scheme>
</backing-map-scheme>
<autostart>true</autostart>
</distributed-scheme>
<!-- Adding a new Timer Service -->
<timer-scheme>
<service-name>DistributedTimerService</service-name>
<job-configs>
<job>
<class-name>com.ezduck.coherence.timer.MyTimerJob</class-name>
<at-time>Sat Nov 15 17:23:00 MST 2008</at-time>
<repeat>true</repeat>
</job>
</job-configs>
</timer-scheme>
</caching-schemes>
</cache-config>
Second, a custom config loader:
With the XML element defined, we need a class that can read it. Almost everything in Coherence is pluggable. By default Coherence uses DefaultConfigurableCacheFactory to read the cache configuration file, but a custom factory can be written and plugged in to read custom XML elements as desired. The class can be defined in tangosol-coherence-{override}-{mode}.xml as:
<!DOCTYPE coherence PUBLIC
"-//Oracle Corporation//DTD Oracle Coherence 3.3//EN"
"http://www.tangosol.com/dtd/coherence_3_3.dtd">
<coherence>
<configurable-cache-factory-config>
<class-name>com.ezduck.coherence.config.ExtendedConfigurableCacheFactory</class-name>
<init-params>
<init-param>
<param-type>string</param-type>
<param-value system-property="tangosol.coherence.cacheconfig">coherence-cache-config.xml</param-value>
</init-param>
</init-params>
</configurable-cache-factory-config>
<logging-config>
<severity-level system-property="tangosol.coherence.log.level">6</severity-level>
</logging-config>
</coherence>
Now what about the cache-config.dtd?
Update cache-config.dtd as follows to avoid IDE warnings:
<!ENTITY % caching-scheme
"%clustered-caching-scheme; | %standalone-caching-scheme; | %composite-caching-scheme; | invocation-scheme | read-write-backing-map-scheme | versioned-backing-map-scheme | remote-cache-scheme | remote-invocation-scheme | proxy-scheme |
timer-scheme">
.....
<!ELEMENT timer-scheme (service-name?, job-configs?)>
<!ELEMENT job-configs (job*)>
<!ELEMENT job (class-name?, at-time?, repeat?)>
<!ELEMENT at-time (#PCDATA)>
<!ELEMENT repeat (#PCDATA)>
What about the ExtendedConfigurableCacheFactory?
package com.ezduck.coherence.config;
import com.ezduck.coherence.timer.CoherenceTrigger;
import java.util.Date;
import java.util.Iterator;
import com.tangosol.util.Base;
import com.tangosol.run.xml.XmlHelper;
import com.tangosol.run.xml.XmlElement;
import com.tangosol.net.DefaultConfigurableCacheFactory;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;
public class ExtendedConfigurableCacheFactory extends DefaultConfigurableCacheFactory {
public ExtendedConfigurableCacheFactory() {
super();
}
public ExtendedConfigurableCacheFactory(String path, ClassLoader loader) {
super(path, loader);
}
public ExtendedConfigurableCacheFactory(String path) {
super(path);
}
public ExtendedConfigurableCacheFactory(XmlElement xmlConfig) {
super(xmlConfig);
}
/**
* <timer-scheme>
<service-name>DistributedTimerService</service-name>
<job-configs>
<job>
<class-name>TimerJob</class-name>
<at-time>1:00PM</at-time>
</job>
</job-configs>
</timer-scheme>
* @param xmlConfig
*/
public void setConfig(XmlElement xmlConfig) {
XmlElement timerService =
XmlHelper.findElement(xmlConfig, "//caching-schemes/timer-scheme/");
if (timerService != null) {
XmlElement jobConfigs = timerService.findElement("job-configs");
Iterator iter = jobConfigs.getElements("job");
XmlElement jobElement = null;
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
try {
Scheduler scheduler = schedulerFactory.getScheduler();
Class clz = null;
while (iter.hasNext()) {
jobElement = (XmlElement) iter.next();
clz = Class.forName (jobElement.findElement("class-name").getString());
System.out.println("Job Clazz: " + clz.getName());
JobDetail job = new JobDetail (clz.getName(), "TimerGroup", clz);
String at = jobElement.findElement("at-time").getString();
CoherenceTrigger trigger =
new CoherenceTrigger("Trigger", "TimerGroup", new Date (at));
// -- Schedule it to run!
Date ft = scheduler.scheduleJob (job, trigger);
// -- new CoherenceJob (job), trigger);
System.out.println("Job will run at: " + ft.toString());
}
scheduler.start();
} catch (Exception exp) {
Base.err(exp);
} finally {
// -- Now the service has started so remove the timer-service from
// -- the XmlElement.
XmlHelper.removeElement(timerService.getParent(), "timer-scheme");
}
}
super.setConfig(xmlConfig);
}
public Service ensureService(String serviceName) {
return SingletonTimerService.TIMER_SERVICE;
}
private static class SingletonTimerService {
protected static TimerService TIMER_SERVICE = new TimerService ();
}
}
Almost there:
How do I change the state of the execution (SCHEDULED, RUNNING or COMPLETED) in a thread-safe way across multiple JVMs? Coherence has the concept of an EntryProcessor, which guarantees that only one thread at a time can update the state of an entry, using an implicit lock. An EntryProcessor can therefore be used to let a Timer Job check that it is not already being executed by another cluster node before deciding whether to execute itself.
# TimerStatusUpdateProcessor.java
package com.ezduck.coherence.timer.processor;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.processor.AbstractProcessor;
public class TimerStatusUpdateProcessor extends AbstractProcessor {
/**
* Status of Job
*/
public static enum Status {
SCHEDULED,
RUNNING,
COMPLETED,
;
}
private Status status;
/**
* Constructor to pass the Job's status to be set
* @param status
*/
public TimerStatusUpdateProcessor(Status status) {
this.status = status;
}
/**
* Sets the Job's status on the cache entry
* @param entry
* @return
*/
public Object process(InvocableMap.Entry entry) {
entry.setValue(status);
return null;
}
}
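The processor above sets the status unconditionally. For a job to decide whether another node already owns the run, the update has to be conditional. The sketch below models that idea in plain Java, with ConcurrentHashMap.compute standing in for the per-entry atomicity that an EntryProcessor gives you in Coherence. JobClaimSketch, tryClaim, simulate and the key string are hypothetical names used only for illustration; they are not part of the uploaded source, and this is not Coherence API code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class JobClaimSketch {
    public enum Status { SCHEDULED, RUNNING, COMPLETED }

    // Local stand-in for the distributed TimerCache (assumption: one entry per job firing).
    private final ConcurrentMap<String, Status> cache = new ConcurrentHashMap<>();

    public void schedule(String key) { cache.put(key, Status.SCHEDULED); }
    public void complete(String key) { cache.put(key, Status.COMPLETED); }
    public Status status(String key) { return cache.get(key); }

    /** Atomically moves SCHEDULED to RUNNING; returns true only for the caller that won. */
    public boolean tryClaim(String key) {
        boolean[] won = {false};
        cache.compute(key, (k, current) -> {
            if (current == Status.SCHEDULED) {
                won[0] = true;
                return Status.RUNNING;
            }
            return current; // already RUNNING or COMPLETED, or unknown key: leave untouched
        });
        return won[0];
    }

    /** Simulates several nodes firing the same job; returns how many actually executed it. */
    public static int simulate(int nodes) {
        JobClaimSketch sketch = new JobClaimSketch();
        String key = "TimerGroup.Trigger@fire-time"; // hypothetical key format
        sketch.schedule(key);
        AtomicInteger executions = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(nodes);
        for (int i = 0; i < nodes; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all simulated nodes at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (sketch.tryClaim(key)) {
                    executions.incrementAndGet(); // the job body would run here
                    sketch.complete(key);
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulated nodes did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executions.get();
    }

    public static void main(String[] args) {
        System.out.println("Executions: " + simulate(8));
    }
}
```

In a real cluster the check-and-set inside tryClaim would live in the process method of an EntryProcessor, which would return whether the claim succeeded instead of returning null.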
You must have noticed the CoherenceTrigger class
CoherenceTrigger updates the state of the Job execution when the Job is scheduled and when it completes. At this point I leave it to the Job itself to decide whether it is running. CoherenceTrigger looks like:
package com.ezduck.coherence.timer;
import java.util.Date;
import com.tangosol.net.NamedCache;
import com.tangosol.net.CacheFactory;
import org.quartz.Calendar;
import org.quartz.SimpleTrigger;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import com.ezduck.coherence.timer.processor.TimerStatusUpdateProcessor;
public class CoherenceTrigger extends SimpleTrigger {
final static String CACHE_NAME = "TimerCache";
public CoherenceTrigger() {
super ();
}
public CoherenceTrigger(String name, String group) {
super (name, group);
}
public CoherenceTrigger (String name, String group, Date date) {
super (name, group, date);
}
public void triggered (Calendar calendar) {
NamedCache nCache = CacheFactory.getCache(CACHE_NAME);
nCache.invoke(getFullName() + getNextFireTime(),
new TimerStatusUpdateProcessor(TimerStatusUpdateProcessor.Status.SCHEDULED));
super.triggered(calendar);
}
public int executionComplete(JobExecutionContext context,
JobExecutionException exception) {
NamedCache nCache = CacheFactory.getCache(CACHE_NAME);
nCache.invoke(getFullName() + context.getFireTime(),
new TimerStatusUpdateProcessor(TimerStatusUpdateProcessor.Status.COMPLETED));
return super.executionComplete(context, exception);
}
}
How to add Custom Events?
Coherence supports three kinds of Events:
- MapEvent - Events that are generated on a Cache because of CRUD operations.
- MemberEvent - When a Cluster Member leaves or joins a CacheService
- ServiceEvent - This is a generic Event that can be used to create custom events on top of Cache services
Now, to add, say, a TimerEvent that is generated when a Job is scheduled and stopped, we need a Service that can generate these events and dispatch them to all the TimerListeners. Ideally this new Service would be created at the component level, but given the lack of developer control over the Coherence components and the limited functionality of the Timer Service, the following approach can be taken:
# TimerService.java
public class TimerService implements Service, Serializable {
private NamedCache nCache = null;
private Collection<TimerListener> listeners = new ArrayList<TimerListener> ();
public TimerService() {
}
public void addServiceListener(ServiceListener serviceListener) {
nCache = CacheFactory.getCache ( CoherenceTrigger.CACHE_NAME);
System.out.println("TimerService: Adding Service Listener");
nCache.getCacheService().addServiceListener(serviceListener);
listeners.add((TimerListener)serviceListener);
}
public void removeServiceListener(ServiceListener serviceListener) {
nCache = CacheFactory.getCache (CoherenceTrigger.CACHE_NAME);
nCache.getCacheService().removeServiceListener(serviceListener);
listeners.remove((TimerListener)serviceListener);
}
...
}
The problem with this TimerService is that events are dispatched only by a TimerService that has listeners registered in its own JVM. As there is a single TimerService instance per JVM, there has to be a mechanism to propagate changes from one node to the others. The solution is to piggyback ServiceEvent on MapEvent. With this change the TimerService becomes:
public class TimerService implements Service, MapListener {
private NamedCache nCache = null;
private Collection<TimerListener> listeners =
new ArrayList<TimerListener>();
public TimerService() {
}
public void addServiceListener(ServiceListener serviceListener) {
nCache = CacheFactory.getCache(CoherenceTrigger.CACHE_NAME);
nCache.addMapListener(this);
System.out.println("TimerService: Adding Service Listener");
nCache.getCacheService().addServiceListener(serviceListener);
listeners.add((TimerListener)serviceListener);
}
public void removeServiceListener(ServiceListener serviceListener) {
nCache = CacheFactory.getCache(CoherenceTrigger.CACHE_NAME);
nCache.getCacheService().removeServiceListener(serviceListener);
listeners.remove(serviceListener);
}
...
public void entryInserted(MapEvent mapEvent) {
notifyOtherTimerServices(mapEvent);
}
public void entryUpdated(MapEvent mapEvent) {
notifyOtherTimerServices(mapEvent);
}
public void entryDeleted(MapEvent mapEvent) {
}
private void notifyOtherTimerServices(MapEvent mapEvent) {
TimerStatusUpdateProcessor.Status newValue =
(TimerStatusUpdateProcessor.Status) mapEvent.getNewValue();
TimerStatusUpdateProcessor.Status oldValue =
(TimerStatusUpdateProcessor.Status) mapEvent.getOldValue();
// -- Ignore a COMPLETED status that has no previous status.
if (!(oldValue == null && newValue == TimerStatusUpdateProcessor.Status.COMPLETED)) {
// -- Compare the enum constants directly instead of comparing
// -- their names with ==.
int s =
(newValue == TimerStatusUpdateProcessor.Status.SCHEDULED) ?
TimerEvent.SERVICE_STARTED : TimerEvent.SERVICE_STOPPED;
TimerEvent tE = new TimerEvent(this, s);
Collection<TimerListener> listeners = this.getListeners();
for (TimerListener listener : listeners) {
tE.dispatch(listener);
}
}
}
}
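The decision made inside notifyOtherTimerServices can be isolated as a small pure function, which makes the mapping from status changes to events easier to read and to test. The sketch below is plain Java with no Coherence dependency; StatusEventMapping, TimerEventKind and eventFor are hypothetical names, with TimerEventKind standing in for the TimerEvent.SERVICE_STARTED and TimerEvent.SERVICE_STOPPED ids used above.

```java
import java.util.Optional;

public class StatusEventMapping {
    public enum Status { SCHEDULED, RUNNING, COMPLETED }

    /** Hypothetical stand-in for TimerEvent.SERVICE_STARTED / SERVICE_STOPPED. */
    public enum TimerEventKind { STARTED, STOPPED }

    /**
     * Maps a cache status change to the event that should be dispatched,
     * following the same rules as notifyOtherTimerServices.
     */
    public static Optional<TimerEventKind> eventFor(Status oldValue, Status newValue) {
        if (newValue == null) {
            return Optional.empty(); // nothing to report (for example a deleted entry)
        }
        if (oldValue == null && newValue == Status.COMPLETED) {
            return Optional.empty(); // COMPLETED without a previous status is ignored
        }
        return Optional.of(newValue == Status.SCHEDULED
                ? TimerEventKind.STARTED
                : TimerEventKind.STOPPED);
    }

    public static void main(String[] args) {
        System.out.println(eventFor(null, Status.SCHEDULED));              // Optional[STARTED]
        System.out.println(eventFor(Status.SCHEDULED, Status.COMPLETED));  // Optional[STOPPED]
        System.out.println(eventFor(null, Status.COMPLETED));              // Optional.empty
    }
}
```

Note that, as in the listing above, any new status other than SCHEDULED maps to a stopped event, so a RUNNING update would also be reported as stopped; whether that is intended depends on how the Job reports RUNNING.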
# TimerEvent.java
package com.ezduck.coherence.timer;
import com.tangosol.util.Base;
import com.tangosol.util.Listeners;
import com.tangosol.util.Service;
import com.tangosol.util.ServiceEvent;
import java.util.EventListener;
public class TimerEvent extends ServiceEvent {
/**
* The event's id.
*/
protected int m_nId;
protected Service m_service;
public TimerEvent (Service service, int nId) {
super (service, nId);
m_service = service;
m_nId = nId;
}
/**
* Return this event's id. The event id is one of the SERVICE_*
* constants inherited from ServiceEvent.
*
* @return an id
*/
public int getId() {
return m_nId;
}
public Service getService () {
return m_service;
}
/**
* Dispatch this event to the specified listeners collection.
* <p>
* This call is equivalent to
* <pre>
* dispatch(listeners, true);
* </pre>
*
* @param listeners the listeners collection
*
* @throws ClassCastException if any of the targets is not
* an instance of TimerListener interface
*/
public void dispatch(Listeners listeners) {
dispatch(listeners, true);
}
/**
* Dispatch this event to the specified listeners collection.
*
* @param listeners the listeners collection
* @param fStrict if true then any RuntimeException thrown by event
* handlers stops all further event processing and the
* exception is re-thrown; if false then all exceptions
* are logged and the process continues
*
* @throws ClassCastException if any of the targets is not
* an instance of TimerListener interface
*/
public void dispatch(Listeners listeners, boolean fStrict) {
if (listeners != null) {
System.out.println("There are some registered listeners");
EventListener[] targets = listeners.listeners();
for (int i = targets.length; --i >= 0; ) {
TimerListener target = (TimerListener) targets[i];
try {
dispatch(target);
} catch (RuntimeException e) {
if (fStrict) {
throw e;
} else {
Base.err(e);
}
}
}
}
}
/**
* Dispatch this event to the specified TimerListener.
*
* @param listener the listener
*/
public void dispatch(TimerListener listener) {
switch (getId()) {
case TimerEvent.SERVICE_STARTED:
listener.serviceStarted(this);
break;
case TimerEvent.SERVICE_STOPPED:
listener.serviceStopped(this);
break;
}
}
/**
* Get the event's description.
*
* @return this event's description
*/
protected String getDescription() {
switch (getId()) {
case SERVICE_STARTED:
return "Timer Started";
case SERVICE_STOPPED:
return "Timer Completed";
default:
throw new IllegalStateException();
}
}
/**
* Convert an event ID into a human-readable string.
*
* @param nId an event ID, one of the SERVICE_* constants
*
* @return a corresponding human-readable string, for example "SCHEDULED"
*/
public static String getDescription(int nId) {
switch (nId) {
case SERVICE_STARTED:
return "SCHEDULED";
case SERVICE_STOPPED:
return "COMPLETED";
default:
return "<unknown>";
}
}
}
# TimerListener.java
package com.ezduck.coherence.timer;
import com.tangosol.util.ServiceListener;
import java.io.Serializable;
public interface TimerListener extends ServiceListener, Serializable {
}
Look at the source for more details on how these new classes supporting Timer events are integrated.
What's next?
I have uploaded the source and configurations for everyone to use. I would not bet my life on it, but it's free to use. I would like to extend the implementation to address the following:
- start/stop should be part of the TimerService class.
- ConfigurableCacheFactory should only be responsible for parsing the timer-scheme.
- What happens if the node executing the task dies? There is no timeout or failover mechanism.
History:
@07/22/2009:
- Added support for TimerEvents and TimerListeners based on ServiceEvents and ServiceListeners.
- Created a CoherenceJob to control only one Quartz Job is executed in the cluster.
- Added ServiceEvent dispatching framework.
@11/24/2008:
- Added <repeat>true/false</repeat> in the timer-scheme's job config. Default is false.
- The trigger uses service-name. Throws an IllegalStateException if service-name is not defined.
- Updated cache-config.dtd to add repeat element.