Creating a message processing node in C

Before you start

A loadable implementation library, or a LIL, is the implementation module for a C node (or parser). A LIL is implemented as a dynamic link library (DLL). It does not have the file extension .dll but .lil.

The implementation functions that have to be written by the developer are listed in C language node implementation functions. The utility functions that are provided by WebSphere Business Integration Message Broker to aid this process are listed in C language node utility functions.

WebSphere Business Integration Message Broker provides the source for two sample user-defined nodes call SwitchNode and TransformNode. You can use these nodes in their current state, or you can modify them.

Conceptually, a message processing node is used to process a message in some way, and an output node is used to output a message as a bitstream. However, when you code a message processing node or an output node, they are essentially the same thing. You can perform message processing within in output node, and likewise you can output a message to a bitstream using a message processing node. For simplicity, this topic mainly refers to the node as a message processing node, however, it discusses the functionality of both types of node.

Accessing message data

In many cases, the user-defined node needs to access the contents of the message received on its input terminal. The message is represented as a tree of syntax elements. Groups of utility functions are provided for message management, message buffer access, syntax element navigation, and syntax element access. (See C language node utility functions for details of the utility functions.)

The types of query you are likely to want to perform include:
  • Obtaining the root element of the required message object
  • Navigate or query the tree by asking for child or sibling elements by name
  • Getting the type of the element
  • Getting the value of the element

For example, to query the name and type of the first child of body:

void cniEvaluate( ...               
){                                    
  ...
/* Navigate to the target element */ 
  rootElement = cniRootElement(&rc, message);
  bodyElement = cniLastChild(&rc, rootElement);
  bodyFirstChild = cniFirstChild(&rc, bodyElement);

/* Query the name and value of the target element */
  cniElementName(&rc, bodyFirstChild, (CciChar*)&elementname, sizeof(elementName)); 
  bytes = cniElementCharacterValue(
		&rc, bodyfirstChild, (CciChar*)&eValue, sizeof(eValue));
  ...    
}

Transforming a message object

The received input message is read-only, so before a message can be transformed, you must write it to a new output message using the cniCreateMessage function. You can copy elements from the input message, or you can create new elements and attach them to the message. New elements are generally in a parser's domain.

For example:
  1. To write the incoming message to a new message:
    {
      ...
      context = cniGetMessageContext(&rc, message)); 
      outMsg = cniCreateMessage(&rc, context)); 
      ...
    }
  2. To modify the value of a target element:
      cniSetElementIntegerValue(&rc, targetElement, L"newValue", 8); 
  3. After finalizing and propagating the message, you must delete the output message using the cniDeleteMessage function:
     cniDeleteMessage(&rc, outMsg);

Accessing ESQL

Nodes can invoke ESQL expressions using Compute node ESQL syntax. You can create and modify the components of the message using ESQL expressions, and you can refer to elements of both the input message and data from an external database using the cniSqlCreateStatement, cniSqlSelect, cniSqlDeleteStatement and cniSqlExecute functions.

For example, to populate the Result element from the contents of a column in a database table:

{
  ...
  sqlExpr = cniSqlCreateStatement(&rc, 
   (NODE_CONTEXT_ST *)context->nodeObject,
   L"DB", CCI_SQL_TRANSACTION_AUTO,
   L"SET OutputRoot.XML.Result[] = (SELECT T.C1 AS Col1 FROM Database.TABLE AS T;");
  ...
  cniSqlSelect(&rc, sqlExpr, destinationList, exceptionList, message, outMsg);
  cniSqlDeleteStatement(&rc, sqlExpr);
  ...                                                               
}

For more information about ESQL, see ESQL.

Propagating a message

Before you propagate a message, you have to decide what message flow data you want to propagate, and what terminal is to receive the data.
  1. If the message has changed, you should finalize the message before you propagate it using the cniFinalize function. For example:
          cniFinalize(&rc, outMsg, CCI_FINALIZE_NONE);
  2. The terminalObject is derived from a list that the user-defined node maintains itself. To propagate the message to the output terminal, use the cniPropagate function:
      if (terminalObject) {
        if (cniIsTerminalAttached(&rc, terminalObject)) {
          if (rc == CCI_SUCCESS) {
            cniPropagate(&rc, terminalObject, destinationList, exceptionList, outMsg);
          }
        }
  3. If you created a new output message using cniCreateMessage, after propagating the message, you must delete the output message using the cniDeleteMessage function:
     cniDeleteMessage(&rc, outMsg);

Writing to an output device

A transformed message needs to be serialized to a bitstream. The bitstream can then be accessed and written to an output device. You write the message to a bitstream using the cniWriteBuffer function. For example:
{
  ...
  cniWriteBuffer(&rc, message);
  writeToDevice(cniBufferPointer(&rc, message), cniBufferSize(&rc, message));
  ...                                                               
}
A message can be serialized only once.
Note: You must use the supplied MQOutput node when writing to WebSphere MQ queues, because the broker internally maintains a WebSphere MQ connection and open queue handles on a thread-by-thread basis, and these are cached to optimize performance. In addition, the broker handles recovery scenarios when certain WebSphere MQ events occur, and this would be adversely affected if WebSphere MQ MQI calls were used in a user-defined output node.

Declaring your node to the broker

The following procedure shows you how to declare your node to the broker:

  1. The initialization function, bipGetMessageflowNodeFactory, is called by the broker after the LIL has been loaded and initialized by the operating system. This is called from the broker configuration thread. The broker calls this function to understand what your LIL is able to do and how the broker should call the LIL. For example:
    CciFactory LilFactoryExportPrefix * LilFactoryExportSuffix
    bipGetMessageflowNodeFactory()
  2. The bipGetMessageflowNodeFactory function then calls the utility function cniCreateNodeFactory. This function passes back a factory name (or group name) for all the nodes that your LIL supports. For example:
    {
    	CciFactory* factoryObject;
    	int rc = 0;
    	CciChar factoryName[] = L"SwitchNodeFactory";
    	CCI_EXCEPTION_ST exception_st;
    
    	/* Create the Node Factory for this node */
    	factoryObject = cniCreateNodeFactory(0, factoryName);
    	if (factoryObject == CCI_NULL_ADDR) {
    		if (rc == CCI_EXCEPTION) {
    			/* Get details of the exception */
    			cciGetLastExceptionData(&rc, &exception_st);
    
    			/* Any local error handling can go here */
    
    			/* Rethrow the exception */
    			cciRethrowLastException(&rc);
    		}
    
    		/* Any further local error handling can go here */
    	}
    	else {
    		/* Define the nodes supported by this factory */
    		defineSwitchNode(factoryObject);
    	}
    
    	/* Return address of this factory object to the broker */
    	return(factoryObject);
    }

Defining the node as a message processing node

The LIL should then call the utility function cniDefineNodeClass to pass the name of each node and a virtual function table of the addresses of the implementation functions. For example, to define a single node called SwitchNode, and its function table:
void defineSwitchNode(void* factoryObject){
	static CNI_VFT vftable = {CNI_VFT_DEFAULT};

	/* Setup function table with pointers to node implementation functions */
	vftable.iFpCreateNodeContext = _createNodeContext;
	vftable.iFpDeleteNodeContext = _deleteNodeContext;
	vftable.iFpGetAttributeName = _getAttributeName;
	vftable.iFpSetAttribute = _setAttribute;
	vftable.iFpGetAttribute = _getAttribute;
	vftable.iFpEvaluate = _evaluate;

	cniDefineNodeClass(0, factoryObject, L"SwitchNode", &vftable);

	return;
}
This is called from the configuration thread.

A user-defined node identifies itself as providing the capability of a message processing or output node by implementing the cniEvaluate function. User-defined nodes have to either implement a cniEvaluate or a cniRun implementation function, or both, otherwise the broker does not load the user-defined node, and the cniDefineNodeClass utility function fails, returning CCI_MISSING_IMPL_FUNCTION.

When a message flow containing a user-defined message processing node is deployed successfully, the node's cniEvaluate function is called for each message passed through the message flow.

Message flow data is received at the input terminal of the node, that is, the message, global environment, local environment, and exception list.

For example:
void cniEvaluate(                
  CciContext* context,                
  CciMessage* destinationList,        
  CciMessage* exceptionList,          
  CciMessage* message                 
){                                    
  ...
}

Creating an instance of the node

The following procedure shows you how to instantiate your node:

  1. When the broker has received the table of function pointers, it calls the function cniCreateNodeContext for each instantiation of the user-defined node. If you have three message flows that are using your user-defined node, your cniCreateNodeContext function is called for each of them. This function should allocate memory for that instantiation of the user-defined node to hold the values for the configured attributes. For example:
    1. The user function cniCreateNodeContext is called:
      CciContext* _Switch_createNodeContext(
        CciFactory* factoryObject,
        CciChar*    nodeName,
        CciNode*    nodeObject
      ){
        static char* functionName = (char *)"_Switch_createNodeContext()";
        NODE_CONTEXT_ST* p;
        CciChar          buffer[256];
      
      
    2. Allocate a pointer to the local context and clear the context area:
        p = (NODE_CONTEXT_ST *)malloc(sizeof(NODE_CONTEXT_ST));
      
        if (p) {
           memset(p, 0, sizeof(NODE_CONTEXT_ST));
    3. Save the node object pointer in the context:
         p->nodeObject = nodeObject;
    4. Save the node name:
       CciCharNCpy((CciChar*)&p->nodeName, nodeName, MAX_NODE_NAME_LEN);
  2. The broker calls the appropriate utility functions to find out about the node's input terminals and output terminals. A node has a number of input terminals and output terminals associated with it. Within the user function cniCreateNodeContext, calls should be made to cniCreateInputTerminal and cniCreateOutputTerminal to define the user node's terminals. These functions must be invoked within the cniCreateNodeContext implementation function. For example, to define a node with one input terminal and two output terminals:
        {
          const CciChar* ucsIn = CciString("in", BIP_DEF_COMP_CCSID) ;
          insInputTerminalListEntry(p, (CciChar*)ucsIn);
          free((void *)ucsIn) ;
        }
        {
          const CciChar* ucsOut = CciString("out", BIP_DEF_COMP_CCSID) ;
          insOutputTerminalListEntry(p, (CciChar*)ucsOut);
          free((void *)ucsOut) ;
        }
        {
          const CciChar* ucsFailure = CciString("failure", BIP_DEF_COMP_CCSID) ;
          insOutputTerminalListEntry(p, (CciChar*)ucsFailure);
          free((void *)ucsFailure) ;
        }

Setting attributes

Attributes are set whenever you start the broker, or when you redeploy a message flow with new values. Attributes are set by the broker calling user code on the configuration thread. The user code needs to store these attributes in its node context area, for use when processing messages later.

Following the creation of input and output terminals, the broker calls the cniSetAttribute function to pass the values for the configured attributes for this instantiation of the user-defined node. For example:
    {
      const CciChar* ucsAttr = CciString("nodeTraceSetting", BIP_DEF_COMP_CCSID) ;
      insAttrTblEntry(p, (CciChar*)ucsAttr, CNI_TYPE_INTEGER);
      _setAttribute(p, (CciChar*)ucsAttr, (CciChar*)constZero);
      free((void *)ucsAttr) ;
    }
    {
      const CciChar* ucsAttr = CciString("nodeTraceOutfile", BIP_DEF_COMP_CCSID) ;
      insAttrTblEntry(p, (CciChar*)ucsAttr, CNI_TYPE_STRING);
      _setAttribute(p, (CciChar*)ucsAttr, (CciChar*)constSwitchTraceLocation);
      free((void *)ucsAttr) ;
    }

Implementing the node functionality

When the broker retrieves a message from the queue and that message arrives at the input terminal of your user-defined message processing or output node, the broker calls the implementation function cniEvaluate. This function is called on the message processing thread and it should decide what to do with the message. This function might be called on multiple threads, especially if additional instances are used.

Deleting an instance of the message processing node

To delete an instance of a node, you use the cniDeleteNodeContext function. For example:

void _deleteNodeContext(
  CciContext* context
){
  static char* functionName = (char *)"_deleteNodeContext()";

  return;
}

The cniDeleteNodeContext function is provided by the user, and is called by the broker when a message flow is deleted.

Related reference
C user-defined node API