Sunday, November 01, 2009

Functional Programming with Coherence

If you come across the following, don't be surprised. It's called functional programming.

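object FP {
  // Takes a function of no arguments and calls it.
  def doSomething(callback: () => Unit): Unit = {
    callback()
  }

  def letsPrint(): Unit = {
    println("Ha ha!")
  }

  def main(args: Array[String]): Unit = {
    // letsPrint is passed as a function value, like any other argument.
    doSomething(() => letsPrint())
  }
}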
In Scala, functions are objects that can be passed around. The concept of an object has not changed: it is a holder of application state. So in the functional programming model functions are state too, and if that is legitimate, why not manage them in a state repository such as Oracle Coherence?
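The argument rests on functions being values like any other. Here is a minimal Java sketch of that idea. It assumes a plain `HashMap` as a stand-in for a state repository such as a Coherence cache, and the class and method names are hypothetical. Functions are stored under a name, then looked up and applied later. A real clustered cache would have to serialize what it stores, which is presumably part of why the approach below ships the function as source text rather than as a live object.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory "function repository": functions are stored as
// ordinary values under a name, just like any other piece of state.
class FunctionRepository {
    private final Map<String, Function<String, String>> functions = new HashMap<>();

    // Store a function under a name.
    void put(String name, Function<String, String> fn) {
        functions.put(name, fn);
    }

    // Look up the function stored under name and apply it to arg.
    String apply(String name, String arg) {
        Function<String, String> fn = functions.get(name);
        if (fn == null) {
            throw new IllegalStateException("No function stored under " + name);
        }
        return fn.apply(arg);
    }

    public static void main(String[] args) {
        FunctionRepository repo = new FunctionRepository();
        repo.put("shout", s -> s.toUpperCase() + "!");
        repo.put("laugh", s -> "Ha ha, " + s);
        System.out.println(repo.apply("shout", "hello"));     // HELLO!
        System.out.println(repo.apply("laugh", "Coherence")); // Ha ha, Coherence
    }
}
```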

Problem Statement: How can new EntryProcessors be executed without deploying them to the cluster, while still achieving 100% uptime?

In simpler English: how can I push new EntryProcessors to the cluster and execute them without bouncing it to deploy the new classes? The solution lies in functional programming, i.e. passing functions to the executor, but this time in a slightly different way. Let's again begin with a cache configuration:
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>EPFeeder</cache-name>
      <scheme-name>feeder-scheme</scheme-name>
    </cache-mapping>
    <cache-mapping>
      <cache-name>EPCache</cache-name>
      <scheme-name>ep-scheme</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>

  <caching-schemes>
    <class-scheme>
      <scheme-name>ep-scheme</scheme-name>
      <class-name>DynaEPCache</class-name>
      <init-params>
        <init-param>
          <param-type>string</param-type>
          <param-value>{cache-name}</param-value>
        </init-param>
        <init-param>
          <param-type>string</param-type>
          <param-value>coherence-cache-config.xml</param-value>
        </init-param>
      </init-params>
    </class-scheme>
    <distributed-scheme>
      <scheme-name>feeder-scheme</scheme-name>
      <service-name>EPFeederScheme</service-name>
      <backing-map-scheme>
        <read-write-backing-map-scheme>
          <internal-cache-scheme>
            <local-scheme>
              <high-units>100MB</high-units>
              <listener>
                <class-scheme>
                  <class-name>EPListener</class-name>
                </class-scheme>
              </listener>
            </local-scheme>
          </internal-cache-scheme>
        </read-write-backing-map-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>

    <proxy-scheme>
      ...
    </proxy-scheme>

    <invocation-scheme>
      <scheme-name>InvocationService</scheme-name>
      <service-name>InvocationService</service-name>
      <thread-count>2</thread-count>
      <autostart>true</autostart>
    </invocation-scheme>
  </caching-schemes>
</cache-config>

The EPFeeder cache is where we store the functions (implementations of the EntryProcessor's process() method), keyed by the EP's class name. EPCache is a demo cache that holds your data and against which you execute the new EPs. The next step is to see what the client code looks like:
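import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.InvocableMap;
import junit.framework.TestCase;

public class DynaCUtilTest extends TestCase {
    ....
    // Source of the process() method for the new EntryProcessor; "Entry" is
    // resolved through the imports registered in DynaCUtil.createEP.
    private String getEPImpl() {
        StringBuffer sBuffer = new StringBuffer();
        sBuffer.append("public Object process (Entry entry) {");
        sBuffer.append("System.out.println(\"In process\");");
        sBuffer.append("System.out.println(\"Key:\" + entry.getKey());");
        sBuffer.append("System.out.println(\"Value:\" + entry.getValue());");
        sBuffer.append("return null;");
        sBuffer.append("}");
        return sBuffer.toString();
    }

    public void testCreateEP() throws Exception {
        String impl = getEPImpl();
        String clzName = "EPClass_v1";
        // Publishing the implementation fires EPListener (the backing map listener).
        NamedCache eCache = CacheFactory.getCache("EPFeeder");
        eCache.put(clzName, impl);

        // EPClass_v1 does not exist at compile time, so "new EPClass_v1()" cannot
        // compile; generate the class locally and instantiate it reflectively.
        Class clz = DynaCUtil.createEP(clzName, impl);
        NamedCache nCache = CacheFactory.getCache("EPCache");
        nCache.invoke("A", (InvocableMap.EntryProcessor) clz.newInstance());
    }
}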
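The invoke call above asks the cache to run the processor's process() method against the entry stored under key "A" and to return its result. The sketch below shows that contract in a single JVM. It uses hypothetical stand-in interfaces (`SimpleEntry`, `SimpleEntryProcessor`) rather than Coherence's own types, and it deliberately ignores distribution, locking and serialization, which the real cache handles.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Coherence's InvocableMap.Entry and
// InvocableMap.EntryProcessor, reduced to what this sketch needs.
interface SimpleEntry {
    Object getKey();
    Object getValue();
    void setValue(Object value);
}

interface SimpleEntryProcessor {
    Object process(SimpleEntry entry);
}

// Single-JVM map that runs a processor against one entry and returns its result.
class LocalInvocableMap {
    private final Map<Object, Object> data = new HashMap<>();

    void put(Object key, Object value) {
        data.put(key, value);
    }

    Object get(Object key) {
        return data.get(key);
    }

    Object invoke(Object key, SimpleEntryProcessor processor) {
        // The entry view reads and writes straight through to the backing data.
        SimpleEntry entry = new SimpleEntry() {
            public Object getKey() { return key; }
            public Object getValue() { return data.get(key); }
            public void setValue(Object value) { data.put(key, value); }
        };
        return processor.process(entry);
    }
}
```

For example, `map.invoke("A", e -> { e.setValue((Integer) e.getValue() + 1); return e.getKey(); })` increments the value in place and returns the key, which is the kind of work the generated EPClass_v1 would do against EPCache.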
getEPImpl() returns the source of the process() method for each new EP class we feed in; here it is the first version, named EPClass_v1. So what happens next?
When a new implementation is put into the EPFeeder cache, a backing map listener picks up the event and uses an Invocation Service to create the new EntryProcessor class dynamically on every cluster member. Because no member needs a restart to pick up the new class, this step is what keeps Coherence at 100% uptime. The backing map listener (EPListener) looks something like this:

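import com.tangosol.net.CacheFactory;
import com.tangosol.net.Invocable;
import com.tangosol.net.InvocationService;
import com.tangosol.util.MapEvent;
import com.tangosol.util.MultiplexingMapListener;
import java.util.Set;

public class EPListener extends MultiplexingMapListener {

    public EPListener() {
    }

    protected void onMapEvent(MapEvent mapEvent) {
        String key = ...;   // EP class name taken from the event
        String impl = ...;  // process() source taken from the event
        InvocationService iS =
            (InvocationService) CacheFactory.getService("InvocationService");
        Invocable inv = new EPCreator(key, impl);
        // -- Create the new class on all nodes
        Set set = CacheFactory.getCluster().getMemberSet();
        iS.query(inv, set);
    }
}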
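EPListener hands the creation step to InvocationService.query, which sends the same Invocable to a set of members and waits for a result from each. The fan-out can be simulated in a single JVM. In this sketch each hypothetical `SimulatedMember` is just an object with its own registry, an `ExecutorService` plays the role of the cluster, and "creating" a class merely records its source under the class name. It mirrors only the shape of query (one task per member, results keyed by member), not its networking or failure handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a cluster member: an id plus its own
// registry of EP implementations (class name -> source).
class SimulatedMember {
    final int id;
    final Map<String, String> registry = new ConcurrentHashMap<>();

    SimulatedMember(int id) {
        this.id = id;
    }
}

class BroadcastQuery {
    // Runs one "create" task per member in parallel, waits for all of them and
    // returns member id -> result, loosely following the shape of InvocationService.query.
    // A result of true means this call registered the class on that member.
    static Map<Integer, Boolean> query(List<SimulatedMember> members, String clzName, String impl) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, members.size()));
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (SimulatedMember m : members) {
                tasks.add(() -> m.registry.putIfAbsent(clzName, impl) == null);
            }
            // invokeAll blocks until every task is done; futures keep task order.
            List<Future<Boolean>> futures = pool.invokeAll(tasks);
            Map<Integer, Boolean> results = new HashMap<>();
            for (int i = 0; i < members.size(); i++) {
                results.put(members.get(i).id, futures.get(i).get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Using putIfAbsent makes a repeated broadcast harmless: a second query for the same class name reports false everywhere instead of creating the class twice.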
The core of this listener is the magic EPCreator Invocable, but before we look at EPCreator, let's look at EPInterface:
import com.tangosol.util.InvocableMap;

public interface EPInterface extends InvocableMap.EntryProcessor {
}
The most critical piece of the puzzle is the Invocable and how it does its magic. EPCreator executes the following in its run() method, which we put in its own utility class (DynaCUtil):
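import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtNewConstructor;
import javassist.CtNewMethod;
import javassist.NotFoundException;

public class DynaCUtil {

    public static Class createEP(String clzName, String impl) {
        ClassPool pool = ClassPool.getDefault();
        // Let the Javassist compiler resolve the short names used in impl.
        pool.importPackage("com.tangosol.util.InvocableMap.EntryProcessor");
        pool.importPackage("com.tangosol.util.InvocableMap.Entry");
        try {
            // Already known to the pool (e.g. generated earlier): return the loaded class.
            pool.get(clzName);
            return Class.forName(clzName, true,
                    Thread.currentThread().getContextClassLoader());
        } catch (NotFoundException e) {
            // Not generated yet; build it below.
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
        try {
            CtClass eClass = pool.makeClass(clzName);
            // Implement the existing EPInterface; pool.get (not makeClass)
            // references the real interface instead of defining a new class.
            eClass.setInterfaces(new CtClass[] { pool.get("EPInterface") });
            eClass.addConstructor(CtNewConstructor.defaultConstructor(eClass));
            // process(): the source fetched from the EPFeeder cache.
            eClass.addMethod(CtNewMethod.make(impl, eClass));
            // processAll(): a stub that only logs.
            StringBuffer sBuffer = new StringBuffer();
            sBuffer.append("public java.util.Map processAll(java.util.Set set) {");
            sBuffer.append("System.out.println(\"In processAll\");");
            sBuffer.append("return java.util.Collections.EMPTY_MAP;");
            sBuffer.append("}");
            eClass.addMethod(CtNewMethod.make(sBuffer.toString(), eClass));
            // Define and load the class in the thread's context class loader.
            return eClass.toClass();
        } catch (NotFoundException e) {
            throw new RuntimeException(e);
        } catch (CannotCompileException e) {
            throw new RuntimeException(e);
        }
    }
}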
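createEP compiles a source string into a brand-new class inside a running JVM. If you don't have Javassist at hand, you can try the same idea with the JDK's standard compiler API (`javax.tools`). This is a swapped-in technique, not the one used in this post. It writes the source to a temporary directory, compiles it with `ToolProvider.getSystemJavaCompiler()`, and loads the result through a fresh `URLClassLoader`. It needs a full JDK, since the compiler is absent on a plain JRE. The `SourceCompiler` class, the `Function<Object, Object>` target type and the class names are illustrative assumptions.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Function;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

// Runtime code generation with the JDK compiler API instead of Javassist.
class SourceCompiler {

    // Wraps applyBody in a class implementing Function<Object, Object>,
    // compiles it into a temporary directory and loads it with a new class loader.
    @SuppressWarnings("unchecked")
    static Function<Object, Object> compile(String className, String applyBody) {
        String source = "public class " + className
                + " implements java.util.function.Function<Object, Object> {"
                + " public Object apply(Object value) { " + applyBody + " } }";
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("No system Java compiler (running on a JRE?)");
        }
        try {
            Path dir = Files.createTempDirectory("dynaep");
            Path file = dir.resolve(className + ".java");
            Files.write(file, source.getBytes(StandardCharsets.UTF_8));
            // Null streams mean System.in/out/err; a return value of 0 means success.
            int status = compiler.run(null, null, null, "-d", dir.toString(), file.toString());
            if (status != 0) {
                throw new IllegalStateException("Compilation of " + className + " failed");
            }
            URLClassLoader loader = new URLClassLoader(new URL[] { dir.toUri().toURL() },
                    SourceCompiler.class.getClassLoader());
            Class<?> clz = loader.loadClass(className);
            return (Function<Object, Object>) clz.getDeclaredConstructor().newInstance();
        } catch (IOException | ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}
```

For example, `SourceCompiler.compile("Twice_v1", "return ((Integer) value) * 2;").apply(21)` returns 42. As with Javassist, the generated class only exists in the JVM that compiled it, which is why the post still needs a way to create it on every member.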

What was that? The Invocable uses Javassist to create a new EntryProcessor class on the fly. Now the last question: Coherence is a self-healing system in which nodes can join and leave at any time, so how do we make sure the new EP classes are available on nodes that join later? The answer is a custom NamedCache, which is also the last piece of the puzzle. The class would look something like the following:

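import com.tangosol.net.CacheFactory;
import com.tangosol.net.cache.WrapperNamedCache;
import com.tangosol.util.InvocableMap;
import java.util.Collection;
import java.util.Map;

public class DynaEPCache extends WrapperNamedCache {

    // Constructor taking the two init-params from ep-scheme
    // ({cache-name} and the configuration file name) omitted.

    public Object invoke(Object oKey, InvocableMap.EntryProcessor agent) {
        // Make sure the agent's class has been generated on this node first.
        String name = agent.getClass().getName();
        createEP(name, (String) CacheFactory.getCache("EPFeeder").get(name));
        return super.invoke(oKey, agent);
    }

    public Map invokeAll(Collection collKeys, InvocableMap.EntryProcessor agent) {
        ....
    }

    private void createEP(String name, String impl) {
        if (impl == null) {
            throw new RuntimeException("EntryProcessor not created yet!");
        }
        DynaCUtil.createEP(name, impl);
    }
}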
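DynaEPCache.createEP materializes a processor only when it is first needed on a node, and fails if the feeder has no implementation yet. The same lazy, at-most-once pattern can be sketched with `ConcurrentHashMap.computeIfAbsent`, which applies its function at most once per key. The feeder map, the builder function and the `LazyProcessorRegistry` name are hypothetical stand-ins. In the post, the feeder is the EPFeeder cache and the builder would be DynaCUtil.createEP.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Node-local, on-demand materialization of processors from a shared
// map of implementations (a stand-in for the EPFeeder cache).
class LazyProcessorRegistry<P> {
    private final Map<String, String> feeder;   // class name -> implementation source
    private final Function<String, P> builder;  // turns source into a usable processor
    private final Map<String, P> local = new ConcurrentHashMap<>();
    private final AtomicInteger builds = new AtomicInteger();

    LazyProcessorRegistry(Map<String, String> feeder, Function<String, P> builder) {
        this.feeder = feeder;
        this.builder = builder;
    }

    // Returns the processor for name, building it only the first time it is requested.
    P get(String name) {
        return local.computeIfAbsent(name, n -> {
            String impl = feeder.get(n);
            if (impl == null) {
                // Same failure mode as DynaEPCache.createEP: nothing has been fed yet.
                throw new IllegalStateException("EntryProcessor not created yet: " + n);
            }
            builds.incrementAndGet();
            return builder.apply(impl);
        });
    }

    // Number of times the builder actually ran (for illustration).
    int buildCount() {
        return builds.get();
    }
}
```

If the builder throws, computeIfAbsent records no mapping, so a later call retries once the implementation has been fed.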

A much more advanced implementation is sitting on my laptop, and I will soon upload pieces of it to http://sites.google.com/site/miscellaneouscomponents/Home. In the meantime, enjoy!

2 comments:

Anonymous said...

Hi Ashish

Interesting approach. I've done a similar thing with Groovy.

Cheers
Rick

Ashish said...

Brian suggested using an Executor service in the MapListener that is configured as a BML, offloading the class creation to another thread to avoid any latency this processing introduces.
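Here is a minimal sketch of that suggestion using plain `java.util.concurrent` types. The event callback only submits the work and returns, and a single background thread does the class creation. In this sketch the creation is a trivial placeholder. `OffloadingListener`, `onInsert` and `createClass` are illustrative names, not Coherence API. In a real BML, the executor would also need to be shut down when the cache service stops.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OffloadingListener {
    // One background thread, so queued creations run one at a time, in order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    final List<String> created = new CopyOnWriteArrayList<>();

    // Event callback: only enqueue the work, so the event thread returns at once.
    void onInsert(String clzName, String impl) {
        executor.submit(() -> createClass(clzName, impl));
    }

    // Placeholder for the expensive part (code generation, broadcasting to members).
    private void createClass(String clzName, String impl) {
        created.add(clzName);
    }

    // Stops accepting work and waits for queued creations; true if all finished.
    boolean close() {
        executor.shutdown();
        try {
            return executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```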