diff options
Diffstat (limited to '')
-rw-r--r-- | ext/lsm1/lsm-test/lsmtest5.c | 633 |
1 files changed, 633 insertions, 0 deletions
diff --git a/ext/lsm1/lsm-test/lsmtest5.c b/ext/lsm1/lsm-test/lsmtest5.c new file mode 100644 index 0000000..f36184e --- /dev/null +++ b/ext/lsm1/lsm-test/lsmtest5.c @@ -0,0 +1,633 @@ + +/* +** This file is broken into three semi-autonomous parts: +** +** 1. The database functions. +** 2. The thread wrappers. +** 3. The implementation of the mt1.* tests. +*/ + +/************************************************************************* +** DATABASE CONTENTS: +** +** The database contains up to N key/value pairs, where N is some large +** number (say 10,000,000). Keys are integer values between 0 and (N-1). +** The value associated with each key is a pseudo-random blob of data. +** +** Key/value pair keys are encoded as the two bytes "k." followed by a +** 10-digit decimal number. i.e. key 45 -> "k.0000000045". +** +** As well as the key/value pairs, the database also contains checksum +** entries. The checksums form a hierarchy - for every F key/value +** entries there is one level 1 checksum. And for each F level 1 checksums +** there is one level 2 checksum. And so on. +** +** Checksum keys are encoded as the two byte "c." followed by the +** checksum level, followed by a 10 digit decimal number containing +** the value of the first key that contributes to the checksum value. +** For example, assuming F==10, the level 1 checksum that spans keys +** 10 to 19 is "c.1.0000000010". +** +** Clients may perform one of two operations on the database: a read +** or a write. +** +** READ OPERATIONS: +** +** A read operation scans a range of F key/value pairs. It computes +** the expected checksum and then compares the computed value to the +** actual value stored in the level 1 checksum entry. It then scans +** the group of F level 1 checksums, and compares the computed checksum +** to the associated level 2 checksum value, and so on until the +** highest level checksum value has been verified. +** +** If a checksum ever fails to match the expected value, the test +** has failed. +** +** WRITE OPERATIONS: +** +** A write operation involves writing (possibly clobbering) a single +** key/value pair. The associated level 1 checksum is then recalculated +** updated. Then the level 2 checksum, and so on until the highest +** level checksum has been modified. +** +** All updates occur inside a single transaction. +** +** INTERFACE: +** +** The interface used by test cases to read and write the db consists +** of type DbParameters and the following functions: +** +** dbReadOperation() +** dbWriteOperation() +*/ + +#include "lsmtest.h" + +typedef struct DbParameters DbParameters; +struct DbParameters { + int nFanout; /* Checksum fanout (F) */ + int nKey; /* Size of key space (N) */ +}; + +#define DB_KEY_BYTES (2+5+10+1) + +/* +** Argument aBuf[] must point to a buffer at least DB_KEY_BYTES in size. +** This function populates the buffer with a nul-terminated key string +** corresponding to key iKey. +*/ +static void dbFormatKey( + DbParameters *pParam, + int iLevel, + int iKey, /* Key value */ + char *aBuf /* Write key string here */ +){ + if( iLevel==0 ){ + snprintf(aBuf, DB_KEY_BYTES, "k.%.10d", iKey); + }else{ + int f = 1; + int i; + for(i=0; i<iLevel; i++) f = f * pParam->nFanout; + snprintf(aBuf, DB_KEY_BYTES, "c.%d.%.10d", iLevel, f*(iKey/f)); + } +} + +/* +** Argument aBuf[] must point to a buffer at least DB_KEY_BYTES in size. +** This function populates the buffer with the string representation of +** checksum value iVal. +*/ +static void dbFormatCksumValue(u32 iVal, char *aBuf){ + snprintf(aBuf, DB_KEY_BYTES, "%.10u", iVal); +} + +/* +** Return the highest level of checksum in the database described +** by *pParam. +*/ +static int dbMaxLevel(DbParameters *pParam){ + int iMax; + int n = 1; + for(iMax=0; n<pParam->nKey; iMax++){ + n = n * pParam->nFanout; + } + return iMax; +} + +static void dbCksum( + void *pCtx, /* IN/OUT: Pointer to u32 containing cksum */ + void *pKey, int nKey, /* Database key. Unused. */ + void *pVal, int nVal /* Database value. Checksum this. */ +){ + u8 *aVal = (u8 *)pVal; + u32 *pCksum = (u32 *)pCtx; + u32 cksum = *pCksum; + int i; + + unused_parameter(pKey); + unused_parameter(nKey); + + for(i=0; i<nVal; i++){ + cksum += (cksum<<3) + (int)aVal[i]; + } + + *pCksum = cksum; +} + +/* +** Compute the value of the checksum stored on level iLevel that contains +** data from key iKey by scanning the pParam->nFanout entries at level +** iLevel-1. +*/ +static u32 dbComputeCksum( + DbParameters *pParam, /* Database parameters */ + TestDb *pDb, /* Database connection handle */ + int iLevel, /* Level of checksum to compute */ + int iKey, /* Compute checksum for this key */ + int *pRc /* IN/OUT: Error code */ +){ + u32 cksum = 0; + if( *pRc==0 ){ + int nFirst; + int nLast; + int iFirst = 0; + int iLast = 0; + int i; + int f = 1; + char zFirst[DB_KEY_BYTES]; + char zLast[DB_KEY_BYTES]; + + assert( iLevel>=1 ); + for(i=0; i<iLevel; i++) f = f * pParam->nFanout; + + iFirst = f*(iKey/f); + iLast = iFirst + f - 1; + dbFormatKey(pParam, iLevel-1, iFirst, zFirst); + dbFormatKey(pParam, iLevel-1, iLast, zLast); + nFirst = strlen(zFirst); + nLast = strlen(zLast); + + *pRc = tdb_scan(pDb, (u32*)&cksum, 0, zFirst, nFirst, zLast, nLast,dbCksum); + } + + return cksum; +} + +static void dbReadOperation( + DbParameters *pParam, /* Database parameters */ + TestDb *pDb, /* Database connection handle */ + void (*xDelay)(void *), + void *pDelayCtx, + int iKey, /* Key to read */ + int *pRc /* IN/OUT: Error code */ +){ + const int iMax = dbMaxLevel(pParam); + int i; + + if( tdb_transaction_support(pDb) ) testBegin(pDb, 1, pRc); + for(i=1; *pRc==0 && i<=iMax; i++){ + char zCksum[DB_KEY_BYTES]; + char zKey[DB_KEY_BYTES]; + u32 iCksum = 0; + + iCksum = dbComputeCksum(pParam, pDb, i, iKey, pRc); + if( iCksum ){ + if( xDelay && i==1 ) xDelay(pDelayCtx); + dbFormatCksumValue(iCksum, zCksum); + dbFormatKey(pParam, i, iKey, zKey); + testFetchStr(pDb, zKey, zCksum, pRc); + } + } + if( tdb_transaction_support(pDb) ) testCommit(pDb, 0, pRc); +} + +static int dbWriteOperation( + DbParameters *pParam, /* Database parameters */ + TestDb *pDb, /* Database connection handle */ + int iKey, /* Key to write to */ + const char *zValue, /* Nul-terminated value to write */ + int *pRc /* IN/OUT: Error code */ +){ + const int iMax = dbMaxLevel(pParam); + char zKey[DB_KEY_BYTES]; + int i; + int rc; + + assert( iKey>=0 && iKey<pParam->nKey ); + dbFormatKey(pParam, 0, iKey, zKey); + + /* Open a write transaction. This may fail - SQLITE4_BUSY */ + if( *pRc==0 && tdb_transaction_support(pDb) ){ + rc = tdb_begin(pDb, 2); + if( rc==5 ) return 0; + *pRc = rc; + } + + testWriteStr(pDb, zKey, zValue, pRc); + for(i=1; i<=iMax; i++){ + char zCksum[DB_KEY_BYTES]; + u32 iCksum = 0; + + iCksum = dbComputeCksum(pParam, pDb, i, iKey, pRc); + dbFormatCksumValue(iCksum, zCksum); + dbFormatKey(pParam, i, iKey, zKey); + testWriteStr(pDb, zKey, zCksum, pRc); + } + if( tdb_transaction_support(pDb) ) testCommit(pDb, 0, pRc); + return 1; +} + +/************************************************************************* +** The following block contains testXXX() functions that implement a +** wrapper around the systems native multi-thread support. There are no +** synchronization primitives - just functions to launch and join +** threads. Wrapper functions are: +** +** testThreadSupport() +** +** testThreadInit() +** testThreadShutdown() +** testThreadLaunch() +** testThreadWait() +** +** testThreadSetHalt() +** testThreadGetHalt() +** testThreadSetResult() +** testThreadGetResult() +** +** testThreadEnterMutex() +** testThreadLeaveMutex() +*/ +typedef struct ThreadSet ThreadSet; +#ifdef LSM_MUTEX_PTHREADS + +#include <pthread.h> +#include <unistd.h> + +typedef struct Thread Thread; +struct Thread { + int rc; + char *zMsg; + pthread_t id; + void (*xMain)(ThreadSet *, int, void *); + void *pCtx; + ThreadSet *pThreadSet; +}; + +struct ThreadSet { + int bHalt; /* Halt flag */ + int nThread; /* Number of threads */ + Thread *aThread; /* Array of Thread structures */ + pthread_mutex_t mutex; /* Mutex used for cheating */ +}; + +/* +** Return true if this build supports threads, or false otherwise. If +** this function returns false, no other testThreadXXX() functions should +** be called. +*/ +static int testThreadSupport(){ return 1; } + +/* +** Allocate and return a thread-set handle with enough space allocated +** to handle up to nMax threads. Each call to this function should be +** matched by a call to testThreadShutdown() to delete the object. +*/ +static ThreadSet *testThreadInit(int nMax){ + int nByte; /* Total space to allocate */ + ThreadSet *p; /* Return value */ + + nByte = sizeof(ThreadSet) + sizeof(struct Thread) * nMax; + p = (ThreadSet *)testMalloc(nByte); + p->nThread = nMax; + p->aThread = (Thread *)&p[1]; + pthread_mutex_init(&p->mutex, 0); + + return p; +} + +/* +** Delete a thread-set object and release all resources held by it. +*/ +static void testThreadShutdown(ThreadSet *p){ + int i; + for(i=0; i<p->nThread; i++){ + testFree(p->aThread[i].zMsg); + } + pthread_mutex_destroy(&p->mutex); + testFree(p); +} + +static void *ttMain(void *pArg){ + Thread *pThread = (Thread *)pArg; + int iThread; + iThread = (pThread - pThread->pThreadSet->aThread); + pThread->xMain(pThread->pThreadSet, iThread, pThread->pCtx); + return 0; +} + +/* +** Launch a new thread. +*/ +static int testThreadLaunch( + ThreadSet *p, + int iThread, + void (*xMain)(ThreadSet *, int, void *), + void *pCtx +){ + int rc; + Thread *pThread; + + assert( iThread>=0 && iThread<p->nThread ); + + pThread = &p->aThread[iThread]; + assert( pThread->pThreadSet==0 ); + pThread->xMain = xMain; + pThread->pCtx = pCtx; + pThread->pThreadSet = p; + rc = pthread_create(&pThread->id, 0, ttMain, (void *)pThread); + + return rc; +} + +/* +** Set the thread-set "halt" flag. +*/ +static void testThreadSetHalt(ThreadSet *pThreadSet){ + pThreadSet->bHalt = 1; +} + +/* +** Return the current value of the thread-set "halt" flag. +*/ +static int testThreadGetHalt(ThreadSet *pThreadSet){ + return pThreadSet->bHalt; +} + +static void testThreadSleep(ThreadSet *pThreadSet, int nMs){ + int nRem = nMs; + while( nRem>0 && testThreadGetHalt(pThreadSet)==0 ){ + usleep(50000); + nRem -= 50; + } +} + +/* +** Wait for all threads launched to finish before returning. If nMs +** is greater than zero, set the "halt" flag to tell all threads +** to halt after waiting nMs milliseconds. +*/ +static void testThreadWait(ThreadSet *pThreadSet, int nMs){ + int i; + + testThreadSleep(pThreadSet, nMs); + testThreadSetHalt(pThreadSet); + for(i=0; i<pThreadSet->nThread; i++){ + Thread *pThread = &pThreadSet->aThread[i]; + if( pThread->xMain ){ + pthread_join(pThread->id, 0); + } + } +} + +/* +** Set the result for thread iThread. +*/ +static void testThreadSetResult( + ThreadSet *pThreadSet, /* Thread-set handle */ + int iThread, /* Set result for this thread */ + int rc, /* Result error code */ + char *zFmt, /* Result string format */ + ... /* Result string formatting args... */ +){ + va_list ap; + + testFree(pThreadSet->aThread[iThread].zMsg); + pThreadSet->aThread[iThread].rc = rc; + pThreadSet->aThread[iThread].zMsg = 0; + if( zFmt ){ + va_start(ap, zFmt); + pThreadSet->aThread[iThread].zMsg = testMallocVPrintf(zFmt, ap); + va_end(ap); + } +} + +/* +** Retrieve the result for thread iThread. +*/ +static int testThreadGetResult( + ThreadSet *pThreadSet, /* Thread-set handle */ + int iThread, /* Get result for this thread */ + const char **pzRes /* OUT: Pointer to result string */ +){ + if( pzRes ) *pzRes = pThreadSet->aThread[iThread].zMsg; + return pThreadSet->aThread[iThread].rc; +} + +/* +** Enter and leave the test case mutex. +*/ +#if 0 +static void testThreadEnterMutex(ThreadSet *p){ + pthread_mutex_lock(&p->mutex); +} +static void testThreadLeaveMutex(ThreadSet *p){ + pthread_mutex_unlock(&p->mutex); +} +#endif +#endif + +#if !defined(LSM_MUTEX_PTHREADS) +static int testThreadSupport(){ return 0; } + +#define testThreadInit(a) 0 +#define testThreadShutdown(a) +#define testThreadLaunch(a,b,c,d) 0 +#define testThreadWait(a,b) +#define testThreadSetHalt(a) +#define testThreadGetHalt(a) 0 +#define testThreadGetResult(a,b,c) 0 +#define testThreadSleep(a,b) 0 + +static void testThreadSetResult(ThreadSet *a, int b, int c, char *d, ...){ + unused_parameter(a); + unused_parameter(b); + unused_parameter(c); + unused_parameter(d); +} +#endif +/* End of threads wrapper. +*************************************************************************/ + +/************************************************************************* +** Below this point is the third part of this file - the implementation +** of the mt1.* tests. +*/ +typedef struct Mt1Test Mt1Test; +struct Mt1Test { + DbParameters param; /* Description of database to read/write */ + int nReadwrite; /* Number of read/write threads */ + int nFastReader; /* Number of fast reader threads */ + int nSlowReader; /* Number of slow reader threads */ + int nMs; /* How long to run for */ + const char *zSystem; /* Database system to test */ +}; + +typedef struct Mt1DelayCtx Mt1DelayCtx; +struct Mt1DelayCtx { + ThreadSet *pSet; /* Threadset to sleep within */ + int nMs; /* Sleep in ms */ +}; + +static void xMt1Delay(void *pCtx){ + Mt1DelayCtx *p = (Mt1DelayCtx *)pCtx; + testThreadSleep(p->pSet, p->nMs); +} + +#define MT1_THREAD_RDWR 0 +#define MT1_THREAD_SLOW 1 +#define MT1_THREAD_FAST 2 + +static void xMt1Work(lsm_db *pDb, void *pCtx){ +#if 0 + char *z = 0; + lsm_info(pDb, LSM_INFO_DB_STRUCTURE, &z); + printf("%s\n", z); + fflush(stdout); +#endif +} + +/* +** This is the main() proc for all threads in test case "mt1". +*/ +static void mt1Main(ThreadSet *pThreadSet, int iThread, void *pCtx){ + Mt1Test *p = (Mt1Test *)pCtx; /* Test parameters */ + Mt1DelayCtx delay; + int nRead = 0; /* Number of calls to dbReadOperation() */ + int nWrite = 0; /* Number of completed database writes */ + int rc = 0; /* Error code */ + int iPrng; /* Prng argument variable */ + TestDb *pDb; /* Database handle */ + int eType; + + delay.pSet = pThreadSet; + delay.nMs = 0; + if( iThread<p->nReadwrite ){ + eType = MT1_THREAD_RDWR; + }else if( iThread<(p->nReadwrite+p->nFastReader) ){ + eType = MT1_THREAD_FAST; + }else{ + eType = MT1_THREAD_SLOW; + delay.nMs = (p->nMs / 20); + } + + /* Open a new database connection. Initialize the pseudo-random number + ** argument based on the thread number. */ + iPrng = testPrngValue(iThread); + pDb = testOpen(p->zSystem, 0, &rc); + + if( rc==0 ){ + tdb_lsm_config_work_hook(pDb, xMt1Work, 0); + } + + /* Loop until either an error occurs or some other thread sets the + ** halt flag. */ + while( rc==0 && testThreadGetHalt(pThreadSet)==0 ){ + int iKey; + + /* Perform a read operation on an arbitrarily selected key. */ + iKey = (testPrngValue(iPrng++) % p->param.nKey); + dbReadOperation(&p->param, pDb, xMt1Delay, (void *)&delay, iKey, &rc); + if( rc ) continue; + nRead++; + + /* Attempt to write an arbitrary key value pair (and update the associated + ** checksum entries). dbWriteOperation() returns 1 if the write is + ** successful, or 0 if it failed with an LSM_BUSY error. */ + if( eType==MT1_THREAD_RDWR ){ + char aValue[50]; + char aRnd[25]; + + iKey = (testPrngValue(iPrng++) % p->param.nKey); + testPrngString(iPrng, aRnd, sizeof(aRnd)); + iPrng += sizeof(aRnd); + snprintf(aValue, sizeof(aValue), "%d.%s", iThread, aRnd); + nWrite += dbWriteOperation(&p->param, pDb, iKey, aValue, &rc); + } + } + testClose(&pDb); + + /* If an error has occured, set the thread error code and the threadset + ** halt flag to tell the other test threads to halt. Otherwise, set the + ** thread error code to 0 and post a message with the number of read + ** and write operations completed. */ + if( rc ){ + testThreadSetResult(pThreadSet, iThread, rc, 0); + testThreadSetHalt(pThreadSet); + }else{ + testThreadSetResult(pThreadSet, iThread, 0, "r/w: %d/%d", nRead, nWrite); + } +} + +static void do_test_mt1( + const char *zSystem, /* Database system name */ + const char *zPattern, /* Run test cases that match this pattern */ + int *pRc /* IN/OUT: Error code */ +){ + Mt1Test aTest[] = { + /* param, nReadwrite, nFastReader, nSlowReader, nMs, zSystem */ + { {10, 1000}, 4, 0, 0, 10000, 0 }, + { {10, 1000}, 4, 4, 2, 100000, 0 }, + { {10, 100000}, 4, 0, 0, 10000, 0 }, + { {10, 100000}, 4, 4, 2, 100000, 0 }, + }; + int i; + + for(i=0; *pRc==0 && i<ArraySize(aTest); i++){ + Mt1Test *p = &aTest[i]; + int bRun = testCaseBegin(pRc, zPattern, + "mt1.%s.db=%d,%d.ms=%d.rdwr=%d.fast=%d.slow=%d", + zSystem, p->param.nFanout, p->param.nKey, + p->nMs, p->nReadwrite, p->nFastReader, p->nSlowReader + ); + if( bRun ){ + TestDb *pDb; + ThreadSet *pSet; + int iThread; + int nThread; + + p->zSystem = zSystem; + pDb = testOpen(zSystem, 1, pRc); + + nThread = p->nReadwrite + p->nFastReader + p->nSlowReader; + pSet = testThreadInit(nThread); + for(iThread=0; *pRc==0 && iThread<nThread; iThread++){ + testThreadLaunch(pSet, iThread, mt1Main, (void *)p); + } + + testThreadWait(pSet, p->nMs); + for(iThread=0; *pRc==0 && iThread<nThread; iThread++){ + *pRc = testThreadGetResult(pSet, iThread, 0); + } + testCaseFinish(*pRc); + + for(iThread=0; *pRc==0 && iThread<nThread; iThread++){ + const char *zMsg = 0; + *pRc = testThreadGetResult(pSet, iThread, &zMsg); + printf(" Info: thread %d (%d): %s\n", iThread, *pRc, zMsg); + } + + testThreadShutdown(pSet); + testClose(&pDb); + } + } +} + +void test_mt( + const char *zSystem, /* Database system name */ + const char *zPattern, /* Run test cases that match this pattern */ + int *pRc /* IN/OUT: Error code */ +){ + if( testThreadSupport()==0 ) return; + do_test_mt1(zSystem, zPattern, pRc); +} |