If you are using WebSphere MQ as the
transport protocol for your aggregation flows, you can correlate the initial
request message with the combined response message by preserving the MsgId
from the input MQMD and using it as the CorrelId of the output response.
Before you start:
To complete this task, you must have completed the following tasks:
When the initial request is processed by the fan-out flow, the
MsgId held in the MQMD is lost. If you want to correlate the aggregated output
message to the original input message, one way to do this is to preserve the
MsgId and specify it as the CorrelId in the aggregated output message.
The task described here assumes that you have already created a fan-out flow and
a fan-in flow as two separate flows that run on the same broker. It also assumes
the minimum content for these flows; your flows can include whatever content
is appropriate, and you can adapt them in the way that is shown here to add
this additional function.
- Switch to the Broker Application Development
perspective.
- Open your fan-out flow and add a new branch that includes these
nodes:
- A Compute node named SaveMsgId, connected to the out terminal of the AggregateControl
node
- An MQOutput node called SavedMsgIdQ, connected to the out terminal of
SaveMsgId
- An AggregateRequest node, connected to the out terminal of SavedMsgIdQ
The updated fan-out flow is shown below. It now has
three branches from the AggregateControl node:
- The new branch with Compute node SaveMsgId that adds an MQMD to a copy
of the input message and preserves the MsgId.
- The original branch that processes the request. This is shown with a single
Compute node HandleRequest that extracts a single request from the combined
input, adds an MQMD, and issues the new request to the MQOutput node RequestQ.
This is followed by an AggregateRequest node that records that this request
has been issued.
- The control branch, where a Compute node Add_MQMD adds an MQMD to the
control message and passes it to MQOutput node ControlQ to deliver to the
AggregateReply node in a separate fan-in flow.

- Code ESQL for the new Compute node SaveMsgId, as shown below:
CREATE COMPUTE MODULE SaveMsgId
    CREATE FUNCTION Main() RETURNS BOOLEAN
    BEGIN
        CALL CopyMessageHeaders();
        -- Extract and preserve the MsgId of the original request message
        SET OutputRoot.XML.MyMsgId = InputRoot.MQMD.MsgId;
        -- Return TRUE so that the message is propagated to the out terminal
        RETURN TRUE;
    END;

    CREATE PROCEDURE CopyMessageHeaders()
    BEGIN
        -- Copy every element under InputRoot except the last one (the body)
        DECLARE I INTEGER 1;
        DECLARE J INTEGER CARDINALITY(InputRoot.*[]);
        WHILE I < J DO
            SET OutputRoot.*[I] = InputRoot.*[I];
            SET I = I + 1;
        END WHILE;
    END;
END MODULE;
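A later step casts the saved MyMsgId value back to a BLOB before it is used as the CorrelId, which suggests that the value does not stay a BLOB while it travels in the XML body. The following is a minimal sketch of that kind of round trip, assuming the ESQL behavior that casting a BLOB to CHARACTER without a CCSID clause produces the hexadecimal literal form. The module name MsgIdRoundTripSketch and the identifier value are hypothetical, and the exact text form that the XML parser writes is not stated in this task.

```esql
CREATE COMPUTE MODULE MsgIdRoundTripSketch
    CREATE FUNCTION Main() RETURNS BOOLEAN
    BEGIN
        -- Hypothetical 24-byte MsgId, written as 48 hexadecimal digits
        DECLARE originalId BLOB X'414D5120514D31202020202020202020A1B2C3D420000101';

        -- Assumption: with no CCSID clause, the BLOB is rendered as a
        -- hexadecimal literal string rather than decoded as text
        DECLARE asText CHARACTER CAST(originalId AS CHARACTER);

        -- Casting the text form back is expected to recover the same bytes
        DECLARE restoredId BLOB CAST(asText AS BLOB);

        IF restoredId <> originalId THEN
            THROW USER EXCEPTION MESSAGE 2951
                VALUES('MsgId did not survive the round trip');
        END IF;

        RETURN TRUE;
    END;
END MODULE;
```

A throwaway flow with a Compute node like this can help confirm the behavior on your broker level before you rely on it in the aggregation flows.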
- Configure the MQOutput node SavedMsgIdQ:
- Set the property Queue name to SAVEDMSGID.
- Select Request in the properties dialog navigator and select the check
box Request.
- Enter the name REPLIES for the Reply-to
queue property. This is the name of the queue to which reply
messages are sent in response to the requests sent by the fan-out flow.
- Configure the AggregateRequest node in the new branch by setting
the property Folder to SavedMSGID.
- Create a new message flow that contains:
- An MQInput node SavedMsgId, with property Queue
name set to SAVEDMSGID
- An MQReply node, connected to the out terminal of SavedMsgId
This is shown below:

- Open the fan-in flow and update its Compute node to handle the
new function. Add the ESQL shown below into the Main function:
SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
-- Store the saved MQMD MsgId value from the specified folder in the MQMD CorrelId field
SET OutputRoot.MQMD.CorrelId =
    CAST(InputRoot.ComIbmAggregateReplyBody.SavedMSGID.XML.MyMsgId AS BLOB);
The example fan-in flow is shown below. The Compute node SetCorrelID processes
the replies to create a combined response message, and now sets the MQMD correlation
identifier to associate this response with the initial request.
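To show where the two statements from this step might sit, the following is a rough sketch of a complete Main function for the fan-in Compute node. It is an illustration under stated assumptions, not the flow's actual code. The folder name RequestFolder and the output element names CombinedReply and Reply are hypothetical, and the IS NOT NULL guard is an addition that the original step does not include.

```esql
CREATE COMPUTE MODULE SetCorrelID
    CREATE FUNCTION Main() RETURNS BOOLEAN
    BEGIN
        -- Carry the message properties over from the aggregated input
        SET OutputRoot.Properties = InputRoot.Properties;

        -- Create the MQMD for the combined response
        SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;

        -- Guard (an addition, not part of the original step): set the
        -- CorrelId only if the saved MsgId arrived in the SavedMSGID folder
        IF InputRoot.ComIbmAggregateReplyBody.SavedMSGID.XML.MyMsgId IS NOT NULL THEN
            SET OutputRoot.MQMD.CorrelId =
                CAST(InputRoot.ComIbmAggregateReplyBody.SavedMSGID.XML.MyMsgId AS BLOB);
        END IF;

        -- Hypothetical: copy the reply stored under the folder name that the
        -- original request branch uses (assumed here to be RequestFolder)
        SET OutputRoot.XML.CombinedReply.Reply =
            InputRoot.ComIbmAggregateReplyBody.RequestFolder.XML;

        RETURN TRUE;
    END;
END MODULE;
```

The guard matters mainly if your flow also handles incomplete aggregations, where the SavedMSGID folder might be missing; without it, the CorrelId would be set from a NULL value.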

This task shows you one example of how to preserve the MQMD MsgId
to correlate the initial request with the final response. The additional function
has been added in a new branch of an existing fan-out flow; this adds another
message to the request and another message flow in the broker. You could achieve
the same result if you add the code to preserve the MsgId to an existing branch
in your fan-out flow.