diff options
Diffstat (limited to 'src/backend/access/spgist')
-rw-r--r-- | src/backend/access/spgist/Makefile | 28 | ||||
-rw-r--r-- | src/backend/access/spgist/README | 389 | ||||
-rw-r--r-- | src/backend/access/spgist/spgdoinsert.c | 2354 | ||||
-rw-r--r-- | src/backend/access/spgist/spginsert.c | 243 | ||||
-rw-r--r-- | src/backend/access/spgist/spgkdtreeproc.c | 349 | ||||
-rw-r--r-- | src/backend/access/spgist/spgproc.c | 88 | ||||
-rw-r--r-- | src/backend/access/spgist/spgquadtreeproc.c | 471 | ||||
-rw-r--r-- | src/backend/access/spgist/spgscan.c | 1097 | ||||
-rw-r--r-- | src/backend/access/spgist/spgtextproc.c | 699 | ||||
-rw-r--r-- | src/backend/access/spgist/spgutils.c | 1350 | ||||
-rw-r--r-- | src/backend/access/spgist/spgvacuum.c | 975 | ||||
-rw-r--r-- | src/backend/access/spgist/spgvalidate.c | 392 | ||||
-rw-r--r-- | src/backend/access/spgist/spgxlog.c | 1013 |
13 files changed, 9448 insertions, 0 deletions
diff --git a/src/backend/access/spgist/Makefile b/src/backend/access/spgist/Makefile new file mode 100644 index 0000000..8ed3b4a --- /dev/null +++ b/src/backend/access/spgist/Makefile @@ -0,0 +1,28 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for access/spgist +# +# IDENTIFICATION +# src/backend/access/spgist/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/access/spgist +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + spgdoinsert.o \ + spginsert.o \ + spgkdtreeproc.o \ + spgproc.o \ + spgquadtreeproc.o \ + spgscan.o \ + spgtextproc.o \ + spgutils.o \ + spgvacuum.o \ + spgvalidate.o \ + spgxlog.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/spgist/README b/src/backend/access/spgist/README new file mode 100644 index 0000000..7117e02 --- /dev/null +++ b/src/backend/access/spgist/README @@ -0,0 +1,389 @@ +src/backend/access/spgist/README + +SP-GiST is an abbreviation of space-partitioned GiST. It provides a +generalized infrastructure for implementing space-partitioned data +structures, such as quadtrees, k-d trees, and radix trees (tries). When +implemented in main memory, these structures are usually designed as a set of +dynamically-allocated nodes linked by pointers. This is not suitable for +direct storing on disk, since the chains of pointers can be rather long and +require too many disk accesses. In contrast, disk based data structures +should have a high fanout to minimize I/O. The challenge is to map tree +nodes to disk pages in such a way that the search algorithm accesses only a +few disk pages, even if it traverses many nodes. + + +COMMON STRUCTURE DESCRIPTION + +Logically, an SP-GiST tree is a set of tuples, each of which can be either +an inner or leaf tuple. Each inner tuple contains "nodes", which are +(label,pointer) pairs, where the pointer (ItemPointerData) is a pointer to +another inner tuple or to the head of a list of leaf tuples. Inner tuples +can have different numbers of nodes (children). Branches can be of different +depth (actually, there is no control or code to support balancing), which +means that the tree is non-balanced. However, leaf and inner tuples cannot +be intermixed at the same level: a downlink from a node of an inner tuple +leads either to one inner tuple, or to a list of leaf tuples. + +The SP-GiST core requires that inner and leaf tuples fit on a single index +page, and even more stringently that the list of leaf tuples reached from a +single inner-tuple node all be stored on the same index page. (Restricting +such lists to not cross pages reduces seeks, and allows the list links to be +stored as simple 2-byte OffsetNumbers.) SP-GiST index opclasses should +therefore ensure that not too many nodes can be needed in one inner tuple, +and that inner-tuple prefixes and leaf-node datum values not be too large. + +Inner and leaf tuples are stored separately: the former are stored only on +"inner" pages, the latter only on "leaf" pages. Also, there are special +restrictions on the root page. Early in an index's life, when there is only +one page's worth of data, the root page contains an unorganized set of leaf +tuples. After the first page split has occurred, the root is required to +contain exactly one inner tuple. + +When the search traversal algorithm reaches an inner tuple, it chooses a set +of nodes to continue tree traverse in depth. If it reaches a leaf page it +scans a list of leaf tuples to find the ones that match the query. SP-GiST +also supports ordered (nearest-neighbor) searches - that is during scan pending +nodes are put into priority queue, so traversal is performed by the +closest-first model. + + +The insertion algorithm descends the tree similarly, except it must choose +just one node to descend to from each inner tuple. Insertion might also have +to modify the inner tuple before it can descend: it could add a new node, or +it could "split" the tuple to obtain a less-specific prefix that can match +the value to be inserted. If it's necessary to append a new leaf tuple to a +list and there is no free space on page, then SP-GiST creates a new inner +tuple and distributes leaf tuples into a set of lists on, perhaps, several +pages. + +An inner tuple consists of: + + optional prefix value - all successors must be consistent with it. + Example: + radix tree - prefix value is a common prefix string + quad tree - centroid + k-d tree - one coordinate + + list of nodes, where node is a (label, pointer) pair. + Example of a label: a single character for radix tree + +A leaf tuple consists of: + + a leaf value + Example: + radix tree - the rest of string (postfix) + quad and k-d tree - the point itself + + ItemPointer to the corresponding heap tuple + nextOffset number of next leaf tuple in a chain on a leaf page + + optional nulls bitmask + optional INCLUDE-column values + +For compatibility with pre-v14 indexes, a leaf tuple has a nulls bitmask +only if there are null values (among the leaf value and the INCLUDE values) +*and* there is at least one INCLUDE column. The null-ness of the leaf +value can be inferred from whether the tuple is on a "nulls page" (see below) +so it is not necessary to represent it explicitly. But we include it anyway +in a bitmask used with INCLUDE values, so that standard tuple deconstruction +code can be used. + + +NULLS HANDLING + +We assume that SPGiST-indexable operators are strict (can never succeed for +null inputs). It is still desirable to index nulls, so that whole-table +indexscans are possible and so that "x IS NULL" can be implemented by an +SPGiST indexscan. However, we prefer that SPGiST index opclasses not have +to cope with nulls. Therefore, the main tree of an SPGiST index does not +include any null entries. We store null entries in a separate SPGiST tree +occupying a disjoint set of pages (in particular, its own root page). +Insertions and searches in the nulls tree do not use any of the +opclass-supplied functions, but just use hardwired logic comparable to +AllTheSame cases in the normal tree. + + +INSERTION ALGORITHM + +Insertion algorithm is designed to keep the tree in a consistent state at +any moment. Here is a simplified insertion algorithm specification +(numbers refer to notes below): + + Start with the first tuple on the root page (1) + + loop: + if (page is leaf) then + if (enough space) + insert on page and exit (5) + else (7) + call PickSplitFn() (2) + end if + else + switch (chooseFn()) + case MatchNode - descend through selected node + case AddNode - add node and then retry chooseFn (3, 6) + case SplitTuple - split inner tuple to prefix and postfix, then + retry chooseFn with the prefix tuple (4, 6) + end if + +Notes: + +(1) Initially, we just dump leaf tuples into the root page until it is full; +then we split it. Once the root is not a leaf page, it can have only one +inner tuple, so as to keep the amount of free space on the root as large as +possible. Both of these rules are meant to postpone doing PickSplit on the +root for as long as possible, so that the topmost partitioning of the search +space is as good as we can easily make it. + +(2) Current implementation allows to do picksplit and insert a new leaf tuple +in one operation, if the new list of leaf tuples fits on one page. It's +always possible for trees with small nodes like quad tree or k-d tree, but +radix trees may require another picksplit. + +(3) Addition of node must keep size of inner tuple small enough to fit on a +page. After addition, inner tuple could become too large to be stored on +current page because of other tuples on page. In this case it will be moved +to another inner page (see notes about page management). When moving tuple to +another page, we can't change the numbers of other tuples on the page, else +we'd make downlink pointers to them invalid. To prevent that, SP-GiST leaves +a "placeholder" tuple, which can be reused later whenever another tuple is +added to the page. See also Concurrency and Vacuum sections below. Right now +only radix trees could add a node to the tuple; quad trees and k-d trees +make all possible nodes at once in PickSplitFn() call. + +(4) Prefix value could only partially match a new value, so the SplitTuple +action allows breaking the current tree branch into upper and lower sections. +Another way to say it is that we can split the current inner tuple into +"prefix" and "postfix" parts, where the prefix part is able to match the +incoming new value. Consider example of insertion into a radix tree. We use +the following notation, where tuple's id is just for discussion (no such id +is actually stored): + +inner tuple: {tuple id}(prefix string)[ comma separated list of node labels ] +leaf tuple: {tuple id}<value> + +Suppose we need to insert string 'www.gogo.com' into inner tuple + + {1}(www.google.com/)[a, i] + +The string does not match the prefix so we cannot descend. We must +split the inner tuple into two tuples: + + {2}(www.go)[o] - prefix tuple + | + {3}(gle.com/)[a,i] - postfix tuple + +On the next iteration of loop we find that 'www.gogo.com' matches the +prefix, but not any node label, so we add a node [g] to tuple {2}: + + NIL (no child exists yet) + | + {2}(www.go)[o, g] + | + {3}(gle.com/)[a,i] + +Now we can descend through the [g] node, which will cause us to update +the target string to just 'o.com'. Finally, we'll insert a leaf tuple +bearing that string: + + {4}<o.com> + | + {2}(www.go)[o, g] + | + {3}(gle.com/)[a,i] + +As we can see, the original tuple's node array moves to postfix tuple without +any changes. Note also that SP-GiST core assumes that prefix tuple is not +larger than old inner tuple. That allows us to store prefix tuple directly +in place of old inner tuple. SP-GiST core will try to store postfix tuple on +the same page if possible, but will use another page if there is not enough +free space (see notes 5 and 6). Currently, quad and k-d trees don't use this +feature, because they have no concept of a prefix being "inconsistent" with +any new value. They grow their depth only by PickSplitFn() call. + +(5) If pointer from node of parent is a NIL pointer, algorithm chooses a leaf +page to store on. At first, it tries to use the last-used leaf page with the +largest free space (which we track in each backend) to better utilize disk +space. If that's not large enough, then the algorithm allocates a new page. + +(6) Management of inner pages is very similar to management of leaf pages, +described in (5). + +(7) Actually, current implementation can move the whole list of leaf tuples +and a new tuple to another page, if the list is short enough. This improves +space utilization, but doesn't change the basis of the algorithm. + + +CONCURRENCY + +While descending the tree, the insertion algorithm holds exclusive lock on +two tree levels at a time, ie both parent and child pages (but parent and +child pages can be the same, see notes above). There is a possibility of +deadlock between two insertions if there are cross-referenced pages in +different branches. That is, if inner tuple on page M has a child on page N +while an inner tuple from another branch is on page N and has a child on +page M, then two insertions descending the two branches could deadlock, +since they will each hold their parent page's lock while trying to get the +child page's lock. + +Currently, we deal with this by conditionally locking buffers as we descend +the tree. If we fail to get lock on a buffer, we release both buffers and +restart the insertion process. This is potentially inefficient, but the +locking costs of a more deterministic approach seem very high. + +To reduce the number of cases where that happens, we introduce a concept of +"triple parity" of pages: if inner tuple is on page with BlockNumber N, then +its child tuples should be placed on the same page, or else on a page with +BlockNumber M where (N+1) mod 3 == M mod 3. This rule ensures that tuples +on page M will have no children on page N, since (M+1) mod 3 != N mod 3. +That makes it unlikely that two insertion processes will conflict against +each other while descending the tree. It's not perfect though: in the first +place, we could still get a deadlock among three or more insertion processes, +and in the second place, it's impractical to preserve this invariant in every +case when we expand or split an inner tuple. So we still have to allow for +deadlocks. + +Insertion may also need to take locks on an additional inner and/or leaf page +to add tuples of the right type(s), when there's not enough room on the pages +it descended through. However, we don't care exactly which such page we add +to, so deadlocks can be avoided by conditionally locking the additional +buffers: if we fail to get lock on an additional page, just try another one. + +Search traversal algorithm is rather traditional. At each non-leaf level, it +share-locks the page, identifies which node(s) in the current inner tuple +need to be visited, and puts those addresses on a stack of pages to examine +later. It then releases lock on the current buffer before visiting the next +stack item. So only one page is locked at a time, and no deadlock is +possible. But instead, we have to worry about race conditions: by the time +we arrive at a pointed-to page, a concurrent insertion could have replaced +the target inner tuple (or leaf tuple chain) with data placed elsewhere. +To handle that, whenever the insertion algorithm changes a nonempty downlink +in an inner tuple, it places a "redirect tuple" in place of the lower-level +inner tuple or leaf-tuple chain head that the link formerly led to. Scans +(though not insertions) must be prepared to honor such redirects. Only a +scan that had already visited the parent level could possibly reach such a +redirect tuple, so we can remove redirects once all active transactions have +been flushed out of the system. + + +DEAD TUPLES + +Tuples on leaf pages can be in one of four states: + +SPGIST_LIVE: normal, live pointer to a heap tuple. + +SPGIST_REDIRECT: placeholder that contains a link to another place in the +index. When a chain of leaf tuples has to be moved to another page, a +redirect tuple is inserted in place of the chain's head tuple. The parent +inner tuple's downlink is updated when this happens, but concurrent scans +might be "in flight" from the parent page to the child page (since they +release lock on the parent page before attempting to lock the child). +The redirect pointer serves to tell such a scan where to go. A redirect +pointer is only needed for as long as such concurrent scans could be in +progress. Eventually, it's converted into a PLACEHOLDER dead tuple by +VACUUM, and is then a candidate for replacement. Searches that find such +a tuple (which should never be part of a chain) should immediately proceed +to the other place, forgetting about the redirect tuple. Insertions that +reach such a tuple should raise error, since a valid downlink should never +point to such a tuple. + +SPGIST_DEAD: tuple is dead, but it cannot be removed or moved to a +different offset on the page because there is a link leading to it from +some inner tuple elsewhere in the index. (Such a tuple is never part of a +chain, since we don't need one unless there is nothing live left in its +chain.) Searches should ignore such entries. If an insertion action +arrives at such a tuple, it should either replace it in-place (if there's +room on the page to hold the desired new leaf tuple) or replace it with a +redirection pointer to wherever it puts the new leaf tuple. + +SPGIST_PLACEHOLDER: tuple is dead, and there are known to be no links to +it from elsewhere. When a live tuple is deleted or moved away, and not +replaced by a redirect pointer, it is replaced by a placeholder to keep +the offsets of later tuples on the same page from changing. Placeholders +can be freely replaced when adding a new tuple to the page, and also +VACUUM will delete any that are at the end of the range of valid tuple +offsets. Both searches and insertions should complain if a link from +elsewhere leads them to a placeholder tuple. + +When the root page is also a leaf, all its tuple should be in LIVE state; +there's no need for the others since there are no links and no need to +preserve offset numbers. + +Tuples on inner pages can be in LIVE, REDIRECT, or PLACEHOLDER states. +The REDIRECT state has the same function as on leaf pages, to send +concurrent searches to the place where they need to go after an inner +tuple is moved to another page. Expired REDIRECT pointers are converted +to PLACEHOLDER status by VACUUM, and are then candidates for replacement. +DEAD state is not currently possible, since VACUUM does not attempt to +remove unused inner tuples. + + +VACUUM + +VACUUM (or more precisely, spgbulkdelete) performs a single sequential scan +over the entire index. On both leaf and inner pages, we can convert old +REDIRECT tuples into PLACEHOLDER status, and then remove any PLACEHOLDERs +that are at the end of the page (since they aren't needed to preserve the +offsets of any live tuples). On leaf pages, we scan for tuples that need +to be deleted because their heap TIDs match a vacuum target TID. + +If we find a deletable tuple that is not at the head of its chain, we +can simply replace it with a PLACEHOLDER, updating the chain links to +remove it from the chain. If it is at the head of its chain, but there's +at least one live tuple remaining in the chain, we move that live tuple +to the head tuple's offset, replacing it with a PLACEHOLDER to preserve +the offsets of other tuples. This keeps the parent inner tuple's downlink +valid. If we find ourselves deleting all live tuples in a chain, we +replace the head tuple with a DEAD tuple and the rest with PLACEHOLDERS. +The parent inner tuple's downlink thus points to the DEAD tuple, and the +rules explained in the previous section keep everything working. + +VACUUM doesn't know a-priori which tuples are heads of their chains, but +it can easily figure that out by constructing a predecessor array that's +the reverse map of the nextOffset links (ie, when we see tuple x links to +tuple y, we set predecessor[y] = x). Then head tuples are the ones with +no predecessor. + +Because insertions can occur while VACUUM runs, a pure sequential scan +could miss deleting some target leaf tuples, because they could get moved +from a not-yet-visited leaf page to an already-visited leaf page as a +consequence of a PickSplit or MoveLeafs operation. Failing to delete any +target TID is not acceptable, so we have to extend the algorithm to cope +with such cases. We recognize that such a move might have occurred when +we see a leaf-page REDIRECT tuple whose XID indicates it might have been +created after the VACUUM scan started. We add the redirection target TID +to a "pending list" of places we need to recheck. Between pages of the +main sequential scan, we empty the pending list by visiting each listed +TID. If it points to an inner tuple (from a PickSplit), add each downlink +TID to the pending list. If it points to a leaf page, vacuum that page. +(We could just vacuum the single pointed-to chain, but vacuuming the +whole page simplifies the code and reduces the odds of VACUUM having to +modify the same page multiple times.) To ensure that pending-list +processing can never get into an endless loop, even in the face of +concurrent index changes, we don't remove list entries immediately but +only after we've completed all pending-list processing; instead we just +mark items as done after processing them. Adding a TID that's already in +the list is a no-op, whether or not that item is marked done yet. + +spgbulkdelete also updates the index's free space map. + +Currently, spgvacuumcleanup has nothing to do if spgbulkdelete was +performed; otherwise, it does an spgbulkdelete scan with an empty target +list, so as to clean up redirections and placeholders, update the free +space map, and gather statistics. + + +LAST USED PAGE MANAGEMENT + +The list of last used pages contains four pages - a leaf page and three +inner pages, one from each "triple parity" group. (Actually, there's one +such list for the main tree and a separate one for the nulls tree.) This +list is stored between calls on the index meta page, but updates are never +WAL-logged to decrease WAL traffic. Incorrect data on meta page isn't +critical, because we could allocate a new page at any moment. + + +AUTHORS + + Teodor Sigaev <teodor@sigaev.ru> + Oleg Bartunov <oleg@sai.msu.su> diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c new file mode 100644 index 0000000..70557bc --- /dev/null +++ b/src/backend/access/spgist/spgdoinsert.c @@ -0,0 +1,2354 @@ +/*------------------------------------------------------------------------- + * + * spgdoinsert.c + * implementation of insert algorithm + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgdoinsert.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/spgist_private.h" +#include "access/spgxlog.h" +#include "access/xloginsert.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "utils/rel.h" + + +/* + * SPPageDesc tracks all info about a page we are inserting into. In some + * situations it actually identifies a tuple, or even a specific node within + * an inner tuple. But any of the fields can be invalid. If the buffer + * field is valid, it implies we hold pin and exclusive lock on that buffer. + * page pointer should be valid exactly when buffer is. + */ +typedef struct SPPageDesc +{ + BlockNumber blkno; /* block number, or InvalidBlockNumber */ + Buffer buffer; /* page's buffer number, or InvalidBuffer */ + Page page; /* pointer to page buffer, or NULL */ + OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */ + int node; /* node number within inner tuple, or -1 */ +} SPPageDesc; + + +/* + * Set the item pointer in the nodeN'th entry in inner tuple tup. This + * is used to update the parent inner tuple's downlink after a move or + * split operation. + */ +void +spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, + BlockNumber blkno, OffsetNumber offset) +{ + int i; + SpGistNodeTuple node; + + SGITITERATE(tup, i, node) + { + if (i == nodeN) + { + ItemPointerSet(&node->t_tid, blkno, offset); + return; + } + } + + elog(ERROR, "failed to find requested node %d in SPGiST inner tuple", + nodeN); +} + +/* + * Form a new inner tuple containing one more node than the given one, with + * the specified label datum, inserted at offset "offset" in the node array. + * The new tuple's prefix is the same as the old one's. + * + * Note that the new node initially has an invalid downlink. We'll find a + * page to point it to later. + */ +static SpGistInnerTuple +addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset) +{ + SpGistNodeTuple node, + *nodes; + int i; + + /* if offset is negative, insert at end */ + if (offset < 0) + offset = tuple->nNodes; + else if (offset > tuple->nNodes) + elog(ERROR, "invalid offset for adding node to SPGiST inner tuple"); + + nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1)); + SGITITERATE(tuple, i, node) + { + if (i < offset) + nodes[i] = node; + else + nodes[i + 1] = node; + } + + nodes[offset] = spgFormNodeTuple(state, label, false); + + return spgFormInnerTuple(state, + (tuple->prefixSize > 0), + SGITDATUM(tuple, state), + tuple->nNodes + 1, + nodes); +} + +/* qsort comparator for sorting OffsetNumbers */ +static int +cmpOffsetNumbers(const void *a, const void *b) +{ + if (*(const OffsetNumber *) a == *(const OffsetNumber *) b) + return 0; + return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1; +} + +/* + * Delete multiple tuples from an index page, preserving tuple offset numbers. + * + * The first tuple in the given list is replaced with a dead tuple of type + * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced + * with dead tuples of type "reststate". If either firststate or reststate + * is REDIRECT, blkno/offnum specify where to link to. + * + * NB: this is used during WAL replay, so beware of trying to make it too + * smart. In particular, it shouldn't use "state" except for calling + * spgFormDeadTuple(). This is also used in a critical section, so no + * pallocs either! + */ +void +spgPageIndexMultiDelete(SpGistState *state, Page page, + OffsetNumber *itemnos, int nitems, + int firststate, int reststate, + BlockNumber blkno, OffsetNumber offnum) +{ + OffsetNumber firstItem; + OffsetNumber sortednos[MaxIndexTuplesPerPage]; + SpGistDeadTuple tuple = NULL; + int i; + + if (nitems == 0) + return; /* nothing to do */ + + /* + * For efficiency we want to use PageIndexMultiDelete, which requires the + * targets to be listed in sorted order, so we have to sort the itemnos + * array. (This also greatly simplifies the math for reinserting the + * replacement tuples.) However, we must not scribble on the caller's + * array, so we have to make a copy. + */ + memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems); + if (nitems > 1) + qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers); + + PageIndexMultiDelete(page, sortednos, nitems); + + firstItem = itemnos[0]; + + for (i = 0; i < nitems; i++) + { + OffsetNumber itemno = sortednos[i]; + int tupstate; + + tupstate = (itemno == firstItem) ? firststate : reststate; + if (tuple == NULL || tuple->tupstate != tupstate) + tuple = spgFormDeadTuple(state, tupstate, blkno, offnum); + + if (PageAddItem(page, (Item) tuple, tuple->size, + itemno, false, false) != itemno) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + tuple->size); + + if (tupstate == SPGIST_REDIRECT) + SpGistPageGetOpaque(page)->nRedirection++; + else if (tupstate == SPGIST_PLACEHOLDER) + SpGistPageGetOpaque(page)->nPlaceholder++; + } +} + +/* + * Update the parent inner tuple's downlink, and mark the parent buffer + * dirty (this must be the last change to the parent page in the current + * WAL action). + */ +static void +saveNodeLink(Relation index, SPPageDesc *parent, + BlockNumber blkno, OffsetNumber offnum) +{ + SpGistInnerTuple innerTuple; + + innerTuple = (SpGistInnerTuple) PageGetItem(parent->page, + PageGetItemId(parent->page, parent->offnum)); + + spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum); + + MarkBufferDirty(parent->buffer); +} + +/* + * Add a leaf tuple to a leaf page where there is known to be room for it + */ +static void +addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, + SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew) +{ + spgxlogAddLeaf xlrec; + + xlrec.newPage = isNew; + xlrec.storesNulls = isNulls; + + /* these will be filled below as needed */ + xlrec.offnumLeaf = InvalidOffsetNumber; + xlrec.offnumHeadLeaf = InvalidOffsetNumber; + xlrec.offnumParent = InvalidOffsetNumber; + xlrec.nodeI = 0; + + START_CRIT_SECTION(); + + if (current->offnum == InvalidOffsetNumber || + SpGistBlockIsRoot(current->blkno)) + { + /* Tuple is not part of a chain */ + SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber); + current->offnum = SpGistPageAddNewItem(state, current->page, + (Item) leafTuple, leafTuple->size, + NULL, false); + + xlrec.offnumLeaf = current->offnum; + + /* Must update parent's downlink if any */ + if (parent->buffer != InvalidBuffer) + { + xlrec.offnumParent = parent->offnum; + xlrec.nodeI = parent->node; + + saveNodeLink(index, parent, current->blkno, current->offnum); + } + } + else + { + /* + * Tuple must be inserted into existing chain. We mustn't change the + * chain's head address, but we don't need to chase the entire chain + * to put the tuple at the end; we can insert it second. + * + * Also, it's possible that the "chain" consists only of a DEAD tuple, + * in which case we should replace the DEAD tuple in-place. + */ + SpGistLeafTuple head; + OffsetNumber offnum; + + head = (SpGistLeafTuple) PageGetItem(current->page, + PageGetItemId(current->page, current->offnum)); + if (head->tupstate == SPGIST_LIVE) + { + SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head)); + offnum = SpGistPageAddNewItem(state, current->page, + (Item) leafTuple, leafTuple->size, + NULL, false); + + /* + * re-get head of list because it could have been moved on page, + * and set new second element + */ + head = (SpGistLeafTuple) PageGetItem(current->page, + PageGetItemId(current->page, current->offnum)); + SGLT_SET_NEXTOFFSET(head, offnum); + + xlrec.offnumLeaf = offnum; + xlrec.offnumHeadLeaf = current->offnum; + } + else if (head->tupstate == SPGIST_DEAD) + { + SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber); + PageIndexTupleDelete(current->page, current->offnum); + if (PageAddItem(current->page, + (Item) leafTuple, leafTuple->size, + current->offnum, false, false) != current->offnum) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + leafTuple->size); + + /* WAL replay distinguishes this case by equal offnums */ + xlrec.offnumLeaf = current->offnum; + xlrec.offnumHeadLeaf = current->offnum; + } + else + elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate); + } + + MarkBufferDirty(current->buffer); + + if (RelationNeedsWAL(index) && !state->isBuild) + { + XLogRecPtr recptr; + int flags; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) leafTuple, leafTuple->size); + + flags = REGBUF_STANDARD; + if (xlrec.newPage) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(0, current->buffer, flags); + if (xlrec.offnumParent != InvalidOffsetNumber) + XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF); + + PageSetLSN(current->page, recptr); + + /* update parent only if we actually changed it */ + if (xlrec.offnumParent != InvalidOffsetNumber) + { + PageSetLSN(parent->page, recptr); + } + } + + END_CRIT_SECTION(); +} + +/* + * Count the number and total size of leaf tuples in the chain starting at + * current->offnum. Return number into *nToSplit and total size as function + * result. + * + * Klugy special case when considering the root page (i.e., root is a leaf + * page, but we're about to split for the first time): return fake large + * values to force spgdoinsert() to take the doPickSplit rather than + * moveLeafs code path. moveLeafs is not prepared to deal with root page. + */ +static int +checkSplitConditions(Relation index, SpGistState *state, + SPPageDesc *current, int *nToSplit) +{ + int i, + n = 0, + totalSize = 0; + + if (SpGistBlockIsRoot(current->blkno)) + { + /* return impossible values to force split */ + *nToSplit = BLCKSZ; + return BLCKSZ; + } + + i = current->offnum; + while (i != InvalidOffsetNumber) + { + SpGistLeafTuple it; + + Assert(i >= FirstOffsetNumber && + i <= PageGetMaxOffsetNumber(current->page)); + it = (SpGistLeafTuple) PageGetItem(current->page, + PageGetItemId(current->page, i)); + if (it->tupstate == SPGIST_LIVE) + { + n++; + totalSize += it->size + sizeof(ItemIdData); + } + else if (it->tupstate == SPGIST_DEAD) + { + /* We could see a DEAD tuple as first/only chain item */ + Assert(i == current->offnum); + Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber); + /* Don't count it in result, because it won't go to other page */ + } + else + elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); + + i = SGLT_GET_NEXTOFFSET(it); + } + + *nToSplit = n; + + return totalSize; +} + +/* + * current points to a leaf-tuple chain that we wanted to add newLeafTuple to, + * but the chain has to be moved because there's not enough room to add + * newLeafTuple to its page. We use this method when the chain contains + * very little data so a split would be inefficient. We are sure we can + * fit the chain plus newLeafTuple on one other page. + */ +static void +moveLeafs(Relation index, SpGistState *state, + SPPageDesc *current, SPPageDesc *parent, + SpGistLeafTuple newLeafTuple, bool isNulls) +{ + int i, + nDelete, + nInsert, + size; + Buffer nbuf; + Page npage; + SpGistLeafTuple it; + OffsetNumber r = InvalidOffsetNumber, + startOffset = InvalidOffsetNumber; + bool replaceDead = false; + OffsetNumber *toDelete; + OffsetNumber *toInsert; + BlockNumber nblkno; + spgxlogMoveLeafs xlrec; + char *leafdata, + *leafptr; + + /* This doesn't work on root page */ + Assert(parent->buffer != InvalidBuffer); + Assert(parent->buffer != current->buffer); + + /* Locate the tuples to be moved, and count up the space needed */ + i = PageGetMaxOffsetNumber(current->page); + toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i); + toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1)); + + size = newLeafTuple->size + sizeof(ItemIdData); + + nDelete = 0; + i = current->offnum; + while (i != InvalidOffsetNumber) + { + SpGistLeafTuple it; + + Assert(i >= FirstOffsetNumber && + i <= PageGetMaxOffsetNumber(current->page)); + it = (SpGistLeafTuple) PageGetItem(current->page, + PageGetItemId(current->page, i)); + + if (it->tupstate == SPGIST_LIVE) + { + toDelete[nDelete] = i; + size += it->size + sizeof(ItemIdData); + nDelete++; + } + else if (it->tupstate == SPGIST_DEAD) + { + /* We could see a DEAD tuple as first/only chain item */ + Assert(i == current->offnum); + Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber); + /* We don't want to move it, so don't count it in size */ + toDelete[nDelete] = i; + nDelete++; + replaceDead = true; + } + else + elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); + + i = SGLT_GET_NEXTOFFSET(it); + } + + /* Find a leaf page that will hold them */ + nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0), + size, &xlrec.newPage); + npage = BufferGetPage(nbuf); + nblkno = BufferGetBlockNumber(nbuf); + Assert(nblkno != current->blkno); + + leafdata = leafptr = palloc(size); + + START_CRIT_SECTION(); + + /* copy all the old tuples to new page, unless they're dead */ + nInsert = 0; + if (!replaceDead) + { + for (i = 0; i < nDelete; i++) + { + it = (SpGistLeafTuple) PageGetItem(current->page, + PageGetItemId(current->page, toDelete[i])); + Assert(it->tupstate == SPGIST_LIVE); + + /* + * Update chain link (notice the chain order gets reversed, but we + * don't care). We're modifying the tuple on the source page + * here, but it's okay since we're about to delete it. + */ + SGLT_SET_NEXTOFFSET(it, r); + + r = SpGistPageAddNewItem(state, npage, (Item) it, it->size, + &startOffset, false); + + toInsert[nInsert] = r; + nInsert++; + + /* save modified tuple into leafdata as well */ + memcpy(leafptr, it, it->size); + leafptr += it->size; + } + } + + /* add the new tuple as well */ + SGLT_SET_NEXTOFFSET(newLeafTuple, r); + r = SpGistPageAddNewItem(state, npage, + (Item) newLeafTuple, newLeafTuple->size, + &startOffset, false); + toInsert[nInsert] = r; + nInsert++; + memcpy(leafptr, newLeafTuple, newLeafTuple->size); + leafptr += newLeafTuple->size; + + /* + * Now delete the old tuples, leaving a redirection pointer behind for the + * first one, unless we're doing an index build; in which case there can't + * be any concurrent scan so we need not provide a redirect. + */ + spgPageIndexMultiDelete(state, current->page, toDelete, nDelete, + state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT, + SPGIST_PLACEHOLDER, + nblkno, r); + + /* Update parent's downlink and mark parent page dirty */ + saveNodeLink(index, parent, nblkno, r); + + /* Mark the leaf pages too */ + MarkBufferDirty(current->buffer); + MarkBufferDirty(nbuf); + + if (RelationNeedsWAL(index) && !state->isBuild) + { + XLogRecPtr recptr; + + /* prepare WAL info */ + STORE_STATE(state, xlrec.stateSrc); + + xlrec.nMoves = nDelete; + xlrec.replaceDead = replaceDead; + xlrec.storesNulls = isNulls; + + xlrec.offnumParent = parent->offnum; + xlrec.nodeI = parent->node; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs); + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * nDelete); + XLogRegisterData((char *) toInsert, + sizeof(OffsetNumber) * nInsert); + XLogRegisterData((char *) leafdata, leafptr - leafdata); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0)); + XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS); + + PageSetLSN(current->page, recptr); + PageSetLSN(npage, recptr); + PageSetLSN(parent->page, recptr); + } + + END_CRIT_SECTION(); + + /* Update local free-space cache and release new buffer */ + SpGistSetLastUsedPage(index, nbuf); + UnlockReleaseBuffer(nbuf); +} + +/* + * Update previously-created redirection tuple with appropriate destination + * + * We use this when it's not convenient to know the destination first. + * The tuple should have been made with the "impossible" destination of + * the metapage. + */ +static void +setRedirectionTuple(SPPageDesc *current, OffsetNumber position, + BlockNumber blkno, OffsetNumber offnum) +{ + SpGistDeadTuple dt; + + dt = (SpGistDeadTuple) PageGetItem(current->page, + PageGetItemId(current->page, position)); + Assert(dt->tupstate == SPGIST_REDIRECT); + Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO); + ItemPointerSet(&dt->pointer, blkno, offnum); +} + +/* + * Test to see if the user-defined picksplit function failed to do its job, + * ie, it put all the leaf tuples into the same node. + * If so, randomly divide the tuples into several nodes (all with the same + * label) and return true to select allTheSame mode for this inner tuple. + * + * (This code is also used to forcibly select allTheSame mode for nulls.) + * + * If we know that the leaf tuples wouldn't all fit on one page, then we + * exclude the last tuple (which is the incoming new tuple that forced a split) + * from the check to see if more than one node is used. The reason for this + * is that if the existing tuples are put into only one chain, then even if + * we move them all to an empty page, there would still not be room for the + * new tuple, so we'd get into an infinite loop of picksplit attempts. + * Forcing allTheSame mode dodges this problem by ensuring the old tuples will + * be split across pages. (Exercise for the reader: figure out why this + * fixes the problem even when there is only one old tuple.) + */ +static bool +checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig, + bool *includeNew) +{ + int theNode; + int limit; + int i; + + /* For the moment, assume we can include the new leaf tuple */ + *includeNew = true; + + /* If there's only the new leaf tuple, don't select allTheSame mode */ + if (in->nTuples <= 1) + return false; + + /* If tuple set doesn't fit on one page, ignore the new tuple in test */ + limit = tooBig ? in->nTuples - 1 : in->nTuples; + + /* Check to see if more than one node is populated */ + theNode = out->mapTuplesToNodes[0]; + for (i = 1; i < limit; i++) + { + if (out->mapTuplesToNodes[i] != theNode) + return false; + } + + /* Nope, so override the picksplit function's decisions */ + + /* If the new tuple is in its own node, it can't be included in split */ + if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode) + *includeNew = false; + + out->nNodes = 8; /* arbitrary number of child nodes */ + + /* Random assignment of tuples to nodes (note we include new tuple) */ + for (i = 0; i < in->nTuples; i++) + out->mapTuplesToNodes[i] = i % out->nNodes; + + /* The opclass may not use node labels, but if it does, duplicate 'em */ + if (out->nodeLabels) + { + Datum theLabel = out->nodeLabels[theNode]; + + out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes); + for (i = 0; i < out->nNodes; i++) + out->nodeLabels[i] = theLabel; + } + + /* We don't touch the prefix or the leaf tuple datum assignments */ + + return true; +} + +/* + * current points to a leaf-tuple chain that we wanted to add newLeafTuple to, + * but the chain has to be split because there's not enough room to add + * newLeafTuple to its page. + * + * This function splits the leaf tuple set according to picksplit's rules, + * creating one or more new chains that are spread across the current page + * and an additional leaf page (we assume that two leaf pages will be + * sufficient). A new inner tuple is created, and the parent downlink + * pointer is updated to point to that inner tuple instead of the leaf chain. + * + * On exit, current contains the address of the new inner tuple. + * + * Returns true if we successfully inserted newLeafTuple during this function, + * false if caller still has to do it (meaning another picksplit operation is + * probably needed). Failure could occur if the picksplit result is fairly + * unbalanced, or if newLeafTuple is just plain too big to fit on a page. + * Because we force the picksplit result to be at least two chains, each + * cycle will get rid of at least one leaf tuple from the chain, so the loop + * will eventually terminate if lack of balance is the issue. If the tuple + * is too big, we assume that repeated picksplit operations will eventually + * make it small enough by repeated prefix-stripping. A broken opclass could + * make this an infinite loop, though, so spgdoinsert() checks that the + * leaf datums get smaller each time. + */ +static bool +doPickSplit(Relation index, SpGistState *state, + SPPageDesc *current, SPPageDesc *parent, + SpGistLeafTuple newLeafTuple, + int level, bool isNulls, bool isNew) +{ + bool insertedNew = false; + spgPickSplitIn in; + spgPickSplitOut out; + FmgrInfo *procinfo; + bool includeNew; + int i, + max, + n; + SpGistInnerTuple innerTuple; + SpGistNodeTuple node, + *nodes; + Buffer newInnerBuffer, + newLeafBuffer; + uint8 *leafPageSelect; + int *leafSizes; + OffsetNumber *toDelete; + OffsetNumber *toInsert; + OffsetNumber redirectTuplePos = InvalidOffsetNumber; + OffsetNumber startOffsets[2]; + SpGistLeafTuple *oldLeafs; + SpGistLeafTuple *newLeafs; + Datum leafDatums[INDEX_MAX_KEYS]; + bool leafIsnulls[INDEX_MAX_KEYS]; + int spaceToDelete; + int currentFreeSpace; + int totalLeafSizes; + bool allTheSame; + spgxlogPickSplit xlrec; + char *leafdata, + *leafptr; + SPPageDesc saveCurrent; + int nToDelete, + nToInsert, + maxToInclude; + + in.level = level; + + /* + * Allocate per-leaf-tuple work arrays with max possible size + */ + max = PageGetMaxOffsetNumber(current->page); + n = max + 1; + in.datums = (Datum *) palloc(sizeof(Datum) * n); + toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n); + toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n); + oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n); + newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n); + leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n); + + STORE_STATE(state, xlrec.stateSrc); + + /* + * Form list of leaf tuples which will be distributed as split result; + * also, count up the amount of space that will be freed from current. + * (Note that in the non-root case, we won't actually delete the old + * tuples, only replace them with redirects or placeholders.) + */ + nToInsert = 0; + nToDelete = 0; + spaceToDelete = 0; + if (SpGistBlockIsRoot(current->blkno)) + { + /* + * We are splitting the root (which up to now is also a leaf page). + * Its tuples are not linked, so scan sequentially to get them all. We + * ignore the original value of current->offnum. + */ + for (i = FirstOffsetNumber; i <= max; i++) + { + SpGistLeafTuple it; + + it = (SpGistLeafTuple) PageGetItem(current->page, + PageGetItemId(current->page, i)); + if (it->tupstate == SPGIST_LIVE) + { + in.datums[nToInsert] = + isNulls ? (Datum) 0 : SGLTDATUM(it, state); + oldLeafs[nToInsert] = it; + nToInsert++; + toDelete[nToDelete] = i; + nToDelete++; + /* we will delete the tuple altogether, so count full space */ + spaceToDelete += it->size + sizeof(ItemIdData); + } + else /* tuples on root should be live */ + elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); + } + } + else + { + /* Normal case, just collect the leaf tuples in the chain */ + i = current->offnum; + while (i != InvalidOffsetNumber) + { + SpGistLeafTuple it; + + Assert(i >= FirstOffsetNumber && i <= max); + it = (SpGistLeafTuple) PageGetItem(current->page, + PageGetItemId(current->page, i)); + if (it->tupstate == SPGIST_LIVE) + { + in.datums[nToInsert] = + isNulls ? (Datum) 0 : SGLTDATUM(it, state); + oldLeafs[nToInsert] = it; + nToInsert++; + toDelete[nToDelete] = i; + nToDelete++; + /* we will not delete the tuple, only replace with dead */ + Assert(it->size >= SGDTSIZE); + spaceToDelete += it->size - SGDTSIZE; + } + else if (it->tupstate == SPGIST_DEAD) + { + /* We could see a DEAD tuple as first/only chain item */ + Assert(i == current->offnum); + Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber); + toDelete[nToDelete] = i; + nToDelete++; + /* replacing it with redirect will save no space */ + } + else + elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate); + + i = SGLT_GET_NEXTOFFSET(it); + } + } + in.nTuples = nToInsert; + + /* + * We may not actually insert new tuple because another picksplit may be + * necessary due to too large value, but we will try to allocate enough + * space to include it; and in any case it has to be included in the input + * for the picksplit function. So don't increment nToInsert yet. + */ + in.datums[in.nTuples] = + isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state); + oldLeafs[in.nTuples] = newLeafTuple; + in.nTuples++; + + memset(&out, 0, sizeof(out)); + + if (!isNulls) + { + /* + * Perform split using user-defined method. + */ + procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC); + FunctionCall2Coll(procinfo, + index->rd_indcollation[0], + PointerGetDatum(&in), + PointerGetDatum(&out)); + + /* + * Form new leaf tuples and count up the total space needed. + */ + totalLeafSizes = 0; + for (i = 0; i < in.nTuples; i++) + { + if (state->leafTupDesc->natts > 1) + spgDeformLeafTuple(oldLeafs[i], + state->leafTupDesc, + leafDatums, + leafIsnulls, + isNulls); + + leafDatums[spgKeyColumn] = out.leafTupleDatums[i]; + leafIsnulls[spgKeyColumn] = false; + + newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr, + leafDatums, + leafIsnulls); + totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); + } + } + else + { + /* + * Perform dummy split that puts all tuples into one node. + * checkAllTheSame will override this and force allTheSame mode. + */ + out.hasPrefix = false; + out.nNodes = 1; + out.nodeLabels = NULL; + out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples); + + /* + * Form new leaf tuples and count up the total space needed. + */ + totalLeafSizes = 0; + for (i = 0; i < in.nTuples; i++) + { + if (state->leafTupDesc->natts > 1) + spgDeformLeafTuple(oldLeafs[i], + state->leafTupDesc, + leafDatums, + leafIsnulls, + isNulls); + + /* + * Nulls tree can contain only null key values. + */ + leafDatums[spgKeyColumn] = (Datum) 0; + leafIsnulls[spgKeyColumn] = true; + + newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr, + leafDatums, + leafIsnulls); + totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData); + } + } + + /* + * Check to see if the picksplit function failed to separate the values, + * ie, it put them all into the same child node. If so, select allTheSame + * mode and create a random split instead. See comments for + * checkAllTheSame as to why we need to know if the new leaf tuples could + * fit on one page. + */ + allTheSame = checkAllTheSame(&in, &out, + totalLeafSizes > SPGIST_PAGE_CAPACITY, + &includeNew); + + /* + * If checkAllTheSame decided we must exclude the new tuple, don't + * consider it any further. + */ + if (includeNew) + maxToInclude = in.nTuples; + else + { + maxToInclude = in.nTuples - 1; + totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData); + } + + /* + * Allocate per-node work arrays. Since checkAllTheSame could replace + * out.nNodes with a value larger than the number of tuples on the input + * page, we can't allocate these arrays before here. + */ + nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes); + leafSizes = (int *) palloc0(sizeof(int) * out.nNodes); + + /* + * Form nodes of inner tuple and inner tuple itself + */ + for (i = 0; i < out.nNodes; i++) + { + Datum label = (Datum) 0; + bool labelisnull = (out.nodeLabels == NULL); + + if (!labelisnull) + label = out.nodeLabels[i]; + nodes[i] = spgFormNodeTuple(state, label, labelisnull); + } + innerTuple = spgFormInnerTuple(state, + out.hasPrefix, out.prefixDatum, + out.nNodes, nodes); + innerTuple->allTheSame = allTheSame; + + /* + * Update nodes[] array to point into the newly formed innerTuple, so that + * we can adjust their downlinks below. + */ + SGITITERATE(innerTuple, i, node) + { + nodes[i] = node; + } + + /* + * Re-scan new leaf tuples and count up the space needed under each node. + */ + for (i = 0; i < maxToInclude; i++) + { + n = out.mapTuplesToNodes[i]; + if (n < 0 || n >= out.nNodes) + elog(ERROR, "inconsistent result of SPGiST picksplit function"); + leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData); + } + + /* + * To perform the split, we must insert a new inner tuple, which can't go + * on a leaf page; and unless we are splitting the root page, we must then + * update the parent tuple's downlink to point to the inner tuple. If + * there is room, we'll put the new inner tuple on the same page as the + * parent tuple, otherwise we need another non-leaf buffer. But if the + * parent page is the root, we can't add the new inner tuple there, + * because the root page must have only one inner tuple. + */ + xlrec.initInner = false; + if (parent->buffer != InvalidBuffer && + !SpGistBlockIsRoot(parent->blkno) && + (SpGistPageGetFreeSpace(parent->page, 1) >= + innerTuple->size + sizeof(ItemIdData))) + { + /* New inner tuple will fit on parent page */ + newInnerBuffer = parent->buffer; + } + else if (parent->buffer != InvalidBuffer) + { + /* Send tuple to page with next triple parity (see README) */ + newInnerBuffer = SpGistGetBuffer(index, + GBUF_INNER_PARITY(parent->blkno + 1) | + (isNulls ? GBUF_NULLS : 0), + innerTuple->size + sizeof(ItemIdData), + &xlrec.initInner); + } + else + { + /* Root page split ... inner tuple will go to root page */ + newInnerBuffer = InvalidBuffer; + } + + /* + * The new leaf tuples converted from the existing ones should require the + * same or less space, and therefore should all fit onto one page + * (although that's not necessarily the current page, since we can't + * delete the old tuples but only replace them with placeholders). + * However, the incoming new tuple might not also fit, in which case we + * might need another picksplit cycle to reduce it some more. + * + * If there's not room to put everything back onto the current page, then + * we decide on a per-node basis which tuples go to the new page. (We do + * it like that because leaf tuple chains can't cross pages, so we must + * place all leaf tuples belonging to the same parent node on the same + * page.) + * + * If we are splitting the root page (turning it from a leaf page into an + * inner page), then no leaf tuples can go back to the current page; they + * must all go somewhere else. + */ + if (!SpGistBlockIsRoot(current->blkno)) + currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete; + else + currentFreeSpace = 0; /* prevent assigning any tuples to current */ + + xlrec.initDest = false; + + if (totalLeafSizes <= currentFreeSpace) + { + /* All the leaf tuples will fit on current page */ + newLeafBuffer = InvalidBuffer; + /* mark new leaf tuple as included in insertions, if allowed */ + if (includeNew) + { + nToInsert++; + insertedNew = true; + } + for (i = 0; i < nToInsert; i++) + leafPageSelect[i] = 0; /* signifies current page */ + } + else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY) + { + /* + * We're trying to split up a long value by repeated suffixing, but + * it's not going to fit yet. Don't bother allocating a second leaf + * buffer that we won't be able to use. + */ + newLeafBuffer = InvalidBuffer; + Assert(includeNew); + Assert(nToInsert == 0); + } + else + { + /* We will need another leaf page */ + uint8 *nodePageSelect; + int curspace; + int newspace; + + newLeafBuffer = SpGistGetBuffer(index, + GBUF_LEAF | (isNulls ? GBUF_NULLS : 0), + Min(totalLeafSizes, + SPGIST_PAGE_CAPACITY), + &xlrec.initDest); + + /* + * Attempt to assign node groups to the two pages. We might fail to + * do so, even if totalLeafSizes is less than the available space, + * because we can't split a group across pages. + */ + nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes); + + curspace = currentFreeSpace; + newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer)); + for (i = 0; i < out.nNodes; i++) + { + if (leafSizes[i] <= curspace) + { + nodePageSelect[i] = 0; /* signifies current page */ + curspace -= leafSizes[i]; + } + else + { + nodePageSelect[i] = 1; /* signifies new leaf page */ + newspace -= leafSizes[i]; + } + } + if (curspace >= 0 && newspace >= 0) + { + /* Successful assignment, so we can include the new leaf tuple */ + if (includeNew) + { + nToInsert++; + insertedNew = true; + } + } + else if (includeNew) + { + /* We must exclude the new leaf tuple from the split */ + int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1]; + + leafSizes[nodeOfNewTuple] -= + newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData); + + /* Repeat the node assignment process --- should succeed now */ + curspace = currentFreeSpace; + newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer)); + for (i = 0; i < out.nNodes; i++) + { + if (leafSizes[i] <= curspace) + { + nodePageSelect[i] = 0; /* signifies current page */ + curspace -= leafSizes[i]; + } + else + { + nodePageSelect[i] = 1; /* signifies new leaf page */ + newspace -= leafSizes[i]; + } + } + if (curspace < 0 || newspace < 0) + elog(ERROR, "failed to divide leaf tuple groups across pages"); + } + else + { + /* oops, we already excluded new tuple ... should not get here */ + elog(ERROR, "failed to divide leaf tuple groups across pages"); + } + /* Expand the per-node assignments to be shown per leaf tuple */ + for (i = 0; i < nToInsert; i++) + { + n = out.mapTuplesToNodes[i]; + leafPageSelect[i] = nodePageSelect[n]; + } + } + + /* Start preparing WAL record */ + xlrec.nDelete = 0; + xlrec.initSrc = isNew; + xlrec.storesNulls = isNulls; + xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno); + + leafdata = leafptr = (char *) palloc(totalLeafSizes); + + /* Here we begin making the changes to the target pages */ + START_CRIT_SECTION(); + + /* + * Delete old leaf tuples from current buffer, except when we're splitting + * the root; in that case there's no need because we'll re-init the page + * below. We do this first to make room for reinserting new leaf tuples. + */ + if (!SpGistBlockIsRoot(current->blkno)) + { + /* + * Init buffer instead of deleting individual tuples, but only if + * there aren't any other live tuples and only during build; otherwise + * we need to set a redirection tuple for concurrent scans. + */ + if (state->isBuild && + nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder == + PageGetMaxOffsetNumber(current->page)) + { + SpGistInitBuffer(current->buffer, + SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0)); + xlrec.initSrc = true; + } + else if (isNew) + { + /* don't expose the freshly init'd buffer as a backup block */ + Assert(nToDelete == 0); + } + else + { + xlrec.nDelete = nToDelete; + + if (!state->isBuild) + { + /* + * Need to create redirect tuple (it will point to new inner + * tuple) but right now the new tuple's location is not known + * yet. So, set the redirection pointer to "impossible" value + * and remember its position to update tuple later. + */ + if (nToDelete > 0) + redirectTuplePos = toDelete[0]; + spgPageIndexMultiDelete(state, current->page, + toDelete, nToDelete, + SPGIST_REDIRECT, + SPGIST_PLACEHOLDER, + SPGIST_METAPAGE_BLKNO, + FirstOffsetNumber); + } + else + { + /* + * During index build there is not concurrent searches, so we + * don't need to create redirection tuple. + */ + spgPageIndexMultiDelete(state, current->page, + toDelete, nToDelete, + SPGIST_PLACEHOLDER, + SPGIST_PLACEHOLDER, + InvalidBlockNumber, + InvalidOffsetNumber); + } + } + } + + /* + * Put leaf tuples on proper pages, and update downlinks in innerTuple's + * nodes. + */ + startOffsets[0] = startOffsets[1] = InvalidOffsetNumber; + for (i = 0; i < nToInsert; i++) + { + SpGistLeafTuple it = newLeafs[i]; + Buffer leafBuffer; + BlockNumber leafBlock; + OffsetNumber newoffset; + + /* Which page is it going to? */ + leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer; + leafBlock = BufferGetBlockNumber(leafBuffer); + + /* Link tuple into correct chain for its node */ + n = out.mapTuplesToNodes[i]; + + if (ItemPointerIsValid(&nodes[n]->t_tid)) + { + Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock); + SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid)); + } + else + SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber); + + /* Insert it on page */ + newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer), + (Item) it, it->size, + &startOffsets[leafPageSelect[i]], + false); + toInsert[i] = newoffset; + + /* ... and complete the chain linking */ + ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset); + + /* Also copy leaf tuple into WAL data */ + memcpy(leafptr, newLeafs[i], newLeafs[i]->size); + leafptr += newLeafs[i]->size; + } + + /* + * We're done modifying the other leaf buffer (if any), so mark it dirty. + * current->buffer will be marked below, after we're entirely done + * modifying it. + */ + if (newLeafBuffer != InvalidBuffer) + { + MarkBufferDirty(newLeafBuffer); + } + + /* Remember current buffer, since we're about to change "current" */ + saveCurrent = *current; + + /* + * Store the new innerTuple + */ + if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer) + { + /* + * new inner tuple goes to parent page + */ + Assert(current->buffer != parent->buffer); + + /* Repoint "current" at the new inner tuple */ + current->blkno = parent->blkno; + current->buffer = parent->buffer; + current->page = parent->page; + xlrec.offnumInner = current->offnum = + SpGistPageAddNewItem(state, current->page, + (Item) innerTuple, innerTuple->size, + NULL, false); + + /* + * Update parent node link and mark parent page dirty + */ + xlrec.innerIsParent = true; + xlrec.offnumParent = parent->offnum; + xlrec.nodeI = parent->node; + saveNodeLink(index, parent, current->blkno, current->offnum); + + /* + * Update redirection link (in old current buffer) + */ + if (redirectTuplePos != InvalidOffsetNumber) + setRedirectionTuple(&saveCurrent, redirectTuplePos, + current->blkno, current->offnum); + + /* Done modifying old current buffer, mark it dirty */ + MarkBufferDirty(saveCurrent.buffer); + } + else if (parent->buffer != InvalidBuffer) + { + /* + * new inner tuple will be stored on a new page + */ + Assert(newInnerBuffer != InvalidBuffer); + + /* Repoint "current" at the new inner tuple */ + current->buffer = newInnerBuffer; + current->blkno = BufferGetBlockNumber(current->buffer); + current->page = BufferGetPage(current->buffer); + xlrec.offnumInner = current->offnum = + SpGistPageAddNewItem(state, current->page, + (Item) innerTuple, innerTuple->size, + NULL, false); + + /* Done modifying new current buffer, mark it dirty */ + MarkBufferDirty(current->buffer); + + /* + * Update parent node link and mark parent page dirty + */ + xlrec.innerIsParent = (parent->buffer == current->buffer); + xlrec.offnumParent = parent->offnum; + xlrec.nodeI = parent->node; + saveNodeLink(index, parent, current->blkno, current->offnum); + + /* + * Update redirection link (in old current buffer) + */ + if (redirectTuplePos != InvalidOffsetNumber) + setRedirectionTuple(&saveCurrent, redirectTuplePos, + current->blkno, current->offnum); + + /* Done modifying old current buffer, mark it dirty */ + MarkBufferDirty(saveCurrent.buffer); + } + else + { + /* + * Splitting root page, which was a leaf but now becomes inner page + * (and so "current" continues to point at it) + */ + Assert(SpGistBlockIsRoot(current->blkno)); + Assert(redirectTuplePos == InvalidOffsetNumber); + + SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0)); + xlrec.initInner = true; + xlrec.innerIsParent = false; + + xlrec.offnumInner = current->offnum = + PageAddItem(current->page, (Item) innerTuple, innerTuple->size, + InvalidOffsetNumber, false, false); + if (current->offnum != FirstOffsetNumber) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + innerTuple->size); + + /* No parent link to update, nor redirection to do */ + xlrec.offnumParent = InvalidOffsetNumber; + xlrec.nodeI = 0; + + /* Done modifying new current buffer, mark it dirty */ + MarkBufferDirty(current->buffer); + + /* saveCurrent doesn't represent a different buffer */ + saveCurrent.buffer = InvalidBuffer; + } + + if (RelationNeedsWAL(index) && !state->isBuild) + { + XLogRecPtr recptr; + int flags; + + XLogBeginInsert(); + + xlrec.nInsert = nToInsert; + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit); + + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * xlrec.nDelete); + XLogRegisterData((char *) toInsert, + sizeof(OffsetNumber) * xlrec.nInsert); + XLogRegisterData((char *) leafPageSelect, + sizeof(uint8) * xlrec.nInsert); + XLogRegisterData((char *) innerTuple, innerTuple->size); + XLogRegisterData(leafdata, leafptr - leafdata); + + /* Old leaf page */ + if (BufferIsValid(saveCurrent.buffer)) + { + flags = REGBUF_STANDARD; + if (xlrec.initSrc) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(0, saveCurrent.buffer, flags); + } + + /* New leaf page */ + if (BufferIsValid(newLeafBuffer)) + { + flags = REGBUF_STANDARD; + if (xlrec.initDest) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(1, newLeafBuffer, flags); + } + + /* Inner page */ + flags = REGBUF_STANDARD; + if (xlrec.initInner) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(2, current->buffer, flags); + + /* Parent page, if different from inner page */ + if (parent->buffer != InvalidBuffer) + { + if (parent->buffer != current->buffer) + XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD); + else + Assert(xlrec.innerIsParent); + } + + /* Issue the WAL record */ + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT); + + /* Update page LSNs on all affected pages */ + if (newLeafBuffer != InvalidBuffer) + { + Page page = BufferGetPage(newLeafBuffer); + + PageSetLSN(page, recptr); + } + + if (saveCurrent.buffer != InvalidBuffer) + { + Page page = BufferGetPage(saveCurrent.buffer); + + PageSetLSN(page, recptr); + } + + PageSetLSN(current->page, recptr); + + if (parent->buffer != InvalidBuffer) + { + PageSetLSN(parent->page, recptr); + } + } + + END_CRIT_SECTION(); + + /* Update local free-space cache and unlock buffers */ + if (newLeafBuffer != InvalidBuffer) + { + SpGistSetLastUsedPage(index, newLeafBuffer); + UnlockReleaseBuffer(newLeafBuffer); + } + if (saveCurrent.buffer != InvalidBuffer) + { + SpGistSetLastUsedPage(index, saveCurrent.buffer); + UnlockReleaseBuffer(saveCurrent.buffer); + } + + return insertedNew; +} + +/* + * spgMatchNode action: descend to N'th child node of current inner tuple + */ +static void +spgMatchNodeAction(Relation index, SpGistState *state, + SpGistInnerTuple innerTuple, + SPPageDesc *current, SPPageDesc *parent, int nodeN) +{ + int i; + SpGistNodeTuple node; + + /* Release previous parent buffer if any */ + if (parent->buffer != InvalidBuffer && + parent->buffer != current->buffer) + { + SpGistSetLastUsedPage(index, parent->buffer); + UnlockReleaseBuffer(parent->buffer); + } + + /* Repoint parent to specified node of current inner tuple */ + parent->blkno = current->blkno; + parent->buffer = current->buffer; + parent->page = current->page; + parent->offnum = current->offnum; + parent->node = nodeN; + + /* Locate that node */ + SGITITERATE(innerTuple, i, node) + { + if (i == nodeN) + break; + } + + if (i != nodeN) + elog(ERROR, "failed to find requested node %d in SPGiST inner tuple", + nodeN); + + /* Point current to the downlink location, if any */ + if (ItemPointerIsValid(&node->t_tid)) + { + current->blkno = ItemPointerGetBlockNumber(&node->t_tid); + current->offnum = ItemPointerGetOffsetNumber(&node->t_tid); + } + else + { + /* Downlink is empty, so we'll need to find a new page */ + current->blkno = InvalidBlockNumber; + current->offnum = InvalidOffsetNumber; + } + + current->buffer = InvalidBuffer; + current->page = NULL; +} + +/* + * spgAddNode action: add a node to the inner tuple at current + */ +static void +spgAddNodeAction(Relation index, SpGistState *state, + SpGistInnerTuple innerTuple, + SPPageDesc *current, SPPageDesc *parent, + int nodeN, Datum nodeLabel) +{ + SpGistInnerTuple newInnerTuple; + spgxlogAddNode xlrec; + + /* Should not be applied to nulls */ + Assert(!SpGistPageStoresNulls(current->page)); + + /* Construct new inner tuple with additional node */ + newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN); + + /* Prepare WAL record */ + STORE_STATE(state, xlrec.stateSrc); + xlrec.offnum = current->offnum; + + /* we don't fill these unless we need to change the parent downlink */ + xlrec.parentBlk = -1; + xlrec.offnumParent = InvalidOffsetNumber; + xlrec.nodeI = 0; + + /* we don't fill these unless tuple has to be moved */ + xlrec.offnumNew = InvalidOffsetNumber; + xlrec.newPage = false; + + if (PageGetExactFreeSpace(current->page) >= + newInnerTuple->size - innerTuple->size) + { + /* + * We can replace the inner tuple by new version in-place + */ + START_CRIT_SECTION(); + + PageIndexTupleDelete(current->page, current->offnum); + if (PageAddItem(current->page, + (Item) newInnerTuple, newInnerTuple->size, + current->offnum, false, false) != current->offnum) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + newInnerTuple->size); + + MarkBufferDirty(current->buffer); + + if (RelationNeedsWAL(index) && !state->isBuild) + { + XLogRecPtr recptr; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) newInnerTuple, newInnerTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); + + PageSetLSN(current->page, recptr); + } + + END_CRIT_SECTION(); + } + else + { + /* + * move inner tuple to another page, and update parent + */ + SpGistDeadTuple dt; + SPPageDesc saveCurrent; + + /* + * It should not be possible to get here for the root page, since we + * allow only one inner tuple on the root page, and spgFormInnerTuple + * always checks that inner tuples don't exceed the size of a page. + */ + if (SpGistBlockIsRoot(current->blkno)) + elog(ERROR, "cannot enlarge root tuple any more"); + Assert(parent->buffer != InvalidBuffer); + + saveCurrent = *current; + + xlrec.offnumParent = parent->offnum; + xlrec.nodeI = parent->node; + + /* + * obtain new buffer with the same parity as current, since it will be + * a child of same parent tuple + */ + current->buffer = SpGistGetBuffer(index, + GBUF_INNER_PARITY(current->blkno), + newInnerTuple->size + sizeof(ItemIdData), + &xlrec.newPage); + current->blkno = BufferGetBlockNumber(current->buffer); + current->page = BufferGetPage(current->buffer); + + /* + * Let's just make real sure new current isn't same as old. Right now + * that's impossible, but if SpGistGetBuffer ever got smart enough to + * delete placeholder tuples before checking space, maybe it wouldn't + * be impossible. The case would appear to work except that WAL + * replay would be subtly wrong, so I think a mere assert isn't enough + * here. + */ + if (current->blkno == saveCurrent.blkno) + elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer"); + + /* + * New current and parent buffer will both be modified; but note that + * parent buffer could be same as either new or old current. + */ + if (parent->buffer == saveCurrent.buffer) + xlrec.parentBlk = 0; + else if (parent->buffer == current->buffer) + xlrec.parentBlk = 1; + else + xlrec.parentBlk = 2; + + START_CRIT_SECTION(); + + /* insert new ... */ + xlrec.offnumNew = current->offnum = + SpGistPageAddNewItem(state, current->page, + (Item) newInnerTuple, newInnerTuple->size, + NULL, false); + + MarkBufferDirty(current->buffer); + + /* update parent's downlink and mark parent page dirty */ + saveNodeLink(index, parent, current->blkno, current->offnum); + + /* + * Replace old tuple with a placeholder or redirection tuple. Unless + * doing an index build, we have to insert a redirection tuple for + * possible concurrent scans. We can't just delete it in any case, + * because that could change the offsets of other tuples on the page, + * breaking downlinks from their parents. + */ + if (state->isBuild) + dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER, + InvalidBlockNumber, InvalidOffsetNumber); + else + dt = spgFormDeadTuple(state, SPGIST_REDIRECT, + current->blkno, current->offnum); + + PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum); + if (PageAddItem(saveCurrent.page, (Item) dt, dt->size, + saveCurrent.offnum, + false, false) != saveCurrent.offnum) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + dt->size); + + if (state->isBuild) + SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++; + else + SpGistPageGetOpaque(saveCurrent.page)->nRedirection++; + + MarkBufferDirty(saveCurrent.buffer); + + if (RelationNeedsWAL(index) && !state->isBuild) + { + XLogRecPtr recptr; + int flags; + + XLogBeginInsert(); + + /* orig page */ + XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD); + /* new page */ + flags = REGBUF_STANDARD; + if (xlrec.newPage) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(1, current->buffer, flags); + /* parent page (if different from orig and new) */ + if (xlrec.parentBlk == 2) + XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD); + + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) newInnerTuple, newInnerTuple->size); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); + + /* we don't bother to check if any of these are redundant */ + PageSetLSN(current->page, recptr); + PageSetLSN(parent->page, recptr); + PageSetLSN(saveCurrent.page, recptr); + } + + END_CRIT_SECTION(); + + /* Release saveCurrent if it's not same as current or parent */ + if (saveCurrent.buffer != current->buffer && + saveCurrent.buffer != parent->buffer) + { + SpGistSetLastUsedPage(index, saveCurrent.buffer); + UnlockReleaseBuffer(saveCurrent.buffer); + } + } +} + +/* + * spgSplitNode action: split inner tuple at current into prefix and postfix + */ +static void +spgSplitNodeAction(Relation index, SpGistState *state, + SpGistInnerTuple innerTuple, + SPPageDesc *current, spgChooseOut *out) +{ + SpGistInnerTuple prefixTuple, + postfixTuple; + SpGistNodeTuple node, + *nodes; + BlockNumber postfixBlkno; + OffsetNumber postfixOffset; + int i; + spgxlogSplitTuple xlrec; + Buffer newBuffer = InvalidBuffer; + + /* Should not be applied to nulls */ + Assert(!SpGistPageStoresNulls(current->page)); + + /* Check opclass gave us sane values */ + if (out->result.splitTuple.prefixNNodes <= 0 || + out->result.splitTuple.prefixNNodes > SGITMAXNNODES) + elog(ERROR, "invalid number of prefix nodes: %d", + out->result.splitTuple.prefixNNodes); + if (out->result.splitTuple.childNodeN < 0 || + out->result.splitTuple.childNodeN >= + out->result.splitTuple.prefixNNodes) + elog(ERROR, "invalid child node number: %d", + out->result.splitTuple.childNodeN); + + /* + * Construct new prefix tuple with requested number of nodes. We'll fill + * in the childNodeN'th node's downlink below. + */ + nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * + out->result.splitTuple.prefixNNodes); + + for (i = 0; i < out->result.splitTuple.prefixNNodes; i++) + { + Datum label = (Datum) 0; + bool labelisnull; + + labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL); + if (!labelisnull) + label = out->result.splitTuple.prefixNodeLabels[i]; + nodes[i] = spgFormNodeTuple(state, label, labelisnull); + } + + prefixTuple = spgFormInnerTuple(state, + out->result.splitTuple.prefixHasPrefix, + out->result.splitTuple.prefixPrefixDatum, + out->result.splitTuple.prefixNNodes, + nodes); + + /* it must fit in the space that innerTuple now occupies */ + if (prefixTuple->size > innerTuple->size) + elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix"); + + /* + * Construct new postfix tuple, containing all nodes of innerTuple with + * same node datums, but with the prefix specified by the picksplit + * function. + */ + nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes); + SGITITERATE(innerTuple, i, node) + { + nodes[i] = node; + } + + postfixTuple = spgFormInnerTuple(state, + out->result.splitTuple.postfixHasPrefix, + out->result.splitTuple.postfixPrefixDatum, + innerTuple->nNodes, nodes); + + /* Postfix tuple is allTheSame if original tuple was */ + postfixTuple->allTheSame = innerTuple->allTheSame; + + /* prep data for WAL record */ + xlrec.newPage = false; + + /* + * If we can't fit both tuples on the current page, get a new page for the + * postfix tuple. In particular, can't split to the root page. + * + * For the space calculation, note that prefixTuple replaces innerTuple + * but postfixTuple will be a new entry. + */ + if (SpGistBlockIsRoot(current->blkno) || + SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size < + prefixTuple->size + postfixTuple->size + sizeof(ItemIdData)) + { + /* + * Choose page with next triple parity, because postfix tuple is a + * child of prefix one + */ + newBuffer = SpGistGetBuffer(index, + GBUF_INNER_PARITY(current->blkno + 1), + postfixTuple->size + sizeof(ItemIdData), + &xlrec.newPage); + } + + START_CRIT_SECTION(); + + /* + * Replace old tuple by prefix tuple + */ + PageIndexTupleDelete(current->page, current->offnum); + xlrec.offnumPrefix = PageAddItem(current->page, + (Item) prefixTuple, prefixTuple->size, + current->offnum, false, false); + if (xlrec.offnumPrefix != current->offnum) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + prefixTuple->size); + + /* + * put postfix tuple into appropriate page + */ + if (newBuffer == InvalidBuffer) + { + postfixBlkno = current->blkno; + xlrec.offnumPostfix = postfixOffset = + SpGistPageAddNewItem(state, current->page, + (Item) postfixTuple, postfixTuple->size, + NULL, false); + xlrec.postfixBlkSame = true; + } + else + { + postfixBlkno = BufferGetBlockNumber(newBuffer); + xlrec.offnumPostfix = postfixOffset = + SpGistPageAddNewItem(state, BufferGetPage(newBuffer), + (Item) postfixTuple, postfixTuple->size, + NULL, false); + MarkBufferDirty(newBuffer); + xlrec.postfixBlkSame = false; + } + + /* + * And set downlink pointer in the prefix tuple to point to postfix tuple. + * (We can't avoid this step by doing the above two steps in opposite + * order, because there might not be enough space on the page to insert + * the postfix tuple first.) We have to update the local copy of the + * prefixTuple too, because that's what will be written to WAL. + */ + spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN, + postfixBlkno, postfixOffset); + prefixTuple = (SpGistInnerTuple) PageGetItem(current->page, + PageGetItemId(current->page, current->offnum)); + spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN, + postfixBlkno, postfixOffset); + + MarkBufferDirty(current->buffer); + + if (RelationNeedsWAL(index) && !state->isBuild) + { + XLogRecPtr recptr; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) prefixTuple, prefixTuple->size); + XLogRegisterData((char *) postfixTuple, postfixTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + if (newBuffer != InvalidBuffer) + { + int flags; + + flags = REGBUF_STANDARD; + if (xlrec.newPage) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(1, newBuffer, flags); + } + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE); + + PageSetLSN(current->page, recptr); + + if (newBuffer != InvalidBuffer) + { + PageSetLSN(BufferGetPage(newBuffer), recptr); + } + } + + END_CRIT_SECTION(); + + /* Update local free-space cache and release buffer */ + if (newBuffer != InvalidBuffer) + { + SpGistSetLastUsedPage(index, newBuffer); + UnlockReleaseBuffer(newBuffer); + } +} + +/* + * Insert one item into the index. + * + * Returns true on success, false if we failed to complete the insertion + * (typically because of conflict with a concurrent insert). In the latter + * case, caller should re-call spgdoinsert() with the same args. + */ +bool +spgdoinsert(Relation index, SpGistState *state, + ItemPointer heapPtr, Datum *datums, bool *isnulls) +{ + bool result = true; + TupleDesc leafDescriptor = state->leafTupDesc; + bool isnull = isnulls[spgKeyColumn]; + int level = 0; + Datum leafDatums[INDEX_MAX_KEYS]; + int leafSize; + int bestLeafSize; + int numNoProgressCycles = 0; + SPPageDesc current, + parent; + FmgrInfo *procinfo = NULL; + + /* + * Look up FmgrInfo of the user-defined choose function once, to save + * cycles in the loop below. + */ + if (!isnull) + procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC); + + /* + * Prepare the leaf datum to insert. + * + * If an optional "compress" method is provided, then call it to form the + * leaf key datum from the input datum. Otherwise, store the input datum + * as is. Since we don't use index_form_tuple in this AM, we have to make + * sure value to be inserted is not toasted; FormIndexDatum doesn't + * guarantee that. But we assume the "compress" method to return an + * untoasted value. + */ + if (!isnull) + { + if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC))) + { + FmgrInfo *compressProcinfo = NULL; + + compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC); + leafDatums[spgKeyColumn] = + FunctionCall1Coll(compressProcinfo, + index->rd_indcollation[spgKeyColumn], + datums[spgKeyColumn]); + } + else + { + Assert(state->attLeafType.type == state->attType.type); + + if (state->attType.attlen == -1) + leafDatums[spgKeyColumn] = + PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn])); + else + leafDatums[spgKeyColumn] = datums[spgKeyColumn]; + } + } + else + leafDatums[spgKeyColumn] = (Datum) 0; + + /* Likewise, ensure that any INCLUDE values are not toasted */ + for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++) + { + if (!isnulls[i]) + { + if (TupleDescAttr(leafDescriptor, i)->attlen == -1) + leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i])); + else + leafDatums[i] = datums[i]; + } + else + leafDatums[i] = (Datum) 0; + } + + /* + * Compute space needed for a leaf tuple containing the given data. + */ + leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls); + /* Account for an item pointer, too */ + leafSize += sizeof(ItemIdData); + + /* + * If it isn't gonna fit, and the opclass can't reduce the datum size by + * suffixing, bail out now rather than doing a lot of useless work. + */ + if (leafSize > SPGIST_PAGE_CAPACITY && + (isnull || !state->config.longValuesOK)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("index row size %zu exceeds maximum %zu for index \"%s\"", + leafSize - sizeof(ItemIdData), + SPGIST_PAGE_CAPACITY - sizeof(ItemIdData), + RelationGetRelationName(index)), + errhint("Values larger than a buffer page cannot be indexed."))); + bestLeafSize = leafSize; + + /* Initialize "current" to the appropriate root page */ + current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO; + current.buffer = InvalidBuffer; + current.page = NULL; + current.offnum = FirstOffsetNumber; + current.node = -1; + + /* "parent" is invalid for the moment */ + parent.blkno = InvalidBlockNumber; + parent.buffer = InvalidBuffer; + parent.page = NULL; + parent.offnum = InvalidOffsetNumber; + parent.node = -1; + + /* + * Before entering the loop, try to clear any pending interrupt condition. + * If a query cancel is pending, we might as well accept it now not later; + * while if a non-canceling condition is pending, servicing it here avoids + * having to restart the insertion and redo all the work so far. + */ + CHECK_FOR_INTERRUPTS(); + + for (;;) + { + bool isNew = false; + + /* + * Bail out if query cancel is pending. We must have this somewhere + * in the loop since a broken opclass could produce an infinite + * picksplit loop. However, because we'll be holding buffer lock(s) + * after the first iteration, ProcessInterrupts() wouldn't be able to + * throw a cancel error here. Hence, if we see that an interrupt is + * pending, break out of the loop and deal with the situation below. + * Set result = false because we must restart the insertion if the + * interrupt isn't a query-cancel-or-die case. + */ + if (INTERRUPTS_PENDING_CONDITION()) + { + result = false; + break; + } + + if (current.blkno == InvalidBlockNumber) + { + /* + * Create a leaf page. If leafSize is too large to fit on a page, + * we won't actually use the page yet, but it simplifies the API + * for doPickSplit to always have a leaf page at hand; so just + * quietly limit our request to a page size. + */ + current.buffer = + SpGistGetBuffer(index, + GBUF_LEAF | (isnull ? GBUF_NULLS : 0), + Min(leafSize, SPGIST_PAGE_CAPACITY), + &isNew); + current.blkno = BufferGetBlockNumber(current.buffer); + } + else if (parent.buffer == InvalidBuffer) + { + /* we hold no parent-page lock, so no deadlock is possible */ + current.buffer = ReadBuffer(index, current.blkno); + LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE); + } + else if (current.blkno != parent.blkno) + { + /* descend to a new child page */ + current.buffer = ReadBuffer(index, current.blkno); + + /* + * Attempt to acquire lock on child page. We must beware of + * deadlock against another insertion process descending from that + * page to our parent page (see README). If we fail to get lock, + * abandon the insertion and tell our caller to start over. + * + * XXX this could be improved, because failing to get lock on a + * buffer is not proof of a deadlock situation; the lock might be + * held by a reader, or even just background writer/checkpointer + * process. Perhaps it'd be worth retrying after sleeping a bit? + */ + if (!ConditionalLockBuffer(current.buffer)) + { + ReleaseBuffer(current.buffer); + UnlockReleaseBuffer(parent.buffer); + return false; + } + } + else + { + /* inner tuple can be stored on the same page as parent one */ + current.buffer = parent.buffer; + } + current.page = BufferGetPage(current.buffer); + + /* should not arrive at a page of the wrong type */ + if (isnull ? !SpGistPageStoresNulls(current.page) : + SpGistPageStoresNulls(current.page)) + elog(ERROR, "SPGiST index page %u has wrong nulls flag", + current.blkno); + + if (SpGistPageIsLeaf(current.page)) + { + SpGistLeafTuple leafTuple; + int nToSplit, + sizeToSplit; + + leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls); + if (leafTuple->size + sizeof(ItemIdData) <= + SpGistPageGetFreeSpace(current.page, 1)) + { + /* it fits on page, so insert it and we're done */ + addLeafTuple(index, state, leafTuple, + ¤t, &parent, isnull, isNew); + break; + } + else if ((sizeToSplit = + checkSplitConditions(index, state, ¤t, + &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 && + nToSplit < 64 && + leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY) + { + /* + * the amount of data is pretty small, so just move the whole + * chain to another leaf page rather than splitting it. + */ + Assert(!isNew); + moveLeafs(index, state, ¤t, &parent, leafTuple, isnull); + break; /* we're done */ + } + else + { + /* picksplit */ + if (doPickSplit(index, state, ¤t, &parent, + leafTuple, level, isnull, isNew)) + break; /* doPickSplit installed new tuples */ + + /* leaf tuple will not be inserted yet */ + pfree(leafTuple); + + /* + * current now describes new inner tuple, go insert into it + */ + Assert(!SpGistPageIsLeaf(current.page)); + goto process_inner_tuple; + } + } + else /* non-leaf page */ + { + /* + * Apply the opclass choose function to figure out how to insert + * the given datum into the current inner tuple. + */ + SpGistInnerTuple innerTuple; + spgChooseIn in; + spgChooseOut out; + + /* + * spgAddNode and spgSplitTuple cases will loop back to here to + * complete the insertion operation. Just in case the choose + * function is broken and produces add or split requests + * repeatedly, check for query cancel (see comments above). + */ + process_inner_tuple: + if (INTERRUPTS_PENDING_CONDITION()) + { + result = false; + break; + } + + innerTuple = (SpGistInnerTuple) PageGetItem(current.page, + PageGetItemId(current.page, current.offnum)); + + in.datum = datums[spgKeyColumn]; + in.leafDatum = leafDatums[spgKeyColumn]; + in.level = level; + in.allTheSame = innerTuple->allTheSame; + in.hasPrefix = (innerTuple->prefixSize > 0); + in.prefixDatum = SGITDATUM(innerTuple, state); + in.nNodes = innerTuple->nNodes; + in.nodeLabels = spgExtractNodeLabels(state, innerTuple); + + memset(&out, 0, sizeof(out)); + + if (!isnull) + { + /* use user-defined choose method */ + FunctionCall2Coll(procinfo, + index->rd_indcollation[0], + PointerGetDatum(&in), + PointerGetDatum(&out)); + } + else + { + /* force "match" action (to insert to random subnode) */ + out.resultType = spgMatchNode; + } + + if (innerTuple->allTheSame) + { + /* + * It's not allowed to do an AddNode at an allTheSame tuple. + * Opclass must say "match", in which case we choose a random + * one of the nodes to descend into, or "split". + */ + if (out.resultType == spgAddNode) + elog(ERROR, "cannot add a node to an allTheSame inner tuple"); + else if (out.resultType == spgMatchNode) + out.result.matchNode.nodeN = random() % innerTuple->nNodes; + } + + switch (out.resultType) + { + case spgMatchNode: + /* Descend to N'th child node */ + spgMatchNodeAction(index, state, innerTuple, + ¤t, &parent, + out.result.matchNode.nodeN); + /* Adjust level as per opclass request */ + level += out.result.matchNode.levelAdd; + /* Replace leafDatum and recompute leafSize */ + if (!isnull) + { + leafDatums[spgKeyColumn] = out.result.matchNode.restDatum; + leafSize = SpGistGetLeafTupleSize(leafDescriptor, + leafDatums, isnulls); + leafSize += sizeof(ItemIdData); + } + + /* + * Check new tuple size; fail if it can't fit, unless the + * opclass says it can handle the situation by suffixing. + * + * However, the opclass can only shorten the leaf datum, + * which may not be enough to ever make the tuple fit, + * since INCLUDE columns might alone use more than a page. + * Depending on the opclass' behavior, that could lead to + * an infinite loop --- spgtextproc.c, for example, will + * just repeatedly generate an empty-string leaf datum + * once it runs out of data. Actual bugs in opclasses + * might cause infinite looping, too. To detect such a + * loop, check to see if we are making progress by + * reducing the leafSize in each pass. This is a bit + * tricky though. Because of alignment considerations, + * the total tuple size might not decrease on every pass. + * Also, there are edge cases where the choose method + * might seem to not make progress for a cycle or two. + * Somewhat arbitrarily, we allow up to 10 no-progress + * iterations before failing. (This limit should be more + * than MAXALIGN, to accommodate opclasses that trim one + * byte from the leaf datum per pass.) + */ + if (leafSize > SPGIST_PAGE_CAPACITY) + { + bool ok = false; + + if (state->config.longValuesOK && !isnull) + { + if (leafSize < bestLeafSize) + { + ok = true; + bestLeafSize = leafSize; + numNoProgressCycles = 0; + } + else if (++numNoProgressCycles < 10) + ok = true; + } + if (!ok) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("index row size %zu exceeds maximum %zu for index \"%s\"", + leafSize - sizeof(ItemIdData), + SPGIST_PAGE_CAPACITY - sizeof(ItemIdData), + RelationGetRelationName(index)), + errhint("Values larger than a buffer page cannot be indexed."))); + } + + /* + * Loop around and attempt to insert the new leafDatum at + * "current" (which might reference an existing child + * tuple, or might be invalid to force us to find a new + * page for the tuple). + */ + break; + case spgAddNode: + /* AddNode is not sensible if nodes don't have labels */ + if (in.nodeLabels == NULL) + elog(ERROR, "cannot add a node to an inner tuple without node labels"); + /* Add node to inner tuple, per request */ + spgAddNodeAction(index, state, innerTuple, + ¤t, &parent, + out.result.addNode.nodeN, + out.result.addNode.nodeLabel); + + /* + * Retry insertion into the enlarged node. We assume that + * we'll get a MatchNode result this time. + */ + goto process_inner_tuple; + break; + case spgSplitTuple: + /* Split inner tuple, per request */ + spgSplitNodeAction(index, state, innerTuple, + ¤t, &out); + + /* Retry insertion into the split node */ + goto process_inner_tuple; + break; + default: + elog(ERROR, "unrecognized SPGiST choose result: %d", + (int) out.resultType); + break; + } + } + } /* end loop */ + + /* + * Release any buffers we're still holding. Beware of possibility that + * current and parent reference same buffer. + */ + if (current.buffer != InvalidBuffer) + { + SpGistSetLastUsedPage(index, current.buffer); + UnlockReleaseBuffer(current.buffer); + } + if (parent.buffer != InvalidBuffer && + parent.buffer != current.buffer) + { + SpGistSetLastUsedPage(index, parent.buffer); + UnlockReleaseBuffer(parent.buffer); + } + + /* + * We do not support being called while some outer function is holding a + * buffer lock (or any other reason to postpone query cancels). If that + * were the case, telling the caller to retry would create an infinite + * loop. + */ + Assert(INTERRUPTS_CAN_BE_PROCESSED()); + + /* + * Finally, check for interrupts again. If there was a query cancel, + * ProcessInterrupts() will be able to throw the error here. If it was + * some other kind of interrupt that can just be cleared, return false to + * tell our caller to retry. + */ + CHECK_FOR_INTERRUPTS(); + + return result; +} diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c new file mode 100644 index 0000000..1af0af7 --- /dev/null +++ b/src/backend/access/spgist/spginsert.c @@ -0,0 +1,243 @@ +/*------------------------------------------------------------------------- + * + * spginsert.c + * Externally visible index creation/insertion routines + * + * All the actual insertion logic is in spgdoinsert.c. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spginsert.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/spgist_private.h" +#include "access/spgxlog.h" +#include "access/tableam.h" +#include "access/xlog.h" +#include "access/xloginsert.h" +#include "catalog/index.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/smgr.h" +#include "utils/memutils.h" +#include "utils/rel.h" + + +typedef struct +{ + SpGistState spgstate; /* SPGiST's working state */ + int64 indtuples; /* total number of tuples indexed */ + MemoryContext tmpCtx; /* per-tuple temporary context */ +} SpGistBuildState; + + +/* Callback to process one heap tuple during table_index_build_scan */ +static void +spgistBuildCallback(Relation index, ItemPointer tid, Datum *values, + bool *isnull, bool tupleIsAlive, void *state) +{ + SpGistBuildState *buildstate = (SpGistBuildState *) state; + MemoryContext oldCtx; + + /* Work in temp context, and reset it after each tuple */ + oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx); + + /* + * Even though no concurrent insertions can be happening, we still might + * get a buffer-locking failure due to bgwriter or checkpointer taking a + * lock on some buffer. So we need to be willing to retry. We can flush + * any temp data when retrying. + */ + while (!spgdoinsert(index, &buildstate->spgstate, tid, + values, isnull)) + { + MemoryContextReset(buildstate->tmpCtx); + } + + /* Update total tuple count */ + buildstate->indtuples += 1; + + MemoryContextSwitchTo(oldCtx); + MemoryContextReset(buildstate->tmpCtx); +} + +/* + * Build an SP-GiST index. + */ +IndexBuildResult * +spgbuild(Relation heap, Relation index, IndexInfo *indexInfo) +{ + IndexBuildResult *result; + double reltuples; + SpGistBuildState buildstate; + Buffer metabuffer, + rootbuffer, + nullbuffer; + + if (RelationGetNumberOfBlocks(index) != 0) + elog(ERROR, "index \"%s\" already contains data", + RelationGetRelationName(index)); + + /* + * Initialize the meta page and root pages + */ + metabuffer = SpGistNewBuffer(index); + rootbuffer = SpGistNewBuffer(index); + nullbuffer = SpGistNewBuffer(index); + + Assert(BufferGetBlockNumber(metabuffer) == SPGIST_METAPAGE_BLKNO); + Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_ROOT_BLKNO); + Assert(BufferGetBlockNumber(nullbuffer) == SPGIST_NULL_BLKNO); + + START_CRIT_SECTION(); + + SpGistInitMetapage(BufferGetPage(metabuffer)); + MarkBufferDirty(metabuffer); + SpGistInitBuffer(rootbuffer, SPGIST_LEAF); + MarkBufferDirty(rootbuffer); + SpGistInitBuffer(nullbuffer, SPGIST_LEAF | SPGIST_NULLS); + MarkBufferDirty(nullbuffer); + + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(metabuffer); + UnlockReleaseBuffer(rootbuffer); + UnlockReleaseBuffer(nullbuffer); + + /* + * Now insert all the heap data into the index + */ + initSpGistState(&buildstate.spgstate, index); + buildstate.spgstate.isBuild = true; + buildstate.indtuples = 0; + + buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext, + "SP-GiST build temporary context", + ALLOCSET_DEFAULT_SIZES); + + reltuples = table_index_build_scan(heap, index, indexInfo, true, true, + spgistBuildCallback, (void *) &buildstate, + NULL); + + MemoryContextDelete(buildstate.tmpCtx); + + SpGistUpdateMetaPage(index); + + /* + * We didn't write WAL records as we built the index, so if WAL-logging is + * required, write all pages to the WAL now. + */ + if (RelationNeedsWAL(index)) + { + log_newpage_range(index, MAIN_FORKNUM, + 0, RelationGetNumberOfBlocks(index), + true); + } + + result = (IndexBuildResult *) palloc0(sizeof(IndexBuildResult)); + result->heap_tuples = reltuples; + result->index_tuples = buildstate.indtuples; + + return result; +} + +/* + * Build an empty SPGiST index in the initialization fork + */ +void +spgbuildempty(Relation index) +{ + Page page; + + /* Construct metapage. */ + page = (Page) palloc(BLCKSZ); + SpGistInitMetapage(page); + + /* + * Write the page and log it unconditionally. This is important + * particularly for indexes created on tablespaces and databases whose + * creation happened after the last redo pointer as recovery removes any + * of their existing content when the corresponding create records are + * replayed. + */ + PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO); + smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO, + (char *) page, true); + log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, + SPGIST_METAPAGE_BLKNO, page, true); + + /* Likewise for the root page. */ + SpGistInitPage(page, SPGIST_LEAF); + + PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO); + smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_ROOT_BLKNO, + (char *) page, true); + log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, + SPGIST_ROOT_BLKNO, page, true); + + /* Likewise for the null-tuples root page. */ + SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS); + + PageSetChecksumInplace(page, SPGIST_NULL_BLKNO); + smgrwrite(index->rd_smgr, INIT_FORKNUM, SPGIST_NULL_BLKNO, + (char *) page, true); + log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM, + SPGIST_NULL_BLKNO, page, true); + + /* + * An immediate sync is required even if we xlog'd the pages, because the + * writes did not go through shared buffers and therefore a concurrent + * checkpoint may have moved the redo pointer past our xlog record. + */ + smgrimmedsync(index->rd_smgr, INIT_FORKNUM); +} + +/* + * Insert one new tuple into an SPGiST index. + */ +bool +spginsert(Relation index, Datum *values, bool *isnull, + ItemPointer ht_ctid, Relation heapRel, + IndexUniqueCheck checkUnique, + bool indexUnchanged, + IndexInfo *indexInfo) +{ + SpGistState spgstate; + MemoryContext oldCtx; + MemoryContext insertCtx; + + insertCtx = AllocSetContextCreate(CurrentMemoryContext, + "SP-GiST insert temporary context", + ALLOCSET_DEFAULT_SIZES); + oldCtx = MemoryContextSwitchTo(insertCtx); + + initSpGistState(&spgstate, index); + + /* + * We might have to repeat spgdoinsert() multiple times, if conflicts + * occur with concurrent insertions. If so, reset the insertCtx each time + * to avoid cumulative memory consumption. That means we also have to + * redo initSpGistState(), but it's cheap enough not to matter. + */ + while (!spgdoinsert(index, &spgstate, ht_ctid, values, isnull)) + { + MemoryContextReset(insertCtx); + initSpGistState(&spgstate, index); + } + + SpGistUpdateMetaPage(index); + + MemoryContextSwitchTo(oldCtx); + MemoryContextDelete(insertCtx); + + /* return false since we've not done any unique check */ + return false; +} diff --git a/src/backend/access/spgist/spgkdtreeproc.c b/src/backend/access/spgist/spgkdtreeproc.c new file mode 100644 index 0000000..d9b3f6a --- /dev/null +++ b/src/backend/access/spgist/spgkdtreeproc.c @@ -0,0 +1,349 @@ +/*------------------------------------------------------------------------- + * + * spgkdtreeproc.c + * implementation of k-d tree over points for SP-GiST + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgkdtreeproc.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/spgist.h" +#include "access/spgist_private.h" +#include "access/stratnum.h" +#include "catalog/pg_type.h" +#include "utils/builtins.h" +#include "utils/float.h" +#include "utils/geo_decls.h" + + +Datum +spg_kd_config(PG_FUNCTION_ARGS) +{ + /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */ + spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1); + + cfg->prefixType = FLOAT8OID; + cfg->labelType = VOIDOID; /* we don't need node labels */ + cfg->canReturnData = true; + cfg->longValuesOK = false; + PG_RETURN_VOID(); +} + +static int +getSide(double coord, bool isX, Point *tst) +{ + double tstcoord = (isX) ? tst->x : tst->y; + + if (coord == tstcoord) + return 0; + else if (coord > tstcoord) + return 1; + else + return -1; +} + +Datum +spg_kd_choose(PG_FUNCTION_ARGS) +{ + spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0); + spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1); + Point *inPoint = DatumGetPointP(in->datum); + double coord; + + if (in->allTheSame) + elog(ERROR, "allTheSame should not occur for k-d trees"); + + Assert(in->hasPrefix); + coord = DatumGetFloat8(in->prefixDatum); + + Assert(in->nNodes == 2); + + out->resultType = spgMatchNode; + out->result.matchNode.nodeN = + (getSide(coord, in->level % 2, inPoint) > 0) ? 0 : 1; + out->result.matchNode.levelAdd = 1; + out->result.matchNode.restDatum = PointPGetDatum(inPoint); + + PG_RETURN_VOID(); +} + +typedef struct SortedPoint +{ + Point *p; + int i; +} SortedPoint; + +static int +x_cmp(const void *a, const void *b) +{ + SortedPoint *pa = (SortedPoint *) a; + SortedPoint *pb = (SortedPoint *) b; + + if (pa->p->x == pb->p->x) + return 0; + return (pa->p->x > pb->p->x) ? 1 : -1; +} + +static int +y_cmp(const void *a, const void *b) +{ + SortedPoint *pa = (SortedPoint *) a; + SortedPoint *pb = (SortedPoint *) b; + + if (pa->p->y == pb->p->y) + return 0; + return (pa->p->y > pb->p->y) ? 1 : -1; +} + + +Datum +spg_kd_picksplit(PG_FUNCTION_ARGS) +{ + spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0); + spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1); + int i; + int middle; + SortedPoint *sorted; + double coord; + + sorted = palloc(sizeof(*sorted) * in->nTuples); + for (i = 0; i < in->nTuples; i++) + { + sorted[i].p = DatumGetPointP(in->datums[i]); + sorted[i].i = i; + } + + qsort(sorted, in->nTuples, sizeof(*sorted), + (in->level % 2) ? x_cmp : y_cmp); + middle = in->nTuples >> 1; + coord = (in->level % 2) ? sorted[middle].p->x : sorted[middle].p->y; + + out->hasPrefix = true; + out->prefixDatum = Float8GetDatum(coord); + + out->nNodes = 2; + out->nodeLabels = NULL; /* we don't need node labels */ + + out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples); + out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples); + + /* + * Note: points that have coordinates exactly equal to coord may get + * classified into either node, depending on where they happen to fall in + * the sorted list. This is okay as long as the inner_consistent function + * descends into both sides for such cases. This is better than the + * alternative of trying to have an exact boundary, because it keeps the + * tree balanced even when we have many instances of the same point value. + * So we should never trigger the allTheSame logic. + */ + for (i = 0; i < in->nTuples; i++) + { + Point *p = sorted[i].p; + int n = sorted[i].i; + + out->mapTuplesToNodes[n] = (i < middle) ? 0 : 1; + out->leafTupleDatums[n] = PointPGetDatum(p); + } + + PG_RETURN_VOID(); +} + +Datum +spg_kd_inner_consistent(PG_FUNCTION_ARGS) +{ + spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0); + spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1); + double coord; + int which; + int i; + BOX bboxes[2]; + + Assert(in->hasPrefix); + coord = DatumGetFloat8(in->prefixDatum); + + if (in->allTheSame) + elog(ERROR, "allTheSame should not occur for k-d trees"); + + Assert(in->nNodes == 2); + + /* "which" is a bitmask of children that satisfy all constraints */ + which = (1 << 1) | (1 << 2); + + for (i = 0; i < in->nkeys; i++) + { + Point *query = DatumGetPointP(in->scankeys[i].sk_argument); + BOX *boxQuery; + + switch (in->scankeys[i].sk_strategy) + { + case RTLeftStrategyNumber: + if ((in->level % 2) != 0 && FPlt(query->x, coord)) + which &= (1 << 1); + break; + case RTRightStrategyNumber: + if ((in->level % 2) != 0 && FPgt(query->x, coord)) + which &= (1 << 2); + break; + case RTSameStrategyNumber: + if ((in->level % 2) != 0) + { + if (FPlt(query->x, coord)) + which &= (1 << 1); + else if (FPgt(query->x, coord)) + which &= (1 << 2); + } + else + { + if (FPlt(query->y, coord)) + which &= (1 << 1); + else if (FPgt(query->y, coord)) + which &= (1 << 2); + } + break; + case RTBelowStrategyNumber: + case RTOldBelowStrategyNumber: + if ((in->level % 2) == 0 && FPlt(query->y, coord)) + which &= (1 << 1); + break; + case RTAboveStrategyNumber: + case RTOldAboveStrategyNumber: + if ((in->level % 2) == 0 && FPgt(query->y, coord)) + which &= (1 << 2); + break; + case RTContainedByStrategyNumber: + + /* + * For this operator, the query is a box not a point. We + * cheat to the extent of assuming that DatumGetPointP won't + * do anything that would be bad for a pointer-to-box. + */ + boxQuery = DatumGetBoxP(in->scankeys[i].sk_argument); + + if ((in->level % 2) != 0) + { + if (FPlt(boxQuery->high.x, coord)) + which &= (1 << 1); + else if (FPgt(boxQuery->low.x, coord)) + which &= (1 << 2); + } + else + { + if (FPlt(boxQuery->high.y, coord)) + which &= (1 << 1); + else if (FPgt(boxQuery->low.y, coord)) + which &= (1 << 2); + } + break; + default: + elog(ERROR, "unrecognized strategy number: %d", + in->scankeys[i].sk_strategy); + break; + } + + if (which == 0) + break; /* no need to consider remaining conditions */ + } + + /* We must descend into the children identified by which */ + out->nNodes = 0; + + /* Fast-path for no matching children */ + if (!which) + PG_RETURN_VOID(); + + out->nodeNumbers = (int *) palloc(sizeof(int) * 2); + + /* + * When ordering scan keys are specified, we've to calculate distance for + * them. In order to do that, we need calculate bounding boxes for both + * children nodes. Calculation of those bounding boxes on non-zero level + * require knowledge of bounding box of upper node. So, we save bounding + * boxes to traversalValues. + */ + if (in->norderbys > 0) + { + BOX infArea; + BOX *area; + + out->distances = (double **) palloc(sizeof(double *) * in->nNodes); + out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes); + + if (in->level == 0) + { + float8 inf = get_float8_infinity(); + + infArea.high.x = inf; + infArea.high.y = inf; + infArea.low.x = -inf; + infArea.low.y = -inf; + area = &infArea; + } + else + { + area = (BOX *) in->traversalValue; + Assert(area); + } + + bboxes[0].low = area->low; + bboxes[1].high = area->high; + + if (in->level % 2) + { + /* split box by x */ + bboxes[0].high.x = bboxes[1].low.x = coord; + bboxes[0].high.y = area->high.y; + bboxes[1].low.y = area->low.y; + } + else + { + /* split box by y */ + bboxes[0].high.y = bboxes[1].low.y = coord; + bboxes[0].high.x = area->high.x; + bboxes[1].low.x = area->low.x; + } + } + + for (i = 1; i <= 2; i++) + { + if (which & (1 << i)) + { + out->nodeNumbers[out->nNodes] = i - 1; + + if (in->norderbys > 0) + { + MemoryContext oldCtx = MemoryContextSwitchTo(in->traversalMemoryContext); + BOX *box = box_copy(&bboxes[i - 1]); + + MemoryContextSwitchTo(oldCtx); + + out->traversalValues[out->nNodes] = box; + + out->distances[out->nNodes] = spg_key_orderbys_distances(BoxPGetDatum(box), false, + in->orderbys, in->norderbys); + } + + out->nNodes++; + } + } + + /* Set up level increments, too */ + out->levelAdds = (int *) palloc(sizeof(int) * 2); + out->levelAdds[0] = 1; + out->levelAdds[1] = 1; + + PG_RETURN_VOID(); +} + +/* + * spg_kd_leaf_consistent() is the same as spg_quad_leaf_consistent(), + * since we support the same operators and the same leaf data type. + * So we just borrow that function. + */ diff --git a/src/backend/access/spgist/spgproc.c b/src/backend/access/spgist/spgproc.c new file mode 100644 index 0000000..1bad5d6 --- /dev/null +++ b/src/backend/access/spgist/spgproc.c @@ -0,0 +1,88 @@ +/*------------------------------------------------------------------------- + * + * spgproc.c + * Common supporting procedures for SP-GiST opclasses. + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgproc.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <math.h> + +#include "access/spgist_private.h" +#include "utils/builtins.h" +#include "utils/float.h" +#include "utils/geo_decls.h" + +#define point_point_distance(p1,p2) \ + DatumGetFloat8(DirectFunctionCall2(point_distance, \ + PointPGetDatum(p1), PointPGetDatum(p2))) + +/* Point-box distance in the assumption that box is aligned by axis */ +static double +point_box_distance(Point *point, BOX *box) +{ + double dx, + dy; + + if (isnan(point->x) || isnan(box->low.x) || + isnan(point->y) || isnan(box->low.y)) + return get_float8_nan(); + + if (point->x < box->low.x) + dx = box->low.x - point->x; + else if (point->x > box->high.x) + dx = point->x - box->high.x; + else + dx = 0.0; + + if (point->y < box->low.y) + dy = box->low.y - point->y; + else if (point->y > box->high.y) + dy = point->y - box->high.y; + else + dy = 0.0; + + return HYPOT(dx, dy); +} + +/* + * Returns distances from given key to array of ordering scan keys. Leaf key + * is expected to be point, non-leaf key is expected to be box. Scan key + * arguments are expected to be points. + */ +double * +spg_key_orderbys_distances(Datum key, bool isLeaf, + ScanKey orderbys, int norderbys) +{ + int sk_num; + double *distances = (double *) palloc(norderbys * sizeof(double)), + *distance = distances; + + for (sk_num = 0; sk_num < norderbys; ++sk_num, ++orderbys, ++distance) + { + Point *point = DatumGetPointP(orderbys->sk_argument); + + *distance = isLeaf ? point_point_distance(point, DatumGetPointP(key)) + : point_box_distance(point, DatumGetBoxP(key)); + } + + return distances; +} + +BOX * +box_copy(BOX *orig) +{ + BOX *result = palloc(sizeof(BOX)); + + *result = *orig; + return result; +} diff --git a/src/backend/access/spgist/spgquadtreeproc.c b/src/backend/access/spgist/spgquadtreeproc.c new file mode 100644 index 0000000..a52d924 --- /dev/null +++ b/src/backend/access/spgist/spgquadtreeproc.c @@ -0,0 +1,471 @@ +/*------------------------------------------------------------------------- + * + * spgquadtreeproc.c + * implementation of quad tree over points for SP-GiST + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgquadtreeproc.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/spgist.h" +#include "access/spgist_private.h" +#include "access/stratnum.h" +#include "catalog/pg_type.h" +#include "utils/builtins.h" +#include "utils/float.h" +#include "utils/geo_decls.h" + +Datum +spg_quad_config(PG_FUNCTION_ARGS) +{ + /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */ + spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1); + + cfg->prefixType = POINTOID; + cfg->labelType = VOIDOID; /* we don't need node labels */ + cfg->canReturnData = true; + cfg->longValuesOK = false; + PG_RETURN_VOID(); +} + +#define SPTEST(f, x, y) \ + DatumGetBool(DirectFunctionCall2(f, PointPGetDatum(x), PointPGetDatum(y))) + +/* + * Determine which quadrant a point falls into, relative to the centroid. + * + * Quadrants are identified like this: + * + * 4 | 1 + * ----+----- + * 3 | 2 + * + * Points on one of the axes are taken to lie in the lowest-numbered + * adjacent quadrant. + */ +static int16 +getQuadrant(Point *centroid, Point *tst) +{ + if ((SPTEST(point_above, tst, centroid) || + SPTEST(point_horiz, tst, centroid)) && + (SPTEST(point_right, tst, centroid) || + SPTEST(point_vert, tst, centroid))) + return 1; + + if (SPTEST(point_below, tst, centroid) && + (SPTEST(point_right, tst, centroid) || + SPTEST(point_vert, tst, centroid))) + return 2; + + if ((SPTEST(point_below, tst, centroid) || + SPTEST(point_horiz, tst, centroid)) && + SPTEST(point_left, tst, centroid)) + return 3; + + if (SPTEST(point_above, tst, centroid) && + SPTEST(point_left, tst, centroid)) + return 4; + + elog(ERROR, "getQuadrant: impossible case"); + return 0; +} + +/* Returns bounding box of a given quadrant inside given bounding box */ +static BOX * +getQuadrantArea(BOX *bbox, Point *centroid, int quadrant) +{ + BOX *result = (BOX *) palloc(sizeof(BOX)); + + switch (quadrant) + { + case 1: + result->high = bbox->high; + result->low = *centroid; + break; + case 2: + result->high.x = bbox->high.x; + result->high.y = centroid->y; + result->low.x = centroid->x; + result->low.y = bbox->low.y; + break; + case 3: + result->high = *centroid; + result->low = bbox->low; + break; + case 4: + result->high.x = centroid->x; + result->high.y = bbox->high.y; + result->low.x = bbox->low.x; + result->low.y = centroid->y; + break; + } + + return result; +} + +Datum +spg_quad_choose(PG_FUNCTION_ARGS) +{ + spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0); + spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1); + Point *inPoint = DatumGetPointP(in->datum), + *centroid; + + if (in->allTheSame) + { + out->resultType = spgMatchNode; + /* nodeN will be set by core */ + out->result.matchNode.levelAdd = 0; + out->result.matchNode.restDatum = PointPGetDatum(inPoint); + PG_RETURN_VOID(); + } + + Assert(in->hasPrefix); + centroid = DatumGetPointP(in->prefixDatum); + + Assert(in->nNodes == 4); + + out->resultType = spgMatchNode; + out->result.matchNode.nodeN = getQuadrant(centroid, inPoint) - 1; + out->result.matchNode.levelAdd = 0; + out->result.matchNode.restDatum = PointPGetDatum(inPoint); + + PG_RETURN_VOID(); +} + +#ifdef USE_MEDIAN +static int +x_cmp(const void *a, const void *b, void *arg) +{ + Point *pa = *(Point **) a; + Point *pb = *(Point **) b; + + if (pa->x == pb->x) + return 0; + return (pa->x > pb->x) ? 1 : -1; +} + +static int +y_cmp(const void *a, const void *b, void *arg) +{ + Point *pa = *(Point **) a; + Point *pb = *(Point **) b; + + if (pa->y == pb->y) + return 0; + return (pa->y > pb->y) ? 1 : -1; +} +#endif + +Datum +spg_quad_picksplit(PG_FUNCTION_ARGS) +{ + spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0); + spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1); + int i; + Point *centroid; + +#ifdef USE_MEDIAN + /* Use the median values of x and y as the centroid point */ + Point **sorted; + + sorted = palloc(sizeof(*sorted) * in->nTuples); + for (i = 0; i < in->nTuples; i++) + sorted[i] = DatumGetPointP(in->datums[i]); + + centroid = palloc(sizeof(*centroid)); + + qsort(sorted, in->nTuples, sizeof(*sorted), x_cmp); + centroid->x = sorted[in->nTuples >> 1]->x; + qsort(sorted, in->nTuples, sizeof(*sorted), y_cmp); + centroid->y = sorted[in->nTuples >> 1]->y; +#else + /* Use the average values of x and y as the centroid point */ + centroid = palloc0(sizeof(*centroid)); + + for (i = 0; i < in->nTuples; i++) + { + centroid->x += DatumGetPointP(in->datums[i])->x; + centroid->y += DatumGetPointP(in->datums[i])->y; + } + + centroid->x /= in->nTuples; + centroid->y /= in->nTuples; +#endif + + out->hasPrefix = true; + out->prefixDatum = PointPGetDatum(centroid); + + out->nNodes = 4; + out->nodeLabels = NULL; /* we don't need node labels */ + + out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples); + out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples); + + for (i = 0; i < in->nTuples; i++) + { + Point *p = DatumGetPointP(in->datums[i]); + int quadrant = getQuadrant(centroid, p) - 1; + + out->leafTupleDatums[i] = PointPGetDatum(p); + out->mapTuplesToNodes[i] = quadrant; + } + + PG_RETURN_VOID(); +} + + +Datum +spg_quad_inner_consistent(PG_FUNCTION_ARGS) +{ + spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0); + spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1); + Point *centroid; + BOX infbbox; + BOX *bbox = NULL; + int which; + int i; + + Assert(in->hasPrefix); + centroid = DatumGetPointP(in->prefixDatum); + + /* + * When ordering scan keys are specified, we've to calculate distance for + * them. In order to do that, we need calculate bounding boxes for all + * children nodes. Calculation of those bounding boxes on non-zero level + * require knowledge of bounding box of upper node. So, we save bounding + * boxes to traversalValues. + */ + if (in->norderbys > 0) + { + out->distances = (double **) palloc(sizeof(double *) * in->nNodes); + out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes); + + if (in->level == 0) + { + double inf = get_float8_infinity(); + + infbbox.high.x = inf; + infbbox.high.y = inf; + infbbox.low.x = -inf; + infbbox.low.y = -inf; + bbox = &infbbox; + } + else + { + bbox = in->traversalValue; + Assert(bbox); + } + } + + if (in->allTheSame) + { + /* Report that all nodes should be visited */ + out->nNodes = in->nNodes; + out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes); + for (i = 0; i < in->nNodes; i++) + { + out->nodeNumbers[i] = i; + + if (in->norderbys > 0) + { + MemoryContext oldCtx = MemoryContextSwitchTo(in->traversalMemoryContext); + + /* Use parent quadrant box as traversalValue */ + BOX *quadrant = box_copy(bbox); + + MemoryContextSwitchTo(oldCtx); + + out->traversalValues[i] = quadrant; + out->distances[i] = spg_key_orderbys_distances(BoxPGetDatum(quadrant), false, + in->orderbys, in->norderbys); + } + } + PG_RETURN_VOID(); + } + + Assert(in->nNodes == 4); + + /* "which" is a bitmask of quadrants that satisfy all constraints */ + which = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4); + + for (i = 0; i < in->nkeys; i++) + { + Point *query = DatumGetPointP(in->scankeys[i].sk_argument); + BOX *boxQuery; + + switch (in->scankeys[i].sk_strategy) + { + case RTLeftStrategyNumber: + if (SPTEST(point_right, centroid, query)) + which &= (1 << 3) | (1 << 4); + break; + case RTRightStrategyNumber: + if (SPTEST(point_left, centroid, query)) + which &= (1 << 1) | (1 << 2); + break; + case RTSameStrategyNumber: + which &= (1 << getQuadrant(centroid, query)); + break; + case RTBelowStrategyNumber: + case RTOldBelowStrategyNumber: + if (SPTEST(point_above, centroid, query)) + which &= (1 << 2) | (1 << 3); + break; + case RTAboveStrategyNumber: + case RTOldAboveStrategyNumber: + if (SPTEST(point_below, centroid, query)) + which &= (1 << 1) | (1 << 4); + break; + case RTContainedByStrategyNumber: + + /* + * For this operator, the query is a box not a point. We + * cheat to the extent of assuming that DatumGetPointP won't + * do anything that would be bad for a pointer-to-box. + */ + boxQuery = DatumGetBoxP(in->scankeys[i].sk_argument); + + if (DatumGetBool(DirectFunctionCall2(box_contain_pt, + PointerGetDatum(boxQuery), + PointerGetDatum(centroid)))) + { + /* centroid is in box, so all quadrants are OK */ + } + else + { + /* identify quadrant(s) containing all corners of box */ + Point p; + int r = 0; + + p = boxQuery->low; + r |= 1 << getQuadrant(centroid, &p); + p.y = boxQuery->high.y; + r |= 1 << getQuadrant(centroid, &p); + p = boxQuery->high; + r |= 1 << getQuadrant(centroid, &p); + p.x = boxQuery->low.x; + r |= 1 << getQuadrant(centroid, &p); + + which &= r; + } + break; + default: + elog(ERROR, "unrecognized strategy number: %d", + in->scankeys[i].sk_strategy); + break; + } + + if (which == 0) + break; /* no need to consider remaining conditions */ + } + + out->levelAdds = palloc(sizeof(int) * 4); + for (i = 0; i < 4; ++i) + out->levelAdds[i] = 1; + + /* We must descend into the quadrant(s) identified by which */ + out->nodeNumbers = (int *) palloc(sizeof(int) * 4); + out->nNodes = 0; + + for (i = 1; i <= 4; i++) + { + if (which & (1 << i)) + { + out->nodeNumbers[out->nNodes] = i - 1; + + if (in->norderbys > 0) + { + MemoryContext oldCtx = MemoryContextSwitchTo(in->traversalMemoryContext); + BOX *quadrant = getQuadrantArea(bbox, centroid, i); + + MemoryContextSwitchTo(oldCtx); + + out->traversalValues[out->nNodes] = quadrant; + + out->distances[out->nNodes] = spg_key_orderbys_distances(BoxPGetDatum(quadrant), false, + in->orderbys, in->norderbys); + } + + out->nNodes++; + } + } + + PG_RETURN_VOID(); +} + + +Datum +spg_quad_leaf_consistent(PG_FUNCTION_ARGS) +{ + spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0); + spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1); + Point *datum = DatumGetPointP(in->leafDatum); + bool res; + int i; + + /* all tests are exact */ + out->recheck = false; + + /* leafDatum is what it is... */ + out->leafValue = in->leafDatum; + + /* Perform the required comparison(s) */ + res = true; + for (i = 0; i < in->nkeys; i++) + { + Point *query = DatumGetPointP(in->scankeys[i].sk_argument); + + switch (in->scankeys[i].sk_strategy) + { + case RTLeftStrategyNumber: + res = SPTEST(point_left, datum, query); + break; + case RTRightStrategyNumber: + res = SPTEST(point_right, datum, query); + break; + case RTSameStrategyNumber: + res = SPTEST(point_eq, datum, query); + break; + case RTBelowStrategyNumber: + case RTOldBelowStrategyNumber: + res = SPTEST(point_below, datum, query); + break; + case RTAboveStrategyNumber: + case RTOldAboveStrategyNumber: + res = SPTEST(point_above, datum, query); + break; + case RTContainedByStrategyNumber: + + /* + * For this operator, the query is a box not a point. We + * cheat to the extent of assuming that DatumGetPointP won't + * do anything that would be bad for a pointer-to-box. + */ + res = SPTEST(box_contain_pt, query, datum); + break; + default: + elog(ERROR, "unrecognized strategy number: %d", + in->scankeys[i].sk_strategy); + break; + } + + if (!res) + break; + } + + if (res && in->norderbys > 0) + /* ok, it passes -> let's compute the distances */ + out->distances = spg_key_orderbys_distances(in->leafDatum, true, + in->orderbys, in->norderbys); + + PG_RETURN_BOOL(res); +} diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c new file mode 100644 index 0000000..401a1a8 --- /dev/null +++ b/src/backend/access/spgist/spgscan.c @@ -0,0 +1,1097 @@ +/*------------------------------------------------------------------------- + * + * spgscan.c + * routines for scanning SP-GiST indexes + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgscan.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/relscan.h" +#include "access/spgist_private.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/bufmgr.h" +#include "utils/datum.h" +#include "utils/float.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rel.h" + +typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr, + Datum leafValue, bool isNull, + SpGistLeafTuple leafTuple, bool recheck, + bool recheckDistances, double *distances); + +/* + * Pairing heap comparison function for the SpGistSearchItem queue. + * KNN-searches currently only support NULLS LAST. So, preserve this logic + * here. + */ +static int +pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a, + const pairingheap_node *b, void *arg) +{ + const SpGistSearchItem *sa = (const SpGistSearchItem *) a; + const SpGistSearchItem *sb = (const SpGistSearchItem *) b; + SpGistScanOpaque so = (SpGistScanOpaque) arg; + int i; + + if (sa->isNull) + { + if (!sb->isNull) + return -1; + } + else if (sb->isNull) + { + return 1; + } + else + { + /* Order according to distance comparison */ + for (i = 0; i < so->numberOfNonNullOrderBys; i++) + { + if (isnan(sa->distances[i]) && isnan(sb->distances[i])) + continue; /* NaN == NaN */ + if (isnan(sa->distances[i])) + return -1; /* NaN > number */ + if (isnan(sb->distances[i])) + return 1; /* number < NaN */ + if (sa->distances[i] != sb->distances[i]) + return (sa->distances[i] < sb->distances[i]) ? 1 : -1; + } + } + + /* Leaf items go before inner pages, to ensure a depth-first search */ + if (sa->isLeaf && !sb->isLeaf) + return 1; + if (!sa->isLeaf && sb->isLeaf) + return -1; + + return 0; +} + +static void +spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item) +{ + /* value is of type attType if isLeaf, else of type attLeafType */ + /* (no, that is not backwards; yes, it's confusing) */ + if (!(item->isLeaf ? so->state.attType.attbyval : + so->state.attLeafType.attbyval) && + DatumGetPointer(item->value) != NULL) + pfree(DatumGetPointer(item->value)); + + if (item->leafTuple) + pfree(item->leafTuple); + + if (item->traversalValue) + pfree(item->traversalValue); + + pfree(item); +} + +/* + * Add SpGistSearchItem to queue + * + * Called in queue context + */ +static void +spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item) +{ + pairingheap_add(so->scanQueue, &item->phNode); +} + +static SpGistSearchItem * +spgAllocSearchItem(SpGistScanOpaque so, bool isnull, double *distances) +{ + /* allocate distance array only for non-NULL items */ + SpGistSearchItem *item = + palloc(SizeOfSpGistSearchItem(isnull ? 0 : so->numberOfNonNullOrderBys)); + + item->isNull = isnull; + + if (!isnull && so->numberOfNonNullOrderBys > 0) + memcpy(item->distances, distances, + sizeof(item->distances[0]) * so->numberOfNonNullOrderBys); + + return item; +} + +static void +spgAddStartItem(SpGistScanOpaque so, bool isnull) +{ + SpGistSearchItem *startEntry = + spgAllocSearchItem(so, isnull, so->zeroDistances); + + ItemPointerSet(&startEntry->heapPtr, + isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO, + FirstOffsetNumber); + startEntry->isLeaf = false; + startEntry->level = 0; + startEntry->value = (Datum) 0; + startEntry->leafTuple = NULL; + startEntry->traversalValue = NULL; + startEntry->recheck = false; + startEntry->recheckDistances = false; + + spgAddSearchItemToQueue(so, startEntry); +} + +/* + * Initialize queue to search the root page, resetting + * any previously active scan + */ +static void +resetSpGistScanOpaque(SpGistScanOpaque so) +{ + MemoryContext oldCtx; + + MemoryContextReset(so->traversalCxt); + + oldCtx = MemoryContextSwitchTo(so->traversalCxt); + + /* initialize queue only for distance-ordered scans */ + so->scanQueue = pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, so); + + if (so->searchNulls) + /* Add a work item to scan the null index entries */ + spgAddStartItem(so, true); + + if (so->searchNonNulls) + /* Add a work item to scan the non-null index entries */ + spgAddStartItem(so, false); + + MemoryContextSwitchTo(oldCtx); + + if (so->numberOfOrderBys > 0) + { + /* Must pfree distances to avoid memory leak */ + int i; + + for (i = 0; i < so->nPtrs; i++) + if (so->distances[i]) + pfree(so->distances[i]); + } + + if (so->want_itup) + { + /* Must pfree reconstructed tuples to avoid memory leak */ + int i; + + for (i = 0; i < so->nPtrs; i++) + pfree(so->reconTups[i]); + } + so->iPtr = so->nPtrs = 0; +} + +/* + * Prepare scan keys in SpGistScanOpaque from caller-given scan keys + * + * Sets searchNulls, searchNonNulls, numberOfKeys, keyData fields of *so. + * + * The point here is to eliminate null-related considerations from what the + * opclass consistent functions need to deal with. We assume all SPGiST- + * indexable operators are strict, so any null RHS value makes the scan + * condition unsatisfiable. We also pull out any IS NULL/IS NOT NULL + * conditions; their effect is reflected into searchNulls/searchNonNulls. + */ +static void +spgPrepareScanKeys(IndexScanDesc scan) +{ + SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque; + bool qual_ok; + bool haveIsNull; + bool haveNotNull; + int nkeys; + int i; + + so->numberOfOrderBys = scan->numberOfOrderBys; + so->orderByData = scan->orderByData; + + if (so->numberOfOrderBys <= 0) + so->numberOfNonNullOrderBys = 0; + else + { + int j = 0; + + /* + * Remove all NULL keys, but remember their offsets in the original + * array. + */ + for (i = 0; i < scan->numberOfOrderBys; i++) + { + ScanKey skey = &so->orderByData[i]; + + if (skey->sk_flags & SK_ISNULL) + so->nonNullOrderByOffsets[i] = -1; + else + { + if (i != j) + so->orderByData[j] = *skey; + + so->nonNullOrderByOffsets[i] = j++; + } + } + + so->numberOfNonNullOrderBys = j; + } + + if (scan->numberOfKeys <= 0) + { + /* If no quals, whole-index scan is required */ + so->searchNulls = true; + so->searchNonNulls = true; + so->numberOfKeys = 0; + return; + } + + /* Examine the given quals */ + qual_ok = true; + haveIsNull = haveNotNull = false; + nkeys = 0; + for (i = 0; i < scan->numberOfKeys; i++) + { + ScanKey skey = &scan->keyData[i]; + + if (skey->sk_flags & SK_SEARCHNULL) + haveIsNull = true; + else if (skey->sk_flags & SK_SEARCHNOTNULL) + haveNotNull = true; + else if (skey->sk_flags & SK_ISNULL) + { + /* ordinary qual with null argument - unsatisfiable */ + qual_ok = false; + break; + } + else + { + /* ordinary qual, propagate into so->keyData */ + so->keyData[nkeys++] = *skey; + /* this effectively creates a not-null requirement */ + haveNotNull = true; + } + } + + /* IS NULL in combination with something else is unsatisfiable */ + if (haveIsNull && haveNotNull) + qual_ok = false; + + /* Emit results */ + if (qual_ok) + { + so->searchNulls = haveIsNull; + so->searchNonNulls = haveNotNull; + so->numberOfKeys = nkeys; + } + else + { + so->searchNulls = false; + so->searchNonNulls = false; + so->numberOfKeys = 0; + } +} + +IndexScanDesc +spgbeginscan(Relation rel, int keysz, int orderbysz) +{ + IndexScanDesc scan; + SpGistScanOpaque so; + int i; + + scan = RelationGetIndexScan(rel, keysz, orderbysz); + + so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData)); + if (keysz > 0) + so->keyData = (ScanKey) palloc(sizeof(ScanKeyData) * keysz); + else + so->keyData = NULL; + initSpGistState(&so->state, scan->indexRelation); + + so->tempCxt = AllocSetContextCreate(CurrentMemoryContext, + "SP-GiST search temporary context", + ALLOCSET_DEFAULT_SIZES); + so->traversalCxt = AllocSetContextCreate(CurrentMemoryContext, + "SP-GiST traversal-value context", + ALLOCSET_DEFAULT_SIZES); + + /* + * Set up reconTupDesc and xs_hitupdesc in case it's an index-only scan, + * making sure that the key column is shown as being of type attType. + * (It's rather annoying to do this work when it might be wasted, but for + * most opclasses we can re-use the index reldesc instead of making one.) + */ + so->reconTupDesc = scan->xs_hitupdesc = + getSpGistTupleDesc(rel, &so->state.attType); + + /* Allocate various arrays needed for order-by scans */ + if (scan->numberOfOrderBys > 0) + { + /* This will be filled in spgrescan, but allocate the space here */ + so->orderByTypes = (Oid *) + palloc(sizeof(Oid) * scan->numberOfOrderBys); + so->nonNullOrderByOffsets = (int *) + palloc(sizeof(int) * scan->numberOfOrderBys); + + /* These arrays have constant contents, so we can fill them now */ + so->zeroDistances = (double *) + palloc(sizeof(double) * scan->numberOfOrderBys); + so->infDistances = (double *) + palloc(sizeof(double) * scan->numberOfOrderBys); + + for (i = 0; i < scan->numberOfOrderBys; i++) + { + so->zeroDistances[i] = 0.0; + so->infDistances[i] = get_float8_infinity(); + } + + scan->xs_orderbyvals = (Datum *) + palloc0(sizeof(Datum) * scan->numberOfOrderBys); + scan->xs_orderbynulls = (bool *) + palloc(sizeof(bool) * scan->numberOfOrderBys); + memset(scan->xs_orderbynulls, true, + sizeof(bool) * scan->numberOfOrderBys); + } + + fmgr_info_copy(&so->innerConsistentFn, + index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC), + CurrentMemoryContext); + + fmgr_info_copy(&so->leafConsistentFn, + index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC), + CurrentMemoryContext); + + so->indexCollation = rel->rd_indcollation[0]; + + scan->opaque = so; + + return scan; +} + +void +spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, + ScanKey orderbys, int norderbys) +{ + SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque; + + /* copy scankeys into local storage */ + if (scankey && scan->numberOfKeys > 0) + memmove(scan->keyData, scankey, + scan->numberOfKeys * sizeof(ScanKeyData)); + + /* initialize order-by data if needed */ + if (orderbys && scan->numberOfOrderBys > 0) + { + int i; + + memmove(scan->orderByData, orderbys, + scan->numberOfOrderBys * sizeof(ScanKeyData)); + + for (i = 0; i < scan->numberOfOrderBys; i++) + { + ScanKey skey = &scan->orderByData[i]; + + /* + * Look up the datatype returned by the original ordering + * operator. SP-GiST always uses a float8 for the distance + * function, but the ordering operator could be anything else. + * + * XXX: The distance function is only allowed to be lossy if the + * ordering operator's result type is float4 or float8. Otherwise + * we don't know how to return the distance to the executor. But + * we cannot check that here, as we won't know if the distance + * function is lossy until it returns *recheck = true for the + * first time. + */ + so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid); + } + } + + /* preprocess scankeys, set up the representation in *so */ + spgPrepareScanKeys(scan); + + /* set up starting queue entries */ + resetSpGistScanOpaque(so); + + /* count an indexscan for stats */ + pgstat_count_index_scan(scan->indexRelation); +} + +void +spgendscan(IndexScanDesc scan) +{ + SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque; + + MemoryContextDelete(so->tempCxt); + MemoryContextDelete(so->traversalCxt); + + if (so->keyData) + pfree(so->keyData); + + if (so->state.leafTupDesc && + so->state.leafTupDesc != RelationGetDescr(so->state.index)) + FreeTupleDesc(so->state.leafTupDesc); + + if (so->state.deadTupleStorage) + pfree(so->state.deadTupleStorage); + + if (scan->numberOfOrderBys > 0) + { + pfree(so->orderByTypes); + pfree(so->nonNullOrderByOffsets); + pfree(so->zeroDistances); + pfree(so->infDistances); + pfree(scan->xs_orderbyvals); + pfree(scan->xs_orderbynulls); + } + + pfree(so); +} + +/* + * Leaf SpGistSearchItem constructor, called in queue context + */ +static SpGistSearchItem * +spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple, + Datum leafValue, bool recheck, bool recheckDistances, + bool isnull, double *distances) +{ + SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances); + + item->level = level; + item->heapPtr = leafTuple->heapPtr; + + /* + * If we need the reconstructed value, copy it to queue cxt out of tmp + * cxt. Caution: the leaf_consistent method may not have supplied a value + * if we didn't ask it to, and mildly-broken methods might supply one of + * the wrong type. The correct leafValue type is attType not leafType. + */ + if (so->want_itup) + { + item->value = isnull ? (Datum) 0 : + datumCopy(leafValue, so->state.attType.attbyval, + so->state.attType.attlen); + + /* + * If we're going to need to reconstruct INCLUDE attributes, store the + * whole leaf tuple so we can get the INCLUDE attributes out of it. + */ + if (so->state.leafTupDesc->natts > 1) + { + item->leafTuple = palloc(leafTuple->size); + memcpy(item->leafTuple, leafTuple, leafTuple->size); + } + else + item->leafTuple = NULL; + } + else + { + item->value = (Datum) 0; + item->leafTuple = NULL; + } + item->traversalValue = NULL; + item->isLeaf = true; + item->recheck = recheck; + item->recheckDistances = recheckDistances; + + return item; +} + +/* + * Test whether a leaf tuple satisfies all the scan keys + * + * *reportedSome is set to true if: + * the scan is not ordered AND the item satisfies the scankeys + */ +static bool +spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item, + SpGistLeafTuple leafTuple, bool isnull, + bool *reportedSome, storeRes_func storeRes) +{ + Datum leafValue; + double *distances; + bool result; + bool recheck; + bool recheckDistances; + + if (isnull) + { + /* Should not have arrived on a nulls page unless nulls are wanted */ + Assert(so->searchNulls); + leafValue = (Datum) 0; + distances = NULL; + recheck = false; + recheckDistances = false; + result = true; + } + else + { + spgLeafConsistentIn in; + spgLeafConsistentOut out; + + /* use temp context for calling leaf_consistent */ + MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt); + + in.scankeys = so->keyData; + in.nkeys = so->numberOfKeys; + in.orderbys = so->orderByData; + in.norderbys = so->numberOfNonNullOrderBys; + Assert(!item->isLeaf); /* else reconstructedValue would be wrong type */ + in.reconstructedValue = item->value; + in.traversalValue = item->traversalValue; + in.level = item->level; + in.returnData = so->want_itup; + in.leafDatum = SGLTDATUM(leafTuple, &so->state); + + out.leafValue = (Datum) 0; + out.recheck = false; + out.distances = NULL; + out.recheckDistances = false; + + result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn, + so->indexCollation, + PointerGetDatum(&in), + PointerGetDatum(&out))); + recheck = out.recheck; + recheckDistances = out.recheckDistances; + leafValue = out.leafValue; + distances = out.distances; + + MemoryContextSwitchTo(oldCxt); + } + + if (result) + { + /* item passes the scankeys */ + if (so->numberOfNonNullOrderBys > 0) + { + /* the scan is ordered -> add the item to the queue */ + MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt); + SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level, + leafTuple, + leafValue, + recheck, + recheckDistances, + isnull, + distances); + + spgAddSearchItemToQueue(so, heapItem); + + MemoryContextSwitchTo(oldCxt); + } + else + { + /* non-ordered scan, so report the item right away */ + Assert(!recheckDistances); + storeRes(so, &leafTuple->heapPtr, leafValue, isnull, + leafTuple, recheck, false, NULL); + *reportedSome = true; + } + } + + return result; +} + +/* A bundle initializer for inner_consistent methods */ +static void +spgInitInnerConsistentIn(spgInnerConsistentIn *in, + SpGistScanOpaque so, + SpGistSearchItem *item, + SpGistInnerTuple innerTuple) +{ + in->scankeys = so->keyData; + in->orderbys = so->orderByData; + in->nkeys = so->numberOfKeys; + in->norderbys = so->numberOfNonNullOrderBys; + Assert(!item->isLeaf); /* else reconstructedValue would be wrong type */ + in->reconstructedValue = item->value; + in->traversalMemoryContext = so->traversalCxt; + in->traversalValue = item->traversalValue; + in->level = item->level; + in->returnData = so->want_itup; + in->allTheSame = innerTuple->allTheSame; + in->hasPrefix = (innerTuple->prefixSize > 0); + in->prefixDatum = SGITDATUM(innerTuple, &so->state); + in->nNodes = innerTuple->nNodes; + in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple); +} + +static SpGistSearchItem * +spgMakeInnerItem(SpGistScanOpaque so, + SpGistSearchItem *parentItem, + SpGistNodeTuple tuple, + spgInnerConsistentOut *out, int i, bool isnull, + double *distances) +{ + SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances); + + item->heapPtr = tuple->t_tid; + item->level = out->levelAdds ? parentItem->level + out->levelAdds[i] + : parentItem->level; + + /* Must copy value out of temp context */ + /* (recall that reconstructed values are of type leafType) */ + item->value = out->reconstructedValues + ? datumCopy(out->reconstructedValues[i], + so->state.attLeafType.attbyval, + so->state.attLeafType.attlen) + : (Datum) 0; + + item->leafTuple = NULL; + + /* + * Elements of out.traversalValues should be allocated in + * in.traversalMemoryContext, which is actually a long lived context of + * index scan. + */ + item->traversalValue = + out->traversalValues ? out->traversalValues[i] : NULL; + + item->isLeaf = false; + item->recheck = false; + item->recheckDistances = false; + + return item; +} + +static void +spgInnerTest(SpGistScanOpaque so, SpGistSearchItem *item, + SpGistInnerTuple innerTuple, bool isnull) +{ + MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt); + spgInnerConsistentOut out; + int nNodes = innerTuple->nNodes; + int i; + + memset(&out, 0, sizeof(out)); + + if (!isnull) + { + spgInnerConsistentIn in; + + spgInitInnerConsistentIn(&in, so, item, innerTuple); + + /* use user-defined inner consistent method */ + FunctionCall2Coll(&so->innerConsistentFn, + so->indexCollation, + PointerGetDatum(&in), + PointerGetDatum(&out)); + } + else + { + /* force all children to be visited */ + out.nNodes = nNodes; + out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes); + for (i = 0; i < nNodes; i++) + out.nodeNumbers[i] = i; + } + + /* If allTheSame, they should all or none of them match */ + if (innerTuple->allTheSame && out.nNodes != 0 && out.nNodes != nNodes) + elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple"); + + if (out.nNodes) + { + /* collect node pointers */ + SpGistNodeTuple node; + SpGistNodeTuple *nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * nNodes); + + SGITITERATE(innerTuple, i, node) + { + nodes[i] = node; + } + + MemoryContextSwitchTo(so->traversalCxt); + + for (i = 0; i < out.nNodes; i++) + { + int nodeN = out.nodeNumbers[i]; + SpGistSearchItem *innerItem; + double *distances; + + Assert(nodeN >= 0 && nodeN < nNodes); + + node = nodes[nodeN]; + + if (!ItemPointerIsValid(&node->t_tid)) + continue; + + /* + * Use infinity distances if innerConsistentFn() failed to return + * them or if is a NULL item (their distances are really unused). + */ + distances = out.distances ? out.distances[i] : so->infDistances; + + innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull, + distances); + + spgAddSearchItemToQueue(so, innerItem); + } + } + + MemoryContextSwitchTo(oldCxt); +} + +/* Returns a next item in an (ordered) scan or null if the index is exhausted */ +static SpGistSearchItem * +spgGetNextQueueItem(SpGistScanOpaque so) +{ + if (pairingheap_is_empty(so->scanQueue)) + return NULL; /* Done when both heaps are empty */ + + /* Return item; caller is responsible to pfree it */ + return (SpGistSearchItem *) pairingheap_remove_first(so->scanQueue); +} + +enum SpGistSpecialOffsetNumbers +{ + SpGistBreakOffsetNumber = InvalidOffsetNumber, + SpGistRedirectOffsetNumber = MaxOffsetNumber + 1, + SpGistErrorOffsetNumber = MaxOffsetNumber + 2 +}; + +static OffsetNumber +spgTestLeafTuple(SpGistScanOpaque so, + SpGistSearchItem *item, + Page page, OffsetNumber offset, + bool isnull, bool isroot, + bool *reportedSome, + storeRes_func storeRes) +{ + SpGistLeafTuple leafTuple = (SpGistLeafTuple) + PageGetItem(page, PageGetItemId(page, offset)); + + if (leafTuple->tupstate != SPGIST_LIVE) + { + if (!isroot) /* all tuples on root should be live */ + { + if (leafTuple->tupstate == SPGIST_REDIRECT) + { + /* redirection tuple should be first in chain */ + Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr)); + /* transfer attention to redirect point */ + item->heapPtr = ((SpGistDeadTuple) leafTuple)->pointer; + Assert(ItemPointerGetBlockNumber(&item->heapPtr) != SPGIST_METAPAGE_BLKNO); + return SpGistRedirectOffsetNumber; + } + + if (leafTuple->tupstate == SPGIST_DEAD) + { + /* dead tuple should be first in chain */ + Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr)); + /* No live entries on this page */ + Assert(SGLT_GET_NEXTOFFSET(leafTuple) == InvalidOffsetNumber); + return SpGistBreakOffsetNumber; + } + } + + /* We should not arrive at a placeholder */ + elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate); + return SpGistErrorOffsetNumber; + } + + Assert(ItemPointerIsValid(&leafTuple->heapPtr)); + + spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes); + + return SGLT_GET_NEXTOFFSET(leafTuple); +} + +/* + * Walk the tree and report all tuples passing the scan quals to the storeRes + * subroutine. + * + * If scanWholeIndex is true, we'll do just that. If not, we'll stop at the + * next page boundary once we have reported at least one tuple. + */ +static void +spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex, + storeRes_func storeRes, Snapshot snapshot) +{ + Buffer buffer = InvalidBuffer; + bool reportedSome = false; + + while (scanWholeIndex || !reportedSome) + { + SpGistSearchItem *item = spgGetNextQueueItem(so); + + if (item == NULL) + break; /* No more items in queue -> done */ + +redirect: + /* Check for interrupts, just in case of infinite loop */ + CHECK_FOR_INTERRUPTS(); + + if (item->isLeaf) + { + /* We store heap items in the queue only in case of ordered search */ + Assert(so->numberOfNonNullOrderBys > 0); + storeRes(so, &item->heapPtr, item->value, item->isNull, + item->leafTuple, item->recheck, + item->recheckDistances, item->distances); + reportedSome = true; + } + else + { + BlockNumber blkno = ItemPointerGetBlockNumber(&item->heapPtr); + OffsetNumber offset = ItemPointerGetOffsetNumber(&item->heapPtr); + Page page; + bool isnull; + + if (buffer == InvalidBuffer) + { + buffer = ReadBuffer(index, blkno); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + } + else if (blkno != BufferGetBlockNumber(buffer)) + { + UnlockReleaseBuffer(buffer); + buffer = ReadBuffer(index, blkno); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + } + + /* else new pointer points to the same page, no work needed */ + + page = BufferGetPage(buffer); + TestForOldSnapshot(snapshot, index, page); + + isnull = SpGistPageStoresNulls(page) ? true : false; + + if (SpGistPageIsLeaf(page)) + { + /* Page is a leaf - that is, all it's tuples are heap items */ + OffsetNumber max = PageGetMaxOffsetNumber(page); + + if (SpGistBlockIsRoot(blkno)) + { + /* When root is a leaf, examine all its tuples */ + for (offset = FirstOffsetNumber; offset <= max; offset++) + (void) spgTestLeafTuple(so, item, page, offset, + isnull, true, + &reportedSome, storeRes); + } + else + { + /* Normal case: just examine the chain we arrived at */ + while (offset != InvalidOffsetNumber) + { + Assert(offset >= FirstOffsetNumber && offset <= max); + offset = spgTestLeafTuple(so, item, page, offset, + isnull, false, + &reportedSome, storeRes); + if (offset == SpGistRedirectOffsetNumber) + goto redirect; + } + } + } + else /* page is inner */ + { + SpGistInnerTuple innerTuple = (SpGistInnerTuple) + PageGetItem(page, PageGetItemId(page, offset)); + + if (innerTuple->tupstate != SPGIST_LIVE) + { + if (innerTuple->tupstate == SPGIST_REDIRECT) + { + /* transfer attention to redirect point */ + item->heapPtr = ((SpGistDeadTuple) innerTuple)->pointer; + Assert(ItemPointerGetBlockNumber(&item->heapPtr) != + SPGIST_METAPAGE_BLKNO); + goto redirect; + } + elog(ERROR, "unexpected SPGiST tuple state: %d", + innerTuple->tupstate); + } + + spgInnerTest(so, item, innerTuple, isnull); + } + } + + /* done with this scan item */ + spgFreeSearchItem(so, item); + /* clear temp context before proceeding to the next one */ + MemoryContextReset(so->tempCxt); + } + + if (buffer != InvalidBuffer) + UnlockReleaseBuffer(buffer); +} + + +/* storeRes subroutine for getbitmap case */ +static void +storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr, + Datum leafValue, bool isnull, + SpGistLeafTuple leafTuple, bool recheck, + bool recheckDistances, double *distances) +{ + Assert(!recheckDistances && !distances); + tbm_add_tuples(so->tbm, heapPtr, 1, recheck); + so->ntids++; +} + +int64 +spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm) +{ + SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque; + + /* Copy want_itup to *so so we don't need to pass it around separately */ + so->want_itup = false; + + so->tbm = tbm; + so->ntids = 0; + + spgWalk(scan->indexRelation, so, true, storeBitmap, scan->xs_snapshot); + + return so->ntids; +} + +/* storeRes subroutine for gettuple case */ +static void +storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr, + Datum leafValue, bool isnull, + SpGistLeafTuple leafTuple, bool recheck, + bool recheckDistances, double *nonNullDistances) +{ + Assert(so->nPtrs < MaxIndexTuplesPerPage); + so->heapPtrs[so->nPtrs] = *heapPtr; + so->recheck[so->nPtrs] = recheck; + so->recheckDistances[so->nPtrs] = recheckDistances; + + if (so->numberOfOrderBys > 0) + { + if (isnull || so->numberOfNonNullOrderBys <= 0) + so->distances[so->nPtrs] = NULL; + else + { + IndexOrderByDistance *distances = + palloc(sizeof(distances[0]) * so->numberOfOrderBys); + int i; + + for (i = 0; i < so->numberOfOrderBys; i++) + { + int offset = so->nonNullOrderByOffsets[i]; + + if (offset >= 0) + { + /* Copy non-NULL distance value */ + distances[i].value = nonNullDistances[offset]; + distances[i].isnull = false; + } + else + { + /* Set distance's NULL flag. */ + distances[i].value = 0.0; + distances[i].isnull = true; + } + } + + so->distances[so->nPtrs] = distances; + } + } + + if (so->want_itup) + { + /* + * Reconstruct index data. We have to copy the datum out of the temp + * context anyway, so we may as well create the tuple here. + */ + Datum leafDatums[INDEX_MAX_KEYS]; + bool leafIsnulls[INDEX_MAX_KEYS]; + + /* We only need to deform the old tuple if it has INCLUDE attributes */ + if (so->state.leafTupDesc->natts > 1) + spgDeformLeafTuple(leafTuple, so->state.leafTupDesc, + leafDatums, leafIsnulls, isnull); + + leafDatums[spgKeyColumn] = leafValue; + leafIsnulls[spgKeyColumn] = isnull; + + so->reconTups[so->nPtrs] = heap_form_tuple(so->reconTupDesc, + leafDatums, + leafIsnulls); + } + so->nPtrs++; +} + +bool +spggettuple(IndexScanDesc scan, ScanDirection dir) +{ + SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque; + + if (dir != ForwardScanDirection) + elog(ERROR, "SP-GiST only supports forward scan direction"); + + /* Copy want_itup to *so so we don't need to pass it around separately */ + so->want_itup = scan->xs_want_itup; + + for (;;) + { + if (so->iPtr < so->nPtrs) + { + /* continuing to return reported tuples */ + scan->xs_heaptid = so->heapPtrs[so->iPtr]; + scan->xs_recheck = so->recheck[so->iPtr]; + scan->xs_hitup = so->reconTups[so->iPtr]; + + if (so->numberOfOrderBys > 0) + index_store_float8_orderby_distances(scan, so->orderByTypes, + so->distances[so->iPtr], + so->recheckDistances[so->iPtr]); + so->iPtr++; + return true; + } + + if (so->numberOfOrderBys > 0) + { + /* Must pfree distances to avoid memory leak */ + int i; + + for (i = 0; i < so->nPtrs; i++) + if (so->distances[i]) + pfree(so->distances[i]); + } + + if (so->want_itup) + { + /* Must pfree reconstructed tuples to avoid memory leak */ + int i; + + for (i = 0; i < so->nPtrs; i++) + pfree(so->reconTups[i]); + } + so->iPtr = so->nPtrs = 0; + + spgWalk(scan->indexRelation, so, false, storeGettuple, + scan->xs_snapshot); + + if (so->nPtrs == 0) + break; /* must have completed scan */ + } + + return false; +} + +bool +spgcanreturn(Relation index, int attno) +{ + SpGistCache *cache; + + /* INCLUDE attributes can always be fetched for index-only scans */ + if (attno > 1) + return true; + + /* We can do it if the opclass config function says so */ + cache = spgGetCache(index); + + return cache->config.canReturnData; +} diff --git a/src/backend/access/spgist/spgtextproc.c b/src/backend/access/spgist/spgtextproc.c new file mode 100644 index 0000000..f340555 --- /dev/null +++ b/src/backend/access/spgist/spgtextproc.c @@ -0,0 +1,699 @@ +/*------------------------------------------------------------------------- + * + * spgtextproc.c + * implementation of radix tree (compressed trie) over text + * + * In a text_ops SPGiST index, inner tuples can have a prefix which is the + * common prefix of all strings indexed under that tuple. The node labels + * represent the next byte of the string(s) after the prefix. Assuming we + * always use the longest possible prefix, we will get more than one node + * label unless the prefix length is restricted by SPGIST_MAX_PREFIX_LENGTH. + * + * To reconstruct the indexed string for any index entry, concatenate the + * inner-tuple prefixes and node labels starting at the root and working + * down to the leaf entry, then append the datum in the leaf entry. + * (While descending the tree, "level" is the number of bytes reconstructed + * so far.) + * + * However, there are two special cases for node labels: -1 indicates that + * there are no more bytes after the prefix-so-far, and -2 indicates that we + * had to split an existing allTheSame tuple (in such a case we have to create + * a node label that doesn't correspond to any string byte). In either case, + * the node label does not contribute anything to the reconstructed string. + * + * Previously, we used a node label of zero for both special cases, but + * this was problematic because one can't tell whether a string ending at + * the current level can be pushed down into such a child node. For + * backwards compatibility, we still support such node labels for reading; + * but no new entries will ever be pushed down into a zero-labeled child. + * No new entries ever get pushed into a -2-labeled child, either. + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgtextproc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/spgist.h" +#include "catalog/pg_type.h" +#include "mb/pg_wchar.h" +#include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/pg_locale.h" +#include "utils/varlena.h" + + +/* + * In the worst case, an inner tuple in a text radix tree could have as many + * as 258 nodes (one for each possible byte value, plus the two special + * cases). Each node can take 16 bytes on MAXALIGN=8 machines. The inner + * tuple must fit on an index page of size BLCKSZ. Rather than assuming we + * know the exact amount of overhead imposed by page headers, tuple headers, + * etc, we leave 100 bytes for that (the actual overhead should be no more + * than 56 bytes at this writing, so there is slop in this number). + * So we can safely create prefixes up to BLCKSZ - 258 * 16 - 100 bytes long. + * Unfortunately, because 258 * 16 is over 4K, there is no safe prefix length + * when BLCKSZ is less than 8K; it is always possible to get "SPGiST inner + * tuple size exceeds maximum" if there are too many distinct next-byte values + * at a given place in the tree. Since use of nonstandard block sizes appears + * to be negligible in the field, we just live with that fact for now, + * choosing a max prefix size of 32 bytes when BLCKSZ is configured smaller + * than default. + */ +#define SPGIST_MAX_PREFIX_LENGTH Max((int) (BLCKSZ - 258 * 16 - 100), 32) + +/* + * Strategy for collation aware operator on text is equal to btree strategy + * plus value of 10. + * + * Current collation aware strategies and their corresponding btree strategies: + * 11 BTLessStrategyNumber + * 12 BTLessEqualStrategyNumber + * 14 BTGreaterEqualStrategyNumber + * 15 BTGreaterStrategyNumber + */ +#define SPG_STRATEGY_ADDITION (10) +#define SPG_IS_COLLATION_AWARE_STRATEGY(s) ((s) > SPG_STRATEGY_ADDITION \ + && (s) != RTPrefixStrategyNumber) + +/* Struct for sorting values in picksplit */ +typedef struct spgNodePtr +{ + Datum d; + int i; + int16 c; +} spgNodePtr; + + +Datum +spg_text_config(PG_FUNCTION_ARGS) +{ + /* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */ + spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1); + + cfg->prefixType = TEXTOID; + cfg->labelType = INT2OID; + cfg->canReturnData = true; + cfg->longValuesOK = true; /* suffixing will shorten long values */ + PG_RETURN_VOID(); +} + +/* + * Form a text datum from the given not-necessarily-null-terminated string, + * using short varlena header format if possible + */ +static Datum +formTextDatum(const char *data, int datalen) +{ + char *p; + + p = (char *) palloc(datalen + VARHDRSZ); + + if (datalen + VARHDRSZ_SHORT <= VARATT_SHORT_MAX) + { + SET_VARSIZE_SHORT(p, datalen + VARHDRSZ_SHORT); + if (datalen) + memcpy(p + VARHDRSZ_SHORT, data, datalen); + } + else + { + SET_VARSIZE(p, datalen + VARHDRSZ); + memcpy(p + VARHDRSZ, data, datalen); + } + + return PointerGetDatum(p); +} + +/* + * Find the length of the common prefix of a and b + */ +static int +commonPrefix(const char *a, const char *b, int lena, int lenb) +{ + int i = 0; + + while (i < lena && i < lenb && *a == *b) + { + a++; + b++; + i++; + } + + return i; +} + +/* + * Binary search an array of int16 datums for a match to c + * + * On success, *i gets the match location; on failure, it gets where to insert + */ +static bool +searchChar(Datum *nodeLabels, int nNodes, int16 c, int *i) +{ + int StopLow = 0, + StopHigh = nNodes; + + while (StopLow < StopHigh) + { + int StopMiddle = (StopLow + StopHigh) >> 1; + int16 middle = DatumGetInt16(nodeLabels[StopMiddle]); + + if (c < middle) + StopHigh = StopMiddle; + else if (c > middle) + StopLow = StopMiddle + 1; + else + { + *i = StopMiddle; + return true; + } + } + + *i = StopHigh; + return false; +} + +Datum +spg_text_choose(PG_FUNCTION_ARGS) +{ + spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0); + spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1); + text *inText = DatumGetTextPP(in->datum); + char *inStr = VARDATA_ANY(inText); + int inSize = VARSIZE_ANY_EXHDR(inText); + char *prefixStr = NULL; + int prefixSize = 0; + int commonLen = 0; + int16 nodeChar = 0; + int i = 0; + + /* Check for prefix match, set nodeChar to first byte after prefix */ + if (in->hasPrefix) + { + text *prefixText = DatumGetTextPP(in->prefixDatum); + + prefixStr = VARDATA_ANY(prefixText); + prefixSize = VARSIZE_ANY_EXHDR(prefixText); + + commonLen = commonPrefix(inStr + in->level, + prefixStr, + inSize - in->level, + prefixSize); + + if (commonLen == prefixSize) + { + if (inSize - in->level > commonLen) + nodeChar = *(unsigned char *) (inStr + in->level + commonLen); + else + nodeChar = -1; + } + else + { + /* Must split tuple because incoming value doesn't match prefix */ + out->resultType = spgSplitTuple; + + if (commonLen == 0) + { + out->result.splitTuple.prefixHasPrefix = false; + } + else + { + out->result.splitTuple.prefixHasPrefix = true; + out->result.splitTuple.prefixPrefixDatum = + formTextDatum(prefixStr, commonLen); + } + out->result.splitTuple.prefixNNodes = 1; + out->result.splitTuple.prefixNodeLabels = + (Datum *) palloc(sizeof(Datum)); + out->result.splitTuple.prefixNodeLabels[0] = + Int16GetDatum(*(unsigned char *) (prefixStr + commonLen)); + + out->result.splitTuple.childNodeN = 0; + + if (prefixSize - commonLen == 1) + { + out->result.splitTuple.postfixHasPrefix = false; + } + else + { + out->result.splitTuple.postfixHasPrefix = true; + out->result.splitTuple.postfixPrefixDatum = + formTextDatum(prefixStr + commonLen + 1, + prefixSize - commonLen - 1); + } + + PG_RETURN_VOID(); + } + } + else if (inSize > in->level) + { + nodeChar = *(unsigned char *) (inStr + in->level); + } + else + { + nodeChar = -1; + } + + /* Look up nodeChar in the node label array */ + if (searchChar(in->nodeLabels, in->nNodes, nodeChar, &i)) + { + /* + * Descend to existing node. (If in->allTheSame, the core code will + * ignore our nodeN specification here, but that's OK. We still have + * to provide the correct levelAdd and restDatum values, and those are + * the same regardless of which node gets chosen by core.) + */ + int levelAdd; + + out->resultType = spgMatchNode; + out->result.matchNode.nodeN = i; + levelAdd = commonLen; + if (nodeChar >= 0) + levelAdd++; + out->result.matchNode.levelAdd = levelAdd; + if (inSize - in->level - levelAdd > 0) + out->result.matchNode.restDatum = + formTextDatum(inStr + in->level + levelAdd, + inSize - in->level - levelAdd); + else + out->result.matchNode.restDatum = + formTextDatum(NULL, 0); + } + else if (in->allTheSame) + { + /* + * Can't use AddNode action, so split the tuple. The upper tuple has + * the same prefix as before and uses a dummy node label -2 for the + * lower tuple. The lower tuple has no prefix and the same node + * labels as the original tuple. + * + * Note: it might seem tempting to shorten the upper tuple's prefix, + * if it has one, then use its last byte as label for the lower tuple. + * But that doesn't win since we know the incoming value matches the + * whole prefix: we'd just end up splitting the lower tuple again. + */ + out->resultType = spgSplitTuple; + out->result.splitTuple.prefixHasPrefix = in->hasPrefix; + out->result.splitTuple.prefixPrefixDatum = in->prefixDatum; + out->result.splitTuple.prefixNNodes = 1; + out->result.splitTuple.prefixNodeLabels = (Datum *) palloc(sizeof(Datum)); + out->result.splitTuple.prefixNodeLabels[0] = Int16GetDatum(-2); + out->result.splitTuple.childNodeN = 0; + out->result.splitTuple.postfixHasPrefix = false; + } + else + { + /* Add a node for the not-previously-seen nodeChar value */ + out->resultType = spgAddNode; + out->result.addNode.nodeLabel = Int16GetDatum(nodeChar); + out->result.addNode.nodeN = i; + } + + PG_RETURN_VOID(); +} + +/* qsort comparator to sort spgNodePtr structs by "c" */ +static int +cmpNodePtr(const void *a, const void *b) +{ + const spgNodePtr *aa = (const spgNodePtr *) a; + const spgNodePtr *bb = (const spgNodePtr *) b; + + return aa->c - bb->c; +} + +Datum +spg_text_picksplit(PG_FUNCTION_ARGS) +{ + spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0); + spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1); + text *text0 = DatumGetTextPP(in->datums[0]); + int i, + commonLen; + spgNodePtr *nodes; + + /* Identify longest common prefix, if any */ + commonLen = VARSIZE_ANY_EXHDR(text0); + for (i = 1; i < in->nTuples && commonLen > 0; i++) + { + text *texti = DatumGetTextPP(in->datums[i]); + int tmp = commonPrefix(VARDATA_ANY(text0), + VARDATA_ANY(texti), + VARSIZE_ANY_EXHDR(text0), + VARSIZE_ANY_EXHDR(texti)); + + if (tmp < commonLen) + commonLen = tmp; + } + + /* + * Limit the prefix length, if necessary, to ensure that the resulting + * inner tuple will fit on a page. + */ + commonLen = Min(commonLen, SPGIST_MAX_PREFIX_LENGTH); + + /* Set node prefix to be that string, if it's not empty */ + if (commonLen == 0) + { + out->hasPrefix = false; + } + else + { + out->hasPrefix = true; + out->prefixDatum = formTextDatum(VARDATA_ANY(text0), commonLen); + } + + /* Extract the node label (first non-common byte) from each value */ + nodes = (spgNodePtr *) palloc(sizeof(spgNodePtr) * in->nTuples); + + for (i = 0; i < in->nTuples; i++) + { + text *texti = DatumGetTextPP(in->datums[i]); + + if (commonLen < VARSIZE_ANY_EXHDR(texti)) + nodes[i].c = *(unsigned char *) (VARDATA_ANY(texti) + commonLen); + else + nodes[i].c = -1; /* use -1 if string is all common */ + nodes[i].i = i; + nodes[i].d = in->datums[i]; + } + + /* + * Sort by label values so that we can group the values into nodes. This + * also ensures that the nodes are ordered by label value, allowing the + * use of binary search in searchChar. + */ + qsort(nodes, in->nTuples, sizeof(*nodes), cmpNodePtr); + + /* And emit results */ + out->nNodes = 0; + out->nodeLabels = (Datum *) palloc(sizeof(Datum) * in->nTuples); + out->mapTuplesToNodes = (int *) palloc(sizeof(int) * in->nTuples); + out->leafTupleDatums = (Datum *) palloc(sizeof(Datum) * in->nTuples); + + for (i = 0; i < in->nTuples; i++) + { + text *texti = DatumGetTextPP(nodes[i].d); + Datum leafD; + + if (i == 0 || nodes[i].c != nodes[i - 1].c) + { + out->nodeLabels[out->nNodes] = Int16GetDatum(nodes[i].c); + out->nNodes++; + } + + if (commonLen < VARSIZE_ANY_EXHDR(texti)) + leafD = formTextDatum(VARDATA_ANY(texti) + commonLen + 1, + VARSIZE_ANY_EXHDR(texti) - commonLen - 1); + else + leafD = formTextDatum(NULL, 0); + + out->leafTupleDatums[nodes[i].i] = leafD; + out->mapTuplesToNodes[nodes[i].i] = out->nNodes - 1; + } + + PG_RETURN_VOID(); +} + +Datum +spg_text_inner_consistent(PG_FUNCTION_ARGS) +{ + spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0); + spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1); + bool collate_is_c = lc_collate_is_c(PG_GET_COLLATION()); + text *reconstructedValue; + text *reconstrText; + int maxReconstrLen; + text *prefixText = NULL; + int prefixSize = 0; + int i; + + /* + * Reconstruct values represented at this tuple, including parent data, + * prefix of this tuple if any, and the node label if it's non-dummy. + * in->level should be the length of the previously reconstructed value, + * and the number of bytes added here is prefixSize or prefixSize + 1. + * + * Note: we assume that in->reconstructedValue isn't toasted and doesn't + * have a short varlena header. This is okay because it must have been + * created by a previous invocation of this routine, and we always emit + * long-format reconstructed values. + */ + reconstructedValue = (text *) DatumGetPointer(in->reconstructedValue); + Assert(reconstructedValue == NULL ? in->level == 0 : + VARSIZE_ANY_EXHDR(reconstructedValue) == in->level); + + maxReconstrLen = in->level + 1; + if (in->hasPrefix) + { + prefixText = DatumGetTextPP(in->prefixDatum); + prefixSize = VARSIZE_ANY_EXHDR(prefixText); + maxReconstrLen += prefixSize; + } + + reconstrText = palloc(VARHDRSZ + maxReconstrLen); + SET_VARSIZE(reconstrText, VARHDRSZ + maxReconstrLen); + + if (in->level) + memcpy(VARDATA(reconstrText), + VARDATA(reconstructedValue), + in->level); + if (prefixSize) + memcpy(((char *) VARDATA(reconstrText)) + in->level, + VARDATA_ANY(prefixText), + prefixSize); + /* last byte of reconstrText will be filled in below */ + + /* + * Scan the child nodes. For each one, complete the reconstructed value + * and see if it's consistent with the query. If so, emit an entry into + * the output arrays. + */ + out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes); + out->levelAdds = (int *) palloc(sizeof(int) * in->nNodes); + out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes); + out->nNodes = 0; + + for (i = 0; i < in->nNodes; i++) + { + int16 nodeChar = DatumGetInt16(in->nodeLabels[i]); + int thisLen; + bool res = true; + int j; + + /* If nodeChar is a dummy value, don't include it in data */ + if (nodeChar <= 0) + thisLen = maxReconstrLen - 1; + else + { + ((unsigned char *) VARDATA(reconstrText))[maxReconstrLen - 1] = nodeChar; + thisLen = maxReconstrLen; + } + + for (j = 0; j < in->nkeys; j++) + { + StrategyNumber strategy = in->scankeys[j].sk_strategy; + text *inText; + int inSize; + int r; + + /* + * If it's a collation-aware operator, but the collation is C, we + * can treat it as non-collation-aware. With non-C collation we + * need to traverse whole tree :-( so there's no point in making + * any check here. (Note also that our reconstructed value may + * well end with a partial multibyte character, so that applying + * any encoding-sensitive test to it would be risky anyhow.) + */ + if (SPG_IS_COLLATION_AWARE_STRATEGY(strategy)) + { + if (collate_is_c) + strategy -= SPG_STRATEGY_ADDITION; + else + continue; + } + + inText = DatumGetTextPP(in->scankeys[j].sk_argument); + inSize = VARSIZE_ANY_EXHDR(inText); + + r = memcmp(VARDATA(reconstrText), VARDATA_ANY(inText), + Min(inSize, thisLen)); + + switch (strategy) + { + case BTLessStrategyNumber: + case BTLessEqualStrategyNumber: + if (r > 0) + res = false; + break; + case BTEqualStrategyNumber: + if (r != 0 || inSize < thisLen) + res = false; + break; + case BTGreaterEqualStrategyNumber: + case BTGreaterStrategyNumber: + if (r < 0) + res = false; + break; + case RTPrefixStrategyNumber: + if (r != 0) + res = false; + break; + default: + elog(ERROR, "unrecognized strategy number: %d", + in->scankeys[j].sk_strategy); + break; + } + + if (!res) + break; /* no need to consider remaining conditions */ + } + + if (res) + { + out->nodeNumbers[out->nNodes] = i; + out->levelAdds[out->nNodes] = thisLen - in->level; + SET_VARSIZE(reconstrText, VARHDRSZ + thisLen); + out->reconstructedValues[out->nNodes] = + datumCopy(PointerGetDatum(reconstrText), false, -1); + out->nNodes++; + } + } + + PG_RETURN_VOID(); +} + +Datum +spg_text_leaf_consistent(PG_FUNCTION_ARGS) +{ + spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0); + spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1); + int level = in->level; + text *leafValue, + *reconstrValue = NULL; + char *fullValue; + int fullLen; + bool res; + int j; + + /* all tests are exact */ + out->recheck = false; + + leafValue = DatumGetTextPP(in->leafDatum); + + /* As above, in->reconstructedValue isn't toasted or short. */ + if (DatumGetPointer(in->reconstructedValue)) + reconstrValue = (text *) DatumGetPointer(in->reconstructedValue); + + Assert(reconstrValue == NULL ? level == 0 : + VARSIZE_ANY_EXHDR(reconstrValue) == level); + + /* Reconstruct the full string represented by this leaf tuple */ + fullLen = level + VARSIZE_ANY_EXHDR(leafValue); + if (VARSIZE_ANY_EXHDR(leafValue) == 0 && level > 0) + { + fullValue = VARDATA(reconstrValue); + out->leafValue = PointerGetDatum(reconstrValue); + } + else + { + text *fullText = palloc(VARHDRSZ + fullLen); + + SET_VARSIZE(fullText, VARHDRSZ + fullLen); + fullValue = VARDATA(fullText); + if (level) + memcpy(fullValue, VARDATA(reconstrValue), level); + if (VARSIZE_ANY_EXHDR(leafValue) > 0) + memcpy(fullValue + level, VARDATA_ANY(leafValue), + VARSIZE_ANY_EXHDR(leafValue)); + out->leafValue = PointerGetDatum(fullText); + } + + /* Perform the required comparison(s) */ + res = true; + for (j = 0; j < in->nkeys; j++) + { + StrategyNumber strategy = in->scankeys[j].sk_strategy; + text *query = DatumGetTextPP(in->scankeys[j].sk_argument); + int queryLen = VARSIZE_ANY_EXHDR(query); + int r; + + if (strategy == RTPrefixStrategyNumber) + { + /* + * if level >= length of query then reconstrValue must begin with + * query (prefix) string, so we don't need to check it again. + */ + res = (level >= queryLen) || + DatumGetBool(DirectFunctionCall2Coll(text_starts_with, + PG_GET_COLLATION(), + out->leafValue, + PointerGetDatum(query))); + + if (!res) /* no need to consider remaining conditions */ + break; + + continue; + } + + if (SPG_IS_COLLATION_AWARE_STRATEGY(strategy)) + { + /* Collation-aware comparison */ + strategy -= SPG_STRATEGY_ADDITION; + + /* If asserts enabled, verify encoding of reconstructed string */ + Assert(pg_verifymbstr(fullValue, fullLen, false)); + + r = varstr_cmp(fullValue, fullLen, + VARDATA_ANY(query), queryLen, + PG_GET_COLLATION()); + } + else + { + /* Non-collation-aware comparison */ + r = memcmp(fullValue, VARDATA_ANY(query), Min(queryLen, fullLen)); + + if (r == 0) + { + if (queryLen > fullLen) + r = -1; + else if (queryLen < fullLen) + r = 1; + } + } + + switch (strategy) + { + case BTLessStrategyNumber: + res = (r < 0); + break; + case BTLessEqualStrategyNumber: + res = (r <= 0); + break; + case BTEqualStrategyNumber: + res = (r == 0); + break; + case BTGreaterEqualStrategyNumber: + res = (r >= 0); + break; + case BTGreaterStrategyNumber: + res = (r > 0); + break; + default: + elog(ERROR, "unrecognized strategy number: %d", + in->scankeys[j].sk_strategy); + res = false; + break; + } + + if (!res) + break; /* no need to consider remaining conditions */ + } + + PG_RETURN_BOOL(res); +} diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c new file mode 100644 index 0000000..4484805 --- /dev/null +++ b/src/backend/access/spgist/spgutils.c @@ -0,0 +1,1350 @@ +/*------------------------------------------------------------------------- + * + * spgutils.c + * various support functions for SP-GiST + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgutils.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/amvalidate.h" +#include "access/htup_details.h" +#include "access/reloptions.h" +#include "access/spgist_private.h" +#include "access/toast_compression.h" +#include "access/transam.h" +#include "access/xact.h" +#include "catalog/pg_amop.h" +#include "commands/vacuum.h" +#include "nodes/nodeFuncs.h" +#include "parser/parse_coerce.h" +#include "storage/bufmgr.h" +#include "storage/indexfsm.h" +#include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/catcache.h" +#include "utils/index_selfuncs.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" + + +/* + * SP-GiST handler function: return IndexAmRoutine with access method parameters + * and callbacks. + */ +Datum +spghandler(PG_FUNCTION_ARGS) +{ + IndexAmRoutine *amroutine = makeNode(IndexAmRoutine); + + amroutine->amstrategies = 0; + amroutine->amsupport = SPGISTNProc; + amroutine->amoptsprocnum = SPGIST_OPTIONS_PROC; + amroutine->amcanorder = false; + amroutine->amcanorderbyop = true; + amroutine->amcanbackward = false; + amroutine->amcanunique = false; + amroutine->amcanmulticol = false; + amroutine->amoptionalkey = true; + amroutine->amsearcharray = false; + amroutine->amsearchnulls = true; + amroutine->amstorage = true; + amroutine->amclusterable = false; + amroutine->ampredlocks = false; + amroutine->amcanparallel = false; + amroutine->amcaninclude = true; + amroutine->amusemaintenanceworkmem = false; + amroutine->amparallelvacuumoptions = + VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP; + amroutine->amkeytype = InvalidOid; + + amroutine->ambuild = spgbuild; + amroutine->ambuildempty = spgbuildempty; + amroutine->aminsert = spginsert; + amroutine->ambulkdelete = spgbulkdelete; + amroutine->amvacuumcleanup = spgvacuumcleanup; + amroutine->amcanreturn = spgcanreturn; + amroutine->amcostestimate = spgcostestimate; + amroutine->amoptions = spgoptions; + amroutine->amproperty = spgproperty; + amroutine->ambuildphasename = NULL; + amroutine->amvalidate = spgvalidate; + amroutine->amadjustmembers = spgadjustmembers; + amroutine->ambeginscan = spgbeginscan; + amroutine->amrescan = spgrescan; + amroutine->amgettuple = spggettuple; + amroutine->amgetbitmap = spggetbitmap; + amroutine->amendscan = spgendscan; + amroutine->ammarkpos = NULL; + amroutine->amrestrpos = NULL; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; + + PG_RETURN_POINTER(amroutine); +} + +/* + * GetIndexInputType + * Determine the nominal input data type for an index column + * + * We define the "nominal" input type as the associated opclass's opcintype, + * or if that is a polymorphic type, the base type of the heap column or + * expression that is the index's input. The reason for preferring the + * opcintype is that non-polymorphic opclasses probably don't want to hear + * about binary-compatible input types. For instance, if a text opclass + * is being used with a varchar heap column, we want to report "text" not + * "varchar". Likewise, opclasses don't want to hear about domain types, + * so if we do consult the actual input type, we make sure to flatten domains. + * + * At some point maybe this should go somewhere else, but it's not clear + * if any other index AMs have a use for it. + */ +static Oid +GetIndexInputType(Relation index, AttrNumber indexcol) +{ + Oid opcintype; + AttrNumber heapcol; + List *indexprs; + ListCell *indexpr_item; + + Assert(index->rd_index != NULL); + Assert(indexcol > 0 && indexcol <= index->rd_index->indnkeyatts); + opcintype = index->rd_opcintype[indexcol - 1]; + if (!IsPolymorphicType(opcintype)) + return opcintype; + heapcol = index->rd_index->indkey.values[indexcol - 1]; + if (heapcol != 0) /* Simple index column? */ + return getBaseType(get_atttype(index->rd_index->indrelid, heapcol)); + + /* + * If the index expressions are already cached, skip calling + * RelationGetIndexExpressions, as it will make a copy which is overkill. + * We're not going to modify the trees, and we're not going to do anything + * that would invalidate the relcache entry before we're done. + */ + if (index->rd_indexprs) + indexprs = index->rd_indexprs; + else + indexprs = RelationGetIndexExpressions(index); + indexpr_item = list_head(indexprs); + for (int i = 1; i <= index->rd_index->indnkeyatts; i++) + { + if (index->rd_index->indkey.values[i - 1] == 0) + { + /* expression column */ + if (indexpr_item == NULL) + elog(ERROR, "wrong number of index expressions"); + if (i == indexcol) + return getBaseType(exprType((Node *) lfirst(indexpr_item))); + indexpr_item = lnext(indexprs, indexpr_item); + } + } + elog(ERROR, "wrong number of index expressions"); + return InvalidOid; /* keep compiler quiet */ +} + +/* Fill in a SpGistTypeDesc struct with info about the specified data type */ +static void +fillTypeDesc(SpGistTypeDesc *desc, Oid type) +{ + HeapTuple tp; + Form_pg_type typtup; + + desc->type = type; + tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for type %u", type); + typtup = (Form_pg_type) GETSTRUCT(tp); + desc->attlen = typtup->typlen; + desc->attbyval = typtup->typbyval; + desc->attalign = typtup->typalign; + desc->attstorage = typtup->typstorage; + ReleaseSysCache(tp); +} + +/* + * Fetch local cache of AM-specific info about the index, initializing it + * if necessary + */ +SpGistCache * +spgGetCache(Relation index) +{ + SpGistCache *cache; + + if (index->rd_amcache == NULL) + { + Oid atttype; + spgConfigIn in; + FmgrInfo *procinfo; + Buffer metabuffer; + SpGistMetaPageData *metadata; + + cache = MemoryContextAllocZero(index->rd_indexcxt, + sizeof(SpGistCache)); + + /* SPGiST must have one key column and can also have INCLUDE columns */ + Assert(IndexRelationGetNumberOfKeyAttributes(index) == 1); + Assert(IndexRelationGetNumberOfAttributes(index) <= INDEX_MAX_KEYS); + + /* + * Get the actual (well, nominal) data type of the key column. We + * pass this to the opclass config function so that polymorphic + * opclasses are possible. + */ + atttype = GetIndexInputType(index, spgKeyColumn + 1); + + /* Call the config function to get config info for the opclass */ + in.attType = atttype; + + procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC); + FunctionCall2Coll(procinfo, + index->rd_indcollation[spgKeyColumn], + PointerGetDatum(&in), + PointerGetDatum(&cache->config)); + + /* + * If leafType isn't specified, use the declared index column type, + * which index.c will have derived from the opclass's opcintype. + * (Although we now make spgvalidate.c warn if these aren't the same, + * old user-defined opclasses may not set the STORAGE parameter + * correctly, so believe leafType if it's given.) + */ + if (!OidIsValid(cache->config.leafType)) + { + cache->config.leafType = + TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid; + + /* + * If index column type is binary-coercible to atttype (for + * example, it's a domain over atttype), treat it as plain atttype + * to avoid thinking we need to compress. + */ + if (cache->config.leafType != atttype && + IsBinaryCoercible(cache->config.leafType, atttype)) + cache->config.leafType = atttype; + } + + /* Get the information we need about each relevant datatype */ + fillTypeDesc(&cache->attType, atttype); + + if (cache->config.leafType != atttype) + { + if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("compress method must be defined when leaf type is different from input type"))); + + fillTypeDesc(&cache->attLeafType, cache->config.leafType); + } + else + { + /* Save lookups in this common case */ + cache->attLeafType = cache->attType; + } + + fillTypeDesc(&cache->attPrefixType, cache->config.prefixType); + fillTypeDesc(&cache->attLabelType, cache->config.labelType); + + /* Last, get the lastUsedPages data from the metapage */ + metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO); + LockBuffer(metabuffer, BUFFER_LOCK_SHARE); + + metadata = SpGistPageGetMeta(BufferGetPage(metabuffer)); + + if (metadata->magicNumber != SPGIST_MAGIC_NUMBER) + elog(ERROR, "index \"%s\" is not an SP-GiST index", + RelationGetRelationName(index)); + + cache->lastUsedPages = metadata->lastUsedPages; + + UnlockReleaseBuffer(metabuffer); + + index->rd_amcache = (void *) cache; + } + else + { + /* assume it's up to date */ + cache = (SpGistCache *) index->rd_amcache; + } + + return cache; +} + +/* + * Compute a tuple descriptor for leaf tuples or index-only-scan result tuples. + * + * We can use the relcache's tupdesc as-is in many cases, and it's always + * OK so far as any INCLUDE columns are concerned. However, the entry for + * the key column has to match leafType in the first case or attType in the + * second case. While the relcache's tupdesc *should* show leafType, this + * might not hold for legacy user-defined opclasses, since before v14 they + * were not allowed to declare their true storage type in CREATE OPCLASS. + * Also, attType can be different from what is in the relcache. + * + * This function gives back either a pointer to the relcache's tupdesc + * if that is suitable, or a palloc'd copy that's been adjusted to match + * the specified key column type. We can avoid doing any catalog lookups + * here by insisting that the caller pass an SpGistTypeDesc not just an OID. + */ +TupleDesc +getSpGistTupleDesc(Relation index, SpGistTypeDesc *keyType) +{ + TupleDesc outTupDesc; + Form_pg_attribute att; + + if (keyType->type == + TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid) + outTupDesc = RelationGetDescr(index); + else + { + outTupDesc = CreateTupleDescCopy(RelationGetDescr(index)); + att = TupleDescAttr(outTupDesc, spgKeyColumn); + /* It's sufficient to update the type-dependent fields of the column */ + att->atttypid = keyType->type; + att->atttypmod = -1; + att->attlen = keyType->attlen; + att->attbyval = keyType->attbyval; + att->attalign = keyType->attalign; + att->attstorage = keyType->attstorage; + /* We shouldn't need to bother with making these valid: */ + att->attcompression = InvalidCompressionMethod; + att->attcollation = InvalidOid; + /* In case we changed typlen, we'd better reset following offsets */ + for (int i = spgFirstIncludeColumn; i < outTupDesc->natts; i++) + TupleDescAttr(outTupDesc, i)->attcacheoff = -1; + } + return outTupDesc; +} + +/* Initialize SpGistState for working with the given index */ +void +initSpGistState(SpGistState *state, Relation index) +{ + SpGistCache *cache; + + state->index = index; + + /* Get cached static information about index */ + cache = spgGetCache(index); + + state->config = cache->config; + state->attType = cache->attType; + state->attLeafType = cache->attLeafType; + state->attPrefixType = cache->attPrefixType; + state->attLabelType = cache->attLabelType; + + /* Ensure we have a valid descriptor for leaf tuples */ + state->leafTupDesc = getSpGistTupleDesc(state->index, &state->attLeafType); + + /* Make workspace for constructing dead tuples */ + state->deadTupleStorage = palloc0(SGDTSIZE); + + /* Set XID to use in redirection tuples */ + state->myXid = GetTopTransactionIdIfAny(); + + /* Assume we're not in an index build (spgbuild will override) */ + state->isBuild = false; +} + +/* + * Allocate a new page (either by recycling, or by extending the index file). + * + * The returned buffer is already pinned and exclusive-locked. + * Caller is responsible for initializing the page by calling SpGistInitBuffer. + */ +Buffer +SpGistNewBuffer(Relation index) +{ + Buffer buffer; + bool needLock; + + /* First, try to get a page from FSM */ + for (;;) + { + BlockNumber blkno = GetFreeIndexPage(index); + + if (blkno == InvalidBlockNumber) + break; /* nothing known to FSM */ + + /* + * The fixed pages shouldn't ever be listed in FSM, but just in case + * one is, ignore it. + */ + if (SpGistBlockIsFixed(blkno)) + continue; + + buffer = ReadBuffer(index, blkno); + + /* + * We have to guard against the possibility that someone else already + * recycled this page; the buffer may be locked if so. + */ + if (ConditionalLockBuffer(buffer)) + { + Page page = BufferGetPage(buffer); + + if (PageIsNew(page)) + return buffer; /* OK to use, if never initialized */ + + if (SpGistPageIsDeleted(page) || PageIsEmpty(page)) + return buffer; /* OK to use */ + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + } + + /* Can't use it, so release buffer and try again */ + ReleaseBuffer(buffer); + } + + /* Must extend the file */ + needLock = !RELATION_IS_LOCAL(index); + if (needLock) + LockRelationForExtension(index, ExclusiveLock); + + buffer = ReadBuffer(index, P_NEW); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + if (needLock) + UnlockRelationForExtension(index, ExclusiveLock); + + return buffer; +} + +/* + * Update index metapage's lastUsedPages info from local cache, if possible + * + * Updating meta page isn't critical for index working, so + * 1 use ConditionalLockBuffer to improve concurrency + * 2 don't WAL-log metabuffer changes to decrease WAL traffic + */ +void +SpGistUpdateMetaPage(Relation index) +{ + SpGistCache *cache = (SpGistCache *) index->rd_amcache; + + if (cache != NULL) + { + Buffer metabuffer; + + metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO); + + if (ConditionalLockBuffer(metabuffer)) + { + Page metapage = BufferGetPage(metabuffer); + SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage); + + metadata->lastUsedPages = cache->lastUsedPages; + + /* + * Set pd_lower just past the end of the metadata. This is + * essential, because without doing so, metadata will be lost if + * xlog.c compresses the page. (We must do this here because + * pre-v11 versions of PG did not set the metapage's pd_lower + * correctly, so a pg_upgraded index might contain the wrong + * value.) + */ + ((PageHeader) metapage)->pd_lower = + ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage; + + MarkBufferDirty(metabuffer); + UnlockReleaseBuffer(metabuffer); + } + else + { + ReleaseBuffer(metabuffer); + } + } +} + +/* Macro to select proper element of lastUsedPages cache depending on flags */ +/* Masking flags with SPGIST_CACHED_PAGES is just for paranoia's sake */ +#define GET_LUP(c, f) (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES]) + +/* + * Allocate and initialize a new buffer of the type and parity specified by + * flags. The returned buffer is already pinned and exclusive-locked. + * + * When requesting an inner page, if we get one with the wrong parity, + * we just release the buffer and try again. We will get a different page + * because GetFreeIndexPage will have marked the page used in FSM. The page + * is entered in our local lastUsedPages cache, so there's some hope of + * making use of it later in this session, but otherwise we rely on VACUUM + * to eventually re-enter the page in FSM, making it available for recycling. + * Note that such a page does not get marked dirty here, so unless it's used + * fairly soon, the buffer will just get discarded and the page will remain + * as it was on disk. + * + * When we return a buffer to the caller, the page is *not* entered into + * the lastUsedPages cache; we expect the caller will do so after it's taken + * whatever space it will use. This is because after the caller has used up + * some space, the page might have less space than whatever was cached already + * so we'd rather not trash the old cache entry. + */ +static Buffer +allocNewBuffer(Relation index, int flags) +{ + SpGistCache *cache = spgGetCache(index); + uint16 pageflags = 0; + + if (GBUF_REQ_LEAF(flags)) + pageflags |= SPGIST_LEAF; + if (GBUF_REQ_NULLS(flags)) + pageflags |= SPGIST_NULLS; + + for (;;) + { + Buffer buffer; + + buffer = SpGistNewBuffer(index); + SpGistInitBuffer(buffer, pageflags); + + if (pageflags & SPGIST_LEAF) + { + /* Leaf pages have no parity concerns, so just use it */ + return buffer; + } + else + { + BlockNumber blkno = BufferGetBlockNumber(buffer); + int blkFlags = GBUF_INNER_PARITY(blkno); + + if ((flags & GBUF_PARITY_MASK) == blkFlags) + { + /* Page has right parity, use it */ + return buffer; + } + else + { + /* Page has wrong parity, record it in cache and try again */ + if (pageflags & SPGIST_NULLS) + blkFlags |= GBUF_NULLS; + cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno; + cache->lastUsedPages.cachedPage[blkFlags].freeSpace = + PageGetExactFreeSpace(BufferGetPage(buffer)); + UnlockReleaseBuffer(buffer); + } + } + } +} + +/* + * Get a buffer of the type and parity specified by flags, having at least + * as much free space as indicated by needSpace. We use the lastUsedPages + * cache to assign the same buffer previously requested when possible. + * The returned buffer is already pinned and exclusive-locked. + * + * *isNew is set true if the page was initialized here, false if it was + * already valid. + */ +Buffer +SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew) +{ + SpGistCache *cache = spgGetCache(index); + SpGistLastUsedPage *lup; + + /* Bail out if even an empty page wouldn't meet the demand */ + if (needSpace > SPGIST_PAGE_CAPACITY) + elog(ERROR, "desired SPGiST tuple size is too big"); + + /* + * If possible, increase the space request to include relation's + * fillfactor. This ensures that when we add unrelated tuples to a page, + * we try to keep 100-fillfactor% available for adding tuples that are + * related to the ones already on it. But fillfactor mustn't cause an + * error for requests that would otherwise be legal. + */ + needSpace += SpGistGetTargetPageFreeSpace(index); + needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY); + + /* Get the cache entry for this flags setting */ + lup = GET_LUP(cache, flags); + + /* If we have nothing cached, just turn it over to allocNewBuffer */ + if (lup->blkno == InvalidBlockNumber) + { + *isNew = true; + return allocNewBuffer(index, flags); + } + + /* fixed pages should never be in cache */ + Assert(!SpGistBlockIsFixed(lup->blkno)); + + /* If cached freeSpace isn't enough, don't bother looking at the page */ + if (lup->freeSpace >= needSpace) + { + Buffer buffer; + Page page; + + buffer = ReadBuffer(index, lup->blkno); + + if (!ConditionalLockBuffer(buffer)) + { + /* + * buffer is locked by another process, so return a new buffer + */ + ReleaseBuffer(buffer); + *isNew = true; + return allocNewBuffer(index, flags); + } + + page = BufferGetPage(buffer); + + if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page)) + { + /* OK to initialize the page */ + uint16 pageflags = 0; + + if (GBUF_REQ_LEAF(flags)) + pageflags |= SPGIST_LEAF; + if (GBUF_REQ_NULLS(flags)) + pageflags |= SPGIST_NULLS; + SpGistInitBuffer(buffer, pageflags); + lup->freeSpace = PageGetExactFreeSpace(page) - needSpace; + *isNew = true; + return buffer; + } + + /* + * Check that page is of right type and has enough space. We must + * recheck this since our cache isn't necessarily up to date. + */ + if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) && + (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page))) + { + int freeSpace = PageGetExactFreeSpace(page); + + if (freeSpace >= needSpace) + { + /* Success, update freespace info and return the buffer */ + lup->freeSpace = freeSpace - needSpace; + *isNew = false; + return buffer; + } + } + + /* + * fallback to allocation of new buffer + */ + UnlockReleaseBuffer(buffer); + } + + /* No success with cache, so return a new buffer */ + *isNew = true; + return allocNewBuffer(index, flags); +} + +/* + * Update lastUsedPages cache when done modifying a page. + * + * We update the appropriate cache entry if it already contained this page + * (its freeSpace is likely obsolete), or if this page has more space than + * whatever we had cached. + */ +void +SpGistSetLastUsedPage(Relation index, Buffer buffer) +{ + SpGistCache *cache = spgGetCache(index); + SpGistLastUsedPage *lup; + int freeSpace; + Page page = BufferGetPage(buffer); + BlockNumber blkno = BufferGetBlockNumber(buffer); + int flags; + + /* Never enter fixed pages (root pages) in cache, though */ + if (SpGistBlockIsFixed(blkno)) + return; + + if (SpGistPageIsLeaf(page)) + flags = GBUF_LEAF; + else + flags = GBUF_INNER_PARITY(blkno); + if (SpGistPageStoresNulls(page)) + flags |= GBUF_NULLS; + + lup = GET_LUP(cache, flags); + + freeSpace = PageGetExactFreeSpace(page); + if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno || + lup->freeSpace < freeSpace) + { + lup->blkno = blkno; + lup->freeSpace = freeSpace; + } +} + +/* + * Initialize an SPGiST page to empty, with specified flags + */ +void +SpGistInitPage(Page page, uint16 f) +{ + SpGistPageOpaque opaque; + + PageInit(page, BLCKSZ, sizeof(SpGistPageOpaqueData)); + opaque = SpGistPageGetOpaque(page); + opaque->flags = f; + opaque->spgist_page_id = SPGIST_PAGE_ID; +} + +/* + * Initialize a buffer's page to empty, with specified flags + */ +void +SpGistInitBuffer(Buffer b, uint16 f) +{ + Assert(BufferGetPageSize(b) == BLCKSZ); + SpGistInitPage(BufferGetPage(b), f); +} + +/* + * Initialize metadata page + */ +void +SpGistInitMetapage(Page page) +{ + SpGistMetaPageData *metadata; + int i; + + SpGistInitPage(page, SPGIST_META); + metadata = SpGistPageGetMeta(page); + memset(metadata, 0, sizeof(SpGistMetaPageData)); + metadata->magicNumber = SPGIST_MAGIC_NUMBER; + + /* initialize last-used-page cache to empty */ + for (i = 0; i < SPGIST_CACHED_PAGES; i++) + metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber; + + /* + * Set pd_lower just past the end of the metadata. This is essential, + * because without doing so, metadata will be lost if xlog.c compresses + * the page. + */ + ((PageHeader) page)->pd_lower = + ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page; +} + +/* + * reloptions processing for SPGiST + */ +bytea * +spgoptions(Datum reloptions, bool validate) +{ + static const relopt_parse_elt tab[] = { + {"fillfactor", RELOPT_TYPE_INT, offsetof(SpGistOptions, fillfactor)}, + }; + + return (bytea *) build_reloptions(reloptions, validate, + RELOPT_KIND_SPGIST, + sizeof(SpGistOptions), + tab, lengthof(tab)); + +} + +/* + * Get the space needed to store a non-null datum of the indicated type + * in an inner tuple (that is, as a prefix or node label). + * Note the result is already rounded up to a MAXALIGN boundary. + * Here we follow the convention that pass-by-val types are just stored + * in their Datum representation (compare memcpyInnerDatum). + */ +unsigned int +SpGistGetInnerTypeSize(SpGistTypeDesc *att, Datum datum) +{ + unsigned int size; + + if (att->attbyval) + size = sizeof(Datum); + else if (att->attlen > 0) + size = att->attlen; + else + size = VARSIZE_ANY(datum); + + return MAXALIGN(size); +} + +/* + * Copy the given non-null datum to *target, in the inner-tuple case + */ +static void +memcpyInnerDatum(void *target, SpGistTypeDesc *att, Datum datum) +{ + unsigned int size; + + if (att->attbyval) + { + memcpy(target, &datum, sizeof(Datum)); + } + else + { + size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum); + memcpy(target, DatumGetPointer(datum), size); + } +} + +/* + * Compute space required for a leaf tuple holding the given data. + * + * This must match the size-calculation portion of spgFormLeafTuple. + */ +Size +SpGistGetLeafTupleSize(TupleDesc tupleDescriptor, + Datum *datums, bool *isnulls) +{ + Size size; + Size data_size; + bool needs_null_mask = false; + int natts = tupleDescriptor->natts; + + /* + * Decide whether we need a nulls bitmask. + * + * If there is only a key attribute (natts == 1), never use a bitmask, for + * compatibility with the pre-v14 layout of leaf tuples. Otherwise, we + * need one if any attribute is null. + */ + if (natts > 1) + { + for (int i = 0; i < natts; i++) + { + if (isnulls[i]) + { + needs_null_mask = true; + break; + } + } + } + + /* + * Calculate size of the data part; same as for heap tuples. + */ + data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls); + + /* + * Compute total size. + */ + size = SGLTHDRSZ(needs_null_mask); + size += data_size; + size = MAXALIGN(size); + + /* + * Ensure that we can replace the tuple with a dead tuple later. This test + * is unnecessary when there are any non-null attributes, but be safe. + */ + if (size < SGDTSIZE) + size = SGDTSIZE; + + return size; +} + +/* + * Construct a leaf tuple containing the given heap TID and datum values + */ +SpGistLeafTuple +spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr, + Datum *datums, bool *isnulls) +{ + SpGistLeafTuple tup; + TupleDesc tupleDescriptor = state->leafTupDesc; + Size size; + Size hoff; + Size data_size; + bool needs_null_mask = false; + int natts = tupleDescriptor->natts; + char *tp; /* ptr to tuple data */ + uint16 tupmask = 0; /* unused heap_fill_tuple output */ + + /* + * Decide whether we need a nulls bitmask. + * + * If there is only a key attribute (natts == 1), never use a bitmask, for + * compatibility with the pre-v14 layout of leaf tuples. Otherwise, we + * need one if any attribute is null. + */ + if (natts > 1) + { + for (int i = 0; i < natts; i++) + { + if (isnulls[i]) + { + needs_null_mask = true; + break; + } + } + } + + /* + * Calculate size of the data part; same as for heap tuples. + */ + data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls); + + /* + * Compute total size. + */ + hoff = SGLTHDRSZ(needs_null_mask); + size = hoff + data_size; + size = MAXALIGN(size); + + /* + * Ensure that we can replace the tuple with a dead tuple later. This test + * is unnecessary when there are any non-null attributes, but be safe. + */ + if (size < SGDTSIZE) + size = SGDTSIZE; + + /* OK, form the tuple */ + tup = (SpGistLeafTuple) palloc0(size); + + tup->size = size; + SGLT_SET_NEXTOFFSET(tup, InvalidOffsetNumber); + tup->heapPtr = *heapPtr; + + tp = (char *) tup + hoff; + + if (needs_null_mask) + { + bits8 *bp; /* ptr to null bitmap in tuple */ + + /* Set nullmask presence bit in SpGistLeafTuple header */ + SGLT_SET_HASNULLMASK(tup, true); + /* Fill the data area and null mask */ + bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData)); + heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size, + &tupmask, bp); + } + else if (natts > 1 || !isnulls[spgKeyColumn]) + { + /* Fill data area only */ + heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size, + &tupmask, (bits8 *) NULL); + } + /* otherwise we have no data, nor a bitmap, to fill */ + + return tup; +} + +/* + * Construct a node (to go into an inner tuple) containing the given label + * + * Note that the node's downlink is just set invalid here. Caller will fill + * it in later. + */ +SpGistNodeTuple +spgFormNodeTuple(SpGistState *state, Datum label, bool isnull) +{ + SpGistNodeTuple tup; + unsigned int size; + unsigned short infomask = 0; + + /* compute space needed (note result is already maxaligned) */ + size = SGNTHDRSZ; + if (!isnull) + size += SpGistGetInnerTypeSize(&state->attLabelType, label); + + /* + * Here we make sure that the size will fit in the field reserved for it + * in t_info. + */ + if ((size & INDEX_SIZE_MASK) != size) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("index row requires %zu bytes, maximum size is %zu", + (Size) size, (Size) INDEX_SIZE_MASK))); + + tup = (SpGistNodeTuple) palloc0(size); + + if (isnull) + infomask |= INDEX_NULL_MASK; + /* we don't bother setting the INDEX_VAR_MASK bit */ + infomask |= size; + tup->t_info = infomask; + + /* The TID field will be filled in later */ + ItemPointerSetInvalid(&tup->t_tid); + + if (!isnull) + memcpyInnerDatum(SGNTDATAPTR(tup), &state->attLabelType, label); + + return tup; +} + +/* + * Construct an inner tuple containing the given prefix and node array + */ +SpGistInnerTuple +spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix, + int nNodes, SpGistNodeTuple *nodes) +{ + SpGistInnerTuple tup; + unsigned int size; + unsigned int prefixSize; + int i; + char *ptr; + + /* Compute size needed */ + if (hasPrefix) + prefixSize = SpGistGetInnerTypeSize(&state->attPrefixType, prefix); + else + prefixSize = 0; + + size = SGITHDRSZ + prefixSize; + + /* Note: we rely on node tuple sizes to be maxaligned already */ + for (i = 0; i < nNodes; i++) + size += IndexTupleSize(nodes[i]); + + /* + * Ensure that we can replace the tuple with a dead tuple later. This + * test is unnecessary given current tuple layouts, but let's be safe. + */ + if (size < SGDTSIZE) + size = SGDTSIZE; + + /* + * Inner tuple should be small enough to fit on a page + */ + if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu", + (Size) size, + SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)), + errhint("Values larger than a buffer page cannot be indexed."))); + + /* + * Check for overflow of header fields --- probably can't fail if the + * above succeeded, but let's be paranoid + */ + if (size > SGITMAXSIZE || + prefixSize > SGITMAXPREFIXSIZE || + nNodes > SGITMAXNNODES) + elog(ERROR, "SPGiST inner tuple header field is too small"); + + /* OK, form the tuple */ + tup = (SpGistInnerTuple) palloc0(size); + + tup->nNodes = nNodes; + tup->prefixSize = prefixSize; + tup->size = size; + + if (hasPrefix) + memcpyInnerDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix); + + ptr = (char *) SGITNODEPTR(tup); + + for (i = 0; i < nNodes; i++) + { + SpGistNodeTuple node = nodes[i]; + + memcpy(ptr, node, IndexTupleSize(node)); + ptr += IndexTupleSize(node); + } + + return tup; +} + +/* + * Construct a "dead" tuple to replace a tuple being deleted. + * + * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER. + * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and + * the xid field is filled in automatically. + * + * This is called in critical sections, so we don't use palloc; the tuple + * is built in preallocated storage. It should be copied before another + * call with different parameters can occur. + */ +SpGistDeadTuple +spgFormDeadTuple(SpGistState *state, int tupstate, + BlockNumber blkno, OffsetNumber offnum) +{ + SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage; + + tuple->tupstate = tupstate; + tuple->size = SGDTSIZE; + SGLT_SET_NEXTOFFSET(tuple, InvalidOffsetNumber); + + if (tupstate == SPGIST_REDIRECT) + { + ItemPointerSet(&tuple->pointer, blkno, offnum); + Assert(TransactionIdIsValid(state->myXid)); + tuple->xid = state->myXid; + } + else + { + ItemPointerSetInvalid(&tuple->pointer); + tuple->xid = InvalidTransactionId; + } + + return tuple; +} + +/* + * Convert an SPGiST leaf tuple into Datum/isnull arrays. + * + * The caller must allocate sufficient storage for the output arrays. + * (INDEX_MAX_KEYS entries should be enough.) + */ +void +spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor, + Datum *datums, bool *isnulls, bool keyColumnIsNull) +{ + bool hasNullsMask = SGLT_GET_HASNULLMASK(tup); + char *tp; /* ptr to tuple data */ + bits8 *bp; /* ptr to null bitmap in tuple */ + + if (keyColumnIsNull && tupleDescriptor->natts == 1) + { + /* + * Trivial case: there is only the key attribute and we're in a nulls + * tree. The hasNullsMask bit in the tuple header should not be set + * (and thus we can't use index_deform_tuple_internal), but + * nonetheless the result is NULL. + * + * Note: currently this is dead code, because noplace calls this when + * there is only the key attribute. But we should cover the case. + */ + Assert(!hasNullsMask); + + datums[spgKeyColumn] = (Datum) 0; + isnulls[spgKeyColumn] = true; + return; + } + + tp = (char *) tup + SGLTHDRSZ(hasNullsMask); + bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData)); + + index_deform_tuple_internal(tupleDescriptor, + datums, isnulls, + tp, bp, hasNullsMask); + + /* + * Key column isnull value from the tuple should be consistent with + * keyColumnIsNull flag from the caller. + */ + Assert(keyColumnIsNull == isnulls[spgKeyColumn]); +} + +/* + * Extract the label datums of the nodes within innerTuple + * + * Returns NULL if label datums are NULLs + */ +Datum * +spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple) +{ + Datum *nodeLabels; + int i; + SpGistNodeTuple node; + + /* Either all the labels must be NULL, or none. */ + node = SGITNODEPTR(innerTuple); + if (IndexTupleHasNulls(node)) + { + SGITITERATE(innerTuple, i, node) + { + if (!IndexTupleHasNulls(node)) + elog(ERROR, "some but not all node labels are null in SPGiST inner tuple"); + } + /* They're all null, so just return NULL */ + return NULL; + } + else + { + nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes); + SGITITERATE(innerTuple, i, node) + { + if (IndexTupleHasNulls(node)) + elog(ERROR, "some but not all node labels are null in SPGiST inner tuple"); + nodeLabels[i] = SGNTDATUM(node, state); + } + return nodeLabels; + } +} + +/* + * Add a new item to the page, replacing a PLACEHOLDER item if possible. + * Return the location it's inserted at, or InvalidOffsetNumber on failure. + * + * If startOffset isn't NULL, we start searching for placeholders at + * *startOffset, and update that to the next place to search. This is just + * an optimization for repeated insertions. + * + * If errorOK is false, we throw error when there's not enough room, + * rather than returning InvalidOffsetNumber. + */ +OffsetNumber +SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size, + OffsetNumber *startOffset, bool errorOK) +{ + SpGistPageOpaque opaque = SpGistPageGetOpaque(page); + OffsetNumber i, + maxoff, + offnum; + + if (opaque->nPlaceholder > 0 && + PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size)) + { + /* Try to replace a placeholder */ + maxoff = PageGetMaxOffsetNumber(page); + offnum = InvalidOffsetNumber; + + for (;;) + { + if (startOffset && *startOffset != InvalidOffsetNumber) + i = *startOffset; + else + i = FirstOffsetNumber; + for (; i <= maxoff; i++) + { + SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page, + PageGetItemId(page, i)); + + if (it->tupstate == SPGIST_PLACEHOLDER) + { + offnum = i; + break; + } + } + + /* Done if we found a placeholder */ + if (offnum != InvalidOffsetNumber) + break; + + if (startOffset && *startOffset != InvalidOffsetNumber) + { + /* Hint was no good, re-search from beginning */ + *startOffset = InvalidOffsetNumber; + continue; + } + + /* Hmm, no placeholder found? */ + opaque->nPlaceholder = 0; + break; + } + + if (offnum != InvalidOffsetNumber) + { + /* Replace the placeholder tuple */ + PageIndexTupleDelete(page, offnum); + + offnum = PageAddItem(page, item, size, offnum, false, false); + + /* + * We should not have failed given the size check at the top of + * the function, but test anyway. If we did fail, we must PANIC + * because we've already deleted the placeholder tuple, and + * there's no other way to keep the damage from getting to disk. + */ + if (offnum != InvalidOffsetNumber) + { + Assert(opaque->nPlaceholder > 0); + opaque->nPlaceholder--; + if (startOffset) + *startOffset = offnum + 1; + } + else + elog(PANIC, "failed to add item of size %u to SPGiST index page", + (int) size); + + return offnum; + } + } + + /* No luck in replacing a placeholder, so just add it to the page */ + offnum = PageAddItem(page, item, size, + InvalidOffsetNumber, false, false); + + if (offnum == InvalidOffsetNumber && !errorOK) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + (int) size); + + return offnum; +} + +/* + * spgproperty() -- Check boolean properties of indexes. + * + * This is optional for most AMs, but is required for SP-GiST because the core + * property code doesn't support AMPROP_DISTANCE_ORDERABLE. + */ +bool +spgproperty(Oid index_oid, int attno, + IndexAMProperty prop, const char *propname, + bool *res, bool *isnull) +{ + Oid opclass, + opfamily, + opcintype; + CatCList *catlist; + int i; + + /* Only answer column-level inquiries */ + if (attno == 0) + return false; + + switch (prop) + { + case AMPROP_DISTANCE_ORDERABLE: + break; + default: + return false; + } + + /* + * Currently, SP-GiST distance-ordered scans require that there be a + * distance operator in the opclass with the default types. So we assume + * that if such a operator exists, then there's a reason for it. + */ + + /* First we need to know the column's opclass. */ + opclass = get_index_column_opclass(index_oid, attno); + if (!OidIsValid(opclass)) + { + *isnull = true; + return true; + } + + /* Now look up the opclass family and input datatype. */ + if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype)) + { + *isnull = true; + return true; + } + + /* And now we can check whether the operator is provided. */ + catlist = SearchSysCacheList1(AMOPSTRATEGY, + ObjectIdGetDatum(opfamily)); + + *res = false; + + for (i = 0; i < catlist->n_members; i++) + { + HeapTuple amoptup = &catlist->members[i]->tuple; + Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup); + + if (amopform->amoppurpose == AMOP_ORDER && + (amopform->amoplefttype == opcintype || + amopform->amoprighttype == opcintype) && + opfamily_can_sort_type(amopform->amopsortfamily, + get_op_rettype(amopform->amopopr))) + { + *res = true; + break; + } + } + + ReleaseSysCacheList(catlist); + + *isnull = false; + + return true; +} diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c new file mode 100644 index 0000000..76fb037 --- /dev/null +++ b/src/backend/access/spgist/spgvacuum.c @@ -0,0 +1,975 @@ +/*------------------------------------------------------------------------- + * + * spgvacuum.c + * vacuum for SP-GiST + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgvacuum.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/spgist_private.h" +#include "access/spgxlog.h" +#include "access/transam.h" +#include "access/xloginsert.h" +#include "catalog/storage_xlog.h" +#include "commands/vacuum.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/indexfsm.h" +#include "storage/lmgr.h" +#include "utils/snapmgr.h" + + +/* Entry in pending-list of TIDs we need to revisit */ +typedef struct spgVacPendingItem +{ + ItemPointerData tid; /* redirection target to visit */ + bool done; /* have we dealt with this? */ + struct spgVacPendingItem *next; /* list link */ +} spgVacPendingItem; + +/* Local state for vacuum operations */ +typedef struct spgBulkDeleteState +{ + /* Parameters passed in to spgvacuumscan */ + IndexVacuumInfo *info; + IndexBulkDeleteResult *stats; + IndexBulkDeleteCallback callback; + void *callback_state; + + /* Additional working state */ + SpGistState spgstate; /* for SPGiST operations that need one */ + spgVacPendingItem *pendingList; /* TIDs we need to (re)visit */ + TransactionId myXmin; /* for detecting newly-added redirects */ + BlockNumber lastFilledBlock; /* last non-deletable block */ +} spgBulkDeleteState; + + +/* + * Add TID to pendingList, but only if not already present. + * + * Note that new items are always appended at the end of the list; this + * ensures that scans of the list don't miss items added during the scan. + */ +static void +spgAddPendingTID(spgBulkDeleteState *bds, ItemPointer tid) +{ + spgVacPendingItem *pitem; + spgVacPendingItem **listLink; + + /* search the list for pre-existing entry */ + listLink = &bds->pendingList; + while (*listLink != NULL) + { + pitem = *listLink; + if (ItemPointerEquals(tid, &pitem->tid)) + return; /* already in list, do nothing */ + listLink = &pitem->next; + } + /* not there, so append new entry */ + pitem = (spgVacPendingItem *) palloc(sizeof(spgVacPendingItem)); + pitem->tid = *tid; + pitem->done = false; + pitem->next = NULL; + *listLink = pitem; +} + +/* + * Clear pendingList + */ +static void +spgClearPendingList(spgBulkDeleteState *bds) +{ + spgVacPendingItem *pitem; + spgVacPendingItem *nitem; + + for (pitem = bds->pendingList; pitem != NULL; pitem = nitem) + { + nitem = pitem->next; + /* All items in list should have been dealt with */ + Assert(pitem->done); + pfree(pitem); + } + bds->pendingList = NULL; +} + +/* + * Vacuum a regular (non-root) leaf page + * + * We must delete tuples that are targeted for deletion by the VACUUM, + * but not move any tuples that are referenced by outside links; we assume + * those are the ones that are heads of chains. + * + * If we find a REDIRECT that was made by a concurrently-running transaction, + * we must add its target TID to pendingList. (We don't try to visit the + * target immediately, first because we don't want VACUUM locking more than + * one buffer at a time, and second because the duplicate-filtering logic + * in spgAddPendingTID is useful to ensure we can't get caught in an infinite + * loop in the face of continuous concurrent insertions.) + * + * If forPending is true, we are examining the page as a consequence of + * chasing a redirect link, not as part of the normal sequential scan. + * We still vacuum the page normally, but we don't increment the stats + * about live tuples; else we'd double-count those tuples, since the page + * has been or will be visited in the sequential scan as well. + */ +static void +vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, + bool forPending) +{ + Page page = BufferGetPage(buffer); + spgxlogVacuumLeaf xlrec; + OffsetNumber toDead[MaxIndexTuplesPerPage]; + OffsetNumber toPlaceholder[MaxIndexTuplesPerPage]; + OffsetNumber moveSrc[MaxIndexTuplesPerPage]; + OffsetNumber moveDest[MaxIndexTuplesPerPage]; + OffsetNumber chainSrc[MaxIndexTuplesPerPage]; + OffsetNumber chainDest[MaxIndexTuplesPerPage]; + OffsetNumber predecessor[MaxIndexTuplesPerPage + 1]; + bool deletable[MaxIndexTuplesPerPage + 1]; + int nDeletable; + OffsetNumber i, + max = PageGetMaxOffsetNumber(page); + + memset(predecessor, 0, sizeof(predecessor)); + memset(deletable, 0, sizeof(deletable)); + nDeletable = 0; + + /* Scan page, identify tuples to delete, accumulate stats */ + for (i = FirstOffsetNumber; i <= max; i++) + { + SpGistLeafTuple lt; + + lt = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, i)); + if (lt->tupstate == SPGIST_LIVE) + { + Assert(ItemPointerIsValid(<->heapPtr)); + + if (bds->callback(<->heapPtr, bds->callback_state)) + { + bds->stats->tuples_removed += 1; + deletable[i] = true; + nDeletable++; + } + else + { + if (!forPending) + bds->stats->num_index_tuples += 1; + } + + /* Form predecessor map, too */ + if (SGLT_GET_NEXTOFFSET(lt) != InvalidOffsetNumber) + { + /* paranoia about corrupted chain links */ + if (SGLT_GET_NEXTOFFSET(lt) < FirstOffsetNumber || + SGLT_GET_NEXTOFFSET(lt) > max || + predecessor[SGLT_GET_NEXTOFFSET(lt)] != InvalidOffsetNumber) + elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"", + BufferGetBlockNumber(buffer), + RelationGetRelationName(index)); + predecessor[SGLT_GET_NEXTOFFSET(lt)] = i; + } + } + else if (lt->tupstate == SPGIST_REDIRECT) + { + SpGistDeadTuple dt = (SpGistDeadTuple) lt; + + Assert(SGLT_GET_NEXTOFFSET(dt) == InvalidOffsetNumber); + Assert(ItemPointerIsValid(&dt->pointer)); + + /* + * Add target TID to pending list if the redirection could have + * happened since VACUUM started. + * + * Note: we could make a tighter test by seeing if the xid is + * "running" according to the active snapshot; but snapmgr.c + * doesn't currently export a suitable API, and it's not entirely + * clear that a tighter test is worth the cycles anyway. + */ + if (TransactionIdFollowsOrEquals(dt->xid, bds->myXmin)) + spgAddPendingTID(bds, &dt->pointer); + } + else + { + Assert(SGLT_GET_NEXTOFFSET(lt) == InvalidOffsetNumber); + } + } + + if (nDeletable == 0) + return; /* nothing more to do */ + + /*---------- + * Figure out exactly what we have to do. We do this separately from + * actually modifying the page, mainly so that we have a representation + * that can be dumped into WAL and then the replay code can do exactly + * the same thing. The output of this step consists of six arrays + * describing four kinds of operations, to be performed in this order: + * + * toDead[]: tuple numbers to be replaced with DEAD tuples + * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples + * moveSrc[]: tuple numbers that need to be relocated to another offset + * (replacing the tuple there) and then replaced with PLACEHOLDER tuples + * moveDest[]: new locations for moveSrc tuples + * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates + * chainDest[]: new values of nextOffset for chainSrc members + * + * It's easiest to figure out what we have to do by processing tuple + * chains, so we iterate over all the tuples (not just the deletable + * ones!) to identify chain heads, then chase down each chain and make + * work item entries for deletable tuples within the chain. + *---------- + */ + xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0; + + for (i = FirstOffsetNumber; i <= max; i++) + { + SpGistLeafTuple head; + bool interveningDeletable; + OffsetNumber prevLive; + OffsetNumber j; + + head = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, i)); + if (head->tupstate != SPGIST_LIVE) + continue; /* can't be a chain member */ + if (predecessor[i] != 0) + continue; /* not a chain head */ + + /* initialize ... */ + interveningDeletable = false; + prevLive = deletable[i] ? InvalidOffsetNumber : i; + + /* scan down the chain ... */ + j = SGLT_GET_NEXTOFFSET(head); + while (j != InvalidOffsetNumber) + { + SpGistLeafTuple lt; + + lt = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, j)); + if (lt->tupstate != SPGIST_LIVE) + { + /* all tuples in chain should be live */ + elog(ERROR, "unexpected SPGiST tuple state: %d", + lt->tupstate); + } + + if (deletable[j]) + { + /* This tuple should be replaced by a placeholder */ + toPlaceholder[xlrec.nPlaceholder] = j; + xlrec.nPlaceholder++; + /* previous live tuple's chain link will need an update */ + interveningDeletable = true; + } + else if (prevLive == InvalidOffsetNumber) + { + /* + * This is the first live tuple in the chain. It has to move + * to the head position. + */ + moveSrc[xlrec.nMove] = j; + moveDest[xlrec.nMove] = i; + xlrec.nMove++; + /* Chain updates will be applied after the move */ + prevLive = i; + interveningDeletable = false; + } + else + { + /* + * Second or later live tuple. Arrange to re-chain it to the + * previous live one, if there was a gap. + */ + if (interveningDeletable) + { + chainSrc[xlrec.nChain] = prevLive; + chainDest[xlrec.nChain] = j; + xlrec.nChain++; + } + prevLive = j; + interveningDeletable = false; + } + + j = SGLT_GET_NEXTOFFSET(lt); + } + + if (prevLive == InvalidOffsetNumber) + { + /* The chain is entirely removable, so we need a DEAD tuple */ + toDead[xlrec.nDead] = i; + xlrec.nDead++; + } + else if (interveningDeletable) + { + /* One or more deletions at end of chain, so close it off */ + chainSrc[xlrec.nChain] = prevLive; + chainDest[xlrec.nChain] = InvalidOffsetNumber; + xlrec.nChain++; + } + } + + /* sanity check ... */ + if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove) + elog(ERROR, "inconsistent counts of deletable tuples"); + + /* Do the updates */ + START_CRIT_SECTION(); + + spgPageIndexMultiDelete(&bds->spgstate, page, + toDead, xlrec.nDead, + SPGIST_DEAD, SPGIST_DEAD, + InvalidBlockNumber, InvalidOffsetNumber); + + spgPageIndexMultiDelete(&bds->spgstate, page, + toPlaceholder, xlrec.nPlaceholder, + SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, + InvalidBlockNumber, InvalidOffsetNumber); + + /* + * We implement the move step by swapping the line pointers of the source + * and target tuples, then replacing the newly-source tuples with + * placeholders. This is perhaps unduly friendly with the page data + * representation, but it's fast and doesn't risk page overflow when a + * tuple to be relocated is large. + */ + for (i = 0; i < xlrec.nMove; i++) + { + ItemId idSrc = PageGetItemId(page, moveSrc[i]); + ItemId idDest = PageGetItemId(page, moveDest[i]); + ItemIdData tmp; + + tmp = *idSrc; + *idSrc = *idDest; + *idDest = tmp; + } + + spgPageIndexMultiDelete(&bds->spgstate, page, + moveSrc, xlrec.nMove, + SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, + InvalidBlockNumber, InvalidOffsetNumber); + + for (i = 0; i < xlrec.nChain; i++) + { + SpGistLeafTuple lt; + + lt = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, chainSrc[i])); + Assert(lt->tupstate == SPGIST_LIVE); + SGLT_SET_NEXTOFFSET(lt, chainDest[i]); + } + + MarkBufferDirty(buffer); + + if (RelationNeedsWAL(index)) + { + XLogRecPtr recptr; + + XLogBeginInsert(); + + STORE_STATE(&bds->spgstate, xlrec.stateSrc); + + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumLeaf); + /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */ + XLogRegisterData((char *) toDead, sizeof(OffsetNumber) * xlrec.nDead); + XLogRegisterData((char *) toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder); + XLogRegisterData((char *) moveSrc, sizeof(OffsetNumber) * xlrec.nMove); + XLogRegisterData((char *) moveDest, sizeof(OffsetNumber) * xlrec.nMove); + XLogRegisterData((char *) chainSrc, sizeof(OffsetNumber) * xlrec.nChain); + XLogRegisterData((char *) chainDest, sizeof(OffsetNumber) * xlrec.nChain); + + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); +} + +/* + * Vacuum a root page when it is also a leaf + * + * On the root, we just delete any dead leaf tuples; no fancy business + */ +static void +vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer) +{ + Page page = BufferGetPage(buffer); + spgxlogVacuumRoot xlrec; + OffsetNumber toDelete[MaxIndexTuplesPerPage]; + OffsetNumber i, + max = PageGetMaxOffsetNumber(page); + + xlrec.nDelete = 0; + + /* Scan page, identify tuples to delete, accumulate stats */ + for (i = FirstOffsetNumber; i <= max; i++) + { + SpGistLeafTuple lt; + + lt = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, i)); + if (lt->tupstate == SPGIST_LIVE) + { + Assert(ItemPointerIsValid(<->heapPtr)); + + if (bds->callback(<->heapPtr, bds->callback_state)) + { + bds->stats->tuples_removed += 1; + toDelete[xlrec.nDelete] = i; + xlrec.nDelete++; + } + else + { + bds->stats->num_index_tuples += 1; + } + } + else + { + /* all tuples on root should be live */ + elog(ERROR, "unexpected SPGiST tuple state: %d", + lt->tupstate); + } + } + + if (xlrec.nDelete == 0) + return; /* nothing more to do */ + + /* Do the update */ + START_CRIT_SECTION(); + + /* The tuple numbers are in order, so we can use PageIndexMultiDelete */ + PageIndexMultiDelete(page, toDelete, xlrec.nDelete); + + MarkBufferDirty(buffer); + + if (RelationNeedsWAL(index)) + { + XLogRecPtr recptr; + + XLogBeginInsert(); + + /* Prepare WAL record */ + STORE_STATE(&bds->spgstate, xlrec.stateSrc); + + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRoot); + /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */ + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * xlrec.nDelete); + + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); +} + +/* + * Clean up redirect and placeholder tuples on the given page + * + * Redirect tuples can be marked placeholder once they're old enough. + * Placeholder tuples can be removed if it won't change the offsets of + * non-placeholder ones. + * + * Unlike the routines above, this works on both leaf and inner pages. + */ +static void +vacuumRedirectAndPlaceholder(Relation index, Buffer buffer) +{ + Page page = BufferGetPage(buffer); + SpGistPageOpaque opaque = SpGistPageGetOpaque(page); + OffsetNumber i, + max = PageGetMaxOffsetNumber(page), + firstPlaceholder = InvalidOffsetNumber; + bool hasNonPlaceholder = false; + bool hasUpdate = false; + OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage]; + OffsetNumber itemnos[MaxIndexTuplesPerPage]; + spgxlogVacuumRedirect xlrec; + GlobalVisState *vistest; + + xlrec.nToPlaceholder = 0; + xlrec.newestRedirectXid = InvalidTransactionId; + + /* XXX: providing heap relation would allow more pruning */ + vistest = GlobalVisTestFor(NULL); + + START_CRIT_SECTION(); + + /* + * Scan backwards to convert old redirection tuples to placeholder tuples, + * and identify location of last non-placeholder tuple while at it. + */ + for (i = max; + i >= FirstOffsetNumber && + (opaque->nRedirection > 0 || !hasNonPlaceholder); + i--) + { + SpGistDeadTuple dt; + + dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i)); + + if (dt->tupstate == SPGIST_REDIRECT && + GlobalVisTestIsRemovableXid(vistest, dt->xid)) + { + dt->tupstate = SPGIST_PLACEHOLDER; + Assert(opaque->nRedirection > 0); + opaque->nRedirection--; + opaque->nPlaceholder++; + + /* remember newest XID among the removed redirects */ + if (!TransactionIdIsValid(xlrec.newestRedirectXid) || + TransactionIdPrecedes(xlrec.newestRedirectXid, dt->xid)) + xlrec.newestRedirectXid = dt->xid; + + ItemPointerSetInvalid(&dt->pointer); + + itemToPlaceholder[xlrec.nToPlaceholder] = i; + xlrec.nToPlaceholder++; + + hasUpdate = true; + } + + if (dt->tupstate == SPGIST_PLACEHOLDER) + { + if (!hasNonPlaceholder) + firstPlaceholder = i; + } + else + { + hasNonPlaceholder = true; + } + } + + /* + * Any placeholder tuples at the end of page can safely be removed. We + * can't remove ones before the last non-placeholder, though, because we + * can't alter the offset numbers of non-placeholder tuples. + */ + if (firstPlaceholder != InvalidOffsetNumber) + { + /* + * We do not store this array to rdata because it's easy to recreate. + */ + for (i = firstPlaceholder; i <= max; i++) + itemnos[i - firstPlaceholder] = i; + + i = max - firstPlaceholder + 1; + Assert(opaque->nPlaceholder >= i); + opaque->nPlaceholder -= i; + + /* The array is surely sorted, so can use PageIndexMultiDelete */ + PageIndexMultiDelete(page, itemnos, i); + + hasUpdate = true; + } + + xlrec.firstPlaceholder = firstPlaceholder; + + if (hasUpdate) + MarkBufferDirty(buffer); + + if (hasUpdate && RelationNeedsWAL(index)) + { + XLogRecPtr recptr; + + XLogBeginInsert(); + + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRedirect); + XLogRegisterData((char *) itemToPlaceholder, + sizeof(OffsetNumber) * xlrec.nToPlaceholder); + + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT); + + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); +} + +/* + * Process one page during a bulkdelete scan + */ +static void +spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno) +{ + Relation index = bds->info->index; + Buffer buffer; + Page page; + + /* call vacuum_delay_point while not holding any buffer lock */ + vacuum_delay_point(); + + buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno, + RBM_NORMAL, bds->info->strategy); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + page = (Page) BufferGetPage(buffer); + + if (PageIsNew(page)) + { + /* + * We found an all-zero page, which could happen if the database + * crashed just after extending the file. Recycle it. + */ + } + else if (PageIsEmpty(page)) + { + /* nothing to do */ + } + else if (SpGistPageIsLeaf(page)) + { + if (SpGistBlockIsRoot(blkno)) + { + vacuumLeafRoot(bds, index, buffer); + /* no need for vacuumRedirectAndPlaceholder */ + } + else + { + vacuumLeafPage(bds, index, buffer, false); + vacuumRedirectAndPlaceholder(index, buffer); + } + } + else + { + /* inner page */ + vacuumRedirectAndPlaceholder(index, buffer); + } + + /* + * The root pages must never be deleted, nor marked as available in FSM, + * because we don't want them ever returned by a search for a place to put + * a new tuple. Otherwise, check for empty page, and make sure the FSM + * knows about it. + */ + if (!SpGistBlockIsRoot(blkno)) + { + if (PageIsNew(page) || PageIsEmpty(page)) + { + RecordFreeIndexPage(index, blkno); + bds->stats->pages_deleted++; + } + else + { + SpGistSetLastUsedPage(index, buffer); + bds->lastFilledBlock = blkno; + } + } + + UnlockReleaseBuffer(buffer); +} + +/* + * Process the pending-TID list between pages of the main scan + */ +static void +spgprocesspending(spgBulkDeleteState *bds) +{ + Relation index = bds->info->index; + spgVacPendingItem *pitem; + spgVacPendingItem *nitem; + BlockNumber blkno; + Buffer buffer; + Page page; + + for (pitem = bds->pendingList; pitem != NULL; pitem = pitem->next) + { + if (pitem->done) + continue; /* ignore already-done items */ + + /* call vacuum_delay_point while not holding any buffer lock */ + vacuum_delay_point(); + + /* examine the referenced page */ + blkno = ItemPointerGetBlockNumber(&pitem->tid); + buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno, + RBM_NORMAL, bds->info->strategy); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + page = (Page) BufferGetPage(buffer); + + if (PageIsNew(page) || SpGistPageIsDeleted(page)) + { + /* Probably shouldn't happen, but ignore it */ + } + else if (SpGistPageIsLeaf(page)) + { + if (SpGistBlockIsRoot(blkno)) + { + /* this should definitely not happen */ + elog(ERROR, "redirection leads to root page of index \"%s\"", + RelationGetRelationName(index)); + } + + /* deal with any deletable tuples */ + vacuumLeafPage(bds, index, buffer, true); + /* might as well do this while we are here */ + vacuumRedirectAndPlaceholder(index, buffer); + + SpGistSetLastUsedPage(index, buffer); + + /* + * We can mark as done not only this item, but any later ones + * pointing at the same page, since we vacuumed the whole page. + */ + pitem->done = true; + for (nitem = pitem->next; nitem != NULL; nitem = nitem->next) + { + if (ItemPointerGetBlockNumber(&nitem->tid) == blkno) + nitem->done = true; + } + } + else + { + /* + * On an inner page, visit the referenced inner tuple and add all + * its downlinks to the pending list. We might have pending items + * for more than one inner tuple on the same page (in fact this is + * pretty likely given the way space allocation works), so get + * them all while we are here. + */ + for (nitem = pitem; nitem != NULL; nitem = nitem->next) + { + if (nitem->done) + continue; + if (ItemPointerGetBlockNumber(&nitem->tid) == blkno) + { + OffsetNumber offset; + SpGistInnerTuple innerTuple; + + offset = ItemPointerGetOffsetNumber(&nitem->tid); + innerTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, offset)); + if (innerTuple->tupstate == SPGIST_LIVE) + { + SpGistNodeTuple node; + int i; + + SGITITERATE(innerTuple, i, node) + { + if (ItemPointerIsValid(&node->t_tid)) + spgAddPendingTID(bds, &node->t_tid); + } + } + else if (innerTuple->tupstate == SPGIST_REDIRECT) + { + /* transfer attention to redirect point */ + spgAddPendingTID(bds, + &((SpGistDeadTuple) innerTuple)->pointer); + } + else + elog(ERROR, "unexpected SPGiST tuple state: %d", + innerTuple->tupstate); + + nitem->done = true; + } + } + } + + UnlockReleaseBuffer(buffer); + } + + spgClearPendingList(bds); +} + +/* + * Perform a bulkdelete scan + */ +static void +spgvacuumscan(spgBulkDeleteState *bds) +{ + Relation index = bds->info->index; + bool needLock; + BlockNumber num_pages, + blkno; + + /* Finish setting up spgBulkDeleteState */ + initSpGistState(&bds->spgstate, index); + bds->pendingList = NULL; + bds->myXmin = GetActiveSnapshot()->xmin; + bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO; + + /* + * Reset counts that will be incremented during the scan; needed in case + * of multiple scans during a single VACUUM command + */ + bds->stats->estimated_count = false; + bds->stats->num_index_tuples = 0; + bds->stats->pages_deleted = 0; + + /* We can skip locking for new or temp relations */ + needLock = !RELATION_IS_LOCAL(index); + + /* + * The outer loop iterates over all index pages except the metapage, in + * physical order (we hope the kernel will cooperate in providing + * read-ahead for speed). It is critical that we visit all leaf pages, + * including ones added after we start the scan, else we might fail to + * delete some deletable tuples. See more extensive comments about this + * in btvacuumscan(). + */ + blkno = SPGIST_METAPAGE_BLKNO + 1; + for (;;) + { + /* Get the current relation length */ + if (needLock) + LockRelationForExtension(index, ExclusiveLock); + num_pages = RelationGetNumberOfBlocks(index); + if (needLock) + UnlockRelationForExtension(index, ExclusiveLock); + + /* Quit if we've scanned the whole relation */ + if (blkno >= num_pages) + break; + /* Iterate over pages, then loop back to recheck length */ + for (; blkno < num_pages; blkno++) + { + spgvacuumpage(bds, blkno); + /* empty the pending-list after each page */ + if (bds->pendingList != NULL) + spgprocesspending(bds); + } + } + + /* Propagate local lastUsedPages cache to metablock */ + SpGistUpdateMetaPage(index); + + /* + * If we found any empty pages (and recorded them in the FSM), then + * forcibly update the upper-level FSM pages to ensure that searchers can + * find them. It's possible that the pages were also found during + * previous scans and so this is a waste of time, but it's cheap enough + * relative to scanning the index that it shouldn't matter much, and + * making sure that free pages are available sooner not later seems + * worthwhile. + * + * Note that if no empty pages exist, we don't bother vacuuming the FSM at + * all. + */ + if (bds->stats->pages_deleted > 0) + IndexFreeSpaceMapVacuum(index); + + /* + * Truncate index if possible + * + * XXX disabled because it's unsafe due to possible concurrent inserts. + * We'd have to rescan the pages to make sure they're still empty, and it + * doesn't seem worth it. Note that btree doesn't do this either. + * + * Another reason not to truncate is that it could invalidate the cached + * pages-with-freespace pointers in the metapage and other backends' + * relation caches, that is leave them pointing to nonexistent pages. + * Adding RelationGetNumberOfBlocks calls to protect the places that use + * those pointers would be unduly expensive. + */ +#ifdef NOT_USED + if (num_pages > bds->lastFilledBlock + 1) + { + BlockNumber lastBlock = num_pages - 1; + + num_pages = bds->lastFilledBlock + 1; + RelationTruncate(index, num_pages); + bds->stats->pages_removed += lastBlock - bds->lastFilledBlock; + bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock; + } +#endif + + /* Report final stats */ + bds->stats->num_pages = num_pages; + bds->stats->pages_newly_deleted = bds->stats->pages_deleted; + bds->stats->pages_free = bds->stats->pages_deleted; +} + +/* + * Bulk deletion of all index entries pointing to a set of heap tuples. + * The set of target tuples is specified via a callback routine that tells + * whether any given heap tuple (identified by ItemPointer) is being deleted. + * + * Result: a palloc'd struct containing statistical info for VACUUM displays. + */ +IndexBulkDeleteResult * +spgbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, + IndexBulkDeleteCallback callback, void *callback_state) +{ + spgBulkDeleteState bds; + + /* allocate stats if first time through, else re-use existing struct */ + if (stats == NULL) + stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); + bds.info = info; + bds.stats = stats; + bds.callback = callback; + bds.callback_state = callback_state; + + spgvacuumscan(&bds); + + return stats; +} + +/* Dummy callback to delete no tuples during spgvacuumcleanup */ +static bool +dummy_callback(ItemPointer itemptr, void *state) +{ + return false; +} + +/* + * Post-VACUUM cleanup. + * + * Result: a palloc'd struct containing statistical info for VACUUM displays. + */ +IndexBulkDeleteResult * +spgvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) +{ + spgBulkDeleteState bds; + + /* No-op in ANALYZE ONLY mode */ + if (info->analyze_only) + return stats; + + /* + * We don't need to scan the index if there was a preceding bulkdelete + * pass. Otherwise, make a pass that won't delete any live tuples, but + * might still accomplish useful stuff with redirect/placeholder cleanup + * and/or FSM housekeeping, and in any case will provide stats. + */ + if (stats == NULL) + { + stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); + bds.info = info; + bds.stats = stats; + bds.callback = dummy_callback; + bds.callback_state = NULL; + + spgvacuumscan(&bds); + } + + /* + * It's quite possible for us to be fooled by concurrent tuple moves into + * double-counting some index tuples, so disbelieve any total that exceeds + * the underlying heap's count ... if we know that accurately. Otherwise + * this might just make matters worse. + */ + if (!info->estimated_count) + { + if (stats->num_index_tuples > info->num_heap_tuples) + stats->num_index_tuples = info->num_heap_tuples; + } + + return stats; +} diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c new file mode 100644 index 0000000..472a28b --- /dev/null +++ b/src/backend/access/spgist/spgvalidate.c @@ -0,0 +1,392 @@ +/*------------------------------------------------------------------------- + * + * spgvalidate.c + * Opclass validator for SP-GiST. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgvalidate.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/amvalidate.h" +#include "access/htup_details.h" +#include "access/spgist_private.h" +#include "catalog/pg_amop.h" +#include "catalog/pg_amproc.h" +#include "catalog/pg_opclass.h" +#include "catalog/pg_opfamily.h" +#include "catalog/pg_type.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/regproc.h" +#include "utils/syscache.h" + + +/* + * Validator for an SP-GiST opclass. + * + * Some of the checks done here cover the whole opfamily, and therefore are + * redundant when checking each opclass in a family. But they don't run long + * enough to be much of a problem, so we accept the duplication rather than + * complicate the amvalidate API. + */ +bool +spgvalidate(Oid opclassoid) +{ + bool result = true; + HeapTuple classtup; + Form_pg_opclass classform; + Oid opfamilyoid; + Oid opcintype; + Oid opckeytype; + char *opclassname; + HeapTuple familytup; + Form_pg_opfamily familyform; + char *opfamilyname; + CatCList *proclist, + *oprlist; + List *grouplist; + OpFamilyOpFuncGroup *opclassgroup; + int i; + ListCell *lc; + spgConfigIn configIn; + spgConfigOut configOut; + Oid configOutLefttype = InvalidOid; + Oid configOutRighttype = InvalidOid; + Oid configOutLeafType = InvalidOid; + + /* Fetch opclass information */ + classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); + if (!HeapTupleIsValid(classtup)) + elog(ERROR, "cache lookup failed for operator class %u", opclassoid); + classform = (Form_pg_opclass) GETSTRUCT(classtup); + + opfamilyoid = classform->opcfamily; + opcintype = classform->opcintype; + opckeytype = classform->opckeytype; + opclassname = NameStr(classform->opcname); + + /* Fetch opfamily information */ + familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid)); + if (!HeapTupleIsValid(familytup)) + elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid); + familyform = (Form_pg_opfamily) GETSTRUCT(familytup); + + opfamilyname = NameStr(familyform->opfname); + + /* Fetch all operators and support functions of the opfamily */ + oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid)); + proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid)); + grouplist = identify_opfamily_groups(oprlist, proclist); + + /* Check individual support functions */ + for (i = 0; i < proclist->n_members; i++) + { + HeapTuple proctup = &proclist->members[i]->tuple; + Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); + bool ok; + + /* + * All SP-GiST support functions should be registered with matching + * left/right types + */ + if (procform->amproclefttype != procform->amprocrighttype) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator family \"%s\" of access method %s contains support function %s with different left and right input types", + opfamilyname, "spgist", + format_procedure(procform->amproc)))); + result = false; + } + + /* Check procedure numbers and function signatures */ + switch (procform->amprocnum) + { + case SPGIST_CONFIG_PROC: + ok = check_amproc_signature(procform->amproc, VOIDOID, true, + 2, 2, INTERNALOID, INTERNALOID); + configIn.attType = procform->amproclefttype; + memset(&configOut, 0, sizeof(configOut)); + + OidFunctionCall2(procform->amproc, + PointerGetDatum(&configIn), + PointerGetDatum(&configOut)); + + configOutLefttype = procform->amproclefttype; + configOutRighttype = procform->amprocrighttype; + + /* Default leaf type is opckeytype or input type */ + if (OidIsValid(opckeytype)) + configOutLeafType = opckeytype; + else + configOutLeafType = procform->amproclefttype; + + /* If some other leaf datum type is specified, warn */ + if (OidIsValid(configOut.leafType) && + configOutLeafType != configOut.leafType) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("SP-GiST leaf data type %s does not match declared type %s", + format_type_be(configOut.leafType), + format_type_be(configOutLeafType)))); + result = false; + configOutLeafType = configOut.leafType; + } + + /* + * When leaf and attribute types are the same, compress + * function is not required and we set corresponding bit in + * functionset for later group consistency check. + */ + if (configOutLeafType == configIn.attType) + { + foreach(lc, grouplist) + { + OpFamilyOpFuncGroup *group = lfirst(lc); + + if (group->lefttype == procform->amproclefttype && + group->righttype == procform->amprocrighttype) + { + group->functionset |= + ((uint64) 1) << SPGIST_COMPRESS_PROC; + break; + } + } + } + break; + case SPGIST_CHOOSE_PROC: + case SPGIST_PICKSPLIT_PROC: + case SPGIST_INNER_CONSISTENT_PROC: + ok = check_amproc_signature(procform->amproc, VOIDOID, true, + 2, 2, INTERNALOID, INTERNALOID); + break; + case SPGIST_LEAF_CONSISTENT_PROC: + ok = check_amproc_signature(procform->amproc, BOOLOID, true, + 2, 2, INTERNALOID, INTERNALOID); + break; + case SPGIST_COMPRESS_PROC: + if (configOutLefttype != procform->amproclefttype || + configOutRighttype != procform->amprocrighttype) + ok = false; + else + ok = check_amproc_signature(procform->amproc, + configOutLeafType, true, + 1, 1, procform->amproclefttype); + break; + case SPGIST_OPTIONS_PROC: + ok = check_amoptsproc_signature(procform->amproc); + break; + default: + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d", + opfamilyname, "spgist", + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + continue; /* don't want additional message */ + } + + if (!ok) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d", + opfamilyname, "spgist", + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + } + } + + /* Check individual operators */ + for (i = 0; i < oprlist->n_members; i++) + { + HeapTuple oprtup = &oprlist->members[i]->tuple; + Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup); + Oid op_rettype; + + /* TODO: Check that only allowed strategy numbers exist */ + if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d", + opfamilyname, "spgist", + format_operator(oprform->amopopr), + oprform->amopstrategy))); + result = false; + } + + /* spgist supports ORDER BY operators */ + if (oprform->amoppurpose != AMOP_SEARCH) + { + /* ... and operator result must match the claimed btree opfamily */ + op_rettype = get_op_rettype(oprform->amopopr); + if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype)) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s", + opfamilyname, "spgist", + format_operator(oprform->amopopr)))); + result = false; + } + } + else + op_rettype = BOOLOID; + + /* Check operator signature --- same for all spgist strategies */ + if (!check_amop_signature(oprform->amopopr, op_rettype, + oprform->amoplefttype, + oprform->amoprighttype)) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature", + opfamilyname, "spgist", + format_operator(oprform->amopopr)))); + result = false; + } + } + + /* Now check for inconsistent groups of operators/functions */ + opclassgroup = NULL; + foreach(lc, grouplist) + { + OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc); + + /* Remember the group exactly matching the test opclass */ + if (thisgroup->lefttype == opcintype && + thisgroup->righttype == opcintype) + opclassgroup = thisgroup; + + /* + * Complain if there are any datatype pairs with functions but no + * operators. This is about the best we can do for now to detect + * missing operators. + */ + if (thisgroup->operatorset == 0) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s", + opfamilyname, "spgist", + format_type_be(thisgroup->lefttype), + format_type_be(thisgroup->righttype)))); + result = false; + } + + /* + * Complain if we're missing functions for any datatype, remembering + * that SP-GiST doesn't use cross-type support functions. + */ + if (thisgroup->lefttype != thisgroup->righttype) + continue; + + for (i = 1; i <= SPGISTNProc; i++) + { + if ((thisgroup->functionset & (((uint64) 1) << i)) != 0) + continue; /* got it */ + if (i == SPGIST_OPTIONS_PROC) + continue; /* optional method */ + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator family \"%s\" of access method %s is missing support function %d for type %s", + opfamilyname, "spgist", i, + format_type_be(thisgroup->lefttype)))); + result = false; + } + } + + /* Check that the originally-named opclass is supported */ + /* (if group is there, we already checked it adequately above) */ + if (!opclassgroup) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("operator class \"%s\" of access method %s is missing operator(s)", + opclassname, "spgist"))); + result = false; + } + + ReleaseCatCacheList(proclist); + ReleaseCatCacheList(oprlist); + ReleaseSysCache(familytup); + ReleaseSysCache(classtup); + + return result; +} + +/* + * Prechecking function for adding operators/functions to an SP-GiST opfamily. + */ +void +spgadjustmembers(Oid opfamilyoid, + Oid opclassoid, + List *operators, + List *functions) +{ + ListCell *lc; + + /* + * Operator members of an SP-GiST opfamily should never have hard + * dependencies, since their connection to the opfamily depends only on + * what the support functions think, and that can be altered. For + * consistency, we make all soft dependencies point to the opfamily, + * though a soft dependency on the opclass would work as well in the + * CREATE OPERATOR CLASS case. + */ + foreach(lc, operators) + { + OpFamilyMember *op = (OpFamilyMember *) lfirst(lc); + + op->ref_is_hard = false; + op->ref_is_family = true; + op->refobjid = opfamilyoid; + } + + /* + * Required support functions should have hard dependencies. Preferably + * those are just dependencies on the opclass, but if we're in ALTER + * OPERATOR FAMILY, we leave the dependency pointing at the whole + * opfamily. (Given that SP-GiST opclasses generally don't share + * opfamilies, it seems unlikely to be worth working harder.) + */ + foreach(lc, functions) + { + OpFamilyMember *op = (OpFamilyMember *) lfirst(lc); + + switch (op->number) + { + case SPGIST_CONFIG_PROC: + case SPGIST_CHOOSE_PROC: + case SPGIST_PICKSPLIT_PROC: + case SPGIST_INNER_CONSISTENT_PROC: + case SPGIST_LEAF_CONSISTENT_PROC: + /* Required support function */ + op->ref_is_hard = true; + break; + case SPGIST_COMPRESS_PROC: + case SPGIST_OPTIONS_PROC: + /* Optional, so force it to be a soft family dependency */ + op->ref_is_hard = false; + op->ref_is_family = true; + op->refobjid = opfamilyoid; + break; + default: + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("support function number %d is invalid for access method %s", + op->number, "spgist"))); + break; + } + } +} diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c new file mode 100644 index 0000000..3dfd2aa --- /dev/null +++ b/src/backend/access/spgist/spgxlog.c @@ -0,0 +1,1013 @@ +/*------------------------------------------------------------------------- + * + * spgxlog.c + * WAL replay logic for SP-GiST + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/spgist/spgxlog.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/bufmask.h" +#include "access/spgist_private.h" +#include "access/spgxlog.h" +#include "access/transam.h" +#include "access/xlog.h" +#include "access/xlogutils.h" +#include "storage/standby.h" +#include "utils/memutils.h" + + +static MemoryContext opCtx; /* working memory for operations */ + + +/* + * Prepare a dummy SpGistState, with just the minimum info needed for replay. + * + * At present, all we need is enough info to support spgFormDeadTuple(), + * plus the isBuild flag. + */ +static void +fillFakeState(SpGistState *state, spgxlogState stateSrc) +{ + memset(state, 0, sizeof(*state)); + + state->myXid = stateSrc.myXid; + state->isBuild = stateSrc.isBuild; + state->deadTupleStorage = palloc0(SGDTSIZE); +} + +/* + * Add a leaf tuple, or replace an existing placeholder tuple. This is used + * to replay SpGistPageAddNewItem() operations. If the offset points at an + * existing tuple, it had better be a placeholder tuple. + */ +static void +addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset) +{ + if (offset <= PageGetMaxOffsetNumber(page)) + { + SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page, + PageGetItemId(page, offset)); + + if (dt->tupstate != SPGIST_PLACEHOLDER) + elog(ERROR, "SPGiST tuple to be replaced is not a placeholder"); + + Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0); + SpGistPageGetOpaque(page)->nPlaceholder--; + + PageIndexTupleDelete(page, offset); + } + + Assert(offset <= PageGetMaxOffsetNumber(page) + 1); + + if (PageAddItem(page, tuple, size, offset, false, false) != offset) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + size); +} + +static void +spgRedoAddLeaf(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + char *ptr = XLogRecGetData(record); + spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr; + char *leafTuple; + SpGistLeafTupleData leafTupleHdr; + Buffer buffer; + Page page; + XLogRedoAction action; + + ptr += sizeof(spgxlogAddLeaf); + leafTuple = ptr; + /* the leaf tuple is unaligned, so make a copy to access its header */ + memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData)); + + /* + * In normal operation we would have both current and parent pages locked + * simultaneously; but in WAL replay it should be safe to update the leaf + * page before updating the parent. + */ + if (xldata->newPage) + { + buffer = XLogInitBufferForRedo(record, 0); + SpGistInitBuffer(buffer, + SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(record, 0, &buffer); + + if (action == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + /* insert new tuple */ + if (xldata->offnumLeaf != xldata->offnumHeadLeaf) + { + /* normal cases, tuple was added by SpGistPageAddNewItem */ + addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size, + xldata->offnumLeaf); + + /* update head tuple's chain link if needed */ + if (xldata->offnumHeadLeaf != InvalidOffsetNumber) + { + SpGistLeafTuple head; + + head = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumHeadLeaf)); + Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr)); + SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf); + } + } + else + { + /* replacing a DEAD tuple */ + PageIndexTupleDelete(page, xldata->offnumLeaf); + if (PageAddItem(page, + (Item) leafTuple, leafTupleHdr.size, + xldata->offnumLeaf, false, false) != xldata->offnumLeaf) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + leafTupleHdr.size); + } + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + + /* update parent downlink if necessary */ + if (xldata->offnumParent != InvalidOffsetNumber) + { + if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) + { + SpGistInnerTuple tuple; + BlockNumber blknoLeaf; + + XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf); + + page = BufferGetPage(buffer); + + tuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(tuple, xldata->nodeI, + blknoLeaf, xldata->offnumLeaf); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + } +} + +static void +spgRedoMoveLeafs(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + char *ptr = XLogRecGetData(record); + spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr; + SpGistState state; + OffsetNumber *toDelete; + OffsetNumber *toInsert; + int nInsert; + Buffer buffer; + Page page; + XLogRedoAction action; + BlockNumber blknoDst; + + XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst); + + fillFakeState(&state, xldata->stateSrc); + + nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1; + + ptr += SizeOfSpgxlogMoveLeafs; + toDelete = (OffsetNumber *) ptr; + ptr += sizeof(OffsetNumber) * xldata->nMoves; + toInsert = (OffsetNumber *) ptr; + ptr += sizeof(OffsetNumber) * nInsert; + + /* now ptr points to the list of leaf tuples */ + + /* + * In normal operation we would have all three pages (source, dest, and + * parent) locked simultaneously; but in WAL replay it should be safe to + * update them one at a time, as long as we do it in the right order. + */ + + /* Insert tuples on the dest page (do first, so redirect is valid) */ + if (xldata->newPage) + { + buffer = XLogInitBufferForRedo(record, 1); + SpGistInitBuffer(buffer, + SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(record, 1, &buffer); + + if (action == BLK_NEEDS_REDO) + { + int i; + + page = BufferGetPage(buffer); + + for (i = 0; i < nInsert; i++) + { + char *leafTuple; + SpGistLeafTupleData leafTupleHdr; + + /* + * the tuples are not aligned, so must copy to access the size + * field. + */ + leafTuple = ptr; + memcpy(&leafTupleHdr, leafTuple, + sizeof(SpGistLeafTupleData)); + + addOrReplaceTuple(page, (Item) leafTuple, + leafTupleHdr.size, toInsert[i]); + ptr += leafTupleHdr.size; + } + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + + /* Delete tuples from the source page, inserting a redirection pointer */ + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves, + state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT, + SPGIST_PLACEHOLDER, + blknoDst, + toInsert[nInsert - 1]); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + + /* And update the parent downlink */ + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) + { + SpGistInnerTuple tuple; + + page = BufferGetPage(buffer); + + tuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(tuple, xldata->nodeI, + blknoDst, toInsert[nInsert - 1]); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +static void +spgRedoAddNode(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + char *ptr = XLogRecGetData(record); + spgxlogAddNode *xldata = (spgxlogAddNode *) ptr; + char *innerTuple; + SpGistInnerTupleData innerTupleHdr; + SpGistState state; + Buffer buffer; + Page page; + XLogRedoAction action; + + ptr += sizeof(spgxlogAddNode); + innerTuple = ptr; + /* the tuple is unaligned, so make a copy to access its header */ + memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData)); + + fillFakeState(&state, xldata->stateSrc); + + if (!XLogRecHasBlockRef(record, 1)) + { + /* update in place */ + Assert(xldata->parentBlk == -1); + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + PageIndexTupleDelete(page, xldata->offnum); + if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size, + xldata->offnum, + false, false) != xldata->offnum) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + innerTupleHdr.size); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + } + else + { + BlockNumber blkno; + BlockNumber blknoNew; + + XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno); + XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew); + + /* + * In normal operation we would have all three pages (source, dest, + * and parent) locked simultaneously; but in WAL replay it should be + * safe to update them one at a time, as long as we do it in the right + * order. We must insert the new tuple before replacing the old tuple + * with the redirect tuple. + */ + + /* Install new tuple first so redirect is valid */ + if (xldata->newPage) + { + /* AddNode is not used for nulls pages */ + buffer = XLogInitBufferForRedo(record, 1); + SpGistInitBuffer(buffer, 0); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(record, 1, &buffer); + if (action == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + addOrReplaceTuple(page, (Item) innerTuple, + innerTupleHdr.size, xldata->offnumNew); + + /* + * If parent is in this same page, update it now. + */ + if (xldata->parentBlk == 1) + { + SpGistInnerTuple parentTuple; + + parentTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); + } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + + /* Delete old tuple, replacing it with redirect or placeholder tuple */ + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + SpGistDeadTuple dt; + + page = BufferGetPage(buffer); + + if (state.isBuild) + dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER, + InvalidBlockNumber, + InvalidOffsetNumber); + else + dt = spgFormDeadTuple(&state, SPGIST_REDIRECT, + blknoNew, + xldata->offnumNew); + + PageIndexTupleDelete(page, xldata->offnum); + if (PageAddItem(page, (Item) dt, dt->size, + xldata->offnum, + false, false) != xldata->offnum) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + dt->size); + + if (state.isBuild) + SpGistPageGetOpaque(page)->nPlaceholder++; + else + SpGistPageGetOpaque(page)->nRedirection++; + + /* + * If parent is in this same page, update it now. + */ + if (xldata->parentBlk == 0) + { + SpGistInnerTuple parentTuple; + + parentTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); + } + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + + /* + * Update parent downlink (if we didn't do it as part of the source or + * destination page update already). + */ + if (xldata->parentBlk == 2) + { + if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) + { + SpGistInnerTuple parentTuple; + + page = BufferGetPage(buffer); + + parentTuple = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + + spgUpdateNodeLink(parentTuple, xldata->nodeI, + blknoNew, xldata->offnumNew); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + } + } +} + +static void +spgRedoSplitTuple(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + char *ptr = XLogRecGetData(record); + spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr; + char *prefixTuple; + SpGistInnerTupleData prefixTupleHdr; + char *postfixTuple; + SpGistInnerTupleData postfixTupleHdr; + Buffer buffer; + Page page; + XLogRedoAction action; + + ptr += sizeof(spgxlogSplitTuple); + prefixTuple = ptr; + /* the prefix tuple is unaligned, so make a copy to access its header */ + memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData)); + ptr += prefixTupleHdr.size; + postfixTuple = ptr; + /* postfix tuple is also unaligned */ + memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData)); + + /* + * In normal operation we would have both pages locked simultaneously; but + * in WAL replay it should be safe to update them one at a time, as long + * as we do it in the right order. + */ + + /* insert postfix tuple first to avoid dangling link */ + if (!xldata->postfixBlkSame) + { + if (xldata->newPage) + { + buffer = XLogInitBufferForRedo(record, 1); + /* SplitTuple is not used for nulls pages */ + SpGistInitBuffer(buffer, 0); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(record, 1, &buffer); + if (action == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + addOrReplaceTuple(page, (Item) postfixTuple, + postfixTupleHdr.size, xldata->offnumPostfix); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + } + + /* now handle the original page */ + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + PageIndexTupleDelete(page, xldata->offnumPrefix); + if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size, + xldata->offnumPrefix, false, false) != xldata->offnumPrefix) + elog(ERROR, "failed to add item of size %u to SPGiST index page", + prefixTupleHdr.size); + + if (xldata->postfixBlkSame) + addOrReplaceTuple(page, (Item) postfixTuple, + postfixTupleHdr.size, + xldata->offnumPostfix); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +static void +spgRedoPickSplit(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + char *ptr = XLogRecGetData(record); + spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr; + char *innerTuple; + SpGistInnerTupleData innerTupleHdr; + SpGistState state; + OffsetNumber *toDelete; + OffsetNumber *toInsert; + uint8 *leafPageSelect; + Buffer srcBuffer; + Buffer destBuffer; + Buffer innerBuffer; + Page srcPage; + Page destPage; + Page page; + int i; + BlockNumber blknoInner; + XLogRedoAction action; + + XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner); + + fillFakeState(&state, xldata->stateSrc); + + ptr += SizeOfSpgxlogPickSplit; + toDelete = (OffsetNumber *) ptr; + ptr += sizeof(OffsetNumber) * xldata->nDelete; + toInsert = (OffsetNumber *) ptr; + ptr += sizeof(OffsetNumber) * xldata->nInsert; + leafPageSelect = (uint8 *) ptr; + ptr += sizeof(uint8) * xldata->nInsert; + + innerTuple = ptr; + /* the inner tuple is unaligned, so make a copy to access its header */ + memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData)); + ptr += innerTupleHdr.size; + + /* now ptr points to the list of leaf tuples */ + + if (xldata->isRootSplit) + { + /* when splitting root, we touch it only in the guise of new inner */ + srcBuffer = InvalidBuffer; + srcPage = NULL; + } + else if (xldata->initSrc) + { + /* just re-init the source page */ + srcBuffer = XLogInitBufferForRedo(record, 0); + srcPage = (Page) BufferGetPage(srcBuffer); + + SpGistInitBuffer(srcBuffer, + SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); + /* don't update LSN etc till we're done with it */ + } + else + { + /* + * Delete the specified tuples from source page. (In case we're in + * Hot Standby, we need to hold lock on the page till we're done + * inserting leaf tuples and the new inner tuple, else the added + * redirect tuple will be a dangling link.) + */ + srcPage = NULL; + if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO) + { + srcPage = BufferGetPage(srcBuffer); + + /* + * We have it a bit easier here than in doPickSplit(), because we + * know the inner tuple's location already, so we can inject the + * correct redirection tuple now. + */ + if (!state.isBuild) + spgPageIndexMultiDelete(&state, srcPage, + toDelete, xldata->nDelete, + SPGIST_REDIRECT, + SPGIST_PLACEHOLDER, + blknoInner, + xldata->offnumInner); + else + spgPageIndexMultiDelete(&state, srcPage, + toDelete, xldata->nDelete, + SPGIST_PLACEHOLDER, + SPGIST_PLACEHOLDER, + InvalidBlockNumber, + InvalidOffsetNumber); + + /* don't update LSN etc till we're done with it */ + } + } + + /* try to access dest page if any */ + if (!XLogRecHasBlockRef(record, 1)) + { + destBuffer = InvalidBuffer; + destPage = NULL; + } + else if (xldata->initDest) + { + /* just re-init the dest page */ + destBuffer = XLogInitBufferForRedo(record, 1); + destPage = (Page) BufferGetPage(destBuffer); + + SpGistInitBuffer(destBuffer, + SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0)); + /* don't update LSN etc till we're done with it */ + } + else + { + /* + * We could probably release the page lock immediately in the + * full-page-image case, but for safety let's hold it till later. + */ + if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO) + destPage = (Page) BufferGetPage(destBuffer); + else + destPage = NULL; /* don't do any page updates */ + } + + /* restore leaf tuples to src and/or dest page */ + for (i = 0; i < xldata->nInsert; i++) + { + char *leafTuple; + SpGistLeafTupleData leafTupleHdr; + + /* the tuples are not aligned, so must copy to access the size field. */ + leafTuple = ptr; + memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData)); + ptr += leafTupleHdr.size; + + page = leafPageSelect[i] ? destPage : srcPage; + if (page == NULL) + continue; /* no need to touch this page */ + + addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size, + toInsert[i]); + } + + /* Now update src and dest page LSNs if needed */ + if (srcPage != NULL) + { + PageSetLSN(srcPage, lsn); + MarkBufferDirty(srcBuffer); + } + if (destPage != NULL) + { + PageSetLSN(destPage, lsn); + MarkBufferDirty(destBuffer); + } + + /* restore new inner tuple */ + if (xldata->initInner) + { + innerBuffer = XLogInitBufferForRedo(record, 2); + SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0)); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(record, 2, &innerBuffer); + + if (action == BLK_NEEDS_REDO) + { + page = BufferGetPage(innerBuffer); + + addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size, + xldata->offnumInner); + + /* if inner is also parent, update link while we're here */ + if (xldata->innerIsParent) + { + SpGistInnerTuple parent; + + parent = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + spgUpdateNodeLink(parent, xldata->nodeI, + blknoInner, xldata->offnumInner); + } + + PageSetLSN(page, lsn); + MarkBufferDirty(innerBuffer); + } + if (BufferIsValid(innerBuffer)) + UnlockReleaseBuffer(innerBuffer); + + /* + * Now we can release the leaf-page locks. It's okay to do this before + * updating the parent downlink. + */ + if (BufferIsValid(srcBuffer)) + UnlockReleaseBuffer(srcBuffer); + if (BufferIsValid(destBuffer)) + UnlockReleaseBuffer(destBuffer); + + /* update parent downlink, unless we did it above */ + if (XLogRecHasBlockRef(record, 3)) + { + Buffer parentBuffer; + + if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO) + { + SpGistInnerTuple parent; + + page = BufferGetPage(parentBuffer); + + parent = (SpGistInnerTuple) PageGetItem(page, + PageGetItemId(page, xldata->offnumParent)); + spgUpdateNodeLink(parent, xldata->nodeI, + blknoInner, xldata->offnumInner); + + PageSetLSN(page, lsn); + MarkBufferDirty(parentBuffer); + } + if (BufferIsValid(parentBuffer)) + UnlockReleaseBuffer(parentBuffer); + } + else + Assert(xldata->innerIsParent || xldata->isRootSplit); +} + +static void +spgRedoVacuumLeaf(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + char *ptr = XLogRecGetData(record); + spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr; + OffsetNumber *toDead; + OffsetNumber *toPlaceholder; + OffsetNumber *moveSrc; + OffsetNumber *moveDest; + OffsetNumber *chainSrc; + OffsetNumber *chainDest; + SpGistState state; + Buffer buffer; + Page page; + int i; + + fillFakeState(&state, xldata->stateSrc); + + ptr += SizeOfSpgxlogVacuumLeaf; + toDead = (OffsetNumber *) ptr; + ptr += sizeof(OffsetNumber) * xldata->nDead; + toPlaceholder = (OffsetNumber *) ptr; + ptr += sizeof(OffsetNumber) * xldata->nPlaceholder; + moveSrc = (OffsetNumber *) ptr; + ptr += sizeof(OffsetNumber) * xldata->nMove; + moveDest = (OffsetNumber *) ptr; + ptr += sizeof(OffsetNumber) * xldata->nMove; + chainSrc = (OffsetNumber *) ptr; + ptr += sizeof(OffsetNumber) * xldata->nChain; + chainDest = (OffsetNumber *) ptr; + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + spgPageIndexMultiDelete(&state, page, + toDead, xldata->nDead, + SPGIST_DEAD, SPGIST_DEAD, + InvalidBlockNumber, + InvalidOffsetNumber); + + spgPageIndexMultiDelete(&state, page, + toPlaceholder, xldata->nPlaceholder, + SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, + InvalidBlockNumber, + InvalidOffsetNumber); + + /* see comments in vacuumLeafPage() */ + for (i = 0; i < xldata->nMove; i++) + { + ItemId idSrc = PageGetItemId(page, moveSrc[i]); + ItemId idDest = PageGetItemId(page, moveDest[i]); + ItemIdData tmp; + + tmp = *idSrc; + *idSrc = *idDest; + *idDest = tmp; + } + + spgPageIndexMultiDelete(&state, page, + moveSrc, xldata->nMove, + SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER, + InvalidBlockNumber, + InvalidOffsetNumber); + + for (i = 0; i < xldata->nChain; i++) + { + SpGistLeafTuple lt; + + lt = (SpGistLeafTuple) PageGetItem(page, + PageGetItemId(page, chainSrc[i])); + Assert(lt->tupstate == SPGIST_LIVE); + SGLT_SET_NEXTOFFSET(lt, chainDest[i]); + } + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +static void +spgRedoVacuumRoot(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + char *ptr = XLogRecGetData(record); + spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr; + OffsetNumber *toDelete; + Buffer buffer; + Page page; + + toDelete = xldata->offsets; + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + /* The tuple numbers are in order */ + PageIndexMultiDelete(page, toDelete, xldata->nDelete); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +static void +spgRedoVacuumRedirect(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + char *ptr = XLogRecGetData(record); + spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr; + OffsetNumber *itemToPlaceholder; + Buffer buffer; + + itemToPlaceholder = xldata->offsets; + + /* + * If any redirection tuples are being removed, make sure there are no + * live Hot Standby transactions that might need to see them. + */ + if (InHotStandby) + { + if (TransactionIdIsValid(xldata->newestRedirectXid)) + { + RelFileNode node; + + XLogRecGetBlockTag(record, 0, &node, NULL, NULL); + ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, + node); + } + } + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + Page page = BufferGetPage(buffer); + SpGistPageOpaque opaque = SpGistPageGetOpaque(page); + int i; + + /* Convert redirect pointers to plain placeholders */ + for (i = 0; i < xldata->nToPlaceholder; i++) + { + SpGistDeadTuple dt; + + dt = (SpGistDeadTuple) PageGetItem(page, + PageGetItemId(page, itemToPlaceholder[i])); + Assert(dt->tupstate == SPGIST_REDIRECT); + dt->tupstate = SPGIST_PLACEHOLDER; + ItemPointerSetInvalid(&dt->pointer); + } + + Assert(opaque->nRedirection >= xldata->nToPlaceholder); + opaque->nRedirection -= xldata->nToPlaceholder; + opaque->nPlaceholder += xldata->nToPlaceholder; + + /* Remove placeholder tuples at end of page */ + if (xldata->firstPlaceholder != InvalidOffsetNumber) + { + int max = PageGetMaxOffsetNumber(page); + OffsetNumber *toDelete; + + toDelete = palloc(sizeof(OffsetNumber) * max); + + for (i = xldata->firstPlaceholder; i <= max; i++) + toDelete[i - xldata->firstPlaceholder] = i; + + i = max - xldata->firstPlaceholder + 1; + Assert(opaque->nPlaceholder >= i); + opaque->nPlaceholder -= i; + + /* The array is sorted, so can use PageIndexMultiDelete */ + PageIndexMultiDelete(page, toDelete, i); + + pfree(toDelete); + } + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +void +spg_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + MemoryContext oldCxt; + + oldCxt = MemoryContextSwitchTo(opCtx); + switch (info) + { + case XLOG_SPGIST_ADD_LEAF: + spgRedoAddLeaf(record); + break; + case XLOG_SPGIST_MOVE_LEAFS: + spgRedoMoveLeafs(record); + break; + case XLOG_SPGIST_ADD_NODE: + spgRedoAddNode(record); + break; + case XLOG_SPGIST_SPLIT_TUPLE: + spgRedoSplitTuple(record); + break; + case XLOG_SPGIST_PICKSPLIT: + spgRedoPickSplit(record); + break; + case XLOG_SPGIST_VACUUM_LEAF: + spgRedoVacuumLeaf(record); + break; + case XLOG_SPGIST_VACUUM_ROOT: + spgRedoVacuumRoot(record); + break; + case XLOG_SPGIST_VACUUM_REDIRECT: + spgRedoVacuumRedirect(record); + break; + default: + elog(PANIC, "spg_redo: unknown op code %u", info); + } + + MemoryContextSwitchTo(oldCxt); + MemoryContextReset(opCtx); +} + +void +spg_xlog_startup(void) +{ + opCtx = AllocSetContextCreate(CurrentMemoryContext, + "SP-GiST temporary context", + ALLOCSET_DEFAULT_SIZES); +} + +void +spg_xlog_cleanup(void) +{ + MemoryContextDelete(opCtx); + opCtx = NULL; +} + +/* + * Mask a SpGist page before performing consistency checks on it. + */ +void +spg_mask(char *pagedata, BlockNumber blkno) +{ + Page page = (Page) pagedata; + PageHeader pagehdr = (PageHeader) page; + + mask_page_lsn_and_checksum(page); + + mask_page_hint_bits(page); + + /* + * Mask the unused space, but only if the page's pd_lower appears to have + * been set correctly. + */ + if (pagehdr->pd_lower >= SizeOfPageHeaderData) + mask_unused_space(page); +} |