summaryrefslogtreecommitdiffstats
path: root/ext/lsm1/lsm_ckpt.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/lsm1/lsm_ckpt.c')
-rw-r--r--ext/lsm1/lsm_ckpt.c1239
1 files changed, 1239 insertions, 0 deletions
diff --git a/ext/lsm1/lsm_ckpt.c b/ext/lsm1/lsm_ckpt.c
new file mode 100644
index 0000000..ba92a82
--- /dev/null
+++ b/ext/lsm1/lsm_ckpt.c
@@ -0,0 +1,1239 @@
+/*
+** 2011-09-11
+**
+** The author disclaims copyright to this source code. In place of
+** a legal notice, here is a blessing:
+**
+** May you do good and not evil.
+** May you find forgiveness for yourself and forgive others.
+** May you share freely, never taking more than you give.
+**
+*************************************************************************
+**
+** This file contains code to read and write checkpoints.
+**
+** A checkpoint represents the database layout at a single point in time.
+** It includes a log offset. When an existing database is opened, the
+** current state is determined by reading the newest checkpoint and updating
+** it with all committed transactions from the log that follow the specified
+** offset.
+*/
+#include "lsmInt.h"
+
+/*
+** CHECKPOINT BLOB FORMAT:
+**
+** A checkpoint blob is a series of unsigned 32-bit integers stored in
+** big-endian byte order. As follows:
+**
+** Checkpoint header (see the CKPT_HDR_XXX #defines):
+**
+** 1. The checkpoint id MSW.
+** 2. The checkpoint id LSW.
+** 3. The number of integer values in the entire checkpoint, including
+** the two checksum values.
+** 4. The compression scheme id.
+** 5. The total number of blocks in the database.
+** 6. The block size.
+** 7. The number of levels.
+** 8. The nominal database page size.
+** 9. The number of pages (in total) written to the database file.
+**
+** Log pointer:
+**
+** 1. The log offset MSW.
+** 2. The log offset LSW.
+** 3. Log checksum 0.
+** 4. Log checksum 1.
+**
+** Note that the "log offset" is not the literal byte offset. Instead,
+** it is the byte offset multiplied by 2, with least significant bit
+** toggled each time the log pointer value is changed. This is to make
+** sure that this field changes each time the log pointer is updated,
+** even if the log file itself is disabled. See lsmTreeMakeOld().
+**
+** See ckptExportLog() and ckptImportLog().
+**
+** Append points:
+**
+** 8 integers (4 * 64-bit page numbers). See ckptExportAppendlist().
+**
+** For each level in the database, a level record. Formatted as follows:
+**
+** 0. Age of the level (least significant 16-bits). And flags mask (most
+** significant 16-bits).
+** 1. The number of right-hand segments (nRight, possibly 0),
+** 2. Segment record for left-hand segment (8 integers defined below),
+** 3. Segment record for each right-hand segment (8 integers defined below),
+** 4. If nRight>0, The number of segments involved in the merge
+** 5. if nRight>0, Current nSkip value (see Merge structure defn.),
+** 6. For each segment in the merge:
+** 5a. Page number of next cell to read during merge (this field
+** is 64-bits - 2 integers)
+** 5b. Cell number of next cell to read during merge
+** 7. Page containing current split-key (64-bits - 2 integers).
+** 8. Cell within page containing current split-key.
+** 9. Current pointer value (64-bits - 2 integers).
+**
+** The block redirect array:
+**
+** 1. Number of redirections (maximum LSM_MAX_BLOCK_REDIRECTS).
+** 2. For each redirection:
+** a. "from" block number
+** b. "to" block number
+**
+** The in-memory freelist entries. Each entry is either an insert or a
+** delete. The in-memory freelist is to the free-block-list as the
+** in-memory tree is to the users database content.
+**
+** 1. Number of free-list entries stored in checkpoint header.
+** 2. Number of free blocks (in total).
+** 3. Total number of blocks freed during database lifetime.
+** 4. For each entry:
+** 2a. Block number of free block.
+** 2b. A 64-bit integer (MSW followed by LSW). -1 for a delete entry,
+** or the associated checkpoint id for an insert.
+**
+** The checksum:
+**
+** 1. Checksum value 1.
+** 2. Checksum value 2.
+**
+** In the above, a segment record consists of the following four 64-bit
+** fields (converted to 2 * u32 by storing the MSW followed by LSW):
+**
+** 1. First page of array,
+** 2. Last page of array,
+** 3. Root page of array (or 0),
+** 4. Size of array in pages.
+*/
+
+/*
+** LARGE NUMBERS OF LEVEL RECORDS:
+**
+** A limit on the number of rhs segments that may be present in the database
+** file. Defining this limit ensures that all level records fit within
+** the 4096 byte limit for checkpoint blobs.
+**
+** The number of right-hand-side segments in a database is counted as
+** follows:
+**
+** * For each level in the database not undergoing a merge, add 1.
+**
+** * For each level in the database that is undergoing a merge, add
+** the number of segments on the rhs of the level.
+**
+** A level record not undergoing a merge is 10 integers. A level record
+** with nRhs rhs segments and (nRhs+1) input segments (i.e. including the
+** separators from the next level) is (11*nRhs+20) integers. The maximum
+** per right-hand-side level is therefore 21 integers. So the maximum
+** size of all level records in a checkpoint is 21*40=820 integers.
+**
+** TODO: Before pointer values were changed from 32 to 64 bits, the above
+** used to come to 420 bytes - leaving significant space for a free-list
+** prefix. No more. To fix this, reduce the size of the level records in
+** a db snapshot, and improve management of the free-list tail in
+** lsm_sorted.c.
+*/
+#define LSM_MAX_RHS_SEGMENTS 40
+
+/*
+** LARGE NUMBERS OF FREELIST ENTRIES:
+**
+** There is also a limit (LSM_MAX_FREELIST_ENTRIES - defined in lsmInt.h)
+** on the number of free-list entries stored in a checkpoint. Since each
+** free-list entry consists of 3 integers, the maximum free-list size is
+** 3*100=300 integers. Combined with the limit on rhs segments defined
+** above, this ensures that a checkpoint always fits within a 4096 byte
+** meta page.
+**
+** If the database contains more than 100 free blocks, the "overflow" flag
+** in the checkpoint header is set and the remainder are stored in the
+** system FREELIST entry in the LSM (along with user data). The value
+** accompanying the FREELIST key in the LSM is, like a checkpoint, an array
+** of 32-bit big-endian integers. As follows:
+**
+** For each entry:
+** a. Block number of free block.
+** b. MSW of associated checkpoint id.
+** c. LSW of associated checkpoint id.
+**
+** The number of entries is not required - it is implied by the size of the
+** value blob containing the integer array.
+**
+** Note that the limit defined by LSM_MAX_FREELIST_ENTRIES is a hard limit.
+** The actual value used may be configured using LSM_CONFIG_MAX_FREELIST.
+*/
+
+/*
+** The argument to this macro must be of type u32. On a little-endian
+** architecture, it returns the u32 value that results from interpreting
+** the 4 bytes as a big-endian value. On a big-endian architecture, it
+** returns the value that would be produced by intepreting the 4 bytes
+** of the input value as a little-endian integer.
+*/
+#define BYTESWAP32(x) ( \
+ (((x)&0x000000FF)<<24) + (((x)&0x0000FF00)<<8) \
+ + (((x)&0x00FF0000)>>8) + (((x)&0xFF000000)>>24) \
+)
+
+static const int one = 1;
+#define LSM_LITTLE_ENDIAN (*(u8 *)(&one))
+
+/* Sizes, in integers, of various parts of the checkpoint. */
+#define CKPT_HDR_SIZE 9
+#define CKPT_LOGPTR_SIZE 4
+#define CKPT_APPENDLIST_SIZE (LSM_APPLIST_SZ * 2)
+
+/* A #define to describe each integer in the checkpoint header. */
+#define CKPT_HDR_ID_MSW 0
+#define CKPT_HDR_ID_LSW 1
+#define CKPT_HDR_NCKPT 2
+#define CKPT_HDR_CMPID 3
+#define CKPT_HDR_NBLOCK 4
+#define CKPT_HDR_BLKSZ 5
+#define CKPT_HDR_NLEVEL 6
+#define CKPT_HDR_PGSZ 7
+#define CKPT_HDR_NWRITE 8
+
+#define CKPT_HDR_LO_MSW 9
+#define CKPT_HDR_LO_LSW 10
+#define CKPT_HDR_LO_CKSUM1 11
+#define CKPT_HDR_LO_CKSUM2 12
+
+typedef struct CkptBuffer CkptBuffer;
+
+/*
+** Dynamic buffer used to accumulate data for a checkpoint.
+*/
+struct CkptBuffer {
+ lsm_env *pEnv;
+ int nAlloc;
+ u32 *aCkpt;
+};
+
+/*
+** Calculate the checksum of the checkpoint specified by arguments aCkpt and
+** nCkpt. Store the checksum in *piCksum1 and *piCksum2 before returning.
+**
+** The value of the nCkpt parameter includes the two checksum values at
+** the end of the checkpoint. They are not used as inputs to the checksum
+** calculation. The checksum is based on the array of (nCkpt-2) integers
+** at aCkpt[].
+*/
+static void ckptChecksum(u32 *aCkpt, u32 nCkpt, u32 *piCksum1, u32 *piCksum2){
+ u32 i;
+ u32 cksum1 = 1;
+ u32 cksum2 = 2;
+
+ if( nCkpt % 2 ){
+ cksum1 += aCkpt[nCkpt-3] & 0x0000FFFF;
+ cksum2 += aCkpt[nCkpt-3] & 0xFFFF0000;
+ }
+
+ for(i=0; (i+3)<nCkpt; i+=2){
+ cksum1 += cksum2 + aCkpt[i];
+ cksum2 += cksum1 + aCkpt[i+1];
+ }
+
+ *piCksum1 = cksum1;
+ *piCksum2 = cksum2;
+}
+
+/*
+** Set integer iIdx of the checkpoint accumulating in buffer *p to iVal.
+*/
+static void ckptSetValue(CkptBuffer *p, int iIdx, u32 iVal, int *pRc){
+ if( *pRc ) return;
+ if( iIdx>=p->nAlloc ){
+ int nNew = LSM_MAX(8, iIdx*2);
+ p->aCkpt = (u32 *)lsmReallocOrFree(p->pEnv, p->aCkpt, nNew*sizeof(u32));
+ if( !p->aCkpt ){
+ *pRc = LSM_NOMEM_BKPT;
+ return;
+ }
+ p->nAlloc = nNew;
+ }
+ p->aCkpt[iIdx] = iVal;
+}
+
+/*
+** Argument aInt points to an array nInt elements in size. Switch the
+** endian-ness of each element of the array.
+*/
+static void ckptChangeEndianness(u32 *aInt, int nInt){
+ if( LSM_LITTLE_ENDIAN ){
+ int i;
+ for(i=0; i<nInt; i++) aInt[i] = BYTESWAP32(aInt[i]);
+ }
+}
+
+/*
+** Object *p contains a checkpoint in native byte-order. The checkpoint is
+** nCkpt integers in size, not including any checksum. This function sets
+** the two checksum elements of the checkpoint accordingly.
+*/
+static void ckptAddChecksum(CkptBuffer *p, int nCkpt, int *pRc){
+ if( *pRc==LSM_OK ){
+ u32 aCksum[2] = {0, 0};
+ ckptChecksum(p->aCkpt, nCkpt+2, &aCksum[0], &aCksum[1]);
+ ckptSetValue(p, nCkpt, aCksum[0], pRc);
+ ckptSetValue(p, nCkpt+1, aCksum[1], pRc);
+ }
+}
+
+static void ckptAppend64(CkptBuffer *p, int *piOut, i64 iVal, int *pRc){
+ int iOut = *piOut;
+ ckptSetValue(p, iOut++, (iVal >> 32) & 0xFFFFFFFF, pRc);
+ ckptSetValue(p, iOut++, (iVal & 0xFFFFFFFF), pRc);
+ *piOut = iOut;
+}
+
+static i64 ckptRead64(u32 *a){
+ return (((i64)a[0]) << 32) + (i64)a[1];
+}
+
+static i64 ckptGobble64(u32 *a, int *piIn){
+ int iIn = *piIn;
+ *piIn += 2;
+ return ckptRead64(&a[iIn]);
+}
+
+
+/*
+** Append a 6-value segment record corresponding to pSeg to the checkpoint
+** buffer passed as the third argument.
+*/
+static void ckptExportSegment(
+ Segment *pSeg,
+ CkptBuffer *p,
+ int *piOut,
+ int *pRc
+){
+ ckptAppend64(p, piOut, pSeg->iFirst, pRc);
+ ckptAppend64(p, piOut, pSeg->iLastPg, pRc);
+ ckptAppend64(p, piOut, pSeg->iRoot, pRc);
+ ckptAppend64(p, piOut, pSeg->nSize, pRc);
+}
+
+static void ckptExportLevel(
+ Level *pLevel, /* Level object to serialize */
+ CkptBuffer *p, /* Append new level record to this ckpt */
+ int *piOut, /* IN/OUT: Size of checkpoint so far */
+ int *pRc /* IN/OUT: Error code */
+){
+ int iOut = *piOut;
+ Merge *pMerge;
+
+ pMerge = pLevel->pMerge;
+ ckptSetValue(p, iOut++, (u32)pLevel->iAge + (u32)(pLevel->flags<<16), pRc);
+ ckptSetValue(p, iOut++, pLevel->nRight, pRc);
+ ckptExportSegment(&pLevel->lhs, p, &iOut, pRc);
+
+ assert( (pLevel->nRight>0)==(pMerge!=0) );
+ if( pMerge ){
+ int i;
+ for(i=0; i<pLevel->nRight; i++){
+ ckptExportSegment(&pLevel->aRhs[i], p, &iOut, pRc);
+ }
+ assert( pMerge->nInput==pLevel->nRight
+ || pMerge->nInput==pLevel->nRight+1
+ );
+ ckptSetValue(p, iOut++, pMerge->nInput, pRc);
+ ckptSetValue(p, iOut++, pMerge->nSkip, pRc);
+ for(i=0; i<pMerge->nInput; i++){
+ ckptAppend64(p, &iOut, pMerge->aInput[i].iPg, pRc);
+ ckptSetValue(p, iOut++, pMerge->aInput[i].iCell, pRc);
+ }
+ ckptAppend64(p, &iOut, pMerge->splitkey.iPg, pRc);
+ ckptSetValue(p, iOut++, pMerge->splitkey.iCell, pRc);
+ ckptAppend64(p, &iOut, pMerge->iCurrentPtr, pRc);
+ }
+
+ *piOut = iOut;
+}
+
+/*
+** Populate the log offset fields of the checkpoint buffer. 4 values.
+*/
+static void ckptExportLog(
+ lsm_db *pDb,
+ int bFlush,
+ CkptBuffer *p,
+ int *piOut,
+ int *pRc
+){
+ int iOut = *piOut;
+
+ assert( iOut==CKPT_HDR_LO_MSW );
+
+ if( bFlush ){
+ i64 iOff = pDb->treehdr.iOldLog;
+ ckptAppend64(p, &iOut, iOff, pRc);
+ ckptSetValue(p, iOut++, pDb->treehdr.oldcksum0, pRc);
+ ckptSetValue(p, iOut++, pDb->treehdr.oldcksum1, pRc);
+ }else{
+ for(; iOut<=CKPT_HDR_LO_CKSUM2; iOut++){
+ ckptSetValue(p, iOut, pDb->pShmhdr->aSnap2[iOut], pRc);
+ }
+ }
+
+ assert( *pRc || iOut==CKPT_HDR_LO_CKSUM2+1 );
+ *piOut = iOut;
+}
+
+static void ckptExportAppendlist(
+ lsm_db *db, /* Database connection */
+ CkptBuffer *p, /* Checkpoint buffer to write to */
+ int *piOut, /* IN/OUT: Offset within checkpoint buffer */
+ int *pRc /* IN/OUT: Error code */
+){
+ int i;
+ LsmPgno *aiAppend = db->pWorker->aiAppend;
+
+ for(i=0; i<LSM_APPLIST_SZ; i++){
+ ckptAppend64(p, piOut, aiAppend[i], pRc);
+ }
+};
+
+static int ckptExportSnapshot(
+ lsm_db *pDb, /* Connection handle */
+ int bLog, /* True to update log-offset fields */
+ i64 iId, /* Checkpoint id */
+ int bCksum, /* If true, include checksums */
+ void **ppCkpt, /* OUT: Buffer containing checkpoint */
+ int *pnCkpt /* OUT: Size of checkpoint in bytes */
+){
+ int rc = LSM_OK; /* Return Code */
+ FileSystem *pFS = pDb->pFS; /* File system object */
+ Snapshot *pSnap = pDb->pWorker; /* Worker snapshot */
+ int nLevel = 0; /* Number of levels in checkpoint */
+ int iLevel; /* Used to count out nLevel levels */
+ int iOut = 0; /* Current offset in aCkpt[] */
+ Level *pLevel; /* Level iterator */
+ int i; /* Iterator used while serializing freelist */
+ CkptBuffer ckpt;
+
+ /* Initialize the output buffer */
+ memset(&ckpt, 0, sizeof(CkptBuffer));
+ ckpt.pEnv = pDb->pEnv;
+ iOut = CKPT_HDR_SIZE;
+
+ /* Write the log offset into the checkpoint. */
+ ckptExportLog(pDb, bLog, &ckpt, &iOut, &rc);
+
+ /* Write the append-point list */
+ ckptExportAppendlist(pDb, &ckpt, &iOut, &rc);
+
+ /* Figure out how many levels will be written to the checkpoint. */
+ for(pLevel=lsmDbSnapshotLevel(pSnap); pLevel; pLevel=pLevel->pNext) nLevel++;
+
+ /* Serialize nLevel levels. */
+ iLevel = 0;
+ for(pLevel=lsmDbSnapshotLevel(pSnap); iLevel<nLevel; pLevel=pLevel->pNext){
+ ckptExportLevel(pLevel, &ckpt, &iOut, &rc);
+ iLevel++;
+ }
+
+ /* Write the block-redirect list */
+ ckptSetValue(&ckpt, iOut++, pSnap->redirect.n, &rc);
+ for(i=0; i<pSnap->redirect.n; i++){
+ ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iFrom, &rc);
+ ckptSetValue(&ckpt, iOut++, pSnap->redirect.a[i].iTo, &rc);
+ }
+
+ /* Write the freelist */
+ assert( pSnap->freelist.nEntry<=pDb->nMaxFreelist );
+ if( rc==LSM_OK ){
+ int nFree = pSnap->freelist.nEntry;
+ ckptSetValue(&ckpt, iOut++, nFree, &rc);
+ for(i=0; i<nFree; i++){
+ FreelistEntry *p = &pSnap->freelist.aEntry[i];
+ ckptSetValue(&ckpt, iOut++, p->iBlk, &rc);
+ ckptSetValue(&ckpt, iOut++, (p->iId >> 32) & 0xFFFFFFFF, &rc);
+ ckptSetValue(&ckpt, iOut++, p->iId & 0xFFFFFFFF, &rc);
+ }
+ }
+
+ /* Write the checkpoint header */
+ assert( iId>=0 );
+ assert( pSnap->iCmpId==pDb->compress.iId
+ || pSnap->iCmpId==LSM_COMPRESSION_EMPTY
+ );
+ ckptSetValue(&ckpt, CKPT_HDR_ID_MSW, (u32)(iId>>32), &rc);
+ ckptSetValue(&ckpt, CKPT_HDR_ID_LSW, (u32)(iId&0xFFFFFFFF), &rc);
+ ckptSetValue(&ckpt, CKPT_HDR_NCKPT, iOut+2, &rc);
+ ckptSetValue(&ckpt, CKPT_HDR_CMPID, pDb->compress.iId, &rc);
+ ckptSetValue(&ckpt, CKPT_HDR_NBLOCK, pSnap->nBlock, &rc);
+ ckptSetValue(&ckpt, CKPT_HDR_BLKSZ, lsmFsBlockSize(pFS), &rc);
+ ckptSetValue(&ckpt, CKPT_HDR_NLEVEL, nLevel, &rc);
+ ckptSetValue(&ckpt, CKPT_HDR_PGSZ, lsmFsPageSize(pFS), &rc);
+ ckptSetValue(&ckpt, CKPT_HDR_NWRITE, pSnap->nWrite, &rc);
+
+ if( bCksum ){
+ ckptAddChecksum(&ckpt, iOut, &rc);
+ }else{
+ ckptSetValue(&ckpt, iOut, 0, &rc);
+ ckptSetValue(&ckpt, iOut+1, 0, &rc);
+ }
+ iOut += 2;
+ assert( iOut<=1024 );
+
+#ifdef LSM_LOG_FREELIST
+ lsmLogMessage(pDb, rc,
+ "ckptExportSnapshot(): id=%lld freelist: %d", iId, pSnap->freelist.nEntry
+ );
+ for(i=0; i<pSnap->freelist.nEntry; i++){
+ lsmLogMessage(pDb, rc,
+ "ckptExportSnapshot(): iBlk=%d id=%lld",
+ pSnap->freelist.aEntry[i].iBlk,
+ pSnap->freelist.aEntry[i].iId
+ );
+ }
+#endif
+
+ *ppCkpt = (void *)ckpt.aCkpt;
+ if( pnCkpt ) *pnCkpt = sizeof(u32)*iOut;
+ return rc;
+}
+
+
+/*
+** Helper function for ckptImport().
+*/
+static void ckptNewSegment(
+ u32 *aIn,
+ int *piIn,
+ Segment *pSegment /* Populate this structure */
+){
+ assert( pSegment->iFirst==0 && pSegment->iLastPg==0 );
+ assert( pSegment->nSize==0 && pSegment->iRoot==0 );
+ pSegment->iFirst = ckptGobble64(aIn, piIn);
+ pSegment->iLastPg = ckptGobble64(aIn, piIn);
+ pSegment->iRoot = ckptGobble64(aIn, piIn);
+ pSegment->nSize = (int)ckptGobble64(aIn, piIn);
+ assert( pSegment->iFirst );
+}
+
+static int ckptSetupMerge(lsm_db *pDb, u32 *aInt, int *piIn, Level *pLevel){
+ Merge *pMerge; /* Allocated Merge object */
+ int nInput; /* Number of input segments in merge */
+ int iIn = *piIn; /* Next value to read from aInt[] */
+ int i; /* Iterator variable */
+ int nByte; /* Number of bytes to allocate */
+
+ /* Allocate the Merge object. If malloc() fails, return LSM_NOMEM. */
+ nInput = (int)aInt[iIn++];
+ nByte = sizeof(Merge) + sizeof(MergeInput) * nInput;
+ pMerge = (Merge *)lsmMallocZero(pDb->pEnv, nByte);
+ if( !pMerge ) return LSM_NOMEM_BKPT;
+ pLevel->pMerge = pMerge;
+
+ /* Populate the Merge object. */
+ pMerge->aInput = (MergeInput *)&pMerge[1];
+ pMerge->nInput = nInput;
+ pMerge->iOutputOff = -1;
+ pMerge->nSkip = (int)aInt[iIn++];
+ for(i=0; i<nInput; i++){
+ pMerge->aInput[i].iPg = ckptGobble64(aInt, &iIn);
+ pMerge->aInput[i].iCell = (int)aInt[iIn++];
+ }
+ pMerge->splitkey.iPg = ckptGobble64(aInt, &iIn);
+ pMerge->splitkey.iCell = (int)aInt[iIn++];
+ pMerge->iCurrentPtr = ckptGobble64(aInt, &iIn);
+
+ /* Set *piIn and return LSM_OK. */
+ *piIn = iIn;
+ return LSM_OK;
+}
+
+
+static int ckptLoadLevels(
+ lsm_db *pDb,
+ u32 *aIn,
+ int *piIn,
+ int nLevel,
+ Level **ppLevel
+){
+ int i;
+ int rc = LSM_OK;
+ Level *pRet = 0;
+ Level **ppNext;
+ int iIn = *piIn;
+
+ ppNext = &pRet;
+ for(i=0; rc==LSM_OK && i<nLevel; i++){
+ int iRight;
+ Level *pLevel;
+
+ /* Allocate space for the Level structure and Level.apRight[] array */
+ pLevel = (Level *)lsmMallocZeroRc(pDb->pEnv, sizeof(Level), &rc);
+ if( rc==LSM_OK ){
+ pLevel->iAge = (u16)(aIn[iIn] & 0x0000FFFF);
+ pLevel->flags = (u16)((aIn[iIn]>>16) & 0x0000FFFF);
+ iIn++;
+ pLevel->nRight = aIn[iIn++];
+ if( pLevel->nRight ){
+ int nByte = sizeof(Segment) * pLevel->nRight;
+ pLevel->aRhs = (Segment *)lsmMallocZeroRc(pDb->pEnv, nByte, &rc);
+ }
+ if( rc==LSM_OK ){
+ *ppNext = pLevel;
+ ppNext = &pLevel->pNext;
+
+ /* Allocate the main segment */
+ ckptNewSegment(aIn, &iIn, &pLevel->lhs);
+
+ /* Allocate each of the right-hand segments, if any */
+ for(iRight=0; iRight<pLevel->nRight; iRight++){
+ ckptNewSegment(aIn, &iIn, &pLevel->aRhs[iRight]);
+ }
+
+ /* Set up the Merge object, if required */
+ if( pLevel->nRight>0 ){
+ rc = ckptSetupMerge(pDb, aIn, &iIn, pLevel);
+ }
+ }
+ }
+ }
+
+ if( rc!=LSM_OK ){
+ /* An OOM must have occurred. Free any level structures allocated and
+ ** return the error to the caller. */
+ lsmSortedFreeLevel(pDb->pEnv, pRet);
+ pRet = 0;
+ }
+
+ *ppLevel = pRet;
+ *piIn = iIn;
+ return rc;
+}
+
+
+int lsmCheckpointLoadLevels(lsm_db *pDb, void *pVal, int nVal){
+ int rc = LSM_OK;
+ if( nVal>0 ){
+ u32 *aIn;
+
+ aIn = lsmMallocRc(pDb->pEnv, nVal, &rc);
+ if( aIn ){
+ Level *pLevel = 0;
+ Level *pParent;
+
+ int nIn;
+ int nLevel;
+ int iIn = 1;
+ memcpy(aIn, pVal, nVal);
+ nIn = nVal / sizeof(u32);
+
+ ckptChangeEndianness(aIn, nIn);
+ nLevel = aIn[0];
+ rc = ckptLoadLevels(pDb, aIn, &iIn, nLevel, &pLevel);
+ lsmFree(pDb->pEnv, aIn);
+ assert( rc==LSM_OK || pLevel==0 );
+ if( rc==LSM_OK ){
+ pParent = lsmDbSnapshotLevel(pDb->pWorker);
+ assert( pParent );
+ while( pParent->pNext ) pParent = pParent->pNext;
+ pParent->pNext = pLevel;
+ }
+ }
+ }
+
+ return rc;
+}
+
+/*
+** Return the data for the LEVELS record.
+**
+** The size of the checkpoint that can be stored in the database header
+** must not exceed 1024 32-bit integers. Normally, it does not. However,
+** if it does, part of the checkpoint must be stored in the LSM. This
+** routine returns that part.
+*/
+int lsmCheckpointLevels(
+ lsm_db *pDb, /* Database handle */
+ int nLevel, /* Number of levels to write to blob */
+ void **paVal, /* OUT: Pointer to LEVELS blob */
+ int *pnVal /* OUT: Size of LEVELS blob in bytes */
+){
+ Level *p; /* Used to iterate through levels */
+ int nAll= 0;
+ int rc;
+ int i;
+ int iOut;
+ CkptBuffer ckpt;
+ assert( nLevel>0 );
+
+ for(p=lsmDbSnapshotLevel(pDb->pWorker); p; p=p->pNext) nAll++;
+
+ assert( nAll>nLevel );
+ nAll -= nLevel;
+ for(p=lsmDbSnapshotLevel(pDb->pWorker); p && nAll>0; p=p->pNext) nAll--;
+
+ memset(&ckpt, 0, sizeof(CkptBuffer));
+ ckpt.pEnv = pDb->pEnv;
+
+ ckptSetValue(&ckpt, 0, nLevel, &rc);
+ iOut = 1;
+ for(i=0; rc==LSM_OK && i<nLevel; i++){
+ ckptExportLevel(p, &ckpt, &iOut, &rc);
+ p = p->pNext;
+ }
+ assert( rc!=LSM_OK || p==0 );
+
+ if( rc==LSM_OK ){
+ ckptChangeEndianness(ckpt.aCkpt, iOut);
+ *paVal = (void *)ckpt.aCkpt;
+ *pnVal = iOut * sizeof(u32);
+ }else{
+ *pnVal = 0;
+ *paVal = 0;
+ }
+
+ return rc;
+}
+
+/*
+** Read the checkpoint id from meta-page pPg.
+*/
+static i64 ckptLoadId(MetaPage *pPg){
+ i64 ret = 0;
+ if( pPg ){
+ int nData;
+ u8 *aData = lsmFsMetaPageData(pPg, &nData);
+ ret = (((i64)lsmGetU32(&aData[CKPT_HDR_ID_MSW*4])) << 32) +
+ ((i64)lsmGetU32(&aData[CKPT_HDR_ID_LSW*4]));
+ }
+ return ret;
+}
+
+/*
+** Return true if the buffer passed as an argument contains a valid
+** checkpoint.
+*/
+static int ckptChecksumOk(u32 *aCkpt){
+ u32 nCkpt = aCkpt[CKPT_HDR_NCKPT];
+ u32 cksum1;
+ u32 cksum2;
+
+ if( nCkpt<CKPT_HDR_NCKPT || nCkpt>(LSM_META_RW_PAGE_SIZE)/sizeof(u32) ){
+ return 0;
+ }
+ ckptChecksum(aCkpt, nCkpt, &cksum1, &cksum2);
+ return (cksum1==aCkpt[nCkpt-2] && cksum2==aCkpt[nCkpt-1]);
+}
+
+/*
+** Attempt to load a checkpoint from meta page iMeta.
+**
+** This function is a no-op if *pRc is set to any value other than LSM_OK
+** when it is called. If an error occurs, *pRc is set to an LSM error code
+** before returning.
+**
+** If no error occurs and the checkpoint is successfully loaded, copy it to
+** ShmHeader.aSnap1[] and ShmHeader.aSnap2[], and set ShmHeader.iMetaPage
+** to indicate its origin. In this case return 1. Or, if the checkpoint
+** cannot be loaded (because the checksum does not compute), return 0.
+*/
+static int ckptTryLoad(lsm_db *pDb, MetaPage *pPg, u32 iMeta, int *pRc){
+ int bLoaded = 0; /* Return value */
+ if( *pRc==LSM_OK ){
+ int rc = LSM_OK; /* Error code */
+ u32 *aCkpt = 0; /* Pointer to buffer containing checkpoint */
+ u32 nCkpt; /* Number of elements in aCkpt[] */
+ int nData; /* Bytes of data in aData[] */
+ u8 *aData; /* Meta page data */
+
+ aData = lsmFsMetaPageData(pPg, &nData);
+ nCkpt = (u32)lsmGetU32(&aData[CKPT_HDR_NCKPT*sizeof(u32)]);
+ if( nCkpt<=nData/sizeof(u32) && nCkpt>CKPT_HDR_NCKPT ){
+ aCkpt = (u32 *)lsmMallocRc(pDb->pEnv, nCkpt*sizeof(u32), &rc);
+ }
+ if( aCkpt ){
+ memcpy(aCkpt, aData, nCkpt*sizeof(u32));
+ ckptChangeEndianness(aCkpt, nCkpt);
+ if( ckptChecksumOk(aCkpt) ){
+ ShmHeader *pShm = pDb->pShmhdr;
+ memcpy(pShm->aSnap1, aCkpt, nCkpt*sizeof(u32));
+ memcpy(pShm->aSnap2, aCkpt, nCkpt*sizeof(u32));
+ memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32));
+ pShm->iMetaPage = iMeta;
+ bLoaded = 1;
+ }
+ }
+
+ lsmFree(pDb->pEnv, aCkpt);
+ *pRc = rc;
+ }
+ return bLoaded;
+}
+
+/*
+** Initialize the shared-memory header with an empty snapshot. This function
+** is called when no valid snapshot can be found in the database header.
+*/
+static void ckptLoadEmpty(lsm_db *pDb){
+ u32 aCkpt[] = {
+ 0, /* CKPT_HDR_ID_MSW */
+ 10, /* CKPT_HDR_ID_LSW */
+ 0, /* CKPT_HDR_NCKPT */
+ LSM_COMPRESSION_EMPTY, /* CKPT_HDR_CMPID */
+ 0, /* CKPT_HDR_NBLOCK */
+ 0, /* CKPT_HDR_BLKSZ */
+ 0, /* CKPT_HDR_NLEVEL */
+ 0, /* CKPT_HDR_PGSZ */
+ 0, /* CKPT_HDR_NWRITE */
+ 0, 0, 1234, 5678, /* The log pointer and initial checksum */
+ 0,0,0,0, 0,0,0,0, /* The append list */
+ 0, /* The redirected block list */
+ 0, /* The free block list */
+ 0, 0 /* Space for checksum values */
+ };
+ u32 nCkpt = array_size(aCkpt);
+ ShmHeader *pShm = pDb->pShmhdr;
+
+ aCkpt[CKPT_HDR_NCKPT] = nCkpt;
+ aCkpt[CKPT_HDR_BLKSZ] = pDb->nDfltBlksz;
+ aCkpt[CKPT_HDR_PGSZ] = pDb->nDfltPgsz;
+ ckptChecksum(aCkpt, array_size(aCkpt), &aCkpt[nCkpt-2], &aCkpt[nCkpt-1]);
+
+ memcpy(pShm->aSnap1, aCkpt, nCkpt*sizeof(u32));
+ memcpy(pShm->aSnap2, aCkpt, nCkpt*sizeof(u32));
+ memcpy(pDb->aSnapshot, aCkpt, nCkpt*sizeof(u32));
+}
+
+/*
+** This function is called as part of database recovery to initialize the
+** ShmHeader.aSnap1[] and ShmHeader.aSnap2[] snapshots.
+*/
+int lsmCheckpointRecover(lsm_db *pDb){
+ int rc = LSM_OK; /* Return Code */
+ i64 iId1; /* Id of checkpoint on meta-page 1 */
+ i64 iId2; /* Id of checkpoint on meta-page 2 */
+ int bLoaded = 0; /* True once checkpoint has been loaded */
+ int cmp; /* True if (iId2>iId1) */
+ MetaPage *apPg[2] = {0, 0}; /* Meta-pages 1 and 2 */
+
+ rc = lsmFsMetaPageGet(pDb->pFS, 0, 1, &apPg[0]);
+ if( rc==LSM_OK ) rc = lsmFsMetaPageGet(pDb->pFS, 0, 2, &apPg[1]);
+
+ iId1 = ckptLoadId(apPg[0]);
+ iId2 = ckptLoadId(apPg[1]);
+ cmp = (iId2 > iId1);
+ bLoaded = ckptTryLoad(pDb, apPg[cmp?1:0], (cmp?2:1), &rc);
+ if( bLoaded==0 ){
+ bLoaded = ckptTryLoad(pDb, apPg[cmp?0:1], (cmp?1:2), &rc);
+ }
+
+ /* The database does not contain a valid checkpoint. Initialize the shared
+ ** memory header with an empty checkpoint. */
+ if( bLoaded==0 ){
+ ckptLoadEmpty(pDb);
+ }
+
+ lsmFsMetaPageRelease(apPg[0]);
+ lsmFsMetaPageRelease(apPg[1]);
+
+ return rc;
+}
+
+/*
+** Store the snapshot in pDb->aSnapshot[] in meta-page iMeta.
+*/
+int lsmCheckpointStore(lsm_db *pDb, int iMeta){
+ MetaPage *pPg = 0;
+ int rc;
+
+ assert( iMeta==1 || iMeta==2 );
+ rc = lsmFsMetaPageGet(pDb->pFS, 1, iMeta, &pPg);
+ if( rc==LSM_OK ){
+ u8 *aData;
+ int nData;
+ int nCkpt;
+
+ nCkpt = (int)pDb->aSnapshot[CKPT_HDR_NCKPT];
+ aData = lsmFsMetaPageData(pPg, &nData);
+ memcpy(aData, pDb->aSnapshot, nCkpt*sizeof(u32));
+ ckptChangeEndianness((u32 *)aData, nCkpt);
+ rc = lsmFsMetaPageRelease(pPg);
+ }
+
+ return rc;
+}
+
+/*
+** Copy the current client snapshot from shared-memory to pDb->aSnapshot[].
+*/
+int lsmCheckpointLoad(lsm_db *pDb, int *piRead){
+ int nRem = LSM_ATTEMPTS_BEFORE_PROTOCOL;
+ ShmHeader *pShm = pDb->pShmhdr;
+ while( (nRem--)>0 ){
+ int nInt;
+
+ nInt = pShm->aSnap1[CKPT_HDR_NCKPT];
+ if( nInt<=(LSM_META_RW_PAGE_SIZE / sizeof(u32)) ){
+ memcpy(pDb->aSnapshot, pShm->aSnap1, nInt*sizeof(u32));
+ if( ckptChecksumOk(pDb->aSnapshot) ){
+ if( piRead ) *piRead = 1;
+ return LSM_OK;
+ }
+ }
+
+ nInt = pShm->aSnap2[CKPT_HDR_NCKPT];
+ if( nInt<=(LSM_META_RW_PAGE_SIZE / sizeof(u32)) ){
+ memcpy(pDb->aSnapshot, pShm->aSnap2, nInt*sizeof(u32));
+ if( ckptChecksumOk(pDb->aSnapshot) ){
+ if( piRead ) *piRead = 2;
+ return LSM_OK;
+ }
+ }
+
+ lsmShmBarrier(pDb);
+ }
+ return LSM_PROTOCOL_BKPT;
+}
+
+int lsmInfoCompressionId(lsm_db *db, u32 *piCmpId){
+ int rc;
+
+ assert( db->pClient==0 && db->pWorker==0 );
+ rc = lsmCheckpointLoad(db, 0);
+ if( rc==LSM_OK ){
+ *piCmpId = db->aSnapshot[CKPT_HDR_CMPID];
+ }
+
+ return rc;
+}
+
+int lsmCheckpointLoadOk(lsm_db *pDb, int iSnap){
+ u32 *aShm;
+ assert( iSnap==1 || iSnap==2 );
+ aShm = (iSnap==1) ? pDb->pShmhdr->aSnap1 : pDb->pShmhdr->aSnap2;
+ return (lsmCheckpointId(pDb->aSnapshot, 0)==lsmCheckpointId(aShm, 0) );
+}
+
+int lsmCheckpointClientCacheOk(lsm_db *pDb){
+ return ( pDb->pClient
+ && pDb->pClient->iId==lsmCheckpointId(pDb->aSnapshot, 0)
+ && pDb->pClient->iId==lsmCheckpointId(pDb->pShmhdr->aSnap1, 0)
+ && pDb->pClient->iId==lsmCheckpointId(pDb->pShmhdr->aSnap2, 0)
+ );
+}
+
+int lsmCheckpointLoadWorker(lsm_db *pDb){
+ int rc;
+ ShmHeader *pShm = pDb->pShmhdr;
+ int nInt1;
+ int nInt2;
+
+ /* Must be holding the WORKER lock to do this. Or DMS2. */
+ assert(
+ lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL)
+ || lsmShmAssertLock(pDb, LSM_LOCK_DMS1, LSM_LOCK_EXCL)
+ );
+
+ /* Check that the two snapshots match. If not, repair them. */
+ nInt1 = pShm->aSnap1[CKPT_HDR_NCKPT];
+ nInt2 = pShm->aSnap2[CKPT_HDR_NCKPT];
+ if( nInt1!=nInt2 || memcmp(pShm->aSnap1, pShm->aSnap2, nInt2*sizeof(u32)) ){
+ if( ckptChecksumOk(pShm->aSnap1) ){
+ memcpy(pShm->aSnap2, pShm->aSnap1, sizeof(u32)*nInt1);
+ }else if( ckptChecksumOk(pShm->aSnap2) ){
+ memcpy(pShm->aSnap1, pShm->aSnap2, sizeof(u32)*nInt2);
+ }else{
+ return LSM_PROTOCOL_BKPT;
+ }
+ }
+
+ rc = lsmCheckpointDeserialize(pDb, 1, pShm->aSnap1, &pDb->pWorker);
+ if( pDb->pWorker ) pDb->pWorker->pDatabase = pDb->pDatabase;
+
+ if( rc==LSM_OK ){
+ rc = lsmCheckCompressionId(pDb, pDb->pWorker->iCmpId);
+ }
+
+#if 0
+ assert( rc!=LSM_OK || lsmFsIntegrityCheck(pDb) );
+#endif
+ return rc;
+}
+
+int lsmCheckpointDeserialize(
+ lsm_db *pDb,
+ int bInclFreelist, /* If true, deserialize free-list */
+ u32 *aCkpt,
+ Snapshot **ppSnap
+){
+ int rc = LSM_OK;
+ Snapshot *pNew;
+
+ pNew = (Snapshot *)lsmMallocZeroRc(pDb->pEnv, sizeof(Snapshot), &rc);
+ if( rc==LSM_OK ){
+ Level *pLvl;
+ int nFree;
+ int i;
+ int nLevel = (int)aCkpt[CKPT_HDR_NLEVEL];
+ int iIn = CKPT_HDR_SIZE + CKPT_APPENDLIST_SIZE + CKPT_LOGPTR_SIZE;
+
+ pNew->iId = lsmCheckpointId(aCkpt, 0);
+ pNew->nBlock = aCkpt[CKPT_HDR_NBLOCK];
+ pNew->nWrite = aCkpt[CKPT_HDR_NWRITE];
+ rc = ckptLoadLevels(pDb, aCkpt, &iIn, nLevel, &pNew->pLevel);
+ pNew->iLogOff = lsmCheckpointLogOffset(aCkpt);
+ pNew->iCmpId = aCkpt[CKPT_HDR_CMPID];
+
+ /* Make a copy of the append-list */
+ for(i=0; i<LSM_APPLIST_SZ; i++){
+ u32 *a = &aCkpt[CKPT_HDR_SIZE + CKPT_LOGPTR_SIZE + i*2];
+ pNew->aiAppend[i] = ckptRead64(a);
+ }
+
+ /* Read the block-redirect list */
+ pNew->redirect.n = aCkpt[iIn++];
+ if( pNew->redirect.n ){
+ pNew->redirect.a = lsmMallocZeroRc(pDb->pEnv,
+ (sizeof(struct RedirectEntry) * LSM_MAX_BLOCK_REDIRECTS), &rc
+ );
+ if( rc==LSM_OK ){
+ for(i=0; i<pNew->redirect.n; i++){
+ pNew->redirect.a[i].iFrom = aCkpt[iIn++];
+ pNew->redirect.a[i].iTo = aCkpt[iIn++];
+ }
+ }
+ for(pLvl=pNew->pLevel; pLvl->pNext; pLvl=pLvl->pNext);
+ if( pLvl->nRight ){
+ pLvl->aRhs[pLvl->nRight-1].pRedirect = &pNew->redirect;
+ }else{
+ pLvl->lhs.pRedirect = &pNew->redirect;
+ }
+ }
+
+ /* Copy the free-list */
+ if( rc==LSM_OK && bInclFreelist ){
+ nFree = aCkpt[iIn++];
+ if( nFree ){
+ pNew->freelist.aEntry = (FreelistEntry *)lsmMallocZeroRc(
+ pDb->pEnv, sizeof(FreelistEntry)*nFree, &rc
+ );
+ if( rc==LSM_OK ){
+ int j;
+ for(j=0; j<nFree; j++){
+ FreelistEntry *p = &pNew->freelist.aEntry[j];
+ p->iBlk = aCkpt[iIn++];
+ p->iId = ((i64)(aCkpt[iIn])<<32) + aCkpt[iIn+1];
+ iIn += 2;
+ }
+ pNew->freelist.nEntry = pNew->freelist.nAlloc = nFree;
+ }
+ }
+ }
+ }
+
+ if( rc!=LSM_OK ){
+ lsmFreeSnapshot(pDb->pEnv, pNew);
+ pNew = 0;
+ }
+
+ *ppSnap = pNew;
+ return rc;
+}
+
+/*
+** Connection pDb must be the worker connection in order to call this
+** function. It returns true if the database already contains the maximum
+** number of levels or false otherwise.
+**
+** This is used when flushing the in-memory tree to disk. If the database
+** is already full, then the caller should invoke lsm_work() or similar
+** until it is not full before creating a new level by flushing the in-memory
+** tree to disk. Limiting the number of levels in the database ensures that
+** the records describing them always fit within the checkpoint blob.
+*/
+int lsmDatabaseFull(lsm_db *pDb){
+ Level *p;
+ int nRhs = 0;
+
+ assert( lsmShmAssertLock(pDb, LSM_LOCK_WORKER, LSM_LOCK_EXCL) );
+ assert( pDb->pWorker );
+
+ for(p=pDb->pWorker->pLevel; p; p=p->pNext){
+ nRhs += (p->nRight ? p->nRight : 1);
+ }
+
+ return (nRhs >= LSM_MAX_RHS_SEGMENTS);
+}
+
+/*
+** The connection passed as the only argument is currently the worker
+** connection. Some work has been performed on the database by the connection,
+** but no new snapshot has been written into shared memory.
+**
+** This function updates the shared-memory worker and client snapshots with
+** the new snapshot produced by the work performed by pDb.
+**
+** If successful, LSM_OK is returned. Otherwise, if an error occurs, an LSM
+** error code is returned.
+*/
+int lsmCheckpointSaveWorker(lsm_db *pDb, int bFlush){
+ Snapshot *pSnap = pDb->pWorker;
+ ShmHeader *pShm = pDb->pShmhdr;
+ void *p = 0;
+ int n = 0;
+ int rc;
+
+ pSnap->iId++;
+ rc = ckptExportSnapshot(pDb, bFlush, pSnap->iId, 1, &p, &n);
+ if( rc!=LSM_OK ) return rc;
+ assert( ckptChecksumOk((u32 *)p) );
+
+ assert( n<=LSM_META_RW_PAGE_SIZE );
+ memcpy(pShm->aSnap2, p, n);
+ lsmShmBarrier(pDb);
+ memcpy(pShm->aSnap1, p, n);
+ lsmFree(pDb->pEnv, p);
+
+ /* assert( lsmFsIntegrityCheck(pDb) ); */
+ return LSM_OK;
+}
+
+/*
+** This function is used to determine the snapshot-id of the most recently
+** checkpointed snapshot. Variable ShmHeader.iMetaPage indicates which of
+** the two meta-pages said snapshot resides on (if any).
+**
+** If successful, this function loads the snapshot from the meta-page,
+** verifies its checksum and sets *piId to the snapshot-id before returning
+** LSM_OK. Or, if the checksum attempt fails, *piId is set to zero and
+** LSM_OK returned. If an error occurs, an LSM error code is returned and
+** the final value of *piId is undefined.
+*/
+int lsmCheckpointSynced(lsm_db *pDb, i64 *piId, i64 *piLog, u32 *pnWrite){
+ int rc = LSM_OK;
+ MetaPage *pPg;
+ u32 iMeta;
+
+ iMeta = pDb->pShmhdr->iMetaPage;
+ if( iMeta==1 || iMeta==2 ){
+ rc = lsmFsMetaPageGet(pDb->pFS, 0, iMeta, &pPg);
+ if( rc==LSM_OK ){
+ int nCkpt;
+ int nData;
+ u8 *aData;
+
+ aData = lsmFsMetaPageData(pPg, &nData);
+ assert( nData==LSM_META_RW_PAGE_SIZE );
+ nCkpt = lsmGetU32(&aData[CKPT_HDR_NCKPT*sizeof(u32)]);
+ if( nCkpt<(LSM_META_RW_PAGE_SIZE/sizeof(u32)) ){
+ u32 *aCopy = lsmMallocRc(pDb->pEnv, sizeof(u32) * nCkpt, &rc);
+ if( aCopy ){
+ memcpy(aCopy, aData, nCkpt*sizeof(u32));
+ ckptChangeEndianness(aCopy, nCkpt);
+ if( ckptChecksumOk(aCopy) ){
+ if( piId ) *piId = lsmCheckpointId(aCopy, 0);
+ if( piLog ) *piLog = (lsmCheckpointLogOffset(aCopy) >> 1);
+ if( pnWrite ) *pnWrite = aCopy[CKPT_HDR_NWRITE];
+ }
+ lsmFree(pDb->pEnv, aCopy);
+ }
+ }
+ lsmFsMetaPageRelease(pPg);
+ }
+ }
+
+ if( (iMeta!=1 && iMeta!=2) || rc!=LSM_OK || pDb->pShmhdr->iMetaPage!=iMeta ){
+ if( piId ) *piId = 0;
+ if( piLog ) *piLog = 0;
+ if( pnWrite ) *pnWrite = 0;
+ }
+ return rc;
+}
+
+/*
+** Return the checkpoint-id of the checkpoint array passed as the first
+** argument to this function. If the second argument is true, then assume
+** that the checkpoint is made up of 32-bit big-endian integers. If it
+** is false, assume that the integers are in machine byte order.
+*/
+i64 lsmCheckpointId(u32 *aCkpt, int bDisk){
+ i64 iId;
+ if( bDisk ){
+ u8 *aData = (u8 *)aCkpt;
+ iId = (((i64)lsmGetU32(&aData[CKPT_HDR_ID_MSW*4])) << 32);
+ iId += ((i64)lsmGetU32(&aData[CKPT_HDR_ID_LSW*4]));
+ }else{
+ iId = ((i64)aCkpt[CKPT_HDR_ID_MSW] << 32) + (i64)aCkpt[CKPT_HDR_ID_LSW];
+ }
+ return iId;
+}
+
+u32 lsmCheckpointNBlock(u32 *aCkpt){
+ return aCkpt[CKPT_HDR_NBLOCK];
+}
+
+u32 lsmCheckpointNWrite(u32 *aCkpt, int bDisk){
+ if( bDisk ){
+ return lsmGetU32((u8 *)&aCkpt[CKPT_HDR_NWRITE]);
+ }else{
+ return aCkpt[CKPT_HDR_NWRITE];
+ }
+}
+
+i64 lsmCheckpointLogOffset(u32 *aCkpt){
+ return ((i64)aCkpt[CKPT_HDR_LO_MSW] << 32) + (i64)aCkpt[CKPT_HDR_LO_LSW];
+}
+
+int lsmCheckpointPgsz(u32 *aCkpt){ return (int)aCkpt[CKPT_HDR_PGSZ]; }
+
+int lsmCheckpointBlksz(u32 *aCkpt){ return (int)aCkpt[CKPT_HDR_BLKSZ]; }
+
+void lsmCheckpointLogoffset(
+ u32 *aCkpt,
+ DbLog *pLog
+){
+ pLog->aRegion[2].iStart = (lsmCheckpointLogOffset(aCkpt) >> 1);
+
+ pLog->cksum0 = aCkpt[CKPT_HDR_LO_CKSUM1];
+ pLog->cksum1 = aCkpt[CKPT_HDR_LO_CKSUM2];
+ pLog->iSnapshotId = lsmCheckpointId(aCkpt, 0);
+}
+
+void lsmCheckpointZeroLogoffset(lsm_db *pDb){
+ u32 nCkpt;
+
+ nCkpt = pDb->aSnapshot[CKPT_HDR_NCKPT];
+ assert( nCkpt>CKPT_HDR_NCKPT );
+ assert( nCkpt==pDb->pShmhdr->aSnap1[CKPT_HDR_NCKPT] );
+ assert( 0==memcmp(pDb->aSnapshot, pDb->pShmhdr->aSnap1, nCkpt*sizeof(u32)) );
+ assert( 0==memcmp(pDb->aSnapshot, pDb->pShmhdr->aSnap2, nCkpt*sizeof(u32)) );
+
+ pDb->aSnapshot[CKPT_HDR_LO_MSW] = 0;
+ pDb->aSnapshot[CKPT_HDR_LO_LSW] = 0;
+ ckptChecksum(pDb->aSnapshot, nCkpt,
+ &pDb->aSnapshot[nCkpt-2], &pDb->aSnapshot[nCkpt-1]
+ );
+
+ memcpy(pDb->pShmhdr->aSnap1, pDb->aSnapshot, nCkpt*sizeof(u32));
+ memcpy(pDb->pShmhdr->aSnap2, pDb->aSnapshot, nCkpt*sizeof(u32));
+}
+
+/*
+** Set the output variable to the number of KB of data written into the
+** database file since the most recent checkpoint.
+*/
+int lsmCheckpointSize(lsm_db *db, int *pnKB){
+ int rc = LSM_OK;
+ u32 nSynced;
+
+ /* Set nSynced to the number of pages that had been written when the
+ ** database was last checkpointed. */
+ rc = lsmCheckpointSynced(db, 0, 0, &nSynced);
+
+ if( rc==LSM_OK ){
+ u32 nPgsz = db->pShmhdr->aSnap1[CKPT_HDR_PGSZ];
+ u32 nWrite = db->pShmhdr->aSnap1[CKPT_HDR_NWRITE];
+ *pnKB = (int)(( ((i64)(nWrite - nSynced) * nPgsz) + 1023) / 1024);
+ }
+
+ return rc;
+}