diff options
Diffstat (limited to 'ext/lsm1/lsm-test/lsmtest_tdb3.c')
-rw-r--r-- | ext/lsm1/lsm-test/lsmtest_tdb3.c | 1429 |
1 files changed, 1429 insertions, 0 deletions
diff --git a/ext/lsm1/lsm-test/lsmtest_tdb3.c b/ext/lsm1/lsm-test/lsmtest_tdb3.c new file mode 100644 index 0000000..e29497a --- /dev/null +++ b/ext/lsm1/lsm-test/lsmtest_tdb3.c @@ -0,0 +1,1429 @@ + +#include "lsmtest_tdb.h" +#include "lsm.h" +#include "lsmtest.h" + +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#ifndef _WIN32 +# include <unistd.h> +#endif +#include <stdio.h> + +#ifndef _WIN32 +# include <sys/time.h> +#endif + +typedef struct LsmDb LsmDb; +typedef struct LsmWorker LsmWorker; +typedef struct LsmFile LsmFile; + +#define LSMTEST_DFLT_MT_MAX_CKPT (8*1024) +#define LSMTEST_DFLT_MT_MIN_CKPT (2*1024) + +#ifdef LSM_MUTEX_PTHREADS +#include <pthread.h> + +#define LSMTEST_THREAD_CKPT 1 +#define LSMTEST_THREAD_WORKER 2 +#define LSMTEST_THREAD_WORKER_AC 3 + +/* +** There are several different types of worker threads that run in different +** test configurations, depending on the value of LsmWorker.eType. +** +** 1. Checkpointer. +** 2. Worker with auto-checkpoint. +** 3. Worker without auto-checkpoint. +*/ +struct LsmWorker { + LsmDb *pDb; /* Main database structure */ + lsm_db *pWorker; /* Worker database handle */ + pthread_t worker_thread; /* Worker thread */ + pthread_cond_t worker_cond; /* Condition var the worker waits on */ + pthread_mutex_t worker_mutex; /* Mutex used with worker_cond */ + int bDoWork; /* Set to true by client when there is work */ + int worker_rc; /* Store error code here */ + int eType; /* LSMTEST_THREAD_XXX constant */ + int bBlock; +}; +#else +struct LsmWorker { int worker_rc; int bBlock; }; +#endif + +static void mt_shutdown(LsmDb *); + +lsm_env *tdb_lsm_env(void){ + static int bInit = 0; + static lsm_env env; + if( bInit==0 ){ + memcpy(&env, lsm_default_env(), sizeof(env)); + bInit = 1; + } + return &env; +} + +typedef struct FileSector FileSector; +typedef struct FileData FileData; + +struct FileSector { + u8 *aOld; /* Old data for this sector */ +}; + +struct FileData { + int nSector; /* Allocated size of apSector[] array */ + FileSector *aSector; /* Array of file sectors */ +}; + +/* +** bPrepareCrash: +** If non-zero, the file wrappers maintain enough in-memory data to +** simulate the effect of a power-failure on the file-system (i.e. that +** unsynced sectors may be written, not written, or overwritten with +** arbitrary data when the crash occurs). +** +** bCrashed: +** Set to true after a crash is simulated. Once this variable is true, all +** VFS methods other than xClose() return LSM_IOERR as soon as they are +** called (without affecting the contents of the file-system). +** +** env: +** The environment object used by all lsm_db* handles opened by this +** object (i.e. LsmDb.db plus any worker connections). Variable env.pVfsCtx +** always points to the containing LsmDb structure. +*/ +struct LsmDb { + TestDb base; /* Base class - methods table */ + lsm_env env; /* Environment used by connection db */ + char *zName; /* Database file name */ + lsm_db *db; /* LSM database handle */ + + lsm_cursor *pCsr; /* Cursor held open during read transaction */ + void *pBuf; /* Buffer for tdb_fetch() output */ + int nBuf; /* Allocated (not used) size of pBuf */ + + /* Crash testing related state */ + int bCrashed; /* True once a crash has occurred */ + int nAutoCrash; /* Number of syncs until a crash */ + int bPrepareCrash; /* True to store writes in memory */ + + /* Unsynced data (while crash testing) */ + int szSector; /* Assumed size of disk sectors (512B) */ + FileData aFile[2]; /* Database and log file data */ + + /* Other test instrumentation */ + int bNoRecovery; /* If true, assume DMS2 is locked */ + + /* Work hook redirection */ + void (*xWork)(lsm_db *, void *); + void *pWorkCtx; + + /* IO logging hook */ + void (*xWriteHook)(void *, int, lsm_i64, int, int); + void *pWriteCtx; + + /* Worker threads (for lsm_mt) */ + int nMtMinCkpt; + int nMtMaxCkpt; + int eMode; + int nWorker; + LsmWorker *aWorker; +}; + +#define LSMTEST_MODE_SINGLETHREAD 1 +#define LSMTEST_MODE_BACKGROUND_CKPT 2 +#define LSMTEST_MODE_BACKGROUND_WORK 3 +#define LSMTEST_MODE_BACKGROUND_BOTH 4 + +/************************************************************************* +************************************************************************** +** Begin test VFS code. +*/ + +struct LsmFile { + lsm_file *pReal; /* Real underlying file */ + int bLog; /* True for log file. False for db file */ + LsmDb *pDb; /* Database handle that uses this file */ +}; + +static int testEnvFullpath( + lsm_env *pEnv, /* Environment for current LsmDb */ + const char *zFile, /* Relative path name */ + char *zOut, /* Output buffer */ + int *pnOut /* IN/OUT: Size of output buffer */ +){ + lsm_env *pRealEnv = tdb_lsm_env(); + return pRealEnv->xFullpath(pRealEnv, zFile, zOut, pnOut); +} + +static int testEnvOpen( + lsm_env *pEnv, /* Environment for current LsmDb */ + const char *zFile, /* Name of file to open */ + int flags, + lsm_file **ppFile /* OUT: New file handle object */ +){ + lsm_env *pRealEnv = tdb_lsm_env(); + LsmDb *pDb = (LsmDb *)pEnv->pVfsCtx; + int rc; /* Return Code */ + LsmFile *pRet; /* The new file handle */ + int nFile; /* Length of string zFile in bytes */ + + nFile = strlen(zFile); + pRet = (LsmFile *)testMalloc(sizeof(LsmFile)); + pRet->pDb = pDb; + pRet->bLog = (nFile > 4 && 0==memcmp("-log", &zFile[nFile-4], 4)); + + rc = pRealEnv->xOpen(pRealEnv, zFile, flags, &pRet->pReal); + if( rc!=LSM_OK ){ + testFree(pRet); + pRet = 0; + } + + *ppFile = (lsm_file *)pRet; + return rc; +} + +static int testEnvRead(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){ + lsm_env *pRealEnv = tdb_lsm_env(); + LsmFile *p = (LsmFile *)pFile; + if( p->pDb->bCrashed ) return LSM_IOERR; + return pRealEnv->xRead(p->pReal, iOff, pData, nData); +} + +static int testEnvWrite(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){ + lsm_env *pRealEnv = tdb_lsm_env(); + LsmFile *p = (LsmFile *)pFile; + LsmDb *pDb = p->pDb; + + if( pDb->bCrashed ) return LSM_IOERR; + + if( pDb->bPrepareCrash ){ + FileData *pData2 = &pDb->aFile[p->bLog]; + int iFirst; + int iLast; + int iSector; + + iFirst = (int)(iOff / pDb->szSector); + iLast = (int)((iOff + nData - 1) / pDb->szSector); + + if( pData2->nSector<(iLast+1) ){ + int nNew = ( ((iLast + 1) + 63) / 64 ) * 64; + assert( nNew>iLast ); + pData2->aSector = (FileSector *)testRealloc( + pData2->aSector, nNew*sizeof(FileSector) + ); + memset(&pData2->aSector[pData2->nSector], + 0, (nNew - pData2->nSector) * sizeof(FileSector) + ); + pData2->nSector = nNew; + } + + for(iSector=iFirst; iSector<=iLast; iSector++){ + if( pData2->aSector[iSector].aOld==0 ){ + u8 *aOld = (u8 *)testMalloc(pDb->szSector); + pRealEnv->xRead( + p->pReal, (lsm_i64)iSector*pDb->szSector, aOld, pDb->szSector + ); + pData2->aSector[iSector].aOld = aOld; + } + } + } + + if( pDb->xWriteHook ){ + int rc; + int nUs; + struct timeval t1; + struct timeval t2; + + gettimeofday(&t1, 0); + assert( nData>0 ); + rc = pRealEnv->xWrite(p->pReal, iOff, pData, nData); + gettimeofday(&t2, 0); + + nUs = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec); + pDb->xWriteHook(pDb->pWriteCtx, p->bLog, iOff, nData, nUs); + return rc; + } + + return pRealEnv->xWrite(p->pReal, iOff, pData, nData); +} + +static void doSystemCrash(LsmDb *pDb); + +static int testEnvSync(lsm_file *pFile){ + lsm_env *pRealEnv = tdb_lsm_env(); + LsmFile *p = (LsmFile *)pFile; + LsmDb *pDb = p->pDb; + FileData *pData = &pDb->aFile[p->bLog]; + int i; + + if( pDb->bCrashed ) return LSM_IOERR; + + if( pDb->nAutoCrash ){ + pDb->nAutoCrash--; + if( pDb->nAutoCrash==0 ){ + doSystemCrash(pDb); + pDb->bCrashed = 1; + return LSM_IOERR; + } + } + + if( pDb->bPrepareCrash ){ + for(i=0; i<pData->nSector; i++){ + testFree(pData->aSector[i].aOld); + pData->aSector[i].aOld = 0; + } + } + + if( pDb->xWriteHook ){ + int rc; + int nUs; + struct timeval t1; + struct timeval t2; + + gettimeofday(&t1, 0); + rc = pRealEnv->xSync(p->pReal); + gettimeofday(&t2, 0); + + nUs = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec); + pDb->xWriteHook(pDb->pWriteCtx, p->bLog, 0, 0, nUs); + return rc; + } + + return pRealEnv->xSync(p->pReal); +} + +static int testEnvTruncate(lsm_file *pFile, lsm_i64 iOff){ + lsm_env *pRealEnv = tdb_lsm_env(); + LsmFile *p = (LsmFile *)pFile; + if( p->pDb->bCrashed ) return LSM_IOERR; + return pRealEnv->xTruncate(p->pReal, iOff); +} + +static int testEnvSectorSize(lsm_file *pFile){ + lsm_env *pRealEnv = tdb_lsm_env(); + LsmFile *p = (LsmFile *)pFile; + return pRealEnv->xSectorSize(p->pReal); +} + +static int testEnvRemap( + lsm_file *pFile, + lsm_i64 iMin, + void **ppOut, + lsm_i64 *pnOut +){ + lsm_env *pRealEnv = tdb_lsm_env(); + LsmFile *p = (LsmFile *)pFile; + return pRealEnv->xRemap(p->pReal, iMin, ppOut, pnOut); +} + +static int testEnvFileid( + lsm_file *pFile, + void *ppOut, + int *pnOut +){ + lsm_env *pRealEnv = tdb_lsm_env(); + LsmFile *p = (LsmFile *)pFile; + return pRealEnv->xFileid(p->pReal, ppOut, pnOut); +} + +static int testEnvClose(lsm_file *pFile){ + lsm_env *pRealEnv = tdb_lsm_env(); + LsmFile *p = (LsmFile *)pFile; + + pRealEnv->xClose(p->pReal); + testFree(p); + return LSM_OK; +} + +static int testEnvUnlink(lsm_env *pEnv, const char *zFile){ + lsm_env *pRealEnv = tdb_lsm_env(); + unused_parameter(pEnv); + return pRealEnv->xUnlink(pRealEnv, zFile); +} + +static int testEnvLock(lsm_file *pFile, int iLock, int eType){ + LsmFile *p = (LsmFile *)pFile; + lsm_env *pRealEnv = tdb_lsm_env(); + + if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){ + return LSM_BUSY; + } + return pRealEnv->xLock(p->pReal, iLock, eType); +} + +static int testEnvTestLock(lsm_file *pFile, int iLock, int nLock, int eType){ + LsmFile *p = (LsmFile *)pFile; + lsm_env *pRealEnv = tdb_lsm_env(); + + if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){ + return LSM_BUSY; + } + return pRealEnv->xTestLock(p->pReal, iLock, nLock, eType); +} + +static int testEnvShmMap(lsm_file *pFile, int iRegion, int sz, void **pp){ + LsmFile *p = (LsmFile *)pFile; + lsm_env *pRealEnv = tdb_lsm_env(); + return pRealEnv->xShmMap(p->pReal, iRegion, sz, pp); +} + +static void testEnvShmBarrier(void){ +} + +static int testEnvShmUnmap(lsm_file *pFile, int bDel){ + LsmFile *p = (LsmFile *)pFile; + lsm_env *pRealEnv = tdb_lsm_env(); + return pRealEnv->xShmUnmap(p->pReal, bDel); +} + +static int testEnvSleep(lsm_env *pEnv, int us){ + lsm_env *pRealEnv = tdb_lsm_env(); + return pRealEnv->xSleep(pRealEnv, us); +} + +static void doSystemCrash(LsmDb *pDb){ + lsm_env *pEnv = tdb_lsm_env(); + int iFile; + int iSeed = pDb->aFile[0].nSector + pDb->aFile[1].nSector; + + char *zFile = pDb->zName; + char *zFree = 0; + + for(iFile=0; iFile<2; iFile++){ + lsm_file *pFile = 0; + int i; + + pEnv->xOpen(pEnv, zFile, 0, &pFile); + for(i=0; i<pDb->aFile[iFile].nSector; i++){ + u8 *aOld = pDb->aFile[iFile].aSector[i].aOld; + if( aOld ){ + int iOpt = testPrngValue(iSeed++) % 3; + switch( iOpt ){ + case 0: + break; + + case 1: + testPrngArray(iSeed++, (u32 *)aOld, pDb->szSector/4); + /* Fall-through */ + + case 2: + pEnv->xWrite( + pFile, (lsm_i64)i * pDb->szSector, aOld, pDb->szSector + ); + break; + } + testFree(aOld); + pDb->aFile[iFile].aSector[i].aOld = 0; + } + } + pEnv->xClose(pFile); + zFree = zFile = sqlite3_mprintf("%s-log", pDb->zName); + } + + sqlite3_free(zFree); +} +/* +** End test VFS code. +************************************************************************** +*************************************************************************/ + +/************************************************************************* +************************************************************************** +** Begin test compression hooks. +*/ + +#ifdef HAVE_ZLIB +#include <zlib.h> + +static int testZipBound(void *pCtx, int nSrc){ + return compressBound(nSrc); +} + +static int testZipCompress( + void *pCtx, /* Context pointer */ + char *aOut, int *pnOut, /* OUT: Buffer containing compressed data */ + const char *aIn, int nIn /* Buffer containing input data */ +){ + uLongf n = *pnOut; /* In/out buffer size for compress() */ + int rc; /* compress() return code */ + + rc = compress((Bytef*)aOut, &n, (Bytef*)aIn, nIn); + *pnOut = n; + return (rc==Z_OK ? 0 : LSM_ERROR); +} + +static int testZipUncompress( + void *pCtx, /* Context pointer */ + char *aOut, int *pnOut, /* OUT: Buffer containing uncompressed data */ + const char *aIn, int nIn /* Buffer containing input data */ +){ + uLongf n = *pnOut; /* In/out buffer size for uncompress() */ + int rc; /* uncompress() return code */ + + rc = uncompress((Bytef*)aOut, &n, (Bytef*)aIn, nIn); + *pnOut = n; + return (rc==Z_OK ? 0 : LSM_ERROR); +} + +static int testConfigureCompression(lsm_db *pDb){ + static lsm_compress zip = { + 0, /* Context pointer (unused) */ + 1, /* Id value */ + testZipBound, /* xBound method */ + testZipCompress, /* xCompress method */ + testZipUncompress /* xUncompress method */ + }; + return lsm_config(pDb, LSM_CONFIG_SET_COMPRESSION, &zip); +} +#endif /* ifdef HAVE_ZLIB */ + +/* +** End test compression hooks. +************************************************************************** +*************************************************************************/ + +static int test_lsm_close(TestDb *pTestDb){ + int i; + int rc = LSM_OK; + LsmDb *pDb = (LsmDb *)pTestDb; + + lsm_csr_close(pDb->pCsr); + lsm_close(pDb->db); + + /* If this is a multi-threaded database, wait on the worker threads. */ + mt_shutdown(pDb); + for(i=0; i<pDb->nWorker && rc==LSM_OK; i++){ + rc = pDb->aWorker[i].worker_rc; + } + + for(i=0; i<pDb->aFile[0].nSector; i++){ + testFree(pDb->aFile[0].aSector[i].aOld); + } + testFree(pDb->aFile[0].aSector); + for(i=0; i<pDb->aFile[1].nSector; i++){ + testFree(pDb->aFile[1].aSector[i].aOld); + } + testFree(pDb->aFile[1].aSector); + + memset(pDb, sizeof(LsmDb), 0x11); + testFree((char *)pDb->pBuf); + testFree((char *)pDb); + return rc; +} + +static void mt_signal_worker(LsmDb*, int); + +static int waitOnCheckpointer(LsmDb *pDb, lsm_db *db){ + int nSleep = 0; + int nKB; + int rc; + + do { + nKB = 0; + rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nKB); + if( rc!=LSM_OK || nKB<pDb->nMtMaxCkpt ) break; +#ifdef LSM_MUTEX_PTHREADS + mt_signal_worker(pDb, + (pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ? 0 : 1) + ); +#endif + usleep(5000); + nSleep += 5; + }while( 1 ); + +#if 0 + if( nSleep ) printf("# waitOnCheckpointer(): nSleep=%d\n", nSleep); +#endif + + return rc; +} + +static int waitOnWorker(LsmDb *pDb){ + int rc; + int nLimit = -1; + int nSleep = 0; + + rc = lsm_config(pDb->db, LSM_CONFIG_AUTOFLUSH, &nLimit); + do { + int nOld, nNew, rc2; + rc2 = lsm_info(pDb->db, LSM_INFO_TREE_SIZE, &nOld, &nNew); + if( rc2!=LSM_OK ) return rc2; + if( nOld==0 || nNew<(nLimit/2) ) break; +#ifdef LSM_MUTEX_PTHREADS + mt_signal_worker(pDb, 0); +#endif + usleep(5000); + nSleep += 5; + }while( 1 ); + +#if 0 + if( nSleep ) printf("# waitOnWorker(): nSleep=%d\n", nSleep); +#endif + + return rc; +} + +static int test_lsm_write( + TestDb *pTestDb, + void *pKey, + int nKey, + void *pVal, + int nVal +){ + LsmDb *pDb = (LsmDb *)pTestDb; + int rc = LSM_OK; + + if( pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ){ + rc = waitOnCheckpointer(pDb, pDb->db); + }else if( + pDb->eMode==LSMTEST_MODE_BACKGROUND_WORK + || pDb->eMode==LSMTEST_MODE_BACKGROUND_BOTH + ){ + rc = waitOnWorker(pDb); + } + + if( rc==LSM_OK ){ + rc = lsm_insert(pDb->db, pKey, nKey, pVal, nVal); + } + return rc; +} + +static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){ + LsmDb *pDb = (LsmDb *)pTestDb; + return lsm_delete(pDb->db, pKey, nKey); +} + +static int test_lsm_delete_range( + TestDb *pTestDb, + void *pKey1, int nKey1, + void *pKey2, int nKey2 +){ + LsmDb *pDb = (LsmDb *)pTestDb; + return lsm_delete_range(pDb->db, pKey1, nKey1, pKey2, nKey2); +} + +static int test_lsm_fetch( + TestDb *pTestDb, + void *pKey, + int nKey, + void **ppVal, + int *pnVal +){ + int rc; + LsmDb *pDb = (LsmDb *)pTestDb; + lsm_cursor *csr; + + if( pKey==0 ) return LSM_OK; + + if( pDb->pCsr==0 ){ + rc = lsm_csr_open(pDb->db, &csr); + if( rc!=LSM_OK ) return rc; + }else{ + csr = pDb->pCsr; + } + + rc = lsm_csr_seek(csr, pKey, nKey, LSM_SEEK_EQ); + if( rc==LSM_OK ){ + if( lsm_csr_valid(csr) ){ + const void *pVal; int nVal; + rc = lsm_csr_value(csr, &pVal, &nVal); + if( nVal>pDb->nBuf ){ + testFree(pDb->pBuf); + pDb->pBuf = testMalloc(nVal*2); + pDb->nBuf = nVal*2; + } + memcpy(pDb->pBuf, pVal, nVal); + *ppVal = pDb->pBuf; + *pnVal = nVal; + }else{ + *ppVal = 0; + *pnVal = -1; + } + } + if( pDb->pCsr==0 ){ + lsm_csr_close(csr); + } + return rc; +} + +static int test_lsm_scan( + TestDb *pTestDb, + void *pCtx, + int bReverse, + void *pFirst, int nFirst, + void *pLast, int nLast, + void (*xCallback)(void *, void *, int , void *, int) +){ + LsmDb *pDb = (LsmDb *)pTestDb; + lsm_cursor *csr; + lsm_cursor *csr2 = 0; + int rc; + + if( pDb->pCsr==0 ){ + rc = lsm_csr_open(pDb->db, &csr); + if( rc!=LSM_OK ) return rc; + }else{ + rc = LSM_OK; + csr = pDb->pCsr; + } + + /* To enhance testing, if both pLast and pFirst are defined, seek the + ** cursor to the "end" boundary here. Then the next block seeks it to + ** the "start" ready for the scan. The point is to test that cursors + ** can be reused. */ + if( pLast && pFirst ){ + if( bReverse ){ + rc = lsm_csr_seek(csr, pFirst, nFirst, LSM_SEEK_LE); + }else{ + rc = lsm_csr_seek(csr, pLast, nLast, LSM_SEEK_GE); + } + } + + if( bReverse ){ + if( pLast ){ + rc = lsm_csr_seek(csr, pLast, nLast, LSM_SEEK_LE); + }else{ + rc = lsm_csr_last(csr); + } + }else{ + if( pFirst ){ + rc = lsm_csr_seek(csr, pFirst, nFirst, LSM_SEEK_GE); + }else{ + rc = lsm_csr_first(csr); + } + } + + while( rc==LSM_OK && lsm_csr_valid(csr) ){ + const void *pKey; int nKey; + const void *pVal; int nVal; + int cmp; + + lsm_csr_key(csr, &pKey, &nKey); + lsm_csr_value(csr, &pVal, &nVal); + + if( bReverse && pFirst ){ + cmp = memcmp(pFirst, pKey, MIN(nKey, nFirst)); + if( cmp>0 || (cmp==0 && nFirst>nKey) ) break; + }else if( bReverse==0 && pLast ){ + cmp = memcmp(pLast, pKey, MIN(nKey, nLast)); + if( cmp<0 || (cmp==0 && nLast<nKey) ) break; + } + + xCallback(pCtx, (void *)pKey, nKey, (void *)pVal, nVal); + + if( bReverse ){ + rc = lsm_csr_prev(csr); + }else{ + rc = lsm_csr_next(csr); + } + } + + if( pDb->pCsr==0 ){ + lsm_csr_close(csr); + } + return rc; +} + +static int test_lsm_begin(TestDb *pTestDb, int iLevel){ + int rc = LSM_OK; + LsmDb *pDb = (LsmDb *)pTestDb; + + /* iLevel==0 is a no-op. */ + if( iLevel==0 ) return 0; + + if( pDb->pCsr==0 ) rc = lsm_csr_open(pDb->db, &pDb->pCsr); + if( rc==LSM_OK && iLevel>1 ){ + rc = lsm_begin(pDb->db, iLevel-1); + } + + return rc; +} +static int test_lsm_commit(TestDb *pTestDb, int iLevel){ + LsmDb *pDb = (LsmDb *)pTestDb; + + /* If iLevel==0, close any open read transaction */ + if( iLevel==0 && pDb->pCsr ){ + lsm_csr_close(pDb->pCsr); + pDb->pCsr = 0; + } + + /* If iLevel==0, close any open read transaction */ + return lsm_commit(pDb->db, MAX(0, iLevel-1)); +} +static int test_lsm_rollback(TestDb *pTestDb, int iLevel){ + LsmDb *pDb = (LsmDb *)pTestDb; + + /* If iLevel==0, close any open read transaction */ + if( iLevel==0 && pDb->pCsr ){ + lsm_csr_close(pDb->pCsr); + pDb->pCsr = 0; + } + + return lsm_rollback(pDb->db, MAX(0, iLevel-1)); +} + +/* +** A log message callback registered with lsm connections. Prints all +** messages to stderr. +*/ +static void xLog(void *pCtx, int rc, const char *z){ + unused_parameter(rc); + /* fprintf(stderr, "lsm: rc=%d \"%s\"\n", rc, z); */ + if( pCtx ) fprintf(stderr, "%s: ", (char *)pCtx); + fprintf(stderr, "%s\n", z); + fflush(stderr); +} + +static void xWorkHook(lsm_db *db, void *pArg){ + LsmDb *p = (LsmDb *)pArg; + if( p->xWork ) p->xWork(db, p->pWorkCtx); +} + +#define TEST_NO_RECOVERY -1 +#define TEST_COMPRESSION -3 + +#define TEST_MT_MODE -2 +#define TEST_MT_MIN_CKPT -4 +#define TEST_MT_MAX_CKPT -5 + + +int test_lsm_config_str( + LsmDb *pLsm, + lsm_db *db, + int bWorker, + const char *zStr, + int *pnThread +){ + struct CfgParam { + const char *zParam; + int bWorker; + int eParam; + } aParam[] = { + { "autoflush", 0, LSM_CONFIG_AUTOFLUSH }, + { "page_size", 0, LSM_CONFIG_PAGE_SIZE }, + { "block_size", 0, LSM_CONFIG_BLOCK_SIZE }, + { "safety", 0, LSM_CONFIG_SAFETY }, + { "autowork", 0, LSM_CONFIG_AUTOWORK }, + { "autocheckpoint", 0, LSM_CONFIG_AUTOCHECKPOINT }, + { "mmap", 0, LSM_CONFIG_MMAP }, + { "use_log", 0, LSM_CONFIG_USE_LOG }, + { "automerge", 0, LSM_CONFIG_AUTOMERGE }, + { "max_freelist", 0, LSM_CONFIG_MAX_FREELIST }, + { "multi_proc", 0, LSM_CONFIG_MULTIPLE_PROCESSES }, + { "worker_automerge", 1, LSM_CONFIG_AUTOMERGE }, + { "test_no_recovery", 0, TEST_NO_RECOVERY }, + { "bg_min_ckpt", 0, TEST_NO_RECOVERY }, + + { "mt_mode", 0, TEST_MT_MODE }, + { "mt_min_ckpt", 0, TEST_MT_MIN_CKPT }, + { "mt_max_ckpt", 0, TEST_MT_MAX_CKPT }, + +#ifdef HAVE_ZLIB + { "compression", 0, TEST_COMPRESSION }, +#endif + { 0, 0 } + }; + const char *z = zStr; + int nThread = 1; + + if( zStr==0 ) return 0; + + assert( db ); + while( z[0] ){ + const char *zStart; + + /* Skip whitespace */ + while( *z==' ' ) z++; + zStart = z; + + while( *z && *z!='=' ) z++; + if( *z ){ + int eParam; + int i; + int iVal; + int iMul = 1; + int rc; + char zParam[32]; + int nParam = z-zStart; + if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error; + + memcpy(zParam, zStart, nParam); + zParam[nParam] = '\0'; + rc = testArgSelect(aParam, "param", zParam, &i); + if( rc!=0 ) return rc; + eParam = aParam[i].eParam; + + z++; + zStart = z; + while( *z>='0' && *z<='9' ) z++; + if( *z=='k' || *z=='K' ){ + iMul = 1; + z++; + }else if( *z=='M' || *z=='M' ){ + iMul = 1024; + z++; + } + nParam = z-zStart; + if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error; + memcpy(zParam, zStart, nParam); + zParam[nParam] = '\0'; + iVal = atoi(zParam) * iMul; + + if( eParam>0 ){ + if( bWorker || aParam[i].bWorker==0 ){ + lsm_config(db, eParam, &iVal); + } + }else{ + switch( eParam ){ + case TEST_NO_RECOVERY: + if( pLsm ) pLsm->bNoRecovery = iVal; + break; + case TEST_MT_MODE: + if( pLsm ) nThread = iVal; + break; + case TEST_MT_MIN_CKPT: + if( pLsm && iVal>0 ) pLsm->nMtMinCkpt = iVal*1024; + break; + case TEST_MT_MAX_CKPT: + if( pLsm && iVal>0 ) pLsm->nMtMaxCkpt = iVal*1024; + break; +#ifdef HAVE_ZLIB + case TEST_COMPRESSION: + testConfigureCompression(db); + break; +#endif + } + } + }else if( z!=zStart ){ + goto syntax_error; + } + } + + if( pnThread ) *pnThread = nThread; + if( pLsm && pLsm->nMtMaxCkpt < pLsm->nMtMinCkpt ){ + pLsm->nMtMinCkpt = pLsm->nMtMaxCkpt; + } + + return 0; + syntax_error: + testPrintError("syntax error at: \"%s\"\n", z); + return 1; +} + +int tdb_lsm_config_str(TestDb *pDb, const char *zStr){ + int rc = 0; + if( tdb_lsm(pDb) ){ +#ifdef LSM_MUTEX_PTHREADS + int i; +#endif + LsmDb *pLsm = (LsmDb *)pDb; + + rc = test_lsm_config_str(pLsm, pLsm->db, 0, zStr, 0); +#ifdef LSM_MUTEX_PTHREADS + for(i=0; rc==0 && i<pLsm->nWorker; i++){ + rc = test_lsm_config_str(0, pLsm->aWorker[i].pWorker, 1, zStr, 0); + } +#endif + } + return rc; +} + +int tdb_lsm_configure(lsm_db *db, const char *zConfig){ + return test_lsm_config_str(0, db, 0, zConfig, 0); +} + +static int testLsmStartWorkers(LsmDb *, int, const char *, const char *); + +static int testLsmOpen( + const char *zCfg, + const char *zFilename, + int bClear, + TestDb **ppDb +){ + static const DatabaseMethods LsmMethods = { + test_lsm_close, + test_lsm_write, + test_lsm_delete, + test_lsm_delete_range, + test_lsm_fetch, + test_lsm_scan, + test_lsm_begin, + test_lsm_commit, + test_lsm_rollback + }; + + int rc; + int nFilename; + LsmDb *pDb; + + /* If the bClear flag is set, delete any existing database. */ + assert( zFilename); + if( bClear ) testDeleteLsmdb(zFilename); + nFilename = strlen(zFilename); + + pDb = (LsmDb *)testMalloc(sizeof(LsmDb) + nFilename + 1); + memset(pDb, 0, sizeof(LsmDb)); + pDb->base.pMethods = &LsmMethods; + pDb->zName = (char *)&pDb[1]; + memcpy(pDb->zName, zFilename, nFilename + 1); + + /* Default the sector size used for crash simulation to 512 bytes. + ** Todo: There should be an OS method to obtain this value - just as + ** there is in SQLite. For now, LSM assumes that it is smaller than + ** the page size (default 4KB). + */ + pDb->szSector = 256; + + /* Default values for the mt_min_ckpt and mt_max_ckpt parameters. */ + pDb->nMtMinCkpt = LSMTEST_DFLT_MT_MIN_CKPT; + pDb->nMtMaxCkpt = LSMTEST_DFLT_MT_MAX_CKPT; + + memcpy(&pDb->env, tdb_lsm_env(), sizeof(lsm_env)); + pDb->env.pVfsCtx = (void *)pDb; + pDb->env.xFullpath = testEnvFullpath; + pDb->env.xOpen = testEnvOpen; + pDb->env.xRead = testEnvRead; + pDb->env.xWrite = testEnvWrite; + pDb->env.xTruncate = testEnvTruncate; + pDb->env.xSync = testEnvSync; + pDb->env.xSectorSize = testEnvSectorSize; + pDb->env.xRemap = testEnvRemap; + pDb->env.xFileid = testEnvFileid; + pDb->env.xClose = testEnvClose; + pDb->env.xUnlink = testEnvUnlink; + pDb->env.xLock = testEnvLock; + pDb->env.xTestLock = testEnvTestLock; + pDb->env.xShmBarrier = testEnvShmBarrier; + pDb->env.xShmMap = testEnvShmMap; + pDb->env.xShmUnmap = testEnvShmUnmap; + pDb->env.xSleep = testEnvSleep; + + rc = lsm_new(&pDb->env, &pDb->db); + if( rc==LSM_OK ){ + int nThread = 1; + lsm_config_log(pDb->db, xLog, 0); + lsm_config_work_hook(pDb->db, xWorkHook, (void *)pDb); + + rc = test_lsm_config_str(pDb, pDb->db, 0, zCfg, &nThread); + if( rc==LSM_OK ) rc = lsm_open(pDb->db, zFilename); + + pDb->eMode = nThread; +#ifdef LSM_MUTEX_PTHREADS + if( rc==LSM_OK && nThread>1 ){ + testLsmStartWorkers(pDb, nThread, zFilename, zCfg); + } +#endif + + if( rc!=LSM_OK ){ + test_lsm_close((TestDb *)pDb); + pDb = 0; + } + } + + *ppDb = (TestDb *)pDb; + return rc; +} + +int test_lsm_open( + const char *zSpec, + const char *zFilename, + int bClear, + TestDb **ppDb +){ + return testLsmOpen(zSpec, zFilename, bClear, ppDb); +} + +int test_lsm_small_open( + const char *zSpec, + const char *zFile, + int bClear, + TestDb **ppDb +){ + const char *zCfg = "page_size=256 block_size=64 mmap=1024"; + return testLsmOpen(zCfg, zFile, bClear, ppDb); +} + +int test_lsm_lomem_open( + const char *zSpec, + const char *zFilename, + int bClear, + TestDb **ppDb +){ + /* "max_freelist=4 autocheckpoint=32" */ + const char *zCfg = + "page_size=256 block_size=64 autoflush=16 " + "autocheckpoint=32" + "mmap=0 " + ; + return testLsmOpen(zCfg, zFilename, bClear, ppDb); +} + +int test_lsm_lomem2_open( + const char *zSpec, + const char *zFilename, + int bClear, + TestDb **ppDb +){ + /* "max_freelist=4 autocheckpoint=32" */ + const char *zCfg = + "page_size=512 block_size=64 autoflush=0 mmap=0 " + ; + return testLsmOpen(zCfg, zFilename, bClear, ppDb); +} + +int test_lsm_zip_open( + const char *zSpec, + const char *zFilename, + int bClear, + TestDb **ppDb +){ + const char *zCfg = + "page_size=256 block_size=64 autoflush=16 " + "autocheckpoint=32 compression=1 mmap=0 " + ; + return testLsmOpen(zCfg, zFilename, bClear, ppDb); +} + +lsm_db *tdb_lsm(TestDb *pDb){ + if( pDb->pMethods->xClose==test_lsm_close ){ + return ((LsmDb *)pDb)->db; + } + return 0; +} + +int tdb_lsm_multithread(TestDb *pDb){ + int ret = 0; + if( tdb_lsm(pDb) ){ + ret = ((LsmDb*)pDb)->eMode!=LSMTEST_MODE_SINGLETHREAD; + } + return ret; +} + +void tdb_lsm_enable_log(TestDb *pDb, int bEnable){ + lsm_db *db = tdb_lsm(pDb); + if( db ){ + lsm_config_log(db, (bEnable ? xLog : 0), (void *)"client"); + } +} + +void tdb_lsm_application_crash(TestDb *pDb){ + if( tdb_lsm(pDb) ){ + LsmDb *p = (LsmDb *)pDb; + p->bCrashed = 1; + } +} + +void tdb_lsm_prepare_system_crash(TestDb *pDb){ + if( tdb_lsm(pDb) ){ + LsmDb *p = (LsmDb *)pDb; + p->bPrepareCrash = 1; + } +} + +void tdb_lsm_system_crash(TestDb *pDb){ + if( tdb_lsm(pDb) ){ + LsmDb *p = (LsmDb *)pDb; + p->bCrashed = 1; + doSystemCrash(p); + } +} + +void tdb_lsm_safety(TestDb *pDb, int eMode){ + assert( eMode==LSM_SAFETY_OFF + || eMode==LSM_SAFETY_NORMAL + || eMode==LSM_SAFETY_FULL + ); + if( tdb_lsm(pDb) ){ + int iParam = eMode; + LsmDb *p = (LsmDb *)pDb; + lsm_config(p->db, LSM_CONFIG_SAFETY, &iParam); + } +} + +void tdb_lsm_prepare_sync_crash(TestDb *pDb, int iSync){ + assert( iSync>0 ); + if( tdb_lsm(pDb) ){ + LsmDb *p = (LsmDb *)pDb; + p->nAutoCrash = iSync; + p->bPrepareCrash = 1; + } +} + +void tdb_lsm_config_work_hook( + TestDb *pDb, + void (*xWork)(lsm_db *, void *), + void *pWorkCtx +){ + if( tdb_lsm(pDb) ){ + LsmDb *p = (LsmDb *)pDb; + p->xWork = xWork; + p->pWorkCtx = pWorkCtx; + } +} + +void tdb_lsm_write_hook( + TestDb *pDb, + void (*xWrite)(void *, int, lsm_i64, int, int), + void *pWriteCtx +){ + if( tdb_lsm(pDb) ){ + LsmDb *p = (LsmDb *)pDb; + p->xWriteHook = xWrite; + p->pWriteCtx = pWriteCtx; + } +} + +int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb){ + return testLsmOpen(zCfg, zDb, bClear, ppDb); +} + +#ifdef LSM_MUTEX_PTHREADS + +/* +** Signal worker thread iWorker that there may be work to do. +*/ +static void mt_signal_worker(LsmDb *pDb, int iWorker){ + LsmWorker *p = &pDb->aWorker[iWorker]; + pthread_mutex_lock(&p->worker_mutex); + p->bDoWork = 1; + pthread_cond_signal(&p->worker_cond); + pthread_mutex_unlock(&p->worker_mutex); +} + +/* +** This routine is used as the main() for all worker threads. +*/ +static void *worker_main(void *pArg){ + LsmWorker *p = (LsmWorker *)pArg; + lsm_db *pWorker; /* Connection to access db through */ + + pthread_mutex_lock(&p->worker_mutex); + while( (pWorker = p->pWorker) ){ + int rc = LSM_OK; + + /* Do some work. If an error occurs, exit. */ + + pthread_mutex_unlock(&p->worker_mutex); + if( p->eType==LSMTEST_THREAD_CKPT ){ + int nKB = 0; + rc = lsm_info(pWorker, LSM_INFO_CHECKPOINT_SIZE, &nKB); + if( rc==LSM_OK && nKB>=p->pDb->nMtMinCkpt ){ + rc = lsm_checkpoint(pWorker, 0); + } + }else{ + int nWrite; + do { + + if( p->eType==LSMTEST_THREAD_WORKER ){ + waitOnCheckpointer(p->pDb, pWorker); + } + + nWrite = 0; + rc = lsm_work(pWorker, 0, 256, &nWrite); + + if( p->eType==LSMTEST_THREAD_WORKER && nWrite ){ + mt_signal_worker(p->pDb, 1); + } + }while( nWrite && p->pWorker ); + } + pthread_mutex_lock(&p->worker_mutex); + + if( rc!=LSM_OK && rc!=LSM_BUSY ){ + p->worker_rc = rc; + break; + } + + /* The thread will wake up when it is signaled either because another + ** thread has created some work for this one or because the connection + ** is being closed. */ + if( p->pWorker && p->bDoWork==0 ){ + pthread_cond_wait(&p->worker_cond, &p->worker_mutex); + } + p->bDoWork = 0; + } + pthread_mutex_unlock(&p->worker_mutex); + + return 0; +} + + +static void mt_stop_worker(LsmDb *pDb, int iWorker){ + LsmWorker *p = &pDb->aWorker[iWorker]; + if( p->pWorker ){ + void *pDummy; + lsm_db *pWorker; + + /* Signal the worker to stop */ + pthread_mutex_lock(&p->worker_mutex); + pWorker = p->pWorker; + p->pWorker = 0; + pthread_cond_signal(&p->worker_cond); + pthread_mutex_unlock(&p->worker_mutex); + + /* Join the worker thread. */ + pthread_join(p->worker_thread, &pDummy); + + /* Free resources allocated in mt_start_worker() */ + pthread_cond_destroy(&p->worker_cond); + pthread_mutex_destroy(&p->worker_mutex); + lsm_close(pWorker); + } +} + +static void mt_shutdown(LsmDb *pDb){ + int i; + for(i=0; i<pDb->nWorker; i++){ + mt_stop_worker(pDb, i); + } +} + +/* +** This callback is invoked by LSM when the client database writes to +** the database file (i.e. to flush the contents of the in-memory tree). +** This implies there may be work to do on the database, so signal +** the worker threads. +*/ +static void mt_client_work_hook(lsm_db *db, void *pArg){ + LsmDb *pDb = (LsmDb *)pArg; /* LsmDb database handle */ + + /* Invoke the user level work-hook, if any. */ + if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx); + + /* Wake up worker thread 0. */ + mt_signal_worker(pDb, 0); +} + +static void mt_worker_work_hook(lsm_db *db, void *pArg){ + LsmDb *pDb = (LsmDb *)pArg; /* LsmDb database handle */ + + /* Invoke the user level work-hook, if any. */ + if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx); +} + +/* +** Launch worker thread iWorker for database connection pDb. +*/ +static int mt_start_worker( + LsmDb *pDb, /* Main database structure */ + int iWorker, /* Worker number to start */ + const char *zFilename, /* File name of database to open */ + const char *zCfg, /* Connection configuration string */ + int eType /* Type of worker thread */ +){ + int rc = 0; /* Return code */ + LsmWorker *p; /* Object to initialize */ + + assert( iWorker<pDb->nWorker ); + assert( eType==LSMTEST_THREAD_CKPT + || eType==LSMTEST_THREAD_WORKER + || eType==LSMTEST_THREAD_WORKER_AC + ); + + p = &pDb->aWorker[iWorker]; + p->eType = eType; + p->pDb = pDb; + + /* Open the worker connection */ + if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker); + if( zCfg ){ + test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0); + } + if( rc==0 ) rc = lsm_open(p->pWorker, zFilename); + lsm_config_log(p->pWorker, xLog, (void *)"worker"); + + /* Configure the work-hook */ + if( rc==0 ){ + lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb); + } + + if( eType==LSMTEST_THREAD_WORKER ){ + test_lsm_config_str(0, p->pWorker, 1, "autocheckpoint=0", 0); + } + + /* Kick off the worker thread. */ + if( rc==0 ) rc = pthread_cond_init(&p->worker_cond, 0); + if( rc==0 ) rc = pthread_mutex_init(&p->worker_mutex, 0); + if( rc==0 ) rc = pthread_create(&p->worker_thread, 0, worker_main, (void *)p); + + return rc; +} + + +static int testLsmStartWorkers( + LsmDb *pDb, int eModel, const char *zFilename, const char *zCfg +){ + int rc; + + if( eModel<1 || eModel>4 ) return 1; + if( eModel==1 ) return 0; + + /* Configure a work-hook for the client connection. Worker 0 is signalled + ** every time the users connection writes to the database. */ + lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb); + + /* Allocate space for two worker connections. They may not both be + ** used, but both are allocated. */ + pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * 2); + memset(pDb->aWorker, 0, sizeof(LsmWorker) * 2); + + switch( eModel ){ + case LSMTEST_MODE_BACKGROUND_CKPT: + pDb->nWorker = 1; + test_lsm_config_str(0, pDb->db, 0, "autocheckpoint=0", 0); + rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_CKPT); + break; + + case LSMTEST_MODE_BACKGROUND_WORK: + pDb->nWorker = 1; + test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0); + rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER_AC); + break; + + case LSMTEST_MODE_BACKGROUND_BOTH: + pDb->nWorker = 2; + test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0); + rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER); + if( rc==0 ){ + rc = mt_start_worker(pDb, 1, zFilename, zCfg, LSMTEST_THREAD_CKPT); + } + break; + } + + return rc; +} + + +int test_lsm_mt2( + const char *zSpec, + const char *zFilename, + int bClear, + TestDb **ppDb +){ + const char *zCfg = "mt_mode=2"; + return testLsmOpen(zCfg, zFilename, bClear, ppDb); +} + +int test_lsm_mt3( + const char *zSpec, + const char *zFilename, + int bClear, + TestDb **ppDb +){ + const char *zCfg = "mt_mode=4"; + return testLsmOpen(zCfg, zFilename, bClear, ppDb); +} + +#else +static void mt_shutdown(LsmDb *pDb) { + unused_parameter(pDb); +} +int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){ + unused_parameter(zFilename); + unused_parameter(bClear); + unused_parameter(ppDb); + testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n"); + return 1; +} +#endif |