Wednesday, April 22, 2009

Push Replication - You know it alright

You have probably heard about Push Replication already: it has been covered in several write-ups and has been the hero of numerous talks at NYSIG, UKSIG and more. I have had opportunities to work on multi-cluster Coherence solutions at many client locations, and I have also been part of a project one of whose members is an active contributor to Push Replication. So what new do I have to bring to the table? A working example for Incubator junkies, for one. If you have followed Brian Oliver's write-up on the Push Replication page, I am sure you already know how to set this up and get it running in just a few minutes. So I thought to spice it up a little. Let's build the following:
  1. An Active/Active cluster setup
  2. Cache configurations that use the introduce element
  3. Dynamic subscription of subscriber clusters
Setting up Active-Active clusters
Active/Active is set up much like Active/Passive, but it needs a couple of special classes. First, make sure SafePublishingCacheStore is configured in the cache config, and when we register a publisher we use SafeLocalCachePublisher instead of LocalCachePublisher.

Making use of introduce element in cache configuration

Start using the introduce element when writing a Coherence cache config. This is one of the most useful features added by the Coherence Common Incubator project: it allows us to re-use common cache configurations, and is as simple as:
<cache-config>
<introduce-cache-config file="coherence-pushreplicationpattern-cache-config.xml"/>
</cache-config>
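The introduced file is itself just an ordinary cache configuration, so site-specific files can layer their own schemes on top of a shared base. A hypothetical example (the file names here are made up for illustration; the merge behavior follows the Incubator's introduce semantics):

<!-- common-cache-config.xml: shared base introduced by every site -->
<cache-config>
 <caching-scheme-mapping>
     <cache-mapping>
         <cache-name>publishing-cache</cache-name>
         <scheme-name>distributed-scheme</scheme-name>
     </cache-mapping>
 </caching-scheme-mapping>
</cache-config>

<!-- site-cache-config.xml: pulls in the base and adds site-local schemes -->
<cache-config>
 <introduce-cache-config file="common-cache-config.xml"/>
 <caching-schemes>
     <!-- site-specific proxy-scheme, invocation-scheme, etc. go here -->
 </caching-schemes>
</cache-config>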


Dynamic subscription of subscriber clusters
One architecture that Push Replication supports, and possibly the most popular one as well, is the hub-and-spoke model. In hub-and-spoke, not only do the spoke clusters know about the hub, but the hub also knows about all the spoke clusters, at least at deployment time. This "knowledge" of the other clusters takes the shape of a set of remote-cache-schemes. Recently I came across a requirement where the number of spoke clusters was not known at deployment time. Such an ever-expanding set of subscriber clusters introduces new challenges for Push Replication deployments. Coherence is all about 100% uptime, and stopping the hub every time a new subscriber cluster joins is not a preferable deployment model. So let's look at how new clusters can dynamically join the hub, so that the hub does not need to know about the subscribers while each subscriber knows about the hub.

Let's start with the cache configurations and how they would look in a production environment. The following samples are part of a proof of concept, so there is scope for a few tweaks.

Coherence Cache Configuration on the hub
<?xml version="1.0" encoding="windows-1252" ?>
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
 <introduce-cache-config file="coherence-pushreplicationpattern-cache-config.xml" />
  <caching-scheme-mapping>
  </caching-scheme-mapping>
 <caching-schemes>
     <proxy-scheme>
         <scheme-name>proxy-scheme</scheme-name>
         <service-name>ProxyCacheScheme</service-name>
         <acceptor-config>
             <tcp-acceptor>
                 <local-address>
                     <address>localhost</address>
                     <port>20000</port>
                     <reusable>true</reusable>
                 </local-address>
                 <keep-alive-enabled>true</keep-alive-enabled>
             </tcp-acceptor>
          </acceptor-config>
         <autostart>true</autostart>
     </proxy-scheme>            
 </caching-schemes>
</cache-config>
Hmm... the only scheme the hub cache configuration defines is a proxy-scheme. The hub does not know anything about who the subscribers will be. I will explain below how they register themselves, so scroll down.

coherence-pushreplicationpattern-cache-config.xml
It's pretty much the same as the one you get when you download the Push Replication project; just replace PublishingCacheStore with SafePublishingCacheStore.
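Concretely, the change amounts to swapping the class inside the cachestore-scheme. The fragment below follows the usual Coherence read-write-backing-map layout; the package name and init-params are assumptions based on the Incubator pattern and may differ between releases, so treat this as a sketch:

<read-write-backing-map-scheme>
 <internal-cache-scheme>
     <local-scheme/>
 </internal-cache-scheme>
 <cachestore-scheme>
     <class-scheme>
         <!-- was: PublishingCacheStore; package name assumed, check your
              Incubator distribution -->
         <class-name>com.oracle.coherence.patterns.pushreplication.SafePublishingCacheStore</class-name>
         <init-params>
             <init-param>
                 <param-type>java.lang.String</param-type>
                 <param-value>{cache-name}</param-value>
             </init-param>
         </init-params>
     </class-scheme>
 </cachestore-scheme>
</read-write-backing-map-scheme>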

Subscriber Cluster Cache Configuration
The cache configuration deployed on a subscriber cluster looks a little more complete, as the subscriber knows which hub it has to connect to. The configuration looks like:
<cache-config>
 <introduce-cache-config file="coherence-pushreplicationpattern-cache-config.xml" />
 
 <caching-scheme-mapping>
 </caching-scheme-mapping>
 <caching-schemes>
     <proxy-scheme>
         <scheme-name>proxy-scheme</scheme-name>
         <service-name>ProxyCacheScheme</service-name>
         <acceptor-config>
             <tcp-acceptor>
                 <local-address>
                     <address>localhost</address>
                     <port>9099</port>
                     <reusable>true</reusable>
                 </local-address>
                 <keep-alive-enabled>true</keep-alive-enabled>
             </tcp-acceptor>            
         </acceptor-config>
         <autostart>true</autostart>        
     </proxy-scheme>    
 
     <remote-invocation-scheme>
         <scheme-name>RemoteSiteInvocationService</scheme-name>
         <service-name>RemoteSiteInvocationService</service-name>
         <initiator-config>
             <tcp-initiator>
                 <remote-addresses>
                     <socket-address>
                         <address>localhost</address>
                         <port>20000</port>
                     </socket-address>
                 </remote-addresses>
             </tcp-initiator>
         </initiator-config>    
     </remote-invocation-scheme>
 
     <invocation-scheme>
         <scheme-name>invocation-scheme</scheme-name>
         <service-name>InvocationService</service-name>
         <autostart>true</autostart>
     </invocation-scheme>
 </caching-schemes>

</cache-config>


Dynamic Registration of Subscriber Cluster
The following example is a very scaled-down sample, tuned to run on a single machine; change it as needed. The class should also be exposed as an MBean so that it can be executed from a JMX console. Run it with:
java -Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.clusteraddress=<subscriber-multicast-ip> -Dtangosol.coherence.cacheconfig=subscriber-cache-config.xml PushReplicationClient <proxy-port> <subscriber-name>
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.Invocable;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.run.xml.XmlElement;
// ...plus the Push Replication pattern classes from the Incubator
// (PushReplicationManager, DefaultPushReplicationManager, BatchPublisher,
// RemoteInvocationPublisher, BatchPublisherAdapter, SafeLocalCachePublisher)

public class PushReplicationClient implements Serializable {

 public PushReplicationClient() {
 }

 public static void main(String[] args) {
     // args[0] = <proxy-port>, args[1] = <subscriber-name>
     PushReplicationClient pC = new PushReplicationClient();
     String cacheName = "publishing-cache";
     String remoteServiceName = args[1];
     // Touch the cache so the local cache service is running.
     NamedCache nCache = CacheFactory.getCache(cacheName);

     // Part 1: splice this subscriber's remote-invocation-scheme into the
     // hub's cache configuration.
     InvocationService iS =
         (InvocationService) CacheFactory.getService("RemoteSiteInvocationService");
     iS.query(pC.new SubscriberTask(remoteServiceName, Integer.parseInt(args[0])),
              null);

     // Part 2: register a publisher on the hub so that hub changes are
     // pushed to this subscriber.
     PublisherRegistrationTask rTask =
         pC.new PublisherRegistrationTask(cacheName,
                                          remoteServiceName, remoteServiceName);
     iS.query(rTask, null);

     // Part 3: register a publisher locally, targeting the hub, so the
     // replication runs Active/Active.
     Member sM = CacheFactory.getCluster().getOldestMember();
     InvocationService isLocal =
         (InvocationService) CacheFactory.getService("InvocationService");
     rTask = pC.new PublisherRegistrationTask(cacheName,
                          "RemoteSiteInvocationService", remoteServiceName);
     isLocal.execute(rTask, new HashSet(Collections.singletonList(sM)), null);
 }

 private class PublisherRegistrationTask implements Invocable {

     private String cacheName;
     private String serviceName;
     private String publisherName;

     public PublisherRegistrationTask(String cacheName, String serviceName,
                                      String publisherName) {
         this.cacheName = cacheName;
         this.serviceName = serviceName;
         this.publisherName = publisherName;
     }

     public void init(InvocationService invocationService) {

     }

     public void run() {
         // Register a batch publisher for the cache. SafeLocalCachePublisher
         // (rather than LocalCachePublisher) keeps the setup safe for
         // Active/Active replication.
         PushReplicationManager pM =
             DefaultPushReplicationManager.getInstance();
         BatchPublisher batchPublisher =
             new RemoteInvocationPublisher(serviceName,
                                           new BatchPublisherAdapter(
                                           new SafeLocalCachePublisher(cacheName)),
                                           true, 10000, 100, 10000, 5);
         pM.registerBatchPublisher(cacheName, publisherName,
                                   batchPublisher);
     }

     public Object getResult() {
         return null;
     }

 }

 private class SubscriberTask implements Invocable {

     private String serviceName;
     private int port;

     public SubscriberTask(String serviceName, int port) {
         this.serviceName = serviceName;
         this.port = port;
     }

     public void init(InvocationService invocationService) {

     }

     public void run() {
         // Runs on the hub: splice a remote-invocation-scheme for this
         // subscriber into the hub's cache configuration at runtime.
         ConfigurableCacheFactory factory =
             CacheFactory.getConfigurableCacheFactory();
         XmlElement root = factory.getConfig();
         XmlElement cS = root.findElement("caching-schemes");
         XmlElement riS = cS.addElement("remote-invocation-scheme");
         riS.addElement("scheme-name").setString(serviceName);
         riS.addElement("service-name").setString(serviceName);
         XmlElement iC = riS.addElement("initiator-config");
         XmlElement tI = iC.addElement("tcp-initiator");
         XmlElement rA = tI.addElement("remote-addresses");
         XmlElement sA = rA.addElement("socket-address");
         sA.addElement("address").setString("localhost");
         sA.addElement("port").setInt(port);
         factory.setConfig(root);
         // Debug: print the updated caching-schemes element.
         System.out.println(cS);
     }

     public Object getResult() {
         return null;
     }

 }

}

There are three parts to this class:
  1. It updates the cache configuration deployed on the hub so that the subscriber can register itself.
  2. It registers a publisher on the hub so that changes made in the hub are pushed to this subscriber cluster.
  3. It registers a publisher in the local cluster so that the replication runs in Active/Active mode: changes made in the subscriber cluster are also pushed to the hub's cache and from there to the other subscriber clusters.
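To make the first part concrete: after SubscriberTask runs on the hub, the hub's in-memory configuration effectively gains an entry like the one below. The subscriber name "site2" and port 9099 are hypothetical values matching the sample subscriber config; the element layout mirrors exactly what SubscriberTask.run() builds:

<remote-invocation-scheme>
 <scheme-name>site2</scheme-name>
 <service-name>site2</service-name>
 <initiator-config>
     <tcp-initiator>
         <remote-addresses>
             <socket-address>
                 <address>localhost</address>
                 <port>9099</port>
             </socket-address>
         </remote-addresses>
     </tcp-initiator>
 </initiator-config>
</remote-invocation-scheme>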
Execute this program for each subscriber cluster that needs to join the hub, changing the subscriber cache configuration accordingly. While developing this sample I found an issue in SafeLocalCachePublisher: it was missing a default constructor. A JIRA has been opened and it will be fixed in the next Incubator release; in the meantime, download the Push Replication source code and add a default constructor to SafeLocalCachePublisher. So that's pretty much it: geographically distributed, dynamically subscribed multi-clusters in a hub-and-spoke architecture, up and running in less than 10 minutes and then staying up 100% of the time. A complete project with an MBean can be downloaded from the Dynamic Push Replication Subscription page. Enjoy!
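Why does the missing default constructor matter? The publisher travels inside a serialized Invocable to the remote cluster, and deserializers typically instantiate the target class reflectively through its no-arg constructor before restoring its state. A minimal, self-contained sketch of that pattern (the Publisher class here is a stand-in for illustration, not the real SafeLocalCachePublisher):

```java
import java.lang.reflect.Constructor;

public class NoArgCtorDemo {

    // Stand-in for a publisher class; deserialization code cannot create
    // it reflectively unless this no-arg constructor exists.
    static class Publisher {
        private String cacheName;

        Publisher() { }                        // required for reflective creation
        void setCacheName(String n) { this.cacheName = n; }
        String getCacheName() { return cacheName; }
    }

    public static void main(String[] args) throws Exception {
        // What a deserializer effectively does: create an instance via the
        // no-arg constructor, then restore the state field by field.
        Constructor<Publisher> ctor = Publisher.class.getDeclaredConstructor();
        Publisher p = ctor.newInstance();
        p.setCacheName("publishing-cache");
        System.out.println(p.getCacheName());
    }
}
```

Without the no-arg constructor, getDeclaredConstructor() throws NoSuchMethodException, which is essentially what the wire format runs into.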

3 comments:

Kiran said...

Hi Asish,
Nice blog. Do you have the source code for this implementation? I tried to get it from the links and I got an invalid page.

Regards,
Kiran.

Kiran said...

Hi Ashish,

Nice blog! Could you please give the source code for this? I tried to see the source code through the links that you gave on the site but I got a broken link when trying to download the source code.

Regards,
Kiran.

Ashish said...

Hi Kiran,
The link still works; I checked, and it has some sample implementation. Though I highly suggest working with your Oracle contact for an implementation if you are looking for something similar. This was written a long time back, when Push Replication was still in its early implementations. I have not seen the latest release, and I am sure this implementation would not work as-is with the current one. If you let me know who the customer is, I can ping someone in Oracle who would work with you on this, if that is not already the case.

-Ashish