summaryrefslogtreecommitdiffstats
path: root/src/backend/access/hash/hash_xlog.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/access/hash/hash_xlog.c
parentInitial commit. (diff)
downloadpostgresql-14-upstream.tar.xz
postgresql-14-upstream.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/access/hash/hash_xlog.c')
-rw-r--r--src/backend/access/hash/hash_xlog.c1145
1 files changed, 1145 insertions, 0 deletions
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
new file mode 100644
index 0000000..af35a99
--- /dev/null
+++ b/src/backend/access/hash/hash_xlog.c
@@ -0,0 +1,1145 @@
+/*-------------------------------------------------------------------------
+ *
+ * hash_xlog.c
+ * WAL replay logic for hash index.
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hash_xlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "access/transam.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "miscadmin.h"
+#include "storage/procarray.h"
+
+/*
+ * replay a hash index meta page
+ */
+static void
+hash_xlog_init_meta_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ Page page;
+ Buffer metabuf;
+ ForkNumber forknum;
+
+ xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record);
+
+ /* create the index' metapage */
+ metabuf = XLogInitBufferForRedo(record, 0);
+ Assert(BufferIsValid(metabuf));
+ _hash_init_metabuffer(metabuf, xlrec->num_tuples, xlrec->procid,
+ xlrec->ffactor, true);
+ page = (Page) BufferGetPage(metabuf);
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+
+ /*
+ * Force the on-disk state of init forks to always be in sync with the
+ * state in shared buffers. See XLogReadBufferForRedoExtended. We need
+ * special handling for init forks as create index operations don't log a
+ * full page image of the metapage.
+ */
+ XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
+ if (forknum == INIT_FORKNUM)
+ FlushOneBuffer(metabuf);
+
+ /* all done */
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay a hash index bitmap page
+ */
+static void
+hash_xlog_init_bitmap_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ Buffer bitmapbuf;
+ Buffer metabuf;
+ Page page;
+ HashMetaPage metap;
+ uint32 num_buckets;
+ ForkNumber forknum;
+
+ xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record);
+
+ /*
+ * Initialize bitmap page
+ */
+ bitmapbuf = XLogInitBufferForRedo(record, 0);
+ _hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true);
+ PageSetLSN(BufferGetPage(bitmapbuf), lsn);
+ MarkBufferDirty(bitmapbuf);
+
+ /*
+ * Force the on-disk state of init forks to always be in sync with the
+ * state in shared buffers. See XLogReadBufferForRedoExtended. We need
+ * special handling for init forks as create index operations don't log a
+ * full page image of the metapage.
+ */
+ XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
+ if (forknum == INIT_FORKNUM)
+ FlushOneBuffer(bitmapbuf);
+ UnlockReleaseBuffer(bitmapbuf);
+
+ /* add the new bitmap page to the metapage's list of bitmaps */
+ if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
+ {
+ /*
+ * Note: in normal operation, we'd update the metapage while still
+ * holding lock on the bitmap page. But during replay it's not
+ * necessary to hold that lock, since nobody can see it yet; the
+ * creating transaction hasn't yet committed.
+ */
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+
+ num_buckets = metap->hashm_maxbucket + 1;
+ metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
+ metap->hashm_nmaps++;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+
+ XLogRecGetBlockTag(record, 1, NULL, &forknum, NULL);
+ if (forknum == INIT_FORKNUM)
+ FlushOneBuffer(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay a hash index insert without split
+ */
+static void
+hash_xlog_insert(XLogReaderState *record)
+{
+ HashMetaPage metap;
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ Size datalen;
+ char *datapos = XLogRecGetBlockData(record, 0, &datalen);
+
+ page = BufferGetPage(buffer);
+
+ if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "hash_xlog_insert: failed to add item");
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
+ {
+ /*
+ * Note: in normal operation, we'd update the metapage while still
+ * holding lock on the page we inserted into. But during replay it's
+ * not necessary to hold that lock, since no other index updates can
+ * be happening concurrently.
+ */
+ page = BufferGetPage(buffer);
+ metap = HashPageGetMeta(page);
+ metap->hashm_ntuples += 1;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * replay addition of overflow page for hash index
+ */
+static void
+hash_xlog_add_ovfl_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_add_ovfl_page *xlrec = (xl_hash_add_ovfl_page *) XLogRecGetData(record);
+ Buffer leftbuf;
+ Buffer ovflbuf;
+ Buffer metabuf;
+ BlockNumber leftblk;
+ BlockNumber rightblk;
+ BlockNumber newmapblk = InvalidBlockNumber;
+ Page ovflpage;
+ HashPageOpaque ovflopaque;
+ uint32 *num_bucket;
+ char *data;
+ Size datalen PG_USED_FOR_ASSERTS_ONLY;
+ bool new_bmpage = false;
+
+ XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk);
+ XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk);
+
+ ovflbuf = XLogInitBufferForRedo(record, 0);
+ Assert(BufferIsValid(ovflbuf));
+
+ data = XLogRecGetBlockData(record, 0, &datalen);
+ num_bucket = (uint32 *) data;
+ Assert(datalen == sizeof(uint32));
+ _hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE,
+ true);
+ /* update backlink */
+ ovflpage = BufferGetPage(ovflbuf);
+ ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
+ ovflopaque->hasho_prevblkno = leftblk;
+
+ PageSetLSN(ovflpage, lsn);
+ MarkBufferDirty(ovflbuf);
+
+ if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
+ {
+ Page leftpage;
+ HashPageOpaque leftopaque;
+
+ leftpage = BufferGetPage(leftbuf);
+ leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage);
+ leftopaque->hasho_nextblkno = rightblk;
+
+ PageSetLSN(leftpage, lsn);
+ MarkBufferDirty(leftbuf);
+ }
+
+ if (BufferIsValid(leftbuf))
+ UnlockReleaseBuffer(leftbuf);
+ UnlockReleaseBuffer(ovflbuf);
+
+ /*
+ * Note: in normal operation, we'd update the bitmap and meta page while
+ * still holding lock on the overflow pages. But during replay it's not
+ * necessary to hold those locks, since no other index updates can be
+ * happening concurrently.
+ */
+ if (XLogRecHasBlockRef(record, 2))
+ {
+ Buffer mapbuffer;
+
+ if (XLogReadBufferForRedo(record, 2, &mapbuffer) == BLK_NEEDS_REDO)
+ {
+ Page mappage = (Page) BufferGetPage(mapbuffer);
+ uint32 *freep = NULL;
+ char *data;
+ uint32 *bitmap_page_bit;
+
+ freep = HashPageGetBitmap(mappage);
+
+ data = XLogRecGetBlockData(record, 2, &datalen);
+ bitmap_page_bit = (uint32 *) data;
+
+ SETBIT(freep, *bitmap_page_bit);
+
+ PageSetLSN(mappage, lsn);
+ MarkBufferDirty(mapbuffer);
+ }
+ if (BufferIsValid(mapbuffer))
+ UnlockReleaseBuffer(mapbuffer);
+ }
+
+ if (XLogRecHasBlockRef(record, 3))
+ {
+ Buffer newmapbuf;
+
+ newmapbuf = XLogInitBufferForRedo(record, 3);
+
+ _hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true);
+
+ new_bmpage = true;
+ newmapblk = BufferGetBlockNumber(newmapbuf);
+
+ MarkBufferDirty(newmapbuf);
+ PageSetLSN(BufferGetPage(newmapbuf), lsn);
+
+ UnlockReleaseBuffer(newmapbuf);
+ }
+
+ if (XLogReadBufferForRedo(record, 4, &metabuf) == BLK_NEEDS_REDO)
+ {
+ HashMetaPage metap;
+ Page page;
+ uint32 *firstfree_ovflpage;
+
+ data = XLogRecGetBlockData(record, 4, &datalen);
+ firstfree_ovflpage = (uint32 *) data;
+
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+ metap->hashm_firstfree = *firstfree_ovflpage;
+
+ if (!xlrec->bmpage_found)
+ {
+ metap->hashm_spares[metap->hashm_ovflpoint]++;
+
+ if (new_bmpage)
+ {
+ Assert(BlockNumberIsValid(newmapblk));
+
+ metap->hashm_mapp[metap->hashm_nmaps] = newmapblk;
+ metap->hashm_nmaps++;
+ metap->hashm_spares[metap->hashm_ovflpoint]++;
+ }
+ }
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay allocation of page for split operation
+ */
+static void
+hash_xlog_split_allocate_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record);
+ Buffer oldbuf;
+ Buffer newbuf;
+ Buffer metabuf;
+ Size datalen PG_USED_FOR_ASSERTS_ONLY;
+ char *data;
+ XLogRedoAction action;
+
+ /*
+ * To be consistent with normal operation, here we take cleanup locks on
+ * both the old and new buckets even though there can't be any concurrent
+ * inserts.
+ */
+
+ /* replay the record for old bucket */
+ action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &oldbuf);
+
+ /*
+ * Note that we still update the page even if it was restored from a full
+ * page image, because the special space is not included in the image.
+ */
+ if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
+ {
+ Page oldpage;
+ HashPageOpaque oldopaque;
+
+ oldpage = BufferGetPage(oldbuf);
+ oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
+
+ oldopaque->hasho_flag = xlrec->old_bucket_flag;
+ oldopaque->hasho_prevblkno = xlrec->new_bucket;
+
+ PageSetLSN(oldpage, lsn);
+ MarkBufferDirty(oldbuf);
+ }
+
+ /* replay the record for new bucket */
+ newbuf = XLogInitBufferForRedo(record, 1);
+ _hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
+ xlrec->new_bucket_flag, true);
+ if (!IsBufferCleanupOK(newbuf))
+ elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock");
+ MarkBufferDirty(newbuf);
+ PageSetLSN(BufferGetPage(newbuf), lsn);
+
+ /*
+ * We can release the lock on old bucket early as well but doing here to
+ * consistent with normal operation.
+ */
+ if (BufferIsValid(oldbuf))
+ UnlockReleaseBuffer(oldbuf);
+ if (BufferIsValid(newbuf))
+ UnlockReleaseBuffer(newbuf);
+
+ /*
+ * Note: in normal operation, we'd update the meta page while still
+ * holding lock on the old and new bucket pages. But during replay it's
+ * not necessary to hold those locks, since no other bucket splits can be
+ * happening concurrently.
+ */
+
+ /* replay the record for metapage changes */
+ if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
+ {
+ Page page;
+ HashMetaPage metap;
+
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+ metap->hashm_maxbucket = xlrec->new_bucket;
+
+ data = XLogRecGetBlockData(record, 2, &datalen);
+
+ if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
+ {
+ uint32 lowmask;
+ uint32 *highmask;
+
+ /* extract low and high masks. */
+ memcpy(&lowmask, data, sizeof(uint32));
+ highmask = (uint32 *) ((char *) data + sizeof(uint32));
+
+ /* update metapage */
+ metap->hashm_lowmask = lowmask;
+ metap->hashm_highmask = *highmask;
+
+ data += sizeof(uint32) * 2;
+ }
+
+ if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
+ {
+ uint32 ovflpoint;
+ uint32 *ovflpages;
+
+ /* extract information of overflow pages. */
+ memcpy(&ovflpoint, data, sizeof(uint32));
+ ovflpages = (uint32 *) ((char *) data + sizeof(uint32));
+
+ /* update metapage */
+ metap->hashm_spares[ovflpoint] = *ovflpages;
+ metap->hashm_ovflpoint = ovflpoint;
+ }
+
+ MarkBufferDirty(metabuf);
+ PageSetLSN(BufferGetPage(metabuf), lsn);
+ }
+
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay of split operation
+ */
+static void
+hash_xlog_split_page(XLogReaderState *record)
+{
+ Buffer buf;
+
+ if (XLogReadBufferForRedo(record, 0, &buf) != BLK_RESTORED)
+ elog(ERROR, "Hash split record did not contain a full-page image");
+
+ UnlockReleaseBuffer(buf);
+}
+
+/*
+ * replay completion of split operation
+ */
+static void
+hash_xlog_split_complete(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record);
+ Buffer oldbuf;
+ Buffer newbuf;
+ XLogRedoAction action;
+
+ /* replay the record for old bucket */
+ action = XLogReadBufferForRedo(record, 0, &oldbuf);
+
+ /*
+ * Note that we still update the page even if it was restored from a full
+ * page image, because the bucket flag is not included in the image.
+ */
+ if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
+ {
+ Page oldpage;
+ HashPageOpaque oldopaque;
+
+ oldpage = BufferGetPage(oldbuf);
+ oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
+
+ oldopaque->hasho_flag = xlrec->old_bucket_flag;
+
+ PageSetLSN(oldpage, lsn);
+ MarkBufferDirty(oldbuf);
+ }
+ if (BufferIsValid(oldbuf))
+ UnlockReleaseBuffer(oldbuf);
+
+ /* replay the record for new bucket */
+ action = XLogReadBufferForRedo(record, 1, &newbuf);
+
+ /*
+ * Note that we still update the page even if it was restored from a full
+ * page image, because the bucket flag is not included in the image.
+ */
+ if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
+ {
+ Page newpage;
+ HashPageOpaque nopaque;
+
+ newpage = BufferGetPage(newbuf);
+ nopaque = (HashPageOpaque) PageGetSpecialPointer(newpage);
+
+ nopaque->hasho_flag = xlrec->new_bucket_flag;
+
+ PageSetLSN(newpage, lsn);
+ MarkBufferDirty(newbuf);
+ }
+ if (BufferIsValid(newbuf))
+ UnlockReleaseBuffer(newbuf);
+}
+
+/*
+ * replay move of page contents for squeeze operation of hash index
+ */
+static void
+hash_xlog_move_page_contents(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record);
+ Buffer bucketbuf = InvalidBuffer;
+ Buffer writebuf = InvalidBuffer;
+ Buffer deletebuf = InvalidBuffer;
+ XLogRedoAction action;
+
+ /*
+ * Ensure we have a cleanup lock on primary bucket page before we start
+ * with the actual replay operation. This is to ensure that neither a
+ * scan can start nor a scan can be already-in-progress during the replay
+ * of this operation. If we allow scans during this operation, then they
+ * can miss some records or show the same record multiple times.
+ */
+ if (xldata->is_prim_bucket_same_wrt)
+ action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
+ else
+ {
+ /*
+ * we don't care for return value as the purpose of reading bucketbuf
+ * is to ensure a cleanup lock on primary bucket page.
+ */
+ (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
+
+ action = XLogReadBufferForRedo(record, 1, &writebuf);
+ }
+
+ /* replay the record for adding entries in overflow buffer */
+ if (action == BLK_NEEDS_REDO)
+ {
+ Page writepage;
+ char *begin;
+ char *data;
+ Size datalen;
+ uint16 ninserted = 0;
+
+ data = begin = XLogRecGetBlockData(record, 1, &datalen);
+
+ writepage = (Page) BufferGetPage(writebuf);
+
+ if (xldata->ntups > 0)
+ {
+ OffsetNumber *towrite = (OffsetNumber *) data;
+
+ data += sizeof(OffsetNumber) * xldata->ntups;
+
+ while (data - begin < datalen)
+ {
+ IndexTuple itup = (IndexTuple) data;
+ Size itemsz;
+ OffsetNumber l;
+
+ itemsz = IndexTupleSize(itup);
+ itemsz = MAXALIGN(itemsz);
+
+ data += itemsz;
+
+ l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes",
+ (int) itemsz);
+
+ ninserted++;
+ }
+ }
+
+ /*
+ * number of tuples inserted must be same as requested in REDO record.
+ */
+ Assert(ninserted == xldata->ntups);
+
+ PageSetLSN(writepage, lsn);
+ MarkBufferDirty(writebuf);
+ }
+
+ /* replay the record for deleting entries from overflow buffer */
+ if (XLogReadBufferForRedo(record, 2, &deletebuf) == BLK_NEEDS_REDO)
+ {
+ Page page;
+ char *ptr;
+ Size len;
+
+ ptr = XLogRecGetBlockData(record, 2, &len);
+
+ page = (Page) BufferGetPage(deletebuf);
+
+ if (len > 0)
+ {
+ OffsetNumber *unused;
+ OffsetNumber *unend;
+
+ unused = (OffsetNumber *) ptr;
+ unend = (OffsetNumber *) ((char *) ptr + len);
+
+ if ((unend - unused) > 0)
+ PageIndexMultiDelete(page, unused, unend - unused);
+ }
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(deletebuf);
+ }
+
+ /*
+ * Replay is complete, now we can release the buffers. We release locks at
+ * end of replay operation to ensure that we hold lock on primary bucket
+ * page till end of operation. We can optimize by releasing the lock on
+ * write buffer as soon as the operation for same is complete, if it is
+ * not same as primary bucket page, but that doesn't seem to be worth
+ * complicating the code.
+ */
+ if (BufferIsValid(deletebuf))
+ UnlockReleaseBuffer(deletebuf);
+
+ if (BufferIsValid(writebuf))
+ UnlockReleaseBuffer(writebuf);
+
+ if (BufferIsValid(bucketbuf))
+ UnlockReleaseBuffer(bucketbuf);
+}
+
+/*
+ * replay squeeze page operation of hash index
+ */
+static void
+hash_xlog_squeeze_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
+ Buffer bucketbuf = InvalidBuffer;
+ Buffer writebuf;
+ Buffer ovflbuf;
+ Buffer prevbuf = InvalidBuffer;
+ Buffer mapbuf;
+ XLogRedoAction action;
+
+ /*
+ * Ensure we have a cleanup lock on primary bucket page before we start
+ * with the actual replay operation. This is to ensure that neither a
+ * scan can start nor a scan can be already-in-progress during the replay
+ * of this operation. If we allow scans during this operation, then they
+ * can miss some records or show the same record multiple times.
+ */
+ if (xldata->is_prim_bucket_same_wrt)
+ action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
+ else
+ {
+ /*
+ * we don't care for return value as the purpose of reading bucketbuf
+ * is to ensure a cleanup lock on primary bucket page.
+ */
+ (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
+
+ action = XLogReadBufferForRedo(record, 1, &writebuf);
+ }
+
+ /* replay the record for adding entries in overflow buffer */
+ if (action == BLK_NEEDS_REDO)
+ {
+ Page writepage;
+ char *begin;
+ char *data;
+ Size datalen;
+ uint16 ninserted = 0;
+
+ data = begin = XLogRecGetBlockData(record, 1, &datalen);
+
+ writepage = (Page) BufferGetPage(writebuf);
+
+ if (xldata->ntups > 0)
+ {
+ OffsetNumber *towrite = (OffsetNumber *) data;
+
+ data += sizeof(OffsetNumber) * xldata->ntups;
+
+ while (data - begin < datalen)
+ {
+ IndexTuple itup = (IndexTuple) data;
+ Size itemsz;
+ OffsetNumber l;
+
+ itemsz = IndexTupleSize(itup);
+ itemsz = MAXALIGN(itemsz);
+
+ data += itemsz;
+
+ l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
+ (int) itemsz);
+
+ ninserted++;
+ }
+ }
+
+ /*
+ * number of tuples inserted must be same as requested in REDO record.
+ */
+ Assert(ninserted == xldata->ntups);
+
+ /*
+ * if the page on which are adding tuples is a page previous to freed
+ * overflow page, then update its nextblkno.
+ */
+ if (xldata->is_prev_bucket_same_wrt)
+ {
+ HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);
+
+ writeopaque->hasho_nextblkno = xldata->nextblkno;
+ }
+
+ PageSetLSN(writepage, lsn);
+ MarkBufferDirty(writebuf);
+ }
+
+ /* replay the record for initializing overflow buffer */
+ if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO)
+ {
+ Page ovflpage;
+ HashPageOpaque ovflopaque;
+
+ ovflpage = BufferGetPage(ovflbuf);
+
+ _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
+
+ ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
+
+ ovflopaque->hasho_prevblkno = InvalidBlockNumber;
+ ovflopaque->hasho_nextblkno = InvalidBlockNumber;
+ ovflopaque->hasho_bucket = -1;
+ ovflopaque->hasho_flag = LH_UNUSED_PAGE;
+ ovflopaque->hasho_page_id = HASHO_PAGE_ID;
+
+ PageSetLSN(ovflpage, lsn);
+ MarkBufferDirty(ovflbuf);
+ }
+ if (BufferIsValid(ovflbuf))
+ UnlockReleaseBuffer(ovflbuf);
+
+ /* replay the record for page previous to the freed overflow page */
+ if (!xldata->is_prev_bucket_same_wrt &&
+ XLogReadBufferForRedo(record, 3, &prevbuf) == BLK_NEEDS_REDO)
+ {
+ Page prevpage = BufferGetPage(prevbuf);
+ HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
+
+ prevopaque->hasho_nextblkno = xldata->nextblkno;
+
+ PageSetLSN(prevpage, lsn);
+ MarkBufferDirty(prevbuf);
+ }
+ if (BufferIsValid(prevbuf))
+ UnlockReleaseBuffer(prevbuf);
+
+ /* replay the record for page next to the freed overflow page */
+ if (XLogRecHasBlockRef(record, 4))
+ {
+ Buffer nextbuf;
+
+ if (XLogReadBufferForRedo(record, 4, &nextbuf) == BLK_NEEDS_REDO)
+ {
+ Page nextpage = BufferGetPage(nextbuf);
+ HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
+
+ nextopaque->hasho_prevblkno = xldata->prevblkno;
+
+ PageSetLSN(nextpage, lsn);
+ MarkBufferDirty(nextbuf);
+ }
+ if (BufferIsValid(nextbuf))
+ UnlockReleaseBuffer(nextbuf);
+ }
+
+ if (BufferIsValid(writebuf))
+ UnlockReleaseBuffer(writebuf);
+
+ if (BufferIsValid(bucketbuf))
+ UnlockReleaseBuffer(bucketbuf);
+
+ /*
+ * Note: in normal operation, we'd update the bitmap and meta page while
+ * still holding lock on the primary bucket page and overflow pages. But
+ * during replay it's not necessary to hold those locks, since no other
+ * index updates can be happening concurrently.
+ */
+ /* replay the record for bitmap page */
+ if (XLogReadBufferForRedo(record, 5, &mapbuf) == BLK_NEEDS_REDO)
+ {
+ Page mappage = (Page) BufferGetPage(mapbuf);
+ uint32 *freep = NULL;
+ char *data;
+ uint32 *bitmap_page_bit;
+ Size datalen;
+
+ freep = HashPageGetBitmap(mappage);
+
+ data = XLogRecGetBlockData(record, 5, &datalen);
+ bitmap_page_bit = (uint32 *) data;
+
+ CLRBIT(freep, *bitmap_page_bit);
+
+ PageSetLSN(mappage, lsn);
+ MarkBufferDirty(mapbuf);
+ }
+ if (BufferIsValid(mapbuf))
+ UnlockReleaseBuffer(mapbuf);
+
+ /* replay the record for meta page */
+ if (XLogRecHasBlockRef(record, 6))
+ {
+ Buffer metabuf;
+
+ if (XLogReadBufferForRedo(record, 6, &metabuf) == BLK_NEEDS_REDO)
+ {
+ HashMetaPage metap;
+ Page page;
+ char *data;
+ uint32 *firstfree_ovflpage;
+ Size datalen;
+
+ data = XLogRecGetBlockData(record, 6, &datalen);
+ firstfree_ovflpage = (uint32 *) data;
+
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+ metap->hashm_firstfree = *firstfree_ovflpage;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+ }
+}
+
+/*
+ * replay delete operation of hash index
+ */
+static void
+hash_xlog_delete(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
+ Buffer bucketbuf = InvalidBuffer;
+ Buffer deletebuf;
+ Page page;
+ XLogRedoAction action;
+
+ /*
+ * Ensure we have a cleanup lock on primary bucket page before we start
+ * with the actual replay operation. This is to ensure that neither a
+ * scan can start nor a scan can be already-in-progress during the replay
+ * of this operation. If we allow scans during this operation, then they
+ * can miss some records or show the same record multiple times.
+ */
+ if (xldata->is_primary_bucket_page)
+ action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf);
+ else
+ {
+ /*
+ * we don't care for return value as the purpose of reading bucketbuf
+ * is to ensure a cleanup lock on primary bucket page.
+ */
+ (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
+
+ action = XLogReadBufferForRedo(record, 1, &deletebuf);
+ }
+
+ /* replay the record for deleting entries in bucket page */
+ if (action == BLK_NEEDS_REDO)
+ {
+ char *ptr;
+ Size len;
+
+ ptr = XLogRecGetBlockData(record, 1, &len);
+
+ page = (Page) BufferGetPage(deletebuf);
+
+ if (len > 0)
+ {
+ OffsetNumber *unused;
+ OffsetNumber *unend;
+
+ unused = (OffsetNumber *) ptr;
+ unend = (OffsetNumber *) ((char *) ptr + len);
+
+ if ((unend - unused) > 0)
+ PageIndexMultiDelete(page, unused, unend - unused);
+ }
+
+ /*
+ * Mark the page as not containing any LP_DEAD items only if
+ * clear_dead_marking flag is set to true. See comments in
+ * hashbucketcleanup() for details.
+ */
+ if (xldata->clear_dead_marking)
+ {
+ HashPageOpaque pageopaque;
+
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+ }
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(deletebuf);
+ }
+ if (BufferIsValid(deletebuf))
+ UnlockReleaseBuffer(deletebuf);
+
+ if (BufferIsValid(bucketbuf))
+ UnlockReleaseBuffer(bucketbuf);
+}
+
+/*
+ * replay split cleanup flag operation for primary bucket page.
+ */
+static void
+hash_xlog_split_cleanup(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ Buffer buffer;
+ Page page;
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ HashPageOpaque bucket_opaque;
+
+ page = (Page) BufferGetPage(buffer);
+
+ bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * replay for update meta page
+ */
+static void
+hash_xlog_update_meta_page(XLogReaderState *record)
+{
+ HashMetaPage metap;
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record);
+ Buffer metabuf;
+ Page page;
+
+ if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
+ {
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+
+ metap->hashm_ntuples = xldata->ntuples;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay delete operation in hash index to remove
+ * tuples marked as DEAD during index tuple insertion.
+ */
+static void
+hash_xlog_vacuum_one_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_vacuum_one_page *xldata;
+ Buffer buffer;
+ Buffer metabuf;
+ Page page;
+ XLogRedoAction action;
+ HashPageOpaque pageopaque;
+
+ xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
+
+ /*
+ * If we have any conflict processing to do, it must happen before we
+ * update the page.
+ *
+ * Hash index records that are marked as LP_DEAD and being removed during
+ * hash index tuple insertion can conflict with standby queries. You might
+ * think that vacuum records would conflict as well, but we've handled
+ * that already. XLOG_HEAP2_PRUNE records provide the highest xid cleaned
+ * by the vacuum of the heap and so we can resolve any conflicts just once
+ * when that arrives. After that we know that no conflicts exist from
+ * individual hash index vacuum records on that index.
+ */
+ if (InHotStandby)
+ {
+ RelFileNode rnode;
+
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
+ ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+ }
+
+ action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
+
+ if (action == BLK_NEEDS_REDO)
+ {
+ page = (Page) BufferGetPage(buffer);
+
+ if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
+ {
+ OffsetNumber *unused;
+
+ unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);
+
+ PageIndexMultiDelete(page, unused, xldata->ntuples);
+ }
+
+ /*
+ * Mark the page as not containing any LP_DEAD items. See comments in
+ * _hash_vacuum_one_page() for details.
+ */
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
+ {
+ Page metapage;
+ HashMetaPage metap;
+
+ metapage = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(metapage);
+
+ metap->hashm_ntuples -= xldata->ntuples;
+
+ PageSetLSN(metapage, lsn);
+ MarkBufferDirty(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+void
+hash_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ switch (info)
+ {
+ case XLOG_HASH_INIT_META_PAGE:
+ hash_xlog_init_meta_page(record);
+ break;
+ case XLOG_HASH_INIT_BITMAP_PAGE:
+ hash_xlog_init_bitmap_page(record);
+ break;
+ case XLOG_HASH_INSERT:
+ hash_xlog_insert(record);
+ break;
+ case XLOG_HASH_ADD_OVFL_PAGE:
+ hash_xlog_add_ovfl_page(record);
+ break;
+ case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
+ hash_xlog_split_allocate_page(record);
+ break;
+ case XLOG_HASH_SPLIT_PAGE:
+ hash_xlog_split_page(record);
+ break;
+ case XLOG_HASH_SPLIT_COMPLETE:
+ hash_xlog_split_complete(record);
+ break;
+ case XLOG_HASH_MOVE_PAGE_CONTENTS:
+ hash_xlog_move_page_contents(record);
+ break;
+ case XLOG_HASH_SQUEEZE_PAGE:
+ hash_xlog_squeeze_page(record);
+ break;
+ case XLOG_HASH_DELETE:
+ hash_xlog_delete(record);
+ break;
+ case XLOG_HASH_SPLIT_CLEANUP:
+ hash_xlog_split_cleanup(record);
+ break;
+ case XLOG_HASH_UPDATE_META_PAGE:
+ hash_xlog_update_meta_page(record);
+ break;
+ case XLOG_HASH_VACUUM_ONE_PAGE:
+ hash_xlog_vacuum_one_page(record);
+ break;
+ default:
+ elog(PANIC, "hash_redo: unknown op code %u", info);
+ }
+}
+
+/*
+ * Mask a hash page before performing consistency checks on it.
+ */
+void
+hash_mask(char *pagedata, BlockNumber blkno)
+{
+ Page page = (Page) pagedata;
+ HashPageOpaque opaque;
+ int pagetype;
+
+ mask_page_lsn_and_checksum(page);
+
+ mask_page_hint_bits(page);
+ mask_unused_space(page);
+
+ opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+ pagetype = opaque->hasho_flag & LH_PAGE_TYPE;
+ if (pagetype == LH_UNUSED_PAGE)
+ {
+ /*
+ * Mask everything on a UNUSED page.
+ */
+ mask_page_content(page);
+ }
+ else if (pagetype == LH_BUCKET_PAGE ||
+ pagetype == LH_OVERFLOW_PAGE)
+ {
+ /*
+ * In hash bucket and overflow pages, it is possible to modify the
+ * LP_FLAGS without emitting any WAL record. Hence, mask the line
+ * pointer flags. See hashgettuple(), _hash_kill_items() for details.
+ */
+ mask_lp_flags(page);
+ }
+
+ /*
+ * It is possible that the hint bit LH_PAGE_HAS_DEAD_TUPLES may remain
+ * unlogged. So, mask it. See _hash_kill_items() for details.
+ */
+ opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+}