FIFO キューとしてのマップ

WebSphere® eXtreme Scale を使用すると、すべてのマップに first-in first-out (FIFO) キューと類似する機能を持たせることができます。WebSphere eXtreme Scale は、すべてのマップの挿入順序を追跡します。クライアントはマップに対して、マップ内への挿入順序で次のアンロック済みエントリーを要求し、そのエントリーをロックすることができます。このプロセスにより、複数のクライアントが、そのマップのエントリーを効率的に消費できるようになります。

FIFO の例

以下のコード・スニペットは、マップが使い切られるまで、マップからエントリーを処理するループに入るクライアントを示しています。このループはトランザクションを開始してから、ObjectMap.getNextKey(5000) メソッドを呼び出します。このメソッドは、次に使用可能なアンロック済みエントリーのキーを戻して、これをロックします。トランザクションが 5000 ミリ秒を超えてブロックされていると、メソッドはヌルを戻します。
Session session = ...;
ObjectMap map = session.getMap("xxx");
// this needs to be set somewhere to stop this loop
boolean timeToStop = false;

while(!timeToStop)
{
  session.begin();
  Object msgKey = map.getNextKey(5000);
  if(msgKey == null)
  {
    // current partition is exhausted, call it again in
    // a new transaction to move to next partition
    session.rollback();
    continue;
  }
  Message m = (Message)map.get(msgKey);
  // now consume the message
  ...
  // need to remove it
  map.remove(msgKey);
  session.commit();
}

ローカル・モードとクライアント・モードの比較

アプリケーションがクライアントではなくローカル・コアを使用している場合は、上述したメカニズムで処理が行われます。

クライアント・モードでは、Java 仮想マシン (JVM) がクライアントである場合、そのクライアントは、まずランダムのプライマリー区画に接続します。その区画に作業が存在しなければ、クライアントはその作業を求めて次の区画に移動します。クライアントは、エントリーの存在する区画を検出するか、最初のランダム区画の周辺でループするかのいずれかとなります。最初の区画の周辺でループすることになった場合のクライアントは、アプリケーションにヌル値を戻します。エントリーのあるマップを持つ区画を検出した場合のクライアントは、そのタイムアウト期間に使用可能なエントリーがなくなるまで、そのマップのエントリーを消費します。タイムアウトになると、ヌルが戻されます。このアクションでは、区画に分割されたマップが使用されている場合にヌルが戻されると、新規トランザクションを開始して listen を再開することになります。 前述のコード例の断片は、このように振る舞います。

クライアントとしての実行中に、キーが戻されると、その時点では、該当のトランザクションは、そのキーのエントリーを持つ区画にバインドされています。そのトランザクション中に他のマップの更新を行わなければ、問題はありません。 更新を行う場合は、キーを取得したマップと同じ区画にあるマップのみ更新可能です。getNextKey メソッドから戻されたエントリーは、その区画内にある関連データを検出する方法をアプリケーションに示す必要があります。 例えば、イベントとそのイベントの影響を受けるジョブの 2 つのマップがあるとします。 以下のエンティティーでこの 2 つのマップを定義します。
Job.java
package tutorial.fifo;

import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;

@Entity
public class Job
{
	@Id String jobId;

	int jobState;
}
JobEvent.java
package tutorial.fifo;

import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
import com.ibm.websphere.projector.annotations.OneToOne;

@Entity
public class JobEvent
{
	@Id String eventId;
	@OneToOne Job job;
}
ジョブには ID と状態 (整数) があります。 イベントが着信したら状態を増分するとします。 イベントは JobEvent マップに保管されています。 エントリーには、そのイベントが関与するジョブへの参照があります。リスナーがこれを実行するためのコードは、以下の例のようになります。
JobEventListener.java
package tutorial.fifo;

import com.ibm.websphere.objectgrid.ObjectGridException;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.em.EntityManager;

public class JobEventListener
{
	boolean stopListening;

	public synchronized void stopListening()
	{
		stopListening = true;
	}

	synchronized boolean isStopped()
	{
		return stopListening;
	}

	public void processJobEvents(Session session)
		throws ObjectGridException
	{
		EntityManager em = session.getEntityManager();
		ObjectMap jobEvents = session.getMap("JobEvent");
		while(!isStopped())
		{
			em.getTransaction().begin();

			Object jobEventKey = jobEvents.getNextKey(5000);
			if(jobEventKey == null)
			{
				em.getTransaction().rollback();
				continue;
			}
			JobEvent event = (JobEvent)em.find(JobEvent.class, jobEventKey);
			// process the event, here we just increment the
			// job state
			event.job.jobState++;
			em.getTransaction().commit();
		}
	}
}

リスナーは、スレッド上でアプリケーションによって開始されています。リスナーは、stopListening メソッドが呼び出されるまで実行されます。 つまり、stopListening メソッドが呼び出されるまで、processJobEvents メソッドがスレッド上で実行されるということです。 ループ・ブロックは JobEvent マップからの eventKey を待機してから、EntityManager を使用してイベント・オブジェクトにアクセスし、ジョブを逆参照し、状態を増分します。

EntityManager API には getNextKey メソッドがありませんが、ObjectMap にはあります。 そのためこのコードでは、JobEvent にキーを取得させるために ObjectMap を使用します。 エンティティーを持つマップを使用すると、そのマップはそれ以上オブジェクトを保管しません。 その代わりに、Tuple を保管します。この Tuple とは、キーの Tuple オブジェクト、および値の Tuple オブジェクトです。EntityManager.find メソッドは、キーのタプルを受け入れます。

イベントを作成するためのコードは、以下の例のようになります。
em.getTransaction().begin();
Job job = em.find(Job.class, "Job Key");
JobEvent event = new JobEvent();
event.id = Random.toString();
event.job = job;
em.persist(event); // insert it
em.getTransaction().commit();
イベントのジョブを検索し、イベントを構成し、そのイベントにジョブを指示し、JobEvent マップに挿入し、トランザクションをコミットします。

ローダーおよび FIFO マップ

ローダーで FIFO キューとして使用されたマップを戻す場合は、追加作業がいくつか必要になることがあります。マップ内のエントリーの順序が問題ではない場合は、追加作業はありません。順序が問題となる場合は、挿入されたすべてのレコードをバックエンドに存続させる際に、それらのレコードにシーケンス番号を追加する必要があります。プリロードのメカニズムも、始動時にこの順序でレコードを挿入するように記述する必要があります。