Répartition de modifications entre des machines virtuelles Java homologues

Les objets LogSequence et LogElement répartissent les modifications entre des machines virtuelles Java homologues et, à l'aide d'un plug-in ObjectGridEventListener, ils communiquent les modifications intervenues dans une transaction eXtreme Scale.

Pour plus d'informations sur l'utilisation de JMS (Java Message Service) pour répartir les modifications transactionnelles, voir Répartition des transactions.

Il est impératif au préalable que l'instance ObjectGrid soit mise en cache par ObjectGridManager. Voir les méthodes createObjectGrid pour plus d'informations à ce sujet. La valeur booléenne de cacheInstance doit être définie comme true.

Il n'est pas nécessaire que vous implémentiez vous-même ce mécanisme. Il existe en effet un mécanisme pré-intégré de réplication entre homologues qui se chargera de cette fonction. Voir Configuration de la réplication entre homologues avec JMS.

Les objets fournissent aux applications le moyen de publier facilement les modifications qui sont intervenues dans un ObjectGrid, à savoir un transport de messages vers des ObjectGrids homologues situés sur des machines virtuelles Java pour appliquer ces modifications sur ces JVM. La classe LogSequenceTransformer est indispensable pour cette prise en charge. Nous allons expliquer ici comment écrire, pour la propagation des messages, un programme d'écoute utilisant un système de messagerie Java Message Service (JMS). En fait un plug-in IBM permet à eXtreme Scale de prendre en charge la transmission de LogSequences qui résultent de la validation d'une transaction eXtreme Scale entre des membres d'un cluster WebSphere Application Server. Cette fonction n'est pas activée par défaut, mais il est possible de la configurer pour la rendre opérationnelle. Mais, lorsque l'utilisateur ou le producteur ne sont pas WebSphere Application Server, le recours à un système externe de messagerie JMS peut s'avérer nécessaire.

Implémenter le mécanisme

La classe LogSequenceTransformer et les API ObjectGridEventListener, LogSequence et LogElement permettent l'utilisation de n'importe quel mécanisme de publication/abonnement pour la répartition des modifications et le filtrage des mappes à répartir. Les fragments de code utilisés ici montrent comment exploiter ces API avec JMS pour construire un ObjectGrid d'égal à égal, partagé par des applications hébergées sur plusieurs sortes de plateformes partageant un transport commun de messages.

Initialisation du plug-in

L'ObjectGrid appelle la méthode initialize du plug-in, qui fait partie du contrat de l'interface ObjectGridEventListener, lorsque l'ObjectGrid démarre. La méthode initialize doit obtenir ses ressources JMS (connexions, sessions et diffuseurs de publications) et elle doit démarrer l'unité d'exécution qu'est le programme d'écoute JMS.

Les exemples suivants illustrent la méthode initialize :

exemple de méthode initialize
public void initialize(Session session) {
        mySession = session;
        myGrid = session.getObjectGrid();
        try {
            if (mode == null) {
                throw new ObjectGridRuntimeException("No mode specified");
            }
            if (userid != null) {
                connection = topicConnectionFactory.createTopicConnection(userid, 
								password);
            } else
                connection = topicConnectionFactory.createTopicConnection();

            // need to start the connection to receive messages.
            connection.start();

            // the jms session is not transactional (false).
            jmsSession = connection.createTopicSession(false, 
							javax.jms.Session.AUTO_ACKNOWLEDGE);
            if (topic == null)
                if (topicName == null) {
                    throw new ObjectGridRuntimeException("Topic not specified");
                } else {
                    topic = jmsSession.createTopic(topicName);
                }
            publisher = jmsSession.createPublisher(topic);
            // start the listener thread.
            listenerRunning = true;
            listenerThread = new Thread(this);
            listenerThread.start();
        } catch (Throwable e) {
            throw new ObjectGridRuntimeException("Cannot initialize", e);
        }
    }

L'unité d'exécution lancée par le code est une unité Java 2 Platform, Standard Edition (Java SE). Pour une exécution sur WebSphere Application Server version 6.x ou sur WebSphere Application Server version 5.x Enterprise, vous devrez utiliser l'API de bean asynchrone pour lancer cette unité d'exécution de démon. Vous pouvez également utiliser les API communes. Voici un exemple de fragment de code montrant la même action effectuée à l'aide d'un gestionnaire de travaux :

// start the listener thread.
listenerRunning = true;
workManager.startWork(this, true);

Par ailleurs, le plug-in doit implémenter l'interface Work et non l'interface Runnable. Vous devez également ajouter une méthode release pour définir comme false la variable listenerRunning. Le plug-in doit être fourni avec une instance WorkManager dans son constructeur ou par injection si l'on utilise un conteneur IoC (Inversion of Control).

Transmission des modifications

Voici un exemple de méthode transactionEnd pour la publication des modifications locales apportées à un ObjectGrid. Cet exemple utilise JMS, bien qu'il soit possible d'utiliser n'importe quel transport de messages capable de publication/abonnement fiable.

Exemple de méthode transactionEnd
// This method is synchronized to make sure the
    // messages are published in the order the transaction
    // were committed. If we started publishing the messages
    // in parallel then the receivers could corrupt the Map
    // as deletes may arrive before inserts etc.
    public synchronized void transactionEnd(String txid, boolean isWriteThroughEnabled,
				boolean committed,
            Collection changes) {
        try {
            // must be write through and commited.
            if (isWriteThroughEnabled && committed) {
                // write the sequences to a byte []
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                if (publishMaps.isEmpty()) {
                    // serialize the whole collection
                    LogSequenceTransformer.serialize(changes, oos, this, mode);
                } else {
                    // filter LogSequences based on publishMaps contents
                    Collection publishChanges = new ArrayList();
                    Iterator iter = changes.iterator();
                    while (iter.hasNext()) {
                        LogSequence ls = (LogSequence) iter.next();
                        if (publishMaps.contains(ls.getMapName())) {
                            publishChanges.add(ls);
                        }
                    }
                    LogSequenceTransformer.serialize(publishChanges, oos, this, mode);
                }
                // make an object message for the changes
                oos.flush();
                ObjectMessage om = jmsSession.createObjectMessage(bos.toByteArray());
                // set properties
                om.setStringProperty(PROP_TX, txid);
                om.setStringProperty(PROP_GRIDNAME, myGrid.getName());
                // transmit it.
                publisher.publish(om);
            }
        } catch (Throwable e) {
            throw new ObjectGridRuntimeException("Cannot push changes", e);
        }
    }
Cette méthode utilise plusieurs variables d'instance :
  • La variable jmsSession : session JMS servant à publier les messages. Elle est créée lors de l'initialisation du plug-in.
  • La variable mode : le mode de répartition.
  • La variable publishMaps : ensemble contenant le nom de chacune des mappes dont les modifications sont à publier. La variable vide signifie que la totalité des mappes sont à publier.
  • La variable publisher : objet TopicPublisher qui est créé durant l'initialisation du plug-in.

Réception et application des messages d'actualisation

Vient à présent la méthode run. Cette méthode s'exécute en boucle jusqu'à ce que l'application arrête la boucle. Chaque itération de la boucle tente de réceptionner un message JMS et de l'appliquer à l'ObjectGrid.

Exemple de méthode run de message JMS
private synchronized boolean isListenerRunning() {
        return listenerRunning;
    }

    public void run () {
        try {
            System.out.println("Listener starting");
            // get a jms session for receiving the messages.
            // Non transactional.
            TopicSession myTopicSession;
            myTopicSession = connection.createTopicSession(false, javax.jms.
							Session.AUTO_ACKNOWLEDGE);

            // get a subscriber for the topic, true indicates don't receive
            // messages transmitted using publishers
            // on this connection. Otherwise, we'd receive our own updates.
            TopicSubscriber subscriber = myTopicSession.createSubscriber(topic,
							null, true);
            System.out.println("Listener started");
            while (isListenerRunning()) {
                ObjectMessage om = (ObjectMessage) subscriber.receive(2000);
                if (om != null) {
                    // Use Session that was passed in on the initialize...
                    // very important to use no write through here
                    mySession.beginNoWriteThrough();
                    byte[] raw = (byte[]) om.getObject();
                    ByteArrayInputStream bis = new ByteArrayInputStream(raw);
                    ObjectInputStream ois = new ObjectInputStream(bis);
                    // inflate the LogSequences
                    Collection collection = LogSequenceTransformer.inflate(ois,
											myGrid);
                    Iterator iter = collection.iterator();
                    while (iter.hasNext()) {
                        // process each Maps changes according to the mode when
                        // the LogSequence was serialized
                        LogSequence seq = (LogSequence) iter.next();
                        mySession.processLogSequence(seq);
                    }
                    mySession.commit();
                } // if there was a message
            } // while loop
            // stop the connection
            connection.close();
        } catch (IOException e) {
            System.out.println("IO Exception: " + e);
        } catch (JMSException e) {
            System.out.println("JMS Exception: " + e);
        } catch (ObjectGridException e) {
            System.out.println("ObjectGrid exception: " + e);
            System.out.println("Caused by: " + e.getCause());
        } catch (Throwable e) {
            System.out.println("Exception : " + e);
        }
        System.out.println("Listener stopped");
    }