Diffstat (limited to 'src/backend/access/nbtree/nbtxlog.c')
 src/backend/access/nbtree/nbtxlog.c | 1126 ++++++++++++++++++++++++++++++++++
 1 file changed, 1126 insertions(+), 0 deletions(-)
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
new file mode 100644
index 0000000..786c08c
--- /dev/null
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -0,0 +1,1126 @@
+/*-------------------------------------------------------------------------
+ *
+ * nbtxlog.c
+ *	  WAL replay logic for btrees.
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/nbtree/nbtxlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/nbtree.h"
+#include "access/nbtxlog.h"
+#include "access/transam.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "miscadmin.h"
+#include "storage/procarray.h"
+#include "utils/memutils.h"
+
+static MemoryContext opCtx;		/* working memory for operations */
+
+/*
+ * _bt_restore_page -- re-enter all the index tuples on a page
+ *
+ * The page is freshly init'd, and *from (length len) is a copy of what
+ * had been its upper part (pd_upper to pd_special).  We assume that the
+ * tuples had been added to the page in item-number order, and therefore
+ * the one with highest item number appears first (lowest on the page).
+ */
+static void
+_bt_restore_page(Page page, char *from, int len)
+{
+	IndexTupleData itupdata;
+	Size		itemsz;
+	char	   *end = from + len;
+	Item		items[MaxIndexTuplesPerPage];
+	uint16		itemsizes[MaxIndexTuplesPerPage];
+	int			i;
+	int			nitems;
+
+	/*
+	 * To get the items back in the original order, we add them to the page
+	 * in reverse.  To figure out where one tuple ends and another begins, we
+	 * have to scan them in forward order first.
+	 */
+	i = 0;
+	while (from < end)
+	{
+		/*
+		 * As we step through the items, 'from' won't always be properly
+		 * aligned, so we need to use memcpy().  Further, we use Item (which
+		 * is just a char*) here for our items array for the same reason;
+		 * wouldn't want the compiler or anyone thinking that an item is
+		 * aligned when it isn't.
+		 */
+		memcpy(&itupdata, from, sizeof(IndexTupleData));
+		itemsz = IndexTupleSize(&itupdata);
+		itemsz = MAXALIGN(itemsz);
+
+		items[i] = (Item) from;
+		itemsizes[i] = itemsz;
+		i++;
+
+		from += itemsz;
+	}
+	nitems = i;
+
+	for (i = nitems - 1; i >= 0; i--)
+	{
+		if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
+						false, false) == InvalidOffsetNumber)
+			elog(PANIC, "_bt_restore_page: cannot add item to page");
+	}
+}
+
+static void
+_bt_restore_meta(XLogReaderState *record, uint8 block_id)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	Buffer		metabuf;
+	Page		metapg;
+	BTMetaPageData *md;
+	BTPageOpaque pageop;
+	xl_btree_metadata *xlrec;
+	char	   *ptr;
+	Size		len;
+
+	metabuf = XLogInitBufferForRedo(record, block_id);
+	ptr = XLogRecGetBlockData(record, block_id, &len);
+
+	Assert(len == sizeof(xl_btree_metadata));
+	Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
+	xlrec = (xl_btree_metadata *) ptr;
+	metapg = BufferGetPage(metabuf);
+
+	_bt_pageinit(metapg, BufferGetPageSize(metabuf));
+
+	md = BTPageGetMeta(metapg);
+	md->btm_magic = BTREE_MAGIC;
+	md->btm_version = xlrec->version;
+	md->btm_root = xlrec->root;
+	md->btm_level = xlrec->level;
+	md->btm_fastroot = xlrec->fastroot;
+	md->btm_fastlevel = xlrec->fastlevel;
+	/* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
+	Assert(md->btm_version >= BTREE_NOVAC_VERSION);
+	md->btm_last_cleanup_num_delpages = xlrec->last_cleanup_num_delpages;
+	md->btm_last_cleanup_num_heap_tuples = -1.0;
+	md->btm_allequalimage = xlrec->allequalimage;
+
+	pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
+	pageop->btpo_flags = BTP_META;
+
+	/*
+	 * Set pd_lower just past the end of the metadata.  This is essential,
+	 * because without doing so, metadata will be lost if xlog.c compresses
+	 * the page.
+	 */
+	((PageHeader) metapg)->pd_lower =
+		((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
+
+	PageSetLSN(metapg, lsn);
+	MarkBufferDirty(metabuf);
+	UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
+ *
+ * This is a common subroutine of the redo functions of all the WAL record
+ * types that can insert a downlink: insert, split, and newroot.
+ */
+static void
+_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	Buffer		buf;
+
+	if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
+	{
+		Page		page = (Page) BufferGetPage(buf);
+		BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+		Assert(P_INCOMPLETE_SPLIT(pageop));
+		pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buf);
+	}
+	if (BufferIsValid(buf))
+		UnlockReleaseBuffer(buf);
+}
+
+static void
+btree_xlog_insert(bool isleaf, bool ismeta, bool posting,
+				  XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+
+	/*
+	 * Insertion to an internal page finishes an incomplete split at the
+	 * child level.  Clear the incomplete-split flag in the child.  Note:
+	 * during normal operation, the child and parent pages are locked at the
+	 * same time (the locks are coupled), so that clearing the flag and
+	 * inserting the downlink appear atomic to other backends.  We don't
+	 * bother with that during replay, because readers don't care about the
+	 * incomplete-split flag and there cannot be updates happening.
+	 */
+	if (!isleaf)
+		_bt_clear_incomplete_split(record, 1);
+
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		Size		datalen;
+		char	   *datapos = XLogRecGetBlockData(record, 0, &datalen);
+
+		page = BufferGetPage(buffer);
+
+		if (!posting)
+		{
+			/* Simple retail insertion */
+			if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
+							false, false) == InvalidOffsetNumber)
+				elog(PANIC, "failed to add new item");
+		}
+		else
+		{
+			ItemId		itemid;
+			IndexTuple	oposting,
+						newitem,
+						nposting;
+			uint16		postingoff;
+
+			/*
+			 * A posting list split occurred during leaf page insertion.  WAL
+			 * record data will start with an offset number representing the
+			 * point in an existing posting list that a split occurs at.
+			 *
+			 * Use _bt_swap_posting() to repeat posting list split steps from
+			 * primary.  Note that newitem from WAL record is 'orignewitem',
+			 * not the final version of newitem that is actually inserted on
+			 * page.
+			 */
+			postingoff = *((uint16 *) datapos);
+			datapos += sizeof(uint16);
+			datalen -= sizeof(uint16);
+
+			itemid = PageGetItemId(page, OffsetNumberPrev(xlrec->offnum));
+			oposting = (IndexTuple) PageGetItem(page, itemid);
+
+			/* Use mutable, aligned newitem copy in _bt_swap_posting() */
+			Assert(isleaf && postingoff > 0);
+			newitem = CopyIndexTuple((IndexTuple) datapos);
+			nposting = _bt_swap_posting(newitem, oposting, postingoff);
+
+			/* Replace existing posting list with post-split version */
+			memcpy(oposting, nposting, MAXALIGN(IndexTupleSize(nposting)));
+
+			/* Insert "final" new item (not orignewitem from WAL stream) */
+			Assert(IndexTupleSize(newitem) == datalen);
+			if (PageAddItem(page, (Item) newitem, datalen, xlrec->offnum,
+							false, false) == InvalidOffsetNumber)
+				elog(PANIC, "failed to add posting split new item");
+		}
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/*
+	 * Note: in normal operation, we'd update the metapage while still
+	 * holding lock on the page we inserted into.  But during replay it's not
+	 * necessary to hold that lock, since no other index updates can be
+	 * happening concurrently, and readers will cope fine with following an
+	 * obsolete link from the metapage.
+	 */
+	if (ismeta)
+		_bt_restore_meta(record, 2);
+}
+
+static void
+btree_xlog_split(bool newitemonleft, XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
+	bool		isleaf = (xlrec->level == 0);
+	Buffer		buf;
+	Buffer		rbuf;
+	Page		rpage;
+	BTPageOpaque ropaque;
+	char	   *datapos;
+	Size		datalen;
+	BlockNumber origpagenumber;
+	BlockNumber rightpagenumber;
+	BlockNumber spagenumber;
+
+	XLogRecGetBlockTag(record, 0, NULL, NULL, &origpagenumber);
+	XLogRecGetBlockTag(record, 1, NULL, NULL, &rightpagenumber);
+	if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &spagenumber))
+		spagenumber = P_NONE;
+
+	/*
+	 * Clear the incomplete split flag on the appropriate child page one
+	 * level down when origpage/buf is an internal page (there must have been
+	 * cascading page splits during original execution in the event of an
+	 * internal page split).  This is like the corresponding
+	 * btree_xlog_insert call for internal pages.  We're not clearing the
+	 * incomplete split flag for the current page split here (you can think
+	 * of this as part of the insert of newitem that the page split action
+	 * needs to perform in passing).
+	 *
+	 * Like in btree_xlog_insert, this can be done before locking other
+	 * pages.  We never need to couple cross-level locks in REDO routines.
+	 */
+	if (!isleaf)
+		_bt_clear_incomplete_split(record, 3);
+
+	/* Reconstruct right (new) sibling page from scratch */
+	rbuf = XLogInitBufferForRedo(record, 1);
+	datapos = XLogRecGetBlockData(record, 1, &datalen);
+	rpage = (Page) BufferGetPage(rbuf);
+
+	_bt_pageinit(rpage, BufferGetPageSize(rbuf));
+	ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
+
+	ropaque->btpo_prev = origpagenumber;
+	ropaque->btpo_next = spagenumber;
+	ropaque->btpo_level = xlrec->level;
+	ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
+	ropaque->btpo_cycleid = 0;
+
+	_bt_restore_page(rpage, datapos, datalen);
+
+	PageSetLSN(rpage, lsn);
+	MarkBufferDirty(rbuf);
+
+	/* Now reconstruct original page (left half of split) */
+	if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
+	{
+		/*
+		 * To retain the same physical order of the tuples that they had, we
+		 * initialize a temporary empty page for the left page and add all
+		 * the items to that in item number order.  This mirrors how
+		 * _bt_split() works.  Retaining the same physical order makes WAL
+		 * consistency checking possible.  See also _bt_restore_page(), which
+		 * does the same for the right page.
+		 */
+		Page		origpage = (Page) BufferGetPage(buf);
+		BTPageOpaque oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
+		OffsetNumber off;
+		IndexTuple	newitem = NULL,
+					left_hikey = NULL,
+					nposting = NULL;
+		Size		newitemsz = 0,
+					left_hikeysz = 0;
+		Page		leftpage;
+		OffsetNumber leftoff,
+					replacepostingoff = InvalidOffsetNumber;
+
+		datapos = XLogRecGetBlockData(record, 0, &datalen);
+
+		if (newitemonleft || xlrec->postingoff != 0)
+		{
+			newitem = (IndexTuple) datapos;
+			newitemsz = MAXALIGN(IndexTupleSize(newitem));
+			datapos += newitemsz;
+			datalen -= newitemsz;
+
+			if (xlrec->postingoff != 0)
+			{
+				ItemId		itemid;
+				IndexTuple	oposting;
+
+				/* Posting list must be at offset number before new item's */
+				replacepostingoff = OffsetNumberPrev(xlrec->newitemoff);
+
+				/* Use mutable, aligned newitem copy in _bt_swap_posting() */
+				newitem = CopyIndexTuple(newitem);
+				itemid = PageGetItemId(origpage, replacepostingoff);
+				oposting = (IndexTuple) PageGetItem(origpage, itemid);
+				nposting = _bt_swap_posting(newitem, oposting,
+											xlrec->postingoff);
+			}
+		}
+
+		/*
+		 * Extract left hikey and its size.  We assume that 16-bit alignment
+		 * is enough to apply IndexTupleSize (since it's fetching from a
+		 * uint16 field).
+		 */
+		left_hikey = (IndexTuple) datapos;
+		left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
+		datapos += left_hikeysz;
+		datalen -= left_hikeysz;
+
+		Assert(datalen == 0);
+
+		leftpage = PageGetTempPageCopySpecial(origpage);
+
+		/* Add high key tuple from WAL record to temp page */
+		leftoff = P_HIKEY;
+		if (PageAddItem(leftpage, (Item) left_hikey, left_hikeysz, P_HIKEY,
+						false, false) == InvalidOffsetNumber)
+			elog(ERROR, "failed to add high key to left page after split");
+		leftoff = OffsetNumberNext(leftoff);
+
+		for (off = P_FIRSTDATAKEY(oopaque); off < xlrec->firstrightoff; off++)
+		{
+			ItemId		itemid;
+			Size		itemsz;
+			IndexTuple	item;
+
+			/* Add replacement posting list when required */
+			if (off == replacepostingoff)
+			{
+				Assert(newitemonleft ||
+					   xlrec->firstrightoff == xlrec->newitemoff);
+				if (PageAddItem(leftpage, (Item) nposting,
+								MAXALIGN(IndexTupleSize(nposting)), leftoff,
+								false, false) == InvalidOffsetNumber)
+					elog(ERROR, "failed to add new posting list item to left page after split");
+				leftoff = OffsetNumberNext(leftoff);
+				continue;		/* don't insert oposting */
+			}
+
+			/* add the new item if it was inserted on left page */
+			else if (newitemonleft && off == xlrec->newitemoff)
+			{
+				if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
+								false, false) == InvalidOffsetNumber)
+					elog(ERROR, "failed to add new item to left page after split");
+				leftoff = OffsetNumberNext(leftoff);
+			}
+
+			itemid = PageGetItemId(origpage, off);
+			itemsz = ItemIdGetLength(itemid);
+			item = (IndexTuple) PageGetItem(origpage, itemid);
+			if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
+							false, false) == InvalidOffsetNumber)
+				elog(ERROR, "failed to add old item to left page after split");
+			leftoff = OffsetNumberNext(leftoff);
+		}
+
+		/* cope with possibility that newitem goes at the end */
+		if (newitemonleft && off == xlrec->newitemoff)
+		{
+			if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
+							false, false) == InvalidOffsetNumber)
+				elog(ERROR, "failed to add new item to left page after split");
+			leftoff = OffsetNumberNext(leftoff);
+		}
+
+		PageRestoreTempPage(leftpage, origpage);
+
+		/* Fix opaque fields */
+		oopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
+		if (isleaf)
+			oopaque->btpo_flags |= BTP_LEAF;
+		oopaque->btpo_next = rightpagenumber;
+		oopaque->btpo_cycleid = 0;
+
+		PageSetLSN(origpage, lsn);
+		MarkBufferDirty(buf);
+	}
+
+	/* Fix left-link of the page to the right of the new right sibling */
+	if (spagenumber != P_NONE)
+	{
+		Buffer		sbuf;
+
+		if (XLogReadBufferForRedo(record, 2, &sbuf) == BLK_NEEDS_REDO)
+		{
+			Page		spage = (Page) BufferGetPage(sbuf);
+			BTPageOpaque spageop = (BTPageOpaque) PageGetSpecialPointer(spage);
+
+			spageop->btpo_prev = rightpagenumber;
+
+			PageSetLSN(spage, lsn);
+			MarkBufferDirty(sbuf);
+		}
+		if (BufferIsValid(sbuf))
+			UnlockReleaseBuffer(sbuf);
+	}
+
+	/*
+	 * Finally, release the remaining buffers.  sbuf, rbuf, and buf must be
+	 * released together, so that readers cannot observe inconsistencies.
+	 */
+	UnlockReleaseBuffer(rbuf);
+	if (BufferIsValid(buf))
+		UnlockReleaseBuffer(buf);
+}
+
+static void
+btree_xlog_dedup(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_btree_dedup *xlrec = (xl_btree_dedup *) XLogRecGetData(record);
+	Buffer		buf;
+
+	if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
+	{
+		char	   *ptr = XLogRecGetBlockData(record, 0, NULL);
+		Page		page = (Page) BufferGetPage(buf);
+		BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+		OffsetNumber offnum,
+					minoff,
+					maxoff;
+		BTDedupState state;
+		BTDedupInterval *intervals;
+		Page		newpage;
+
+		state = (BTDedupState) palloc(sizeof(BTDedupStateData));
+		state->deduplicate = true;	/* unused */
+		state->nmaxitems = 0;	/* unused */
+		/* Conservatively use larger maxpostingsize than primary */
+		state->maxpostingsize = BTMaxItemSize(page);
+		state->base = NULL;
+		state->baseoff = InvalidOffsetNumber;
+		state->basetupsize = 0;
+		state->htids = palloc(state->maxpostingsize);
+		state->nhtids = 0;
+		state->nitems = 0;
+		state->phystupsize = 0;
+		state->nintervals = 0;
+
+		minoff = P_FIRSTDATAKEY(opaque);
+		maxoff = PageGetMaxOffsetNumber(page);
+		newpage = PageGetTempPageCopySpecial(page);
+
+		if (!P_RIGHTMOST(opaque))
+		{
+			ItemId		itemid = PageGetItemId(page, P_HIKEY);
+			Size		itemsz = ItemIdGetLength(itemid);
+			IndexTuple	item = (IndexTuple) PageGetItem(page, itemid);
+
+			if (PageAddItem(newpage, (Item) item, itemsz, P_HIKEY,
+							false, false) == InvalidOffsetNumber)
+				elog(ERROR, "deduplication failed to add highkey");
+		}
+
+		intervals = (BTDedupInterval *) ptr;
+		for (offnum = minoff;
+			 offnum <= maxoff;
+			 offnum = OffsetNumberNext(offnum))
+		{
+			ItemId		itemid = PageGetItemId(page, offnum);
+			IndexTuple	itup = (IndexTuple) PageGetItem(page, itemid);
+
+			if (offnum == minoff)
+				_bt_dedup_start_pending(state, itup, offnum);
+			else if (state->nintervals < xlrec->nintervals &&
+					 state->baseoff == intervals[state->nintervals].baseoff &&
+					 state->nitems < intervals[state->nintervals].nitems)
+			{
+				if (!_bt_dedup_save_htid(state, itup))
+					elog(ERROR, "deduplication failed to add heap tid to pending posting list");
+			}
+			else
+			{
+				_bt_dedup_finish_pending(newpage, state);
+				_bt_dedup_start_pending(state, itup, offnum);
+			}
+		}
+
+		_bt_dedup_finish_pending(newpage, state);
+		Assert(state->nintervals == xlrec->nintervals);
+		Assert(memcmp(state->intervals, intervals,
+					  state->nintervals * sizeof(BTDedupInterval)) == 0);
+
+		if (P_HAS_GARBAGE(opaque))
+		{
+			BTPageOpaque nopaque = (BTPageOpaque) PageGetSpecialPointer(newpage);
+
+			nopaque->btpo_flags &= ~BTP_HAS_GARBAGE;
+		}
+
+		PageRestoreTempPage(newpage, page);
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buf);
+	}
+
+	if (BufferIsValid(buf))
+		UnlockReleaseBuffer(buf);
+}
+
+static void
+btree_xlog_updates(Page page, OffsetNumber *updatedoffsets,
+				   xl_btree_update *updates, int nupdated)
+{
+	BTVacuumPosting vacposting;
+	IndexTuple	origtuple;
+	ItemId		itemid;
+	Size		itemsz;
+
+	for (int i = 0; i < nupdated; i++)
+	{
+		itemid = PageGetItemId(page, updatedoffsets[i]);
+		origtuple = (IndexTuple) PageGetItem(page, itemid);
+
+		vacposting = palloc(offsetof(BTVacuumPostingData, deletetids) +
+							updates->ndeletedtids * sizeof(uint16));
+		vacposting->updatedoffset = updatedoffsets[i];
+		vacposting->itup = origtuple;
+		vacposting->ndeletedtids = updates->ndeletedtids;
+		memcpy(vacposting->deletetids,
+			   (char *) updates + SizeOfBtreeUpdate,
+			   updates->ndeletedtids * sizeof(uint16));
+
+		_bt_update_posting(vacposting);
+
+		/* Overwrite updated version of tuple */
+		itemsz = MAXALIGN(IndexTupleSize(vacposting->itup));
+		if (!PageIndexTupleOverwrite(page, updatedoffsets[i],
+									 (Item) vacposting->itup, itemsz))
+			elog(PANIC, "failed to update partially dead item");
+
+		pfree(vacposting->itup);
+		pfree(vacposting);
+
+		/* advance to next xl_btree_update from array */
+		updates = (xl_btree_update *)
+			((char *) updates + SizeOfBtreeUpdate +
+			 updates->ndeletedtids * sizeof(uint16));
+	}
+}
+
+static void
+btree_xlog_vacuum(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	BTPageOpaque opaque;
+
+	/*
+	 * We need to take a cleanup lock here, just like btvacuumpage().
+	 * However, it isn't necessary to exhaustively get a cleanup lock on
+	 * every block in the index during recovery (just getting a cleanup lock
+	 * on pages with items to kill suffices).  See nbtree/README for details.
+	 */
+	if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
+		== BLK_NEEDS_REDO)
+	{
+		char	   *ptr = XLogRecGetBlockData(record, 0, NULL);
+
+		page = (Page) BufferGetPage(buffer);
+
+		if (xlrec->nupdated > 0)
+		{
+			OffsetNumber *updatedoffsets;
+			xl_btree_update *updates;
+
+			updatedoffsets = (OffsetNumber *)
+				(ptr + xlrec->ndeleted * sizeof(OffsetNumber));
+			updates = (xl_btree_update *) ((char *) updatedoffsets +
+										   xlrec->nupdated *
+										   sizeof(OffsetNumber));
+
+			btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
+		}
+
+		if (xlrec->ndeleted > 0)
+			PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
+
+		/*
+		 * Mark the page as not containing any LP_DEAD items --- see comments
+		 * in _bt_delitems_vacuum().
+		 */
+		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+		opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+static void
+btree_xlog_delete(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	BTPageOpaque opaque;
+
+	/*
+	 * If we have any conflict processing to do, it must happen before we
+	 * update the page
+	 */
+	if (InHotStandby)
+	{
+		RelFileNode rnode;
+
+		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
+
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+	}
+
+	/*
+	 * We don't need to take a cleanup lock to apply these changes.  See
+	 * nbtree/README for details.
+	 */
+	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+	{
+		char	   *ptr = XLogRecGetBlockData(record, 0, NULL);
+
+		page = (Page) BufferGetPage(buffer);
+
+		if (xlrec->nupdated > 0)
+		{
+			OffsetNumber *updatedoffsets;
+			xl_btree_update *updates;
+
+			updatedoffsets = (OffsetNumber *)
+				(ptr + xlrec->ndeleted * sizeof(OffsetNumber));
+			updates = (xl_btree_update *) ((char *) updatedoffsets +
+										   xlrec->nupdated *
+										   sizeof(OffsetNumber));
+
+			btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
+		}
+
+		if (xlrec->ndeleted > 0)
+			PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
+
+		/* Mark the page as not containing any LP_DEAD items */
+		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+		opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+}
+
+static void
+btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	BTPageOpaque pageop;
+	IndexTupleData trunctuple;
+
+	/*
+	 * In normal operation, we would lock all the pages this WAL record
+	 * touches before changing any of them.  In WAL replay, it should be okay
+	 * to lock just one page at a time, since no concurrent index updates can
+	 * be happening, and readers should not care whether they arrive at the
+	 * target page or not (since it's surely empty).
+	 */
+
+	/* to-be-deleted subtree's parent page */
+	if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
+	{
+		OffsetNumber poffset;
+		ItemId		itemid;
+		IndexTuple	itup;
+		OffsetNumber nextoffset;
+		BlockNumber rightsib;
+
+		page = (Page) BufferGetPage(buffer);
+		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+		poffset = xlrec->poffset;
+
+		nextoffset = OffsetNumberNext(poffset);
+		itemid = PageGetItemId(page, nextoffset);
+		itup = (IndexTuple) PageGetItem(page, itemid);
+		rightsib = BTreeTupleGetDownLink(itup);
+
+		itemid = PageGetItemId(page, poffset);
+		itup = (IndexTuple) PageGetItem(page, itemid);
+		BTreeTupleSetDownLink(itup, rightsib);
+		nextoffset = OffsetNumberNext(poffset);
+		PageIndexTupleDelete(page, nextoffset);
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+
+	/*
+	 * Don't need to couple cross-level locks in REDO routines, so release
+	 * lock on internal page immediately
+	 */
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	/* Rewrite the leaf page as a halfdead page */
+	buffer = XLogInitBufferForRedo(record, 0);
+	page = (Page) BufferGetPage(buffer);
+
+	_bt_pageinit(page, BufferGetPageSize(buffer));
+	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+	pageop->btpo_prev = xlrec->leftblk;
+	pageop->btpo_next = xlrec->rightblk;
+	pageop->btpo_level = 0;
+	pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
+	pageop->btpo_cycleid = 0;
+
+	/*
+	 * Construct a dummy high key item that points to top parent page (value
+	 * is InvalidBlockNumber when the top parent page is the leaf page
+	 * itself)
+	 */
+	MemSet(&trunctuple, 0, sizeof(IndexTupleData));
+	trunctuple.t_info = sizeof(IndexTupleData);
+	BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);
+
+	if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
+					false, false) == InvalidOffsetNumber)
+		elog(ERROR, "could not add dummy high key to half-dead page");
+
+	PageSetLSN(page, lsn);
+	MarkBufferDirty(buffer);
+	UnlockReleaseBuffer(buffer);
+}
+
+
+static void
+btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
+	BlockNumber leftsib;
+	BlockNumber rightsib;
+	uint32		level;
+	bool		isleaf;
+	FullTransactionId safexid;
+	Buffer		leftbuf;
+	Buffer		target;
+	Buffer		rightbuf;
+	Page		page;
+	BTPageOpaque pageop;
+
+	leftsib = xlrec->leftsib;
+	rightsib = xlrec->rightsib;
+	level = xlrec->level;
+	isleaf = (level == 0);
+	safexid = xlrec->safexid;
+
+	/* No leaftopparent for level 0 (leaf page) or level 1 target */
+	Assert(!BlockNumberIsValid(xlrec->leaftopparent) || level > 1);
+
+	/*
+	 * In normal operation, we would lock all the pages this WAL record
+	 * touches before changing any of them.  In WAL replay, we at least lock
+	 * the pages in the same standard left-to-right order (leftsib, target,
+	 * rightsib), and don't release the sibling locks until the target is
+	 * marked deleted.
+	 */
+
+	/* Fix right-link of left sibling, if any */
+	if (leftsib != P_NONE)
+	{
+		if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
+		{
+			page = (Page) BufferGetPage(leftbuf);
+			pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+			pageop->btpo_next = rightsib;
+
+			PageSetLSN(page, lsn);
+			MarkBufferDirty(leftbuf);
+		}
+	}
+	else
+		leftbuf = InvalidBuffer;
+
+	/* Rewrite target page as empty deleted page */
+	target = XLogInitBufferForRedo(record, 0);
+	page = (Page) BufferGetPage(target);
+
+	_bt_pageinit(page, BufferGetPageSize(target));
+	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+	pageop->btpo_prev = leftsib;
+	pageop->btpo_next = rightsib;
+	pageop->btpo_level = level;
+	BTPageSetDeleted(page, safexid);
+	if (isleaf)
+		pageop->btpo_flags |= BTP_LEAF;
+	pageop->btpo_cycleid = 0;
+
+	PageSetLSN(page, lsn);
+	MarkBufferDirty(target);
+
+	/* Fix left-link of right sibling */
+	if (XLogReadBufferForRedo(record, 2, &rightbuf) == BLK_NEEDS_REDO)
+	{
+		page = (Page) BufferGetPage(rightbuf);
+		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+		pageop->btpo_prev = leftsib;
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(rightbuf);
+	}
+
+	/* Release siblings */
+	if (BufferIsValid(leftbuf))
+		UnlockReleaseBuffer(leftbuf);
+	if (BufferIsValid(rightbuf))
+		UnlockReleaseBuffer(rightbuf);
+
+	/* Release target */
+	UnlockReleaseBuffer(target);
+
+	/*
+	 * If we deleted a parent of the targeted leaf page, instead of the leaf
+	 * itself, update the leaf to point to the next remaining child in the
+	 * to-be-deleted subtree
+	 */
+	if (XLogRecHasBlockRef(record, 3))
+	{
+		/*
+		 * There is no real data on the page, so we just re-create it from
+		 * scratch using the information from the WAL record.
+		 *
+		 * Note that we don't end up here when the target page is also the
+		 * leafbuf page.  There is no need to add a dummy hikey item with a
+		 * top parent link when deleting leafbuf because it's the last page
+		 * we'll delete in the subtree undergoing deletion.
+		 */
+		Buffer		leafbuf;
+		IndexTupleData trunctuple;
+
+		Assert(!isleaf);
+
+		leafbuf = XLogInitBufferForRedo(record, 3);
+		page = (Page) BufferGetPage(leafbuf);
+
+		_bt_pageinit(page, BufferGetPageSize(leafbuf));
+		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+		pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
+		pageop->btpo_prev = xlrec->leafleftsib;
+		pageop->btpo_next = xlrec->leafrightsib;
+		pageop->btpo_level = 0;
+		pageop->btpo_cycleid = 0;
+
+		/* Add a dummy hikey item */
+		MemSet(&trunctuple, 0, sizeof(IndexTupleData));
+		trunctuple.t_info = sizeof(IndexTupleData);
+		BTreeTupleSetTopParent(&trunctuple, xlrec->leaftopparent);
+
+		if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
+						false, false) == InvalidOffsetNumber)
+			elog(ERROR, "could not add dummy high key to half-dead page");
+
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(leafbuf);
+		UnlockReleaseBuffer(leafbuf);
+	}
+
+	/* Update metapage if needed */
+	if (info == XLOG_BTREE_UNLINK_PAGE_META)
+		_bt_restore_meta(record, 4);
+}
+
+static void
+btree_xlog_newroot(XLogReaderState *record)
+{
+	XLogRecPtr	lsn = record->EndRecPtr;
+	xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	BTPageOpaque pageop;
+	char	   *ptr;
+	Size		len;
+
+	buffer = XLogInitBufferForRedo(record, 0);
+	page = (Page) BufferGetPage(buffer);
+
+	_bt_pageinit(page, BufferGetPageSize(buffer));
+	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+	pageop->btpo_flags = BTP_ROOT;
+	pageop->btpo_prev = pageop->btpo_next = P_NONE;
+	pageop->btpo_level = xlrec->level;
+	if (xlrec->level == 0)
+		pageop->btpo_flags |= BTP_LEAF;
+	pageop->btpo_cycleid = 0;
+
+	if (xlrec->level > 0)
+	{
+		ptr = XLogRecGetBlockData(record, 0, &len);
+		_bt_restore_page(page, ptr, len);
+
+		/* Clear the incomplete-split flag in left child */
+		_bt_clear_incomplete_split(record, 1);
+	}
+
+	PageSetLSN(page, lsn);
+	MarkBufferDirty(buffer);
+	UnlockReleaseBuffer(buffer);
+
+	_bt_restore_meta(record, 2);
+}
+
+/*
+ * In general VACUUM must defer recycling as a way of avoiding certain race
+ * conditions.  Deleted pages contain a safexid value that is used by VACUUM
+ * to determine whether or not it's safe to place a page that was deleted by
+ * VACUUM earlier into the FSM now.  See nbtree/README.
+ *
+ * As far as any backend operating during original execution is concerned,
+ * the FSM is a cache of recycle-safe pages; the mere presence of the page in
+ * the FSM indicates that the page must already be safe to recycle (actually,
+ * _bt_getbuf() verifies it's safe using BTPageIsRecyclable(), but that's
+ * just because it would be unwise to completely trust the FSM, given its
+ * current limitations).
+ *
+ * This isn't sufficient to prevent similar concurrent recycling race
+ * conditions during Hot Standby, though.  For that we need to log a
+ * xl_btree_reuse_page record at the point that a page is actually recycled
+ * and reused for an entirely unrelated page inside _bt_split().  These
+ * records include the same safexid value from the original deleted page,
+ * stored in the record's latestRemovedFullXid field.
+ *
+ * The GlobalVisCheckRemovableFullXid() test in BTPageIsRecyclable() is used
+ * to determine if it's safe to recycle a page.  This mirrors our own test:
+ * the PGPROC->xmin > limitXmin test inside GetConflictingVirtualXIDs().
+ * Consequently, one XID value achieves the same exclusion effect on primary
+ * and standby.
+ */
+static void
+btree_xlog_reuse_page(XLogReaderState *record)
+{
+	xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
+
+	if (InHotStandby)
+		ResolveRecoveryConflictWithSnapshotFullXid(xlrec->latestRemovedFullXid,
+												   xlrec->node);
+}
+
+void
+btree_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+	MemoryContext oldCtx;
+
+	oldCtx = MemoryContextSwitchTo(opCtx);
+	switch (info)
+	{
+		case XLOG_BTREE_INSERT_LEAF:
+			btree_xlog_insert(true, false, false, record);
+			break;
+		case XLOG_BTREE_INSERT_UPPER:
+			btree_xlog_insert(false, false, false, record);
+			break;
+		case XLOG_BTREE_INSERT_META:
+			btree_xlog_insert(false, true, false, record);
+			break;
+		case XLOG_BTREE_SPLIT_L:
+			btree_xlog_split(true, record);
+			break;
+		case XLOG_BTREE_SPLIT_R:
+			btree_xlog_split(false, record);
+			break;
+		case XLOG_BTREE_INSERT_POST:
+			btree_xlog_insert(true, false, true, record);
+			break;
+		case XLOG_BTREE_DEDUP:
+			btree_xlog_dedup(record);
+			break;
+		case XLOG_BTREE_VACUUM:
+			btree_xlog_vacuum(record);
+			break;
+		case XLOG_BTREE_DELETE:
+			btree_xlog_delete(record);
+			break;
+		case XLOG_BTREE_MARK_PAGE_HALFDEAD:
+			btree_xlog_mark_page_halfdead(info, record);
+			break;
+		case XLOG_BTREE_UNLINK_PAGE:
+		case XLOG_BTREE_UNLINK_PAGE_META:
+			btree_xlog_unlink_page(info, record);
+			break;
+		case XLOG_BTREE_NEWROOT:
+			btree_xlog_newroot(record);
+			break;
+		case XLOG_BTREE_REUSE_PAGE:
+			btree_xlog_reuse_page(record);
+			break;
+		case XLOG_BTREE_META_CLEANUP:
+			_bt_restore_meta(record, 0);
+			break;
+		default:
+			elog(PANIC, "btree_redo: unknown op code %u", info);
+	}
+	MemoryContextSwitchTo(oldCtx);
+	MemoryContextReset(opCtx);
+}
+
+void
+btree_xlog_startup(void)
+{
+	opCtx = AllocSetContextCreate(CurrentMemoryContext,
+								  "Btree recovery temporary context",
+								  ALLOCSET_DEFAULT_SIZES);
+}
+
+void
+btree_xlog_cleanup(void)
+{
+	MemoryContextDelete(opCtx);
+	opCtx = NULL;
+}
+
+/*
+ * Mask a btree page before performing consistency checks on it.
+ */
+void
+btree_mask(char *pagedata, BlockNumber blkno)
+{
+	Page		page = (Page) pagedata;
+	BTPageOpaque maskopaq;
+
+	mask_page_lsn_and_checksum(page);
+
+	mask_page_hint_bits(page);
+	mask_unused_space(page);
+
+	maskopaq = (BTPageOpaque) PageGetSpecialPointer(page);
+
+	if (P_ISLEAF(maskopaq))
+	{
+		/*
+		 * In btree leaf pages, it is possible to modify the LP_FLAGS without
+		 * emitting any WAL record.  Hence, mask the line pointer flags.  See
+		 * _bt_killitems(), _bt_check_unique() for details.
+		 */
+		mask_lp_flags(page);
+	}
+
+	/*
+	 * BTP_HAS_GARBAGE is just an un-logged hint bit.  So, mask it.  See
+	 * _bt_delete_or_dedup_one_page(), _bt_killitems(), and
+	 * _bt_check_unique() for details.
+	 */
+	maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;
+
+	/*
+	 * During replay of a btree page split, we don't set the BTP_SPLIT_END
+	 * flag of the right sibling and initialize the cycle_id to 0 for the
+	 * same page.  See btree_xlog_split() for details.
+	 */
+	maskopaq->btpo_flags &= ~BTP_SPLIT_END;
+	maskopaq->btpo_cycleid = 0;
+}
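
Aside for readers following the replay logic: the forward-scan/reverse-insert trick that _bt_restore_page() relies on can be hard to see through the Page machinery. The standalone sketch below (illustrative only, not part of the patch; ToyTuple, toy_restore, and the 8-byte TOY_MAXALIGN are invented stand-ins for IndexTupleData, _bt_restore_page, and MAXALIGN) shows the same pattern on a plain byte buffer: scan forward to find tuple boundaries through a possibly unaligned pointer, then hand the tuples back in reverse so the first-scanned (lowest-on-the-page) tuple ends up with the highest item number.

/* Illustrative sketch only -- mimics _bt_restore_page()'s ordering,
 * with no PostgreSQL headers.  All names here are hypothetical. */
#include <stdio.h>
#include <string.h>

/* stand-in for PostgreSQL's MAXALIGN; 8-byte alignment assumed */
#define TOY_MAXALIGN(x) (((x) + 7) & ~((size_t) 7))

typedef struct
{
    unsigned short size;        /* tuple length lives in the first field,
                                 * like IndexTupleSize() reads it */
    char payload[6];
} ToyTuple;

static void
toy_restore(const char *from, size_t len)
{
    const char *base = from;
    const char *end = from + len;
    const char *items[32];
    size_t      sizes[32];
    int         i = 0,
                nitems;

    /* forward scan: 'from' may be unaligned, so copy the header out */
    while (from < end)
    {
        ToyTuple    t;

        memcpy(&t, from, sizeof(ToyTuple));
        items[i] = from;
        sizes[i] = TOY_MAXALIGN(t.size);
        from += sizes[i];
        i++;
    }
    nitems = i;

    /* reverse insert: the last tuple scanned gets item number 1, and the
     * first tuple scanned (lowest on the page) gets the highest number */
    for (i = nitems - 1; i >= 0; i--)
        printf("item number %d -> buffer offset %zu\n",
               nitems - i, (size_t) (items[i] - base));
}

int
main(void)
{
    char        buf[64];
    ToyTuple    hi = {sizeof(ToyTuple), "hi"};  /* stored first on page */
    ToyTuple    lo = {sizeof(ToyTuple), "lo"};

    memcpy(buf, &hi, sizeof(hi));
    memcpy(buf + TOY_MAXALIGN(sizeof(ToyTuple)), &lo, sizeof(lo));
    toy_restore(buf, 2 * TOY_MAXALIGN(sizeof(ToyTuple)));
    return 0;
}

Running this prints item number 1 at offset 8 and item number 2 at offset 0, matching the comment above _bt_restore_page(): the tuple with the highest item number appears first (lowest on the page), which is why replay must add items in reverse to reproduce the original item-number order.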