Ejemplo de la API de DataGrid

Las API de DataGrid admiten dos patrones de programación de cuadrícula comunes: correlación paralela y reducción paralela.

Correlación paralela

La correlación paralela permite que las entradas de un conjunto de claves se procesen y devuelve un resultado para cada entrada procesada. La aplicación efectúa un listado de claves y recibe una correlación de pares de clave/resultado después de invocar la operación de correlación. El resultado es la consecuencia de aplicar una función a la entrada de cada clave. La función la suministra la aplicación.

Flujo de llamadas MapGridAgent

Cuando se invoca el método AgentManager.callMapAgent con una colección de claves, la instancia MapGridAgent se serializa y se envía a cada partición primaria en la que se resuelven las claves. Esto significa que los datos de la instancia almacenados en el agente pueden enviarse al servidor. Por consiguiente, cada partición primaria tiene una instancia del agente. El método de proceso se invoca para cada instancia una vez por cada clave que se resuelve en la partición. El resultado de cada método de proceso se vuelve a serializar en el cliente y se devuelve al llamante en una instancia de correlación, donde el resultado se representa como el valor en la correlación.

Cuando se invoca el método AgentManager.callMapAgent sin una colección de claves, la instancia MapGridAgent se serializa y se envía a todas las particiones primarias. Esto significa que los datos de la instancia almacenados en el agente pueden enviarse al servidor. Por consiguiente, cada partición primaria tiene una instancia (partición) del agente. El método processAllEntries se invoca para cada partición. El resultado de cada método processAllEntries se vuelve a serializar en el cliente y se devuelve al llamante en una instancia de correlación. En el siguiente ejemplo se presupone que existe una entidad Person con la forma siguiente:
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Person
{
  @Id String ssn;
  String firstName;
  String surname;
  int age;
}
La función proporcionada por la aplicación está escrita como una clase que implementa la interfaz MapAgentGrid. Agente de ejemplo que muestra una función que devuelve la edad de una persona (Person) multiplicada por dos.
public class DoublePersonAgeAgent implements MapGridAgent, EntityAgentMixin
{
  private static final long serialVersionUID = -2006093916067992974L;

  int lowAge;
  int highAge;

  public Object process(Session s, ObjectMap map, Object key)
  {
    Person p = (Person)key;
    return new Integer(p.age * 2);
  }

  public Map processAllEntries(Session s, ObjectMap map)
  {
    EntityManager em = s.getEntityManager();
    Query q = em.createQuery("select p from Person p where p.age > ?1 and p.age < ?2");
    q.setParameter(1, lowAge);
    q.setParameter(2, highAge);
    Iterator iter = q.getResultIterator();
    Map<Person, Interger> rc = new HashMap<Person, Integer>();
	  while(iter.hasNext())
    {
			Person p = (Person)iter.next();
			rc.put(p, (Integer)process(s, map, p));
		}
		return rc;
	}
	public Class getClassForEntity()
	{
		return Person.class;
	}
}
El ejemplo anterior muestra el agente de correlación con la edad doblada de una persona. El primer método de proceso proporciona la persona con la que trabajar y devuelve el doble de la edad de esa entrada. El segundo método de proceso se llama para cada partición y se buscan todos los objetos Person con una edad comprendida entre un valor lowAge y un valor highAge, y se devuelve el doble de las edades.
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();

DoublePersonAgeAgent agent = new DoublePersonAgeAgent();

// efectúa una lista de las claves
ArrayList<Person> keyList = new ArrayList<Person>();
Person p = new Person();
p.ssn = "1";
keyList.add(p);
p = new Person ();
p.ssn = "2";
keyList.add(p);

// obtiene los resultados de las entradas
Map<Tuple, Object> = amgr.callMapAgent(agent, keyList);// Cierre la sesión (opcional en la versión 7.1.1 y posterior) para un mejor rendimiento
s.close();
El ejemplo anterior muestra un cliente que obtiene una sesión y una referencia a la correlación de persona. La operación del agente se realiza en una correlación específica. La interfaz AgentManager se recupera de dicha correlación. Se crea una instancia del agente que se va a invocar y se añade cualquier estado necesario al objeto mediante el establecimiento de atributos (no hay ninguno, en este caso). Se crea una lista de las claves. Se devuelve una correlación con los valores para la persona 1 doblados y los mismos valores para la persona 2.

Se invoca el agente para ese conjunto de claves. El método de proceso del agente se invoca en cada partición con algunas de las claves especificadas en la cuadrícula en paralelo. Se devuelve una correlación con los resultados fusionados de la clave especificada. En este caso, se devuelven los valores con el doble de la edad de la persona 1 y lo mismo con la persona 2.

Aunque la clave no exista, se invoca el agente. De esta manera, el agente tiene la oportunidad de crear la entrada de correlación. Si usa EntityAgentMixin, la clave que se procesará no será la entidad, sino el valor clave del tuple real de la entidad. Si las claves no se conocen, puede solicitar a todas las particiones que busquen los objetos Person de una forma determinada y que devuelvan el doble de la edad. A continuación se muestra un ejemplo:
Session s = grid.getSession();
  ObjectMap map = s.getMap("Person");
  AgentManager amgr = map.getAgentManager();

  DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
  agent.lowAge = 20;
  agent.highAge = 9999;

  Map m = amgr.callMapAgent(agent);
El ejemplo anterior muestra el AgentManager que se obtiene para la correlación Person (persona) y el agente construido e inicializado con las edades bajas y altas para las personas de interés. A continuación, se invoca el agente utilizando el método callMapAgent. Observe que no se proporciona ninguna clave. Como resultado, ObjectGrid invoca el agente en todas las particiones de la cuadrícula en paralelo y devuelve los resultados fusionados al cliente. Este conjunto de resultados contiene todos los objetos Person en la cuadrícula con una edad comprendida entre los valores bajos y altos y calcula el doble de la edad de esos objetos Person. Este ejemplo muestra cómo pueden utilizarse las API de la cuadrícula para ejecutar una consulta que busque entidades que coincidan con una consulta determinada. ObjectGrid serializa y transporta el agente a las particiones con las entradas necesarias. Los resultados se serializan de forma similar y se transportan al cliente. Las API de correlación deben tratarse con cuidado. Si ObjectGrid contuviera terabytes de objetos y se ejecutara en muchos servidores, posiblemente este proceso saturaría las máquinas cliente. Utilice las API de correlación para procesar un subconjunto pequeño. Si necesita procesar un subconjunto de gran tamaño, se utilice un agente de reducción para realizar el proceso en la cuadrícula de datos en lugar de en un cliente.

Reducción paralela o agentes de agregación

Este estilo de programación procesa un subconjunto de entradas y calcula un resultado único para el grupo de entradas. Ejemplos de dicho resultado son: Un agente de reducción está codificado y se invoca de forma parecida a los agentes de correlación.

Flujo de llamadas ReduceGridAgent

Cuando se invoca el método AgentManager.callReduceAgent con una colección de claves, la instancia de ReduceGridAgent se serializa y se envía a cada partición primaria en la que se resuelven las claves. Esto significa que los datos de la instancia almacenados en el agente pueden enviarse al servidor. Por consiguiente, cada partición primaria tiene una instancia del agente. El método reduce(Session s, ObjectMap map, Collection keys) se invoca una vez por instancia (partición) con el subconjunto de claves que se resuelve en la partición. El resultado de cada método de reducción se vuelve a serializar en el cliente. El método reduceResults se invoca en la instancia ReduceGridAgent del cliente con la colección de cada resultado de cada invocación de reducción remota. El resultado del método reduceResults se devuelve al llamante del método callReduceAgent.

Cuando se invoca el método AgentManager.callReduceAgent sin una colección de claves, la instancia ReduceGridAgent se serializa y se envía a todas las particiones primarias. Esto significa que los datos de la instancia almacenados en el agente pueden enviarse al servidor. Por consiguiente, cada partición primaria tiene una instancia del agente. El método reduce(Session s, ObjectMap map) se invoca una vez por instancia (partición). El resultado de cada método de reducción se vuelve a serializar en el cliente. El método reduceResults se invoca en la instancia ReduceGridAgent del cliente con la colección de cada resultado de cada invocación de reducción remota. El resultado del método reduceResults se devuelve al llamante del método callReduceAgent.A continuación se muestra un ejemplo de un agente de reducción que simplemente añade las edades de las entradas coincidentes.
package com.ibm.ws.objectgrid.test.agent.jdk5;

import java.util.Collection;
import java.util.Iterator;

import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntryErrorValue;
import com.ibm.websphere.objectgrid.datagrid.ReduceGridAgent;
import com.ibm.websphere.objectgrid.query.ObjectQuery;
import com.ibm.websphere.samples.objectgrid.entityxmlgen.PersonFeature1Entity.PersonKey;

public class SumAgeReduceAgent implements ReduceGridAgent {
private static final long serialVersionUID = 2521080771723284899L;

	/**
	 * Se invoca en el servidor si se pasa una colección de claves a
	 * AgentManager.callReduceAgent(). Se invoca en cada fragmento primario
	 * en el que se aplica la clave.
	 */
	public Object reduce(Session s, ObjectMap map, Collection keyList) {
		try {
		int sum = 0;
			Iterator<PersonKey> iter = keyList.iterator();
			while (iter.hasNext()) {
				Object nextKey = iter.next();
				PersonKey pk = (PersonKey) nextKey;
				Person p = (Person) map.get(pk);
				sum += p.age;
			}

			return sum;
		} catch(Exception e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}

	/**
	 * Se invoca en el servidor si NO se pasa una colección de claves a
	 * AgentManager.callReduceAgent(). Se invoca en cada fragmento primario.
	 */
	public Object reduce(Session s, ObjectMap map) {
		ObjectQuery q = s
				.createObjectQuery("select p from Person p where p.age > -1");
		Iterator<Person> iter = q.getResultIterator();
		int sum = 0;
		while (iter.hasNext()) {
			Object nextKey = iter.next();
			Person p = (Person) nextKey;
			sum += p.age;
		}
		return sum;
	}

	/**
	 * Se invoca en el cliente para reducir los resultados de todas las particiones.
	 */
	public Object reduceResults(Collection results) {
		// Si aparece un EntryErrorValue, se debe emitir una RuntimeException
		// para indicar que hubo al menos un fallo e incluir cada
		// EntryErrorValue
		// como parte de la excepción emitida.
		Iterator<Integer> iter = results.iterator();
		int sum = 0;
		while (iter.hasNext()) {
			Object nextResult = iter.next();
			if (nextResult instanceof EntryErrorValue) {
				EntryErrorValue eev = (EntryErrorValue) nextResult;
				throw new RuntimeException(
						"Error encountered on one of the partitions: "
								+ nextResult, eev.getException());
			}

			sum += ((Integer) nextResult).intValue();
		}
		return new Integer(sum);
	}
}
El ejemplo anterior muestra el agente. El agente tiene tres partes importantes. La primera permite que un conjunto específico de entradas se procesen sin una consulta. Itera sobre el conjunto de entradas, añadiendo las edades. La suma se devuelve desde el método. La segunda parte utiliza una consulta para seleccionar las entradas que se agregarán. A continuación, se suman todas las edades de Person coincidentes. El tercer método se utiliza para agregar los resultados de cada partición a un único resultado. ObjectGrid realiza la agregación de entradas en paralelo en la cuadrícula. Cada partición produce un resultado intermedio que debe agregarse con otros resultados intermedios de particiones. Este tercer método realiza dicha tarea. En el ejemplo siguiente, se invoca el agente y se agregan las edades de todas las personas comprendidas entre 10 y 20 exclusivamente:
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();

SumAgeReduceAgent agent = new SumAgeReduceAgent();

Person p = new Person();
p.ssn = "1";
ArrayList<Person> list = new ArrayList<Person>();
	list.add(p);
p = new Person ();
p.ssn = "2";
	list.add(p);
	Integer v = (Integer)amgr.callReduceAgent(agent, list);// Cierre la sesión (opcional en la versión 7.1.1 y posterior) para un mejor rendimiento
s.close();

Funciones del agente

El agente puede llevar a cabo cualquier operación de ObjectMap o EntityManager dentro del fragmento local donde se ejecuta. El agente recibe un objeto Session y puede añadir, actualizar, consultar, leer o eliminar datos de la partición que representa el objeto Session. Algunas aplicaciones sólo consultan los datos de la cuadrícula, pero también puede escribir un agente para aumentar en 1 todas las edades de las entidades Person que coincidan con una consulta determinada. Existe una transacción en el objeto Session cuando se llama al agente, y se confirma cuando se devuelve el agente, a menos que se lance una excepción.

Manejo de errores

Si se invoca un agente de correlación con una clave desconocida, el valor devuelto es un objeto de error que implementa la interfaz EntryErrorValue.

Transacciones

Un agente de correlación se ejecuta en una transacción independiente del cliente. Las invocaciones de agente se pueden agrupar en una única transacción. Si se produce una anomalía en un agente (emite una excepción), la transacción se retrotrae. Cualquier agente que se haya ejecutado correctamente en una transacción se retrotraerá con el agente que ha fallado. AgentManager vuelve a ejecutar los agentes retrotraídos que se ejecutaron correctamente en una nueva transacción.