/*******************************************************************************
**
** Source File Name = thdsrver.sqc
**
** Licensed Materials - Property of IBM
**
** (C) COPYRIGHT International Business Machines Corp. 1995, 2000
** All Rights Reserved.
**
** US Government Users Restricted Rights - Use, duplication or
** disclosure restricted by GSA ADP Schedule Contract with IBM Corp.
**
**
** PURPOSE : illustrate the use of DB2 multiple context APIs
**           in a UNIX development environment.
**
** This sample program uses the POSIX threads APIs for thread creation and
** management.  On Solaris systems it is also possible to use the Solaris
** thread APIs (thr_create, ...).
**
** The program maintains a pool of contexts.  A generate_work function
** is executed from main, and creates dynamic sql statements that are
** executed by worker threads.  When a context becomes available, a
** thread is created and dispatched to do the specified work.
**
** The work generated consists of statements to delete entries from
** either the STAFF or EMPLOYEE tables of the SAMPLE database.
**
** EXTERNAL DEPENDENCIES :
**    - Ensure existence of the sample database.  If it does not
**      exist, run the 'db2sampl' command.
**    - Precompile with the SQL precompiler (PREP in DB2)
**    - Bind to a database (BIND in DB2)
**    - Compile and link with the C compiler and options for
**      multi-threaded applications supported by your platform.
**
** For more information about these samples see the README file.
**
** For more information on programming in C, see the:
**    - "Programming in C and C++" section of the Application Development Guide
** For more information on Building C Applications, see the:
**    - "Building C Applications" section of the Application Building Guide.
**
** For more information on the SQL language see the SQL Reference.
** *******************************************************************************/ #ifdef USE_UI_THREADS /* UnixWare and Sun both have the "Unix International" threads APIs available */ #include <thread.h> #include <synch.h> #else #include <pthread.h> #endif #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <sql.h> #ifdef USE_UI_THREADS /* Hide the differences in the threads implementations */ #define pthread_exit(x) thr_exit(x) #define pthread_mutex_lock(x) mutex_lock(x) #define pthread_mutex_unlock(x) mutex_unlock(x) #define pthread_mutex_init(x,y) mutex_init(x, USYNC_THREAD, y) #define pthread_cond_init(x,y) cond_init(x, USYNC_THREAD, y) #define pthread_cond_wait(x,y) cond_wait(x,y) #define pthread_cond_signal(x) cond_signal(x) #define pthread_mutex_t mutex_t #define pthread_cond_t cond_t #define pthread_t thread_t #endif #if (defined(DB2SCO)) || (defined(DB2DYNIX)) || (defined(DB2HP)) /* UnixWare threads have a very small default stack */ #define DEFAULT_STACK_SIZE 0x20000 #else #define DEFAULT_STACK_SIZE 0 #endif /* * Generate work creates the following type of structure which is passed * to each worker thread. */ struct work { char database[15]; /* database for thread to connect to */ char userid[15]; char password[15]; char * command; /* dynamic sql statement to execute */ int context; /* context to use for connection */ }; /* * The context pool consists of an array of 'struct context' types. */ struct context { void * ctx; int free; }; /* * Global variables. 
*/ int contexts = 8; /* size of context pool */ struct context * ctxlist; #ifndef USE_UI_THREADS pthread_attr_t attr; /* global thread attributes */ #endif pthread_t * thd; /* array of thread ids */ int loops = 15; /* amount of work for the client to create */ int commit = 0; /* commit the work done */ int verbose = 1; char database[15]; char userid[15]; char password[15]; /* for management of the context pool */ int contexts_free; pthread_cond_t cond; pthread_mutex_t cond_m; /* * Prototypes. */ void initialize(int argc, char * argv[]); void usage(char * argv0); void generate_work(void); void dispatch(struct work * work_item); void * do_work(void * args); /* each thread executes this function */ void clean_up(struct work * work_item, int connect_done, int *pStatus); #define check_expected(condition) \ { \ if (!(condition)) { \ fprintf(stderr, "%s:%i unexpected error: \"%s\" was false\n", \ __FILE__, __LINE__, #condition); \ exit(1); \ } \ } #define CHECKERR(context, CE_STR, pStatus) \ { char buf[256]; \ sprintf(buf,"Context nb.: %i\n%s", context, CE_STR); \ if (check_error (buf, &sqlca) != 0) \ { *(pStatus) = sqlca.sqlcode; \ } \ } int check_error (char eString[], struct sqlca *caPointer); int main(int argc, char * argv[]) { initialize(argc,argv); generate_work(); if (verbose) printf("all workers started, exiting main\n"); pthread_exit(0); } /* * Initialize any global program state. This includes the attributes * used for each thread creation, the setting of the multi-manual context type * and the creation of the context pool. 
*/ void initialize(int argc, char * argv[]) { int c, i, rc; struct sqlca sqlca; strcpy(database, "SAMPLE"); strcpy(userid, ""); strcpy(password, ""); /* read any command line options */ while ( (c = getopt(argc, argv, "d:u:p:l:c:qCh")) != EOF ) { switch (c) { case 'd' : strcpy(database, optarg); break; case 'u' : strcpy(userid, optarg); break; case 'p' : strcpy(password, optarg); break; case 'l' : loops = atoi(optarg); break; case 'c' : contexts = atoi(optarg); break; case 'q' : verbose = 0; break; case 'C' : commit = 1; break; case 'h' : default : usage(argv[0]); break; } } contexts_free = contexts; ctxlist = (struct context*)malloc(contexts*sizeof(struct context)); check_expected(ctxlist != NULL); thd = (pthread_t*)malloc(contexts*sizeof(pthread_t)); check_expected(thd != NULL); #ifndef USE_UI_THREADS rc = pthread_attr_init(&attr); check_expected(rc == 0); rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); check_expected(rc == 0); #if (defined(DB2DYNIX)) || (defined(DB2HP)) rc = pthread_attr_setstacksize(&attr, DEFAULT_STACK_SIZE); #endif #ifdef _POSIX_THREAD_PRIORITY_SCHEDULING #if (defined(DB2IRIX)) rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_PROCESS); #else rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); #endif check_expected(rc == 0); #endif #endif sqleSetTypeCtx(SQL_CTX_MULTI_MANUAL); if (verbose) printf("creating context pool of size %i\n", contexts); for (i = 0 ; i < contexts ; i++) { rc = sqleBeginCtx(&ctxlist[i].ctx, SQL_CTX_CREATE_ONLY, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); ctxlist[i].free = 1; } rc = pthread_mutex_init(&cond_m, NULL); check_expected(rc == 0); rc = pthread_cond_init(&cond, NULL); check_expected(rc == 0); } /* * Print a friendly usage message. 
*/ void usage(char * argv0) { char * program = strrchr(argv0, '/'); if (!program) program = argv0; fprintf(stderr, "usage: %s \n" " [-d database] [-u userid] [-p password]\n" " [-l loops] [-c contexts] [-q] [-C] [-h]\n\n" " -d\t alternate sample database or database alias.\n" " -u\t user id.\n" " -p\t password.\n" " -l\t number of loops.\n" " -c\t size of context pool to use.\n" " -q\t quiet mode.\n" " -C\t commit changes made.\n" " -h\t print this message.\n" , program); exit(1); } /* * Construct a "random" sql statement to execute in a connection to an * arbitrary database. * * Note that the exclusive use of the SAMPLE database here is not a db2 * restriction, but is a convienience to simplify this program. */ void generate_work(void) { int i, empno; unsigned int seed = getpid(); struct work * work_item; char buf[256]; /* The employee numbers are in the 10-350 range and are multiples of * 10. */ char * delete_str1 = "DELETE FROM STAFF WHERE ID=%i"; char * delete_str2 = "DELETE FROM EMPLOYEE WHERE EMPNO='%06i'"; /* Generate work to be done in each thread. */ for (i = 0 ; i < loops ; i++) { work_item = (struct work*)malloc(sizeof(struct work)); strcpy(work_item->database, database); strcpy(work_item->userid, userid); strcpy(work_item->password, password); empno = ((rand_r(&seed) % 34) + 1) * 10; sprintf(buf, i%2 ? delete_str1 : delete_str2, empno); work_item->command = strdup(buf); dispatch(work_item); } } /* * The current thread will be suspended until the required resources * are available (ie: a context is free). At this point a thread is created * to execute the specified sql statement. 
*/
/*
 * Parameters:
 *   work_item - the work to run; its 'context' member is filled in here with
 *               the index of the pool slot that was claimed, and the item is
 *               then passed to do_work() on the new thread.
 *
 * The pool bookkeeping (contexts_free and the per-slot free flags) is only
 * touched while cond_m is held.  Any unexpected failure ends the program
 * through check_expected.
 */
void dispatch(struct work * work_item)
{
  int rc, ctx;

  /* with no slots at all the wait below could never be satisfied */
  check_expected(contexts > 0);

  rc = pthread_mutex_lock(&cond_m);
  check_expected(rc == 0);

  /* re-test after every wake-up: the wait may return without a free slot */
  while (!contexts_free)
  {
    rc = pthread_cond_wait(&cond, &cond_m);
    check_expected(rc == 0);
  }

  /* there is at least one free context at this point, find one */
  for (ctx = 0 ; ctx < contexts ; ctx++)
  {
    if (ctxlist[ctx].free)
      break;
  }

  /* never index past the pool if the bookkeeping is inconsistent */
  check_expected(ctx < contexts);

  ctxlist[ctx].free = 0;
  contexts_free--;

  rc = pthread_mutex_unlock(&cond_m);
  check_expected(rc == 0);

  work_item->context = ctx;

  if (verbose)
    printf("creating thread on context %i for sql statement:\n"
           "\t\"%s\"\n", ctx, work_item->command);

#ifdef USE_UI_THREADS
  rc = thr_create(NULL, DEFAULT_STACK_SIZE, do_work, (void*)work_item,
                  THR_BOUND | THR_DETACHED, &thd[ctx]);
#else
  rc = pthread_create(&thd[ctx], &attr, do_work, (void*)work_item);
#endif
  check_expected(rc == 0);
}

/*
 * Execute the sql statement.  This is the "main" routine for each of the
 * worker threads.
 *
 * A context will be attached to for the connection, a connection will be done,
 * and a simple sql statement will be prepared and executed.
 *
 * After this, or in the event of non-terminal error, the context will be
 * detached from if an attachment has occurred, and any further resource
 * deallocation will occur.
 *
 * Before termination a condition will be signalled to wake up dispatch if
 * no contexts had been available.
*/ void * do_work(void * args) { int rc, status = 0; struct sqlca sqlca; EXEC SQL BEGIN DECLARE SECTION; char dbname[15]; char user[15]; char pswd[15]; char statement[256]; EXEC SQL END DECLARE SECTION; struct work * work = (struct work *)args; strcpy(dbname, work->database); strcpy(user, work->userid); strcpy(pswd, work->password); if (verbose) printf("%i: sqleAttachToCtx\n", work->context); rc = sqleAttachToCtx(ctxlist[work->context].ctx, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); if (verbose) printf("%i: CONNECT TO %s\n", work->context, dbname); if(strlen(user)==0) { EXEC SQL CONNECT TO :dbname; } else { EXEC SQL CONNECT TO :dbname USER:user USING:pswd; } CHECKERR (work->context, "CONNECT TO DATABASE", &status); if( sqlca.sqlcode != 0 ) { clean_up( work, 0, &status); } else { strcpy(statement, work->command); if (verbose) printf("%i: EXECUTE \"%s\"\n", work->context, statement); EXEC SQL EXECUTE IMMEDIATE :statement; CHECKERR (work->context, "EXECUTE IMMEDIATE", &status) ; clean_up( work, 1, &status); } return (void *)status; /* this could be obtained with a pthread_join if the thread were created undetached */ } void clean_up(struct work * work, int connect_done, int *pStatus) { int rc; struct sqlca sqlca; if (connect_done) { if (commit) { if (verbose) printf("%i: COMMIT\n", work->context); EXEC SQL COMMIT; CHECKERR (work->context, "COMMIT", pStatus) ; } else { if (verbose) printf("%i: ROLLBACK\n", work->context); EXEC SQL ROLLBACK; CHECKERR (work->context, "ROLLBACK", pStatus) ; } if (verbose) printf("%i: CONNECT RESET\n", work->context); EXEC SQL CONNECT RESET; CHECKERR (work->context, "CONNECT RESET", pStatus) ; } if (verbose) printf("%i: sqleDetachFromCtx\n", work->context); rc = sqleDetachFromCtx(ctxlist[work->context].ctx, NULL, &sqlca); check_expected(rc == 0 && sqlca.sqlcode == 0); rc = pthread_mutex_lock(&cond_m); check_expected(rc == 0); if (verbose) printf("%i: marking context free\n", work->context); ctxlist[work->context].free = 
1; contexts_free++; rc = pthread_cond_signal(&cond); check_expected(rc == 0); rc = pthread_mutex_unlock(&cond_m); check_expected(rc == 0); free(work->command); free(work); } /******************************************************************************* ** Procedure : check_error ** ** Purpose : This procedure checks the SQLCACODE flag and prints out any ** information that is available related to the specific error. ** *******************************************************************************/ int check_error (char eString[], struct sqlca *caPointer) { char eBuffer[1024]; char sBuffer[1024]; char message[1024]; char messToken[1024]; short rc, Erc; int status=0; if (caPointer->sqlcode != 0 && caPointer->sqlcode != 100) { strcpy(message, ""); sprintf (messToken, "--- error report ---\n"); strcat(message, messToken); sprintf (messToken, "ERROR occurred : %s.\nSQLCODE : %ld\n", eString, caPointer->sqlcode); strcat(message, messToken); /**********************\ * GET SQLSTATE MESSAGE * \**********************/ rc = sqlogstt (sBuffer, 1024, 80, caPointer->sqlstate); /******************************\ * GET ERROR MESSAGE API called * \******************************/ Erc = sqlaintp (eBuffer, 1024, 80, caPointer); /* return code is the length of the eBuffer string */ if (Erc > 0) { sprintf (messToken, "%s", eBuffer); strcat(message, messToken); } if (caPointer->sqlcode < 0) { if (rc == 0) { sprintf (messToken, "\n%s", sBuffer); strcat(message, messToken); } sprintf (messToken, "--- end error report ---\n"); strcat(message, messToken); printf("%s", message); return 1; } else { /* errorCode is just a Warning message */ if (rc == 0) { sprintf (messToken, "\n%s", sBuffer); strcat(message, messToken); } sprintf (messToken, "--- end error report ---\n"); strcat(message, messToken); sprintf (messToken, "WARNING - CONTINUING PROGRAM WITH WARNINGS!\n"); strcat(message, messToken); printf("%s", message); return 0; } /* endif */ } /* endif */ return 0; }