Java での入力ノードの作成

始める前に

以下のトピックを読み、理解していることを確認してください。

WebSphere Business Integration Message Broker では、SwitchNode および TransformNode という名前の 2 つのサンプル・ユーザー定義ノードのソースが準備されています。 これらのノードは現行の状態で使用することもできますし、変更を加えてもかまいません。

Java ユーザー定義ノードは、.jar ファイルとして配布されます。 このトピックでは、 Java を使用して入力ノードを作成するために実行する必要のあるステップについて説明します。 以下のステップについて要約しています。
  1. 新規 Java プロジェクトの作成
  2. 入力ノード・クラスの宣言
  3. ノードのコンストラクターの定義
  4. ノード名の宣言
  5. 属性の宣言
  6. ノード機能のインプリメント
  7. ノードのインスタンスの削除

分散プラットフォーム上でブローカーをデプロイする予定の z/OS で、 Java ノードを開発しないでください。 これは、z/OS 上の Java のレベルが、分散プラットフォーム上で Java のレベルと互換性のある コードを作成しない可能性があるためです。

外部データのバッファーへの受信

入力ノードは、ノードからの出力が正しい形式になっている限り、 どんなタイプの外部ソース (ファイル・システム、キュー、またはデータベースなど) からでも、 他の Java プログラムと同じようにデータを受信することができます。

ユーザーは入力データを格納する入力バッファー (またはビット・ストリーム) を準備し、 これをメッセージ・オブジェクトと関連付けます。 MbInputNode クラスの createMessage メソッドを使用してバイト・アレイからメッセージを作成し、 このメッセージから有効なメッセージ・アセンブリーを生成します (これらのメソッドの詳細については、Javadoc を参照してください)。 たとえば、ファイルから入力データを読み取るには、次のようにします。

  1. ファイルから読み取る入力ストリームを作成します。
    FileInputStream inputStream = new FileInputStream("myfile.msg");
  2. 入力ファイルのサイズのバイト・アレイを作成します。
    byte[] buffer = new byte[inputStream.available()];
  3. ファイルからバイト・アレイに読み取ります。
    inputStream.read(buffer);
  4. 入力ストリームを閉じます。
    inputStream.close();
  5. キューに書き込むメッセージを作成します。
    MbMessage msg = createMessage(buffer);
  6. このメッセージを保管するための新しいメッセージ・アセンブリーを作成します。
    msg.finalizeMessage(MbMessage.FINALIZE_VALIDATE);
    MbMessageAssembly newAssembly = 
              new MbMessageAssembly(assembly, msg);

メッセージの伝搬

メッセージ・アセンブリーを作成したら、それをノードのターミナルの 1 つに伝搬することができます。

たとえば、メッセージ・アセンブリーを "out" ターミナルに伝搬するには、次のようにします。
MbOutputTerminal out = getOutputTerminal("out");
out.propagate(newAssembly);

スレッド化とトランザクション動作の制御

ブローカー・インフラストラクチャーは、 メッセージ処理の完了時の WebSphere MQ やデータベースの作業単位のコミットの制御などのトランザクション問題を処理します。 しかし、ユーザー定義のノードが使用されている場合、 ブローカーはリソースの更新を自動的にコミットすることはできません。

各メッセージ・フロー・スレッドは、 各メッセージ・フローごとに維持されているスレッドのプールから割り振られ、 run メソッドで実行を開始します。

ユーザー定義ノードが戻り値を使用して、トランザクションが正常かどうかを示し、 トランザクションをコミットするかロールバックするか、 また、いつスレッドをプールに戻すかを制御します。 ブローカーのインフラストラクチャーが処理されない例外を検出した場合、 トランザクションはロールバックされます。

トランザクションとスレッド化の動作は、以下の適切な戻り値を使用して判断します。

MbInputNode.SUCCESS_CONTINUE
トランザクションはコミットされ、 ブローカーが同じスレッドを使用して再び run メソッドを呼び出します。
MbInputNode.SUCCESS_RETURN
トランザクションがコミットされ、 このメッセージ・フローに対する唯一のスレッドでなければ、スレッドがスレッド・プールに戻されます。
MbInputNode.FAILURE_CONTINUE
トランザクションはロールバックされ、 ブローカーが同じスレッドを使用して再び run メソッドを呼び出します。
MbInputNode.FAILURE_RETURN
トランザクションがロールバックされ、 このメッセージ・フローに対する唯一のスレッドでなければ、スレッドがスレッド・プールに戻されます。
MbInputNode.TIMEOUT
run メソッドが、 入力データの到着を待ちながらいつまでもフローを阻止することがあってはなりません。 フローがユーザー・コードによって阻止されている間は、 そのブローカーをシャットダウンしたり再構成したりすることができません。 run メソッドは、 定期的に run メソッドから制御を戻してブローカーに譲る必要があります。 特定の期間 (たとえば、5 秒間) が過ぎても入力データが受信されない場合、 メソッドは TIMEOUT 戻りコードと共に制御を戻さなければなりません。 ブローカーをシャットダウンしたり再構成したりする必要がなければ、 入力ノードの run メソッドがすぐにまた呼び出されます。
マルチスレッド・メッセージ・フローを作成する場合は、 メッセージの作成後、そのメッセージを出力ターミナルに伝搬する前に、 dispatchThread メソッドを呼び出します。 こうすると、他のスレッドがメッセージを処理している間に、 1 つのスレッドだけがデータを待つようになります。 スレッド・プールからは、 メッセージ・フローの additionalInstances 属性で指定された最大限度までの新規スレッドが取得されます。 たとえば、以下のようになります。
public int run( MbMessageAssembly assembly ) throws MbException
{
    byte[] data = getDataWithTimeout();  // user supplied method
                                                                              // returns null if timeout
    if( data == null )
        return TIMEOUT;

    MbMessage msg = createMessage( data );
    msg.finalizeMessage( MbMessage.FINALIZE_VALIDATE );
  MbMessageAssembly newAssembly = 
              new MbMessageAssembly( assembly, msg );

    dispatchThread();

    getOutputTerminal( "out" ).propagate( newAssembly );

    return SUCCESS_RETURN;
}

例外の処理

mbException クラスを使用して、 例外をキャッチして例外に接続します。 mbException クラスは、 ブローカー例外リスト内の例外の子を表す例外オブジェクトの配列を戻します。 戻される各エレメントは、その例外タイプを指定します。 例外に子が存在しない場合には、空の配列が戻されます。 次のコード・サンプルは、MbException クラスの使用法の例を示しています。

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
        try
      {

                // plug-in functionality

      }
        catch(MbException ex)
      {
                traverse(ex, 0);

                throw ex; // if re-throwing, it must be the original exception that was caught
      }
  }

    void traverse(MbException ex, int level)
  {
        if(ex != null)
      {
                // Do whatever action here
                System.out.println("Level: " + level);
                System.out.println(ex.toString());
                System.out.println("traceText:  " + ex.getTraceText());

                // traverse the hierarchy
                MbException e[] = ex.getNestedExceptions();
                int size = e.length;
                for(int i = 0; i < size; i++)
          {
                        traverse(e[i], level + 1);
          }
      }
  }

mbException クラスの使用法の詳細については、javadoc を参照してください。

こうした方法で、すべての現行の例外にアクセスできるユーザー定義メッセージ処理ノードまたは出力ノードを開発できます。 たとえば、 データベース例外をキャッチするには MbSQLStatement クラスを使用できます。 このクラスは「throwExceptionOnDatabaseError」属性の値を設定します。 この属性はデータベース・エラーの発生時のブローカー動作を決定します。 true に設定すると、 例外が送出される場合、ユーザー定義拡張機能がキャッチして処理できます。

次のコード・サンプルは、 MbSQLStatement クラスの使用法の例を示しています。

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
        MbMessage newMsg = new MbMessage(assembly.getMessage());
        MbMessageAssembly newAssembly = new MbMessageAssembly(assembly, newMsg);

        String table = assembly.getMessage().getRootElement().getLastChild().getFirstChild().getName();
    MbSQLStatement state = createSQLStatement( "dbName", 
                                                      "SET OutputRoot.XML.integer[] = PASSTHRU('SELECT * FROM " + table + "');" );

        state.setThrowExceptionOnDatabaseError(false);
        state.setTreatWarningsAsErrors(true);

        state.select( assembly, newAssembly );

        int sqlCode = state.getSQLCode();
        if(sqlCode != 0)
      {
                // Do error handling here

                System.out.println("sqlCode = " + sqlCode);
                System.out.println("sqlNativeError = " + state.getSQLNativeError());
                System.out.println("sqlState = " + state.getSQLState());
                System.out.println("sqlErrorText = " + state.getSQLErrorText());
      }

        getOutputTerminal("out").propagate(assembly);
  }

新規 Java プロジェクトの作成

Java ノードは、次のように、ワークベンチから、準備されている plug-in development environment (PDE) を使用して作成できます。 このためには、次のようにして新規 Java プロジェクトを作成する必要があります。
  1. 「プラグイン開発 (Plug-in Development)」パースペクティブに切り替えます。
  2. 「ファイル (File)」>「新規 (New)」>「プロジェクト (Project)」とクリックします。 左方のメニューから「Java」を選択してから、 右方のメニューから「Java プロジェクト (Java Project)」を選択します。
  3. プロジェクトに名前を付けます。

    「Java 設定 (Java Settings)」パネルが表示されます。

  4. 「ライブラリー (Libraries)」タブを選択して、 「外部 JAR の追加 (Add External JARs)」をクリックします。
  5. install_dir\classes\jplugin.jar を選択します。
  6. 「その他 (Other)」タブのプロンプトに従って、 他のビルド設定を定義します。
  7. 「終了 (Finish)」をクリックします。
このようにしたら、次はこのプロジェクトの中で Java ノードのソースを開発することができます。

入力ノード・クラスの宣言

MbInputNodeInterface をインプリメントし、 ブローカーのクラスパス (または LIL パス) に含められるすべてのクラスは、 入力ノードとしてブローカーに登録されます。 MbInputNodeInterface をインプリメントする場合は、 このクラスの run メソッドもインプリメントしなければなりません。 run メソッドは、 メッセージ・フローの開始を表し、 メッセージを公式化するデータを含んでいて、これをフローに伝搬します。 ブローカーは、スレッドが使用可能になると、 指定されたスレッド化モデルに従って run メソッドを呼び出します。

入力ノード・クラスを宣言するには、たとえば次のようにします。

package com.ibm.jplugins;

import com.ibm.broker.plugin.*;

public class BasicInputNode extends MbInputNode implements MbInputNodeInterface
{
...
次のようにして、ワークベンチで実行できます。
  1. 「ファイル (File)」>「新規 (New)」>「クラス (Class)」をクリックします。
  2. パッケージおよびクラス名フィールドに適切な値を設定します。
  3. 「スーパークラス (Superclass)」テキスト・フィールドのテキストを削除し、 「ブラウズ (Browse)」をクリックします。
  4. MbInputNode を選択します。
  5. 「インターフェース (Interfaces)」テキスト・フィールドの横にある「追加 (Add)」ボタンをクリックして、 MbInputNodeInterface を選択します。
  6. 「終了 (Finish)」をクリックします。

ノードのコンストラクターの定義

ノードがインスタンス化されるときは、 ユーザーのノード・クラスのコンストラクターが呼び出されます。 これは、ノードのターミナルを作成し、属性のデフォルト値を初期設定する場所です。

入力ノードには、いくつかの出力ターミナルが関連付けられますが、 通常は入力ターミナルはありません。 ノードに出力ターミナルを追加するには、 ノードをインスタンス化するときに createOutputTerminal メソッドを使用します。 たとえば、3 つの出力ターミナルを持つノードを作成するには、 次のようにします。

public BasicInputNode() throws MbException
{
	createOutputTerminal ("out");
	createOutputTerminal ("failure");
	createOutputTerminal ("catch");
      setAttribute ("firstParserClassName","myParser");
      attributeVariable  = new String ("none");
}

ノード名の宣言

ノードの名前を宣言して、ノードがワークベンチに識別されるようにする必要があります。 すべてのノード名は末尾が "Node" でなければなりません。 名前は以下のメソッドを使用して宣言します。

public static String getNodeName()
{
      return "BasicInputNode";
}
このメソッドが宣言されていない場合は、 Java API フレームワークが以下の規則に従ってデフォルトのノード名を作成します。
  • パッケージ名にはクラス名が付加されます。
  • ドットは除去され、 パッケージ名とクラス名の各部の最初の文字が大文字になります。
たとえば、 デフォルトでは以下のクラスにノード名 "ComIbmPluginsamplesBasicInputNode" が割り当てられます。
package com.ibm.pluginsamples;
public class BasicInputNode extends MbInputNode implements MbInputNodeInterface
{
   ...

属性の宣言

ノード属性は、Java Bean プロパティーと同じ方法で宣言します。 属性の getter メソッドおよび setter メソッドはユーザーが作成しなければなりません。 API フレームワークは、Java Bean 内視規則を使用して属性名を推測します。 たとえば、以下の 2 つのメソッドを宣言するには、次のようにします。

private String attributeVariable;

public String getFirstAttribute()
{
    return attributeVariable;
}

publc void setFirstAttribute(String value)
{
    attributeVariable = value;
}

ブローカーは、このノードが firstAttribute という属性を持つことを推測します。 この名前は、内部クラス・メンバー変数名ではなく、get または set メソッドの名前に由来します。 属性はストリングとしてしか公開できないので、 get または set メソッドで、数値タイプとストリングの変換を行う必要があります。 たとえば、次のメソッドは timeInSeconds という属性を定義します。

int seconds;

public String getTimeInSeconds()
{
    return Integer.toString(seconds);
}

public void setTimeInSeconds(String value)
{
    seconds = Integer.parseInt(value);
}

ノード機能のインプリメント

先に述べたとおり、ブローカーは run メソッドを呼び出して入力メッセージを作成します。 このメソッドは、入力ノードのためのすべてのアプリケーション処理機能を備えている必要があります。

デフォルトのメッセージ・パーサー属性のオーバーライド (オプション)

入力ノードは通常、 最初に入力メッセージを構文解析するメッセージ・パーサーを判別します。 たとえば、プリミティブ MQInput ノードは、 MQMD ヘッダーを構文解析するために MQMD パーサーが必要であることを指示します。 ユーザー定義入力ノードは、 デフォルトとして組み込まれる、オーバーライドも可能な以下の属性を使用することによって、 適切なヘッダーかメッセージ・パーサー、および構文解析の制御のモードを選択することができます。

rootParserClassName
ユーザー定義入力ノードによってサポートされているメッセージ形式を構文解析するルート・パーサーの名前を定義します。 デフォルトでは GenericRoot に設定されます。 これは、ブローカーにパーサーを割り振らせたりチェーニングするために提供されているルート・パーサーです。 おそらく、ノードでこの属性値を変更する必要はありません。
firstParserClassName
ビット・ストリームを構文解析する役割を持つパーサーのチェーンの中で、 最初のパーサーの名前を定義します。 デフォルトでは XML に設定されます。
messageDomainProperty
入力メッセージを構文解析するのに必要なメッセージ・パーサーの名前を定義するオプションの属性。サポートされる値は、MQInput ノードによってサポートされるものと同じです。 (MQInput ノードの詳細については、MQInput ノードを参照してだくさい。
messageSetProperty
MRM パーサーが messageDomainProperty 属性によって指定された場合にのみ、 Message Set フィールドのメッセージ・セットの ID (またはメッセージ・セット名) を定義するオプションの属性。
messageTypeProperty
MRM パーサーが messageDomainProperty 属性によって指定された場合にのみ、 MessageType フィールドのメッセージの ID を定義するオプションの属性。
messageFormatProperty
MRM パーサーが messageDomainProperty 属性によって指定された場合にのみ、 Message Format フィールドのメッセージの形式を定義するオプションの属性。

ノードのインスタンスの削除

ノードのインスタンスは、次のいずれかの場合に削除します。
  • ブローカーがシャットダウンされた。
  • ノードまたはノードを含むメッセージ・フローを除去し、 構成を再デプロイした。
ノードの削除中は、 ノードがソケットの終了などの終結処理操作を実行できるよう、 ノードに情報が通知されるようにする必要がある場合があります。 ノードがオプションの onDelete メソッドをインプリメントしていれば、 ノードが削除される前にこれがブローカーによって呼び出されます。

onDelete メソッドは次のようにしてインプリメントします。

public void onDelete()
{
    // perform node cleanup if necessary
}

関連概念
ユーザー定義拡張機能の計画
ランタイム環境でのユーザー定義拡張機能
ユーザー定義拡張機能の設計
ユーザー定義の入力ノード

関連タスク
ユーザー定義拡張機能の開発
提供されているサンプルのインプリメント
Java ユーザー定義ノードのコンパイル

関連資料
MQInput ノード
例外リスト構造
Java ユーザー定義ノード API