Der Mustercode 'Aggregation' stellt eine einfache Aggregationsoperation mit vier Verzweigungen unter Verwendung der AggregateControl-, AggregateRequest- und AggregateReply-Knoten dar. Er enthält drei Nachrichtenflüsse ('FanOut', 'RequestReplyApp' und 'FanIn'), um eine Aggregation mit vier Verzweigungen zu implementieren.
Dieser Nachrichtenfluss ruft die eingehende Anforderungsnachricht ab, erstellt vier verschiedene Anforderungsnachrichten, sendet diese auf eine Anfrage/Antwort-Anforderung hin und startet die Überwachung der Aggregationsoperation.
Das Steuerterminal des AggregateControl-Knotens verfügt über keine Wire-Verbindung
und der Transaktionsmodus des MQInput-Knotens ist auf
Yes (Ja) gesetzt.
Weitere Informationen zu den Gründen für dieses Design finden Sie unter
Mustercode 'Aggregation' erweitern.
Der Nachrichtenfluss 'FanOut' enthält den AggregateControl- und AggregateRequest-Knoten, mit denen die Verarbeitung der Aggregation gestartet wird. Der AggregateControl-Knoten gibt die Anforderungsnachricht über jede der vier Verzweigungen, die mit dessen Ausgangsterminal verbunden sind, (in unbestimmter Reihenfolge) weiter. Jede Verzweigung besitzt den Rechenknoten 'BuildRequest', um die individuelle Anforderung zu erstellen. Im Rechenknoten 'BuildRequest1' wird der folgende ESQL-Code verwendet :
CREATE COMPUTE MODULE FanOut_CreateRequest1
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputLocalEnvironment = InputLocalEnvironment;
CALL CopyQuarter(InputRoot, OutputRoot, 0);
RETURN TRUE;
END;
END MODULE;
Die CopyQuarter-Prozedur kopiert die Nachrichtenheader der Eingabenachricht und extrahiert dann ein Viertel der Elemente der <SaleList>. In dem bereitgestellten Beispiel werden acht <SaleList>-Elemente bereitgestellt, wobei jede Anforderungsnachricht zwei <SaleList>-Elemente enthält. Im Folgenden der ESQL-Code für diesen Vorgang:
CREATE PROCEDURE CopyQuarter(IN input REFERENCE, IN output REFERENCE, IN jumps INTEGER) BEGIN CALL CopyMessageHeaders(input, output); CREATE LASTCHILD OF output DOMAIN 'XMLNSC'; CREATE LASTCHILD OF output.XMLNSC NAME 'SaleEnvelope'; DECLARE xmlIn REFERENCE TO input.XMLNSC.SaleEnvelope; DECLARE xmlOut REFERENCE TO output.XMLNSC.SaleEnvelope; IF LASTMOVE(xmlOut) <> TRUE THEN THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('could not create output message'); END IF; DECLARE invoices INTEGER CAST (xmlIn.Header.SaleListCount AS INTEGER); DECLARE quarter INTEGER invoices/4; IF invoices <> (quarter*4) THEN THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('not divisible by 4', invoices); END IF; IF jumps > 3 THEN THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('too many jumps', jumps); END IF; DECLARE count INTEGER 1; DECLARE copyRef REFERENCE TO xmlIn.SaleList[(jumps*quarter)+count]; WHILE count <= quarter DO SET xmlOut.SaleList[count] = copyRef; MOVE copyRef NEXTSIBLING; SET count = count + 1; END WHILE; END;
Bevor die entsprechende Anzahl an <SaleList>-Elementen aus der Eingabe- in die Ausgabenachricht kopiert wird, wird zunächst der Status der Eingaben überprüft (die Anzahl der <SaleList>-Elemente muss durch vier teilbar sein und das erforderliche Viertel wird über 0, 1, 2 oder 3 ausgewählt).
Die CopyMessageHeaders-Prozedur, die in der CopyQuarter-Prozedur aufgerufen wird, basiert auf der bereitgestellten Standardprozedur CopyMessageHeaders, die in der erstellten ESQL für einen neuen Rechenknoten zur Verfügung gestellt wird. Um die Wiederverwendung zu maximieren, wurde diese CopyMessageHeaders-Prozedur in den Bereich der ESQL-Datei verschoben, sodass alle Rechenknoten dieselbe Prozedur aufrufen können.
Diese Verschiebung hat zur Folge, dass eine Änderung der Prozedur notwendig wird. Innerhalb eines Rechenknotens hat die OutputRoot-Referenz besondere Eigenschaften, die automatisch sicherstellen, dass Domäneninformationen beibehalten werden, wenn Nachrichtenbaumstrukturelemente vom InputRoot in den OutputRoot kopiert werden. Jedoch wird in diesem Fall OutputRoot als Referenz an eine externe Prozedur weitergeleitet. Daher muss die Domäneninformation explizit beibehalten werden. Die Beibehaltung wird durch Hinzufügen des Befehls CREATE LASTCHILD erreicht:
CREATE PROCEDURE CopyMessageHeaders(IN input REFERENCE,
IN output REFERENCE)
BEGIN
DECLARE I INTEGER 2;
DECLARE J INTEGER CARDINALITY(input.*[]);
WHILE I < J DO
CREATE LASTCHILD OF output DOMAIN FIELDNAME(input.*[I]); -- preserve domain information
SET output.*[I] = input.*[I];
SET I = I + 1;
END WHILE;
END;
Nachdem der Rechenknoten 'BuildRequest' die Anforderungsnachricht erstellt hat, indem der Rechenknoten auf Pass LocalEnvironment and Message gesetzt wurde, wird die Anforderungsnachricht durch einen MQOutput-Knoten an die Warteschlange 'AGGR_SAMPLE_REQUEST' ausgegeben. (Der Einfachheit halber werden in diesem Muster alle vier Anforderungen in dieselbe Warteschlange eingereiht. Für eine echte Anwendung ist dies jedoch unrealistisch.) Zu jedem AggregateRequest-Knoten ist ein Ordnername als Konfigurationsparameter angegeben, der vom AggregateReply-Knoten beim Hinzufügen der verschiedenen Antworten zu der zusammengefassten Antwortnachricht verwendet wird. Der Knoten 'AggregateRequest1' verwendet das erste Viertel der Eingabenachricht, der Knoten 'AggregateRequest2' das zweite usw.
Die Einstellungen der MQOutput-Knoten werden so gesetzt, dass 'AGGR_SAMPLE_REPLY' als ReplyTo-Warteschlange in den Anforderungsnachrichten festgelegt wird. Diese Warteschlange wird vom Nachrichtenfluss 'RequestReplyApp' verwendet.
Nachdem alle vier Anforderungsnachrichten ausgegeben wurden, speichert der AggregateControl-Knoten den Status der Aggregation intern im Broker, indem er folgende Schritte ausführt:
Informationen zu anderen Möglichkeiten für die Ausführung dieses Szenarios finden Sie im Abschnitt Erweiterung des Mustercodes.
Der gesamte Nachrichtenfluss muss unter einer Transaktion ausgeführt werden, wobei der Transaktionsmodus auf dem MQInput-Knoten auf Ja gesetzt sein muss, da es am effizientesten ist, wenn die letzte Operation (das Speichern des Aggregationsoperationsstatus) beendet ist, bevor Antworten empfangen werden.
Mit diesem Nachrichtenfluss können die Back-End-Serviceanwendungen simuliert werden, die die Anforderungsnachrichten der Aggregationsoperation normalerweise verarbeiten. In einem echten System können dies andere Nachrichtenflüsse oder bestehende Anwendungen sein. Dieser Komplexitätsgrad ist für den Mustercode 'Aggregation' jedoch nicht erforderlich, deshalb enthält der Nachrichtenfluss nur die Elemente, die für eine korrekte Anfrage/Antwort-Verarbeitung erforderlich sind. Dieser Nachrichtenfluss liest aus derselben Warteschlange, in die die MQOutput-Knoten im Fluss 'FanOut' schreiben, und er gibt Daten an die Warteschlange aus, aus der der Empfangsknoten im Fluss 'FanIn' liest. Er stellt eine Nachrichtenaustauschbrücke zwischen den beiden Nachrichtenflüssen zur Verfügung. Die Nachrichten werden in ihrer Warteschlange für zu beantwortende Nachrichten eingereiht (wie von den MQOutput-Knoten im Fluss 'FanOut' gesetzt).
Der Nachrichtenfluss 'RequestReplyApp' wird in der BAR-Datei (Brokerarchivdatei) mit drei zusätzlichen Instanzen definiert, sodass sich insgesamt vier Threads ergeben; dadurch wird sichergestellt, dass alle vier Anforderungen so schnell wie möglich verarbeitet werden.
Dieser Nachrichtenfluss empfängt alle Antworten vom Nachrichtenfluss 'RequestReplyApp'
und fasst diese in einer einzelnen Ausgabenachricht zusammen. Die Ausgabenachricht vom
AggregateReply-Knoten kann nicht von einem MQOutput-Knoten ausgegeben werden.
Deshalb wird ein Rechenknoten hinzugefügt, der die Daten in ein Format bringt, mit dem
sie in eine Warteschlange geschrieben werden können.
Der Nachrichtenfluss 'FanIn' hat aus demselben Grund wie der Fluss 'RequestReplyApp' auch drei zusätzliche Instanzen. Die ersten drei eingehenden Antworten werden intern vom Broker gespeichert, und der gespeicherte Status der Aggregation wird aktualisiert. Wenn die vierte Antwort verarbeitet wird, werden die drei gespeicherten Antworten extrahiert, und alle vier Antwortnachrichten werden in einer Ausgabenachricht zusammengefasst. Diese Nachricht befindet sich nicht in einem Status, in dem sie an eine Warteschlange ausgegeben werden kann. Deshalb ruft der Rechenknoten 'BuildReply' die folgende ESQL auf:
CREATE COMPUTE MODULE FanIn_BuildReply CREATE FUNCTION Main() RETURNS BOOLEAN BEGIN SET OutputRoot.Properties = InputRoot.Properties; CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD'; SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION; CREATE LASTCHILD OF OutputRoot DOMAIN 'XMLNSC'; CREATE LASTCHILD OF OutputRoot.XMLNSC NAME 'ComIbmAggregateReplyBody'; DECLARE next INTEGER 1; DECLARE repliesIn REFERENCE TO InputRoot.ComIbmAggregateReplyBody.*[next]; DECLARE repliesOut REFERENCE TO OutputRoot.XMLNSC.ComIbmAggregateReplyBody; WHILE next <= 4 DO -- 4-way aggregation CREATE LASTCHILD OF repliesOut NAME FIELDNAME(repliesIn); SET repliesOut.*[next].ReplyIdentifier = CAST(repliesIn.Properties.ReplyIdentifier AS CHAR); SET repliesOut.*[next].SaleEnvelope = repliesIn.XMLNSC.SaleEnvelope; MOVE repliesIn NEXTSIBLING; SET next = next + 1; END WHILE; RETURN TRUE; END; END MODULE;
Die ESQL fügt eine rudimentäre MQMD-Struktur hinzu, bevor sie die Daten von ComIbmAggregateReplyBody in der Eingabenachricht in eine XML-Baumstruktur in der Ausgabenachricht kopiert. Dabei werden die Aggregat-IDs und -ordner beibehalten. Die Reihenfolge der Antworten ist nicht angegeben.
Bei der Testnachricht, mit der der Aggregationsnachrichtenfluss gesteuert wird, handelt es sich um eine XML-Nachricht, die Rechnungsangaben für den Kunden enthält. Sie enthält Daten mit einer Größe von etwa 8 KB in acht einzelnen <SaleList>-Elementen.
<SaleEnvelope>
<Header>
<SaleListCount>8</SaleListCount>
</Header>
<SaleList>
<Invoice>
<Initial>K</Initial><Initial>A</Initial>
<Surname>Braithwaite</Surname>
<Item><Code>00</Code><Code>01</Code><Code>02</Code>
<Description>Twister</Description>
<Category>Games</Category>
<Price>00.30</Price><Quantity>01</Quantity>
</Item>
<Item><Code>02</Code><Code>03</Code><Code>01</Code>
<Description>The Times Newspaper</Description>
<Category>Books and Media</Category>
<Price>00.20</Price><Quantity>01</Quantity>
</Item>
<Balance>00.50</Balance><Currency>Sterling</Currency>
</Invoice>
<Invoice>
<Initial>T</Initial><Initial>J</Initial>
<Surname>Dunnwin</Surname>
<Item><Code>04</Code><Code>05</Code><Code>01</Code>
<Description>The Origin of Species</Description>
<Category>Books and Media</Category>
<Price>22.34</Price><Quantity>02</Quantity>
</Item>
<Item><Code>06</Code><Code>07</Code><Code>01</Code>
<Description>Microscope</Description>
<Category>Miscellaneous</Category>
<Price>36.20</Price><Quantity>01</Quantity>
</Item>
<Balance>81.84</Balance><Currency>Euros</Currency>
</Invoice>
</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<Trailer>
<CompletionTime>12.00.00</CompletionTime>
</Trailer>
</SaleEnvelope>