Wednesday, November 09, 2011

Coherence Transaction, Semaphore and a Joke

A DBA walks into a NoSQL bar, hangs around for a few minutes, then turns around and leaves. He couldn't find a table. [Quoted]

When data grids came into existence, they tried to solve a different set of problems than the ones a typical relational database was geared to address. Some failed trying to imitate a database, some remained distributed cache managers, and some just got lost in space. The ones that succeeded stayed focused on the performance, scalability, manageability and predictability of application state in the middle tier, all four of them. But as they sank deeper into enterprise application strategies, one question remained to be fully solved: transactions. Oracle Coherence couldn't defer a solution for too long either, and in v3.6 it announced a new cache scheme to support true transactions, extending and then deprecating its TransactionMap. Before I show one way this new scheme can be used, let's talk about how some of these transactional problems were addressed earlier.

If an analogy has to be made between a Database and Coherence, we can think of cache schemes as a database schema and the caches as database tables.

Different solutions popped up for different levels of desired guarantees.

Problem: Update Person and its Address together

Solving it in domain model
Solving it in the scheme
Solving it in process, and
Solving it correctly

Domain Model
Coherence EntryProcessors provide a unique guarantee: without any explicit lock, only one process can modify an Entry at a given time, no matter how many backups of that Entry are maintained in the grid. And when a node fails, the in-flight executions move to the new owners of the partitions as Entries are repartitioned. To the clients it looks as if nothing has happened, other than some delay.
public class Person {
   private Address address;
}
NamedCache nCache = CacheFactory.getCache("Person");
nCache.put ("person1", new Person());

nCache.invoke("person1", new PersonAddressUpdaterEntryProcessor());
This call makes sure that the Person and his Address are updated together while the Person entry is locked. The key to this solution lies in the domain model: the Address is encapsulated inside the Person who owns it, which hides any direct access to it.
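To make the "one writer per Entry" guarantee a bit more concrete, here is a minimal plain-Java analogy. It does not use the Coherence API at all: ConcurrentHashMap.compute() stands in for invoke() with an EntryProcessor, and the names PersonStore, moveTo and EntryProcessorAnalogy are assumptions made up for this sketch. It only illustrates a single-JVM version of the idea, with no backups or repartitioning.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical domain classes: the Address is only reachable through its Person.
final class Address {
    final String city;
    Address(String city) { this.city = city; }
}

final class Person {
    final int version;
    final Address address;
    Person(int version, Address address) {
        this.version = version;
        this.address = address;
    }
}

// Local stand-in for a cache; compute() plays the role of invoke().
final class PersonStore {
    private final ConcurrentHashMap<String, Person> cache = new ConcurrentHashMap<>();

    void put(String key, Person person) { cache.put(key, person); }

    Person get(String key) { return cache.get(key); }

    // The remapping function runs atomically for the key, so the version bump
    // and the new Address become visible together.
    Person moveTo(String key, String city) {
        return cache.compute(key, (k, p) -> new Person(p.version + 1, new Address(city)));
    }
}

final class EntryProcessorAnalogy {
    // Fires the given number of concurrent updates at one key and returns the final Person.
    static Person run(int updates) {
        PersonStore store = new PersonStore();
        store.put("person1", new Person(0, new Address("start")));
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < updates; i++) {
            final String city = "city-" + i;
            pool.execute(() -> store.moveTo("person1", city));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("updates did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.get("person1");
    }

    public static void main(String[] args) {
        Person p = run(100);
        // No update is lost, so the version equals the number of updates.
        System.out.println("version = " + p.version);
    }
}
```

Because every update goes through the same per-key atomic step, no version increment is lost even though four threads compete for the same key.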
But what if the Address is not touched every time a Person is updated? Then Person and Address can be split into separate caches, so that when only the Person is updated the cost of deserializing the Address is avoided.

Key Association and Transaction Lite
Everyone who has bought or rented a place knows the following three words: location, location, location. When it comes to transactions, it's colocation, colocation, colocation. Move related data together so that network failures, node failures and partial failures can be contained within a single node. Before v3.7 this was not completely safe in all scenarios, but as long as Addresses are not updated directly, a clever solution could be used to update them together.
public class Person {
 private String id;
 public String getKey() {
    return id;
 }
}
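// -- KeyAssociation is implemented by the key class of the Address cache, not by the cached value
public class AddressKey implements KeyAssociation {
 private final String personId;
 public AddressKey (String personId) {
    this.personId = personId;
 }
 public Object getAssociatedKey () {
    return personId;
 }
 // -- equals(), hashCode() and serialization omitted
}
NamedCache pCache = CacheFactory.getCache("Person");
NamedCache aCache = CacheFactory.getCache("Address");

pCache.put ("person1", new Person());
aCache.put (new AddressKey("person1"), new Address());
pCache.invoke ("person1", new MyEntryProcessor());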

The key association between the Person and the Address makes sure that, even though two different caches are used to store these two Entries, they end up in the same partition as long as the two caches use the same cache scheme (that is, the same partitioned cache service). The EntryProcessor can then reach the co-located Address:
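public class MyEntryProcessor implements InvocableMap.EntryProcessor {
  public Object process (InvocableMap.Entry entry) {
    final BackingMapManagerContext ctx = ((BinaryEntry) entry).getContext();
    // -- The backing map holds keys and values in their internal (Binary) form
    Map map = ctx.getBackingMap ("Address");
    Object binKey = ctx.getKeyToInternalConverter().convert (new AddressKey("person1"));
    Address address = (Address) ctx.getValueFromInternalConverter().convert (map.get (binKey));
    ...
    entry.setValue(...);
    return null;
  }
  // -- processAll() omitted
}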
This works as long as no other process updates the Address while the Person is locked by the EntryProcessor.
If you are already using v3.7, the getBackingMap() API has been deprecated and replaced with the thread-safe getBackingMapEntry() API, and this new framework is called Transaction Lite. If Entries can be co-located, Transaction Lite is worth looking at.
The following code replaces the one above using the Transaction Lite framework:

// -- Assuming the EntryProcessor is executed on the "Person" cache and the Address lives in the "Address" cache, with its AddressKey key-associated to the Person's key.

final BackingMapManagerContext ctx = ((BinaryEntry) entry).getContext();
BackingMapContext backingMapContext = ctx.getBackingMapContext("Address");
Map.Entry backingMapEntry = backingMapContext.getBackingMapEntry(
                ctx.getKeyToInternalConverter().convert(new AddressKey(personId)));
((BinaryEntry) backingMapEntry).setValue(.., true);
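The co-location idea behind key association can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: the Associated interface, the Partitioner class and the partition count of 257 are made up for the sketch, and it routes on hashCode() for simplicity, which is not how a real grid computes partitions.

```java
// Hypothetical mirror of the key-association idea: a key may name another key
// whose partition it wants to share.
interface Associated {
    Object getAssociatedKey();
}

// Key of the Address cache; it routes by the id of the Person who owns the Address.
final class AddressKey implements Associated {
    private final String personId;

    AddressKey(String personId) { this.personId = personId; }

    @Override
    public Object getAssociatedKey() { return personId; }
}

// Assigns a partition to every key, honouring the association when there is one.
final class Partitioner {
    private final int partitionCount;

    Partitioner(int partitionCount) { this.partitionCount = partitionCount; }

    int partitionOf(Object key) {
        // An associated key is routed by the key it points to, not by itself.
        Object routingKey = (key instanceof Associated)
                ? ((Associated) key).getAssociatedKey()
                : key;
        return Math.floorMod(routingKey.hashCode(), partitionCount);
    }
}

final class ColocationDemo {
    public static void main(String[] args) {
        Partitioner partitioner = new Partitioner(257);
        int personPartition = partitioner.partitionOf("person1");
        int addressPartition = partitioner.partitionOf(new AddressKey("person1"));
        // Both keys resolve to the same routing key, hence the same partition.
        System.out.println(personPartition == addressPartition);
    }
}
```

Since both entries land in the same partition, whichever node owns the Person also owns the Address, which is what lets a single EntryProcessor touch both without leaving the node.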


A dirty little way, if crossing your fingers has yielded positive results so far
The problem of transactions becomes severe when associated objects are meant to be updated in parallel. What if the same Address is shared by multiple People? Using KeyAssociation is not recommended here. As more and more People share the same Address (like a dorm), KeyAssociation makes the partition bloated and the grid unevenly balanced, something that is best avoided. If provisioning is done right (or even over-provisioned), cluster nodes appear not to fail, and atomicity is good to have but not absolutely required, then an EntryProcessor can be invoked from within another EntryProcessor, as long as the Person and Address caches use different cache schemes (different service names). But all bets are off once, say, the Address's EP succeeds but the Person's fails. Setting the backup count to '0' would narrow the error window, but this is still just not a solution. Coherence did have an API, TransactionMap, that allowed multiple operations to be committed in a single transaction, provided nothing went wrong in the ways described here. This API has now been deprecated and replaced with the transactional scheme.

Transactional Scheme
A new cache scheme was added in v3.6 that allows updates to multiple cache entries in a single transaction, as long as the caches belong to the same transactional scheme. Make sure the following cache scheme is defined:
<cache-config> ...
<caching-schemes>
  <transactional-scheme>
    <scheme-name>transactional-scheme</scheme-name>
    <service-name>TransactionalCache</service-name>
    <autostart>true</autostart>
  </transactional-scheme>
</caching-schemes>
</cache-config>

If Person and Address use the same TransactionalCache scheme


This is a simpler problem to solve with the current scheme.
DefaultConnectionFactory factory = new DefaultConnectionFactory();
Connection connection = factory.createConnection("TransactionalCache");
connection.setAutoCommit (false);
connection.setIsolationLevel(Isolation.READ_COMMITTED);

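OptimisticNamedCache personCache = connection.getNamedCache("Person");
OptimisticNamedCache addressCache = connection.getNamedCache("Address");

Person p = (Person) personCache.get("p1");
Address a = (Address) addressCache.get("a1");

// -- update() is assumed to modify both objects and write them back through the two caches
update(p, a);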

try {
 connection.commit();
} catch (..) { }
finally {
  connection.close();
}

Twist - Person and Address in two different cache schemes

Modified Problem: Update two Persons in a single Transaction and then update the Address if the previous transaction succeeds

This is tricky, and it is what I call a dependency transaction.

Let's add a twist and introduce a new type of InvocableMap.Entry:
public interface TransactionalEntry extends InvocableMap.Entry { 
  Status getTransactionStatus();
  Object getAnotherEntry (Object key); 
  void update (Object key, Object value, Filter predicate);
}
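The update (key, value, predicate) method in the interface above is a conditional write: the new value should only go in if the stored value still satisfies the predicate. A minimal plain-Java sketch of that idea follows. It is not the Coherence implementation; OptimisticStore, Versioned and the boolean return value are assumptions made for illustration, and how the real API reports a failed predicate is not modelled here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical value carrying a version number for optimistic checks.
final class Versioned {
    final int version;
    final String payload;

    Versioned(int version, String payload) {
        this.version = version;
        this.payload = payload;
    }
}

// Local stand-in for a cache that supports predicate-guarded updates.
final class OptimisticStore {
    private final ConcurrentHashMap<String, Versioned> map = new ConcurrentHashMap<>();

    void insert(String key, Versioned value) { map.put(key, value); }

    Versioned get(String key) { return map.get(key); }

    // Replaces the value only if the currently stored value still satisfies
    // the predicate. Returns true when the write was applied.
    boolean update(String key, Versioned newValue, Predicate<Versioned> predicate) {
        boolean[] applied = {false};
        map.computeIfPresent(key, (k, current) -> {
            if (predicate.test(current)) {
                applied[0] = true;
                return newValue;
            }
            return current; // somebody else changed it first; keep their value
        });
        return applied[0];
    }
}

final class OptimisticDemo {
    public static void main(String[] args) {
        OptimisticStore store = new OptimisticStore();
        store.insert("p1", new Versioned(1, "original"));

        // Two writers both read version 1; only the first conditional write wins.
        boolean first = store.update("p1", new Versioned(2, "writer A"), v -> v.version == 1);
        boolean second = store.update("p1", new Versioned(2, "writer B"), v -> v.version == 1);

        System.out.println(first + " " + second + " " + store.get("p1").payload);
    }
}
```

The second writer fails because the version it read is no longer the stored one, which is the same role the version-based predicate plays in the processor further down.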


Let's use this new interface along the lines of running an EntryProcessor:
public class SomeUtilClass {
   // -- cacheName is passed in here because the anonymous entry below needs it
   public Object invoke (final String cacheName, final Object key, final InvocableMap.EntryProcessor processor) {
   final DefaultConnectionFactory factory = new DefaultConnectionFactory();
   final Connection connection = factory.createConnection ("TransactionalCache");
   connection.setAutoCommit(false);
   connection.setIsolationLevel (Isolation.READ_COMMITTED);

   final TransactionState state = connection.getTransactionState();

   final TransactionalEntry entry = new TransactionalEntry () {
     private final OptimisticNamedCache oCache = connection.getNamedCache(cacheName);

     @Override
     public Object getKey() {
       return key;
     }

     @Override
     public Object getValue() {
       return oCache.get(key);
     }

     @Override
     public Object getAnotherEntry(Object relatedKey) {
       return oCache.get(relatedKey);
     }

     @Override
     public void update(Object key, Object value, Filter predicate) {
       oCache.update(key, value, predicate);
     }

     @Override
     public Status getTransactionStatus() {
       return state.getStatus();
     }

     // -- Other methods
     ...
  };
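    Object result = null;
    try {
        result = processor.process (entry);
        connection.commit();
    } catch (Exception exp) {
        connection.rollback();
    } finally {
        connection.close();
        // -- Wake up whoever is waiting on the status. wait() and notifyAll() only meet
        // -- if both sides use the same monitor object; since Status is an enum whose
        // -- value changes on commit, a dedicated shared lock object would be more robust.
        final Status status = entry.getTransactionStatus();
        synchronized (status) {
            status.notifyAll();
        }
    }
    return result;
   }

}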
Now, which processor is passed to the invoke method declared above? It is an EntryProcessor that is not necessarily executed as an EntryProcessor by the grid, but keeping to this model earns some solid points for design consistency.
public class MyProcessor implements InvocableMap.EntryProcessor {
   private final ValueExtractor extractor = new ReflectionExtractor ("currentVersion", ...);

 public Object process (final InvocableMap.Entry entry) {
   final TransactionalEntry txEntry = (TransactionalEntry) entry;
   final Person firstPerson = (Person) txEntry.getValue();
   Filter predicate = new WhateverFilter (extractor, firstPerson.currentVersion());
   firstPerson.incrementVersion();

   Person anotherPerson = (Person) txEntry.getAnotherEntry ("anotherPersonKey");
   anotherPerson.incrementVersion();

   // -- This needs to be transactional
   doSomething (firstPerson, anotherPerson);

   txEntry.update (entry.getKey(), firstPerson, predicate);

     // -- Use the Status as a Semaphore
     // -- (executorService is assumed to be an ExecutorService field of this class)
   executorService.execute (new Runnable () {
         @Override
         public void run () {
            Status status = txEntry.getTransactionStatus();
            // -- Note: this only wakes up if notifyAll() is called on this very same object
            synchronized (status) {
              try {
                status.wait();
              } catch (InterruptedException i) { ... }

              // -- Re-read the status after waking up; the value read before wait() is stale
              switch (txEntry.getTransactionStatus()) {
                 case COMMITTED:
                   // -- Update the Address.
                   // -- At this point the two Person objects are guaranteed to have been
                   // -- updated in a single transaction. The Address could have been updated
                   // -- the same way, had it used the same transactional scheme.
                   // -- If the Address is in a different cache scheme and the system is
                   // -- provisioned well enough that the put() succeeds, the Address only
                   // -- gets updated after the Person(s) have been successfully updated.
                   // -- The updateAddress() could use another EntryProcessor
                   updateAddress (address);
                   break;
                 case ROLLEDBACK:
                   break;
                 default:
              }
            }
         }
      });
    return firstPerson;
 }
}
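Waiting on a status object with wait() and notifyAll() is easy to get wrong: a notification sent before the waiter starts waiting is lost, and both sides have to agree on the monitor. As a hedged alternative, the same "update the Address only after the Persons have committed" dependency can be sketched with a CompletableFuture, which remembers its outcome. This is plain Java and not part of the Coherence API; TxStatus, DependentUpdate and the simulated commit are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical terminal states of the first transaction.
enum TxStatus { COMMITTED, ROLLEDBACK }

final class DependentUpdate {
    // Simulates the Person transaction and a follow-up Address update that
    // depends on its outcome. Returns the sequence of steps that took place.
    static List<String> run(boolean commitSucceeds) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<TxStatus> outcome = new CompletableFuture<>();

        // Registered before the commit, like the Runnable handed to the executor.
        // It runs once the outcome is known, no matter when that happens.
        CompletableFuture<Void> followUp = outcome.thenAcceptAsync(status -> {
            if (status == TxStatus.COMMITTED) {
                log.add("address updated");
            } else {
                log.add("address skipped");
            }
        });

        try {
            if (!commitSucceeds) {
                throw new IllegalStateException("simulated commit failure");
            }
            log.add("persons committed");
            outcome.complete(TxStatus.COMMITTED);
        } catch (RuntimeException e) {
            log.add("persons rolled back");
            outcome.complete(TxStatus.ROLLEDBACK);
        }

        followUp.join(); // wait for the dependent step before returning
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

The design choice here is that the outcome is a value that gets published once, so a follow-up that shows up late still sees it, whereas a bare notifyAll() is gone the moment it is sent.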
Enjoy!