Sunday, January 18, 2009

Implementing JMS Queue on top of Oracle Coherence

In this series about building JMS on top of reliable and fast Oracle Coherence data grid, I added the functionality of a JMS Queue. Projects like ezMQ re-iterates a fact to perceive Coherence data grid as a high availability System of Record not mere a Cache Provider. The solution to build a JMS Queue is a little tricky compared to implementing a JMS Topic on top of Oracle Coherence. The reason is inherent behavior of Coherence to broadcast the cache events to all Map Listeners. The solution revolves around the following method:

private void dispatchQueueEvent(MapEvent mapEvent) {
EventListener[] eList =
m_listenerSupport.getListeners(AlwaysFilter.INSTANCE).listeners();
int size = eList.length;
MapListener mListener = (MapListener) eList[Base.getRandom().nextInt(size)];
mapEvent.dispatch(mListener);
}
The method collects all the registered Listeners on that Cache node, picks one from the list randomly and dispatches the Map Event to it. Second component is a Custom NamedCache that extends Coherence's WrapperNamedCache. The key method is it's addMapListener ().
    public void addMapListener(MapListener listener, Filter filter, boolean fLite) {
if (singleListener == null) {
singleListener = new InternalListener();
}
m_listenerSupport.addListener(listener, AlwaysFilter.INSTANCE, false);
super.addMapListener(singleListener, filter, fLite);
}
And then at the end an EntryProcessor that makes sure even if Listeners are distributively registered one and only one of those Listeners receive the message. This is done by setting an event dispatch state that every thread checks against before dispatching the event. The class is pretty simple as well:

private class MLSEntryProcessor implements InvocableMap.EntryProcessor, Serializable {

private MapEvent mapEvent;

public MLSEntryProcessor(MapEvent mapEvent) {
this.mapEvent = mapEvent;
}

public Object process(InvocableMap.Entry entry) {
String state = (String) entry.getValue();
if (state == null) {
try {
dispatchQueueEvent(mapEvent);
entry.setValue(STATE.DISPATCHED.name(), true);
} catch (Exception exp) {
exp.printStackTrace();
}
}
return null;
}

public Map processAll(Set set) {
return Collections.EMPTY_MAP;
}

}
More details with more source code has been provided at http://sites.google.com/site/miscellaneouscomponents/Home/ezmq
Enjoy!

No comments: