Änderungen an Peer-JVMs verteilen

Die Objekte "LogSequence" und "LogElement" verteilen Änderungen zwischen Peer-JVMs und kommunizieren Änderungen, die in einer eXtreme-Scale-Transaktion stattfinden über ein ObjectGridEventListener-Plug-in.

Weitere Informationen zur Verwendung von Java Message Service (JMS) für die Verteilung von Transaktionsänderungen finden Sie im Abschnitt Transaktionen verteilen.

Eine Voraussetzung ist, dass die ObjectGrid-Instanz vom ObjectGridManager zwischengespeichert wird. Weitere Einzelheiten finden Sie in den Informationen zu den createObjectGrid-Methoden. Die boolesche Eigenschaft "cacheInstance" muss auf "true" gesetzt werden.

Dieser Mechanismus muss nicht implementiert werden. Sie können einen integrierten Mechanismus für die Peer-to-Peer-Replikation verwenden, um diese Funktion zu nutzen. Weitere Informationen finden Sie unter Peer-to-Peer-Replikation mit JMS konfigurieren.

Eine Anwendung kann diese Objekte verwenden, um Änderungen, die in einem ObjectGrid vorgenommen werden, problemlos über einen Nachrichtentransport an die Peer-ObjectGrids in fernen Java Virtual Machines zu veröffentlichen und die Änderungen anschließend in dieser JVM anzuwenden. Die Klasse "LogSequenceTransformer" ist für die Aktivierung dieser Unterstützung kritisch. In diesem Abschnitt wird beschrieben, wie ein Listener mit einem JMS-Messaging-System für die Weitergabe der Nachrichten geschrieben wird. Zu diesem Zweck unterstützt eXtreme Scale die Übertragung von LogSequence-Objekten, die sich aus der Festschreibung einer eXtreme-Scale-Transaktion ergeben, über ein von IBM bereitgestelltes Plug-in an die Cluster-Member von WebSphere Application Server. Diese Funktion ist standardmäßig nicht aktiviert, kann aber konfiguriert werden. Wenn der Konsument oder Erzeuger jedoch kein WebSphere Application Server ist, kann die Verwendung eines externen JMS-Messaging-Systems erforderlich sein.

Mechanismus implementieren

Die Klasse "LogSequenceTransformer" und die APIs "ObjectGridEventListener", "LogSequence" und "LogElement" lassen die Verwendung jedes zuverlässigen Publish/Subscribe-Mechanismus für die Verteilung der Änderungen und die Filterung der zu verteilenden Maps zu. Die Snippets in diesem Abschnitt veranschaulichen, wie diese APIs mit JMS verwendet werden können, um ein Peer-to-Peer-ObjectGrid zu erstellen, das von Anwendungen gemeinsam genutzt wird, die sich auf verschiedenen Plattformen befinden, die einen gemeinsamen Nachrichtentransport verwenden.

Plug-in initialisieren

Das ObjectGrid ruft im Rahmen des ObjectGridEventListener-Schnittstellenvertrags die Methode "initialize" des Plug-ins auf, wenn das ObjectGrid gestartet wird. Die Methode "initialize" muss seine JMS-Ressourcen, einschließlich Verbindungen, Sitzungen und Veröffentlichungskomponenten (so genannten Publishern), anfordern und den Thread für den JMS-Listener starten.

Die folgenden Beispiele zeigen die Methode "initialize":

Beispiel für die Methode initialize
public void initialize(Session session) {
        mySession = session;
        myGrid = session.getObjectGrid();
        try {
            if (mode == null) {
                throw new ObjectGridRuntimeException("No mode specified");
            }
            if (userid != null) {
                connection = topicConnectionFactory.createTopicConnection(userid,
								password);
                        } else
                connection = topicConnectionFactory.createTopicConnection();

            // Die Verbindung muss gestartet werden, um Nachrichten zu empfangen.
            connection.start();

            // Die JMS_Sitzung ist nicht transaktionsorientiert (false).
            jmsSession = connection.createTopicSession(false,
							javax.jms.Session.AUTO_ACKNOWLEDGE);
                        if (topic == null)
                if (topicName == null) {
                    throw new ObjectGridRuntimeException("Topic not specified");
                } else {
                    topic = jmsSession.createTopic(topicName);
                }
            publisher = jmsSession.createPublisher(topic);
            // Listener-Thread starten.
            listenerRunning = true;
            listenerThread = new Thread(this);
            listenerThread.start();
        } catch (Throwable e) {
            throw new ObjectGridRuntimeException("Cannot initialize", e);
        }
    }

Der Code zum Starten des Threads verwendet einen Java-SE-Thread (Java 2 Platform, Standard Edition). Wenn Sie einen Server von WebSphere Application Server Version 6.x oder WebSphere Application Server Version 5.x ausführen, verwenden Sie die API für asynchrone Beans, um diesen Dämon-Thread zu starten. Sie können auch die allgemeinen APIs verwenden. Im Folgenden sehen Sie ein Beispiel für ein Ersatz-Snippet, dass diese Aktion mit einem Arbeitsmanager veranschaulicht:

// Listener-Thread starten.
listenerRunning = true;
workManager.startWork(this, true);

Das Plug-in muss die Schnittstelle "Work" an Stelle der Schnittstelle "Runnable" implementieren. Außerdem müssen Sie eine Methode "release" hinzufügen, um die Variable "listenerRunning" auf "false" zu setzen. Das Plug-in muss mit einer WorkManager-Instanz in seinem Konstruktor bzw. bei Verwendung eines IoC-Containers (Inversion of Control) durch Injektion bereitgestellt werden.

Änderungen übertragen

Im Folgenden sehen Sie eine Beispielmethode "transactionEnd" für die Veröffentlichung der lokalen Änderungen, die in einem ObjectGrid vorgenommen werden. In diesem Beispiel wird JMS verwendet, aber Sie können jeden Nachrichtentransport verwenden, der zuverlässiges Publish/Subscribe-Messaging unterstützt.

Beispiel für die Methode transactionEnd
     // Diese Methode wird synchronisiert, um sicherzustellen,
    // dass die Nachrichten in der Reihenfolge veröffentlicht werden, in die
    // Transaktionen festgeschrieben werden. Falls die Veröffentlichung der Nachrichten
    // parallel gestartet wird, könnten die Empfänger die Map beschädigen,
    // da Löschanforderungen vor Einfügeanforderungen usw. ankommen könnten.
    public synchronized void transactionEnd(String txid, boolean isWriteThroughEnabled, boolean committed,
            Collection changes) {
        try {
            // Muss Write-through und festgeschrieben sein.
            if (isWriteThroughEnabled && committed) {
                // Folgen in eine Bytefeldgruppe (byte []) schreiben.
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                if (publishMaps.isEmpty()) {
                    // Gesamte Sammlung serialisieren
                    LogSequenceTransformer.serialize(changes, oos, this, mode);
                } else {
                    // LogSequence-Objekte auf der Basis des publishMaps-Inhalts filtern.
                    Collection publishChanges = new ArrayList();
                    Iterator iter = changes.iterator();
                    		while (iter.hasNext()) {
                        LogSequence ls = (LogSequence) iter.next();
                        if (publishMaps.contains(ls.getMapName())) {
                            publishChanges.add(ls);
                        }
                    }
                    LogSequenceTransformer.serialize(publishChanges, oos, this, mode);
                }
                // Objektnachricht für die Änderungen erstellen.
                oos.flush();
                ObjectMessage om = jmsSession.createObjectMessage(bos.toByteArray());
                // Eigenschaften festlegen.
                om.setStringProperty(PROP_TX, txid);
                om.setStringProperty(PROP_GRIDNAME, myGrid.getName());
                // Übertragen.
                publisher.publish(om);
            }
        } catch (Throwable e) {
            throw new ObjectGridRuntimeException("Cannot push changes", e);
        }
    }
In dieser Methode werden verschiedene Instanzvariablen verwendet:
  • Variable jmsSession: Eine JMS-Sitzung, die zum Veröffentlichen von Nachrichten verwendet wird. Sie wird bei der Initialisierung des Plug-ins erstellt.
  • Variable mode: Der Verteilungsmodus.
  • Variable publishMaps: Eine Gruppe, die die Namen der einzelnen Maps mit zu veröffentlichenden Änderungen enthält. Wenn die Variable leer ist, werden alle Maps veröffentlicht.
  • Variable publisher: Ein TopicPublisher-Objekt, das während der Ausführung der Methode "initialize" des Plug-ins ausgeführt wird.

Aktualisierungsnachricht empfangen und anwenden

Im Folgenden sehen Sie eine Beispielmethode "run". Diese Methode wird in einer Schleife ausgeführt, bis die Anwendung die Schleife stoppt. In jeder Schleifeniteration wird versucht, eine JMS-Nachricht zu empfangen und auf das ObjectGrid anzuwenden.

Beispiel für die Methode run für JMS-Nachrichten
private synchronized boolean isListenerRunning() {
        return listenerRunning;
    }

    public void run() {
        try {
            System.out.println("Listener starting");
            // JMS-Sitzung für den Empfang der Nachrichten abrufen.
            // Nicht transaktionsorientiert.
            TopicSession myTopicSession;
            myTopicSession = connection.createTopicSession(false, javax.jms.
							Session.AUTO_ACKNOWLEDGE);

            
            // Subskribenten für das Topic abrufen. True gibt an, dass keine Nachrichten
            // empfangen werden, die über Publisher in dieser Verbindung übertragen
            // wurden. Sonst werden eigene Aktualisierungen empfangen.
            TopicSubscriber subscriber = myTopicSession.createSubscriber(topic,
							null, true);
                        System.out.println("Listener started");
            while (isListenerRunning()) {
                ObjectMessage om = (ObjectMessage) subscriber.receive(2000);
                if (om != null) {
                    // Sitzung verwenden, die bei der Initialisierung übergeben wurde.
                    // Sehr wichtig, dass Write-through (Durchschreiben) hier nicht
                    // verwendet wird.
                    mySession.beginNoWriteThrough();
                    byte[] raw = (byte[]) om.getObject();
                    ByteArrayInputStream bis = new ByteArrayInputStream(raw);
                    ObjectInputStream ois = new ObjectInputStream(bis);
                    // LogSequence-Objekte dekomprimieren.
                    Collection collection = LogSequenceTransformer.inflate(ois,
											myGrid);
                                        Iterator iter = collection.iterator();
                    		while (iter.hasNext()) {
                        // Änderungen jeder Map entsprechend dem bei der
                        // Serialisierung des LogSequence-Objekts verwendeten
                        // Modus verarbeiten.
                                                LogSequence seq = (LogSequence) iter.next();
                        mySession.processLogSequence(seq);
                    }
                    mySession.commit();
                } // Wenn eine Nachricht vorhanden ist
            } // while loop
            // Verbindung stoppen.
            connection.close();
        } catch (IOException e) {
            System.out.println("IO Exception: " + e);
        } catch (JMSException e) {
            System.out.println("JMS Exception: " + e);
        } catch (ObjectGridException e) {
            System.out.println("ObjectGrid exception: " + e);
            System.out.println("Caused by: " + e.getCause());
        } catch (Throwable e) {
            System.out.println("Exception : " + e);
        }
        System.out.println("Listener stopped");
    }