summaryrefslogtreecommitdiffstats
path: root/ext/lsm1/lsm_log.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/lsm1/lsm_log.c')
-rw-r--r--ext/lsm1/lsm_log.c1156
1 files changed, 1156 insertions, 0 deletions
diff --git a/ext/lsm1/lsm_log.c b/ext/lsm1/lsm_log.c
new file mode 100644
index 0000000..a66e40b
--- /dev/null
+++ b/ext/lsm1/lsm_log.c
@@ -0,0 +1,1156 @@
+/*
+** 2011-08-13
+**
+** The author disclaims copyright to this source code. In place of
+** a legal notice, here is a blessing:
+**
+** May you do good and not evil.
+** May you find forgiveness for yourself and forgive others.
+** May you share freely, never taking more than you give.
+**
+*************************************************************************
+**
+** This file contains the implementation of LSM database logging. Logging
+** has one purpose in LSM - to make transactions durable.
+**
+** When data is written to an LSM database, it is initially stored in an
+** in-memory tree structure. Since this structure is in volatile memory,
+** if a power failure or application crash occurs it may be lost. To
+** prevent loss of data in this case, each time a record is written to the
+** in-memory tree an equivalent record is appended to the log on disk.
+** If a power failure or application crash does occur, data can be recovered
+** by reading the log.
+**
+** A log file consists of the following types of records representing data
+** written into the database:
+**
+** LOG_WRITE: A key-value pair written to the database.
+** LOG_DELETE: A delete key issued to the database.
+** LOG_COMMIT: A transaction commit.
+**
+** And the following types of records for ancillary purposes..
+**
+** LOG_EOF: A record indicating the end of a log file.
+** LOG_PAD1: A single byte padding record.
+** LOG_PAD2: An N byte padding record (N>1).
+** LOG_JUMP: A pointer to another offset within the log file.
+**
+** Each transaction written to the log contains one or more LOG_WRITE and/or
+** LOG_DELETE records, followed by a LOG_COMMIT record. The LOG_COMMIT record
+** contains an 8-byte checksum based on all previous data written to the
+** log file.
+**
+** LOG CHECKSUMS & RECOVERY
+**
+** Checksums are found in two types of log records: LOG_COMMIT and
+** LOG_CKSUM records. In order to recover content from a log, a client
+** reads each record from the start of the log, calculating a checksum as
+** it does. Each time a LOG_COMMIT or LOG_CKSUM is encountered, the
+** recovery process verifies that the checksum stored in the log
+** matches the calculated checksum. If it does not, the recovery process
+** can stop reading the log.
+**
+** If a recovery process reads records (other than COMMIT or CKSUM)
+** consisting of at least LSM_CKSUM_MAXDATA bytes, then the next record in
+** the log must be either a LOG_CKSUM or LOG_COMMIT record. If it is
+** not, the recovery process also stops reading the log.
+**
+** To recover the log file, it must be read twice. The first time to
+** determine the location of the last valid commit record. And the second
+** time to load data into the in-memory tree.
+**
+** Todo: Surely there is a better way...
+**
+** LOG WRAPPING
+**
+** If the log file were never deleted or wrapped, it would be possible to
+** read it from start to end each time is required recovery (i.e each time
+** the number of database clients changes from 0 to 1). Effectively reading
+** the entire history of the database each time. This would quickly become
+** inefficient. Additionally, since the log file would grow without bound,
+** it wastes storage space.
+**
+** Instead, part of each checkpoint written into the database file contains
+** a log offset (and other information required to read the log starting at
+** at this offset) at which to begin recovery. Offset $O.
+**
+** Once a checkpoint has been written and synced into the database file, it
+** is guaranteed that no recovery process will need to read any data before
+** offset $O of the log file. It is therefore safe to begin overwriting
+** any data that occurs before offset $O.
+**
+** This implementation separates the log into three regions mapped into
+** the log file - regions 0, 1 and 2. During recovery, regions are read
+** in ascending order (i.e. 0, then 1, then 2). Each region is zero or
+** more bytes in size.
+**
+** |---1---|..|--0--|.|--2--|....
+**
+** New records are always appended to the end of region 2.
+**
+** Initially (when it is empty), all three regions are zero bytes in size.
+** Each of them are located at the beginning of the file. As records are
+** added to the log, region 2 grows, so that the log consists of a zero
+** byte region 1, followed by a zero byte region 0, followed by an N byte
+** region 2. After one or more checkpoints have been written to disk,
+** the start point of region 2 is moved to $O. For example:
+**
+** A) ||.........|--2--|....
+**
+** (both regions 0 and 1 are 0 bytes in size at offset 0).
+**
+** Eventually, the log wraps around to write new records into the start.
+** At this point, region 2 is renamed to region 0. Region 0 is renamed
+** to region 2. After appending a few records to the new region 2, the
+** log file looks like this:
+**
+** B) ||--2--|...|--0--|....
+**
+** (region 1 is still 0 bytes in size, located at offset 0).
+**
+** Any checkpoints made at this point may reduce the size of region 0.
+** However, if they do not, and region 2 expands so that it is about to
+** overwrite the start of region 0, then region 2 is renamed to region 1,
+** and a new region 2 created at the end of the file following the existing
+** region 0.
+**
+** C) |---1---|..|--0--|.|-2-|
+**
+** In this state records are appended to region 2 until checkpoints have
+** contracted regions 0 AND 1 UNTil they are both zero bytes in size. They
+** are then shifted to the start of the log file, leaving the system in
+** the equivalent of state A above.
+**
+** Alternatively, state B may transition directly to state A if the size
+** of region 0 is reduced to zero bytes before region 2 threatens to
+** encroach upon it.
+**
+** LOG_PAD1 & LOG_PAD2 RECORDS
+**
+** PAD1 and PAD2 records may appear in a log file at any point. They allow
+** a process writing the log file align the beginning of transactions with
+** the beginning of disk sectors, which increases robustness.
+**
+** RECORD FORMATS:
+**
+** LOG_EOF: * A single 0x00 byte.
+**
+** LOG_PAD1: * A single 0x01 byte.
+**
+** LOG_PAD2: * A single 0x02 byte, followed by
+** * The number of unused bytes (N) as a varint,
+** * An N byte block of unused space.
+**
+** LOG_COMMIT: * A single 0x03 byte.
+** * An 8-byte checksum.
+**
+** LOG_JUMP: * A single 0x04 byte.
+** * Absolute file offset to jump to, encoded as a varint.
+**
+** LOG_WRITE: * A single 0x06 or 0x07 byte,
+** * The number of bytes in the key, encoded as a varint,
+** * The number of bytes in the value, encoded as a varint,
+** * If the first byte was 0x07, an 8 byte checksum.
+** * The key data,
+** * The value data.
+**
+** LOG_DELETE: * A single 0x08 or 0x09 byte,
+** * The number of bytes in the key, encoded as a varint,
+** * If the first byte was 0x09, an 8 byte checksum.
+** * The key data.
+**
+** Varints are as described in lsm_varint.c (SQLite 4 format).
+**
+** CHECKSUMS:
+**
+** The checksum is calculated using two 32-bit unsigned integers, s0 and
+** s1. The initial value for both is 42. It is updated each time a record
+** is written into the log file by treating the encoded (binary) record as
+** an array of 32-bit little-endian integers. Then, if x[] is the integer
+** array, updating the checksum accumulators as follows:
+**
+** for i from 0 to n-1 step 2:
+** s0 += x[i] + s1;
+** s1 += x[i+1] + s0;
+** endfor
+**
+** If the record is not an even multiple of 8-bytes in size it is padded
+** with zeroes to make it so before the checksum is updated.
+**
+** The checksum stored in a COMMIT, WRITE or DELETE is based on all bytes
+** up to the start of the 8-byte checksum itself, including the COMMIT,
+** WRITE or DELETE fields that appear before the checksum in the record.
+**
+** VARINT FORMAT
+**
+** See lsm_varint.c.
+*/
+
+#ifndef _LSM_INT_H
+# include "lsmInt.h"
+#endif
+
+/* Log record types */
+#define LSM_LOG_EOF 0x00
+#define LSM_LOG_PAD1 0x01
+#define LSM_LOG_PAD2 0x02
+#define LSM_LOG_COMMIT 0x03
+#define LSM_LOG_JUMP 0x04
+
+#define LSM_LOG_WRITE 0x06
+#define LSM_LOG_WRITE_CKSUM 0x07
+
+#define LSM_LOG_DELETE 0x08
+#define LSM_LOG_DELETE_CKSUM 0x09
+
+#define LSM_LOG_DRANGE 0x0A
+#define LSM_LOG_DRANGE_CKSUM 0x0B
+
+/* Require a checksum every 32KB. */
+#define LSM_CKSUM_MAXDATA (32*1024)
+
+/* Do not wrap a log file smaller than this in bytes. */
+#define LSM_MIN_LOGWRAP (128*1024)
+
+/*
+** szSector:
+** Commit records must be aligned to end on szSector boundaries. If
+** the safety-mode is set to NORMAL or OFF, this value is 1. Otherwise,
+** if the safety-mode is set to FULL, it is the size of the file-system
+** sectors as reported by lsmFsSectorSize().
+*/
+struct LogWriter {
+ u32 cksum0; /* Checksum 0 at offset iOff */
+ u32 cksum1; /* Checksum 1 at offset iOff */
+ int iCksumBuf; /* Bytes of buf that have been checksummed */
+ i64 iOff; /* Offset at start of buffer buf */
+ int szSector; /* Sector size for this transaction */
+ LogRegion jump; /* Avoid writing to this region */
+ i64 iRegion1End; /* End of first region written by trans */
+ i64 iRegion2Start; /* Start of second regions written by trans */
+ LsmString buf; /* Buffer containing data not yet written */
+};
+
+/*
+** Return the result of interpreting the first 4 bytes in buffer aIn as
+** a 32-bit unsigned little-endian integer.
+*/
+static u32 getU32le(u8 *aIn){
+ return ((u32)aIn[3] << 24)
+ + ((u32)aIn[2] << 16)
+ + ((u32)aIn[1] << 8)
+ + ((u32)aIn[0]);
+}
+
+
+/*
+** This function is the same as logCksum(), except that pointer "a" need
+** not be aligned to an 8-byte boundary or padded with zero bytes. This
+** version is slower, but sometimes more convenient to use.
+*/
+static void logCksumUnaligned(
+ char *z, /* Input buffer */
+ int n, /* Size of input buffer in bytes */
+ u32 *pCksum0, /* IN/OUT: Checksum value 1 */
+ u32 *pCksum1 /* IN/OUT: Checksum value 2 */
+){
+ u8 *a = (u8 *)z;
+ u32 cksum0 = *pCksum0;
+ u32 cksum1 = *pCksum1;
+ int nIn = (n/8) * 8;
+ int i;
+
+ assert( n>0 );
+ for(i=0; i<nIn; i+=8){
+ cksum0 += getU32le(&a[i]) + cksum1;
+ cksum1 += getU32le(&a[i+4]) + cksum0;
+ }
+
+ if( nIn!=n ){
+ u8 aBuf[8] = {0, 0, 0, 0, 0, 0, 0, 0};
+ assert( (n-nIn)<8 && n>nIn );
+ memcpy(aBuf, &a[nIn], n-nIn);
+ cksum0 += getU32le(aBuf) + cksum1;
+ cksum1 += getU32le(&aBuf[4]) + cksum0;
+ }
+
+ *pCksum0 = cksum0;
+ *pCksum1 = cksum1;
+}
+
+/*
+** Update pLog->cksum0 and pLog->cksum1 so that the first nBuf bytes in the
+** write buffer (pLog->buf) are included in the checksum.
+*/
+static void logUpdateCksum(LogWriter *pLog, int nBuf){
+ assert( (pLog->iCksumBuf % 8)==0 );
+ assert( pLog->iCksumBuf<=nBuf );
+ assert( (nBuf % 8)==0 || nBuf==pLog->buf.n );
+ if( nBuf>pLog->iCksumBuf ){
+ logCksumUnaligned(
+ &pLog->buf.z[pLog->iCksumBuf], nBuf-pLog->iCksumBuf,
+ &pLog->cksum0, &pLog->cksum1
+ );
+ }
+ pLog->iCksumBuf = nBuf;
+}
+
+static i64 firstByteOnSector(LogWriter *pLog, i64 iOff){
+ return (iOff / pLog->szSector) * pLog->szSector;
+}
+static i64 lastByteOnSector(LogWriter *pLog, i64 iOff){
+ return firstByteOnSector(pLog, iOff) + pLog->szSector - 1;
+}
+
+/*
+** If possible, reclaim log file space. Log file space is reclaimed after
+** a snapshot that points to the same data in the database file is synced
+** into the db header.
+*/
+static int logReclaimSpace(lsm_db *pDb){
+ int rc;
+ int iMeta;
+ int bRotrans; /* True if there exists some ro-trans */
+
+ /* Test if there exists some other connection with a read-only transaction
+ ** open. If there does, then log file space may not be reclaimed. */
+ rc = lsmDetectRoTrans(pDb, &bRotrans);
+ if( rc!=LSM_OK || bRotrans ) return rc;
+
+ iMeta = (int)pDb->pShmhdr->iMetaPage;
+ if( iMeta==1 || iMeta==2 ){
+ DbLog *pLog = &pDb->treehdr.log;
+ i64 iSyncedId;
+
+ /* Read the snapshot-id of the snapshot stored on meta-page iMeta. Note
+ ** that in theory, the value read is untrustworthy (due to a race
+ ** condition - see comments above lsmFsReadSyncedId()). So it is only
+ ** ever used to conclude that no log space can be reclaimed. If it seems
+ ** to indicate that it may be possible to reclaim log space, a
+ ** second call to lsmCheckpointSynced() (which does return trustworthy
+ ** values) is made below to confirm. */
+ rc = lsmFsReadSyncedId(pDb, iMeta, &iSyncedId);
+
+ if( rc==LSM_OK && pLog->iSnapshotId!=iSyncedId ){
+ i64 iSnapshotId = 0;
+ i64 iOff = 0;
+ rc = lsmCheckpointSynced(pDb, &iSnapshotId, &iOff, 0);
+ if( rc==LSM_OK && pLog->iSnapshotId<iSnapshotId ){
+ int iRegion;
+ for(iRegion=0; iRegion<3; iRegion++){
+ LogRegion *p = &pLog->aRegion[iRegion];
+ if( iOff>=p->iStart && iOff<=p->iEnd ) break;
+ p->iStart = 0;
+ p->iEnd = 0;
+ }
+ assert( iRegion<3 );
+ pLog->aRegion[iRegion].iStart = iOff;
+ pLog->iSnapshotId = iSnapshotId;
+ }
+ }
+ }
+ return rc;
+}
+
+/*
+** This function is called when a write-transaction is first opened. It
+** is assumed that the caller is holding the client-mutex when it is
+** called.
+**
+** Before returning, this function allocates the LogWriter object that
+** will be used to write to the log file during the write transaction.
+** LSM_OK is returned if no error occurs, otherwise an LSM error code.
+*/
+int lsmLogBegin(lsm_db *pDb){
+ int rc = LSM_OK;
+ LogWriter *pNew;
+ LogRegion *aReg;
+
+ if( pDb->bUseLog==0 ) return LSM_OK;
+
+ /* If the log file has not yet been opened, open it now. Also allocate
+ ** the LogWriter structure, if it has not already been allocated. */
+ rc = lsmFsOpenLog(pDb, 0);
+ if( pDb->pLogWriter==0 ){
+ pNew = lsmMallocZeroRc(pDb->pEnv, sizeof(LogWriter), &rc);
+ if( pNew ){
+ lsmStringInit(&pNew->buf, pDb->pEnv);
+ rc = lsmStringExtend(&pNew->buf, 2);
+ }
+ pDb->pLogWriter = pNew;
+ }else{
+ pNew = pDb->pLogWriter;
+ assert( (u8 *)(&pNew[1])==(u8 *)(&((&pNew->buf)[1])) );
+ memset(pNew, 0, ((u8 *)&pNew->buf) - (u8 *)pNew);
+ pNew->buf.n = 0;
+ }
+
+ if( rc==LSM_OK ){
+ /* The following call detects whether or not a new snapshot has been
+ ** synced into the database file. If so, it updates the contents of
+ ** the pDb->treehdr.log structure to reclaim any space in the log
+ ** file that is no longer required.
+ **
+ ** TODO: Calling this every transaction is overkill. And since the
+ ** call has to read and checksum a snapshot from the database file,
+ ** it is expensive. It would be better to figure out a way so that
+ ** this is only called occasionally - say for every 32KB written to
+ ** the log file.
+ */
+ rc = logReclaimSpace(pDb);
+ }
+ if( rc!=LSM_OK ){
+ lsmLogClose(pDb);
+ return rc;
+ }
+
+ /* Set the effective sector-size for this transaction. Sectors are assumed
+ ** to be one byte in size if the safety-mode is OFF or NORMAL, or as
+ ** reported by lsmFsSectorSize if it is FULL. */
+ if( pDb->eSafety==LSM_SAFETY_FULL ){
+ pNew->szSector = lsmFsSectorSize(pDb->pFS);
+ assert( pNew->szSector>0 );
+ }else{
+ pNew->szSector = 1;
+ }
+
+ /* There are now three scenarios:
+ **
+ ** 1) Regions 0 and 1 are both zero bytes in size and region 2 begins
+ ** at a file offset greater than LSM_MIN_LOGWRAP. In this case, wrap
+ ** around to the start and write data into the start of the log file.
+ **
+ ** 2) Region 1 is zero bytes in size and region 2 occurs earlier in the
+ ** file than region 0. In this case, append data to region 2, but
+ ** remember to jump over region 1 if required.
+ **
+ ** 3) Region 2 is the last in the file. Append to it.
+ */
+ aReg = &pDb->treehdr.log.aRegion[0];
+
+ assert( aReg[0].iEnd==0 || aReg[0].iEnd>aReg[0].iStart );
+ assert( aReg[1].iEnd==0 || aReg[1].iEnd>aReg[1].iStart );
+
+ pNew->cksum0 = pDb->treehdr.log.cksum0;
+ pNew->cksum1 = pDb->treehdr.log.cksum1;
+
+ if( aReg[0].iEnd==0 && aReg[1].iEnd==0 && aReg[2].iStart>=LSM_MIN_LOGWRAP ){
+ /* Case 1. Wrap around to the start of the file. Write an LSM_LOG_JUMP
+ ** into the log file in this case. Pad it out to 8 bytes using a PAD2
+ ** record so that the checksums can be updated immediately. */
+ u8 aJump[] = {
+ LSM_LOG_PAD2, 0x04, 0x00, 0x00, 0x00, 0x00, LSM_LOG_JUMP, 0x00
+ };
+
+ lsmStringBinAppend(&pNew->buf, aJump, sizeof(aJump));
+ logUpdateCksum(pNew, pNew->buf.n);
+ rc = lsmFsWriteLog(pDb->pFS, aReg[2].iEnd, &pNew->buf);
+ pNew->iCksumBuf = pNew->buf.n = 0;
+
+ aReg[2].iEnd += 8;
+ pNew->jump = aReg[0] = aReg[2];
+ aReg[2].iStart = aReg[2].iEnd = 0;
+ }else if( aReg[1].iEnd==0 && aReg[2].iEnd<aReg[0].iEnd ){
+ /* Case 2. */
+ pNew->iOff = aReg[2].iEnd;
+ pNew->jump = aReg[0];
+ }else{
+ /* Case 3. */
+ assert( aReg[2].iStart>=aReg[0].iEnd && aReg[2].iStart>=aReg[1].iEnd );
+ pNew->iOff = aReg[2].iEnd;
+ }
+
+ if( pNew->jump.iStart ){
+ i64 iRound;
+ assert( pNew->jump.iStart>pNew->iOff );
+
+ iRound = firstByteOnSector(pNew, pNew->jump.iStart);
+ if( iRound>pNew->iOff ) pNew->jump.iStart = iRound;
+ pNew->jump.iEnd = lastByteOnSector(pNew, pNew->jump.iEnd);
+ }
+
+ assert( pDb->pLogWriter==pNew );
+ return rc;
+}
+
+/*
+** This function is called when a write-transaction is being closed.
+** Parameter bCommit is true if the transaction is being committed,
+** or false otherwise. The caller must hold the client-mutex to call
+** this function.
+**
+** A call to this function deletes the LogWriter object allocated by
+** lsmLogBegin(). If the transaction is being committed, the shared state
+** in *pLog is updated before returning.
+*/
+void lsmLogEnd(lsm_db *pDb, int bCommit){
+ DbLog *pLog;
+ LogWriter *p;
+ p = pDb->pLogWriter;
+
+ if( p==0 ) return;
+ pLog = &pDb->treehdr.log;
+
+ if( bCommit ){
+ pLog->aRegion[2].iEnd = p->iOff;
+ pLog->cksum0 = p->cksum0;
+ pLog->cksum1 = p->cksum1;
+ if( p->iRegion1End ){
+ /* This happens when the transaction had to jump over some other
+ ** part of the log. */
+ assert( pLog->aRegion[1].iEnd==0 );
+ assert( pLog->aRegion[2].iStart<p->iRegion1End );
+ pLog->aRegion[1].iStart = pLog->aRegion[2].iStart;
+ pLog->aRegion[1].iEnd = p->iRegion1End;
+ pLog->aRegion[2].iStart = p->iRegion2Start;
+ }
+ }
+}
+
+static int jumpIfRequired(
+ lsm_db *pDb,
+ LogWriter *pLog,
+ int nReq,
+ int *pbJump
+){
+ /* Determine if it is necessary to add an LSM_LOG_JUMP to jump over the
+ ** jump region before writing the LSM_LOG_WRITE or DELETE record. This
+ ** is necessary if there is insufficient room between the current offset
+ ** and the jump region to fit the new WRITE/DELETE record and the largest
+ ** possible JUMP record with up to 7 bytes of padding (a total of 17
+ ** bytes). */
+ if( (pLog->jump.iStart > (pLog->iOff + pLog->buf.n))
+ && (pLog->jump.iStart < (pLog->iOff + pLog->buf.n + (nReq + 17)))
+ ){
+ int rc; /* Return code */
+ i64 iJump; /* Offset to jump to */
+ u8 aJump[10]; /* Encoded jump record */
+ int nJump; /* Valid bytes in aJump[] */
+ int nPad; /* Bytes of padding required */
+
+ /* Serialize the JUMP record */
+ iJump = pLog->jump.iEnd+1;
+ aJump[0] = LSM_LOG_JUMP;
+ nJump = 1 + lsmVarintPut64(&aJump[1], iJump);
+
+ /* Adding padding to the contents of the buffer so that it will be a
+ ** multiple of 8 bytes in size after the JUMP record is appended. This
+ ** is not strictly required, it just makes the keeping the running
+ ** checksum up to date in this file a little simpler. */
+ nPad = (pLog->buf.n + nJump) % 8;
+ if( nPad ){
+ u8 aPad[7] = {0,0,0,0,0,0,0};
+ nPad = 8-nPad;
+ if( nPad==1 ){
+ aPad[0] = LSM_LOG_PAD1;
+ }else{
+ aPad[0] = LSM_LOG_PAD2;
+ aPad[1] = (u8)(nPad-2);
+ }
+ rc = lsmStringBinAppend(&pLog->buf, aPad, nPad);
+ if( rc!=LSM_OK ) return rc;
+ }
+
+ /* Append the JUMP record to the buffer. Then flush the buffer to disk
+ ** and update the checksums. The next write to the log file (assuming
+ ** there is no transaction rollback) will be to offset iJump (just past
+ ** the jump region). */
+ rc = lsmStringBinAppend(&pLog->buf, aJump, nJump);
+ if( rc!=LSM_OK ) return rc;
+ assert( (pLog->buf.n % 8)==0 );
+ rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
+ if( rc!=LSM_OK ) return rc;
+ logUpdateCksum(pLog, pLog->buf.n);
+ pLog->iRegion1End = (pLog->iOff + pLog->buf.n);
+ pLog->iRegion2Start = iJump;
+ pLog->iOff = iJump;
+ pLog->iCksumBuf = pLog->buf.n = 0;
+ if( pbJump ) *pbJump = 1;
+ }
+
+ return LSM_OK;
+}
+
+static int logCksumAndFlush(lsm_db *pDb){
+ int rc; /* Return code */
+ LogWriter *pLog = pDb->pLogWriter;
+
+ /* Calculate the checksum value. Append it to the buffer. */
+ logUpdateCksum(pLog, pLog->buf.n);
+ lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum0);
+ pLog->buf.n += 4;
+ lsmPutU32((u8 *)&pLog->buf.z[pLog->buf.n], pLog->cksum1);
+ pLog->buf.n += 4;
+
+ /* Write the contents of the buffer to disk. */
+ rc = lsmFsWriteLog(pDb->pFS, pLog->iOff, &pLog->buf);
+ pLog->iOff += pLog->buf.n;
+ pLog->iCksumBuf = pLog->buf.n = 0;
+
+ return rc;
+}
+
+/*
+** Write the contents of the log-buffer to disk. Then write either a CKSUM
+** or COMMIT record, depending on the value of parameter eType.
+*/
+static int logFlush(lsm_db *pDb, int eType){
+ int rc;
+ int nReq;
+ LogWriter *pLog = pDb->pLogWriter;
+
+ assert( eType==LSM_LOG_COMMIT );
+ assert( pLog );
+
+ /* Commit record is always 9 bytes in size. */
+ nReq = 9;
+ if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ) nReq += pLog->szSector + 17;
+ rc = jumpIfRequired(pDb, pLog, nReq, 0);
+
+ /* If this is a COMMIT, add padding to the log so that the COMMIT record
+ ** is aligned against the end of a disk sector. In other words, add padding
+ ** so that the first byte following the COMMIT record lies on a different
+ ** sector. */
+ if( eType==LSM_LOG_COMMIT && pLog->szSector>1 ){
+ int nPad; /* Bytes of padding to add */
+
+ /* Determine the value of nPad. */
+ nPad = ((pLog->iOff + pLog->buf.n + 9) % pLog->szSector);
+ if( nPad ) nPad = pLog->szSector - nPad;
+ rc = lsmStringExtend(&pLog->buf, nPad);
+ if( rc!=LSM_OK ) return rc;
+
+ while( nPad ){
+ if( nPad==1 ){
+ pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD1;
+ nPad = 0;
+ }else{
+ int n = LSM_MIN(200, nPad-2);
+ pLog->buf.z[pLog->buf.n++] = LSM_LOG_PAD2;
+ pLog->buf.z[pLog->buf.n++] = (char)n;
+ nPad -= 2;
+ memset(&pLog->buf.z[pLog->buf.n], 0x2B, n);
+ pLog->buf.n += n;
+ nPad -= n;
+ }
+ }
+ }
+
+ /* Make sure there is room in the log-buffer to add the CKSUM or COMMIT
+ ** record. Then add the first byte of it. */
+ rc = lsmStringExtend(&pLog->buf, 9);
+ if( rc!=LSM_OK ) return rc;
+ pLog->buf.z[pLog->buf.n++] = (char)eType;
+ memset(&pLog->buf.z[pLog->buf.n], 0, 8);
+
+ rc = logCksumAndFlush(pDb);
+
+ /* If this is a commit and synchronous=full, sync the log to disk. */
+ if( rc==LSM_OK && eType==LSM_LOG_COMMIT && pDb->eSafety==LSM_SAFETY_FULL ){
+ rc = lsmFsSyncLog(pDb->pFS);
+ }
+ return rc;
+}
+
+/*
+** Append an LSM_LOG_WRITE (if nVal>=0) or LSM_LOG_DELETE (if nVal<0)
+** record to the database log.
+*/
+int lsmLogWrite(
+ lsm_db *pDb, /* Database handle */
+ int eType,
+ void *pKey, int nKey, /* Database key to write to log */
+ void *pVal, int nVal /* Database value (or nVal<0) to write */
+){
+ int rc = LSM_OK;
+ LogWriter *pLog; /* Log object to write to */
+ int nReq; /* Bytes of space required in log */
+ int bCksum = 0; /* True to embed a checksum in this record */
+
+ assert( eType==LSM_WRITE || eType==LSM_DELETE || eType==LSM_DRANGE );
+ assert( LSM_LOG_WRITE==LSM_WRITE );
+ assert( LSM_LOG_DELETE==LSM_DELETE );
+ assert( LSM_LOG_DRANGE==LSM_DRANGE );
+ assert( (eType==LSM_LOG_DELETE)==(nVal<0) );
+
+ if( pDb->bUseLog==0 ) return LSM_OK;
+ pLog = pDb->pLogWriter;
+
+ /* Determine how many bytes of space are required, assuming that a checksum
+ ** will be embedded in this record (even though it may not be). */
+ nReq = 1 + lsmVarintLen32(nKey) + 8 + nKey;
+ if( eType!=LSM_LOG_DELETE ) nReq += lsmVarintLen32(nVal) + nVal;
+
+ /* Jump over the jump region if required. Set bCksum to true to tell the
+ ** code below to include a checksum in the record if either (a) writing
+ ** this record would mean that more than LSM_CKSUM_MAXDATA bytes of data
+ ** have been written to the log since the last checksum, or (b) the jump
+ ** is taken. */
+ rc = jumpIfRequired(pDb, pLog, nReq, &bCksum);
+ if( (pLog->buf.n+nReq) > LSM_CKSUM_MAXDATA ) bCksum = 1;
+
+ if( rc==LSM_OK ){
+ rc = lsmStringExtend(&pLog->buf, nReq);
+ }
+ if( rc==LSM_OK ){
+ u8 *a = (u8 *)&pLog->buf.z[pLog->buf.n];
+
+ /* Write the record header - the type byte followed by either 1 (for
+ ** DELETE) or 2 (for WRITE) varints. */
+ assert( LSM_LOG_WRITE_CKSUM == (LSM_LOG_WRITE | 0x0001) );
+ assert( LSM_LOG_DELETE_CKSUM == (LSM_LOG_DELETE | 0x0001) );
+ assert( LSM_LOG_DRANGE_CKSUM == (LSM_LOG_DRANGE | 0x0001) );
+ *(a++) = (u8)eType | (u8)bCksum;
+ a += lsmVarintPut32(a, nKey);
+ if( eType!=LSM_LOG_DELETE ) a += lsmVarintPut32(a, nVal);
+
+ if( bCksum ){
+ pLog->buf.n = (a - (u8 *)pLog->buf.z);
+ rc = logCksumAndFlush(pDb);
+ a = (u8 *)&pLog->buf.z[pLog->buf.n];
+ }
+
+ memcpy(a, pKey, nKey);
+ a += nKey;
+ if( eType!=LSM_LOG_DELETE ){
+ memcpy(a, pVal, nVal);
+ a += nVal;
+ }
+ pLog->buf.n = a - (u8 *)pLog->buf.z;
+ assert( pLog->buf.n<=pLog->buf.nAlloc );
+ }
+
+ return rc;
+}
+
+/*
+** Append an LSM_LOG_COMMIT record to the database log.
+*/
+int lsmLogCommit(lsm_db *pDb){
+ if( pDb->bUseLog==0 ) return LSM_OK;
+ return logFlush(pDb, LSM_LOG_COMMIT);
+}
+
+/*
+** Store the current offset and other checksum related information in the
+** structure *pMark. Later, *pMark can be passed to lsmLogSeek() to "rewind"
+** the LogWriter object to the current log file offset. This is used when
+** rolling back savepoint transactions.
+*/
+void lsmLogTell(
+ lsm_db *pDb, /* Database handle */
+ LogMark *pMark /* Populate this object with current offset */
+){
+ LogWriter *pLog;
+ int nCksum;
+
+ if( pDb->bUseLog==0 ) return;
+ pLog = pDb->pLogWriter;
+ nCksum = pLog->buf.n & 0xFFFFFFF8;
+ logUpdateCksum(pLog, nCksum);
+ assert( pLog->iCksumBuf==nCksum );
+ pMark->nBuf = pLog->buf.n - nCksum;
+ memcpy(pMark->aBuf, &pLog->buf.z[nCksum], pMark->nBuf);
+
+ pMark->iOff = pLog->iOff + pLog->buf.n;
+ pMark->cksum0 = pLog->cksum0;
+ pMark->cksum1 = pLog->cksum1;
+}
+
+/*
+** Seek (rewind) back to the log file offset stored by an ealier call to
+** lsmLogTell() in *pMark.
+*/
+void lsmLogSeek(
+ lsm_db *pDb, /* Database handle */
+ LogMark *pMark /* Object containing log offset to seek to */
+){
+ LogWriter *pLog;
+
+ if( pDb->bUseLog==0 ) return;
+ pLog = pDb->pLogWriter;
+
+ assert( pMark->iOff<=pLog->iOff+pLog->buf.n );
+ if( (pMark->iOff & 0xFFFFFFF8)>=pLog->iOff ){
+ pLog->buf.n = (int)(pMark->iOff - pLog->iOff);
+ pLog->iCksumBuf = (pLog->buf.n & 0xFFFFFFF8);
+ }else{
+ pLog->buf.n = pMark->nBuf;
+ memcpy(pLog->buf.z, pMark->aBuf, pMark->nBuf);
+ pLog->iCksumBuf = 0;
+ pLog->iOff = pMark->iOff - pMark->nBuf;
+ }
+ pLog->cksum0 = pMark->cksum0;
+ pLog->cksum1 = pMark->cksum1;
+
+ if( pMark->iOff > pLog->iRegion1End ) pLog->iRegion1End = 0;
+ if( pMark->iOff > pLog->iRegion2Start ) pLog->iRegion2Start = 0;
+}
+
+/*
+** This function does the work for an lsm_info(LOG_STRUCTURE) request.
+*/
+int lsmInfoLogStructure(lsm_db *pDb, char **pzVal){
+ int rc = LSM_OK;
+ char *zVal = 0;
+
+ /* If there is no read or write transaction open, read the latest
+ ** tree-header from shared-memory to report on. If necessary, update
+ ** it based on the contents of the database header.
+ **
+ ** No locks are taken here - these are passive read operations only.
+ */
+ if( pDb->pCsr==0 && pDb->nTransOpen==0 ){
+ rc = lsmTreeLoadHeader(pDb, 0);
+ if( rc==LSM_OK ) rc = logReclaimSpace(pDb);
+ }
+
+ if( rc==LSM_OK ){
+ DbLog *pLog = &pDb->treehdr.log;
+ zVal = lsmMallocPrintf(pDb->pEnv,
+ "%d %d %d %d %d %d",
+ (int)pLog->aRegion[0].iStart, (int)pLog->aRegion[0].iEnd,
+ (int)pLog->aRegion[1].iStart, (int)pLog->aRegion[1].iEnd,
+ (int)pLog->aRegion[2].iStart, (int)pLog->aRegion[2].iEnd
+ );
+ if( !zVal ) rc = LSM_NOMEM_BKPT;
+ }
+
+ *pzVal = zVal;
+ return rc;
+}
+
+/*************************************************************************
+** Begin code for log recovery.
+*/
+
+typedef struct LogReader LogReader;
+struct LogReader {
+ FileSystem *pFS; /* File system to read from */
+ i64 iOff; /* File offset at end of buf content */
+ int iBuf; /* Current read offset in buf */
+ LsmString buf; /* Buffer containing file content */
+
+ int iCksumBuf; /* Offset in buf corresponding to cksum[01] */
+ u32 cksum0; /* Checksum 0 at offset iCksumBuf */
+ u32 cksum1; /* Checksum 1 at offset iCksumBuf */
+};
+
+static void logReaderBlob(
+ LogReader *p, /* Log reader object */
+ LsmString *pBuf, /* Dynamic storage, if required */
+ int nBlob, /* Number of bytes to read */
+ u8 **ppBlob, /* OUT: Pointer to blob read */
+ int *pRc /* IN/OUT: Error code */
+){
+ static const int LOG_READ_SIZE = 512;
+ int rc = *pRc; /* Return code */
+ int nReq = nBlob; /* Bytes required */
+
+ while( rc==LSM_OK && nReq>0 ){
+ int nAvail; /* Bytes of data available in p->buf */
+ if( p->buf.n==p->iBuf ){
+ int nCksum; /* Total bytes requiring checksum */
+ int nCarry = 0; /* Total bytes requiring checksum */
+
+ nCksum = p->iBuf - p->iCksumBuf;
+ if( nCksum>0 ){
+ nCarry = nCksum % 8;
+ nCksum = ((nCksum / 8) * 8);
+ if( nCksum>0 ){
+ logCksumUnaligned(
+ &p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1
+ );
+ }
+ }
+ if( nCarry>0 ) memcpy(p->buf.z, &p->buf.z[p->iBuf-nCarry], nCarry);
+ p->buf.n = nCarry;
+ p->iBuf = nCarry;
+
+ rc = lsmFsReadLog(p->pFS, p->iOff, LOG_READ_SIZE, &p->buf);
+ if( rc!=LSM_OK ) break;
+ p->iCksumBuf = 0;
+ p->iOff += LOG_READ_SIZE;
+ }
+
+ nAvail = p->buf.n - p->iBuf;
+ if( ppBlob && nReq==nBlob && nBlob<=nAvail ){
+ *ppBlob = (u8 *)&p->buf.z[p->iBuf];
+ p->iBuf += nBlob;
+ nReq = 0;
+ }else{
+ int nCopy = LSM_MIN(nAvail, nReq);
+ if( nBlob==nReq ){
+ pBuf->n = 0;
+ }
+ rc = lsmStringBinAppend(pBuf, (u8 *)&p->buf.z[p->iBuf], nCopy);
+ nReq -= nCopy;
+ p->iBuf += nCopy;
+ if( nReq==0 && ppBlob ){
+ *ppBlob = (u8*)pBuf->z;
+ }
+ }
+ }
+
+ *pRc = rc;
+}
+
+static void logReaderVarint(
+ LogReader *p,
+ LsmString *pBuf,
+ int *piVal, /* OUT: Value read from log */
+ int *pRc /* IN/OUT: Error code */
+){
+ if( *pRc==LSM_OK ){
+ u8 *aVarint;
+ if( p->buf.n==p->iBuf ){
+ logReaderBlob(p, 0, 10, &aVarint, pRc);
+ if( LSM_OK==*pRc ) p->iBuf -= (10 - lsmVarintGet32(aVarint, piVal));
+ }else{
+ logReaderBlob(p, pBuf, lsmVarintSize(p->buf.z[p->iBuf]), &aVarint, pRc);
+ if( LSM_OK==*pRc ) lsmVarintGet32(aVarint, piVal);
+ }
+ }
+}
+
+static void logReaderByte(LogReader *p, u8 *pByte, int *pRc){
+ u8 *pPtr = 0;
+ logReaderBlob(p, 0, 1, &pPtr, pRc);
+ if( pPtr ) *pByte = *pPtr;
+}
+
+static void logReaderCksum(LogReader *p, LsmString *pBuf, int *pbEof, int *pRc){
+ if( *pRc==LSM_OK ){
+ u8 *pPtr = 0;
+ u32 cksum0, cksum1;
+ int nCksum = p->iBuf - p->iCksumBuf;
+
+ /* Update in-memory (expected) checksums */
+ assert( nCksum>=0 );
+ logCksumUnaligned(&p->buf.z[p->iCksumBuf], nCksum, &p->cksum0, &p->cksum1);
+ p->iCksumBuf = p->iBuf + 8;
+ logReaderBlob(p, pBuf, 8, &pPtr, pRc);
+ assert( pPtr || *pRc );
+
+ /* Read the checksums from the log file. Set *pbEof if they do not match. */
+ if( pPtr ){
+ cksum0 = lsmGetU32(pPtr);
+ cksum1 = lsmGetU32(&pPtr[4]);
+ *pbEof = (cksum0!=p->cksum0 || cksum1!=p->cksum1);
+ p->iCksumBuf = p->iBuf;
+ }
+ }
+}
+
+static void logReaderInit(
+ lsm_db *pDb, /* Database handle */
+ DbLog *pLog, /* Log object associated with pDb */
+ int bInitBuf, /* True if p->buf is uninitialized */
+ LogReader *p /* Initialize this LogReader object */
+){
+ p->pFS = pDb->pFS;
+ p->iOff = pLog->aRegion[2].iStart;
+ p->cksum0 = pLog->cksum0;
+ p->cksum1 = pLog->cksum1;
+ if( bInitBuf ){ lsmStringInit(&p->buf, pDb->pEnv); }
+ p->buf.n = 0;
+ p->iCksumBuf = 0;
+ p->iBuf = 0;
+}
+
+/*
+** This function is called after reading the header of a LOG_DELETE or
+** LOG_WRITE record. Parameter nByte is the total size of the key and
+** value that follow the header just read. Return true if the size and
+** position of the record indicate that it should contain a checksum.
+*/
+static int logRequireCksum(LogReader *p, int nByte){
+ return ((p->iBuf + nByte - p->iCksumBuf) > LSM_CKSUM_MAXDATA);
+}
+
+/*
+** Recover the contents of the log file.
+*/
+int lsmLogRecover(lsm_db *pDb){
+ LsmString buf1; /* Key buffer */
+ LsmString buf2; /* Value buffer */
+ LogReader reader; /* Log reader object */
+ int rc = LSM_OK; /* Return code */
+ int nCommit = 0; /* Number of transactions to recover */
+ int iPass;
+ int nJump = 0; /* Number of LSM_LOG_JUMP records in pass 0 */
+ DbLog *pLog;
+ int bOpen;
+
+ rc = lsmFsOpenLog(pDb, &bOpen);
+ if( rc!=LSM_OK ) return rc;
+
+ rc = lsmTreeInit(pDb);
+ if( rc!=LSM_OK ) return rc;
+
+ pLog = &pDb->treehdr.log;
+ lsmCheckpointLogoffset(pDb->pShmhdr->aSnap2, pLog);
+
+ logReaderInit(pDb, pLog, 1, &reader);
+ lsmStringInit(&buf1, pDb->pEnv);
+ lsmStringInit(&buf2, pDb->pEnv);
+
+ /* The outer for() loop runs at most twice. The first iteration is to
+ ** count the number of committed transactions in the log. The second
+ ** iterates through those transactions and updates the in-memory tree
+ ** structure with their contents. */
+ if( bOpen ){
+ for(iPass=0; iPass<2 && rc==LSM_OK; iPass++){
+ int bEof = 0;
+
+ while( rc==LSM_OK && !bEof ){
+ u8 eType = 0;
+ logReaderByte(&reader, &eType, &rc);
+
+ switch( eType ){
+ case LSM_LOG_PAD1:
+ break;
+
+ case LSM_LOG_PAD2: {
+ int nPad;
+ logReaderVarint(&reader, &buf1, &nPad, &rc);
+ logReaderBlob(&reader, &buf1, nPad, 0, &rc);
+ break;
+ }
+
+ case LSM_LOG_DRANGE:
+ case LSM_LOG_DRANGE_CKSUM:
+ case LSM_LOG_WRITE:
+ case LSM_LOG_WRITE_CKSUM: {
+ int nKey;
+ int nVal;
+ u8 *aVal;
+ logReaderVarint(&reader, &buf1, &nKey, &rc);
+ logReaderVarint(&reader, &buf2, &nVal, &rc);
+
+ if( eType==LSM_LOG_WRITE_CKSUM || eType==LSM_LOG_DRANGE_CKSUM ){
+ logReaderCksum(&reader, &buf1, &bEof, &rc);
+ }else{
+ bEof = logRequireCksum(&reader, nKey+nVal);
+ }
+ if( bEof ) break;
+
+ logReaderBlob(&reader, &buf1, nKey, 0, &rc);
+ logReaderBlob(&reader, &buf2, nVal, &aVal, &rc);
+ if( iPass==1 && rc==LSM_OK ){
+ if( eType==LSM_LOG_WRITE || eType==LSM_LOG_WRITE_CKSUM ){
+ rc = lsmTreeInsert(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
+ }else{
+ rc = lsmTreeDelete(pDb, (u8 *)buf1.z, nKey, aVal, nVal);
+ }
+ }
+ break;
+ }
+
+ case LSM_LOG_DELETE:
+ case LSM_LOG_DELETE_CKSUM: {
+ int nKey; u8 *aKey;
+ logReaderVarint(&reader, &buf1, &nKey, &rc);
+
+ if( eType==LSM_LOG_DELETE_CKSUM ){
+ logReaderCksum(&reader, &buf1, &bEof, &rc);
+ }else{
+ bEof = logRequireCksum(&reader, nKey);
+ }
+ if( bEof ) break;
+
+ logReaderBlob(&reader, &buf1, nKey, &aKey, &rc);
+ if( iPass==1 && rc==LSM_OK ){
+ rc = lsmTreeInsert(pDb, aKey, nKey, NULL, -1);
+ }
+ break;
+ }
+
+ case LSM_LOG_COMMIT:
+ logReaderCksum(&reader, &buf1, &bEof, &rc);
+ if( bEof==0 ){
+ nCommit++;
+ assert( nCommit>0 || iPass==1 );
+ if( nCommit==0 ) bEof = 1;
+ }
+ break;
+
+ case LSM_LOG_JUMP: {
+ int iOff = 0;
+ logReaderVarint(&reader, &buf1, &iOff, &rc);
+ if( rc==LSM_OK ){
+ if( iPass==1 ){
+ if( pLog->aRegion[2].iStart==0 ){
+ assert( pLog->aRegion[1].iStart==0 );
+ pLog->aRegion[1].iEnd = reader.iOff;
+ }else{
+ assert( pLog->aRegion[0].iStart==0 );
+ pLog->aRegion[0].iStart = pLog->aRegion[2].iStart;
+ pLog->aRegion[0].iEnd = reader.iOff-reader.buf.n+reader.iBuf;
+ }
+ pLog->aRegion[2].iStart = iOff;
+ }else{
+ if( (nJump++)==2 ){
+ bEof = 1;
+ }
+ }
+
+ reader.iOff = iOff;
+ reader.buf.n = reader.iBuf;
+ }
+ break;
+ }
+
+ default:
+ /* Including LSM_LOG_EOF */
+ bEof = 1;
+ break;
+ }
+ }
+
+ if( rc==LSM_OK && iPass==0 ){
+ if( nCommit==0 ){
+ if( pLog->aRegion[2].iStart==0 ){
+ iPass = 1;
+ }else{
+ pLog->aRegion[2].iStart = 0;
+ iPass = -1;
+ lsmCheckpointZeroLogoffset(pDb);
+ }
+ }
+ logReaderInit(pDb, pLog, 0, &reader);
+ nCommit = nCommit * -1;
+ }
+ }
+ }
+
+ /* Initialize DbLog object */
+ if( rc==LSM_OK ){
+ pLog->aRegion[2].iEnd = reader.iOff - reader.buf.n + reader.iBuf;
+ pLog->cksum0 = reader.cksum0;
+ pLog->cksum1 = reader.cksum1;
+ }
+
+ if( rc==LSM_OK ){
+ rc = lsmFinishRecovery(pDb);
+ }else{
+ lsmFinishRecovery(pDb);
+ }
+
+ if( pDb->bRoTrans ){
+ lsmFsCloseLog(pDb);
+ }
+
+ lsmStringClear(&buf1);
+ lsmStringClear(&buf2);
+ lsmStringClear(&reader.buf);
+ return rc;
+}
+
+void lsmLogClose(lsm_db *db){
+ if( db->pLogWriter ){
+ lsmFree(db->pEnv, db->pLogWriter->buf.z);
+ lsmFree(db->pEnv, db->pLogWriter);
+ db->pLogWriter = 0;
+ }
+}