Monday, July 27, 2009

Why Continental should apologize

We are never short of controversies. India's former President Dr. Abdul Kalam was frisked by Continental security staff at Delhi Airport prior to boarding a flight to US. Since then there has been articles in support and opposition to if Continental was right to do so. First of all he is not any other "Government appointed VIP", He is an ex-President of India. I don't think American ex-Presidents go through security checks in America or outside. He is not some minister that you would need an ID card to find out. He should have had some security clearance. Even if Continental had legal rights to frisk anyone they want they should have been cautious who they pick. And even if legally they are not required to, Continental should apologize to probably the best President India had since 1960s as their moral responsibility.

Wednesday, July 22, 2009

Custom Events in Coherence

Coherence supports three types of Events:
  • MapEvents: Generated because of CRUD operation on the Cache.
  • MemberEvent: Generated when a cluster Member joins or leaves a CacheService and,
  • ServiceEvent: A generic event framework that can be dispatched by the cache services (MemberEvents are of type ServiceEvent). 
 I wrote a Quartz based Timer processes in Coherence late last year by creating a custom cache-scheme "timer-scheme". It was natural to think in terms of generating events when a Job is scheduled to run and has completed. These events would help clients to know when a Coherence Task has been scheduled and completed. The biggest challenge was how to add a new TimerEvent in Coherence?
Problem Statement: How to dispatch custom TimerEvent from Timer scheme

Ideally when a new service is added it has to be added in Coherence components package. This is a core layer and without using the right tools it is not possible to change. The component IDE is not publicly available. So whats the hack?

First, create a TimerService class:


public class TimerService implements Service {
   private Collection listeners = new ArrayList();
   public void addServiceListener (...) {...}
   public void removeServiceListener (...) {...}
}
Second, write a TimerListener interface:


public interface TimerListener extends ServiceListener {}
Then a TimerEvent like:


public class TimerEvent extends ServiceEvent {
   protected int m_nId;
   protected Service m_service;

   public TimerEvent (Service service, int nId) {
     super (service, nId);
     m_service = service;
     m_nId = nId;
   }
   ...
   public void dispatch(TimerListener listener) {
     switch (getId()) {
        case TimerEvent.SERVICE_STARTED:
             listener.serviceStarted(this);
        break;

        case TimerEvent.SERVICE_STOPPED:
             listener.serviceStopped(this);
        break;
     }
   }
   ...
}
Now the trick is without creating a TimerService component how do I use it? And, here is a hack: Create a custom ConfigurableCacheFactory and return a singleton TimerService from it's ensureService () method. This class looks like:


public class ExtendedConfigurableCacheFactory
   extends DefaultConfigurableCacheFactory {

   /**
    * Default Constructor
    */
   public ExtendedConfigurableCacheFactory() {
     super();
   }

   /**
    * Constructor loads the cache configuration from a given path and the
    * classloader to use.
    *
    * @param path
    * @param loader
    */
   public ExtendedConfigurableCacheFactory(String path, ClassLoader loader) {
      super(path, loader);
   }

   /**
    * Constructor loads the cache configuration from a given path using the
    * default classloader.
    * @param path
    */
   public ExtendedConfigurableCacheFactory(String path) {
      super(path);
   }

   /**
    * Constructor to load the coherence cache configuration
    * @param xmlConfig
    */
   public ExtendedConfigurableCacheFactory(XmlElement xmlConfig) {
      super(xmlConfig);
   }
   ...
   public Service ensureService(String serviceName) {
      if (serviceName.equals("DistributedTimerService") {
          return SingletonTimerService.TIMER_SERVICE;
      } else {
          return super.ensureService (serviceName);
      }
  }

  private static class SingletonTimerService {
     protected TimerService TIMER_SERVICE = new TimerService ();
  }
}
Now the last leg of this problem is how to manage these TimerEvents and how to dispatch them? A few well positioned EntryProcessors can do this. First, an EP that updates the state of the Timer Task:


public class TimerStatusUpdateProcessor extends AbstractProcessor {
   /**
    * Status of Job
    */
   public static enum Status {
      SCHEDULED,
      RUNNING,
      COMPLETED,
      ;
   }

   private Status status;

   /**
    * Constructor to pass the Job's status to be set
    * @param status
    */
   public TimerStatusUpdateProcessor(Status status) {
     this.status = status;
   }

   /**
    * Sets the status of the Job's status
    * @param entry
    * @return
    */
   public Object process(InvocableMap.Entry entry) {
      entry.setValue(status);
      return null;
   }
}

Next a class CoherenceJob that all Quartz Timer Job has to extend. This CoherenceJob makes sure that only one Job runs across the entire cluster. A sneak preview of CoherenceJob is:


public abstract class CoherenceJob implements Job, Serializable {
   public CoherenceJob() {
   }

   public void execute(JobExecutionContext context) {

     // -- Only run if the Job is scheduled
     NamedCache nCache =
        CacheFactory.getCache(CoherenceTrigger.CACHE_NAME);

     Member member =
        nCache.getCacheService().getCluster().getLocalMember();
     nCache.invoke(context.getJobDetail().getFullName(),
           new JobExecutor(context, member));
   }

   public abstract void process(JobExecutionContext context);

   private class JobExecutor extends AbstractProcessor implements Serializable {

    private transient JobExecutionContext context;
    private Member member;

    public JobExecutor(JobExecutionContext context, Member member) {
      this.context = context;
      this.member = member;
    }

    public Object process(InvocableMap.Entry entry) {
      String configName = System.getProperty("tangosol.coherence.cacheconfig");
      if (configName == null) {
        configName = "coherence-cache-config.xml";
      }
      InvocationService iS =
          (InvocationService) new DefaultConfigurableCacheFactory(configName).
            ensureService("JobInvocationService");
            iS.query(new JobProcessor(context), Collections.singleton(member));
      return null;
    }
}

private class JobProcessor implements Invocable {

    private JobExecutionContext context;
    private transient InvocationService iS;

    public JobProcessor(JobExecutionContext context) {
      this.context = context;
    }

    public void init(InvocationService invocationService) {
      iS = invocationService;
    }

    public void run() {
     System.out.println("Processing the real work");
     process(context);
    }

    public Object getResult() {
     return null;
    }
  }
}
So what about the Client? Here you go:


public class TimerTest extends TestCase {
   public TimerTest(String sTestName) {
     super(sTestName);
   }

   public static void main(String args[]) {
     Service service = CacheFactory.getConfigurableCacheFactory().
                             ensureService("DistributedTimerService");
     service.addServiceListener(new MyTimerListener ());
 
     try {
      System.in.read();
     } catch (Exception exp) {
        exp.printStackTrace();
     }
     System.exit(1);
   }

   protected void setUp() throws Exception {
     super.setUp();
   }

   protected void tearDown() throws Exception {
     super.tearDown();
   }

   private static class MyTimerListener implements TimerListener {

     public void serviceStarting(ServiceEvent serviceEvent) {
     System.out.println("Service Starting: " + serviceEvent.getId());
   }

   public void serviceStarted(ServiceEvent serviceEvent) {
     System.out.println("Service Started: " + serviceEvent.getId());    
   }

   public void serviceStopping(ServiceEvent serviceEvent) {}

   public void serviceStopped(ServiceEvent serviceEvent) {
     System.out.println("Service Stopped: " + serviceEvent.getId());
   }
  }
}
The problem with the last TimerService is that only those instances will dispatch events that has listeners registered in it's own JVM. With this solution TimerListeners are only registered at the client. As this TimerService is a single instance per classloader there has to be a mechanism to dispatch changes in one to other nodes. One solution is to piggyback ServiceEvents on top of MapEvents. With this change the TimerService becomes:


public class TimerService implements Service, MapListener {

 private NamedCache nCache = null;
 private Collection listeners = new ArrayList();

 public TimerService() {}

 public void addServiceListener(ServiceListener serviceListener) {
     nCache = CacheFactory.getCache(CoherenceTrigger.CACHE_NAME);
     nCache.addMapListener(this);
     nCache.getCacheService().addServiceListener(serviceListener);
     listeners.add((TimerListener)serviceListener);
 }

 public void removeServiceListener(ServiceListener serviceListener) {
     nCache = CacheFactory.getCache(CoherenceTrigger.CACHE_NAME);
     nCache.getCacheService().removeServiceListener(serviceListener);
     listeners.remove(serviceListener);
 }
...
public void entryInserted(MapEvent mapEvent) {
     notifyOtherTimerServices(mapEvent);
}

public void entryUpdated(MapEvent mapEvent) {
    notifyOtherTimerServices(mapEvent);
}

public void entryDeleted(MapEvent mapEvent) {}

private void notifyOtherTimerServices(MapEvent mapEvent) {
    String newValue =
      ((TimerStatusUpdateProcessor.Status)mapEvent.getNewValue()).name();

    String oldValue = "";
    if (mapEvent.getOldValue() != null) {
      oldValue =
       ((TimerStatusUpdateProcessor.Status)mapEvent.getOldValue()).name();
    }
    if (!(oldValue.equals("") && newValue.equals("COMPLETED"))) {
      int s =
        (newValue ==
         TimerStatusUpdateProcessor.Status.valueOf("SCHEDULED").name()) ?
         TimerEvent.SERVICE_STARTED : TimerEvent.SERVICE_STOPPED;
      TimerEvent tE = new TimerEvent(this, s);
      Collection listeners = this.getListeners();
      for (TimerListener listener : listeners) {
        tE.dispatch(listener);
      }
    } 
  }
}
Rest is left on your creativity. The entire project can be downloaded from Here... Enjoy!

Monday, July 13, 2009

GM Ad

Our drive to success is as smooth as our vehicles drive. We are the New General Motors.