When a Session is started the Machine sets itself in a Begin state, Each next() then switches the machine from the current state to the next. Processor(s) can be registered at different states of the machine.
The states are Begin -> Work -> Finish -> Null (Sink) and are defined via an Enum:
public enum State { BEGIN, WORK, FINISH; }
Lets start with a Session (Also the state machine)
public class Session
{ private String BEGIN; private String WORK; private String END; /** * Create a Session state machine with three uni-directional states. * BEGIN -> WORK -> FINISH -> Null */ private enum SessionState
{ BEGIN() { @Override public SessionState next() { return WORK; } }, WORK() { @Override public SessionState next() { return FINISH; } }, FINISH() { @Override public SessionState next() { return null; } }; public abstract SessionState next(); } private SessionState STATE = SessionState.BEGIN; private String sessionId; private PropertyChangeSupport propertyChangeSupport; private Session() { propertyChangeSupport = new PropertyChangeSupport(this); } public static Session newSession() { return new Session(); } void next() { synchronized (STATE) { SessionState old = STATE; STATE = STATE.next(); if (STATE != null) { propertyChangeSupport.firePropertyChange(STATE.name(), old, STATE); } } } boolean isValid() { return STATE != null; } String sessionId() { switch (STATE) { case BEGIN: if (sessionId == null) { sessionId = UUID.randomUUID().toString(); } return sessionId; case WORK: if (sessionId == null || sessionId.trim().equals("")) { throw new IllegalStateException("Session Id can not be null in WORK"); } return sessionId; case FINISH: if (sessionId == null || sessionId.trim().equals("")) { throw new IllegalStateException("Session Id can not be null in FINISH"); } return sessionId; default: throw new IllegalStateException("Session has not yet started"); } } void addStateChangeListener(State state, PropertyChangeListener listener) { propertyChangeSupport.addPropertyChangeListener(state.name(), listener); } }
Lets build some processing units - One for Work and the other for Finish states
public class WorkSessionListener implements PropertyChangeListener { private final String sessionId; public WorkSessionListener(String sessionId) { this.sessionId = sessionId; } public void propertyChange(PropertyChangeEvent evt) { System.out.println ("[" + sessionId + " - Working]=" + evt.getOldValue() + " -> " + evt.getNewValue()); } } public class FinishSessionListener implements PropertyChangeListener { private final String sessionId; public FinishSessionListener(String sessionId) { this.sessionId = sessionId; } public void propertyChange(PropertyChangeEvent evt) { System.out.println ("[" + sessionId + " - Finish]=" + evt.getOldValue() + " -> " + evt.getNewValue()); } }
Now lets test it
import org.junit.Test; public class SessionStateChangeTest { @Test public void testSessionStateChange() { final Session session = Session.newSession(); final Session session2 = Session.newSession(); session.addStateChangeListener(State.WORK, new WorkSessionListener(session.sessionId())); session.addStateChangeListener(State.FINISH, new FinishSessionListener(session.sessionId())); session2.addStateChangeListener(State.WORK, new WorkSessionListener(session2.sessionId())); Thread t1 = new Thread(new Runnable() { public void run() { while (session.isValid()) { session.next(); } } }); Thread t2 = new Thread(new Runnable() { public void run() { while (session2.isValid()) { session2.next(); } } }); t1.start(); t2.start(); try { t1.join(); t2.join(); } catch (InterruptedException exp) { } } }
We should see two BEGIN -> WORK and one WORK -> FINISH messages for two session ids.
Enjoy!