/*******************************************************************************
**
** Source File Name = thdsrver.sqC
**
** Licensed Materials - Property of IBM
**
** (C) COPYRIGHT International Business Machines Corp. 1999
** All Rights Reserved.
**
** US Government Users Restricted Rights - Use, duplication or
** disclosure restricted by GSA ADP Schedule Contract with IBM Corp.
**
**
** PURPOSE : illustrate the use of DB2 multiple context APIs
** in a UNIX development environment.
**
** This sample program uses the posix threads APIs for thread creation and
** management. On UnixWare and Sun systems it is also possible to use the
** "Unix International" threads APIs (thr_create, ...).
**
** The program maintains a pool of contexts. When a context becomes
** available, a thread is created and dispatched to do the specified work.
**
** The work generated consists of statements to delete entries from
** either the STAFF or EMPLOYEE tables of the SAMPLE database.
**
** EXTERNAL DEPENDENCIES :
** - Ensure existence of the sample database. If it does not
** exist, run the 'db2sampl' command.
** - Precompile with the SQL precompiler (PREP in DB2)
** - Bind to a database (BIND in DB2)
** - Compile and link with the C++ compiler and options for
** multi-threaded applications supported by your platform.
**
** For more information about these samples see the README file.
**
** For more information on programming in C++, see the
** - "Programming in C and C++" section of the Application Development Guide
**
** For more information on building C++ applications, see the:
** - "Building C++ Applications" section of the Application Building Guide.
**
** For more information on the SQL language see the SQL Reference.
*******************************************************************************/
#ifdef USE_UI_THREADS
/* UnixWare and Sun both have the "Unix International" threads APIs available */
#include <thread.h>
#include <synch.h>
#else
#include <pthread.h>
#endif
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <iostream.h>
#include <stdio.h>
#include <sql.h>
#include "util.h"
#ifdef USE_UI_THREADS
/* Hide the differences in the threads implementations */
/* When USE_UI_THREADS is defined, the pthread names used by the rest of   */
/* this file are redirected to the <thread.h>/<synch.h> calls, so the      */
/* program body is written once against the pthread spelling.              */
/* mutex_init and cond_init receive an extra USYNC_THREAD type argument.   */
/* Thread creation is not mapped here: Dispatcher::Dispatch calls          */
/* thr_create directly under its own #ifdef.                               */
#define pthread_exit(x) thr_exit(x)
#define pthread_mutex_lock(x) mutex_lock(x)
#define pthread_mutex_unlock(x) mutex_unlock(x)
#define pthread_mutex_init(x,y) mutex_init(x, USYNC_THREAD, y)
#define pthread_cond_init(x,y) cond_init(x, USYNC_THREAD, y)
#define pthread_cond_wait(x,y) cond_wait(x,y)
#define pthread_cond_signal(x) cond_signal(x)
#define pthread_mutex_t mutex_t
#define pthread_cond_t cond_t
#define pthread_t thread_t
#endif
/* DEFAULT_STACK_SIZE is the stack size argument handed to thr_create in  */
/* Dispatcher::Dispatch (only used in the USE_UI_THREADS build).           */
#ifdef DB2SCO
/* UnixWare threads have a very small default stack */
#define DEFAULT_STACK_SIZE 0x20000
#else
/* NOTE(review): 0 presumably asks the threads library for its default    */
/* stack size - confirm against the thr_create documentation.             */
#define DEFAULT_STACK_SIZE 0
#endif
// check_expected(condition)
//
// Terminates the program with exit(1) when `condition` evaluates to false,
// after writing "<file>:<line> unexpected error: "<condition>" was false" to
// cerr.  The message is formatted into a buffer first and written with a
// single stream insertion.
//
// The body is wrapped in do { } while (0) so that the macro behaves like one
// statement and can be used in an unbraced if/else.  The file name and the
// condition text are each limited to 100 characters so that the formatted
// message always fits in the 256 byte buffer.
#define check_expected(condition)                                      \
  do                                                                    \
  { if (!(condition))                                                   \
    { char check_expected_buf[256];                                     \
      sprintf(check_expected_buf,                                       \
              "%.100s:%i unexpected error: \"%.100s\" was false\n",     \
              __FILE__, __LINE__, #condition);                          \
      cerr << check_expected_buf;                                       \
      exit(1);                                                          \
    }                                                                   \
  } while (0)
// CHECKERR(context, CE_STR, pStatus)
//
// Builds the label "Context nb.: <context>\n<CE_STR>" and passes it, together
// with the address of the `sqlca` variable visible at the point of use, to
// check_error().  When check_error() returns non-zero, sqlca.sqlcode is
// stored through pStatus; otherwise *pStatus is left untouched.
//
//   context - context number (int) used to label the message
//   CE_STR  - text describing the operation that was just attempted
//   pStatus - address of the variable that receives the failing sqlcode
//
// The body is wrapped in do { } while (0) so the macro acts as a single
// statement, the arguments are parenthesized, and the local buffer has a
// macro-specific name so that it cannot shadow a caller's `buf` that is
// passed in as an argument.  CE_STR is limited to 200 characters so the
// label always fits in the buffer.
#define CHECKERR(context, CE_STR, pStatus)                                  \
  do                                                                         \
  { char checkerr_buf[256];                                                  \
    sprintf(checkerr_buf, "Context nb.: %i\n%.200s", (context), (CE_STR));   \
    if (check_error(checkerr_buf, &sqlca) != 0)                              \
    { *(pStatus) = sqlca.sqlcode;                                            \
    }                                                                        \
  } while (0)
// One slot of the context pool.  The pool itself is an array of these,
// allocated by Dispatcher::Initialize.
struct Context
{
  void *ctx;   // context handle filled in by sqleBeginCtx and later passed to
               // sqleAttachToCtx / sqleDetachFromCtx
  int free;    // 1 while the slot is available, 0 while a worker thread owns it
};
// Program-wide state shared by the command line reader, the dispatcher and
// the worker threads.  A single instance exists (globalData).
class GlobalData
{
  public:
    // --- settings; defaults come from the constructor and can be
    //     overridden by command line options ---
    int contexts;            // size of context pool
    int loops;               // amount of work for the client to create
    int commit;              // non-zero: workers COMMIT, otherwise they ROLLBACK
    int verbose;             // non-zero: print progress messages
    char database[15];       // database name or alias to connect to
    char userid[15];         // empty string means connect without USER/USING
    char password[15];

    // --- context pool and threads; set up by Dispatcher::Initialize ---
    Context * ctxlist;       // array of `contexts` pool slots
#ifndef USE_UI_THREADS
    pthread_attr_t attr;     // global thread attributes
#endif
    pthread_t * thd;         // array of thread ids, one per pool slot

    // --- management of the context pool ---
    int contextsFree;        // number of slots currently marked free
    pthread_cond_t cond;     // signalled when a slot is returned to the pool
    pthread_mutex_t cond_m;  // protects contextsFree and the slots' free flags

  public:
    // Establish the default settings: 8 contexts, 15 loops, no commit,
    // verbose output, database "SAMPLE", no user id and no password.
    GlobalData(void)
      : contexts(8), loops(15), commit(0), verbose(1)
    {
      strcpy(database, "SAMPLE");
      userid[0] = '\0';
      password[0] = '\0';
    }
};
// Parses the command line options into the shared GlobalData settings.
class CommandLineReader
{
  public:
    // Process argc/argv and store the option values through gData.
    void Read(int argc, char* argv[]);

  private:
    static GlobalData * gData;   // shared program state (points at globalData)

    // Print the usage message to cerr and exit.
    void Usage(char* argv0);
};
// Dispatcher.GenerateWork creates the following type of object
// which is passed to each worker thread.
//
// A Work item owns the buffer behind `command`: SetCommand allocates it with
// new[] and the destructor releases it with delete[].  Code that uses a Work
// item must therefore not free `command` itself.  Copying is disabled because
// a member-wise copy would make two objects release the same buffer.
//
// The three fixed-size string fields hold at most 14 characters plus the
// terminating NUL; longer values passed to the setters are truncated rather
// than written past the end of the field.
class Work
{ public:
    char database[15];   // database for thread to connect to
    char userid[15];     // empty string: connect without USER/USING
    char password[15];
    char * command;      // dynamic sql statement to execute (owned, may be 0)
    int context;         // index of the context to use for the connection,
                         // -1 until one has been assigned

  public:
    // Start with empty strings, no command and no context assigned.
    Work(void)
      : command(0), context(-1)
    { database[0] = '\0';
      userid[0] = '\0';
      password[0] = '\0';
    }

    // Release the command buffer (delete[] on a null pointer is a no-op).
    ~Work(void)
    { delete [] command;
    }

    // Store the database name or alias (truncated to fit the field).
    void SetDatabase(const char * db)
    { CopyBounded(database, sizeof(database), db);
    }

    // Store the user id (truncated to fit the field).
    void SetUserid(const char * uid)
    { CopyBounded(userid, sizeof(userid), uid);
    }

    // Store the password (truncated to fit the field).
    void SetPassword(const char * pswd)
    { CopyBounded(password, sizeof(password), pswd);
    }

    // Replace the sql statement with a private copy of `cmd`.  A null `cmd`
    // is stored as an empty statement.
    void SetCommand(const char * cmd)
    { if (cmd == 0)
      { cmd = "";
      }
      // Build the copy before releasing the old buffer so that passing the
      // current `command` back in is safe.
      char * copy = new char[strlen(cmd) + 1];
      strcpy(copy, cmd);
      delete [] command;
      command = copy;
    }

  private:
    // Not implemented: a Work item must not be copied (see class comment).
    Work(const Work &);
    Work & operator=(const Work &);

    // Copy `src` into the `dstSize` byte field `dst`, truncating if needed
    // and always leaving the field NUL terminated.  A null `src` is treated
    // as an empty string.
    static void CopyBounded(char * dst, size_t dstSize, const char * src)
    { if (src == 0)
      { src = "";
      }
      strncpy(dst, src, dstSize - 1);
      dst[dstSize - 1] = '\0';
    }
};
// Owns the main-thread side of the program: sets up the context pool,
// produces the work items and starts one worker thread per item.
class Dispatcher
{
  public:
    // Prepare thread attributes, the context pool and its mutex/condition.
    void Initialize(void);

    // Create gData->loops work items and hand each one to Dispatch.
    void GenerateWork(void);

    // Wait for a free context, assign it to workItem and start a worker.
    void Dispatch(Work * workItem);

  private:
    static GlobalData * gData;   // shared program state (points at globalData)
};
// Code executed by the worker threads.
class SqlThread
{
  public:
    // Thread entry point; `args` is the Work item created by the dispatcher.
    static void * DoWork(void * args);

  private:
    static GlobalData * gData;   // shared program state (points at globalData)

    // End the unit of work (when connected), detach from the context, return
    // the context to the pool and dispose of the work item.
    static void CleanUp(Work * work, int connectDone, long * pStatus);
};
// The single instance of the shared program state.  Its constructor
// installs the default settings before main() runs.
GlobalData globalData;
// Definitions of the per-class static gData pointers; all three classes
// refer to the same globalData object.
GlobalData * CommandLineReader::gData = &globalData;
GlobalData * Dispatcher::gData = &globalData;
GlobalData * SqlThread::gData = &globalData;
int main(int argc, char *argv[])
{ CommandLineReader clReader;
clReader.Read(argc, argv);
cout << "Sample C++ program: THDSRVER \n";
Dispatcher dispatcher;
dispatcher.Initialize();
dispatcher.GenerateWork();
return 0;
} // end of program : thdsrver.sqC
// Class CommandLineReader - Implementation
void
CommandLineReader::Read(int argc, char* argv[])
{ int c;
// read any command line options
while ( (c = getopt(argc, argv, "d:u:p:l:c:qCh")) != EOF )
{ switch (c)
{ case 'd' :
strcpy(gData->database, optarg);
break;
case 'u' :
strcpy(gData->userid, optarg);
break;
case 'p' :
strcpy(gData->password, optarg);
break;
case 'l' :
gData->loops = atoi(optarg);
break;
case 'c' :
gData->contexts = atoi(optarg);
break;
case 'q' :
gData->verbose = 0;
break;
case 'C' :
gData->commit = 1;
break;
case 'h' :
default :
Usage(argv[0]);
break;
}
}
}
// Print the usage text to cerr and terminate the program with exit(1).
//
//   argv0 - the program path as received in argv[0]; only the part after
//           the last '/' is shown.
void
CommandLineReader::Usage(char* argv0)
{ // strrchr returns a pointer to the '/' itself, so step past it to avoid
  // printing the program name with a leading slash.
  const char * slash = strrchr(argv0, '/');
  const char * program = slash ? slash + 1 : argv0;
  cerr << "usage: " << program << "\n"
       << " [-d database] [-u userid] [-p password]\n"
       << " [-l loops] [-c contexts] [-q] [-C] [-h]\n\n"
       << " -d alternate sample database or database alias.\n"
       << " -u user id.\n"
       << " -p password.\n"
       << " -l number of loops.\n"
       << " -c size of context pool to use.\n"
       << " -q quiet mode.\n"
       << " -C commit changes made.\n"
       << " -h print this message.\n" ;
  exit(1);
}
// Class Dispatcher - Implementation
// Initialize any global program state. This includes the attributes
// used for each thread creation, the setting of the multi-manual context type
// and the creation of the context pool.
void
Dispatcher::Initialize()
{ int i, rc;
struct sqlca sqlca;
gData->contextsFree = gData->contexts;
gData->ctxlist = new Context[gData->contexts];
check_expected(gData->ctxlist != NULL);
gData->thd = new pthread_t[gData->contexts];
check_expected(gData->thd != NULL);
#ifndef USE_UI_THREADS
rc = pthread_attr_init(&(gData->attr));
check_expected(rc == 0);
rc = pthread_attr_setdetachstate(&(gData->attr), PTHREAD_CREATE_DETACHED);
check_expected(rc == 0);
#ifdef _POSIX_THREAD_PRIORITY_SCHEDULING
#if (defined(DB2IRIX))
rc = pthread_attr_setscope(&(gData->attr), PTHREAD_SCOPE_PROCESS);
#else
rc = pthread_attr_setscope(&(gData->attr), PTHREAD_SCOPE_SYSTEM);
#endif
check_expected(rc == 0);
#endif
#endif
sqleSetTypeCtx(SQL_CTX_MULTI_MANUAL);
if (gData->verbose)
{ cout << "creating context pool of size " << gData->contexts << "\n";
}
for (i = 0 ; i < gData->contexts ; i++)
{ rc = sqleBeginCtx(&((gData->ctxlist)[i].ctx),
SQL_CTX_CREATE_ONLY, NULL,
&sqlca);
check_expected(rc == 0 && sqlca.sqlcode == 0);
(gData->ctxlist)[i].free = 1;
}
rc = pthread_mutex_init(&(gData->cond_m), NULL);
check_expected(rc == 0);
rc = pthread_cond_init(&(gData->cond), NULL);
check_expected(rc == 0);
}
// Construct "random" sql statements and dispatch each of them to a worker
// thread that executes it in a connection to the configured database.
//
// gData->loops work items are created.  Odd numbered items delete a row of
// the STAFF table, even numbered items a row of the EMPLOYEE table.  Each
// item is handed to Dispatch and is disposed of by the worker that ran it.
//
// Note that the exclusive use of the SAMPLE database here is not a db2
// restriction, but is a convenience to simplify this program.
//
// This function does not return: once all workers have been started it ends
// the calling (main) thread with pthread_exit so that the workers can finish.
void
Dispatcher::GenerateWork()
{ // String literals are constant, so they are bound to const pointers.
  const char * delete_str1 = "DELETE FROM STAFF WHERE ID=%i";
  const char * delete_str2 = "DELETE FROM EMPLOYEE WHERE EMPNO='%06i'";
  int i, empno;
  unsigned int seed = getpid();
  Work * workItem;
  char cmd[256];
  // Generate work to be done in each thread.
  for (i = 0 ; i < gData->loops ; i++)
  { workItem = new Work;
    workItem->SetDatabase(gData->database);
    workItem->SetUserid(gData->userid);
    workItem->SetPassword(gData->password);
    // The generated numbers are multiples of 10 in the range 10-340
    // (34 possible values).
    empno = ((rand_r(&seed) % 34) + 1) * 10;
    sprintf(cmd, i%2 ? delete_str1 : delete_str2, empno);
    workItem->SetCommand(cmd);
    Dispatch(workItem);
  }
  if (gData->verbose)
  { cout << "all workers started, exiting main\n" ;
  }
  pthread_exit(0);
}
// Start a worker thread for `workItem` on a context taken from the pool.
//
// The calling thread blocks on the pool's condition variable until at least
// one context is free.  The first free slot is then claimed, its index is
// stored in workItem->context, and a thread running SqlThread::DoWork is
// created with the work item as its argument.
void
Dispatcher::Dispatch(Work * workItem)
{
  int rc;

  // --- claim a context while holding the pool mutex ---
  rc = pthread_mutex_lock(&(gData->cond_m));
  check_expected(rc == 0);
  while (gData->contextsFree == 0)
  { rc = pthread_cond_wait(&(gData->cond), &(gData->cond_m));
    check_expected(rc == 0);
  }

  // there is at least one free context at this point, find one
  int ctx = 0;
  while (ctx < gData->contexts && !gData->ctxlist[ctx].free)
  { ++ctx;
  }
  gData->ctxlist[ctx].free = 0;
  --gData->contextsFree;

  rc = pthread_mutex_unlock(&gData->cond_m);
  check_expected(rc == 0);

  // --- hand the context to the work item and start the worker ---
  workItem->context = ctx;
  if (gData->verbose)
  { char buf[256];
    sprintf(buf, "creating thread on context %i for sql statement:\n"
                 "\t\"%s\"\n", ctx, workItem->command);
    cout << buf;
  }
#ifdef USE_UI_THREADS
  rc = thr_create(NULL, DEFAULT_STACK_SIZE, SqlThread::DoWork,
                  (void*)workItem,
                  THR_BOUND | THR_DETACHED, gData->thd + ctx);
#else
  rc = pthread_create(gData->thd + ctx, &(gData->attr),
                      SqlThread::DoWork, (void*)workItem);
#endif
  check_expected(rc == 0);
}
// Class SqlThread - Implementation
// Execute the sql statement. This is the "main" routine for each of the
// worker threads.
//
// args is the Work item created by the dispatcher. The thread attaches to
// the context whose index is stored in the work item, connects to the
// database (with or without user id and password) and, if the connect
// reported sqlcode 0, runs the statement with EXECUTE IMMEDIATE.
//
// In both cases CleanUp is called afterwards. It ends the unit of work and
// resets the connection when a connect was done, detaches from the context,
// marks the context free, signals the condition the dispatcher may be
// waiting on, and deletes the work item.
//
// A failure to attach to the context ends the whole program through
// check_expected.
//
// Returns the last sqlcode recorded by CHECKERR (0 if none), cast to void *.
void *
SqlThread::DoWork(void * args)
{ int rc;
long status = 0;
struct sqlca sqlca;
// Host variables used by the embedded SQL statements below.
EXEC SQL BEGIN DECLARE SECTION;
char dbname[15];
char user[15];
char pswd[15];
char statement[256];
EXEC SQL END DECLARE SECTION;
Work * work = (Work *)args;
char buf[256];
// Copy the connection parameters into the host variables; the source and
// destination fields have the same size (15 bytes).
strcpy(dbname, work->database);
strcpy(user, work->userid);
strcpy(pswd, work->password);
if (gData->verbose)
{ sprintf(buf, "%i: sqleAttachToCtx\n", work->context);
cout << buf;
}
rc = sqleAttachToCtx((gData->ctxlist)[work->context].ctx, NULL, &sqlca);
check_expected(rc == 0 && sqlca.sqlcode == 0);
if (gData->verbose)
{ sprintf(buf, "%i: CONNECT TO %s\n", work->context, dbname);
cout << buf ;
}
// An empty user id selects the form of CONNECT without USER/USING.
if(strlen(user)==0)
{ EXEC SQL CONNECT TO :dbname;
}
else
{ EXEC SQL CONNECT TO :dbname USER:user USING:pswd;
}
CHECKERR (work->context, "CONNECT TO DATABASE", &status);
// NOTE(review): any non-zero sqlcode is treated as a failed connect here;
// presumably positive (warning) codes are included - confirm this is intended.
if( sqlca.sqlcode != 0 )
{ CleanUp( work, 0, &status);
}
else
// NOTE(review): this copy is not length checked; it relies on the generated
// statements being shorter than 256 bytes.
{ strcpy(statement, work->command);
if (gData->verbose)
{ sprintf(buf ,"%i: EXECUTE \"%s\"\n", work->context, statement);
cout << buf ;
}
EXEC SQL EXECUTE IMMEDIATE :statement;
CHECKERR (work->context, "EXECUTE IMMEDIATE", &status) ;
CleanUp( work, 1, &status);
}
// CleanUp has deleted the work item, so work must not be used from here on.
return (void *)status; // this could be obtained with a pthread_join
// if the thread were created undetached
}
// Finish a worker's use of its context and dispose of the work item.
//
//   work        - the work item being completed; it is deleted here and must
//                 not be used by the caller afterwards
//   connectDone - non-zero when the connect succeeded; the unit of work is
//                 then ended with COMMIT (gData->commit set) or ROLLBACK and
//                 the connection is reset
//   pStatus     - receives the sqlcode of any of these statements that
//                 check_error reports as failed (see CHECKERR)
//
// Afterwards the thread detaches from the context, marks the context free
// under the pool mutex and signals the condition the dispatcher waits on.
// A failure of the detach or of the thread primitives ends the program
// through check_expected.
void
SqlThread::CleanUp(Work * work, int connectDone, long * pStatus)
{ int rc;
  struct sqlca sqlca;
  char buf[256];
  if (connectDone)
  { if (gData->commit)
    { if (gData->verbose)
      { sprintf(buf, "%i: COMMIT\n", work->context);
        cout << buf ;
      }
      EXEC SQL COMMIT;
      CHECKERR (work->context, "COMMIT", pStatus) ;
    }
    else
    { if (gData->verbose)
      { sprintf(buf, "%i: ROLLBACK\n", work->context);
        cout << buf ;
      }
      EXEC SQL ROLLBACK;
      CHECKERR (work->context, "ROLLBACK", pStatus) ;
    }
    if (gData->verbose)
    { sprintf(buf, "%i: CONNECT RESET\n", work->context);
      cout << buf ;
    }
    EXEC SQL CONNECT RESET;
    CHECKERR (work->context, "CONNECT RESET", pStatus) ;
  }
  if (gData->verbose)
  { sprintf(buf, "%i: sqleDetachFromCtx\n", work->context);
    cout << buf ;
  }
  rc = sqleDetachFromCtx((gData->ctxlist)[work->context].ctx, NULL, &sqlca);
  check_expected(rc == 0 && sqlca.sqlcode == 0);
  // Return the context to the pool and wake the dispatcher if it is waiting.
  rc = pthread_mutex_lock(&gData->cond_m);
  check_expected(rc == 0);
  if (gData->verbose)
  { sprintf(buf, "%i: marking context free\n", work->context);
    cout << buf ;
  }
  (gData->ctxlist)[work->context].free = 1;
  gData->contextsFree++;
  rc = pthread_cond_signal(&gData->cond);
  check_expected(rc == 0);
  rc = pthread_mutex_unlock(&gData->cond_m);
  check_expected(rc == 0);
  // The Work destructor releases the command buffer.  Deleting
  // work->command here as well would free the same buffer twice.
  delete work;
}