Exemplo da API do DataGrid

As APIs do DataGrid suportam dois padrões de programação de grade comuns: mapa de paralelo e redução de paralelo.

Mapa de Paralelos

O mapa de paralelos permite que as entradas de um conjunto de chaves sejam processadas e retorna um resultado para cada entrada processada. O aplicativo faz uma lista de chaves e recebe um Mapa de pares chave/resultado depois de chamar uma operação do Mapa. O resultado provém da aplicação de uma função à entrada de cada chave. A função é fornecida pelo aplicativo.

Fluxo de chamada do MapGridAgent

Quando o método AgentManager.callMapAgent é chamado com uma coleção de chaves, a instância MapGridAgent é serializada e enviada para cada partição primária que as chaves resolvem. Isto significa que quaisquer dados de instância armazenados no agente podem ser enviados para o servidor. Portanto, cada partição primária possui uma instância do agente. O método process é chamado para cada instância uma vez para cada chave que resolve a partição. O resultado de cada método process é, então, serializado de volta para o cliente e retornado para o responsável pela chamada em uma instância de Mapa, onde o resultado é representado como o valor no mapa.

Quando o método AgentManager.callMapAgent é chamado sem uma coleção de chaves, a instância MapGridAgent é serializada e enviada para todas as partições primárias. Isto significa que quaisquer dados de instância armazenados no agente podem ser enviados para o servidor. Cada partição principal, portanto, tem uma instância (partição) do agente. O método processAllEntries é chamado para cada partição. O resultado de cada método processAllEntries é então serializado de volta para o cliente e retornado ao responsável pela chamada em uma instância de Mapa. O exemplo a seguir parte da premissa de que há uma entidade de Pessoa com o seguinte formato:
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Person {
  @Id String ssn;
  String firstName;
  String surname;
  int age;
}
A função fornecida pelo aplicativo é redigida como uma classe que implementa a interface MapAgentGrid. Este é um agente de exemplo que mostra uma função para retornar a idade de uma Pessoa multiplicada por dois.
public class DoublePersonAgeAgent implements MapGridAgent, EntityAgentMixin
{
  private static final long serialVersionUID = -2006093916067992974L;

  int lowAge;
  int highAge;

  public Object process(Session s, ObjectMap map, Object key)
  {
    Person p = (Person)key;
    return new Integer(p.age * 2);
  }

  public Map processAllEntries(Session s, ObjectMap map)
  {
    EntityManager em = s.getEntityManager();
    Query q = em.createQuery("select p from Person p where p.age > ?1 and p.age < ?2");
    q.setParameter(1, lowAge);
    q.setParameter(2, highAge);
    Iterator iter = q.getResultIterator();
    Map<Person, Interger> rc = new HashMap<Person, Integer>();
	  while(iter.hasNext())
    {
			Person p = (Person)iter.next();
			rc.put(p, (Integer)process(s, map, p));
		}
		return rc;
	}
	public Class getClassForEntity()
	{
		return Person.class;
	}
}
O exemplo anterior mostra o agente de Mapa para duplicar uma entidade de Pessoa. O primeiro método de processo é fornecido com a Pessoa com a qual trabalhar e retorna o dobro da idade dessa entrada. O segundo método do processo é chamado para cada partição e localiza todos os objetos Person com idades entre lowAge e highAge e retorna os dobros das idades.
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();

DoublePersonAgeAgent agent = new DoublePersonAgeAgent();

// make a list of keys
ArrayList<Person> keyList = new ArrayList<Person>();
Person p = new Person();
p.ssn = "1";
keyList.add(p);
p = new Person ();
p.ssn = "2";
keyList.add(p);

// get the results for those entries
Map<Tuple, Object> = amgr.callMapAgent(agent, keyList);
// Close the session (optional in Version 7.1.1 and later) 
for improved performance
s.close();
O exemplo anterior mostra um cliente que obtém uma Sessão e uma referência ao Mapa de Pessoa. A operação do agente é executada em um Mapa específico. A interface AgentManager é recuperada a partir de tal Mapa. Uma instância do agente a ser chamado é criada e qualquer estado necessário incluído no objeto pelos atributos da configuração; nesse caso, não há nenhum. Uma lista de chaves é construída. Um Mapa com os valores para pessoa 1 dobrados, e os mesmos valores para pessoa 2 são retornados.

Em seguida, o agente é chamado para esse conjunto de chaves. O método de processamento dos agentes é chamado em cada partição com algumas chaves especificadas na grade em paralelo. Um Mapa é retornado, fornecendo os resultados combinados para a chave especificada. Nesse caso, um Mapa com os valores que mantêm a idade da pessoa 1 dobrada e a mesma da pessoa 2 é retornado.

Mesmo que a chave não exista, o agente será chamado. Essa chamada oferece ao agente a oportunidade de criar a entrada do mapa. Se você estiver usando um EntityAgentMixin, a chave a ser processada não será a entidade, mas o valor real da chave Tupla da entidade. Se as chaves forem desconhecidas, será possível consultar todas as partições para localizar os objetos Pessoais de uma determinada forma e retornar suas idades duplicadas. Este é um exemplo:
Session s = grid.getSession();
  ObjectMap map = s.getMap("Person");
  AgentManager amgr = map.getAgentManager();

  DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
  agent.lowAge = 20;
  agent.highAge = 9999;

  Map m = amgr.callMapAgent(agent);
O exemplo anterior mostra o AgentManager sendo obtido para o Mapa da Pessoa, e o agente construído e inicializado com as idades mínima e máxima para Pessoa de interesse. O agente é então chamado com o uso do método callMapAgent. Observe que nenhuma chave é fornecida. Como resultado, o ObjectGrid chama o agente em cada partição na grade em paralelo e retorna os resultados mesclados ao cliente. Este conjunto de devoluções contêm todos os objetos Pessoais na grade com uma idade entre menor e maior e calcula a idade desses objetos Pessoais duplicada. Esse exemplo mostra como as APIs na grade podem ser usadas para executar uma consulta para localizar as entidades que correspondem a uma determinada consulta. O agente é serializado e transportado pelo ObjectGrid para as partições com as entradas necessárias. Os resultados são similarmente serializados para o transporte de volta ao cliente. É necessário ter cuidado com as APIs do Mapa. Se o ObjectGrid estivesse hospedando terabytes de objetos e sendo executado em muitos servidores, esse processamento potencialmente sobrecarregaria as máquinas do cliente. Use as APIs de Mapa para processar um subconjunto pequeno. Se for necessário processar um grande subconjunto, use um agente de redução para executar o processamento fora da grade de dados em vez de fazê-lo em um cliente.

Redução de Paralelo ou agentes de agregação

Este estilo de programação processa um subconjunto das entradas e calcula um único resultado para o grupo de entradas. Os exemplos de resultados podem ser: Um agente de redução é codificado e chamado de modo similar aos agentes de Mapa.

Fluxo de chamada ReduceGridAgent

Quando o método AgentManager.callReduceAgent é chamado com uma coleção de chaves, a instância ReduceGridAgent é serializada e enviada para cada partição primária que as chaves resolvem. Isto significa que quaisquer dados de instância armazenados no agente podem ser enviados para o servidor. Portanto, cada partição primária possui uma instância do agente. O método reduce(Session s, ObjectMap map, Collection keys) é chamado uma vez por instância (partição) com o subconjunto de chaves que resolve a partição. O resultado de cada método de redução é, então, serializado de volta para o cliente. O método reduceResults é chamado na instância ReduceGridAgent do cliente com a coleção de cada resultado de cada chamada de redução remota. O resultado do método reduceResults é retornado para o responsável pela chamada do método callReduceAgent.

Quando o método AgentManager.callReduceAgent é chamado sem uma coleção de chaves, o ReduceGridAgentinstance é serializado e enviado para cada partição primária. Isto significa que quaisquer dados de instância armazenados no agente podem ser enviados para o servidor. Portanto, cada partição primária possui uma instância do agente. O método reduce(Session s, ObjectMap map) é chamado uma vez por instância (partição). O resultado de cada método de redução é, então, serializado de volta para o cliente. O método reduceResults é chamado na instância ReduceGridAgent do cliente com a coleção de cada resultado de cada chamada de redução remota. O resultado do método reduceResults é retornado para o responsável pela chamada do método callReduceAgent. Este é um exemplo de um agente de redução que simplesmente inclui as idades das entradas compatíveis.
package com.ibm.ws.objectgrid.test.agent.jdk5;

import java.util.Collection;
import java.util.Iterator;

import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntryErrorValue;
import com.ibm.websphere.objectgrid.datagrid.ReduceGridAgent;
import com.ibm.websphere.objectgrid.query.ObjectQuery;
import com.ibm.websphere.samples.objectgrid.entityxmlgen.PersonFeature1Entity.PersonKey;

public class SumAgeReduceAgent implements ReduceGridAgent {
	private static final long serialVersionUID = 2521080771723284899L;

	/**
	 * Invoked on the server if a collection of keys is passed to
	 * AgentManager.callReduceAgent(). This is invoked on each primary shard
	 * where the key applies.
	 */
	public Object reduce(Session s, ObjectMap map, Collection keyList) {
		try {
			int sum = 0;
			Iterator<PersonKey> iter = keyList.iterator();
			while(iter.hasNext()) {
				Object nextKey = iter.next();
				PersonKey pk = (PersonKey) nextKey;
				Person p = (Person) map.get(pk);
				sum += p.age;
			}

			return sum;
		} catch (Exception e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}

	/**
	 * Invoked on the server if a collection of keys is NOT passed to
	 * AgentManager.callReduceAgent(). This is invoked on every primary shard.
	 */
	public Object reduce(Session s, ObjectMap map) {
		ObjectQuery q = s
				.createObjectQuery("select p from Person p where p.age > -1");
		Iterator<Person> iter = q.getResultIterator();
		int sum = 0;
		while (iter.hasNext()) {
			Object nextKey = iter.next();
			Person p = (Person) nextKey;
			sum += p.age;
		}
		return sum;
	}

	/**
	 * Invoked on the client to reduce the results from all partitions.
	 */
	public Object reduceResults(Collection results) {
		// If we encounter an EntryErrorValue, then throw a RuntimeException
		// to indicate that there was at least one failure and include each
		// EntryErrorValue
		// as part of the thrown exception.
		Iterator<Integer> iter = results.iterator();
		int sum = 0;
		while (iter.hasNext()) {
			Object nextResult = iter.next();
			if (nextResult instanceof EntryErrorValue) {
				EntryErrorValue eev = (EntryErrorValue) nextResult;
				throw new RuntimeException(
						"Erro encontrado em uma das partições: "
								+ nextResult, eev.getException());
			}

			sum += ((Integer) nextResult).intValue();
		}
		return new Integer(sum);
	}
}
O exemplo anterior mostra o agente. O agente tem três partes importantes. A primeira permite que um conjunto específico de entradas seja processado sem uma consulta. Ela itera o conjunto de entradas, incluindo as idades. A soma é retornada do método. A segunda utiliza uma consulta para selecionar as entradas a serem agregadas. Em seguida, ela soma todas as idades Person correspondentes. O terceiro método é utilizado para agregar os resultados de cada partição a um único resultado. O ObjectGrid executa a agregação de entradas em paralelo por meio do grade. Cada partição produz um resultado intermediário que deve ser agregado aos resultados intermediários de outra partição. Esse terceiro método executa essa tarefa. No exemplo a seguir, o agente é chamado e as idades de todas as Pessoas exclusivamente com idades entre 10 e 20 são agregadas:
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();

SumAgeReduceAgent agent = new SumAgeReduceAgent();

Person p = new Person();
p.ssn = "1";
ArrayList<Person> list = new ArrayList<Person>();
list.add(p);
p = new Person ();
p.ssn = "2";
list.add(p);
Integer v = (Integer)amgr.callReduceAgent(agent, list);
// Close the session (optional in Version 7.1.1 and later) 
for improved performance
s.close();

Funções do agente

O agente é livre para fazer operações de ObjectMap ou EntityManager dentro do shard local onde está executando. O agente recebe uma Sessão e pode incluir, atualizar, consultar, ler ou remover dados da partição que a Sessão representa. Alguns aplicativos consultam somente dados da grade, mas também é possível gravar um agente para incrementar todas as idades da Pessoa em 1 que correspondam a uma determinada consulta. Existe uma transação na Sessão quando o agente é chamado, e é consolidado quando o agente retorna, a menos que uma exceção seja lançada

Manipulação de erros

Se um agente de mapa for chamado com uma chave desconhecida, o valor retornado será um objeto de erro que implementa a interface EntryErrorValue.

Transações

Um agente de mapas é executado numa transação separada do cliente. As chamadas do agente podem ser agrupadas numa única transação. Se um agente falhar e lançar uma exceção, a transação será retrocedida. Quaisquer agentes executados com êxito em uma transação retrocederão com o agente com falha. O AgentManager executará novamente os agentes retrocedidos que foram executados com êxito em uma nova transação.