/*******************************************************************************
**
** Source File Name = thdsrver.sqC
**
** Licensed Materials - Property of IBM
**
** (C) COPYRIGHT International Business Machines Corp. 1999
** All Rights Reserved.
**
** US Government Users Restricted Rights - Use, duplication or
** disclosure restricted by GSA ADP Schedule Contract with IBM Corp.
**
**
** PURPOSE : illustrate the use of DB2 multiple context APIs
**           in a UNIX development environment.
**
** This sample program uses the posix threads APIs for thread creation and
** management. On UnixWare and Sun systems it is also possible to use the
** "Unix International" threads APIs (thr_create, ...).
**
** The program maintains a pool of contexts. When a context becomes
** available, a thread is created and dispatched to do the specified work.
**
** The work generated consists of statements to delete entries from
** either the STAFF or EMPLOYEE tables of the SAMPLE database.
**
** EXTERNAL DEPENDENCIES :
**    - Ensure existence of the sample database. If it does not
**      exist, run the 'db2sampl' command.
**    - Precompile with the SQL precompiler (PREP in DB2)
**    - Bind to a database (BIND in DB2)
**    - Compile and link with the C++ compiler and options for
**      multi-threaded applications supported by your platform.
**
** For more information about these samples see the README file.
**
** For more information on programming in C++, see the
**    - "Programming in C and C++" section of the Application Development Guide
**
** For more information on building C++ applications, see the:
**    - "Building C++ Applications" section of the Application Building Guide.
**
** For more information on the SQL language see the SQL Reference.
*******************************************************************************/

#ifdef USE_UI_THREADS
/* UnixWare and Sun both have the "Unix International" threads APIs available */
#include <thread.h>
#include <synch.h>
#else
#include <pthread.h>
#endif
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <iostream.h>
#include <stdio.h>
#include <sql.h>
#include "util.h"

#ifdef USE_UI_THREADS
/* Hide the differences in the threads implementations */
/* Each pthread_* name used by this program is mapped onto its "Unix
   International" counterpart, so the rest of the file is written against
   the pthread names only.  Thread creation is not mapped here; it is
   selected with #ifdef at the call site. */
#define pthread_exit(x) thr_exit(x)
#define pthread_mutex_lock(x) mutex_lock(x)
#define pthread_mutex_unlock(x) mutex_unlock(x)
#define pthread_mutex_init(x,y) mutex_init(x, USYNC_THREAD, y)
#define pthread_cond_init(x,y) cond_init(x, USYNC_THREAD, y)
#define pthread_cond_wait(x,y) cond_wait(x,y)
#define pthread_cond_signal(x) cond_signal(x)
#define pthread_mutex_t mutex_t
#define pthread_cond_t cond_t
#define pthread_t thread_t
#endif

/* Stack size argument passed to thr_create() in the USE_UI_THREADS build. */
#ifdef DB2SCO
/* UnixWare threads have a very small default stack */
#define DEFAULT_STACK_SIZE 0x20000
#else
#define DEFAULT_STACK_SIZE 0
#endif

/* check_expected(condition)
 *
 * Terminates the whole process when `condition` evaluates to false: the
 * source file, line number and the text of the condition are written to
 * cerr and exit(1) is called.  Nothing happens when the condition is true.
 *
 * The macro expands to a brace-enclosed block (not do { } while (0)), so an
 * invocation followed by ';' must not be used as the unbraced body of an
 * if that has an else branch.
 */
#define check_expected(condition) \
{ \
  if (!(condition)) \
  { char buf[256]; \
    sprintf(buf , "%s:%i unexpected error: \"%s\" was false\n", \
            __FILE__, __LINE__, #condition); \
    cerr << buf ; \
    exit(1); \
  } \
}

/* CHECKERR(context, CE_STR, pStatus)
 *
 * Builds the message "Context nb.: <context>\n<CE_STR>" and passes it,
 * together with the address of a variable named `sqlca` that must be in
 * scope at the point of expansion, to check_error().  When check_error()
 * returns non-zero, sqlca.sqlcode is stored in *pStatus; otherwise *pStatus
 * is left untouched.
 *
 * NOTE(review): check_error() is not defined in this file - presumably it is
 * declared in util.h; confirm what it prints and when it returns non-zero.
 *
 * Like check_expected, this expands to a plain brace-enclosed block.
 */
#define CHECKERR(context, CE_STR, pStatus) \
{ char buf[256]; \
  sprintf(buf,"Context nb.: %i\n%s", context, CE_STR); \
  if (check_error (buf, &sqlca) != 0) \
  { *(pStatus) = sqlca.sqlcode; \
  } \
}

// The context pool consists of an array of Context types.
class Context { public: void *ctx; int free; }; class GlobalData { public: int contexts ; // size of context pool int loops ; // amount of work for the client to create int commit ; // commit the work done int verbose ; char database[15] ; char userid[15]; char password[15]; Context * ctxlist; #ifndef USE_UI_THREADS pthread_attr_t attr; // global thread attributes #endif pthread_t * thd; // array of thread ids // for management of the context pool int contextsFree; pthread_cond_t cond; pthread_mutex_t cond_m; public: // Constructors GlobalData(void) { contexts = 8; loops = 15; commit = 0; verbose = 1; strcpy(database, "SAMPLE"); strcpy(userid, ""); strcpy(password, ""); }; }; class CommandLineReader { static GlobalData * gData; public: void Read(int argc, char* argv[]); private: void Usage(char* argv0); }; // Dispatcher.GenerateWork creates the following type of object // which is passed to each worker thread. class Work { public: char database[15]; // database for thread to connect to char userid[15]; char password[15]; char * command; // dynamic sql statement to execute int context; // context to use for connection public: Work(void) { command = (char *) 0; }; ~Work(void) { if(command != (char *)0 ) { delete command; } }; void SetDatabase(char * db) { strcpy(database, db); }; void SetUserid(char * uid) { strcpy(userid, uid); }; void SetPassword(char * pswd) { strcpy(password, pswd); }; void SetCommand(char * cmd) { int cmdLen = (int)strlen(cmd); if(command != (char *)0 ) { delete command; } command = new char[cmdLen+1]; strcpy(command, cmd); }; }; class Dispatcher { static GlobalData * gData; public: void Initialize(void); void GenerateWork(void); void Dispatch(Work * workItem); }; class SqlThread { static GlobalData * gData; public: static void * DoWork(void * args ); private: static void CleanUp(Work * work, int connectDone, long * pStatus); }; GlobalData globalData; GlobalData * CommandLineReader::gData = &globalData; GlobalData * Dispatcher::gData = 
&globalData; GlobalData * SqlThread::gData = &globalData; int main(int argc, char *argv[]) { CommandLineReader clReader; clReader.Read(argc, argv); cout << "Sample C++ program: THDSRVER \n"; Dispatcher dispatcher; dispatcher.Initialize(); dispatcher.GenerateWork(); return 0; } // end of program : thdsrver.sqC // Class CommandLineReader - Implementation void CommandLineReader::Read(int argc, char* argv[]) { int c; // read any command line options while ( (c = getopt(argc, argv, "d:u:p:l:c:qCh")) != EOF ) { switch (c) { case 'd' : strcpy(gData->database, optarg); break; case 'u' : strcpy(gData->userid, optarg); break; case 'p' : strcpy(gData->password, optarg); break; case 'l' : gData->loops = atoi(optarg); break; case 'c' : gData->contexts = atoi(optarg); break; case 'q' : gData->verbose = 0; break; case 'C' : gData->commit = 1; break; case 'h' : default : Usage(argv[0]); break; } } } void CommandLineReader::Usage(char* argv0) { char * program = strrchr(argv0, '/'); if (!program) program = argv0; cerr << "usage: " << program << "\n" << " [-d database] [-u userid] [-p password]\n" << " [-l loops] [-c contexts] [-q] [-C] [-h]\n\n" << " -d alternate sample database or database alias.\n" << " -u user id.\n" << " -p password.\n" << " -l number of loops.\n" << " -c size of context pool to use.\n" << " -q quiet mode.\n" << " -C commit changes made.\n" << " -h print this message.\n" ; exit(1); } // Class Dispatcher - Implementation // Initialize any global program state. This includes the attributes // used for each thread creation, the setting of the multi-manual context type // and the creation of the context pool. 
void Dispatcher::Initialize() { int i, rc; struct sqlca sqlca; gData->contextsFree = gData->contexts; gData->ctxlist = new Context[gData->contexts]; check_expected(gData->ctxlist != NULL); gData->thd = new pthread_t[gData->contexts]; check_expected(gData->thd != NULL); #ifndef USE_UI_THREADS rc = pthread_attr_init(&(gData->attr)); check_expected(rc == 0); rc = pthread_attr_setdetachstate(&(gData->attr), PTHREAD_CREATE_DETACHED); check_expected(rc == 0); #ifdef _POSIX_THREAD_PRIORITY_SCHEDULING #if (defined(DB2IRIX)) rc = pthread_attr_setscope(&(gData->attr), PTHREAD_SCOPE_PROCESS); #else rc = pthread_attr_setscope(&(gData->attr), PTHREAD_SCOPE_SYSTEM); #endif check_expected(rc == 0); #endif #endif sqleSetTypeCtx(SQL_CTX_MULTI_MANUAL); if (gData->verbose) { cout << "creating context pool of size " << gData->contexts << "\n"; } for (i = 0 ; i < gData->contexts ; i++) { rc = sqleBeginCtx(&((gData->ctxlist)[i].ctx), SQL_CTX_CREATE_ONLY, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); (gData->ctxlist)[i].free = 1; } rc = pthread_mutex_init(&(gData->cond_m), NULL); check_expected(rc == 0); rc = pthread_cond_init(&(gData->cond), NULL); check_expected(rc == 0); } // Construct a "random" sql statement to execute in a connection to an // arbitrary database. // // Note that the exclusive use of the SAMPLE database here is not a db2 // restriction, but is a convienience to simplify this program. void Dispatcher::GenerateWork() { char * delete_str1 = "DELETE FROM STAFF WHERE ID=%i"; char * delete_str2 = "DELETE FROM EMPLOYEE WHERE EMPNO='%06i'"; int i, empno; unsigned int seed = getpid(); Work * workItem; char cmd[256]; // Generate work to be done in each thread. for (i = 0 ; i < gData->loops ; i++) { workItem = new Work; workItem->SetDatabase(gData->database); workItem->SetUserid(gData->userid); workItem->SetPassword(gData->password); // The employee numbers are in the 10-350 range and are multiples of // 10. 
empno = ((rand_r(&seed) % 34) + 1) * 10; sprintf(cmd, i%2 ? delete_str1 : delete_str2, empno); workItem->SetCommand(cmd); Dispatch(workItem); } if (gData->verbose) { cout << "all workers started, exiting main\n" ; } pthread_exit(0); } // The current thread will be suspended until the required resources // are available (ie: a context is free). At this point a thread is created // to execute the specified sql statement. void Dispatcher::Dispatch(Work * workItem) { int rc, ctx; char buf[256]; rc = pthread_mutex_lock(&(gData->cond_m)); check_expected(rc == 0); while (!gData->contextsFree) { rc = pthread_cond_wait(&(gData->cond), &(gData->cond_m)); check_expected(rc == 0); } // there is at least one free context at this point, find one for (ctx = 0 ; ctx < gData->contexts ; ctx++) { if ((gData->ctxlist)[ctx].free) { break; } } (gData->ctxlist)[ctx].free = 0; gData->contextsFree--; rc = pthread_mutex_unlock(&gData->cond_m); check_expected(rc == 0); workItem->context = ctx; if (gData->verbose) { sprintf(buf, "creating thread on context %i for sql statement:\n" "\t\"%s\"\n", ctx, workItem->command); cout << buf; } #ifdef USE_UI_THREADS rc = thr_create(NULL, DEFAULT_STACK_SIZE, SqlThread::DoWork, (void*)workItem, THR_BOUND | THR_DETACHED, &(gData->thd)[ctx]); #else rc = pthread_create(&(gData->thd)[ctx], &(gData->attr), SqlThread::DoWork, (void*)workItem); #endif check_expected(rc == 0); } // Class SqlThread - Implementation // Execute the sql statement. This is the "main" routine for each of the // worker threads. // // A context will be attached to for the connection, a connection will be done, // and a simple sql statement will be prepared and executed. // // After this, or in the event of non-terminal error, the context will be // detached from if an attachment has occurred, and any further resource // deallocation will occur. // // Before termination a condition will be signalled to wake up dispatch if // no contexts had been available. 
void * SqlThread::DoWork(void * args) { int rc; long status = 0; struct sqlca sqlca; EXEC SQL BEGIN DECLARE SECTION; char dbname[15]; char user[15]; char pswd[15]; char statement[256]; EXEC SQL END DECLARE SECTION; Work * work = (Work *)args; char buf[256]; strcpy(dbname, work->database); strcpy(user, work->userid); strcpy(pswd, work->password); if (gData->verbose) { sprintf(buf, "%i: sqleAttachToCtx\n", work->context); cout << buf; } rc = sqleAttachToCtx((gData->ctxlist)[work->context].ctx, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); if (gData->verbose) { sprintf(buf, "%i: CONNECT TO %s\n", work->context, dbname); cout << buf ; } if(strlen(user)==0) { EXEC SQL CONNECT TO :dbname; } else { EXEC SQL CONNECT TO :dbname USER:user USING:pswd; } CHECKERR (work->context, "CONNECT TO DATABASE", &status); if( sqlca.sqlcode != 0 ) { CleanUp( work, 0, &status); } else { strcpy(statement, work->command); if (gData->verbose) { sprintf(buf ,"%i: EXECUTE \"%s\"\n", work->context, statement); cout << buf ; } EXEC SQL EXECUTE IMMEDIATE :statement; CHECKERR (work->context, "EXECUTE IMMEDIATE", &status) ; CleanUp( work, 1, &status); } return (void *)status; // this could be obtained with a pthread_join // if the thread were created undetached } void SqlThread::CleanUp(Work * work, int connectDone, long * pStatus) { int rc; struct sqlca sqlca; char buf[256]; if (connectDone) { if (gData->commit) { if (gData->verbose) { sprintf(buf, "%i: COMMIT\n", work->context); cout << buf ; } EXEC SQL COMMIT; CHECKERR (work->context, "COMMIT", pStatus) ; } else { if (gData->verbose) { sprintf(buf, "%i: ROLLBACK\n", work->context); cout << buf ; } EXEC SQL ROLLBACK; CHECKERR (work->context, "ROLLBACK", pStatus) ; } if (gData->verbose) { sprintf(buf, "%i: CONNECT RESET\n", work->context); cout << buf ; } EXEC SQL CONNECT RESET; CHECKERR (work->context, "CONNECT RESET", pStatus) ; } if (gData->verbose) { sprintf(buf, "%i: sqleDetachFromCtx\n", work->context); cout << buf ; } rc = 
sqleDetachFromCtx((gData->ctxlist)[work->context].ctx, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); rc = pthread_mutex_lock(&gData->cond_m); check_expected(rc == 0); if (gData->verbose) { sprintf(buf, "%i: marking context free\n", work->context); cout << buf ; } (gData->ctxlist)[work->context].free = 1; gData->contextsFree++; rc = pthread_cond_signal(&gData->cond); check_expected(rc == 0); rc = pthread_mutex_unlock(&gData->cond_m); check_expected(rc == 0); delete(work->command); delete(work); }