This example uses the standard Java classes to manipulate TCPIP and adds a protocol of its own on top. This protocol has a header consisting of a four byte length of the data in the data packet followed by the actual data. This is so that the receiving end knows how much data to expect.
This example is not meant as a replacement for the adapters that are supplied with WebSphere MQ Everyplace but rather as a simple introduction into how to create communications adapters. In reality, much more care should be taken with error handling, recovery, and parameter checking. Depending on the WebSphere MQ Everyplace configuration used, the supplied adapters may be sufficient.
A new class file is constructed, inheriting from MQeAdapter. Some variables are defined to hold this adapter's instance information, that is the name of the host, port number and the output stream objects.
The MQeAdapter constructor is used for the object, so no additional code needs to be added for the constructor.
public class MyTcpipAdapter extends MQeAdapter { protected String host = ""; protected int port = 80; protected Object readLock = new Object( ); protected ServerSocket serversocket = null; protected Socket socket = null; protected BufferedInputStream stream_in = null; protected BufferedOutputStream stream_out = null; protected Object writeLock = new Object( );
Next the activate method is coded. This is the method that extracts from the file descriptor the name of the target network address if a connector, or the listening port if a listener. The fileDesc parameter contains the adapter class name or alias name, and any network address data for the adapter for example MyTcpipAdapter:127.0.0.1:80. The thisParam parameter contains any parameter data that was set when the connection was defined by administration, the normal value would be "?Channel". The thisOpt parameter contains the adapter setup options that were set by administration, for example MQe_Adapter_LISTEN if this adapter is to listen for incoming connections.
public void activate( String fileDesc, Object thisParam, Object thisOpt, int thisValue1, int thisValue2 ) throws Exception { super.activate( fileDesc, thisParam, thisOpt, thisValue1, thisValue2 ); /* isolate the TCP/IP address - "MyTcpipAdapter:127.0.0.1:80" */ host = fileId.substring( fileId.indexOf( ':' ) + 1 ); i = host.indexOf( ':' ); /* find delimiter */ if ( i > -1 ) /* find it ? */ { port = (new Integer( host.substring( i + 1 ) )).intValue( ); host = host.substring( 0, i ); } }
The close method needs to be defined to close the output streams and flush any remaining data from the stream buffers. Close is called many time during a session between a client and a server, however, when the channel has completely finished with the adapter it calls WebSphere MQ Everyplace with the option MQe_Adapter_FINAL. If the adapter is to have one socket connection for the life of the channel then the call with MQe_Adapter_FINAL set, is the one to use to actually close the socket, other calls should just flush the buffers. If however a new socket is to be used on each request, then each call to WebSphere MQ Everyplace should close the socket, subsequent open calls should allocate a new socket:
public void close( Object opt ) throws Exception { if ( stream_out != null ) /* output stream ? */ { stream_out.flush(); /* empty the buffers */ stream_out.close(); /* close it */ stream_out = null; /* clear */ } if ( stream_in != null ) /* input stream ? */ { stream_in.close(); /* close it */ stream_in = null; /* clear */ } if ( socket != null ) /* socket ? */ { socket.close(); /* close it */ socket = null; /* clear */ } if ( serversocket != null ) /* serversocket ? */ { serversocket.close(); /* close it */ serversocket = null; /* clear */ } host = ""; port = 80; }
The control method needs to be coded to handle an MQe_Adapter_ACCEPT request, to accept an incoming connect request. This is only allowed if the socket is a listener (a server socket). Any options that were specified for the listen socket (excluding MQe_Adapter_LISTEN) are copied to the socket created as a result of the accept. This is accomplished by the use of another control option MQe_Adapter_SETSOCKET this allows a socket object to be passed to the adapter that was just instantiated.
public Object control( Object opt, Object ctrlObj ) throws Exception { if ( checkOption( opt, MQe.MQe_Adapter_LISTEN ) && checkOption( opt, MQe.MQe_Adapter_ACCEPT ) ) { /* CtrlObj - is a string representing the file descriptor of the */ /* MQeAdapter object to be returned e.g. "MyTcpip:" */ Socket ClientSocket = serversocket.accept(); /* wait connect */ String Destination = (String) ctrlObj; /* re-type object*/ int i = Destination.indexOf( ':' ); if ( i < 0 ) throw new MQeException( MQe.Except_Syntax, "Syntax:" + Destination ); /* remove the Listen option */ String NewOpt = (String) options; /* re-type to string */ int j = NewOpt.indexOf( MQe.MQe_Adapter_LISTEN ); NewOpt = NewOpt.substring( 0, j ) + NewOpt.substring ( j + MQe.MQe_Adapter_LISTEN.length( ) ); MQeAdapter Adapter = MQe.newAdapter ( Destination.substring( 0,i+1 ), parameter, NewOpt + MQe_Adapter_ACCEPT, -1, -1 ); /* assign the new socket to this new adapater */ Adapter.control( MQe.MQe_Adapter_SETSOCKET, ClientSocket); return( Adapter ); } else if ( checkOption( opt, MQe.MQe_Adapter_SETSOCKET ) ) { if ( stream_out != null ) stream_out.close(); if ( stream_in != null ) stream_in .close(); if ( ctrlObj != null ) /* socket supplied ?*/ { socket = (Socket) ctrlObj; /* save the socket */ stream_in = new BufferedInputStream (socket.getInputStream ()); stream_out = new BufferedOutputStream(socket.getOutputStream()); } else return( super.control( opt, ctrlObj ) ); }
The open method needs to check for a listening socket or a connector socket and create the appropriate socket object. Reinitialization of the input and output streams is achieved by using the control method, passing it a new socket object. The opt parameter may be set to MQe_Adapter_RESET, this means that any previous operations are now complete any new reads or writes constitute a new request.
public void open( Object opt ) throws Exception { if ( checkOption( MQe.MQe_Adapter_LISTEN ) ) serversocket = new ServerSocket( port, 32 ); else control( MQe.MQe_Adapter_SETSOCKET, new Socket( host, port ) ); }
The read method can take a parameter specifying the maximum record size to be read.
This examples calls internal routines to read the data bytes and do error recovery (if appropriate) then return the correct length byte array for the number of bytes read. Care needs to be taken to ensure that only one read at a time occurs on this socket. The opt parameter may be set to:
{ public byte[] read( Object opt, int recordSize ) throws Exception int Count = 0; /* number bytes read */ synchronized ( readLock ) /* only one at a time */ { if ( checkOption(opt, MQe.MQe_Adapter_HEADER ) ) { byte lreclBytes[] = new byte[4]; /* for the data length */ readBytes( lreclBytes, 0, 4 ); /* read the length */ int recordSize = byteToInt( lreclBytes, 0, 4 ); } if ( checkOption( opt, MQe.MQe_Adapter_CONTENT ) ) { byte Temp[] = new byte[recordSize]; /* allocate work array */ Count = readBytes( Temp, 0, recordSize);/* read data */ } } if ( Count < Temp.length ) /* read all length ? */ Temp = MQe.sliceByteArray( Temp, 0, Count ); return ( Temp ); /* Return the data */ }
The readByte method is an internal routine designed to read a single byte of data from the socket and to attempt to retry any errors a specific number of times, or throw an end of file exception if there is no more data to be read.
protected int readByte( ) throws Exception { int intChar = -1; /* input characater */ int RetryValue = 3; /* error retry count */ int Retry = RetryValue + 1; /* reset retry count */ do{ /* possible retry */ try /* catch io errors */ { intChar = stream_in.read(); /* read a character */ Retry = 0; /* dont retry */ } catch ( IOException e ) /* IO error occured */ { Retry = Retry - 1; /* decrement */ if ( Retry == 0 ) throw e; /* more attempts ? */ } } while ( Retry != 0 ); /* more attempts ? */ if ( intChar == -1 ) /* end of file ? */ throw new EOFException(); /* ... yes, EOF */ return( intChar ); /* return the byte */ }
The readBytes method is an internal routine designed to read a number of bytes of data from the socket and to attempt to retry any errors a specific number of times, or throw an end of file exception if there is no more data to be read.
protected int readBytes( byte buffer[], int offset, int recordSize ) throws Exception { int RetryValue = 3; int i = 0; /* start index */ while ( i < recordSize ) /* got it all in yet ? */ { /* ... no */ int NumBytes = 0; /* read count */ /* retry any errors based on the QoS Retry value */ int Retry = RetryValue + 1; /* error retry count */ do{ /* possible retry */ try /* catch io errors */ { NumBytes = stream_in.read( buffer, offset + i, recordSize - i ); Retry = 0; /* no retry */ } catch ( IOException e ) /* IO error occured */ { Retry = Retry - 1; /* decrement */ if ( Retry == 0 ) throw e; /* more attempts ? */ } } while ( Retry != 0 ); /* more attempts ? */ /* check for possible end of file */ if ( NumBytes < 0 ) /* errors ? */ throw new EOFException( ); /* ... yes */ i = i + NumBytes; /* accumulate */ } return ( i ); /* Return the count */ }
The readln method reads a string of bytes terminated by a 0x0A character it will ignore 0x0D characters.
{ synchronized ( readLock ) /* only one at a time */ { /* ignore the 4 byte length */ byte lreclBytes[] = new byte[4]; /* for the data length */ readBytes( lreclBytes, 0, 4 ); /* read the length */ int intChar = -1; /* input characater */ StringBuffer Result = new StringBuffer( 256 ); /* read Header from input stream */ while ( true ) /* until "newline" */ { intChar = readByte( ); /* read a single byte */ switch ( intChar ) /* what character */ { case -1: /* ... no character */ throw new EOFException(); /* ... yes, EOF */ case 10: /* eod of line */ return( Result.toString() ); /* all done */ case 13: /* ignore */ break; default: /* real data */ Result.append( (char) intChar ); /* append to string */ } /* end of line ? */ } } }
The status method returns status information about the adapter. In this example it returns for the option MQe_Adapter_NETWORK the network type (TCPIP), for the option MQe_Adapter_LOCALHOST it returns the tcpip local host address.
public String status( Object opt ) throws Exception { if ( checkOption( opt, MQe.MQe_Adapter_NETWORK ) ) return( "TCPIP" ); else if ( checkOption( opt, MQe.MQe_Adapter_LOCALHOST ) ) return( InetAddress.getLocalHost( ).toString() ); else return( super.status( opt ) ); }
The write method writes a block of data to the socket. It needs to ensure that only one write at a time can be issued to the socket. In this example it calls an internal routine writeBytes to write the actual data and perform any appropriate error recovery.
The opt parameter may be set to:
public void write( Object opt, int recordSize, byte data[] ) throws Exception { synchronized ( writeLock ) /* only one at a time */ { if ( checkOption( opt, MQe.MQe_Adapter_HEADER ) || checkOption( opt, MQe.MQe_Adapter_HEADERRSP ) ) writeBytes( intToByte( recordSize ), 0, 4 ); /* write length*/ writeBytes( data, 0, recordSize ); /* write the data */ if ( checkOption( opt, MQe.MQe_Adapter_FLUSH ) ) stream_out.flush( ); /* make sure it is sent */ } }
The writeBytes is an internal method that writes an array (or partial array) of bytes to a socket, and attempt a simple error recovery if errors occur.
protected void writeBytes( byte buffer[], int offset, int recordSize ) throws Exception { if ( buffer != null ) /* any data ? */ { /* break the data up into manageable chuncks */ int i = 0; /* Data index */ int j = recordSize; /* Data length */ int MaxSize = 4096; /* small buffer */ int RetryValue = 3; /* error retry count */ do{ /* as long as data */ if ( j < MaxSize ) /* smallbuffer ? */ MaxSize = j; int Retry = RetryValue + 1; /* error retry count */ do{ /* possible retry */ try /* catch io errors */ { stream_out.write( buffer, offset + i, MaxSize ); Retry = 0; /* don't retry */ } catch ( IOException e ) /* IO error occured */ { Retry = Retry - 1; /* decrement */ if ( Retry == 0 ) throw e; /* more attempts ? */ } } while ( Retry != 0 ); /* more attempts ? */ i = i + MaxSize; /* update index */ j = j - MaxSize; /* data left */ } while ( j > 0 ); /* till all data sent */ } }
The writeLn method writes a string of characters to the socket, terminating with 0x0A and 0x0D characters.
The opt parameter may be set to:
public void writeln( Object opt, String data ) throws Exception { if ( data == null ) /* any data ? */ data = ""; write( opt, -1, MQe.asciiToByte( data + "\r\n" ) ); /* write data */ }
This is now a complete (though very simple) tcpip adapter that will communicate to another copy of itself one of which was started as a listener and the other started as a connector.