Java 入力ノードの機能の拡張

始める前に

以下のトピックを読み、理解していることを確認してください。
ユーザー定義ノードを作成した後、以下の関数が使用可能になります。
  1. 外部データのバッファーへの受信
  2. メッセージの伝搬
  3. スレッド化とトランザクション性の制御
  4. 例外の処理

外部データのバッファーへの受信

入力ノードは、ノードからの出力が正しい形式になっている限り、 どんなタイプの外部ソース (ファイル・システム、キュー、またはデータベースなど) からでも、 他の Java プログラムと同じようにデータを受信することができます。

ユーザーは入力データを格納する入力バッファー (またはビット・ストリーム) を準備し、これをメッセージ・オブジェクトと関連付けます。 MbInputNode クラスの createMessage メソッドを使用してバイト・アレイからメッセージを作成し、 このメッセージから有効なメッセージ・アセンブリーを生成します。 これらのメソッドについての詳細は、com/ibm/broker/plugin/package-overview.htmlを参照してください。 たとえば、ファイルから入力データを読み取るには、次のようにします。

  1. ファイルから読み取る入力ストリームを作成します。
    FileInputStream inputStream = new FileInputStream("myfile.msg");
  2. 入力ファイルのサイズのバイト・アレイを作成します。
    byte[] buffer = new byte[inputStream.available()];
  3. ファイルからバイト・アレイに読み取ります。
    inputStream.read(buffer);
  4. 入力ストリームを閉じます。
    inputStream.close();
  5. キューに書き込むメッセージを作成します。
    MbMessage msg = createMessage(buffer);
  6. このメッセージを保管するための新しいメッセージ・アセンブリーを作成します。
    msg.finalizeMessage(MbMessage.FINALIZE_VALIDATE);
    MbMessageAssembly newAssembly = 
              new MbMessageAssembly(assembly, msg);

メッセージの伝搬

メッセージ・アセンブリーを作成したら、それをノードのターミナルの 1 つに伝搬することができます。

たとえば、メッセージ・アセンブリーを "out" ターミナルに伝搬するには、次のようにします。
MbOutputTerminal out = getOutputTerminal("out");
out.propagate(newAssembly);

スレッド化とトランザクション性の制御

ブローカー・インフラストラクチャーは、 メッセージ処理の完了時の WebSphere MQ やデータベースの作業単位のコミットの制御などのトランザクション問題を処理します。 しかし、ユーザー定義のノードが使用されている場合、 ブローカーはリソースの更新を自動的にコミットすることはできません。

各メッセージ・フロー・スレッドは、 各メッセージ・フローごとに維持されているスレッドのプールから割り振られ、 run メソッドで実行を開始します。

ユーザー定義ノードが戻り値を使用して、トランザクションが正常かどうかを示し、 トランザクションをコミットするかロールバックするか、 また、いつスレッドをプールに戻すかを制御します。 ブローカーのインフラストラクチャーが処理されない例外を検出した場合、 トランザクションはロールバックされます。

トランザクションとスレッド化の動作は、以下の適切な戻り値を使用して判断します。

MbInputNode.SUCCESS_CONTINUE
トランザクションはコミットされ、 ブローカーが同じスレッドを使用して再び run メソッドを呼び出します。
MbInputNode.SUCCESS_RETURN
トランザクションがコミットされ、 このメッセージ・フローに対する唯一のスレッドでなければ、スレッドがスレッド・プールに戻されます。
MbInputNode.FAILURE_CONTINUE
トランザクションはロールバックされ、 ブローカーが同じスレッドを使用して再び run メソッドを呼び出します。
MbInputNode.FAILURE_RETURN
トランザクションがロールバックされ、 このメッセージ・フローに対する唯一のスレッドでなければ、スレッドがスレッド・プールに戻されます。
MbInputNode.TIMEOUT
run メソッドが、 入力データの到着を待ちながらいつまでもフローを阻止することがあってはなりません。 フローがユーザー・コードによってブロックされている間は、 そのブローカーをシャットダウンしたり再構成したりすることができません。 run メソッドは、 定期的に run メソッドから制御を戻してブローカーに譲る必要があります。 特定の期間 (たとえば、5 秒間) が過ぎても入力データが受信されない場合、 メソッドは TIMEOUT 戻りコードと共に制御を戻さなければなりません。 ブローカーをシャットダウンしたり再構成したりする必要がなければ、 入力ノードの run メソッドがすぐにまた呼び出されます。
マルチスレッド・メッセージ・フローを作成する場合は、 メッセージの作成後、そのメッセージを出力ターミナルに伝搬する前に、 dispatchThread メソッドを呼び出します。 こうすると、他のスレッドがメッセージを処理している間に、 1 つのスレッドだけがデータを待つようになります。 スレッド・プールからは、 メッセージ・フローの additionalInstances 属性で指定された最大限度までの新規スレッドが取得されます。 以下に例を示します。
public int run( MbMessageAssembly assembly ) throws MbException
{
  byte[] data = getDataWithTimeout();  // user supplied method
                                       // returns null if timeout
  if( data == null )
    return TIMEOUT;

  MbMessage msg = createMessage( data );
  msg.finalizeMessage( MbMessage.FINALIZE_VALIDATE );
  MbMessageAssembly newAssembly =
       new MbMessageAssembly( assembly, msg );

  dispatchThread();

  getOutputTerminal( "out" ).propagate( newAssembly );

  return SUCCESS_RETURN;
}

例外の処理

mbException クラスを使用して、 例外をキャッチして例外に接続します。 mbException クラスは、 ブローカー例外リスト内の例外の子を表す例外オブジェクトの配列を戻します。 戻される各エレメントは、その例外タイプを指定します。 例外に子が存在しない場合には、空の配列が戻されます。 次のコード・サンプルは、MbException クラスの使用法の例を示しています。

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
    try
      {

        // plug-in functionality

      }
    catch(MbException ex)
      {
        traverse(ex, 0);

        throw ex; // if re-throwing, it must be the original exception that was caught
      }
  }

  void traverse(MbException ex, int level)
  {
    if(ex != null)
      {
        // Do whatever action here
        System.out.println("Level: " + level);
        System.out.println(ex.toString());
        System.out.println("traceText:  " + ex.getTraceText());

        // traverse the hierarchy
        MbException e[] = ex.getNestedExceptions();
        int size = e.length;
        for(int i = 0; i < size; i++)
          {
            traverse(e[i], level + 1);
          }
      }
  }

mbException クラスの使用法の詳細については、javadoc を参照してください。

すべての現行の例外にアクセスできるような仕方で、ユーザー定義メッセージ処理ノードまたは出力ノードを開発できます。 たとえば、 データベース例外をキャッチするには MbSQLStatement クラスを使用できます。 このクラスは「throwExceptionOnDatabaseError」属性の値を設定します。 この属性はデータベース・エラーの発生時のブローカー動作を決定します。 true に設定すると、例外がスローされる場合、ユーザー定義拡張機能がキャッチして処理できます。

次のコード・サンプルは、 MbSQLStatement クラスの使用法の例を示しています。

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
    MbMessage newMsg = new MbMessage(assembly.getMessage());
    MbMessageAssembly newAssembly = new MbMessageAssembly(assembly, newMsg);

    String table =
       assembly.getMessage().getRootElement().getLastChild().getFirstChild().getName();

    MbSQLStatement state = createSQLStatement( "dbName",
       "SET OutputRoot.XML.integer[] = PASSTHRU('SELECT * FROM " + table + "');" );

    state.setThrowExceptionOnDatabaseError(false);
    state.setTreatWarningsAsErrors(true);

    state.select( assembly, newAssembly );

    int sqlCode = state.getSQLCode();
    if(sqlCode != 0)
      {
        // Do error handling here

        System.out.println("sqlCode = " + sqlCode);
        System.out.println("sqlNativeError = " + state.getSQLNativeError());
        System.out.println("sqlState = " + state.getSQLState());
        System.out.println("sqlErrorText = " + state.getSQLErrorText());
      }

    getOutputTerminal("out").propagate(assembly);
  }
関連情報
Java API
特記事項 | 商標 | ダウンロード | ライブラリー | サポート | フィードバック
Copyright IBM Corporation 1999, 2005 Last updated: 11/07/2005
as24987_