Distribución de cambios entre JVM de igual

Los objetos LogSequence y LogElement distribuyen cambios entre JVM de igual y comunican los cambios que se han producido en una transacción de eXtreme Scale con un plug-in ObjectGridEventListener.

Si desea más información sobre cómo se puede utilizar JMS (Java Message Service) para distribuir los cambios transaccionales, consulte Distribución de transacciones.

Como requisito previo, ObjectGridManager debe almacenar en memoria caché la instancia de ObjectGrid. Si desea más información, consulte los métodos createObjectGrid. El valor booleano de cacheInstance debe establecerse en true.

No es necesario implementar este mecanismo. Existe un mecanismo de réplica de igual a igual incorporado para que utilice esta función. Consulte Configuración de réplica de igual a igual con JMS.

Los objetos proporcionan una forma sencilla para que una aplicación publique los cambios producidos en un ObjectGrid mediante el transporte de un mensaje a ObjectGrids de igual en Mäquinas virtuales Java remotas y después los aplique en dicha JVM . La clase LogSequenceTransformer es fundamental para habilitar este soporte. Este artículo examina cómo escribir un escucha mediante el sistema de mensajería JMS (Java Message Service) para propagar los mensajes. Con ese fin, eXtreme Scale soporta la transmisión de LogSequences que genera una operación de confirmación de la transacción de eXtreme Scale entre varios miembros del clúster de WebSphere Application Server con un plug-in proporcionado por IBM. Esta función no está habilitada de manera predeterminada, pero puede configurarse. Sin embargo, si el consumidor o el productor no es un WebSphere Application Server, podría ser necesario utilizar un sistema de mensajería JMS externo.

Implementación del mecanismo

La clase LogSequenceTransformer y las API ObjectGridEventListener, LogSequence y LogElement permiten el uso de operaciones fiables de publicar y suscribir para distribuir los cambios y filtrar las correlaciones que desea distribuir. Los fragmentos de código de este tema muestran cómo utilizar estas API con JMS para crear un ObjectGrid de igual a igual compartido por aplicaciones que se alojan en un conjunto diverso de plataformas que comparten un transporte de mensajes común.

Inicializar el plug-in

ObjectGrid llama al método initialize del plug-in, parte del contrato de la interfaz ObjectGridEventListener, cuando ObjectGrid se inicia. El método initialize debe obtener sus recursos JMS, incluidos las conexiones, sesiones y editores, e iniciar la hebra que es la escucha JMS.

En los ejemplos siguientes se muestra el método initialize:

Ejemplo del método initialize
public void initialize(Session session) {
        mySession = session;
        myGrid = session.getObjectGrid();
        try {
            if (mode == null) {
                throw new ObjectGridRuntimeException("No mode specified");
            }
            if (userid != null) {
                connection = topicConnectionFactory.createTopicConnection(userid, 
								password);
            } else
                connection = topicConnectionFactory.createTopicConnection();

            // debe iniciarse la conexión para recibir mensajes.
            connection.start();

            // la sesión jms no es transaccional (false).
            jmsSession = connection.createTopicSession(false, 
							javax.jms.Session.AUTO_ACKNOWLEDGE);
            if (topic == null)
                if (topicName == null) {
                    throw new ObjectGridRuntimeException("Topic not specified");
                } else {
                    topic = jmsSession.createTopic(topicName);
                }
            publisher = jmsSession.createPublisher(topic);
            
// iniciar la hebra de la escucha.
            listenerRunning = true;
            listenerThread = new Thread(this);
            listenerThread.start();
        } catch (Throwable e) {
            throw new ObjectGridRuntimeException("Cannot initialize", e);
        }
    }

El código para iniciar la hebra utiliza una hebra Java 2 Platform, Standard Edition (Java SE). Si ejecuta un servidor WebSphere Application Server versión 6.x o un servidor WebSphere Application Server versión 5.x Enterprise, utilice la interfaz de programación de aplicaciones (API) del bean asíncrono para iniciar esta hebra de daemon. También puede utilizar las API comunes. A continuación se muestra un ejemplo de fragmento de código de sustitución que muestra la misma acción mediante el uso de un gestor de trabajo:

// iniciar la hebra de la escucha.
listenerRunning = true;
workManager.startWork(this, true);

El plug-in también debe implementar la interfaz Work en lugar de la interfaz Runnable. Debe además añadir un método release para establecer la variable listenerRunning en false. El plug-in debe proporcionarse con una instancia WorkManager en su constructor y mediante inyección si se utiliza un contenedor IoC (Inversión de control).

Transmitir los cambios

A continuación se muestra un método transactionEnd de ejemplo para publicar los cambios locales realizados en un ObjectGrid. En este ejemplo se utiliza un JMS, aunque puede utilizarse cualquier transporte de mensajes capaz de producir una mensajería de publicar y suscribir fiable.

Ejemplo del método transactionEnd
// Este método se sincroniza para garantizar que
    // los mensajes se publican en el orden en que se
    // confirmó la transacción. Si se empieza publicando los mensajes
    // en paralelo, los receptores podrían dañar la correlación
    // ya que las operaciones de supresión pueden llegar antes que las de inserción, etc.
    public synchronized void transactionEnd(String txid, boolean isWriteThroughEnabled, boolean committed,
            Collection changes) {
        try {
            // debe utilizarse la modalidad write through y confirmarse.
            if (isWriteThroughEnabled && committed) {
                // escribir las secuencias en un byte []
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                if (publishMaps.isEmpty()) {
                    // serializar toda la colección
                    LogSequenceTransformer.serialize(changes, oos, this, mode);
                } else {
                    // filtrar LogSequences basado en el contenido de publishMaps
                    Collection publishChanges = new ArrayList();
                    Iterator iter = changes.iterator();
                    while (iter.hasNext()) {
                        LogSequence ls = (LogSequence) iter.next();
                        if (publishMaps.contains(ls.getMapName())) {
                            publishChanges.add(ls);
                        }
                    }
                    LogSequenceTransformer.serialize(publishChanges, oos, this, mode);
                }
                // realizar un mensaje de objeto para los cambios
                oos.flush();
                ObjectMessage om = jmsSession.createObjectMessage(bos.toByteArray());
                // establecer propiedades
                om.setStringProperty(PROP_TX, txid);
                om.setStringProperty(PROP_GRIDNAME, myGrid.getName());
                // transmitirlo.
                publisher.publish(om);
            }
        } catch (Throwable e) {
            throw new ObjectGridRuntimeException("Cannot push changes", e);
        }
    }
Este método utiliza diversas variables de instancia:
  • Variable jmsSession: sesión JMS que se utiliza para publicar mensajes. Se crea al inicializarse el plug-in.
  • Variable mode: modo de distribución.
  • Variable publishMaps: conjunto que contiene el nombre de cada correlación con los cambios que se van a publicar. Si la variable está vacía, se publicarán todas las correlaciones.
  • Variable publisher: objeto TopicPublisher que se crea durante el método initialize del plug-in.

Recibir y aplicar mensajes de actualización

A continuación se muestra el método run. Este método se ejecuta en un bucle hasta que la aplicación detiene el bucle. Cada repetición del bucle intenta recibir un mensaje JMS y aplicarlo a ObjectGrid.

Ejemplo del método de ejecución del mensaje JMS 
private synchronized boolean isListenerRunning() {
        return listenerRunning;
    }

    public void run() {
        try {
            System.out.println("Listener starting");
            // obtener una sesión jms para recibir los mensajes.
            // No transaccional.
            TopicSession myTopicSession;
            myTopicSession = connection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);

            // obtener un suscriptor para el tema, true indica no recibir
            // mensajes transmitidos mediante editores
            // en esta conexión. De lo contrario, recibiríamos nuestras propias actualizaciones.
            TopicSubscriber subscriber = myTopicSession.createSubscriber(topic, null, true);
            System.out.println("Listener started");
            while (isListenerRunning()) {
                ObjectMessage om = (ObjectMessage) subscriber.receive(2000);
                if (om != null) {
                    // Usar objeto Session pasado en el método initialize...
                    // es muy importante utilizarlo, sin write through
                    mySession.beginNoWriteThrough();
                    byte[] raw = (byte[]) om.getObject();
                    ByteArrayInputStream bis = new ByteArrayInputStream(raw);
                    ObjectInputStream ois = new ObjectInputStream(bis);
                    // inflar LogSequences
                    Collection collection = LogSequenceTransformer.inflate(ois, myGrid);
                    Iterator iter = collection.iterator();
                    while (iter.hasNext()) {
                        // procesar los cambios de las correlaciones de acuerdo con la modalidad
                        // una vez serializado LogSequence
                        LogSequence seq = (LogSequence) iter.next();
                        mySession.processLogSequence(seq);
                    }
                    mySession.commit();
                } // if there was a message
            } // while loop
            // detener la conexión
            connection.close();
        } catch (IOException e) {
            System.out.println("IO Exception: " + e);
        } catch (JMSException e) {
            System.out.println("JMS Exception: " + e);
        } catch (ObjectGridException e) {
            System.out.println("ObjectGrid exception: " + e);
            System.out.println("Caused by: " + e.getCause());
        } catch (Throwable e) {
            System.out.println("Exception : " + e);
        }
        System.out.println("Listener stopped");
    }