Transmit rule

The transmit() rule is called only if the triggerTransmission() rule allows transmission, that is, returns true (Java codebase) or MQERETURN_OK (C codebase). The transmit() rule is called for every remote queue definition that holds messages awaiting transmission, so the rule can decide which messages are transmitted on a queue-by-queue basis. The example rule below allows message transmission from a queue only if the queue has a default priority greater than 5. If a message has not been assigned a priority before being placed on a queue, it is given the queue's default priority.

Java codebase
public boolean transmit( MQeQueueProxy queue )    {
    if ( queue.getDefaultPriority() > 5 )    {    
        return (true);
    }
    else    {
        return (false);
    }
}

C codebase
/* The following function is mapped to the fPtrTransmitQueue function */
/* pointer in the user's initialization                               */
/* function output parameter structure.                               */
 
MQERETURN myRules_TransmitQueue( MQeRulesTransmitQueue_in_ * pInput,                
                        MQeRulesTransmitQueue_out_ * pOutput) {
    MQERETURN rc = MQERETURN_OK;    
    MQEBYTE queuePriority;
      
    MQeRemoteAsyncQParms queueParms = REMOTE_ASYNC_Q_INIT_VAL;
    myRules * myData = (myRules *)(pInput->pPrivateData);
 
    MQeExceptBlock * pExceptBlock = 
                   (MQeExceptBlock *)(pOutput->pExceptBlock);    
    SET_EXCEPT_BLOCK_TO_DEFAULT(pExceptBlock);
    
    /* inquire upon the default priority of the queue */
    /* specify the subject of the inquire in the      */
    /* queue parameter structure                      */
    queueParms.baseParms.opFlags = QUEUE_PRIORITY_OP;
    
    rc =  mqeAdministrator_AsyncRemoteQueue_inquire(myData->hAdmin,
                                     pExceptBlock,          
                                     pInput->hQueueName, 
                                     pInput->hQueue_QueueManagerName,
                                     &queueParms);
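    /* if the default priority is less than 6, disallow the operation */
    if(MQERETURN_OK == rc
       && queueParms.baseParms.queuePriority < 6) {
        SET_EXCEPT_BLOCK(pExceptBlock,
                         MQERETURN_RULES_DISALLOWED_BY_RULE,
                         MQEREASON_NA);
        /* assumption: the return code mirrors the exception block */
        rc = MQERETURN_RULES_DISALLOWED_BY_RULE;
    }
    /* the function is declared to return MQERETURN */
    return rc;
}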

A sensible extension to this rule is to allow all messages to be transmitted at off-peak times, so that only messages from high-priority queues are transmitted during peak periods.
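The extension described above can be sketched as a small decision helper. This is only an illustration under stated assumptions: the class PeakAwareTransmitPolicy, its method allowTransmit(), and the peak window of 09:00 to 18:00 are hypothetical and are not part of the MQe API. Only the priority threshold comes from the rule shown earlier.

```java
// Hypothetical helper, not part of the MQe API: it holds only the decision logic.
final class PeakAwareTransmitPolicy {
    // Queues need a default priority greater than 5 during peak hours,
    // matching the simple transmit() rule above.
    static final int MIN_PEAK_PRIORITY = 6;
    // Assumed peak window for illustration: 09:00 (inclusive) to 18:00 (exclusive).
    static final int PEAK_START_HOUR = 9;
    static final int PEAK_END_HOUR = 18;

    /**
     * Returns true if a queue with the given default priority may
     * transmit at the given hour of the day (0-23).
     */
    static boolean allowTransmit(int queueDefaultPriority, int hourOfDay) {
        boolean offPeak = hourOfDay < PEAK_START_HOUR || hourOfDay >= PEAK_END_HOUR;
        // Off-peak: every queue may transmit. Peak: high-priority queues only.
        return offPeak || queueDefaultPriority >= MIN_PEAK_PRIORITY;
    }
}
```

A transmit() rule could then pass queue.getDefaultPriority() and the current hour of the day to allowTransmit() and return the result.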

A more complex example

The following example assumes that the transmission of the messages takes place over a communications network that charges for the time taken for transmission. It also assumes that there is a cheap-rate period when the unit-time cost is lower. The rules block any transmission of messages until the cheap-rate period. During the cheap-rate period, the queue manager is triggered at regular intervals.

Java codebase
import com.ibm.mqe.*;
import java.util.*;
/**
 * Example set of queue manager rules which trigger the transmission
 * of any messages waiting to be sent.
 *
 * These rules only trigger the transmission of messages if the current
 * time is between the values defined in the variables cheapRatePeriodStart
 * and cheapRatePeriodEnd.
 * (This example assumes that transmission will take place over a
 * communication network which charges for the time taken to transmit.)
 */
public class ExampleQueueManagerRules extends MQeQueueManagerRule
implements Runnable
{
    // default interval between triggers is 15 seconds
    private static final long MILLISECS_BETWEEN_TRIGGER_TRANSMITS = 15000;

    // interval at which we check whether the queue manager is closing down.
    private static final long MILLISECS_BETWEEN_CLOSE_CHECKS = 1000;

    // Max wait of ten seconds to kill off the background thread when
    // the queue manager is closing down.
    private static final long MAX_WAIT_FOR_BACKGROUND_THREAD_MILLISECONDS = 10000;

    // Reference to the control block used to communicate with the
    // background thread which does a sleep-trigger-sleep-trigger loop.
    // Note that freeing such blocks for garbage collection will not
    // stop the thread to which it refers.
    private Thread th = null;

    // Flag which is set when shutdown of the background thread is required.
    // Volatile because the thread using the flag and the thread setting it
    // to true are different threads, and it is important that the flag is
    // not held in CPU registers, or one thread will see a different value
    // to the other.
    private volatile boolean toldToStop = false;

    // cheap rate transmission period start and end times
    protected int cheapRatePeriodStart = 18;  /* 18:00 hrs */
    protected int cheapRatePeriodEnd = 9;     /* 09:00 hrs */
}

The cheapRatePeriodStart and cheapRatePeriodEnd variables define the extent of the cheap-rate period. In this example, the cheap-rate period runs from 18:00 hours in the evening until 09:00 hours the following morning.

The constant MILLISECS_BETWEEN_TRIGGER_TRANSMITS defines the period of time, in milliseconds, between each triggering of the queue manager. In this example, the trigger interval is defined to be 15 seconds.

The triggering of the queue manager is handled by a background thread that wakes up at the end of each trigger interval (MILLISECS_BETWEEN_TRIGGER_TRANSMITS). If the current time is inside the cheap-rate period, it calls the MQeQueueManager.triggerTransmission() method to initiate an attempt to transmit all messages awaiting transmission. The background thread is created in the queueManagerActivate() rule and stopped in the queueManagerClose() rule. The queue manager calls these rules when it is activated and closed respectively.

/**
* Overrides MQeQueueManagerRule.queueManagerActivate()
* Starts a timer thread
*/
public void queueManagerActivate()throws Exception {
    super.queueManagerActivate();
    // background thread which triggers transmission
    th = new Thread(this, "TriggerThread");
    toldToStop = false;
    th.start();    // start timer thread        
}
 
 
/**
* Overrides MQeQueueManagerRule.queueManagerClose()
* Stops the timer thread
*/
public void queueManagerClose() throws Exception {
    super.queueManagerClose();

    // Tell the background thread to stop, as the queue manager is closing now.
    toldToStop = true;

    // Now wait for the background thread, if it's not already stopped.
    if (th != null) {
        try {
            // Only wait for a certain time before giving up and timing out.
            th.join(MAX_WAIT_FOR_BACKGROUND_THREAD_MILLISECONDS);

            // Free up the thread control block for garbage collection.
            th = null;
        } catch (InterruptedException e) {
            // Don't propagate the exception.
            // Assume that the thread will stop shortly anyway.
        }
    }
}
 
 

The code to handle the background thread looks like this:

/**
* Timer thread
* Triggers queue manager every interval until thread is stopped
*/
public void run() {
    /* Do a sleep-trigger-sleep-trigger loop until the queue manager */
    /* closes. Exceptions are reported and the loop continues.       */
    while (!toldToStop) {
        try {
            // Count down until we've waited enough.
            // We sleep in short steps because a single long sleep
            // would stop a queue manager from closing quickly.
            long timeToWait = MILLISECS_BETWEEN_TRIGGER_TRANSMITS;
            while (timeToWait > 0 && !toldToStop) {
                // sleep for the close-check interval
                Thread.sleep(MILLISECS_BETWEEN_CLOSE_CHECKS);

                // We've waited for some time.
                // Account for this in the overall wait.
                timeToWait -= MILLISECS_BETWEEN_CLOSE_CHECKS;
            }
            if (!toldToStop && timeToTransmit()) {
                // trigger transmission on QMgr (which is rule owner)
                ((MQeQueueManager) owner).triggerTransmission();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
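The stop-flag and short-sleep pattern used by run() can be tried without a queue manager. The following is a minimal sketch under stated assumptions: the class PeriodicTrigger and its methods are hypothetical names, and the Runnable passed to the constructor merely stands in for the call to triggerTransmission().

```java
// Hypothetical, self-contained demo of the pattern; not part of the MQe API.
final class PeriodicTrigger implements Runnable {
    private final long intervalMillis;  // time between triggers
    private final long checkMillis;     // how often the stop flag is checked
    private final Runnable action;      // stands in for triggerTransmission()
    // volatile so the worker thread sees the value set by the stopping thread
    private volatile boolean toldToStop = false;
    private Thread th;

    PeriodicTrigger(long intervalMillis, long checkMillis, Runnable action) {
        this.intervalMillis = intervalMillis;
        this.checkMillis = checkMillis;
        this.action = action;
    }

    void start() {
        th = new Thread(this, "TriggerThread");
        th.start();
    }

    /**
     * Asks the worker to stop and waits up to maxWaitMillis (must be > 0,
     * because join(0) waits forever). Returns true if the thread has ended.
     */
    boolean stop(long maxWaitMillis) throws InterruptedException {
        toldToStop = true;
        th.join(maxWaitMillis);
        return !th.isAlive();
    }

    public void run() {
        while (!toldToStop) {
            long timeToWait = intervalMillis;
            try {
                // Sleep in short steps so a stop request is noticed quickly.
                while (timeToWait > 0 && !toldToStop) {
                    Thread.sleep(Math.min(checkMillis, timeToWait));
                    timeToWait -= checkMillis;
                }
            } catch (InterruptedException e) {
                return; // treat interruption as a request to end the thread
            }
            if (!toldToStop) {
                action.run();
            }
        }
    }
}
```

With an interval of 15000 and a check period of 1000, as in the example rule, stop() should normally return within about one second rather than after a full trigger interval.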

The variable owner is defined by the class MQeRule, which is the ancestor of MQeQueueManagerRule. As part of its startup process, the queue manager activates the queue manager rules and passes a reference to itself to the rules object. This reference is stored in the variable owner.

The thread loops until it is stopped by the queueManagerClose() rule, sleeping until the end of each MILLISECS_BETWEEN_TRIGGER_TRANSMITS interval. At the end of this interval, if it has not been told to stop, it calls the timeToTransmit() method to check whether the current time is in the cheap-rate transmission period. If this method returns true, the queue manager's triggerTransmission() method is called. The timeToTransmit() method is shown in the following code:

protected boolean timeToTransmit()    {
    /* get current time */
    Calendar calendar = Calendar.getInstance();
    calendar.setTime( new Date() );
    /* get hour */
    int hour = calendar.get( Calendar.HOUR_OF_DAY );
    if ( hour >= cheapRatePeriodStart || hour < cheapRatePeriodEnd ) {
        return true; /* cheap rate */
    }
    else    {
        return false; /* not cheap rate */
    }
}
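The test in timeToTransmit() joins the two comparisons with || because the cheap-rate period wraps past midnight (start 18, end 9). If the variables were changed to a period that does not wrap, for example 1 and 6, the same expression would be true for every hour. The following sketch handles both cases. The class HourWindow and its contains() method are hypothetical names introduced for illustration, and treating equal start and end hours as an empty period is an assumption.

```java
// Hypothetical helper, not part of the MQe API.
final class HourWindow {
    /**
     * Returns true if hour (0-23) lies in the period that starts at
     * startHour (inclusive) and ends at endHour (exclusive). The period
     * may wrap past midnight, for example 18 to 9.
     */
    static boolean contains(int startHour, int endHour, int hour) {
        if (startHour == endHour) {
            return false; // assumption: equal bounds mean an empty period
        }
        if (startHour < endHour) {
            // Period within one day, for example 01:00 to 06:00.
            return hour >= startHour && hour < endHour;
        }
        // Period wraps past midnight, for example 18:00 to 09:00.
        return hour >= startHour || hour < endHour;
    }
}
```

With the values used in this example, HourWindow.contains(18, 9, hour) gives the same result as the condition in timeToTransmit().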

C codebase
The C example emulates the Java codebase example. Although the native C codebase is entirely single-threaded, users can write platform-specific code that creates threads. In this example of a user-written queue manager activate rule, a thread is spawned which loops, sleeping for a period of time defined in a triggerInterval variable and then, provided it has not been asked to stop, checking that the current time is in a cheap-rate period before attempting to trigger transmission. Data that is required between rule invocations is stored in the rule's private data structure. Refer to the WebSphere MQ Everyplace C Programming Reference for how rules private data is carried between rule invocations. The queue manager's close rule function provides the thread's terminating condition by setting a boolean switch, carryOn, to MQE_FALSE. This switch can be initialized to MQE_TRUE in the rules initialization function. The close rule function waits until the thread has ended before passing control back to the application.

The private data structure passed between rule invocations is as follows:

struct myRules_st_ {               /* rules instance structure              */
    MQeAdministratorHndl hAdmin;   /* administrator handle to carry around  */
                                   /* between rules functions               */
    MQEBOOL carryOn;               /* used for trigger transmission thread  */
    MQEINT32 triggerInterval;      /* used for trigger transmission thread  */
    HANDLE hThread;                /* handle for the trigger transmission   */
                                   /* thread                                */
};
     
typedef struct myRules_st_ myRules;
 
The queue manager activate rule:
 
MQEVOID myRules_activateQueueManager( MQeRulesActivateQMgr_in_ * pInput,
                                    MQeRulesActivateQMgr_out_ * pOutput) {
    // retrieve exception block - passed from application
    MQeExceptBlock * pExceptBlock =
                   (MQeExceptBlock *)(pOutput->pExceptBlock);

    // retrieve private data structure passed between
    // user's rules invocations
    myRules * myData = (myRules *)(pInput->pPrivateData);

    MQeQueueManagerHndl hQueueManager;
    MQERETURN rc = MQERETURN_OK;

    rc = mqeQueueManager_getCurrentQueueManager(pExceptBlock,
                                                &hQueueManager);
    if(MQERETURN_OK == rc) {
        // set up the private data administrator handle using the
        // retrieved application queue manager handle. This is done
        // here rather than in the rules initialization function as
        // the queue manager has not yet been activated fully when
        // the rules initialization function is invoked.
        rc = mqeAdministrator_new(pExceptBlock,
                                  &myData->hAdmin, hQueueManager);
    }
    if(MQERETURN_OK == rc) {
        DWORD tId;
        // Launch thread to govern calls to trigger transmission
        myData->hThread = (HANDLE) CreateThread(NULL,
                                            0,
                                            timeToTrigger,
                                            (MQEVOID *)myData,
                                            0,
                                            &tId);
        if(myData->hThread == NULL) {
            // thread creation failed
            SET_EXCEPT_BLOCK(pExceptBlock,
                             MQERETURN_RULES_ERROR,
                             MQEREASON_NA);
        }
    }
}

The timeToTrigger() function provides the equivalent functionality of the run() method in the Java example above. Notice the use of the private data variable carryOn, of type MQEBOOL, as one of the conditions for the while loop to continue. Once this variable has a value of MQE_FALSE, the while loop terminates, and the thread ends when the function is exited.

DWORD _stdcall timeToTrigger(myRules * rulesStruct) {

    MQERETURN rc = MQERETURN_OK;
    MQeQueueManagerHndl hQueueManager;
    MQeExceptBlock exceptBlock;
    myRules * myData = (myRules *)rulesStruct;
    SET_EXCEPT_BLOCK_TO_DEFAULT(&exceptBlock);

    /* retrieve the current queue manager */
    rc = mqeQueueManager_getCurrentQueueManager(&exceptBlock,
                                                &hQueueManager);
    if(MQERETURN_OK == rc) {
        /* so long as there is not a grave internal error and the
           termination condition has not been set */
        while(!(EC(&exceptBlock) == MQERETURN_QUEUE_MANAGER_ERROR &&
                ERC(&exceptBlock) == MQEREASON_INTERNAL_ERROR) &&
              myData->carryOn == MQE_TRUE) {
            /* Are we in a cheap rate transmission period? */
            if(timeToTransmit()) {
                /* if so, attempt to trigger transmission */
                rc = mqeQueueManager_triggerTransmission(hQueueManager,
                                                         &exceptBlock);
            }
            /* wait for the duration of the trigger interval, whether
               or not transmission was triggered, so that the loop
               does not spin outside the cheap rate period */
            Sleep(myData->triggerInterval);
        }
    }
    return 0;
}

The timeToTransmit() function returns a boolean to indicate whether or not we are in a cheap transmission period:

MQEBOOL timeToTransmit() {
 
    SYSTEMTIME timeInfo;
    GetLocalTime(&timeInfo);
 
    if (timeInfo.wHour >= 18 || timeInfo.wHour < 9) {
        return MQE_TRUE;
    } else {
        return MQE_FALSE;
    }
}

It would probably be better to define constants for the cheap-rate interval boundary times and to carry these around in the rules private data structure as well, but that has not been done here for reasons of clarity.

The function returns MQE_TRUE to indicate that the current time is in a cheap-rate period, that is, between the hours of 18:00 and 09:00. A return value of MQE_TRUE is one of the prerequisites for transmission to be triggered in timeToTrigger().

Finally, the queue manager close rule is used to terminate the thread. One of the conditions for termination of the timeToTrigger() function is for the boolean variable carryOn to have a value of MQE_FALSE, and the close function sets carryOn to MQE_FALSE. However, there may still be a considerable lapse of time between when this value is set and when the timeToTrigger() function exits: up to the value of triggerInterval plus the time taken to perform a triggerTransmission operation. The close function therefore waits for the thread to terminate. It also calls triggerTransmission() one more time in case there are still some pending messages.

MQEVOID myRules_CloseQMgr( MQeRulesCloseQMgr_in_ * pInput,
                           MQeRulesCloseQMgr_out_ * pOutput) {
    MQERETURN rc = MQERETURN_OK;
    MQeQueueManagerHndl hQueueManager;
    myRules * myData = (myRules *)pInput->pPrivateData;
    DWORD result;
    MQeExceptBlock exceptBlock =
                   *((MQeExceptBlock *)pOutput->pExceptBlock);
    SET_EXCEPT_BLOCK_TO_DEFAULT(&exceptBlock);
    // Effect the ending of the thread by setting the MQEBOOL
    // carryOn to MQE_FALSE. This leads to a return from
    // timeToTrigger(), which ends the thread.
    myData->carryOn = MQE_FALSE;

    /* wait for the thread in any case */
    result = WaitForSingleObject(myData->hThread, INFINITE);

    /* retrieve the current queue manager */
    rc = mqeQueueManager_getCurrentQueueManager(&exceptBlock,
                                                &hQueueManager);
    if(MQERETURN_OK == rc) {
        /* attempt to trigger transmission one */
        /* last time to clean up queue         */
        rc = mqeQueueManager_triggerTransmission(hQueueManager,
                                                 &exceptBlock);
    }
}
 


© IBM Corporation 2002. All Rights Reserved