summaryrefslogtreecommitdiffstats
path: root/ext/lsm1/lsm-test/lsmtest_tdb3.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/lsm1/lsm-test/lsmtest_tdb3.c')
-rw-r--r--ext/lsm1/lsm-test/lsmtest_tdb3.c1429
1 files changed, 1429 insertions, 0 deletions
diff --git a/ext/lsm1/lsm-test/lsmtest_tdb3.c b/ext/lsm1/lsm-test/lsmtest_tdb3.c
new file mode 100644
index 0000000..e29497a
--- /dev/null
+++ b/ext/lsm1/lsm-test/lsmtest_tdb3.c
@@ -0,0 +1,1429 @@
+
+#include "lsmtest_tdb.h"
+#include "lsm.h"
+#include "lsmtest.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#ifndef _WIN32
+# include <unistd.h>
+#endif
+#include <stdio.h>
+
+#ifndef _WIN32
+# include <sys/time.h>
+#endif
+
+typedef struct LsmDb LsmDb;
+typedef struct LsmWorker LsmWorker;
+typedef struct LsmFile LsmFile;
+
+#define LSMTEST_DFLT_MT_MAX_CKPT (8*1024)
+#define LSMTEST_DFLT_MT_MIN_CKPT (2*1024)
+
+#ifdef LSM_MUTEX_PTHREADS
+#include <pthread.h>
+
+#define LSMTEST_THREAD_CKPT 1
+#define LSMTEST_THREAD_WORKER 2
+#define LSMTEST_THREAD_WORKER_AC 3
+
+/*
+** There are several different types of worker threads that run in different
+** test configurations, depending on the value of LsmWorker.eType.
+**
+** 1. Checkpointer.
+** 2. Worker with auto-checkpoint.
+** 3. Worker without auto-checkpoint.
+*/
+struct LsmWorker {
+ LsmDb *pDb; /* Main database structure */
+ lsm_db *pWorker; /* Worker database handle */
+ pthread_t worker_thread; /* Worker thread */
+ pthread_cond_t worker_cond; /* Condition var the worker waits on */
+ pthread_mutex_t worker_mutex; /* Mutex used with worker_cond */
+ int bDoWork; /* Set to true by client when there is work */
+ int worker_rc; /* Store error code here */
+ int eType; /* LSMTEST_THREAD_XXX constant */
+ int bBlock;
+};
+#else
+struct LsmWorker { int worker_rc; int bBlock; };
+#endif
+
+static void mt_shutdown(LsmDb *);
+
+lsm_env *tdb_lsm_env(void){
+ static int bInit = 0;
+ static lsm_env env;
+ if( bInit==0 ){
+ memcpy(&env, lsm_default_env(), sizeof(env));
+ bInit = 1;
+ }
+ return &env;
+}
+
+typedef struct FileSector FileSector;
+typedef struct FileData FileData;
+
+struct FileSector {
+ u8 *aOld; /* Old data for this sector */
+};
+
+struct FileData {
+ int nSector; /* Allocated size of apSector[] array */
+ FileSector *aSector; /* Array of file sectors */
+};
+
+/*
+** bPrepareCrash:
+** If non-zero, the file wrappers maintain enough in-memory data to
+** simulate the effect of a power-failure on the file-system (i.e. that
+** unsynced sectors may be written, not written, or overwritten with
+** arbitrary data when the crash occurs).
+**
+** bCrashed:
+** Set to true after a crash is simulated. Once this variable is true, all
+** VFS methods other than xClose() return LSM_IOERR as soon as they are
+** called (without affecting the contents of the file-system).
+**
+** env:
+** The environment object used by all lsm_db* handles opened by this
+** object (i.e. LsmDb.db plus any worker connections). Variable env.pVfsCtx
+** always points to the containing LsmDb structure.
+*/
+struct LsmDb {
+ TestDb base; /* Base class - methods table */
+ lsm_env env; /* Environment used by connection db */
+ char *zName; /* Database file name */
+ lsm_db *db; /* LSM database handle */
+
+ lsm_cursor *pCsr; /* Cursor held open during read transaction */
+ void *pBuf; /* Buffer for tdb_fetch() output */
+ int nBuf; /* Allocated (not used) size of pBuf */
+
+ /* Crash testing related state */
+ int bCrashed; /* True once a crash has occurred */
+ int nAutoCrash; /* Number of syncs until a crash */
+ int bPrepareCrash; /* True to store writes in memory */
+
+ /* Unsynced data (while crash testing) */
+ int szSector; /* Assumed size of disk sectors (512B) */
+ FileData aFile[2]; /* Database and log file data */
+
+ /* Other test instrumentation */
+ int bNoRecovery; /* If true, assume DMS2 is locked */
+
+ /* Work hook redirection */
+ void (*xWork)(lsm_db *, void *);
+ void *pWorkCtx;
+
+ /* IO logging hook */
+ void (*xWriteHook)(void *, int, lsm_i64, int, int);
+ void *pWriteCtx;
+
+ /* Worker threads (for lsm_mt) */
+ int nMtMinCkpt;
+ int nMtMaxCkpt;
+ int eMode;
+ int nWorker;
+ LsmWorker *aWorker;
+};
+
+#define LSMTEST_MODE_SINGLETHREAD 1
+#define LSMTEST_MODE_BACKGROUND_CKPT 2
+#define LSMTEST_MODE_BACKGROUND_WORK 3
+#define LSMTEST_MODE_BACKGROUND_BOTH 4
+
+/*************************************************************************
+**************************************************************************
+** Begin test VFS code.
+*/
+
+struct LsmFile {
+ lsm_file *pReal; /* Real underlying file */
+ int bLog; /* True for log file. False for db file */
+ LsmDb *pDb; /* Database handle that uses this file */
+};
+
+static int testEnvFullpath(
+ lsm_env *pEnv, /* Environment for current LsmDb */
+ const char *zFile, /* Relative path name */
+ char *zOut, /* Output buffer */
+ int *pnOut /* IN/OUT: Size of output buffer */
+){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ return pRealEnv->xFullpath(pRealEnv, zFile, zOut, pnOut);
+}
+
+static int testEnvOpen(
+ lsm_env *pEnv, /* Environment for current LsmDb */
+ const char *zFile, /* Name of file to open */
+ int flags,
+ lsm_file **ppFile /* OUT: New file handle object */
+){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ LsmDb *pDb = (LsmDb *)pEnv->pVfsCtx;
+ int rc; /* Return Code */
+ LsmFile *pRet; /* The new file handle */
+ int nFile; /* Length of string zFile in bytes */
+
+ nFile = strlen(zFile);
+ pRet = (LsmFile *)testMalloc(sizeof(LsmFile));
+ pRet->pDb = pDb;
+ pRet->bLog = (nFile > 4 && 0==memcmp("-log", &zFile[nFile-4], 4));
+
+ rc = pRealEnv->xOpen(pRealEnv, zFile, flags, &pRet->pReal);
+ if( rc!=LSM_OK ){
+ testFree(pRet);
+ pRet = 0;
+ }
+
+ *ppFile = (lsm_file *)pRet;
+ return rc;
+}
+
+static int testEnvRead(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ LsmFile *p = (LsmFile *)pFile;
+ if( p->pDb->bCrashed ) return LSM_IOERR;
+ return pRealEnv->xRead(p->pReal, iOff, pData, nData);
+}
+
+static int testEnvWrite(lsm_file *pFile, lsm_i64 iOff, void *pData, int nData){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ LsmFile *p = (LsmFile *)pFile;
+ LsmDb *pDb = p->pDb;
+
+ if( pDb->bCrashed ) return LSM_IOERR;
+
+ if( pDb->bPrepareCrash ){
+ FileData *pData2 = &pDb->aFile[p->bLog];
+ int iFirst;
+ int iLast;
+ int iSector;
+
+ iFirst = (int)(iOff / pDb->szSector);
+ iLast = (int)((iOff + nData - 1) / pDb->szSector);
+
+ if( pData2->nSector<(iLast+1) ){
+ int nNew = ( ((iLast + 1) + 63) / 64 ) * 64;
+ assert( nNew>iLast );
+ pData2->aSector = (FileSector *)testRealloc(
+ pData2->aSector, nNew*sizeof(FileSector)
+ );
+ memset(&pData2->aSector[pData2->nSector],
+ 0, (nNew - pData2->nSector) * sizeof(FileSector)
+ );
+ pData2->nSector = nNew;
+ }
+
+ for(iSector=iFirst; iSector<=iLast; iSector++){
+ if( pData2->aSector[iSector].aOld==0 ){
+ u8 *aOld = (u8 *)testMalloc(pDb->szSector);
+ pRealEnv->xRead(
+ p->pReal, (lsm_i64)iSector*pDb->szSector, aOld, pDb->szSector
+ );
+ pData2->aSector[iSector].aOld = aOld;
+ }
+ }
+ }
+
+ if( pDb->xWriteHook ){
+ int rc;
+ int nUs;
+ struct timeval t1;
+ struct timeval t2;
+
+ gettimeofday(&t1, 0);
+ assert( nData>0 );
+ rc = pRealEnv->xWrite(p->pReal, iOff, pData, nData);
+ gettimeofday(&t2, 0);
+
+ nUs = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec);
+ pDb->xWriteHook(pDb->pWriteCtx, p->bLog, iOff, nData, nUs);
+ return rc;
+ }
+
+ return pRealEnv->xWrite(p->pReal, iOff, pData, nData);
+}
+
+static void doSystemCrash(LsmDb *pDb);
+
+static int testEnvSync(lsm_file *pFile){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ LsmFile *p = (LsmFile *)pFile;
+ LsmDb *pDb = p->pDb;
+ FileData *pData = &pDb->aFile[p->bLog];
+ int i;
+
+ if( pDb->bCrashed ) return LSM_IOERR;
+
+ if( pDb->nAutoCrash ){
+ pDb->nAutoCrash--;
+ if( pDb->nAutoCrash==0 ){
+ doSystemCrash(pDb);
+ pDb->bCrashed = 1;
+ return LSM_IOERR;
+ }
+ }
+
+ if( pDb->bPrepareCrash ){
+ for(i=0; i<pData->nSector; i++){
+ testFree(pData->aSector[i].aOld);
+ pData->aSector[i].aOld = 0;
+ }
+ }
+
+ if( pDb->xWriteHook ){
+ int rc;
+ int nUs;
+ struct timeval t1;
+ struct timeval t2;
+
+ gettimeofday(&t1, 0);
+ rc = pRealEnv->xSync(p->pReal);
+ gettimeofday(&t2, 0);
+
+ nUs = (t2.tv_sec - t1.tv_sec) * 1000000 + (t2.tv_usec - t1.tv_usec);
+ pDb->xWriteHook(pDb->pWriteCtx, p->bLog, 0, 0, nUs);
+ return rc;
+ }
+
+ return pRealEnv->xSync(p->pReal);
+}
+
+static int testEnvTruncate(lsm_file *pFile, lsm_i64 iOff){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ LsmFile *p = (LsmFile *)pFile;
+ if( p->pDb->bCrashed ) return LSM_IOERR;
+ return pRealEnv->xTruncate(p->pReal, iOff);
+}
+
+static int testEnvSectorSize(lsm_file *pFile){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ LsmFile *p = (LsmFile *)pFile;
+ return pRealEnv->xSectorSize(p->pReal);
+}
+
+static int testEnvRemap(
+ lsm_file *pFile,
+ lsm_i64 iMin,
+ void **ppOut,
+ lsm_i64 *pnOut
+){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ LsmFile *p = (LsmFile *)pFile;
+ return pRealEnv->xRemap(p->pReal, iMin, ppOut, pnOut);
+}
+
+static int testEnvFileid(
+ lsm_file *pFile,
+ void *ppOut,
+ int *pnOut
+){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ LsmFile *p = (LsmFile *)pFile;
+ return pRealEnv->xFileid(p->pReal, ppOut, pnOut);
+}
+
+static int testEnvClose(lsm_file *pFile){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ LsmFile *p = (LsmFile *)pFile;
+
+ pRealEnv->xClose(p->pReal);
+ testFree(p);
+ return LSM_OK;
+}
+
+static int testEnvUnlink(lsm_env *pEnv, const char *zFile){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ unused_parameter(pEnv);
+ return pRealEnv->xUnlink(pRealEnv, zFile);
+}
+
+static int testEnvLock(lsm_file *pFile, int iLock, int eType){
+ LsmFile *p = (LsmFile *)pFile;
+ lsm_env *pRealEnv = tdb_lsm_env();
+
+ if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){
+ return LSM_BUSY;
+ }
+ return pRealEnv->xLock(p->pReal, iLock, eType);
+}
+
+static int testEnvTestLock(lsm_file *pFile, int iLock, int nLock, int eType){
+ LsmFile *p = (LsmFile *)pFile;
+ lsm_env *pRealEnv = tdb_lsm_env();
+
+ if( iLock==2 && eType==LSM_LOCK_EXCL && p->pDb->bNoRecovery ){
+ return LSM_BUSY;
+ }
+ return pRealEnv->xTestLock(p->pReal, iLock, nLock, eType);
+}
+
+static int testEnvShmMap(lsm_file *pFile, int iRegion, int sz, void **pp){
+ LsmFile *p = (LsmFile *)pFile;
+ lsm_env *pRealEnv = tdb_lsm_env();
+ return pRealEnv->xShmMap(p->pReal, iRegion, sz, pp);
+}
+
+static void testEnvShmBarrier(void){
+}
+
+static int testEnvShmUnmap(lsm_file *pFile, int bDel){
+ LsmFile *p = (LsmFile *)pFile;
+ lsm_env *pRealEnv = tdb_lsm_env();
+ return pRealEnv->xShmUnmap(p->pReal, bDel);
+}
+
+static int testEnvSleep(lsm_env *pEnv, int us){
+ lsm_env *pRealEnv = tdb_lsm_env();
+ return pRealEnv->xSleep(pRealEnv, us);
+}
+
+static void doSystemCrash(LsmDb *pDb){
+ lsm_env *pEnv = tdb_lsm_env();
+ int iFile;
+ int iSeed = pDb->aFile[0].nSector + pDb->aFile[1].nSector;
+
+ char *zFile = pDb->zName;
+ char *zFree = 0;
+
+ for(iFile=0; iFile<2; iFile++){
+ lsm_file *pFile = 0;
+ int i;
+
+ pEnv->xOpen(pEnv, zFile, 0, &pFile);
+ for(i=0; i<pDb->aFile[iFile].nSector; i++){
+ u8 *aOld = pDb->aFile[iFile].aSector[i].aOld;
+ if( aOld ){
+ int iOpt = testPrngValue(iSeed++) % 3;
+ switch( iOpt ){
+ case 0:
+ break;
+
+ case 1:
+ testPrngArray(iSeed++, (u32 *)aOld, pDb->szSector/4);
+ /* Fall-through */
+
+ case 2:
+ pEnv->xWrite(
+ pFile, (lsm_i64)i * pDb->szSector, aOld, pDb->szSector
+ );
+ break;
+ }
+ testFree(aOld);
+ pDb->aFile[iFile].aSector[i].aOld = 0;
+ }
+ }
+ pEnv->xClose(pFile);
+ zFree = zFile = sqlite3_mprintf("%s-log", pDb->zName);
+ }
+
+ sqlite3_free(zFree);
+}
+/*
+** End test VFS code.
+**************************************************************************
+*************************************************************************/
+
+/*************************************************************************
+**************************************************************************
+** Begin test compression hooks.
+*/
+
+#ifdef HAVE_ZLIB
+#include <zlib.h>
+
+static int testZipBound(void *pCtx, int nSrc){
+ return compressBound(nSrc);
+}
+
+static int testZipCompress(
+ void *pCtx, /* Context pointer */
+ char *aOut, int *pnOut, /* OUT: Buffer containing compressed data */
+ const char *aIn, int nIn /* Buffer containing input data */
+){
+ uLongf n = *pnOut; /* In/out buffer size for compress() */
+ int rc; /* compress() return code */
+
+ rc = compress((Bytef*)aOut, &n, (Bytef*)aIn, nIn);
+ *pnOut = n;
+ return (rc==Z_OK ? 0 : LSM_ERROR);
+}
+
+static int testZipUncompress(
+ void *pCtx, /* Context pointer */
+ char *aOut, int *pnOut, /* OUT: Buffer containing uncompressed data */
+ const char *aIn, int nIn /* Buffer containing input data */
+){
+ uLongf n = *pnOut; /* In/out buffer size for uncompress() */
+ int rc; /* uncompress() return code */
+
+ rc = uncompress((Bytef*)aOut, &n, (Bytef*)aIn, nIn);
+ *pnOut = n;
+ return (rc==Z_OK ? 0 : LSM_ERROR);
+}
+
+static int testConfigureCompression(lsm_db *pDb){
+ static lsm_compress zip = {
+ 0, /* Context pointer (unused) */
+ 1, /* Id value */
+ testZipBound, /* xBound method */
+ testZipCompress, /* xCompress method */
+ testZipUncompress /* xUncompress method */
+ };
+ return lsm_config(pDb, LSM_CONFIG_SET_COMPRESSION, &zip);
+}
+#endif /* ifdef HAVE_ZLIB */
+
+/*
+** End test compression hooks.
+**************************************************************************
+*************************************************************************/
+
+static int test_lsm_close(TestDb *pTestDb){
+ int i;
+ int rc = LSM_OK;
+ LsmDb *pDb = (LsmDb *)pTestDb;
+
+ lsm_csr_close(pDb->pCsr);
+ lsm_close(pDb->db);
+
+ /* If this is a multi-threaded database, wait on the worker threads. */
+ mt_shutdown(pDb);
+ for(i=0; i<pDb->nWorker && rc==LSM_OK; i++){
+ rc = pDb->aWorker[i].worker_rc;
+ }
+
+ for(i=0; i<pDb->aFile[0].nSector; i++){
+ testFree(pDb->aFile[0].aSector[i].aOld);
+ }
+ testFree(pDb->aFile[0].aSector);
+ for(i=0; i<pDb->aFile[1].nSector; i++){
+ testFree(pDb->aFile[1].aSector[i].aOld);
+ }
+ testFree(pDb->aFile[1].aSector);
+
+ memset(pDb, sizeof(LsmDb), 0x11);
+ testFree((char *)pDb->pBuf);
+ testFree((char *)pDb);
+ return rc;
+}
+
+static void mt_signal_worker(LsmDb*, int);
+
+static int waitOnCheckpointer(LsmDb *pDb, lsm_db *db){
+ int nSleep = 0;
+ int nKB;
+ int rc;
+
+ do {
+ nKB = 0;
+ rc = lsm_info(db, LSM_INFO_CHECKPOINT_SIZE, &nKB);
+ if( rc!=LSM_OK || nKB<pDb->nMtMaxCkpt ) break;
+#ifdef LSM_MUTEX_PTHREADS
+ mt_signal_worker(pDb,
+ (pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ? 0 : 1)
+ );
+#endif
+ usleep(5000);
+ nSleep += 5;
+ }while( 1 );
+
+#if 0
+ if( nSleep ) printf("# waitOnCheckpointer(): nSleep=%d\n", nSleep);
+#endif
+
+ return rc;
+}
+
+static int waitOnWorker(LsmDb *pDb){
+ int rc;
+ int nLimit = -1;
+ int nSleep = 0;
+
+ rc = lsm_config(pDb->db, LSM_CONFIG_AUTOFLUSH, &nLimit);
+ do {
+ int nOld, nNew, rc2;
+ rc2 = lsm_info(pDb->db, LSM_INFO_TREE_SIZE, &nOld, &nNew);
+ if( rc2!=LSM_OK ) return rc2;
+ if( nOld==0 || nNew<(nLimit/2) ) break;
+#ifdef LSM_MUTEX_PTHREADS
+ mt_signal_worker(pDb, 0);
+#endif
+ usleep(5000);
+ nSleep += 5;
+ }while( 1 );
+
+#if 0
+ if( nSleep ) printf("# waitOnWorker(): nSleep=%d\n", nSleep);
+#endif
+
+ return rc;
+}
+
+static int test_lsm_write(
+ TestDb *pTestDb,
+ void *pKey,
+ int nKey,
+ void *pVal,
+ int nVal
+){
+ LsmDb *pDb = (LsmDb *)pTestDb;
+ int rc = LSM_OK;
+
+ if( pDb->eMode==LSMTEST_MODE_BACKGROUND_CKPT ){
+ rc = waitOnCheckpointer(pDb, pDb->db);
+ }else if(
+ pDb->eMode==LSMTEST_MODE_BACKGROUND_WORK
+ || pDb->eMode==LSMTEST_MODE_BACKGROUND_BOTH
+ ){
+ rc = waitOnWorker(pDb);
+ }
+
+ if( rc==LSM_OK ){
+ rc = lsm_insert(pDb->db, pKey, nKey, pVal, nVal);
+ }
+ return rc;
+}
+
+static int test_lsm_delete(TestDb *pTestDb, void *pKey, int nKey){
+ LsmDb *pDb = (LsmDb *)pTestDb;
+ return lsm_delete(pDb->db, pKey, nKey);
+}
+
+static int test_lsm_delete_range(
+ TestDb *pTestDb,
+ void *pKey1, int nKey1,
+ void *pKey2, int nKey2
+){
+ LsmDb *pDb = (LsmDb *)pTestDb;
+ return lsm_delete_range(pDb->db, pKey1, nKey1, pKey2, nKey2);
+}
+
+static int test_lsm_fetch(
+ TestDb *pTestDb,
+ void *pKey,
+ int nKey,
+ void **ppVal,
+ int *pnVal
+){
+ int rc;
+ LsmDb *pDb = (LsmDb *)pTestDb;
+ lsm_cursor *csr;
+
+ if( pKey==0 ) return LSM_OK;
+
+ if( pDb->pCsr==0 ){
+ rc = lsm_csr_open(pDb->db, &csr);
+ if( rc!=LSM_OK ) return rc;
+ }else{
+ csr = pDb->pCsr;
+ }
+
+ rc = lsm_csr_seek(csr, pKey, nKey, LSM_SEEK_EQ);
+ if( rc==LSM_OK ){
+ if( lsm_csr_valid(csr) ){
+ const void *pVal; int nVal;
+ rc = lsm_csr_value(csr, &pVal, &nVal);
+ if( nVal>pDb->nBuf ){
+ testFree(pDb->pBuf);
+ pDb->pBuf = testMalloc(nVal*2);
+ pDb->nBuf = nVal*2;
+ }
+ memcpy(pDb->pBuf, pVal, nVal);
+ *ppVal = pDb->pBuf;
+ *pnVal = nVal;
+ }else{
+ *ppVal = 0;
+ *pnVal = -1;
+ }
+ }
+ if( pDb->pCsr==0 ){
+ lsm_csr_close(csr);
+ }
+ return rc;
+}
+
+static int test_lsm_scan(
+ TestDb *pTestDb,
+ void *pCtx,
+ int bReverse,
+ void *pFirst, int nFirst,
+ void *pLast, int nLast,
+ void (*xCallback)(void *, void *, int , void *, int)
+){
+ LsmDb *pDb = (LsmDb *)pTestDb;
+ lsm_cursor *csr;
+ lsm_cursor *csr2 = 0;
+ int rc;
+
+ if( pDb->pCsr==0 ){
+ rc = lsm_csr_open(pDb->db, &csr);
+ if( rc!=LSM_OK ) return rc;
+ }else{
+ rc = LSM_OK;
+ csr = pDb->pCsr;
+ }
+
+ /* To enhance testing, if both pLast and pFirst are defined, seek the
+ ** cursor to the "end" boundary here. Then the next block seeks it to
+ ** the "start" ready for the scan. The point is to test that cursors
+ ** can be reused. */
+ if( pLast && pFirst ){
+ if( bReverse ){
+ rc = lsm_csr_seek(csr, pFirst, nFirst, LSM_SEEK_LE);
+ }else{
+ rc = lsm_csr_seek(csr, pLast, nLast, LSM_SEEK_GE);
+ }
+ }
+
+ if( bReverse ){
+ if( pLast ){
+ rc = lsm_csr_seek(csr, pLast, nLast, LSM_SEEK_LE);
+ }else{
+ rc = lsm_csr_last(csr);
+ }
+ }else{
+ if( pFirst ){
+ rc = lsm_csr_seek(csr, pFirst, nFirst, LSM_SEEK_GE);
+ }else{
+ rc = lsm_csr_first(csr);
+ }
+ }
+
+ while( rc==LSM_OK && lsm_csr_valid(csr) ){
+ const void *pKey; int nKey;
+ const void *pVal; int nVal;
+ int cmp;
+
+ lsm_csr_key(csr, &pKey, &nKey);
+ lsm_csr_value(csr, &pVal, &nVal);
+
+ if( bReverse && pFirst ){
+ cmp = memcmp(pFirst, pKey, MIN(nKey, nFirst));
+ if( cmp>0 || (cmp==0 && nFirst>nKey) ) break;
+ }else if( bReverse==0 && pLast ){
+ cmp = memcmp(pLast, pKey, MIN(nKey, nLast));
+ if( cmp<0 || (cmp==0 && nLast<nKey) ) break;
+ }
+
+ xCallback(pCtx, (void *)pKey, nKey, (void *)pVal, nVal);
+
+ if( bReverse ){
+ rc = lsm_csr_prev(csr);
+ }else{
+ rc = lsm_csr_next(csr);
+ }
+ }
+
+ if( pDb->pCsr==0 ){
+ lsm_csr_close(csr);
+ }
+ return rc;
+}
+
+static int test_lsm_begin(TestDb *pTestDb, int iLevel){
+ int rc = LSM_OK;
+ LsmDb *pDb = (LsmDb *)pTestDb;
+
+ /* iLevel==0 is a no-op. */
+ if( iLevel==0 ) return 0;
+
+ if( pDb->pCsr==0 ) rc = lsm_csr_open(pDb->db, &pDb->pCsr);
+ if( rc==LSM_OK && iLevel>1 ){
+ rc = lsm_begin(pDb->db, iLevel-1);
+ }
+
+ return rc;
+}
+static int test_lsm_commit(TestDb *pTestDb, int iLevel){
+ LsmDb *pDb = (LsmDb *)pTestDb;
+
+ /* If iLevel==0, close any open read transaction */
+ if( iLevel==0 && pDb->pCsr ){
+ lsm_csr_close(pDb->pCsr);
+ pDb->pCsr = 0;
+ }
+
+ /* If iLevel==0, close any open read transaction */
+ return lsm_commit(pDb->db, MAX(0, iLevel-1));
+}
+static int test_lsm_rollback(TestDb *pTestDb, int iLevel){
+ LsmDb *pDb = (LsmDb *)pTestDb;
+
+ /* If iLevel==0, close any open read transaction */
+ if( iLevel==0 && pDb->pCsr ){
+ lsm_csr_close(pDb->pCsr);
+ pDb->pCsr = 0;
+ }
+
+ return lsm_rollback(pDb->db, MAX(0, iLevel-1));
+}
+
+/*
+** A log message callback registered with lsm connections. Prints all
+** messages to stderr.
+*/
+static void xLog(void *pCtx, int rc, const char *z){
+ unused_parameter(rc);
+ /* fprintf(stderr, "lsm: rc=%d \"%s\"\n", rc, z); */
+ if( pCtx ) fprintf(stderr, "%s: ", (char *)pCtx);
+ fprintf(stderr, "%s\n", z);
+ fflush(stderr);
+}
+
+static void xWorkHook(lsm_db *db, void *pArg){
+ LsmDb *p = (LsmDb *)pArg;
+ if( p->xWork ) p->xWork(db, p->pWorkCtx);
+}
+
+#define TEST_NO_RECOVERY -1
+#define TEST_COMPRESSION -3
+
+#define TEST_MT_MODE -2
+#define TEST_MT_MIN_CKPT -4
+#define TEST_MT_MAX_CKPT -5
+
+
+int test_lsm_config_str(
+ LsmDb *pLsm,
+ lsm_db *db,
+ int bWorker,
+ const char *zStr,
+ int *pnThread
+){
+ struct CfgParam {
+ const char *zParam;
+ int bWorker;
+ int eParam;
+ } aParam[] = {
+ { "autoflush", 0, LSM_CONFIG_AUTOFLUSH },
+ { "page_size", 0, LSM_CONFIG_PAGE_SIZE },
+ { "block_size", 0, LSM_CONFIG_BLOCK_SIZE },
+ { "safety", 0, LSM_CONFIG_SAFETY },
+ { "autowork", 0, LSM_CONFIG_AUTOWORK },
+ { "autocheckpoint", 0, LSM_CONFIG_AUTOCHECKPOINT },
+ { "mmap", 0, LSM_CONFIG_MMAP },
+ { "use_log", 0, LSM_CONFIG_USE_LOG },
+ { "automerge", 0, LSM_CONFIG_AUTOMERGE },
+ { "max_freelist", 0, LSM_CONFIG_MAX_FREELIST },
+ { "multi_proc", 0, LSM_CONFIG_MULTIPLE_PROCESSES },
+ { "worker_automerge", 1, LSM_CONFIG_AUTOMERGE },
+ { "test_no_recovery", 0, TEST_NO_RECOVERY },
+ { "bg_min_ckpt", 0, TEST_NO_RECOVERY },
+
+ { "mt_mode", 0, TEST_MT_MODE },
+ { "mt_min_ckpt", 0, TEST_MT_MIN_CKPT },
+ { "mt_max_ckpt", 0, TEST_MT_MAX_CKPT },
+
+#ifdef HAVE_ZLIB
+ { "compression", 0, TEST_COMPRESSION },
+#endif
+ { 0, 0 }
+ };
+ const char *z = zStr;
+ int nThread = 1;
+
+ if( zStr==0 ) return 0;
+
+ assert( db );
+ while( z[0] ){
+ const char *zStart;
+
+ /* Skip whitespace */
+ while( *z==' ' ) z++;
+ zStart = z;
+
+ while( *z && *z!='=' ) z++;
+ if( *z ){
+ int eParam;
+ int i;
+ int iVal;
+ int iMul = 1;
+ int rc;
+ char zParam[32];
+ int nParam = z-zStart;
+ if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;
+
+ memcpy(zParam, zStart, nParam);
+ zParam[nParam] = '\0';
+ rc = testArgSelect(aParam, "param", zParam, &i);
+ if( rc!=0 ) return rc;
+ eParam = aParam[i].eParam;
+
+ z++;
+ zStart = z;
+ while( *z>='0' && *z<='9' ) z++;
+ if( *z=='k' || *z=='K' ){
+ iMul = 1;
+ z++;
+ }else if( *z=='M' || *z=='M' ){
+ iMul = 1024;
+ z++;
+ }
+ nParam = z-zStart;
+ if( nParam==0 || nParam>sizeof(zParam)-1 ) goto syntax_error;
+ memcpy(zParam, zStart, nParam);
+ zParam[nParam] = '\0';
+ iVal = atoi(zParam) * iMul;
+
+ if( eParam>0 ){
+ if( bWorker || aParam[i].bWorker==0 ){
+ lsm_config(db, eParam, &iVal);
+ }
+ }else{
+ switch( eParam ){
+ case TEST_NO_RECOVERY:
+ if( pLsm ) pLsm->bNoRecovery = iVal;
+ break;
+ case TEST_MT_MODE:
+ if( pLsm ) nThread = iVal;
+ break;
+ case TEST_MT_MIN_CKPT:
+ if( pLsm && iVal>0 ) pLsm->nMtMinCkpt = iVal*1024;
+ break;
+ case TEST_MT_MAX_CKPT:
+ if( pLsm && iVal>0 ) pLsm->nMtMaxCkpt = iVal*1024;
+ break;
+#ifdef HAVE_ZLIB
+ case TEST_COMPRESSION:
+ testConfigureCompression(db);
+ break;
+#endif
+ }
+ }
+ }else if( z!=zStart ){
+ goto syntax_error;
+ }
+ }
+
+ if( pnThread ) *pnThread = nThread;
+ if( pLsm && pLsm->nMtMaxCkpt < pLsm->nMtMinCkpt ){
+ pLsm->nMtMinCkpt = pLsm->nMtMaxCkpt;
+ }
+
+ return 0;
+ syntax_error:
+ testPrintError("syntax error at: \"%s\"\n", z);
+ return 1;
+}
+
+int tdb_lsm_config_str(TestDb *pDb, const char *zStr){
+ int rc = 0;
+ if( tdb_lsm(pDb) ){
+#ifdef LSM_MUTEX_PTHREADS
+ int i;
+#endif
+ LsmDb *pLsm = (LsmDb *)pDb;
+
+ rc = test_lsm_config_str(pLsm, pLsm->db, 0, zStr, 0);
+#ifdef LSM_MUTEX_PTHREADS
+ for(i=0; rc==0 && i<pLsm->nWorker; i++){
+ rc = test_lsm_config_str(0, pLsm->aWorker[i].pWorker, 1, zStr, 0);
+ }
+#endif
+ }
+ return rc;
+}
+
+int tdb_lsm_configure(lsm_db *db, const char *zConfig){
+ return test_lsm_config_str(0, db, 0, zConfig, 0);
+}
+
+static int testLsmStartWorkers(LsmDb *, int, const char *, const char *);
+
+static int testLsmOpen(
+ const char *zCfg,
+ const char *zFilename,
+ int bClear,
+ TestDb **ppDb
+){
+ static const DatabaseMethods LsmMethods = {
+ test_lsm_close,
+ test_lsm_write,
+ test_lsm_delete,
+ test_lsm_delete_range,
+ test_lsm_fetch,
+ test_lsm_scan,
+ test_lsm_begin,
+ test_lsm_commit,
+ test_lsm_rollback
+ };
+
+ int rc;
+ int nFilename;
+ LsmDb *pDb;
+
+ /* If the bClear flag is set, delete any existing database. */
+ assert( zFilename);
+ if( bClear ) testDeleteLsmdb(zFilename);
+ nFilename = strlen(zFilename);
+
+ pDb = (LsmDb *)testMalloc(sizeof(LsmDb) + nFilename + 1);
+ memset(pDb, 0, sizeof(LsmDb));
+ pDb->base.pMethods = &LsmMethods;
+ pDb->zName = (char *)&pDb[1];
+ memcpy(pDb->zName, zFilename, nFilename + 1);
+
+ /* Default the sector size used for crash simulation to 512 bytes.
+ ** Todo: There should be an OS method to obtain this value - just as
+ ** there is in SQLite. For now, LSM assumes that it is smaller than
+ ** the page size (default 4KB).
+ */
+ pDb->szSector = 256;
+
+ /* Default values for the mt_min_ckpt and mt_max_ckpt parameters. */
+ pDb->nMtMinCkpt = LSMTEST_DFLT_MT_MIN_CKPT;
+ pDb->nMtMaxCkpt = LSMTEST_DFLT_MT_MAX_CKPT;
+
+ memcpy(&pDb->env, tdb_lsm_env(), sizeof(lsm_env));
+ pDb->env.pVfsCtx = (void *)pDb;
+ pDb->env.xFullpath = testEnvFullpath;
+ pDb->env.xOpen = testEnvOpen;
+ pDb->env.xRead = testEnvRead;
+ pDb->env.xWrite = testEnvWrite;
+ pDb->env.xTruncate = testEnvTruncate;
+ pDb->env.xSync = testEnvSync;
+ pDb->env.xSectorSize = testEnvSectorSize;
+ pDb->env.xRemap = testEnvRemap;
+ pDb->env.xFileid = testEnvFileid;
+ pDb->env.xClose = testEnvClose;
+ pDb->env.xUnlink = testEnvUnlink;
+ pDb->env.xLock = testEnvLock;
+ pDb->env.xTestLock = testEnvTestLock;
+ pDb->env.xShmBarrier = testEnvShmBarrier;
+ pDb->env.xShmMap = testEnvShmMap;
+ pDb->env.xShmUnmap = testEnvShmUnmap;
+ pDb->env.xSleep = testEnvSleep;
+
+ rc = lsm_new(&pDb->env, &pDb->db);
+ if( rc==LSM_OK ){
+ int nThread = 1;
+ lsm_config_log(pDb->db, xLog, 0);
+ lsm_config_work_hook(pDb->db, xWorkHook, (void *)pDb);
+
+ rc = test_lsm_config_str(pDb, pDb->db, 0, zCfg, &nThread);
+ if( rc==LSM_OK ) rc = lsm_open(pDb->db, zFilename);
+
+ pDb->eMode = nThread;
+#ifdef LSM_MUTEX_PTHREADS
+ if( rc==LSM_OK && nThread>1 ){
+ testLsmStartWorkers(pDb, nThread, zFilename, zCfg);
+ }
+#endif
+
+ if( rc!=LSM_OK ){
+ test_lsm_close((TestDb *)pDb);
+ pDb = 0;
+ }
+ }
+
+ *ppDb = (TestDb *)pDb;
+ return rc;
+}
+
+int test_lsm_open(
+ const char *zSpec,
+ const char *zFilename,
+ int bClear,
+ TestDb **ppDb
+){
+ return testLsmOpen(zSpec, zFilename, bClear, ppDb);
+}
+
+int test_lsm_small_open(
+ const char *zSpec,
+ const char *zFile,
+ int bClear,
+ TestDb **ppDb
+){
+ const char *zCfg = "page_size=256 block_size=64 mmap=1024";
+ return testLsmOpen(zCfg, zFile, bClear, ppDb);
+}
+
+int test_lsm_lomem_open(
+ const char *zSpec,
+ const char *zFilename,
+ int bClear,
+ TestDb **ppDb
+){
+ /* "max_freelist=4 autocheckpoint=32" */
+ const char *zCfg =
+ "page_size=256 block_size=64 autoflush=16 "
+ "autocheckpoint=32"
+ "mmap=0 "
+ ;
+ return testLsmOpen(zCfg, zFilename, bClear, ppDb);
+}
+
+int test_lsm_lomem2_open(
+ const char *zSpec,
+ const char *zFilename,
+ int bClear,
+ TestDb **ppDb
+){
+ /* "max_freelist=4 autocheckpoint=32" */
+ const char *zCfg =
+ "page_size=512 block_size=64 autoflush=0 mmap=0 "
+ ;
+ return testLsmOpen(zCfg, zFilename, bClear, ppDb);
+}
+
+int test_lsm_zip_open(
+ const char *zSpec,
+ const char *zFilename,
+ int bClear,
+ TestDb **ppDb
+){
+ const char *zCfg =
+ "page_size=256 block_size=64 autoflush=16 "
+ "autocheckpoint=32 compression=1 mmap=0 "
+ ;
+ return testLsmOpen(zCfg, zFilename, bClear, ppDb);
+}
+
+lsm_db *tdb_lsm(TestDb *pDb){
+ if( pDb->pMethods->xClose==test_lsm_close ){
+ return ((LsmDb *)pDb)->db;
+ }
+ return 0;
+}
+
+int tdb_lsm_multithread(TestDb *pDb){
+ int ret = 0;
+ if( tdb_lsm(pDb) ){
+ ret = ((LsmDb*)pDb)->eMode!=LSMTEST_MODE_SINGLETHREAD;
+ }
+ return ret;
+}
+
+void tdb_lsm_enable_log(TestDb *pDb, int bEnable){
+ lsm_db *db = tdb_lsm(pDb);
+ if( db ){
+ lsm_config_log(db, (bEnable ? xLog : 0), (void *)"client");
+ }
+}
+
+void tdb_lsm_application_crash(TestDb *pDb){
+ if( tdb_lsm(pDb) ){
+ LsmDb *p = (LsmDb *)pDb;
+ p->bCrashed = 1;
+ }
+}
+
+void tdb_lsm_prepare_system_crash(TestDb *pDb){
+ if( tdb_lsm(pDb) ){
+ LsmDb *p = (LsmDb *)pDb;
+ p->bPrepareCrash = 1;
+ }
+}
+
+void tdb_lsm_system_crash(TestDb *pDb){
+ if( tdb_lsm(pDb) ){
+ LsmDb *p = (LsmDb *)pDb;
+ p->bCrashed = 1;
+ doSystemCrash(p);
+ }
+}
+
+void tdb_lsm_safety(TestDb *pDb, int eMode){
+ assert( eMode==LSM_SAFETY_OFF
+ || eMode==LSM_SAFETY_NORMAL
+ || eMode==LSM_SAFETY_FULL
+ );
+ if( tdb_lsm(pDb) ){
+ int iParam = eMode;
+ LsmDb *p = (LsmDb *)pDb;
+ lsm_config(p->db, LSM_CONFIG_SAFETY, &iParam);
+ }
+}
+
+void tdb_lsm_prepare_sync_crash(TestDb *pDb, int iSync){
+ assert( iSync>0 );
+ if( tdb_lsm(pDb) ){
+ LsmDb *p = (LsmDb *)pDb;
+ p->nAutoCrash = iSync;
+ p->bPrepareCrash = 1;
+ }
+}
+
+void tdb_lsm_config_work_hook(
+ TestDb *pDb,
+ void (*xWork)(lsm_db *, void *),
+ void *pWorkCtx
+){
+ if( tdb_lsm(pDb) ){
+ LsmDb *p = (LsmDb *)pDb;
+ p->xWork = xWork;
+ p->pWorkCtx = pWorkCtx;
+ }
+}
+
+void tdb_lsm_write_hook(
+ TestDb *pDb,
+ void (*xWrite)(void *, int, lsm_i64, int, int),
+ void *pWriteCtx
+){
+ if( tdb_lsm(pDb) ){
+ LsmDb *p = (LsmDb *)pDb;
+ p->xWriteHook = xWrite;
+ p->pWriteCtx = pWriteCtx;
+ }
+}
+
+int tdb_lsm_open(const char *zCfg, const char *zDb, int bClear, TestDb **ppDb){
+ return testLsmOpen(zCfg, zDb, bClear, ppDb);
+}
+
+#ifdef LSM_MUTEX_PTHREADS
+
+/*
+** Signal worker thread iWorker that there may be work to do.
+*/
+static void mt_signal_worker(LsmDb *pDb, int iWorker){
+ LsmWorker *p = &pDb->aWorker[iWorker];
+ pthread_mutex_lock(&p->worker_mutex);
+ p->bDoWork = 1;
+ pthread_cond_signal(&p->worker_cond);
+ pthread_mutex_unlock(&p->worker_mutex);
+}
+
+/*
+** This routine is used as the main() for all worker threads.
+*/
+static void *worker_main(void *pArg){
+ LsmWorker *p = (LsmWorker *)pArg;
+ lsm_db *pWorker; /* Connection to access db through */
+
+ pthread_mutex_lock(&p->worker_mutex);
+ while( (pWorker = p->pWorker) ){
+ int rc = LSM_OK;
+
+ /* Do some work. If an error occurs, exit. */
+
+ pthread_mutex_unlock(&p->worker_mutex);
+ if( p->eType==LSMTEST_THREAD_CKPT ){
+ int nKB = 0;
+ rc = lsm_info(pWorker, LSM_INFO_CHECKPOINT_SIZE, &nKB);
+ if( rc==LSM_OK && nKB>=p->pDb->nMtMinCkpt ){
+ rc = lsm_checkpoint(pWorker, 0);
+ }
+ }else{
+ int nWrite;
+ do {
+
+ if( p->eType==LSMTEST_THREAD_WORKER ){
+ waitOnCheckpointer(p->pDb, pWorker);
+ }
+
+ nWrite = 0;
+ rc = lsm_work(pWorker, 0, 256, &nWrite);
+
+ if( p->eType==LSMTEST_THREAD_WORKER && nWrite ){
+ mt_signal_worker(p->pDb, 1);
+ }
+ }while( nWrite && p->pWorker );
+ }
+ pthread_mutex_lock(&p->worker_mutex);
+
+ if( rc!=LSM_OK && rc!=LSM_BUSY ){
+ p->worker_rc = rc;
+ break;
+ }
+
+ /* The thread will wake up when it is signaled either because another
+ ** thread has created some work for this one or because the connection
+ ** is being closed. */
+ if( p->pWorker && p->bDoWork==0 ){
+ pthread_cond_wait(&p->worker_cond, &p->worker_mutex);
+ }
+ p->bDoWork = 0;
+ }
+ pthread_mutex_unlock(&p->worker_mutex);
+
+ return 0;
+}
+
+
+static void mt_stop_worker(LsmDb *pDb, int iWorker){
+ LsmWorker *p = &pDb->aWorker[iWorker];
+ if( p->pWorker ){
+ void *pDummy;
+ lsm_db *pWorker;
+
+ /* Signal the worker to stop */
+ pthread_mutex_lock(&p->worker_mutex);
+ pWorker = p->pWorker;
+ p->pWorker = 0;
+ pthread_cond_signal(&p->worker_cond);
+ pthread_mutex_unlock(&p->worker_mutex);
+
+ /* Join the worker thread. */
+ pthread_join(p->worker_thread, &pDummy);
+
+ /* Free resources allocated in mt_start_worker() */
+ pthread_cond_destroy(&p->worker_cond);
+ pthread_mutex_destroy(&p->worker_mutex);
+ lsm_close(pWorker);
+ }
+}
+
+static void mt_shutdown(LsmDb *pDb){
+ int i;
+ for(i=0; i<pDb->nWorker; i++){
+ mt_stop_worker(pDb, i);
+ }
+}
+
+/*
+** This callback is invoked by LSM when the client database writes to
+** the database file (i.e. to flush the contents of the in-memory tree).
+** This implies there may be work to do on the database, so signal
+** the worker threads.
+*/
+static void mt_client_work_hook(lsm_db *db, void *pArg){
+ LsmDb *pDb = (LsmDb *)pArg; /* LsmDb database handle */
+
+ /* Invoke the user level work-hook, if any. */
+ if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx);
+
+ /* Wake up worker thread 0. */
+ mt_signal_worker(pDb, 0);
+}
+
+static void mt_worker_work_hook(lsm_db *db, void *pArg){
+ LsmDb *pDb = (LsmDb *)pArg; /* LsmDb database handle */
+
+ /* Invoke the user level work-hook, if any. */
+ if( pDb->xWork ) pDb->xWork(db, pDb->pWorkCtx);
+}
+
+/*
+** Launch worker thread iWorker for database connection pDb.
+*/
+static int mt_start_worker(
+ LsmDb *pDb, /* Main database structure */
+ int iWorker, /* Worker number to start */
+ const char *zFilename, /* File name of database to open */
+ const char *zCfg, /* Connection configuration string */
+ int eType /* Type of worker thread */
+){
+ int rc = 0; /* Return code */
+ LsmWorker *p; /* Object to initialize */
+
+ assert( iWorker<pDb->nWorker );
+ assert( eType==LSMTEST_THREAD_CKPT
+ || eType==LSMTEST_THREAD_WORKER
+ || eType==LSMTEST_THREAD_WORKER_AC
+ );
+
+ p = &pDb->aWorker[iWorker];
+ p->eType = eType;
+ p->pDb = pDb;
+
+ /* Open the worker connection */
+ if( rc==0 ) rc = lsm_new(&pDb->env, &p->pWorker);
+ if( zCfg ){
+ test_lsm_config_str(pDb, p->pWorker, 1, zCfg, 0);
+ }
+ if( rc==0 ) rc = lsm_open(p->pWorker, zFilename);
+ lsm_config_log(p->pWorker, xLog, (void *)"worker");
+
+ /* Configure the work-hook */
+ if( rc==0 ){
+ lsm_config_work_hook(p->pWorker, mt_worker_work_hook, (void *)pDb);
+ }
+
+ if( eType==LSMTEST_THREAD_WORKER ){
+ test_lsm_config_str(0, p->pWorker, 1, "autocheckpoint=0", 0);
+ }
+
+ /* Kick off the worker thread. */
+ if( rc==0 ) rc = pthread_cond_init(&p->worker_cond, 0);
+ if( rc==0 ) rc = pthread_mutex_init(&p->worker_mutex, 0);
+ if( rc==0 ) rc = pthread_create(&p->worker_thread, 0, worker_main, (void *)p);
+
+ return rc;
+}
+
+
+static int testLsmStartWorkers(
+ LsmDb *pDb, int eModel, const char *zFilename, const char *zCfg
+){
+ int rc;
+
+ if( eModel<1 || eModel>4 ) return 1;
+ if( eModel==1 ) return 0;
+
+ /* Configure a work-hook for the client connection. Worker 0 is signalled
+ ** every time the users connection writes to the database. */
+ lsm_config_work_hook(pDb->db, mt_client_work_hook, (void *)pDb);
+
+ /* Allocate space for two worker connections. They may not both be
+ ** used, but both are allocated. */
+ pDb->aWorker = (LsmWorker *)testMalloc(sizeof(LsmWorker) * 2);
+ memset(pDb->aWorker, 0, sizeof(LsmWorker) * 2);
+
+ switch( eModel ){
+ case LSMTEST_MODE_BACKGROUND_CKPT:
+ pDb->nWorker = 1;
+ test_lsm_config_str(0, pDb->db, 0, "autocheckpoint=0", 0);
+ rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_CKPT);
+ break;
+
+ case LSMTEST_MODE_BACKGROUND_WORK:
+ pDb->nWorker = 1;
+ test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0);
+ rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER_AC);
+ break;
+
+ case LSMTEST_MODE_BACKGROUND_BOTH:
+ pDb->nWorker = 2;
+ test_lsm_config_str(0, pDb->db, 0, "autowork=0", 0);
+ rc = mt_start_worker(pDb, 0, zFilename, zCfg, LSMTEST_THREAD_WORKER);
+ if( rc==0 ){
+ rc = mt_start_worker(pDb, 1, zFilename, zCfg, LSMTEST_THREAD_CKPT);
+ }
+ break;
+ }
+
+ return rc;
+}
+
+
+int test_lsm_mt2(
+ const char *zSpec,
+ const char *zFilename,
+ int bClear,
+ TestDb **ppDb
+){
+ const char *zCfg = "mt_mode=2";
+ return testLsmOpen(zCfg, zFilename, bClear, ppDb);
+}
+
+int test_lsm_mt3(
+ const char *zSpec,
+ const char *zFilename,
+ int bClear,
+ TestDb **ppDb
+){
+ const char *zCfg = "mt_mode=4";
+ return testLsmOpen(zCfg, zFilename, bClear, ppDb);
+}
+
+#else
+static void mt_shutdown(LsmDb *pDb) {
+ unused_parameter(pDb);
+}
+int test_lsm_mt(const char *zFilename, int bClear, TestDb **ppDb){
+ unused_parameter(zFilename);
+ unused_parameter(bClear);
+ unused_parameter(ppDb);
+ testPrintError("threads unavailable - recompile with LSM_MUTEX_PTHREADS\n");
+ return 1;
+}
+#endif