As APIs do DataGrid suportam dois padrões de programação de grade comuns: mapa de paralelo e redução de paralelo.
Mapa de Paralelos
O mapa de paralelos permite que as entradas de um conjunto de chaves sejam processadas e retorna um resultado para cada entrada processada. O aplicativo faz uma lista de chaves e recebe um Mapa de pares chave/resultado depois de chamar uma operação do Mapa. O resultado provém da aplicação de uma função à entrada de cada chave. A função é fornecida pelo aplicativo.
Fluxo de chamada do MapGridAgent
Quando o método AgentManager.callMapAgent é chamado com uma coleção de chaves, a instância MapGridAgent é serializada e enviada para cada partição primária que as chaves resolvem. Isto significa que quaisquer dados de instância armazenados no agente podem ser enviados para o servidor. Portanto, cada partição primária possui uma instância do agente. O método process é chamado para cada instância uma vez para cada chave que resolve a partição. O resultado de cada método process é, então, serializado de volta para o cliente e retornado para o responsável pela chamada em uma instância de Mapa, onde o resultado é representado como o valor no mapa.
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Person {
@Id String ssn;
String firstName;
String surname;
int age;
}
A função fornecida pelo aplicativo é redigida como uma classe
que implementa a interface MapAgentGrid. Este é um
agente de exemplo que mostra uma função para retornar a idade de uma Pessoa
multiplicada por dois.public class DoublePersonAgeAgent implements MapGridAgent, EntityAgentMixin
{
private static final long serialVersionUID = -2006093916067992974L;
int lowAge;
int highAge;
public Object process(Session s, ObjectMap map, Object key)
{
Person p = (Person)key;
return new Integer(p.age * 2);
}
public Map processAllEntries(Session s, ObjectMap map)
{
EntityManager em = s.getEntityManager();
Query q = em.createQuery("select p from Person p where p.age > ?1 and p.age < ?2");
q.setParameter(1, lowAge);
q.setParameter(2, highAge);
Iterator iter = q.getResultIterator();
Map<Person, Interger> rc = new HashMap<Person, Integer>();
while(iter.hasNext())
{
Person p = (Person)iter.next();
rc.put(p, (Integer)process(s, map, p));
}
return rc;
}
public Class getClassForEntity()
{
return Person.class;
}
}
O exemplo anterior mostra o agente de Mapa para duplicar
uma entidade de Pessoa. O primeiro método de processo é fornecido com a Pessoa com a
qual trabalhar e retorna o dobro da idade dessa entrada. O segundo método do processo é chamado para cada partição e localiza todos os objetos Person com idades entre lowAge e highAge e retorna os dobros das idades. Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
// make a list of keys
ArrayList<Person> keyList = new ArrayList<Person>();
Person p = new Person();
p.ssn = "1";
keyList.add(p);
p = new Person ();
p.ssn = "2";
keyList.add(p);
// get the results for those entries
Map<Tuple, Object> = amgr.callMapAgent(agent, keyList);
// Close the session (optional in Version 7.1.1 and later)
for improved performance
s.close();
O exemplo
anterior mostra um cliente que obtém uma Sessão e uma referência ao Mapa de
Pessoa. A operação do agente é executada em um Mapa específico.
A interface AgentManager é recuperada a partir de tal Mapa. Uma instância do agente a ser chamado é criada e qualquer estado necessário incluído no objeto pelos atributos da configuração; nesse caso, não há nenhum.
Uma lista de chaves é construída. Um Mapa com os valores para pessoa 1 dobrados, e os mesmos valores para pessoa 2 são retornados.Em seguida, o agente é chamado para esse conjunto de chaves. O método de processamento dos agentes é chamado em cada partição com algumas chaves especificadas na grade em paralelo. Um Mapa é retornado, fornecendo os resultados combinados para a chave especificada. Nesse caso, um Mapa com os valores que mantêm a idade da pessoa 1 dobrada e a mesma da pessoa 2 é retornado.
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
agent.lowAge = 20;
agent.highAge = 9999;
Map m = amgr.callMapAgent(agent);
O exemplo anterior mostra o AgentManager sendo obtido para o Mapa da Pessoa, e o agente construído e inicializado com as idades mínima e máxima para Pessoa de interesse. O agente é então chamado com o uso do método callMapAgent.
Observe que nenhuma chave é fornecida. Como resultado, o ObjectGrid chama
o agente em cada partição na grade em paralelo e retorna os
resultados mesclados ao cliente. Este conjunto de devoluções contêm todos os objetos Pessoais
na grade com uma idade entre menor e maior e calcula
a idade desses objetos Pessoais duplicada. Esse exemplo mostra como as
APIs na grade podem ser usadas para executar uma consulta para localizar as entidades que correspondem a uma
determinada consulta. O agente é serializado e transportado pelo ObjectGrid
para as partições com as entradas necessárias. Os resultados são similarmente serializados para o transporte de volta ao cliente. É necessário ter cuidado com as APIs do Mapa. Se o ObjectGrid estivesse hospedando terabytes de objetos
e sendo executado em muitos servidores, esse processamento potencialmente sobrecarregaria
as máquinas do cliente. Use as APIs de Mapa para processar um subconjunto pequeno.
Se for necessário processar um grande subconjunto, use um agente de redução para executar o processamento
fora da grade de dados em vez de fazê-lo em um cliente.Redução de Paralelo ou agentes de agregação
Fluxo de chamada ReduceGridAgent
Quando o método AgentManager.callReduceAgent é chamado com uma coleção de chaves, a instância ReduceGridAgent é serializada e enviada para cada partição primária que as chaves resolvem. Isto significa que quaisquer dados de instância armazenados no agente podem ser enviados para o servidor. Portanto, cada partição primária possui uma instância do agente. O método reduce(Session s, ObjectMap map, Collection keys) é chamado uma vez por instância (partição) com o subconjunto de chaves que resolve a partição. O resultado de cada método de redução é, então, serializado de volta para o cliente. O método reduceResults é chamado na instância ReduceGridAgent do cliente com a coleção de cada resultado de cada chamada de redução remota. O resultado do método reduceResults é retornado para o responsável pela chamada do método callReduceAgent.
package com.ibm.ws.objectgrid.test.agent.jdk5;
import java.util.Collection;
import java.util.Iterator;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntryErrorValue;
import com.ibm.websphere.objectgrid.datagrid.ReduceGridAgent;
import com.ibm.websphere.objectgrid.query.ObjectQuery;
import com.ibm.websphere.samples.objectgrid.entityxmlgen.PersonFeature1Entity.PersonKey;
public class SumAgeReduceAgent implements ReduceGridAgent {
private static final long serialVersionUID = 2521080771723284899L;
/**
* Invoked on the server if a collection of keys is passed to
* AgentManager.callReduceAgent(). This is invoked on each primary shard
* where the key applies.
*/
public Object reduce(Session s, ObjectMap map, Collection keyList) {
try {
int sum = 0;
Iterator<PersonKey> iter = keyList.iterator();
while(iter.hasNext()) {
Object nextKey = iter.next();
PersonKey pk = (PersonKey) nextKey;
Person p = (Person) map.get(pk);
sum += p.age;
}
return sum;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
/**
* Invoked on the server if a collection of keys is NOT passed to
* AgentManager.callReduceAgent(). This is invoked on every primary shard.
*/
public Object reduce(Session s, ObjectMap map) {
ObjectQuery q = s
.createObjectQuery("select p from Person p where p.age > -1");
Iterator<Person> iter = q.getResultIterator();
int sum = 0;
while (iter.hasNext()) {
Object nextKey = iter.next();
Person p = (Person) nextKey;
sum += p.age;
}
return sum;
}
/**
* Invoked on the client to reduce the results from all partitions.
*/
public Object reduceResults(Collection results) {
// If we encounter an EntryErrorValue, then throw a RuntimeException
// to indicate that there was at least one failure and include each
// EntryErrorValue
// as part of the thrown exception.
Iterator<Integer> iter = results.iterator();
int sum = 0;
while (iter.hasNext()) {
Object nextResult = iter.next();
if (nextResult instanceof EntryErrorValue) {
EntryErrorValue eev = (EntryErrorValue) nextResult;
throw new RuntimeException(
"Erro encontrado em uma das partições: "
+ nextResult, eev.getException());
}
sum += ((Integer) nextResult).intValue();
}
return new Integer(sum);
}
}
O exemplo anterior mostra o agente.
O agente tem três partes importantes. A
primeira permite que um conjunto específico de entradas seja processado sem uma
consulta. Ela itera o conjunto de entradas, incluindo
as idades. A soma é retornada do método.
A segunda utiliza uma consulta para selecionar
as entradas a serem agregadas. Em seguida, ela soma todas as idades Person
correspondentes. O terceiro método é utilizado para agregar os resultados de cada
partição a um único resultado. O ObjectGrid executa a agregação de entradas em
paralelo por meio do grade.
Cada partição produz um resultado intermediário que deve
ser agregado aos resultados intermediários de outra partição. Esse terceiro método
executa essa tarefa. No exemplo a seguir,
o agente é chamado e as idades de todas as Pessoas exclusivamente com idades entre 10 e 20
são agregadas:Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
SumAgeReduceAgent agent = new SumAgeReduceAgent();
Person p = new Person();
p.ssn = "1";
ArrayList<Person> list = new ArrayList<Person>();
list.add(p);
p = new Person ();
p.ssn = "2";
list.add(p);
Integer v = (Integer)amgr.callReduceAgent(agent, list);
// Close the session (optional in Version 7.1.1 and later)
for improved performance
s.close();
Funções do agente
O agente é livre para fazer operações de ObjectMap ou EntityManager dentro do shard local onde está executando. O agente recebe uma Sessão e pode incluir, atualizar, consultar, ler ou remover dados da partição que a Sessão representa. Alguns aplicativos consultam somente dados da grade, mas também é possível gravar um agente para incrementar todas as idades da Pessoa em 1 que correspondam a uma determinada consulta. Existe uma transação na Sessão quando o agente é chamado, e é consolidado quando o agente retorna, a menos que uma exceção seja lançada
Manipulação de erros
Se um agente de mapa for chamado com uma chave desconhecida, o valor retornado será um objeto de erro que implementa a interface EntryErrorValue.
Transações
Um agente de mapas é executado numa transação separada do cliente. As chamadas do agente podem ser agrupadas numa única transação. Se um agente falhar e lançar uma exceção, a transação será retrocedida. Quaisquer agentes executados com êxito em uma transação retrocederão com o agente com falha. O AgentManager executará novamente os agentes retrocedidos que foram executados com êxito em uma nova transação.