The following code and pseudocode outline an example implementation of a JMS receiver.
/**
 * Example JMS receiver. Lines consisting of "..." are pseudocode placeholders
 * for logic that a real implementation must supply.
 *
 * The receiver keeps the context and configuration handed to it in init(),
 * polls each configured target on its own UserJMSReceiverThread, and passes
 * every received document to the Receiver Framework.
 */
public class CustomJMSReceiver implements ReceiverInterface {
    private Context m_context = null;
    private ReceiverConfig m_rcvConfig = null;
    // Receiver type passed to every Receiver Framework call. The inner
    // polling thread reads this field as well.
    String receiverType = "CustomJMS";

    /**
     * Stores the context and receiver configuration for later use.
     *
     * @param context        the context supplied by the caller
     * @param receiverConfig the initial receiver configuration
     */
    public void init(Context context, ReceiverConfig receiverConfig) {
        this.m_context = context;
        this.m_rcvConfig = receiverConfig;
    }

    /**
     * Replaces the stored configuration and adjusts the polling threads
     * to match it.
     *
     * @param rcvconfig the updated receiver configuration
     * @throws BCGReceiverException if the new configuration cannot be applied
     */
    public void refreshConfig(ReceiverConfig rcvconfig) throws BCGReceiverException {
        this.m_rcvConfig = rcvconfig;
        // Check which receiver targets are updated, newly added, or deleted.
        // If a new target is added, create a new thread and start polling the target.
        // If a current target is updated, stop the thread that is polling the
        // target and, using the updated configuration information, start polling.
        // If a current target is deleted, stop the thread that is polling the
        // target and delete the thread that is responsible for polling the target.
        ...
    }

    /**
     * Starts polling every target listed in the receiver configuration.
     *
     * @throws BCGReceiverException if polling cannot be started
     */
    public void startReceiving() throws BCGReceiverException {
        // Read the list of targets in the ReceiverConfig object.
        // For each target, create a UserJMSReceiverThread and start the thread.
        ...
    }

    /**
     * Handles a response to a request that was submitted with a nonblocking
     * process call.
     *
     * @param respCorr the correlation information that was supplied with the request
     * @param response the response document
     * @throws BCGReceiverException if the response cannot be processed
     */
    public void processResponse(ResponseCorrelation respCorr,
            ReceiverDocumentInterface response) throws BCGReceiverException {
        // Neither the framework nor the target name is a field of this class,
        // so both must be obtained here before postProcess can be called.
        ReceiverFrameWorkInterface rcvFramework =
            BCGReceiverUtil.getReceiverFramework();
        // Determine the name of the target that received the original request,
        // for example from information stored in respCorr when the request
        // was submitted.
        String target;
        ...
        ReceiverDocumentInterface responseDocs[] =
            rcvFramework.postProcess(receiverType, target, response);
        // Process the responseDocs.
        // Get the correlation information, for example, reply-to-queue
        // and correlation ID, and send the responses to the reply-to-queue queue.
        ...
    }

    /**
     * Stops polling all targets.
     *
     * @throws BCGReceiverException if a polling thread cannot be stopped
     */
    public void stopReceiving() throws BCGReceiverException {
        // Get the list of UserJMSReceiverThread objects associated with each target.
        // Call stopPolling() on each of them. Do not use Thread.stop(),
        // which is deprecated and unsafe.
        ...
    }

    /**
     * Polls a single target and passes each received document to the
     * Receiver Framework.
     */
    private class UserJMSReceiverThread extends Thread {
        String target; // Name of the target
        Config targetConfig;
        // Cleared by stopPolling() so that run() can leave its polling loop.
        // Volatile because it is written by a different thread than the one
        // that reads it.
        private volatile boolean running = true;

        /**
         * @param targetConfig configuration of the target that this thread polls
         */
        public UserJMSReceiverThread(Config targetConfig) {
            target = targetConfig.getName();
            this.targetConfig = targetConfig;
            // Create the queue session, connection, queue receiver
            // for this target.
            ...
        }

        /**
         * Polls the queue until stopPolling() is called.
         */
        public void run() {
            while (running) {
                try {
                    // Call the receive method on the queue.
                    // If a message is available, read the message and process the
                    // document.
                    ...
                    processDocument(data);
                    ...
                    // else continue to poll the queue.
                    ...
                } catch(Exception e) {
                    ...
                }
            }
        }

        /**
         * Asks this thread to stop polling. The loop in run() ends the next
         * time it checks the flag.
         */
        public void stopPolling() {
            running = false;
            // Wake the thread if it is blocked in an interruptible call.
            interrupt();
        }

        /**
         * Upon receiving the document from the queue, starts processing the
         * document by using Receiver Framework APIs.
         *
         * @param data the content of the received message
         * @throws BCGReceiverException if the framework rejects the document
         * @throws java.io.IOException if the temporary file cannot be written
         */
        public void processDocument(byte[] data)
                throws BCGReceiverException, java.io.IOException {
            // Get the temporary location where data can be written.
            File tempDir = BCGReceiverUtil.getTempDir();
            // Now create the temp file and write the data into it.
            // createTempFile picks a file name that is unique within tempDir.
            File fileLocation = File.createTempFile("CustomJMS", ".tmp", tempDir);
            FileOutputStream fos = new FileOutputStream(fileLocation);
            try {
                fos.write(data);
            } finally {
                // Close the stream even when the write fails.
                fos.close();
            }
            // Create the ReceiverDocument object.
            ReceiverDocumentInterface request = BCGReceiverUtil.createReceiverDocument();
            // Set document, transport headers, and BCG headers in the request.
            request.setDocument(fileLocation);
            String destination = targetConfig.getAttribute(
                BCGDocumentConstants.BCG_TARGET_DESTINATION);
            request.setAttribute(BCGDocumentConstants.BCG_RCVR_DESTINATION, destination);
            ...
            // Now start processing the document using Receiver Framework APIs.
            ReceiverFrameWorkInterface rcvFramework =
                BCGReceiverUtil.getReceiverFramework();
            // NOTE(review): confirm the spelling of preprocess (compare
            // postProcess below) against the framework interface.
            ReceiverDocumentInterface requestDocs[] =
                rcvFramework.preprocess(receiverType, target, request);
            // Check whether the requestDocs length is 1; if yes, the document
            // is not split into multiple documents. In this example, it is
            // assumed that there is no splitting.
            ReceiverDocumentInterface requestDoc = requestDocs[0];
            boolean sync = rcvFramework.syncCheck(receiverType, target, requestDoc);
            ...
            if (!sync) {
                // Request is not synchronous.
                // NOTE(review): this call passes target, whereas the two
                // synchronous process calls below do not; confirm the
                // argument list against the framework interface.
                rcvFramework.process(receiverType, target, requestDoc);
            } else {
                // Request is synchronous. Your receiver can make a blocking
                // or nonblocking process call to the framework. The flow in
                // this example is for illustration purposes only.
                // Depending on your requirements, your receiver can make
                // only one type of synchronous process call.
                if (isRequestBlocking(requestDoc)) {
                    // The response document must exist before it is handed
                    // to the blocking process call.
                    ReceiverDocumentInterface responseDoc =
                        BCGReceiverUtil.createReceiverDocument();
                    ReceiverDocumentInterface responseDocs[];
                    rcvFramework.process(receiverType, requestDoc, responseDoc);
                    responseDocs =
                        rcvFramework.postProcess(receiverType, target, responseDoc);
                    // Process the responseDocs.
                    // Get the correlation information, for example, reply-to-queue and
                    // correlation ID, and send the responses to reply-to-queue queue.
                    ...
                } else {
                    ResponseCorrelation respCorr;
                    // Create response correlation by using the information that
                    // you can use later in CustomJMSReceiver.processResponse
                    // to correlate response with the request.
                    ...
                    rcvFramework.process(receiverType, requestDoc, respCorr);
                    ...
                    // In case of nonblocking process, whenever response is
                    // available, Receiver Framework calls
                    // CustomJMSReceiver.processResponse.
                }
            }
        }

        /**
         * Decides which kind of synchronous process call to use for a request.
         *
         * @param request the request document
         * @return true to invoke the Receiver Framework with a blocking
         *         process call, false to use the nonblocking one
         */
        public boolean isRequestBlocking(ReceiverDocumentInterface request) {
            // Return true if you want to invoke Receiver Framework
            // by using blocking process call for this request.
            // Return false if you want to use nonblocking one.
            ...
            return true;
        }
    }
}