Creating the aggregation fan-in flow

The aggregation fan-in flow receives the responses to the request messages sent out by the fan-out flow and constructs a combined response message containing all the responses received.

Before you start:

To complete this task, you must have completed the following task:

Depending on the timeout values that you have specified, the combined response message might be generated before all the replies have been received by the fan-in flow.

To review an example of a fan-in flow, see the Airline sample that is supplied with WebSphere Business Integration Message Broker.

To create the fan-in flow:

  1. Switch to the Broker Application Development perspective.
  2. Create a message flow to provide the fan-in processing.
  3. Add the following nodes in the editor view and configure and connect them as described:
    Input node
    The input node receives the responses to the multiple request messages generated from the fan-out flow.

    This must be an input node that supports the request/reply model. You can use the built-in nodes MQeInput and MQInput, or a user-defined input node that supports request/reply, or a mixture of these nodes (depending on the requirements of the applications that send these responses). The response received by each input node must be sent across the same protocol as the request to which it corresponds (for example, if you include an MQOutput node in the fan-out flow, the response to that request must be received by an MQInput node in this flow).

    1. Right-click the input node and click Properties.
    2. Specify the source of input messages for this node. For example, specify the name of a WebSphere MQ queue in the Basic property Queue Name from which the MQInput node retrieves messages.
    3. Optional: specify values for any other properties that you want to configure for this node.
    4. Connect the input node's out terminal to the in terminal of an AggregateReply node.

      This represents the simplest configuration; if appropriate, you can include other nodes between the input node and the AggregateReply node. For example, you might want to store the replies for audit purposes (in a Warehouse node).

    AggregateReply node
    The AggregateReply node receives the inbound responses from the input node through its in terminal. Each reply message received by the AggregateReply node is stored persistently in the broker database.

    When all the replies for a particular group of aggregation requests have been collected, the AggregateReply node creates an aggregated reply message and propagates this through the out terminal.

    1. Right-click the AggregateReply node and click Properties.
    2. Set the Aggregate Name property of the AggregateReply to identify this aggregation. Set this value to be the same value that you set for the Aggregate Name property in the corresponding AggregateControl node in the fan-out flow.
    3. Optional: set a value for the Unknown Message Timeout if you want to retain an unrecognized message before propagating it to the unknown terminal. If you are using separate fan-out and fan-in flows, you might want to set this value to a non-zero number in case there are delays in the arrival of the control message.
    4. Optional: connect the unknown terminal to another node or sequence of nodes if you want to explicitly handle unrecognized messages. If you do not connect this terminal to another node in the message flow, messages propagated through this terminal are discarded.
    5. Optional: if you have specified a timeout value for this aggregation in the AggregateControl node, connect the timeout terminal to another node or sequence of nodes if you want to explicitly handle timeouts that expire before all replies are received. Partially complete aggregated replies are sent to the timeout terminal if the timer expires. If you do not connect this terminal to another node in the message flow, messages propagated through this terminal are discarded.
    6. Optional: specify values for any other properties that you want to configure for this node.
    7. Connect the out terminal of the AggregateReply node to the in terminal of a Compute node.

    The AggregateReply node also receives on its control terminal the control message that was sent by the corresponding AggregateControl node on the fan-out flow (either directly or indirectly, as described in Associating fan-out and fan-in aggregation flows). Do not make modify the content of this control message.

    Compute node
    The Compute node receives the message that contains the combined responses. It is unlikely that this combined message is in a format that is valid for output, and you must configure this node to create a valid output message.
    1. Right-click the Compute node and click Properties.
    2. Specify the name of the ESQL module that customizes the function of this node in the Basic property ESQL Module.
    3. Right-click the node and click Open ESQL to open the ESQL file that contains the module for this node. The module is highlighted in the ESQL editor view.
    4. Code the ESQL to create a single output message from the aggregated replies in the input message.

      The structure of the aggregated reply message that is propagated through the out terminal, and information on how you can access its contents, are provided in Accessing the combined message contents.

    5. Optional: specify values for any other properties that you want to configure for this node.
    6. Connect the out terminal of the Compute node to the in terminal of the output node that represents the destination of the single response message.
    Output node
    Include an output node for your fan-in flow. This can be any of the built-in nodes, or a user-defined output node.
    1. Right-click the output node and click Properties.
    2. Specify the destination for the output message for this node. For example, specify the name of a WebSphere MQ queue in the Basic property Queue Name to which the MQOutput node sends messages.
    3. Optional: specify values for any other properties that you want to configure for this node.
  4. Press Ctrl-S to save the message flow and validate its configuration.

Accessing the combined message contents

The AggregateReply node creates a folder in the combined message tree below Root, called ComIbmAggregateReplyBody. Below this, it creates a number of folders using the folder names that you set in the AggregateRequest nodes. The associated reply messages are put beneath them.

For example, the request messages might have folder names:

  • TAXI
  • HOTEL

The resulting aggregated reply message created by the AggregateReply node might have a structure similar to that shown below:


This is a diagram of the tree for the aggregated message content created under element ComIbmAggregateReplyBody under Root. Its contents are described in the surrounding text.

You can use a Compute node to access the reply from the taxi company using the following correlation name:

InputRoot.ComIbmAggregateReplyBody.TAXI.xyz

The folder name does not have to be unique. If you have multiple requests with the folder name TAXI, you can access the separate replies using the array subscript notation, for example:

InputRoot.ComIbmAggregateReplyBody.TAXI[1].xyz
InputRoot.ComIbmAggregateReplyBody.TAXI[2].xyz

Related concepts
Message flows
Message flow aggregation
User-defined Input nodes
User-defined output nodes

Related tasks
Configuring aggregation flows
Creating the aggregation fan-out flow
Associating fan-out and fan-in aggregation flows
Setting timeouts for aggregation
Using multiple AggregateControl nodes
Handling exceptions and database deadlocks in aggregation flows
Designing a message flow
Creating a message flow
Defining message flow content
Developing user-defined extensions

Related reference
AggregateControl node
AggregateReply node
AggregateRequest node
Compute node
MQeInput node
MQeOutput node
MQInput node
MQOutput node