El ejemplo de Agregación muestra una sencilla operación de agregación de cuatro vías que utiliza los nodos AggregateControl, AggregateRequest y AggregateReply. Contiene tres flujos de mensajes para implementar una agregación de cuatro formas: FanOut, RequestReplyApp y FanIn.
Es el flujo que toma el mensaje de petición entrante, genera cuatro mensajes de petición distintos, los envía tras una petición/respuesta e inicia el seguimiento de la operación de agregación.
El terminal de Control del nodo AggregateControl no está conectado y la Modalidad de transacción del nodo MQInput está establecida en Sí.
Encontrará más información sobre las razones por las que se ha elegido este diseño en el apartado
Ampliación del ejemplo de Agregación.
El flujo de mensajes FanOut contiene los nodos AggregateControl y AggregateRequest que se utilizan para iniciar el proceso de agregación. El nodo AggregateControl propaga el mensaje de petición a cada una de las cuatro ramas conectadas a su terminal de salida en un orden no definido. Cada rama tiene un nodo Compute BuildRequest para generar la petición individual. En el nodo BuildRequest1 Compute se utiliza el siguiente código ESQL:
CREATE COMPUTE MODULE FanOut_CreateRequest1
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputLocalEnvironment = InputLocalEnvironment;
CALL CopyQuarter(InputRoot, OutputRoot, 0);
RETURN TRUE;
END;
END MODULE;
El procedimiento CopyQuarter (copiar cuarto) copia las cabeceras de mensajes del mensaje de entrada y después extrae un cuarta parte de los elementos de la <SaleList>. En el ejemplo que se facilita, hay ocho elementos de <SaleList> y, por lo tanto, cada mensaje de petición contendrá dos elementos <SaleList>. El ejemplo siguiente muestra el código ESQL de este procedimiento:
CREATE PROCEDURE CopyQuarter(IN input REFERENCE, IN output REFERENCE, IN jumps INTEGER) BEGIN CALL CopyMessageHeaders(input, output); CREATE LASTCHILD OF output DOMAIN 'XMLNSC'; CREATE LASTCHILD OF output.XMLNSC NAME 'SaleEnvelope'; DECLARE xmlIn REFERENCE TO input.XMLNSC.SaleEnvelope; DECLARE xmlOut REFERENCE TO output.XMLNSC.SaleEnvelope; IF LASTMOVE(xmlOut) <> TRUE THEN THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('no se ha podido crear el mensaje de salida'); END IF; DECLARE invoices INTEGER CAST (xmlIn.Header.SaleListCount AS INTEGER); DECLARE quarter INTEGER invoices/4; IF invoices <> (quarter*4) THEN THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('no es divisible por 4', invoices); END IF; IF jumps > 3 THEN THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('demasiado saltos', jumps); END IF; DECLARE count INTEGER 1; DECLARE copyRef REFERENCE TO xmlIn.SaleList[(jumps*quarter)+count]; WHILE count <= quarter DO SET xmlOut.SaleList[count] = copyRef; MOVE copyRef NEXTSIBLING; SET count = count + 1; END WHILE; END;
Es necesario hacer algunas verificaciones iniciales sobre el estado de las entradas (el número de elementos de la <SaleList> ha de ser divisible por cuatro y el cuarto requerido se selecciona mediante 0, 1, 2 ó 3) antes de copiar el número adecuado de elementos de <SaleList> desde el mensaje de entrada en el mensaje de salida.
El procedimiento CopyMessageHeaders (copiar cabeceras de mensajes), como se le llama en el procedimiento CopyQuarter (copiar cuarto), se basa en el procedimiento estándar CopyMessageHeaders suministrado que procede del ESQL generado para un nuevo nodo Compute. Para maximizar la reutilización, este procedimiento CopyMessageHeaders se ha trasladado al ámbito del archivo ESQL, de forma que todos los nodos Compute puedan llamar al mismo procedimiento.
Este nuevo ámbito tiene una implicación importante y requiere un cambio en el procedimiento. Dentro del nodo Compute, la referencia OutputRoot (raíz de salida) tiene propiedades especiales que aseguran automáticamente que la información del dominio se conserva cuando los tres elementos del mensaje se copian desde InputRoot (raíz de entrada) a OutputRoot. Sin embargo, en este caso, OutputRoot se pasa como referencia a un procedimiento externo y, por lo tanto, la información del dominio ha de conservarse explícitamente. Esta conservación se lleva a cabo añadiendo el mandato CREATE LASTCHILD:
CREATE PROCEDURE CopyMessageHeaders(IN input REFERENCE,
IN output REFERENCE)
BEGIN
DECLARE I INTEGER 2;
DECLARE J INTEGER CARDINALITY(input.*[]);
WHILE I < J DO
CREATE LASTCHILD OF output DOMAIN FIELDNAME(input.*[I]); -- preserve domain information
SET output.*[I] = input.*[I];
SET I = I + 1;
END WHILE;
END;
Después de que el nodo Compute BuildRequest haya generado el mensaje de petición estableciendo el nodo Compute en Pass LocalEnvironment and Message, un nodo MQOutput efectuará su salida a la cola AGGR_SAMPLE_REQUEST. (Para que este ejemplo sea más sencillo, se han transferido las cuatro peticiones a la misma cola, pero es probable que este escenario no resulte realista si se trata de una aplicación real.) Cada nodo AggregateRequest tiene un nombre de carpeta, especificado como un parámetro de configuración, que el nodo AggregateReply utiliza al añadir las diversas respuestas al mensaje de respuesta agregado. El nodo AggregateRequest1 utiliza el primer cuarto del mensaje de entrada, el nodo AggregateRequest2 el segundo cuarto, y así sucesivamente.
Los nodos MQOutput se establecen para especificar AGGR_SAMPLE_REPLY como la cola ReplyTo en los mensajes de petición, que utiliza el flujo de mensajes RequestReplyApp.
Cuando se realiza la salida de los cuatro mensajes, el nodo AggregateControl almacena el estado internamente el estado de la agregación en el intermediario, como se muestra en los pasos siguientes:
Para obtener más información acerca de otros modos de realizar ese escenario, consulte Ampliación del ejemplo.
El flujo completo ha de realizarse bajo una transacción, con Modalidad de transacción establecido en YES en el nodo MQInput, debido a que es el procedimiento más eficiente si la última operación (el almacenamiento del estado de la operación de agregación) ha terminado antes de recibir las respuestas.
Este flujo se utiliza para simular las aplicaciones de servicio de fondo que normalmente procesan los mensajes procedentes de la operación de agregación. En un sistema real, pueden ser otros flujos de mensajes o aplicaciones existentes, pero este nivel de complejidad no es necesario para el ejemplo de Agregación, de este modo, el flujo contiene lo mínimo necesario un proceso correcto de petición/respuesta. Este flujo lee de la misma cola en la que graban los nodos MQOutput del flujo FanOut, realiza la salida en la cola en la que lee el nodo de entrada de FanIn y proporciona un puente de mensajería entre los dos flujos de mensajes. Los mensajes de transfieren a su cola de respuestas (como establecen los nodos MQOutput en el flujo FanOut).
El flujo de mensajes RequestReplyApp se especifica con tres instancias
adicionales en el archivo de intermediario (BAR), lo que genera cuatro hebras en total, lo que
garantiza que las cuatro peticiones se procesen lo más rápidamente posible.
Este flujo recibe todas las respuestas del flujo de mensajes RequestReplyApp y las agrega en un solo mensaje de salida. Un nodo MQOutput no puede efectuar la salida del mensaje de salida del nodo
AggregateReply, por lo que se añade un nodo Compute para convertir los datos a un formato con el que pueden
grabarse en una cola.
El flujo de mensajes de abanico de entrada (FanIn) tiene también tres instancias adicionales por los mismos motivos que el flujo de mensajes RequestReplyApp. Las tres primeras respuestas entrantes las almacena internamente el intermediario y el estado de agregación almacenado se actualiza. Cuando se procesa la cuarta respuesta, las tres respuestas almacenadas se extraen y los cuatro mensajes de respuesta se incorporan a un mensaje de salida. El estado de ese mensaje no le permite ser transferido a una cola, por lo que el nodo Compute BuildReply invoca el siguiente ESQL:
CREATE COMPUTE MODULE FanIn_BuildReply CREATE FUNCTION Main() RETURNS BOOLEAN BEGIN SET OutputRoot.Properties = InputRoot.Properties; CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD'; SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION; CREATE LASTCHILD OF OutputRoot DOMAIN 'XMLNSC'; CREATE LASTCHILD OF OutputRoot.XMLNSC NAME 'ComIbmAggregateReplyBody'; DECLARE next INTEGER 1; DECLARE repliesIn REFERENCE TO InputRoot.ComIbmAggregateReplyBody.*[next]; DECLARE repliesOut REFERENCE TO OutputRoot.XMLNSC.ComIbmAggregateReplyBody; WHILE next <= 4 DO -- 4-way aggregation CREATE LASTCHILD OF repliesOut NAME FIELDNAME(repliesIn); SET repliesOut.*[next].ReplyIdentifier = CAST(repliesIn.Properties.ReplyIdentifier AS CHAR); SET repliesOut.*[next].SaleEnvelope = repliesIn.XMLNSC.SaleEnvelope; MOVE repliesIn NEXTSIBLING; SET next = next + 1; END WHILE; RETURN TRUE; END; END MODULE;
El ESQL añade un MQMD rudimentario antes de copiar los datos de ComIbmAggregateReplyBody del mensaje de entrada a un árbol XML del mensaje de salida, mientras mantiene las carpetas y los identificadores de la petición de agregación. El orden de las respuestas no se especifica.
El mensaje de prueba que se utiliza para dirigir el flujo de mensajes de agregación es un mensaje XML sencillo que contiene detalles de factura para un cliente. Contiene aproximadamente 8 KB de datos, en ocho elementos distintos de <SaleList>.
<SaleEnvelope>
<Header>
<SaleListCount>8</SaleListCount>
</Header>
<SaleList>
<Invoice>
<Initial>K</Initial><Initial>A</Initial>
<Surname>Braithwaite</Surname>
<Item><Code>00</Code><Code>01</Code><Code>02</Code>
<Description>Twister</Description>
<Category>Games</Category>
<Price>00.30</Price><Quantity>01</Quantity>
</Item>
<Item><Code>02</Code><Code>03</Code><Code>01</Code>
<Description>The Times Newspaper</Description>
<Category>Books and Media</Category>
<Price>00.20</Price><Quantity>01</Quantity>
</Item>
<Balance>00.50</Balance><Currency>Sterling</Currency>
</Invoice>
<Invoice>
<Initial>T</Initial><Initial>J</Initial>
<Surname>Dunnwin</Surname>
<Item><Code>04</Code><Code>05</Code><Code>01</Code>
<Description>The Origin of Species</Description>
<Category>Books and Media</Category>
<Price>22.34</Price><Quantity>02</Quantity>
</Item>
<Item><Code>06</Code><Code>07</Code><Code>01</Code>
<Description>Microscope</Description>
<Category>Miscellaneous</Category>
<Price>36.20</Price><Quantity>01</Quantity>
</Item>
<Balance>81.84</Balance><Currency>Euros</Currency>
</Invoice>
</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<Trailer>
<CompletionTime>12.00.00</CompletionTime>
</Trailer>
</SaleEnvelope>