C での入力ノードの作成

始める前に

以下のトピックを読み、理解していることを確認してください。

ロード可能インプリメンテーション・ライブラリー、または LIL、 は C ノード (またはパーサー) のインプリメンテーション・モジュールです。 LIL はダイナミック・リンク・ライブラリー (DLL) としてインプリメントされます。 これにはファイル拡張子 .dll は付けられず、.lil が付けられます。

開発者が作成する必要のあるインプリメンテーション関数は、C 言語ノード・インプリメンテーション関数でリストしています。このプロセスに役立てるために WebSphere Business Integration Message Broker によって提供されるユーティリティー関数は、C 言語ノード・ユーティリティー関数でリストしています。

WebSphere Business Integration Message Broker では、SwitchNode および TransformNode という名前の 2 つのサンプル・ユーザー定義ノードのソースが準備されています。 これらのノードは現行の状態で使用することもできますし、変更を加えてもかまいません。

このトピックでは、 C を使用して入力ノードを作成するために実行する必要のあるステップについて説明します。 以下のステップについて要約しています。
  1. ノードの初期化
  2. 入力ノードとしてのノードの定義
  3. ノードのインスタンスの作成
  4. 属性の設定
  5. ノード機能のインプリメント
  6. デフォルトのメッセージ・パーサー属性のオーバーライド (オプション)
  7. ノードのインスタンスの削除

外部データのバッファーへの受信

入力ノードは、ノードからの出力が正しい形式になっている限り、 ファイル・システムや FTP 接続などのどんなタイプの外部ソースからでも、データを受信することができます。 キューやデータベースへの接続には、 提供されている IBM プリミティブ・ノードと API 呼び出しを使用する必要があります。 その主な理由は、プリミティブ・ノードにはエラー処理のためのセットアップがすでになされているためです。 データベース・テーブルへの直接アクセスのための mqgetmqput コマンドは使用しないでください。

ユーザーは入力データを格納する入力バッファー (またはビット・ストリーム) を準備し、 これをメッセージ・オブジェクトと関連付ける必要があります。 C API では、cniSetInputBuffer ユーティリティー関数を使用することによって、 入力メッセージを表す CciMessage オブジェクトにバッファーが割り当てられます。 たとえば、以下のようになります。
{
    static char* functionName = (char *)"_Input_run()";
    void*        buffer;
    CciTerminal* terminalObject;
    int          buflen = 4096;
    int          rc = CCI_SUCCESS;
    int          rcDispatch = CCI_SUCCESS;
    char         xmlData[] = "<A>data</A>";

    buffer = malloc(buflen);
    memcpy(buffer, &xmlData, sizeof(xmlData));
    cniSetInputBuffer(&rc, message, buffer, buflen);
}
/*propagate etc*/

free(buffer);
上記の例は、割り振られているメモリーの領域を示しています (buffer = malloc(buflen);)。 C でプログラミングしているとき、 メモリーを割り振る場合には、それが不要になったら開放しなければなりません。 メッセージがフローに伝搬されている間はいつでも、 ブローカーがこのメモリーにアクセスしようとする可能性があるので、 メモリーの解放は、同じ CciMessage に対して cniPropagate を呼び出した後にだけ行う必要があります。

スレッド化とトランザクションの制御

入力ノードの役割は、メッセージ・フローでメッセージが伝搬されたときに、 メッセージの処理を適切に終了することです。 特に、入力ノードはすべてのトランザクションをコミットまたはロールバックし、 スレッドをスレッド・プールに戻す必要があります。

各メッセージ・フロー・スレッドは、 各メッセージ・フローごとに維持されているスレッドのプールから割り振られ、 cniRun 関数で実行を開始します。 スレッドの動作は、cniDispatchThread ユーティリティー関数と適切な戻り値を使用して判断します。

本書では、一般にトランザクション という用語を、 グローバルな整合トランザクションまたはブローカー制御トランザクションのどちらかを言い表すために使用しています。 グローバルな整合トランザクションは、 XA 準拠の Transaction Manager としての WebSphere MQ か、 z/OS でのリソース・リカバリー・サービス (RRS) のどちらかによって 整合がとられます。 WebSphere Business Integration Message Broker は、データベース・リソースをコミット (またはロールバック) してから、 WebSphere MQ 作業単位をコミットすることによってトランザクションを制御しますが、 ユーザー定義ノードが使用されている場合、 ブローカーはリソースの更新を自動的にコミットすることはできません。 ユーザー定義ノードが、戻り値を使用して、トランザクションが正常かどうかを示し、 トランザクションをコミットするかロールバックするかを制御します。 ブローカーのインフラストラクチャーが処理されない例外を検出した場合、 トランザクションはロールバックされます。

以下の表では、サポートされている各戻り値と、 それぞれがトランザクションに及ぼす影響、 および現行スレッドに対してブローカーが行う処理について説明しています。

戻り値 トランザクションへの影響 スレッドに対するブローカーの処置
CCI_SUCCESS_CONTINUE コミットされます。 cniRun 関数で同じスレッドを再度呼び出します。
CCI_SUCCESS_RETURN コミットされます。 スレッド・プールにスレッドを戻します。
CCI_FAILURE_CONTINUE ロールバックされます。 cniRun 関数で同じスレッドを再度呼び出します。
CCI_FAILURE_RETURN ロールバックされます。 スレッド・プールにスレッドを戻します。
CCI_TIMEOUT 適用されません。 関数が、入力メッセージを待っている間に、定期的にタイムアウトします。 cniRun 関数で同じスレッドを再度呼び出します。
以下は、 cniDispatchThread 関数での SUCCESS_RETURN 戻りコードの使用例です。
{
  ...
    cniDispatchThread(&rcDispatch, ((NODE_CONTEXT_ST *)context)->nodeObject);
  ...
    if (rcDispatch == CCI_NO_THREADS_AVAILABLE) return CCI_SUCCESS_CONTINUE;
    else return CCI_SUCCESS_RETURN;
}     

メッセージの伝搬

メッセージを伝搬する前に、何のメッセージ・フロー・データを伝搬するか、 また、どのターミナルでデータを受信するかを決定する必要があります。

terminalObject は、ユーザー定義ノード自体が保守しているリストに由来します。

たとえば、出力ターミナルにメッセージを伝搬するには、 次のように、cniPropagate 関数を使用します。
  if (terminalObject) {
        if (cniIsTerminalAttached(&rc, terminalObject)) {
            if (rc == CCI_SUCCESS) {
                cniPropagate(&rc, terminalObject, destinationList, exceptionList, message);
      }
    }

上記の例で、 cniIsTerminalAttached 関数は、 メッセージを指定されたターミナルに伝搬できるか検査するために使用されています。 cniIsTerminalAttached 関数が使用されず、 ターミナルがコネクターによって他のノードに付加されていない場合、 メッセージは伝搬されません。 この関数を使用するならば、 ターミナルが接続されていない場合でも、 ノードの動作を変更することができます。

ノードの初期化

以下の手順は、ノードの初期化方法を表しています。

  1. LIL がロードされ、オペレーティング・システムによって初期化されると、 ブローカーによって初期化関数 bipGetMessageflowNodeFactory が呼び出されます。 ブローカーは、 LIL が実行できる事柄と、ブローカーが LIL を呼び出す方法を理解するために、この関数を呼び出します。 たとえば、以下のようになります。
    CciFactory LilFactoryExportPrefix * LilFactoryExportSuffix
    bipGetMessageflowNodeFactory()
  2. bipGetMessageflowNodeFactory 関数が、 ユーティリティー関数 cniCreateNodeFactory を呼び出します。 この関数は、LIL がサポートするすべてのノードのファクトリー名 (またはグループ名) を戻します。 たとえば、以下のようになります。
    {
    	CciFactory* factoryObject;
    	int rc = 0;
    	CciChar factoryName[] = L"SwitchNodeFactory";
    	CCI_EXCEPTION_ST exception_st;
    
    	/* Create the Node Factory for this node */
    	factoryObject = cniCreateNodeFactory(0, factoryName);
    	if (factoryObject == CCI_NULL_ADDR) {
    		if (rc == CCI_EXCEPTION) {
    			/* Get details of the exception */
    			cciGetLastExceptionData(&rc, &exception_st);
    
    			/* Any local error handling can go here */
    
    			/* Rethrow the exception */
    			cciRethrowLastException(&rc);
    		}
    
    		/* Any further local error handling can go here */
    	}
    	else {
    		/* Define the nodes supported by this factory */
    		defineSwitchNode(factoryObject);
    	}
    
    	/* Return address of this factory object to the broker */
    	return(factoryObject);
    }
  3. LIL は次にユーティリティー関数 cniDefineNodeClass を呼び出して、 各ノードの名前と、インプリメンテーション関数のアドレスの仮想関数テーブルを渡します。 たとえば、 InputxNode という名前の単一のノードとその関数テーブルを定義するには、 次のようにします。
    void defineInputxNode(void* factoryObject){
    	static CNI_VFT vftable = {CNI_VFT_DEFAULT};
    
    	/* Setup function table with pointers to node implementation functions */
    	vftable.iFpCreateNodeContext = _createNodeContext;
    	vftable.iFpDeleteNodeContext = _deleteNodeContext;
    	vftable.iFpGetAttributeName = _getAttributeName;
    	vftable.iFpSetAttribute = _setAttribute;
    	vftable.iFpGetAttribute = _getAttribute;
    	vftable.iFpRun = _run;
    
    	cniDefineNodeClass(0, factoryObject, L"InputxNode", &vftable);
    
    	return;
    }

入力ノードとしてのノードの定義

ユーザー定義ノードは、cniRun インプリメンテーション関数をインプリメントすることによって、 入力ノードの機能を提供するという役割を果たします。 ユーザー定義ノードは cniRun 関数をインプリメントしていなければなりません。 そうでないと、ブローカーはそのユーザー定義ノードをロードせず、 cniDefineNodeClass ユーティリティー関数が失敗して、 CCI_MISSING_IMPL_FUNCTION が戻されます。 ユーザー定義入力ノードが含まれたメッセージ・フローが正常にデプロイされていれば、 ブローカーは定期的にそのノードの cniRun インプリメンテーション関数を呼び出します。

たとえば、以下のようになります。
int cniRun(
    CciContext* context,
    CciMessage* destinationList,
    CciMessage* exceptionList,
    CciMessage* message
){          
  ...
    /* Get data from external source */
    return CCI_SUCCESS_CONTINUE;
}
この戻り値は、定期的にブローカーに制御を戻すために使用されます。

ユーザー定義入力ノードが含まれたメッセージ・フローが正常にデプロイされていれば、 メッセージ・フローに渡される各メッセージごとに、 このノードの cniRun 関数が呼び出されます。

入力ノードには cniEvaluate をインプリメントすることもできますが、 これはお勧めしません。

ノードのインスタンスの作成

以下の手順は、ノードをインスタンス化する方法を表しています。

  1. ブローカーは、関数ポインターのテーブルを受信すると、 ユーザー定義ノードの各インスタンス化ごとに関数 cniCreateNodeContext を呼び出します。 ユーザー定義ノードを使用する 3 つのメッセージ・フローがある場合は、 3 つのそれぞれに対して cniCreateNodeContext 関数が呼び出されます。 この関数は、そのユーザー定義ノードのインスタンス化用に、 構成された属性の値を保持するためのメモリーを割り振ります。 たとえば、以下のようになります。
    1. cniCreateNodeContext 関数を呼び出します。
      CciContext* _Switch_createNodeContext(
          CciFactory* factoryObject,
          CciChar*    nodeName,
          CciNode*    nodeObject
      ){
          static char* functionName = (char *)"_Switch_createNodeContext()";
          NODE_CONTEXT_ST* p;
          CciChar          buffer[256];
      
    2. ローカル・コンテキストにポインターを割り振り、コンテキスト・エリアを消去します。
            p = (NODE_CONTEXT_ST *)malloc(sizeof(NODE_CONTEXT_ST));
      
          if (p) {
                memset(p, 0, sizeof(NODE_CONTEXT_ST));
    3. コンテキストの中のノード・オブジェクト・ポインターを保存します。
         p->nodeObject = nodeObject;
    4. ノード名を保存します。
        CciCharNCpy((CciChar*)&p->nodeName, nodeName, MAX_NODE_NAME_LEN);
  2. 入力ノードには、いくつかの出力ターミナルが関連付けられますが、 通常は入力ターミナルはありません。 ノードをインスタンス化するときに、 ユーティリティー関数 cniCreateOutputTerminal を使用して、 入力ノードに出力ターミナルを追加します。 これらの関数は、cniCreateNodeContext インプリメンテーション関数内で呼び出されなければなりません。 たとえば、 3 つの出力ターミナルを持つ入力ノードを定義するには、 次のようにします。
       {
                const CciChar* ucsOut = CciString("out", BIP_DEF_COMP_CCSID) ;
                insOutputTerminalListEntry(p, (CciChar*)ucsOut);
                free((void *)ucsOut) ;
        }
        {
                const CciChar* ucsFailure = CciString("failure", BIP_DEF_COMP_CCSID) ;
                insOutputTerminalListEntry(p, (CciChar*)ucsFailure);
                free((void *)ucsFailure) ;
        }    
        {
                const CciChar* ucsCatch = CciString("catch", BIP_DEF_COMP_CCSID) ;
                insOutputTerminalListEntry(p, (CciChar*)ucsCatch);
          free((void *)ucsCatch) ;    }

属性の設定

属性は、ブローカーを開始するとき、 あるいは新しい値を持つメッセージ・フローを再デプロイするときに設定します。

出力ターミナルの作成後、 ブローカーは cniSetAttribute 関数を呼び出して、 構成されているユーザー定義ノードの属性の値を渡します。 たとえば、以下のようになります。
    {
            const CciChar* ucsAttr = CciString("nodeTraceSetting", BIP_DEF_COMP_CCSID) ;
            insAttrTblEntry(p, (CciChar*)ucsAttr, CNI_TYPE_INTEGER);
            _setAttribute(p, (CciChar*)ucsAttr, (CciChar*)constZero);
            free((void *)ucsAttr) ;
    }
    {
            const CciChar* ucsAttr = CciString("nodeTraceOutfile", BIP_DEF_COMP_CCSID) ;
            insAttrTblEntry(p, (CciChar*)ucsAttr, CNI_TYPE_STRING);
            _setAttribute(p, (CciChar*)ucsAttr, (CciChar*)constSwitchTraceLocation);
            free((void *)ucsAttr) ;
    }

ノード機能のインプリメント

入力ノードがあることをブローカーが認識している場合、 ブローカーは定期的にこのノードの cniRun 関数を呼び出します。 cniRun 関数は、 実行すべきアクションを判断しなければなりません。 処理のためのデータがあれば、cniRun 関数は cniDispatchThread を呼び出してメッセージを処理し、 そうでなければ CCI_TIMEOUT と共に制御を戻して、 ブローカーが他のスレッド上の他のメッセージの処理に進むことができるようにします。 スレッドがディスパッチされていないと、 ブローカーはこのスレッドの中でそのすべての時間を費やし、 そのために他の事柄を何も実行しなくなります。

たとえば、cniDispatchThread を呼び出してメッセージを処理するか、 そうでなければ CCI_TIMEOUT と共に制御を戻すようにノードを構成するには、 次のようにします。
If ( anything to do )
	CniDispatchThread;

      /* do the work */

	If ( work done O.K.)
		Return CCI_SUCCESS_CONTINUE;
	Else
		Return CCI_FAILURE_CONTINUE;
Else
  Return CCI_TIMEOUT;

デフォルトのメッセージ・パーサー属性のオーバーライド (オプション)

入力ノードは通常、 最初に入力メッセージを構文解析するメッセージ・パーサーを判別します。 たとえば、プリミティブ MQInput ノードは、 MQMD ヘッダーを構文解析するために MQMD パーサーが必要であることを指示します。 ユーザー定義入力ノードは、 デフォルトとして組み込まれる、オーバーライドも可能な以下の属性を使用することによって、 適切なヘッダーかメッセージ・パーサー、および構文解析の制御のモードを選択することができます。

rootParserClassName
ユーザー定義入力ノードによってサポートされているメッセージ形式を構文解析するルート・パーサーの名前を定義します。 デフォルトでは GenericRoot に設定されます。 これは、ブローカーにパーサーを割り振らせたりチェーニングするために提供されているルート・パーサーです。 おそらく、ノードでこの属性値を変更する必要はありません。
firstParserClassName
ビット・ストリームを構文解析する役割を持つパーサーのチェーンの中で、 最初のパーサーの名前を定義します。 デフォルトでは XML に設定されます。
messageDomainProperty
入力メッセージを構文解析するのに必要なメッセージ・パーサーの名前を定義するオプションの属性。サポートされる値は、MQInput ノードによってサポートされるものと同じです。 (MQInput ノードの詳細については、MQInput ノードを参照してだくさい。
messageSetProperty
MRM パーサーが messageDomainProperty 属性によって指定された場合にのみ、 Message Set フィールドのメッセージ・セットの ID (またはメッセージ・セット名) を定義するオプションの属性。
messageTypeProperty
MRM パーサーが messageDomainProperty 属性によって指定された場合にのみ、 MessageType フィールドのメッセージの ID を定義するオプションの属性。
messageFormatProperty
MRM パーサーが messageDomainProperty 属性によって指定された場合にのみ、 Message Format フィールドのメッセージの形式を定義するオプションの属性。
常に既知の構造のデータで始まるユーザー定義ノードを作成している場合は、 特定のパーサーがデータの開始を処理するようにすることができます。 たとえば、MQInputNode は WebSphere MQ キューからのみデータを読み取るので、 このデータには常に先頭に MQMD が付いており、 MQInputNode は firstParserClassName を MQHMD に設定しています。 ユーザー定義ノードが常に、特定のパーサー、つまり "MYPARSER"、で構文解析できる構造で始まるデータを処理する場合は、 次のようにして firstParserClassName を MYPARSER に設定します。
  1. インプリメンテーション関数を宣言します。
    CciFactory LilFactoryExportPrefix * LilFactoryExportSuffix bipGetMessageflowNodeFactory()
    {
      ....
       CciFactory*     factoryObject;
      ....
        factoryObject = cniCreateNodeFactory(0, (unsigned short *)constPluginNodeFactory);
      ...
        vftable.iFpCreateNodeContext = _createNodeContext;
        vftable.iFpSetAttribute      = _setAttribute;
        vftable.iFpGetAttribute      = _getAttribute;
      ...  
        cniDefineNodeClass(&rc, factoryObject, (CciChar*)constSwitchNode, &vftable);
      ...
        return(factoryObject);
    }
  2. cniCreateNodeContext インプリメンテーション関数で属性を設定します。
    CciContext* _createNodeContext(
        CciFactory* factoryObject,
        CciChar*    nodeName,
        CciNode*    nodeObject
    ){
        NODE_CONTEXT_ST* p;
      ...
    
            /* Allocate a pointer to the local context */
            p = (NODE_CONTEXT_ST *)malloc(sizeof(NODE_CONTEXT_ST));
            /* Create attributes and set default values */
        {
                const CciChar* ucsAttrName  = CciString("firstParserClassName", BIP_DEF_COMP_CCSID) ;
                const CciChar* ucsAttrValue = CciString("MYPARSER", BIP_DEF_COMP_CCSID) ;
                insAttrTblEntry(p, (CciChar*)ucsAttrName, CNI_TYPE_INTEGER);
                /*see sample BipSampPluginNode.c for implementation of insAttrTblEntry*/
    
                _setAttribute(p, (CciChar*)ucsAttrName, (CciChar*)ucsAttrValue);
                free((void *)ucsAttrName) ;
                free((void *)ucsAttrValue) ;
        }

ノードのインスタンスの削除

メッセージ・フローが再デプロイされたり、 実行グループの処理が (mqsistop コマンドを使用して) 停止されたりすると、 ノードは破棄されます。 ノードが破棄されるとき、 使用されていたメモリーおよび保持されていたリソースがあれば、 それらは開放される必要があります。 これは、cniDeleteNodeContext 関数を使用して行います。 たとえば、以下のようになります。

void _deleteNodeContext(
    CciContext* context
){
    static char* functionName = (char *)"_deleteNodeContext()";

    return;
}

関連概念
ユーザー定義拡張機能の計画
ランタイム環境でのユーザー定義拡張機能
ユーザー定義拡張機能の設計
ユーザー定義の入力ノード

関連タスク
ユーザー定義拡張機能の開発
提供されているサンプルのインプリメント
C ユーザー定義拡張機能のコンパイル

関連資料
MQInput ノード
C ユーザー定義ノード API