示例

下列示例显示如何实现定制顾问程序。

标准顾问程序

此样本源代码类似于标准负载均衡器 HTTP 顾问程序。它有以下功能:

  1. 发出一个发送请求,即一个“HEAD/HTTP”命令。
  2. 接收响应。不会解析该信息,但该响应会导致 getLoad 方法终止。
  3. getLoad 方法返回 0 以指示成功,或者返回 -1 以指示失败。

此顾问程序以正常方式运行,所以负载测量结果将基于执行套接字打开、发送、接收和关闭操作所需的耗用时间(以毫秒计)。

package CustomAdvisors;
import com.ibm.internet.lb.advisors.*;
public class ADV_sample extends ADV_Base implements ADV_MethodInterface {
  static final String ADV_NAME ="Sample";
  static final int ADV_DEF_ADV_ON_PORT = 80;
  static final int ADV_DEF_INTERVAL = 7;
  static final String ADV_SEND_REQUEST = 
    "HEAD / HTTP/1.0\r\nAccept: */*\r\nUser-Agent: " +
    "IBM_Load_Balancer_HTTP_Advisor\r\n\r\n";

//--------
// Constructor

  public ADV_sample() {
    super(ADV_NAME, "3.0.0.0-03.31.00", 
          ADV_DEF_ADV_ON_PORT, ADV_DEF_INTERVAL, "",
					false);
    super.setAdvisor( this );
  }

//--------
// ADV_AdvisorInitialize

  public void ADV_AdvisorInitialize() {
    return;                                  // usually an empty routine
  }

//--------
// getLoad

  public int getLoad(int iConnectTime, ADV_Thread caller) {
    int iRc;
    int iLoad = ADV_HOST_INACCESSIBLE;       // initialize to inaccessible

    iRc = caller.send(ADV_SEND_REQUEST);     // send the HTTP request to 
                                             // the server
    if (0 <= iRc) {                          // if the send is successful
      StringBuffer sbReceiveData = new StringBuffer("");   // allocate a buffer 
                                                           // for the response
      iRc = caller.receive(sbReceiveData);   // receive the result

      // parse the result here if you need to

      if (0 <= iRc) {          // if the receive is successful
        iLoad = 0;             // return 0 for success 
      }                        // (advisor's load value is ignored by
    }                          //  base in normal mode)
    return iLoad;
  }
}

侧流顾问程序

此样本举例说明如何抑制顾问程序基类打开的标准套接字。相反,此顾问程序打开一个侧流 Java 套接字来查询服务器。对于使用不同端口从常规客户机流量中侦听顾问程序查询的服务器,此过程很有用。

在此示例中,服务器在端口 11999 上进行侦听,并且在查询后返回负载值以及十六进制整数“4”。此样本以替换方式运行,即,顾问程序构造函数的最后一个参数将设置为 true,并且顾问程序基本代码使用返回的负载值,而不是使用耗用时间。

注意初始化例程中对 supressBaseOpeningSocket() 的调用。当不发送数据时,不需要抑制基本套接字。例如,您可能想打开套接字以确保顾问程序可以与服务器联系。在作出此选择之前,请仔细检查应用程序的需要。

package CustomAdvisors;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.Date;
import com.ibm.internet.lb.advisors.*;
import com.ibm.internet.lb.common.*;
import com.ibm.internet.lb.server.SRV_ConfigServer;

public class ADV_sidea extends ADV_Base implements ADV_MethodInterface {
  static final String ADV_NAME = "sidea";
  static final int ADV_DEF_ADV_ON_PORT = 12345;
  static final int ADV_DEF_INTERVAL = 7;

  // create an array of bytes with the load request message
  static final byte[] abHealth = {(byte)0x00, (byte)0x00, (byte)0x00, 
                                  (byte)0x04};

  public ADV_sidea() {
    super(ADV_NAME, "3.0.0.0-03.31.00", ADV_DEF_ADV_ON_PORT,
          ADV_DEF_INTERVAL, "", 
          true);         // replace mode parameter is true
    super.setAdvisor( this );
  }

//--------
// ADV_AdvisorInitialize

  public void ADV_AdvisorInitialize()
  { 
    suppressBaseOpeningSocket();   // tell base code not to open the 
                                   // standard socket 
      return;
  }

//--------
// getLoad

  public int getLoad(int iConnectTime, ADV_Thread caller) {
    int iRc; 
    int iLoad = ADV_HOST_INACCESSIBLE;    // -1
    int iControlPort = 11999;   // port on which to communicate with the server

    String sServer = caller.getCurrentServerId();   // address of server to query 
      try {
      socket soServer = new Socket(sServer, iControlPort);  // open socket to 
                                                            // server
      DataInputStream disServer = new DataInputStream(
                                      soServer.getInputStream());
      DataOutputStream dosServer = new DataOutputStream(
                                       soServer.getOutputStream());
      
      int iRecvTimeout = 10000;  // set timeout (in milliseconds)
                                 // for receiving data  
      soServer.setSoTimeout(iRecvTimeout);

      dosServer.writeInt(4);     // send a message to the server
      dosServer.flush();

      iLoad = disServer.readByte();   // receive the response from the server

    } catch (exception e) {
      system.out.println("Caught exception " + e);
    }
    return iLoad;    // return the load reported from the server 
  }
}

两端口顾问程序

此定制顾问程序样本演示了根据服务器本身的状态和正在同一服务器的另一个端口上运行的不同服务器守护程序的状态来检测该服务器的一个端口的故障的能力。例如,如果端口 80 上的 HTTP 守护程序停止响应,那么您可能还要停止将流量路由至端口 443 上的 SSL 守护程序。

因为此顾问程序将任何未发送响应的服务器都认为已停止运行,并将其标记为已关闭,所以此顾问程序比标准顾问程序更积极。标准顾问程序将未响应的服务器认为是速度非常慢。只要 HTTP 端口和 SSL 端口中的任意一个端口没有响应,那么此顾问程序就会将服务器标记为已对这两个端口关闭。

为了使用此定制顾问程序,管理员启动此顾问程序的两个实例:一个在 HTTP 端口上,另一个在 SSL 端口上。此顾问程序实例化两个静态全局散列表,一个用于 HTTP,另一个用于 SSL。每个顾问程序都会尝试与其服务器守护程序通信并将此事件的结果存储在散列表中。每个顾问程序返回给顾问程序基类的值取决于与本身的服务器守护程序通信的能力以及伙伴顾问程序与其守护程序通信的能力。

将使用下列定制方法。

检测到下列错误情况。

编写此样本是为了链接端口 80(用于 HTTP)和端口 443(用于 SSL),但可以修改此样本以适合任何端口组合。

package CustomAdvisors;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.Date;
import com.ibm.internet.lb.advisors.*;
import com.ibm.internet.lb.common.*;
import com.ibm.internet.lb.manager.*;
import com.ibm.internet.lb.server.SRV_ConfigServer;

//--------
// Define the table element for the hash tables used in this custom advisor

class ADV_nte implements Cloneable {
  private String  sCluster;
  private int     iPort;
  private String  sServer;
  private int     iLoad;
  private Date    dTimestamp;

//--------
// constructor

  public ADV_nte(String sClusterIn, int iPortIn, String sServerIn, 
                 int iLoadIn) {
    sCluster = sClusterIn;
    iPort = iPortIn;
    sServer = sServerIn;
    iLoad = iLoadIn;
    dTimestamp = new Date();
  }

//--------
// check whether this element is current or expired
  public boolean isCurrent(ADV_twop oThis) {
    boolean bCurrent;
    int iLifetimeMs = 3 * 1000 * oThis.getInterval();   // set lifetime as 
                                                        // 3 advisor cycles
    Date dNow = new Date();
    Date dExpires = new Date(dTimestamp.getTime() + iLifetimeMs);

    if (dNow.after(dExpires)) {
      bCurrent = false;
    } else {
      bCurrent = true;
    }
    return bCurrent;
  }

//--------
// value accessor(s)

  public int getLoadValue() { return iLoad; }
  
//--------
// clone (avoids corruption between threads)

  public synchronized Object Clone() {
      try {
      return super.clone();
    } catch (cloneNotSupportedException e) {
      return null;
    }
  }

}

//--------
// define the custom advisor

public class ADV_twop extends ADV_Base 
   implements ADV_MethodInterface, ADV_AdvisorVersionInterface {
 
  static final int ADV_TWOP_PORT_HTTP = 80;
  static final int ADV_TWOP_PORT_SSL = 443;

  //--------
  // define tables to hold port-specific history information

  static HashTable htTwopHTTP = new Hashtable();
  static HashTable htTwopSSL = new Hashtable();

  static final String ADV_TWOP_NAME = "twop";
  static final int ADV_TWOP_DEF_ADV_ON_PORT = 80;
  static final int ADV_TWOP_DEF_INTERVAL = 7;
  static final String ADV_HTTP_REQUEST_STRING = 
    "HEAD / HTTP/1.0\r\nAccept: */*\r\nUser-Agent: " +
    "IBM_LB_Custom_Advisor\r\n\r\n";

  //--------
  // create byte array with SSL client hello message
  
  public static final byte[] abClientHello = {
    (byte)0x80, (byte)0x1c,
    (byte)0x01,               // client hello
    (byte)0x03, (byte)0x00,   // SSL version
    (byte)0x00, (byte)0x03,   // cipher spec len (bytes)
    (byte)0x00, (byte)0x00,   // session ID len (bytes)
    (byte)0x00, (byte)0x10,   // challenge data len (bytes)
    (byte)0x00, (byte)0x00, (byte)0x03,   // cipher spec
    (byte)0x1A, (byte)0xFC, (byte)0xE5, (byte)Ox20,  // challenge data
    (byte)0xFD, (byte)0x3A, (byte)0x3C, (byte)0x18,  
    (byte)0xAB, (byte)0x67, (byte)0xB0, (byte)0x52, 
    (byte)0xB1, (byte)0x1D, (byte)0x55, (byte)0x44, (byte)0x0D, (byte)0x0A };

  //--------
  // constructor

  public ADV_twop() {
    super(ADV_TWOP_NAME, VERSION, ADV_TWOP_DEF_ADV_ON_PORT, 
          ADV_TWOP_DEF_INTERVAL, "", 
          false);    // false = load balancer times the response
    setAdvisor ( this );
  }

  //--------
  // ADV_AdvisorInitialize

  public void ADV_AdvisorInitialize() {
      return;
  }

  //--------
  // synchronized PUT and GET access routines for the hash tables

  synchronized ADV_nte getNte(Hashtable ht, String sName, String sHashKey) {
    ADV_nte nte = (ADV_nte)(ht.get(sHashKey));
    if (null != nte) {
      nte = (ADV_nte)nte.clone();
    }
    return nte;
  }
 synchronized void putNte(Hashtable ht, String sName, String sHashKey, 
                          ADV_nte nte) {
   ht.put(sHashKey,nte);
   return;
 }

  //--------
  // getLoadHTTP - determine HTTP load based on server response

  int getLoadHTTP(int iConnectTime, ADV_Thread caller) {
    int iLoad = ADV_HOST_INACCESSIBLE;

    int iRc = caller.send(ADV_HTTP_REQUEST_STRING);  // send request message 
                                                     // to server   
    if (0 <= iRc) {           // did the request return a failure? 
      StringBuffer sbReceiveData = new StringBuffer("")    // allocate a buffer 
                                                           // for the response
      iRc = caller.receive(sbReceiveData);    // get response from server

      if (0 <= iRc) {             // did the receive return a failure? 
        if (0 < sbReceiveData.length()) {      // is data there? 
          iLoad = SUCCESS;        // ignore retrieved data and  
                                  // return success code
        }
      }
    }
    return iLoad;
  }

  //--------
  // getLoadSSL() - determine SSL load based on server response

  int getLoadSSL(int iConnectTime, ASV_Thread caller) {
    int iLoad = ADV_HOST_INACCESSIBLE;
    int iRc;

    CMNByteArrayWrapper cbawClientHello = new CMNByteArrayWrapper(
                                                  abClientHello);
    Socket socket = caller.getSocket();

    try {
        socket.getOutputStream().write(abClientHello);
        // Perform a receive.
        socket.getInputStream().read();
        // If receive is successful, return load of 0.  We are not concerned with
        // data's contents, and the load is calculated by the ADV_Thread thread.
        iLoad = 0;
    } catch (IOException e) {
        // Upon error, iLoad will default to it.
    }
    return iLoad;
  }

  //--------
  // getLoad - merge results from the HTTP and SSL methods

  public int getLoad(int iConnectTime, ADV_Thread caller) {
    int iLoadHTTP;
    int iLoadSSL;
    int iLoad;
    int iRc;

    String sCluster = caller.getCurrentClusterId();   // current cluster address
    int iPort = getAdviseOnPort();
    String sServer = caller.getCurrentServerId();
    String sHashKey = sCluster = ":" + sServer;     // hash table key

    if (ADV_TWOP_PORT_HTTP == iPort) {              // handle an HTTP server
      iLoadHTTP = getLoadHTTP(iConnectTime, caller);  // get the load for HTTP

      ADV_nte nteHTTP = newADV_nte(sCluster, iPort, sServer, iLoadHTTP);
      putNte(htTwopHTTP, "HTTP", sHashKey, nteHTTP);  // save HTTP load 
                                                      // information
      ADV_nte nteSSL = getNte(htTwopSSL, "SSL", sHashKey);  // get SSL 
                                                            // information 
      if (null != nteSSL) { 
        if (true == nteSSL.isCurrent(this)) {         // check the time stamp
          if (ADV_HOST_INACCESSIBLE != nteSSL.getLoadValue()) {    // is SSL 
                                                                   // working?
            iLoad = iLoadHTTP;
          } else {    // SSL is not working, so mark the HTTP server down
            iLoad= ADV_HOST_INACCESSIBLE;
          }
        } else {      // SSL information is expired, so mark the 
                      // HTTP server down
          iLoad = ADV_HOST_INACCESSIBLE; 
        }
      } else {        // no load information about SSL, report 
                      // getLoadHTTP() results
        iLoad = iLoadHTTP;
      }
    }
    else if (ADV_TWOP_PORT_SSL == iPort) {           // handle an SSL server
      iLoadSSL = getLoadSSL(iConnectTime, caller);   // get load for SSL

      ADV_nte nteSSL = new ADV_nte(sCluster, iPort, sServer, iLoadSSL);
      putNte(htTwopSSL, "SSL", sHashKey, nteSSL);   // save SSL load info.

      ADV_nte nteHTTP = getNte(htTwopHTTP, "SSL", sHashKey);   // get HTTP 
                                                               // information
      if (null != nteHTTP) {
        if (true == nteHTTP.isCurrent(this)) {       // check the timestamp
          if (ADV_HOST_INACCESSIBLE != nteHTTP.getLoadValue()) {  // is HTTP 
                                                                  // working?
            iLoad = iLoadSSL; 
          } else {   // HTTP server is not working, so mark SSL down
            iLoad = ADV_HOST_INACCESSIBLE; 
          }
        } else {     // expired information from HTTP, so mark SSL down
          iLoad = ADV_HOST_INACCESSIBLE; 
        }
      } else {       // no load information about HTTP, report 
                     // getLoadSSL() results
        iLoad = iLoadSSL;
      }
    }

  //--------
  // error handler

    else { 
      iLoad = ADV_HOST_INACCESSIBLE;
    }
    return iLoad;
  }
}

WebSphere Application Server 顾问程序

install_path/servers/samples/CustomAdvisors/ 目录中包含 WebSphere® Application Server 的一个样本定制顾问程序。本文档中未复制完整的代码。

完整的顾问程序仅比该样本稍微复杂一点。它增加了一个专门的解析例程,该例程比上面显示的 StringTokenizer 示例更简洁。

样本代码中较复杂的部分在 Java servlet 中。除了其他方法以外,servlet 还包含 servlet 规范所需要的两个方法(init() 和 service())以及 Java.lang.thread 类所需要的一个方法(run())。

以下是 servlet 代码的相关片段。

...

  public void init(ServletConfig config) throws ServletException {
    super.init(config);
    ...
    _checker = new Thread(this);
    _checker.start();
  }
  
  public void run() {
    setStatus(GOOD);

    while (true) {
      if (!getKeepRunning()) 
        return;
      setStatus(figureLoad());
      setLastUpdate(new java.util.Date());

      try {
        _checker.sleep(_interval * 1000);
      } catch (Exception ignore) { ; }
    }
  }

  public void service(HttpServletRequest req, HttpServletResponse res)
                      throws ServletException, IOException {

    ServletOutputStream out = null;
    try {
      out = res.getOutputStream();
    } catch (Exception e) { ... }
    ...
    res.setContentType("text/x-application-LBAdvisor");
    out.println(getStatusString());
    out.println(getLastUpdate().toString());
    out.flush();
        return;
  }

  ...

使用从顾问程序返回的数据

无论您是使用应用程序服务器的现有部分的标准调用,还是添加新的代码段以作为定制顾问程序的服务器端等效件,您都可能要检查返回的负载值并更改服务器行为。Java StringTokenizer 类及其相关联的方法使此调查容易进行。

一个典型的 HTTP 命令的内容可能是 GET /index.html HTTP/1.0

对此命令的一个典型的响应可能是以下内容。

HTTP/1.1 200 OK
Date: Mon, 20 November 2000 14:09:57 GMT
Server: Apache/1.3.12 (Linux and UNIX)
Content-Location: index.html.en
Vary: negotiate
TCN: choice
Last-Modified: Fri, 20 Oct 2000 15:58:35 GMT
ETag: "14f3e5-1a8-39f06bab;39f06a02"
Accept-Ranges: bytes
Content-Length: 424
Connection: close
Content-Type: text/html
Content-Language: en

<!DOCTYPE HTML PUBLIC "-//w3c//DTD HTML 3.2 Final//EN">
<HTML><HEAD><TITLE>Test Page</TITLE></HEAD>
<BODY><H1>Apache server</H1>
<HR>
<P><P>This Web server is running Apache 1.3.12.
<P><HR>
<P><IMG SRC="apache_pb.gif" ALT="">
</BODY></HTML>

关注的项包含在第一行中,具体来说就是 HTTP 返回码。

HTTP 规范对返回码进行了分类,可以将返回码概述为以下内容:

如果您非常准确地知道服务器有可能返回的代码,那么您的代码可能不需要像此示例那样详细。然而,请记住,限制您检测的返回码可能会限制程序以后的灵活性。

以下示例是一个独立的 Java 程序,它包含一个最低限度的 HTTP 客户机。该示例调用一个简单的通用解析器以检查 HTTP 响应。

import java.io.*;
import java.util.*;
import java.net.*;

public class ParseTest {
  static final int iPort = 80;
  static final String sServer = "www.ibm.com";
  static final String sQuery = "GET /index.html HTTP/1.0\r\n\r\n";
  static final String sHTTP10 = "HTTP/1.0";
  static final String sHTTP11 = "HTTP/1.1";

  public static void main(String[] Arg) {
    String sHTTPVersion = null;
    String sHTTPReturnCode = null;
    String sResponse = null;
    int iRc = 0;
    BufferedReader brIn = null;
    PrintWriter psOut = null;
    Socket soServer= null;
    StringBuffer sbText = new StringBuffer(40);

    try {
      soServer = new Socket(sServer, iPort);
      brIn = new BufferedReader(new InputStreamReader(
                                    soServer.getInputStream()));
      psOut = new PrintWriter(soServer.getOutputStream());
      psOut.println(sQuery);
      psOut.flush();
      sResponse = brIn.readLine();
      try {
        soServer.close();
      } catch (Exception sc) {;}
    }  catch (Exception swr) {;}
    
    StringTokenizer st = new StringTokenizer(sResponse, " ");
    if (true == st.hasMoreTokens()) {
      sHTTPVersion = st.nextToken();
      if (sHTTPVersion.equals(sHTTP110) || sHTTPVersion.equals(sHTTP11)) {
        System.out.println("HTTP Version: " + sHTTPVersion);
      } else {
        System.out.println("Invalid HTTP Version: " + sHTTPVersion);
      }
    } else {
      System.out.println("Nothing was returned");
      return;
    }

    if (true == st.hasMoreTokens()) {
      sHTTPReturnCode = st.nextToken();
      try {
        iRc = Integer.parseInt(sHTTPReturnCode);
      } catch (NumberFormatException ne) {;}

      switch (iRc) {
      case(200): 
        System.out.println("HTTP Response code: OK, " + iRc);
        break;
      case(400): case(401): case(402): case(403): case(404): 
        System.out.println("HTTP Response code: Client Error, " + iRc);
        break; 
      case(500): case(501): case(502): case(503):
        System.out.println("HTTP Response code: Server Error, " + iRc);
        break;
      default: 
        System.out.println("HTTP Response code: Unknown, " + iRc);
        break;
      }
    }

    if (true == st.hasMoreTokens()) {
      while (true == st.hasMoreTokens()) {
        sbText.append(st.nextToken());
        sbText.append("  ");
        }
      System.out.println("HTTP Response phrase: " + sbText.toString());
    }
  }
}