diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:28:19 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:28:19 +0000 |
commit | 18657a960e125336f704ea058e25c27bd3900dcb (patch) | |
tree | 17b438b680ed45a996d7b59951e6aa34023783f2 /ext/lsm1/lsm_main.c | |
parent | Initial commit. (diff) | |
download | sqlite3-18657a960e125336f704ea058e25c27bd3900dcb.tar.xz sqlite3-18657a960e125336f704ea058e25c27bd3900dcb.zip |
Adding upstream version 3.40.1.upstream/3.40.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ext/lsm1/lsm_main.c')
-rw-r--r-- | ext/lsm1/lsm_main.c | 1008 |
1 files changed, 1008 insertions, 0 deletions
diff --git a/ext/lsm1/lsm_main.c b/ext/lsm1/lsm_main.c new file mode 100644 index 0000000..a9c48e0 --- /dev/null +++ b/ext/lsm1/lsm_main.c @@ -0,0 +1,1008 @@ +/* +** 2011-08-18 +** +** The author disclaims copyright to this source code. In place of +** a legal notice, here is a blessing: +** +** May you do good and not evil. +** May you find forgiveness for yourself and forgive others. +** May you share freely, never taking more than you give. +** +************************************************************************* +** +** The main interface to the LSM module. +*/ +#include "lsmInt.h" + + +#ifdef LSM_DEBUG +/* +** This function returns a copy of its only argument. +** +** When the library is built with LSM_DEBUG defined, this function is called +** whenever an error code is generated (not propagated - generated). So +** if the library is mysteriously returning (say) LSM_IOERR, a breakpoint +** may be set in this function to determine why. +*/ +int lsmErrorBkpt(int rc){ + /* Set breakpoint here! */ + return rc; +} + +/* +** This function contains various assert() statements that test that the +** lsm_db structure passed as an argument is internally consistent. +*/ +static void assert_db_state(lsm_db *pDb){ + + /* If there is at least one cursor or a write transaction open, the database + ** handle must be holding a pointer to a client snapshot. And the reverse + ** - if there are no open cursors and no write transactions then there must + ** not be a client snapshot. */ + + assert( (pDb->pCsr!=0||pDb->nTransOpen>0)==(pDb->iReader>=0||pDb->bRoTrans) ); + + assert( (pDb->iReader<0 && pDb->bRoTrans==0) || pDb->pClient!=0 ); + + assert( pDb->nTransOpen>=0 ); +} +#else +# define assert_db_state(x) +#endif + +/* +** The default key-compare function. +*/ +static int xCmp(void *p1, int n1, void *p2, int n2){ + int res; + res = memcmp(p1, p2, LSM_MIN(n1, n2)); + if( res==0 ) res = (n1-n2); + return res; +} + +static void xLog(void *pCtx, int rc, const char *z){ + (void)(rc); + (void)(pCtx); + fprintf(stderr, "%s\n", z); + fflush(stderr); +} + +/* +** Allocate a new db handle. +*/ +int lsm_new(lsm_env *pEnv, lsm_db **ppDb){ + lsm_db *pDb; + + /* If the user did not provide an environment, use the default. */ + if( pEnv==0 ) pEnv = lsm_default_env(); + assert( pEnv ); + + /* Allocate the new database handle */ + *ppDb = pDb = (lsm_db *)lsmMallocZero(pEnv, sizeof(lsm_db)); + if( pDb==0 ) return LSM_NOMEM_BKPT; + + /* Initialize the new object */ + pDb->pEnv = pEnv; + pDb->nTreeLimit = LSM_DFLT_AUTOFLUSH; + pDb->nAutockpt = LSM_DFLT_AUTOCHECKPOINT; + pDb->bAutowork = LSM_DFLT_AUTOWORK; + pDb->eSafety = LSM_DFLT_SAFETY; + pDb->xCmp = xCmp; + pDb->nDfltPgsz = LSM_DFLT_PAGE_SIZE; + pDb->nDfltBlksz = LSM_DFLT_BLOCK_SIZE; + pDb->nMerge = LSM_DFLT_AUTOMERGE; + pDb->nMaxFreelist = LSM_MAX_FREELIST_ENTRIES; + pDb->bUseLog = LSM_DFLT_USE_LOG; + pDb->iReader = -1; + pDb->iRwclient = -1; + pDb->bMultiProc = LSM_DFLT_MULTIPLE_PROCESSES; + pDb->iMmap = LSM_DFLT_MMAP; + pDb->xLog = xLog; + pDb->compress.iId = LSM_COMPRESSION_NONE; + return LSM_OK; +} + +lsm_env *lsm_get_env(lsm_db *pDb){ + assert( pDb->pEnv ); + return pDb->pEnv; +} + +/* +** If database handle pDb is currently holding a client snapshot, but does +** not have any open cursors or write transactions, release it. +*/ +static void dbReleaseClientSnapshot(lsm_db *pDb){ + if( pDb->nTransOpen==0 && pDb->pCsr==0 ){ + lsmFinishReadTrans(pDb); + } +} + +static int getFullpathname( + lsm_env *pEnv, + const char *zRel, + char **pzAbs +){ + int nAlloc = 0; + char *zAlloc = 0; + int nReq = 0; + int rc; + + do{ + nAlloc = nReq; + rc = pEnv->xFullpath(pEnv, zRel, zAlloc, &nReq); + if( nReq>nAlloc ){ + zAlloc = lsmReallocOrFreeRc(pEnv, zAlloc, nReq, &rc); + } + }while( nReq>nAlloc && rc==LSM_OK ); + + if( rc!=LSM_OK ){ + lsmFree(pEnv, zAlloc); + zAlloc = 0; + } + *pzAbs = zAlloc; + return rc; +} + +/* +** Check that the bits in the db->mLock mask are consistent with the +** value stored in db->iRwclient. An assert shall fail otherwise. +*/ +static void assertRwclientLockValue(lsm_db *db){ +#ifndef NDEBUG + u64 msk; /* Mask of mLock bits for RWCLIENT locks */ + u64 rwclient = 0; /* Bit corresponding to db->iRwclient */ + + if( db->iRwclient>=0 ){ + rwclient = ((u64)1 << (LSM_LOCK_RWCLIENT(db->iRwclient)-1)); + } + msk = ((u64)1 << (LSM_LOCK_RWCLIENT(LSM_LOCK_NRWCLIENT)-1)) - 1; + msk -= (((u64)1 << (LSM_LOCK_RWCLIENT(0)-1)) - 1); + + assert( (db->mLock & msk)==rwclient ); +#endif +} + +/* +** Open a new connection to database zFilename. +*/ +int lsm_open(lsm_db *pDb, const char *zFilename){ + int rc; + + if( pDb->pDatabase ){ + rc = LSM_MISUSE; + }else{ + char *zFull; + + /* Translate the possibly relative pathname supplied by the user into + ** an absolute pathname. This is required because the supplied path + ** is used (either directly or with "-log" appended to it) for more + ** than one purpose - to open both the database and log files, and + ** perhaps to unlink the log file during disconnection. An absolute + ** path is required to ensure that the correct files are operated + ** on even if the application changes the cwd. */ + rc = getFullpathname(pDb->pEnv, zFilename, &zFull); + assert( rc==LSM_OK || zFull==0 ); + + /* Connect to the database. */ + if( rc==LSM_OK ){ + rc = lsmDbDatabaseConnect(pDb, zFull); + } + + if( pDb->bReadonly==0 ){ + /* Configure the file-system connection with the page-size and block-size + ** of this database. Even if the database file is zero bytes in size + ** on disk, these values have been set in shared-memory by now, and so + ** are guaranteed not to change during the lifetime of this connection. + */ + if( rc==LSM_OK && LSM_OK==(rc = lsmCheckpointLoad(pDb, 0)) ){ + lsmFsSetPageSize(pDb->pFS, lsmCheckpointPgsz(pDb->aSnapshot)); + lsmFsSetBlockSize(pDb->pFS, lsmCheckpointBlksz(pDb->aSnapshot)); + } + } + + lsmFree(pDb->pEnv, zFull); + assertRwclientLockValue(pDb); + } + + assert( pDb->bReadonly==0 || pDb->bReadonly==1 ); + assert( rc!=LSM_OK || (pDb->pShmhdr==0)==(pDb->bReadonly==1) ); + + return rc; +} + +int lsm_close(lsm_db *pDb){ + int rc = LSM_OK; + if( pDb ){ + assert_db_state(pDb); + if( pDb->pCsr || pDb->nTransOpen ){ + rc = LSM_MISUSE_BKPT; + }else{ + lsmMCursorFreeCache(pDb); + lsmFreeSnapshot(pDb->pEnv, pDb->pClient); + pDb->pClient = 0; + + assertRwclientLockValue(pDb); + + lsmDbDatabaseRelease(pDb); + lsmLogClose(pDb); + lsmFsClose(pDb->pFS); + /* assert( pDb->mLock==0 ); */ + + /* Invoke any destructors registered for the compression or + ** compression factory callbacks. */ + if( pDb->factory.xFree ) pDb->factory.xFree(pDb->factory.pCtx); + if( pDb->compress.xFree ) pDb->compress.xFree(pDb->compress.pCtx); + + lsmFree(pDb->pEnv, pDb->rollback.aArray); + lsmFree(pDb->pEnv, pDb->aTrans); + lsmFree(pDb->pEnv, pDb->apShm); + lsmFree(pDb->pEnv, pDb); + } + } + return rc; +} + +int lsm_config(lsm_db *pDb, int eParam, ...){ + int rc = LSM_OK; + va_list ap; + va_start(ap, eParam); + + switch( eParam ){ + case LSM_CONFIG_AUTOFLUSH: { + /* This parameter is read and written in KB. But all internal + ** processing is done in bytes. */ + int *piVal = va_arg(ap, int *); + int iVal = *piVal; + if( iVal>=0 && iVal<=(1024*1024) ){ + pDb->nTreeLimit = iVal*1024; + } + *piVal = (pDb->nTreeLimit / 1024); + break; + } + + case LSM_CONFIG_AUTOWORK: { + int *piVal = va_arg(ap, int *); + if( *piVal>=0 ){ + pDb->bAutowork = *piVal; + } + *piVal = pDb->bAutowork; + break; + } + + case LSM_CONFIG_AUTOCHECKPOINT: { + /* This parameter is read and written in KB. But all internal processing + ** (including the lsm_db.nAutockpt variable) is done in bytes. */ + int *piVal = va_arg(ap, int *); + if( *piVal>=0 ){ + int iVal = *piVal; + pDb->nAutockpt = (i64)iVal * 1024; + } + *piVal = (int)(pDb->nAutockpt / 1024); + break; + } + + case LSM_CONFIG_PAGE_SIZE: { + int *piVal = va_arg(ap, int *); + if( pDb->pDatabase ){ + /* If lsm_open() has been called, this is a read-only parameter. + ** Set the output variable to the page-size according to the + ** FileSystem object. */ + *piVal = lsmFsPageSize(pDb->pFS); + }else{ + if( *piVal>=256 && *piVal<=65536 && ((*piVal-1) & *piVal)==0 ){ + pDb->nDfltPgsz = *piVal; + }else{ + *piVal = pDb->nDfltPgsz; + } + } + break; + } + + case LSM_CONFIG_BLOCK_SIZE: { + /* This parameter is read and written in KB. But all internal + ** processing is done in bytes. */ + int *piVal = va_arg(ap, int *); + if( pDb->pDatabase ){ + /* If lsm_open() has been called, this is a read-only parameter. + ** Set the output variable to the block-size in KB according to the + ** FileSystem object. */ + *piVal = lsmFsBlockSize(pDb->pFS) / 1024; + }else{ + int iVal = *piVal; + if( iVal>=64 && iVal<=65536 && ((iVal-1) & iVal)==0 ){ + pDb->nDfltBlksz = iVal * 1024; + }else{ + *piVal = pDb->nDfltBlksz / 1024; + } + } + break; + } + + case LSM_CONFIG_SAFETY: { + int *piVal = va_arg(ap, int *); + if( *piVal>=0 && *piVal<=2 ){ + pDb->eSafety = *piVal; + } + *piVal = pDb->eSafety; + break; + } + + case LSM_CONFIG_MMAP: { + int *piVal = va_arg(ap, int *); + if( pDb->iReader<0 && *piVal>=0 ){ + pDb->iMmap = *piVal; + rc = lsmFsConfigure(pDb); + } + *piVal = pDb->iMmap; + break; + } + + case LSM_CONFIG_USE_LOG: { + int *piVal = va_arg(ap, int *); + if( pDb->nTransOpen==0 && (*piVal==0 || *piVal==1) ){ + pDb->bUseLog = *piVal; + } + *piVal = pDb->bUseLog; + break; + } + + case LSM_CONFIG_AUTOMERGE: { + int *piVal = va_arg(ap, int *); + if( *piVal>1 ) pDb->nMerge = *piVal; + *piVal = pDb->nMerge; + break; + } + + case LSM_CONFIG_MAX_FREELIST: { + int *piVal = va_arg(ap, int *); + if( *piVal>=2 && *piVal<=LSM_MAX_FREELIST_ENTRIES ){ + pDb->nMaxFreelist = *piVal; + } + *piVal = pDb->nMaxFreelist; + break; + } + + case LSM_CONFIG_MULTIPLE_PROCESSES: { + int *piVal = va_arg(ap, int *); + if( pDb->pDatabase ){ + /* If lsm_open() has been called, this is a read-only parameter. + ** Set the output variable to true if this connection is currently + ** in multi-process mode. */ + *piVal = lsmDbMultiProc(pDb); + }else{ + pDb->bMultiProc = *piVal = (*piVal!=0); + } + break; + } + + case LSM_CONFIG_READONLY: { + int *piVal = va_arg(ap, int *); + /* If lsm_open() has been called, this is a read-only parameter. */ + if( pDb->pDatabase==0 && *piVal>=0 ){ + pDb->bReadonly = *piVal = (*piVal!=0); + } + *piVal = pDb->bReadonly; + break; + } + + case LSM_CONFIG_SET_COMPRESSION: { + lsm_compress *p = va_arg(ap, lsm_compress *); + if( pDb->iReader>=0 && pDb->bInFactory==0 ){ + /* May not change compression schemes with an open transaction */ + rc = LSM_MISUSE_BKPT; + }else{ + if( pDb->compress.xFree ){ + /* Invoke any destructor belonging to the current compression. */ + pDb->compress.xFree(pDb->compress.pCtx); + } + if( p->xBound==0 ){ + memset(&pDb->compress, 0, sizeof(lsm_compress)); + pDb->compress.iId = LSM_COMPRESSION_NONE; + }else{ + memcpy(&pDb->compress, p, sizeof(lsm_compress)); + } + rc = lsmFsConfigure(pDb); + } + break; + } + + case LSM_CONFIG_SET_COMPRESSION_FACTORY: { + lsm_compress_factory *p = va_arg(ap, lsm_compress_factory *); + if( pDb->factory.xFree ){ + /* Invoke any destructor belonging to the current factory. */ + pDb->factory.xFree(pDb->factory.pCtx); + } + memcpy(&pDb->factory, p, sizeof(lsm_compress_factory)); + break; + } + + case LSM_CONFIG_GET_COMPRESSION: { + lsm_compress *p = va_arg(ap, lsm_compress *); + memcpy(p, &pDb->compress, sizeof(lsm_compress)); + break; + } + + default: + rc = LSM_MISUSE; + break; + } + + va_end(ap); + return rc; +} + +void lsmAppendSegmentList(LsmString *pStr, char *zPre, Segment *pSeg){ + lsmStringAppendf(pStr, "%s{%d %d %d %d}", zPre, + pSeg->iFirst, pSeg->iLastPg, pSeg->iRoot, pSeg->nSize + ); +} + +static int infoGetWorker(lsm_db *pDb, Snapshot **pp, int *pbUnlock){ + int rc = LSM_OK; + + assert( *pbUnlock==0 ); + if( !pDb->pWorker ){ + rc = lsmBeginWork(pDb); + if( rc!=LSM_OK ) return rc; + *pbUnlock = 1; + } + if( pp ) *pp = pDb->pWorker; + return rc; +} + +static void infoFreeWorker(lsm_db *pDb, int bUnlock){ + if( bUnlock ){ + int rcdummy = LSM_BUSY; + lsmFinishWork(pDb, 0, &rcdummy); + } +} + +int lsmStructList( + lsm_db *pDb, /* Database handle */ + char **pzOut /* OUT: Nul-terminated string (tcl list) */ +){ + Level *pTopLevel = 0; /* Top level of snapshot to report on */ + int rc = LSM_OK; + Level *p; + LsmString s; + Snapshot *pWorker; /* Worker snapshot */ + int bUnlock = 0; + + /* Obtain the worker snapshot */ + rc = infoGetWorker(pDb, &pWorker, &bUnlock); + if( rc!=LSM_OK ) return rc; + + /* Format the contents of the snapshot as text */ + pTopLevel = lsmDbSnapshotLevel(pWorker); + lsmStringInit(&s, pDb->pEnv); + for(p=pTopLevel; rc==LSM_OK && p; p=p->pNext){ + int i; + lsmStringAppendf(&s, "%s{%d", (s.n ? " " : ""), (int)p->iAge); + lsmAppendSegmentList(&s, " ", &p->lhs); + for(i=0; rc==LSM_OK && i<p->nRight; i++){ + lsmAppendSegmentList(&s, " ", &p->aRhs[i]); + } + lsmStringAppend(&s, "}", 1); + } + rc = s.n>=0 ? LSM_OK : LSM_NOMEM; + + /* Release the snapshot and return */ + infoFreeWorker(pDb, bUnlock); + *pzOut = s.z; + return rc; +} + +static int infoFreelistCb(void *pCtx, int iBlk, i64 iSnapshot){ + LsmString *pStr = (LsmString *)pCtx; + lsmStringAppendf(pStr, "%s{%d %lld}", (pStr->n?" ":""), iBlk, iSnapshot); + return 0; +} + +int lsmInfoFreelist(lsm_db *pDb, char **pzOut){ + Snapshot *pWorker; /* Worker snapshot */ + int bUnlock = 0; + LsmString s; + int rc; + + /* Obtain the worker snapshot */ + rc = infoGetWorker(pDb, &pWorker, &bUnlock); + if( rc!=LSM_OK ) return rc; + + lsmStringInit(&s, pDb->pEnv); + rc = lsmWalkFreelist(pDb, 0, infoFreelistCb, &s); + if( rc!=LSM_OK ){ + lsmFree(pDb->pEnv, s.z); + }else{ + *pzOut = s.z; + } + + /* Release the snapshot and return */ + infoFreeWorker(pDb, bUnlock); + return rc; +} + +static int infoTreeSize(lsm_db *db, int *pnOldKB, int *pnNewKB){ + ShmHeader *pShm = db->pShmhdr; + TreeHeader *p = &pShm->hdr1; + + /* The following code suffers from two race conditions, as it accesses and + ** trusts the contents of shared memory without verifying checksums: + ** + ** * The two values read - TreeHeader.root.nByte and oldroot.nByte - are + ** 32-bit fields. It is assumed that reading from one of these + ** is atomic - that it is not possible to read a partially written + ** garbage value. However the two values may be mutually inconsistent. + ** + ** * TreeHeader.iLogOff is a 64-bit value. And lsmCheckpointLogOffset() + ** reads a 64-bit value from a snapshot stored in shared memory. It + ** is assumed that in each case it is possible to read a partially + ** written garbage value. If this occurs, then the value returned + ** for the size of the "old" tree may reflect the size of an "old" + ** tree that was recently flushed to disk. + ** + ** Given the context in which this function is called (as a result of an + ** lsm_info(LSM_INFO_TREE_SIZE) request), neither of these are considered to + ** be problems. + */ + *pnNewKB = ((int)p->root.nByte + 1023) / 1024; + if( p->iOldShmid ){ + if( p->iOldLog==lsmCheckpointLogOffset(pShm->aSnap1) ){ + *pnOldKB = 0; + }else{ + *pnOldKB = ((int)p->oldroot.nByte + 1023) / 1024; + } + }else{ + *pnOldKB = 0; + } + + return LSM_OK; +} + +int lsm_info(lsm_db *pDb, int eParam, ...){ + int rc = LSM_OK; + va_list ap; + va_start(ap, eParam); + + switch( eParam ){ + case LSM_INFO_NWRITE: { + int *piVal = va_arg(ap, int *); + *piVal = lsmFsNWrite(pDb->pFS); + break; + } + + case LSM_INFO_NREAD: { + int *piVal = va_arg(ap, int *); + *piVal = lsmFsNRead(pDb->pFS); + break; + } + + case LSM_INFO_DB_STRUCTURE: { + char **pzVal = va_arg(ap, char **); + rc = lsmStructList(pDb, pzVal); + break; + } + + case LSM_INFO_ARRAY_STRUCTURE: { + LsmPgno pgno = va_arg(ap, LsmPgno); + char **pzVal = va_arg(ap, char **); + rc = lsmInfoArrayStructure(pDb, 0, pgno, pzVal); + break; + } + + case LSM_INFO_ARRAY_PAGES: { + LsmPgno pgno = va_arg(ap, LsmPgno); + char **pzVal = va_arg(ap, char **); + rc = lsmInfoArrayPages(pDb, pgno, pzVal); + break; + } + + case LSM_INFO_PAGE_HEX_DUMP: + case LSM_INFO_PAGE_ASCII_DUMP: { + LsmPgno pgno = va_arg(ap, LsmPgno); + char **pzVal = va_arg(ap, char **); + int bUnlock = 0; + rc = infoGetWorker(pDb, 0, &bUnlock); + if( rc==LSM_OK ){ + int bHex = (eParam==LSM_INFO_PAGE_HEX_DUMP); + rc = lsmInfoPageDump(pDb, pgno, bHex, pzVal); + } + infoFreeWorker(pDb, bUnlock); + break; + } + + case LSM_INFO_LOG_STRUCTURE: { + char **pzVal = va_arg(ap, char **); + rc = lsmInfoLogStructure(pDb, pzVal); + break; + } + + case LSM_INFO_FREELIST: { + char **pzVal = va_arg(ap, char **); + rc = lsmInfoFreelist(pDb, pzVal); + break; + } + + case LSM_INFO_CHECKPOINT_SIZE: { + int *pnKB = va_arg(ap, int *); + rc = lsmCheckpointSize(pDb, pnKB); + break; + } + + case LSM_INFO_TREE_SIZE: { + int *pnOld = va_arg(ap, int *); + int *pnNew = va_arg(ap, int *); + rc = infoTreeSize(pDb, pnOld, pnNew); + break; + } + + case LSM_INFO_COMPRESSION_ID: { + unsigned int *piOut = va_arg(ap, unsigned int *); + if( pDb->pClient ){ + *piOut = pDb->pClient->iCmpId; + }else{ + rc = lsmInfoCompressionId(pDb, piOut); + } + break; + } + + default: + rc = LSM_MISUSE; + break; + } + + va_end(ap); + return rc; +} + +static int doWriteOp( + lsm_db *pDb, + int bDeleteRange, + const void *pKey, int nKey, /* Key to write or delete */ + const void *pVal, int nVal /* Value to write. Or nVal==-1 for a delete */ +){ + int rc = LSM_OK; /* Return code */ + int bCommit = 0; /* True to commit before returning */ + + if( pDb->nTransOpen==0 ){ + bCommit = 1; + rc = lsm_begin(pDb, 1); + } + + if( rc==LSM_OK ){ + int eType = (bDeleteRange ? LSM_DRANGE : (nVal>=0?LSM_WRITE:LSM_DELETE)); + rc = lsmLogWrite(pDb, eType, (void *)pKey, nKey, (void *)pVal, nVal); + } + + lsmSortedSaveTreeCursors(pDb); + + if( rc==LSM_OK ){ + int pgsz = lsmFsPageSize(pDb->pFS); + int nQuant = LSM_AUTOWORK_QUANT * pgsz; + int nBefore; + int nAfter; + int nDiff; + + if( nQuant>pDb->nTreeLimit ){ + nQuant = LSM_MAX(pDb->nTreeLimit, pgsz); + } + + nBefore = lsmTreeSize(pDb); + if( bDeleteRange ){ + rc = lsmTreeDelete(pDb, (void *)pKey, nKey, (void *)pVal, nVal); + }else{ + rc = lsmTreeInsert(pDb, (void *)pKey, nKey, (void *)pVal, nVal); + } + + nAfter = lsmTreeSize(pDb); + nDiff = (nAfter/nQuant) - (nBefore/nQuant); + if( rc==LSM_OK && pDb->bAutowork && nDiff!=0 ){ + rc = lsmSortedAutoWork(pDb, nDiff * LSM_AUTOWORK_QUANT); + } + } + + /* If a transaction was opened at the start of this function, commit it. + ** Or, if an error has occurred, roll it back. */ + if( bCommit ){ + if( rc==LSM_OK ){ + rc = lsm_commit(pDb, 0); + }else{ + lsm_rollback(pDb, 0); + } + } + + return rc; +} + +/* +** Write a new value into the database. +*/ +int lsm_insert( + lsm_db *db, /* Database connection */ + const void *pKey, int nKey, /* Key to write or delete */ + const void *pVal, int nVal /* Value to write. Or nVal==-1 for a delete */ +){ + return doWriteOp(db, 0, pKey, nKey, pVal, nVal); +} + +/* +** Delete a value from the database. +*/ +int lsm_delete(lsm_db *db, const void *pKey, int nKey){ + return doWriteOp(db, 0, pKey, nKey, 0, -1); +} + +/* +** Delete a range of database keys. +*/ +int lsm_delete_range( + lsm_db *db, /* Database handle */ + const void *pKey1, int nKey1, /* Lower bound of range to delete */ + const void *pKey2, int nKey2 /* Upper bound of range to delete */ +){ + int rc = LSM_OK; + if( db->xCmp((void *)pKey1, nKey1, (void *)pKey2, nKey2)<0 ){ + rc = doWriteOp(db, 1, pKey1, nKey1, pKey2, nKey2); + } + return rc; +} + +/* +** Open a new cursor handle. +** +** If there are currently no other open cursor handles, and no open write +** transaction, open a read transaction here. +*/ +int lsm_csr_open(lsm_db *pDb, lsm_cursor **ppCsr){ + int rc = LSM_OK; /* Return code */ + MultiCursor *pCsr = 0; /* New cursor object */ + + /* Open a read transaction if one is not already open. */ + assert_db_state(pDb); + + if( pDb->pShmhdr==0 ){ + assert( pDb->bReadonly ); + rc = lsmBeginRoTrans(pDb); + }else if( pDb->iReader<0 ){ + rc = lsmBeginReadTrans(pDb); + } + + /* Allocate the multi-cursor. */ + if( rc==LSM_OK ){ + rc = lsmMCursorNew(pDb, &pCsr); + } + + /* If an error has occured, set the output to NULL and delete any partially + ** allocated cursor. If this means there are no open cursors, release the + ** client snapshot. */ + if( rc!=LSM_OK ){ + lsmMCursorClose(pCsr, 0); + dbReleaseClientSnapshot(pDb); + } + + assert_db_state(pDb); + *ppCsr = (lsm_cursor *)pCsr; + return rc; +} + +/* +** Close a cursor opened using lsm_csr_open(). +*/ +int lsm_csr_close(lsm_cursor *p){ + if( p ){ + lsm_db *pDb = lsmMCursorDb((MultiCursor *)p); + assert_db_state(pDb); + lsmMCursorClose((MultiCursor *)p, 1); + dbReleaseClientSnapshot(pDb); + assert_db_state(pDb); + } + return LSM_OK; +} + +/* +** Attempt to seek the cursor to the database entry specified by pKey/nKey. +** If an error occurs (e.g. an OOM or IO error), return an LSM error code. +** Otherwise, return LSM_OK. +*/ +int lsm_csr_seek(lsm_cursor *pCsr, const void *pKey, int nKey, int eSeek){ + return lsmMCursorSeek((MultiCursor *)pCsr, 0, (void *)pKey, nKey, eSeek); +} + +int lsm_csr_next(lsm_cursor *pCsr){ + return lsmMCursorNext((MultiCursor *)pCsr); +} + +int lsm_csr_prev(lsm_cursor *pCsr){ + return lsmMCursorPrev((MultiCursor *)pCsr); +} + +int lsm_csr_first(lsm_cursor *pCsr){ + return lsmMCursorFirst((MultiCursor *)pCsr); +} + +int lsm_csr_last(lsm_cursor *pCsr){ + return lsmMCursorLast((MultiCursor *)pCsr); +} + +int lsm_csr_valid(lsm_cursor *pCsr){ + return lsmMCursorValid((MultiCursor *)pCsr); +} + +int lsm_csr_key(lsm_cursor *pCsr, const void **ppKey, int *pnKey){ + return lsmMCursorKey((MultiCursor *)pCsr, (void **)ppKey, pnKey); +} + +int lsm_csr_value(lsm_cursor *pCsr, const void **ppVal, int *pnVal){ + return lsmMCursorValue((MultiCursor *)pCsr, (void **)ppVal, pnVal); +} + +void lsm_config_log( + lsm_db *pDb, + void (*xLog)(void *, int, const char *), + void *pCtx +){ + pDb->xLog = xLog; + pDb->pLogCtx = pCtx; +} + +void lsm_config_work_hook( + lsm_db *pDb, + void (*xWork)(lsm_db *, void *), + void *pCtx +){ + pDb->xWork = xWork; + pDb->pWorkCtx = pCtx; +} + +void lsmLogMessage(lsm_db *pDb, int rc, const char *zFormat, ...){ + if( pDb->xLog ){ + LsmString s; + va_list ap, ap2; + lsmStringInit(&s, pDb->pEnv); + va_start(ap, zFormat); + va_start(ap2, zFormat); + lsmStringVAppendf(&s, zFormat, ap, ap2); + va_end(ap); + va_end(ap2); + pDb->xLog(pDb->pLogCtx, rc, s.z); + lsmStringClear(&s); + } +} + +int lsm_begin(lsm_db *pDb, int iLevel){ + int rc; + + assert_db_state( pDb ); + rc = (pDb->bReadonly ? LSM_READONLY : LSM_OK); + + /* A value less than zero means open one more transaction. */ + if( iLevel<0 ) iLevel = pDb->nTransOpen + 1; + if( iLevel>pDb->nTransOpen ){ + int i; + + /* Extend the pDb->aTrans[] array if required. */ + if( rc==LSM_OK && pDb->nTransAlloc<iLevel ){ + TransMark *aNew; /* New allocation */ + int nByte = sizeof(TransMark) * (iLevel+1); + aNew = (TransMark *)lsmRealloc(pDb->pEnv, pDb->aTrans, nByte); + if( !aNew ){ + rc = LSM_NOMEM; + }else{ + nByte = sizeof(TransMark) * (iLevel+1 - pDb->nTransAlloc); + memset(&aNew[pDb->nTransAlloc], 0, nByte); + pDb->nTransAlloc = iLevel+1; + pDb->aTrans = aNew; + } + } + + if( rc==LSM_OK && pDb->nTransOpen==0 ){ + rc = lsmBeginWriteTrans(pDb); + } + + if( rc==LSM_OK ){ + for(i=pDb->nTransOpen; i<iLevel; i++){ + lsmTreeMark(pDb, &pDb->aTrans[i].tree); + lsmLogTell(pDb, &pDb->aTrans[i].log); + } + pDb->nTransOpen = iLevel; + } + } + + return rc; +} + +int lsm_commit(lsm_db *pDb, int iLevel){ + int rc = LSM_OK; + + assert_db_state( pDb ); + + /* A value less than zero means close the innermost nested transaction. */ + if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1); + + if( iLevel<pDb->nTransOpen ){ + if( iLevel==0 ){ + int rc2; + /* Commit the transaction to disk. */ + if( rc==LSM_OK ) rc = lsmLogCommit(pDb); + if( rc==LSM_OK && pDb->eSafety==LSM_SAFETY_FULL ){ + rc = lsmFsSyncLog(pDb->pFS); + } + rc2 = lsmFinishWriteTrans(pDb, (rc==LSM_OK)); + if( rc==LSM_OK ) rc = rc2; + } + pDb->nTransOpen = iLevel; + } + dbReleaseClientSnapshot(pDb); + return rc; +} + +int lsm_rollback(lsm_db *pDb, int iLevel){ + int rc = LSM_OK; + assert_db_state( pDb ); + + if( pDb->nTransOpen ){ + /* A value less than zero means close the innermost nested transaction. */ + if( iLevel<0 ) iLevel = LSM_MAX(0, pDb->nTransOpen - 1); + + if( iLevel<=pDb->nTransOpen ){ + TransMark *pMark = &pDb->aTrans[(iLevel==0 ? 0 : iLevel-1)]; + lsmTreeRollback(pDb, &pMark->tree); + if( iLevel ) lsmLogSeek(pDb, &pMark->log); + pDb->nTransOpen = iLevel; + } + + if( pDb->nTransOpen==0 ){ + lsmFinishWriteTrans(pDb, 0); + } + dbReleaseClientSnapshot(pDb); + } + + return rc; +} + +int lsm_get_user_version(lsm_db *pDb, unsigned int *piUsr){ + int rc = LSM_OK; /* Return code */ + + /* Open a read transaction if one is not already open. */ + assert_db_state(pDb); + if( pDb->pShmhdr==0 ){ + assert( pDb->bReadonly ); + rc = lsmBeginRoTrans(pDb); + }else if( pDb->iReader<0 ){ + rc = lsmBeginReadTrans(pDb); + } + + /* Allocate the multi-cursor. */ + if( rc==LSM_OK ){ + *piUsr = pDb->treehdr.iUsrVersion; + } + + dbReleaseClientSnapshot(pDb); + assert_db_state(pDb); + return rc; +} + +int lsm_set_user_version(lsm_db *pDb, unsigned int iUsr){ + int rc = LSM_OK; /* Return code */ + int bCommit = 0; /* True to commit before returning */ + + if( pDb->nTransOpen==0 ){ + bCommit = 1; + rc = lsm_begin(pDb, 1); + } + + if( rc==LSM_OK ){ + pDb->treehdr.iUsrVersion = iUsr; + } + + /* If a transaction was opened at the start of this function, commit it. + ** Or, if an error has occurred, roll it back. */ + if( bCommit ){ + if( rc==LSM_OK ){ + rc = lsm_commit(pDb, 0); + }else{ + lsm_rollback(pDb, 0); + } + } + + return rc; +} |