Beispiel für die DataGrid-APIs

Die DataGrid-APIs unterstützen zwei gängige Programmiermuster für Grids: parallele Maps und parallel reduzierte Maps.

Parallele Maps

Die parallele Map lässt die Verarbeitung der Einträge für eine Gruppe von Schlüsseln zu und gibt für jeden verarbeiteten Eintrag ein Ergebnis zurück. Die Anwendung erstellt eine Liste mit Schlüsseln und empfängt nach dem Aufruf einer Map-Operation eine Map mit Schlüssel/Ergebnis-Paaren. Das Ergebnis ist das Ergebnis der Anwendung einer Funktion auf den Eintrag jedes Schlüssels. Die Funktion wird von der Anwendung bereitgestellt.

Ablauf des MapGridAgent-Aufrufs

Wenn die Methode AgentManager.callMapAgent mit einer Sammlung von Schlüsseln aufgerufen wird, wird die MapGridAgent-Instanz serialisiert und an jede primäre Partition gesendet, in die die Schlüssel aufgelöst werden. Das bedeutet, dass alle Instanzdaten, die im Agenten gespeichert sind, an den Server gesendet werden können. Jede primäre Partition besitzt damit eine Instanz des Agenten. Die Methode "process" wird für jede Instanz einmal für jeden Schlüssel aufgerufen, der in die Partition aufgelöst wird. Das Ergebnis jeder Methode "process" wird anschließend zurück in den Client serialisiert und an den Aufrufenden in einer Map-Instanz zurückgegeben, wo das Ergebnis als Wert dargestellt wird.

Wenn die Methode AgentManager.callMapAgent ohne Sammlung von Schlüsseln aufgerufen wird, wird die MapGridAgent-Instanz serialisiert und an jede primäre Partition gesendet. Das bedeutet, dass alle Instanzdaten, die im Agenten gespeichert sind, an den Server gesendet werden können. Jede primäre Partition besitzt damit eine Instanz (Partition) des Agenten. Die Methode processAllEntries wird für jede Partition aufgerufen. Das Ergebnis jeder Methode processAllEntries wird anschließend zurück in den Client serialisiert und an den Aufrufenden in einer Map-Instanz zurückgegeben. Im folgenden Beispiel wird davon ausgegangen, dass es eine Entität "Person" in der folgenden Form gibt:
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Person
{
  @Id String ssn;
  String firstName;
  String surname;
  int age;
}
Die von der Anwendung bereitgestellte Funktion ist als Klasse geschrieben, die die Schnittstelle MapAgentGrid implementiert. Im Folgenden sehen Sie einen Beispielagenten mit einer Funktion, die das Alter einer Person mit zwei multipliziert zurückgibt.
public class DoublePersonAgeAgent implements MapGridAgent, EntityAgentMixin
{
  private static final long serialVersionUID = -2006093916067992974L;

  int lowAge;
  int highAge;

  public Object process(Session s, ObjectMap map, Object key)
  {
    Person p = (Person)key;
    return new Integer(p.age * 2);
  }

  public Map processAllEntries(Session s, ObjectMap map)
  {
    EntityManager em = s.getEntityManager();
    Query q = em.createQuery("select p from Person p where p.age > ?1 and p.age < ?2");
    q.setParameter(1, lowAge);
    q.setParameter(2, highAge);
    Iterator iter = q.getResultIterator();
    Map<Person, Interger> rc = new HashMap<Person, Integer>();
		while(iter.hasNext())
    {
			Person p = (Person)iter.next();
			rc.put(p, (Integer)process(s, map, p));
		}
		return rc;
	}
	public Class getClassForEntity()
	{
		return Person.class;
	}
}
Das vorherige Beispiel zeigt den Map-Agenten für die Verdopplung des Alters einer Person. Mit der ersten process-Methode wird die Person angegeben, die bearbeitet werden soll. Diese Methode gibt das Alter dieses Eintrags mal zwei zurück. Die zweite process-Methode wird für jede Partition ausgeführt. Sie sucht alle Person-Objekte, deren Alter zwischen "lowAge" und "highAge" liegt und gibt deren Alter verdoppelt zurück.
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();

DoublePersonAgeAgent agent = new DoublePersonAgeAgent();

// Liste mit Schlüsseln erstellen
ArrayList<Person> keyList = new ArrayList<Person>();
Person p = new Person();
p.ssn = "1";
keyList.add(p);
p = new Person ();
p.ssn = "2";
keyList.add(p);

// Ergebnisse für diese Einträge abrufen
Map<Tuple, Object> = amgr.callMapAgent(agent, keyList);// Close the session (optional in Version 7.1.1 and later) for improved performance
s.close();
Dieses Beispiel zeigt einen Client, der ein Session-Objekt und eine Referenz auf die Map "Person" abruft. Die Agentenoperation wird für eine bestimmte Map durchgeführt. Die Schnittstelle "AgentManager" wird von dieser Map abgerufen. Es wird eine aufzurufende Instanz des Agenten erstellt, und alle erforderlichen Statusinformationen werden dem Objekt durch das Setzen von Attributen hinzugefügt. In diesem Fall gibt es keine. Anschließend wird eine Liste mit Schlüsseln erstellt. Zurückgegeben werden eine Map mit den verdoppelten Werten für Person 1 und dieselben Werte für Person 2.

Anschließend wird der Agent für diese Gruppe von Schlüsseln aufgerufen. Die Methode "process" des Agenten wird mit einigen der angegebenen Schlüssel für jede Partition im Grid parallel aufgerufen. Es wird eine Map zurückgegeben, die die zusammengefassten Ergebnisse für den angegebenen Schlüssel enthält. In diesem Fall werden eine Map mit verdoppelten Werten für das Alter der Person 1 und dieselben Werte für Person 2 zurückgegeben.

Wenn der Schlüssel nicht vorhanden ist, wird der Agent trotzdem aufgerufen. Dieser Aufruf gibt dem Agenten die Möglichkeit, den Map-Eintrag zu erstellen. Wenn EntityAgentMixin verwendet wird, ist der zu verarbeitende Schlüssel nicht die Entität, sondern der eigentliche Tupelschlüsselwert für die Entität. Wenn die Schlüssel unbekannt sind, können alle Partitionen abgefragt werden, um Person-Objekte einer bestimmten Form zu suchen und deren Alter verdoppelt zurückzugeben. Es folgt ein Beispiel:
Session s = grid.getSession();
  ObjectMap map = s.getMap("Person");
  AgentManager amgr = map.getAgentManager();

  DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
  agent.lowAge = 20;
  agent.highAge = 9999;

  Map m = amgr.callMapAgent(agent);
Das vorherige Beispiel zeigt, wie der AgentManager für die Map "Person" abgerufen und der Agent mit dem Mindest- und Maximalalter für die Personen von Interesse erstellt und initialisiert wird. Anschließend wird der Agent mit der Methode callMapAgent aufgerufen. Beachten Sie, dass keine Schlüssel bereitgestellt werden. Dies bewirkt, dass das ObjectGrid den Agenten auf jeder Partition im Grid parallel aufruft und anschließend die zusammengefassten Ergebnisse an den Client zurückgibt. Der Agent sucht alle Person-Objekte im Grid, deren Alter zwischen dem Mindest- und dem Maximalwert liegt, und verdoppelt das Alter dieser Person-Objekte. Dieses Beispiel zeigt, wie die Grid-APIs verwendet werden können, um eine Abfrage auszuführen, die Entitäten sucht, die einer bestimmten Abfrage entsprechen. Der Agent wird serialisiert und vom ObjectGrid auf die Partitionen mit den benötigten Einträgen transportiert. Die Ergebnisse werden für den Rücktransport an den Client auf ähnliche Weise serialisiert. Bei der Verwendung der Map-APIs muss mit Vorsicht vorgegangen werden. Wenn das ObjectGrid Terabytes von Objekten hostet und auf vielen Servern ausgeführt wird, könnte dieses Verarbeitung die Clientmaschinen überlasten. Verwenden Sie Map-APIs, um einen kleinen teil zu verarbeiten. Wenn eine große Teilmenge verarbeitet werden muss, empfiehlt sich die Verwendung eines Reduktionsagenten, um die Verarbeitung auf das Datengrid auszulagern, anstatt sie in einem Client auszuführen.

Parallele Reduktions- oder Aggregationsagenten

Bei diesem Programmierstil wird eine Teilmenge der Einträge verarbeitet und ein einziges Ergebnis für die Eintragsgruppe ermittelt. Beispiele für ein solches Ergebnis sind Ein Reduktionsagent wird auf ähnlich Weise wie die Map-Agenten codiert und aufgerufen.

Ablauf des ReduceGridAgent-Aufrufs

Wenn die Methode AgentManager.callReduceAgent mit einer Sammlung von Schlüsseln aufgerufen wird, wird die ReduceGridAgent-Instanz serialisiert und an jede primäre Partition gesendet, in die die Schlüssel aufgelöst werden. Das bedeutet, dass alle Instanzdaten, die im Agenten gespeichert sind, an den Server gesendet werden können. Jede primäre Partition besitzt damit eine Instanz des Agenten. Die Methode reduce(Session s, ObjectMap map, Collection keys) wird einmal pro Instanz (Partition) mit dem Teil der Schlüssel aufgerufen, die in die Partition aufgelöst werden. Das Ergebnis jeder Methode "reduce" wird anschließend zurück in den Client serialisiert. Die Methode reduceResults wird in der ReduceGridAgent-Clientinstanz mit der Sammlung der Ergebnisse jedes fernen reduce-Aufrufs aufgerufen. Das Ergebnis der Methode reduceResults wird an den Aufrufenden der Methode callReduceAgent zurückgegeben.

Wenn die Methode AgentManager.callReduceAgent ohne eine Sammlung von Schlüsseln aufgerufen wird, wird ReduceGridAgentinstance serialisiert und an jede primäre Partition gesendet. Das bedeutet, dass alle Instanzdaten, die im Agenten gespeichert sind, an den Server gesendet werden können. Jede primäre Partition besitzt damit eine Instanz des Agenten. Die Methode reduce(Session s, ObjectMap map) wird einmal pro Instanz (Partition) aufgerufen. Das Ergebnis jeder Methode "reduce" wird anschließend zurück in den Client serialisiert. Die Methode reduceResults wird in der ReduceGridAgent-Clientinstanz mit der Sammlung der Ergebnisse jedes fernen reduce-Aufrufs aufgerufen. Das Ergebnis der Methode reduceResults wird an den Aufrufenden der Methode callReduceAgent zurückgegeben.Im Folgenden sehen Sie ein Beispiel für einen Reduktionsagenten, der einfach die Alter der übereinstimmenden Einträge hinzufügt:
package com.ibm.ws.objectgrid.test.agent.jdk5;

import java.util.Collection;
import java.util.Iterator;

import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntryErrorValue;
import com.ibm.websphere.objectgrid.datagrid.ReduceGridAgent;
import com.ibm.websphere.objectgrid.query.ObjectQuery;
import com.ibm.websphere.samples.objectgrid.entityxmlgen.PersonFeature1Entity.PersonKey;

public class SumAgeReduceAgent implements ReduceGridAgent {
private static final long serialVersionUID = 2521080771723284899L;

	/**
	 * Wird im Server aufgerufen, wenn eine Sammlung von Schlüsseln an
	 * AgentManager.callReduceAgent() übergeben wird. Diese Methode wird
	 * in jedem primären Shard aufgerufen, für das der Schlüssel gilt.
	 */
	public Object reduce(Session s, ObjectMap map, Collection keyList) {
		try {
		int sum = 0;
			Iterator<PersonKey> iter = keyList.iterator();
		while (iter.hasNext()) {
				Object nextKey = iter.next();
				PersonKey pk = (PersonKey) nextKey;
				Person p = (Person) map.get(pk);
				sum += p.age;
			}

			return sum;
}catch (Exception e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}

	/**
	 * Wird im Server aufgerufen, wenn keine Sammlung von Schlüsseln an
	 * AgentManager.callReduceAgent() übergeben wird. Dieses Methode wird
  * auf jedem primären Shard aufgerufen.
	 */
	public Object reduce(Session s, ObjectMap map) {
		ObjectQuery q = s
				.createObjectQuery("select p from Person p where p.age > -1");
		Iterator<Person> iter = q.getResultIterator();
		int sum = 0;
		while (iter.hasNext()) {
			Object nextKey = iter.next();
			Person p = (Person) nextKey;
			sum += p.age;
		}
		return sum;
	}

	/**
	 * Wird im Client aufgerufen, um die Anzahl der Ergebnisse aller Partitionen zu reduzieren.
	 */
	public Object reduceResults(Collection results) {
		// Wenn ein EntryErrorValue auftritt, eine RuntimeException auslösen,
		// um anzuzeigen, dass mindestens ein Fehler aufgetreten ist,
		// und EntryErrorValue in die ausgelöste Ausnahme
		// einschließen.
		Iterator<Integer> iter = results.iterator();
		int sum = 0;
		while (iter.hasNext()) {
			Object nextResult = iter.next();
			if (nextResult instanceof EntryErrorValue) {
				EntryErrorValue eev = (EntryErrorValue) nextResult;
				throw new RuntimeException(
						"Error encountered on one of the partitions: "
								+ nextResult, eev.getException());
			}

			sum += ((Integer) nextResult).intValue();
		}
		return new Integer(sum);
	}
}
Das vorherige Beispiel zeigt den Agenten. Der Agent setzt sich aus drei wichtigen Teilen zusammen. Im ersten Teil kann eine bestimmte Gruppe von Einträgen ohne Abfrage verarbeitet werden. Es findet eine Iteration durch die Gruppe der Einträge statt, bei der die Alter hinzugefügt werden. Die Methode gibt die Summe zurück. Im zweiten Teil wird eine Abfrage verwendet, um die zusammenzufassenden Einträge auszuwählen. Anschließend werden die Altersangaben aller übereinstimmenden Personen summiert. Die dritte Methode wird verwendet, um die Ergebnisse aller Partitionen zu einem einzigen Ergebnis zusammenzufassen. Das ObjectGrid führt die Eintragsaggregation parallel im Grid durch. Jede Partition liefert ein Zwischenergebnis, das mit den Zwischenergebnissen der anderen Partitionen zusammengefasst werden muss. Diese dritte Methode führt diese Task aus. Im folgenden Beispiel wird der Agent aufgerufen, und die Altersangaben aller Personen mit einem Alter zwischen 10 und 20 ausschließlich werden zusammengefasst:
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();

SumAgeReduceAgent agent = new SumAgeReduceAgent();

Person p = new Person();
p.ssn = "1";
ArrayList<Person> list = new ArrayList<Person>();
	list.add(p);
p = new Person ();
p.ssn = "2";
	list.add(p);
	Integer v = (Integer)amgr.callReduceAgent(agent, list);// Close the session (optional in Version 7.1.1 and later) for improved performance
s.close();

Agentenfunktionen

Der Agent kann beliebige ObjectMap- oder EntityManager-Operationen in dem lokalen Shard ausführen, in dem er aktiv ist. Der Agent erhält ein Session-Objekt und kann Daten in der Partition, die vom Session-Objekt dargestellt wird, hinzufügen, aktualisieren, lesen oder entfernen. Einige Anwendungen fragen nur Daten vom Grid ab, aber Sie können einen Agenten auch so schreiben, dass das Alter aller Personen, die einer bestimmten Abfrage entsprechen, um 1 erhöht wird. Beim Aufruf des Agenten wird eine Transaktion im Session-Objekt erstellt, die festgeschrieben wird, wenn der Agent zurückkehrt, sofern keine Ausnahme ausgelöst wird.

Fehlerbehandlung

Wenn ein Map-Agent mit einem unbekannten Schlüssel aufgerufen wird, ist der zurückgegebene Wert ein Fehlerobjekt, das die Schnittstelle EntryErrorValue implementiert.

Transaktionen

Ein Map-Agent wird in einer anderen Transaktion als der Client ausgeführt. Agentenaufrufe können zu einer einzigen Transaktion gruppiert werden. Wenn ein Fehler im Agenten auftritt und eine Ausnahme ausgelöst wird, wird die Transaktion rückgängig gemacht. Alle Agenten, die in einer Transaktion erfolgreich ausgeführt wurden, werden zusammen mit dem fehlgeschlagenen Agenten rückgängig gemacht. AgentManager führt die rückgängig gemachten Agenten, die erfolgreich ausgeführt wurden, in einer neuen Transaktion erneut aus.