diff options
Diffstat (limited to 'src/backend/access/spgist/spgxlog.c')
-rw-r--r-- | src/backend/access/spgist/spgxlog.c | 1013 |
1 files changed, 1013 insertions, 0 deletions
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
new file mode 100644
index 0000000..3dfd2aa
--- /dev/null
+++ b/src/backend/access/spgist/spgxlog.c
@@ -0,0 +1,1013 @@
+/*-------------------------------------------------------------------------
+ *
+ * spgxlog.c
+ *	  WAL replay logic for SP-GiST
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *			 src/backend/access/spgist/spgxlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/spgist_private.h"
+#include "access/spgxlog.h"
+#include "access/transam.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "storage/standby.h"
+#include "utils/memutils.h"
+
+
+static MemoryContext opCtx;		/* working memory for operations */
+
+
+/*
+ * Prepare a dummy SpGistState, with just the minimum info needed for replay.
+ *
+ * At present, all we need is enough info to support spgFormDeadTuple(),
+ * plus the isBuild flag.
+ */
+static void
+fillFakeState(SpGistState *state, spgxlogState stateSrc)
+{
+	memset(state, 0, sizeof(*state));
+
+	state->myXid = stateSrc.myXid;
+	state->isBuild = stateSrc.isBuild;
+	state->deadTupleStorage = palloc0(SGDTSIZE);
+}
+
+/*
+ * Add a tuple (leaf or inner) at "offset", or replace an existing placeholder
+ * tuple there.  This is used to replay SpGistPageAddNewItem() operations.  If
+ * the offset points at an existing tuple, it had better be a placeholder.
+ */
+static void
+addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
+{
+	if (offset <= PageGetMaxOffsetNumber(page))
+	{
+		SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
+														   PageGetItemId(page, offset));
+
+		if (dt->tupstate != SPGIST_PLACEHOLDER)
+			elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
+
+		Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
+		SpGistPageGetOpaque(page)->nPlaceholder--;
+
+		PageIndexTupleDelete(page, offset);
+	}
+
+	Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
+
+	if (PageAddItem(page, tuple, size, offset, false, false) != offset)
+		elog(ERROR, "failed to add item of size %u to SPGiST index page",
+			 size);
+}
+/* Replay ADD_LEAF: add leaf tuple on block 0, fix parent link on block 1. */
+static void
+spgRedoAddLeaf(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *ptr = XLogRecGetData(record);
+	spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
+	char	   *leafTuple;
+	SpGistLeafTupleData leafTupleHdr;
+	Buffer		buffer;
+	Page		page;
+	XLogRedoAction action;
+
+	ptr += sizeof(spgxlogAddLeaf);
+	leafTuple = ptr;
+	/* the leaf tuple is unaligned, so make a copy to access its header */
+	memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
+
+	/*
+	 * In normal operation we would have both current and parent pages locked
+	 * simultaneously; but in WAL replay it should be safe to update the leaf
+	 * page before updating the parent.
+	 */
+	if (xldata->newPage)
+	{
+		buffer = XLogInitBufferForRedo(record, 0);
+		SpGistInitBuffer(buffer,
+						 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
+		action = BLK_NEEDS_REDO;
+	}
+	else
+		action = XLogReadBufferForRedo(record, 0, &buffer);
+
+	if (action == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		/* insert new tuple */
+		if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
+		{
+			/* normal cases, tuple was added by SpGistPageAddNewItem */
+			addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
+							  xldata->offnumLeaf);
+
+			/* update head tuple's chain link if needed */
+			if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
+			{
+				SpGistLeafTuple head;
+
+				head = (SpGistLeafTuple) PageGetItem(page,
+													 PageGetItemId(page, xldata->offnumHeadLeaf));
+				Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
+				SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
+			}
+		}
+		else
+		{
+			/* replacing a DEAD tuple */
+			PageIndexTupleDelete(page, xldata->offnumLeaf);
+			if (PageAddItem(page,
+							(Item) leafTuple, leafTupleHdr.size,
+							xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
+				elog(ERROR, "failed to add item of size %u to SPGiST index page",
+					 leafTupleHdr.size);
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/* update parent downlink if necessary */
+	if (xldata->offnumParent != InvalidOffsetNumber)
+	{
+		if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
+		{
+			SpGistInnerTuple tuple;
+			BlockNumber blknoLeaf;
+
+			XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
+
+			page = BufferGetPage(buffer);
+
+			tuple = (SpGistInnerTuple) PageGetItem(page,
+												   PageGetItemId(page, xldata->offnumParent));
+
+			spgUpdateNodeLink(tuple, xldata->nodeI,
+							  blknoLeaf, xldata->offnumLeaf);
+
+			PageSetLSN(page, lsn);
+			MarkBufferDirty(buffer);
+		}
+		if (BufferIsValid(buffer))
+			UnlockReleaseBuffer(buffer);
+	}
+}
+/* Replay MOVE_LEAFS: dest page first, then source page, then parent link. */
+static void
+spgRedoMoveLeafs(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *ptr = XLogRecGetData(record);
+	spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
+	SpGistState state;
+	OffsetNumber *toDelete;
+	OffsetNumber *toInsert;
+	int			nInsert;
+	Buffer		buffer;
+	Page		page;
+	XLogRedoAction action;
+	BlockNumber blknoDst;
+
+	XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
+
+	fillFakeState(&state, xldata->stateSrc);
+
+	nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
+
+	ptr += SizeOfSpgxlogMoveLeafs;
+	toDelete = (OffsetNumber *) ptr;
+	ptr += sizeof(OffsetNumber) * xldata->nMoves;
+	toInsert = (OffsetNumber *) ptr;
+	ptr += sizeof(OffsetNumber) * nInsert;
+
+	/* now ptr points to the list of leaf tuples */
+
+	/*
+	 * In normal operation we would have all three pages (source, dest, and
+	 * parent) locked simultaneously; but in WAL replay it should be safe to
+	 * update them one at a time, as long as we do it in the right order.
+	 */
+
+	/* Insert tuples on the dest page (do first, so redirect is valid) */
+	if (xldata->newPage)
+	{
+		buffer = XLogInitBufferForRedo(record, 1);
+		SpGistInitBuffer(buffer,
+						 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
+		action = BLK_NEEDS_REDO;
+	}
+	else
+		action = XLogReadBufferForRedo(record, 1, &buffer);
+
+	if (action == BLK_NEEDS_REDO)
+	{
+		int			i;
+
+		page = BufferGetPage(buffer);
+
+		for (i = 0; i < nInsert; i++)
+		{
+			char	   *leafTuple;
+			SpGistLeafTupleData leafTupleHdr;
+
+			/*
+			 * the tuples are not aligned, so must copy to access the size
+			 * field.
+			 */
+			leafTuple = ptr;
+			memcpy(&leafTupleHdr, leafTuple,
+				   sizeof(SpGistLeafTupleData));
+
+			addOrReplaceTuple(page, (Item) leafTuple,
+							  leafTupleHdr.size, toInsert[i]);
+			ptr += leafTupleHdr.size;
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/* Delete tuples from the source page, inserting a redirection pointer */
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
+								state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
+								SPGIST_PLACEHOLDER,
+								blknoDst,
+								toInsert[nInsert - 1]);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/* And update the parent downlink */
+	if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
+	{
+		SpGistInnerTuple tuple;
+
+		page = BufferGetPage(buffer);
+
+		tuple = (SpGistInnerTuple) PageGetItem(page,
+											   PageGetItemId(page, xldata->offnumParent));
+
+		spgUpdateNodeLink(tuple, xldata->nodeI,
+						  blknoDst, toInsert[nInsert - 1]);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+/* Replay ADD_NODE: replace inner tuple in place, or move it to block 1. */
+static void
+spgRedoAddNode(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *ptr = XLogRecGetData(record);
+	spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
+	char	   *innerTuple;
+	SpGistInnerTupleData innerTupleHdr;
+	SpGistState state;
+	Buffer		buffer;
+	Page		page;
+	XLogRedoAction action;
+
+	ptr += sizeof(spgxlogAddNode);
+	innerTuple = ptr;
+	/* the tuple is unaligned, so make a copy to access its header */
+	memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
+
+	fillFakeState(&state, xldata->stateSrc);
+
+	if (!XLogRecHasBlockRef(record, 1))
+	{
+		/* update in place */
+		Assert(xldata->parentBlk == -1);
+		if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+		{
+			page = BufferGetPage(buffer);
+
+			PageIndexTupleDelete(page, xldata->offnum);
+			if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
+							xldata->offnum,
+							false, false) != xldata->offnum)
+				elog(ERROR, "failed to add item of size %u to SPGiST index page",
+					 innerTupleHdr.size);
+
+			PageSetLSN(page, lsn);
+			MarkBufferDirty(buffer);
+		}
+		if (BufferIsValid(buffer))
+			UnlockReleaseBuffer(buffer);
+	}
+	else
+	{
+		BlockNumber blkno;
+		BlockNumber blknoNew;
+
+		XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
+		XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
+
+		/*
+		 * In normal operation we would have all three pages (source, dest,
+		 * and parent) locked simultaneously; but in WAL replay it should be
+		 * safe to update them one at a time, as long as we do it in the right
+		 * order. We must insert the new tuple before replacing the old tuple
+		 * with the redirect tuple.
+		 */
+
+		/* Install new tuple first so redirect is valid */
+		if (xldata->newPage)
+		{
+			/* AddNode is not used for nulls pages */
+			buffer = XLogInitBufferForRedo(record, 1);
+			SpGistInitBuffer(buffer, 0);
+			action = BLK_NEEDS_REDO;
+		}
+		else
+			action = XLogReadBufferForRedo(record, 1, &buffer);
+		if (action == BLK_NEEDS_REDO)
+		{
+			page = BufferGetPage(buffer);
+
+			addOrReplaceTuple(page, (Item) innerTuple,
+							  innerTupleHdr.size, xldata->offnumNew);
+
+			/*
+			 * If parent is in this same page, update it now.
+			 */
+			if (xldata->parentBlk == 1)
+			{
+				SpGistInnerTuple parentTuple;
+
+				parentTuple = (SpGistInnerTuple) PageGetItem(page,
+															 PageGetItemId(page, xldata->offnumParent));
+
+				spgUpdateNodeLink(parentTuple, xldata->nodeI,
+								  blknoNew, xldata->offnumNew);
+			}
+			PageSetLSN(page, lsn);
+			MarkBufferDirty(buffer);
+		}
+		if (BufferIsValid(buffer))
+			UnlockReleaseBuffer(buffer);
+
+		/* Delete old tuple, replacing it with redirect or placeholder tuple */
+		if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+		{
+			SpGistDeadTuple dt;
+
+			page = BufferGetPage(buffer);
+
+			if (state.isBuild)
+				dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
+									  InvalidBlockNumber,
+									  InvalidOffsetNumber);
+			else
+				dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
+									  blknoNew,
+									  xldata->offnumNew);
+
+			PageIndexTupleDelete(page, xldata->offnum);
+			if (PageAddItem(page, (Item) dt, dt->size,
+							xldata->offnum,
+							false, false) != xldata->offnum)
+				elog(ERROR, "failed to add item of size %u to SPGiST index page",
+					 dt->size);
+
+			if (state.isBuild)
+				SpGistPageGetOpaque(page)->nPlaceholder++;
+			else
+				SpGistPageGetOpaque(page)->nRedirection++;
+
+			/*
+			 * If parent is in this same page, update it now.
+			 */
+			if (xldata->parentBlk == 0)
+			{
+				SpGistInnerTuple parentTuple;
+
+				parentTuple = (SpGistInnerTuple) PageGetItem(page,
+															 PageGetItemId(page, xldata->offnumParent));
+
+				spgUpdateNodeLink(parentTuple, xldata->nodeI,
+								  blknoNew, xldata->offnumNew);
+			}
+			PageSetLSN(page, lsn);
+			MarkBufferDirty(buffer);
+		}
+		if (BufferIsValid(buffer))
+			UnlockReleaseBuffer(buffer);
+
+		/*
+		 * Update parent downlink (if we didn't do it as part of the source or
+		 * destination page update already).
+		 */
+		if (xldata->parentBlk == 2)
+		{
+			if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
+			{
+				SpGistInnerTuple parentTuple;
+
+				page = BufferGetPage(buffer);
+
+				parentTuple = (SpGistInnerTuple) PageGetItem(page,
+															 PageGetItemId(page, xldata->offnumParent));
+
+				spgUpdateNodeLink(parentTuple, xldata->nodeI,
+								  blknoNew, xldata->offnumNew);
+
+				PageSetLSN(page, lsn);
+				MarkBufferDirty(buffer);
+			}
+			if (BufferIsValid(buffer))
+				UnlockReleaseBuffer(buffer);
+		}
+	}
+}
+/* Replay SPLIT_TUPLE: add postfix tuple, then overwrite old with prefix. */
+static void
+spgRedoSplitTuple(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *ptr = XLogRecGetData(record);
+	spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
+	char	   *prefixTuple;
+	SpGistInnerTupleData prefixTupleHdr;
+	char	   *postfixTuple;
+	SpGistInnerTupleData postfixTupleHdr;
+	Buffer		buffer;
+	Page		page;
+	XLogRedoAction action;
+
+	ptr += sizeof(spgxlogSplitTuple);
+	prefixTuple = ptr;
+	/* the prefix tuple is unaligned, so make a copy to access its header */
+	memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
+	ptr += prefixTupleHdr.size;
+	postfixTuple = ptr;
+	/* postfix tuple is also unaligned */
+	memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
+
+	/*
+	 * In normal operation we would have both pages locked simultaneously; but
+	 * in WAL replay it should be safe to update them one at a time, as long
+	 * as we do it in the right order.
+	 */
+
+	/* insert postfix tuple first to avoid dangling link */
+	if (!xldata->postfixBlkSame)
+	{
+		if (xldata->newPage)
+		{
+			buffer = XLogInitBufferForRedo(record, 1);
+			/* SplitTuple is not used for nulls pages */
+			SpGistInitBuffer(buffer, 0);
+			action = BLK_NEEDS_REDO;
+		}
+		else
+			action = XLogReadBufferForRedo(record, 1, &buffer);
+		if (action == BLK_NEEDS_REDO)
+		{
+			page = BufferGetPage(buffer);
+
+			addOrReplaceTuple(page, (Item) postfixTuple,
+							  postfixTupleHdr.size, xldata->offnumPostfix);
+
+			PageSetLSN(page, lsn);
+			MarkBufferDirty(buffer);
+		}
+		if (BufferIsValid(buffer))
+			UnlockReleaseBuffer(buffer);
+	}
+
+	/* now handle the original page */
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		PageIndexTupleDelete(page, xldata->offnumPrefix);
+		if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
+						xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
+			elog(ERROR, "failed to add item of size %u to SPGiST index page",
+				 prefixTupleHdr.size);
+
+		if (xldata->postfixBlkSame)
+			addOrReplaceTuple(page, (Item) postfixTuple,
+							  postfixTupleHdr.size,
+							  xldata->offnumPostfix);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+/* Replay PICKSPLIT: redistribute leaf tuples and install new inner tuple. */
+static void
+spgRedoPickSplit(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *ptr = XLogRecGetData(record);
+	spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
+	char	   *innerTuple;
+	SpGistInnerTupleData innerTupleHdr;
+	SpGistState state;
+	OffsetNumber *toDelete;
+	OffsetNumber *toInsert;
+	uint8	   *leafPageSelect;
+	Buffer		srcBuffer;
+	Buffer		destBuffer;
+	Buffer		innerBuffer;
+	Page		srcPage;
+	Page		destPage;
+	Page		page;
+	int			i;
+	BlockNumber blknoInner;
+	XLogRedoAction action;
+
+	XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
+
+	fillFakeState(&state, xldata->stateSrc);
+
+	ptr += SizeOfSpgxlogPickSplit;
+	toDelete = (OffsetNumber *) ptr;
+	ptr += sizeof(OffsetNumber) * xldata->nDelete;
+	toInsert = (OffsetNumber *) ptr;
+	ptr += sizeof(OffsetNumber) * xldata->nInsert;
+	leafPageSelect = (uint8 *) ptr;
+	ptr += sizeof(uint8) * xldata->nInsert;
+
+	innerTuple = ptr;
+	/* the inner tuple is unaligned, so make a copy to access its header */
+	memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
+	ptr += innerTupleHdr.size;
+
+	/* now ptr points to the list of leaf tuples */
+
+	if (xldata->isRootSplit)
+	{
+		/* when splitting root, we touch it only in the guise of new inner */
+		srcBuffer = InvalidBuffer;
+		srcPage = NULL;
+	}
+	else if (xldata->initSrc)
+	{
+		/* just re-init the source page */
+		srcBuffer = XLogInitBufferForRedo(record, 0);
+		srcPage = (Page) BufferGetPage(srcBuffer);
+
+		SpGistInitBuffer(srcBuffer,
+						 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
+		/* don't update LSN etc till we're done with it */
+	}
+	else
+	{
+		/*
+		 * Delete the specified tuples from source page. (In case we're in
+		 * Hot Standby, we need to hold lock on the page till we're done
+		 * inserting leaf tuples and the new inner tuple, else the added
+		 * redirect tuple will be a dangling link.)
+		 */
+		srcPage = NULL;
+		if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
+		{
+			srcPage = BufferGetPage(srcBuffer);
+
+			/*
+			 * We have it a bit easier here than in doPickSplit(), because we
+			 * know the inner tuple's location already, so we can inject the
+			 * correct redirection tuple now.
+			 */
+			if (!state.isBuild)
+				spgPageIndexMultiDelete(&state, srcPage,
+										toDelete, xldata->nDelete,
+										SPGIST_REDIRECT,
+										SPGIST_PLACEHOLDER,
+										blknoInner,
+										xldata->offnumInner);
+			else
+				spgPageIndexMultiDelete(&state, srcPage,
+										toDelete, xldata->nDelete,
+										SPGIST_PLACEHOLDER,
+										SPGIST_PLACEHOLDER,
+										InvalidBlockNumber,
+										InvalidOffsetNumber);
+
+			/* don't update LSN etc till we're done with it */
+		}
+	}
+
+	/* try to access dest page if any */
+	if (!XLogRecHasBlockRef(record, 1))
+	{
+		destBuffer = InvalidBuffer;
+		destPage = NULL;
+	}
+	else if (xldata->initDest)
+	{
+		/* just re-init the dest page */
+		destBuffer = XLogInitBufferForRedo(record, 1);
+		destPage = (Page) BufferGetPage(destBuffer);
+
+		SpGistInitBuffer(destBuffer,
+						 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
+		/* don't update LSN etc till we're done with it */
+	}
+	else
+	{
+		/*
+		 * We could probably release the page lock immediately in the
+		 * full-page-image case, but for safety let's hold it till later.
+		 */
+		if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
+			destPage = (Page) BufferGetPage(destBuffer);
+		else
+			destPage = NULL;	/* don't do any page updates */
+	}
+
+	/* restore leaf tuples to src and/or dest page */
+	for (i = 0; i < xldata->nInsert; i++)
+	{
+		char	   *leafTuple;
+		SpGistLeafTupleData leafTupleHdr;
+
+		/* the tuples are not aligned, so must copy to access the size field. */
+		leafTuple = ptr;
+		memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
+		ptr += leafTupleHdr.size;
+
+		page = leafPageSelect[i] ? destPage : srcPage;
+		if (page == NULL)
+			continue;			/* no need to touch this page */
+
+		addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
+						  toInsert[i]);
+	}
+
+	/* Now update src and dest page LSNs if needed */
+	if (srcPage != NULL)
+	{
+		PageSetLSN(srcPage, lsn);
+		MarkBufferDirty(srcBuffer);
+	}
+	if (destPage != NULL)
+	{
+		PageSetLSN(destPage, lsn);
+		MarkBufferDirty(destBuffer);
+	}
+
+	/* restore new inner tuple */
+	if (xldata->initInner)
+	{
+		innerBuffer = XLogInitBufferForRedo(record, 2);
+		SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
+		action = BLK_NEEDS_REDO;
+	}
+	else
+		action = XLogReadBufferForRedo(record, 2, &innerBuffer);
+
+	if (action == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(innerBuffer);
+
+		addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
+						  xldata->offnumInner);
+
+		/* if inner is also parent, update link while we're here */
+		if (xldata->innerIsParent)
+		{
+			SpGistInnerTuple parent;
+
+			parent = (SpGistInnerTuple) PageGetItem(page,
+													PageGetItemId(page, xldata->offnumParent));
+			spgUpdateNodeLink(parent, xldata->nodeI,
+							  blknoInner, xldata->offnumInner);
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(innerBuffer);
+	}
+	if (BufferIsValid(innerBuffer))
+		UnlockReleaseBuffer(innerBuffer);
+
+	/*
+	 * Now we can release the leaf-page locks. It's okay to do this before
+	 * updating the parent downlink.
+	 */
+	if (BufferIsValid(srcBuffer))
+		UnlockReleaseBuffer(srcBuffer);
+	if (BufferIsValid(destBuffer))
+		UnlockReleaseBuffer(destBuffer);
+
+	/* update parent downlink, unless we did it above */
+	if (XLogRecHasBlockRef(record, 3))
+	{
+		Buffer		parentBuffer;
+
+		if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
+		{
+			SpGistInnerTuple parent;
+
+			page = BufferGetPage(parentBuffer);
+
+			parent = (SpGistInnerTuple) PageGetItem(page,
+													PageGetItemId(page, xldata->offnumParent));
+			spgUpdateNodeLink(parent, xldata->nodeI,
+							  blknoInner, xldata->offnumInner);
+
+			PageSetLSN(page, lsn);
+			MarkBufferDirty(parentBuffer);
+		}
+		if (BufferIsValid(parentBuffer))
+			UnlockReleaseBuffer(parentBuffer);
+	}
+	else
+		Assert(xldata->innerIsParent || xldata->isRootSplit);
+}
+/* Replay VACUUM_LEAF: mark dead/placeholder, move tuples, relink chains. */
+static void
+spgRedoVacuumLeaf(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *ptr = XLogRecGetData(record);
+	spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
+	OffsetNumber *toDead;
+	OffsetNumber *toPlaceholder;
+	OffsetNumber *moveSrc;
+	OffsetNumber *moveDest;
+	OffsetNumber *chainSrc;
+	OffsetNumber *chainDest;
+	SpGistState state;
+	Buffer		buffer;
+	Page		page;
+	int			i;
+
+	fillFakeState(&state, xldata->stateSrc);
+
+	ptr += SizeOfSpgxlogVacuumLeaf;
+	toDead = (OffsetNumber *) ptr;
+	ptr += sizeof(OffsetNumber) * xldata->nDead;
+	toPlaceholder = (OffsetNumber *) ptr;
+	ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
+	moveSrc = (OffsetNumber *) ptr;
+	ptr += sizeof(OffsetNumber) * xldata->nMove;
+	moveDest = (OffsetNumber *) ptr;
+	ptr += sizeof(OffsetNumber) * xldata->nMove;
+	chainSrc = (OffsetNumber *) ptr;
+	ptr += sizeof(OffsetNumber) * xldata->nChain;
+	chainDest = (OffsetNumber *) ptr;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		spgPageIndexMultiDelete(&state, page,
+								toDead, xldata->nDead,
+								SPGIST_DEAD, SPGIST_DEAD,
+								InvalidBlockNumber,
+								InvalidOffsetNumber);
+
+		spgPageIndexMultiDelete(&state, page,
+								toPlaceholder, xldata->nPlaceholder,
+								SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+								InvalidBlockNumber,
+								InvalidOffsetNumber);
+
+		/* see comments in vacuumLeafPage() */
+		for (i = 0; i < xldata->nMove; i++)
+		{
+			ItemId		idSrc = PageGetItemId(page, moveSrc[i]);
+			ItemId		idDest = PageGetItemId(page, moveDest[i]);
+			ItemIdData	tmp;
+
+			tmp = *idSrc;		/* swap the two line pointers */
+			*idSrc = *idDest;
+			*idDest = tmp;
+		}
+
+		spgPageIndexMultiDelete(&state, page,
+								moveSrc, xldata->nMove,
+								SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
+								InvalidBlockNumber,
+								InvalidOffsetNumber);
+
+		for (i = 0; i < xldata->nChain; i++)
+		{
+			SpGistLeafTuple lt;
+
+			lt = (SpGistLeafTuple) PageGetItem(page,
+											   PageGetItemId(page, chainSrc[i]));
+			Assert(lt->tupstate == SPGIST_LIVE);
+			SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+/* Replay VACUUM_ROOT: delete the listed tuples from the page in block 0. */
+static void
+spgRedoVacuumRoot(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *ptr = XLogRecGetData(record);
+	spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
+	OffsetNumber *toDelete;
+	Buffer		buffer;
+	Page		page;
+
+	toDelete = xldata->offsets;
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		page = BufferGetPage(buffer);
+
+		/* The tuple numbers are in order */
+		PageIndexMultiDelete(page, toDelete, xldata->nDelete);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+/* Replay VACUUM_REDIRECT: turn redirects into placeholders, trim the tail. */
+static void
+spgRedoVacuumRedirect(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	char	   *ptr = XLogRecGetData(record);
+	spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
+	OffsetNumber *itemToPlaceholder;
+	Buffer		buffer;
+
+	itemToPlaceholder = xldata->offsets;
+
+	/*
+	 * If any redirection tuples are being removed, make sure there are no
+	 * live Hot Standby transactions that might need to see them.
+	 */
+	if (InHotStandby)
+	{
+		if (TransactionIdIsValid(xldata->newestRedirectXid))
+		{
+			RelFileNode node;
+
+			XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
+			ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+												node);
+		}
+	}
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		Page		page = BufferGetPage(buffer);
+		SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
+		int			i;
+
+		/* Convert redirect pointers to plain placeholders */
+		for (i = 0; i < xldata->nToPlaceholder; i++)
+		{
+			SpGistDeadTuple dt;
+
+			dt = (SpGistDeadTuple) PageGetItem(page,
+											   PageGetItemId(page, itemToPlaceholder[i]));
+			Assert(dt->tupstate == SPGIST_REDIRECT);
+			dt->tupstate = SPGIST_PLACEHOLDER;
+			ItemPointerSetInvalid(&dt->pointer);
+		}
+
+		Assert(opaque->nRedirection >= xldata->nToPlaceholder);
+		opaque->nRedirection -= xldata->nToPlaceholder;
+		opaque->nPlaceholder += xldata->nToPlaceholder;
+
+		/* Remove placeholder tuples at end of page */
+		if (xldata->firstPlaceholder != InvalidOffsetNumber)
+		{
+			int			max = PageGetMaxOffsetNumber(page);
+			OffsetNumber *toDelete;
+
+			toDelete = palloc(sizeof(OffsetNumber) * max);
+
+			for (i = xldata->firstPlaceholder; i <= max; i++)
+				toDelete[i - xldata->firstPlaceholder] = i;
+
+			i = max - xldata->firstPlaceholder + 1; /* number of tuples removed */
+			Assert(opaque->nPlaceholder >= i);
+			opaque->nPlaceholder -= i;
+
+			/* The array is sorted, so can use PageIndexMultiDelete */
+			PageIndexMultiDelete(page, toDelete, i);
+
+			pfree(toDelete);
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+/* Dispatch one SP-GiST WAL record; work runs in opCtx, reset afterwards. */
+void
+spg_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+	MemoryContext oldCxt;
+
+	oldCxt = MemoryContextSwitchTo(opCtx);
+	switch (info)
+	{
+		case XLOG_SPGIST_ADD_LEAF:
+			spgRedoAddLeaf(record);
+			break;
+		case XLOG_SPGIST_MOVE_LEAFS:
+			spgRedoMoveLeafs(record);
+			break;
+		case XLOG_SPGIST_ADD_NODE:
+			spgRedoAddNode(record);
+			break;
+		case XLOG_SPGIST_SPLIT_TUPLE:
+			spgRedoSplitTuple(record);
+			break;
+		case XLOG_SPGIST_PICKSPLIT:
+			spgRedoPickSplit(record);
+			break;
+		case XLOG_SPGIST_VACUUM_LEAF:
+			spgRedoVacuumLeaf(record);
+			break;
+		case XLOG_SPGIST_VACUUM_ROOT:
+			spgRedoVacuumRoot(record);
+			break;
+		case XLOG_SPGIST_VACUUM_REDIRECT:
+			spgRedoVacuumRedirect(record);
+			break;
+		default:
+			elog(PANIC, "spg_redo: unknown op code %u", info);
+	}
+
+	MemoryContextSwitchTo(oldCxt);
+	MemoryContextReset(opCtx);	/* discard allocations made for this record */
+}
+/* Create opCtx, the working memory context used by spg_redo(). */
+void
+spg_xlog_startup(void)
+{
+	opCtx = AllocSetContextCreate(CurrentMemoryContext,
+								  "SP-GiST temporary context",
+								  ALLOCSET_DEFAULT_SIZES);
+}
+/* Delete opCtx and clear the pointer. */
+void
+spg_xlog_cleanup(void)
+{
+	MemoryContextDelete(opCtx);
+	opCtx = NULL;
+}
+
+/*
+ * Mask a SpGist page before performing consistency checks on it.
+ */
+void
+spg_mask(char *pagedata, BlockNumber blkno)
+{
+	Page		page = (Page) pagedata;
+	PageHeader	pagehdr = (PageHeader) page;
+
+	mask_page_lsn_and_checksum(page);
+
+	mask_page_hint_bits(page);
+
+	/*
+	 * Mask the unused space, but only if the page's pd_lower appears to have
+	 * been set correctly.
+	 */
+	if (pagehdr->pd_lower >= SizeOfPageHeaderData)
+		mask_unused_space(page);
+}
|