Funktionalität eines Java-Empfangsknotens erweitern

Vorbereitungen:

Stellen Sie sicher, dass Sie folgende Abschnitte gelesen und verstanden haben:
Nach dem Erstellen eines benutzerdefinierten Knotens sind folgende Funktionen verfügbar:
  1. Externe Daten in einem Puffer empfangen
  2. Nachricht weitergeben
  3. Threading und Transaktionalität steuern
  4. Ausnahmebedingungen verarbeiten

Externe Daten in einem Puffer empfangen

Ein Empfangsknoten kann wie jedes andere Java-Programm Daten aus allen Arten von externen Quellen, z. B. einem Dateisystem, einer Warteschlange oder einer Datenbank, empfangen, solange die Ausgabe des Knotens im richtigen Format vorliegt.

Sie müssen einen Eingabepuffer (oder einen Bitstrom) bereithalten, in dem Eingabedaten enthalten sind, und ihn einem Nachrichtenobjekt zuordnen. Dann erstellen Sie mit der Methode createMessage der Klasse MbInputNode eine Nachricht aus einer Bytefeldgruppe, und anschließend generieren Sie aus dieser Nachricht ein gültiges Nachrichten-Assembly. Weitere Informationen zu diesen Methoden finden Sie im Abschnitt Java-API. Wenn Sie beispielsweise die Eingabe aus einer Datei lesen möchten, gehen Sie wie folgt vor:

  1. Erstellen Sie einen Eingabedatenstrom, um die Daten aus der Datei zu lesen:
    FileInputStream inputStream = new FileInputStream("myfile.msg");
  2. Erstellen Sie eine Bytefeldgruppe, die dieselbe Größe wie die Eingabedatei hat:
    byte[] buffer = new byte[inputStream.available()];
  3. Lesen Sie die Daten aus der Datei in die Bytefeldgruppe ein:
    inputStream.read(buffer);
  4. Schließen Sie den Eingabedatenstrom:
    inputStream.close();
  5. Erstellen Sie eine Nachricht, die in die Warteschlange eingereiht werden soll:
    MbMessage msg = createMessage(buffer);
  6. Erstellen Sie ein neues Nachrichten-Assembly, das diese Nachricht enthalten soll:
    msg.finalizeMessage(MbMessage.FINALIZE_VALIDATE);
    MbMessageAssembly newAssembly = 
         new MbMessageAssembly(assembly, msg);

Nachricht weitergeben

Wenn Sie ein Nachrichten-Assembly erstellt haben, können Sie dies an einen der Terminals des Knotens weitergeben.

Geben Sie beispielsweise Folgendes ein, um das Nachrichten-Assembly an das Ausgangsterminal weiterzugeben:
MbOutputTerminal out = getOutputTerminal("out");
out.propagate(newAssembly);

Threading und Transaktionalität steuern

Die Broker-Infrastruktur verarbeitet Transaktionen wie das Festschreiben einer WebSphere MQ- oder Datenbank-Arbeitseinheit, wenn die Nachrichtenverarbeitung abgeschlossen ist. Wenn jedoch ein benutzerdefinierter Knoten verwendet wird, können Ressourcenaktualisierungen nicht automatisch vom Broker festgeschrieben werden.

Jeder Nachrichtenfluss-Thread wird aus einem Thread-Pool zugeordnet, der für jeden Nachrichtenfluss verwaltet wird, und startet die Ausführung in der Methode run.

Der benutzerdefinierte Knoten verwendet Rückgabewerte, um anzugeben, ob eine Transaktion erfolgreich war, um zu steuern, ob Transaktionen festgeschrieben oder zurückgesetzt werden, und um zu steuern, wann der Thread an den Pool zurückgegeben wird. Alle nicht verarbeiteten Ausnahmebedingungen werden von der Broker-Infrastruktur abgefangen, und die Transaktion wird zurückgesetzt.

Sie bestimmen das Verhalten von Transaktionen und Threads, indem Sie den entsprechenden Rückgabewert verwenden:

MbInputNode.SUCCESS_CONTINUE
Die Transaktion wird festgeschrieben, und der Broker ruft die Methode run erneut mit demselben Thread auf.
MbInputNode.SUCCESS_RETURN
Die Transaktion wird festgeschrieben, und der Thread wird an den Thread-Pool zurückgegeben, wobei vorausgesetzt wird, dass dies nicht der einzige Thread für diesen Nachrichtenfluss ist.
MbInputNode.FAILURE_CONTINUE
Die Transaktion wird zurückgesetzt, und der Broker ruft die Methode run erneut mit demselben Thread auf.
MbInputNode.FAILURE_RETURN
Die Transaktion wird zurückgesetzt, und der Thread wird an den Thread-Pool zurückgegeben, wobei vorausgesetzt wird, dass dies nicht der einzige Thread für diesen Nachrichtenfluss ist.
MbInputNode.TIMEOUT
Die Methode run darf nicht unbegrenzt blockieren, während sie auf Eingabedaten wartet. Während der Nachrichtenfluss durch Benutzercode blockiert wird, können Sie den Broker nicht beenden oder neu konfigurieren. Die Methode run muss die Steuerung regelmäßig durch Rückgabe an den Broker abgeben. Wenn nach einer bestimmten Zeit (z. B. 5 Sekunden) keine Eingabedaten empfangen wurden, sollte die Rückgabe der Methode mit dem Rückkehrcode TIMEOUT erfolgen. Die Methode run des Empfangsknotens wird dann gleich wieder aufgerufen, vorausgesetzt, der Broker muss nicht neu konfiguriert oder beendet werden.
Um Multithread-Nachrichtenflüsse zu erstellen, rufen Sie die Methode dispatchThread auf, nachdem eine Nachricht erstellt wurde, aber bevor die Nachricht an ein Ausgabeterminal weitergegeben wird. Dadurch wird gewährleistet, dass nur ein Thread auf Daten wartet, während andere Threads die Nachricht verarbeiten. Neue Threads werden bis zu dem Maximalwert, der durch das Attribut 'additionalInstances' des Nachrichtenflusses angegeben wird, aus dem Thread-Pool abgerufen. Beispiel:
public int run( MbMessageAssembly assembly ) throws MbException
{
  byte[] data = getDataWithTimeout();  // user supplied method
                                       // returns null if timeout
  if( data == null )
    return TIMEOUT;

  MbMessage msg = createMessage( data );
  msg.finalizeMessage( MbMessage.FINALIZE_VALIDATE );
  MbMessageAssembly newAssembly = 
       new MbMessageAssembly( assembly, msg );

  dispatchThread();

  getOutputTerminal( "out" ).propagate( newAssembly );

  return SUCCESS_RETURN;
}

Ausnahmebedingungen verarbeiten

Mit Hilfe der Klasse 'mbException' fangen Sie Ausnahmebedingungen ab und greifen darauf zu. Die Klasse 'mbException' gibt eine Feldgruppe von Ausnahmeobjekten zurück, die die untergeordneten Elemente einer Ausnahmebedingung in der Ausnahmeliste des Brokers darstellen. Jedes zurückgegebene Element gibt seinen Ausnahmetyp an. Wenn eine Ausnahmebedingung keine untergeordneten Elemente hat, wird eine leere Feldgruppe zurückgegeben. Im folgenden Codebeispiel ist ein Beispiel für die Syntax der Klasse 'MbException' dargestellt.

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
    try
      {

        // plug-in functionality

      }
    catch(MbException ex)
      {
        traverse(ex, 0);

        throw ex; // if re-throwing, it must be the original exception that was caught
      }
  }

  void traverse(MbException ex, int level)
  {
    if(ex != null)
      {
        // Do whatever action here
        System.out.println("Level: " + level);
        System.out.println(ex.toString());
        System.out.println("traceText:  " + ex.getTraceText());

        // traverse the hierarchy
        MbException e[] = ex.getNestedExceptions();
        int size = e.length;
        for(int i = 0; i < size; i++)
          {
            traverse(e[i], level + 1);
          }
      }
  }

Weitere Informationen zur Verwendung der Klasse 'mbException' finden Sie in der Javadoc.

Sie können einen benutzerdefinierten Nachrichtenverarbeitungs- oder Sendeknoten so entwickeln, dass er auf alle aktuellen Ausnahmebedingungen zugreifen kann. Sie können beispielsweise die Klasse 'MbSQLStatement' verwenden, um Datenbankausnahmebedingungen abzufangen. Diese Klasse legt den Wert des Attributs 'throwExceptionOnDatabaseError' fest, das das Verhalten des Brokers bestimmt, wenn ein Datenbankfehler auftritt. Wenn dieser Wert auf 'True' festgelegt wird, kann eine Ausnahmebedingung abgefangen und von der benutzerdefinierten Erweiterung bearbeitet werden.

Im folgenden Codebeispiel ist ein Beispiel für die Verwendung der Klasse 'MbSQLStatement' dargestellt.

public void evaluate(MbMessageAssembly assembly, MbInputTerminal inTerm) throws MbException
  {
    MbMessage newMsg = new MbMessage(assembly.getMessage());
    MbMessageAssembly newAssembly = new MbMessageAssembly(assembly, newMsg);

    String table = 
       assembly.getMessage().getRootElement().getLastChild().getFirstChild().getName();

    MbSQLStatement state = createSQLStatement( "dbName", 
       "SET OutputRoot.XML.integer[] = PASSTHRU('SELECT * FROM " + table + "');" );

    state.setThrowExceptionOnDatabaseError(false);
    state.setTreatWarningsAsErrors(true);

    state.select( assembly, newAssembly );

    int sqlCode = state.getSQLCode();
    if(sqlCode != 0)
      {
        // Do error handling here

        System.out.println("sqlCode = " + sqlCode);
        System.out.println("sqlNativeError = " + state.getSQLNativeError());
        System.out.println("sqlState = " + state.getSQLState());
        System.out.println("sqlErrorText = " + state.getSQLErrorText());
      }

    getOutputTerminal("out").propagate(assembly);
  }
Zugehörige Informationen
Java-API
Bemerkungen | Marken | Downloads | Bibliothek | Unterstützung | Rückmeldung
Copyright IBM Corporation 1999, 2005 Letzte Aktualisierung: Nov 17, 2005
as24987_