Correlating input request and output response aggregation messages

If you are using WebSphere MQ as the transport protocol for your aggregation flows, you can correlate the initial request message with the combined response message by preserving the MsgId from the input MQMD and using it as the CorrelId of the output response.

Before you start:

To complete this task, you must have completed the following tasks:

When the initial request is processed by the fan-out flow, the MsgId held in the MQMD is lost. If you want to correlate the aggregated output message to the original input message, one way to do this is to preserve the MsgId and specify it as the CorrelId in the aggregated output message.

The task described here assumes that you have already created a fan-out flow and a fan-in flow as two separate flows that run on the same broker. It also assumes the minimum content for these flows; your flows can include whatever content is appropriate, and you can adapt them in the way that is shown here to add this additional function.

  1. Switch to the Broker Application Development perspective.
  2. Open your fan-out flow and add a new branch that includes these nodes:
    • A Compute node named SaveMsgId, connected to the out terminal of the AggregateControl node
    • An MQOutput node called SavedMsgIdQ, connected to the out terminal of SaveMsgId
    • An AggregateRequest node, connected to the out terminal of SavedMsgIdQ
    The updated fan-out flow is shown below. It now has three branches from the AggregateControl node:
    1. The new branch with Compute node SaveMsgId that adds an MQMD to a copy of the input message and preserves the MsgId.
    2. The original branch that processes the request. This is shown with a single Compute node HandleRequest that extracts a single request from the combined input, adds an MQMD, and issues the new request to the MQOutput node RequestQ. This is followed by an AggregateRequest node that records that this request has been issued.
    3. The control branch, where a Compute node Add_MQMD adds an MQMD to the control message and passes it to MQOutput node ControlQ to deliver to the AggregateReply node in a separate fan-in flow.

    This shows a minimal fan-out flow: an MQInput node connected to an AggregateControl node, which is connected to a control branch (a Compute node that adds an MQMD, connected to an MQOutput node) and a normal branch (a Compute node connected to an MQOutput node connected to an AggregateRequest node). It also has an additional branch to preserve the MsgId (a Compute node connected to an MQOutput node connected to an AggregateRequest node).
  3. Code ESQL for the new Compute node SaveMsgId, as shown below:
    CREATE COMPUTE MODULE SaveMsgId
      CREATE FUNCTION Main() RETURNS BOOLEAN
      BEGIN
        CALL CopyMessageHeaders();

        -- Extract and preserve the MsgId of the original request message
        SET OutputRoot.XML.MyMsgId = InputRoot.MQMD.MsgId;

        -- Propagate the message to the out terminal
        RETURN TRUE;
      END;

      CREATE PROCEDURE CopyMessageHeaders()
      BEGIN
        -- Copy every element except the last one (the message body)
        DECLARE I INTEGER 1;
        DECLARE J INTEGER CARDINALITY(InputRoot.*[]);
        WHILE I < J DO
          SET OutputRoot.*[I] = InputRoot.*[I];
          SET I = I + 1;
        END WHILE;
      END;
    END MODULE;
  4. Configure the MQOutput node SavedMsgIdQ:
    • Set the property Queue name to SAVEDMSGID.
    • Select Request in the properties dialog navigator and select the check box Request.
    • Enter the name REPLIES for the Reply-to queue property. This is the name of the queue to which reply messages are sent in response to the requests sent by the fan-out flow.
  5. Configure the AggregateRequest node in the new branch by setting the property Folder to SavedMSGID.
  6. Create a new message flow that contains:
    • An MQInput node SavedMsgId, with property Queue name set to SAVEDMSGID
    • An MQReply node, connected to the out terminal of SavedMsgId
    This is shown below:
    This shows the new flow, which receives the message containing the MQMD MsgId and routes it through an MQReply node to REPLIES, the input queue of the fan-in flow.
  7. Open the fan-in flow and update its Compute node to handle the new function. Add the ESQL shown below into the Main function:
    SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
    -- Store the saved MQMD MsgId value from the specified folder in the MQMD CorrelId field
    SET OutputRoot.MQMD.CorrelId =
        CAST(InputRoot.ComIbmAggregateReplyBody.SavedMSGID.XML.MyMsgId AS BLOB);
    The example fan-in flow is shown below; the Compute node SetCorrelId processes the replies to create a combined response message, and now sets the MQMD correlation identifier to associate this response with the initial request:
    This shows a minimal fan-in flow: an MQInput node that receives the control message is connected to the control terminal of the AggregateReply node, and an MQInput node that receives the replies is connected to the in terminal of the AggregateReply node. The out terminal of the AggregateReply node is connected to the Compute node SetCorrelId, which is connected to an MQOutput node that delivers the combined response.
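    For context, the two statements from step 7 might sit inside the SetCorrelId module roughly as sketched below. This is a minimal sketch, not the complete module from the example flow. It assumes that the folder name SavedMSGID and the element name MyMsgId match the values set earlier in this task. The local variable savedId and the NULL check are illustrative additions, and the code that builds the combined response body is application-specific and is left as a placeholder comment.

    ```esql
    CREATE COMPUTE MODULE SetCorrelId
      CREATE FUNCTION Main() RETURNS BOOLEAN
      BEGIN
        -- Assumption: the saved identifier arrives in the folder named by the
        -- Folder property of the new AggregateRequest node (SavedMSGID)
        DECLARE savedId CHARACTER
            InputRoot.ComIbmAggregateReplyBody.SavedMSGID.XML.MyMsgId;

        SET OutputRoot.Properties = InputRoot.Properties;
        SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;

        -- Illustrative guard: set CorrelId only if the saved MsgId is present
        IF savedId IS NOT NULL THEN
          SET OutputRoot.MQMD.CorrelId = CAST(savedId AS BLOB);
        END IF;

        -- Build the combined response body here (application-specific)

        RETURN TRUE;
      END;
    END MODULE;
    ```

    The guard leaves CorrelId unset if the SavedMSGID folder is missing from the aggregated message, instead of assigning a NULL value. Whether that case can occur depends on how your fan-in flow is wired.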
This task shows you one example of how to preserve the MQMD MsgId to correlate the initial request with the final response. The additional function has been added in a new branch of an existing fan-out flow; this adds another message to the request and another message flow in the broker. You could achieve the same result if you add the code to preserve the MsgId to an existing branch in your fan-out flow.
Related concepts
Message flows
Message flow aggregation
Related tasks
Configuring aggregation flows
Designing a message flow
Creating a message flow
Defining message flow content
Related reference
AggregateControl node
AggregateReply node
AggregateRequest node