summaryrefslogtreecommitdiffstats
path: root/src/backend/access/gist/gist.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/gist/gist.c')
-rw-r--r--src/backend/access/gist/gist.c1713
1 files changed, 1713 insertions, 0 deletions
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
new file mode 100644
index 0000000..0683f42
--- /dev/null
+++ b/src/backend/access/gist/gist.c
@@ -0,0 +1,1713 @@
+/*-------------------------------------------------------------------------
+ *
+ * gist.c
+ * interface routines for the postgres GiST index access method.
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/gist/gist.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/gist_private.h"
+#include "access/gistscan.h"
+#include "catalog/pg_collation.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "storage/lmgr.h"
+#include "storage/predicate.h"
+#include "utils/builtins.h"
+#include "utils/index_selfuncs.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+/* non-export function prototypes */
+static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
+static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
+static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate,
+ IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
+ Buffer leftchild, Buffer rightchild,
+ bool unlockbuf, bool unlockleftchild);
+static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate, List *splitinfo, bool unlockbuf);
+static void gistprunepage(Relation rel, Page page, Buffer buffer,
+ Relation heapRel);
+
+
+#define ROTATEDIST(d) do { \
+ SplitedPageLayout *tmp=(SplitedPageLayout*)palloc0(sizeof(SplitedPageLayout)); \
+ tmp->block.blkno = InvalidBlockNumber; \
+ tmp->buffer = InvalidBuffer; \
+ tmp->next = (d); \
+ (d)=tmp; \
+} while(0)
+
+
+/*
+ * GiST handler function: return IndexAmRoutine with access method parameters
+ * and callbacks.
+ */
+Datum
+gisthandler(PG_FUNCTION_ARGS)
+{
+ IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
+
+ amroutine->amstrategies = 0;
+ amroutine->amsupport = GISTNProcs;
+ amroutine->amoptsprocnum = GIST_OPTIONS_PROC;
+ amroutine->amcanorder = false;
+ amroutine->amcanorderbyop = true;
+ amroutine->amcanbackward = false;
+ amroutine->amcanunique = false;
+ amroutine->amcanmulticol = true;
+ amroutine->amoptionalkey = true;
+ amroutine->amsearcharray = false;
+ amroutine->amsearchnulls = true;
+ amroutine->amstorage = true;
+ amroutine->amclusterable = true;
+ amroutine->ampredlocks = true;
+ amroutine->amcanparallel = false;
+ amroutine->amcaninclude = true;
+ amroutine->amusemaintenanceworkmem = false;
+ amroutine->amparallelvacuumoptions =
+ VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
+ amroutine->amkeytype = InvalidOid;
+
+ amroutine->ambuild = gistbuild;
+ amroutine->ambuildempty = gistbuildempty;
+ amroutine->aminsert = gistinsert;
+ amroutine->ambulkdelete = gistbulkdelete;
+ amroutine->amvacuumcleanup = gistvacuumcleanup;
+ amroutine->amcanreturn = gistcanreturn;
+ amroutine->amcostestimate = gistcostestimate;
+ amroutine->amoptions = gistoptions;
+ amroutine->amproperty = gistproperty;
+ amroutine->ambuildphasename = NULL;
+ amroutine->amvalidate = gistvalidate;
+ amroutine->amadjustmembers = gistadjustmembers;
+ amroutine->ambeginscan = gistbeginscan;
+ amroutine->amrescan = gistrescan;
+ amroutine->amgettuple = gistgettuple;
+ amroutine->amgetbitmap = gistgetbitmap;
+ amroutine->amendscan = gistendscan;
+ amroutine->ammarkpos = NULL;
+ amroutine->amrestrpos = NULL;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
+ amroutine->amparallelrescan = NULL;
+
+ PG_RETURN_POINTER(amroutine);
+}
+
+/*
+ * Create and return a temporary memory context for use by GiST. We
+ * _always_ invoke user-provided methods in a temporary memory
+ * context, so that memory leaks in those functions cannot cause
+ * problems. Also, we use some additional temporary contexts in the
+ * GiST code itself, to avoid the need to do some awkward manual
+ * memory management.
+ */
+MemoryContext
+createTempGistContext(void)
+{
+ return AllocSetContextCreate(CurrentMemoryContext,
+ "GiST temporary context",
+ ALLOCSET_DEFAULT_SIZES);
+}
+
+/*
+ * gistbuildempty() -- build an empty gist index in the initialization fork
+ */
+void
+gistbuildempty(Relation index)
+{
+ Buffer buffer;
+
+ /* Initialize the root page */
+ buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /* Initialize and xlog buffer */
+ START_CRIT_SECTION();
+ GISTInitBuffer(buffer, F_LEAF);
+ MarkBufferDirty(buffer);
+ log_newpage_buffer(buffer, true);
+ END_CRIT_SECTION();
+
+ /* Unlock and release the buffer */
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * gistinsert -- wrapper for GiST tuple insertion.
+ *
+ * This is the public interface routine for tuple insertion in GiSTs.
+ * It doesn't do any work; just locks the relation and passes the buck.
+ */
+bool
+gistinsert(Relation r, Datum *values, bool *isnull,
+ ItemPointer ht_ctid, Relation heapRel,
+ IndexUniqueCheck checkUnique,
+ bool indexUnchanged,
+ IndexInfo *indexInfo)
+{
+ GISTSTATE *giststate = (GISTSTATE *) indexInfo->ii_AmCache;
+ IndexTuple itup;
+ MemoryContext oldCxt;
+
+ /* Initialize GISTSTATE cache if first call in this statement */
+ if (giststate == NULL)
+ {
+ oldCxt = MemoryContextSwitchTo(indexInfo->ii_Context);
+ giststate = initGISTstate(r);
+ giststate->tempCxt = createTempGistContext();
+ indexInfo->ii_AmCache = (void *) giststate;
+ MemoryContextSwitchTo(oldCxt);
+ }
+
+ oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
+
+ itup = gistFormTuple(giststate, r,
+ values, isnull, true /* size is currently bogus */ );
+ itup->t_tid = *ht_ctid;
+
+ gistdoinsert(r, itup, 0, giststate, heapRel, false);
+
+ /* cleanup */
+ MemoryContextSwitchTo(oldCxt);
+ MemoryContextReset(giststate->tempCxt);
+
+ return false;
+}
+
+
+/*
+ * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
+ * at that offset is atomically removed along with inserting the new tuples.
+ * This is used to replace a tuple with a new one.
+ *
+ * If 'leftchildbuf' is valid, we're inserting the downlink for the page
+ * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
+ * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
+ *
+ * If 'markfollowright' is true and the page is split, the left child is
+ * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
+ * index build, however, there is no concurrent access and the page splitting
+ * is done in a slightly simpler fashion, and false is passed.
+ *
+ * If there is not enough room on the page, it is split. All the split
+ * pages are kept pinned and locked and returned in *splitinfo, the caller
+ * is responsible for inserting the downlinks for them. However, if
+ * 'buffer' is the root page and it needs to be split, gistplacetopage()
+ * performs the split as one atomic operation, and *splitinfo is set to NIL.
+ * In that case, we continue to hold the root page locked, and the child
+ * pages are released; note that new tuple(s) are *not* on the root page
+ * but in one of the new child pages.
+ *
+ * If 'newblkno' is not NULL, returns the block number of page the first
+ * new/updated tuple was inserted to. Usually it's the given page, but could
+ * be its right sibling if the page was split.
+ *
+ * Returns 'true' if the page was split, 'false' otherwise.
+ */
+bool
+gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
+ Buffer buffer,
+ IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
+ BlockNumber *newblkno,
+ Buffer leftchildbuf,
+ List **splitinfo,
+ bool markfollowright,
+ Relation heapRel,
+ bool is_build)
+{
+ BlockNumber blkno = BufferGetBlockNumber(buffer);
+ Page page = BufferGetPage(buffer);
+ bool is_leaf = (GistPageIsLeaf(page)) ? true : false;
+ XLogRecPtr recptr;
+ int i;
+ bool is_split;
+
+ /*
+ * Refuse to modify a page that's incompletely split. This should not
+ * happen because we finish any incomplete splits while we walk down the
+ * tree. However, it's remotely possible that another concurrent inserter
+ * splits a parent page, and errors out before completing the split. We
+ * will just throw an error in that case, and leave any split we had in
+ * progress unfinished too. The next insert that comes along will clean up
+ * the mess.
+ */
+ if (GistFollowRight(page))
+ elog(ERROR, "concurrent GiST page split was incomplete");
+
+ /* should never try to insert to a deleted page */
+ Assert(!GistPageIsDeleted(page));
+
+ *splitinfo = NIL;
+
+ /*
+ * if isupdate, remove old key: This node's key has been modified, either
+ * because a child split occurred or because we needed to adjust our key
+ * for an insert in a child node. Therefore, remove the old version of
+ * this node's key.
+ *
+ * for WAL replay, in the non-split case we handle this by setting up a
+ * one-element todelete array; in the split case, it's handled implicitly
+ * because the tuple vector passed to gistSplit won't include this tuple.
+ */
+ is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
+
+ /*
+ * If leaf page is full, try at first to delete dead tuples. And then
+ * check again.
+ */
+ if (is_split && GistPageIsLeaf(page) && GistPageHasGarbage(page))
+ {
+ gistprunepage(rel, page, buffer, heapRel);
+ is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
+ }
+
+ if (is_split)
+ {
+ /* no space for insertion */
+ IndexTuple *itvec;
+ int tlen;
+ SplitedPageLayout *dist = NULL,
+ *ptr;
+ BlockNumber oldrlink = InvalidBlockNumber;
+ GistNSN oldnsn = 0;
+ SplitedPageLayout rootpg;
+ bool is_rootsplit;
+ int npage;
+
+ is_rootsplit = (blkno == GIST_ROOT_BLKNO);
+
+ /*
+ * Form index tuples vector to split. If we're replacing an old tuple,
+ * remove the old version from the vector.
+ */
+ itvec = gistextractpage(page, &tlen);
+ if (OffsetNumberIsValid(oldoffnum))
+ {
+ /* on inner page we should remove old tuple */
+ int pos = oldoffnum - FirstOffsetNumber;
+
+ tlen--;
+ if (pos != tlen)
+ memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
+ }
+ itvec = gistjoinvector(itvec, &tlen, itup, ntup);
+ dist = gistSplit(rel, page, itvec, tlen, giststate);
+
+ /*
+ * Check that split didn't produce too many pages.
+ */
+ npage = 0;
+ for (ptr = dist; ptr; ptr = ptr->next)
+ npage++;
+ /* in a root split, we'll add one more page to the list below */
+ if (is_rootsplit)
+ npage++;
+ if (npage > GIST_MAX_SPLIT_PAGES)
+ elog(ERROR, "GiST page split into too many halves (%d, maximum %d)",
+ npage, GIST_MAX_SPLIT_PAGES);
+
+ /*
+ * Set up pages to work with. Allocate new buffers for all but the
+ * leftmost page. The original page becomes the new leftmost page, and
+ * is just replaced with the new contents.
+ *
+ * For a root-split, allocate new buffers for all child pages, the
+ * original page is overwritten with new root page containing
+ * downlinks to the new child pages.
+ */
+ ptr = dist;
+ if (!is_rootsplit)
+ {
+ /* save old rightlink and NSN */
+ oldrlink = GistPageGetOpaque(page)->rightlink;
+ oldnsn = GistPageGetNSN(page);
+
+ dist->buffer = buffer;
+ dist->block.blkno = BufferGetBlockNumber(buffer);
+ dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
+
+ /* clean all flags except F_LEAF */
+ GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
+
+ ptr = ptr->next;
+ }
+ for (; ptr; ptr = ptr->next)
+ {
+ /* Allocate new page */
+ ptr->buffer = gistNewBuffer(rel);
+ GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
+ ptr->page = BufferGetPage(ptr->buffer);
+ ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
+ PredicateLockPageSplit(rel,
+ BufferGetBlockNumber(buffer),
+ BufferGetBlockNumber(ptr->buffer));
+ }
+
+ /*
+ * Now that we know which blocks the new pages go to, set up downlink
+ * tuples to point to them.
+ */
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
+ GistTupleSetValid(ptr->itup);
+ }
+
+ /*
+ * If this is a root split, we construct the new root page with the
+ * downlinks here directly, instead of requiring the caller to insert
+ * them. Add the new root page to the list along with the child pages.
+ */
+ if (is_rootsplit)
+ {
+ IndexTuple *downlinks;
+ int ndownlinks = 0;
+ int i;
+
+ rootpg.buffer = buffer;
+ rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
+ GistPageGetOpaque(rootpg.page)->flags = 0;
+
+ /* Prepare a vector of all the downlinks */
+ for (ptr = dist; ptr; ptr = ptr->next)
+ ndownlinks++;
+ downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
+ for (i = 0, ptr = dist; ptr; ptr = ptr->next)
+ downlinks[i++] = ptr->itup;
+
+ rootpg.block.blkno = GIST_ROOT_BLKNO;
+ rootpg.block.num = ndownlinks;
+ rootpg.list = gistfillitupvec(downlinks, ndownlinks,
+ &(rootpg.lenlist));
+ rootpg.itup = NULL;
+
+ rootpg.next = dist;
+ dist = &rootpg;
+ }
+ else
+ {
+ /* Prepare split-info to be returned to caller */
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
+
+ si->buf = ptr->buffer;
+ si->downlink = ptr->itup;
+ *splitinfo = lappend(*splitinfo, si);
+ }
+ }
+
+ /*
+ * Fill all pages. All the pages are new, ie. freshly allocated empty
+ * pages, or a temporary copy of the old page.
+ */
+ for (ptr = dist; ptr; ptr = ptr->next)
+ {
+ char *data = (char *) (ptr->list);
+
+ for (i = 0; i < ptr->block.num; i++)
+ {
+ IndexTuple thistup = (IndexTuple) data;
+
+ if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));
+
+ /*
+ * If this is the first inserted/updated tuple, let the caller
+ * know which page it landed on.
+ */
+ if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
+ *newblkno = ptr->block.blkno;
+
+ data += IndexTupleSize(thistup);
+ }
+
+ /* Set up rightlinks */
+ if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
+ GistPageGetOpaque(ptr->page)->rightlink =
+ ptr->next->block.blkno;
+ else
+ GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
+
+ /*
+ * Mark the all but the right-most page with the follow-right
+ * flag. It will be cleared as soon as the downlink is inserted
+ * into the parent, but this ensures that if we error out before
+ * that, the index is still consistent. (in buffering build mode,
+ * any error will abort the index build anyway, so this is not
+ * needed.)
+ */
+ if (ptr->next && !is_rootsplit && markfollowright)
+ GistMarkFollowRight(ptr->page);
+ else
+ GistClearFollowRight(ptr->page);
+
+ /*
+ * Copy the NSN of the original page to all pages. The
+ * F_FOLLOW_RIGHT flags ensure that scans will follow the
+ * rightlinks until the downlinks are inserted.
+ */
+ GistPageSetNSN(ptr->page, oldnsn);
+ }
+
+ /*
+ * gistXLogSplit() needs to WAL log a lot of pages, prepare WAL
+ * insertion for that. NB: The number of pages and data segments
+ * specified here must match the calculations in gistXLogSplit()!
+ */
+ if (!is_build && RelationNeedsWAL(rel))
+ XLogEnsureRecordSpace(npage, 1 + npage * 2);
+
+ START_CRIT_SECTION();
+
+ /*
+ * Must mark buffers dirty before XLogInsert, even though we'll still
+ * be changing their opaque fields below.
+ */
+ for (ptr = dist; ptr; ptr = ptr->next)
+ MarkBufferDirty(ptr->buffer);
+ if (BufferIsValid(leftchildbuf))
+ MarkBufferDirty(leftchildbuf);
+
+ /*
+ * The first page in the chain was a temporary working copy meant to
+ * replace the old page. Copy it over the old page.
+ */
+ PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
+ dist->page = BufferGetPage(dist->buffer);
+
+ /*
+ * Write the WAL record.
+ *
+ * If we're building a new index, however, we don't WAL-log changes
+ * yet. The LSN-NSN interlock between parent and child requires that
+ * LSNs never move backwards, so set the LSNs to a value that's
+ * smaller than any real or fake unlogged LSN that might be generated
+ * later. (There can't be any concurrent scans during index build, so
+ * we don't need to be able to detect concurrent splits yet.)
+ */
+ if (is_build)
+ recptr = GistBuildLSN;
+ else
+ {
+ if (RelationNeedsWAL(rel))
+ recptr = gistXLogSplit(is_leaf,
+ dist, oldrlink, oldnsn, leftchildbuf,
+ markfollowright);
+ else
+ recptr = gistGetFakeLSN(rel);
+ }
+
+ for (ptr = dist; ptr; ptr = ptr->next)
+ PageSetLSN(ptr->page, recptr);
+
+ /*
+ * Return the new child buffers to the caller.
+ *
+ * If this was a root split, we've already inserted the downlink
+ * pointers, in the form of a new root page. Therefore we can release
+ * all the new buffers, and keep just the root page locked.
+ */
+ if (is_rootsplit)
+ {
+ for (ptr = dist->next; ptr; ptr = ptr->next)
+ UnlockReleaseBuffer(ptr->buffer);
+ }
+ }
+ else
+ {
+ /*
+ * Enough space. We always get here if ntup==0.
+ */
+ START_CRIT_SECTION();
+
+ /*
+ * Delete old tuple if any, then insert new tuple(s) if any. If
+ * possible, use the fast path of PageIndexTupleOverwrite.
+ */
+ if (OffsetNumberIsValid(oldoffnum))
+ {
+ if (ntup == 1)
+ {
+ /* One-for-one replacement, so use PageIndexTupleOverwrite */
+ if (!PageIndexTupleOverwrite(page, oldoffnum, (Item) *itup,
+ IndexTupleSize(*itup)))
+ elog(ERROR, "failed to add item to index page in \"%s\"",
+ RelationGetRelationName(rel));
+ }
+ else
+ {
+ /* Delete old, then append new tuple(s) to page */
+ PageIndexTupleDelete(page, oldoffnum);
+ gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
+ }
+ }
+ else
+ {
+ /* Just append new tuples at the end of the page */
+ gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
+ }
+
+ MarkBufferDirty(buffer);
+
+ if (BufferIsValid(leftchildbuf))
+ MarkBufferDirty(leftchildbuf);
+
+ if (is_build)
+ recptr = GistBuildLSN;
+ else
+ {
+ if (RelationNeedsWAL(rel))
+ {
+ OffsetNumber ndeloffs = 0,
+ deloffs[1];
+
+ if (OffsetNumberIsValid(oldoffnum))
+ {
+ deloffs[0] = oldoffnum;
+ ndeloffs = 1;
+ }
+
+ recptr = gistXLogUpdate(buffer,
+ deloffs, ndeloffs, itup, ntup,
+ leftchildbuf);
+ }
+ else
+ recptr = gistGetFakeLSN(rel);
+ }
+ PageSetLSN(page, recptr);
+
+ if (newblkno)
+ *newblkno = blkno;
+ }
+
+ /*
+ * If we inserted the downlink for a child page, set NSN and clear
+ * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
+ * follow the rightlink if and only if they looked at the parent page
+ * before we inserted the downlink.
+ *
+ * Note that we do this *after* writing the WAL record. That means that
+ * the possible full page image in the WAL record does not include these
+ * changes, and they must be replayed even if the page is restored from
+ * the full page image. There's a chicken-and-egg problem: if we updated
+ * the child pages first, we wouldn't know the recptr of the WAL record
+ * we're about to write.
+ */
+ if (BufferIsValid(leftchildbuf))
+ {
+ Page leftpg = BufferGetPage(leftchildbuf);
+
+ GistPageSetNSN(leftpg, recptr);
+ GistClearFollowRight(leftpg);
+
+ PageSetLSN(leftpg, recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ return is_split;
+}
+
+/*
+ * Workhouse routine for doing insertion into a GiST index. Note that
+ * this routine assumes it is invoked in a short-lived memory context,
+ * so it does not bother releasing palloc'd allocations.
+ */
+void
+gistdoinsert(Relation r, IndexTuple itup, Size freespace,
+ GISTSTATE *giststate, Relation heapRel, bool is_build)
+{
+ ItemId iid;
+ IndexTuple idxtuple;
+ GISTInsertStack firststack;
+ GISTInsertStack *stack;
+ GISTInsertState state;
+ bool xlocked = false;
+
+ memset(&state, 0, sizeof(GISTInsertState));
+ state.freespace = freespace;
+ state.r = r;
+ state.heapRel = heapRel;
+ state.is_build = is_build;
+
+ /* Start from the root */
+ firststack.blkno = GIST_ROOT_BLKNO;
+ firststack.lsn = 0;
+ firststack.retry_from_parent = false;
+ firststack.parent = NULL;
+ firststack.downlinkoffnum = InvalidOffsetNumber;
+ state.stack = stack = &firststack;
+
+ /*
+ * Walk down along the path of smallest penalty, updating the parent
+ * pointers with the key we're inserting as we go. If we crash in the
+ * middle, the tree is consistent, although the possible parent updates
+ * were a waste.
+ */
+ for (;;)
+ {
+ /*
+ * If we split an internal page while descending the tree, we have to
+ * retry at the parent. (Normally, the LSN-NSN interlock below would
+ * also catch this and cause us to retry. But LSNs are not updated
+ * during index build.)
+ */
+ while (stack->retry_from_parent)
+ {
+ if (xlocked)
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ xlocked = false;
+ ReleaseBuffer(stack->buffer);
+ state.stack = stack = stack->parent;
+ }
+
+ if (XLogRecPtrIsInvalid(stack->lsn))
+ stack->buffer = ReadBuffer(state.r, stack->blkno);
+
+ /*
+ * Be optimistic and grab shared lock first. Swap it for an exclusive
+ * lock later if we need to update the page.
+ */
+ if (!xlocked)
+ {
+ LockBuffer(stack->buffer, GIST_SHARE);
+ gistcheckpage(state.r, stack->buffer);
+ }
+
+ stack->page = (Page) BufferGetPage(stack->buffer);
+ stack->lsn = xlocked ?
+ PageGetLSN(stack->page) : BufferGetLSNAtomic(stack->buffer);
+ Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
+
+ /*
+ * If this page was split but the downlink was never inserted to the
+ * parent because the inserting backend crashed before doing that, fix
+ * that now.
+ */
+ if (GistFollowRight(stack->page))
+ {
+ if (!xlocked)
+ {
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+ xlocked = true;
+ /* someone might've completed the split when we unlocked */
+ if (!GistFollowRight(stack->page))
+ continue;
+ }
+ gistfixsplit(&state, giststate);
+
+ UnlockReleaseBuffer(stack->buffer);
+ xlocked = false;
+ state.stack = stack = stack->parent;
+ continue;
+ }
+
+ if ((stack->blkno != GIST_ROOT_BLKNO &&
+ stack->parent->lsn < GistPageGetNSN(stack->page)) ||
+ GistPageIsDeleted(stack->page))
+ {
+ /*
+ * Concurrent split or page deletion detected. There's no
+ * guarantee that the downlink for this page is consistent with
+ * the tuple we're inserting anymore, so go back to parent and
+ * rechoose the best child.
+ */
+ UnlockReleaseBuffer(stack->buffer);
+ xlocked = false;
+ state.stack = stack = stack->parent;
+ continue;
+ }
+
+ if (!GistPageIsLeaf(stack->page))
+ {
+ /*
+ * This is an internal page so continue to walk down the tree.
+ * Find the child node that has the minimum insertion penalty.
+ */
+ BlockNumber childblkno;
+ IndexTuple newtup;
+ GISTInsertStack *item;
+ OffsetNumber downlinkoffnum;
+
+ downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
+ iid = PageGetItemId(stack->page, downlinkoffnum);
+ idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
+ childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+
+ /*
+ * Check that it's not a leftover invalid tuple from pre-9.1
+ */
+ if (GistTupleIsInvalid(idxtuple))
+ ereport(ERROR,
+ (errmsg("index \"%s\" contains an inner tuple marked as invalid",
+ RelationGetRelationName(r)),
+ errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
+ errhint("Please REINDEX it.")));
+
+ /*
+ * Check that the key representing the target child node is
+ * consistent with the key we're inserting. Update it if it's not.
+ */
+ newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
+ if (newtup)
+ {
+ /*
+ * Swap shared lock for an exclusive one. Beware, the page may
+ * change while we unlock/lock the page...
+ */
+ if (!xlocked)
+ {
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+ xlocked = true;
+ stack->page = (Page) BufferGetPage(stack->buffer);
+
+ if (PageGetLSN(stack->page) != stack->lsn)
+ {
+ /* the page was changed while we unlocked it, retry */
+ continue;
+ }
+ }
+
+ /*
+ * Update the tuple.
+ *
+ * We still hold the lock after gistinserttuple(), but it
+ * might have to split the page to make the updated tuple fit.
+ * In that case the updated tuple might migrate to the other
+ * half of the split, so we have to go back to the parent and
+ * descend back to the half that's a better fit for the new
+ * tuple.
+ */
+ if (gistinserttuple(&state, stack, giststate, newtup,
+ downlinkoffnum))
+ {
+ /*
+ * If this was a root split, the root page continues to be
+ * the parent and the updated tuple went to one of the
+ * child pages, so we just need to retry from the root
+ * page.
+ */
+ if (stack->blkno != GIST_ROOT_BLKNO)
+ {
+ UnlockReleaseBuffer(stack->buffer);
+ xlocked = false;
+ state.stack = stack = stack->parent;
+ }
+ continue;
+ }
+ }
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ xlocked = false;
+
+ /* descend to the chosen child */
+ item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
+ item->blkno = childblkno;
+ item->parent = stack;
+ item->downlinkoffnum = downlinkoffnum;
+ state.stack = stack = item;
+ }
+ else
+ {
+ /*
+ * Leaf page. Insert the new key. We've already updated all the
+ * parents on the way down, but we might have to split the page if
+ * it doesn't fit. gistinserttuple() will take care of that.
+ */
+
+ /*
+ * Swap shared lock for an exclusive one. Be careful, the page may
+ * change while we unlock/lock the page...
+ */
+ if (!xlocked)
+ {
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ LockBuffer(stack->buffer, GIST_EXCLUSIVE);
+ xlocked = true;
+ stack->page = (Page) BufferGetPage(stack->buffer);
+ stack->lsn = PageGetLSN(stack->page);
+
+ if (stack->blkno == GIST_ROOT_BLKNO)
+ {
+ /*
+ * the only page that can become inner instead of leaf is
+ * the root page, so for root we should recheck it
+ */
+ if (!GistPageIsLeaf(stack->page))
+ {
+ /*
+ * very rare situation: during unlock/lock index with
+ * number of pages = 1 was increased
+ */
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+ xlocked = false;
+ continue;
+ }
+
+ /*
+ * we don't need to check root split, because checking
+ * leaf/inner is enough to recognize split for root
+ */
+ }
+ else if ((GistFollowRight(stack->page) ||
+ stack->parent->lsn < GistPageGetNSN(stack->page)) ||
+ GistPageIsDeleted(stack->page))
+ {
+ /*
+ * The page was split or deleted while we momentarily
+ * unlocked the page. Go back to parent.
+ */
+ UnlockReleaseBuffer(stack->buffer);
+ xlocked = false;
+ state.stack = stack = stack->parent;
+ continue;
+ }
+ }
+
+ /* now state.stack->(page, buffer and blkno) points to leaf page */
+
+ gistinserttuple(&state, stack, giststate, itup,
+ InvalidOffsetNumber);
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+
+ /* Release any pins we might still hold before exiting */
+ for (; stack; stack = stack->parent)
+ ReleaseBuffer(stack->buffer);
+ break;
+ }
+ }
+}
+
+/*
+ * Traverse the tree to find path from root page to specified "child" block.
+ *
+ * returns a new insertion stack, starting from the parent of "child", up
+ * to the root. *downlinkoffnum is set to the offset of the downlink in the
+ * direct parent of child.
+ *
+ * To prevent deadlocks, this should lock only one page at a time.
+ */
+static GISTInsertStack *
+gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
+{
+ Page page;
+ Buffer buffer;
+ OffsetNumber i,
+ maxoff;
+ ItemId iid;
+ IndexTuple idxtuple;
+ List *fifo;
+ GISTInsertStack *top,
+ *ptr;
+ BlockNumber blkno;
+
+ top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
+ top->blkno = GIST_ROOT_BLKNO;
+ top->downlinkoffnum = InvalidOffsetNumber;
+
+ fifo = list_make1(top);
+ while (fifo != NIL)
+ {
+ /* Get next page to visit */
+ top = linitial(fifo);
+ fifo = list_delete_first(fifo);
+
+ buffer = ReadBuffer(r, top->blkno);
+ LockBuffer(buffer, GIST_SHARE);
+ gistcheckpage(r, buffer);
+ page = (Page) BufferGetPage(buffer);
+
+ if (GistPageIsLeaf(page))
+ {
+ /*
+ * Because we scan the index top-down, all the rest of the pages
+ * in the queue must be leaf pages as well.
+ */
+ UnlockReleaseBuffer(buffer);
+ break;
+ }
+
+ /* currently, internal pages are never deleted */
+ Assert(!GistPageIsDeleted(page));
+
+ top->lsn = BufferGetLSNAtomic(buffer);
+
+ /*
+ * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
+ * downlink. This should not normally happen..
+ */
+ if (GistFollowRight(page))
+ elog(ERROR, "concurrent GiST page split was incomplete");
+
+ if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
+ GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
+ {
+ /*
+ * Page was split while we looked elsewhere. We didn't see the
+ * downlink to the right page when we scanned the parent, so add
+ * it to the queue now.
+ *
+ * Put the right page ahead of the queue, so that we visit it
+ * next. That's important, because if this is the lowest internal
+ * level, just above leaves, we might already have queued up some
+ * leaf pages, and we assume that there can't be any non-leaf
+ * pages behind leaf pages.
+ */
+ ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
+ ptr->blkno = GistPageGetOpaque(page)->rightlink;
+ ptr->downlinkoffnum = InvalidOffsetNumber;
+ ptr->parent = top->parent;
+
+ fifo = lcons(ptr, fifo);
+ }
+
+ maxoff = PageGetMaxOffsetNumber(page);
+
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ iid = PageGetItemId(page, i);
+ idxtuple = (IndexTuple) PageGetItem(page, iid);
+ blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
+ if (blkno == child)
+ {
+ /* Found it! */
+ UnlockReleaseBuffer(buffer);
+ *downlinkoffnum = i;
+ return top;
+ }
+ else
+ {
+ /* Append this child to the list of pages to visit later */
+ ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
+ ptr->blkno = blkno;
+ ptr->downlinkoffnum = i;
+ ptr->parent = top;
+
+ fifo = lappend(fifo, ptr);
+ }
+ }
+
+ UnlockReleaseBuffer(buffer);
+ }
+
+ elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
+ RelationGetRelationName(r), child);
+ return NULL; /* keep compiler quiet */
+}
+
+/*
+ * Updates the stack so that child->parent is the correct parent of the
+ * child. child->parent must be exclusively locked on entry, and will
+ * remain so at exit, but it might not be the same page anymore.
+ */
+static void
+gistFindCorrectParent(Relation r, GISTInsertStack *child)
+{
+ GISTInsertStack *parent = child->parent;
+
+ gistcheckpage(r, parent->buffer);
+ parent->page = (Page) BufferGetPage(parent->buffer);
+
+ /* here we don't need to distinguish between split and page update */
+ if (child->downlinkoffnum == InvalidOffsetNumber ||
+ parent->lsn != PageGetLSN(parent->page))
+ {
+ /* parent is changed, look child in right links until found */
+ OffsetNumber i,
+ maxoff;
+ ItemId iid;
+ IndexTuple idxtuple;
+ GISTInsertStack *ptr;
+
+ while (true)
+ {
+ maxoff = PageGetMaxOffsetNumber(parent->page);
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ iid = PageGetItemId(parent->page, i);
+ idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
+ if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
+ {
+ /* yes!!, found */
+ child->downlinkoffnum = i;
+ return;
+ }
+ }
+
+ parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
+ UnlockReleaseBuffer(parent->buffer);
+ if (parent->blkno == InvalidBlockNumber)
+ {
+ /*
+ * End of chain and still didn't find parent. It's a very-very
+ * rare situation when root splitted.
+ */
+ break;
+ }
+ parent->buffer = ReadBuffer(r, parent->blkno);
+ LockBuffer(parent->buffer, GIST_EXCLUSIVE);
+ gistcheckpage(r, parent->buffer);
+ parent->page = (Page) BufferGetPage(parent->buffer);
+ }
+
+ /*
+ * awful!!, we need search tree to find parent ... , but before we
+ * should release all old parent
+ */
+
+ ptr = child->parent->parent; /* child->parent already released
+ * above */
+ while (ptr)
+ {
+ ReleaseBuffer(ptr->buffer);
+ ptr = ptr->parent;
+ }
+
+ /* ok, find new path */
+ ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);
+
+ /* read all buffers as expected by caller */
+ /* note we don't lock them or gistcheckpage them here! */
+ while (ptr)
+ {
+ ptr->buffer = ReadBuffer(r, ptr->blkno);
+ ptr->page = (Page) BufferGetPage(ptr->buffer);
+ ptr = ptr->parent;
+ }
+
+ /* install new chain of parents to stack */
+ child->parent = parent;
+
+ /* make recursive call to normal processing */
+ LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
+ gistFindCorrectParent(r, child);
+ }
+}
+
+/*
+ * Form a downlink pointer for the page in 'buf'.
+ */
+static IndexTuple
+gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
+ GISTInsertStack *stack)
+{
+ Page page = BufferGetPage(buf);
+ OffsetNumber maxoff;
+ OffsetNumber offset;
+ IndexTuple downlink = NULL;
+
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
+ {
+ IndexTuple ituple = (IndexTuple)
+ PageGetItem(page, PageGetItemId(page, offset));
+
+ if (downlink == NULL)
+ downlink = CopyIndexTuple(ituple);
+ else
+ {
+ IndexTuple newdownlink;
+
+ newdownlink = gistgetadjusted(rel, downlink, ituple,
+ giststate);
+ if (newdownlink)
+ downlink = newdownlink;
+ }
+ }
+
+ /*
+ * If the page is completely empty, we can't form a meaningful downlink
+ * for it. But we have to insert a downlink for the page. Any key will do,
+ * as long as its consistent with the downlink of parent page, so that we
+ * can legally insert it to the parent. A minimal one that matches as few
+ * scans as possible would be best, to keep scans from doing useless work,
+ * but we don't know how to construct that. So we just use the downlink of
+ * the original page that was split - that's as far from optimal as it can
+ * get but will do..
+ */
+ if (!downlink)
+ {
+ ItemId iid;
+
+ LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
+ gistFindCorrectParent(rel, stack);
+ iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
+ downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
+ downlink = CopyIndexTuple(downlink);
+ LockBuffer(stack->parent->buffer, GIST_UNLOCK);
+ }
+
+ ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
+ GistTupleSetValid(downlink);
+
+ return downlink;
+}
+
+
+/*
+ * Complete the incomplete split of state->stack->page.
+ */
+static void
+gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
+{
+ GISTInsertStack *stack = state->stack;
+ Buffer buf;
+ Page page;
+ List *splitinfo = NIL;
+
+ ereport(LOG,
+ (errmsg("fixing incomplete split in index \"%s\", block %u",
+ RelationGetRelationName(state->r), stack->blkno)));
+
+ Assert(GistFollowRight(stack->page));
+ Assert(OffsetNumberIsValid(stack->downlinkoffnum));
+
+ buf = stack->buffer;
+
+ /*
+ * Read the chain of split pages, following the rightlinks. Construct a
+ * downlink tuple for each page.
+ */
+ for (;;)
+ {
+ GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
+ IndexTuple downlink;
+
+ page = BufferGetPage(buf);
+
+ /* Form the new downlink tuples to insert to parent */
+ downlink = gistformdownlink(state->r, buf, giststate, stack);
+
+ si->buf = buf;
+ si->downlink = downlink;
+
+ splitinfo = lappend(splitinfo, si);
+
+ if (GistFollowRight(page))
+ {
+ /* lock next page */
+ buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
+ LockBuffer(buf, GIST_EXCLUSIVE);
+ }
+ else
+ break;
+ }
+
+ /* Insert the downlinks */
+ gistfinishsplit(state, stack, giststate, splitinfo, false);
+}
+
+/*
+ * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
+ * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
+ * 'stack' represents the path from the root to the page being updated.
+ *
+ * The caller must hold an exclusive lock on stack->buffer. The lock is still
+ * held on return, but the page might not contain the inserted tuple if the
+ * page was split. The function returns true if the page was split, false
+ * otherwise.
+ */
+static bool
+gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
+{
+ return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
+ InvalidBuffer, InvalidBuffer, false, false);
+}
+
+/* ----------------
+ * An extended workhorse version of gistinserttuple(). This version allows
+ * inserting multiple tuples, or replacing a single tuple with multiple tuples.
+ * This is used to recursively update the downlinks in the parent when a page
+ * is split.
+ *
+ * If leftchild and rightchild are valid, we're inserting/replacing the
+ * downlink for rightchild, and leftchild is its left sibling. We clear the
+ * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
+ * insertion of the downlink.
+ *
+ * To avoid holding locks for longer than necessary, when recursing up the
+ * tree to update the parents, the locking is a bit peculiar here. On entry,
+ * the caller must hold an exclusive lock on stack->buffer, as well as
+ * leftchild and rightchild if given. On return:
+ *
+ * - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
+ * always kept pinned, however.
+ * - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
+ * is kept pinned.
+ * - Lock and pin on 'rightchild' are always released.
+ *
+ * Returns 'true' if the page had to be split. Note that if the page was
+ * split, the inserted/updated tuples might've been inserted to a right
+ * sibling of stack->buffer instead of stack->buffer itself.
+ */
+static bool
+gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate,
+ IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
+ Buffer leftchild, Buffer rightchild,
+ bool unlockbuf, bool unlockleftchild)
+{
+ List *splitinfo;
+ bool is_split;
+
+ /*
+ * Check for any rw conflicts (in serializable isolation level) just
+ * before we intend to modify the page
+ */
+ CheckForSerializableConflictIn(state->r, NULL, BufferGetBlockNumber(stack->buffer));
+
+ /* Insert the tuple(s) to the page, splitting the page if necessary */
+ is_split = gistplacetopage(state->r, state->freespace, giststate,
+ stack->buffer,
+ tuples, ntup,
+ oldoffnum, NULL,
+ leftchild,
+ &splitinfo,
+ true,
+ state->heapRel,
+ state->is_build);
+
+ /*
+ * Before recursing up in case the page was split, release locks on the
+ * child pages. We don't need to keep them locked when updating the
+ * parent.
+ */
+ if (BufferIsValid(rightchild))
+ UnlockReleaseBuffer(rightchild);
+ if (BufferIsValid(leftchild) && unlockleftchild)
+ LockBuffer(leftchild, GIST_UNLOCK);
+
+ /*
+ * If we had to split, insert/update the downlinks in the parent. If the
+ * caller requested us to release the lock on stack->buffer, tell
+ * gistfinishsplit() to do that as soon as it's safe to do so. If we
+ * didn't have to split, release it ourselves.
+ */
+ if (splitinfo)
+ gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
+ else if (unlockbuf)
+ LockBuffer(stack->buffer, GIST_UNLOCK);
+
+ return is_split;
+}
+
+/*
+ * Finish an incomplete split by inserting/updating the downlinks in parent
+ * page. 'splitinfo' contains all the child pages involved in the split,
+ * from left-to-right.
+ *
+ * On entry, the caller must hold a lock on stack->buffer and all the child
+ * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
+ * released on return. The child pages are always unlocked and unpinned.
+ */
+static void
+gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
+ GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
+{
+ GISTPageSplitInfo *right;
+ GISTPageSplitInfo *left;
+ IndexTuple tuples[2];
+
+ /* A split always contains at least two halves */
+ Assert(list_length(splitinfo) >= 2);
+
+ /*
+ * We need to insert downlinks for each new page, and update the downlink
+ * for the original (leftmost) page in the split. Begin at the rightmost
+ * page, inserting one downlink at a time until there's only two pages
+ * left. Finally insert the downlink for the last new page and update the
+ * downlink for the original page as one operation.
+ */
+ LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
+
+ /*
+ * Insert downlinks for the siblings from right to left, until there are
+ * only two siblings left.
+ */
+ for (int pos = list_length(splitinfo) - 1; pos > 1; pos--)
+ {
+ right = (GISTPageSplitInfo *) list_nth(splitinfo, pos);
+ left = (GISTPageSplitInfo *) list_nth(splitinfo, pos - 1);
+
+ gistFindCorrectParent(state->r, stack);
+ if (gistinserttuples(state, stack->parent, giststate,
+ &right->downlink, 1,
+ InvalidOffsetNumber,
+ left->buf, right->buf, false, false))
+ {
+ /*
+ * If the parent page was split, the existing downlink might have
+ * moved.
+ */
+ stack->downlinkoffnum = InvalidOffsetNumber;
+ }
+ /* gistinserttuples() released the lock on right->buf. */
+ }
+
+ right = (GISTPageSplitInfo *) lsecond(splitinfo);
+ left = (GISTPageSplitInfo *) linitial(splitinfo);
+
+ /*
+ * Finally insert downlink for the remaining right page and update the
+ * downlink for the original page to not contain the tuples that were
+ * moved to the new pages.
+ */
+ tuples[0] = left->downlink;
+ tuples[1] = right->downlink;
+ gistFindCorrectParent(state->r, stack);
+ if (gistinserttuples(state, stack->parent, giststate,
+ tuples, 2,
+ stack->downlinkoffnum,
+ left->buf, right->buf,
+ true, /* Unlock parent */
+ unlockbuf /* Unlock stack->buffer if caller wants
+ * that */
+ ))
+ {
+ /*
+ * If the parent page was split, the downlink might have moved.
+ */
+ stack->downlinkoffnum = InvalidOffsetNumber;
+ }
+
+ Assert(left->buf == stack->buffer);
+
+ /*
+ * If we split the page because we had to adjust the downlink on an
+ * internal page, while descending the tree for inserting a new tuple,
+ * then this might no longer be the correct page for the new tuple. The
+ * downlink to this page might not cover the new tuple anymore, it might
+ * need to go to the newly-created right sibling instead. Tell the caller
+ * to walk back up the stack, to re-check at the parent which page to
+ * insert to.
+ *
+ * Normally, the LSN-NSN interlock during the tree descend would also
+ * detect that a concurrent split happened (by ourselves), and cause us to
+ * retry at the parent. But that mechanism doesn't work during index
+ * build, because we don't do WAL-logging, and don't update LSNs, during
+ * index build.
+ */
+ stack->retry_from_parent = true;
+}
+
+/*
+ * gistSplit -- split a page in the tree and fill struct
+ * used for XLOG and real writes buffers. Function is recursive, ie
+ * it will split page until keys will fit in every page.
+ */
+SplitedPageLayout *
+gistSplit(Relation r,
+ Page page,
+ IndexTuple *itup, /* contains compressed entry */
+ int len,
+ GISTSTATE *giststate)
+{
+ IndexTuple *lvectup,
+ *rvectup;
+ GistSplitVector v;
+ int i;
+ SplitedPageLayout *res = NULL;
+
+ /* this should never recurse very deeply, but better safe than sorry */
+ check_stack_depth();
+
+ /* there's no point in splitting an empty page */
+ Assert(len > 0);
+
+ /*
+ * If a single tuple doesn't fit on a page, no amount of splitting will
+ * help.
+ */
+ if (len == 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
+ IndexTupleSize(itup[0]), GiSTPageSize,
+ RelationGetRelationName(r))));
+
+ memset(v.spl_lisnull, true,
+ sizeof(bool) * giststate->nonLeafTupdesc->natts);
+ memset(v.spl_risnull, true,
+ sizeof(bool) * giststate->nonLeafTupdesc->natts);
+ gistSplitByKey(r, page, itup, len, giststate, &v, 0);
+
+ /* form left and right vector */
+ lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+ rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
+
+ for (i = 0; i < v.splitVector.spl_nleft; i++)
+ lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
+
+ for (i = 0; i < v.splitVector.spl_nright; i++)
+ rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
+
+ /* finalize splitting (may need another split) */
+ if (!gistfitpage(rvectup, v.splitVector.spl_nright))
+ {
+ res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
+ }
+ else
+ {
+ ROTATEDIST(res);
+ res->block.num = v.splitVector.spl_nright;
+ res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
+ res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
+ }
+
+ if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
+ {
+ SplitedPageLayout *resptr,
+ *subres;
+
+ resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
+
+ /* install on list's tail */
+ while (resptr->next)
+ resptr = resptr->next;
+
+ resptr->next = res;
+ res = subres;
+ }
+ else
+ {
+ ROTATEDIST(res);
+ res->block.num = v.splitVector.spl_nleft;
+ res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
+ res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
+ }
+
+ return res;
+}
+
+/*
+ * Create a GISTSTATE and fill it with information about the index
+ */
+GISTSTATE *
+initGISTstate(Relation index)
+{
+ GISTSTATE *giststate;
+ MemoryContext scanCxt;
+ MemoryContext oldCxt;
+ int i;
+
+ /* safety check to protect fixed-size arrays in GISTSTATE */
+ if (index->rd_att->natts > INDEX_MAX_KEYS)
+ elog(ERROR, "numberOfAttributes %d > %d",
+ index->rd_att->natts, INDEX_MAX_KEYS);
+
+ /* Create the memory context that will hold the GISTSTATE */
+ scanCxt = AllocSetContextCreate(CurrentMemoryContext,
+ "GiST scan context",
+ ALLOCSET_DEFAULT_SIZES);
+ oldCxt = MemoryContextSwitchTo(scanCxt);
+
+ /* Create and fill in the GISTSTATE */
+ giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
+
+ giststate->scanCxt = scanCxt;
+ giststate->tempCxt = scanCxt; /* caller must change this if needed */
+ giststate->leafTupdesc = index->rd_att;
+
+ /*
+ * The truncated tupdesc for non-leaf index tuples, which doesn't contain
+ * the INCLUDE attributes.
+ *
+ * It is used to form tuples during tuple adjustment and page split.
+ * B-tree creates shortened tuple descriptor for every truncated tuple,
+ * because it is doing this less often: it does not have to form truncated
+ * tuples during page split. Also, B-tree is not adjusting tuples on
+ * internal pages the way GiST does.
+ */
+ giststate->nonLeafTupdesc = CreateTupleDescCopyConstr(index->rd_att);
+ giststate->nonLeafTupdesc->natts =
+ IndexRelationGetNumberOfKeyAttributes(index);
+
+ for (i = 0; i < IndexRelationGetNumberOfKeyAttributes(index); i++)
+ {
+ fmgr_info_copy(&(giststate->consistentFn[i]),
+ index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
+ scanCxt);
+ fmgr_info_copy(&(giststate->unionFn[i]),
+ index_getprocinfo(index, i + 1, GIST_UNION_PROC),
+ scanCxt);
+
+ /* opclasses are not required to provide a Compress method */
+ if (OidIsValid(index_getprocid(index, i + 1, GIST_COMPRESS_PROC)))
+ fmgr_info_copy(&(giststate->compressFn[i]),
+ index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
+ scanCxt);
+ else
+ giststate->compressFn[i].fn_oid = InvalidOid;
+
+ /* opclasses are not required to provide a Decompress method */
+ if (OidIsValid(index_getprocid(index, i + 1, GIST_DECOMPRESS_PROC)))
+ fmgr_info_copy(&(giststate->decompressFn[i]),
+ index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
+ scanCxt);
+ else
+ giststate->decompressFn[i].fn_oid = InvalidOid;
+
+ fmgr_info_copy(&(giststate->penaltyFn[i]),
+ index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
+ scanCxt);
+ fmgr_info_copy(&(giststate->picksplitFn[i]),
+ index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
+ scanCxt);
+ fmgr_info_copy(&(giststate->equalFn[i]),
+ index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
+ scanCxt);
+
+ /* opclasses are not required to provide a Distance method */
+ if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
+ fmgr_info_copy(&(giststate->distanceFn[i]),
+ index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
+ scanCxt);
+ else
+ giststate->distanceFn[i].fn_oid = InvalidOid;
+
+ /* opclasses are not required to provide a Fetch method */
+ if (OidIsValid(index_getprocid(index, i + 1, GIST_FETCH_PROC)))
+ fmgr_info_copy(&(giststate->fetchFn[i]),
+ index_getprocinfo(index, i + 1, GIST_FETCH_PROC),
+ scanCxt);
+ else
+ giststate->fetchFn[i].fn_oid = InvalidOid;
+
+ /*
+ * If the index column has a specified collation, we should honor that
+ * while doing comparisons. However, we may have a collatable storage
+ * type for a noncollatable indexed data type. If there's no index
+ * collation then specify default collation in case the support
+ * functions need collation. This is harmless if the support
+ * functions don't care about collation, so we just do it
+ * unconditionally. (We could alternatively call get_typcollation,
+ * but that seems like expensive overkill --- there aren't going to be
+ * any cases where a GiST storage type has a nondefault collation.)
+ */
+ if (OidIsValid(index->rd_indcollation[i]))
+ giststate->supportCollation[i] = index->rd_indcollation[i];
+ else
+ giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
+ }
+
+ /* No opclass information for INCLUDE attributes */
+ for (; i < index->rd_att->natts; i++)
+ {
+ giststate->consistentFn[i].fn_oid = InvalidOid;
+ giststate->unionFn[i].fn_oid = InvalidOid;
+ giststate->compressFn[i].fn_oid = InvalidOid;
+ giststate->decompressFn[i].fn_oid = InvalidOid;
+ giststate->penaltyFn[i].fn_oid = InvalidOid;
+ giststate->picksplitFn[i].fn_oid = InvalidOid;
+ giststate->equalFn[i].fn_oid = InvalidOid;
+ giststate->distanceFn[i].fn_oid = InvalidOid;
+ giststate->fetchFn[i].fn_oid = InvalidOid;
+ giststate->supportCollation[i] = InvalidOid;
+ }
+
+ MemoryContextSwitchTo(oldCxt);
+
+ return giststate;
+}
+
+void
+freeGISTstate(GISTSTATE *giststate)
+{
+ /* It's sufficient to delete the scanCxt */
+ MemoryContextDelete(giststate->scanCxt);
+}
+
+/*
+ * gistprunepage() -- try to remove LP_DEAD items from the given page.
+ * Function assumes that buffer is exclusively locked.
+ */
+static void
+gistprunepage(Relation rel, Page page, Buffer buffer, Relation heapRel)
+{
+ OffsetNumber deletable[MaxIndexTuplesPerPage];
+ int ndeletable = 0;
+ OffsetNumber offnum,
+ maxoff;
+
+ Assert(GistPageIsLeaf(page));
+
+ /*
+ * Scan over all items to see which ones need to be deleted according to
+ * LP_DEAD flags.
+ */
+ maxoff = PageGetMaxOffsetNumber(page);
+ for (offnum = FirstOffsetNumber;
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ ItemId itemId = PageGetItemId(page, offnum);
+
+ if (ItemIdIsDead(itemId))
+ deletable[ndeletable++] = offnum;
+ }
+
+ if (ndeletable > 0)
+ {
+ TransactionId latestRemovedXid = InvalidTransactionId;
+
+ if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
+ latestRemovedXid =
+ index_compute_xid_horizon_for_tuples(rel, heapRel, buffer,
+ deletable, ndeletable);
+
+ START_CRIT_SECTION();
+
+ PageIndexMultiDelete(page, deletable, ndeletable);
+
+ /*
+ * Mark the page as not containing any LP_DEAD items. This is not
+ * certainly true (there might be some that have recently been marked,
+ * but weren't included in our target-item list), but it will almost
+ * always be true and it doesn't seem worth an additional page scan to
+ * check it. Remember that F_HAS_GARBAGE is only a hint anyway.
+ */
+ GistClearPageHasGarbage(page);
+
+ MarkBufferDirty(buffer);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(rel))
+ {
+ XLogRecPtr recptr;
+
+ recptr = gistXLogDelete(buffer,
+ deletable, ndeletable,
+ latestRemovedXid);
+
+ PageSetLSN(page, recptr);
+ }
+ else
+ PageSetLSN(page, gistGetFakeLSN(rel));
+
+ END_CRIT_SECTION();
+ }
+
+ /*
+ * Note: if we didn't find any LP_DEAD items, then the page's
+ * F_HAS_GARBAGE hint bit is falsely set. We do not bother expending a
+ * separate write to clear it, however. We will clear it when we split
+ * the page.
+ */
+}