/**
 * Dispatches the given map event to exactly one listener, chosen at random.
 *
 * <p>The candidates are the listeners that {@code m_listenerSupport} holds under
 * {@code AlwaysFilter.INSTANCE}; one index is drawn with {@code Base.getRandom()}
 * and only the listener at that index receives the event.
 *
 * @param mapEvent the event to hand to the selected listener
 */
private void dispatchQueueEvent(MapEvent mapEvent) {
// Only listeners filed under AlwaysFilter.INSTANCE are considered here.
EventListener[] eList =
m_listenerSupport.getListeners(AlwaysFilter.INSTANCE).listeners();
int size = eList.length;
// NOTE(review): when size is 0 the bound passed to nextInt is 0; assuming
// Base.getRandom() returns a java.util.Random, that call throws
// IllegalArgumentException. Confirm whether an empty listener list is reachable.
// The cast assumes every listener registered under this filter is a MapListener.
MapListener mListener = (MapListener) eList[Base.getRandom().nextInt(size)];
mapEvent.dispatch(mListener);
}
The method collects all the listeners registered on that cache node, picks one of them at random, and dispatches the map event to it. The second component is a custom NamedCache that extends Coherence's WrapperNamedCache. Its key method is addMapListener():
public void addMapListener(MapListener listener, Filter filter, boolean fLite) {
// Create the shared InternalListener the first time a listener is added;
// later calls reuse the same instance.
if (singleListener == null) {
singleListener = new InternalListener();
}
// Keep the caller's listener in the local listener support, always under
// AlwaysFilter.INSTANCE and with false as the third argument; the caller's
// filter and fLite are not used for this local registration.
m_listenerSupport.addListener(listener, AlwaysFilter.INSTANCE, false);
// The superclass receives singleListener (not the caller's listener) together
// with the caller's filter and fLite.
// NOTE(review): this runs on every call, so singleListener is handed to the
// superclass once per added listener; confirm that repeated registration is
// intended or de-duplicated by the superclass.
super.addMapListener(singleListener, filter, fLite);
}
Finally, there is an EntryProcessor that makes sure that, even if listeners are registered in a distributed fashion, one and only one of those listeners receives the message. This is done by setting an event-dispatch state that every thread checks before dispatching the event. The class, shown below, is pretty simple as well. More details and more source code are available at http://sites.google.com/site/miscellaneouscomponents/Home/ezmq
/**
 * Entry processor that hands the wrapped map event to {@code dispatchQueueEvent}
 * only while the target entry holds no value, and then stores
 * {@code STATE.DISPATCHED.name()} in that entry.
 *
 * <p>An invocation that finds a non-null value in the entry does nothing, so the
 * entry's value serves as the "already dispatched" marker.
 *
 * <p>NOTE(review): this is a non-static inner class that implements
 * {@code Serializable}; serializing it presumably pulls in the enclosing
 * instance as well. Confirm that this is intended.
 */
private class MLSEntryProcessor implements InvocableMap.EntryProcessor, Serializable {

    /** Event to deliver if this invocation is the one that finds the entry empty. */
    private MapEvent mapEvent;

    /**
     * @param mapEvent the event to dispatch when the processed entry has no value yet
     */
    public MLSEntryProcessor(MapEvent mapEvent) {
        this.mapEvent = mapEvent;
    }

    /**
     * Dispatches the event and marks the entry, unless the entry already has a value.
     *
     * @param entry the entry whose value is used as the dispatch-state marker
     * @return always {@code null}
     */
    public Object process(InvocableMap.Entry entry) {
        String dispatchState = (String) entry.getValue();
        if (dispatchState != null) {
            // A value is already present: nothing is dispatched by this invocation.
            return null;
        }

        try {
            dispatchQueueEvent(mapEvent);
            // Reached only if the dispatch above did not throw.
            // NOTE(review): the second argument is presumably Coherence's
            // "synthetic update" flag; confirm against InvocableMap.Entry.setValue.
            entry.setValue(STATE.DISPATCHED.name(), true);
        } catch (Exception failure) {
            // Best effort: the failure is printed and not propagated to the invoker.
            failure.printStackTrace();
        }
        return null;
    }

    /**
     * Does not process any of the supplied entries.
     *
     * @param set the entries passed in by the caller; not read
     * @return {@code Collections.EMPTY_MAP}
     */
    public Map processAll(Set set) {
        return Collections.EMPTY_MAP;
    }
}
Enjoy!
No comments:
Post a Comment