Creating an input node in C

Before you start

Ensure that you have read and understood the following topics:

A loadable implementation library, or LIL, is the implementation module for a C node (or parser). A LIL is implemented as a dynamic link library (DLL), but it has the file extension .lil rather than .dll.

The implementation functions that have to be written by the developer are listed in C language node implementation functions. The utility functions that are provided by WebSphere Business Integration Message Broker to aid this process are listed in C language node utility functions.

WebSphere Business Integration Message Broker provides the source for two sample user-defined nodes called SwitchNode and TransformNode. You can use these nodes in their current state, or you can modify them.

Receiving external data into a buffer

An input node can receive data from any type of external source, such as a file system or FTP connections, as long as the output from the node is in the correct format. For connections to queues or databases, you should use the IBM primitive nodes and the API calls supplied, principally because the primitive nodes are already set up for error handling. Do not use the mqget or mqput commands for direct access to WebSphere MQ queues.

You must provide an input buffer (or bitstream) to contain input data, and associate it with a message object. In the C API, the buffer is attached to the CciMessage object representing the input message by using the cniSetInputBuffer utility function. For example:
{
  static char* functionName = (char *)"_Input_run()";
  void*        buffer;
  CciTerminal* terminalObject;
  int          buflen = 4096;
  int          rc = CCI_SUCCESS;
  int          rcDispatch = CCI_SUCCESS;
  char         xmlData[] = "<A>data</A>";

  /* Allocate the input buffer and copy the data into it */
  buffer = malloc(buflen);
  memcpy(buffer, xmlData, sizeof(xmlData));

  /* Attach the buffer to the input message */
  cniSetInputBuffer(&rc, message, buffer, buflen);

  /*propagate etc*/

  /* Free the buffer only after the message has been propagated */
  free(buffer);
}
The above example illustrates an area of memory being allocated (buffer = malloc(buflen);). When programming in C, whenever you allocate memory you must free it when you no longer need it. The broker might attempt to access this memory at any time whilst the message is being propagated through the flow, so you should free the memory only after calling cniPropagate on the same CciMessage.
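The allocate, copy, and free ordering described above can be sketched in plain C without the broker headers. This is a minimal sketch, not the sample's implementation: makeInputBuffer is a hypothetical helper name, and it sizes the buffer to the payload (excluding the terminating NUL) rather than using a fixed 4096 bytes, which is an assumption made for the illustration.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper (not part of the broker API): copies a payload into a
   newly allocated buffer that is exactly as long as the payload, so that no
   uninitialized bytes are handed on. Returns NULL if there is nothing to
   copy or if the allocation fails; *bufLen is set to 0 in that case. */
void* makeInputBuffer(const char* data, size_t dataLen, size_t* bufLen)
{
  void* buffer;

  *bufLen = 0;
  if (data == NULL || dataLen == 0) {
    return NULL;
  }

  buffer = malloc(dataLen);
  if (buffer == NULL) {
    return NULL;
  }

  memcpy(buffer, data, dataLen);
  *bufLen = dataLen;
  return buffer;
}
```

In a real cniRun function, the returned pointer and length would be passed to cniSetInputBuffer, the message would be propagated, and only then would the caller pass the pointer to free.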

Controlling threading and transactions

An input node has a responsibility to perform appropriate end of message processing when a message has been propagated through a message flow. Specifically, the input node needs to cause any transactions to be committed or rolled back, and return threads to the thread pool.

Each message flow thread is allocated from a pool of threads maintained for each message flow, and starts execution in the cniRun function. You determine the behavior of a thread using the cniDispatchThread utility function together with the appropriate return value.

The term transaction is used generically here to describe either a globally coordinated transaction or a broker-controlled transaction. Globally coordinated transactions are coordinated by either WebSphere MQ, as an XA-compliant transaction manager, or Resource Recovery Service (RRS) on z/OS. WebSphere Business Integration Message Broker controls transactions by committing (or rolling back) any database resources and then committing any WebSphere MQ units of work. However, if a user-defined node is used, the broker cannot automatically commit any resource updates. The user-defined node uses return values to indicate whether a transaction has been successful, and to control whether transactions are committed or rolled back. Any unhandled exceptions are caught by the broker infrastructure, and the transaction is rolled back.

The following table describes each of the supported return values, the effect each one has on any transactions, and what the broker does with the current thread.

Return value | Effect on transaction | Broker action on the thread
CCI_SUCCESS_CONTINUE | Committed | Calls the same thread again in the cniRun function.
CCI_SUCCESS_RETURN | Committed | Returns the thread to the thread pool.
CCI_FAILURE_CONTINUE | Rolled back | Calls the same thread again in the cniRun function.
CCI_FAILURE_RETURN | Rolled back | Returns the thread to the thread pool.
CCI_TIMEOUT | Not applicable. The function periodically times out while waiting for an input message. | Calls the same thread again in the cniRun function.
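The mapping in the table can also be written down as two small lookup functions. This is only an illustrative sketch: the RV_ and TX_ names are hypothetical stand-ins, not the constants defined in the broker's C headers, and the functions model the documented behavior rather than call into the broker.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the documented return values; the real
   constants come from the broker's C headers. */
typedef enum {
  RV_SUCCESS_CONTINUE,
  RV_SUCCESS_RETURN,
  RV_FAILURE_CONTINUE,
  RV_FAILURE_RETURN,
  RV_TIMEOUT
} RunResult;

typedef enum { TX_COMMIT, TX_ROLLBACK, TX_NONE } TxAction;

/* What happens to the transaction for a given return value. */
TxAction transactionAction(RunResult rv)
{
  switch (rv) {
    case RV_SUCCESS_CONTINUE:
    case RV_SUCCESS_RETURN:
      return TX_COMMIT;
    case RV_FAILURE_CONTINUE:
    case RV_FAILURE_RETURN:
      return TX_ROLLBACK;
    default:
      return TX_NONE;   /* RV_TIMEOUT: no transaction outcome applies */
  }
}

/* True when the same thread is called again in the run function,
   false when the thread goes back to the thread pool. */
bool threadIsReused(RunResult rv)
{
  return rv != RV_SUCCESS_RETURN && rv != RV_FAILURE_RETURN;
}
```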
The following is an example of using the CCI_SUCCESS_RETURN return value with the cniDispatchThread function:
{
  ...
  cniDispatchThread(&rcDispatch, ((NODE_CONTEXT_ST *)context)->nodeObject);
  ...
  if (rcDispatch == CCI_NO_THREADS_AVAILABLE) return CCI_SUCCESS_CONTINUE;  
  else return CCI_SUCCESS_RETURN;
}     

Propagating the message

Before you propagate a message, you have to decide what message flow data you want to propagate, and what terminal is to receive the data.

The terminalObject is derived from a list that the user-defined node maintains itself.

For example, to propagate the message to the output terminal, you use the cniPropagate function:
  if (terminalObject) {
    if (cniIsTerminalAttached(&rc, terminalObject)) {
      if (rc == CCI_SUCCESS) {
        cniPropagate(&rc, terminalObject, destinationList, exceptionList, message);
      }
    }
  }

In the above example, the cniIsTerminalAttached function is used to test whether the message can be propagated to the specified terminal. If you do not use the cniIsTerminalAttached function, and the terminal is not attached to another node by a connector, the message is not propagated. If you do use this function, you can modify the node's behavior when a terminal is not connected.
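The list from which the terminalObject is derived can be pictured as a simple linked list that maps terminal names to handles. The following is a sketch under stated assumptions: TerminalEntry, addTerminal, findTerminal, and freeTerminals are hypothetical names, plain char strings are used instead of CciChar, and a void pointer stands in for CciTerminal*.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical list entry: a node would keep a list like this in its
   context, with CciChar names and CciTerminal* handles. */
typedef struct TerminalEntry {
  char                  name[32];
  void*                 handle;   /* stands in for CciTerminal* */
  struct TerminalEntry* next;
} TerminalEntry;

/* Adds an entry at the head of the list. Returns the new head, or the
   unchanged head if the allocation fails. Long names are truncated. */
TerminalEntry* addTerminal(TerminalEntry* head, const char* name, void* handle)
{
  TerminalEntry* e = (TerminalEntry*)malloc(sizeof(TerminalEntry));
  if (e == NULL) {
    return head;
  }
  strncpy(e->name, name, sizeof(e->name) - 1);
  e->name[sizeof(e->name) - 1] = '\0';
  e->handle = handle;
  e->next = head;
  return e;
}

/* Returns the handle stored under the given name, or NULL if absent. */
void* findTerminal(const TerminalEntry* head, const char* name)
{
  const TerminalEntry* e;
  for (e = head; e != NULL; e = e->next) {
    if (strcmp(e->name, name) == 0) {
      return e->handle;
    }
  }
  return NULL;
}

/* Frees every entry in the list. */
void freeTerminals(TerminalEntry* head)
{
  while (head != NULL) {
    TerminalEntry* next = head->next;
    free(head);
    head = next;
  }
}
```

With a list like this, the run function would look up the handle for "out" and pass it to cniIsTerminalAttached and cniPropagate; a NULL result corresponds to the if (terminalObject) guard in the example above.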

Initializing the node

The following procedure shows you how to initialize your node:

  1. The initialization function, bipGetMessageflowNodeFactory, is called by the broker after the LIL has been loaded and initialized by the operating system. The broker calls this function to understand what your LIL is able to do and how the broker should call the LIL. For example:
    CciFactory LilFactoryExportPrefix * LilFactoryExportSuffix
    bipGetMessageflowNodeFactory()
  2. The bipGetMessageflowNodeFactory function then calls the utility function cniCreateNodeFactory. This function passes back a factory name (or group name) for all the nodes that your LIL supports. For example:
    {
    	CciFactory* factoryObject;
    	int rc = 0;
    	CciChar factoryName[] = L"SwitchNodeFactory";
    	CCI_EXCEPTION_ST exception_st;
    
    	/* Create the Node Factory for this node */
    	factoryObject = cniCreateNodeFactory(&rc, factoryName);
    	if (factoryObject == CCI_NULL_ADDR) {
    		if (rc == CCI_EXCEPTION) {
    			/* Get details of the exception */
    			cciGetLastExceptionData(&rc, &exception_st);
    
    			/* Any local error handling can go here */
    
    			/* Rethrow the exception */
    			cciRethrowLastException(&rc);
    		}
    
    		/* Any further local error handling can go here */
    	}
    	else {
    		/* Define the nodes supported by this factory */
    		defineSwitchNode(factoryObject);
    	}
    
    	/* Return address of this factory object to the broker */
    	return(factoryObject);
    }
  3. The LIL should then call the utility function cniDefineNodeClass to pass the name of each node, and a virtual function table of the addresses of the implementation functions. For example, to define a single node called InputxNode and its function table:
    void defineInputxNode(void* factoryObject){
    	static CNI_VFT vftable = {CNI_VFT_DEFAULT};
    
    	/* Setup function table with pointers to node implementation functions */
    	vftable.iFpCreateNodeContext = _createNodeContext;
    	vftable.iFpDeleteNodeContext = _deleteNodeContext;
    	vftable.iFpGetAttributeName = _getAttributeName;
    	vftable.iFpSetAttribute = _setAttribute;
    	vftable.iFpGetAttribute = _getAttribute;
    	vftable.iFpRun = _run;
    
    	cniDefineNodeClass(0, factoryObject, L"InputxNode", &vftable);
    
    	return;
    }

Defining the node as an input node

A user-defined node identifies itself as providing the capability of an input node by implementing the cniRun implementation function. User-defined input nodes must implement a cniRun function; otherwise, the broker does not load the user-defined node, and the cniDefineNodeClass utility function fails, returning CCI_MISSING_IMPL_FUNCTION. When a message flow containing a user-defined input node is deployed successfully, the broker calls the node's cniRun implementation function at regular intervals.

For example:
int cniRun(                       
  CciContext* context,                
  CciMessage* destinationList,        
  CciMessage* exceptionList,          
  CciMessage* message
){          
  ...
  /* Get data from external source */
  return CCI_SUCCESS_CONTINUE;                                    
}
The return value should be used to return control periodically to the broker.

When a message flow containing a user-defined input node is deployed successfully, the node's cniRun function is called for each message passed through the message flow.

Input nodes can also implement cniEvaluate, but this is not recommended.
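The rule that a node counts as an input node only when its function table supplies a run function can be illustrated with a reduced function table. This is a hedged sketch: MiniVft, isInputNode, and sampleRun are hypothetical names, and the real CNI_VFT structure has more entries with different signatures.

```c
#include <stddef.h>

/* Hypothetical, reduced function table; the real CNI_VFT has more
   entries and different signatures. */
typedef struct {
  void* (*createContext)(const char* nodeName);
  void  (*deleteContext)(void* context);
  int   (*run)(void* context);
} MiniVft;

/* Placeholder run function used only to fill the table in this sketch. */
int sampleRun(void* context)
{
  (void)context;
  return 0;
}

/* Returns 1 if the table supplies a run function (the node can act as
   an input node), otherwise 0. */
int isInputNode(const MiniVft* vft)
{
  return (vft != NULL && vft->run != NULL) ? 1 : 0;
}
```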

Creating an instance of the node

The following procedure shows you how to instantiate your node:

  1. When the broker has received the table of function pointers, it calls the function cniCreateNodeContext for each instantiation of the user-defined node. If you have three message flows that are using your user-defined node, your cniCreateNodeContext function is called for each of them. This function should allocate memory for that instantiation of the user-defined node to hold the values for the configured attributes. For example:
    1. Call the cniCreateNodeContext function:
      CciContext* _Switch_createNodeContext(
        CciFactory* factoryObject,
        CciChar*    nodeName,
        CciNode*    nodeObject
      ){
        static char* functionName = (char *)"_Switch_createNodeContext()";
        NODE_CONTEXT_ST* p;
        CciChar          buffer[256];
      
    2. Allocate a pointer to the local context and clear the context area:
        p = (NODE_CONTEXT_ST *)malloc(sizeof(NODE_CONTEXT_ST));
      
        if (p) {
           memset(p, 0, sizeof(NODE_CONTEXT_ST));
    3. Save the node object pointer in the context:
         p->nodeObject = nodeObject;
    4. Save the node name:
       CciCharNCpy((CciChar*)&p->nodeName, nodeName, MAX_NODE_NAME_LEN);
  2. An input node has a number of output terminals associated with it, but does not typically have any input terminals. Use the utility function cniCreateOutputTerminal to add output terminals to an input node when the node is instantiated. This function must be invoked within the cniCreateNodeContext implementation function. For example, to define an input node with three output terminals:
        {
          const CciChar* ucsOut = CciString("out", BIP_DEF_COMP_CCSID);
          insOutputTerminalListEntry(p, (CciChar*)ucsOut);
          free((void *)ucsOut);
        }
        {
          const CciChar* ucsFailure = CciString("failure", BIP_DEF_COMP_CCSID);
          insOutputTerminalListEntry(p, (CciChar*)ucsFailure);
          free((void *)ucsFailure);
        }
        {
          const CciChar* ucsCatch = CciString("catch", BIP_DEF_COMP_CCSID);
          insOutputTerminalListEntry(p, (CciChar*)ucsCatch);
          free((void *)ucsCatch);
        }

Setting attributes

Attributes are set whenever you start the broker, or when you redeploy the message flow with new values.

Following the creation of output terminals, the broker calls the cniSetAttribute function to pass the values for the configured attributes of the user-defined node. For example:
    {
      const CciChar* ucsAttr = CciString("nodeTraceSetting", BIP_DEF_COMP_CCSID) ;
      insAttrTblEntry(p, (CciChar*)ucsAttr, CNI_TYPE_INTEGER);
      _setAttribute(p, (CciChar*)ucsAttr, (CciChar*)constZero);
      free((void *)ucsAttr) ;
    }
    {
      const CciChar* ucsAttr = CciString("nodeTraceOutfile", BIP_DEF_COMP_CCSID) ;
      insAttrTblEntry(p, (CciChar*)ucsAttr, CNI_TYPE_STRING);
      _setAttribute(p, (CciChar*)ucsAttr, (CciChar*)constSwitchTraceLocation);
      free((void *)ucsAttr) ;
    }
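The attribute handling in the example above relies on a table of declared attributes that the node keeps in its context. A minimal sketch of such a table follows; it is not the sample's implementation. AttrTable, attrDeclare, attrSet, and attrGet are hypothetical names, values are stored as plain char strings rather than CciChar, and the fixed table size is an assumption made for brevity.

```c
#include <stdlib.h>
#include <string.h>

#define MAX_ATTRS 8

typedef enum { ATTR_INTEGER, ATTR_STRING } AttrType;

typedef struct {
  char     name[32];
  AttrType type;
  char     value[64];
} Attr;

typedef struct {
  Attr entries[MAX_ATTRS];
  int  count;
} AttrTable;

/* Declares an attribute with an empty value. Returns 0 on success, or -1
   if the table is full or the name does not fit. */
int attrDeclare(AttrTable* t, const char* name, AttrType type)
{
  Attr* a;
  if (t->count >= MAX_ATTRS || strlen(name) >= sizeof(t->entries[0].name)) {
    return -1;
  }
  a = &t->entries[t->count++];
  strcpy(a->name, name);
  a->type = type;
  a->value[0] = '\0';
  return 0;
}

/* Sets a declared attribute. Integer attributes must parse completely as
   a decimal number. Returns 0 on success, -1 otherwise. */
int attrSet(AttrTable* t, const char* name, const char* value)
{
  int i;
  for (i = 0; i < t->count; i++) {
    Attr* a = &t->entries[i];
    if (strcmp(a->name, name) != 0) {
      continue;
    }
    if (strlen(value) >= sizeof(a->value)) {
      return -1;
    }
    if (a->type == ATTR_INTEGER) {
      char* end;
      (void)strtol(value, &end, 10);
      if (end == value || *end != '\0') {
        return -1;
      }
    }
    strcpy(a->value, value);
    return 0;
  }
  return -1;   /* attribute was never declared */
}

/* Returns the stored value, or NULL if the attribute is not declared. */
const char* attrGet(const AttrTable* t, const char* name)
{
  int i;
  for (i = 0; i < t->count; i++) {
    if (strcmp(t->entries[i].name, name) == 0) {
      return t->entries[i].value;
    }
  }
  return NULL;
}
```

Rejecting values that do not match the declared type at set time keeps later reads simple, because a stored integer attribute is always known to parse.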

Implementing the node functionality

When the broker knows that it has an input node, it calls the cniRun function of this node at regular intervals. The cniRun function must then decide what course of action to take. If data is available for processing, the cniRun function should call cniDispatchThread and process the message. If no data is available, it should return with CCI_TIMEOUT so that the broker can continue to process other messages on other threads. If a thread is not dispatched, the broker spends all of its time within this thread, which stops it from doing anything else.

For example, to configure the node to call cniDispatchThread and process the message, or return with CCI_TIMEOUT:
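  /* dataAvailable and workSucceeded are placeholders for the node's own checks */
  if (dataAvailable) {
    /* Dispatch another thread so that the broker can continue polling */
    cniDispatchThread(&rcDispatch, ((NODE_CONTEXT_ST *)context)->nodeObject);

    /* do the work */

    if (workSucceeded) {
      return CCI_SUCCESS_CONTINUE;
    }
    else {
      return CCI_FAILURE_CONTINUE;
    }
  }
  else {
    return CCI_TIMEOUT;
  }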
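The polling behavior described in this section can be simulated without a broker. In this sketch, every name is hypothetical: SimSource stands in for the external data source, the SIM_ values stand in for the broker's return values, and a negative item is used to represent a message whose processing fails.

```c
/* Hypothetical stand-ins for the broker's return values. */
enum { SIM_SUCCESS_CONTINUE, SIM_FAILURE_CONTINUE, SIM_TIMEOUT };

/* Hypothetical data source: an array of pending items. A negative item
   represents a message whose processing fails. */
typedef struct {
  const int* items;
  int        count;
  int        next;
} SimSource;

/* One invocation of the run function: handles at most one item and
   reports the outcome through its return value. */
int simRun(SimSource* src)
{
  int item;

  if (src->next >= src->count) {
    return SIM_TIMEOUT;              /* nothing to do on this call */
  }

  item = src->items[src->next++];

  /* A real node would call cniDispatchThread here, then attach the
     data to the message and propagate it. */

  return (item >= 0) ? SIM_SUCCESS_CONTINUE : SIM_FAILURE_CONTINUE;
}
```

Calling simRun repeatedly, as the broker does with cniRun, drains the pending items one per call and then keeps returning the timeout value until more data arrives.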

Overriding the default message parser attributes (optional)

An input node implementation normally determines what message parser initially parses an input message. For example, the primitive MQInput node dictates that an MQMD parser is required to parse the MQMD header. A user-defined input node can select an appropriate header or message parser, and the mode in which the parsing is controlled, by using the following attributes that are included as default, which you can override:

rootParserClassName
Defines the name of the root parser that parses message formats supported by the user-defined input node. It defaults to GenericRoot, a supplied root parser that causes the broker to allocate and chain parsers together. It is unlikely that a node would need to modify this attribute value.
firstParserClassName
Defines the name of the first parser, in what might be a chain of parsers that are responsible for parsing the bitstream. It defaults to XML.
messageDomainProperty
An optional attribute that defines the name of the message parser required to parse the input message. The supported values are the same as those supported by the MQInput node. (See MQInput node for more information about the MQInput node.)
messageSetProperty
An optional attribute that defines the message set identifier, or the message set name, in the Message Set field, only if the MRM parser was specified by the messageDomainProperty attribute.
messageTypeProperty
An optional attribute that defines the identifier of the message in the MessageType field, only if the MRM parser was specified by the messageDomainProperty attribute.
messageFormatProperty
An optional attribute that defines the format of the message in the Message Format field, only if the MRM parser was specified by the messageDomainProperty attribute.
If you have written a user-defined input node that always begins with data of a known structure, you can ensure that a certain parser deals with the start of the data. For example, the MQInputNode only reads data from WebSphere MQ queues, so this data always has an MQMD at the beginning, and the MQInputNode sets firstParserClassName to MQHMD. If your user-defined node always deals with data that begins with a structure that can be parsed by a certain parser, say "MYPARSER", you set firstParserClassName to MYPARSER as follows:
  1. Declare the implementation functions:
    CciFactory LilFactoryExportPrefix * LilFactoryExportSuffix bipGetMessageflowNodeFactory()
    {
      ....
      CciFactory*      factoryObject;
      ....
      factoryObject = cniCreateNodeFactory(0, (unsigned short *)constPluginNodeFactory);
      ...
      vftable.iFpCreateNodeContext = _createNodeContext;
      vftable.iFpSetAttribute      = _setAttribute;
      vftable.iFpGetAttribute      = _getAttribute;
      ...  
      cniDefineNodeClass(&rc, factoryObject, (CciChar*)constSwitchNode, &vftable);
      ...
      return(factoryObject);
    }
  2. Set the attribute in the cniCreateNodeContext implementation function:
    CciContext* _createNodeContext(
      CciFactory* factoryObject,
      CciChar*    nodeName,
      CciNode*    nodeObject
    ){
      NODE_CONTEXT_ST* p;
      ...
    
        /* Allocate a pointer to the local context */
        p = (NODE_CONTEXT_ST *)malloc(sizeof(NODE_CONTEXT_ST));
        /* Create attributes and set default values */
        {
          const CciChar* ucsAttrName  = CciString("firstParserClassName", BIP_DEF_COMP_CCSID) ;
          const CciChar* ucsAttrValue = CciString("MYPARSER", BIP_DEF_COMP_CCSID) ;
          insAttrTblEntry(p, (CciChar*)ucsAttrName, CNI_TYPE_STRING);
          /*see sample BipSampPluginNode.c for implementation of insAttrTblEntry*/
    
          _setAttribute(p, (CciChar*)ucsAttrName, (CciChar*)ucsAttrValue);
          free((void *)ucsAttrName) ;
          free((void *)ucsAttrValue) ;
        }

Deleting an instance of the node

Nodes are destroyed when a message flow is redeployed, or when the execution group process is stopped (using the mqsistop command). When a node is destroyed, it should free any used memory and release any held resources. You do this using the cniDeleteNodeContext function. For example:

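void _deleteNodeContext(
  CciContext* context
){
  static char* functionName = (char *)"_deleteNodeContext()";

  /* Free the context allocated in cniCreateNodeContext. If the context
     owns other allocations (for example, terminal or attribute lists),
     free those first. */
  free((void *)context);

  return;
}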
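When the context owns further allocations, they need to be released before the context itself. The following sketch shows one way to order that cleanup. SketchContext, ListEntry, pushEntry, freeList, and deleteSketchContext are hypothetical names, and the two lists are an assumption about what a node might store; the real NODE_CONTEXT_ST layout is defined by the node's author.

```c
#include <stdlib.h>

/* Hypothetical list entry; a real entry would also carry a name,
   a handle, or a value. */
typedef struct ListEntry {
  struct ListEntry* next;
} ListEntry;

/* Hypothetical context that owns two lists. */
typedef struct {
  ListEntry* terminals;
  ListEntry* attributes;
} SketchContext;

/* Adds an entry at the head of a list; returns the new head, or the
   unchanged head if the allocation fails. */
ListEntry* pushEntry(ListEntry* head)
{
  ListEntry* e = (ListEntry*)malloc(sizeof(ListEntry));
  if (e == NULL) {
    return head;
  }
  e->next = head;
  return e;
}

/* Frees every entry of a list and returns how many were freed. */
int freeList(ListEntry* head)
{
  int freed = 0;
  while (head != NULL) {
    ListEntry* next = head->next;
    free(head);
    head = next;
    freed++;
  }
  return freed;
}

/* Releases everything the context owns, then the context itself.
   Returns the number of list entries freed; a NULL context is ignored. */
int deleteSketchContext(SketchContext* ctx)
{
  int freed;
  if (ctx == NULL) {
    return 0;
  }
  freed = freeList(ctx->terminals) + freeList(ctx->attributes);
  free(ctx);
  return freed;
}
```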

Related concepts
Planning user-defined extensions
User-defined extensions in the runtime environment
Designing user-defined extensions
User-defined Input nodes

Related tasks
Developing user-defined extensions
Implementing the provided samples
Compiling a C user-defined extension

Related reference
MQInput node
C user-defined node API