With WebSphere® eXtreme Scale, you can provide a first-in first-out (FIFO) queue-like capability for all maps. WebSphere eXtreme Scale tracks the insertion order for all maps. A client can ask a map for the next unlocked entry, in insertion order, and lock that entry. This process allows multiple clients to consume entries from the same map efficiently. The following code fragment shows a client that consumes entries in a loop:
    Session session = ...;
    ObjectMap map = session.getMap("xxx");
    // this needs to be set somewhere to stop this loop
    boolean timeToStop = false;

    while(!timeToStop) {
        session.begin();
        Object msgKey = map.getNextKey(5000);
        if(msgKey == null) {
            // current partition is exhausted, call it again in
            // a new transaction to move to next partition
            session.rollback();
            continue;
        }
        Message m = (Message)map.get(msgKey);
        // now consume the message
        ...
        // need to remove it
        map.remove(msgKey);
        session.commit();
    }
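The locking behavior described above can be pictured with a small in-memory model. The sketch below is plain Java and does not use the eXtreme Scale API: the class FifoLockModel and its methods are hypothetical names chosen for illustration, and the model leaves out the blocking timeout that getNextKey accepts.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model; not part of the eXtreme Scale API. */
class FifoLockModel<K, V> {
    // LinkedHashMap iterates in insertion order, standing in for the tracked order.
    private final Map<K, V> entries = new LinkedHashMap<>();
    private final Set<K> locked = new HashSet<>();

    synchronized void insert(K key, V value) {
        entries.put(key, value);
    }

    /** Returns and locks the oldest unlocked key, or null if none is available. */
    synchronized K nextUnlockedKey() {
        for (K key : entries.keySet()) {
            if (locked.add(key)) { // add returns false when the key is already locked
                return key;
            }
        }
        return null;
    }

    /** Models remove followed by commit: the entry is gone and its lock is released. */
    synchronized V consume(K key) {
        locked.remove(key);
        return entries.remove(key);
    }

    /** Models rollback: the entry stays in the map and becomes available again. */
    synchronized void release(K key) {
        locked.remove(key);
    }
}

class FifoLockModelDemo {
    public static void main(String[] args) {
        FifoLockModel<String, String> map = new FifoLockModel<>();
        map.insert("k1", "first");
        map.insert("k2", "second");
        String a = map.nextUnlockedKey(); // one consumer locks the oldest entry
        String b = map.nextUnlockedKey(); // a second consumer skips the locked entry
        System.out.println(a + " " + b);
        map.release(a);                   // the first consumer rolls back
        System.out.println(map.nextUnlockedKey());
    }
}
```

Because a locked entry is skipped rather than waited on, two consumers never process the same entry at the same time, and an entry whose transaction rolls back is offered again in its original position.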
In client mode, when the Java™ virtual machine (JVM) is a client, the client initially connects to a random partition primary. If no work exists in that partition, the client moves to the next partition to look for work. The client either finds a partition with entries or loops around to the initial random partition. If the client loops around to the initial partition, it returns a null value to the application. If the client finds a partition with a map that has entries, it consumes entries from that partition until no entries are available for the timeout period. After the timeout passes, null is returned. Therefore, when null is returned and a partitioned map is used, you should start a new transaction and resume listening. The previous code fragment has this behavior.
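The partition walk in the preceding paragraph can be sketched as a simplified model. This is plain Java written only from that description, not the product's implementation. The class PartitionScanModel is a hypothetical name, the starting partition is passed in rather than chosen at random, the timeout is treated as expiring immediately, and a key is handed out by removing it instead of locking it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of the client-side partition scan; not the product's code. */
class PartitionScanModel {
    private final List<Deque<String>> partitions = new ArrayList<>();
    private int current;       // partition the client is attached to
    private boolean consuming; // true while draining a partition that had entries

    PartitionScanModel(int partitionCount, int startPartition) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayDeque<>());
        }
        current = startPartition; // the text describes this choice as random
    }

    void insert(int partition, String key) {
        partitions.get(partition).addLast(key);
    }

    /** Returns the next key, or null when the caller should retry in a new transaction. */
    String getNextKey() {
        if (consuming) {
            String key = partitions.get(current).pollFirst();
            if (key != null) {
                return key;
            }
            // Partition drained: the real call would wait for the timeout first.
            consuming = false;
            current = (current + 1) % partitions.size();
            return null;
        }
        // Look for work, visiting each partition at most once.
        for (int visited = 0; visited < partitions.size(); visited++) {
            String key = partitions.get(current).pollFirst();
            if (key != null) {
                consuming = true;
                return key;
            }
            current = (current + 1) % partitions.size();
        }
        return null; // looped around to the starting partition without finding work
    }
}

class PartitionScanDemo {
    public static void main(String[] args) {
        PartitionScanModel client = new PartitionScanModel(3, 1);
        client.insert(2, "a");
        client.insert(2, "b");
        client.insert(0, "c");
        for (int call = 0; call < 6; call++) {
            System.out.println(client.getNextKey());
        }
    }
}
```

In this model a null result can mean either that the current partition was drained or that no partition had work. The caller cannot tell the two cases apart, which is why the consuming loop simply rolls back and calls again.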
Job.java

    package tutorial.fifo;

    import com.ibm.websphere.projector.annotations.Entity;
    import com.ibm.websphere.projector.annotations.Id;

    @Entity
    public class Job {
        @Id String jobId;

        int jobState;
    }
JobEvent.java

    package tutorial.fifo;

    import com.ibm.websphere.projector.annotations.Entity;
    import com.ibm.websphere.projector.annotations.Id;
    import com.ibm.websphere.projector.annotations.OneToOne;

    @Entity
    public class JobEvent {
        @Id String eventId;

        @OneToOne Job job;
    }

The job has an ID and a state, which is an integer. Suppose you want to increment the state whenever an event arrives. The events are stored in the JobEvent Map. Each entry has a reference to the job that the event concerns. The code for a listener that does this looks like the following example:
JobEventListener.java

    package tutorial.fifo;

    import com.ibm.websphere.objectgrid.ObjectGridException;
    import com.ibm.websphere.objectgrid.ObjectMap;
    import com.ibm.websphere.objectgrid.Session;
    import com.ibm.websphere.objectgrid.em.EntityManager;

    public class JobEventListener {
        boolean stopListening;

        public synchronized void stopListening() {
            stopListening = true;
        }

        synchronized boolean isStopped() {
            return stopListening;
        }

        public void processJobEvents(Session session)
            throws ObjectGridException {
            EntityManager em = session.getEntityManager();
            ObjectMap jobEvents = session.getMap("JobEvent");
            while(!isStopped()) {
                em.getTransaction().begin();
                Object jobEventKey = jobEvents.getNextKey(5000);
                if(jobEventKey == null) {
                    em.getTransaction().rollback();
                    continue;
                }
                JobEvent event = (JobEvent)em.find(JobEvent.class, jobEventKey);
                // process the event, here we just increment the
                // job state
                event.job.jobState++;
                em.getTransaction().commit();
            }
        }
    }
The application starts the listener on a thread. The processJobEvents method runs on that thread until the stopListening method is called. The loop blocks waiting for an event key from the JobEvent Map, then uses the EntityManager to access the event object, dereferences the job, and increments its state.
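Starting and stopping such a listener on a thread can be sketched without a grid connection. The example below is a hypothetical stand-in, not the JobEventListener class itself: ListenerThreadSketch is an invented name, a java.util.concurrent.BlockingQueue replaces the JobEvent Map, and poll with a 100 ms timeout plays the role of getNextKey(5000).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for JobEventListener that needs no grid connection. */
class ListenerThreadSketch implements Runnable {
    private final BlockingQueue<String> events = new LinkedBlockingQueue<>();
    private final AtomicInteger jobState = new AtomicInteger();
    private volatile boolean stopListening;

    void publish(String eventId) { events.add(eventId); }
    void stopListening() { stopListening = true; }
    int jobState() { return jobState.get(); }

    @Override
    public void run() {
        while (!stopListening) {
            try {
                // Stand-in for getNextKey(timeout): wait briefly, then give up with null.
                String eventId = events.poll(100, TimeUnit.MILLISECONDS);
                if (eventId == null) {
                    continue; // nothing arrived in time; recheck the stop flag
                }
                jobState.incrementAndGet(); // "process" the event
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Starts the listener, publishes n events, stops it, and returns the final state. */
    static int runDemo(int n) {
        ListenerThreadSketch listener = new ListenerThreadSketch();
        Thread worker = new Thread(listener, "job-event-listener");
        worker.start();
        try {
            for (int i = 0; i < n; i++) {
                listener.publish("event-" + i);
            }
            while (listener.jobState() < n) {
                Thread.sleep(10); // wait until every event has been processed
            }
            listener.stopListening();
            worker.join(); // the loop exits after at most one more poll timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return listener.jobState();
    }

    public static void main(String[] args) {
        System.out.println("final job state: " + runDemo(3));
    }
}
```

The stop flag is checked only between waits, so a stop request takes effect after the current wait ends. With the 5000 ms timeout used in the listener above, that delay could presumably be up to about five seconds.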
The EntityManager API does not have a getNextKey method, but the ObjectMap API does, so the code uses the ObjectMap for JobEvent to get the key. When a map is used with entities, it no longer stores application objects. Instead, it stores Tuples: one Tuple object for the key and one Tuple object for the value. The EntityManager.find method accepts a Tuple for the key, so the key that getNextKey returns can be passed directly to find.
Code that creates an event looks like the following example:

    em.getTransaction().begin();
    Job job = (Job)em.find(Job.class, "Job Key");
    JobEvent event = new JobEvent();
    // any unique string works as the ID; this uses java.util.UUID
    event.eventId = java.util.UUID.randomUUID().toString();
    event.job = job;
    em.persist(event); // insert it
    em.getTransaction().commit();

You find the job for the event, construct an event, point it to the job, insert it in the JobEvent Map, and commit the transaction.