Java でのメッセージ処理ノードまたは出力ノードの作成

始める前に

WebSphere Business Integration Message Broker では、SwitchNode および TransformNode という名前の 2 つのサンプル・ユーザー定義ノードのソースが準備されています。 これらのノードは現行の状態で使用することもできますし、変更を加えてもかまいません。

概念上、メッセージ処理ノードは何らかの形でメッセージを処理するために使用され、 出力ノードはメッセージをビット・ストリームとして出力するために使用されます。 しかし、メッセージ処理ノードや出力ノードをコーディングするとき、 これらは基本的に同じものです。 出力ノードでメッセージ処理を実行することもできますし、 同様に、メッセージ処理ノードを使用してメッセージをビット・ストリームとして出力することもできます。 単純化するため、 このトピックではメッセージ処理ノードとしてノードを主に参照していますが、 どちらのタイプのノードの機能についても取り上げます。

分散プラットフォーム上でブローカーをデプロイする予定の z/OS で、 Java ノードを開発しないでください。 これは、z/OS 上の Java のレベルが、分散プラットフォーム上で Java のレベルと互換性のある コードを作成しない可能性があるためです。

メッセージ・データへのアクセス

多くの場合、ユーザー定義ノードは、 その入力ターミナルで受け取ったメッセージの内容にアクセスする必要があります。 メッセージは、構文エレメントのツリーとして表されます。 ユーティリティー関数のグループが、メッセージ管理、メッセージ・バッファー・アクセス、 構文エレメント・ナビゲーション、および構文エレメント・アクセス用に提供されます。

MbElement クラスには、構文エレメントへのインターフェースがあります。 Java API の詳細については、Javadoc を参照してください。

たとえば、以下のようになります。

  1. XML メッセージの中の関係のある構文エレメントにナビゲートするには、次のようにします。
            MbElement rootElement = assembly.getMessage().getRootElement();
            MbElement switchElement =
    			rootElement.getLastChild().getFirstChild().getFirstChild();
  2. このエレメントの値で示されたターミナルを選択するには、次のようにします。
            String terminalName;
        String elementValue = (String)switchElement.getValue();
            if(elementValue.equals("add"))
                terminalName = "add";
            else if(elementValue.equals("change"))
                terminalName = "change";
            else if(elementValue.equals("delete"))
                terminalName = "delete";
            else if(elementValue.equals("hold"))
                terminalName = "hold";
            else
                terminalName = "failure";
        
        MbOutputTerminal out = getOutputTerminal(terminalName);

メッセージ・オブジェクトの変換

受信した入力メッセージは読み取り専用です。 したがって、メッセージを変換するには、まずそれを新しい出力メッセージに書き込む必要があります。 入力メッセージからエレメントをコピーするか、 あるいは、新しいエレメントを作成してそれらをメッセージに付加することができます。 新しいエレメントは通常はパーサーのドメインに入ります。

MbMessage クラスには、コピー・コンストラクターと、 メッセージのルート・エレメントを取得するメソッドがあります。 MbElement クラスには、構文エレメントへのインターフェースがあります。

たとえば、組み込みメッセージを含む着信メッセージ・アセンブリーがある場合は、次のようにします。
  1. メッセージ・アセンブリーとその組み込みメッセージの新しいコピーを作成します。
            MbMessage newMsg = new MbMessage(assembly.getMessage());
            MbMessageAssembly newAssembly = new MbMessageAssembly(assembly, newMsg);
  2. XML メッセージの中の関係のある構文エレメントにナビゲートします。
        MbElement rootElement = newAssembly.getMessage().getRootElement();
            MbElement switchElement =
    			rootElement.getFirstElementByPath("/XML/data/action");
  3. 既存のエレメントの値を変更します。
      String elementValue = (String)switchElement.getValue();
            if(elementValue.equals("add"))
                switchElement.setValue("change");
            else if(elementValue.equals("change"))
                switchElement.setValue("delete");
            else if(elementValue.equals("delete"))
                switchElement.setValue("hold");
            else
                switchElement.setValue("failure");
  4. 切り替えタグの子として新しいタグを追加します。
        MbElement tag = switchElement.createElementAsLastChild(MbElement.TYPE_NAME,
                                                                                                                          "PreviousValue",
                                                                                                                          elementValue);
  5. この新しいタグに属性を追加します。
        tag.createElementAsFirstChild(MbElement.TYPE_NAME_VALUE,
                                                                        "NewValue",
                                                                        switchElement.getValue());
    
        MbOutputTerminal out = getOutputTerminal("out");

ESQL へのアクセス

ノードは、Compute ノードの ESQL 構文を使用して ESQL 式を呼び出すことができます。 ユーザーは、ESQL 式を使用してメッセージのコンポーネントを作成および変更し、 入力メッセージと外部データベースのデータの両方を参照できます。

以下の手順は、 ESQL を使用してユーザー定義ノードのトランザクションを制御する方法を表しています。
  1. 使用する ODBC データ・ソースの名前を設定します。 たとえば、
    String dataSourceName = "myDataSource";
  2. 実行する ESQL ステートメントを設定します。
    String statement = 
          "SET OutputRoot.XML.data = 
                        (SELECT Field2 FROM Database.Table1 WHERE Field1 = 1);";
    あるいは、結果を戻さないステートメントを実行する場合は、次のようにします。
    String statement = "PASSTHRU(
                                                    'INSERT INTO Database.Table1 VALUES(
                                                              InputRoot.XML.DataField1,
                                                              InputRoot.XML.DataField2)');";
  3. 以下の中から、トランザクションのタイプを選択します。
    MbSQLStatement.SQL_TRANSACTION_COMMIT
    ESQL ステートメントの実行の直後にトランザクションをコミットします。
    MbSQLStatement.SQL_TRANSACTION_AUTO
    メッセージ・フローの完了時にトランザクションをコミットします。 (必要に応じてロールバックが実行されます。)
    たとえば、
    int transactionType = MbSQLStatement.SQL_TRANSACTION_AUTO;
  4. ESQL ステートメントを取得します。 たとえば、
    MbSQLStatement sql = 
                  createSQLStatement(dataSourceName, statement, transactionType);
    トランザクション・タイプをデフォルトの MbSQLStatement.SQL_TRANSACTION_AUTO) にする場合は、 メソッド createSQLStatement(dataSource, statement) を使用することができます。
  5. 伝搬する新しいメッセージ・アセンブリーを作成します。
    MbMessageAssembly newAssembly = 
                  new MbMessageAssembly(assembly, assembly.getMessage());
  6. ESQL ステートメントを実行します。
    sql.select(assembly, newAssembly);
    あるいは、結果を戻さない ESQL ステートメントを実行する場合は、次のようにします。
    sql.execute(assembly);

ESQL の詳細については、ESQLを参照してください。

例外の処理

mbException クラスを使用して、 例外をキャッチして例外に接続します。 mbException クラスは、 ブローカー例外リスト内の例外の子を表す例外オブジェクトの配列を戻します。 戻される各エレメントは、その例外タイプを指定します。 例外に子が存在しない場合には、空の配列が戻されます。 次のコード・サンプルは、MbException クラスの使用法の例を示しています。

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
        try
      {

                // plug-in functionality

      }
        catch(MbException ex)
      {
                traverse(ex, 0);

                throw ex; // if re-throwing, it must be the original exception that was caught
      }
  }

    void traverse(MbException ex, int level)
  {
        if(ex != null)
      {
                // Do whatever action here
                System.out.println("Level: " + level);
                System.out.println(ex.toString());
                System.out.println("traceText:  " + ex.getTraceText());

                // traverse the hierarchy
                MbException e[] = ex.getNestedExceptions();
                int size = e.length;
                for(int i = 0; i < size; i++)
          {
                        traverse(e[i], level + 1);
          }
      }
  }

mbException クラスの使用法の詳細については、javadoc を参照してください。

こうした方法で、すべての現行の例外にアクセスできるユーザー定義メッセージ処理ノードまたは出力ノードを開発できます。 たとえば、 データベース例外をキャッチするには MbSQLStatement クラスを使用できます。 このクラスは「throwExceptionOnDatabaseError」属性の値を設定します。 この属性はデータベース・エラーの発生時のブローカー動作を決定します。 true に設定すると、 例外が送出される場合、ユーザー定義拡張機能がキャッチして処理できます。

次のコード・サンプルは、 MbSQLStatement クラスの使用法の例を示しています。

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
        MbMessage newMsg = new MbMessage(assembly.getMessage());
        MbMessageAssembly newAssembly = new MbMessageAssembly(assembly, newMsg);

        String table = assembly.getMessage().getRootElement().getLastChild().getFirstChild().getName();
    MbSQLStatement state = createSQLStatement( "dbName", 
                                                      "SET OutputRoot.XML.integer[] = PASSTHRU('SELECT * FROM " + table + "');" );

        state.setThrowExceptionOnDatabaseError(false);
        state.setTreatWarningsAsErrors(true);

        state.select( assembly, newAssembly );

        int sqlCode = state.getSQLCode();
        if(sqlCode != 0)
      {
                // Do error handling here

                System.out.println("sqlCode = " + sqlCode);
                System.out.println("sqlNativeError = " + state.getSQLNativeError());
                System.out.println("sqlState = " + state.getSQLState());
                System.out.println("sqlErrorText = " + state.getSQLErrorText());
      }

        getOutputTerminal("out").propagate(assembly);
  }

メッセージの伝搬

メッセージを伝搬する前に、何のメッセージ・フロー・データを伝搬するか、 また、どのノードのターミナルでデータを受信するかを決定する必要があります。 メッセージは伝搬する前にファイナライズする必要があります。 メッセージを伝搬したら、 出力メッセージを削除しなければなりません。

たとえば、以下のようになります。
  1. 出力ターミナル "out" にメッセージを伝搬するには、次のようにします。
    MbOutputTerminal out = getOutputTerminal("out");
            out.propagate(newAssembly);
  2. 出力メッセージを削除するには、次のようにします。
        newMsg.clearMessage();	

出力装置への書き込み

出力装置に書き込みをするには、 論理 (階層) メッセージをビット・ストリームに変換し直す必要があります。 これは、以下のように、MbMessagegetBuffer メソッドを使用して実行します。

public void evaluate( MbMessageAssembly assembly, MbInputTerminal in)
                                                                                                          throws MbException
{
    MbMessage msg = assembly.getMessage();
    byte[] bitstream = msg.getBuffer();

    // write the bitstream out somewhere
    writeBitstream( bitstream );   // user method

 }

通常、出力ノードの場合はメッセージは出力ターミナルに伝搬されないので、 ここで戻すことができます。

注: WebSphere MQ キューに書き込むときは、提供されている MQOutput ノードを使用しなければなりません。 これは、ブローカーが、スレッド単位で、内部的に WebSphere MQ 接続およびオープン・キュー・ハンドルを保守し、 それらはパフォーマンスを最適化するためにキャッシュに入れられるからです。 さらに、ブローカーは特定の WebSphere MQ イベントの発生時に リカバリー・シナリオを処理しますが、 WebSphere MQ MQI 呼び出しが ユーザー定義出力ノードで使用された場合、これは不利な影響を受けます。

新規 Java プロジェクトの作成

Java ノードは、次のように、ワークベンチから、準備されている plug-in development environment (PDE) を使用して作成できます。 このためには、次のようにして新規 Java プロジェクトを作成する必要があります。
  1. 「プラグイン開発 (Plug-in Development)」パースペクティブに切り替えます。
  2. 「ファイル (File)」>「新規 (New)」>「プロジェクト (Project)」とクリックします。 左方のメニューから「Java」を選択してから、 右方のメニューから「Java プロジェクト (Java Project)」を選択します。
  3. プロジェクトに名前を付けます。

    「Java 設定 (Java Settings)」パネルが表示されます。

  4. 「ライブラリー (Libraries)」タブを選択して、 「外部 JAR の追加 (Add External JARs)」をクリックします。
  5. install_dir\classes\jplugin.jar を選択します。
  6. 「その他 (Other)」タブのプロンプトに従って、 他のビルド設定を定義します。
  7. 「終了 (Finish)」をクリックします。
このようにしたら、次はこのプロジェクトの中で Java ノードのソースを開発することができます。

メッセージ処理ノード・クラスの宣言

MbNodeInterface をインプリメントし、 ブローカーのクラスパス (または LIL パス) に含められるすべてのクラスは、 メッセージ処理ノードとしてブローカーに登録されます。 MbNodeInterface をインプリメントする際は、 このクラスの evaluate メソッドもインプリメントする必要があります。 evaluate メソッドは、 フローに渡される各メッセージごとに、ブローカーによって呼び出されます。

メッセージ処理ノード・クラスを宣言するには、たとえば次のようにします。
package com.ibm.jplugins;

import com.ibm.broker.plugin.*;

public class BasicNode extends MbNode implements MbNodeInterface
次のようにして、ワークベンチで実行できます。
  1. 「ファイル (File)」>「新規 (New)」>「クラス (Class)」をクリックします。
  2. パッケージおよびクラス名フィールドに適切な値を設定します。
  3. 「スーパークラス (Superclass)」テキスト・フィールドのテキストを削除し、 「ブラウズ (Browse)」をクリックします。
  4. MbNode を選択します。
  5. 「インターフェース (Interfaces)」テキスト・フィールドの横にある「追加 (Add)」ボタンをクリックして、 MbNodeInterface を選択します。
  6. 「終了 (Finish)」をクリックします。

ノードのコンストラクターの定義

ノードがインスタンス化されるときは、 ユーザーのノード・クラスのコンストラクターが呼び出されます。 これは、ノードのターミナルを作成し、属性のデフォルト値を初期設定する場所です。

メッセージ処理ノードには、 いくつかの入力ターミナルと出力ターミナルが関連付けられています。 メソッド createInputTerminal および createOutputTerminal は、 ノードがインスタンス化される時にノードにターミナルを追加するために使用されます。 たとえば、1 つの入力ターミナルと 2 つの出力ターミナルを持つノードを作成するには、 次のようにします。

public MyNode() throws MbException
{
		// create terminals here
		createInputTerminal ("in");
		createOutputTerminal ("out");
		createOutputTerminal ("failure");
}

ノード名の宣言

ノードの名前を宣言して、ノードがワークベンチに識別されるようにする必要があります。 すべてのノード名は末尾が "Node" でなければなりません。 名前は以下のメソッドを使用して宣言します。

public static String getNodeName()
{
      return "BasicNode";
}
このメソッドが宣言されていない場合は、 Java API フレームワークが以下の規則に従ってデフォルトのノード名を作成します。
  • パッケージ名にはクラス名が付加されます。
  • ドットは除去され、 パッケージ名とクラス名の各部の最初の文字が大文字になります。
たとえば、 デフォルトでは以下のクラスにノード名 "ComIbmPluginsamplesBasicNode" が割り当てられます。
package com.ibm.pluginsamples;
public class BasicNode extends MbNode implements MbNodeInterface
{
   ...

属性の宣言

ノード属性は、Java Bean プロパティーと同じ方法で宣言します。 属性の getter メソッドおよび setter メソッドはユーザーが作成しなければなりません。 API フレームワークは、Java Bean 内視規則を使用して属性名を推測します。 たとえば、以下の 2 つのメソッドを宣言するには、次のようにします。

private String attributeVariable;

public String getFirstAttribute()
{
    return attributeVariable;
}

publc void setFirstAttribute(String value)
{
    attributeVariable = value;
}

ブローカーは、このノードが firstAttribute という属性を持つことを推測します。 この名前は、内部クラス・メンバー変数名ではなく、get または set メソッドの名前に由来します。 属性はストリングとしてしか公開できないので、 get または set メソッドで、数値タイプとストリングの変換を行う必要があります。 たとえば、次のメソッドは timeInSeconds という属性を定義します。

int seconds;

public String getTimeInSeconds()
{
    return Integer.toString(seconds);
}

public void setTimeInSeconds(String value)
{
    seconds = Integer.parseInt(value);
}

ノード機能のインプリメント

先に述べたとおり、 メッセージ処理ノードまたは出力ノードの場合、 MbNodeInterface で定義されている、 evaluate メソッドをインプリメントする必要があります。 これは、メッセージを処理するためにブローカーによって呼び出されます。 このメソッドは、ノード用のすべての処理関数を提供します。

evaluate メソッドには、ブローカーによって渡される以下の 2 つのパラメーターがあります。
  1. MbMessageAssembly。 これには、適切なメソッドを使用してアクセスできる以下のオブジェクトが含まれています。
    • 着信メッセージ
    • ローカル環境
    • グローバル環境
    • 例外リスト
  2. メッセージが着信した入力ターミナル。

メッセージ・フロー・データ (メッセージ、グローバル環境、ローカル環境、および例外リスト) が、 ノードの入力ターミナルで受信されます。

ノードのインスタンスの削除

ノードのインスタンスは、次のいずれかの場合に削除します。
  • ブローカーがシャットダウンされた。
  • ノードまたはノードを含むメッセージ・フローを除去し、 構成を再デプロイした。
ノードの削除中は、 ノードがソケットの終了などの終結処理操作を実行できるよう、 ノードに情報が通知されるようにする必要がある場合があります。 ノードがオプションの onDelete メソッドをインプリメントしていれば、 ノードが削除される前にこれがブローカーによって呼び出されます。

onDelete メソッドは次のようにしてインプリメントします。

public void onDelete()
{
    // perform node cleanup if necessary
}

関連概念
ユーザー定義拡張機能の計画
ランタイム環境でのユーザー定義拡張機能
ユーザー定義拡張機能の設計
ユーザー定義のメッセージ処理ノード
ユーザー定義の出力ノード
ESQL

関連タスク
ユーザー定義拡張機能の開発
提供されているサンプルのインプリメント
Java ユーザー定義ノードのコンパイル

関連資料
Java ユーザー定義ノード API
例外リスト構造