Coherence supports three types of Events:
- MapEvents: Generated by CRUD operations on the cache.
- MemberEvent: Generated when a cluster Member joins or leaves a CacheService.
- ServiceEvent: A generic event framework that can be dispatched by the cache services (MemberEvents are of type ServiceEvent).
Problem Statement: How to dispatch a custom TimerEvent from a Timer scheme
Ideally, when a new service is added, it has to be added to the Coherence components package. This is a core layer, and it cannot be changed without the right tools: the component IDE is not publicly available. So what's the hack?
First, create a TimerService class:

    public class TimerService implements Service {
        private Collection<TimerListener> listeners = new ArrayList<TimerListener>();

        public void addServiceListener (...) {...}
        public void removeServiceListener (...) {...}
    }

Second, write a TimerListener interface:

    public interface TimerListener extends ServiceListener {}

Then a TimerEvent like:

    public class TimerEvent extends ServiceEvent {
        protected int m_nId;
        protected Service m_service;

        public TimerEvent (Service service, int nId) {
            super (service, nId);
            m_service = service;
            m_nId = nId;
        }
        ...
        // Route the event to the matching callback on the listener
        public void dispatch(TimerListener listener) {
            switch (getId()) {
                case TimerEvent.SERVICE_STARTED:
                    listener.serviceStarted(this);
                    break;
                case TimerEvent.SERVICE_STOPPED:
                    listener.serviceStopped(this);
                    break;
            }
        }
        ...
    }

Now the trick: how do I use it without creating a TimerService component? Here is a hack: create a custom ConfigurableCacheFactory and return a singleton TimerService from its ensureService() method. This class looks like:

    public class ExtendedConfigurableCacheFactory extends DefaultConfigurableCacheFactory {

        /**
         * Default Constructor
         */
        public ExtendedConfigurableCacheFactory() {
            super();
        }

        /**
         * Constructor loads the cache configuration from a given path and the
         * classloader to use.
         *
         * @param path
         * @param loader
         */
        public ExtendedConfigurableCacheFactory(String path, ClassLoader loader) {
            super(path, loader);
        }

        /**
         * Constructor loads the cache configuration from a given path using the
         * default classloader.
         * @param path
         */
        public ExtendedConfigurableCacheFactory(String path) {
            super(path);
        }

        /**
         * Constructor to load the coherence cache configuration
         * @param xmlConfig
         */
        public ExtendedConfigurableCacheFactory(XmlElement xmlConfig) {
            super(xmlConfig);
        }
        ...
        public Service ensureService(String serviceName) {
            if (serviceName.equals("DistributedTimerService")) {
                return SingletonTimerService.TIMER_SERVICE;
            } else {
                return super.ensureService (serviceName);
            }
        }

        // Holder class: the field has to be static to be reachable as
        // SingletonTimerService.TIMER_SERVICE; it is created on first access.
        private static class SingletonTimerService {
            protected static final TimerService TIMER_SERVICE = new TimerService ();
        }
    }

Now the last leg of this problem is how to manage these TimerEvents and how to dispatch them. A few well-positioned EntryProcessors can do this. First, an EP that updates the state of the Timer Task:

    public class TimerStatusUpdateProcessor extends AbstractProcessor {

        /**
         * Status of Job
         */
        public static enum Status {
            SCHEDULED,
            RUNNING,
            COMPLETED
        }

        private Status status;

        /**
         * Constructor to pass the Job's status to be set
         * @param status
         */
        public TimerStatusUpdateProcessor(Status status) {
            this.status = status;
        }

        /**
         * Sets the Job's status
         * @param entry
         * @return
         */
        public Object process(InvocableMap.Entry entry) {
            entry.setValue(status);
            return null;
        }
    }

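The status value written by this processor can also serve as a guard, so that a job body runs only while its entry is still SCHEDULED. The following is a minimal plain-Java sketch of that idea, not code from the original project: a ConcurrentMap stands in for the Coherence cache, and the names StatusGuardSketch, setStatus, tryStart, getStatus and the job name "nightlyReport" are all hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class StatusGuardSketch {

    /** Mirrors the Status enum of TimerStatusUpdateProcessor. */
    public enum Status { SCHEDULED, RUNNING, COMPLETED }

    // Assumption: a local map stands in for the cache (job name -> status).
    private final ConcurrentMap<String, Status> statuses =
            new ConcurrentHashMap<String, Status>();

    /** Unconditional update, analogous to entry.setValue(status). */
    public void setStatus(String jobName, Status status) {
        statuses.put(jobName, status);
    }

    /**
     * Atomically moves a job from SCHEDULED to RUNNING.
     * Returns true only for the caller that performs the transition.
     */
    public boolean tryStart(String jobName) {
        return statuses.replace(jobName, Status.SCHEDULED, Status.RUNNING);
    }

    public Status getStatus(String jobName) {
        return statuses.get(jobName);
    }

    public static void main(String[] args) {
        StatusGuardSketch guard = new StatusGuardSketch();
        guard.setStatus("nightlyReport", Status.SCHEDULED);

        System.out.println(guard.tryStart("nightlyReport")); // true: first caller wins
        System.out.println(guard.tryStart("nightlyReport")); // false: already RUNNING

        guard.setStatus("nightlyReport", Status.COMPLETED);
        System.out.println(guard.getStatus("nightlyReport")); // COMPLETED
    }
}
```

In a real cluster the same check would have to run inside an entry processor against the cache entry, so that it executes on the node owning the key; the local map here only illustrates the state transition.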
Next comes a class, CoherenceJob, that every Quartz timer Job has to extend. CoherenceJob makes sure that only one Job runs across the entire cluster. A sneak preview of CoherenceJob:

    public abstract class CoherenceJob implements Job, Serializable {

        public CoherenceJob() {
        }

        public void execute(JobExecutionContext context) {
            // -- Only run if the Job is scheduled
            NamedCache nCache = CacheFactory.getCache(CoherenceTrigger.CACHE_NAME);
            Member member = nCache.getCacheService().getCluster().getLocalMember();
            nCache.invoke(context.getJobDetail().getFullName(),
                          new JobExecutor(context, member));
        }

        public abstract void process(JobExecutionContext context);

        // Entry processor that sends the actual work back to the scheduling member
        private class JobExecutor extends AbstractProcessor implements Serializable {
            private transient JobExecutionContext context;
            private Member member;

            public JobExecutor(JobExecutionContext context, Member member) {
                this.context = context;
                this.member = member;
            }

            public Object process(InvocableMap.Entry entry) {
                String configName = System.getProperty("tangosol.coherence.cacheconfig");
                if (configName == null) {
                    configName = "coherence-cache-config.xml";
                }
                InvocationService iS = (InvocationService)
                        new DefaultConfigurableCacheFactory(configName).
                                ensureService("JobInvocationService");
                iS.query(new JobProcessor(context), Collections.singleton(member));
                return null;
            }
        }

        // Invocable that runs the Job's process() method
        private class JobProcessor implements Invocable {
            private JobExecutionContext context;
            private transient InvocationService iS;

            public JobProcessor(JobExecutionContext context) {
                this.context = context;
            }

            public void init(InvocationService invocationService) {
                iS = invocationService;
            }

            public void run() {
                System.out.println("Processing the real work");
                process(context);
            }

            public Object getResult() {
                return null;
            }
        }
    }

So what about the client? Here you go:

    public class TimerTest extends TestCase {

        public TimerTest(String sTestName) {
            super(sTestName);
        }

        public static void main(String args[]) {
            Service service = CacheFactory.getConfigurableCacheFactory().
                    ensureService("DistributedTimerService");
            service.addServiceListener(new MyTimerListener ());
            try {
                // Keep the JVM alive until a key is pressed
                System.in.read();
            } catch (Exception exp) {
                exp.printStackTrace();
            }
            System.exit(1);
        }

        protected void setUp() throws Exception {
            super.setUp();
        }

        protected void tearDown() throws Exception {
            super.tearDown();
        }

        private static class MyTimerListener implements TimerListener {
            public void serviceStarting(ServiceEvent serviceEvent) {
                System.out.println("Service Starting: " + serviceEvent.getId());
            }

            public void serviceStarted(ServiceEvent serviceEvent) {
                System.out.println("Service Started: " + serviceEvent.getId());
            }

            public void serviceStopping(ServiceEvent serviceEvent) {}

            public void serviceStopped(ServiceEvent serviceEvent) {
                System.out.println("Service Stopped: " + serviceEvent.getId());
            }
        }
    }

The problem with this TimerService is that an instance dispatches events only to listeners registered in its own JVM. With this solution, TimerListeners are registered only at the client. Because the TimerService is a single instance per classloader, there has to be a mechanism to propagate changes from one node to the others. One solution is to piggyback ServiceEvents on top of MapEvents. With this change the TimerService becomes:

    public class TimerService implements Service, MapListener {
        private NamedCache nCache = null;
        private Collection<TimerListener> listeners = new ArrayList<TimerListener>();

        public TimerService() {}

        public void addServiceListener(ServiceListener serviceListener) {
            nCache = CacheFactory.getCache(CoherenceTrigger.CACHE_NAME);
            nCache.addMapListener(this);
            nCache.getCacheService().addServiceListener(serviceListener);
            listeners.add((TimerListener)serviceListener);
        }

        public void removeServiceListener(ServiceListener serviceListener) {
            nCache = CacheFactory.getCache(CoherenceTrigger.CACHE_NAME);
            nCache.getCacheService().removeServiceListener(serviceListener);
            listeners.remove(serviceListener);
        }
        ...
        public void entryInserted(MapEvent mapEvent) {
            notifyOtherTimerServices(mapEvent);
        }

        public void entryUpdated(MapEvent mapEvent) {
            notifyOtherTimerServices(mapEvent);
        }

        public void entryDeleted(MapEvent mapEvent) {}

        private void notifyOtherTimerServices(MapEvent mapEvent) {
            String newValue = ((TimerStatusUpdateProcessor.Status)mapEvent.getNewValue()).name();
            String oldValue = "";
            if (mapEvent.getOldValue() != null) {
                oldValue = ((TimerStatusUpdateProcessor.Status)mapEvent.getOldValue()).name();
            }
            if (!(oldValue.equals("") && newValue.equals("COMPLETED"))) {
                // Strings are compared with equals(); == would compare references
                int s = newValue.equals(TimerStatusUpdateProcessor.Status.SCHEDULED.name())
                        ? TimerEvent.SERVICE_STARTED
                        : TimerEvent.SERVICE_STOPPED;
                TimerEvent tE = new TimerEvent(this, s);
                Collection<TimerListener> listeners = this.getListeners();
                for (TimerListener listener : listeners) {
                    tE.dispatch(listener);
                }
            }
        }
    }

The rest is left to your creativity. The entire project can be downloaded from Here... Enjoy!
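The mapping from status changes to timer events in notifyOtherTimerServices can be tried out without a cluster. The sketch below is a plain-Java stand-in under stated assumptions: PiggybackSketch, TimerSignal, SignalListener and onStatusChange are hypothetical names, and the method is called by hand where the real TimerService would receive a MapEvent.

```java
import java.util.ArrayList;
import java.util.List;

public class PiggybackSketch {

    /** Mirrors TimerStatusUpdateProcessor.Status. */
    public enum Status { SCHEDULED, RUNNING, COMPLETED }

    /** Hypothetical stand-in for SERVICE_STARTED / SERVICE_STOPPED. */
    public enum TimerSignal { STARTED, STOPPED }

    /** Hypothetical stand-in for TimerListener. */
    public interface SignalListener {
        void onSignal(TimerSignal signal);
    }

    private final List<SignalListener> listeners = new ArrayList<SignalListener>();

    public void addListener(SignalListener listener) {
        listeners.add(listener);
    }

    /**
     * Same decision logic as notifyOtherTimerServices: oldValue is null for
     * an insert, and an entry first seen as COMPLETED raises no event.
     */
    public void onStatusChange(Status oldValue, Status newValue) {
        if (oldValue == null && newValue == Status.COMPLETED) {
            return;
        }
        // Enum constants are singletons, so == is a safe comparison here.
        TimerSignal signal = (newValue == Status.SCHEDULED)
                ? TimerSignal.STARTED
                : TimerSignal.STOPPED;
        for (SignalListener listener : listeners) {
            listener.onSignal(signal);
        }
    }

    public static void main(String[] args) {
        final List<TimerSignal> seen = new ArrayList<TimerSignal>();
        PiggybackSketch service = new PiggybackSketch();
        service.addListener(new SignalListener() {
            public void onSignal(TimerSignal signal) {
                seen.add(signal);
            }
        });

        service.onStatusChange(null, Status.SCHEDULED);           // insert
        service.onStatusChange(Status.SCHEDULED, Status.RUNNING); // update
        service.onStatusChange(Status.RUNNING, Status.COMPLETED); // update
        service.onStatusChange(null, Status.COMPLETED);           // ignored

        System.out.println(seen); // [STARTED, STOPPED, STOPPED]
    }
}
```

As in the original method, every status other than SCHEDULED, including RUNNING, maps to the stopped event; a finer-grained mapping would need additional event ids.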