summaryrefslogtreecommitdiffstats
path: root/src/backend/access/hash/hashinsert.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/hash/hashinsert.c')
-rw-r--r--src/backend/access/hash/hashinsert.c433
1 files changed, 433 insertions, 0 deletions
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
new file mode 100644
index 0000000..4f2fecb
--- /dev/null
+++ b/src/backend/access/hash/hashinsert.c
@@ -0,0 +1,433 @@
+/*-------------------------------------------------------------------------
+ *
+ * hashinsert.c
+ * Item insertion in hash tables for Postgres.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hashinsert.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "access/xloginsert.h"
+#include "miscadmin.h"
+#include "storage/buf_internals.h"
+#include "storage/lwlock.h"
+#include "storage/predicate.h"
+#include "utils/rel.h"
+
+static void _hash_vacuum_one_page(Relation rel, Relation hrel,
+ Buffer metabuf, Buffer buf);
+
+/*
+ * _hash_doinsert() -- Handle insertion of a single index tuple.
+ *
+ * This routine is called by the public interface routines, hashbuild
+ * and hashinsert. By here, itup is completely filled in.
+ */
+void
+_hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
+{
+ Buffer buf = InvalidBuffer;
+ Buffer bucket_buf;
+ Buffer metabuf;
+ HashMetaPage metap;
+ HashMetaPage usedmetap = NULL;
+ Page metapage;
+ Page page;
+ HashPageOpaque pageopaque;
+ Size itemsz;
+ bool do_expand;
+ uint32 hashkey;
+ Bucket bucket;
+ OffsetNumber itup_off;
+
+ /*
+ * Get the hash key for the item (it's stored in the index tuple itself).
+ */
+ hashkey = _hash_get_indextuple_hashkey(itup);
+
+ /* compute item size too */
+ itemsz = IndexTupleSize(itup);
+ itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
+ * need to be consistent */
+
+restart_insert:
+
+ /*
+ * Read the metapage. We don't lock it yet; HashMaxItemSize() will
+ * examine pd_pagesize_version, but that can't change so we can examine it
+ * without a lock.
+ */
+ metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);
+ metapage = BufferGetPage(metabuf);
+
+ /*
+ * Check whether the item can fit on a hash page at all. (Eventually, we
+ * ought to try to apply TOAST methods if not.) Note that at this point,
+ * itemsz doesn't include the ItemId.
+ *
+ * XXX this is useless code if we are only storing hash keys.
+ */
+ if (itemsz > HashMaxItemSize(metapage))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("index row size %zu exceeds hash maximum %zu",
+ itemsz, HashMaxItemSize(metapage)),
+ errhint("Values larger than a buffer page cannot be indexed.")));
+
+ /* Lock the primary bucket page for the target bucket. */
+ buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_WRITE,
+ &usedmetap);
+ Assert(usedmetap != NULL);
+
+ CheckForSerializableConflictIn(rel, NULL, BufferGetBlockNumber(buf));
+
+ /* remember the primary bucket buffer to release the pin on it at end. */
+ bucket_buf = buf;
+
+ page = BufferGetPage(buf);
+ pageopaque = HashPageGetOpaque(page);
+ bucket = pageopaque->hasho_bucket;
+
+ /*
+ * If this bucket is in the process of being split, try to finish the
+ * split before inserting, because that might create room for the
+ * insertion to proceed without allocating an additional overflow page.
+ * It's only interesting to finish the split if we're trying to insert
+ * into the bucket from which we're removing tuples (the "old" bucket),
+ * not if we're trying to insert into the bucket into which tuples are
+ * being moved (the "new" bucket).
+ */
+ if (H_BUCKET_BEING_SPLIT(pageopaque) && IsBufferCleanupOK(buf))
+ {
+ /* release the lock on bucket buffer, before completing the split. */
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+ _hash_finish_split(rel, metabuf, buf, bucket,
+ usedmetap->hashm_maxbucket,
+ usedmetap->hashm_highmask,
+ usedmetap->hashm_lowmask);
+
+ /* release the pin on old and meta buffer. retry for insert. */
+ _hash_dropbuf(rel, buf);
+ _hash_dropbuf(rel, metabuf);
+ goto restart_insert;
+ }
+
+ /* Do the insertion */
+ while (PageGetFreeSpace(page) < itemsz)
+ {
+ BlockNumber nextblkno;
+
+ /*
+ * Check if current page has any DEAD tuples. If yes, delete these
+ * tuples and see if we can get a space for the new item to be
+ * inserted before moving to the next page in the bucket chain.
+ */
+ if (H_HAS_DEAD_TUPLES(pageopaque))
+ {
+
+ if (IsBufferCleanupOK(buf))
+ {
+ _hash_vacuum_one_page(rel, heapRel, metabuf, buf);
+
+ if (PageGetFreeSpace(page) >= itemsz)
+ break; /* OK, now we have enough space */
+ }
+ }
+
+ /*
+ * no space on this page; check for an overflow page
+ */
+ nextblkno = pageopaque->hasho_nextblkno;
+
+ if (BlockNumberIsValid(nextblkno))
+ {
+ /*
+ * ovfl page exists; go get it. if it doesn't have room, we'll
+ * find out next pass through the loop test above. we always
+ * release both the lock and pin if this is an overflow page, but
+ * only the lock if this is the primary bucket page, since the pin
+ * on the primary bucket must be retained throughout the scan.
+ */
+ if (buf != bucket_buf)
+ _hash_relbuf(rel, buf);
+ else
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
+ page = BufferGetPage(buf);
+ }
+ else
+ {
+ /*
+ * we're at the end of the bucket chain and we haven't found a
+ * page with enough room. allocate a new overflow page.
+ */
+
+ /* release our write lock without modifying buffer */
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+ /* chain to a new overflow page */
+ buf = _hash_addovflpage(rel, metabuf, buf, (buf == bucket_buf));
+ page = BufferGetPage(buf);
+
+ /* should fit now, given test above */
+ Assert(PageGetFreeSpace(page) >= itemsz);
+ }
+ pageopaque = HashPageGetOpaque(page);
+ Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
+ Assert(pageopaque->hasho_bucket == bucket);
+ }
+
+ /*
+ * Write-lock the metapage so we can increment the tuple count. After
+ * incrementing it, check to see if it's time for a split.
+ */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ /* Do the update. No ereport(ERROR) until changes are logged */
+ START_CRIT_SECTION();
+
+ /* found page with enough space, so add the item here */
+ itup_off = _hash_pgaddtup(rel, buf, itemsz, itup);
+ MarkBufferDirty(buf);
+
+ /* metapage operations */
+ metap = HashPageGetMeta(metapage);
+ metap->hashm_ntuples += 1;
+
+ /* Make sure this stays in sync with _hash_expandtable() */
+ do_expand = metap->hashm_ntuples >
+ (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
+
+ MarkBufferDirty(metabuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_hash_insert xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.offnum = itup_off;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashInsert);
+
+ XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
+
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) itup, IndexTupleSize(itup));
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ /* drop lock on metapage, but keep pin */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Release the modified page and ensure to release the pin on primary
+ * page.
+ */
+ _hash_relbuf(rel, buf);
+ if (buf != bucket_buf)
+ _hash_dropbuf(rel, bucket_buf);
+
+ /* Attempt to split if a split is needed */
+ if (do_expand)
+ _hash_expandtable(rel, metabuf);
+
+ /* Finally drop our pin on the metapage */
+ _hash_dropbuf(rel, metabuf);
+}
+
+/*
+ * _hash_pgaddtup() -- add a tuple to a particular page in the index.
+ *
+ * This routine adds the tuple to the page as requested; it does not write out
+ * the page. It is an error to call this function without pin and write lock
+ * on the target buffer.
+ *
+ * Returns the offset number at which the tuple was inserted. This function
+ * is responsible for preserving the condition that tuples in a hash index
+ * page are sorted by hashkey value.
+ */
+OffsetNumber
+_hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
+{
+ OffsetNumber itup_off;
+ Page page;
+ uint32 hashkey;
+
+ _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+ page = BufferGetPage(buf);
+
+ /* Find where to insert the tuple (preserving page's hashkey ordering) */
+ hashkey = _hash_get_indextuple_hashkey(itup);
+ itup_off = _hash_binsearch(page, hashkey);
+
+ if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false)
+ == InvalidOffsetNumber)
+ elog(ERROR, "failed to add index item to \"%s\"",
+ RelationGetRelationName(rel));
+
+ return itup_off;
+}
+
+/*
+ * _hash_pgaddmultitup() -- add a tuple vector to a particular page in the
+ * index.
+ *
+ * This routine has same requirements for locking and tuple ordering as
+ * _hash_pgaddtup().
+ *
+ * Returns the offset number array at which the tuples were inserted.
+ */
+void
+_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
+ OffsetNumber *itup_offsets, uint16 nitups)
+{
+ OffsetNumber itup_off;
+ Page page;
+ uint32 hashkey;
+ int i;
+
+ _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+ page = BufferGetPage(buf);
+
+ for (i = 0; i < nitups; i++)
+ {
+ Size itemsize;
+
+ itemsize = IndexTupleSize(itups[i]);
+ itemsize = MAXALIGN(itemsize);
+
+ /* Find where to insert the tuple (preserving page's hashkey ordering) */
+ hashkey = _hash_get_indextuple_hashkey(itups[i]);
+ itup_off = _hash_binsearch(page, hashkey);
+
+ itup_offsets[i] = itup_off;
+
+ if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
+ == InvalidOffsetNumber)
+ elog(ERROR, "failed to add index item to \"%s\"",
+ RelationGetRelationName(rel));
+ }
+}
+
+/*
+ * _hash_vacuum_one_page - vacuum just one index page.
+ *
+ * Try to remove LP_DEAD items from the given page. We must acquire cleanup
+ * lock on the page being modified before calling this function.
+ */
+
+static void
+_hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
+{
+ OffsetNumber deletable[MaxOffsetNumber];
+ int ndeletable = 0;
+ OffsetNumber offnum,
+ maxoff;
+ Page page = BufferGetPage(buf);
+ HashPageOpaque pageopaque;
+ HashMetaPage metap;
+
+ /* Scan each tuple in page to see if it is marked as LP_DEAD */
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (offnum = FirstOffsetNumber;
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ ItemId itemId = PageGetItemId(page, offnum);
+
+ if (ItemIdIsDead(itemId))
+ deletable[ndeletable++] = offnum;
+ }
+
+ if (ndeletable > 0)
+ {
+ TransactionId latestRemovedXid;
+
+ latestRemovedXid =
+ index_compute_xid_horizon_for_tuples(rel, hrel, buf,
+ deletable, ndeletable);
+
+ /*
+ * Write-lock the meta page so that we can decrement tuple count.
+ */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ /* No ereport(ERROR) until changes are logged */
+ START_CRIT_SECTION();
+
+ PageIndexMultiDelete(page, deletable, ndeletable);
+
+ /*
+ * Mark the page as not containing any LP_DEAD items. This is not
+ * certainly true (there might be some that have recently been marked,
+ * but weren't included in our target-item list), but it will almost
+ * always be true and it doesn't seem worth an additional page scan to
+ * check it. Remember that LH_PAGE_HAS_DEAD_TUPLES is only a hint
+ * anyway.
+ */
+ pageopaque = HashPageGetOpaque(page);
+ pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
+ metap = HashPageGetMeta(BufferGetPage(metabuf));
+ metap->hashm_ntuples -= ndeletable;
+
+ MarkBufferDirty(buf);
+ MarkBufferDirty(metabuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_hash_vacuum_one_page xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.latestRemovedXid = latestRemovedXid;
+ xlrec.ntuples = ndeletable;
+
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterData((char *) &xlrec, SizeOfHashVacuumOnePage);
+
+ /*
+ * We need the target-offsets array whether or not we store the
+ * whole buffer, to allow us to find the latestRemovedXid on a
+ * standby server.
+ */
+ XLogRegisterData((char *) deletable,
+ ndeletable * sizeof(OffsetNumber));
+
+ XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ /*
+ * Releasing write lock on meta page as we have updated the tuple
+ * count.
+ */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+ }
+}