The transmit() rule is only called if the triggerTransmission() rule allows transmission, that is, if it returns a value of true (Java) or MQERETURN_OK (C). The transmit() rule is called for every remote queue definition that holds messages awaiting transmission, so the rule can decide, queue by queue, which messages are transmitted. The example rule below allows message transmission from a queue only if the queue has a default priority greater than 5. If a message has not been assigned a priority before being placed on a queue, it is given the queue's default priority.
public boolean transmit( MQeQueueProxy queue )
{
    if ( queue.getDefaultPriority() > 5 )
    {
        return (true);
    }
    else
    {
        return (false);
    }
}
/* The following function is mapped to the fPtrTransmitQueue function */
/* pointer in the user's initialization function output parameter structure. */
MQERETURN myRules_TransmitQueue( MQeRulesTransmitQueue_in_ * pInput,
                                 MQeRulesTransmitQueue_out_ * pOutput)
{
    MQERETURN rc = MQERETURN_OK;
    MQeRemoteAsyncQParms queueParms = REMOTE_ASYNC_Q_INIT_VAL;
    myRules * myData = (myRules *)(pInput->pPrivateData);
    MQeExceptBlock * pExceptBlock = (MQeExceptBlock *)(pOutput->pExceptBlock);
    SET_EXCEPT_BLOCK_TO_DEFAULT(pExceptBlock);

    /* inquire upon the default priority of the queue */
    /* specify the subject of the inquire in the queue parameter structure */
    queueParms.baseParms.opFlags = QUEUE_PRIORITY_OP;
    rc = mqeAdministrator_AsyncRemoteQueue_inquire(myData->hAdmin,
                                                   pExceptBlock,
                                                   pInput->hQueueName,
                                                   pInput->hQueue_QueueManagerName,
                                                   &queueParms);

    /* if the default priority is less than 6, disallow the operation */
    if(MQERETURN_OK == rc && queueParms.baseParms.queuePriority < 6)
    {
        SET_EXCEPT_BLOCK(pExceptBlock,
                         MQERETURN_RULES_DISALLOWED_BY_RULE,
                         MQEREASON_NA);
        rc = MQERETURN_RULES_DISALLOWED_BY_RULE;
    }

    /* report the outcome to the caller */
    return rc;
}
A sensible extension to this rule is to allow all messages to be transmitted at 'off-peak' times, so that only messages from high-priority queues are transmitted during peak periods.
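The decision described above can be sketched in isolation as follows. This is a minimal illustration, not part of the MQe API: the class PeakAwareTransmitPolicy, its method, and the threshold constant are hypothetical names, and the caller is assumed to supply the queue's default priority and an off-peak flag worked out elsewhere.

```java
/** Hypothetical helper for illustration only; it is not an MQe class. */
final class PeakAwareTransmitPolicy {

    // Assumption: queues with a default priority above this value count as
    // high priority, matching the "greater than 5" test in the rule above.
    static final int HIGH_PRIORITY_THRESHOLD = 5;

    /**
     * Decides whether a queue may transmit.
     * Off-peak: every queue may transmit.
     * Peak:     only queues whose default priority exceeds the threshold.
     */
    static boolean allowTransmit(int queueDefaultPriority, boolean offPeak) {
        return offPeak || queueDefaultPriority > HIGH_PRIORITY_THRESHOLD;
    }

    public static void main(String[] args) {
        int[] priorities = {3, 7};
        boolean[] periods = {false, true};
        for (boolean offPeak : periods) {
            for (int priority : priorities) {
                System.out.println("offPeak=" + offPeak
                        + " priority=" + priority
                        + " allowed=" + allowTransmit(priority, offPeak));
            }
        }
    }
}
```

In a real transmit() rule, the priority argument would presumably come from queue.getDefaultPriority(), as in the rule shown earlier.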
The following example assumes that the transmission of the messages takes place over a communications network that charges for the time taken for transmission. It also assumes that there is a cheap-rate period when the unit-time cost is lower. The rules block any transmission of messages until the cheap-rate period. During the cheap-rate period, the queue manager is triggered at regular intervals.
import com.ibm.mqe.*;
import java.util.*;

/**
 * Example set of queue manager rules which trigger the transmission
 * of any messages waiting to be sent.
 *
 * These rules only trigger the transmission of messages if the current
 * time is between the values defined in the variables cheapRatePeriodStart
 * and cheapRatePeriodEnd
 * (This example assumes that transmission will take place over a
 * communication network which charges for the time taken to transmit)
 */
public class ExampleQueueManagerRules extends MQeQueueManagerRule implements Runnable
{
    // default interval between triggers is 15 seconds
    private static final long MILLISECS_BETWEEN_TRIGGER_TRANSMITS = 15000;

    // interval between which we check whether the queue manager is closing down.
    private static final long MILLISECS_BETWEEN_CLOSE_CHECKS = 1000;

    // Max wait of ten seconds to kill off the background thread when
    // the queue manager is closing down.
    private static final long MAX_WAIT_FOR_BACKGROUND_THREAD_MILLISECONDS = 10000;

    // Reference to the control block used to communicate with the background thread
    // which does a sleep-trigger-sleep-trigger loop.
    // Note that freeing such blocks for garbage collection will not stop the thread
    // to which it refers.
    private Thread th = null;

    // Flag which is set when shutdown of the background thread is required.
    // Volatile because the thread using the flag and the thread setting it to true
    // are different threads, and it is important that the flag is not held in
    // CPU registers, or one thread will see a different value to the other.
    private volatile boolean toldToStop = false;

    // cheap rate transmission period start and end times
    protected int cheapRatePeriodStart = 18; /* 18:00 hrs */
    protected int cheapRatePeriodEnd = 9;    /* 09:00 hrs */
}
The cheapRatePeriodStart and cheapRatePeriodEnd variables define the extent of the cheap-rate period. In this example, the cheap-rate period runs from 18:00 hours in the evening until 09:00 hours the following morning.
The constant MILLISECS_BETWEEN_TRIGGER_TRANSMITS defines the period of time, in milliseconds, between each triggering of the queue manager. In this example, the trigger interval is defined to be 15 seconds.
The triggering of the queue manager is handled by a background thread that wakes up at the end of each trigger interval. If the current time is inside the cheap-rate period, it calls the MQeQueueManager.triggerTransmission() method to initiate an attempt to transmit all messages awaiting transmission. The background thread is created in the queueManagerActivate() rule and stopped in the queueManagerClose() rule. The queue manager calls these rules when it is activated and closed respectively.
/**
 * Overrides MQeQueueManagerRule.queueManagerActivate()
 * Starts a timer thread
 */
public void queueManagerActivate() throws Exception
{
    super.queueManagerActivate();
    // background thread which triggers transmission
    th = new Thread(this, "TriggerThread");
    toldToStop = false;
    th.start(); // start timer thread
}

/**
 * Overrides MQeQueueManagerRule.queueManagerClose()
 * Stops the timer thread
 */
public void queueManagerClose() throws Exception
{
    super.queueManagerClose();

    // Tell the background thread to stop, as the queue manager is closing now.
    toldToStop = true;

    // Now wait for the background thread, if it's not already stopped.
    if ( th != null )
    {
        try
        {
            // Only wait for a certain time before giving up and timing out.
            th.join( MAX_WAIT_FOR_BACKGROUND_THREAD_MILLISECONDS );

            // Free up the thread control block for garbage collection.
            th = null;
        }
        catch (InterruptedException e)
        {
            // Don't propagate the exception.
            // Assume that the thread will stop shortly anyway.
        }
    }
}
The code to handle the background thread looks like this:
/**
 * Timer thread
 * Triggers queue manager every interval until thread is stopped
 */
public void run()
{
    /* Do a sleep-trigger-sleep-trigger loop until the */
    /* queue manager closes or we get an exception. */
    while ( !toldToStop )
    {
        try
        {
            // Count down until we've waited enough
            // We do a tight loop with a smaller granularity because
            // otherwise we would stop a queue manager from closing quickly
            long timeToWait = MILLISECS_BETWEEN_TRIGGER_TRANSMITS;
            while ( timeToWait > 0 && !toldToStop )
            {
                // sleep for specified interval
                Thread.sleep( MILLISECS_BETWEEN_CLOSE_CHECKS );

                // We've waited for some time. Account for this in the overall wait.
                timeToWait -= MILLISECS_BETWEEN_CLOSE_CHECKS;
            }

            if ( !toldToStop && timeToTransmit() )
            {
                // trigger transmission on QMgr (which is rule owner)
                ((MQeQueueManager)owner).triggerTransmission();
            }
        }
        catch ( Exception e )
        {
            e.printStackTrace();
        }
    }
}
}
The variable owner is defined by the class MQeRule, which is the ancestor of MQeQueueManagerRule. As part of its startup process, the queue manager activates the queue manager rules and passes a reference to itself to the rules object. This reference is stored in the variable owner.
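The sliced sleep used in run() can be tried out without a queue manager. The following is a minimal sketch under stated assumptions: SlicedSleepTrigger is a hypothetical class, the Runnable passed to it stands in for the call to triggerTransmission(), and the intervals in main() are shortened purely so that the demonstration finishes quickly.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-alone version of the sleep-trigger loop; not an MQe class. */
final class SlicedSleepTrigger implements Runnable {

    private final long intervalMillis; // time between triggers
    private final long sliceMillis;    // how often the stop flag is re-checked
    private final Runnable action;     // stand-in for triggerTransmission()

    // volatile so that a stop request made on another thread is seen promptly
    private volatile boolean toldToStop = false;

    SlicedSleepTrigger(long intervalMillis, long sliceMillis, Runnable action) {
        this.intervalMillis = intervalMillis;
        this.sliceMillis = sliceMillis;
        this.action = action;
    }

    /** Asks the loop to finish; it exits within roughly one slice. */
    void stop() {
        toldToStop = true;
    }

    @Override
    public void run() {
        while (!toldToStop) {
            long remaining = intervalMillis;
            try {
                // Sleep in short slices so a stop request is noticed quickly.
                while (remaining > 0 && !toldToStop) {
                    Thread.sleep(Math.min(sliceMillis, remaining));
                    remaining -= sliceMillis;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
            if (!toldToStop) {
                action.run();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger triggers = new AtomicInteger();
        SlicedSleepTrigger trigger =
                new SlicedSleepTrigger(200, 20, triggers::incrementAndGet);
        Thread th = new Thread(trigger, "TriggerThread");
        th.start();
        Thread.sleep(650);   // let a few intervals elapse
        trigger.stop();
        th.join(1000);       // bounded wait, as in queueManagerClose()
        System.out.println("alive=" + th.isAlive() + " triggers=" + triggers.get());
    }
}
```

The design point is that the worst-case shutdown delay is governed by the slice length rather than by the full trigger interval, which is why the example rule checks the flag every MILLISECS_BETWEEN_CLOSE_CHECKS.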
The thread loops until it is stopped by the queueManagerClose() rule, sleeping until the end of each MILLISECS_BETWEEN_TRIGGER_TRANSMITS interval. At the end of this interval, if it has not been told to stop, it calls the timeToTransmit() method to check whether the current time is in the cheap-rate transmission period. If this method returns true, the queue manager's triggerTransmission() method is called. The timeToTransmit() method is shown in the following code:
protected boolean timeToTransmit()
{
    /* get current time */
    Calendar calendar = Calendar.getInstance();
    calendar.setTime( new Date() );

    /* get hour */
    int hour = calendar.get( Calendar.HOUR_OF_DAY );
    if ( hour >= cheapRatePeriodStart || hour < cheapRatePeriodEnd )
    {
        return true;  /* cheap rate */
    }
    else
    {
        return false; /* not cheap rate */
    }
}
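The test in timeToTransmit() joins its two comparisons with "or" because the 18:00 to 09:00 period wraps past midnight. If the start and end hours were changed to a period within a single day, such as 09:00 to 18:00, the same expression would be true for every hour. The sketch below shows one way to handle both cases. HourWindow and contains() are hypothetical names, and treating equal start and end hours as an empty window is an assumption made for this illustration.

```java
/** Hypothetical helper for illustration only; it is not an MQe class. */
final class HourWindow {

    /**
     * Returns true if hour lies in the window [startHour, endHour),
     * where all values are hours of the day in the range 0 to 23.
     */
    static boolean contains(int startHour, int endHour, int hour) {
        if (startHour <= endHour) {
            // Window within one day, for example 09:00 to 18:00.
            // Assumption: equal start and end give an empty window.
            return hour >= startHour && hour < endHour;
        }
        // Window wraps past midnight, for example 18:00 to 09:00.
        return hour >= startHour || hour < endHour;
    }

    public static void main(String[] args) {
        int[] samples = {3, 9, 12, 18, 23};
        for (int hour : samples) {
            System.out.println("hour " + hour
                    + ": 18-09 window " + contains(18, 9, hour)
                    + ", 09-18 window " + contains(9, 18, hour));
        }
    }
}
```

With the values used in this example (start 18, end 9), contains() gives the same answers as the expression in timeToTransmit().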
In the C example, the private data structure passed between rule invocations is as follows:
struct myRules_st_
{
    // rules instance structure
    MQeAdministratorHndl hAdmin;  // administrator handle to carry around between
                                  // rules functions
    MQEBOOL carryOn;              // used for trigger transmission thread
    MQEINT32 triggerInterval;     // used for trigger transmission thread
    HANDLE hThread;               // handle for the trigger transmission thread
};
typedef struct myRules_st_ myRules;

The queue manager activate rule:

MQEVOID myRules_activateQueueManager( MQeRulesActivateQMgr_in_ * pInput,
                                      MQeRulesActivateQMgr_out_ * pOutput)
{
    // retrieve exception block - passed from application
    MQeExceptBlock * pExceptBlock = (MQeExceptBlock *)(pOutput->pExceptBlock);
    // retrieve private data structure passed between user's rules invocations
    myRules * myData = (myRules *)(pInput->pPrivateData);
    MQeQueueManagerHndl hQueueManager;
    MQERETURN rc = MQERETURN_OK;

    rc = mqeQueueManager_getCurrentQueueManager(pExceptBlock, &hQueueManager);
    if(MQERETURN_OK == rc)
    {
        // set up the private data administrator handle using the retrieved
        // application queue manager handle. This is done here rather than in
        // the rules initialization function as the queue manager has not yet
        // been activated fully when the rules initialization function is invoked.
        rc = mqeAdministrator_new(pExceptBlock, &myData->hAdmin, hQueueManager);
    }

    if(MQERETURN_OK == rc)
    {
        DWORD tid;
        // Launch thread to govern calls to trigger transmission
        myData->hThread = (HANDLE) CreateThread(NULL, 0,
                                                (LPTHREAD_START_ROUTINE) timeToTrigger,
                                                (MQEVOID *)myData, 0, &tid);
        if(myData->hThread == NULL)
        {
            // thread creation failed
            SET_EXCEPT_BLOCK(pExceptBlock,
                             MQERETURN_RULES_ERROR,
                             MQEREASON_NA);
        }
    }
}
The timeToTrigger() function provides the equivalent functionality of the run() method in the Java example above. Notice the use of the private data variable carryOn, of type MQEBOOL, as one of the conditions for the while loop to continue. Once this variable has a value of MQE_FALSE, the while loop terminates, and the thread ends when the function returns.
DWORD _stdcall timeToTrigger(myRules * rulesStruct)
{
    MQERETURN rc = MQERETURN_OK;
    MQeQueueManagerHndl hQueueManager;
    MQeExceptBlock exceptBlock;
    myRules * myData = (myRules *)rulesStruct;
    SET_EXCEPT_BLOCK_TO_DEFAULT(&exceptBlock);

    /* retrieve the current queue manager */
    rc = mqeQueueManager_getCurrentQueueManager(&exceptBlock, &hQueueManager);
    if(MQERETURN_OK == rc)
    {
        /* so long as there is not a grave internal error and the
           termination condition has not been set */
        while(!(EC(&exceptBlock) == MQERETURN_QUEUE_MANAGER_ERROR &&
                ERC(&exceptBlock) == MQEREASON_INTERNAL_ERROR) &&
              myData->carryOn == MQE_TRUE)
        {
            /* Are we in a cheap rate transmission period? */
            if(timeToTransmit())
            {
                /* if so, attempt to trigger transmission */
                rc = mqeQueueManager_triggerTransmission(hQueueManager, &exceptBlock);
            }
            /* wait for the duration of the trigger interval, whether or not */
            /* transmission was triggered, so that the loop does not spin    */
            Sleep(myData->triggerInterval);
        }
    }
    return 0;
}
The timeToTransmit() function returns a boolean to indicate whether or not we are in a cheap transmission period:
MQEBOOL timeToTransmit()
{
    SYSTEMTIME timeInfo;
    GetLocalTime(&timeInfo);
    if (timeInfo.wHour >= 18 || timeInfo.wHour < 9)
    {
        return MQE_TRUE;
    }
    else
    {
        return MQE_FALSE;
    }
}
It would probably be better to define constants for the cheap-rate boundary times and carry them in the rules private data structure as well, but that has not been done here, to keep the example simple.
The function returns MQE_TRUE to indicate that we are in a cheap-rate period, that is, between the hours of 18:00 and 09:00. A return value of MQE_TRUE is one of the prerequisites for transmission to be triggered in timeToTrigger(). Finally, the queue manager close rule is used to terminate the thread. Notice that one of the conditions for termination of the timeToTrigger() function is for the variable carryOn to have a value of MQE_FALSE. The close function sets carryOn to MQE_FALSE, but there may still be a considerable lapse of time between when this value is set and when the timeToTrigger() function exits: up to the value of triggerInterval plus the time taken to perform a triggerTransmission operation. The close function therefore waits for the thread to terminate. It also calls triggerTransmission() one more time in case there are still some pending messages.
MQEVOID myRules_CloseQMgr( MQeRulesCloseQMgr_in_ * pInput,
                           MQeRulesCloseQMgr_out_ * pOutput)
{
    MQERETURN rc = MQERETURN_OK;
    MQeQueueManagerHndl hQueueManager;
    myRules * myData = (myRules *)pInput->pPrivateData;
    DWORD result;
    MQeExceptBlock exceptBlock = *((MQeExceptBlock *)pOutput->pExceptBlock);
    SET_EXCEPT_BLOCK_TO_DEFAULT(&exceptBlock);

    // Effect the ending of the thread by setting the MQEBOOL carryOn to MQE_FALSE
    // This leads to a return from timeToTrigger(), at which point the thread ends
    myData->carryOn = MQE_FALSE;

    /* wait for the thread in any case */
    result = WaitForSingleObject(myData->hThread, INFINITE);

    /* retrieve the current queue manager */
    rc = mqeQueueManager_getCurrentQueueManager(&exceptBlock, &hQueueManager);
    if(MQERETURN_OK == rc)
    {
        /* attempt to trigger transmission one last time to clean up queue */
        rc = mqeQueueManager_triggerTransmission(hQueueManager, &exceptBlock);
    }
}