summaryrefslogtreecommitdiffstats
path: root/src/backend/access/hash
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/access/hash
parentInitial commit. (diff)
downloadpostgresql-14-upstream.tar.xz
postgresql-14-upstream.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/access/hash')
-rw-r--r--src/backend/access/hash/Makefile27
-rw-r--r--src/backend/access/hash/README651
-rw-r--r--src/backend/access/hash/hash.c918
-rw-r--r--src/backend/access/hash/hash_xlog.c1145
-rw-r--r--src/backend/access/hash/hashfunc.c411
-rw-r--r--src/backend/access/hash/hashinsert.c432
-rw-r--r--src/backend/access/hash/hashovfl.c1083
-rw-r--r--src/backend/access/hash/hashpage.c1612
-rw-r--r--src/backend/access/hash/hashsearch.c721
-rw-r--r--src/backend/access/hash/hashsort.c152
-rw-r--r--src/backend/access/hash/hashutil.c622
-rw-r--r--src/backend/access/hash/hashvalidate.c439
12 files changed, 8213 insertions, 0 deletions
diff --git a/src/backend/access/hash/Makefile b/src/backend/access/hash/Makefile
new file mode 100644
index 0000000..75bf365
--- /dev/null
+++ b/src/backend/access/hash/Makefile
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for access/hash
+#
+# IDENTIFICATION
+# src/backend/access/hash/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/hash
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+ hash.o \
+ hash_xlog.o \
+ hashfunc.o \
+ hashinsert.o \
+ hashovfl.o \
+ hashpage.o \
+ hashsearch.o \
+ hashsort.o \
+ hashutil.o \
+ hashvalidate.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/hash/README b/src/backend/access/hash/README
new file mode 100644
index 0000000..2227ebf
--- /dev/null
+++ b/src/backend/access/hash/README
@@ -0,0 +1,651 @@
+src/backend/access/hash/README
+
+Hash Indexing
+=============
+
+This directory contains an implementation of hash indexing for Postgres.
+Most of the core ideas are taken from Margo Seltzer and Ozan Yigit,
+A New Hashing Package for UNIX, Proceedings of the Winter USENIX Conference,
+January 1991. (Our in-memory hashtable implementation,
+src/backend/utils/hash/dynahash.c, also relies on some of the same concepts;
+it is derived from code written by Esmond Pitt and later improved by Margo
+among others.)
+
+A hash index consists of two or more "buckets", into which tuples are
+placed whenever their hash key maps to the bucket number. The
+key-to-bucket-number mapping is chosen so that the index can be
+incrementally expanded. When a new bucket is to be added to the index,
+exactly one existing bucket will need to be "split", with some of its
+tuples being transferred to the new bucket according to the updated
+key-to-bucket-number mapping. This is essentially the same hash table
+management technique embodied in src/backend/utils/hash/dynahash.c for
+in-memory hash tables.
+
+Each bucket in the hash index comprises one or more index pages. The
+bucket's first page is permanently assigned to it when the bucket is
+created. Additional pages, called "overflow pages", are added if the
+bucket receives too many tuples to fit in the primary bucket page.
+The pages of a bucket are chained together in a doubly-linked list
+using fields in the index page special space.
+
+There is currently no provision to shrink a hash index, other than by
+rebuilding it with REINDEX. Overflow pages can be recycled for reuse
+in other buckets, but we never give them back to the operating system.
+There is no provision for reducing the number of buckets, either.
+
+As of PostgreSQL 8.4, hash index entries store only the hash code, not the
+actual data value, for each indexed item. This makes the index entries
+smaller (perhaps very substantially so) and speeds up various operations.
+In particular, we can speed searches by keeping the index entries in any
+one index page sorted by hash code, thus allowing binary search to be used
+within an index page. Note however that there is *no* assumption about the
+relative ordering of hash codes across different index pages of a bucket.
+
+
+Page Addressing
+---------------
+
+There are four kinds of pages in a hash index: the meta page (page zero),
+which contains statically allocated control information; primary bucket
+pages; overflow pages; and bitmap pages, which keep track of overflow
+pages that have been freed and are available for re-use. For addressing
+purposes, bitmap pages are regarded as a subset of the overflow pages.
+
+Primary bucket pages and overflow pages are allocated independently (since
+any given index might need more or fewer overflow pages relative to its
+number of buckets). The hash code uses an interesting set of addressing
+rules to support a variable number of overflow pages while not having to
+move primary bucket pages around after they are created.
+
+Primary bucket pages (henceforth just "bucket pages") are allocated in
+power-of-2 groups, called "split points" in the code. That means at every new
+splitpoint we double the existing number of buckets. Allocating huge chunks
+of bucket pages all at once isn't optimal and we will take ages to consume
+those. To avoid this exponential growth of index size, we did use a trick to
+break up allocation of buckets at the splitpoint into 4 equal phases. If
+(2 ^ x) are the total buckets need to be allocated at a splitpoint (from now on
+we shall call this as a splitpoint group), then we allocate 1/4th (2 ^ (x - 2))
+of total buckets at each phase of splitpoint group. Next quarter of allocation
+will only happen if buckets of the previous phase have been already consumed.
+For the initial splitpoint groups < 10 we will allocate all of their buckets in
+single phase only, as number of buckets allocated at initial groups are small
+in numbers. And for the groups >= 10 the allocation process is distributed
+among four equal phases. At group 10 we allocate (2 ^ 9) buckets in 4
+different phases {2 ^ 7, 2 ^ 7, 2 ^ 7, 2 ^ 7}, the numbers in curly braces
+indicate the number of buckets allocated within each phase of splitpoint group
+10. And, for splitpoint group 11 and 12 allocation phases will be
+{2 ^ 8, 2 ^ 8, 2 ^ 8, 2 ^ 8} and {2 ^ 9, 2 ^ 9, 2 ^ 9, 2 ^ 9} respectively. We
+can see that at each splitpoint group we double the total number of buckets
+from the previous group but in an incremental phase. The bucket pages
+allocated within one phase of a splitpoint group will appear consecutively in
+the index. This addressing scheme allows the physical location of a bucket
+page to be computed from the bucket number relatively easily, using only a
+small amount of control information. If we look at the function
+_hash_spareindex for a given bucket number we first compute the
+splitpoint group it belongs to and then the phase to which the bucket belongs
+to. Adding them we get the global splitpoint phase number S to which the
+bucket belongs and then simply add "hashm_spares[S] + 1" (where hashm_spares[]
+is an array stored in the metapage) with given bucket number to compute its
+physical address. The hashm_spares[S] can be interpreted as the total number
+of overflow pages that have been allocated before the bucket pages of
+splitpoint phase S. The hashm_spares[0] is always 0, so that buckets 0 and 1
+always appear at block numbers 1 and 2, just after the meta page. We always
+have hashm_spares[N] <= hashm_spares[N+1], since the latter count includes the
+former. The difference between the two represents the number of overflow pages
+appearing between the bucket page groups of splitpoints phase N and N+1.
+(Note: the above describes what happens when filling an initially minimally
+sized hash index. In practice, we try to estimate the required index size and
+allocate a suitable number of splitpoints phases immediately, to avoid
+expensive re-splitting during initial index build.)
+
+When S splitpoints exist altogether, the array entries hashm_spares[0]
+through hashm_spares[S] are valid; hashm_spares[S] records the current
+total number of overflow pages. New overflow pages are created as needed
+at the end of the index, and recorded by incrementing hashm_spares[S].
+When it is time to create a new splitpoint phase's worth of bucket pages, we
+copy hashm_spares[S] into hashm_spares[S+1] and increment S (which is
+stored in the hashm_ovflpoint field of the meta page). This has the
+effect of reserving the correct number of bucket pages at the end of the
+index, and preparing to allocate additional overflow pages after those
+bucket pages. hashm_spares[] entries before S cannot change anymore,
+since that would require moving already-created bucket pages.
+
+The last page nominally used by the index is always determinable from
+hashm_spares[S]. To avoid complaints from smgr, the logical EOF as seen by
+the filesystem and smgr must always be greater than or equal to this page.
+We have to allow the case "greater than" because it's possible that during
+an index extension we crash after allocating filesystem space and before
+updating the metapage. Note that on filesystems that allow "holes" in
+files, it's entirely likely that pages before the logical EOF are not yet
+allocated: when we allocate a new splitpoint phase's worth of bucket pages, we
+physically zero the last such page to force the EOF up, and the first such
+page will be used immediately, but the intervening pages are not written
+until needed.
+
+Since overflow pages may be recycled if enough tuples are deleted from
+their bucket, we need a way to keep track of currently-free overflow
+pages. The state of each overflow page (0 = available, 1 = not available)
+is recorded in "bitmap" pages dedicated to this purpose. The entries in
+the bitmap are indexed by "bit number", a zero-based count in which every
+overflow page has a unique entry. We can convert between an overflow
+page's physical block number and its bit number using the information in
+hashm_spares[] (see hashovfl.c for details). The bit number sequence
+includes the bitmap pages, which is the reason for saying that bitmap
+pages are a subset of the overflow pages. It turns out in fact that each
+bitmap page's first bit represents itself --- this is not an essential
+property, but falls out of the fact that we only allocate another bitmap
+page when we really need one. Bit number zero always corresponds to the
+first bitmap page, which is allocated during index creation just after all
+the initially created buckets.
+
+
+Lock Definitions
+----------------
+
+Concurrency control for hash indexes is provided using buffer content
+locks, buffer pins, and cleanup locks. Here as elsewhere in PostgreSQL,
+cleanup lock means that we hold an exclusive lock on the buffer and have
+observed at some point after acquiring the lock that we hold the only pin
+on that buffer. For hash indexes, a cleanup lock on a primary bucket page
+represents the right to perform an arbitrary reorganization of the entire
+bucket. Therefore, scans retain a pin on the primary bucket page for the
+bucket they are currently scanning. Splitting a bucket requires a cleanup
+lock on both the old and new primary bucket pages. VACUUM therefore takes
+a cleanup lock on every bucket page in order to remove tuples. It can also
+remove tuples copied to a new bucket by any previous split operation, because
+the cleanup lock taken on the primary bucket page guarantees that no scans
+which started prior to the most recent split can still be in progress. After
+cleaning each page individually, it attempts to take a cleanup lock on the
+primary bucket page in order to "squeeze" the bucket down to the minimum
+possible number of pages.
+
+To avoid deadlocks, we must be consistent about the lock order in which we
+lock the buckets for operations that requires locks on two different buckets.
+We choose to always lock the lower-numbered bucket first. The metapage is
+only ever locked after all bucket locks have been taken.
+
+
+Metapage Caching
+----------------
+
+Both scanning the index and inserting tuples require locating the bucket
+where a given tuple ought to be located. To do this, we need the bucket
+count, highmask, and lowmask from the metapage; however, it's undesirable
+for performance reasons to have to have to lock and pin the metapage for
+every such operation. Instead, we retain a cached copy of the metapage
+in each backend's relcache entry. This will produce the correct
+bucket mapping as long as the target bucket hasn't been split since the
+last cache refresh.
+
+To guard against the possibility that such a split has occurred, the
+primary page of each bucket chain stores the number of buckets that
+existed as of the time the bucket was last split, or if never split as
+of the time it was created, in the space normally used for the
+previous block number (that is, hasho_prevblkno). This doesn't cost
+anything because the primary bucket page is always the first page in
+the chain, and the previous block number is therefore always, in
+reality, InvalidBlockNumber.
+
+After computing the ostensibly-correct bucket number based on our cached
+copy of the metapage, we lock the corresponding primary bucket page and
+check whether the bucket count stored in hasho_prevblkno is greater than
+the number of buckets stored in our cached copy of the metapage. If
+so, the bucket has certainly been split, because the count must originally
+have been less than the number of buckets that existed at that time and
+can't have increased except due to a split. If not, the bucket can't have
+been split, because a split would have created a new bucket with a higher
+bucket number than any we'd seen previously. In the latter case, we've
+locked the correct bucket and can proceed; in the former case, we must
+release the lock on this bucket, lock the metapage, update our cache,
+unlock the metapage, and retry.
+
+Needing to retry occasionally might seem expensive, but the number of times
+any given bucket can be split is limited to a few dozen no matter how
+many times the hash index is accessed, because the total number of
+buckets is limited to less than 2^32. On the other hand, the number of
+times we access a bucket is unbounded and will be several orders of
+magnitude larger even in unsympathetic cases.
+
+(The metapage cache is new in v10. Older hash indexes had the primary
+bucket page's hasho_prevblkno initialized to InvalidBuffer.)
+
+Pseudocode Algorithms
+---------------------
+
+Various flags that are used in hash index operations are described as below:
+
+The bucket-being-split and bucket-being-populated flags indicate that split
+the operation is in progress for a bucket. During split operation, a
+bucket-being-split flag is set on the old bucket and bucket-being-populated
+flag is set on new bucket. These flags are cleared once the split operation
+is finished.
+
+The split-cleanup flag indicates that a bucket which has been recently split
+still contains tuples that were also copied to the new bucket; it essentially
+marks the split as incomplete. Once we're certain that no scans which
+started before the new bucket was fully populated are still in progress, we
+can remove the copies from the old bucket and clear the flag. We insist that
+this flag must be clear before splitting a bucket; thus, a bucket can't be
+split again until the previous split is totally complete.
+
+The moved-by-split flag on a tuple indicates that tuple is moved from old to
+new bucket. Concurrent scans will skip such tuples until the split operation
+is finished. Once the tuple is marked as moved-by-split, it will remain so
+forever but that does no harm. We have intentionally not cleared it as that
+can generate an additional I/O which is not necessary.
+
+The operations we need to support are: readers scanning the index for
+entries of a particular hash code (which by definition are all in the same
+bucket); insertion of a new tuple into the correct bucket; enlarging the
+hash table by splitting an existing bucket; and garbage collection
+(deletion of dead tuples and compaction of buckets). Bucket splitting is
+done at conclusion of any insertion that leaves the hash table more full
+than the target load factor, but it is convenient to consider it as an
+independent operation. Note that we do not have a bucket-merge operation
+--- the number of buckets never shrinks. Insertion, splitting, and
+garbage collection may all need access to freelist management, which keeps
+track of available overflow pages.
+
+The reader algorithm is:
+
+ lock the primary bucket page of the target bucket
+ if the target bucket is still being populated by a split:
+ release the buffer content lock on current bucket page
+ pin and acquire the buffer content lock on old bucket in shared mode
+ release the buffer content lock on old bucket, but not pin
+ retake the buffer content lock on new bucket
+ arrange to scan the old bucket normally and the new bucket for
+ tuples which are not moved-by-split
+-- then, per read request:
+ reacquire content lock on current page
+ step to next page if necessary (no chaining of content locks, but keep
+ the pin on the primary bucket throughout the scan)
+ save all the matching tuples from current index page into an items array
+ release pin and content lock (but if it is primary bucket page retain
+ its pin till the end of the scan)
+ get tuple from an item array
+-- at scan shutdown:
+ release all pins still held
+
+Holding the buffer pin on the primary bucket page for the whole scan prevents
+the reader's current-tuple pointer from being invalidated by splits or
+compactions. (Of course, other buckets can still be split or compacted.)
+
+To minimize lock/unlock traffic, hash index scan always searches the entire
+hash page to identify all the matching items at once, copying their heap tuple
+IDs into backend-local storage. The heap tuple IDs are then processed while not
+holding any page lock within the index thereby, allowing concurrent insertion
+to happen on the same index page without any requirement of re-finding the
+current scan position for the reader. We do continue to hold a pin on the
+bucket page, to protect against concurrent deletions and bucket split.
+
+To allow for scans during a bucket split, if at the start of the scan, the
+bucket is marked as bucket-being-populated, it scan all the tuples in that
+bucket except for those that are marked as moved-by-split. Once it finishes
+the scan of all the tuples in the current bucket, it scans the old bucket from
+which this bucket is formed by split.
+
+The insertion algorithm is rather similar:
+
+ lock the primary bucket page of the target bucket
+-- (so far same as reader, except for acquisition of buffer content lock in
+ exclusive mode on primary bucket page)
+ if the bucket-being-split flag is set for a bucket and pin count on it is
+ one, then finish the split
+ release the buffer content lock on current bucket
+ get the "new" bucket which was being populated by the split
+ scan the new bucket and form the hash table of TIDs
+ conditionally get the cleanup lock on old and new buckets
+ if we get the lock on both the buckets
+ finish the split using algorithm mentioned below for split
+ release the pin on old bucket and restart the insert from beginning.
+ if current page is full, first check if this page contains any dead tuples.
+ if yes, remove dead tuples from the current page and again check for the
+ availability of the space. If enough space found, insert the tuple else
+ release lock but not pin, read/exclusive-lock
+ next page; repeat as needed
+ >> see below if no space in any page of bucket
+ take buffer content lock in exclusive mode on metapage
+ insert tuple at appropriate place in page
+ mark current page dirty
+ increment tuple count, decide if split needed
+ mark meta page dirty
+ write WAL for insertion of tuple
+ release the buffer content lock on metapage
+ release buffer content lock on current page
+ if current page is not a bucket page, release the pin on bucket page
+ if split is needed, enter Split algorithm below
+ release the pin on metapage
+
+To speed searches, the index entries within any individual index page are
+kept sorted by hash code; the insertion code must take care to insert new
+entries in the right place. It is okay for an insertion to take place in a
+bucket that is being actively scanned, because readers can cope with this
+as explained above. We only need the short-term buffer locks to ensure
+that readers do not see a partially-updated page.
+
+To avoid deadlock between readers and inserters, whenever there is a need
+to lock multiple buckets, we always take in the order suggested in Lock
+Definitions above. This algorithm allows them a very high degree of
+concurrency. (The exclusive metapage lock taken to update the tuple count
+is stronger than necessary, since readers do not care about the tuple count,
+but the lock is held for such a short time that this is probably not an
+issue.)
+
+When an inserter cannot find space in any existing page of a bucket, it
+must obtain an overflow page and add that page to the bucket's chain.
+Details of that part of the algorithm appear later.
+
+The page split algorithm is entered whenever an inserter observes that the
+index is overfull (has a higher-than-wanted ratio of tuples to buckets).
+The algorithm attempts, but does not necessarily succeed, to split one
+existing bucket in two, thereby lowering the fill ratio:
+
+ pin meta page and take buffer content lock in exclusive mode
+ check split still needed
+ if split not needed anymore, drop buffer content lock and pin and exit
+ decide which bucket to split
+ try to take a cleanup lock on that bucket; if fail, give up
+ if that bucket is still being split or has split-cleanup work:
+ try to finish the split and the cleanup work
+ if that succeeds, start over; if it fails, give up
+ mark the old and new buckets indicating split is in progress
+ mark both old and new buckets as dirty
+ write WAL for allocation of new page for split
+ copy the tuples that belongs to new bucket from old bucket, marking
+ them as moved-by-split
+ write WAL record for moving tuples to new page once the new page is full
+ or all the pages of old bucket are finished
+ release lock but not pin for primary bucket page of old bucket,
+ read/shared-lock next page; repeat as needed
+ clear the bucket-being-split and bucket-being-populated flags
+ mark the old bucket indicating split-cleanup
+ write WAL for changing the flags on both old and new buckets
+
+The split operation's attempt to acquire cleanup-lock on the old bucket number
+could fail if another process holds any lock or pin on it. We do not want to
+wait if that happens, because we don't want to wait while holding the metapage
+exclusive-lock. So, this is a conditional LWLockAcquire operation, and if
+it fails we just abandon the attempt to split. This is all right since the
+index is overfull but perfectly functional. Every subsequent inserter will
+try to split, and eventually one will succeed. If multiple inserters failed
+to split, the index might still be overfull, but eventually, the index will
+not be overfull and split attempts will stop. (We could make a successful
+splitter loop to see if the index is still overfull, but it seems better to
+distribute the split overhead across successive insertions.)
+
+If a split fails partway through (e.g. due to insufficient disk space or an
+interrupt), the index will not be corrupted. Instead, we'll retry the split
+every time a tuple is inserted into the old bucket prior to inserting the new
+tuple; eventually, we should succeed. The fact that a split is left
+unfinished doesn't prevent subsequent buckets from being split, but we won't
+try to split the bucket again until the prior split is finished. In other
+words, a bucket can be in the middle of being split for some time, but it can't
+be in the middle of two splits at the same time.
+
+The fourth operation is garbage collection (bulk deletion):
+
+ next bucket := 0
+ pin metapage and take buffer content lock in exclusive mode
+ fetch current max bucket number
+ release meta page buffer content lock and pin
+ while next bucket <= max bucket do
+ acquire cleanup lock on primary bucket page
+ loop:
+ scan and remove tuples
+ mark the target page dirty
+ write WAL for deleting tuples from target page
+ if this is the last bucket page, break out of loop
+ pin and x-lock next page
+ release prior lock and pin (except keep pin on primary bucket page)
+ if the page we have locked is not the primary bucket page:
+ release lock and take exclusive lock on primary bucket page
+ if there are no other pins on the primary bucket page:
+ squeeze the bucket to remove free space
+ release the pin on primary bucket page
+ next bucket ++
+ end loop
+ pin metapage and take buffer content lock in exclusive mode
+ check if number of buckets changed
+ if so, release content lock and pin and return to for-each-bucket loop
+ else update metapage tuple count
+ mark meta page dirty and write WAL for update of metapage
+ release buffer content lock and pin
+
+Note that this is designed to allow concurrent splits and scans. If a split
+occurs, tuples relocated into the new bucket will be visited twice by the
+scan, but that does no harm. See also "Interlocking Between Scans and
+VACUUM", below.
+
+We must be careful about the statistics reported by the VACUUM operation.
+What we can do is count the number of tuples scanned, and believe this in
+preference to the stored tuple count if the stored tuple count and number of
+buckets did *not* change at any time during the scan. This provides a way of
+correcting the stored tuple count if it gets out of sync for some reason. But
+if a split or insertion does occur concurrently, the scan count is
+untrustworthy; instead, subtract the number of tuples deleted from the stored
+tuple count and use that.
+
+Interlocking Between Scans and VACUUM
+-------------------------------------
+
+Since we release the lock on bucket page during a cleanup scan of a bucket, a
+concurrent scan could start in that bucket before we've finished vacuuming it.
+If a scan gets ahead of cleanup, we could have the following problem: (1) the
+scan sees heap TIDs that are about to be removed before they are processed by
+VACUUM, (2) the scan decides that one or more of those TIDs are dead, (3)
+VACUUM completes, (4) one or more of the TIDs the scan decided were dead are
+reused for an unrelated tuple, and finally (5) the scan wakes up and
+erroneously kills the new tuple.
+
+Note that this requires VACUUM and a scan to be active in the same bucket at
+the same time. If VACUUM completes before the scan starts, the scan never has
+a chance to see the dead tuples; if the scan completes before the VACUUM
+starts, the heap TIDs can't have been reused meanwhile. Furthermore, VACUUM
+can't start on a bucket that has an active scan, because the scan holds a pin
+on the primary bucket page, and VACUUM must take a cleanup lock on that page
+in order to begin cleanup. Therefore, the only way this problem can occur is
+for a scan to start after VACUUM has released the cleanup lock on the bucket
+but before it has processed the entire bucket and then overtake the cleanup
+operation.
+
+Currently, we prevent this using lock chaining: cleanup locks the next page
+in the chain before releasing the lock and pin on the page just processed.
+
+Free Space Management
+---------------------
+
+(Question: why is this so complicated? Why not just have a linked list
+of free pages with the list head in the metapage? It's not like we
+avoid needing to modify the metapage with all this.)
+
+Free space management consists of two sub-algorithms, one for reserving
+an overflow page to add to a bucket chain, and one for returning an empty
+overflow page to the free pool.
+
+Obtaining an overflow page:
+
+ take metapage content lock in exclusive mode
+ determine next bitmap page number; if none, exit loop
+ release meta page content lock
+ pin bitmap page and take content lock in exclusive mode
+ search for a free page (zero bit in bitmap)
+ if found:
+ set bit in bitmap
+ mark bitmap page dirty
+ take metapage buffer content lock in exclusive mode
+ if first-free-bit value did not change,
+ update it and mark meta page dirty
+ else (not found):
+ release bitmap page buffer content lock
+ loop back to try next bitmap page, if any
+-- here when we have checked all bitmap pages; we hold meta excl. lock
+ extend index to add another overflow page; update meta information
+ mark meta page dirty
+ return page number
+
+It is slightly annoying to release and reacquire the metapage lock
+multiple times, but it seems best to do it that way to minimize loss of
+concurrency against processes just entering the index. We don't want
+to hold the metapage exclusive lock while reading in a bitmap page.
+(We can at least avoid repeated buffer pin/unpin here.)
+
+The normal path for extending the index does not require doing I/O while
+holding the metapage lock. We do have to do I/O when the extension
+requires adding a new bitmap page as well as the required overflow page
+... but that is an infrequent case, so the loss of concurrency seems
+acceptable.
+
+The portion of tuple insertion that calls the above subroutine looks
+like this:
+
+ -- having determined that no space is free in the target bucket:
+ remember last page of bucket, drop write lock on it
+ re-write-lock last page of bucket
+ if it is not last anymore, step to the last page
+ execute free-page-acquire (obtaining an overflow page) mechanism
+ described above
+ update (former) last page to point to the new page and mark buffer dirty
+ write-lock and initialize new page, with back link to former last page
+ write WAL for addition of overflow page
+ release the locks on meta page and bitmap page acquired in
+ free-page-acquire algorithm
+ release the lock on former last page
+ release the lock on new overflow page
+ insert tuple into new page
+ -- etc.
+
+Notice this handles the case where two concurrent inserters try to extend
+the same bucket. They will end up with a valid, though perhaps
+space-inefficient, configuration: two overflow pages will be added to the
+bucket, each containing one tuple.
+
+The last part of this violates the rule about holding write lock on two
+pages concurrently, but it should be okay to write-lock the previously
+free page; there can be no other process holding lock on it.
+
+Bucket splitting uses a similar algorithm if it has to extend the new
+bucket, but it need not worry about concurrent extension since it has
+buffer content lock in exclusive mode on the new bucket.
+
+Freeing an overflow page requires the process to hold buffer content lock in
+exclusive mode on the containing bucket, so need not worry about other
+accessors of pages in the bucket. The algorithm is:
+
+ delink overflow page from bucket chain
+ (this requires read/update/write/release of fore and aft siblings)
+ pin meta page and take buffer content lock in shared mode
+ determine which bitmap page contains the free space bit for page
+ release meta page buffer content lock
+ pin bitmap page and take buffer content lock in exclusive mode
+ retake meta page buffer content lock in exclusive mode
+ move (insert) tuples that belong to the overflow page being freed
+ update bitmap bit
+ mark bitmap page dirty
+ if page number is still less than first-free-bit,
+ update first-free-bit field and mark meta page dirty
+ write WAL for delinking overflow page operation
+ release buffer content lock and pin
+ release meta page buffer content lock and pin
+
+We have to do it this way because we must clear the bitmap bit before
+changing the first-free-bit field (hashm_firstfree). It is possible that
+we set first-free-bit too small (because someone has already reused the
+page we just freed), but that is okay; the only cost is the next overflow
+page acquirer will scan more bitmap bits than he needs to. What must be
+avoided is having first-free-bit greater than the actual first free bit,
+because then that free page would never be found by searchers.
+
+The reason of moving tuples from overflow page while delinking the later is
+to make that as an atomic operation. Not doing so could lead to spurious reads
+on standby. Basically, the user might see the same tuple twice.
+
+
+WAL Considerations
+------------------
+
+The hash index operations like create index, insert, delete, bucket split,
+allocate overflow page, and squeeze in themselves don't guarantee hash index
+consistency after a crash. To provide robustness, we write WAL for each of
+these operations.
+
+CREATE INDEX writes multiple WAL records. First, we write a record to cover
+the initializatoin of the metapage, followed by one for each new bucket
+created, followed by one for the initial bitmap page. It's not important for
+index creation to appear atomic, because the index isn't yet visible to any
+other transaction, and the creating transaction will roll back in the event of
+a crash. It would be difficult to cover the whole operation with a single
+write-ahead log record anyway, because we can log only a fixed number of
+pages, as given by XLR_MAX_BLOCK_ID (32), with current XLog machinery.
+
+Ordinary item insertions (that don't force a page split or need a new overflow
+page) are single WAL entries. They touch a single bucket page and the
+metapage. The metapage is updated during replay as it is updated during
+original operation.
+
+If an insertion causes the addition of an overflow page, there will be one
+WAL entry for the new overflow page and second entry for insert itself.
+
+If an insertion causes a bucket split, there will be one WAL entry for insert
+itself, followed by a WAL entry for allocating a new bucket, followed by a WAL
+entry for each overflow bucket page in the new bucket to which the tuples are
+moved from old bucket, followed by a WAL entry to indicate that split is
+complete for both old and new buckets. A split operation which requires
+overflow pages to complete the operation will need to write a WAL record for
+each new allocation of an overflow page.
+
+As splitting involves multiple atomic actions, it's possible that the system
+crashes between moving tuples from bucket pages of the old bucket to new
+bucket. In such a case, after recovery, the old and new buckets will be
+marked with bucket-being-split and bucket-being-populated flags respectively
+which indicates that split is in progress for those buckets. The reader
+algorithm works correctly, as it will scan both the old and new buckets when
+the split is in progress as explained in the reader algorithm section above.
+
+We finish the split at next insert or split operation on the old bucket as
+explained in insert and split algorithm above. It could be done during
+searches, too, but it seems best not to put any extra updates in what would
+otherwise be a read-only operation (updating is not possible in hot standby
+mode anyway). It would seem natural to complete the split in VACUUM, but since
+splitting a bucket might require allocating a new page, it might fail if you
+run out of disk space. That would be bad during VACUUM - the reason for
+running VACUUM in the first place might be that you run out of disk space,
+and now VACUUM won't finish because you're out of disk space. In contrast,
+an insertion can require enlarging the physical file anyway.
+
+Deletion of tuples from a bucket is performed for two reasons: to remove dead
+tuples, and to remove tuples that were moved by a bucket split. A WAL entry
+is made for each bucket page from which tuples are removed, and then another
+WAL entry is made when we clear the needs-split-cleanup flag. If dead tuples
+are removed, a separate WAL entry is made to update the metapage.
+
+As deletion involves multiple atomic operations, it is quite possible that
+system crashes after (a) removing tuples from some of the bucket pages, (b)
+before clearing the garbage flag, or (c) before updating the metapage. If the
+system crashes before completing (b), it will again try to clean the bucket
+during next vacuum or insert after recovery which can have some performance
+impact, but it will work fine. If the system crashes before completing (c),
+after recovery there could be some additional splits until the next vacuum
+updates the metapage, but the other operations like insert, delete and scan
+will work correctly. We can fix this problem by actually updating the
+metapage based on delete operation during replay, but it's not clear whether
+it's worth the complication.
+
+A squeeze operation moves tuples from one of the buckets later in the chain to
+one of the bucket earlier in chain and writes WAL record when either the
+bucket to which it is writing tuples is filled or bucket from which it
+is removing the tuples becomes empty.
+
+As a squeeze operation involves writing multiple atomic operations, it is
+quite possible that the system crashes before completing the operation on
+entire bucket. After recovery, the operations will work correctly, but
+the index will remain bloated and this can impact performance of read and
+insert operations until the next vacuum squeeze the bucket completely.
+
+
+Other Notes
+-----------
+
+Clean up locks prevent a split from occurring while *another* process is stopped
+in a given bucket. It also ensures that one of our *own* backend's scans is not
+stopped in the bucket.
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
new file mode 100644
index 0000000..0752fb3
--- /dev/null
+++ b/src/backend/access/hash/hash.c
@@ -0,0 +1,918 @@
+/*-------------------------------------------------------------------------
+ *
+ * hash.c
+ * Implementation of Margo Seltzer's Hashing package for postgres.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hash.c
+ *
+ * NOTES
+ * This file contains only the public interface routines.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "access/relscan.h"
+#include "access/tableam.h"
+#include "catalog/index.h"
+#include "commands/progress.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "optimizer/plancat.h"
+#include "pgstat.h"
+#include "utils/builtins.h"
+#include "utils/index_selfuncs.h"
+#include "utils/rel.h"
+
+/* Working state for hashbuild and its callback */
+typedef struct
+{
+ HSpool *spool; /* NULL if not using spooling */
+ double indtuples; /* # tuples accepted into index */
+ Relation heapRel; /* heap relation descriptor */
+} HashBuildState;
+
+static void hashbuildCallback(Relation index,
+ ItemPointer tid,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *state);
+
+
+/*
+ * Hash handler function: return IndexAmRoutine with access method parameters
+ * and callbacks.
+ */
+Datum
+hashhandler(PG_FUNCTION_ARGS)
+{
+ IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
+
+ amroutine->amstrategies = HTMaxStrategyNumber;
+ amroutine->amsupport = HASHNProcs;
+ amroutine->amoptsprocnum = HASHOPTIONS_PROC;
+ amroutine->amcanorder = false;
+ amroutine->amcanorderbyop = false;
+ amroutine->amcanbackward = true;
+ amroutine->amcanunique = false;
+ amroutine->amcanmulticol = false;
+ amroutine->amoptionalkey = false;
+ amroutine->amsearcharray = false;
+ amroutine->amsearchnulls = false;
+ amroutine->amstorage = false;
+ amroutine->amclusterable = false;
+ amroutine->ampredlocks = true;
+ amroutine->amcanparallel = false;
+ amroutine->amcaninclude = false;
+ amroutine->amusemaintenanceworkmem = false;
+ amroutine->amparallelvacuumoptions =
+ VACUUM_OPTION_PARALLEL_BULKDEL;
+ amroutine->amkeytype = INT4OID;
+
+ amroutine->ambuild = hashbuild;
+ amroutine->ambuildempty = hashbuildempty;
+ amroutine->aminsert = hashinsert;
+ amroutine->ambulkdelete = hashbulkdelete;
+ amroutine->amvacuumcleanup = hashvacuumcleanup;
+ amroutine->amcanreturn = NULL;
+ amroutine->amcostestimate = hashcostestimate;
+ amroutine->amoptions = hashoptions;
+ amroutine->amproperty = NULL;
+ amroutine->ambuildphasename = NULL;
+ amroutine->amvalidate = hashvalidate;
+ amroutine->amadjustmembers = hashadjustmembers;
+ amroutine->ambeginscan = hashbeginscan;
+ amroutine->amrescan = hashrescan;
+ amroutine->amgettuple = hashgettuple;
+ amroutine->amgetbitmap = hashgetbitmap;
+ amroutine->amendscan = hashendscan;
+ amroutine->ammarkpos = NULL;
+ amroutine->amrestrpos = NULL;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
+ amroutine->amparallelrescan = NULL;
+
+ PG_RETURN_POINTER(amroutine);
+}
+
+/*
+ * hashbuild() -- build a new hash index.
+ */
+IndexBuildResult *
+hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
+{
+ IndexBuildResult *result;
+ BlockNumber relpages;
+ double reltuples;
+ double allvisfrac;
+ uint32 num_buckets;
+ long sort_threshold;
+ HashBuildState buildstate;
+
+ /*
+ * We expect to be called exactly once for any index relation. If that's
+ * not the case, big trouble's what we have.
+ */
+ if (RelationGetNumberOfBlocks(index) != 0)
+ elog(ERROR, "index \"%s\" already contains data",
+ RelationGetRelationName(index));
+
+ /* Estimate the number of rows currently present in the table */
+ estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);
+
+ /* Initialize the hash index metadata page and initial buckets */
+ num_buckets = _hash_init(index, reltuples, MAIN_FORKNUM);
+
+ /*
+ * If we just insert the tuples into the index in scan order, then
+ * (assuming their hash codes are pretty random) there will be no locality
+ * of access to the index, and if the index is bigger than available RAM
+ * then we'll thrash horribly. To prevent that scenario, we can sort the
+ * tuples by (expected) bucket number. However, such a sort is useless
+ * overhead when the index does fit in RAM. We choose to sort if the
+ * initial index size exceeds maintenance_work_mem, or the number of
+ * buffers usable for the index, whichever is less. (Limiting by the
+ * number of buffers should reduce thrashing between PG buffers and kernel
+ * buffers, which seems useful even if no physical I/O results. Limiting
+ * by maintenance_work_mem is useful to allow easy testing of the sort
+ * code path, and may be useful to DBAs as an additional control knob.)
+ *
+ * NOTE: this test will need adjustment if a bucket is ever different from
+ * one page. Also, "initial index size" accounting does not include the
+ * metapage, nor the first bitmap page.
+ */
+ sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
+ if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
+ sort_threshold = Min(sort_threshold, NBuffers);
+ else
+ sort_threshold = Min(sort_threshold, NLocBuffer);
+
+ if (num_buckets >= (uint32) sort_threshold)
+ buildstate.spool = _h_spoolinit(heap, index, num_buckets);
+ else
+ buildstate.spool = NULL;
+
+ /* prepare to build the index */
+ buildstate.indtuples = 0;
+ buildstate.heapRel = heap;
+
+ /* do the heap scan */
+ reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+ hashbuildCallback,
+ (void *) &buildstate, NULL);
+ pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_TOTAL,
+ buildstate.indtuples);
+
+ if (buildstate.spool)
+ {
+ /* sort the tuples and insert them into the index */
+ _h_indexbuild(buildstate.spool, buildstate.heapRel);
+ _h_spooldestroy(buildstate.spool);
+ }
+
+ /*
+ * Return statistics
+ */
+ result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
+
+ result->heap_tuples = reltuples;
+ result->index_tuples = buildstate.indtuples;
+
+ return result;
+}
+
+/*
+ * hashbuildempty() -- build an empty hash index in the initialization fork
+ */
+void
+hashbuildempty(Relation index)
+{
+ _hash_init(index, 0, INIT_FORKNUM);
+}
+
+/*
+ * Per-tuple callback for table_index_build_scan
+ */
+static void
+hashbuildCallback(Relation index,
+ ItemPointer tid,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *state)
+{
+ HashBuildState *buildstate = (HashBuildState *) state;
+ Datum index_values[1];
+ bool index_isnull[1];
+ IndexTuple itup;
+
+ /* convert data to a hash key; on failure, do not insert anything */
+ if (!_hash_convert_tuple(index,
+ values, isnull,
+ index_values, index_isnull))
+ return;
+
+ /* Either spool the tuple for sorting, or just put it into the index */
+ if (buildstate->spool)
+ _h_spool(buildstate->spool, tid, index_values, index_isnull);
+ else
+ {
+ /* form an index tuple and point it at the heap tuple */
+ itup = index_form_tuple(RelationGetDescr(index),
+ index_values, index_isnull);
+ itup->t_tid = *tid;
+ _hash_doinsert(index, itup, buildstate->heapRel);
+ pfree(itup);
+ }
+
+ buildstate->indtuples += 1;
+}
+
+/*
+ * hashinsert() -- insert an index tuple into a hash table.
+ *
+ * Hash on the heap tuple's key, form an index tuple with hash code.
+ * Find the appropriate location for the new tuple, and put it there.
+ */
+bool
+hashinsert(Relation rel, Datum *values, bool *isnull,
+ ItemPointer ht_ctid, Relation heapRel,
+ IndexUniqueCheck checkUnique,
+ bool indexUnchanged,
+ IndexInfo *indexInfo)
+{
+ Datum index_values[1];
+ bool index_isnull[1];
+ IndexTuple itup;
+
+ /* convert data to a hash key; on failure, do not insert anything */
+ if (!_hash_convert_tuple(rel,
+ values, isnull,
+ index_values, index_isnull))
+ return false;
+
+ /* form an index tuple and point it at the heap tuple */
+ itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
+ itup->t_tid = *ht_ctid;
+
+ _hash_doinsert(rel, itup, heapRel);
+
+ pfree(itup);
+
+ return false;
+}
+
+
+/*
+ * hashgettuple() -- Get the next tuple in the scan.
+ */
+bool
+hashgettuple(IndexScanDesc scan, ScanDirection dir)
+{
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ bool res;
+
+ /* Hash indexes are always lossy since we store only the hash code */
+ scan->xs_recheck = true;
+
+ /*
+ * If we've already initialized this scan, we can just advance it in the
+ * appropriate direction. If we haven't done so yet, we call a routine to
+ * get the first item in the scan.
+ */
+ if (!HashScanPosIsValid(so->currPos))
+ res = _hash_first(scan, dir);
+ else
+ {
+ /*
+ * Check to see if we should kill the previously-fetched tuple.
+ */
+ if (scan->kill_prior_tuple)
+ {
+ /*
+ * Yes, so remember it for later. (We'll deal with all such tuples
+ * at once right after leaving the index page or at end of scan.)
+ * In case if caller reverses the indexscan direction it is quite
+ * possible that the same item might get entered multiple times.
+ * But, we don't detect that; instead, we just forget any excess
+ * entries.
+ */
+ if (so->killedItems == NULL)
+ so->killedItems = (int *)
+ palloc(MaxIndexTuplesPerPage * sizeof(int));
+
+ if (so->numKilled < MaxIndexTuplesPerPage)
+ so->killedItems[so->numKilled++] = so->currPos.itemIndex;
+ }
+
+ /*
+ * Now continue the scan.
+ */
+ res = _hash_next(scan, dir);
+ }
+
+ return res;
+}
+
+
+/*
+ * hashgetbitmap() -- get all tuples at once
+ */
+int64
+hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
+{
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ bool res;
+ int64 ntids = 0;
+ HashScanPosItem *currItem;
+
+ res = _hash_first(scan, ForwardScanDirection);
+
+ while (res)
+ {
+ currItem = &so->currPos.items[so->currPos.itemIndex];
+
+ /*
+ * _hash_first and _hash_next handle eliminate dead index entries
+ * whenever scan->ignore_killed_tuples is true. Therefore, there's
+ * nothing to do here except add the results to the TIDBitmap.
+ */
+ tbm_add_tuples(tbm, &(currItem->heapTid), 1, true);
+ ntids++;
+
+ res = _hash_next(scan, ForwardScanDirection);
+ }
+
+ return ntids;
+}
+
+
+/*
+ * hashbeginscan() -- start a scan on a hash index
+ */
+IndexScanDesc
+hashbeginscan(Relation rel, int nkeys, int norderbys)
+{
+ IndexScanDesc scan;
+ HashScanOpaque so;
+
+ /* no order by operators allowed */
+ Assert(norderbys == 0);
+
+ scan = RelationGetIndexScan(rel, nkeys, norderbys);
+
+ so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
+ HashScanPosInvalidate(so->currPos);
+ so->hashso_bucket_buf = InvalidBuffer;
+ so->hashso_split_bucket_buf = InvalidBuffer;
+
+ so->hashso_buc_populated = false;
+ so->hashso_buc_split = false;
+
+ so->killedItems = NULL;
+ so->numKilled = 0;
+
+ scan->opaque = so;
+
+ return scan;
+}
+
+/*
+ * hashrescan() -- rescan an index relation
+ */
+void
+hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
+ ScanKey orderbys, int norderbys)
+{
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ Relation rel = scan->indexRelation;
+
+ if (HashScanPosIsValid(so->currPos))
+ {
+ /* Before leaving current page, deal with any killed items */
+ if (so->numKilled > 0)
+ _hash_kill_items(scan);
+ }
+
+ _hash_dropscanbuf(rel, so);
+
+ /* set position invalid (this will cause _hash_first call) */
+ HashScanPosInvalidate(so->currPos);
+
+ /* Update scan key, if a new one is given */
+ if (scankey && scan->numberOfKeys > 0)
+ {
+ memmove(scan->keyData,
+ scankey,
+ scan->numberOfKeys * sizeof(ScanKeyData));
+ }
+
+ so->hashso_buc_populated = false;
+ so->hashso_buc_split = false;
+}
+
+/*
+ * hashendscan() -- close down a scan
+ */
+void
+hashendscan(IndexScanDesc scan)
+{
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ Relation rel = scan->indexRelation;
+
+ if (HashScanPosIsValid(so->currPos))
+ {
+ /* Before leaving current page, deal with any killed items */
+ if (so->numKilled > 0)
+ _hash_kill_items(scan);
+ }
+
+ _hash_dropscanbuf(rel, so);
+
+ if (so->killedItems != NULL)
+ pfree(so->killedItems);
+ pfree(so);
+ scan->opaque = NULL;
+}
+
+/*
+ * Bulk deletion of all index entries pointing to a set of heap tuples.
+ * The set of target tuples is specified via a callback routine that tells
+ * whether any given heap tuple (identified by ItemPointer) is being deleted.
+ *
+ * This function also deletes the tuples that are moved by split to other
+ * bucket.
+ *
+ * Result: a palloc'd struct containing statistical info for VACUUM displays.
+ */
+IndexBulkDeleteResult *
+hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
+ IndexBulkDeleteCallback callback, void *callback_state)
+{
+ Relation rel = info->index;
+ double tuples_removed;
+ double num_index_tuples;
+ double orig_ntuples;
+ Bucket orig_maxbucket;
+ Bucket cur_maxbucket;
+ Bucket cur_bucket;
+ Buffer metabuf = InvalidBuffer;
+ HashMetaPage metap;
+ HashMetaPage cachedmetap;
+
+ tuples_removed = 0;
+ num_index_tuples = 0;
+
+ /*
+ * We need a copy of the metapage so that we can use its hashm_spares[]
+ * values to compute bucket page addresses, but a cached copy should be
+ * good enough. (If not, we'll detect that further down and refresh the
+ * cache as necessary.)
+ */
+ cachedmetap = _hash_getcachedmetap(rel, &metabuf, false);
+ Assert(cachedmetap != NULL);
+
+ orig_maxbucket = cachedmetap->hashm_maxbucket;
+ orig_ntuples = cachedmetap->hashm_ntuples;
+
+ /* Scan the buckets that we know exist */
+ cur_bucket = 0;
+ cur_maxbucket = orig_maxbucket;
+
+loop_top:
+ while (cur_bucket <= cur_maxbucket)
+ {
+ BlockNumber bucket_blkno;
+ BlockNumber blkno;
+ Buffer bucket_buf;
+ Buffer buf;
+ HashPageOpaque bucket_opaque;
+ Page page;
+ bool split_cleanup = false;
+
+ /* Get address of bucket's start page */
+ bucket_blkno = BUCKET_TO_BLKNO(cachedmetap, cur_bucket);
+
+ blkno = bucket_blkno;
+
+ /*
+ * We need to acquire a cleanup lock on the primary bucket page to out
+ * wait concurrent scans before deleting the dead tuples.
+ */
+ buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
+ LockBufferForCleanup(buf);
+ _hash_checkpage(rel, buf, LH_BUCKET_PAGE);
+
+ page = BufferGetPage(buf);
+ bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+ /*
+ * If the bucket contains tuples that are moved by split, then we need
+ * to delete such tuples. We can't delete such tuples if the split
+ * operation on bucket is not finished as those are needed by scans.
+ */
+ if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
+ H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
+ {
+ split_cleanup = true;
+
+ /*
+ * This bucket might have been split since we last held a lock on
+ * the metapage. If so, hashm_maxbucket, hashm_highmask and
+ * hashm_lowmask might be old enough to cause us to fail to remove
+ * tuples left behind by the most recent split. To prevent that,
+ * now that the primary page of the target bucket has been locked
+ * (and thus can't be further split), check whether we need to
+ * update our cached metapage data.
+ */
+ Assert(bucket_opaque->hasho_prevblkno != InvalidBlockNumber);
+ if (bucket_opaque->hasho_prevblkno > cachedmetap->hashm_maxbucket)
+ {
+ cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
+ Assert(cachedmetap != NULL);
+ }
+ }
+
+ bucket_buf = buf;
+
+ hashbucketcleanup(rel, cur_bucket, bucket_buf, blkno, info->strategy,
+ cachedmetap->hashm_maxbucket,
+ cachedmetap->hashm_highmask,
+ cachedmetap->hashm_lowmask, &tuples_removed,
+ &num_index_tuples, split_cleanup,
+ callback, callback_state);
+
+ _hash_dropbuf(rel, bucket_buf);
+
+ /* Advance to next bucket */
+ cur_bucket++;
+ }
+
+ if (BufferIsInvalid(metabuf))
+ metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);
+
+ /* Write-lock metapage and check for split since we started */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+ metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+ if (cur_maxbucket != metap->hashm_maxbucket)
+ {
+ /* There's been a split, so process the additional bucket(s) */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+ cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
+ Assert(cachedmetap != NULL);
+ cur_maxbucket = cachedmetap->hashm_maxbucket;
+ goto loop_top;
+ }
+
+ /* Okay, we're really done. Update tuple count in metapage. */
+ START_CRIT_SECTION();
+
+ if (orig_maxbucket == metap->hashm_maxbucket &&
+ orig_ntuples == metap->hashm_ntuples)
+ {
+ /*
+ * No one has split or inserted anything since start of scan, so
+ * believe our count as gospel.
+ */
+ metap->hashm_ntuples = num_index_tuples;
+ }
+ else
+ {
+ /*
+ * Otherwise, our count is untrustworthy since we may have
+ * double-scanned tuples in split buckets. Proceed by dead-reckoning.
+ * (Note: we still return estimated_count = false, because using this
+ * count is better than not updating reltuples at all.)
+ */
+ if (metap->hashm_ntuples > tuples_removed)
+ metap->hashm_ntuples -= tuples_removed;
+ else
+ metap->hashm_ntuples = 0;
+ num_index_tuples = metap->hashm_ntuples;
+ }
+
+ MarkBufferDirty(metabuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_hash_update_meta_page xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.ntuples = metap->hashm_ntuples;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashUpdateMetaPage);
+
+ XLogRegisterBuffer(0, metabuf, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE);
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ _hash_relbuf(rel, metabuf);
+
+ /* return statistics */
+ if (stats == NULL)
+ stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+ stats->estimated_count = false;
+ stats->num_index_tuples = num_index_tuples;
+ stats->tuples_removed += tuples_removed;
+ /* hashvacuumcleanup will fill in num_pages */
+
+ return stats;
+}
+
+/*
+ * Post-VACUUM cleanup.
+ *
+ * Result: a palloc'd struct containing statistical info for VACUUM displays.
+ */
+IndexBulkDeleteResult *
+hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
+{
+ Relation rel = info->index;
+ BlockNumber num_pages;
+
+ /* If hashbulkdelete wasn't called, return NULL signifying no change */
+ /* Note: this covers the analyze_only case too */
+ if (stats == NULL)
+ return NULL;
+
+ /* update statistics */
+ num_pages = RelationGetNumberOfBlocks(rel);
+ stats->num_pages = num_pages;
+
+ return stats;
+}
+
+/*
+ * Helper function to perform deletion of index entries from a bucket.
+ *
+ * This function expects that the caller has acquired a cleanup lock on the
+ * primary bucket page, and will return with a write lock again held on the
+ * primary bucket page. The lock won't necessarily be held continuously,
+ * though, because we'll release it when visiting overflow pages.
+ *
+ * There can't be any concurrent scans in progress when we first enter this
+ * function because of the cleanup lock we hold on the primary bucket page,
+ * but as soon as we release that lock, there might be. If those scans got
+ * ahead of our cleanup scan, they might see a tuple before we kill it and
+ * wake up only after VACUUM has completed and the TID has been recycled for
+ * an unrelated tuple. To avoid that calamity, we prevent scans from passing
+ * our cleanup scan by locking the next page in the bucket chain before
+ * releasing the lock on the previous page. (This type of lock chaining is not
+ * ideal, so we might want to look for a better solution at some point.)
+ *
+ * We need to retain a pin on the primary bucket to ensure that no concurrent
+ * split can start.
+ */
+void
+hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
+ BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
+ uint32 maxbucket, uint32 highmask, uint32 lowmask,
+ double *tuples_removed, double *num_index_tuples,
+ bool split_cleanup,
+ IndexBulkDeleteCallback callback, void *callback_state)
+{
+ BlockNumber blkno;
+ Buffer buf;
+ Bucket new_bucket PG_USED_FOR_ASSERTS_ONLY = InvalidBucket;
+ bool bucket_dirty = false;
+
+ blkno = bucket_blkno;
+ buf = bucket_buf;
+
+ if (split_cleanup)
+ new_bucket = _hash_get_newbucket_from_oldbucket(rel, cur_bucket,
+ lowmask, maxbucket);
+
+ /* Scan each page in bucket */
+ for (;;)
+ {
+ HashPageOpaque opaque;
+ OffsetNumber offno;
+ OffsetNumber maxoffno;
+ Buffer next_buf;
+ Page page;
+ OffsetNumber deletable[MaxOffsetNumber];
+ int ndeletable = 0;
+ bool retain_pin = false;
+ bool clear_dead_marking = false;
+
+ vacuum_delay_point();
+
+ page = BufferGetPage(buf);
+ opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+ /* Scan each tuple in page */
+ maxoffno = PageGetMaxOffsetNumber(page);
+ for (offno = FirstOffsetNumber;
+ offno <= maxoffno;
+ offno = OffsetNumberNext(offno))
+ {
+ ItemPointer htup;
+ IndexTuple itup;
+ Bucket bucket;
+ bool kill_tuple = false;
+
+ itup = (IndexTuple) PageGetItem(page,
+ PageGetItemId(page, offno));
+ htup = &(itup->t_tid);
+
+ /*
+ * To remove the dead tuples, we strictly want to rely on results
+ * of callback function. refer btvacuumpage for detailed reason.
+ */
+ if (callback && callback(htup, callback_state))
+ {
+ kill_tuple = true;
+ if (tuples_removed)
+ *tuples_removed += 1;
+ }
+ else if (split_cleanup)
+ {
+ /* delete the tuples that are moved by split. */
+ bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
+ maxbucket,
+ highmask,
+ lowmask);
+ /* mark the item for deletion */
+ if (bucket != cur_bucket)
+ {
+ /*
+ * We expect tuples to either belong to current bucket or
+ * new_bucket. This is ensured because we don't allow
+ * further splits from bucket that contains garbage. See
+ * comments in _hash_expandtable.
+ */
+ Assert(bucket == new_bucket);
+ kill_tuple = true;
+ }
+ }
+
+ if (kill_tuple)
+ {
+ /* mark the item for deletion */
+ deletable[ndeletable++] = offno;
+ }
+ else
+ {
+ /* we're keeping it, so count it */
+ if (num_index_tuples)
+ *num_index_tuples += 1;
+ }
+ }
+
+ /* retain the pin on primary bucket page till end of bucket scan */
+ if (blkno == bucket_blkno)
+ retain_pin = true;
+ else
+ retain_pin = false;
+
+ blkno = opaque->hasho_nextblkno;
+
+ /*
+ * Apply deletions, advance to next page and write page if needed.
+ */
+ if (ndeletable > 0)
+ {
+ /* No ereport(ERROR) until changes are logged */
+ START_CRIT_SECTION();
+
+ PageIndexMultiDelete(page, deletable, ndeletable);
+ bucket_dirty = true;
+
+ /*
+ * Let us mark the page as clean if vacuum removes the DEAD tuples
+ * from an index page. We do this by clearing
+ * LH_PAGE_HAS_DEAD_TUPLES flag.
+ */
+ if (tuples_removed && *tuples_removed > 0 &&
+ H_HAS_DEAD_TUPLES(opaque))
+ {
+ opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+ clear_dead_marking = true;
+ }
+
+ MarkBufferDirty(buf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_hash_delete xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.clear_dead_marking = clear_dead_marking;
+ xlrec.is_primary_bucket_page = (buf == bucket_buf) ? true : false;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashDelete);
+
+ /*
+ * bucket buffer needs to be registered to ensure that we can
+ * acquire a cleanup lock on it during replay.
+ */
+ if (!xlrec.is_primary_bucket_page)
+ XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);
+
+ XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
+ XLogRegisterBufData(1, (char *) deletable,
+ ndeletable * sizeof(OffsetNumber));
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_DELETE);
+ PageSetLSN(BufferGetPage(buf), recptr);
+ }
+
+ END_CRIT_SECTION();
+ }
+
+ /* bail out if there are no more pages to scan. */
+ if (!BlockNumberIsValid(blkno))
+ break;
+
+ next_buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
+ LH_OVERFLOW_PAGE,
+ bstrategy);
+
+ /*
+ * release the lock on previous page after acquiring the lock on next
+ * page
+ */
+ if (retain_pin)
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ else
+ _hash_relbuf(rel, buf);
+
+ buf = next_buf;
+ }
+
+ /*
+ * lock the bucket page to clear the garbage flag and squeeze the bucket.
+ * if the current buffer is same as bucket buffer, then we already have
+ * lock on bucket page.
+ */
+ if (buf != bucket_buf)
+ {
+ _hash_relbuf(rel, buf);
+ LockBuffer(bucket_buf, BUFFER_LOCK_EXCLUSIVE);
+ }
+
+ /*
+ * Clear the garbage flag from bucket after deleting the tuples that are
+ * moved by split. We purposefully clear the flag before squeeze bucket,
+ * so that after restart, vacuum shouldn't again try to delete the moved
+ * by split tuples.
+ */
+ if (split_cleanup)
+ {
+ HashPageOpaque bucket_opaque;
+ Page page;
+
+ page = BufferGetPage(bucket_buf);
+ bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+ /* No ereport(ERROR) until changes are logged */
+ START_CRIT_SECTION();
+
+ bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
+ MarkBufferDirty(bucket_buf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ XLogRecPtr recptr;
+
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_CLEANUP);
+ PageSetLSN(page, recptr);
+ }
+
+ END_CRIT_SECTION();
+ }
+
+ /*
+ * If we have deleted anything, try to compact free space. For squeezing
+ * the bucket, we must have a cleanup lock, else it can impact the
+ * ordering of tuples for a scan that has started before it.
+ */
+ if (bucket_dirty && IsBufferCleanupOK(bucket_buf))
+ _hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
+ bstrategy);
+ else
+ LockBuffer(bucket_buf, BUFFER_LOCK_UNLOCK);
+}
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
new file mode 100644
index 0000000..af35a99
--- /dev/null
+++ b/src/backend/access/hash/hash_xlog.c
@@ -0,0 +1,1145 @@
+/*-------------------------------------------------------------------------
+ *
+ * hash_xlog.c
+ * WAL replay logic for hash index.
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hash_xlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "access/transam.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "miscadmin.h"
+#include "storage/procarray.h"
+
+/*
+ * replay a hash index meta page
+ */
+static void
+hash_xlog_init_meta_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ Page page;
+ Buffer metabuf;
+ ForkNumber forknum;
+
+ xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record);
+
+ /* create the index' metapage */
+ metabuf = XLogInitBufferForRedo(record, 0);
+ Assert(BufferIsValid(metabuf));
+ _hash_init_metabuffer(metabuf, xlrec->num_tuples, xlrec->procid,
+ xlrec->ffactor, true);
+ page = (Page) BufferGetPage(metabuf);
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+
+ /*
+ * Force the on-disk state of init forks to always be in sync with the
+ * state in shared buffers. See XLogReadBufferForRedoExtended. We need
+ * special handling for init forks as create index operations don't log a
+ * full page image of the metapage.
+ */
+ XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
+ if (forknum == INIT_FORKNUM)
+ FlushOneBuffer(metabuf);
+
+ /* all done */
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay a hash index bitmap page
+ */
+static void
+hash_xlog_init_bitmap_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ Buffer bitmapbuf;
+ Buffer metabuf;
+ Page page;
+ HashMetaPage metap;
+ uint32 num_buckets;
+ ForkNumber forknum;
+
+ xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record);
+
+ /*
+ * Initialize bitmap page
+ */
+ bitmapbuf = XLogInitBufferForRedo(record, 0);
+ _hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true);
+ PageSetLSN(BufferGetPage(bitmapbuf), lsn);
+ MarkBufferDirty(bitmapbuf);
+
+ /*
+ * Force the on-disk state of init forks to always be in sync with the
+ * state in shared buffers. See XLogReadBufferForRedoExtended. We need
+ * special handling for init forks as create index operations don't log a
+ * full page image of the metapage.
+ */
+ XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
+ if (forknum == INIT_FORKNUM)
+ FlushOneBuffer(bitmapbuf);
+ UnlockReleaseBuffer(bitmapbuf);
+
+ /* add the new bitmap page to the metapage's list of bitmaps */
+ if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
+ {
+ /*
+ * Note: in normal operation, we'd update the metapage while still
+ * holding lock on the bitmap page. But during replay it's not
+ * necessary to hold that lock, since nobody can see it yet; the
+ * creating transaction hasn't yet committed.
+ */
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+
+ num_buckets = metap->hashm_maxbucket + 1;
+ metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
+ metap->hashm_nmaps++;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+
+ XLogRecGetBlockTag(record, 1, NULL, &forknum, NULL);
+ if (forknum == INIT_FORKNUM)
+ FlushOneBuffer(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay a hash index insert without split
+ */
+static void
+hash_xlog_insert(XLogReaderState *record)
+{
+ HashMetaPage metap;
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ Size datalen;
+ char *datapos = XLogRecGetBlockData(record, 0, &datalen);
+
+ page = BufferGetPage(buffer);
+
+ if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
+ false, false) == InvalidOffsetNumber)
+ elog(PANIC, "hash_xlog_insert: failed to add item");
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
+ {
+ /*
+ * Note: in normal operation, we'd update the metapage while still
+ * holding lock on the page we inserted into. But during replay it's
+ * not necessary to hold that lock, since no other index updates can
+ * be happening concurrently.
+ */
+ page = BufferGetPage(buffer);
+ metap = HashPageGetMeta(page);
+ metap->hashm_ntuples += 1;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * replay addition of overflow page for hash index
+ */
+static void
+hash_xlog_add_ovfl_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_add_ovfl_page *xlrec = (xl_hash_add_ovfl_page *) XLogRecGetData(record);
+ Buffer leftbuf;
+ Buffer ovflbuf;
+ Buffer metabuf;
+ BlockNumber leftblk;
+ BlockNumber rightblk;
+ BlockNumber newmapblk = InvalidBlockNumber;
+ Page ovflpage;
+ HashPageOpaque ovflopaque;
+ uint32 *num_bucket;
+ char *data;
+ Size datalen PG_USED_FOR_ASSERTS_ONLY;
+ bool new_bmpage = false;
+
+ XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk);
+ XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk);
+
+ ovflbuf = XLogInitBufferForRedo(record, 0);
+ Assert(BufferIsValid(ovflbuf));
+
+ data = XLogRecGetBlockData(record, 0, &datalen);
+ num_bucket = (uint32 *) data;
+ Assert(datalen == sizeof(uint32));
+ _hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE,
+ true);
+ /* update backlink */
+ ovflpage = BufferGetPage(ovflbuf);
+ ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
+ ovflopaque->hasho_prevblkno = leftblk;
+
+ PageSetLSN(ovflpage, lsn);
+ MarkBufferDirty(ovflbuf);
+
+ if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
+ {
+ Page leftpage;
+ HashPageOpaque leftopaque;
+
+ leftpage = BufferGetPage(leftbuf);
+ leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage);
+ leftopaque->hasho_nextblkno = rightblk;
+
+ PageSetLSN(leftpage, lsn);
+ MarkBufferDirty(leftbuf);
+ }
+
+ if (BufferIsValid(leftbuf))
+ UnlockReleaseBuffer(leftbuf);
+ UnlockReleaseBuffer(ovflbuf);
+
+ /*
+ * Note: in normal operation, we'd update the bitmap and meta page while
+ * still holding lock on the overflow pages. But during replay it's not
+ * necessary to hold those locks, since no other index updates can be
+ * happening concurrently.
+ */
+ if (XLogRecHasBlockRef(record, 2))
+ {
+ Buffer mapbuffer;
+
+ if (XLogReadBufferForRedo(record, 2, &mapbuffer) == BLK_NEEDS_REDO)
+ {
+ Page mappage = (Page) BufferGetPage(mapbuffer);
+ uint32 *freep = NULL;
+ char *data;
+ uint32 *bitmap_page_bit;
+
+ freep = HashPageGetBitmap(mappage);
+
+ data = XLogRecGetBlockData(record, 2, &datalen);
+ bitmap_page_bit = (uint32 *) data;
+
+ SETBIT(freep, *bitmap_page_bit);
+
+ PageSetLSN(mappage, lsn);
+ MarkBufferDirty(mapbuffer);
+ }
+ if (BufferIsValid(mapbuffer))
+ UnlockReleaseBuffer(mapbuffer);
+ }
+
+ if (XLogRecHasBlockRef(record, 3))
+ {
+ Buffer newmapbuf;
+
+ newmapbuf = XLogInitBufferForRedo(record, 3);
+
+ _hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true);
+
+ new_bmpage = true;
+ newmapblk = BufferGetBlockNumber(newmapbuf);
+
+ MarkBufferDirty(newmapbuf);
+ PageSetLSN(BufferGetPage(newmapbuf), lsn);
+
+ UnlockReleaseBuffer(newmapbuf);
+ }
+
+ if (XLogReadBufferForRedo(record, 4, &metabuf) == BLK_NEEDS_REDO)
+ {
+ HashMetaPage metap;
+ Page page;
+ uint32 *firstfree_ovflpage;
+
+ data = XLogRecGetBlockData(record, 4, &datalen);
+ firstfree_ovflpage = (uint32 *) data;
+
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+ metap->hashm_firstfree = *firstfree_ovflpage;
+
+ if (!xlrec->bmpage_found)
+ {
+ metap->hashm_spares[metap->hashm_ovflpoint]++;
+
+ if (new_bmpage)
+ {
+ Assert(BlockNumberIsValid(newmapblk));
+
+ metap->hashm_mapp[metap->hashm_nmaps] = newmapblk;
+ metap->hashm_nmaps++;
+ metap->hashm_spares[metap->hashm_ovflpoint]++;
+ }
+ }
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay allocation of page for split operation
+ */
+static void
+hash_xlog_split_allocate_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record);
+ Buffer oldbuf;
+ Buffer newbuf;
+ Buffer metabuf;
+ Size datalen PG_USED_FOR_ASSERTS_ONLY;
+ char *data;
+ XLogRedoAction action;
+
+ /*
+ * To be consistent with normal operation, here we take cleanup locks on
+ * both the old and new buckets even though there can't be any concurrent
+ * inserts.
+ */
+
+ /* replay the record for old bucket */
+ action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &oldbuf);
+
+ /*
+ * Note that we still update the page even if it was restored from a full
+ * page image, because the special space is not included in the image.
+ */
+ if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
+ {
+ Page oldpage;
+ HashPageOpaque oldopaque;
+
+ oldpage = BufferGetPage(oldbuf);
+ oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
+
+ oldopaque->hasho_flag = xlrec->old_bucket_flag;
+ oldopaque->hasho_prevblkno = xlrec->new_bucket;
+
+ PageSetLSN(oldpage, lsn);
+ MarkBufferDirty(oldbuf);
+ }
+
+ /* replay the record for new bucket */
+ newbuf = XLogInitBufferForRedo(record, 1);
+ _hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
+ xlrec->new_bucket_flag, true);
+ if (!IsBufferCleanupOK(newbuf))
+ elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock");
+ MarkBufferDirty(newbuf);
+ PageSetLSN(BufferGetPage(newbuf), lsn);
+
+ /*
+ * We can release the lock on old bucket early as well but doing here to
+ * consistent with normal operation.
+ */
+ if (BufferIsValid(oldbuf))
+ UnlockReleaseBuffer(oldbuf);
+ if (BufferIsValid(newbuf))
+ UnlockReleaseBuffer(newbuf);
+
+ /*
+ * Note: in normal operation, we'd update the meta page while still
+ * holding lock on the old and new bucket pages. But during replay it's
+ * not necessary to hold those locks, since no other bucket splits can be
+ * happening concurrently.
+ */
+
+ /* replay the record for metapage changes */
+ if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
+ {
+ Page page;
+ HashMetaPage metap;
+
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+ metap->hashm_maxbucket = xlrec->new_bucket;
+
+ data = XLogRecGetBlockData(record, 2, &datalen);
+
+ if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
+ {
+ uint32 lowmask;
+ uint32 *highmask;
+
+ /* extract low and high masks. */
+ memcpy(&lowmask, data, sizeof(uint32));
+ highmask = (uint32 *) ((char *) data + sizeof(uint32));
+
+ /* update metapage */
+ metap->hashm_lowmask = lowmask;
+ metap->hashm_highmask = *highmask;
+
+ data += sizeof(uint32) * 2;
+ }
+
+ if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
+ {
+ uint32 ovflpoint;
+ uint32 *ovflpages;
+
+ /* extract information of overflow pages. */
+ memcpy(&ovflpoint, data, sizeof(uint32));
+ ovflpages = (uint32 *) ((char *) data + sizeof(uint32));
+
+ /* update metapage */
+ metap->hashm_spares[ovflpoint] = *ovflpages;
+ metap->hashm_ovflpoint = ovflpoint;
+ }
+
+ MarkBufferDirty(metabuf);
+ PageSetLSN(BufferGetPage(metabuf), lsn);
+ }
+
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay of split operation
+ */
+static void
+hash_xlog_split_page(XLogReaderState *record)
+{
+ Buffer buf;
+
+ if (XLogReadBufferForRedo(record, 0, &buf) != BLK_RESTORED)
+ elog(ERROR, "Hash split record did not contain a full-page image");
+
+ UnlockReleaseBuffer(buf);
+}
+
+/*
+ * replay completion of split operation
+ */
+static void
+hash_xlog_split_complete(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record);
+ Buffer oldbuf;
+ Buffer newbuf;
+ XLogRedoAction action;
+
+ /* replay the record for old bucket */
+ action = XLogReadBufferForRedo(record, 0, &oldbuf);
+
+ /*
+ * Note that we still update the page even if it was restored from a full
+ * page image, because the bucket flag is not included in the image.
+ */
+ if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
+ {
+ Page oldpage;
+ HashPageOpaque oldopaque;
+
+ oldpage = BufferGetPage(oldbuf);
+ oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
+
+ oldopaque->hasho_flag = xlrec->old_bucket_flag;
+
+ PageSetLSN(oldpage, lsn);
+ MarkBufferDirty(oldbuf);
+ }
+ if (BufferIsValid(oldbuf))
+ UnlockReleaseBuffer(oldbuf);
+
+ /* replay the record for new bucket */
+ action = XLogReadBufferForRedo(record, 1, &newbuf);
+
+ /*
+ * Note that we still update the page even if it was restored from a full
+ * page image, because the bucket flag is not included in the image.
+ */
+ if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
+ {
+ Page newpage;
+ HashPageOpaque nopaque;
+
+ newpage = BufferGetPage(newbuf);
+ nopaque = (HashPageOpaque) PageGetSpecialPointer(newpage);
+
+ nopaque->hasho_flag = xlrec->new_bucket_flag;
+
+ PageSetLSN(newpage, lsn);
+ MarkBufferDirty(newbuf);
+ }
+ if (BufferIsValid(newbuf))
+ UnlockReleaseBuffer(newbuf);
+}
+
+/*
+ * replay move of page contents for squeeze operation of hash index
+ */
+static void
+hash_xlog_move_page_contents(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record);
+ Buffer bucketbuf = InvalidBuffer;
+ Buffer writebuf = InvalidBuffer;
+ Buffer deletebuf = InvalidBuffer;
+ XLogRedoAction action;
+
+ /*
+ * Ensure we have a cleanup lock on primary bucket page before we start
+ * with the actual replay operation. This is to ensure that neither a
+ * scan can start nor a scan can be already-in-progress during the replay
+ * of this operation. If we allow scans during this operation, then they
+ * can miss some records or show the same record multiple times.
+ */
+ if (xldata->is_prim_bucket_same_wrt)
+ action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
+ else
+ {
+ /*
+ * we don't care for return value as the purpose of reading bucketbuf
+ * is to ensure a cleanup lock on primary bucket page.
+ */
+ (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
+
+ action = XLogReadBufferForRedo(record, 1, &writebuf);
+ }
+
+ /* replay the record for adding entries in overflow buffer */
+ if (action == BLK_NEEDS_REDO)
+ {
+ Page writepage;
+ char *begin;
+ char *data;
+ Size datalen;
+ uint16 ninserted = 0;
+
+ data = begin = XLogRecGetBlockData(record, 1, &datalen);
+
+ writepage = (Page) BufferGetPage(writebuf);
+
+ if (xldata->ntups > 0)
+ {
+ OffsetNumber *towrite = (OffsetNumber *) data;
+
+ data += sizeof(OffsetNumber) * xldata->ntups;
+
+ while (data - begin < datalen)
+ {
+ IndexTuple itup = (IndexTuple) data;
+ Size itemsz;
+ OffsetNumber l;
+
+ itemsz = IndexTupleSize(itup);
+ itemsz = MAXALIGN(itemsz);
+
+ data += itemsz;
+
+ l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes",
+ (int) itemsz);
+
+ ninserted++;
+ }
+ }
+
+ /*
+ * number of tuples inserted must be same as requested in REDO record.
+ */
+ Assert(ninserted == xldata->ntups);
+
+ PageSetLSN(writepage, lsn);
+ MarkBufferDirty(writebuf);
+ }
+
+ /* replay the record for deleting entries from overflow buffer */
+ if (XLogReadBufferForRedo(record, 2, &deletebuf) == BLK_NEEDS_REDO)
+ {
+ Page page;
+ char *ptr;
+ Size len;
+
+ ptr = XLogRecGetBlockData(record, 2, &len);
+
+ page = (Page) BufferGetPage(deletebuf);
+
+ if (len > 0)
+ {
+ OffsetNumber *unused;
+ OffsetNumber *unend;
+
+ unused = (OffsetNumber *) ptr;
+ unend = (OffsetNumber *) ((char *) ptr + len);
+
+ if ((unend - unused) > 0)
+ PageIndexMultiDelete(page, unused, unend - unused);
+ }
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(deletebuf);
+ }
+
+ /*
+ * Replay is complete, now we can release the buffers. We release locks at
+ * end of replay operation to ensure that we hold lock on primary bucket
+ * page till end of operation. We can optimize by releasing the lock on
+ * write buffer as soon as the operation for same is complete, if it is
+ * not same as primary bucket page, but that doesn't seem to be worth
+ * complicating the code.
+ */
+ if (BufferIsValid(deletebuf))
+ UnlockReleaseBuffer(deletebuf);
+
+ if (BufferIsValid(writebuf))
+ UnlockReleaseBuffer(writebuf);
+
+ if (BufferIsValid(bucketbuf))
+ UnlockReleaseBuffer(bucketbuf);
+}
+
+/*
+ * replay squeeze page operation of hash index
+ */
+static void
+hash_xlog_squeeze_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
+ Buffer bucketbuf = InvalidBuffer;
+ Buffer writebuf;
+ Buffer ovflbuf;
+ Buffer prevbuf = InvalidBuffer;
+ Buffer mapbuf;
+ XLogRedoAction action;
+
+ /*
+ * Ensure we have a cleanup lock on primary bucket page before we start
+ * with the actual replay operation. This is to ensure that neither a
+ * scan can start nor a scan can be already-in-progress during the replay
+ * of this operation. If we allow scans during this operation, then they
+ * can miss some records or show the same record multiple times.
+ */
+ if (xldata->is_prim_bucket_same_wrt)
+ action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
+ else
+ {
+ /*
+ * we don't care for return value as the purpose of reading bucketbuf
+ * is to ensure a cleanup lock on primary bucket page.
+ */
+ (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
+
+ action = XLogReadBufferForRedo(record, 1, &writebuf);
+ }
+
+ /* replay the record for adding entries in overflow buffer */
+ if (action == BLK_NEEDS_REDO)
+ {
+ Page writepage;
+ char *begin;
+ char *data;
+ Size datalen;
+ uint16 ninserted = 0;
+
+ data = begin = XLogRecGetBlockData(record, 1, &datalen);
+
+ writepage = (Page) BufferGetPage(writebuf);
+
+ if (xldata->ntups > 0)
+ {
+ OffsetNumber *towrite = (OffsetNumber *) data;
+
+ data += sizeof(OffsetNumber) * xldata->ntups;
+
+ while (data - begin < datalen)
+ {
+ IndexTuple itup = (IndexTuple) data;
+ Size itemsz;
+ OffsetNumber l;
+
+ itemsz = IndexTupleSize(itup);
+ itemsz = MAXALIGN(itemsz);
+
+ data += itemsz;
+
+ l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
+ (int) itemsz);
+
+ ninserted++;
+ }
+ }
+
+ /*
+ * number of tuples inserted must be same as requested in REDO record.
+ */
+ Assert(ninserted == xldata->ntups);
+
+ /*
+ * if the page on which are adding tuples is a page previous to freed
+ * overflow page, then update its nextblkno.
+ */
+ if (xldata->is_prev_bucket_same_wrt)
+ {
+ HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);
+
+ writeopaque->hasho_nextblkno = xldata->nextblkno;
+ }
+
+ PageSetLSN(writepage, lsn);
+ MarkBufferDirty(writebuf);
+ }
+
+ /* replay the record for initializing overflow buffer */
+ if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO)
+ {
+ Page ovflpage;
+ HashPageOpaque ovflopaque;
+
+ ovflpage = BufferGetPage(ovflbuf);
+
+ _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
+
+ ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
+
+ ovflopaque->hasho_prevblkno = InvalidBlockNumber;
+ ovflopaque->hasho_nextblkno = InvalidBlockNumber;
+ ovflopaque->hasho_bucket = -1;
+ ovflopaque->hasho_flag = LH_UNUSED_PAGE;
+ ovflopaque->hasho_page_id = HASHO_PAGE_ID;
+
+ PageSetLSN(ovflpage, lsn);
+ MarkBufferDirty(ovflbuf);
+ }
+ if (BufferIsValid(ovflbuf))
+ UnlockReleaseBuffer(ovflbuf);
+
+ /* replay the record for page previous to the freed overflow page */
+ if (!xldata->is_prev_bucket_same_wrt &&
+ XLogReadBufferForRedo(record, 3, &prevbuf) == BLK_NEEDS_REDO)
+ {
+ Page prevpage = BufferGetPage(prevbuf);
+ HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
+
+ prevopaque->hasho_nextblkno = xldata->nextblkno;
+
+ PageSetLSN(prevpage, lsn);
+ MarkBufferDirty(prevbuf);
+ }
+ if (BufferIsValid(prevbuf))
+ UnlockReleaseBuffer(prevbuf);
+
+ /* replay the record for page next to the freed overflow page */
+ if (XLogRecHasBlockRef(record, 4))
+ {
+ Buffer nextbuf;
+
+ if (XLogReadBufferForRedo(record, 4, &nextbuf) == BLK_NEEDS_REDO)
+ {
+ Page nextpage = BufferGetPage(nextbuf);
+ HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
+
+ nextopaque->hasho_prevblkno = xldata->prevblkno;
+
+ PageSetLSN(nextpage, lsn);
+ MarkBufferDirty(nextbuf);
+ }
+ if (BufferIsValid(nextbuf))
+ UnlockReleaseBuffer(nextbuf);
+ }
+
+ if (BufferIsValid(writebuf))
+ UnlockReleaseBuffer(writebuf);
+
+ if (BufferIsValid(bucketbuf))
+ UnlockReleaseBuffer(bucketbuf);
+
+ /*
+ * Note: in normal operation, we'd update the bitmap and meta page while
+ * still holding lock on the primary bucket page and overflow pages. But
+ * during replay it's not necessary to hold those locks, since no other
+ * index updates can be happening concurrently.
+ */
+ /* replay the record for bitmap page */
+ if (XLogReadBufferForRedo(record, 5, &mapbuf) == BLK_NEEDS_REDO)
+ {
+ Page mappage = (Page) BufferGetPage(mapbuf);
+ uint32 *freep = NULL;
+ char *data;
+ uint32 *bitmap_page_bit;
+ Size datalen;
+
+ freep = HashPageGetBitmap(mappage);
+
+ data = XLogRecGetBlockData(record, 5, &datalen);
+ bitmap_page_bit = (uint32 *) data;
+
+ CLRBIT(freep, *bitmap_page_bit);
+
+ PageSetLSN(mappage, lsn);
+ MarkBufferDirty(mapbuf);
+ }
+ if (BufferIsValid(mapbuf))
+ UnlockReleaseBuffer(mapbuf);
+
+ /* replay the record for meta page */
+ if (XLogRecHasBlockRef(record, 6))
+ {
+ Buffer metabuf;
+
+ if (XLogReadBufferForRedo(record, 6, &metabuf) == BLK_NEEDS_REDO)
+ {
+ HashMetaPage metap;
+ Page page;
+ char *data;
+ uint32 *firstfree_ovflpage;
+ Size datalen;
+
+ data = XLogRecGetBlockData(record, 6, &datalen);
+ firstfree_ovflpage = (uint32 *) data;
+
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+ metap->hashm_firstfree = *firstfree_ovflpage;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+ }
+}
+
+/*
+ * replay delete operation of hash index
+ */
+static void
+hash_xlog_delete(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
+ Buffer bucketbuf = InvalidBuffer;
+ Buffer deletebuf;
+ Page page;
+ XLogRedoAction action;
+
+ /*
+ * Ensure we have a cleanup lock on primary bucket page before we start
+ * with the actual replay operation. This is to ensure that neither a
+ * scan can start nor a scan can be already-in-progress during the replay
+ * of this operation. If we allow scans during this operation, then they
+ * can miss some records or show the same record multiple times.
+ */
+ if (xldata->is_primary_bucket_page)
+ action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf);
+ else
+ {
+ /*
+ * we don't care for return value as the purpose of reading bucketbuf
+ * is to ensure a cleanup lock on primary bucket page.
+ */
+ (void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
+
+ action = XLogReadBufferForRedo(record, 1, &deletebuf);
+ }
+
+ /* replay the record for deleting entries in bucket page */
+ if (action == BLK_NEEDS_REDO)
+ {
+ char *ptr;
+ Size len;
+
+ ptr = XLogRecGetBlockData(record, 1, &len);
+
+ page = (Page) BufferGetPage(deletebuf);
+
+ if (len > 0)
+ {
+ OffsetNumber *unused;
+ OffsetNumber *unend;
+
+ unused = (OffsetNumber *) ptr;
+ unend = (OffsetNumber *) ((char *) ptr + len);
+
+ if ((unend - unused) > 0)
+ PageIndexMultiDelete(page, unused, unend - unused);
+ }
+
+ /*
+ * Mark the page as not containing any LP_DEAD items only if
+ * clear_dead_marking flag is set to true. See comments in
+ * hashbucketcleanup() for details.
+ */
+ if (xldata->clear_dead_marking)
+ {
+ HashPageOpaque pageopaque;
+
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+ }
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(deletebuf);
+ }
+ if (BufferIsValid(deletebuf))
+ UnlockReleaseBuffer(deletebuf);
+
+ if (BufferIsValid(bucketbuf))
+ UnlockReleaseBuffer(bucketbuf);
+}
+
+/*
+ * replay split cleanup flag operation for primary bucket page.
+ */
+static void
+hash_xlog_split_cleanup(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ Buffer buffer;
+ Page page;
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ HashPageOpaque bucket_opaque;
+
+ page = (Page) BufferGetPage(buffer);
+
+ bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * replay for update meta page
+ */
+static void
+hash_xlog_update_meta_page(XLogReaderState *record)
+{
+ HashMetaPage metap;
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record);
+ Buffer metabuf;
+ Page page;
+
+ if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
+ {
+ page = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(page);
+
+ metap->hashm_ntuples = xldata->ntuples;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * replay delete operation in hash index to remove
+ * tuples marked as DEAD during index tuple insertion.
+ */
+static void
+hash_xlog_vacuum_one_page(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_hash_vacuum_one_page *xldata;
+ Buffer buffer;
+ Buffer metabuf;
+ Page page;
+ XLogRedoAction action;
+ HashPageOpaque pageopaque;
+
+ xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
+
+ /*
+ * If we have any conflict processing to do, it must happen before we
+ * update the page.
+ *
+ * Hash index records that are marked as LP_DEAD and being removed during
+ * hash index tuple insertion can conflict with standby queries. You might
+ * think that vacuum records would conflict as well, but we've handled
+ * that already. XLOG_HEAP2_PRUNE records provide the highest xid cleaned
+ * by the vacuum of the heap and so we can resolve any conflicts just once
+ * when that arrives. After that we know that no conflicts exist from
+ * individual hash index vacuum records on that index.
+ */
+ if (InHotStandby)
+ {
+ RelFileNode rnode;
+
+ XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
+ ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+ }
+
+ action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
+
+ if (action == BLK_NEEDS_REDO)
+ {
+ page = (Page) BufferGetPage(buffer);
+
+ if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
+ {
+ OffsetNumber *unused;
+
+ unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);
+
+ PageIndexMultiDelete(page, unused, xldata->ntuples);
+ }
+
+ /*
+ * Mark the page as not containing any LP_DEAD items. See comments in
+ * _hash_vacuum_one_page() for details.
+ */
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
+ {
+ Page metapage;
+ HashMetaPage metap;
+
+ metapage = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(metapage);
+
+ metap->hashm_ntuples -= xldata->ntuples;
+
+ PageSetLSN(metapage, lsn);
+ MarkBufferDirty(metabuf);
+ }
+ if (BufferIsValid(metabuf))
+ UnlockReleaseBuffer(metabuf);
+}
+
+void
+hash_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ switch (info)
+ {
+ case XLOG_HASH_INIT_META_PAGE:
+ hash_xlog_init_meta_page(record);
+ break;
+ case XLOG_HASH_INIT_BITMAP_PAGE:
+ hash_xlog_init_bitmap_page(record);
+ break;
+ case XLOG_HASH_INSERT:
+ hash_xlog_insert(record);
+ break;
+ case XLOG_HASH_ADD_OVFL_PAGE:
+ hash_xlog_add_ovfl_page(record);
+ break;
+ case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
+ hash_xlog_split_allocate_page(record);
+ break;
+ case XLOG_HASH_SPLIT_PAGE:
+ hash_xlog_split_page(record);
+ break;
+ case XLOG_HASH_SPLIT_COMPLETE:
+ hash_xlog_split_complete(record);
+ break;
+ case XLOG_HASH_MOVE_PAGE_CONTENTS:
+ hash_xlog_move_page_contents(record);
+ break;
+ case XLOG_HASH_SQUEEZE_PAGE:
+ hash_xlog_squeeze_page(record);
+ break;
+ case XLOG_HASH_DELETE:
+ hash_xlog_delete(record);
+ break;
+ case XLOG_HASH_SPLIT_CLEANUP:
+ hash_xlog_split_cleanup(record);
+ break;
+ case XLOG_HASH_UPDATE_META_PAGE:
+ hash_xlog_update_meta_page(record);
+ break;
+ case XLOG_HASH_VACUUM_ONE_PAGE:
+ hash_xlog_vacuum_one_page(record);
+ break;
+ default:
+ elog(PANIC, "hash_redo: unknown op code %u", info);
+ }
+}
+
+/*
+ * Mask a hash page before performing consistency checks on it.
+ */
+void
+hash_mask(char *pagedata, BlockNumber blkno)
+{
+ Page page = (Page) pagedata;
+ HashPageOpaque opaque;
+ int pagetype;
+
+ mask_page_lsn_and_checksum(page);
+
+ mask_page_hint_bits(page);
+ mask_unused_space(page);
+
+ opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+ pagetype = opaque->hasho_flag & LH_PAGE_TYPE;
+ if (pagetype == LH_UNUSED_PAGE)
+ {
+ /*
+ * Mask everything on a UNUSED page.
+ */
+ mask_page_content(page);
+ }
+ else if (pagetype == LH_BUCKET_PAGE ||
+ pagetype == LH_OVERFLOW_PAGE)
+ {
+ /*
+ * In hash bucket and overflow pages, it is possible to modify the
+ * LP_FLAGS without emitting any WAL record. Hence, mask the line
+ * pointer flags. See hashgettuple(), _hash_kill_items() for details.
+ */
+ mask_lp_flags(page);
+ }
+
+ /*
+ * It is possible that the hint bit LH_PAGE_HAS_DEAD_TUPLES may remain
+ * unlogged. So, mask it. See _hash_kill_items() for details.
+ */
+ opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+}
diff --git a/src/backend/access/hash/hashfunc.c b/src/backend/access/hash/hashfunc.c
new file mode 100644
index 0000000..2423339
--- /dev/null
+++ b/src/backend/access/hash/hashfunc.c
@@ -0,0 +1,411 @@
+/*-------------------------------------------------------------------------
+ *
+ * hashfunc.c
+ * Support functions for hash access method.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hashfunc.c
+ *
+ * NOTES
+ * These functions are stored in pg_amproc. For each operator class
+ * defined for hash indexes, they compute the hash value of the argument.
+ *
+ * Additional hash functions appear in /utils/adt/ files for various
+ * specialized datatypes.
+ *
+ * It is expected that every bit of a hash function's 32-bit result is
+ * as random as every other; failure to ensure this is likely to lead
+ * to poor performance of hash joins, for example. In most cases a hash
+ * function should use hash_any() or its variant hash_uint32().
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "catalog/pg_collation.h"
+#include "common/hashfn.h"
+#include "utils/builtins.h"
+#include "utils/float.h"
+#include "utils/pg_locale.h"
+
+/*
+ * Datatype-specific hash functions.
+ *
+ * These support both hash indexes and hash joins.
+ *
+ * NOTE: some of these are also used by catcache operations, without
+ * any direct connection to hash indexes. Also, the common hash_any
+ * routine is also used by dynahash tables.
+ */
+
+/* Note: this is used for both "char" and boolean datatypes */
+Datum
+hashchar(PG_FUNCTION_ARGS)
+{
+ return hash_uint32((int32) PG_GETARG_CHAR(0));
+}
+
+Datum
+hashcharextended(PG_FUNCTION_ARGS)
+{
+ return hash_uint32_extended((int32) PG_GETARG_CHAR(0), PG_GETARG_INT64(1));
+}
+
+Datum
+hashint2(PG_FUNCTION_ARGS)
+{
+ return hash_uint32((int32) PG_GETARG_INT16(0));
+}
+
+Datum
+hashint2extended(PG_FUNCTION_ARGS)
+{
+ return hash_uint32_extended((int32) PG_GETARG_INT16(0), PG_GETARG_INT64(1));
+}
+
+Datum
+hashint4(PG_FUNCTION_ARGS)
+{
+ return hash_uint32(PG_GETARG_INT32(0));
+}
+
+Datum
+hashint4extended(PG_FUNCTION_ARGS)
+{
+ return hash_uint32_extended(PG_GETARG_INT32(0), PG_GETARG_INT64(1));
+}
+
+Datum
+hashint8(PG_FUNCTION_ARGS)
+{
+ /*
+ * The idea here is to produce a hash value compatible with the values
+ * produced by hashint4 and hashint2 for logically equal inputs; this is
+ * necessary to support cross-type hash joins across these input types.
+ * Since all three types are signed, we can xor the high half of the int8
+ * value if the sign is positive, or the complement of the high half when
+ * the sign is negative.
+ */
+ int64 val = PG_GETARG_INT64(0);
+ uint32 lohalf = (uint32) val;
+ uint32 hihalf = (uint32) (val >> 32);
+
+ lohalf ^= (val >= 0) ? hihalf : ~hihalf;
+
+ return hash_uint32(lohalf);
+}
+
+Datum
+hashint8extended(PG_FUNCTION_ARGS)
+{
+ /* Same approach as hashint8 */
+ int64 val = PG_GETARG_INT64(0);
+ uint32 lohalf = (uint32) val;
+ uint32 hihalf = (uint32) (val >> 32);
+
+ lohalf ^= (val >= 0) ? hihalf : ~hihalf;
+
+ return hash_uint32_extended(lohalf, PG_GETARG_INT64(1));
+}
+
+Datum
+hashoid(PG_FUNCTION_ARGS)
+{
+ return hash_uint32((uint32) PG_GETARG_OID(0));
+}
+
+Datum
+hashoidextended(PG_FUNCTION_ARGS)
+{
+ return hash_uint32_extended((uint32) PG_GETARG_OID(0), PG_GETARG_INT64(1));
+}
+
+Datum
+hashenum(PG_FUNCTION_ARGS)
+{
+ return hash_uint32((uint32) PG_GETARG_OID(0));
+}
+
+Datum
+hashenumextended(PG_FUNCTION_ARGS)
+{
+ return hash_uint32_extended((uint32) PG_GETARG_OID(0), PG_GETARG_INT64(1));
+}
+
+Datum
+hashfloat4(PG_FUNCTION_ARGS)
+{
+ float4 key = PG_GETARG_FLOAT4(0);
+ float8 key8;
+
+ /*
+ * On IEEE-float machines, minus zero and zero have different bit patterns
+ * but should compare as equal. We must ensure that they have the same
+ * hash value, which is most reliably done this way:
+ */
+ if (key == (float4) 0)
+ PG_RETURN_UINT32(0);
+
+ /*
+ * To support cross-type hashing of float8 and float4, we want to return
+ * the same hash value hashfloat8 would produce for an equal float8 value.
+ * So, widen the value to float8 and hash that. (We must do this rather
+ * than have hashfloat8 try to narrow its value to float4; that could fail
+ * on overflow.)
+ */
+ key8 = key;
+
+ /*
+ * Similarly, NaNs can have different bit patterns but they should all
+ * compare as equal. For backwards-compatibility reasons we force them to
+ * have the hash value of a standard float8 NaN. (You'd think we could
+ * replace key with a float4 NaN and then widen it; but on some old
+ * platforms, that way produces a different bit pattern.)
+ */
+ if (isnan(key8))
+ key8 = get_float8_nan();
+
+ return hash_any((unsigned char *) &key8, sizeof(key8));
+}
+
+Datum
+hashfloat4extended(PG_FUNCTION_ARGS)
+{
+ float4 key = PG_GETARG_FLOAT4(0);
+ uint64 seed = PG_GETARG_INT64(1);
+ float8 key8;
+
+ /* Same approach as hashfloat4 */
+ if (key == (float4) 0)
+ PG_RETURN_UINT64(seed);
+ key8 = key;
+ if (isnan(key8))
+ key8 = get_float8_nan();
+
+ return hash_any_extended((unsigned char *) &key8, sizeof(key8), seed);
+}
+
+Datum
+hashfloat8(PG_FUNCTION_ARGS)
+{
+ float8 key = PG_GETARG_FLOAT8(0);
+
+ /*
+ * On IEEE-float machines, minus zero and zero have different bit patterns
+ * but should compare as equal. We must ensure that they have the same
+ * hash value, which is most reliably done this way:
+ */
+ if (key == (float8) 0)
+ PG_RETURN_UINT32(0);
+
+ /*
+ * Similarly, NaNs can have different bit patterns but they should all
+ * compare as equal. For backwards-compatibility reasons we force them to
+ * have the hash value of a standard NaN.
+ */
+ if (isnan(key))
+ key = get_float8_nan();
+
+ return hash_any((unsigned char *) &key, sizeof(key));
+}
+
+Datum
+hashfloat8extended(PG_FUNCTION_ARGS)
+{
+ float8 key = PG_GETARG_FLOAT8(0);
+ uint64 seed = PG_GETARG_INT64(1);
+
+ /* Same approach as hashfloat8 */
+ if (key == (float8) 0)
+ PG_RETURN_UINT64(seed);
+ if (isnan(key))
+ key = get_float8_nan();
+
+ return hash_any_extended((unsigned char *) &key, sizeof(key), seed);
+}
+
+Datum
+hashoidvector(PG_FUNCTION_ARGS)
+{
+ oidvector *key = (oidvector *) PG_GETARG_POINTER(0);
+
+ return hash_any((unsigned char *) key->values, key->dim1 * sizeof(Oid));
+}
+
+Datum
+hashoidvectorextended(PG_FUNCTION_ARGS)
+{
+ oidvector *key = (oidvector *) PG_GETARG_POINTER(0);
+
+ return hash_any_extended((unsigned char *) key->values,
+ key->dim1 * sizeof(Oid),
+ PG_GETARG_INT64(1));
+}
+
+Datum
+hashname(PG_FUNCTION_ARGS)
+{
+ char *key = NameStr(*PG_GETARG_NAME(0));
+
+ return hash_any((unsigned char *) key, strlen(key));
+}
+
+Datum
+hashnameextended(PG_FUNCTION_ARGS)
+{
+ char *key = NameStr(*PG_GETARG_NAME(0));
+
+ return hash_any_extended((unsigned char *) key, strlen(key),
+ PG_GETARG_INT64(1));
+}
+
+Datum
+hashtext(PG_FUNCTION_ARGS)
+{
+ text *key = PG_GETARG_TEXT_PP(0);
+ Oid collid = PG_GET_COLLATION();
+ pg_locale_t mylocale = 0;
+ Datum result;
+
+ if (!collid)
+ ereport(ERROR,
+ (errcode(ERRCODE_INDETERMINATE_COLLATION),
+ errmsg("could not determine which collation to use for string hashing"),
+ errhint("Use the COLLATE clause to set the collation explicitly.")));
+
+ if (!lc_collate_is_c(collid) && collid != DEFAULT_COLLATION_OID)
+ mylocale = pg_newlocale_from_collation(collid);
+
+ if (!mylocale || mylocale->deterministic)
+ {
+ result = hash_any((unsigned char *) VARDATA_ANY(key),
+ VARSIZE_ANY_EXHDR(key));
+ }
+ else
+ {
+#ifdef USE_ICU
+ if (mylocale->provider == COLLPROVIDER_ICU)
+ {
+ int32_t ulen = -1;
+ UChar *uchar = NULL;
+ Size bsize;
+ uint8_t *buf;
+
+ ulen = icu_to_uchar(&uchar, VARDATA_ANY(key), VARSIZE_ANY_EXHDR(key));
+
+ bsize = ucol_getSortKey(mylocale->info.icu.ucol,
+ uchar, ulen, NULL, 0);
+ buf = palloc(bsize);
+ ucol_getSortKey(mylocale->info.icu.ucol,
+ uchar, ulen, buf, bsize);
+
+ result = hash_any(buf, bsize);
+
+ pfree(buf);
+ }
+ else
+#endif
+ /* shouldn't happen */
+ elog(ERROR, "unsupported collprovider: %c", mylocale->provider);
+ }
+
+ /* Avoid leaking memory for toasted inputs */
+ PG_FREE_IF_COPY(key, 0);
+
+ return result;
+}
+
+Datum
+hashtextextended(PG_FUNCTION_ARGS)
+{
+ text *key = PG_GETARG_TEXT_PP(0);
+ Oid collid = PG_GET_COLLATION();
+ pg_locale_t mylocale = 0;
+ Datum result;
+
+ if (!collid)
+ ereport(ERROR,
+ (errcode(ERRCODE_INDETERMINATE_COLLATION),
+ errmsg("could not determine which collation to use for string hashing"),
+ errhint("Use the COLLATE clause to set the collation explicitly.")));
+
+ if (!lc_collate_is_c(collid) && collid != DEFAULT_COLLATION_OID)
+ mylocale = pg_newlocale_from_collation(collid);
+
+ if (!mylocale || mylocale->deterministic)
+ {
+ result = hash_any_extended((unsigned char *) VARDATA_ANY(key),
+ VARSIZE_ANY_EXHDR(key),
+ PG_GETARG_INT64(1));
+ }
+ else
+ {
+#ifdef USE_ICU
+ if (mylocale->provider == COLLPROVIDER_ICU)
+ {
+ int32_t ulen = -1;
+ UChar *uchar = NULL;
+ Size bsize;
+ uint8_t *buf;
+
+ ulen = icu_to_uchar(&uchar, VARDATA_ANY(key), VARSIZE_ANY_EXHDR(key));
+
+ bsize = ucol_getSortKey(mylocale->info.icu.ucol,
+ uchar, ulen, NULL, 0);
+ buf = palloc(bsize);
+ ucol_getSortKey(mylocale->info.icu.ucol,
+ uchar, ulen, buf, bsize);
+
+ result = hash_any_extended(buf, bsize, PG_GETARG_INT64(1));
+
+ pfree(buf);
+ }
+ else
+#endif
+ /* shouldn't happen */
+ elog(ERROR, "unsupported collprovider: %c", mylocale->provider);
+ }
+
+ PG_FREE_IF_COPY(key, 0);
+
+ return result;
+}
+
+/*
+ * hashvarlena() can be used for any varlena datatype in which there are
+ * no non-significant bits, ie, distinct bitpatterns never compare as equal.
+ */
+Datum
+hashvarlena(PG_FUNCTION_ARGS)
+{
+ struct varlena *key = PG_GETARG_VARLENA_PP(0);
+ Datum result;
+
+ result = hash_any((unsigned char *) VARDATA_ANY(key),
+ VARSIZE_ANY_EXHDR(key));
+
+ /* Avoid leaking memory for toasted inputs */
+ PG_FREE_IF_COPY(key, 0);
+
+ return result;
+}
+
+Datum
+hashvarlenaextended(PG_FUNCTION_ARGS)
+{
+ struct varlena *key = PG_GETARG_VARLENA_PP(0);
+ Datum result;
+
+ result = hash_any_extended((unsigned char *) VARDATA_ANY(key),
+ VARSIZE_ANY_EXHDR(key),
+ PG_GETARG_INT64(1));
+
+ PG_FREE_IF_COPY(key, 0);
+
+ return result;
+}
diff --git a/src/backend/access/hash/hashinsert.c b/src/backend/access/hash/hashinsert.c
new file mode 100644
index 0000000..d254a00
--- /dev/null
+++ b/src/backend/access/hash/hashinsert.c
@@ -0,0 +1,432 @@
+/*-------------------------------------------------------------------------
+ *
+ * hashinsert.c
+ * Item insertion in hash tables for Postgres.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hashinsert.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "miscadmin.h"
+#include "storage/buf_internals.h"
+#include "storage/lwlock.h"
+#include "storage/predicate.h"
+#include "utils/rel.h"
+
+static void _hash_vacuum_one_page(Relation rel, Relation hrel,
+ Buffer metabuf, Buffer buf);
+
+/*
+ * _hash_doinsert() -- Handle insertion of a single index tuple.
+ *
+ * This routine is called by the public interface routines, hashbuild
+ * and hashinsert. By here, itup is completely filled in.
+ */
+void
+_hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
+{
+ Buffer buf = InvalidBuffer;
+ Buffer bucket_buf;
+ Buffer metabuf;
+ HashMetaPage metap;
+ HashMetaPage usedmetap = NULL;
+ Page metapage;
+ Page page;
+ HashPageOpaque pageopaque;
+ Size itemsz;
+ bool do_expand;
+ uint32 hashkey;
+ Bucket bucket;
+ OffsetNumber itup_off;
+
+ /*
+ * Get the hash key for the item (it's stored in the index tuple itself).
+ */
+ hashkey = _hash_get_indextuple_hashkey(itup);
+
+ /* compute item size too */
+ itemsz = IndexTupleSize(itup);
+ itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
+ * need to be consistent */
+
+restart_insert:
+
+ /*
+ * Read the metapage. We don't lock it yet; HashMaxItemSize() will
+ * examine pd_pagesize_version, but that can't change so we can examine it
+ * without a lock.
+ */
+ metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);
+ metapage = BufferGetPage(metabuf);
+
+ /*
+ * Check whether the item can fit on a hash page at all. (Eventually, we
+ * ought to try to apply TOAST methods if not.) Note that at this point,
+ * itemsz doesn't include the ItemId.
+ *
+ * XXX this is useless code if we are only storing hash keys.
+ */
+ if (itemsz > HashMaxItemSize(metapage))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("index row size %zu exceeds hash maximum %zu",
+ itemsz, HashMaxItemSize(metapage)),
+ errhint("Values larger than a buffer page cannot be indexed.")));
+
+ /* Lock the primary bucket page for the target bucket. */
+ buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_WRITE,
+ &usedmetap);
+ Assert(usedmetap != NULL);
+
+ CheckForSerializableConflictIn(rel, NULL, BufferGetBlockNumber(buf));
+
+ /* remember the primary bucket buffer to release the pin on it at end. */
+ bucket_buf = buf;
+
+ page = BufferGetPage(buf);
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ bucket = pageopaque->hasho_bucket;
+
+ /*
+ * If this bucket is in the process of being split, try to finish the
+ * split before inserting, because that might create room for the
+ * insertion to proceed without allocating an additional overflow page.
+ * It's only interesting to finish the split if we're trying to insert
+ * into the bucket from which we're removing tuples (the "old" bucket),
+ * not if we're trying to insert into the bucket into which tuples are
+ * being moved (the "new" bucket).
+ */
+ if (H_BUCKET_BEING_SPLIT(pageopaque) && IsBufferCleanupOK(buf))
+ {
+ /* release the lock on bucket buffer, before completing the split. */
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+ _hash_finish_split(rel, metabuf, buf, bucket,
+ usedmetap->hashm_maxbucket,
+ usedmetap->hashm_highmask,
+ usedmetap->hashm_lowmask);
+
+ /* release the pin on old and meta buffer. retry for insert. */
+ _hash_dropbuf(rel, buf);
+ _hash_dropbuf(rel, metabuf);
+ goto restart_insert;
+ }
+
+ /* Do the insertion */
+ while (PageGetFreeSpace(page) < itemsz)
+ {
+ BlockNumber nextblkno;
+
+ /*
+ * Check if current page has any DEAD tuples. If yes, delete these
+ * tuples and see if we can get a space for the new item to be
+ * inserted before moving to the next page in the bucket chain.
+ */
+ if (H_HAS_DEAD_TUPLES(pageopaque))
+ {
+
+ if (IsBufferCleanupOK(buf))
+ {
+ _hash_vacuum_one_page(rel, heapRel, metabuf, buf);
+
+ if (PageGetFreeSpace(page) >= itemsz)
+ break; /* OK, now we have enough space */
+ }
+ }
+
+ /*
+ * no space on this page; check for an overflow page
+ */
+ nextblkno = pageopaque->hasho_nextblkno;
+
+ if (BlockNumberIsValid(nextblkno))
+ {
+ /*
+ * ovfl page exists; go get it. if it doesn't have room, we'll
+ * find out next pass through the loop test above. we always
+ * release both the lock and pin if this is an overflow page, but
+ * only the lock if this is the primary bucket page, since the pin
+ * on the primary bucket must be retained throughout the scan.
+ */
+ if (buf != bucket_buf)
+ _hash_relbuf(rel, buf);
+ else
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
+ page = BufferGetPage(buf);
+ }
+ else
+ {
+ /*
+ * we're at the end of the bucket chain and we haven't found a
+ * page with enough room. allocate a new overflow page.
+ */
+
+ /* release our write lock without modifying buffer */
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+ /* chain to a new overflow page */
+ buf = _hash_addovflpage(rel, metabuf, buf, (buf == bucket_buf) ? true : false);
+ page = BufferGetPage(buf);
+
+ /* should fit now, given test above */
+ Assert(PageGetFreeSpace(page) >= itemsz);
+ }
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
+ Assert(pageopaque->hasho_bucket == bucket);
+ }
+
+ /*
+ * Write-lock the metapage so we can increment the tuple count. After
+ * incrementing it, check to see if it's time for a split.
+ */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ /* Do the update. No ereport(ERROR) until changes are logged */
+ START_CRIT_SECTION();
+
+ /* found page with enough space, so add the item here */
+ itup_off = _hash_pgaddtup(rel, buf, itemsz, itup);
+ MarkBufferDirty(buf);
+
+ /* metapage operations */
+ metap = HashPageGetMeta(metapage);
+ metap->hashm_ntuples += 1;
+
+ /* Make sure this stays in sync with _hash_expandtable() */
+ do_expand = metap->hashm_ntuples >
+ (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
+
+ MarkBufferDirty(metabuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_hash_insert xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.offnum = itup_off;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashInsert);
+
+ XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
+
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterBufData(0, (char *) itup, IndexTupleSize(itup));
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ /* drop lock on metapage, but keep pin */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Release the modified page and ensure to release the pin on primary
+ * page.
+ */
+ _hash_relbuf(rel, buf);
+ if (buf != bucket_buf)
+ _hash_dropbuf(rel, bucket_buf);
+
+ /* Attempt to split if a split is needed */
+ if (do_expand)
+ _hash_expandtable(rel, metabuf);
+
+ /* Finally drop our pin on the metapage */
+ _hash_dropbuf(rel, metabuf);
+}
+
+/*
+ * _hash_pgaddtup() -- add a tuple to a particular page in the index.
+ *
+ * This routine adds the tuple to the page as requested; it does not write out
+ * the page. It is an error to call this function without pin and write lock
+ * on the target buffer.
+ *
+ * Returns the offset number at which the tuple was inserted. This function
+ * is responsible for preserving the condition that tuples in a hash index
+ * page are sorted by hashkey value.
+ */
+OffsetNumber
+_hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
+{
+ OffsetNumber itup_off;
+ Page page;
+ uint32 hashkey;
+
+ _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+ page = BufferGetPage(buf);
+
+ /* Find where to insert the tuple (preserving page's hashkey ordering) */
+ hashkey = _hash_get_indextuple_hashkey(itup);
+ itup_off = _hash_binsearch(page, hashkey);
+
+ if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false)
+ == InvalidOffsetNumber)
+ elog(ERROR, "failed to add index item to \"%s\"",
+ RelationGetRelationName(rel));
+
+ return itup_off;
+}
+
+/*
+ * _hash_pgaddmultitup() -- add a tuple vector to a particular page in the
+ * index.
+ *
+ * This routine has same requirements for locking and tuple ordering as
+ * _hash_pgaddtup().
+ *
+ * Returns the offset number array at which the tuples were inserted.
+ */
+void
+_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
+ OffsetNumber *itup_offsets, uint16 nitups)
+{
+ OffsetNumber itup_off;
+ Page page;
+ uint32 hashkey;
+ int i;
+
+ _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+ page = BufferGetPage(buf);
+
+ for (i = 0; i < nitups; i++)
+ {
+ Size itemsize;
+
+ itemsize = IndexTupleSize(itups[i]);
+ itemsize = MAXALIGN(itemsize);
+
+ /* Find where to insert the tuple (preserving page's hashkey ordering) */
+ hashkey = _hash_get_indextuple_hashkey(itups[i]);
+ itup_off = _hash_binsearch(page, hashkey);
+
+ itup_offsets[i] = itup_off;
+
+ if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
+ == InvalidOffsetNumber)
+ elog(ERROR, "failed to add index item to \"%s\"",
+ RelationGetRelationName(rel));
+ }
+}
+
+/*
+ * _hash_vacuum_one_page - vacuum just one index page.
+ *
+ * Try to remove LP_DEAD items from the given page. We must acquire cleanup
+ * lock on the page being modified before calling this function.
+ */
+
+static void
+_hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
+{
+ OffsetNumber deletable[MaxOffsetNumber];
+ int ndeletable = 0;
+ OffsetNumber offnum,
+ maxoff;
+ Page page = BufferGetPage(buf);
+ HashPageOpaque pageopaque;
+ HashMetaPage metap;
+
+ /* Scan each tuple in page to see if it is marked as LP_DEAD */
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (offnum = FirstOffsetNumber;
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ ItemId itemId = PageGetItemId(page, offnum);
+
+ if (ItemIdIsDead(itemId))
+ deletable[ndeletable++] = offnum;
+ }
+
+ if (ndeletable > 0)
+ {
+ TransactionId latestRemovedXid;
+
+ latestRemovedXid =
+ index_compute_xid_horizon_for_tuples(rel, hrel, buf,
+ deletable, ndeletable);
+
+ /*
+ * Write-lock the meta page so that we can decrement tuple count.
+ */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ /* No ereport(ERROR) until changes are logged */
+ START_CRIT_SECTION();
+
+ PageIndexMultiDelete(page, deletable, ndeletable);
+
+ /*
+ * Mark the page as not containing any LP_DEAD items. This is not
+ * certainly true (there might be some that have recently been marked,
+ * but weren't included in our target-item list), but it will almost
+ * always be true and it doesn't seem worth an additional page scan to
+ * check it. Remember that LH_PAGE_HAS_DEAD_TUPLES is only a hint
+ * anyway.
+ */
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
+ metap = HashPageGetMeta(BufferGetPage(metabuf));
+ metap->hashm_ntuples -= ndeletable;
+
+ MarkBufferDirty(buf);
+ MarkBufferDirty(metabuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_hash_vacuum_one_page xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.latestRemovedXid = latestRemovedXid;
+ xlrec.ntuples = ndeletable;
+
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+ XLogRegisterData((char *) &xlrec, SizeOfHashVacuumOnePage);
+
+ /*
+ * We need the target-offsets array whether or not we store the
+ * whole buffer, to allow us to find the latestRemovedXid on a
+ * standby server.
+ */
+ XLogRegisterData((char *) deletable,
+ ndeletable * sizeof(OffsetNumber));
+
+ XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ /*
+ * Releasing write lock on meta page as we have updated the tuple
+ * count.
+ */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+ }
+}
diff --git a/src/backend/access/hash/hashovfl.c b/src/backend/access/hash/hashovfl.c
new file mode 100644
index 0000000..1ff2e0c
--- /dev/null
+++ b/src/backend/access/hash/hashovfl.c
@@ -0,0 +1,1083 @@
+/*-------------------------------------------------------------------------
+ *
+ * hashovfl.c
+ * Overflow page management code for the Postgres hash access method
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hashovfl.c
+ *
+ * NOTES
+ * Overflow pages look like ordinary relation pages.
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "miscadmin.h"
+#include "utils/rel.h"
+
+
+static uint32 _hash_firstfreebit(uint32 map);
+
+
+/*
+ * Convert overflow page bit number (its index in the free-page bitmaps)
+ * to block number within the index.
+ */
+static BlockNumber
+bitno_to_blkno(HashMetaPage metap, uint32 ovflbitnum)
+{
+ uint32 splitnum = metap->hashm_ovflpoint;
+ uint32 i;
+
+ /* Convert zero-based bitnumber to 1-based page number */
+ ovflbitnum += 1;
+
+ /* Determine the split number for this page (must be >= 1) */
+ for (i = 1;
+ i < splitnum && ovflbitnum > metap->hashm_spares[i];
+ i++)
+ /* loop */ ;
+
+ /*
+ * Convert to absolute page number by adding the number of bucket pages
+ * that exist before this split point.
+ */
+ return (BlockNumber) (_hash_get_totalbuckets(i) + ovflbitnum);
+}
+
+/*
+ * _hash_ovflblkno_to_bitno
+ *
+ * Convert overflow page block number to bit number for free-page bitmap.
+ */
+uint32
+_hash_ovflblkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
+{
+ uint32 splitnum = metap->hashm_ovflpoint;
+ uint32 i;
+ uint32 bitnum;
+
+ /* Determine the split number containing this page */
+ for (i = 1; i <= splitnum; i++)
+ {
+ if (ovflblkno <= (BlockNumber) _hash_get_totalbuckets(i))
+ break; /* oops */
+ bitnum = ovflblkno - _hash_get_totalbuckets(i);
+
+ /*
+ * bitnum has to be greater than number of overflow page added in
+ * previous split point. The overflow page at this splitnum (i) if any
+ * should start from (_hash_get_totalbuckets(i) +
+ * metap->hashm_spares[i - 1] + 1).
+ */
+ if (bitnum > metap->hashm_spares[i - 1] &&
+ bitnum <= metap->hashm_spares[i])
+ return bitnum - 1; /* -1 to convert 1-based to 0-based */
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid overflow block number %u", ovflblkno)));
+ return 0; /* keep compiler quiet */
+}
+
+/*
+ * _hash_addovflpage
+ *
+ * Add an overflow page to the bucket whose last page is pointed to by 'buf'.
+ *
+ * On entry, the caller must hold a pin but no lock on 'buf'. The pin is
+ * dropped before exiting (we assume the caller is not interested in 'buf'
+ * anymore) if not asked to retain. The pin will be retained only for the
+ * primary bucket. The returned overflow page will be pinned and
+ * write-locked; it is guaranteed to be empty.
+ *
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * That buffer is returned in the same state.
+ *
+ * NB: since this could be executed concurrently by multiple processes,
+ * one should not assume that the returned overflow page will be the
+ * immediate successor of the originally passed 'buf'. Additional overflow
+ * pages might have been added to the bucket chain in between.
+ */
+Buffer
+_hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin)
+{
+ Buffer ovflbuf;
+ Page page;
+ Page ovflpage;
+ HashPageOpaque pageopaque;
+ HashPageOpaque ovflopaque;
+ HashMetaPage metap;
+ Buffer mapbuf = InvalidBuffer;
+ Buffer newmapbuf = InvalidBuffer;
+ BlockNumber blkno;
+ uint32 orig_firstfree;
+ uint32 splitnum;
+ uint32 *freep = NULL;
+ uint32 max_ovflpg;
+ uint32 bit;
+ uint32 bitmap_page_bit;
+ uint32 first_page;
+ uint32 last_bit;
+ uint32 last_page;
+ uint32 i,
+ j;
+ bool page_found = false;
+
+ /*
+ * Write-lock the tail page. Here, we need to maintain locking order such
+ * that, first acquire the lock on tail page of bucket, then on meta page
+ * to find and lock the bitmap page and if it is found, then lock on meta
+ * page is released, then finally acquire the lock on new overflow buffer.
+ * We need this locking order to avoid deadlock with backends that are
+ * doing inserts.
+ *
+ * Note: We could have avoided locking many buffers here if we made two
+ * WAL records for acquiring an overflow page (one to allocate an overflow
+ * page and another to add it to overflow bucket chain). However, doing
+ * so can leak an overflow page, if the system crashes after allocation.
+ * Needless to say, it is better to have a single record from a
+ * performance point of view as well.
+ */
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+ /* probably redundant... */
+ _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+
+ /* loop to find current tail page, in case someone else inserted too */
+ for (;;)
+ {
+ BlockNumber nextblkno;
+
+ page = BufferGetPage(buf);
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ nextblkno = pageopaque->hasho_nextblkno;
+
+ if (!BlockNumberIsValid(nextblkno))
+ break;
+
+ /* we assume we do not need to write the unmodified page */
+ if (retain_pin)
+ {
+ /* pin will be retained only for the primary bucket page */
+ Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_BUCKET_PAGE);
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ }
+ else
+ _hash_relbuf(rel, buf);
+
+ retain_pin = false;
+
+ buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
+ }
+
+ /* Get exclusive lock on the meta page */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ _hash_checkpage(rel, metabuf, LH_META_PAGE);
+ metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+ /* start search at hashm_firstfree */
+ orig_firstfree = metap->hashm_firstfree;
+ first_page = orig_firstfree >> BMPG_SHIFT(metap);
+ bit = orig_firstfree & BMPG_MASK(metap);
+ i = first_page;
+ j = bit / BITS_PER_MAP;
+ bit &= ~(BITS_PER_MAP - 1);
+
+ /* outer loop iterates once per bitmap page */
+ for (;;)
+ {
+ BlockNumber mapblkno;
+ Page mappage;
+ uint32 last_inpage;
+
+ /* want to end search with the last existing overflow page */
+ splitnum = metap->hashm_ovflpoint;
+ max_ovflpg = metap->hashm_spares[splitnum] - 1;
+ last_page = max_ovflpg >> BMPG_SHIFT(metap);
+ last_bit = max_ovflpg & BMPG_MASK(metap);
+
+ if (i > last_page)
+ break;
+
+ Assert(i < metap->hashm_nmaps);
+ mapblkno = metap->hashm_mapp[i];
+
+ if (i == last_page)
+ last_inpage = last_bit;
+ else
+ last_inpage = BMPGSZ_BIT(metap) - 1;
+
+ /* Release exclusive lock on metapage while reading bitmap page */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+
+ mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE, LH_BITMAP_PAGE);
+ mappage = BufferGetPage(mapbuf);
+ freep = HashPageGetBitmap(mappage);
+
+ for (; bit <= last_inpage; j++, bit += BITS_PER_MAP)
+ {
+ if (freep[j] != ALL_SET)
+ {
+ page_found = true;
+
+ /* Reacquire exclusive lock on the meta page */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ /* convert bit to bit number within page */
+ bit += _hash_firstfreebit(freep[j]);
+ bitmap_page_bit = bit;
+
+ /* convert bit to absolute bit number */
+ bit += (i << BMPG_SHIFT(metap));
+ /* Calculate address of the recycled overflow page */
+ blkno = bitno_to_blkno(metap, bit);
+
+ /* Fetch and init the recycled page */
+ ovflbuf = _hash_getinitbuf(rel, blkno);
+
+ goto found;
+ }
+ }
+
+ /* No free space here, try to advance to next map page */
+ _hash_relbuf(rel, mapbuf);
+ mapbuf = InvalidBuffer;
+ i++;
+ j = 0; /* scan from start of next map page */
+ bit = 0;
+
+ /* Reacquire exclusive lock on the meta page */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+ }
+
+ /*
+ * No free pages --- have to extend the relation to add an overflow page.
+ * First, check to see if we have to add a new bitmap page too.
+ */
+ if (last_bit == (uint32) (BMPGSZ_BIT(metap) - 1))
+ {
+ /*
+ * We create the new bitmap page with all pages marked "in use".
+ * Actually two pages in the new bitmap's range will exist
+ * immediately: the bitmap page itself, and the following page which
+ * is the one we return to the caller. Both of these are correctly
+ * marked "in use". Subsequent pages do not exist yet, but it is
+ * convenient to pre-mark them as "in use" too.
+ */
+ bit = metap->hashm_spares[splitnum];
+
+ /* metapage already has a write lock */
+ if (metap->hashm_nmaps >= HASH_MAX_BITMAPS)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("out of overflow pages in hash index \"%s\"",
+ RelationGetRelationName(rel))));
+
+ newmapbuf = _hash_getnewbuf(rel, bitno_to_blkno(metap, bit), MAIN_FORKNUM);
+ }
+ else
+ {
+ /*
+ * Nothing to do here; since the page will be past the last used page,
+ * we know its bitmap bit was preinitialized to "in use".
+ */
+ }
+
+ /* Calculate address of the new overflow page */
+ bit = BufferIsValid(newmapbuf) ?
+ metap->hashm_spares[splitnum] + 1 : metap->hashm_spares[splitnum];
+ blkno = bitno_to_blkno(metap, bit);
+
+ /*
+ * Fetch the page with _hash_getnewbuf to ensure smgr's idea of the
+ * relation length stays in sync with ours. XXX It's annoying to do this
+ * with metapage write lock held; would be better to use a lock that
+ * doesn't block incoming searches.
+ *
+ * It is okay to hold two buffer locks here (one on tail page of bucket
+ * and other on new overflow page) since there cannot be anyone else
+ * contending for access to ovflbuf.
+ */
+ ovflbuf = _hash_getnewbuf(rel, blkno, MAIN_FORKNUM);
+
+found:
+
+ /*
+ * Do the update. No ereport(ERROR) until changes are logged. We want to
+ * log the changes for bitmap page and overflow page together to avoid
+ * loss of pages in case the new page is added.
+ */
+ START_CRIT_SECTION();
+
+ if (page_found)
+ {
+ Assert(BufferIsValid(mapbuf));
+
+ /* mark page "in use" in the bitmap */
+ SETBIT(freep, bitmap_page_bit);
+ MarkBufferDirty(mapbuf);
+ }
+ else
+ {
+ /* update the count to indicate new overflow page is added */
+ metap->hashm_spares[splitnum]++;
+
+ if (BufferIsValid(newmapbuf))
+ {
+ _hash_initbitmapbuffer(newmapbuf, metap->hashm_bmsize, false);
+ MarkBufferDirty(newmapbuf);
+
+ /* add the new bitmap page to the metapage's list of bitmaps */
+ metap->hashm_mapp[metap->hashm_nmaps] = BufferGetBlockNumber(newmapbuf);
+ metap->hashm_nmaps++;
+ metap->hashm_spares[splitnum]++;
+ }
+
+ MarkBufferDirty(metabuf);
+
+ /*
+ * for new overflow page, we don't need to explicitly set the bit in
+ * bitmap page, as by default that will be set to "in use".
+ */
+ }
+
+ /*
+ * Adjust hashm_firstfree to avoid redundant searches. But don't risk
+ * changing it if someone moved it while we were searching bitmap pages.
+ */
+ if (metap->hashm_firstfree == orig_firstfree)
+ {
+ metap->hashm_firstfree = bit + 1;
+ MarkBufferDirty(metabuf);
+ }
+
+ /* initialize new overflow page */
+ ovflpage = BufferGetPage(ovflbuf);
+ ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
+ ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
+ ovflopaque->hasho_nextblkno = InvalidBlockNumber;
+ ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
+ ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
+ ovflopaque->hasho_page_id = HASHO_PAGE_ID;
+
+ MarkBufferDirty(ovflbuf);
+
+ /* logically chain overflow page to previous page */
+ pageopaque->hasho_nextblkno = BufferGetBlockNumber(ovflbuf);
+
+ MarkBufferDirty(buf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ XLogRecPtr recptr;
+ xl_hash_add_ovfl_page xlrec;
+
+ xlrec.bmpage_found = page_found;
+ xlrec.bmsize = metap->hashm_bmsize;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashAddOvflPage);
+
+ XLogRegisterBuffer(0, ovflbuf, REGBUF_WILL_INIT);
+ XLogRegisterBufData(0, (char *) &pageopaque->hasho_bucket, sizeof(Bucket));
+
+ XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
+
+ if (BufferIsValid(mapbuf))
+ {
+ XLogRegisterBuffer(2, mapbuf, REGBUF_STANDARD);
+ XLogRegisterBufData(2, (char *) &bitmap_page_bit, sizeof(uint32));
+ }
+
+ if (BufferIsValid(newmapbuf))
+ XLogRegisterBuffer(3, newmapbuf, REGBUF_WILL_INIT);
+
+ XLogRegisterBuffer(4, metabuf, REGBUF_STANDARD);
+ XLogRegisterBufData(4, (char *) &metap->hashm_firstfree, sizeof(uint32));
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_ADD_OVFL_PAGE);
+
+ PageSetLSN(BufferGetPage(ovflbuf), recptr);
+ PageSetLSN(BufferGetPage(buf), recptr);
+
+ if (BufferIsValid(mapbuf))
+ PageSetLSN(BufferGetPage(mapbuf), recptr);
+
+ if (BufferIsValid(newmapbuf))
+ PageSetLSN(BufferGetPage(newmapbuf), recptr);
+
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ if (retain_pin)
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ else
+ _hash_relbuf(rel, buf);
+
+ if (BufferIsValid(mapbuf))
+ _hash_relbuf(rel, mapbuf);
+
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+
+ if (BufferIsValid(newmapbuf))
+ _hash_relbuf(rel, newmapbuf);
+
+ return ovflbuf;
+}
+
+/*
+ * _hash_firstfreebit()
+ *
+ * Return the number of the first bit that is not set in the word 'map'.
+ */
+static uint32
+_hash_firstfreebit(uint32 map)
+{
+ uint32 i,
+ mask;
+
+ mask = 0x1;
+ for (i = 0; i < BITS_PER_MAP; i++)
+ {
+ if (!(mask & map))
+ return i;
+ mask <<= 1;
+ }
+
+ elog(ERROR, "firstfreebit found no free bit");
+
+ return 0; /* keep compiler quiet */
+}
+
+/*
+ * _hash_freeovflpage() -
+ *
+ * Remove this overflow page from its bucket's chain, and mark the page as
+ * free. On entry, ovflbuf is write-locked; it is released before exiting.
+ *
+ * Add the tuples (itups) to wbuf in this function. We could do that in the
+ * caller as well, but the advantage of doing it here is we can easily write
+ * the WAL for XLOG_HASH_SQUEEZE_PAGE operation. Addition of tuples and
+ * removal of overflow page has to done as an atomic operation, otherwise
+ * during replay on standby users might find duplicate records.
+ *
+ * Since this function is invoked in VACUUM, we provide an access strategy
+ * parameter that controls fetches of the bucket pages.
+ *
+ * Returns the block number of the page that followed the given page
+ * in the bucket, or InvalidBlockNumber if no following page.
+ *
+ * NB: caller must not hold lock on metapage, nor on page, that's next to
+ * ovflbuf in the bucket chain. We don't acquire the lock on page that's
+ * prior to ovflbuf in chain if it is same as wbuf because the caller already
+ * has a lock on same.
+ */
+BlockNumber
+_hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
+ Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
+ Size *tups_size, uint16 nitups,
+ BufferAccessStrategy bstrategy)
+{
+ HashMetaPage metap;
+ Buffer metabuf;
+ Buffer mapbuf;
+ BlockNumber ovflblkno;
+ BlockNumber prevblkno;
+ BlockNumber blkno;
+ BlockNumber nextblkno;
+ BlockNumber writeblkno;
+ HashPageOpaque ovflopaque;
+ Page ovflpage;
+ Page mappage;
+ uint32 *freep;
+ uint32 ovflbitno;
+ int32 bitmappage,
+ bitmapbit;
+ Bucket bucket PG_USED_FOR_ASSERTS_ONLY;
+ Buffer prevbuf = InvalidBuffer;
+ Buffer nextbuf = InvalidBuffer;
+ bool update_metap = false;
+
+ /* Get information from the doomed page */
+ _hash_checkpage(rel, ovflbuf, LH_OVERFLOW_PAGE);
+ ovflblkno = BufferGetBlockNumber(ovflbuf);
+ ovflpage = BufferGetPage(ovflbuf);
+ ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
+ nextblkno = ovflopaque->hasho_nextblkno;
+ prevblkno = ovflopaque->hasho_prevblkno;
+ writeblkno = BufferGetBlockNumber(wbuf);
+ bucket = ovflopaque->hasho_bucket;
+
+ /*
+ * Fix up the bucket chain. this is a doubly-linked list, so we must fix
+ * up the bucket chain members behind and ahead of the overflow page being
+ * deleted. Concurrency issues are avoided by using lock chaining as
+ * described atop hashbucketcleanup.
+ */
+ if (BlockNumberIsValid(prevblkno))
+ {
+ if (prevblkno == writeblkno)
+ prevbuf = wbuf;
+ else
+ prevbuf = _hash_getbuf_with_strategy(rel,
+ prevblkno,
+ HASH_WRITE,
+ LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
+ bstrategy);
+ }
+ if (BlockNumberIsValid(nextblkno))
+ nextbuf = _hash_getbuf_with_strategy(rel,
+ nextblkno,
+ HASH_WRITE,
+ LH_OVERFLOW_PAGE,
+ bstrategy);
+
+ /* Note: bstrategy is intentionally not used for metapage and bitmap */
+
+ /* Read the metapage so we can determine which bitmap page to use */
+ metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
+ metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+ /* Identify which bit to set */
+ ovflbitno = _hash_ovflblkno_to_bitno(metap, ovflblkno);
+
+ bitmappage = ovflbitno >> BMPG_SHIFT(metap);
+ bitmapbit = ovflbitno & BMPG_MASK(metap);
+
+ if (bitmappage >= metap->hashm_nmaps)
+ elog(ERROR, "invalid overflow bit number %u", ovflbitno);
+ blkno = metap->hashm_mapp[bitmappage];
+
+ /* Release metapage lock while we access the bitmap page */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+
+ /* read the bitmap page to clear the bitmap bit */
+ mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BITMAP_PAGE);
+ mappage = BufferGetPage(mapbuf);
+ freep = HashPageGetBitmap(mappage);
+ Assert(ISSET(freep, bitmapbit));
+
+ /* Get write-lock on metapage to update firstfree */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ /* This operation needs to log multiple tuples, prepare WAL for that */
+ if (RelationNeedsWAL(rel))
+ XLogEnsureRecordSpace(HASH_XLOG_FREE_OVFL_BUFS, 4 + nitups);
+
+ START_CRIT_SECTION();
+
+ /*
+ * we have to insert tuples on the "write" page, being careful to preserve
+ * hashkey ordering. (If we insert many tuples into the same "write" page
+ * it would be worth qsort'ing them).
+ */
+ if (nitups > 0)
+ {
+ _hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
+ MarkBufferDirty(wbuf);
+ }
+
+ /*
+ * Reinitialize the freed overflow page. Just zeroing the page won't
+ * work, because WAL replay routines expect pages to be initialized. See
+ * explanation of RBM_NORMAL mode atop XLogReadBufferExtended. We are
+ * careful to make the special space valid here so that tools like
+ * pageinspect won't get confused.
+ */
+ _hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
+
+ ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
+
+ ovflopaque->hasho_prevblkno = InvalidBlockNumber;
+ ovflopaque->hasho_nextblkno = InvalidBlockNumber;
+ ovflopaque->hasho_bucket = -1;
+ ovflopaque->hasho_flag = LH_UNUSED_PAGE;
+ ovflopaque->hasho_page_id = HASHO_PAGE_ID;
+
+ MarkBufferDirty(ovflbuf);
+
+ if (BufferIsValid(prevbuf))
+ {
+ Page prevpage = BufferGetPage(prevbuf);
+ HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
+
+ Assert(prevopaque->hasho_bucket == bucket);
+ prevopaque->hasho_nextblkno = nextblkno;
+ MarkBufferDirty(prevbuf);
+ }
+ if (BufferIsValid(nextbuf))
+ {
+ Page nextpage = BufferGetPage(nextbuf);
+ HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
+
+ Assert(nextopaque->hasho_bucket == bucket);
+ nextopaque->hasho_prevblkno = prevblkno;
+ MarkBufferDirty(nextbuf);
+ }
+
+ /* Clear the bitmap bit to indicate that this overflow page is free */
+ CLRBIT(freep, bitmapbit);
+ MarkBufferDirty(mapbuf);
+
+ /* if this is now the first free page, update hashm_firstfree */
+ if (ovflbitno < metap->hashm_firstfree)
+ {
+ metap->hashm_firstfree = ovflbitno;
+ update_metap = true;
+ MarkBufferDirty(metabuf);
+ }
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_hash_squeeze_page xlrec;
+ XLogRecPtr recptr;
+ int i;
+
+ xlrec.prevblkno = prevblkno;
+ xlrec.nextblkno = nextblkno;
+ xlrec.ntups = nitups;
+ xlrec.is_prim_bucket_same_wrt = (wbuf == bucketbuf);
+ xlrec.is_prev_bucket_same_wrt = (wbuf == prevbuf);
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashSqueezePage);
+
+ /*
+ * bucket buffer needs to be registered to ensure that we can acquire
+ * a cleanup lock on it during replay.
+ */
+ if (!xlrec.is_prim_bucket_same_wrt)
+ XLogRegisterBuffer(0, bucketbuf, REGBUF_STANDARD | REGBUF_NO_IMAGE);
+
+ XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
+ if (xlrec.ntups > 0)
+ {
+ XLogRegisterBufData(1, (char *) itup_offsets,
+ nitups * sizeof(OffsetNumber));
+ for (i = 0; i < nitups; i++)
+ XLogRegisterBufData(1, (char *) itups[i], tups_size[i]);
+ }
+
+ XLogRegisterBuffer(2, ovflbuf, REGBUF_STANDARD);
+
+ /*
+ * If prevpage and the writepage (block in which we are moving tuples
+ * from overflow) are same, then no need to separately register
+ * prevpage. During replay, we can directly update the nextblock in
+ * writepage.
+ */
+ if (BufferIsValid(prevbuf) && !xlrec.is_prev_bucket_same_wrt)
+ XLogRegisterBuffer(3, prevbuf, REGBUF_STANDARD);
+
+ if (BufferIsValid(nextbuf))
+ XLogRegisterBuffer(4, nextbuf, REGBUF_STANDARD);
+
+ XLogRegisterBuffer(5, mapbuf, REGBUF_STANDARD);
+ XLogRegisterBufData(5, (char *) &bitmapbit, sizeof(uint32));
+
+ if (update_metap)
+ {
+ XLogRegisterBuffer(6, metabuf, REGBUF_STANDARD);
+ XLogRegisterBufData(6, (char *) &metap->hashm_firstfree, sizeof(uint32));
+ }
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SQUEEZE_PAGE);
+
+ PageSetLSN(BufferGetPage(wbuf), recptr);
+ PageSetLSN(BufferGetPage(ovflbuf), recptr);
+
+ if (BufferIsValid(prevbuf) && !xlrec.is_prev_bucket_same_wrt)
+ PageSetLSN(BufferGetPage(prevbuf), recptr);
+ if (BufferIsValid(nextbuf))
+ PageSetLSN(BufferGetPage(nextbuf), recptr);
+
+ PageSetLSN(BufferGetPage(mapbuf), recptr);
+
+ if (update_metap)
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ /* release previous bucket if it is not same as write bucket */
+ if (BufferIsValid(prevbuf) && prevblkno != writeblkno)
+ _hash_relbuf(rel, prevbuf);
+
+ if (BufferIsValid(ovflbuf))
+ _hash_relbuf(rel, ovflbuf);
+
+ if (BufferIsValid(nextbuf))
+ _hash_relbuf(rel, nextbuf);
+
+ _hash_relbuf(rel, mapbuf);
+ _hash_relbuf(rel, metabuf);
+
+ return nextblkno;
+}
+
+
+/*
+ * _hash_initbitmapbuffer()
+ *
+ * Initialize a new bitmap page. All bits in the new bitmap page are set to
+ * "1", indicating "in use".
+ */
+void
+_hash_initbitmapbuffer(Buffer buf, uint16 bmsize, bool initpage)
+{
+ Page pg;
+ HashPageOpaque op;
+ uint32 *freep;
+
+ pg = BufferGetPage(buf);
+
+ /* initialize the page */
+ if (initpage)
+ _hash_pageinit(pg, BufferGetPageSize(buf));
+
+ /* initialize the page's special space */
+ op = (HashPageOpaque) PageGetSpecialPointer(pg);
+ op->hasho_prevblkno = InvalidBlockNumber;
+ op->hasho_nextblkno = InvalidBlockNumber;
+ op->hasho_bucket = -1;
+ op->hasho_flag = LH_BITMAP_PAGE;
+ op->hasho_page_id = HASHO_PAGE_ID;
+
+ /* set all of the bits to 1 */
+ freep = HashPageGetBitmap(pg);
+ MemSet(freep, 0xFF, bmsize);
+
+ /*
+ * Set pd_lower just past the end of the bitmap page data. We could even
+ * set pd_lower equal to pd_upper, but this is more precise and makes the
+ * page look compressible to xlog.c.
+ */
+ ((PageHeader) pg)->pd_lower = ((char *) freep + bmsize) - (char *) pg;
+}
+
+
+/*
+ * _hash_squeezebucket(rel, bucket)
+ *
+ * Try to squeeze the tuples onto pages occurring earlier in the
+ * bucket chain in an attempt to free overflow pages. When we start
+ * the "squeezing", the page from which we start taking tuples (the
+ * "read" page) is the last bucket in the bucket chain and the page
+ * onto which we start squeezing tuples (the "write" page) is the
+ * first page in the bucket chain. The read page works backward and
+ * the write page works forward; the procedure terminates when the
+ * read page and write page are the same page.
+ *
+ * At completion of this procedure, it is guaranteed that all pages in
+ * the bucket are nonempty, unless the bucket is totally empty (in
+ * which case all overflow pages will be freed). The original implementation
+ * required that to be true on entry as well, but it's a lot easier for
+ * callers to leave empty overflow pages and let this guy clean it up.
+ *
+ * Caller must acquire cleanup lock on the primary page of the target
+ * bucket to exclude any scans that are in progress, which could easily
+ * be confused into returning the same tuple more than once or some tuples
+ * not at all by the rearrangement we are performing here. To prevent
+ * any concurrent scan to cross the squeeze scan we use lock chaining
+ * similar to hashbucketcleanup. Refer comments atop hashbucketcleanup.
+ *
+ * We need to retain a pin on the primary bucket to ensure that no concurrent
+ * split can start.
+ *
+ * Since this function is invoked in VACUUM, we provide an access strategy
+ * parameter that controls fetches of the bucket pages.
+ */
+void
+_hash_squeezebucket(Relation rel,
+ Bucket bucket,
+ BlockNumber bucket_blkno,
+ Buffer bucket_buf,
+ BufferAccessStrategy bstrategy)
+{
+ BlockNumber wblkno;
+ BlockNumber rblkno;
+ Buffer wbuf;
+ Buffer rbuf;
+ Page wpage;
+ Page rpage;
+ HashPageOpaque wopaque;
+ HashPageOpaque ropaque;
+
+ /*
+ * start squeezing into the primary bucket page.
+ */
+ wblkno = bucket_blkno;
+ wbuf = bucket_buf;
+ wpage = BufferGetPage(wbuf);
+ wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
+
+ /*
+ * if there aren't any overflow pages, there's nothing to squeeze. caller
+ * is responsible for releasing the pin on primary bucket page.
+ */
+ if (!BlockNumberIsValid(wopaque->hasho_nextblkno))
+ {
+ LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
+ return;
+ }
+
+ /*
+ * Find the last page in the bucket chain by starting at the base bucket
+ * page and working forward. Note: we assume that a hash bucket chain is
+ * usually smaller than the buffer ring being used by VACUUM, else using
+ * the access strategy here would be counterproductive.
+ */
+ rbuf = InvalidBuffer;
+ ropaque = wopaque;
+ do
+ {
+ rblkno = ropaque->hasho_nextblkno;
+ if (rbuf != InvalidBuffer)
+ _hash_relbuf(rel, rbuf);
+ rbuf = _hash_getbuf_with_strategy(rel,
+ rblkno,
+ HASH_WRITE,
+ LH_OVERFLOW_PAGE,
+ bstrategy);
+ rpage = BufferGetPage(rbuf);
+ ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
+ Assert(ropaque->hasho_bucket == bucket);
+ } while (BlockNumberIsValid(ropaque->hasho_nextblkno));
+
+ /*
+ * squeeze the tuples.
+ */
+ for (;;)
+ {
+ OffsetNumber roffnum;
+ OffsetNumber maxroffnum;
+ OffsetNumber deletable[MaxOffsetNumber];
+ IndexTuple itups[MaxIndexTuplesPerPage];
+ Size tups_size[MaxIndexTuplesPerPage];
+ OffsetNumber itup_offsets[MaxIndexTuplesPerPage];
+ uint16 ndeletable = 0;
+ uint16 nitups = 0;
+ Size all_tups_size = 0;
+ int i;
+ bool retain_pin = false;
+
+readpage:
+ /* Scan each tuple in "read" page */
+ maxroffnum = PageGetMaxOffsetNumber(rpage);
+ for (roffnum = FirstOffsetNumber;
+ roffnum <= maxroffnum;
+ roffnum = OffsetNumberNext(roffnum))
+ {
+ IndexTuple itup;
+ Size itemsz;
+
+ /* skip dead tuples */
+ if (ItemIdIsDead(PageGetItemId(rpage, roffnum)))
+ continue;
+
+ itup = (IndexTuple) PageGetItem(rpage,
+ PageGetItemId(rpage, roffnum));
+ itemsz = IndexTupleSize(itup);
+ itemsz = MAXALIGN(itemsz);
+
+ /*
+ * Walk up the bucket chain, looking for a page big enough for
+ * this item and all other accumulated items. Exit if we reach
+ * the read page.
+ */
+ while (PageGetFreeSpaceForMultipleTuples(wpage, nitups + 1) < (all_tups_size + itemsz))
+ {
+ Buffer next_wbuf = InvalidBuffer;
+ bool tups_moved = false;
+
+ Assert(!PageIsEmpty(wpage));
+
+ if (wblkno == bucket_blkno)
+ retain_pin = true;
+
+ wblkno = wopaque->hasho_nextblkno;
+ Assert(BlockNumberIsValid(wblkno));
+
+ /* don't need to move to next page if we reached the read page */
+ if (wblkno != rblkno)
+ next_wbuf = _hash_getbuf_with_strategy(rel,
+ wblkno,
+ HASH_WRITE,
+ LH_OVERFLOW_PAGE,
+ bstrategy);
+
+ if (nitups > 0)
+ {
+ Assert(nitups == ndeletable);
+
+ /*
+ * This operation needs to log multiple tuples, prepare
+ * WAL for that.
+ */
+ if (RelationNeedsWAL(rel))
+ XLogEnsureRecordSpace(0, 3 + nitups);
+
+ START_CRIT_SECTION();
+
+ /*
+ * we have to insert tuples on the "write" page, being
+ * careful to preserve hashkey ordering. (If we insert
+ * many tuples into the same "write" page it would be
+ * worth qsort'ing them).
+ */
+ _hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
+ MarkBufferDirty(wbuf);
+
+ /* Delete tuples we already moved off read page */
+ PageIndexMultiDelete(rpage, deletable, ndeletable);
+ MarkBufferDirty(rbuf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ XLogRecPtr recptr;
+ xl_hash_move_page_contents xlrec;
+
+ xlrec.ntups = nitups;
+ xlrec.is_prim_bucket_same_wrt = (wbuf == bucket_buf) ? true : false;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashMovePageContents);
+
+ /*
+ * bucket buffer needs to be registered to ensure that
+ * we can acquire a cleanup lock on it during replay.
+ */
+ if (!xlrec.is_prim_bucket_same_wrt)
+ XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);
+
+ XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
+ XLogRegisterBufData(1, (char *) itup_offsets,
+ nitups * sizeof(OffsetNumber));
+ for (i = 0; i < nitups; i++)
+ XLogRegisterBufData(1, (char *) itups[i], tups_size[i]);
+
+ XLogRegisterBuffer(2, rbuf, REGBUF_STANDARD);
+ XLogRegisterBufData(2, (char *) deletable,
+ ndeletable * sizeof(OffsetNumber));
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_MOVE_PAGE_CONTENTS);
+
+ PageSetLSN(BufferGetPage(wbuf), recptr);
+ PageSetLSN(BufferGetPage(rbuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ tups_moved = true;
+ }
+
+ /*
+ * release the lock on previous page after acquiring the lock
+ * on next page
+ */
+ if (retain_pin)
+ LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
+ else
+ _hash_relbuf(rel, wbuf);
+
+ /* nothing more to do if we reached the read page */
+ if (rblkno == wblkno)
+ {
+ _hash_relbuf(rel, rbuf);
+ return;
+ }
+
+ wbuf = next_wbuf;
+ wpage = BufferGetPage(wbuf);
+ wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
+ Assert(wopaque->hasho_bucket == bucket);
+ retain_pin = false;
+
+ /* be tidy */
+ for (i = 0; i < nitups; i++)
+ pfree(itups[i]);
+ nitups = 0;
+ all_tups_size = 0;
+ ndeletable = 0;
+
+ /*
+ * after moving the tuples, rpage would have been compacted,
+ * so we need to rescan it.
+ */
+ if (tups_moved)
+ goto readpage;
+ }
+
+ /* remember tuple for deletion from "read" page */
+ deletable[ndeletable++] = roffnum;
+
+ /*
+ * we need a copy of index tuples as they can be freed as part of
+ * overflow page, however we need them to write a WAL record in
+ * _hash_freeovflpage.
+ */
+ itups[nitups] = CopyIndexTuple(itup);
+ tups_size[nitups++] = itemsz;
+ all_tups_size += itemsz;
+ }
+
+ /*
+ * If we reach here, there are no live tuples on the "read" page ---
+ * it was empty when we got to it, or we moved them all. So we can
+ * just free the page without bothering with deleting tuples
+ * individually. Then advance to the previous "read" page.
+ *
+ * Tricky point here: if our read and write pages are adjacent in the
+ * bucket chain, our write lock on wbuf will conflict with
+ * _hash_freeovflpage's attempt to update the sibling links of the
+ * removed page. In that case, we don't need to lock it again.
+ */
+ rblkno = ropaque->hasho_prevblkno;
+ Assert(BlockNumberIsValid(rblkno));
+
+ /* free this overflow page (releases rbuf) */
+ _hash_freeovflpage(rel, bucket_buf, rbuf, wbuf, itups, itup_offsets,
+ tups_size, nitups, bstrategy);
+
+ /* be tidy */
+ for (i = 0; i < nitups; i++)
+ pfree(itups[i]);
+
+ /* are we freeing the page adjacent to wbuf? */
+ if (rblkno == wblkno)
+ {
+ /* retain the pin on primary bucket page till end of bucket scan */
+ if (wblkno == bucket_blkno)
+ LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
+ else
+ _hash_relbuf(rel, wbuf);
+ return;
+ }
+
+ rbuf = _hash_getbuf_with_strategy(rel,
+ rblkno,
+ HASH_WRITE,
+ LH_OVERFLOW_PAGE,
+ bstrategy);
+ rpage = BufferGetPage(rbuf);
+ ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
+ Assert(ropaque->hasho_bucket == bucket);
+ }
+
+ /* NOTREACHED */
+}
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
new file mode 100644
index 0000000..49a9867
--- /dev/null
+++ b/src/backend/access/hash/hashpage.c
@@ -0,0 +1,1612 @@
+/*-------------------------------------------------------------------------
+ *
+ * hashpage.c
+ * Hash table page management code for the Postgres hash access method
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hashpage.c
+ *
+ * NOTES
+ * Postgres hash pages look like ordinary relation pages. The opaque
+ * data at high addresses includes information about the page including
+ * whether a page is an overflow page or a true bucket, the bucket
+ * number, and the block numbers of the preceding and following pages
+ * in the same bucket.
+ *
+ * The first page in a hash relation, page zero, is special -- it stores
+ * information describing the hash table; it is referred to as the
+ * "meta page." Pages one and higher store the actual data.
+ *
+ * There are also bitmap pages, which are not manipulated here;
+ * see hashovfl.c.
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "access/hash_xlog.h"
+#include "miscadmin.h"
+#include "port/pg_bitutils.h"
+#include "storage/lmgr.h"
+#include "storage/predicate.h"
+#include "storage/smgr.h"
+
+static bool _hash_alloc_buckets(Relation rel, BlockNumber firstblock,
+ uint32 nblocks);
+static void _hash_splitbucket(Relation rel, Buffer metabuf,
+ Bucket obucket, Bucket nbucket,
+ Buffer obuf,
+ Buffer nbuf,
+ HTAB *htab,
+ uint32 maxbucket,
+ uint32 highmask, uint32 lowmask);
+static void log_split_page(Relation rel, Buffer buf);
+
+
+/*
+ * _hash_getbuf() -- Get a buffer by block number for read or write.
+ *
+ * 'access' must be HASH_READ, HASH_WRITE, or HASH_NOLOCK.
+ * 'flags' is a bitwise OR of the allowed page types.
+ *
+ * This must be used only to fetch pages that are expected to be valid
+ * already. _hash_checkpage() is applied using the given flags.
+ *
+ * When this routine returns, the appropriate lock is set on the
+ * requested buffer and its reference count has been incremented
+ * (ie, the buffer is "locked and pinned").
+ *
+ * P_NEW is disallowed because this routine can only be used
+ * to access pages that are known to be before the filesystem EOF.
+ * Extending the index should be done with _hash_getnewbuf.
+ */
+Buffer
+_hash_getbuf(Relation rel, BlockNumber blkno, int access, int flags)
+{
+ Buffer buf;
+
+ if (blkno == P_NEW)
+ elog(ERROR, "hash AM does not use P_NEW");
+
+ buf = ReadBuffer(rel, blkno);
+
+ if (access != HASH_NOLOCK)
+ LockBuffer(buf, access);
+
+ /* ref count and lock type are correct */
+
+ _hash_checkpage(rel, buf, flags);
+
+ return buf;
+}
+
+/*
+ * _hash_getbuf_with_condlock_cleanup() -- Try to get a buffer for cleanup.
+ *
+ * We read the page and try to acquire a cleanup lock. If we get it,
+ * we return the buffer; otherwise, we return InvalidBuffer.
+ */
+Buffer
+_hash_getbuf_with_condlock_cleanup(Relation rel, BlockNumber blkno, int flags)
+{
+ Buffer buf;
+
+ if (blkno == P_NEW)
+ elog(ERROR, "hash AM does not use P_NEW");
+
+ buf = ReadBuffer(rel, blkno);
+
+ if (!ConditionalLockBufferForCleanup(buf))
+ {
+ ReleaseBuffer(buf);
+ return InvalidBuffer;
+ }
+
+ /* ref count and lock type are correct */
+
+ _hash_checkpage(rel, buf, flags);
+
+ return buf;
+}
+
+/*
+ * _hash_getinitbuf() -- Get and initialize a buffer by block number.
+ *
+ * This must be used only to fetch pages that are known to be before
+ * the index's filesystem EOF, but are to be filled from scratch.
+ * _hash_pageinit() is applied automatically. Otherwise it has
+ * effects similar to _hash_getbuf() with access = HASH_WRITE.
+ *
+ * When this routine returns, a write lock is set on the
+ * requested buffer and its reference count has been incremented
+ * (ie, the buffer is "locked and pinned").
+ *
+ * P_NEW is disallowed because this routine can only be used
+ * to access pages that are known to be before the filesystem EOF.
+ * Extending the index should be done with _hash_getnewbuf.
+ */
+Buffer
+_hash_getinitbuf(Relation rel, BlockNumber blkno)
+{
+ Buffer buf;
+
+ if (blkno == P_NEW)
+ elog(ERROR, "hash AM does not use P_NEW");
+
+ buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK,
+ NULL);
+
+ /* ref count and lock type are correct */
+
+ /* initialize the page */
+ _hash_pageinit(BufferGetPage(buf), BufferGetPageSize(buf));
+
+ return buf;
+}
+
+/*
+ * _hash_initbuf() -- Get and initialize a buffer by bucket number.
+ */
+void
+_hash_initbuf(Buffer buf, uint32 max_bucket, uint32 num_bucket, uint32 flag,
+ bool initpage)
+{
+ HashPageOpaque pageopaque;
+ Page page;
+
+ page = BufferGetPage(buf);
+
+ /* initialize the page */
+ if (initpage)
+ _hash_pageinit(page, BufferGetPageSize(buf));
+
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+ /*
+ * Set hasho_prevblkno with current hashm_maxbucket. This value will be
+ * used to validate cached HashMetaPageData. See
+ * _hash_getbucketbuf_from_hashkey().
+ */
+ pageopaque->hasho_prevblkno = max_bucket;
+ pageopaque->hasho_nextblkno = InvalidBlockNumber;
+ pageopaque->hasho_bucket = num_bucket;
+ pageopaque->hasho_flag = flag;
+ pageopaque->hasho_page_id = HASHO_PAGE_ID;
+}
+
+/*
+ * _hash_getnewbuf() -- Get a new page at the end of the index.
+ *
+ * This has the same API as _hash_getinitbuf, except that we are adding
+ * a page to the index, and hence expect the page to be past the
+ * logical EOF. (However, we have to support the case where it isn't,
+ * since a prior try might have crashed after extending the filesystem
+ * EOF but before updating the metapage to reflect the added page.)
+ *
+ * It is caller's responsibility to ensure that only one process can
+ * extend the index at a time. In practice, this function is called
+ * only while holding write lock on the metapage, because adding a page
+ * is always associated with an update of metapage data.
+ */
+Buffer
+_hash_getnewbuf(Relation rel, BlockNumber blkno, ForkNumber forkNum)
+{
+ BlockNumber nblocks = RelationGetNumberOfBlocksInFork(rel, forkNum);
+ Buffer buf;
+
+ if (blkno == P_NEW)
+ elog(ERROR, "hash AM does not use P_NEW");
+ if (blkno > nblocks)
+ elog(ERROR, "access to noncontiguous page in hash index \"%s\"",
+ RelationGetRelationName(rel));
+
+ /* smgr insists we use P_NEW to extend the relation */
+ if (blkno == nblocks)
+ {
+ buf = ReadBufferExtended(rel, forkNum, P_NEW, RBM_NORMAL, NULL);
+ if (BufferGetBlockNumber(buf) != blkno)
+ elog(ERROR, "unexpected hash relation size: %u, should be %u",
+ BufferGetBlockNumber(buf), blkno);
+ LockBuffer(buf, HASH_WRITE);
+ }
+ else
+ {
+ buf = ReadBufferExtended(rel, forkNum, blkno, RBM_ZERO_AND_LOCK,
+ NULL);
+ }
+
+ /* ref count and lock type are correct */
+
+ /* initialize the page */
+ _hash_pageinit(BufferGetPage(buf), BufferGetPageSize(buf));
+
+ return buf;
+}
+
+/*
+ * _hash_getbuf_with_strategy() -- Get a buffer with nondefault strategy.
+ *
+ * This is identical to _hash_getbuf() but also allows a buffer access
+ * strategy to be specified. We use this for VACUUM operations.
+ */
+Buffer
+_hash_getbuf_with_strategy(Relation rel, BlockNumber blkno,
+ int access, int flags,
+ BufferAccessStrategy bstrategy)
+{
+ Buffer buf;
+
+ if (blkno == P_NEW)
+ elog(ERROR, "hash AM does not use P_NEW");
+
+ buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, bstrategy);
+
+ if (access != HASH_NOLOCK)
+ LockBuffer(buf, access);
+
+ /* ref count and lock type are correct */
+
+ _hash_checkpage(rel, buf, flags);
+
+ return buf;
+}
+
+/*
+ * _hash_relbuf() -- release a locked buffer.
+ *
+ * Lock and pin (refcount) are both dropped.
+ */
+void
+_hash_relbuf(Relation rel, Buffer buf)
+{
+ UnlockReleaseBuffer(buf);
+}
+
+/*
+ * _hash_dropbuf() -- release an unlocked buffer.
+ *
+ * This is used to unpin a buffer on which we hold no lock.
+ */
+void
+_hash_dropbuf(Relation rel, Buffer buf)
+{
+ ReleaseBuffer(buf);
+}
+
+/*
+ * _hash_dropscanbuf() -- release buffers used in scan.
+ *
+ * This routine unpins the buffers used during scan on which we
+ * hold no lock.
+ */
+void
+_hash_dropscanbuf(Relation rel, HashScanOpaque so)
+{
+ /* release pin we hold on primary bucket page */
+ if (BufferIsValid(so->hashso_bucket_buf) &&
+ so->hashso_bucket_buf != so->currPos.buf)
+ _hash_dropbuf(rel, so->hashso_bucket_buf);
+ so->hashso_bucket_buf = InvalidBuffer;
+
+ /* release pin we hold on primary bucket page of bucket being split */
+ if (BufferIsValid(so->hashso_split_bucket_buf) &&
+ so->hashso_split_bucket_buf != so->currPos.buf)
+ _hash_dropbuf(rel, so->hashso_split_bucket_buf);
+ so->hashso_split_bucket_buf = InvalidBuffer;
+
+ /* release any pin we still hold */
+ if (BufferIsValid(so->currPos.buf))
+ _hash_dropbuf(rel, so->currPos.buf);
+ so->currPos.buf = InvalidBuffer;
+
+ /* reset split scan */
+ so->hashso_buc_populated = false;
+ so->hashso_buc_split = false;
+}
+
+
+/*
+ * _hash_init() -- Initialize the metadata page of a hash index,
+ * the initial buckets, and the initial bitmap page.
+ *
+ * The initial number of buckets is dependent on num_tuples, an estimate
+ * of the number of tuples to be loaded into the index initially. The
+ * chosen number of buckets is returned.
+ *
+ * We are fairly cavalier about locking here, since we know that no one else
+ * could be accessing this index. In particular the rule about not holding
+ * multiple buffer locks is ignored.
+ */
+uint32
+_hash_init(Relation rel, double num_tuples, ForkNumber forkNum)
+{
+ Buffer metabuf;
+ Buffer buf;
+ Buffer bitmapbuf;
+ Page pg;
+ HashMetaPage metap;
+ RegProcedure procid;
+ int32 data_width;
+ int32 item_width;
+ int32 ffactor;
+ uint32 num_buckets;
+ uint32 i;
+ bool use_wal;
+
+ /* safety check */
+ if (RelationGetNumberOfBlocksInFork(rel, forkNum) != 0)
+ elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
+ RelationGetRelationName(rel));
+
+ /*
+ * WAL log creation of pages if the relation is persistent, or this is the
+ * init fork. Init forks for unlogged relations always need to be WAL
+ * logged.
+ */
+ use_wal = RelationNeedsWAL(rel) || forkNum == INIT_FORKNUM;
+
+ /*
+ * Determine the target fill factor (in tuples per bucket) for this index.
+ * The idea is to make the fill factor correspond to pages about as full
+ * as the user-settable fillfactor parameter says. We can compute it
+ * exactly since the index datatype (i.e. uint32 hash key) is fixed-width.
+ */
+ data_width = sizeof(uint32);
+ item_width = MAXALIGN(sizeof(IndexTupleData)) + MAXALIGN(data_width) +
+ sizeof(ItemIdData); /* include the line pointer */
+ ffactor = HashGetTargetPageUsage(rel) / item_width;
+ /* keep to a sane range */
+ if (ffactor < 10)
+ ffactor = 10;
+
+ procid = index_getprocid(rel, 1, HASHSTANDARD_PROC);
+
+ /*
+ * We initialize the metapage, the first N bucket pages, and the first
+ * bitmap page in sequence, using _hash_getnewbuf to cause smgrextend()
+ * calls to occur. This ensures that the smgr level has the right idea of
+ * the physical index length.
+ *
+ * Critical section not required, because on error the creation of the
+ * whole relation will be rolled back.
+ */
+ metabuf = _hash_getnewbuf(rel, HASH_METAPAGE, forkNum);
+ _hash_init_metabuffer(metabuf, num_tuples, procid, ffactor, false);
+ MarkBufferDirty(metabuf);
+
+ pg = BufferGetPage(metabuf);
+ metap = HashPageGetMeta(pg);
+
+ /* XLOG stuff */
+ if (use_wal)
+ {
+ xl_hash_init_meta_page xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.num_tuples = num_tuples;
+ xlrec.procid = metap->hashm_procid;
+ xlrec.ffactor = metap->hashm_ffactor;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashInitMetaPage);
+ XLogRegisterBuffer(0, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_META_PAGE);
+
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ num_buckets = metap->hashm_maxbucket + 1;
+
+ /*
+ * Release buffer lock on the metapage while we initialize buckets.
+ * Otherwise, we'll be in interrupt holdoff and the CHECK_FOR_INTERRUPTS
+ * won't accomplish anything. It's a bad idea to hold buffer locks for
+ * long intervals in any case, since that can block the bgwriter.
+ */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Initialize and WAL Log the first N buckets
+ */
+ for (i = 0; i < num_buckets; i++)
+ {
+ BlockNumber blkno;
+
+ /* Allow interrupts, in case N is huge */
+ CHECK_FOR_INTERRUPTS();
+
+ blkno = BUCKET_TO_BLKNO(metap, i);
+ buf = _hash_getnewbuf(rel, blkno, forkNum);
+ _hash_initbuf(buf, metap->hashm_maxbucket, i, LH_BUCKET_PAGE, false);
+ MarkBufferDirty(buf);
+
+ if (use_wal)
+ log_newpage(&rel->rd_node,
+ forkNum,
+ blkno,
+ BufferGetPage(buf),
+ true);
+ _hash_relbuf(rel, buf);
+ }
+
+ /* Now reacquire buffer lock on metapage */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * Initialize bitmap page
+ */
+ bitmapbuf = _hash_getnewbuf(rel, num_buckets + 1, forkNum);
+ _hash_initbitmapbuffer(bitmapbuf, metap->hashm_bmsize, false);
+ MarkBufferDirty(bitmapbuf);
+
+ /* add the new bitmap page to the metapage's list of bitmaps */
+ /* metapage already has a write lock */
+ if (metap->hashm_nmaps >= HASH_MAX_BITMAPS)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("out of overflow pages in hash index \"%s\"",
+ RelationGetRelationName(rel))));
+
+ metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
+
+ metap->hashm_nmaps++;
+ MarkBufferDirty(metabuf);
+
+ /* XLOG stuff */
+ if (use_wal)
+ {
+ xl_hash_init_bitmap_page xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.bmsize = metap->hashm_bmsize;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfHashInitBitmapPage);
+ XLogRegisterBuffer(0, bitmapbuf, REGBUF_WILL_INIT);
+
+ /*
+ * This is safe only because nobody else can be modifying the index at
+ * this stage; it's only visible to the transaction that is creating
+ * it.
+ */
+ XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_BITMAP_PAGE);
+
+ PageSetLSN(BufferGetPage(bitmapbuf), recptr);
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ /* all done */
+ _hash_relbuf(rel, bitmapbuf);
+ _hash_relbuf(rel, metabuf);
+
+ return num_buckets;
+}
+
+/*
+ * _hash_init_metabuffer() -- Initialize the metadata page of a hash index.
+ */
+void
+_hash_init_metabuffer(Buffer buf, double num_tuples, RegProcedure procid,
+ uint16 ffactor, bool initpage)
+{
+ HashMetaPage metap;
+ HashPageOpaque pageopaque;
+ Page page;
+ double dnumbuckets;
+ uint32 num_buckets;
+ uint32 spare_index;
+ uint32 lshift;
+
+ /*
+ * Choose the number of initial bucket pages to match the fill factor
+ * given the estimated number of tuples. We round up the result to the
+ * total number of buckets which has to be allocated before using its
+ * hashm_spares element. However always force at least 2 bucket pages. The
+ * upper limit is determined by considerations explained in
+ * _hash_expandtable().
+ */
+ dnumbuckets = num_tuples / ffactor;
+ if (dnumbuckets <= 2.0)
+ num_buckets = 2;
+ else if (dnumbuckets >= (double) 0x40000000)
+ num_buckets = 0x40000000;
+ else
+ num_buckets = _hash_get_totalbuckets(_hash_spareindex(dnumbuckets));
+
+ spare_index = _hash_spareindex(num_buckets);
+ Assert(spare_index < HASH_MAX_SPLITPOINTS);
+
+ page = BufferGetPage(buf);
+ if (initpage)
+ _hash_pageinit(page, BufferGetPageSize(buf));
+
+ pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ pageopaque->hasho_prevblkno = InvalidBlockNumber;
+ pageopaque->hasho_nextblkno = InvalidBlockNumber;
+ pageopaque->hasho_bucket = -1;
+ pageopaque->hasho_flag = LH_META_PAGE;
+ pageopaque->hasho_page_id = HASHO_PAGE_ID;
+
+ metap = HashPageGetMeta(page);
+
+ metap->hashm_magic = HASH_MAGIC;
+ metap->hashm_version = HASH_VERSION;
+ metap->hashm_ntuples = 0;
+ metap->hashm_nmaps = 0;
+ metap->hashm_ffactor = ffactor;
+ metap->hashm_bsize = HashGetMaxBitmapSize(page);
+
+ /* find largest bitmap array size that will fit in page size */
+ lshift = pg_leftmost_one_pos32(metap->hashm_bsize);
+ Assert(lshift > 0);
+ metap->hashm_bmsize = 1 << lshift;
+ metap->hashm_bmshift = lshift + BYTE_TO_BIT;
+ Assert((1 << BMPG_SHIFT(metap)) == (BMPG_MASK(metap) + 1));
+
+ /*
+ * Label the index with its primary hash support function's OID. This is
+ * pretty useless for normal operation (in fact, hashm_procid is not used
+ * anywhere), but it might be handy for forensic purposes so we keep it.
+ */
+ metap->hashm_procid = procid;
+
+ /*
+ * We initialize the index with N buckets, 0 .. N-1, occupying physical
+ * blocks 1 to N. The first freespace bitmap page is in block N+1.
+ */
+ metap->hashm_maxbucket = num_buckets - 1;
+
+ /*
+ * Set highmask as next immediate ((2 ^ x) - 1), which should be
+ * sufficient to cover num_buckets.
+ */
+ metap->hashm_highmask = pg_nextpower2_32(num_buckets + 1) - 1;
+ metap->hashm_lowmask = (metap->hashm_highmask >> 1);
+
+ MemSet(metap->hashm_spares, 0, sizeof(metap->hashm_spares));
+ MemSet(metap->hashm_mapp, 0, sizeof(metap->hashm_mapp));
+
+ /* Set up mapping for one spare page after the initial splitpoints */
+ metap->hashm_spares[spare_index] = 1;
+ metap->hashm_ovflpoint = spare_index;
+ metap->hashm_firstfree = 0;
+
+ /*
+ * Set pd_lower just past the end of the metadata. This is essential,
+ * because without doing so, metadata will be lost if xlog.c compresses
+ * the page.
+ */
+ ((PageHeader) page)->pd_lower =
+ ((char *) metap + sizeof(HashMetaPageData)) - (char *) page;
+}
+
+/*
+ * _hash_pageinit() -- Initialize a new hash index page.
+ */
+void
+_hash_pageinit(Page page, Size size)
+{
+ PageInit(page, size, sizeof(HashPageOpaqueData));
+}
+
+/*
+ * Attempt to expand the hash table by creating one new bucket.
+ *
+ * This will silently do nothing if we don't get cleanup lock on old or
+ * new bucket.
+ *
+ * Complete the pending splits and remove the tuples from old bucket,
+ * if there are any left over from the previous split.
+ *
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * The buffer is returned in the same state.
+ */
+void
+_hash_expandtable(Relation rel, Buffer metabuf)
+{
+ HashMetaPage metap;
+ Bucket old_bucket;
+ Bucket new_bucket;
+ uint32 spare_ndx;
+ BlockNumber start_oblkno;
+ BlockNumber start_nblkno;
+ Buffer buf_nblkno;
+ Buffer buf_oblkno;
+ Page opage;
+ Page npage;
+ HashPageOpaque oopaque;
+ HashPageOpaque nopaque;
+ uint32 maxbucket;
+ uint32 highmask;
+ uint32 lowmask;
+ bool metap_update_masks = false;
+ bool metap_update_splitpoint = false;
+
+restart_expand:
+
+ /*
+ * Write-lock the meta page. It used to be necessary to acquire a
+ * heavyweight lock to begin a split, but that is no longer required.
+ */
+ LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+ _hash_checkpage(rel, metabuf, LH_META_PAGE);
+ metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+ /*
+ * Check to see if split is still needed; someone else might have already
+ * done one while we waited for the lock.
+ *
+ * Make sure this stays in sync with _hash_doinsert()
+ */
+ if (metap->hashm_ntuples <=
+ (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1))
+ goto fail;
+
+ /*
+ * Can't split anymore if maxbucket has reached its maximum possible
+ * value.
+ *
+ * Ideally we'd allow bucket numbers up to UINT_MAX-1 (no higher because
+ * the calculation maxbucket+1 mustn't overflow). Currently we restrict
+ * to half that to prevent failure of pg_ceil_log2_32() and insufficient
+ * space in hashm_spares[]. It's moot anyway because an index with 2^32
+ * buckets would certainly overflow BlockNumber and hence
+ * _hash_alloc_buckets() would fail, but if we supported buckets smaller
+ * than a disk block then this would be an independent constraint.
+ *
+ * If you change this, see also the maximum initial number of buckets in
+ * _hash_init().
+ */
+ if (metap->hashm_maxbucket >= (uint32) 0x7FFFFFFE)
+ goto fail;
+
+ /*
+ * Determine which bucket is to be split, and attempt to take cleanup lock
+ * on the old bucket. If we can't get the lock, give up.
+ *
+ * The cleanup lock protects us not only against other backends, but
+ * against our own backend as well.
+ *
+ * The cleanup lock is mainly to protect the split from concurrent
+ * inserts. See src/backend/access/hash/README, Lock Definitions for
+ * further details. Due to this locking restriction, if there is any
+ * pending scan, the split will give up which is not good, but harmless.
+ */
+ new_bucket = metap->hashm_maxbucket + 1;
+
+ old_bucket = (new_bucket & metap->hashm_lowmask);
+
+ start_oblkno = BUCKET_TO_BLKNO(metap, old_bucket);
+
+ buf_oblkno = _hash_getbuf_with_condlock_cleanup(rel, start_oblkno, LH_BUCKET_PAGE);
+ if (!buf_oblkno)
+ goto fail;
+
+ opage = BufferGetPage(buf_oblkno);
+ oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+
+ /*
+ * We want to finish the split from a bucket as there is no apparent
+ * benefit by not doing so and it will make the code complicated to finish
+ * the split that involves multiple buckets considering the case where new
+ * split also fails. We don't need to consider the new bucket for
+ * completing the split here as it is not possible that a re-split of new
+ * bucket starts when there is still a pending split from old bucket.
+ */
+ if (H_BUCKET_BEING_SPLIT(oopaque))
+ {
+ /*
+ * Copy bucket mapping info now; refer the comment in code below where
+ * we copy this information before calling _hash_splitbucket to see
+ * why this is okay.
+ */
+ maxbucket = metap->hashm_maxbucket;
+ highmask = metap->hashm_highmask;
+ lowmask = metap->hashm_lowmask;
+
+ /*
+ * Release the lock on metapage and old_bucket, before completing the
+ * split.
+ */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+ LockBuffer(buf_oblkno, BUFFER_LOCK_UNLOCK);
+
+ _hash_finish_split(rel, metabuf, buf_oblkno, old_bucket, maxbucket,
+ highmask, lowmask);
+
+ /* release the pin on old buffer and retry for expand. */
+ _hash_dropbuf(rel, buf_oblkno);
+
+ goto restart_expand;
+ }
+
+ /*
+ * Clean the tuples remained from the previous split. This operation
+ * requires cleanup lock and we already have one on the old bucket, so
+ * let's do it. We also don't want to allow further splits from the bucket
+ * till the garbage of previous split is cleaned. This has two
+ * advantages; first, it helps in avoiding the bloat due to garbage and
+ * second is, during cleanup of bucket, we are always sure that the
+ * garbage tuples belong to most recently split bucket. On the contrary,
+ * if we allow cleanup of bucket after meta page is updated to indicate
+ * the new split and before the actual split, the cleanup operation won't
+ * be able to decide whether the tuple has been moved to the newly created
+ * bucket and ended up deleting such tuples.
+ */
+ if (H_NEEDS_SPLIT_CLEANUP(oopaque))
+ {
+ /*
+ * Copy bucket mapping info now; refer to the comment in code below
+ * where we copy this information before calling _hash_splitbucket to
+ * see why this is okay.
+ */
+ maxbucket = metap->hashm_maxbucket;
+ highmask = metap->hashm_highmask;
+ lowmask = metap->hashm_lowmask;
+
+ /* Release the metapage lock. */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+
+ hashbucketcleanup(rel, old_bucket, buf_oblkno, start_oblkno, NULL,
+ maxbucket, highmask, lowmask, NULL, NULL, true,
+ NULL, NULL);
+
+ _hash_dropbuf(rel, buf_oblkno);
+
+ goto restart_expand;
+ }
+
+ /*
+ * There shouldn't be any active scan on new bucket.
+ *
+ * Note: it is safe to compute the new bucket's blkno here, even though we
+ * may still need to update the BUCKET_TO_BLKNO mapping. This is because
+ * the current value of hashm_spares[hashm_ovflpoint] correctly shows
+ * where we are going to put a new splitpoint's worth of buckets.
+ */
+ start_nblkno = BUCKET_TO_BLKNO(metap, new_bucket);
+
+ /*
+ * If the split point is increasing we need to allocate a new batch of
+ * bucket pages.
+ */
+ spare_ndx = _hash_spareindex(new_bucket + 1);
+ if (spare_ndx > metap->hashm_ovflpoint)
+ {
+ uint32 buckets_to_add;
+
+ Assert(spare_ndx == metap->hashm_ovflpoint + 1);
+
+ /*
+ * We treat allocation of buckets as a separate WAL-logged action.
+ * Even if we fail after this operation, won't leak bucket pages;
+ * rather, the next split will consume this space. In any case, even
+ * without failure we don't use all the space in one split operation.
+ */
+ buckets_to_add = _hash_get_totalbuckets(spare_ndx) - new_bucket;
+ if (!_hash_alloc_buckets(rel, start_nblkno, buckets_to_add))
+ {
+ /* can't split due to BlockNumber overflow */
+ _hash_relbuf(rel, buf_oblkno);
+ goto fail;
+ }
+ }
+
+ /*
+ * Physically allocate the new bucket's primary page. We want to do this
+ * before changing the metapage's mapping info, in case we can't get the
+ * disk space. Ideally, we don't need to check for cleanup lock on new
+ * bucket as no other backend could find this bucket unless meta page is
+ * updated. However, it is good to be consistent with old bucket locking.
+ */
+ buf_nblkno = _hash_getnewbuf(rel, start_nblkno, MAIN_FORKNUM);
+ if (!IsBufferCleanupOK(buf_nblkno))
+ {
+ _hash_relbuf(rel, buf_oblkno);
+ _hash_relbuf(rel, buf_nblkno);
+ goto fail;
+ }
+
+ /*
+ * Since we are scribbling on the pages in the shared buffers, establish a
+ * critical section. Any failure in this next code leaves us with a big
+ * problem: the metapage is effectively corrupt but could get written back
+ * to disk.
+ */
+ START_CRIT_SECTION();
+
+ /*
+ * Okay to proceed with split. Update the metapage bucket mapping info.
+ */
+ metap->hashm_maxbucket = new_bucket;
+
+ if (new_bucket > metap->hashm_highmask)
+ {
+ /* Starting a new doubling */
+ metap->hashm_lowmask = metap->hashm_highmask;
+ metap->hashm_highmask = new_bucket | metap->hashm_lowmask;
+ metap_update_masks = true;
+ }
+
+ /*
+ * If the split point is increasing we need to adjust the hashm_spares[]
+ * array and hashm_ovflpoint so that future overflow pages will be created
+ * beyond this new batch of bucket pages.
+ */
+ if (spare_ndx > metap->hashm_ovflpoint)
+ {
+ metap->hashm_spares[spare_ndx] = metap->hashm_spares[metap->hashm_ovflpoint];
+ metap->hashm_ovflpoint = spare_ndx;
+ metap_update_splitpoint = true;
+ }
+
+ MarkBufferDirty(metabuf);
+
+ /*
+ * Copy bucket mapping info now; this saves re-accessing the meta page
+ * inside _hash_splitbucket's inner loop. Note that once we drop the
+ * split lock, other splits could begin, so these values might be out of
+ * date before _hash_splitbucket finishes. That's okay, since all it
+ * needs is to tell which of these two buckets to map hashkeys into.
+ */
+ maxbucket = metap->hashm_maxbucket;
+ highmask = metap->hashm_highmask;
+ lowmask = metap->hashm_lowmask;
+
+ opage = BufferGetPage(buf_oblkno);
+ oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+
+ /*
+ * Mark the old bucket to indicate that split is in progress. (At
+ * operation end, we will clear the split-in-progress flag.) Also, for a
+ * primary bucket page, hasho_prevblkno stores the number of buckets that
+ * existed as of the last split, so we must update that value here.
+ */
+ oopaque->hasho_flag |= LH_BUCKET_BEING_SPLIT;
+ oopaque->hasho_prevblkno = maxbucket;
+
+ MarkBufferDirty(buf_oblkno);
+
+ npage = BufferGetPage(buf_nblkno);
+
+ /*
+ * initialize the new bucket's primary page and mark it to indicate that
+ * split is in progress.
+ */
+ nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+ nopaque->hasho_prevblkno = maxbucket;
+ nopaque->hasho_nextblkno = InvalidBlockNumber;
+ nopaque->hasho_bucket = new_bucket;
+ nopaque->hasho_flag = LH_BUCKET_PAGE | LH_BUCKET_BEING_POPULATED;
+ nopaque->hasho_page_id = HASHO_PAGE_ID;
+
+ MarkBufferDirty(buf_nblkno);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ xl_hash_split_allocate_page xlrec;
+ XLogRecPtr recptr;
+
+ xlrec.new_bucket = maxbucket;
+ xlrec.old_bucket_flag = oopaque->hasho_flag;
+ xlrec.new_bucket_flag = nopaque->hasho_flag;
+ xlrec.flags = 0;
+
+ XLogBeginInsert();
+
+ XLogRegisterBuffer(0, buf_oblkno, REGBUF_STANDARD);
+ XLogRegisterBuffer(1, buf_nblkno, REGBUF_WILL_INIT);
+ XLogRegisterBuffer(2, metabuf, REGBUF_STANDARD);
+
+ if (metap_update_masks)
+ {
+ xlrec.flags |= XLH_SPLIT_META_UPDATE_MASKS;
+ XLogRegisterBufData(2, (char *) &metap->hashm_lowmask, sizeof(uint32));
+ XLogRegisterBufData(2, (char *) &metap->hashm_highmask, sizeof(uint32));
+ }
+
+ if (metap_update_splitpoint)
+ {
+ xlrec.flags |= XLH_SPLIT_META_UPDATE_SPLITPOINT;
+ XLogRegisterBufData(2, (char *) &metap->hashm_ovflpoint,
+ sizeof(uint32));
+ XLogRegisterBufData(2,
+ (char *) &metap->hashm_spares[metap->hashm_ovflpoint],
+ sizeof(uint32));
+ }
+
+ XLogRegisterData((char *) &xlrec, SizeOfHashSplitAllocPage);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_ALLOCATE_PAGE);
+
+ PageSetLSN(BufferGetPage(buf_oblkno), recptr);
+ PageSetLSN(BufferGetPage(buf_nblkno), recptr);
+ PageSetLSN(BufferGetPage(metabuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ /* drop lock, but keep pin */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+
+ /* Relocate records to the new bucket */
+ _hash_splitbucket(rel, metabuf,
+ old_bucket, new_bucket,
+ buf_oblkno, buf_nblkno, NULL,
+ maxbucket, highmask, lowmask);
+
+ /* all done, now release the pins on primary buckets. */
+ _hash_dropbuf(rel, buf_oblkno);
+ _hash_dropbuf(rel, buf_nblkno);
+
+ return;
+
+ /* Here if decide not to split or fail to acquire old bucket lock */
+fail:
+
+ /* We didn't write the metapage, so just drop lock */
+ LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+}
+
+
+/*
+ * _hash_alloc_buckets -- allocate a new splitpoint's worth of bucket pages
+ *
+ * This does not need to initialize the new bucket pages; we'll do that as
+ * each one is used by _hash_expandtable(). But we have to extend the logical
+ * EOF to the end of the splitpoint; this keeps smgr's idea of the EOF in
+ * sync with ours, so that we don't get complaints from smgr.
+ *
+ * We do this by writing a page of zeroes at the end of the splitpoint range.
+ * We expect that the filesystem will ensure that the intervening pages read
+ * as zeroes too. On many filesystems this "hole" will not be allocated
+ * immediately, which means that the index file may end up more fragmented
+ * than if we forced it all to be allocated now; but since we don't scan
+ * hash indexes sequentially anyway, that probably doesn't matter.
+ *
+ * XXX It's annoying that this code is executed with the metapage lock held.
+ * We need to interlock against _hash_addovflpage() adding a new overflow page
+ * concurrently, but it'd likely be better to use LockRelationForExtension
+ * for the purpose. OTOH, adding a splitpoint is a very infrequent operation,
+ * so it may not be worth worrying about.
+ *
+ * Returns true if successful, or false if allocation failed due to
+ * BlockNumber overflow.
+ */
+static bool
+_hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
+{
+ BlockNumber lastblock;
+ PGAlignedBlock zerobuf;
+ Page page;
+ HashPageOpaque ovflopaque;
+
+ lastblock = firstblock + nblocks - 1;
+
+ /*
+ * Check for overflow in block number calculation; if so, we cannot extend
+ * the index anymore.
+ */
+ if (lastblock < firstblock || lastblock == InvalidBlockNumber)
+ return false;
+
+ page = (Page) zerobuf.data;
+
+ /*
+ * Initialize the page. Just zeroing the page won't work; see
+ * _hash_freeovflpage for similar usage. We take care to make the special
+ * space valid for the benefit of tools such as pageinspect.
+ */
+ _hash_pageinit(page, BLCKSZ);
+
+ ovflopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+ ovflopaque->hasho_prevblkno = InvalidBlockNumber;
+ ovflopaque->hasho_nextblkno = InvalidBlockNumber;
+ ovflopaque->hasho_bucket = -1;
+ ovflopaque->hasho_flag = LH_UNUSED_PAGE;
+ ovflopaque->hasho_page_id = HASHO_PAGE_ID;
+
+ if (RelationNeedsWAL(rel))
+ log_newpage(&rel->rd_node,
+ MAIN_FORKNUM,
+ lastblock,
+ zerobuf.data,
+ true);
+
+ RelationOpenSmgr(rel);
+ PageSetChecksumInplace(page, lastblock);
+ smgrextend(rel->rd_smgr, MAIN_FORKNUM, lastblock, zerobuf.data, false);
+
+ return true;
+}
+
+
+/*
+ * _hash_splitbucket -- split 'obucket' into 'obucket' and 'nbucket'
+ *
+ * This routine is used to partition the tuples between old and new bucket and
+ * is used to finish the incomplete split operations. To finish the previously
+ * interrupted split operation, the caller needs to fill htab. If htab is set,
+ * then we skip the movement of tuples that exists in htab, otherwise NULL
+ * value of htab indicates movement of all the tuples that belong to the new
+ * bucket.
+ *
+ * We are splitting a bucket that consists of a base bucket page and zero
+ * or more overflow (bucket chain) pages. We must relocate tuples that
+ * belong in the new bucket.
+ *
+ * The caller must hold cleanup locks on both buckets to ensure that
+ * no one else is trying to access them (see README).
+ *
+ * The caller must hold a pin, but no lock, on the metapage buffer.
+ * The buffer is returned in the same state. (The metapage is only
+ * touched if it becomes necessary to add or remove overflow pages.)
+ *
+ * Split needs to retain pin on primary bucket pages of both old and new
+ * buckets till end of operation. This is to prevent vacuum from starting
+ * while a split is in progress.
+ *
+ * In addition, the caller must have created the new bucket's base page,
+ * which is passed in buffer nbuf, pinned and write-locked. The lock will be
+ * released here and pin must be released by the caller. (The API is set up
+ * this way because we must do _hash_getnewbuf() before releasing the metapage
+ * write lock. So instead of passing the new bucket's start block number, we
+ * pass an actual buffer.)
+ */
+static void
+_hash_splitbucket(Relation rel,
+ Buffer metabuf,
+ Bucket obucket,
+ Bucket nbucket,
+ Buffer obuf,
+ Buffer nbuf,
+ HTAB *htab,
+ uint32 maxbucket,
+ uint32 highmask,
+ uint32 lowmask)
+{
+ Buffer bucket_obuf;
+ Buffer bucket_nbuf;
+ Page opage;
+ Page npage;
+ HashPageOpaque oopaque;
+ HashPageOpaque nopaque;
+ OffsetNumber itup_offsets[MaxIndexTuplesPerPage];
+ IndexTuple itups[MaxIndexTuplesPerPage];
+ Size all_tups_size = 0;
+ int i;
+ uint16 nitups = 0;
+
+ bucket_obuf = obuf;
+ opage = BufferGetPage(obuf);
+ oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+
+ bucket_nbuf = nbuf;
+ npage = BufferGetPage(nbuf);
+ nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+
+ /* Copy the predicate locks from old bucket to new bucket. */
+ PredicateLockPageSplit(rel,
+ BufferGetBlockNumber(bucket_obuf),
+ BufferGetBlockNumber(bucket_nbuf));
+
+ /*
+ * Partition the tuples in the old bucket between the old bucket and the
+ * new bucket, advancing along the old bucket's overflow bucket chain and
+ * adding overflow pages to the new bucket as needed. Outer loop iterates
+ * once per page in old bucket.
+ */
+ for (;;)
+ {
+ BlockNumber oblkno;
+ OffsetNumber ooffnum;
+ OffsetNumber omaxoffnum;
+
+ /* Scan each tuple in old page */
+ omaxoffnum = PageGetMaxOffsetNumber(opage);
+ for (ooffnum = FirstOffsetNumber;
+ ooffnum <= omaxoffnum;
+ ooffnum = OffsetNumberNext(ooffnum))
+ {
+ IndexTuple itup;
+ Size itemsz;
+ Bucket bucket;
+ bool found = false;
+
+ /* skip dead tuples */
+ if (ItemIdIsDead(PageGetItemId(opage, ooffnum)))
+ continue;
+
+ /*
+ * Before inserting a tuple, probe the hash table containing TIDs
+ * of tuples belonging to new bucket, if we find a match, then
+ * skip that tuple, else fetch the item's hash key (conveniently
+ * stored in the item) and determine which bucket it now belongs
+ * in.
+ */
+ itup = (IndexTuple) PageGetItem(opage,
+ PageGetItemId(opage, ooffnum));
+
+ if (htab)
+ (void) hash_search(htab, &itup->t_tid, HASH_FIND, &found);
+
+ if (found)
+ continue;
+
+ bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
+ maxbucket, highmask, lowmask);
+
+ if (bucket == nbucket)
+ {
+ IndexTuple new_itup;
+
+ /*
+ * make a copy of index tuple as we have to scribble on it.
+ */
+ new_itup = CopyIndexTuple(itup);
+
+ /*
+ * mark the index tuple as moved by split, such tuples are
+ * skipped by scan if there is split in progress for a bucket.
+ */
+ new_itup->t_info |= INDEX_MOVED_BY_SPLIT_MASK;
+
+ /*
+ * insert the tuple into the new bucket. if it doesn't fit on
+ * the current page in the new bucket, we must allocate a new
+ * overflow page and place the tuple on that page instead.
+ */
+ itemsz = IndexTupleSize(new_itup);
+ itemsz = MAXALIGN(itemsz);
+
+ if (PageGetFreeSpaceForMultipleTuples(npage, nitups + 1) < (all_tups_size + itemsz))
+ {
+ /*
+ * Change the shared buffer state in critical section,
+ * otherwise any error could make it unrecoverable.
+ */
+ START_CRIT_SECTION();
+
+ _hash_pgaddmultitup(rel, nbuf, itups, itup_offsets, nitups);
+ MarkBufferDirty(nbuf);
+ /* log the split operation before releasing the lock */
+ log_split_page(rel, nbuf);
+
+ END_CRIT_SECTION();
+
+ /* drop lock, but keep pin */
+ LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
+
+ /* be tidy */
+ for (i = 0; i < nitups; i++)
+ pfree(itups[i]);
+ nitups = 0;
+ all_tups_size = 0;
+
+ /* chain to a new overflow page */
+ nbuf = _hash_addovflpage(rel, metabuf, nbuf, (nbuf == bucket_nbuf) ? true : false);
+ npage = BufferGetPage(nbuf);
+ nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+ }
+
+ itups[nitups++] = new_itup;
+ all_tups_size += itemsz;
+ }
+ else
+ {
+ /*
+ * the tuple stays on this page, so nothing to do.
+ */
+ Assert(bucket == obucket);
+ }
+ }
+
+ oblkno = oopaque->hasho_nextblkno;
+
+ /* retain the pin on the old primary bucket */
+ if (obuf == bucket_obuf)
+ LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
+ else
+ _hash_relbuf(rel, obuf);
+
+ /* Exit loop if no more overflow pages in old bucket */
+ if (!BlockNumberIsValid(oblkno))
+ {
+ /*
+ * Change the shared buffer state in critical section, otherwise
+ * any error could make it unrecoverable.
+ */
+ START_CRIT_SECTION();
+
+ _hash_pgaddmultitup(rel, nbuf, itups, itup_offsets, nitups);
+ MarkBufferDirty(nbuf);
+ /* log the split operation before releasing the lock */
+ log_split_page(rel, nbuf);
+
+ END_CRIT_SECTION();
+
+ if (nbuf == bucket_nbuf)
+ LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
+ else
+ _hash_relbuf(rel, nbuf);
+
+ /* be tidy */
+ for (i = 0; i < nitups; i++)
+ pfree(itups[i]);
+ break;
+ }
+
+ /* Else, advance to next old page */
+ obuf = _hash_getbuf(rel, oblkno, HASH_READ, LH_OVERFLOW_PAGE);
+ opage = BufferGetPage(obuf);
+ oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+ }
+
+ /*
+ * We're at the end of the old bucket chain, so we're done partitioning
+ * the tuples. Mark the old and new buckets to indicate split is
+ * finished.
+ *
+ * To avoid deadlocks due to locking order of buckets, first lock the old
+ * bucket and then the new bucket.
+ */
+ LockBuffer(bucket_obuf, BUFFER_LOCK_EXCLUSIVE);
+ opage = BufferGetPage(bucket_obuf);
+ oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
+
+ LockBuffer(bucket_nbuf, BUFFER_LOCK_EXCLUSIVE);
+ npage = BufferGetPage(bucket_nbuf);
+ nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+
+ START_CRIT_SECTION();
+
+ oopaque->hasho_flag &= ~LH_BUCKET_BEING_SPLIT;
+ nopaque->hasho_flag &= ~LH_BUCKET_BEING_POPULATED;
+
+ /*
+ * After the split is finished, mark the old bucket to indicate that it
+ * contains deletable tuples. We will clear split-cleanup flag after
+ * deleting such tuples either at the end of split or at the next split
+ * from old bucket or at the time of vacuum.
+ */
+ oopaque->hasho_flag |= LH_BUCKET_NEEDS_SPLIT_CLEANUP;
+
+ /*
+ * now write the buffers, here we don't release the locks as caller is
+ * responsible to release locks.
+ */
+ MarkBufferDirty(bucket_obuf);
+ MarkBufferDirty(bucket_nbuf);
+
+ if (RelationNeedsWAL(rel))
+ {
+ XLogRecPtr recptr;
+ xl_hash_split_complete xlrec;
+
+ xlrec.old_bucket_flag = oopaque->hasho_flag;
+ xlrec.new_bucket_flag = nopaque->hasho_flag;
+
+ XLogBeginInsert();
+
+ XLogRegisterData((char *) &xlrec, SizeOfHashSplitComplete);
+
+ XLogRegisterBuffer(0, bucket_obuf, REGBUF_STANDARD);
+ XLogRegisterBuffer(1, bucket_nbuf, REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_COMPLETE);
+
+ PageSetLSN(BufferGetPage(bucket_obuf), recptr);
+ PageSetLSN(BufferGetPage(bucket_nbuf), recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ /*
+ * If possible, clean up the old bucket. We might not be able to do this
+ * if someone else has a pin on it, but if not then we can go ahead. This
+ * isn't absolutely necessary, but it reduces bloat; if we don't do it
+ * now, VACUUM will do it eventually, but maybe not until new overflow
+ * pages have been allocated. Note that there's no need to clean up the
+ * new bucket.
+ */
+ if (IsBufferCleanupOK(bucket_obuf))
+ {
+ LockBuffer(bucket_nbuf, BUFFER_LOCK_UNLOCK);
+ hashbucketcleanup(rel, obucket, bucket_obuf,
+ BufferGetBlockNumber(bucket_obuf), NULL,
+ maxbucket, highmask, lowmask, NULL, NULL, true,
+ NULL, NULL);
+ }
+ else
+ {
+ LockBuffer(bucket_nbuf, BUFFER_LOCK_UNLOCK);
+ LockBuffer(bucket_obuf, BUFFER_LOCK_UNLOCK);
+ }
+}
+
+/*
+ * _hash_finish_split() -- Finish the previously interrupted split operation
+ *
+ * To complete the split operation, we form the hash table of TIDs in new
+ * bucket which is then used by split operation to skip tuples that are
+ * already moved before the split operation was previously interrupted.
+ *
+ * The caller must hold a pin, but no lock, on the metapage and old bucket's
+ * primary page buffer. The buffers are returned in the same state. (The
+ * metapage is only touched if it becomes necessary to add or remove overflow
+ * pages.)
+ */
+void
+_hash_finish_split(Relation rel, Buffer metabuf, Buffer obuf, Bucket obucket,
+ uint32 maxbucket, uint32 highmask, uint32 lowmask)
+{
+ HASHCTL hash_ctl;
+ HTAB *tidhtab;
+ Buffer bucket_nbuf = InvalidBuffer;
+ Buffer nbuf;
+ Page npage;
+ BlockNumber nblkno;
+ BlockNumber bucket_nblkno;
+ HashPageOpaque npageopaque;
+ Bucket nbucket;
+ bool found;
+
+ /* Initialize hash tables used to track TIDs */
+ hash_ctl.keysize = sizeof(ItemPointerData);
+ hash_ctl.entrysize = sizeof(ItemPointerData);
+ hash_ctl.hcxt = CurrentMemoryContext;
+
+ tidhtab =
+ hash_create("bucket ctids",
+ 256, /* arbitrary initial size */
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+ bucket_nblkno = nblkno = _hash_get_newblock_from_oldbucket(rel, obucket);
+
+ /*
+ * Scan the new bucket and build hash table of TIDs
+ */
+ for (;;)
+ {
+ OffsetNumber noffnum;
+ OffsetNumber nmaxoffnum;
+
+ nbuf = _hash_getbuf(rel, nblkno, HASH_READ,
+ LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+
+ /* remember the primary bucket buffer to acquire cleanup lock on it. */
+ if (nblkno == bucket_nblkno)
+ bucket_nbuf = nbuf;
+
+ npage = BufferGetPage(nbuf);
+ npageopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+
+ /* Scan each tuple in new page */
+ nmaxoffnum = PageGetMaxOffsetNumber(npage);
+ for (noffnum = FirstOffsetNumber;
+ noffnum <= nmaxoffnum;
+ noffnum = OffsetNumberNext(noffnum))
+ {
+ IndexTuple itup;
+
+ /* Fetch the item's TID and insert it in hash table. */
+ itup = (IndexTuple) PageGetItem(npage,
+ PageGetItemId(npage, noffnum));
+
+ (void) hash_search(tidhtab, &itup->t_tid, HASH_ENTER, &found);
+
+ Assert(!found);
+ }
+
+ nblkno = npageopaque->hasho_nextblkno;
+
+ /*
+ * release our write lock without modifying buffer and ensure to
+ * retain the pin on primary bucket.
+ */
+ if (nbuf == bucket_nbuf)
+ LockBuffer(nbuf, BUFFER_LOCK_UNLOCK);
+ else
+ _hash_relbuf(rel, nbuf);
+
+ /* Exit loop if no more overflow pages in new bucket */
+ if (!BlockNumberIsValid(nblkno))
+ break;
+ }
+
+ /*
+ * Conditionally get the cleanup lock on old and new buckets to perform
+ * the split operation. If we don't get the cleanup locks, silently give
+ * up and next insertion on old bucket will try again to complete the
+ * split.
+ */
+ if (!ConditionalLockBufferForCleanup(obuf))
+ {
+ hash_destroy(tidhtab);
+ return;
+ }
+ if (!ConditionalLockBufferForCleanup(bucket_nbuf))
+ {
+ LockBuffer(obuf, BUFFER_LOCK_UNLOCK);
+ hash_destroy(tidhtab);
+ return;
+ }
+
+ npage = BufferGetPage(bucket_nbuf);
+ npageopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
+ nbucket = npageopaque->hasho_bucket;
+
+ _hash_splitbucket(rel, metabuf, obucket,
+ nbucket, obuf, bucket_nbuf, tidhtab,
+ maxbucket, highmask, lowmask);
+
+ _hash_dropbuf(rel, bucket_nbuf);
+ hash_destroy(tidhtab);
+}
+
+/*
+ * log_split_page() -- Log the split operation
+ *
+ * We log the split operation when the new page in new bucket gets full,
+ * so we log the entire page.
+ *
+ * 'buf' must be locked by the caller which is also responsible for unlocking
+ * it.
+ */
+static void
+log_split_page(Relation rel, Buffer buf)
+{
+ if (RelationNeedsWAL(rel))
+ {
+ XLogRecPtr recptr;
+
+ XLogBeginInsert();
+
+ XLogRegisterBuffer(0, buf, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
+
+ recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_PAGE);
+
+ PageSetLSN(BufferGetPage(buf), recptr);
+ }
+}
+
+/*
+ * _hash_getcachedmetap() -- Returns cached metapage data.
+ *
+ * If metabuf is not InvalidBuffer, caller must hold a pin, but no lock, on
+ * the metapage. If not set, we'll set it before returning if we have to
+ * refresh the cache, and return with a pin but no lock on it; caller is
+ * responsible for releasing the pin.
+ *
+ * We refresh the cache if it's not initialized yet or force_refresh is true.
+ */
+HashMetaPage
+_hash_getcachedmetap(Relation rel, Buffer *metabuf, bool force_refresh)
+{
+ Page page;
+
+ Assert(metabuf);
+ if (force_refresh || rel->rd_amcache == NULL)
+ {
+ char *cache = NULL;
+
+ /*
+ * It's important that we don't set rd_amcache to an invalid value.
+ * Either MemoryContextAlloc or _hash_getbuf could fail, so don't
+ * install a pointer to the newly-allocated storage in the actual
+ * relcache entry until both have succeeded.
+ */
+ if (rel->rd_amcache == NULL)
+ cache = MemoryContextAlloc(rel->rd_indexcxt,
+ sizeof(HashMetaPageData));
+
+ /* Read the metapage. */
+ if (BufferIsValid(*metabuf))
+ LockBuffer(*metabuf, BUFFER_LOCK_SHARE);
+ else
+ *metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ,
+ LH_META_PAGE);
+ page = BufferGetPage(*metabuf);
+
+ /* Populate the cache. */
+ if (rel->rd_amcache == NULL)
+ rel->rd_amcache = cache;
+ memcpy(rel->rd_amcache, HashPageGetMeta(page),
+ sizeof(HashMetaPageData));
+
+ /* Release metapage lock, but keep the pin. */
+ LockBuffer(*metabuf, BUFFER_LOCK_UNLOCK);
+ }
+
+ return (HashMetaPage) rel->rd_amcache;
+}
+
+/*
+ * _hash_getbucketbuf_from_hashkey() -- Get the bucket's buffer for the given
+ * hashkey.
+ *
+ * Bucket pages do not move or get removed once they are allocated. This give
+ * us an opportunity to use the previously saved metapage contents to reach
+ * the target bucket buffer, instead of reading from the metapage every time.
+ * This saves one buffer access every time we want to reach the target bucket
+ * buffer, which is very helpful savings in bufmgr traffic and contention.
+ *
+ * The access type parameter (HASH_READ or HASH_WRITE) indicates whether the
+ * bucket buffer has to be locked for reading or writing.
+ *
+ * The out parameter cachedmetap is set with metapage contents used for
+ * hashkey to bucket buffer mapping. Some callers need this info to reach the
+ * old bucket in case of bucket split, see _hash_doinsert().
+ */
+Buffer
+_hash_getbucketbuf_from_hashkey(Relation rel, uint32 hashkey, int access,
+ HashMetaPage *cachedmetap)
+{
+ HashMetaPage metap;
+ Buffer buf;
+ Buffer metabuf = InvalidBuffer;
+ Page page;
+ Bucket bucket;
+ BlockNumber blkno;
+ HashPageOpaque opaque;
+
+ /* We read from target bucket buffer, hence locking is must. */
+ Assert(access == HASH_READ || access == HASH_WRITE);
+
+ metap = _hash_getcachedmetap(rel, &metabuf, false);
+ Assert(metap != NULL);
+
+ /*
+ * Loop until we get a lock on the correct target bucket.
+ */
+ for (;;)
+ {
+ /*
+ * Compute the target bucket number, and convert to block number.
+ */
+ bucket = _hash_hashkey2bucket(hashkey,
+ metap->hashm_maxbucket,
+ metap->hashm_highmask,
+ metap->hashm_lowmask);
+
+ blkno = BUCKET_TO_BLKNO(metap, bucket);
+
+ /* Fetch the primary bucket page for the bucket */
+ buf = _hash_getbuf(rel, blkno, access, LH_BUCKET_PAGE);
+ page = BufferGetPage(buf);
+ opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ Assert(opaque->hasho_bucket == bucket);
+ Assert(opaque->hasho_prevblkno != InvalidBlockNumber);
+
+ /*
+ * If this bucket hasn't been split, we're done.
+ */
+ if (opaque->hasho_prevblkno <= metap->hashm_maxbucket)
+ break;
+
+ /* Drop lock on this buffer, update cached metapage, and retry. */
+ _hash_relbuf(rel, buf);
+ metap = _hash_getcachedmetap(rel, &metabuf, true);
+ Assert(metap != NULL);
+ }
+
+ if (BufferIsValid(metabuf))
+ _hash_dropbuf(rel, metabuf);
+
+ if (cachedmetap)
+ *cachedmetap = metap;
+
+ return buf;
+}
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
new file mode 100644
index 0000000..2ffa28e
--- /dev/null
+++ b/src/backend/access/hash/hashsearch.c
@@ -0,0 +1,721 @@
+/*-------------------------------------------------------------------------
+ *
+ * hashsearch.c
+ * search code for postgres hash tables
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hashsearch.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "access/relscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/predicate.h"
+#include "utils/rel.h"
+
+static bool _hash_readpage(IndexScanDesc scan, Buffer *bufP,
+ ScanDirection dir);
+static int _hash_load_qualified_items(IndexScanDesc scan, Page page,
+ OffsetNumber offnum, ScanDirection dir);
+static inline void _hash_saveitem(HashScanOpaque so, int itemIndex,
+ OffsetNumber offnum, IndexTuple itup);
+static void _hash_readnext(IndexScanDesc scan, Buffer *bufp,
+ Page *pagep, HashPageOpaque *opaquep);
+
+/*
+ * _hash_next() -- Get the next item in a scan.
+ *
+ * On entry, so->currPos describes the current page, which may
+ * be pinned but not locked, and so->currPos.itemIndex identifies
+ * which item was previously returned.
+ *
+ * On successful exit, scan->xs_ctup.t_self is set to the TID
+ * of the next heap tuple. so->currPos is updated as needed.
+ *
+ * On failure exit (no more tuples), we return false with pin
+ * held on bucket page but no pins or locks held on overflow
+ * page.
+ */
+bool
+_hash_next(IndexScanDesc scan, ScanDirection dir)
+{
+ Relation rel = scan->indexRelation;
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ HashScanPosItem *currItem;
+ BlockNumber blkno;
+ Buffer buf;
+ bool end_of_scan = false;
+
+ /*
+ * Advance to the next tuple on the current page; or if done, try to read
+ * data from the next or previous page based on the scan direction. Before
+ * moving to the next or previous page make sure that we deal with all the
+ * killed items.
+ */
+ if (ScanDirectionIsForward(dir))
+ {
+ if (++so->currPos.itemIndex > so->currPos.lastItem)
+ {
+ if (so->numKilled > 0)
+ _hash_kill_items(scan);
+
+ blkno = so->currPos.nextPage;
+ if (BlockNumberIsValid(blkno))
+ {
+ buf = _hash_getbuf(rel, blkno, HASH_READ, LH_OVERFLOW_PAGE);
+ TestForOldSnapshot(scan->xs_snapshot, rel, BufferGetPage(buf));
+ if (!_hash_readpage(scan, &buf, dir))
+ end_of_scan = true;
+ }
+ else
+ end_of_scan = true;
+ }
+ }
+ else
+ {
+ if (--so->currPos.itemIndex < so->currPos.firstItem)
+ {
+ if (so->numKilled > 0)
+ _hash_kill_items(scan);
+
+ blkno = so->currPos.prevPage;
+ if (BlockNumberIsValid(blkno))
+ {
+ buf = _hash_getbuf(rel, blkno, HASH_READ,
+ LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+ TestForOldSnapshot(scan->xs_snapshot, rel, BufferGetPage(buf));
+
+ /*
+ * We always maintain the pin on bucket page for whole scan
+ * operation, so releasing the additional pin we have acquired
+ * here.
+ */
+ if (buf == so->hashso_bucket_buf ||
+ buf == so->hashso_split_bucket_buf)
+ _hash_dropbuf(rel, buf);
+
+ if (!_hash_readpage(scan, &buf, dir))
+ end_of_scan = true;
+ }
+ else
+ end_of_scan = true;
+ }
+ }
+
+ if (end_of_scan)
+ {
+ _hash_dropscanbuf(rel, so);
+ HashScanPosInvalidate(so->currPos);
+ return false;
+ }
+
+ /* OK, itemIndex says what to return */
+ currItem = &so->currPos.items[so->currPos.itemIndex];
+ scan->xs_heaptid = currItem->heapTid;
+
+ return true;
+}
+
+/*
+ * Advance to next page in a bucket, if any. If we are scanning the bucket
+ * being populated during split operation then this function advances to the
+ * bucket being split after the last bucket page of bucket being populated.
+ */
+static void
+_hash_readnext(IndexScanDesc scan,
+ Buffer *bufp, Page *pagep, HashPageOpaque *opaquep)
+{
+ BlockNumber blkno;
+ Relation rel = scan->indexRelation;
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ bool block_found = false;
+
+ blkno = (*opaquep)->hasho_nextblkno;
+
+ /*
+ * Retain the pin on primary bucket page till the end of scan. Refer the
+ * comments in _hash_first to know the reason of retaining pin.
+ */
+ if (*bufp == so->hashso_bucket_buf || *bufp == so->hashso_split_bucket_buf)
+ LockBuffer(*bufp, BUFFER_LOCK_UNLOCK);
+ else
+ _hash_relbuf(rel, *bufp);
+
+ *bufp = InvalidBuffer;
+ /* check for interrupts while we're not holding any buffer lock */
+ CHECK_FOR_INTERRUPTS();
+ if (BlockNumberIsValid(blkno))
+ {
+ *bufp = _hash_getbuf(rel, blkno, HASH_READ, LH_OVERFLOW_PAGE);
+ block_found = true;
+ }
+ else if (so->hashso_buc_populated && !so->hashso_buc_split)
+ {
+ /*
+ * end of bucket, scan bucket being split if there was a split in
+ * progress at the start of scan.
+ */
+ *bufp = so->hashso_split_bucket_buf;
+
+ /*
+ * buffer for bucket being split must be valid as we acquire the pin
+ * on it before the start of scan and retain it till end of scan.
+ */
+ Assert(BufferIsValid(*bufp));
+
+ LockBuffer(*bufp, BUFFER_LOCK_SHARE);
+ PredicateLockPage(rel, BufferGetBlockNumber(*bufp), scan->xs_snapshot);
+
+ /*
+ * setting hashso_buc_split to true indicates that we are scanning
+ * bucket being split.
+ */
+ so->hashso_buc_split = true;
+
+ block_found = true;
+ }
+
+ if (block_found)
+ {
+ *pagep = BufferGetPage(*bufp);
+ TestForOldSnapshot(scan->xs_snapshot, rel, *pagep);
+ *opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
+ }
+}
+
+/*
+ * Advance to previous page in a bucket, if any. If the current scan has
+ * started during split operation then this function advances to bucket
+ * being populated after the first bucket page of bucket being split.
+ */
+static void
+_hash_readprev(IndexScanDesc scan,
+ Buffer *bufp, Page *pagep, HashPageOpaque *opaquep)
+{
+ BlockNumber blkno;
+ Relation rel = scan->indexRelation;
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ bool haveprevblk;
+
+ blkno = (*opaquep)->hasho_prevblkno;
+
+ /*
+ * Retain the pin on primary bucket page till the end of scan. Refer the
+ * comments in _hash_first to know the reason of retaining pin.
+ */
+ if (*bufp == so->hashso_bucket_buf || *bufp == so->hashso_split_bucket_buf)
+ {
+ LockBuffer(*bufp, BUFFER_LOCK_UNLOCK);
+ haveprevblk = false;
+ }
+ else
+ {
+ _hash_relbuf(rel, *bufp);
+ haveprevblk = true;
+ }
+
+ *bufp = InvalidBuffer;
+ /* check for interrupts while we're not holding any buffer lock */
+ CHECK_FOR_INTERRUPTS();
+
+ if (haveprevblk)
+ {
+ Assert(BlockNumberIsValid(blkno));
+ *bufp = _hash_getbuf(rel, blkno, HASH_READ,
+ LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+ *pagep = BufferGetPage(*bufp);
+ TestForOldSnapshot(scan->xs_snapshot, rel, *pagep);
+ *opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
+
+ /*
+ * We always maintain the pin on bucket page for whole scan operation,
+ * so releasing the additional pin we have acquired here.
+ */
+ if (*bufp == so->hashso_bucket_buf || *bufp == so->hashso_split_bucket_buf)
+ _hash_dropbuf(rel, *bufp);
+ }
+ else if (so->hashso_buc_populated && so->hashso_buc_split)
+ {
+ /*
+ * end of bucket, scan bucket being populated if there was a split in
+ * progress at the start of scan.
+ */
+ *bufp = so->hashso_bucket_buf;
+
+ /*
+ * buffer for bucket being populated must be valid as we acquire the
+ * pin on it before the start of scan and retain it till end of scan.
+ */
+ Assert(BufferIsValid(*bufp));
+
+ LockBuffer(*bufp, BUFFER_LOCK_SHARE);
+ *pagep = BufferGetPage(*bufp);
+ *opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
+
+ /* move to the end of bucket chain */
+ while (BlockNumberIsValid((*opaquep)->hasho_nextblkno))
+ _hash_readnext(scan, bufp, pagep, opaquep);
+
+ /*
+ * setting hashso_buc_split to false indicates that we are scanning
+ * bucket being populated.
+ */
+ so->hashso_buc_split = false;
+ }
+}
+
+/*
+ * _hash_first() -- Find the first item in a scan.
+ *
+ * We find the first item (or, if backward scan, the last item) in the
+ * index that satisfies the qualification associated with the scan
+ * descriptor.
+ *
+ * On successful exit, if the page containing current index tuple is an
+ * overflow page, both pin and lock are released whereas if it is a bucket
+ * page then it is pinned but not locked and data about the matching
+ * tuple(s) on the page has been loaded into so->currPos,
+ * scan->xs_ctup.t_self is set to the heap TID of the current tuple.
+ *
+ * On failure exit (no more tuples), we return false, with pin held on
+ * bucket page but no pins or locks held on overflow page.
+ */
+bool
+_hash_first(IndexScanDesc scan, ScanDirection dir)
+{
+ Relation rel = scan->indexRelation;
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ ScanKey cur;
+ uint32 hashkey;
+ Bucket bucket;
+ Buffer buf;
+ Page page;
+ HashPageOpaque opaque;
+ HashScanPosItem *currItem;
+
+ pgstat_count_index_scan(rel);
+
+ /*
+ * We do not support hash scans with no index qualification, because we
+ * would have to read the whole index rather than just one bucket. That
+ * creates a whole raft of problems, since we haven't got a practical way
+ * to lock all the buckets against splits or compactions.
+ */
+ if (scan->numberOfKeys < 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("hash indexes do not support whole-index scans")));
+
+ /* There may be more than one index qual, but we hash only the first */
+ cur = &scan->keyData[0];
+
+ /* We support only single-column hash indexes */
+ Assert(cur->sk_attno == 1);
+ /* And there's only one operator strategy, too */
+ Assert(cur->sk_strategy == HTEqualStrategyNumber);
+
+ /*
+ * If the constant in the index qual is NULL, assume it cannot match any
+ * items in the index.
+ */
+ if (cur->sk_flags & SK_ISNULL)
+ return false;
+
+ /*
+ * Okay to compute the hash key. We want to do this before acquiring any
+ * locks, in case a user-defined hash function happens to be slow.
+ *
+ * If scankey operator is not a cross-type comparison, we can use the
+ * cached hash function; otherwise gotta look it up in the catalogs.
+ *
+ * We support the convention that sk_subtype == InvalidOid means the
+ * opclass input type; this is a hack to simplify life for ScanKeyInit().
+ */
+ if (cur->sk_subtype == rel->rd_opcintype[0] ||
+ cur->sk_subtype == InvalidOid)
+ hashkey = _hash_datum2hashkey(rel, cur->sk_argument);
+ else
+ hashkey = _hash_datum2hashkey_type(rel, cur->sk_argument,
+ cur->sk_subtype);
+
+ so->hashso_sk_hash = hashkey;
+
+ buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_READ, NULL);
+ PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot);
+ page = BufferGetPage(buf);
+ TestForOldSnapshot(scan->xs_snapshot, rel, page);
+ opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ bucket = opaque->hasho_bucket;
+
+ so->hashso_bucket_buf = buf;
+
+ /*
+ * If a bucket split is in progress, then while scanning the bucket being
+ * populated, we need to skip tuples that were copied from bucket being
+ * split. We also need to maintain a pin on the bucket being split to
+ * ensure that split-cleanup work done by vacuum doesn't remove tuples
+ * from it till this scan is done. We need to maintain a pin on the
+ * bucket being populated to ensure that vacuum doesn't squeeze that
+ * bucket till this scan is complete; otherwise, the ordering of tuples
+ * can't be maintained during forward and backward scans. Here, we have
+ * to be cautious about locking order: first, acquire the lock on bucket
+ * being split; then, release the lock on it but not the pin; then,
+ * acquire a lock on bucket being populated and again re-verify whether
+ * the bucket split is still in progress. Acquiring the lock on bucket
+ * being split first ensures that the vacuum waits for this scan to
+ * finish.
+ */
+ if (H_BUCKET_BEING_POPULATED(opaque))
+ {
+ BlockNumber old_blkno;
+ Buffer old_buf;
+
+ old_blkno = _hash_get_oldblock_from_newbucket(rel, bucket);
+
+ /*
+ * release the lock on new bucket and re-acquire it after acquiring
+ * the lock on old bucket.
+ */
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+ old_buf = _hash_getbuf(rel, old_blkno, HASH_READ, LH_BUCKET_PAGE);
+ TestForOldSnapshot(scan->xs_snapshot, rel, BufferGetPage(old_buf));
+
+ /*
+ * remember the split bucket buffer so as to use it later for
+ * scanning.
+ */
+ so->hashso_split_bucket_buf = old_buf;
+ LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
+
+ LockBuffer(buf, BUFFER_LOCK_SHARE);
+ page = BufferGetPage(buf);
+ opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ Assert(opaque->hasho_bucket == bucket);
+
+ if (H_BUCKET_BEING_POPULATED(opaque))
+ so->hashso_buc_populated = true;
+ else
+ {
+ _hash_dropbuf(rel, so->hashso_split_bucket_buf);
+ so->hashso_split_bucket_buf = InvalidBuffer;
+ }
+ }
+
+ /* If a backwards scan is requested, move to the end of the chain */
+ if (ScanDirectionIsBackward(dir))
+ {
+ /*
+ * Backward scans that start during split needs to start from end of
+ * bucket being split.
+ */
+ while (BlockNumberIsValid(opaque->hasho_nextblkno) ||
+ (so->hashso_buc_populated && !so->hashso_buc_split))
+ _hash_readnext(scan, &buf, &page, &opaque);
+ }
+
+ /* remember which buffer we have pinned, if any */
+ Assert(BufferIsInvalid(so->currPos.buf));
+ so->currPos.buf = buf;
+
+ /* Now find all the tuples satisfying the qualification from a page */
+ if (!_hash_readpage(scan, &buf, dir))
+ return false;
+
+ /* OK, itemIndex says what to return */
+ currItem = &so->currPos.items[so->currPos.itemIndex];
+ scan->xs_heaptid = currItem->heapTid;
+
+ /* if we're here, _hash_readpage found a valid tuples */
+ return true;
+}
+
+/*
+ * _hash_readpage() -- Load data from current index page into so->currPos
+ *
+ * We scan all the items in the current index page and save them into
+ * so->currPos if it satisfies the qualification. If no matching items
+ * are found in the current page, we move to the next or previous page
+ * in a bucket chain as indicated by the direction.
+ *
+ * Return true if any matching items are found else return false.
+ */
+static bool
+_hash_readpage(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
+{
+ Relation rel = scan->indexRelation;
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ Buffer buf;
+ Page page;
+ HashPageOpaque opaque;
+ OffsetNumber offnum;
+ uint16 itemIndex;
+
+ buf = *bufP;
+ Assert(BufferIsValid(buf));
+ _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
+ page = BufferGetPage(buf);
+ opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+ so->currPos.buf = buf;
+ so->currPos.currPage = BufferGetBlockNumber(buf);
+
+ if (ScanDirectionIsForward(dir))
+ {
+ BlockNumber prev_blkno = InvalidBlockNumber;
+
+ for (;;)
+ {
+ /* new page, locate starting position by binary search */
+ offnum = _hash_binsearch(page, so->hashso_sk_hash);
+
+ itemIndex = _hash_load_qualified_items(scan, page, offnum, dir);
+
+ if (itemIndex != 0)
+ break;
+
+ /*
+ * Could not find any matching tuples in the current page, move to
+ * the next page. Before leaving the current page, deal with any
+ * killed items.
+ */
+ if (so->numKilled > 0)
+ _hash_kill_items(scan);
+
+ /*
+ * If this is a primary bucket page, hasho_prevblkno is not a real
+ * block number.
+ */
+ if (so->currPos.buf == so->hashso_bucket_buf ||
+ so->currPos.buf == so->hashso_split_bucket_buf)
+ prev_blkno = InvalidBlockNumber;
+ else
+ prev_blkno = opaque->hasho_prevblkno;
+
+ _hash_readnext(scan, &buf, &page, &opaque);
+ if (BufferIsValid(buf))
+ {
+ so->currPos.buf = buf;
+ so->currPos.currPage = BufferGetBlockNumber(buf);
+ }
+ else
+ {
+ /*
+ * Remember next and previous block numbers for scrollable
+ * cursors to know the start position and return false
+ * indicating that no more matching tuples were found. Also,
+ * don't reset currPage or lsn, because we expect
+ * _hash_kill_items to be called for the old page after this
+ * function returns.
+ */
+ so->currPos.prevPage = prev_blkno;
+ so->currPos.nextPage = InvalidBlockNumber;
+ so->currPos.buf = buf;
+ return false;
+ }
+ }
+
+ so->currPos.firstItem = 0;
+ so->currPos.lastItem = itemIndex - 1;
+ so->currPos.itemIndex = 0;
+ }
+ else
+ {
+ BlockNumber next_blkno = InvalidBlockNumber;
+
+ for (;;)
+ {
+ /* new page, locate starting position by binary search */
+ offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
+
+ itemIndex = _hash_load_qualified_items(scan, page, offnum, dir);
+
+ if (itemIndex != MaxIndexTuplesPerPage)
+ break;
+
+ /*
+ * Could not find any matching tuples in the current page, move to
+ * the previous page. Before leaving the current page, deal with
+ * any killed items.
+ */
+ if (so->numKilled > 0)
+ _hash_kill_items(scan);
+
+ if (so->currPos.buf == so->hashso_bucket_buf ||
+ so->currPos.buf == so->hashso_split_bucket_buf)
+ next_blkno = opaque->hasho_nextblkno;
+
+ _hash_readprev(scan, &buf, &page, &opaque);
+ if (BufferIsValid(buf))
+ {
+ so->currPos.buf = buf;
+ so->currPos.currPage = BufferGetBlockNumber(buf);
+ }
+ else
+ {
+ /*
+ * Remember next and previous block numbers for scrollable
+ * cursors to know the start position and return false
+ * indicating that no more matching tuples were found. Also,
+ * don't reset currPage or lsn, because we expect
+ * _hash_kill_items to be called for the old page after this
+ * function returns.
+ */
+ so->currPos.prevPage = InvalidBlockNumber;
+ so->currPos.nextPage = next_blkno;
+ so->currPos.buf = buf;
+ return false;
+ }
+ }
+
+ so->currPos.firstItem = itemIndex;
+ so->currPos.lastItem = MaxIndexTuplesPerPage - 1;
+ so->currPos.itemIndex = MaxIndexTuplesPerPage - 1;
+ }
+
+ if (so->currPos.buf == so->hashso_bucket_buf ||
+ so->currPos.buf == so->hashso_split_bucket_buf)
+ {
+ so->currPos.prevPage = InvalidBlockNumber;
+ so->currPos.nextPage = opaque->hasho_nextblkno;
+ LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
+ }
+ else
+ {
+ so->currPos.prevPage = opaque->hasho_prevblkno;
+ so->currPos.nextPage = opaque->hasho_nextblkno;
+ _hash_relbuf(rel, so->currPos.buf);
+ so->currPos.buf = InvalidBuffer;
+ }
+
+ Assert(so->currPos.firstItem <= so->currPos.lastItem);
+ return true;
+}
+
+/*
+ * Load all the qualified items from a current index page
+ * into so->currPos. Helper function for _hash_readpage.
+ */
+static int
+_hash_load_qualified_items(IndexScanDesc scan, Page page,
+ OffsetNumber offnum, ScanDirection dir)
+{
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ IndexTuple itup;
+ int itemIndex;
+ OffsetNumber maxoff;
+
+ maxoff = PageGetMaxOffsetNumber(page);
+
+ if (ScanDirectionIsForward(dir))
+ {
+ /* load items[] in ascending order */
+ itemIndex = 0;
+
+ while (offnum <= maxoff)
+ {
+ Assert(offnum >= FirstOffsetNumber);
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
+
+ /*
+ * skip the tuples that are moved by split operation for the scan
+ * that has started when split was in progress. Also, skip the
+ * tuples that are marked as dead.
+ */
+ if ((so->hashso_buc_populated && !so->hashso_buc_split &&
+ (itup->t_info & INDEX_MOVED_BY_SPLIT_MASK)) ||
+ (scan->ignore_killed_tuples &&
+ (ItemIdIsDead(PageGetItemId(page, offnum)))))
+ {
+ offnum = OffsetNumberNext(offnum); /* move forward */
+ continue;
+ }
+
+ if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup) &&
+ _hash_checkqual(scan, itup))
+ {
+ /* tuple is qualified, so remember it */
+ _hash_saveitem(so, itemIndex, offnum, itup);
+ itemIndex++;
+ }
+ else
+ {
+ /*
+ * No more matching tuples exist in this page. so, exit while
+ * loop.
+ */
+ break;
+ }
+
+ offnum = OffsetNumberNext(offnum);
+ }
+
+ Assert(itemIndex <= MaxIndexTuplesPerPage);
+ return itemIndex;
+ }
+ else
+ {
+ /* load items[] in descending order */
+ itemIndex = MaxIndexTuplesPerPage;
+
+ while (offnum >= FirstOffsetNumber)
+ {
+ Assert(offnum <= maxoff);
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
+
+ /*
+ * skip the tuples that are moved by split operation for the scan
+ * that has started when split was in progress. Also, skip the
+ * tuples that are marked as dead.
+ */
+ if ((so->hashso_buc_populated && !so->hashso_buc_split &&
+ (itup->t_info & INDEX_MOVED_BY_SPLIT_MASK)) ||
+ (scan->ignore_killed_tuples &&
+ (ItemIdIsDead(PageGetItemId(page, offnum)))))
+ {
+ offnum = OffsetNumberPrev(offnum); /* move back */
+ continue;
+ }
+
+ if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup) &&
+ _hash_checkqual(scan, itup))
+ {
+ itemIndex--;
+ /* tuple is qualified, so remember it */
+ _hash_saveitem(so, itemIndex, offnum, itup);
+ }
+ else
+ {
+ /*
+ * No more matching tuples exist in this page. so, exit while
+ * loop.
+ */
+ break;
+ }
+
+ offnum = OffsetNumberPrev(offnum);
+ }
+
+ Assert(itemIndex >= 0);
+ return itemIndex;
+ }
+}
+
+/* Save an index item into so->currPos.items[itemIndex] */
+static inline void
+_hash_saveitem(HashScanOpaque so, int itemIndex,
+ OffsetNumber offnum, IndexTuple itup)
+{
+ HashScanPosItem *currItem = &so->currPos.items[itemIndex];
+
+ currItem->heapTid = itup->t_tid;
+ currItem->indexOffset = offnum;
+}
diff --git a/src/backend/access/hash/hashsort.c b/src/backend/access/hash/hashsort.c
new file mode 100644
index 0000000..3ce4248
--- /dev/null
+++ b/src/backend/access/hash/hashsort.c
@@ -0,0 +1,152 @@
+/*-------------------------------------------------------------------------
+ *
+ * hashsort.c
+ * Sort tuples for insertion into a new hash index.
+ *
+ * When building a very large hash index, we pre-sort the tuples by bucket
+ * number to improve locality of access to the index, and thereby avoid
+ * thrashing. We use tuplesort.c to sort the given index tuples into order.
+ *
+ * Note: if the number of rows in the table has been underestimated,
+ * bucket splits may occur during the index build. In that case we'd
+ * be inserting into two or more buckets for each possible masked-off
+ * hash code value. That's no big problem though, since we'll still have
+ * plenty of locality of access.
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hashsort.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "commands/progress.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "port/pg_bitutils.h"
+#include "utils/tuplesort.h"
+
+
+/*
+ * Status record for spooling/sorting phase.
+ */
+struct HSpool
+{
+ Tuplesortstate *sortstate; /* state data for tuplesort.c */
+ Relation index;
+
+ /*
+ * We sort the hash keys based on the buckets they belong to. Below masks
+ * are used in _hash_hashkey2bucket to determine the bucket of given hash
+ * key.
+ */
+ uint32 high_mask;
+ uint32 low_mask;
+ uint32 max_buckets;
+};
+
+
+/*
+ * create and initialize a spool structure
+ */
+HSpool *
+_h_spoolinit(Relation heap, Relation index, uint32 num_buckets)
+{
+ HSpool *hspool = (HSpool *) palloc0(sizeof(HSpool));
+
+ hspool->index = index;
+
+ /*
+ * Determine the bitmask for hash code values. Since there are currently
+ * num_buckets buckets in the index, the appropriate mask can be computed
+ * as follows.
+ *
+ * NOTE : This hash mask calculation should be in sync with similar
+ * calculation in _hash_init_metabuffer.
+ */
+ hspool->high_mask = pg_nextpower2_32(num_buckets + 1) - 1;
+ hspool->low_mask = (hspool->high_mask >> 1);
+ hspool->max_buckets = num_buckets - 1;
+
+ /*
+ * We size the sort area as maintenance_work_mem rather than work_mem to
+ * speed index creation. This should be OK since a single backend can't
+ * run multiple index creations in parallel.
+ */
+ hspool->sortstate = tuplesort_begin_index_hash(heap,
+ index,
+ hspool->high_mask,
+ hspool->low_mask,
+ hspool->max_buckets,
+ maintenance_work_mem,
+ NULL,
+ false);
+
+ return hspool;
+}
+
+/*
+ * clean up a spool structure and its substructures.
+ */
+void
+_h_spooldestroy(HSpool *hspool)
+{
+ tuplesort_end(hspool->sortstate);
+ pfree(hspool);
+}
+
+/*
+ * spool an index entry into the sort file.
+ */
+void
+_h_spool(HSpool *hspool, ItemPointer self, Datum *values, bool *isnull)
+{
+ tuplesort_putindextuplevalues(hspool->sortstate, hspool->index,
+ self, values, isnull);
+}
+
+/*
+ * given a spool loaded by successive calls to _h_spool,
+ * create an entire index.
+ */
+void
+_h_indexbuild(HSpool *hspool, Relation heapRel)
+{
+ IndexTuple itup;
+ int64 tups_done = 0;
+#ifdef USE_ASSERT_CHECKING
+ uint32 hashkey = 0;
+#endif
+
+ tuplesort_performsort(hspool->sortstate);
+
+ while ((itup = tuplesort_getindextuple(hspool->sortstate, true)) != NULL)
+ {
+ /*
+ * Technically, it isn't critical that hash keys be found in sorted
+ * order, since this sorting is only used to increase locality of
+ * access as a performance optimization. It still seems like a good
+ * idea to test tuplesort.c's handling of hash index tuple sorts
+ * through an assertion, though.
+ */
+#ifdef USE_ASSERT_CHECKING
+ uint32 lasthashkey = hashkey;
+
+ hashkey = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
+ hspool->max_buckets, hspool->high_mask,
+ hspool->low_mask);
+ Assert(hashkey >= lasthashkey);
+#endif
+
+ _hash_doinsert(hspool->index, itup, heapRel);
+
+ pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_DONE,
+ ++tups_done);
+ }
+}
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
new file mode 100644
index 0000000..5198728
--- /dev/null
+++ b/src/backend/access/hash/hashutil.c
@@ -0,0 +1,622 @@
+/*-------------------------------------------------------------------------
+ *
+ * hashutil.c
+ * Utility code for Postgres hash implementation.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hashutil.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/hash.h"
+#include "access/reloptions.h"
+#include "access/relscan.h"
+#include "port/pg_bitutils.h"
+#include "storage/buf_internals.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+
+#define CALC_NEW_BUCKET(old_bucket, lowmask) \
+ old_bucket | (lowmask + 1)
+
+/*
+ * _hash_checkqual -- does the index tuple satisfy the scan conditions?
+ */
+bool
+_hash_checkqual(IndexScanDesc scan, IndexTuple itup)
+{
+ /*
+ * Currently, we can't check any of the scan conditions since we do not
+ * have the original index entry value to supply to the sk_func. Always
+ * return true; we expect that hashgettuple already set the recheck flag
+ * to make the main indexscan code do it.
+ */
+#ifdef NOT_USED
+ TupleDesc tupdesc = RelationGetDescr(scan->indexRelation);
+ ScanKey key = scan->keyData;
+ int scanKeySize = scan->numberOfKeys;
+
+ while (scanKeySize > 0)
+ {
+ Datum datum;
+ bool isNull;
+ Datum test;
+
+ datum = index_getattr(itup,
+ key->sk_attno,
+ tupdesc,
+ &isNull);
+
+ /* assume sk_func is strict */
+ if (isNull)
+ return false;
+ if (key->sk_flags & SK_ISNULL)
+ return false;
+
+ test = FunctionCall2Coll(&key->sk_func, key->sk_collation,
+ datum, key->sk_argument);
+
+ if (!DatumGetBool(test))
+ return false;
+
+ key++;
+ scanKeySize--;
+ }
+#endif
+
+ return true;
+}
+
+/*
+ * _hash_datum2hashkey -- given a Datum, call the index's hash function
+ *
+ * The Datum is assumed to be of the index's column type, so we can use the
+ * "primary" hash function that's tracked for us by the generic index code.
+ */
+uint32
+_hash_datum2hashkey(Relation rel, Datum key)
+{
+ FmgrInfo *procinfo;
+ Oid collation;
+
+ /* XXX assumes index has only one attribute */
+ procinfo = index_getprocinfo(rel, 1, HASHSTANDARD_PROC);
+ collation = rel->rd_indcollation[0];
+
+ return DatumGetUInt32(FunctionCall1Coll(procinfo, collation, key));
+}
+
+/*
+ * _hash_datum2hashkey_type -- given a Datum of a specified type,
+ * hash it in a fashion compatible with this index
+ *
+ * This is much more expensive than _hash_datum2hashkey, so use it only in
+ * cross-type situations.
+ */
+uint32
+_hash_datum2hashkey_type(Relation rel, Datum key, Oid keytype)
+{
+ RegProcedure hash_proc;
+ Oid collation;
+
+ /* XXX assumes index has only one attribute */
+ hash_proc = get_opfamily_proc(rel->rd_opfamily[0],
+ keytype,
+ keytype,
+ HASHSTANDARD_PROC);
+ if (!RegProcedureIsValid(hash_proc))
+ elog(ERROR, "missing support function %d(%u,%u) for index \"%s\"",
+ HASHSTANDARD_PROC, keytype, keytype,
+ RelationGetRelationName(rel));
+ collation = rel->rd_indcollation[0];
+
+ return DatumGetUInt32(OidFunctionCall1Coll(hash_proc, collation, key));
+}
+
+/*
+ * _hash_hashkey2bucket -- determine which bucket the hashkey maps to.
+ */
+Bucket
+_hash_hashkey2bucket(uint32 hashkey, uint32 maxbucket,
+ uint32 highmask, uint32 lowmask)
+{
+ Bucket bucket;
+
+ bucket = hashkey & highmask;
+ if (bucket > maxbucket)
+ bucket = bucket & lowmask;
+
+ return bucket;
+}
+
+/*
+ * _hash_spareindex -- returns spare index / global splitpoint phase of the
+ * bucket
+ */
+uint32
+_hash_spareindex(uint32 num_bucket)
+{
+ uint32 splitpoint_group;
+ uint32 splitpoint_phases;
+
+ splitpoint_group = pg_ceil_log2_32(num_bucket);
+
+ if (splitpoint_group < HASH_SPLITPOINT_GROUPS_WITH_ONE_PHASE)
+ return splitpoint_group;
+
+ /* account for single-phase groups */
+ splitpoint_phases = HASH_SPLITPOINT_GROUPS_WITH_ONE_PHASE;
+
+ /* account for multi-phase groups before splitpoint_group */
+ splitpoint_phases +=
+ ((splitpoint_group - HASH_SPLITPOINT_GROUPS_WITH_ONE_PHASE) <<
+ HASH_SPLITPOINT_PHASE_BITS);
+
+ /* account for phases within current group */
+ splitpoint_phases +=
+ (((num_bucket - 1) >>
+ (splitpoint_group - (HASH_SPLITPOINT_PHASE_BITS + 1))) &
+ HASH_SPLITPOINT_PHASE_MASK); /* to 0-based value. */
+
+ return splitpoint_phases;
+}
+
+/*
+ * _hash_get_totalbuckets -- returns total number of buckets allocated till
+ * the given splitpoint phase.
+ */
+uint32
+_hash_get_totalbuckets(uint32 splitpoint_phase)
+{
+ uint32 splitpoint_group;
+ uint32 total_buckets;
+ uint32 phases_within_splitpoint_group;
+
+ if (splitpoint_phase < HASH_SPLITPOINT_GROUPS_WITH_ONE_PHASE)
+ return (1 << splitpoint_phase);
+
+ /* get splitpoint's group */
+ splitpoint_group = HASH_SPLITPOINT_GROUPS_WITH_ONE_PHASE;
+ splitpoint_group +=
+ ((splitpoint_phase - HASH_SPLITPOINT_GROUPS_WITH_ONE_PHASE) >>
+ HASH_SPLITPOINT_PHASE_BITS);
+
+ /* account for buckets before splitpoint_group */
+ total_buckets = (1 << (splitpoint_group - 1));
+
+ /* account for buckets within splitpoint_group */
+ phases_within_splitpoint_group =
+ (((splitpoint_phase - HASH_SPLITPOINT_GROUPS_WITH_ONE_PHASE) &
+ HASH_SPLITPOINT_PHASE_MASK) + 1); /* from 0-based to 1-based */
+ total_buckets +=
+ (((1 << (splitpoint_group - 1)) >> HASH_SPLITPOINT_PHASE_BITS) *
+ phases_within_splitpoint_group);
+
+ return total_buckets;
+}
+
+/*
+ * _hash_checkpage -- sanity checks on the format of all hash pages
+ *
+ * If flags is not zero, it is a bitwise OR of the acceptable page types
+ * (values of hasho_flag & LH_PAGE_TYPE).
+ */
+void
+_hash_checkpage(Relation rel, Buffer buf, int flags)
+{
+ Page page = BufferGetPage(buf);
+
+ /*
+ * ReadBuffer verifies that every newly-read page passes
+ * PageHeaderIsValid, which means it either contains a reasonably sane
+ * page header or is all-zero. We have to defend against the all-zero
+ * case, however.
+ */
+ if (PageIsNew(page))
+ ereport(ERROR,
+ (errcode(ERRCODE_INDEX_CORRUPTED),
+ errmsg("index \"%s\" contains unexpected zero page at block %u",
+ RelationGetRelationName(rel),
+ BufferGetBlockNumber(buf)),
+ errhint("Please REINDEX it.")));
+
+ /*
+ * Additionally check that the special area looks sane.
+ */
+ if (PageGetSpecialSize(page) != MAXALIGN(sizeof(HashPageOpaqueData)))
+ ereport(ERROR,
+ (errcode(ERRCODE_INDEX_CORRUPTED),
+ errmsg("index \"%s\" contains corrupted page at block %u",
+ RelationGetRelationName(rel),
+ BufferGetBlockNumber(buf)),
+ errhint("Please REINDEX it.")));
+
+ if (flags)
+ {
+ HashPageOpaque opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+
+ if ((opaque->hasho_flag & flags) == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INDEX_CORRUPTED),
+ errmsg("index \"%s\" contains corrupted page at block %u",
+ RelationGetRelationName(rel),
+ BufferGetBlockNumber(buf)),
+ errhint("Please REINDEX it.")));
+ }
+
+ /*
+ * When checking the metapage, also verify magic number and version.
+ */
+ if (flags == LH_META_PAGE)
+ {
+ HashMetaPage metap = HashPageGetMeta(page);
+
+ if (metap->hashm_magic != HASH_MAGIC)
+ ereport(ERROR,
+ (errcode(ERRCODE_INDEX_CORRUPTED),
+ errmsg("index \"%s\" is not a hash index",
+ RelationGetRelationName(rel))));
+
+ if (metap->hashm_version != HASH_VERSION)
+ ereport(ERROR,
+ (errcode(ERRCODE_INDEX_CORRUPTED),
+ errmsg("index \"%s\" has wrong hash version",
+ RelationGetRelationName(rel)),
+ errhint("Please REINDEX it.")));
+ }
+}
+
+bytea *
+hashoptions(Datum reloptions, bool validate)
+{
+ static const relopt_parse_elt tab[] = {
+ {"fillfactor", RELOPT_TYPE_INT, offsetof(HashOptions, fillfactor)},
+ };
+
+ return (bytea *) build_reloptions(reloptions, validate,
+ RELOPT_KIND_HASH,
+ sizeof(HashOptions),
+ tab, lengthof(tab));
+}
+
+/*
+ * _hash_get_indextuple_hashkey - get the hash index tuple's hash key value
+ */
+uint32
+_hash_get_indextuple_hashkey(IndexTuple itup)
+{
+ char *attp;
+
+ /*
+ * We assume the hash key is the first attribute and can't be null, so
+ * this can be done crudely but very very cheaply ...
+ */
+ attp = (char *) itup + IndexInfoFindDataOffset(itup->t_info);
+ return *((uint32 *) attp);
+}
+
+/*
+ * _hash_convert_tuple - convert raw index data to hash key
+ *
+ * Inputs: values and isnull arrays for the user data column(s)
+ * Outputs: values and isnull arrays for the index tuple, suitable for
+ * passing to index_form_tuple().
+ *
+ * Returns true if successful, false if not (because there are null values).
+ * On a false result, the given data need not be indexed.
+ *
+ * Note: callers know that the index-column arrays are always of length 1.
+ * In principle, there could be more than one input column, though we do not
+ * currently support that.
+ */
+bool
+_hash_convert_tuple(Relation index,
+ Datum *user_values, bool *user_isnull,
+ Datum *index_values, bool *index_isnull)
+{
+ uint32 hashkey;
+
+ /*
+ * We do not insert null values into hash indexes. This is okay because
+ * the only supported search operator is '=', and we assume it is strict.
+ */
+ if (user_isnull[0])
+ return false;
+
+ hashkey = _hash_datum2hashkey(index, user_values[0]);
+ index_values[0] = UInt32GetDatum(hashkey);
+ index_isnull[0] = false;
+ return true;
+}
+
+/*
+ * _hash_binsearch - Return the offset number in the page where the
+ * specified hash value should be sought or inserted.
+ *
+ * We use binary search, relying on the assumption that the existing entries
+ * are ordered by hash key.
+ *
+ * Returns the offset of the first index entry having hashkey >= hash_value,
+ * or the page's max offset plus one if hash_value is greater than all
+ * existing hash keys in the page. This is the appropriate place to start
+ * a search, or to insert a new item.
+ */
+OffsetNumber
+_hash_binsearch(Page page, uint32 hash_value)
+{
+ OffsetNumber upper;
+ OffsetNumber lower;
+
+ /* Loop invariant: lower <= desired place <= upper */
+ upper = PageGetMaxOffsetNumber(page) + 1;
+ lower = FirstOffsetNumber;
+
+ while (upper > lower)
+ {
+ OffsetNumber off;
+ IndexTuple itup;
+ uint32 hashkey;
+
+ off = (upper + lower) / 2;
+ Assert(OffsetNumberIsValid(off));
+
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
+ hashkey = _hash_get_indextuple_hashkey(itup);
+ if (hashkey < hash_value)
+ lower = off + 1;
+ else
+ upper = off;
+ }
+
+ return lower;
+}
+
+/*
+ * _hash_binsearch_last
+ *
+ * Same as above, except that if there are multiple matching items in the
+ * page, we return the offset of the last one instead of the first one,
+ * and the possible range of outputs is 0..maxoffset not 1..maxoffset+1.
+ * This is handy for starting a new page in a backwards scan.
+ */
+OffsetNumber
+_hash_binsearch_last(Page page, uint32 hash_value)
+{
+ OffsetNumber upper;
+ OffsetNumber lower;
+
+ /* Loop invariant: lower <= desired place <= upper */
+ upper = PageGetMaxOffsetNumber(page);
+ lower = FirstOffsetNumber - 1;
+
+ while (upper > lower)
+ {
+ IndexTuple itup;
+ OffsetNumber off;
+ uint32 hashkey;
+
+ off = (upper + lower + 1) / 2;
+ Assert(OffsetNumberIsValid(off));
+
+ itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
+ hashkey = _hash_get_indextuple_hashkey(itup);
+ if (hashkey > hash_value)
+ upper = off - 1;
+ else
+ lower = off;
+ }
+
+ return lower;
+}
+
+/*
+ * _hash_get_oldblock_from_newbucket() -- get the block number of a bucket
+ * from which current (new) bucket is being split.
+ */
+BlockNumber
+_hash_get_oldblock_from_newbucket(Relation rel, Bucket new_bucket)
+{
+ Bucket old_bucket;
+ uint32 mask;
+ Buffer metabuf;
+ HashMetaPage metap;
+ BlockNumber blkno;
+
+ /*
+ * To get the old bucket from the current bucket, we need a mask to modulo
+ * into lower half of table. This mask is stored in meta page as
+ * hashm_lowmask, but here we can't rely on the same, because we need a
+ * value of lowmask that was prevalent at the time when bucket split was
+ * started. Masking the most significant bit of new bucket would give us
+ * old bucket.
+ */
+ mask = (((uint32) 1) << (fls(new_bucket) - 1)) - 1;
+ old_bucket = new_bucket & mask;
+
+ metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
+ metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+ blkno = BUCKET_TO_BLKNO(metap, old_bucket);
+
+ _hash_relbuf(rel, metabuf);
+
+ return blkno;
+}
+
+/*
+ * _hash_get_newblock_from_oldbucket() -- get the block number of a bucket
+ * that will be generated after split from old bucket.
+ *
+ * This is used to find the new bucket from old bucket based on current table
+ * half. It is mainly required to finish the incomplete splits where we are
+ * sure that not more than one bucket could have split in progress from old
+ * bucket.
+ */
+BlockNumber
+_hash_get_newblock_from_oldbucket(Relation rel, Bucket old_bucket)
+{
+ Bucket new_bucket;
+ Buffer metabuf;
+ HashMetaPage metap;
+ BlockNumber blkno;
+
+ metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
+ metap = HashPageGetMeta(BufferGetPage(metabuf));
+
+ new_bucket = _hash_get_newbucket_from_oldbucket(rel, old_bucket,
+ metap->hashm_lowmask,
+ metap->hashm_maxbucket);
+ blkno = BUCKET_TO_BLKNO(metap, new_bucket);
+
+ _hash_relbuf(rel, metabuf);
+
+ return blkno;
+}
+
+/*
+ * _hash_get_newbucket_from_oldbucket() -- get the new bucket that will be
+ * generated after split from current (old) bucket.
+ *
+ * This is used to find the new bucket from old bucket. New bucket can be
+ * obtained by OR'ing old bucket with most significant bit of current table
+ * half (lowmask passed in this function can be used to identify msb of
+ * current table half). There could be multiple buckets that could have
+ * been split from current bucket. We need the first such bucket that exists.
+ * Caller must ensure that no more than one split has happened from old
+ * bucket.
+ */
+Bucket
+_hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket,
+ uint32 lowmask, uint32 maxbucket)
+{
+ Bucket new_bucket;
+
+ new_bucket = CALC_NEW_BUCKET(old_bucket, lowmask);
+ if (new_bucket > maxbucket)
+ {
+ lowmask = lowmask >> 1;
+ new_bucket = CALC_NEW_BUCKET(old_bucket, lowmask);
+ }
+
+ return new_bucket;
+}
+
+/*
+ * _hash_kill_items - set LP_DEAD state for items an indexscan caller has
+ * told us were killed.
+ *
+ * scan->opaque, referenced locally through so, contains information about the
+ * current page and killed tuples thereon (generally, this should only be
+ * called if so->numKilled > 0).
+ *
+ * The caller does not have a lock on the page and may or may not have the
+ * page pinned in a buffer. Note that read-lock is sufficient for setting
+ * LP_DEAD status (which is only a hint).
+ *
+ * The caller must have pin on bucket buffer, but may or may not have pin
+ * on overflow buffer, as indicated by HashScanPosIsPinned(so->currPos).
+ *
+ * We match items by heap TID before assuming they are the right ones to
+ * delete.
+ *
+ * There are never any scans active in a bucket at the time VACUUM begins,
+ * because VACUUM takes a cleanup lock on the primary bucket page and scans
+ * hold a pin. A scan can begin after VACUUM leaves the primary bucket page
+ * but before it finishes the entire bucket, but it can never pass VACUUM,
+ * because VACUUM always locks the next page before releasing the lock on
+ * the previous one. Therefore, we don't have to worry about accidentally
+ * killing a TID that has been reused for an unrelated tuple.
+ */
+void
+_hash_kill_items(IndexScanDesc scan)
+{
+ HashScanOpaque so = (HashScanOpaque) scan->opaque;
+ Relation rel = scan->indexRelation;
+ BlockNumber blkno;
+ Buffer buf;
+ Page page;
+ HashPageOpaque opaque;
+ OffsetNumber offnum,
+ maxoff;
+ int numKilled = so->numKilled;
+ int i;
+ bool killedsomething = false;
+ bool havePin = false;
+
+ Assert(so->numKilled > 0);
+ Assert(so->killedItems != NULL);
+ Assert(HashScanPosIsValid(so->currPos));
+
+ /*
+ * Always reset the scan state, so we don't look for same items on other
+ * pages.
+ */
+ so->numKilled = 0;
+
+ blkno = so->currPos.currPage;
+ if (HashScanPosIsPinned(so->currPos))
+ {
+ /*
+ * We already have pin on this buffer, so, all we need to do is
+ * acquire lock on it.
+ */
+ havePin = true;
+ buf = so->currPos.buf;
+ LockBuffer(buf, BUFFER_LOCK_SHARE);
+ }
+ else
+ buf = _hash_getbuf(rel, blkno, HASH_READ, LH_OVERFLOW_PAGE);
+
+ page = BufferGetPage(buf);
+ opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+ maxoff = PageGetMaxOffsetNumber(page);
+
+ for (i = 0; i < numKilled; i++)
+ {
+ int itemIndex = so->killedItems[i];
+ HashScanPosItem *currItem = &so->currPos.items[itemIndex];
+
+ offnum = currItem->indexOffset;
+
+ Assert(itemIndex >= so->currPos.firstItem &&
+ itemIndex <= so->currPos.lastItem);
+
+ while (offnum <= maxoff)
+ {
+ ItemId iid = PageGetItemId(page, offnum);
+ IndexTuple ituple = (IndexTuple) PageGetItem(page, iid);
+
+ if (ItemPointerEquals(&ituple->t_tid, &currItem->heapTid))
+ {
+ /* found the item */
+ ItemIdMarkDead(iid);
+ killedsomething = true;
+ break; /* out of inner search loop */
+ }
+ offnum = OffsetNumberNext(offnum);
+ }
+ }
+
+ /*
+ * Since this can be redone later if needed, mark as dirty hint. Whenever
+ * we mark anything LP_DEAD, we also set the page's
+ * LH_PAGE_HAS_DEAD_TUPLES flag, which is likewise just a hint.
+ */
+ if (killedsomething)
+ {
+ opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
+ MarkBufferDirtyHint(buf, true);
+ }
+
+ if (so->hashso_bucket_buf == so->currPos.buf ||
+ havePin)
+ LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
+ else
+ _hash_relbuf(rel, buf);
+}
diff --git a/src/backend/access/hash/hashvalidate.c b/src/backend/access/hash/hashvalidate.c
new file mode 100644
index 0000000..1e343df
--- /dev/null
+++ b/src/backend/access/hash/hashvalidate.c
@@ -0,0 +1,439 @@
+/*-------------------------------------------------------------------------
+ *
+ * hashvalidate.c
+ * Opclass validator for hash.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/hash/hashvalidate.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/amvalidate.h"
+#include "access/hash.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "catalog/pg_am.h"
+#include "catalog/pg_amop.h"
+#include "catalog/pg_amproc.h"
+#include "catalog/pg_opclass.h"
+#include "catalog/pg_opfamily.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_type.h"
+#include "parser/parse_coerce.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
+#include "utils/regproc.h"
+#include "utils/syscache.h"
+
+
+static bool check_hash_func_signature(Oid funcid, int16 amprocnum, Oid argtype);
+
+
+/*
+ * Validator for a hash opclass.
+ *
+ * Some of the checks done here cover the whole opfamily, and therefore are
+ * redundant when checking each opclass in a family. But they don't run long
+ * enough to be much of a problem, so we accept the duplication rather than
+ * complicate the amvalidate API.
+ */
+bool
+hashvalidate(Oid opclassoid)
+{
+ bool result = true;
+ HeapTuple classtup;
+ Form_pg_opclass classform;
+ Oid opfamilyoid;
+ Oid opcintype;
+ char *opclassname;
+ HeapTuple familytup;
+ Form_pg_opfamily familyform;
+ char *opfamilyname;
+ CatCList *proclist,
+ *oprlist;
+ List *grouplist;
+ OpFamilyOpFuncGroup *opclassgroup;
+ List *hashabletypes = NIL;
+ int i;
+ ListCell *lc;
+
+ /* Fetch opclass information */
+ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
+ if (!HeapTupleIsValid(classtup))
+ elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
+ classform = (Form_pg_opclass) GETSTRUCT(classtup);
+
+ opfamilyoid = classform->opcfamily;
+ opcintype = classform->opcintype;
+ opclassname = NameStr(classform->opcname);
+
+ /* Fetch opfamily information */
+ familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid));
+ if (!HeapTupleIsValid(familytup))
+ elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid);
+ familyform = (Form_pg_opfamily) GETSTRUCT(familytup);
+
+ opfamilyname = NameStr(familyform->opfname);
+
+ /* Fetch all operators and support functions of the opfamily */
+ oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
+ proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
+
+ /* Check individual support functions */
+ for (i = 0; i < proclist->n_members; i++)
+ {
+ HeapTuple proctup = &proclist->members[i]->tuple;
+ Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
+
+ /*
+ * All hash functions should be registered with matching left/right
+ * types
+ */
+ if (procform->amproclefttype != procform->amprocrighttype)
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator family \"%s\" of access method %s contains support function %s with different left and right input types",
+ opfamilyname, "hash",
+ format_procedure(procform->amproc))));
+ result = false;
+ }
+
+ /* Check procedure numbers and function signatures */
+ switch (procform->amprocnum)
+ {
+ case HASHSTANDARD_PROC:
+ case HASHEXTENDED_PROC:
+ if (!check_hash_func_signature(procform->amproc, procform->amprocnum,
+ procform->amproclefttype))
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d",
+ opfamilyname, "hash",
+ format_procedure(procform->amproc),
+ procform->amprocnum)));
+ result = false;
+ }
+ else
+ {
+ /* Remember which types we can hash */
+ hashabletypes =
+ list_append_unique_oid(hashabletypes,
+ procform->amproclefttype);
+ }
+ break;
+ case HASHOPTIONS_PROC:
+ if (!check_amoptsproc_signature(procform->amproc))
+ result = false;
+ break;
+ default:
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d",
+ opfamilyname, "hash",
+ format_procedure(procform->amproc),
+ procform->amprocnum)));
+ result = false;
+ break;
+ }
+ }
+
+ /* Check individual operators */
+ for (i = 0; i < oprlist->n_members; i++)
+ {
+ HeapTuple oprtup = &oprlist->members[i]->tuple;
+ Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
+
+ /* Check that only allowed strategy numbers exist */
+ if (oprform->amopstrategy < 1 ||
+ oprform->amopstrategy > HTMaxStrategyNumber)
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d",
+ opfamilyname, "hash",
+ format_operator(oprform->amopopr),
+ oprform->amopstrategy)));
+ result = false;
+ }
+
+ /* hash doesn't support ORDER BY operators */
+ if (oprform->amoppurpose != AMOP_SEARCH ||
+ OidIsValid(oprform->amopsortfamily))
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s",
+ opfamilyname, "hash",
+ format_operator(oprform->amopopr))));
+ result = false;
+ }
+
+ /* Check operator signature --- same for all hash strategies */
+ if (!check_amop_signature(oprform->amopopr, BOOLOID,
+ oprform->amoplefttype,
+ oprform->amoprighttype))
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature",
+ opfamilyname, "hash",
+ format_operator(oprform->amopopr))));
+ result = false;
+ }
+
+ /* There should be relevant hash functions for each datatype */
+ if (!list_member_oid(hashabletypes, oprform->amoplefttype) ||
+ !list_member_oid(hashabletypes, oprform->amoprighttype))
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator family \"%s\" of access method %s lacks support function for operator %s",
+ opfamilyname, "hash",
+ format_operator(oprform->amopopr))));
+ result = false;
+ }
+ }
+
+ /* Now check for inconsistent groups of operators/functions */
+ grouplist = identify_opfamily_groups(oprlist, proclist);
+ opclassgroup = NULL;
+ foreach(lc, grouplist)
+ {
+ OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc);
+
+ /* Remember the group exactly matching the test opclass */
+ if (thisgroup->lefttype == opcintype &&
+ thisgroup->righttype == opcintype)
+ opclassgroup = thisgroup;
+
+ /*
+ * Complain if there seems to be an incomplete set of operators for
+ * this datatype pair (implying that we have a hash function but no
+ * operator).
+ */
+ if (thisgroup->operatorset != (1 << HTEqualStrategyNumber))
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s",
+ opfamilyname, "hash",
+ format_type_be(thisgroup->lefttype),
+ format_type_be(thisgroup->righttype))));
+ result = false;
+ }
+ }
+
+ /* Check that the originally-named opclass is supported */
+ /* (if group is there, we already checked it adequately above) */
+ if (!opclassgroup)
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator class \"%s\" of access method %s is missing operator(s)",
+ opclassname, "hash")));
+ result = false;
+ }
+
+ /*
+ * Complain if the opfamily doesn't have entries for all possible
+ * combinations of its supported datatypes. While missing cross-type
+ * operators are not fatal, it seems reasonable to insist that all
+ * built-in hash opfamilies be complete.
+ */
+ if (list_length(grouplist) !=
+ list_length(hashabletypes) * list_length(hashabletypes))
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("operator family \"%s\" of access method %s is missing cross-type operator(s)",
+ opfamilyname, "hash")));
+ result = false;
+ }
+
+ ReleaseCatCacheList(proclist);
+ ReleaseCatCacheList(oprlist);
+ ReleaseSysCache(familytup);
+ ReleaseSysCache(classtup);
+
+ return result;
+}
+
+
+/*
+ * We need a custom version of check_amproc_signature because of assorted
+ * hacks in the core hash opclass definitions.
+ */
+static bool
+check_hash_func_signature(Oid funcid, int16 amprocnum, Oid argtype)
+{
+ bool result = true;
+ Oid restype;
+ int16 nargs;
+ HeapTuple tp;
+ Form_pg_proc procform;
+
+ switch (amprocnum)
+ {
+ case HASHSTANDARD_PROC:
+ restype = INT4OID;
+ nargs = 1;
+ break;
+
+ case HASHEXTENDED_PROC:
+ restype = INT8OID;
+ nargs = 2;
+ break;
+
+ default:
+ elog(ERROR, "invalid amprocnum");
+ }
+
+ tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
+ if (!HeapTupleIsValid(tp))
+ elog(ERROR, "cache lookup failed for function %u", funcid);
+ procform = (Form_pg_proc) GETSTRUCT(tp);
+
+ if (procform->prorettype != restype || procform->proretset ||
+ procform->pronargs != nargs)
+ result = false;
+
+ if (!IsBinaryCoercible(argtype, procform->proargtypes.values[0]))
+ {
+ /*
+ * Some of the built-in hash opclasses cheat by using hash functions
+ * that are different from but physically compatible with the opclass
+ * datatype. In some of these cases, even a "binary coercible" check
+ * fails because there's no relevant cast. For the moment, fix it by
+ * having a list of allowed cases. Test the specific function
+ * identity, not just its input type, because hashvarlena() takes
+ * INTERNAL and allowing any such function seems too scary.
+ */
+ if ((funcid == F_HASHINT4 || funcid == F_HASHINT4EXTENDED) &&
+ (argtype == DATEOID ||
+ argtype == XIDOID || argtype == CIDOID))
+ /* okay, allowed use of hashint4() */ ;
+ else if ((funcid == F_HASHINT8 || funcid == F_HASHINT8EXTENDED) &&
+ (argtype == XID8OID))
+ /* okay, allowed use of hashint8() */ ;
+ else if ((funcid == F_TIMESTAMP_HASH ||
+ funcid == F_TIMESTAMP_HASH_EXTENDED) &&
+ argtype == TIMESTAMPTZOID)
+ /* okay, allowed use of timestamp_hash() */ ;
+ else if ((funcid == F_HASHCHAR || funcid == F_HASHCHAREXTENDED) &&
+ argtype == BOOLOID)
+ /* okay, allowed use of hashchar() */ ;
+ else if ((funcid == F_HASHVARLENA || funcid == F_HASHVARLENAEXTENDED) &&
+ argtype == BYTEAOID)
+ /* okay, allowed use of hashvarlena() */ ;
+ else
+ result = false;
+ }
+
+ /* If function takes a second argument, it must be for a 64-bit salt. */
+ if (nargs == 2 && procform->proargtypes.values[1] != INT8OID)
+ result = false;
+
+ ReleaseSysCache(tp);
+ return result;
+}
+
+/*
+ * Prechecking function for adding operators/functions to a hash opfamily.
+ */
+void
+hashadjustmembers(Oid opfamilyoid,
+ Oid opclassoid,
+ List *operators,
+ List *functions)
+{
+ Oid opcintype;
+ ListCell *lc;
+
+ /*
+ * Hash operators and required support functions are always "loose"
+ * members of the opfamily if they are cross-type. If they are not
+ * cross-type, we prefer to tie them to the appropriate opclass ... but if
+ * the user hasn't created one, we can't do that, and must fall back to
+ * using the opfamily dependency. (We mustn't force creation of an
+ * opclass in such a case, as leaving an incomplete opclass laying about
+ * would be bad. Throwing an error is another undesirable alternative.)
+ *
+ * This behavior results in a bit of a dump/reload hazard, in that the
+ * order of restoring objects could affect what dependencies we end up
+ * with. pg_dump's existing behavior will preserve the dependency choices
+ * in most cases, but not if a cross-type operator has been bound tightly
+ * into an opclass. That's a mistake anyway, so silently "fixing" it
+ * isn't awful.
+ *
+ * Optional support functions are always "loose" family members.
+ *
+ * To avoid repeated lookups, we remember the most recently used opclass's
+ * input type.
+ */
+ if (OidIsValid(opclassoid))
+ {
+ /* During CREATE OPERATOR CLASS, need CCI to see the pg_opclass row */
+ CommandCounterIncrement();
+ opcintype = get_opclass_input_type(opclassoid);
+ }
+ else
+ opcintype = InvalidOid;
+
+ /*
+ * We handle operators and support functions almost identically, so rather
+ * than duplicate this code block, just join the lists.
+ */
+ foreach(lc, list_concat_copy(operators, functions))
+ {
+ OpFamilyMember *op = (OpFamilyMember *) lfirst(lc);
+
+ if (op->is_func && op->number != HASHSTANDARD_PROC)
+ {
+ /* Optional support proc, so always a soft family dependency */
+ op->ref_is_hard = false;
+ op->ref_is_family = true;
+ op->refobjid = opfamilyoid;
+ }
+ else if (op->lefttype != op->righttype)
+ {
+ /* Cross-type, so always a soft family dependency */
+ op->ref_is_hard = false;
+ op->ref_is_family = true;
+ op->refobjid = opfamilyoid;
+ }
+ else
+ {
+ /* Not cross-type; is there a suitable opclass? */
+ if (op->lefttype != opcintype)
+ {
+ /* Avoid repeating this expensive lookup, even if it fails */
+ opcintype = op->lefttype;
+ opclassoid = opclass_for_family_datatype(HASH_AM_OID,
+ opfamilyoid,
+ opcintype);
+ }
+ if (OidIsValid(opclassoid))
+ {
+ /* Hard dependency on opclass */
+ op->ref_is_hard = true;
+ op->ref_is_family = false;
+ op->refobjid = opclassoid;
+ }
+ else
+ {
+ /* We're stuck, so make a soft dependency on the opfamily */
+ op->ref_is_hard = false;
+ op->ref_is_family = true;
+ op->refobjid = opfamilyoid;
+ }
+ }
+ }
+}