Les API DataGrid prennent en charge deux modèles communs de programmation de grille : les mappes parallèles et les réductions parallèles.
Mappe parallèle
Une mappe parallèle permet de traiter les entrées correspondant à un ensemble de clés et renvoie un résultat pour chaque entrée traitée. L'application dresse une liste des clés et reçoit une mappe de paires clé/résultat après avoir appelé une opération Map. Le résultat est le résultat de l'exécution d'une fonction sur l'entrée correspondant à chaque clé. La fonction est fournie par l'application.
Flux d'appels MapGridAgent
Lorsque la méthode AgentManager.callMapAgent est appelée avec une collection de clés, l'instance MapGridAgent est sérialisée et envoyée à chaque partition principale dans laquelle la clé est résolue. Ceci signifie que les données d'instance stockées dans l'agent peuvent être envoyées au serveur. Chaque partition principale détient donc une instance de l'agent. La méthode process est appelée pour chaque instance une fois par clé se résolvant dans la partition. Le résultat est alors sérialisé vers le client et renvoyé à l'appelant dans une instance Map, dans laquelle le résultat est représenté en tant que valeur de la mappe.
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Person {
@Id String ssn;
String firstName;
String surname;
int age;
}
La fonction fournie par l'application est écrite en tant que classe
implémentant l'interface MapAgentGrid. L'exemple d'agent suivant
présente une fonction permettant de renvoyer l'âge d'une personne multiplié par
deux.public class DoublePersonAgeAgent implements MapGridAgent, EntityAgentMixin
{
private static final long serialVersionUID = -2006093916067992974L;
int lowAge;
int highAge;
public Object process(Session s, ObjectMap map, Object key)
{
Person p = (Person)key;
return new Integer(p.age * 2);
}
public Map processAllEntries(Session s, ObjectMap map)
{
EntityManager em = s.getEntityManager();
Query q = em.createQuery("select p from Person p where p.age > ?1 and p.age < ?2");
q.setParameter(1, lowAge);
q.setParameter(2, highAge);
Iterator iter = q.getResultIterator();
Map<Person, Interger> rc = new HashMap<Person, Integer>();
while(iter.hasNext())
{
Person p = (Person)iter.next();
rc.put(p, (Integer)process(s, map, p));
}
return rc;
}
public Class getClassForEntity()
{
return Person.class;
}
}
L'exemple précédent présente l'agent Map utilisé pour doubler une personne. La première méthode est fournie avec
la personne concernée et renvoie le double de l'âge de cette entrée. La seconde est appelée pour chaque partition. Elle recherche tous les objets Person dont l'âge est
compris entre lowAge et highAge et renvoie leur âge après l'avoir multiplié par deux. Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
// réalisation d'une liste de clés
ArrayList<Person> keyList = new ArrayList<Person>();
Person p = new Person();
p.ssn = "1";
keyList.add(p);
p = new Person ();
p.ssn = "2";
keyList.add(p);
// obtention des résultats pour ces entrées
Map<Tuple, Object> = amgr.callMapAgent(agent, keyList);
// Fermer la session (facultatif dans les versions 7.1.1 et ultérieures) pour améliorer les performances
s.close();
L'exemple précédent
illustre un client qui obtient une session et une référence à la mappe
Person. L'opération d'agent est exécutée sur une mappe donnée.
L'interface AgentManager
est extraite de cette mappe. Une instance de l'agent à appeler
est créée et les états nécessaires sont ajoutés à l'objet
en définissant des attributs. Dans ce cas, il n'y en a aucun.
La liste des clés est
alors établie. Une mappe contenant les valeurs correspondant à la personne 1 multipliées par deux
et ces mêmes valeurs pour la personne 2 est renvoyée.L'agent est alors appelé pour cet ensemble de clés. La méthode process de l'agent est appelée sur chaque partition avec certaines clés définies dans la grille en parallèle. Une mappe contenant les résultats fusionnés relatifs à la clé définie est renvoyée. Dans ce cas, une mappe contient les valeurs indiquant l'âge de la personne 1 multiplié par deux et ces mêmes valeurs pour la personne 2 sont renvoyées.
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
agent.lowAge = 20;
agent.highAge = 9999;
Map m = amgr.callMapAgent(agent);
L'exemple
précédent présente le gestionnaire d'agents obtenu pour la mappe Person, ainsi
que l'agent construit et initialisé avec les âges inférieur et supérieur pour
les personnes concernées. L'agent est alors appelé à l'aide de la méthode callMapAgent.
Notez qu'aucune clé n'est fournie. La grille d'objets appelle alors
l'agent sur chaque partition de la grille en parallèle, puis renvoie
les résultats fusionnés au client. Cet ensemble de résultats contient tous les objets
Person de la grille dont l'âge est compris entre la valeur inférieure et la valeur supérieure
et calcule leur âge multiplié par deux. Les API de la grille peuvent ainsi être utilisés
pour exécuter une requête permettant de rechercher des entités correspondant à une certaine requête. L'agent est sérialisé et transporté par l'ObjectGrid vers
les partitions pour lesquelles ces entrées sont recherchées. Les résultats sont sérialisés
de manière similaire en vue de leur transport vers le client. Utilisez l'API Map
avec prudence. Si la grille d'objets hébergeait des téraoctets d'objets
et s'exécutait sur de nombreux serveurs, seules les machines client
auraient à supporter une charge importante. Utilisez les API Map pour traiter un petit sous-ensemble.
Si le nombre de transactions à traiter est plus élevé, utilisez un agent de réduction pour exécuter
le traitement dans la grille plutôt que sur le client.Réduction parallèle ou agents de regroupement
Flux d'appels ReduceGridAgent
Lorsque la méthode AgentManager.callReduceAgent est appelée avec une collection de clés, l'instance ReduceGridAgent est sérialisée et envoyée à chaque partition principale dans laquelle la clé est résolue. Ceci signifie que les données d'instance stockées dans l'agent peuvent être envoyées au serveur. Chaque partition principale détient donc une instance de l'agent. La méthode reduce(Session s, ObjectMap map, Collection keys) est appelée une fois par instance (partition) avec le sous-ensemble de clés se résolvant dans la partition. Le résultat de chaque méthode reduce est alors sérialisé vers le client. La méthode reduceResults est appelée sur l'instance ReduceGridAgent du client avec la collection des résultats de chaque appel distant de la méthode reduce. Le résultat de la méthode reduceResults est renvoyé à l'appelant de la méthode callReduceAgent.
package com.ibm.ws.objectgrid.test.agent.jdk5;
import java.util.Collection;
import java.util.Iterator;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntryErrorValue;
import com.ibm.websphere.objectgrid.datagrid.ReduceGridAgent;
import com.ibm.websphere.objectgrid.query.ObjectQuery;
import com.ibm.websphere.samples.objectgrid.entityxmlgen.PersonFeature1Entity.PersonKey;
public class SumAgeReduceAgent implements ReduceGridAgent {
private static final long serialVersionUID = 2521080771723284899L;
/**
* Appelé sur le serveur si une collection de clés est transmise à
* AgentManager.callReduceAgent(). Cet appel est effectué sur chaque
* fragment primaire auquel la clé s'applique.
*/
public Object reduce(Session s, ObjectMap map, Collection keyList) {
try {
int sum = 0;
Iterator<PersonKey> iter = keyList.iterator();
while (iter.hasNext()) {
Object nextKey = iter.next();
PersonKey pk = (PersonKey) nextKey;
Person p = (Person) map.get(pk);
sum += p.age;
}
return sum;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
/**
* Appelé sur le serveur si une collection de clés n'est PAS transmise à
* AgentManager.callReduceAgent(). Cet appel est effectué sur chaque fragment primaire.
*/
public Object reduce(Session s, ObjectMap map) {
ObjectQuery q = s
.createObjectQuery("select p from Person p where p.age > -1");
Iterator<Person> iter = q.getResultIterator();
int sum = 0;
while (iter.hasNext()) {
Object nextKey = iter.next();
Person p = (Person) nextKey;
sum += p.age;
}
return sum;
}
/**
* Appelé sur le client pour limiter les résultats de toutes les partitions.
*/
public Object reduceResults(Collection results) {
// Si EntryErrorValue est rencontré, lancement d'un RuntimeException
// pour indiquer qu'il y a au moins une défaillance et inclusion de chaque
// EntryErrorValue
// dans l'exception générée.
Iterator<Integer> iter = results.iterator();
int sum = 0;
while (iter.hasNext()) {
Object nextResult = iter.next();
if (nextResult instanceof EntryErrorValue) {
EntryErrorValue eev = (EntryErrorValue) nextResult;
throw new RuntimeException(
"Error encountered on one of the partitions: "
+ nextResult, eev.getException());
}
sum += ((Integer) nextResult).intValue();
}
return new Integer(sum);
}
}
L'exemple précédent présente cet agent.
Il se compose
de trois parties principales. La première permet le traitement d'un ensemble précis de données
sans requête. Elle interagit avec les entrées, en ajoutant leur âge. La somme est renvoyée à la méthode.
La deuxième utilise une
requête pour sélectionner les entrées à regrouper. Elle ajoute alors tous
les âges correspondants. La troisième méthode permet de regrouper les résultats
de chaque partition dans un seul résultat. La grille d'objets procède à ce regroupement
en parallèle dans la grille.
Chaque partition produit
un résultat intermédiaire à regrouper avec les résultats
intermédiaires des autres partitions. La troisième méthode exécute cette tâche. Dans
l'exemple suivant, l'agent est appelé et seuls les âges des personnes
âgées de 10 à 20 ans sont regroupés :Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
SumAgeReduceAgent agent = new SumAgeReduceAgent();
Person p = new Person();
p.ssn = "1";
ArrayList<Person> list = new ArrayList<Person>();
list.add(p);
p = new Person ();
p.ssn = "2";
list.add(p);
Integer v = (Integer)amgr.callReduceAgent(agent, list);
// Fermer la session (facultatif dans les versions 7.1.1 et ultérieures) pour améliorer les performances
s.close();
Fonctions de l'agent
L'agent est libre d'exécuter des opérations ObjectMap ou EntityManager dans le fragment local dans lequel il s'exécute. Il reçoit une Session et il peut ajouter, mettre à jour, interroger, lire ou supprimer des données de la partition que la Session représente. Certaines applications interrogent uniquement les données de la grille, mais vous pouvez aussi écrire permettant d'incrémenter de 1 les âges des personnes correspondant à une certaine requête. Une transaction s'exécute sur la Session lors de l'appel de l'agent. Elle est validée lorsque l'agent renvoie un résultat, à moins qu'une exception se produise.
Traitement des erreurs
Si un agent de mappe est appelé avec une clé inconnue, la valeur renvoyée est un objet d'erreur qui implémente l'interface EntryErrorValue.
Transactions
Un agent de mappe s'exécute dans une transaction distincte du client. Les appels d'agent peuvent être groupés dans une même transaction. Si un agent échoue et renvoie une exception, la transaction est annulée. Tous les agents dont l'exécution dans une transaction a abouti sont annulés en même temps que l'agent ayant échoué. Le gestionnaire d'agents relance l'exécution des agents annulés dont l'exécution a abouti dans une nouvelle transaction.