Aggregation サンプルについて

Aggregation サンプルは、AggregateControl、AggregateRequest、および AggregateReply の各ノードを使用する、4 つに分岐した単純な集約操作を示します。 これには、4 つに分岐した集約をインプリメントするための 3 つのメッセージ・フローが含まれています。すなわち、FanOut、RequestReplyApp、および FanIn です。

FanOut メッセージ・フロー

このフローは着信する要求メッセージを取得し、4 つの異なる要求メッセージを生成し、それらを要求/応答によって送り出して、集約操作のトラッキングを開始します。

Aggregation FanOut フロー
AggregateControl ノードの Control ターミナルは接続されず、MQInput ノードの「トランザクション・モード」「はい」に設定されています。 この設計の理由の詳細については、Aggregation サンプルの拡張を参照することができます。

FanOut フローには AggregateControl ノードと AggregateRequest ノードが含まれており、これは集約処理を開始するために使用されます。 AggregateControl ノードは要求メッセージを、その Out ターミナルに接続されている 4 つのブランチに伝搬します (順序は未定義)。 それぞれのブランチには、個々の要求を生成する BuildRequest Compute ノードがあります。 以下の ESQL コードは BuildRequest1 Compute ノードで使用されます。

CREATE COMPUTE MODULE FanOut_CreateRequest1
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputLocalEnvironment = InputLocalEnvironment;
CALL CopyQuarter(InputRoot, OutputRoot, 0);
RETURN TRUE;
END;
END MODULE;

CopyQuarter プロシージャーは入力メッセージからメッセージ・ヘッダーをコピーし、次いで <SaleList> エレメントの 4 分の 1 を抽出します。 提供されるサンプルには 8 つの <SaleList> エレメントが示されていて、 それぞれの要求メッセージには 2 つの <SaleList> エレメントが入ります。 以下の例は、このプロシージャーの ESQL コードを示しています。

CREATE PROCEDURE CopyQuarter(IN input REFERENCE,
							 IN output REFERENCE,
							 IN jumps INTEGER)
BEGIN
	CALL CopyMessageHeaders(input, output);
	CREATE LASTCHILD OF output DOMAIN 'XMLNSC';
	CREATE LASTCHILD OF output.XMLNSC NAME 'SaleEnvelope';
	DECLARE xmlIn REFERENCE TO input.XMLNSC.SaleEnvelope;
	DECLARE xmlOut REFERENCE TO output.XMLNSC.SaleEnvelope;
	IF LASTMOVE(xmlOut) <> TRUE THEN
		THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('could not create output message');
	END IF;

	DECLARE invoices INTEGER CAST (xmlIn.Header.SaleListCount AS INTEGER);
	DECLARE quarter INTEGER invoices/4;
	IF invoices <> (quarter*4) THEN
		THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('not divisible by 4', invoices);
	END IF;	
	
	IF jumps > 3 THEN
		THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('too many jumps', jumps);
	END IF;		
	
	DECLARE count INTEGER 1;
	DECLARE copyRef REFERENCE TO xmlIn.SaleList[(jumps*quarter)+count];
	WHILE count <= quarter DO
		SET xmlOut.SaleList[count] = copyRef;
		MOVE copyRef NEXTSIBLING;
		SET count = count + 1;
	END WHILE;
END;

適切な数の <SaleList> エレメントを入力メッセージから出力メッセージにコピーする前に、 入力の状況に関する初期検査が行われます (<SaleList> エレメントの数は 4 で除算できなければならず、 必要な 4 分の 1 は 0、1、2、または 3 で選択します)。

CopyQuarter プロシージャーで呼び出される CopyMessageHeaders プロシージャーは、新しい Compute ノード用の生成済み ESQL で指定されている、付属の標準 CopyMessageHeaders プロシージャーに基づいています。 最大限に再利用する目的で、この CopyMessageHeaders プロシージャーは ESQL ファイルの有効範囲に移されています。これにより、すべての Compute ノードが同じプロシージャーを呼び出すことができます。

この有効範囲の変更には重要な影響があり、プロシージャーへの変更を必要とします。 Compute ノード内で、OutputRoot 参照には、メッセージ・ツリー・エレメントが InputRoot から OutputRoot にコピーされるときにドメイン情報が自動的に必ず保存されるようにする、特別なプロパティーがあります。 ただしこの場合、OutputRoot は参照として外部プロシージャーに渡されるので、ドメイン情報を明示的に保存する必要があります。 この保存は、以下のような CREATE LASTCHILD コマンドを追加することにより可能になります。

CREATE PROCEDURE CopyMessageHeaders(IN input REFERENCE,
IN output REFERENCE)
BEGIN
DECLARE I INTEGER 2;
DECLARE J INTEGER CARDINALITY(input.*[]);
WHILE I < J DO
CREATE LASTCHILD OF output DOMAIN FIELDNAME(input.*[I]); -- preserve domain information
SET output.*[I] = input.*[I];
SET I = I + 1;
END WHILE;
END;

BuildRequest Compute ノードが要求メッセージを生成すると (Compute ノードを「LocalEnvironment とメッセージを渡す」に設定することにより)、その後、要求メッセージは MQOutput ノードによって AGGR_SAMPLE_REQUEST キューに出力されます。 (このサンプルでは、単純化するために 4 つの要求がすべて同じキューに書き込まれます。 ただし、実際のアプリケーションでは、このシナリオはおそらく現実的ではありません。) それぞれの AggregateRequest ノードには構成パラメーターとしてフォルダー名が指定されています。 これは、集約された応答メッセージにさまざまな応答を付加するときに、AggregateReply ノードによって使用されます。 AggregateRequest1 ノードは入力メッセージの最初の 4 分の 1 を使用し、AggregateRequest2 ノードは 2 番目の 4 分の 1 を使用します (以下同様)。

MQOutput ノードは、要求メッセージの ReplyTo キューとして AGGR_SAMPLE_REPLY を指定するように設定されています。 これは RequestReplyApp メッセージ・フローにより使用されます。

4 つの要求メッセージがすべて出力された後、AggregateControl ノードは、以下に示す手順で集約の状態をブローカーに内部的に保管します。

このシナリオを完了するための他の方法については、『サンプルの拡張』を参照してください。

フロー全体は、MQInput ノードで「トランザクション・モード」「はい」に設定したトランザクションの下で実行する必要があります。応答を受け取る前に最後の操作 (集約操作状態の保管) が済んでいるのが最も効率的だからです。

RequestReplyApp メッセージ・フロー

このフローを使用して、集約操作からの要求メッセージを通常処理するバックエンド・サービス・アプリケーションをシミュレートします。 実際のシステムでは、これらのバックエンド・サービス・アプリケーションは別のメッセージ・フローや既存のアプリケーションである可能性がありますが、Aggregation サンプルではそれ程の複雑さは必要ありません。 このフローには、要求/応答を正しく処理するのに最低限必要なものだけが含まれています。 このフローは、FanOut フローの MQOutput ノードが書き込みを行うのと同じキューから読み取りを行い、FanIn フローの入力ノードが読み取りを行うキューに出力します。 つまり、2 つのフローの間のメッセージング・ブリッジを提供します。 メッセージはその応答先キュー (FanOut フローの MQOutput ノードで設定) に書き込まれます。

Reply フロー
RequestReplyApp フローは、ブローカー・アーカイブ (BAR) ファイル内の追加の 3 つのインスタンスで指定され、結果として合計で 4 つのスレッドになります。 これにより、4 つの要求すべては必ず可能な限り速く処理されます。

FanIn メッセージ・フロー

このフローは RequestReplyApp フローからのすべての応答を受け取り、それらを単一の出力メッセージに集約します。 AggregateReply ノードからの出力メッセージは MQOutput ノードでは出力できないので、データをキューに書き出せる形式へと調整するために、Compute ノードが追加されます。
Aggregation ファンイン・フロー

FanIn メッセージ・フローにも、RequestReplyApp フローと同じ理由から、3 つの追加のインスタンスがあります。 最初の 3 つの着信応答はブローカーにより内部的に保管され、保管されている集約状態が更新されます。 4 番目の応答が処理されると、保管されている 3 つの応答が抽出され、4 つの応答メッセージすべてが組み合わされて出力メッセージになります。 このメッセージはキューに出力できる状態ではないので、BuildReply Compute ノードは以下の ESQL を呼び出します。

CREATE COMPUTE MODULE FanIn_BuildReply
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		SET OutputRoot.Properties = InputRoot.Properties;
		CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD';
		SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
		CREATE LASTCHILD OF OutputRoot DOMAIN 'XMLNSC';
		CREATE LASTCHILD OF OutputRoot.XMLNSC NAME 'ComIbmAggregateReplyBody';
		DECLARE next INTEGER 1;
		DECLARE repliesIn REFERENCE TO InputRoot.ComIbmAggregateReplyBody.*[next];
		DECLARE repliesOut REFERENCE TO OutputRoot.XMLNSC.ComIbmAggregateReplyBody;
		WHILE next <= 4 DO -- 4-way aggregation
			CREATE LASTCHILD OF repliesOut NAME FIELDNAME(repliesIn);
			SET repliesOut.*[next].ReplyIdentifier = CAST(repliesIn.Properties.ReplyIdentifier AS CHAR);
			SET repliesOut.*[next].SaleEnvelope = repliesIn.XMLNSC.SaleEnvelope;
			MOVE repliesIn NEXTSIBLING;
			SET next = next + 1;
		END WHILE;
		RETURN TRUE;
	END;
END MODULE;

ESQL は、入力メッセージの ComIbmAggregateReplyBody から出力メッセージの XML ツリーにデータをコピーする前に基本 MQMD を追加し、その一方で集約要求 ID およびフォルダーを保守します。 応答の順序は指定されていません。

テスト・メッセージ

集約メッセージ・フローの実行に使用するテスト・メッセージは、顧客の送り状明細が入った次のような XML メッセージです。 これには 8 つの <SaleList> エレメントに分けられた、約 8 KB のデータが含まれています。

<SaleEnvelope>
<Header>
<SaleListCount>8</SaleListCount>
</Header>
<SaleList>
<Invoice>
<Initial>K</Initial><Initial>A</Initial>
<Surname>Braithwaite</Surname>
<Item><Code>00</Code><Code>01</Code><Code>02</Code>
<Description>Twister</Description>
<Category>Games</Category>
<Price>00.30</Price><Quantity>01</Quantity>
</Item>
<Item><Code>02</Code><Code>03</Code><Code>01</Code>
<Description>The Times Newspaper</Description>
<Category>Books and Media</Category>
<Price>00.20</Price><Quantity>01</Quantity>
</Item>
<Balance>00.50</Balance><Currency>Sterling</Currency>
</Invoice>
<Invoice>
<Initial>T</Initial><Initial>J</Initial>
<Surname>Dunnwin</Surname>
<Item><Code>04</Code><Code>05</Code><Code>01</Code>
<Description>The Origin of Species</Description>
<Category>Books and Media</Category>
<Price>22.34</Price><Quantity>02</Quantity>
</Item>
<Item><Code>06</Code><Code>07</Code><Code>01</Code>
<Description>Microscope</Description>
<Category>Miscellaneous</Category>
<Price>36.20</Price><Quantity>01</Quantity>
</Item>
<Balance>81.84</Balance><Currency>Euros</Currency>
</Invoice>
</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<Trailer>
<CompletionTime>12.00.00</CompletionTime>
</Trailer>
</SaleEnvelope>

サンプルのホームに戻る