Os objetos LogSequence e LogElement distribuem mudanças entre JVMs peer e comunicam as mudanças que ocorreram em uma transação do eXtreme Scale com um plug-in do ObjectGridEventListener.
Para obter informações adicionais sobre como a JMS (Java Message Service) pode ser usado para distribuir mudanças transacionais, consulte Distribuindo Transações.
Um pré-requisito é que a instância do ObjectGrid deve ser armazenada em cache pelo ObjectGridManager. Consulte Métodos createObjectGrid para obter mais informações. O valor booleano cacheInstance deve ser configurado como true.
Não é necessário que você implemente esse mecanismo. Há um mecanismo de replicação ponto a ponto integrado disponível para utilizar esta função. Consulte Configurando Replicação Ponto a Ponto com o JMS.
Os objetos fornecem um meio para que um aplicativo publique facilmente alterações que ocorreram em um ObjectGrid utilizando um transporte de mensagens para ObjectGrids peer nas Java Virtual Machines remotas e, então, aplica estas alterações em tais JVM. A classe LogSequenceTransformer é importante para ativar este suporte. Este artigo examina como escrever um listener usando um sistema de mensagens JVM (Java Message Service) para propagação das mensagens. Para este fim, o eXtreme Scale suporta a transmissão de LogSequences que resultam de uma consolidação transacional de um eXtreme Scale através dos membros do cluster do WebSphere Application Server com um plug-in fornecido pela IBM. Esta função não é ativada por padrão, mas pode ser configurada para ser operacional. Porém, quando o consumidor ou produtor não for um WebSphere Application Server, o uso de um sistema de mensagens JMS externo pode ser necessário.
A classe LogSequenceTransformer e as APIs ObjectGridEventListener, LogSequence e LogElement permitem que qualquer publicação-e-assinatura confiável seja utilizada para distribuir as alterações e filtrar os mapas que você deseja distribuir. Os snippets neste tópico mostram como utilizar estas APIs com o JMS para criar um ObjectGrid ponto a ponto compartilhado por aplicativos que estão hospedados em um conjunto diverso de plataforma compartilhando um transporte de mensagens comum.
Inicialize o plug-in
O ObjectGrid chama o método de inicialização do plug-in, parte do contrato da interface ObjectGridEventListener, quando o ObjectGrid inicia. O método de inicialização deve obter seus recursos JMS, incluindo conexões, sessões, e publicadores, e inicia o encadeamento que é o listener JMS.
Os exemplos a seguir mostram o método de inicialização:
exemplo do método de inicializar
public void initialize(Session session) {
mySession = session;
myGrid = session.getObjectGrid();
try {
if (mode == null) {
throw new ObjectGridRuntimeException("No mode specified");
}
if (userid != null) {
connection = topicConnectionFactory.createTopicConnection
(userid, password);
} else
connection = topicConnectionFactory.createTopicConnection();
// need to start the connection to receive messages.
connection.start();
// the jms session is not transactional (false).
jmsSession = connection.createTopicSession(false,
javax.jms.Session.AUTO_ACKNOWLEDGE);
if(topic == null)
if (topicName == null) {
throw new ObjectGridRuntimeException("Topic not specified");
} else {
topic = jmsSession.createTopic(topicName);
}
publisher = jmsSession.createPublisher(topic);
// iniciar o encadeamento do listener.
listenerRunning = true;
listenerThread = new Thread(this);
listenerThread.start();
} catch (Throwable e) {
throw new ObjectGridRuntimeException("Cannot initialize", e);
}
}
O código para iniciar o encadeamento usa um encadeamento Java 2 Platform, Standard Edition (Java SE). Se você estiver executando um WebSphere Application Server Versão 6.x ou um servidor enterprise WebSphere Application Server Version 5.x, use a interface de programação de aplicativos (API) do bean assíncrono para iniciar este encadeamento de daemon. Também é possível utilizar as APIs comuns. A seguir está um snippet de substituição de exemplo que mostra a mesma ação utilizando um gerenciador de trabalho:
// iniciar o encadeamento do listener.
listenerRunning = true;
workManager.startWork(this, true);
O plug-in também deve implementar a interface Work em vez da interface Runnable. Também é necessário incluir um método de liberação para configurar a variável listenerRunning como false. O plug-in deve ser fornecido com uma instância do gerenciador de trabalho em seu construtor ou por injeção, se estiver utilizando um contêiner IoC (Inversion of Control).
Transmita as alterações
A seguir, está um método transactionEnd de amostra para a publicação das alterações locais que são feitas em um ObjectGrid. Esta amostra utiliza o JMS, embora seja possível utilizar qualquer transporte de mensagens que seja capaz de emitir mensagens de publicação-e-assinatura confiáveis.
Exemplo do método transactionEnd
// This method is synchronized to make sure the
// messages are published in the order the transaction
// were committed. If we started publishing the messages
// in parallel then the receivers could corrupt the Map
// as deletes may arrive before inserts etc.
public synchronized void transactionEnd(String txid, boolean isWriteThroughEnabled, boolean committed,
Collection changes) {
try {
// must be write through and commited.
if (isWriteThroughEnabled && committed) {
// write the sequences to a byte []
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
if (publishMaps.isEmpty()) {
// serialize the whole collection
LogSequenceTransformer.serialize(changes, oos, this, mode);
} else {
// filter LogSequences based on publishMaps contents
Collection publishChanges = new ArrayList();
Iterator iter = changes.iterator();
while (iter.hasNext()) {
LogSequence ls = (LogSequence) iter.next();
if (publishMaps.contains(ls.getMapName())) {
publishChanges.add(ls);
}
}
LogSequenceTransformer.serialize(publishChanges, oos, this, mode);
}
// make an object message for the changes
oos.flush();
ObjectMessage om = jmsSession.createObjectMessage(bos.toByteArray());
// set properties
om.setStringProperty(PROP_TX, txid);
om.setStringProperty(PROP_GRIDNAME, myGrid.getName());
// transmit it.
publisher.publish(om);
}
} catch (Throwable e) {
throw new ObjectGridRuntimeException("Cannot push changes", e);
}
}
Receber e aplicar mensagens de atualização
A seguir está o método de execução. Este método é executado em um loop até que o aplicativo pare o loop. Cada iteração do loop tenta receber uma mensagem JMS e aplicá-la ao ObjectGrid.
Exemplo do método de execução de mensagem JMS
private synchronized boolean isListenerRunning() {
return listenerRunning;
}
public void run () {
try {
System.out.println("Listener starting");
// get a jms session for receiving the messages.
// Non transactional.
TopicSession myTopicSession;
myTopicSession = connection.createTopicSession(false, javax.jms.
Session.AUTO_ACKNOWLEDGE);
// get a subscriber for the topic, true indicates don't receive
// messages transmitted using publishers
// on this connection. Otherwise, we'd receive our own updates.
TopicSubscriber subscriber = myTopicSession.createSubscriber(topic,
null, true);
System.out.println("Listener started");
while (isListenerRunning()) {
ObjectMessage om = (ObjectMessage)subscriber.receive(2000);
if (om != null) {
// Use Session that was passed in on the initialize...
// very important to use no write through here
mySession.beginNoWriteThrough();
byte[] raw = (byte[])om.getObject();
ByteArrayInputStream bis = new ByteArrayInputStream(raw);
ObjectInputStream ois = new ObjectInputStream(bis);
// inflate the LogSequences
Collection collection = LogSequenceTransformer.inflate(ois,
myGrid);
Iterator iter = collection.iterator();
while (iter.hasNext()) {
// process each Maps changes according to the mode when
// the LogSequence was serialized
LogSequence seq = (LogSequence)iter.next();
mySession.processLogSequence(seq);
}
mySession.commit();
} // if there was a message
} // loop while
// stop the connection
connection.close();
} catch (IOException e) {
System.out.println("IO Exception: " + e);
} catch (JMSException e) {
System.out.println("JMS Exception: " + e);
} catch (ObjectGridException e) {
System.out.println("ObjectGrid exception: " + e);
System.out.println("Caused by: " + e.getCause());
} catch (Throwable e) {
System.out.println("Exception : " + e);
}
System.out.println("Listener stopped");
}