Tutorial 4: Request Resource Allocation in a Cluster and Start Containers Using Threads
This tutorial describes how to create a registered EGO client that requests resource allocation in a cluster and starts containers on the hosts. This sample program uses threads to make the allocation requests to Platform EGO and start containers on the hosts allocated by Platform EGO.
In this tutorial, you will:
- Open a connection to Platform EGO
- Print out cluster information
- Check if there are any registered clients connected to Platform EGO
- Log on to Platform EGO
- Initialize a structure for client callback methods
- Register the client with Platform EGO
- Print out allocation and container reply info from a previous connection
- Print out host group information
- Define and initialize structures for the work, resource, and monitor threads
- Create and run the three threads
- Determine the number of available host slots and make a resource allocation request for half of them
- Store allocation requests in a resource queue and make allocation requests to Platform EGO
- Retrieve the allocation reply from the work queue and start a container on each host slot
- Calculate the average host load
- Check for registered clients connected to Platform EGO and print out info
- Unregister the client
Underlying principles
This code sample uses global data structures (workloadP, resourcesP, and monitorP) that are accessible by different threads. Since these data structures are considered shared resources, a mutex (mutual exclusion) object is used to prevent simultaneous modification of the data. The mutex object can be locked and unlocked by individual threads, thereby controlling access to the respective data structure. In conjunction with the mutex object, a condition variable enables threads to wait for the data to enter a defined state before accessing the data.
This sample implements three threads: resource, work, and monitor, in addition to the main thread.
The resource thread is responsible for getting the allocation request from the resource queue and making an allocation request to Platform EGO. Once the resource thread enters the wait state, the addResourceCB() and containerStateChgCB() callback methods and finalize() method set the condition variable that enables the resource thread to resume execution. The resource thread cycles through the queue until all allocation requests have been processed.
The work thread is responsible for getting the allocation reply from the work queue, adding the resource to the resource collection structure, and starting a container on the allocated host slot. Once the work thread enters the wait state, the add_resources() (called from the main thread) and finalize() methods set the condition variable that enables the work thread to resume execution. The thread cycles through the queue until containers have been started on all allocated host slots.
Step 1: Preprocessor directives and global variable declarations
The first step is to include a reference to the system and API header files, followed by the declaration of global variables and structures that are implemented in the sample.
```
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include "vem.api.h"
#include "samples.h"

int samples_shutdown = 0;
work_state_t *workloadP;
resource_state_t *resourcesP;
monitor_state_t *monitorP;
```
Step 2: Implement the principal method
Lines 4-7: define and initialize a data structure that is used to request a connection with the EGO host cluster. The data structure contains a reference to a configuration file where the master host name and port numbers are stored.
Line 8: pass the data structure as an argument to the vem_open() method, which opens a connection to the master host. If the connection attempt is successful, a handle is returned; otherwise the method returns NULL. The handle acts as a communication channel to the master host and all subsequent communication occurs through this handle.
Lines 15-16: the vem_name_t structure (defined as clusterName) is initialized with NULL. This structure holds the cluster name, system name, and version. The vem_uname() method is passed the communication handle and, if successful, returns a valid vem_name_t structure; otherwise the method returns NULL.
Line 24: the cluster info is printed out to the screen.
Lines 26-43: define the client info structure. Use vem_locate() to get all registered clients. Since NULL is provided as the client name, all registered clients will be located and the method returns the number of registered clients. Note that Platform EGO is equipped with a number of default clients (services) such as the Service Controller, so as a minimum, the info relevant to these clients is printed out and the associated memory is released.
```
 1 int
 2 sample4()
 3 {
 4     vem_openreq_t orequest;
 5     vem_handle_t *vhandle = NULL;
 6     orequest.file = "ego.conf";
 7     orequest.flags = 0;
 8     vhandle = vem_open(&orequest);
 9     if (vhandle == NULL) {
10         // error opening
11         fprintf(stderr, "Error opening cluster: %s\n", vem_strerror(vemerrno));
12         return -1;
13     }
14
15     vem_name_t *clusterName = NULL;
16     clusterName = vem_uname(vhandle);
17     if (clusterName == NULL) {
18         // error connecting
19         fprintf(stderr, "Error connecting to cluster: %s\n",
20                 vem_strerror(vemerrno));
21         return -2;
22     }
23
24     fprintf(stdout, " Connected... %s %s %4.2f\n", clusterName->clustername,
25             clusterName->sysname, clusterName->version);
26     vem_clientinfo_t *clients;
27     int rc = vem_locate(vhandle, NULL, &clients);
28     if (rc >= 0) {
29         if (rc == 0) {
30             printf("No registered clients exist\n");
31         } else {
32             int i = 0;
33             for (i = 0; i < rc; i++) {
34                 printf("%s %s %s\n", clients[i].name, clients[i].description,
35                        clients[i].location);
36             }
37             // free
38             vem_clear_clientinfo(clients);
39         }
40     } else {
41         // error connecting
42         fprintf(stderr, "Error getting clients: %s\n", vem_strerror(vemerrno));
43     }
```
Lines 44-47: authenticate the user to Platform EGO.
Lines 48-52: define and initialize a structure for callback methods. These callback methods are invoked by Platform EGO when resources are added or reclaimed, or when a change occurs to host status or a container. When Platform EGO wants to communicate about these events, it invokes these methods thereby calling back to the client.
```
44     if (login(vhandle, username, password) < 0) {
45         fprintf(stderr, "Error logon: %s\n", vem_strerror(vemerrno));
46         goto leave;
47     }
48     vem_clientcallback_t cbf;
49     cbf.addResource = addResourceCB;
50     cbf.reclaimForce = reclaimForceCB;
51     cbf.containerStateChg = containerStateChgCB;
52     cbf.hostStateChange = hostStateChangeCB;
```
Lines 53-67: define the vem_allocation_info_reply_t and vem_container_info_reply_t structures. If a client gets disconnected and then re-registers, its existing allocations and containers are returned to these structures. If the client had never registered before, the structures would be empty. Define and initialize a structure (rreq) that holds client info for registration purposes. (This includes assigning the client callback structure (cbf) to the callback member of the rreq structure.) Register with Platform EGO via the open connection using vem_register().
```
53     vem_allocation_info_reply_t aireply;
54     vem_container_info_reply_t cireply;
55     vem_registerreq_t rreq;
56
57     rreq.name = "sample4_client";
58     rreq.description = "Sample4 Client";
59     rreq.flags = VEM_REGISTER_TTL;
60     rreq.ttl = 3;
61     rreq.cb = &cbf;
62
63     rc = vem_register(vhandle, &rreq, &aireply, &cireply);
64     if (rc < 0) {
65         fprintf(stderr, "Error registering: %s\n", vem_strerror(vemerrno));
66         goto leave;
67     }
```
Lines 68-71: print out information related to the allocation requests and containers. Once the info is printed out, the memory for the allocations is freed.
Lines 75-82: the vem_gethostgroupinfo() method collects the information for the requested hostgroup. In this case, the requested hostgroup in the input argument is set to NULL, which means that information about all hostgroups is requested. If the method call is successful, hostgroup information is printed out to the screen.
```
68     print_vem_allocation_info_reply(&aireply);
69     print_vem_container_info_reply(&cireply);
70     // free up any previous allocations
71     release_vem_allocation(vhandle, &aireply);
72     vem_hostgroupreq_t hgroupreq;
73     hgroupreq.grouplist = NULL;
74     vem_hostgroup_t *hgroup;
75     rc = vem_gethostgroupinfo(vhandle, &hgroupreq, &hgroup);
76     if (rc < 0) {
77         fprintf(stderr, "Error getting hostgroup: %s\n",
78                 vem_strerror(vemerrno));
79     } else {
80         printf("%s %s %d %d\n", hgroup->groupName, hgroup->members,
81                hgroup->free, hgroup->allocated);
82     }
```
Lines 83-95: define and initialize structures for the workload, resources, and monitor threads. These structures are global in scope.
Lines 96-105: create and run the three threads.
Lines 107-115: get half the number of available host slots and make a corresponding number of resource allocation requests via the add_resources() method. This method adds a new allocation request to the resource queue and increments the queue index (next_item). The add_resources() method also sets the condition variable, which tells the waiting resource_thread that a new allocation request has been added to the resource queue. The resource_thread resumes execution and the resource_mutex object is unlocked.
When a resource is added, the addResourceCB() callback method is executed. The callback method adds the allocation reply structure to the workload queue at position next_item and increments the index (next_item). The condition variable is set, which tells the waiting work_thread that a new allocation reply has been added to the workload queue. The work_thread resumes execution and the work_mutex object is unlocked. The allocation reply is also printed out.
```
 83     pthread_t worker_thread, resource_thread, monitor_thread;
 84     work_state_t workload;
 85     resource_state_t resources;
 86     monitor_state_t monitor;
 87
 88     // globals so that callback functions can find the queues/lock/cond var
 89     workloadP = &workload;
 90     resourcesP = &resources;
 91     monitorP = &monitor;
 92
 93     initialize_workload(&workload, vhandle);
 94     initialize_resources(&resources, vhandle);
 95     initialize_monitor(&monitor, vhandle);
 96     if (pthread_create(&worker_thread, NULL, work_thread_fn, &workload)) {
 97         perror("Error creating worker thread: ");
 98     }
 99     if (pthread_create(&resource_thread, NULL, resource_thread_fn,
100                        &resources)) {
101         perror("Error creating resource thread: ");
102     }
103     if (pthread_create(&monitor_thread, NULL, monitor_thread_fn, &monitor)) {
104         perror("Error creating monitor thread: ");
105     }
106     // Request half of them, one if just one is available
107     int numavailable = getNumberOfHostSlotsAvailable(vhandle);
108     fprintf(stderr, "Available Slots=%d\n", numavailable);
109     if (numavailable > 0) {
110         int num_request = (numavailable / 2) > 1 ? (numavailable / 2) : 1;
111         vem_allocreq_t *aloc_spec = get_alloc_spec();
112         // aloc_spec->maxslots = 1;
113         // add to request Q
114         add_resources(num_request, aloc_spec);
115     }
```
Lines 116-117: pause the main thread for 180 seconds. The finalize() method sets the samples_shutdown flag to 1 and sets the condition variable for all three threads. The shutdown flag causes the three threads to end execution.
Lines 119-126: block the main thread until all three threads have finished. Clean up the thread states by destroying the mutex object and condition variable associated with each thread.
Lines 128-143: use vem_locate() to get all registered clients. Since NULL is provided as the client name, all registered clients will be located and the method returns the number of registered clients. Note that Platform EGO is equipped with a number of default clients (services) such as the Service Controller, so as a minimum, the info relevant to these clients is printed out and the associated memory is released.
```
116     sleep(180);
117     finalize();
118     // wait for worker, resource, monitor threads to finish
119     pthread_join(worker_thread, NULL);
120     pthread_join(resource_thread, NULL);
121     pthread_join(monitor_thread, NULL);
122
123     // clean up thread states
124     finalize_workload(workloadP);
125     finalize_resources(resourcesP);
126     finalize_monitor(monitorP);
127
128     rc = vem_locate(vhandle, NULL, &clients);
129     if (rc >= 0) {
130         if (rc == 0) {
131             printf("No registered clients exist\n");
132         } else {
133             int i = 0;
134             for (i = 0; i < rc; i++) {
135                 printf("%s %s %s\n", clients[i].name, clients[i].description,
136                        clients[i].location);
137             }
138             vem_clear_clientinfo(clients);
139         }
140     } else {
141         // error connecting
142         fprintf(stderr, "Error getting clients: %s\n", vem_strerror(vemerrno));
143     }
144 bailout:
145     rc = vem_unregister(vhandle);
146     if (rc < 0) {
147         fprintf(stderr, "Error unregistering: %s\n", vem_strerror(vemerrno));
148     }
149     if (logout(vhandle) < 0) {
150         fprintf(stderr, "Error logoff: %s\n", vem_strerror(vemerrno));
151     }
152
153 leave:
154     // free memory
155     vem_free_uname(clusterName);
156     vem_close(vhandle);
157
158     return 0;
159 }
```
Step 3: Make resource allocation requests to Platform EGO (resource thread)
Lock the resource_mutex and wait for the condition variable to be set. Once the condition variable is set by the addResourceCB() callback method and thread execution resumes, get the allocation request from the resource queue using the work_items index. Increment the index. Make an allocation request to Platform EGO and retrieve and store the allocation ID. Continue this cycle until all the allocation requests in the resource queue have been processed.
```
void *
resource_thread_fn(resource_state_t *resourcesP)
{
    while (!samples_shutdown) {
        pthread_mutex_lock(&resourcesP->resource_mutex);
        resourcesP->ready = 1;
        pthread_cond_wait(&resourcesP->resource_cond,
                          &resourcesP->resource_mutex);
        fprintf(stderr, "Need to allocate?\n");
        while (resourcesP->work_items < resourcesP->next_item) {
            // dequeue the allocation spec
            vem_allocreq_t *aloc_spec =
                resourcesP->queue[resourcesP->work_items++].alocreq;
            vem_allocation_id_t alocid =
                get_resource(resourcesP->vhandle, aloc_spec);
            if (alocid != NULL) {
                // add to the allocated IDs
                if (resourcesP->num_resources < MAX_RESOURCES) {
                    resourcesP->alocids[resourcesP->num_resources++] = alocid;
                } else {
                    // TODO: grow the array
                    fprintf(stderr, "Exceeded Limit\n");
                    finalize();
                }
            } else {
                fprintf(stderr, "Could not allocate\n");
            }
        }
        pthread_mutex_unlock(&resourcesP->resource_mutex);
    }
    // free alocid memory
    fprintf(stderr, "ResourceThread Shutdown\n");
    return NULL;
}
```
Step 4: Get resource allocation reply from Platform EGO and start containers (work thread)
Lock the work_mutex and wait for the condition variable to be set. Once the condition variable is set and thread execution resumes, retrieve the allocation reply from the work queue using the work_items index. Increment the index and retrieve the container specification.
The addto_monitor_resource() method is called for each allocated host, which adds the host name, allocation ID, and allocation state to the resource collection. The method also increments the resources num counter (monitorP->resources->num).
For each slot in each host, start a container using the allocation ID, host name, and container specification. If successful, increment the running containers counter. Unlock the work_mutex.
```
void *
work_thread_fn(work_state_t *workloadP)
{
    while (!samples_shutdown) {
        // wait until a resource host is allocated
        pthread_mutex_lock(&workloadP->work_mutex);
        workloadP->ready = 1;
        pthread_cond_wait(&workloadP->work_cond, &workloadP->work_mutex);
        fprintf(stderr, "Received Resource?\n");
        int rc = 0;
        // pick the allocation replies out of the queue
        while (workloadP->work_items < workloadP->next_item) {
            vem_allocreply_t *allocReply =
                &workloadP->queue[workloadP->work_items++];
            int i = 0, j = 0;
            vem_container_spec_t *conspec = get_container_spec();
            for (i = 0; i < allocReply->nhost; i++) {
                addto_monitor_resource(monitorP, allocReply->host[i].name,
                                       allocReply->allocId);
                // start one container per slot on this host
                for (j = 0; j < allocReply->host[i].slots; j++) {
                    rc = startContainer(workloadP->vhandle, allocReply->allocId,
                                        allocReply->host[i].name, conspec);
                    // if successful
                    if (!rc) {
                        ++workloadP->num_containers_running;
                    }
                }
            }
        }
        pthread_mutex_unlock(&workloadP->work_mutex);
    }
    fprintf(stderr, "WorkThread Shutdown\n");
    return NULL;
}
```
Step 5: Calculate the average host load (monitor thread)
Lock the monitor_mutex and get the current time. The thread now waits for either a host state change to be signalled by Platform EGO or the wait time to expire. If a state change occurs, the corresponding callback method (hostStateChangeCB) is invoked by Platform EGO, which updates the host state in the resource collection. The condition variable is then set to reactivate the monitor thread. If the thread resumes execution due to wait time expiration, the average computer load is calculated and printed out.
```
void *
monitor_thread_fn(monitor_state_t *monitorP)
{
    struct timespec timeout;
    struct timeval now;
    int rc;

    while (!samples_shutdown) {
        // wait until a change in host/container status is received
        pthread_mutex_lock(&monitorP->monitor_mutex);
        monitorP->ready = 1;
        gettimeofday(&now, NULL);
        timeout.tv_sec = now.tv_sec + 30;
        timeout.tv_nsec = now.tv_usec * 1000;
        rc = pthread_cond_timedwait(&monitorP->monitor_cond,
                                    &monitorP->monitor_mutex, &timeout);
        // Currently there is no way to get a container from its ID:
        // print_vem_container(vem_container_t *container);
        if (rc == ETIMEDOUT) {
            vem_hostinfo_t *hinfo = NULL;
            char **attrs;
            int numh;
            double *loads;
            double load = computeAverageLoad(monitorP, &numh, &hinfo,
                                             &attrs, &loads);
            fprintf(stderr, "\nMonitor: Avg. Load =%6.2f\n", load);
            if (hinfo != NULL) {
                free(loads);
                vem_free_hostinfo(hinfo, numh, attrs);
            }
        } else {
            // we were signaled
            fprintf(stderr, "Received Event?\n");
        }
        pthread_mutex_unlock(&monitorP->monitor_mutex);
    }
    fprintf(stderr, "MonitorThread Shutdown\n");
    return NULL;
}
```
Step 6: Client callback methods
These callback methods are invoked by Platform EGO when resources are added or reclaimed, or when a change occurs to host status or a container. When Platform EGO wants to communicate about these events, it invokes these methods thereby calling back to the client.
The addResourceCB() method locks the work_mutex object. The method then adds the allocation reply structure to the workload queue at position next_item and increments the index (next_item). The condition variable is set, which tells the waiting work_thread that a new allocation reply has been added to the workload queue. The work_mutex object is unlocked and the allocation reply is printed out.
```
int
addResourceCB(vem_allocreply_t *areply)
{
    printf("addResource Call Back\n");
    pthread_mutex_lock(&workloadP->work_mutex);
    // wait until the work thread is ready
    while (!workloadP->ready) {
        pthread_mutex_unlock(&workloadP->work_mutex);
        sleep(1);
        pthread_mutex_lock(&workloadP->work_mutex);
    }
    // add the allocation reply to the queue
    if (workloadP->next_item < MAX_CONTAINERS) {
        vem_allocreply_clone_deep(&workloadP->queue[workloadP->next_item++],
                                  areply);
    } else {
        // TODO: increase capacity
        fprintf(stderr, "Exceeded limit");
        finalize();
    }
    pthread_cond_signal(&workloadP->work_cond);
    pthread_mutex_unlock(&workloadP->work_mutex);
    print_vem_allocreply(areply);
    return 0;
}
```
The containerStateChgCB() method prints out the container ID and the new container state.
The work_mutex is locked and the method cycles through the list of containers and finds the container ID associated with the state change. If the new state indicates that the container has finished running, the workloadP->container_state is updated with the new state and the total number of running containers is decremented. The condition variable is set to signal the work_thread to resume execution and the work_mutex object is unlocked.
```
int
containerStateChgCB(vem_containerstatechg_t *cschange)
{
    printf("containerStateChg Call Back\n");
    printf("%s %d\n", cschange->containerId, cschange->newState);
    pthread_mutex_lock(&workloadP->work_mutex);
    update_container_state(workloadP, cschange);
    pthread_cond_signal(&workloadP->work_cond);
    pthread_mutex_unlock(&workloadP->work_mutex);
    return 0;
}
```
The hostStateChangeCB() method prints out the host name and the new host state. After locking the monitor_mutex, the method calls the update_host_state() method which, in turn, updates the resource collection with the new host state info. The condition variable is set, which signals the waiting monitor_thread to resume execution.
```
int
hostStateChangeCB(vem_hoststatechange_t *hschange)
{
    printf("hostStateChange Call Back\n");
    printf("%s %d\n", hschange->name, hschange->newState);
    pthread_mutex_lock(&monitorP->monitor_mutex);
    update_host_state(monitorP, hschange);
    pthread_cond_signal(&monitorP->monitor_cond);
    pthread_mutex_unlock(&monitorP->monitor_mutex);
    return 0;
}
```
Step 7: Calculate the average activity load on the resources
In order to determine if a resource is too busy to receive jobs, a load index value is calculated and compared to a corresponding load threshold parameter. The following code retrieves and processes the load index value for r1m, the 1-minute CPU run queue length.
The method cycles through each resource in the resource collection and checks the allocation status. For each allocated resource, a counter tracking the number of allocated resources is incremented and the resource name is stored in the hostlist array, which is then assigned to the hinforeq structure. Because the host list is used as an input to vem_getHostInfo(), the method updates info only for hosts in the list and returns the number of allocated hosts.
The getLoadAttribute() method cycles through the list of host attributes looking for the r1m load index. (The r1m load index represents the average number of processes ready to use the CPU during a one-minute interval.) This method returns the attribute array index corresponding to load index r1m. The value for r1m is retrieved for each allocated resource and converted to a double data type. These values are stored in an array as well as added to a variable (load), which is used as an accumulator. The total sum is then divided by the number of allocated resources to yield the average load index value of r1m.
```
double
computeAverageLoad(monitor_state_t *monitorP, int *nhostsP,
                   vem_hostinfo_t **hinfoP, char ***host_attributesP,
                   double **loadsP)
{
    double load = 0.0;
    double *loads = NULL;
    // assumes the caller has acquired the lock

    // find allocated hosts
    resource_collection_t *resources = monitorP->resources;
    int i, j = 0, count = 0;
    for (i = 0; i < resources->num; i++) {
        if (resources->allocstate[i] == RESOURCE_ALLOCATED) {
            count++;
        }
    }
    /* No hosts allocated */
    if (count == 0) {
        *nhostsP = 0;
        *host_attributesP = NULL;
        *hinfoP = NULL;
        *loadsP = NULL;
        return 0;
    }
    char **hostlist = calloc(count, sizeof(char *));
    for (i = 0; i < resources->num; i++) {
        if (resources->allocstate[i] == RESOURCE_ALLOCATED) {
            hostlist[j] = resources->names[i];
            j++;
        }
    }

    int hin;
    vem_hostinforeq_t hinforeq;
    hinforeq.resreq = "";            // resource string, unimplemented
    hinforeq.hostlist = hostlist;
    vem_hostinfo_t *hinfo = NULL;    // out parameter, set by vem_getHostInfo
    char **attrs = NULL;             // out parameter, set by vem_getHostInfo
    hin = vem_getHostInfo(monitorP->vhandle, &hinforeq, &hinfo, &attrs);
    if (hin < 0) {
        // error
        fprintf(stderr, "Error getting hostinfo: %s %d\n",
                vem_strerror(vemerrno), hin);
        *nhostsP = hin;
        *host_attributesP = attrs;
        *hinfoP = NULL;
        *loadsP = loads;
        return -1.0;
    } else {
        // TODO: are the hosts in hinfo in the same order as in hinforeq?
        print_hostInfo(hin, hinfo, attrs);
        // find the load attribute
        int index = get_load_attribute("r1m", attrs);
        loads = calloc(hin, sizeof(double));
        double l;
        for (i = 0; i < hin; i++) {
            l = get_load(&hinfo[i], index);
            loads[i] = l;
            load += l;
        }
        free(hostlist);
    }
    *nhostsP = hin;
    *hinfoP = hinfo;
    *host_attributesP = attrs;
    *loadsP = loads;
    return (count == 0) ? 0 : load / count;
}
```
```
double
get_load(vem_hostinfo_t *hinfo, int index)
{
    double rv = -1.0;
    vem_value_t value;
    value = hinfo->attributes[index];
    rv = value.value.v_float32;
    return rv;
}

int
get_load_attribute(char *load, char **attrs)
{
    int j = -1;
    while (attrs[++j] != NULL) {
        if (!strcmp(load, attrs[j])) {
            return j;
        }
    }
    return j;
}
```
Run the client application
- Select Run > Run.
The Run dialog appears.
- In the Configurations list, either select an EGO C Client Application or click New for a new configuration.
For a new configuration, enter the configuration name.
- Enter the project name and C/C++ Application name.
- Click Apply and then Run.
Sample output
Date Modified: November 30, 2006
Platform Computing: www.platform.com
© 1994-2006 Platform Computing Corporation. All rights reserved.