Learn more about Platform products at http://www.platform.com



Tutorial 5: Request Resource Allocation in a Cluster and Start Containers Based on Host Loading

This tutorial describes how to create a registered EGO client that requests resource allocation in a cluster and starts containers on the hosts. Based on host loading, the program either adds more resources or releases some.

Using this tutorial, you will ...

Underlying principles

Refer to Tutorial 4: Underlying principles for a description of the multi-thread technique used in this sample.


Step 1: Preprocessor directives

The first step is to include a reference to the system and API header files. The samples.h header file contains the method declarations that are common to all of the samples.

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <sys/time.h>
#include "vem.api.h"
#include "samples.h"


Step 2: Implement the principal method

Lines 4-7: define and initialize a data structure that is used to request a connection with the EGO host cluster. The data structure contains a reference to a configuration file where the master host name and port numbers are stored.

Line 8: pass the data structure as an argument to the vem_open () method, which opens a connection to the master host. If the connection attempt is successful, a handle is returned; otherwise the method returns NULL. The handle acts as a communication channel to the master host and all subsequent communication occurs through this handle.

Lines 16-17: the vem_name_t structure (defined as clusterName) is initialized with NULL. This structure holds the cluster name, system name, and version. The vem_uname () method is passed the communication handle and, if successful, returns a valid vem_name_t structure ; otherwise the method returns NULL

Line 24: the cluster info is printed out to the screen.

Lines 27-43: locate all the registered clients and print out the client info (name, description, and location). Define the client info structure. Use vem_locate() to get all registered clients. Since NULL is provided as the client name, all registered clients will be located and the method returns the number of registered clients. Note that Platform EGO is equipped with a number of default clients (services) such as the Service Controller, so as a minimum, the info relevant to these clients is printed out and the associated memory is released.

1	 int 
2	 sample5() 
3	 {
4	   vem_openreq_t orequest;
5	   vem_handle_t *vhandle = NULL;
6	   orequest.file = "ego.conf";
7	   orequest.flags=0;
8	   vhandle = vem_open(&orequest);
9	 
10	   if (vhandle == NULL) {
11	   	 // error opening
12	    	 fprintf(stderr, "Error opening cluster: %s\n",  vem_strerror(vemerrno));
13	    	 return -1;
14	   }
15	 
16	   vem_name_t *clusterName = NULL;
17	   clusterName = vem_uname(vhandle);
18	   if (clusterName == NULL) {
19	   	 // error connecting
20	    	 fprintf(stderr, "Error connecting to cluster: %s\n", 
21	  vem_strerror(vemerrno));
22	    	 return -2;
23	   }
24	 fprintf(stdout, " Connected... %s %s %4.2f\n", clusterName->clustername,
25	  clusterName->sysname, clusterName->version);
26	 vem_clientinfo_t *clients;
27	   int  rc = vem_locate(vhandle, NULL, &clients); 
28	   if (rc >=0) {
29	     if (rc == 0) {
30	    	   printf("No registered clients exist\n");
31	     } else {
32	   	   int i=0;
33	   	   for (i=0; i<rc; i++) {
34	    	     printf("%s %s %s\n", clients[i].name, clients[i].description,
35	    	     clients[i].location);
36	   	   }
37	   	   // free
38	   	   vem_clear_clientinfo(clients);  	   
39	     }
40	   } else {
41	   	 // error connecting
42	    	 fprintf(stderr, "Error geting clients: %s\n",  vem_strerror(vemerrno));
43	   }

Lines 44-47: authenticate the user to Platform EGO.

Lines 48-52: define and initialize a structure for callback methods. These callback methods are invoked by Platform EGO when resources are added or reclaimed, or when a change occurs to host status or a container. When Platform EGO wants to communicate about these events, it invokes these methods thereby calling back to the client.

Lines 53-67: define the vem_allocation_info_reply_t and vem_container_info_reply_t structures. If a client gets disconnected and then re-registers, its existing allocations and containers are returned to these structures. If the client had never registered before, the structures would be empty. Define and initialize a structure (rreq) that holds client info for registration purposes. (This includes assigning the client callback structure (cbf) to the callback member of the rreq structure.) Register with Platform EGO via the open connection using vem_register().

Lines 68-71: print out information related to the allocation requests and containers. Once the info is printed out, the memory for the allocations is freed.

Lines 75-82: the vem_gethostgroupinfo() method collects the information for the requested hostgroup. In this case, the requested hostgroup in the input argument is set to NULL, which means that information about all hostgroups is requested. If the method call is successful, hostgroup information is printed out to the screen.

44	   if (login(vhandle, username, password)<0) {
45	    	 fprintf(stderr, "Error logon: %s\n",  vem_strerror(vemerrno));
46	    	 goto leave;
47	   }
48	 vem_clientcallback_t cbf;
49	   cbf.addResource = addResourceCB;
50	   cbf.reclaimForce = reclaimForceCB;
51	   cbf.containerStateChg = containerStateChgCB;
52	   cbf.hostStateChange = hostStateChangeCB;
53	 vem_allocation_info_reply_t aireply;
54	   vem_container_info_reply_t  cireply;
55	   vem_registerreq_t rreq;
56	 
57	   rreq.name = "sample5_client";
58	   rreq.description = "Sample5 Client";
59	   rreq.flags = VEM_REGISTER_TTL;
60	   rreq.ttl = 3;
61	   rreq.cb = &cbf;
62	   
63	   rc = vem_register(vhandle, &rreq, &aireply, &cireply);
64	   if (rc < 0) {
65	     fprintf(stderr, "Error registering: %s\n",  vem_strerror(vemerrno));  	 
66	    	 goto leave;
67	   }
68	   print_vem_allocation_info_reply(&aireply);
69	   print_vem_container_info_reply(&cireply);
70	   // freeup any previous allocations
71	   release_vem_allocation(vhandle, &aireply);
72	 vem_hostgroupreq_t hgroupreq;
73	   hgroupreq.grouplist = NULL;
74	   vem_hostgroup_t *hgroup;
75	   rc = vem_gethostgroupinfo(vhandle, &hgroupreq, &hgroup); 
76	   if (rc < 0) {
77	     fprintf(stderr, "Error getting hostgroup: %s\n", 
78	  vem_strerror(vemerrno));  	 
79	   } else {
80	   	 printf("%s %s %d %d\n", hgroup->groupName, hgroup->members, hgroup->free,
81	  hgroup->allocated);
82	   }

Lines 83-95: define and initialize structures for the workload, resources and monitor threads. These structures are global in scope.

Lines 97-107: create and run the three threads. Refer to Tutorial 4: Underlying principles for further details of the workload and resource threads.

Lines 109-117: get half the number of available host slots and make a corresponding number of resource allocation requests via the add_resources() method. This method adds a new allocation request to the resource queue and increments the queue index (next_item). The add_resources() method also sets the condition variable, which tells the waiting resource_thread that a new allocation request has been added to the resource queue. The resource_thread resumes execution and the resource_mutex object is unlocked.

83	 pthread_t worker_thread, resource_thread, monitor_thread;
84	   work_state_t workload;
85	   resource_state_t resources;
86	   monitor_state_t monitor;
87	 
88	   // globals so that callback functions can find the queues/lock/cond var
89	   workloadP = &workload;
90	   resourcesP = &resources;
91	   monitorP = &monitor;
92	   
93	   initialize_workload(&workload, vhandle);
94	   initialize_resources(&resources, vhandle);
95	   initialize_monitor(&monitor, vhandle);
96	   
97	 if (pthread_create(&worker_thread, NULL, work_thread_fn, &workload)) {
98	     perror("Error creating worker thread: ");  	 
99	   }
100	 if (pthread_create(&resource_thread, NULL, resource_thread_fn,
101	  &resources)) {
102	     perror("Error creating resource thread: ");  	 
103	   }
104	 if (pthread_create(&monitor_thread, NULL, monitor_thread_extended_fn,
105	  &monitor)) {
106	     perror("Error creating monitor thread: ");  	 
107	   }
108	 // Request half of them, one if just one is available
109	   int numavailable = getNumberOfHostSlotsAvailable(vhandle);
110	   fprintf(stderr, "Available Slots=%d\n", numavailable);
111	   if(numavailable > 0) {
112	   	 int num_request = 4; //(numavailable / 2) > 1 ? (numavailable / 2): 1; //3;
113	     vem_allocreq_t *aloc_spec = get_alloc_spec();
114	     // aloc_spec->maxslots = 1;
115	 // add to request Q
116	     add_resources(num_request, aloc_spec);
117	   }

When a resource is added, the addResourceCB() callback method is executed. The callback method adds the allocation reply structure to the workload queue at position next_item and increments the index (next_item). The condition variable is set, which tells the waiting work_thread that a new allocation reply has been added to the workload queue. The work_thread resumes execution and the work_mutex object is unlocked. The allocation reply is also printed out.

Lines 118-119: pause the main thread for 180 milliseconds. The finalize() method sets the shutdown flag to 1 and sets the condition variable for all three threads. The shutdown flag causes the three threads to end execution.

Lines 122-128: block the main thread until all three threads have finished. Clean up the thread states by destroying the mutex object and condition variable associated with each thread.

Lines 130-145: use vem_locate() to get all registered clients. Since NULL is provided as the client name, all registered clients will be located and the method returns the number of registered clients. If successful, print out the client info and free the associated memory.

118	 sleep(180);
119	 finalize();
120	   // wait for worker, resource, monitor threads to be finish
121	   
122	   pthread_join(worker_thread, NULL);
123	   pthread_join(resource_thread, NULL);
124	   pthread_join(monitor_thread, NULL);
125	 // clean up thread states
126	   finalize_workload(workloadP);
127	   finalize_resources(resourcesP);
128	   finalize_monitor(monitorP);
129	 
130	   rc = vem_locate(vhandle, NULL, &clients); 
131	   if (rc >=0) {
132	     if (rc == 0) {
133	    	   printf("No registered clients exist\n");
134	     } else {
135	   	   int i=0;
136	   	   for (i=0; i<rc; i++) {
137	    	     printf("%s %s %s\n", clients[i].name, clients[i].description,
138	    	     clients[i].location);
139	   	   }
140	       vem_clear_clientinfo(clients);  
141	     }
142	   } else {
143	   	 // error connecting
144	    	 fprintf(stderr, "Error geting clients: %s\n",  vem_strerror(vemerrno));
145	   }
146	 bailout:
147	   rc = vem_unregister(vhandle);
148	   if (rc < 0) {
149	     	 fprintf(stderr, "Error unregistering: %s\n",  vem_strerror(vemerrno));  	 
150	   }
151	 if (logout(vhandle)<0) {
152	    	 fprintf(stderr, "Error logoff: %s\n",  vem_strerror(vemerrno));
153	   }
154	 leave:  
155	   // free memory
156	   vem_free_uname(clusterName);    
157	   vem_close(vhandle);
158	 return 0;
159	 } 


Step 3: Add or release resources based on average host load (monitor thread)

Lock the monitor_mutex and get the current time. The thread now waits for either a host state change to be signalled by Platform EGO or the wait time to expire. If a state change occurs, the corresponding callback method (hostStateChangeCB) is invoked by Platform EGO, which updates the host state in the resource collection. The condition variable is then set to reactivate the monitor thread. If the thread resumes execution due to wait time expiration, the average computer load is calculated and printed out; refer to Tutorial 4: Step 7: Calculate the average activity load on the resources

If the average host load is greater than a predetermined threshold, an additional resource is allocated via the add_resources() method; refer to Tutorial 4: Step 6: Client callback methods for further details of the add_resources() method. If the average computer load is less than a predetermined threshold and the number of allocated hosts is greater than 0, release a resource via the release_resources() method; refer to the next step.

void *monitor_thread_extended_fn(monitor_state_t *monitorP)
{
   struct timespec timeout;
   struct timeval  now;
   int rc;
	 
   while(!samples_shutdown) {
   	 // wait until change in host/container status is received
   	 pthread_mutex_lock(&monitorP->monitor_mutex);
    monitorP->ready = 1;
    
    gettimeofday(&now, NULL);
    timeout.tv_sec = now.tv_sec + 30;
    timeout.tv_nsec = now.tv_usec *1000;
    
   	 rc = pthread_cond_timedwait(&monitorP->monitor_cond, 
&monitorP->monitor_mutex, &timeout);
   	 
    // Currently no way to get container from id.
    //print_vem_container(vem_container_t *container);
    if (rc == ETIMEDOUT) {
      char **attrs = NULL;
      int numh = -1;
      vem_hostinfo_t *hinfo = NULL;
      double *loads = NULL;
   	  

 	   double load = computeAverageLoad(monitorP, &numh, &hinfo, &attrs, &loads);
      fprintf(stderr, "\n Monitor: Avg. Load =%6.2f\n", load);
   	 
   	   // update resources based on load
   	   int delta = 1;
	   vem_allocreq_t *aloc_spec = get_alloc_spec();
    	   if(load > UPPER_THRESHOLD) {
   	     add_resources(delta, aloc_spec);	 
   	   } else if (load < LOWER_THRESHOLD && numh > 0) {
   	     release_resources(monitorP->vhandle, delta, aloc_spec, numh, hinfo, 
attrs, loads);
   	   }
      if(hinfo != NULL) {
        vem_free_hostinfo(hinfo, numh, attrs);
      }
   	   if(loads != NULL) {
   	     free(loads);
      }	    	   
    } else {
      	 fprintf(stderr, "Received Event?\n"); 	 
    }
    pthread_mutex_unlock(&monitorP->monitor_mutex);
   }
   fprintf(stderr, "MonitorThread Shutdown\n"); 	 
   return NULL;
}


Step 4: Release resources from Platform EGO

The release_resources() method determines the least loaded host and makes a request to Platform EGO to release this allocated resource.

Define an array of structures to hold the individual host loads and associated index. Use the qsort() method to sort the host loads in ascending order.

For each resource to be released and starting with the least loaded host, get the host name and number of slots.

Get the index of the least loaded host in the resource collection. The find_host_index() method looks for the matching host name in the resource collection, and if allocated, returns the index.

Define the release request structure with information such as the allocation ID of the host to release, as well as number of hosts/slots to release. Pass this structure to the vem_release() method, which causes Platform EGO to release the resource(s). The remove_monitor_resource() method searches in the resource collection for the name of the host to be released. When a match is found, the host's allocation state is updated to released status, i.e., unallocated.

int release_resources(vem_handle_t* vhandle, int delta, vem_allocreq_t 
*alocreq, int num_hosts, vem_hostinfo_t *hinfo, char **attrs, double *loads)
{
  // which ones to release? Sort the load and release the ones that are least
 used
  // create a struct of load and index to sort
  int e, index, hindex, i;
  sort_element_t *element_array = calloc(num_hosts, sizeof(sort_element_t));

  for(e=0; e<num_hosts; e++) {
    element_array[e].load  = loads[e];  	 
    element_array[e].index = e;
  }
qsort(element_array, num_hosts, sizeof(sort_element_t), sort_fn);
vem_host_t             host;
   vem_releasereq_t       release;
   resource_collection_t *resources = monitorP->resources;
for(i=0; i<delta; i++) {
     index = element_array[i].index;
     host.name  = hinfo[index].name;
     host.slots = 1;                      // should be made generic
     hindex = find_host_index(resources, hinfo[index].name);
     release.allocId = resources->allocId[hindex];
     release.nhosts  = 1;
     release.hosts   = &host;
     release.flags   = VEM_RELEASE_AUTOADJ;
int rc = vem_release(vhandle, &release);
     if (rc < 0) {
   	   fprintf(stderr, "Error releasing resource: %s\n", 
 vem_strerror(vemerrno));
     }
     
     //mark them as released in the monitor host list
     if(rc == 0) {
   	    remove_monitor_resource(monitorP, host.name);
   	  }	 
   }
free(element_array);   
   return 0;
}

int sort_fn(const void *e1, const void *e2)
{
  sort_element_t *d1 = (sort_element_t *)e1; 	 
  sort_element_t *d2 = (sort_element_t *)e2; 
  return (d1->load == d2->load) ? 0 : ( (d1->load < d2->load) ? -1 : +1);	 
}

int find_host_index(resource_collection_t *resources, char *hname)
{
	 int i, index=-1;
	 
	 if (hname == NULL) return index;
	 
	 for(i=0; i<resources->num; i++) {
	 	 if(!strcmp(hname, resources->names[i]) && (resources->allocstate[i] ==
 RESOURCE_ALLOCATED)) {
	 	 	 return i;
	 	 }
	 }
  return index;
}

void remove_monitor_resource(monitor_state_t *monitorP, char *hostname)
{
	 fprintf(stderr, "remove host\n");
	 // assumes the caller acquired the lock
	 int i;
	 resource_collection_t *resources = monitorP->resources;
	 if(hostname == NULL) {
	 	 return;
	 }
	 
	 for(i=0; i< resources->num; i++){
      if(!strcmp(hostname, resources->names[i]) && (resources->allocstate[i]
 != RESOURCE_RELEASED)) {
        resources->allocstate[i] = RESOURCE_RELEASED;	 
	   }
    }	 
}


Run the client application

  1. Select Run > Run.

    The Run dialog appears.

  2. In the Configurations list, either select an EGO C Client Application or click New for a new configuration.

    For a new configuration, enter the configuration name.

  3. Enter the project name and C/C++ Application name.
  4. Click Apply and then Run.

Sample Output

[ Top ]


[ Platform Documentation ]


      Date Modified: July 12, 2006
Platform Computing: www.platform.com

Platform Support: support@platform.com
Platform Information Development: doc@platform.com

Copyright © 1994-2006 Platform Computing Corporation. All rights reserved.