Tutorial 4: Request Resource Allocation in a Cluster and Start Containers Using Threads

This tutorial describes how to create a registered EGO client that requests resource allocation in a cluster and starts containers on the hosts. This sample program uses threads to make the allocation requests to Platform EGO and start containers on the hosts allocated by Platform EGO.

Using this tutorial, you will ...

Underlying principles

This code sample uses global data structures (workloadP, resourcesP, and monitorP) that are accessible by different threads. Since these data structures are considered shared resources, a mutex (mutual exclusion) object is used to prevent simultaneous modification of the data. The mutex object can be locked and unlocked by individual threads, thereby controlling access to the respective data structure. In conjunction with the mutex object, a condition variable enables threads to wait for the data to enter a defined state before accessing the data.
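The mutex and condition variable pattern described above can be sketched as follows. This is a minimal illustration using plain POSIX threads, not code taken from the sample; the names shared_state_t, waiter_fn, and run_wait_demo are hypothetical.

```c
#include <pthread.h>

/* Hypothetical shared state guarded by a mutex and a condition variable. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             value;     /* the shared data */
    int             target;    /* the "defined state" the waiter needs */
    int             observed;  /* value seen by the waiter once it resumes */
} shared_state_t;

/* Waiter: blocks until value reaches target, then records what it saw. */
static void *waiter_fn(void *arg)
{
    shared_state_t *s = arg;

    pthread_mutex_lock(&s->mutex);
    /* Re-check the condition in a loop: pthread_cond_wait() may return
       spuriously, and the state may already be reached before waiting. */
    while (s->value < s->target) {
        pthread_cond_wait(&s->cond, &s->mutex);
    }
    s->observed = s->value;
    pthread_mutex_unlock(&s->mutex);
    return NULL;
}

/* Starts one waiter, raises value to target one step at a time, and returns
   the value the waiter observed (or -1 if the thread could not be started). */
int run_wait_demo(int target)
{
    shared_state_t s;
    pthread_t tid;
    int i;

    pthread_mutex_init(&s.mutex, NULL);
    pthread_cond_init(&s.cond, NULL);
    s.value = 0;
    s.target = target;
    s.observed = -1;

    if (pthread_create(&tid, NULL, waiter_fn, &s) == 0) {
        for (i = 0; i < target; i++) {
            pthread_mutex_lock(&s.mutex);
            s.value++;                      /* modify shared data under lock */
            pthread_cond_signal(&s.cond);   /* wake the waiter to re-check */
            pthread_mutex_unlock(&s.mutex);
        }
        pthread_join(tid, NULL);
    }

    pthread_cond_destroy(&s.cond);
    pthread_mutex_destroy(&s.mutex);
    return s.observed;
}
```

Calling run_wait_demo(3) from a main() function returns once the waiter has seen the value reach the target. Checking the condition in a while loop, rather than waiting once, is the usual way to make the wait robust against early or spurious wake-ups.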

This sample implements three threads: resource, work, and monitor, in addition to the main thread.

The resource thread is responsible for getting the allocation request from the resource queue and making an allocation request to Platform EGO. Once the resource thread enters the wait state, the add_resources() method (called from the main thread) and the finalize() method set the condition variable that enables the resource thread to resume execution. The resource thread cycles through the queue until all allocation requests have been processed.

The work thread is responsible for getting the allocation reply from the work queue, adding the resource to the resource collection structure, and starting a container on each allocated host slot. Once the work thread enters the wait state, the addResourceCB() and containerStateChgCB() callback methods and the finalize() method set the condition variable that enables the work thread to resume execution. The thread cycles through the queue until containers have been started on all allocated host slots.

The monitor thread waits for a host state change reported through the hostStateChangeCB() callback method or for a 30-second timeout. When the wait times out, the thread calculates and prints the average load of the allocated hosts.


Step 1: Preprocessor directives and global variable declarations

The first step is to include a reference to the system and API header files, followed by the declaration of global variables and structures that are implemented in the sample.

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <sys/time.h>
#include "vem.api.h"
#include "samples.h"

int samples_shutdown = 0;
work_state_t *workloadP;
resource_state_t *resourcesP;
monitor_state_t *monitorP;


Step 2: Implement the principal method

Lines 4-7: define and initialize a data structure that is used to request a connection with the EGO host cluster. The data structure contains a reference to a configuration file where the master host name and port numbers are stored.

Line 8: pass the data structure as an argument to the vem_open() method, which opens a connection to the master host. If the connection attempt is successful, a handle is returned; otherwise the method returns NULL. The handle acts as a communication channel to the master host and all subsequent communication occurs through this handle.

Lines 15-16: a pointer to a vem_name_t structure (clusterName) is initialized with NULL. This structure holds the cluster name, system name, and version. The vem_uname() method is passed the communication handle and, if successful, returns a valid vem_name_t structure; otherwise the method returns NULL.

Line 24: the cluster info is printed out to the screen.

Lines 26-43: define the client info structure. Use vem_locate() to get all registered clients. Since NULL is provided as the client name, all registered clients will be located and the method returns the number of registered clients. Note that Platform EGO is equipped with a number of default clients (services) such as the Service Controller, so as a minimum, the info relevant to these clients is printed out and the associated memory is released.

1	 int 
2	 sample4() 
3	 {
4	   vem_openreq_t orequest;
5	   vem_handle_t *vhandle = NULL;
6	   orequest.file = "ego.conf";
7	   orequest.flags=0; 
8	 	 vhandle = vem_open(&orequest);
9	   if (vhandle == NULL) {
10	   	 // error opening
11	    	 fprintf(stderr, "Error opening cluster: %s\n",  vem_strerror(vemerrno));
12	    	 return -1;
13	   }
14	 
15	   vem_name_t *clusterName = NULL;
16	   clusterName = vem_uname(vhandle);
17	   if (clusterName == NULL) {
18	   	 // error connecting
19	    	 fprintf(stderr, "Error connecting to cluster: %s\n", 
20	  vem_strerror(vemerrno));
21	    	 return -2;
22	   }
23	   
24	   fprintf(stdout, " Connected... %s %s %4.2f\n", clusterName->clustername,
25	  clusterName->sysname, clusterName->version);
26	 vem_clientinfo_t *clients;
27	   int  rc = vem_locate(vhandle, NULL, &clients); 
28	   if (rc >=0) {
29	     if (rc == 0) {
30	    	   printf("No registered clients exist\n");
31	     } else {
32	   	   int i=0;
33	   	   for (i=0; i<rc; i++) {
34	    	     printf("%s %s %s\n", clients[i].name, clients[i].description,
35	    	     clients[i].location);
36	   	   }
37	   	   // free
38	   	   vem_clear_clientinfo(clients);  	   
39	     }
40	   } else {
41	   	 // error connecting
42	    	 fprintf(stderr, "Error getting clients: %s\n",  vem_strerror(vemerrno));
43	   }

Lines 44-47: authenticate the user to Platform EGO.

Lines 48-52: define and initialize a structure for callback methods. These callback methods are invoked by Platform EGO when resources are added or reclaimed, or when a change occurs to host status or a container. When Platform EGO wants to communicate about these events, it invokes these methods thereby calling back to the client.

44	   if (login(vhandle, username, password)<0) {
45	    	 fprintf(stderr, "Error logon: %s\n",  vem_strerror(vemerrno));
46	    	 goto leave;
47	 }
48	   vem_clientcallback_t cbf;
49	   cbf.addResource = addResourceCB;
50	   cbf.reclaimForce = reclaimForceCB;
51	   cbf.containerStateChg = containerStateChgCB;
52	   cbf.hostStateChange = hostStateChangeCB;

Lines 53-67: define the vem_allocation_info_reply_t and vem_container_info_reply_t structures. If a client gets disconnected and then re-registers, its existing allocations and containers are returned to these structures. If the client had never registered before, the structures would be empty. Define and initialize a structure (rreq) that holds client info for registration purposes. (This includes assigning the client callback structure (cbf) to the callback member of the rreq structure.) Register with Platform EGO via the open connection using vem_register().

53	 vem_allocation_info_reply_t aireply;
54	   vem_container_info_reply_t  cireply;
55	   vem_registerreq_t rreq;
56	 
57	   rreq.name = "sample4_client";
58	   rreq.description = "Sample4 Client";
59	   rreq.flags = VEM_REGISTER_TTL;
60	   rreq.ttl = 3;
61	   rreq.cb = &cbf;
62	   
63	   rc = vem_register(vhandle, &rreq, &aireply, &cireply);
64	   if (rc < 0) {
65	     fprintf(stderr, "Error registering: %s\n",  vem_strerror(vemerrno));  	 
66	    	 goto leave;
67	   }

Lines 68-71: print out information related to the allocation requests and containers. Once the info is printed out, the memory for the allocations is freed.

Lines 72-82: the vem_gethostgroupinfo() method collects the information for the requested hostgroup. In this case, the requested hostgroup in the input argument is set to NULL, which means that information about all hostgroups is requested. If the method call is successful, hostgroup information is printed out to the screen.

68	   print_vem_allocation_info_reply(&aireply);
69	   print_vem_container_info_reply(&cireply);
70	   // freeup any previous allocations
71	   release_vem_allocation(vhandle, &aireply); 
72	 vem_hostgroupreq_t hgroupreq;
73	   hgroupreq.grouplist = NULL;
74	   vem_hostgroup_t *hgroup;
75	   rc = vem_gethostgroupinfo(vhandle, &hgroupreq, &hgroup); 
76	   if (rc < 0) {
77	     fprintf(stderr, "Error getting hostgroup: %s\n", 
78	  vem_strerror(vemerrno));  	 
79	   } else {
80	   	 printf("%s %s %d %d\n", hgroup->groupName, hgroup->members, hgroup->free,
81	  hgroup->allocated);
82	   }

Lines 83-95: define and initialize structures for the workload, resources and monitor threads. These structures are global in scope.

Lines 96-105: create and run the three threads.

Lines 106-115: get the number of available host slots and request half of them (with a minimum of one) via the add_resources() method. This method adds a new allocation request to the resource queue and increments the queue index (next_item). The add_resources() method also sets the condition variable, which tells the waiting resource_thread that a new allocation request has been added to the resource queue. The resource_mutex object is then unlocked and the resource_thread resumes execution.

When a resource is added, the addResourceCB() callback method is executed. The callback method adds the allocation reply structure to the workload queue at position next_item and increments the index (next_item). The condition variable is set, which tells the waiting work_thread that a new allocation reply has been added to the workload queue. The work_thread resumes execution and the work_mutex object is unlocked. The allocation reply is also printed out.

83	  pthread_t worker_thread, resource_thread, monitor_thread;
84	   work_state_t workload;
85	   resource_state_t resources;
86	   monitor_state_t monitor;
87	 
88	   // globals so that callback functions can find the queues/lock/cond var
89	   workloadP = &workload;
90	   resourcesP = &resources;
91	   monitorP = &monitor;
92	   
93	   initialize_workload(&workload, vhandle);
94	   initialize_resources(&resources, vhandle);
95	   initialize_monitor(&monitor, vhandle);
96	 if (pthread_create(&worker_thread, NULL, work_thread_fn, &workload)) {
97	     perror("Error creating worker thread: ");  	 
98	   }
99	 if (pthread_create(&resource_thread, NULL, resource_thread_fn,
100	  &resources)) {
101	     perror("Error creating resource thread: ");  	 
102	   }
103	 if (pthread_create(&monitor_thread, NULL, monitor_thread_fn, &monitor)) {
104	     perror("Error creating monitor thread: ");  	 
105	   }
106	 // Request half of them, one if just one is available
107	   int numavailable = getNumberOfHostSlotsAvailable(vhandle);
108	   fprintf(stderr, "Available Slots=%d\n", numavailable);
109	   if(numavailable > 0) {
110	   	 int num_request = (numavailable / 2) > 1 ? (numavailable / 2): 1; //3;
111	     vem_allocreq_t *aloc_spec = get_alloc_spec();
112	     // aloc_spec->maxslots = 1;
113	 // add to request Q
114	     add_resources(num_request, aloc_spec);
115	   }
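The request sizing on line 110 and the queueing behavior described for add_resources() can be sketched as follows. The sample's own add_resources() implementation is not shown in this tutorial, so everything here is an assumption made for illustration: the request_queue_t type, the MAX_REQUESTS capacity, the function names, and the choice to queue one entry per requested slot are all hypothetical.

```c
#include <pthread.h>

#define MAX_REQUESTS 16  /* hypothetical queue capacity */

/* Hypothetical stand-in for the resource queue state. */
typedef struct {
    pthread_mutex_t resource_mutex;
    pthread_cond_t  resource_cond;
    const void     *queue[MAX_REQUESTS]; /* pending allocation specs */
    int             next_item;           /* next free position in the queue */
    int             work_items;          /* next position to be processed */
} request_queue_t;

void request_queue_init(request_queue_t *q)
{
    pthread_mutex_init(&q->resource_mutex, NULL);
    pthread_cond_init(&q->resource_cond, NULL);
    q->next_item = 0;
    q->work_items = 0;
}

/* Half of the available slots, but never fewer than one. */
int slots_to_request(int available)
{
    int half = available / 2;
    return (half > 1) ? half : 1;
}

/* Queues up to num entries for spec and wakes the waiting resource thread.
   Returns the number of entries actually queued. */
int enqueue_requests(request_queue_t *q, int num, const void *spec)
{
    int added = 0;

    pthread_mutex_lock(&q->resource_mutex);
    while (added < num && q->next_item < MAX_REQUESTS) {
        q->queue[q->next_item++] = spec;   /* add, then advance the index */
        added++;
    }
    if (added > 0) {
        /* Tell the waiting thread that new requests are available. */
        pthread_cond_signal(&q->resource_cond);
    }
    pthread_mutex_unlock(&q->resource_mutex);
    return added;
}
```

A consumer would compare work_items with next_item while holding resource_mutex to decide whether there is anything left to process.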

Lines 116-117: pause the main thread for 180 seconds (sleep() takes its argument in seconds), then call the finalize() method, which sets the samples_shutdown flag to 1 and sets the condition variable for all three threads. The shutdown flag causes the three threads to end execution.

Lines 119-126: block the main thread until all three threads have finished. Clean up the thread states by destroying the mutex object and condition variable associated with each thread.

Lines 128-143: call vem_locate() again to get all registered clients. Since NULL is provided as the client name, all registered clients will be located and the method returns the number of registered clients. Note that Platform EGO is equipped with a number of default clients (services) such as the Service Controller, so as a minimum, the info relevant to these clients is printed out and the associated memory is released.

Lines 145-158: unregister the client from Platform EGO, log out, free the memory held by the cluster name structure, and close the connection to the master host.

116	 sleep(180);
117	 finalize();
118	 // wait for worker, resource, monitor threads to finish
119	 pthread_join(worker_thread, NULL);
120	   pthread_join(resource_thread, NULL);
121	   pthread_join(monitor_thread, NULL);
122	 
123	   // clean up thread states
124	   finalize_workload(workloadP);
125	   finalize_resources(resourcesP);
126	   finalize_monitor(monitorP);
127	 
128	   rc = vem_locate(vhandle, NULL, &clients); 
129	   if (rc >=0) {
130	     if (rc == 0) {
131	    	   printf("No registered clients exist\n");
132	     } else {
133	   	   int i=0;
134	   	   for (i=0; i<rc; i++) {
135	    	     printf("%s %s %s\n", clients[i].name, clients[i].description,
136	    	     clients[i].location);
137	   	   }
138	       vem_clear_clientinfo(clients);  
139	     }
140	   } else {
141	   	 // error connecting
142	    	 fprintf(stderr, "Error getting clients: %s\n",  vem_strerror(vemerrno));
143	   }
144	  bailout:
145	   rc = vem_unregister(vhandle);
146	   if (rc < 0) {
147	     	 fprintf(stderr, "Error unregistering: %s\n",  vem_strerror(vemerrno));  	 
148	   }
149	   if (logout(vhandle)<0) {
150	    	 fprintf(stderr, "Error logoff: %s\n",  vem_strerror(vemerrno));
151	   }
152	 
153	 leave:  
154	   // free memory
155	   vem_free_uname(clusterName);    
156	   vem_close(vhandle);
157	   
158	   return 0;
159	 } 
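The shutdown sequence performed on lines 116-126 (set a flag, wake every waiting thread, join, then destroy the synchronization objects) can be sketched as follows. The sample's finalize() method is not listed in this tutorial, so this is only an illustration under stated assumptions: thread_state_t, idle_thread_fn, request_shutdown, and run_shutdown_demo are hypothetical names, and the sketch keeps one shutdown flag per thread state, set under that state's mutex, instead of the sample's single global samples_shutdown flag.

```c
#include <pthread.h>

/* Hypothetical per-thread state with its own mutex/condition pair. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             shutdown;  /* set under mutex by request_shutdown() */
    int             exited;    /* set by the thread just before it returns */
} thread_state_t;

/* A thread that has no work and simply waits until shutdown is requested. */
static void *idle_thread_fn(void *arg)
{
    thread_state_t *s = arg;

    pthread_mutex_lock(&s->mutex);
    while (!s->shutdown) {
        pthread_cond_wait(&s->cond, &s->mutex);
    }
    s->exited = 1;
    pthread_mutex_unlock(&s->mutex);
    return NULL;
}

/* Sets the shutdown flag of every state and wakes any waiting thread. */
void request_shutdown(thread_state_t *states, int n)
{
    int i;

    for (i = 0; i < n; i++) {
        pthread_mutex_lock(&states[i].mutex);
        states[i].shutdown = 1;
        pthread_cond_broadcast(&states[i].cond);
        pthread_mutex_unlock(&states[i].mutex);
    }
}

/* Starts three threads, shuts them down, joins them, and cleans up.
   Returns the number of threads that exited through the shutdown path. */
int run_shutdown_demo(void)
{
    enum { NUM_THREADS = 3 };
    thread_state_t states[NUM_THREADS];
    pthread_t tids[NUM_THREADS];
    int started = 0, exited = 0, i;

    for (i = 0; i < NUM_THREADS; i++) {
        pthread_mutex_init(&states[i].mutex, NULL);
        pthread_cond_init(&states[i].cond, NULL);
        states[i].shutdown = 0;
        states[i].exited = 0;
    }
    for (i = 0; i < NUM_THREADS; i++) {
        if (pthread_create(&tids[i], NULL, idle_thread_fn, &states[i]) != 0) {
            break;
        }
        started++;
    }

    request_shutdown(states, NUM_THREADS);

    for (i = 0; i < started; i++) {
        pthread_join(tids[i], NULL);       /* block until the thread ends */
    }
    for (i = 0; i < NUM_THREADS; i++) {
        exited += states[i].exited;
        pthread_cond_destroy(&states[i].cond);
        pthread_mutex_destroy(&states[i].mutex);
    }
    return exited;
}
```

Because each thread tests its flag while holding the mutex, a shutdown request issued before the thread starts waiting is not missed; the thread sees the flag and exits without ever blocking.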


Step 3: Make resource allocation requests to Platform EGO (resource thread)

Lock the resource_mutex and wait for the condition variable to be set. Once the condition variable is set by the add_resources() or finalize() method and thread execution resumes, get the next allocation request from the resource queue using the work_items index and increment the index. Make an allocation request to Platform EGO, then retrieve and store the allocation ID. Continue this cycle until all the allocation requests in the resource queue have been processed.

void *resource_thread_fn(resource_state_t *resourcesP)
{
    while (!samples_shutdown) {
        pthread_mutex_lock(&resourcesP->resource_mutex);
        resourcesP->ready = 1;
        pthread_cond_wait(&resourcesP->resource_cond,
                          &resourcesP->resource_mutex);
        fprintf(stderr, "Need to allocate?\n");
        while (resourcesP->work_items < resourcesP->next_item) {
            // dequeue the next allocation spec
            vem_allocreq_t *aloc_spec =
                resourcesP->queue[resourcesP->work_items++].alocreq;
            vem_allocation_id_t alocid = get_resource(resourcesP->vhandle,
                                                      aloc_spec);
            if (alocid != NULL) {
                // add to the allocated ids
                if (resourcesP->num_resources < MAX_RESOURCES) {
                    resourcesP->alocids[resourcesP->num_resources++] = alocid;
                } else {
                    // TODO Grow
                    fprintf(stderr, "Exceeded Limit\n");
                    finalize();
                }
            } else {
                fprintf(stderr, "Could not allocate\n");
            }
        } /* else {
            // no new request, maybe shutdown request
        } */
        pthread_mutex_unlock(&resourcesP->resource_mutex);
        // if done
    }
    // free alocid memory
    fprintf(stderr, "ResourceThread Shutdown\n");
    return NULL;
}


Step 4: Get resource allocation reply from Platform EGO and start containers (work thread)

Lock the work_mutex and wait for the condition variable to be set. Once the condition variable is set and thread execution resumes, retrieve the allocation reply from the work queue using the work_items index. Increment the index and retrieve the container specification.

The addto_monitor_resource() method is called for each allocated host, which adds the host name, allocation ID, and allocation state to the resource collection. The method also increments the resources num counter (monitorP->resources->num).

For each slot in each host, start a container using the allocation ID, host name, and container specification. If successful, increment the running containers counter. Unlock the work_mutex.

void *work_thread_fn(work_state_t *workloadP)
{
    while (!samples_shutdown /* && workloadP->num_containers_running > 0 */) {
        // wait until a resource host is allocated
        pthread_mutex_lock(&workloadP->work_mutex);
        workloadP->ready = 1;
        pthread_cond_wait(&workloadP->work_cond, &workloadP->work_mutex);

        fprintf(stderr, "Received Resource?\n");
        int rc = 0;
        // pick out from queue
        while (workloadP->work_items < workloadP->next_item) {
            vem_allocreply_t *allocReply =
                &workloadP->queue[workloadP->work_items++];
            int i = 0, j = 0;
            vem_container_spec_t *conspec = get_container_spec();

            for (i = 0; i < allocReply->nhost; i++) {
                // record the allocated host in the resource collection
                addto_monitor_resource(monitorP, allocReply->host[i].name,
                                       allocReply->allocId);

                // start one container per allocated slot on this host
                for (j = 0; j < allocReply->host[i].slots; j++) {
                    rc = startContainer(workloadP->vhandle, allocReply->allocId,
                                        allocReply->host[i].name, conspec);
                    // if successful
                    if (!rc) {
                        ++workloadP->num_containers_running;
                    }
                }
            }
        } /* else {
            // probably woken up as a container has changed state
            fprintf(stderr, "Nothing to do\n");
        } */
        pthread_mutex_unlock(&workloadP->work_mutex);
    }
    fprintf(stderr, "WorkThread Shutdown\n");
    return NULL;
}
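The nested loop in the work thread (one start attempt per slot on each allocated host, counting only the successful starts) can be illustrated in isolation. The following sketch does not use the EGO API; host_alloc_t, alloc_reply_t, start_fn_t, start_on_all_slots, and count_only_start are hypothetical names standing in for the allocation reply and for startContainer().

```c
#include <stddef.h>

/* Hypothetical, simplified view of one allocated host. */
typedef struct {
    const char *name;   /* host name */
    int         slots;  /* number of slots allocated on this host */
} host_alloc_t;

/* Hypothetical, simplified view of an allocation reply. */
typedef struct {
    int                 nhost;
    const host_alloc_t *host;
} alloc_reply_t;

/* Callback that starts one container; returns 0 on success. */
typedef int (*start_fn_t)(const char *host_name, void *ctx);

/* Attempts one start per slot on every host in the reply.
   Returns the number of containers that started successfully. */
int start_on_all_slots(const alloc_reply_t *reply, start_fn_t start, void *ctx)
{
    int i, j, running = 0;

    for (i = 0; i < reply->nhost; i++) {
        for (j = 0; j < reply->host[i].slots; j++) {
            if (start(reply->host[i].name, ctx) == 0) {
                running++;
            }
        }
    }
    return running;
}

/* Example start function: always succeeds and counts how often it ran. */
int count_only_start(const char *host_name, void *ctx)
{
    (void)host_name;
    (*(int *)ctx)++;
    return 0;
}
```

For a reply with two hosts holding two and three slots, start_on_all_slots() calls the start function five times.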


Step 5: Calculate the average host load (monitor thread)

Lock the monitor_mutex and get the current time. The thread now waits for either a host state change to be signalled by Platform EGO or the 30-second wait time to expire. If a state change occurs, the corresponding callback method (hostStateChangeCB) is invoked by Platform EGO, which updates the host state in the resource collection. The condition variable is then set to reactivate the monitor thread. If the thread resumes execution because the wait time expired, the average host load is calculated and printed out.

void *monitor_thread_fn(monitor_state_t *monitorP)
{
    struct timespec timeout;
    struct timeval  now;
    int rc;

    while (!samples_shutdown) {
        // wait until change in host/container status is received
        pthread_mutex_lock(&monitorP->monitor_mutex);
        monitorP->ready = 1;
        // absolute deadline: 30 seconds from now (needs <sys/time.h>)
        gettimeofday(&now, NULL);
        timeout.tv_sec = now.tv_sec + 30;
        timeout.tv_nsec = now.tv_usec * 1000;
        rc = pthread_cond_timedwait(&monitorP->monitor_cond,
                                    &monitorP->monitor_mutex, &timeout);

        // Currently no way to get container from id.
        //print_vem_container(vem_container_t *container);
        if (rc == ETIMEDOUT) {
            vem_hostinfo_t *hinfo = NULL;
            char **attrs;
            int numh;
            double *loads;
            double load = computeAverageLoad(monitorP, &numh, &hinfo, &attrs,
                                             &loads);
            fprintf(stderr, "\nMonitor: Avg. Load =%6.2f\n", load);
            if (hinfo != NULL) {
                free(loads);
                vem_free_hostinfo(hinfo, numh, attrs);
            }
        } else {
            // we were signaled
            fprintf(stderr, "Received Event?\n");
        }
        pthread_mutex_unlock(&monitorP->monitor_mutex);

        // update activity information
    }
    fprintf(stderr, "MonitorThread Shutdown\n");
    return NULL;
}
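The timed wait used by the monitor thread needs an absolute deadline rather than a relative interval. The following sketch shows one way to build such a deadline and to detect the timeout, assuming only standard POSIX calls; deadline_after_ms and wait_times_out are hypothetical helper names that are not part of the sample.

```c
#include <errno.h>
#include <pthread.h>
#include <sys/time.h>
#include <time.h>

/* Fills *deadline with "now + ms milliseconds" as an absolute time,
   keeping tv_nsec within the valid range [0, 999999999]. */
void deadline_after_ms(struct timespec *deadline, long ms)
{
    struct timeval now;
    long nsec;

    gettimeofday(&now, NULL);
    nsec = (long)now.tv_usec * 1000L + (ms % 1000L) * 1000000L;
    deadline->tv_sec  = now.tv_sec + ms / 1000L + nsec / 1000000000L;
    deadline->tv_nsec = nsec % 1000000000L;
}

/* Waits on a condition variable that nobody signals, for up to ms
   milliseconds. Returns 1 if the wait ended with ETIMEDOUT, 0 otherwise. */
int wait_times_out(long ms)
{
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;
    struct timespec deadline;
    int rc;

    deadline_after_ms(&deadline, ms);

    pthread_mutex_lock(&mutex);
    do {
        /* A return value of 0 without a real event is a spurious wake-up;
           keep waiting against the same absolute deadline. */
        rc = pthread_cond_timedwait(&cond, &mutex, &deadline);
    } while (rc == 0);
    pthread_mutex_unlock(&mutex);

    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    return rc == ETIMEDOUT;
}
```

Using an absolute deadline means that a wait interrupted by a spurious wake-up can simply be repeated with the same timespec, without recalculating how much time remains.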


Step 6: Client callback methods

These callback methods are invoked by Platform EGO when resources are added or reclaimed, or when a change occurs to host status or a container. When Platform EGO wants to communicate about these events, it invokes these methods thereby calling back to the client.

The addResourceCB() method locks the work_mutex object. The method then adds the allocation reply structure to the workload queue at position next_item and increments the index (next_item). The condition variable is set, which tells the waiting work_thread that a new allocation reply has been added to the workload queue. The work_mutex object is unlocked and the allocation reply is printed out.

int
addResourceCB(vem_allocreply_t *areply)
{
    printf("addResource Call Back\n");

    pthread_mutex_lock(&workloadP->work_mutex);
    // check if thread is ready?
    while (!workloadP->ready) {
        pthread_mutex_unlock(&workloadP->work_mutex);
        sleep(1);
        pthread_mutex_lock(&workloadP->work_mutex);
    }

    // add to queue
    if (workloadP->next_item < MAX_CONTAINERS) {
        vem_allocreply_clone_deep(&workloadP->queue[workloadP->next_item++],
                                  areply);
    } else {
        //TODO increase capacity
        fprintf(stderr, "Exceeded limit\n");
        finalize();
    }
    pthread_cond_signal(&workloadP->work_cond);
    pthread_mutex_unlock(&workloadP->work_mutex);

    print_vem_allocreply(areply);
    return 0;
}

The containerStateChgCB() method prints out the container ID and the new container state.

The work_mutex is locked and the method cycles through the list of containers and finds the container ID associated with the state change. If the new state indicates that the container has finished running, the workloadP->container_state is updated with the new state and the total number of running containers is decremented. The condition variable is set to signal the work_thread to resume execution and the work_mutex object is unlocked.

int
containerStateChgCB(vem_containerstatechg_t *cschange)
{
    printf("containerStateChg Call Back\n");
    printf("%s %d\n", cschange->containerId, cschange->newState);

    pthread_mutex_lock(&workloadP->work_mutex);
    update_container_state(workloadP, cschange);
    pthread_cond_signal(&workloadP->work_cond);
    pthread_mutex_unlock(&workloadP->work_mutex);

    return 0;
}
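The update_container_state() helper called above is not listed in this tutorial. Based only on the description given for this callback, its bookkeeping might look like the following sketch. The container_table_t type, the state codes, and the record_state_change name are hypothetical, and the sketch assumes that the caller already holds the work_mutex.

```c
#include <string.h>

#define MAX_TRACKED    8   /* hypothetical table capacity */
#define STATE_RUNNING  1   /* hypothetical state codes */
#define STATE_FINISHED 2

/* Hypothetical table of the containers started by the work thread. */
typedef struct {
    const char *ids[MAX_TRACKED];    /* container IDs */
    int         state[MAX_TRACKED];  /* last known state per container */
    int         num_containers;
    int         num_containers_running;
} container_table_t;

/* Finds the container with the given ID and records its new state.
   The running counter is decremented only on a running-to-finished
   transition, so a repeated notification is not counted twice.
   Returns the index of the container, or -1 if the ID is unknown. */
int record_state_change(container_table_t *t, const char *id, int new_state)
{
    int i;

    for (i = 0; i < t->num_containers; i++) {
        if (strcmp(t->ids[i], id) == 0) {
            if (t->state[i] == STATE_RUNNING && new_state == STATE_FINISHED) {
                t->num_containers_running--;
            }
            t->state[i] = new_state;
            return i;
        }
    }
    return -1;
}
```

Returning -1 for an unknown ID lets the caller decide whether a notification for a container it never started should be logged or ignored.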

The hostStateChangeCB() method prints out the host name and the new host state. After locking the monitor_mutex, the method calls the update_host_state() method which, in turn, updates the resource collection with the new host state info. The condition variable is set, which signals to the waiting monitor_thread to resume execution.

int
hostStateChangeCB(vem_hoststatechange_t *hschange)
{
    printf("hostStateChange Call Back\n");
    printf("%s %d\n", hschange->name, hschange->newState);

    pthread_mutex_lock(&monitorP->monitor_mutex);
    update_host_state(monitorP, hschange);
    pthread_cond_signal(&monitorP->monitor_cond);
    pthread_mutex_unlock(&monitorP->monitor_mutex);

    return 0;
}


Step 7: Calculate the average activity load on the resources

In order to determine if a resource is too busy to receive jobs, a load index value is calculated and compared to a corresponding load threshold parameter. The following code retrieves and processes the load index value for r1m, the 1-minute CPU run queue length.

The method cycles through each resource in the resource collection and checks the allocation status. If the resource is allocated, a counter is incremented that keeps track of the number of allocated resources. The resource name is also stored (hostlist array), which is assigned to the hinforeq structure. Since the host list is used as an input to vem_getHostInfo(), the method will only update info for hosts in the list, as well as return the number of allocated hosts.

The get_load_attribute() method cycles through the list of host attributes looking for the r1m load index. (The r1m load index represents the average number of processes ready to use the CPU during a one-minute interval.) This method returns the attribute array index corresponding to load index r1m. The value for r1m is retrieved for each allocated resource by the get_load() method and converted to a double data type. These values are stored in an array and also added to an accumulator variable (load). The total sum is then divided by the number of allocated resources to yield the average load index value of r1m.

double computeAverageLoad(monitor_state_t *monitorP, int *nhostsP,
                          vem_hostinfo_t **hinfoP, char ***host_attributesP,
                          double **loadsP)
{
    double load = 0.0;
    double *loads = NULL;

    // assumes caller has acquired the lock
    // finds allocated hosts
    resource_collection_t *resources = monitorP->resources;
    int i, j = 0, count = 0;
    for (i = 0; i < resources->num; i++) {
        if (resources->allocstate[i] == RESOURCE_ALLOCATED) {
            count++;
        }
    }

    /* No hosts allocated */
    if (count == 0) {
        *nhostsP          = 0;
        *host_attributesP = NULL;
        *hinfoP           = NULL;
        *loadsP           = NULL;
        return 0;
    }

    // build the list of allocated host names
    char **hostlist = calloc(count, sizeof(char*));

    for (i = 0; i < resources->num; i++) {
        if (resources->allocstate[i] == RESOURCE_ALLOCATED) {
            hostlist[j] = resources->names[i];
            j++;
        }
    }

    int hin;
    vem_hostinforeq_t hinforeq;
    hinforeq.resreq = "";          // resource string, unimplemented
    hinforeq.hostlist = hostlist;
    vem_hostinfo_t *hinfo = NULL;  // out parameter, set by vem_getHostInfo
    char **attrs = NULL;           // out parameter, set by vem_getHostInfo
    hin = vem_getHostInfo(monitorP->vhandle, &hinforeq, &hinfo, &attrs);
    if (hin < 0) {
        // error
        fprintf(stderr, "Error getting hostinfo: %s %d\n",
                vem_strerror(vemerrno), hin);
        free(hostlist);            // release the request list on this path too
        *nhostsP          = hin;
        *host_attributesP = attrs;
        *hinfoP           = NULL;
        *loadsP           = loads;
        return -1.0;
    } else {
        //TODO are the hosts in hinfo, in the same order as hinforeq?
        print_hostInfo(hin, hinfo, attrs);
        // find load attribute
        int index = get_load_attribute("r1m", attrs);
        loads = calloc(hin, sizeof(double));
        double l;
        for (i = 0; i < hin; i++) {
            l = get_load(&hinfo[i], index);
            loads[i] = l;
            load += l;
        }
        free(hostlist);
    }
    *nhostsP          = hin;
    *hinfoP           = hinfo;
    *host_attributesP = attrs;
    *loadsP           = loads;
    return (count == 0) ? 0 : load / count;
}

double get_load(vem_hostinfo_t *hinfo, int index)
{
    double rv = -1.0;
    vem_value_t value;
    value = hinfo->attributes[index];
    rv = value.value.v_float32;
    return rv;
}

int get_load_attribute(char *load, char **attrs)
{
    int j = -1;
    while (attrs[++j] != NULL) {
        if (!strcmp(load, attrs[j])) {
            return j;
        }
    }
    return j;
}
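The lookup and averaging steps can also be written so that a missing attribute and an empty host set are both reported explicitly. The following sketch is independent of the EGO API and is only one possible variant: find_attribute and average_load are hypothetical names, and returning -1 for a missing attribute is a choice made here so that callers can detect the case before indexing an attribute array.

```c
#include <stddef.h>
#include <string.h>

/* Returns the index of name in the NULL-terminated list attrs,
   or -1 if the list is NULL or does not contain name. */
int find_attribute(const char *name, char *const *attrs)
{
    int j;

    if (attrs == NULL) {
        return -1;
    }
    for (j = 0; attrs[j] != NULL; j++) {
        if (strcmp(name, attrs[j]) == 0) {
            return j;
        }
    }
    return -1;
}

/* Returns the arithmetic mean of n load values, or 0.0 when n <= 0. */
double average_load(const double *loads, int n)
{
    double sum = 0.0;
    int i;

    if (n <= 0) {
        return 0.0;
    }
    for (i = 0; i < n; i++) {
        sum += loads[i];
    }
    return sum / n;
}
```

A caller would check the result of find_attribute() for -1 before reading any per-host value, and would pass the number of hosts actually returned as n.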


Run the client application

  1. Select Run > Run.

    The Run dialog appears.

  2. In the Configurations list, either select an EGO C Client Application or click New for a new configuration.

    For a new configuration, enter the configuration name.

  3. Enter the project name and C/C++ Application name.
  4. Click Apply and then Run.

Sample output



      Date Modified: November 30, 2006
Platform Computing: www.platform.com

© 1994-2006 Platform Computing Corporation. All rights reserved.