Las API de DataGrid admiten dos patrones de programación de cuadrícula comunes: correlación paralela y reducción paralela.
Correlación paralela
La correlación paralela permite que las entradas de un conjunto de claves se procesen y devuelve un resultado para cada entrada procesada. La aplicación efectúa un listado de claves y recibe una correlación de pares de clave/resultado después de invocar la operación de correlación. El resultado es la consecuencia de aplicar una función a la entrada de cada clave. La función la suministra la aplicación.
Flujo de llamadas MapGridAgent
Cuando se invoca el método AgentManager.callMapAgent con una colección de claves, la instancia MapGridAgent se serializa y se envía a cada partición primaria en la que se resuelven las claves. Esto significa que los datos de la instancia almacenados en el agente pueden enviarse al servidor. Por consiguiente, cada partición primaria tiene una instancia del agente. El método de proceso se invoca para cada instancia una vez por cada clave que se resuelve en la partición. El resultado de cada método de proceso se vuelve a serializar en el cliente y se devuelve al llamante en una instancia de correlación, donde el resultado se representa como el valor en la correlación.
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Person
{
@Id String ssn;
String firstName;
String surname;
int age;
}
La función proporcionada por la aplicación está escrita como una clase que implementa la interfaz MapAgentGrid. Agente de ejemplo que muestra una función que devuelve la edad de una persona (Person) multiplicada por dos.public class DoublePersonAgeAgent implements MapGridAgent, EntityAgentMixin
{
private static final long serialVersionUID = -2006093916067992974L;
int lowAge;
int highAge;
public Object process(Session s, ObjectMap map, Object key)
{
Person p = (Person)key;
return new Integer(p.age * 2);
}
public Map processAllEntries(Session s, ObjectMap map)
{
EntityManager em = s.getEntityManager();
Query q = em.createQuery("select p from Person p where p.age > ?1 and p.age < ?2");
q.setParameter(1, lowAge);
q.setParameter(2, highAge);
Iterator iter = q.getResultIterator();
Map<Person, Interger> rc = new HashMap<Person, Integer>();
while(iter.hasNext())
{
Person p = (Person)iter.next();
rc.put(p, (Integer)process(s, map, p));
}
return rc;
}
public Class getClassForEntity()
{
return Person.class;
}
}
El ejemplo anterior muestra el agente de correlación con la edad doblada de una persona. El primer método de proceso proporciona la persona con la que trabajar y devuelve el doble de la edad de esa entrada. El segundo método de proceso se llama para cada partición
y se buscan todos los objetos Person con una edad comprendida entre un valor
lowAge y un valor highAge, y se devuelve el doble de las edades. Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
// efectúa una lista de las claves
ArrayList<Person> keyList = new ArrayList<Person>();
Person p = new Person();
p.ssn = "1";
keyList.add(p);
p = new Person ();
p.ssn = "2";
keyList.add(p);
// obtiene los resultados de las entradas
Map<Tuple, Object> = amgr.callMapAgent(agent, keyList);// Cierre la sesión (opcional en la versión 7.1.1 y posterior) para un mejor rendimiento
s.close();
El ejemplo anterior muestra un cliente que obtiene una sesión y una referencia a la correlación de persona. La operación del agente se realiza en una correlación específica. La interfaz
AgentManager se recupera de dicha correlación. Se crea una instancia del agente
que se va a invocar y se añade cualquier estado necesario al objeto mediante el
establecimiento de atributos (no hay ninguno, en este caso). Se crea una lista
de las claves. Se devuelve una correlación con los valores para la persona 1 doblados y los mismos valores para la persona 2. Se invoca el agente para ese conjunto de claves. El método de proceso del agente se invoca en cada partición con algunas de las claves especificadas en la cuadrícula en paralelo. Se devuelve una correlación con los resultados fusionados de la clave especificada. En este caso, se devuelven los valores con el doble de la edad de la persona 1 y lo mismo con la persona 2.
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
agent.lowAge = 20;
agent.highAge = 9999;
Map m = amgr.callMapAgent(agent);
El ejemplo anterior muestra el
AgentManager que se obtiene para la correlación Person (persona) y el agente construido e inicializado con las edades bajas y altas para las personas de interés. A continuación, se invoca el agente utilizando el método callMapAgent.
Observe que
no se proporciona ninguna clave. Como resultado, ObjectGrid invoca el agente en todas las particiones de la cuadrícula en paralelo y devuelve los resultados fusionados al cliente. Este conjunto de resultados contiene todos los objetos Person en la cuadrícula con una edad comprendida entre los valores bajos y
altos y calcula el doble de la edad de esos objetos Person. Este ejemplo muestra cómo pueden utilizarse las API de la cuadrícula para ejecutar una consulta que busque entidades que coincidan con una consulta determinada. ObjectGrid serializa y transporta el agente a las particiones con las entradas necesarias.
Los resultados se serializan de forma similar y se transportan al cliente. Las
API de correlación deben tratarse con cuidado. Si ObjectGrid contuviera
terabytes de objetos y se ejecutara en muchos servidores, posiblemente este proceso saturaría las máquinas cliente. Utilice las API de correlación para procesar un subconjunto pequeño.
Si necesita procesar un subconjunto de gran tamaño, se utilice un agente de reducción para realizar el proceso en la cuadrícula de datos en lugar de en un cliente. Reducción paralela o agentes de agregación
Flujo de llamadas ReduceGridAgent
Cuando se invoca el método AgentManager.callReduceAgent con una colección de claves, la instancia de ReduceGridAgent se serializa y se envía a cada partición primaria en la que se resuelven las claves. Esto significa que los datos de la instancia almacenados en el agente pueden enviarse al servidor. Por consiguiente, cada partición primaria tiene una instancia del agente. El método reduce(Session s, ObjectMap map, Collection keys) se invoca una vez por instancia (partición) con el subconjunto de claves que se resuelve en la partición. El resultado de cada método de reducción se vuelve a serializar en el cliente. El método reduceResults se invoca en la instancia ReduceGridAgent del cliente con la colección de cada resultado de cada invocación de reducción remota. El resultado del método reduceResults se devuelve al llamante del método callReduceAgent.
package com.ibm.ws.objectgrid.test.agent.jdk5;
import java.util.Collection;
import java.util.Iterator;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntryErrorValue;
import com.ibm.websphere.objectgrid.datagrid.ReduceGridAgent;
import com.ibm.websphere.objectgrid.query.ObjectQuery;
import com.ibm.websphere.samples.objectgrid.entityxmlgen.PersonFeature1Entity.PersonKey;
public class SumAgeReduceAgent implements ReduceGridAgent {
private static final long serialVersionUID = 2521080771723284899L;
/**
* Se invoca en el servidor si se pasa una colección de claves a
* AgentManager.callReduceAgent(). Se invoca en cada fragmento primario
* en el que se aplica la clave.
*/
public Object reduce(Session s, ObjectMap map, Collection keyList) {
try {
int sum = 0;
Iterator<PersonKey> iter = keyList.iterator();
while (iter.hasNext()) {
Object nextKey = iter.next();
PersonKey pk = (PersonKey) nextKey;
Person p = (Person) map.get(pk);
sum += p.age;
}
return sum;
} catch(Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
/**
* Se invoca en el servidor si NO se pasa una colección de claves a
* AgentManager.callReduceAgent(). Se invoca en cada fragmento primario.
*/
public Object reduce(Session s, ObjectMap map) {
ObjectQuery q = s
.createObjectQuery("select p from Person p where p.age > -1");
Iterator<Person> iter = q.getResultIterator();
int sum = 0;
while (iter.hasNext()) {
Object nextKey = iter.next();
Person p = (Person) nextKey;
sum += p.age;
}
return sum;
}
/**
* Se invoca en el cliente para reducir los resultados de todas las particiones.
*/
public Object reduceResults(Collection results) {
// Si aparece un EntryErrorValue, se debe emitir una RuntimeException
// para indicar que hubo al menos un fallo e incluir cada
// EntryErrorValue
// como parte de la excepción emitida.
Iterator<Integer> iter = results.iterator();
int sum = 0;
while (iter.hasNext()) {
Object nextResult = iter.next();
if (nextResult instanceof EntryErrorValue) {
EntryErrorValue eev = (EntryErrorValue) nextResult;
throw new RuntimeException(
"Error encountered on one of the partitions: "
+ nextResult, eev.getException());
}
sum += ((Integer) nextResult).intValue();
}
return new Integer(sum);
}
}
El ejemplo anterior muestra el agente. El agente tiene tres partes
importantes.
La primera permite que un conjunto específico de entradas se procesen sin una
consulta. Itera sobre el conjunto de entradas, añadiendo las edades.
La suma se devuelve desde el método. La segunda parte utiliza una consulta para
seleccionar las entradas que se agregarán. A continuación, se suman todas las
edades de Person coincidentes. El tercer método se utiliza para agregar los
resultados de cada partición a un único resultado. ObjectGrid realiza la
agregación de entradas en paralelo en la cuadrícula. Cada partición produce un
resultado intermedio que debe agregarse con otros resultados intermedios de
particiones.
Este tercer método realiza dicha tarea.
En el ejemplo siguiente, se invoca el agente y se agregan las edades de todas las personas comprendidas entre 10 y 20 exclusivamente:Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
SumAgeReduceAgent agent = new SumAgeReduceAgent();
Person p = new Person();
p.ssn = "1";
ArrayList<Person> list = new ArrayList<Person>();
list.add(p);
p = new Person ();
p.ssn = "2";
list.add(p);
Integer v = (Integer)amgr.callReduceAgent(agent, list);// Cierre la sesión (opcional en la versión 7.1.1 y posterior) para un mejor rendimiento
s.close();
Funciones del agente
El agente puede llevar a cabo cualquier operación de ObjectMap o EntityManager dentro del fragmento local donde se ejecuta. El agente recibe un objeto Session y puede añadir, actualizar, consultar, leer o eliminar datos de la partición que representa el objeto Session. Algunas aplicaciones sólo consultan los datos de la cuadrícula, pero también puede escribir un agente para aumentar en 1 todas las edades de las entidades Person que coincidan con una consulta determinada. Existe una transacción en el objeto Session cuando se llama al agente, y se confirma cuando se devuelve el agente, a menos que se lance una excepción.
Manejo de errores
Si se invoca un agente de correlación con una clave desconocida, el valor devuelto es un objeto de error que implementa la interfaz EntryErrorValue.
Transacciones
Un agente de correlación se ejecuta en una transacción independiente del cliente. Las invocaciones de agente se pueden agrupar en una única transacción. Si se produce una anomalía en un agente (emite una excepción), la transacción se retrotrae. Cualquier agente que se haya ejecutado correctamente en una transacción se retrotraerá con el agente que ha fallado. AgentManager vuelve a ejecutar los agentes retrotraídos que se ejecutaron correctamente en una nueva transacción.