入力要求と出力応答の集約メッセージの相関

集約フロー用のトランスポート・プロトコルとして WebSphere MQ を使用している場合、入力 MQMD からの MsgId を保存し、出力応答の CorrelId としてそれを使用することによって、 初期の要求メッセージと複合応答メッセージを相関させることができます。

始める前に:

このタスクを実行するには、以下のタスクを完了している必要があります。

初期の要求がファンアウト・フローによって処理される場合、 MQMD に保留されている MsgId は失われます。 集約された出力メッセージを元の入力メッセージに相関させたい場合、 これを行う1 つの方法は、MsgId を保存し、それを集約された出力メッセージで CorrelId として指定することです。

ここで説明されているタスクは、ファンアウト・フローおよびファンイン・フローを、 同じブローカーで実行する 2 つの別個のフローとしてすでに作成していることが前提です。 これらのフローの最小の内容も前提とされています。すなわち、フローにはいかなる内容であれ適切なものを含むことができ、 この追加の関数を追加するためにここで表示されている方法でそれを適応させることができます。

  1. 「ブローカー・アプリケーション開発 (Broker Application Development)」パースペクティブに切り替えます。
  2. ファンアウト・フローを開き、これらのノードを含む新規ブランチを追加します。
    • SaveMsgId という名前の Compute ノード、AggregateControl ノードの out ターミナルに接続する。
    • SavedMsgIdQ と呼ばれる MQOutput ノード、SaveMsgId の out ターミナルに接続する。
    • AggregateRequest ノード、SavedMsgIdQ の out ターミナルに接続する。
    更新されたファンアウト・フローを以下に示します。 ここでは、AggregateControl ノードからの 3 つのブランチがあります。
    1. MQMD を入力メッセージのコピーに追加し、MsgId を保持する Compute ノードの SaveMsgID のある新規ブランチ。
    2. 要求を処理する元のブランチ。 これは、複合入力から単一の要求を抽出し、MQMD を追加し、新規要求を MQOutput ノードの RequestQ に送出する Compute ノードの HandleRequest とともに示されています。 これに、この要求が送出されたことを記録する AggregateRequest ノードが続きます。
    3. 制御ブランチ。Compute ノードの Add_MQMD が MQMD を制御メッセージに追加し、 別個のファンイン・フローにある AggregateReply ノードに送信するために それを MQOutput ノードの ControlQ に渡すところです。

    これは、最小のファンアウト・フロー (AggregateControl ノードに接続されている MQInput ノード) を示しており、 制御フロー (MQOutput に接続された MQMD を追加する Compute)、 通常フロー (AggregateRequest に接続されている MQOutput に接続されている Compute) に接続されています。 さらに、MsgId (AggregateRequest に接続されている MQOutput に接続されている Compute) を保存する追加のフローもあります。
  3. 下記に示されているように、ESQL を新規 Compute ノードの SaveMsgId 用にコーディングします。
    CREATE COMPUTE MODULE SaveMsgId
             CREATE FUNCTION Main() RETURNS BOOLEAN
        BEGIN
        CALL CopyMessageHeaders();
    
    // Extract and preserve the MsgId of the original request message
        SET OutputRoot.XML.MyMsgId = InputRoot.MQMD.MsgId;
      END;
      CREATE PROCEDURE CopyMessageHeaders()
        BEGIN
        DECLARE I INTEGER 1;
        DECLARE J INTEGER CARDINALITY(InputRoot.*[]);
        WHILE I < J DO
          SET OutputRoot.*[I] = InputRoot.*[I];
          SET I = I + 1;
        END WHILE;
      END;
    END MODULE;
  4. MQOutput ノードの SavedMsgIdQ を構成します。
    • プロパティーの「キュー名」SAVEDMSGIDに設定する。
    • プロパティー・ダイアログ・ナビゲーターで「要求 (Request)」を選択し、チェック・ボックス「要求 (Request)」を選択する。
    • 「応答先キュー」プロパティーに名前 REPLIES を入力する。 これが、ファンアウト・フローによって送信された要求に応じて応答メッセージが送信されるキューの名前になります。
  5. プロパティーの「フォルダー」SavedMSGID に設定して、 新規ブランチに AggregateRequest ノードを構成します。
  6. 以下を含む新規メッセージ・フローを作成します。
    • MQInput ノードの SavedMsgId、プロパティーの「キュー名」SAVEDMSGID に設定されている。
    • MQReply ノード、SavedMsgId の out ターミナルに接続する。
    これが下記に示されています。
    これは、MQMD MsgId を含むメッセージを受け取り、 それを MQReply ノードを通るファンイン・フローの入力キューである REPLIES に発送する新規フローを示しています。
  7. ファンイン・フローを開き、Compute ノードを更新して新規関数を処理します。 下記に示されている ESQL をメインの関数に追加します。
    SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
    // Store the saved MQMD MsgId value from the specified folder in MQMD CorrelId field
        SET OutputRoot.MQMD.CorrelId =
            CAST(InputRoot.ComIbmAggregateReplyBody.SavedMSGID.XML.MyMsgId AS BLOB);  
    ファンアウトの例が下記に示されています。 Compute ノードの SetCorrelID は、応答を処理して複合応答メッセージを作成し、 MQMD の相関 ID を設定してこの応答を初期の要求と関連付けます。
    これは、最小のファンイン・フロー (AggregateReply ノードの制御ターミナルに接続している制御メッセージを受け取る MQInput ノード、ターミナルで AggregateReply に接続している応答を受け取る MQInput ノード、 複合応答を送信する MQOutput に接続している Compute ノードの SetCorrelId に接続している AggregateReply ノードの out ターミナル) を示しています。
このタスクは、初期の要求を最終の要求を相関させるために、 どのようにして MQMD MsgId を保存するかについての例を示しています。 追加の関数は、既存のファンアウト・フローの新規ブランチに追加されており、 これは要求への他のメッセージおよび、ブローカーの他のメッセージ・フローを追加します。 コードを追加してファンアウト・フローの既存のブランチに MsgId を保存する場合、 同じ結果をアーカイブできます。

関連概念
メッセージ・フロー
メッセージ・フローの集約

関連タスク
集約フローの構成
メッセージ・フローの設計
メッセージ・フローの作成
メッセージ・フローの内容の定義

関連資料
AggregateControl ノード
AggregateReply ノード
AggregateRequest ノード