The Aggregation sample demonstrates a simple four-way aggregation operation using the Aggregate Control, Aggregate Request, and Aggregate Reply nodes. It contains three message flows that implement the aggregation: FanOut, RequestReplyApp, and FanIn.
The FanOut flow takes the incoming request message, generates four different request messages, sends them out on request/reply, and starts tracking the aggregation operation.
Note that the Control terminal of the Aggregate Control node is not wired, and that the Transaction Mode of the MQInput node is set to Yes. You can find out more about the reasons for this design in Extending the Aggregation sample.
The FanOut flow contains the Aggregate Control and Aggregate Request nodes, which are used to start the aggregation processing. The Aggregate Control node propagates the request message down each of the four branches connected to its Out terminal (in no defined order). Each branch has a "BuildRequest" Compute node to generate the individual request. The following ESQL is used in the BuildRequest1 Compute node:
CREATE COMPUTE MODULE FanOut_CreateRequest1
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    -- Propagate the LocalEnvironment, which carries the aggregation
    -- information added by the Aggregate Control node
    SET OutputLocalEnvironment = InputLocalEnvironment;
    -- Build a request containing the first quarter of the SaleList elements
    CALL CopyQuarter(InputRoot, OutputRoot, 0);
    RETURN TRUE;
  END;
END MODULE;
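The other three "BuildRequest" Compute nodes differ only in the quarter index that they pass to CopyQuarter. As a sketch (the module name is assumed here to follow the same naming pattern as the first node), the second one would look like this:
CREATE COMPUTE MODULE FanOut_CreateRequest2
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    SET OutputLocalEnvironment = InputLocalEnvironment;
    -- Build a request containing the second quarter of the SaleList elements
    CALL CopyQuarter(InputRoot, OutputRoot, 1);
    RETURN TRUE;
  END;
END MODULE;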
The CopyQuarter procedure copies the message headers from the input message, and then extracts one quarter of the <SaleList> elements. In the supplied example there are eight <SaleList> elements, so each request message contains two of them. The ESQL for this procedure is as follows:
CREATE PROCEDURE CopyQuarter(IN input REFERENCE,
                             IN output REFERENCE,
                             IN jumps INTEGER)
BEGIN
  -- Copy the message headers (MQMD and so on) into the output message
  CALL CopyMessageHeaders(input, output);
  -- Create the XML body folder for the request message
  CREATE LASTCHILD OF output DOMAIN 'XML';
  CREATE LASTCHILD OF output.XML NAME 'SaleEnvelope';
  DECLARE xmlIn REFERENCE TO input.XML.SaleEnvelope;
  DECLARE xmlOut REFERENCE TO output.XML.SaleEnvelope;
  IF LASTMOVE(xmlOut) <> TRUE THEN
    THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('could not create output message');
  END IF;
  -- The input must contain a whole number of quarters, and the jumps
  -- parameter selects which quarter (0, 1, 2, or 3) to copy
  DECLARE invoices INTEGER CAST(xmlIn.Header.SaleListCount AS INTEGER);
  DECLARE quarter INTEGER invoices/4;
  IF invoices <> (quarter*4) THEN
    THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('not divisible by 4', invoices);
  END IF;
  IF jumps > 3 THEN
    THROW USER EXCEPTION CATALOG 'WMQIv600' MESSAGE 2949 VALUES ('too many jumps', jumps);
  END IF;
  -- Copy the selected quarter of the SaleList elements into the output message
  DECLARE count INTEGER 1;
  DECLARE copyRef REFERENCE TO xmlIn.SaleList[(jumps*quarter)+count];
  WHILE count <= quarter DO
    SET xmlOut.SaleList[count] = copyRef;
    MOVE copyRef NEXTSIBLING;
    SET count = count + 1;
  END WHILE;
END;
There is some initial verification of the inputs (the number of <SaleList> elements must be divisible by four, and the required quarter is selected by a jumps value of 0, 1, 2, or 3) before the appropriate <SaleList> elements are copied from the input message into the output message. With eight <SaleList> elements, quarter is 2, so a jumps value of 1, for example, copies <SaleList>[3] and <SaleList>[4].
The CopyMessageHeaders procedure, called from the CopyQuarter procedure, is based on the standard CopyMessageHeaders procedure that is generated in the ESQL for a new Compute node. To maximize reuse, it was moved up to the scope of the ESQL file so that all the Compute nodes can call the same procedure.
This re-scoping has an important implication, and requires a change to the procedure. Within a Compute node, the OutputRoot reference has special properties that automatically preserve domain information when message tree elements are copied from InputRoot to OutputRoot. In this case, however, OutputRoot is passed by reference to an external procedure, so the domain information must be preserved explicitly. This is done by adding the CREATE LASTCHILD ... DOMAIN statement:
CREATE PROCEDURE CopyMessageHeaders(IN input REFERENCE,
                                    IN output REFERENCE)
BEGIN
  DECLARE I INTEGER 1;
  DECLARE J INTEGER CARDINALITY(input.*[]);
  -- Copy every child of the input root except the last, which is the message body
  WHILE I < J DO
    CREATE LASTCHILD OF output DOMAIN FIELDNAME(input.*[I]); -- preserve domain information
    SET output.*[I] = input.*[I];
    SET I = I + 1;
  END WHILE;
END;
After the "BuildRequest" Compute node has generated the request message by setting the Compute node to Pass LocalEnvironment and Message, it is output by an MQOutput node to the AGGR_SAMPLE_REQUEST queue. (For simplicity in this sample, all four requests are put to the same queue, but this is probably not realistic for a real application.) Each Aggregate Request node has a FolderName specified as a configuration parameter, which is used by the Aggregate Reply node when appending the various replies into the aggregated reply message. "Request1" uses the first quarter of the input message, "Request2" the second quarter, and so on.
The MQOutput nodes are set to specify AGGR_SAMPLE_REPLY as the reply-to queue on the request messages; this queue is used by the RequestReplyApp message flow.
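The same effect could be achieved in ESQL rather than in the node configuration; a minimal sketch (not how the sample is configured) would set the reply-to queue in the MQMD of each request message in the BuildRequest Compute nodes, after CopyQuarter has copied the headers:
-- Hypothetical alternative to setting the reply-to queue on the MQOutput node
SET OutputRoot.MQMD.ReplyToQ = 'AGGR_SAMPLE_REPLY';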
After all four request messages have been output, the Aggregate Control node stores the state of the aggregation operation internally in the broker.
There are other ways of accomplishing this; see Extending the Aggregation sample for more details.
The whole flow must run under a single transaction, with the Transaction Mode of the MQInput node set to Yes, because it is most efficient if the last operation (storing the aggregation state) completes before any replies are received.
You use the RequestReplyApp flow to simulate the back-end service applications that would normally process the request messages from the aggregation operation. In a real system these could be other message flows or existing applications, but that level of complexity is not required for the Aggregation sample, so the flow contains the minimum required for correct request/reply processing. The flow reads from the same queue that the MQOutput nodes in the FanOut flow write to, and it outputs to the queue that the input node in the FanIn flow reads from, providing a messaging bridge between the two flows: each message is put to its reply-to queue (as set by the MQOutput nodes in the FanOut flow).
The RequestReplyApp flow is specified with three additional instances in the BAR file, resulting in four threads in total. This ensures that all four requests are processed as quickly as possible.
The FanIn flow receives all the replies from the RequestReplyApp flow, and aggregates them into a single output message. The output message from the Aggregate Reply node cannot be put directly by an MQOutput node, so a Compute node is added to transform the data into a format that can be written to a queue.
The FanIn message flow also has three additional instances, for the same reasons as the RequestReplyApp flow. The first three incoming replies are stored internally by the broker, and the stored aggregation state is updated. When the fourth reply is processed, the three stored replies are retrieved and all four reply messages are combined into a single output message. This message is not yet in a form that can be put to a queue, so the "BuildReply" Compute node uses the following ESQL to rectify this:
CREATE COMPUTE MODULE FanIn_BuildReply
  CREATE FUNCTION Main() RETURNS BOOLEAN
  BEGIN
    -- Build a rudimentary MQMD so that the message can be put to a queue
    SET OutputRoot.Properties = InputRoot.Properties;
    CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD';
    SET OutputRoot.MQMD.StrucId = MQMD_STRUC_ID;
    SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
    -- Create the XML body that will hold the four replies
    CREATE LASTCHILD OF OutputRoot DOMAIN 'XML';
    CREATE LASTCHILD OF OutputRoot.XML NAME 'ComIbmAggregateReplyBody';
    DECLARE next INTEGER 1;
    DECLARE repliesIn REFERENCE TO InputRoot.ComIbmAggregateReplyBody.*[next];
    DECLARE repliesOut REFERENCE TO OutputRoot.XML.ComIbmAggregateReplyBody;
    WHILE next <= 4 DO -- 4-way aggregation
      -- Copy each reply into a folder named after its Aggregate Request FolderName
      CREATE LASTCHILD OF repliesOut NAME FIELDNAME(repliesIn);
      SET repliesOut.*[next].ReplyIdentifier = CAST(repliesIn.Properties.ReplyIdentifier AS CHAR);
      SET repliesOut.*[next].SaleEnvelope = repliesIn.XML.SaleEnvelope;
      MOVE repliesIn NEXTSIBLING;
      SET next = next + 1;
    END WHILE;
    RETURN TRUE;
  END;
END MODULE;
The ESQL adds a rudimentary MQMD, then copies the data from the ComIbmAggregateReplyBody folder of the input message into an XML tree in the output message, preserving the aggregate request identifiers and folder names. The order of the replies is not specified.
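Based on this ESQL, and assuming the FolderName values Request1 to Request4 described earlier, the body of the final output message has roughly the following shape (the <SaleEnvelope> content, two <SaleList> elements per reply, is abbreviated, and the folder order can vary):
<ComIbmAggregateReplyBody>
  <Request1>
    <ReplyIdentifier>...</ReplyIdentifier>
    <SaleEnvelope>...</SaleEnvelope>
  </Request1>
  <Request2>
    <ReplyIdentifier>...</ReplyIdentifier>
    <SaleEnvelope>...</SaleEnvelope>
  </Request2>
  <Request3>...</Request3>
  <Request4>...</Request4>
</ComIbmAggregateReplyBody>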
The test message used to drive the aggregation message flow is a straightforward XML message that contains invoice details for a customer. It contains approximately 8 KB of data, in eight separate <SaleList> elements.
<SaleEnvelope>
<Header>
<SaleListCount>8</SaleListCount>
</Header>
<SaleList>
<Invoice>
<Initial>K</Initial><Initial>A</Initial>
<Surname>Braithwaite</Surname>
<Item><Code>00</Code><Code>01</Code><Code>02</Code>
<Description>Twister</Description>
<Category>Games</Category>
<Price>00.30</Price><Quantity>01</Quantity>
</Item>
<Item><Code>02</Code><Code>03</Code><Code>01</Code>
<Description>The Times Newspaper</Description>
<Category>Books and Media</Category>
<Price>00.20</Price><Quantity>01</Quantity>
</Item>
<Balance>00.50</Balance><Currency>Sterling</Currency>
</Invoice>
<Invoice>
<Initial>T</Initial><Initial>J</Initial>
<Surname>Dunnwin</Surname>
<Item><Code>04</Code><Code>05</Code><Code>01</Code>
<Description>The Origin of Species</Description>
<Category>Books and Media</Category>
<Price>22.34</Price><Quantity>02</Quantity>
</Item>
<Item><Code>06</Code><Code>07</Code><Code>01</Code>
<Description>Microscope</Description>
<Category>Miscellaneous</Category>
<Price>36.20</Price><Quantity>01</Quantity>
</Item>
<Balance>81.84</Balance><Currency>Euros</Currency>
</Invoice>
</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<Trailer>
<CompletionTime>12.00.00</CompletionTime>
</Trailer>
</SaleEnvelope>