In the Java codebase, each queue has its own set of rules. A solution can extend the behavior of these rules. All queue rules should descend from class com.ibm.mqe.MQeQueueRule.
In the C codebase, only a single set of rules is loaded. A user can implement different rules for different queues by loading other rules modules from the 'master' module. The master rules functions can then invoke the corresponding functions in any other modules as required.
Queue rules are called when:
This section describes some examples of the use of queue rules.
The first example shows a possible use of the message expired rule. Both queues and messages can have an expiry interval set. If this interval is exceeded, the message is flagged as being expired. At this point the messageExpired() rule is called. On return from this rule, the expired message is deleted.
In the following example, a copy of the message is put onto a Dead Letter Queue.
/* This rule puts a copy of any expired messages to a Dead Letter Queue */ public boolean messageExpired( MQeFields entry, MQeMsgObject msg ) throws Exception { /* Get the reference to the Queue Manager */ MQeQueueManager qmgr = MQeQueueManager.getReference( ((MQeQueueProxy)owner).getQueueManagerName()); /* need to set re-send flag so that put of message to new queue isn't rejected */ msg.putBoolean( MQe.Msg_Resend, true ); /* if the message contains an expiry interval field - remove it */ if ( msg.contains( MQe.Msg_ExpireTime ) { msg.delete( MQe.Msg_ExpireTime ); } /* put message onto dead letter queue */ qmgr.putMessage( null, MQe.DeadLetter_Queue_Name, msg, null, 0 ); /* Return true. Note that no use is made of this return value - the message is always deleted but the return value is kept for backward compatibility */ return (true); }
MQEVOID myRules_messageExpired( MQeRulesMessageExpired_in_ * pInput, MQeRulesMessageExpired_out_ * pOutput) { MQERETURN rc = MQERETURN_OK; MQeExceptBlock * pExceptBlock = (MQeExceptBlock *)(pOutput->pExceptBlock); MQEBOOL contains = MQE_FALSE; MQeFieldsHndl hMsg; MQeQueueManagerHndl hQueueManager; SET_EXCEPT_BLOCK_TO_DEFAULT(pExceptBlock); /* Set re-send flag so that attempt to put message to new queue isn't rejected */ // First, clone the message as the //input parameter is read-only rc = mqeFields_clone(pInput->hMsg, pExceptBlock, &hMsg); if(MQERETURN_OK == rc) { rc = mqeFields_putBoolean(hMsg, pExceptBlock, MQE_MSG_RESEND, MQE_TRUE); if(MQERETURN_OK == rc) { // if the message contains an expiry interval field - remove it rc = mqeFields_contains(hMsg, pExceptBlock, &contains, MQE_MSG_EXPIRETIME); if(MQERETURN_OK == rc && contains) { rc = mqeFields_delete(hMsg, pExceptBlock, MQE_MSG_EXPIRETIME); } if(MQERETURN_OK == rc) { // put message onto dead letter queue MQeStringHndl hQueueManagerName; rc = mqeQueueManager_getCurrentQueueManager(pExceptBlock, &hQueueManager); if(MQERETURN_OK == rc) { rc = mqeQueueManager_getName(hQueueManager, pExceptBlock, &hQueueManagerName); if(MQERETURN_OK == rc) { // use a temporary exception block as don't care // if dead letter queue does not exist MQeExceptBlock tempExceptBlock; SET_EXCEPT_BLOCK_TO_DEFAULT(&tempExceptBlock); rc = mqeQueueManager_putMessage( hQueueManager, &tempExceptBlock, hQueueManagerName, MQE_DEADLETTER_QUEUE_NAME, hMsg, NULL, 0 ); (MQEVOID)mqeString_free(hQueueManagerName, &tempExceptBlock); } } } } } }
The previous example sends any expired messages to the queue manager's dead-letter queue, the name of which is defined by the constant MQe.DeadLetter_Queue_Name in the Java codebase and MQE_DEADLETTER_QUEUE_NAME in the C codebase. It is worth noting that the queue manager rejects a put of a message that has previously been put onto another queue. This protects against a duplicate message being introduced into the Websphere MQ Everyplace network. So, before moving the message to the dead-letter queue, the rule must set the resend flag. This is done by adding the Java MQe.Msg_Resend or C MQE_MSG_RESEND field to the message.
The message expiry time field must be deleted before moving the message to the dead-letter queue. The following example shows how to log an event that occurs on the queue.
In the Java example, the event that occurs is the creation of a message listener, in the C example, it is a put message request. However, the principle is the same in both cases:
/* This rule logs the activation of the queue */ public void queueActivate() { try { logFile = new LogToDiskFile( \\log.txt ); log( MQe_Log_Information, Event_Activate, "Queue " + ((MQeQueueProxy)owner).getQueueManagerName() + " + " + ((MQeQueueProxy)owner).getQueueName() + " active" ); } catch( Exception e ) { e.printStackTrace( System.err ); } } /* This rule logs the closure of the queue */ public void queueClose() { try { log( MQe_Log_Information, Event_Closed, "Queue " + ((MQeQueueProxy)owner).getQueueManagerName() + " + " + ((MQeQueueProxy)owner).getQueueName() + " closed" ); /* close log file */ logFile.close(); } catch ( Exception e ) { e.printStackTrace( System.err ); } }
The addListener rule is shown in the following code. It uses the MQe.log method to add an Event_Queue_AddMsgListener event.
/* This rule logs the addition of a message listener */ public void addListener( MQeMessageListenerInterface listener, MQeFields filter ) throws Exception { log( MQe_Log_Information, Event_Queue_AddMsgListener, "Added listener on queue " + ((MQeQueueProxy)owner).getQueueManagerName() + "+" + ((MQeQueueProxy)owner).getQueueName() ); }
struct myRulesData_ { // rules instance structure MQeAdministratorHndl hAdmin; / administrator handle to carry around between // rules functions FILE * ifp; // file handle for logging rules }; typedef struct myRulesData_ myRules;
In the rules queue activate function, the file is opened and the activation of the queue logged:
MQEVOID myRules_activateQueue(MQeRulesActivateQueue_in_ * pInput, MQeRulesActivateQueue_out_ * pOutput) { MQERETURN rc = MQERETURN_OK; MQECHAR * qName; MQEINT32 size; // recover the private data from the input structure parameter pInput myRules * myData = (myRules *)(pInput->pPrivateData); MQeExceptBlock * pExceptBlock = (MQeExceptBlock *)(pOutput->pExceptBlock); SET_EXCEPT_BLOCK_TO_DEFAULT(pExceptBlock); if(myData->ifp == NULL) { // initialized to NULL in the rules initialization function myData->ifp = fopen("traceFile.txt","w"); rc = mqeString_getUtf8(pInput->hQueueName, pExceptBlock, NULL, &size); if(MQERETURN_OK == rc) { qName = malloc(size); rc = mqeString_getUtf8(pInput->hQueueName, pExceptBlock, qName, &size); if(MQERETURN_OK == rc && myData->ifp != NULL) { fprintf(myData->ifp, "Activating queue %s \n", qName); } } } }
In the rules queue close function, the file is closed after the closure of the queue is logged:
MQEVOID myRules_closeQueue(MQeRulesCloseQueue_in_ * pInput, MQeRulesCloseQueue_out_ * pOutput) { MQERETURN rc = MQERETURN_OK; MQECHAR * qName; MQEINT32 size; // recover the private data from the input structure parameter pInput myRules * myData = (myRules *)(pInput->pPrivateData); MQeExceptBlock * pExceptBlock = (MQeExceptBlock *)(pOutput->pExceptBlock); SET_EXCEPT_BLOCK_TO_DEFAULT(pExceptBlock); if(myData->ifp != NULL) { rc = mqeString_getUtf8(pInput->hQueueName, pExceptBlock, NULL, &size); if(MQERETURN_OK == rc) { qName = malloc(size); rc = mqeString_getUtf8(pInput->hQueueName, pExceptBlock, qName, &size); if(MQERETURN_OK == rc) { fprintf(myData->ifp, "Closing queue %s \n", qName); } } fclose(myData->ifp); MyData->ifp = NULL; } }
The rules put message function ensures that each put message operation is logged:
MQERETURN myRules_putMessage(MQeRulesPutMessage_in_ * pInput, MQeRulesPutMessage_out_ * pOutput) { MQERETURN rc = MQERETURN_OK; MQECHAR * qName, * qMgrName; MQEINT32 size; // recover the private data from the input structure parameter pInput myRules * myData = (myRules *)(pInput->pPrivateData); MQeExceptBlock * pExceptBlock = (MQeExceptBlock *)(pOutput->pExceptBlock); SET_EXCEPT_BLOCK_TO_DEFAULT(pExceptBlock); if(myData->ifp != NULL) { rc = mqeString_getUtf8(pInput->hQueueName, pExceptBlock, NULL, &size); if(MQERETURN_OK == rc) { qName = malloc(size); rc = mqeString_getUtf8(pInput->hQueueName, pExceptBlock, qName,&size); } if(MQERETURN_OK == rc) { rc = mqeString_getUtf8(pInput->hQueue_QueueManagerName, pExceptBlock, NULL, &size); if(MQERETURN_OK == rc) { qMgrName = malloc(size); rc = mqeString_getUtf8(pInput->hQueue_QueueManagerName, pExceptBlock, qMgrName, &size); } } if(MQERETURN_OK == rc) { fprintf(myData->ifp, "Putting a message onto queue %s on queue manager %s\n",qName, qMgrName); } } /* allow the operation to proceed regardless of what went wrong in this rule */ SET_EXCEPT_BLOCK_TO_DEFAULT(pExceptBlock); return EC(pExceptBlock); }