summaryrefslogtreecommitdiffstats
path: root/ext/lsm1/lsm-test/lsmtest5.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--ext/lsm1/lsm-test/lsmtest5.c633
1 files changed, 633 insertions, 0 deletions
diff --git a/ext/lsm1/lsm-test/lsmtest5.c b/ext/lsm1/lsm-test/lsmtest5.c
new file mode 100644
index 0000000..f36184e
--- /dev/null
+++ b/ext/lsm1/lsm-test/lsmtest5.c
@@ -0,0 +1,633 @@
+
+/*
+** This file is broken into three semi-autonomous parts:
+**
+** 1. The database functions.
+** 2. The thread wrappers.
+** 3. The implementation of the mt1.* tests.
+*/
+
+/*************************************************************************
+** DATABASE CONTENTS:
+**
+** The database contains up to N key/value pairs, where N is some large
+** number (say 10,000,000). Keys are integer values between 0 and (N-1).
+** The value associated with each key is a pseudo-random blob of data.
+**
+** Key/value pair keys are encoded as the two bytes "k." followed by a
+** 10-digit decimal number. i.e. key 45 -> "k.0000000045".
+**
+** As well as the key/value pairs, the database also contains checksum
+** entries. The checksums form a hierarchy - for every F key/value
+** entries there is one level 1 checksum. And for each F level 1 checksums
+** there is one level 2 checksum. And so on.
+**
+** Checksum keys are encoded as the two byte "c." followed by the
+** checksum level, followed by a 10 digit decimal number containing
+** the value of the first key that contributes to the checksum value.
+** For example, assuming F==10, the level 1 checksum that spans keys
+** 10 to 19 is "c.1.0000000010".
+**
+** Clients may perform one of two operations on the database: a read
+** or a write.
+**
+** READ OPERATIONS:
+**
+** A read operation scans a range of F key/value pairs. It computes
+** the expected checksum and then compares the computed value to the
+** actual value stored in the level 1 checksum entry. It then scans
+** the group of F level 1 checksums, and compares the computed checksum
+** to the associated level 2 checksum value, and so on until the
+** highest level checksum value has been verified.
+**
+** If a checksum ever fails to match the expected value, the test
+** has failed.
+**
+** WRITE OPERATIONS:
+**
+** A write operation involves writing (possibly clobbering) a single
+** key/value pair. The associated level 1 checksum is then recalculated
+** updated. Then the level 2 checksum, and so on until the highest
+** level checksum has been modified.
+**
+** All updates occur inside a single transaction.
+**
+** INTERFACE:
+**
+** The interface used by test cases to read and write the db consists
+** of type DbParameters and the following functions:
+**
+** dbReadOperation()
+** dbWriteOperation()
+*/
+
+#include "lsmtest.h"
+
+typedef struct DbParameters DbParameters;
+struct DbParameters {
+ int nFanout; /* Checksum fanout (F) */
+ int nKey; /* Size of key space (N) */
+};
+
+#define DB_KEY_BYTES (2+5+10+1)
+
+/*
+** Argument aBuf[] must point to a buffer at least DB_KEY_BYTES in size.
+** This function populates the buffer with a nul-terminated key string
+** corresponding to key iKey.
+*/
+static void dbFormatKey(
+ DbParameters *pParam,
+ int iLevel,
+ int iKey, /* Key value */
+ char *aBuf /* Write key string here */
+){
+ if( iLevel==0 ){
+ snprintf(aBuf, DB_KEY_BYTES, "k.%.10d", iKey);
+ }else{
+ int f = 1;
+ int i;
+ for(i=0; i<iLevel; i++) f = f * pParam->nFanout;
+ snprintf(aBuf, DB_KEY_BYTES, "c.%d.%.10d", iLevel, f*(iKey/f));
+ }
+}
+
+/*
+** Argument aBuf[] must point to a buffer at least DB_KEY_BYTES in size.
+** This function populates the buffer with the string representation of
+** checksum value iVal.
+*/
+static void dbFormatCksumValue(u32 iVal, char *aBuf){
+ snprintf(aBuf, DB_KEY_BYTES, "%.10u", iVal);
+}
+
+/*
+** Return the highest level of checksum in the database described
+** by *pParam.
+*/
+static int dbMaxLevel(DbParameters *pParam){
+ int iMax;
+ int n = 1;
+ for(iMax=0; n<pParam->nKey; iMax++){
+ n = n * pParam->nFanout;
+ }
+ return iMax;
+}
+
+static void dbCksum(
+ void *pCtx, /* IN/OUT: Pointer to u32 containing cksum */
+ void *pKey, int nKey, /* Database key. Unused. */
+ void *pVal, int nVal /* Database value. Checksum this. */
+){
+ u8 *aVal = (u8 *)pVal;
+ u32 *pCksum = (u32 *)pCtx;
+ u32 cksum = *pCksum;
+ int i;
+
+ unused_parameter(pKey);
+ unused_parameter(nKey);
+
+ for(i=0; i<nVal; i++){
+ cksum += (cksum<<3) + (int)aVal[i];
+ }
+
+ *pCksum = cksum;
+}
+
+/*
+** Compute the value of the checksum stored on level iLevel that contains
+** data from key iKey by scanning the pParam->nFanout entries at level
+** iLevel-1.
+*/
+static u32 dbComputeCksum(
+ DbParameters *pParam, /* Database parameters */
+ TestDb *pDb, /* Database connection handle */
+ int iLevel, /* Level of checksum to compute */
+ int iKey, /* Compute checksum for this key */
+ int *pRc /* IN/OUT: Error code */
+){
+ u32 cksum = 0;
+ if( *pRc==0 ){
+ int nFirst;
+ int nLast;
+ int iFirst = 0;
+ int iLast = 0;
+ int i;
+ int f = 1;
+ char zFirst[DB_KEY_BYTES];
+ char zLast[DB_KEY_BYTES];
+
+ assert( iLevel>=1 );
+ for(i=0; i<iLevel; i++) f = f * pParam->nFanout;
+
+ iFirst = f*(iKey/f);
+ iLast = iFirst + f - 1;
+ dbFormatKey(pParam, iLevel-1, iFirst, zFirst);
+ dbFormatKey(pParam, iLevel-1, iLast, zLast);
+ nFirst = strlen(zFirst);
+ nLast = strlen(zLast);
+
+ *pRc = tdb_scan(pDb, (u32*)&cksum, 0, zFirst, nFirst, zLast, nLast,dbCksum);
+ }
+
+ return cksum;
+}
+
+static void dbReadOperation(
+ DbParameters *pParam, /* Database parameters */
+ TestDb *pDb, /* Database connection handle */
+ void (*xDelay)(void *),
+ void *pDelayCtx,
+ int iKey, /* Key to read */
+ int *pRc /* IN/OUT: Error code */
+){
+ const int iMax = dbMaxLevel(pParam);
+ int i;
+
+ if( tdb_transaction_support(pDb) ) testBegin(pDb, 1, pRc);
+ for(i=1; *pRc==0 && i<=iMax; i++){
+ char zCksum[DB_KEY_BYTES];
+ char zKey[DB_KEY_BYTES];
+ u32 iCksum = 0;
+
+ iCksum = dbComputeCksum(pParam, pDb, i, iKey, pRc);
+ if( iCksum ){
+ if( xDelay && i==1 ) xDelay(pDelayCtx);
+ dbFormatCksumValue(iCksum, zCksum);
+ dbFormatKey(pParam, i, iKey, zKey);
+ testFetchStr(pDb, zKey, zCksum, pRc);
+ }
+ }
+ if( tdb_transaction_support(pDb) ) testCommit(pDb, 0, pRc);
+}
+
+static int dbWriteOperation(
+ DbParameters *pParam, /* Database parameters */
+ TestDb *pDb, /* Database connection handle */
+ int iKey, /* Key to write to */
+ const char *zValue, /* Nul-terminated value to write */
+ int *pRc /* IN/OUT: Error code */
+){
+ const int iMax = dbMaxLevel(pParam);
+ char zKey[DB_KEY_BYTES];
+ int i;
+ int rc;
+
+ assert( iKey>=0 && iKey<pParam->nKey );
+ dbFormatKey(pParam, 0, iKey, zKey);
+
+ /* Open a write transaction. This may fail - SQLITE4_BUSY */
+ if( *pRc==0 && tdb_transaction_support(pDb) ){
+ rc = tdb_begin(pDb, 2);
+ if( rc==5 ) return 0;
+ *pRc = rc;
+ }
+
+ testWriteStr(pDb, zKey, zValue, pRc);
+ for(i=1; i<=iMax; i++){
+ char zCksum[DB_KEY_BYTES];
+ u32 iCksum = 0;
+
+ iCksum = dbComputeCksum(pParam, pDb, i, iKey, pRc);
+ dbFormatCksumValue(iCksum, zCksum);
+ dbFormatKey(pParam, i, iKey, zKey);
+ testWriteStr(pDb, zKey, zCksum, pRc);
+ }
+ if( tdb_transaction_support(pDb) ) testCommit(pDb, 0, pRc);
+ return 1;
+}
+
+/*************************************************************************
+** The following block contains testXXX() functions that implement a
+** wrapper around the systems native multi-thread support. There are no
+** synchronization primitives - just functions to launch and join
+** threads. Wrapper functions are:
+**
+** testThreadSupport()
+**
+** testThreadInit()
+** testThreadShutdown()
+** testThreadLaunch()
+** testThreadWait()
+**
+** testThreadSetHalt()
+** testThreadGetHalt()
+** testThreadSetResult()
+** testThreadGetResult()
+**
+** testThreadEnterMutex()
+** testThreadLeaveMutex()
+*/
+typedef struct ThreadSet ThreadSet;
+#ifdef LSM_MUTEX_PTHREADS
+
+#include <pthread.h>
+#include <unistd.h>
+
+typedef struct Thread Thread;
+struct Thread {
+ int rc;
+ char *zMsg;
+ pthread_t id;
+ void (*xMain)(ThreadSet *, int, void *);
+ void *pCtx;
+ ThreadSet *pThreadSet;
+};
+
+struct ThreadSet {
+ int bHalt; /* Halt flag */
+ int nThread; /* Number of threads */
+ Thread *aThread; /* Array of Thread structures */
+ pthread_mutex_t mutex; /* Mutex used for cheating */
+};
+
+/*
+** Return true if this build supports threads, or false otherwise. If
+** this function returns false, no other testThreadXXX() functions should
+** be called.
+*/
+static int testThreadSupport(){ return 1; }
+
+/*
+** Allocate and return a thread-set handle with enough space allocated
+** to handle up to nMax threads. Each call to this function should be
+** matched by a call to testThreadShutdown() to delete the object.
+*/
+static ThreadSet *testThreadInit(int nMax){
+ int nByte; /* Total space to allocate */
+ ThreadSet *p; /* Return value */
+
+ nByte = sizeof(ThreadSet) + sizeof(struct Thread) * nMax;
+ p = (ThreadSet *)testMalloc(nByte);
+ p->nThread = nMax;
+ p->aThread = (Thread *)&p[1];
+ pthread_mutex_init(&p->mutex, 0);
+
+ return p;
+}
+
+/*
+** Delete a thread-set object and release all resources held by it.
+*/
+static void testThreadShutdown(ThreadSet *p){
+ int i;
+ for(i=0; i<p->nThread; i++){
+ testFree(p->aThread[i].zMsg);
+ }
+ pthread_mutex_destroy(&p->mutex);
+ testFree(p);
+}
+
+static void *ttMain(void *pArg){
+ Thread *pThread = (Thread *)pArg;
+ int iThread;
+ iThread = (pThread - pThread->pThreadSet->aThread);
+ pThread->xMain(pThread->pThreadSet, iThread, pThread->pCtx);
+ return 0;
+}
+
+/*
+** Launch a new thread.
+*/
+static int testThreadLaunch(
+ ThreadSet *p,
+ int iThread,
+ void (*xMain)(ThreadSet *, int, void *),
+ void *pCtx
+){
+ int rc;
+ Thread *pThread;
+
+ assert( iThread>=0 && iThread<p->nThread );
+
+ pThread = &p->aThread[iThread];
+ assert( pThread->pThreadSet==0 );
+ pThread->xMain = xMain;
+ pThread->pCtx = pCtx;
+ pThread->pThreadSet = p;
+ rc = pthread_create(&pThread->id, 0, ttMain, (void *)pThread);
+
+ return rc;
+}
+
+/*
+** Set the thread-set "halt" flag.
+*/
+static void testThreadSetHalt(ThreadSet *pThreadSet){
+ pThreadSet->bHalt = 1;
+}
+
+/*
+** Return the current value of the thread-set "halt" flag.
+*/
+static int testThreadGetHalt(ThreadSet *pThreadSet){
+ return pThreadSet->bHalt;
+}
+
+static void testThreadSleep(ThreadSet *pThreadSet, int nMs){
+ int nRem = nMs;
+ while( nRem>0 && testThreadGetHalt(pThreadSet)==0 ){
+ usleep(50000);
+ nRem -= 50;
+ }
+}
+
+/*
+** Wait for all threads launched to finish before returning. If nMs
+** is greater than zero, set the "halt" flag to tell all threads
+** to halt after waiting nMs milliseconds.
+*/
+static void testThreadWait(ThreadSet *pThreadSet, int nMs){
+ int i;
+
+ testThreadSleep(pThreadSet, nMs);
+ testThreadSetHalt(pThreadSet);
+ for(i=0; i<pThreadSet->nThread; i++){
+ Thread *pThread = &pThreadSet->aThread[i];
+ if( pThread->xMain ){
+ pthread_join(pThread->id, 0);
+ }
+ }
+}
+
+/*
+** Set the result for thread iThread.
+*/
+static void testThreadSetResult(
+ ThreadSet *pThreadSet, /* Thread-set handle */
+ int iThread, /* Set result for this thread */
+ int rc, /* Result error code */
+ char *zFmt, /* Result string format */
+ ... /* Result string formatting args... */
+){
+ va_list ap;
+
+ testFree(pThreadSet->aThread[iThread].zMsg);
+ pThreadSet->aThread[iThread].rc = rc;
+ pThreadSet->aThread[iThread].zMsg = 0;
+ if( zFmt ){
+ va_start(ap, zFmt);
+ pThreadSet->aThread[iThread].zMsg = testMallocVPrintf(zFmt, ap);
+ va_end(ap);
+ }
+}
+
+/*
+** Retrieve the result for thread iThread.
+*/
+static int testThreadGetResult(
+ ThreadSet *pThreadSet, /* Thread-set handle */
+ int iThread, /* Get result for this thread */
+ const char **pzRes /* OUT: Pointer to result string */
+){
+ if( pzRes ) *pzRes = pThreadSet->aThread[iThread].zMsg;
+ return pThreadSet->aThread[iThread].rc;
+}
+
+/*
+** Enter and leave the test case mutex.
+*/
+#if 0
+static void testThreadEnterMutex(ThreadSet *p){
+ pthread_mutex_lock(&p->mutex);
+}
+static void testThreadLeaveMutex(ThreadSet *p){
+ pthread_mutex_unlock(&p->mutex);
+}
+#endif
+#endif
+
+#if !defined(LSM_MUTEX_PTHREADS)
+static int testThreadSupport(){ return 0; }
+
+#define testThreadInit(a) 0
+#define testThreadShutdown(a)
+#define testThreadLaunch(a,b,c,d) 0
+#define testThreadWait(a,b)
+#define testThreadSetHalt(a)
+#define testThreadGetHalt(a) 0
+#define testThreadGetResult(a,b,c) 0
+#define testThreadSleep(a,b) 0
+
+static void testThreadSetResult(ThreadSet *a, int b, int c, char *d, ...){
+ unused_parameter(a);
+ unused_parameter(b);
+ unused_parameter(c);
+ unused_parameter(d);
+}
+#endif
+/* End of threads wrapper.
+*************************************************************************/
+
+/*************************************************************************
+** Below this point is the third part of this file - the implementation
+** of the mt1.* tests.
+*/
+typedef struct Mt1Test Mt1Test;
+struct Mt1Test {
+ DbParameters param; /* Description of database to read/write */
+ int nReadwrite; /* Number of read/write threads */
+ int nFastReader; /* Number of fast reader threads */
+ int nSlowReader; /* Number of slow reader threads */
+ int nMs; /* How long to run for */
+ const char *zSystem; /* Database system to test */
+};
+
+typedef struct Mt1DelayCtx Mt1DelayCtx;
+struct Mt1DelayCtx {
+ ThreadSet *pSet; /* Threadset to sleep within */
+ int nMs; /* Sleep in ms */
+};
+
+static void xMt1Delay(void *pCtx){
+ Mt1DelayCtx *p = (Mt1DelayCtx *)pCtx;
+ testThreadSleep(p->pSet, p->nMs);
+}
+
+#define MT1_THREAD_RDWR 0
+#define MT1_THREAD_SLOW 1
+#define MT1_THREAD_FAST 2
+
+static void xMt1Work(lsm_db *pDb, void *pCtx){
+#if 0
+ char *z = 0;
+ lsm_info(pDb, LSM_INFO_DB_STRUCTURE, &z);
+ printf("%s\n", z);
+ fflush(stdout);
+#endif
+}
+
+/*
+** This is the main() proc for all threads in test case "mt1".
+*/
+static void mt1Main(ThreadSet *pThreadSet, int iThread, void *pCtx){
+ Mt1Test *p = (Mt1Test *)pCtx; /* Test parameters */
+ Mt1DelayCtx delay;
+ int nRead = 0; /* Number of calls to dbReadOperation() */
+ int nWrite = 0; /* Number of completed database writes */
+ int rc = 0; /* Error code */
+ int iPrng; /* Prng argument variable */
+ TestDb *pDb; /* Database handle */
+ int eType;
+
+ delay.pSet = pThreadSet;
+ delay.nMs = 0;
+ if( iThread<p->nReadwrite ){
+ eType = MT1_THREAD_RDWR;
+ }else if( iThread<(p->nReadwrite+p->nFastReader) ){
+ eType = MT1_THREAD_FAST;
+ }else{
+ eType = MT1_THREAD_SLOW;
+ delay.nMs = (p->nMs / 20);
+ }
+
+ /* Open a new database connection. Initialize the pseudo-random number
+ ** argument based on the thread number. */
+ iPrng = testPrngValue(iThread);
+ pDb = testOpen(p->zSystem, 0, &rc);
+
+ if( rc==0 ){
+ tdb_lsm_config_work_hook(pDb, xMt1Work, 0);
+ }
+
+ /* Loop until either an error occurs or some other thread sets the
+ ** halt flag. */
+ while( rc==0 && testThreadGetHalt(pThreadSet)==0 ){
+ int iKey;
+
+ /* Perform a read operation on an arbitrarily selected key. */
+ iKey = (testPrngValue(iPrng++) % p->param.nKey);
+ dbReadOperation(&p->param, pDb, xMt1Delay, (void *)&delay, iKey, &rc);
+ if( rc ) continue;
+ nRead++;
+
+ /* Attempt to write an arbitrary key value pair (and update the associated
+ ** checksum entries). dbWriteOperation() returns 1 if the write is
+ ** successful, or 0 if it failed with an LSM_BUSY error. */
+ if( eType==MT1_THREAD_RDWR ){
+ char aValue[50];
+ char aRnd[25];
+
+ iKey = (testPrngValue(iPrng++) % p->param.nKey);
+ testPrngString(iPrng, aRnd, sizeof(aRnd));
+ iPrng += sizeof(aRnd);
+ snprintf(aValue, sizeof(aValue), "%d.%s", iThread, aRnd);
+ nWrite += dbWriteOperation(&p->param, pDb, iKey, aValue, &rc);
+ }
+ }
+ testClose(&pDb);
+
+ /* If an error has occured, set the thread error code and the threadset
+ ** halt flag to tell the other test threads to halt. Otherwise, set the
+ ** thread error code to 0 and post a message with the number of read
+ ** and write operations completed. */
+ if( rc ){
+ testThreadSetResult(pThreadSet, iThread, rc, 0);
+ testThreadSetHalt(pThreadSet);
+ }else{
+ testThreadSetResult(pThreadSet, iThread, 0, "r/w: %d/%d", nRead, nWrite);
+ }
+}
+
+static void do_test_mt1(
+ const char *zSystem, /* Database system name */
+ const char *zPattern, /* Run test cases that match this pattern */
+ int *pRc /* IN/OUT: Error code */
+){
+ Mt1Test aTest[] = {
+ /* param, nReadwrite, nFastReader, nSlowReader, nMs, zSystem */
+ { {10, 1000}, 4, 0, 0, 10000, 0 },
+ { {10, 1000}, 4, 4, 2, 100000, 0 },
+ { {10, 100000}, 4, 0, 0, 10000, 0 },
+ { {10, 100000}, 4, 4, 2, 100000, 0 },
+ };
+ int i;
+
+ for(i=0; *pRc==0 && i<ArraySize(aTest); i++){
+ Mt1Test *p = &aTest[i];
+ int bRun = testCaseBegin(pRc, zPattern,
+ "mt1.%s.db=%d,%d.ms=%d.rdwr=%d.fast=%d.slow=%d",
+ zSystem, p->param.nFanout, p->param.nKey,
+ p->nMs, p->nReadwrite, p->nFastReader, p->nSlowReader
+ );
+ if( bRun ){
+ TestDb *pDb;
+ ThreadSet *pSet;
+ int iThread;
+ int nThread;
+
+ p->zSystem = zSystem;
+ pDb = testOpen(zSystem, 1, pRc);
+
+ nThread = p->nReadwrite + p->nFastReader + p->nSlowReader;
+ pSet = testThreadInit(nThread);
+ for(iThread=0; *pRc==0 && iThread<nThread; iThread++){
+ testThreadLaunch(pSet, iThread, mt1Main, (void *)p);
+ }
+
+ testThreadWait(pSet, p->nMs);
+ for(iThread=0; *pRc==0 && iThread<nThread; iThread++){
+ *pRc = testThreadGetResult(pSet, iThread, 0);
+ }
+ testCaseFinish(*pRc);
+
+ for(iThread=0; *pRc==0 && iThread<nThread; iThread++){
+ const char *zMsg = 0;
+ *pRc = testThreadGetResult(pSet, iThread, &zMsg);
+ printf(" Info: thread %d (%d): %s\n", iThread, *pRc, zMsg);
+ }
+
+ testThreadShutdown(pSet);
+ testClose(&pDb);
+ }
+ }
+}
+
+void test_mt(
+ const char *zSystem, /* Database system name */
+ const char *zPattern, /* Run test cases that match this pattern */
+ int *pRc /* IN/OUT: Error code */
+){
+ if( testThreadSupport()==0 ) return;
+ do_test_mt1(zSystem, zPattern, pRc);
+}