DataGrid API では、グリッド・プログラミングの 2 つの一般的なパターンである、並列マップと並列削減がサポートされます。
並列マップ
並列マップでは、一連のキーのエントリーを処理することができ、 処理されたそれぞれのエントリーに対する結果が返されます。アプリケーションでは、キーのリストが作成され、Map オペレーションの呼び出し後、 キー/結果ペアの Map を受け取ります。結果は、各キーのエントリーに対して関数が適用されたものです。関数はアプリケーションによって提供されます。
MapGridAgent 呼び出しのフロー
キーのコレクションを使用して AgentManager.callMapAgent メソッドが 呼び出されると、MapGridAgent インスタンスがシリアライズされ、各キーで解決された それぞれのプライマリー区画に送信されます。すなわち、エージェントに保管されているインスタンス・データは、 すべてサーバーに送信できます。したがって、各プライマリー区画は、 エージェントのインスタンスを 1 つ保持します。各キーのインスタンスごとに 1 回 process メソッドが 呼び出され、その結果、区画が解決されます。 各 process メソッドの結果はその後、シリアライズされてクライアントへ返され、 マップ・インスタンス内で呼び出し元に返されます。そこでは、結果はマップの中の値として 提示されます。
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Person
{
@Id String ssn;
String firstName;
String surname;
int age;
}
アプリケーション提供の関数は、MapAgentGrid インターフェースを実装するクラスとして作成されます。
Person の年齢を 2 倍にした値を返す関数を示すエージェントの例。
public class DoublePersonAgeAgent implements MapGridAgent, EntityAgentMixin
{
private static final long serialVersionUID = -2006093916067992974L;
int lowAge;
int highAge;
public Object process(Session s, ObjectMap map, Object key)
{
Person p = (Person)key;
return new Integer(p.age * 2);
}
public Map processAllEntries(Session s, ObjectMap map)
{
EntityManager em = s.getEntityManager();
Query q = em.createQuery("select p from Person p where p.age > ?1 and p.age < ?2");
q.setParameter(1, lowAge);
q.setParameter(2, highAge);
Iterator iter = q.getResultIterator();
Map<Person, Interger> rc = new HashMap<Person, Integer>();
while(iter.hasNext())
{
Person p = (Person)iter.next();
rc.put(p, (Integer)process(s, map, p));
}
return rc;
}
public Class getClassForEntity()
{
return Person.class;
}
}
上記の例は、Person を 2 倍にする Map エージェントを示しています。
最初の process メソッドでは、処理する Person が提供され、そのエントリーの 2 倍の年齢を返します。
2 番目の
process メソッドは、各区画で呼び出され、年齢が lowAge と highAge 間にあるすべての Person オブジェクトを検出し、
その年齢を 2 倍にした値を返します。Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
// make a list of keys
ArrayList<Person> keyList = new ArrayList<Person>();
Person p = new Person();
p.ssn = "1";
keyList.add(p);
p = new Person ();
p.ssn = "2";
keyList.add(p);
// get the results for those entries
Map<Tuple, Object> = amgr.callMapAgent(agent, keyList);
// Close the session (optional in Version 7.1.1 and later) for improved performance
s.close();
上記の例は、Session と、Person Map への参照を取得するクライアントを示しています。
エージェント・オペレーションは、特定の Map に対して実行されます。AgentManager インターフェースはその Map から取得されます。
呼び出されるエージェントのインスタンスが作成され、属性を設定することにより、必要な状態がオブジェクトに追加されます。
ただし、この例では追加はありません。次に、キーのリストが構成されます。person 1 については 2 倍にした値と、person 2 については同じ値を保持する Map が
戻されます。エージェントがキー・セットに対して呼び出されます。指定したキーを使用して、グリッド内の各区画で、 並行してエージェントの process メソッドが呼び出されます。Map は、指定のキーに対する結果をマージして戻されます。 この例では、person 1 の年齢を 2 倍にした値および person 2 の同様の値を保持する Map が返されます。
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
agent.lowAge = 20;
agent.highAge = 9999;
Map m = amgr.callMapAgent(agent);
上の例では、
AgentManager が Person Map のために取得され、
エージェントは、該当の Person の最小年齢と最大年齢で構成され、初期化されて
います。次に、callMapAgent メソッドを使用してエージェントが呼び出されます。
キーが提供されていないことに注意してください。結果として、ObjectGrid は、グリッド内のすべての区画でエージェントを並行して呼び出し、マージされた結果をクライアントに返します。
このリターンのセットは、最低と最高の間にある年齢を持つ、グリッド内のすべての Person オブジェクトを含み、それらの Person オブジェクトの年齢の 2 倍を計算します。
この例は、グリッド API をどのように使用すれば、特定の照会と一致するエンティティーを検出する照会を実行できるかを示しています。
エージェントは、ObjectGrid により、シリアライズされて、必要なエントリーとともに区画へトランスポートされます。
結果も同様に、クライアントへのトランスポートのためにシリアライズされます。Map API には注意が必要です。仮に、ObjectGrid がテラバイト単位のオブジェクトをホストしていて、多数のサーバーで実行されるようなことがあったとすると、クライアント・マシンはこの処理に対応できなかったと思われます。
Map API を使用して小規模のサブセットを処理するようにしてください。
大規模なサブセットを処理する必要がある場合は、削減エージェントを使用して、1 つのクライアントではなくデータ・グリッド内で処理を行うようにしてください。並列削減または集約エージェント
ReduceGridAgent 呼び出しのフロー
キーのコレクションを使用して AgentManager.callReduceAgent メソッドが 呼び出されると、ReduceGridAgent インスタンスがシリアライズされ、各キーで解決された それぞれのプライマリー区画に送信されます。すなわち、エージェントに保管されているインスタンス・データは、 すべてサーバーに送信できます。したがって、各プライマリー区画は、 エージェントのインスタンスを 1 つ保持します。reduce(Session s, ObjectMap map, Collection keys) メソッドは、 インスタンス (区画) ごとに 1 回、区画に解決されるキーのサブセットを指定して 呼び出されます。各 reduce メソッドの結果はその後、シリアライズされて クライアントへ返されます。reduceResults メソッドは、 各リモートでの reduce 呼び出しから返されたそれぞれの結果のコレクションを使用して、 クライアント ReduceGridAgent インスタンスに対して呼び出されます。reduceResults メソッドの 結果は、callReduceAgent メソッドの呼び出し元に返されます。
package com.ibm.ws.objectgrid.test.agent.jdk5;
import java.util.Collection;
import java.util.Iterator;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntryErrorValue;
import com.ibm.websphere.objectgrid.datagrid.ReduceGridAgent;
import com.ibm.websphere.objectgrid.query.ObjectQuery;
import com.ibm.websphere.samples.objectgrid.entityxmlgen.PersonFeature1Entity.PersonKey;
public class SumAgeReduceAgent implements ReduceGridAgent {
private static final long serialVersionUID = 2521080771723284899L;
/**
* Invoked on the server if a collection of keys is passed to
* AgentManager.callReduceAgent(). This is invoked on each primary shard
* where the key applies.
*/
public Object reduce(Session s, ObjectMap map, Collection keyList) {
try {
int sum = 0;
Iterator<PersonKey> iter = keyList.iterator();
while (iter.hasNext()) {
Object nextKey = iter.next();
PersonKey pk = (PersonKey) nextKey;
Person p = (Person) map.get(pk);
sum += p.age;
}
return sum;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
/**
* Invoked on the server if a collection of keys is NOT passed to
* AgentManager.callReduceAgent(). This is invoked on every primary shard.
*/
public Object reduce(Session s, ObjectMap map) {
ObjectQuery q = s
.createObjectQuery("select p from Person p where p.age > -1");
Iterator<Person> iter = q.getResultIterator();
int sum = 0;
while (iter.hasNext()) {
Object nextKey = iter.next();
Person p = (Person) nextKey;
sum += p.age;
}
return sum;
}
/**
* Invoked on the client to reduce the results from all partitions.
*/
public Object reduceResults(Collection results) {
// If we encounter an EntryErrorValue, then throw a RuntimeException
// to indicate that there was at least one failure and include each
// EntryErrorValue
// as part of the thrown exception.
Iterator<Integer> iter = results.iterator();
int sum = 0;
while (iter.hasNext()) {
Object nextResult = iter.next();
if (nextResult instanceof EntryErrorValue) {
EntryErrorValue eev = (EntryErrorValue) nextResult;
throw new RuntimeException(
"Error encountered on one of the partitions: "
+ nextResult, eev.getException());
}
sum += ((Integer) nextResult).intValue();
}
return new Integer(sum);
}
}
上の例はエージェントを示しています。このエージェントには、3 つの重要部分があります。1 番目の部分では、
特定のエントリー・セットが照会なしで処理されます。そのエントリー・セットが繰り返されて、年齢が加算されます。
メソッドから合計が返されます。
2 番目の部分では、照会が使用され、集約されるエントリーが選択されます。該当するすべての Person の年齢が合計されます。
3 番目のメソッドは、各区画からの結果を単一の結果に集約するために使用されます。ObjectGrid では、
グリッド中のエントリー集約が並行して実行されます。各区画で中間結果が作成されるので、それを他の区画の中間結果と合わせて集約する必要があります。
3 番目のメソッドでこのタスクが実行されます。次の例では、エージェントが呼び出され、年齢が 10 歳から 20 歳までのすべての Person の年齢のみが集約されます。
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
SumAgeReduceAgent agent = new SumAgeReduceAgent();
Person p = new Person();
p.ssn = "1";
ArrayList<Person> list = new ArrayList<Person>();
list.add(p);
p = new Person ();
p.ssn = "2";
list.add(p);
Integer v = (Integer)amgr.callReduceAgent(agent, list);
// Close the session (optional in Version 7.1.1 and later) for improved performance
s.close();
エージェントの機能
エージェントは、それが稼働しているローカル断片の内部で、自由に ObjectMap または EntityManager 操作を 実行できます。エージェントは Session を受け取り、 その Session が表す区画のデータの追加、更新、照会、読み取り、または削除を 行うことができます。グリッドからデータを照会するだけのアプリケーションもありますが、特定の照会に一致するすべての Person の年齢を 1 だけ増やすようなエージェントを作成することもできます。 エージェントが呼び出されるときには Session にトランザクションがあり、 例外がスローされない限り、エージェントが戻るときにそのトランザクションはコミットされます。
エラー処理
マップ・エージェントが不明なキーで呼び出されると、返される値は、EntryErrorValue インターフェースを実装するエラー・オブジェクトです。
トランザクション
マップ・エージェントはクライアントから分離したトランザクションで実行されます。 エージェントの呼び出しは単一トランザクションにグループ化される場合があります。エージェントが失敗して、例外をスローすると、トランザクションはロールバックされます。 トランザクション内で 正常に実行したエージェントがある場合、失敗したエージェントと一緒にそれらのエージェントも ロールバックされます。AgentManager は、正常に実行した、ロールバックされたエージェントを、新しいトランザクションで再実行 します。