summaryrefslogtreecommitdiffstats
path: root/src/backend/access/gin/ginfast.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/access/gin/ginfast.c
parentInitial commit. (diff)
downloadpostgresql-14-upstream.tar.xz
postgresql-14-upstream.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/access/gin/ginfast.c')
-rw-r--r--src/backend/access/gin/ginfast.c1068
1 files changed, 1068 insertions, 0 deletions
diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c
new file mode 100644
index 0000000..e0d9940
--- /dev/null
+++ b/src/backend/access/gin/ginfast.c
@@ -0,0 +1,1068 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginfast.c
+ * Fast insert routines for the Postgres inverted index access method.
+ * Pending entries are stored in linear list of pages. Later on
+ * (typically during VACUUM), ginInsertCleanup() will be invoked to
+ * transfer pending entries into the regular index structure. This
+ * wins because bulk insertion is much more efficient than retail.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/gin/ginfast.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/gin_private.h"
+#include "access/ginxlog.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "catalog/pg_am.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "port/pg_bitutils.h"
+#include "postmaster/autovacuum.h"
+#include "storage/indexfsm.h"
+#include "storage/lmgr.h"
+#include "storage/predicate.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+/* GUC parameter */
+int gin_pending_list_limit = 0;
+
+#define GIN_PAGE_FREESIZE \
+ ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
+
+typedef struct KeyArray
+{
+ Datum *keys; /* expansible array */
+ GinNullCategory *categories; /* another expansible array */
+ int32 nvalues; /* current number of valid entries */
+ int32 maxvalues; /* allocated size of arrays */
+} KeyArray;
+
+
+/*
+ * Build a pending-list page from the given array of tuples, and write it out.
+ *
+ * Returns amount of free space left on the page.
+ */
+static int32
+writeListPage(Relation index, Buffer buffer,
+ IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
+{
+ Page page = BufferGetPage(buffer);
+ int32 i,
+ freesize,
+ size = 0;
+ OffsetNumber l,
+ off;
+ PGAlignedBlock workspace;
+ char *ptr;
+
+ START_CRIT_SECTION();
+
+ GinInitBuffer(buffer, GIN_LIST);
+
+ off = FirstOffsetNumber;
+ ptr = workspace.data;
+
+ for (i = 0; i < ntuples; i++)
+ {
+ int this_size = IndexTupleSize(tuples[i]);
+
+ memcpy(ptr, tuples[i], this_size);
+ ptr += this_size;
+ size += this_size;
+
+ l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
+
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to index page in \"%s\"",
+ RelationGetRelationName(index));
+
+ off++;
+ }
+
+ Assert(size <= BLCKSZ); /* else we overran workspace */
+
+ GinPageGetOpaque(page)->rightlink = rightlink;
+
+ /*
+ * tail page may contain only whole row(s) or final part of row placed on
+ * previous pages (a "row" here meaning all the index tuples generated for
+ * one heap tuple)
+ */
+ if (rightlink == InvalidBlockNumber)
+ {
+ GinPageSetFullRow(page);
+ GinPageGetOpaque(page)->maxoff = 1;
+ }
+ else
+ {
+ GinPageGetOpaque(page)->maxoff = 0;
+ }
+
+ MarkBufferDirty(buffer);
+
+ if (RelationNeedsWAL(index))
+ {
+ ginxlogInsertListPage data;
+ XLogRecPtr recptr;
+
+ data.rightlink = rightlink;
+ data.ntuples = ntuples;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &data, sizeof(ginxlogInsertListPage));
+
+ XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
+ XLogRegisterBufData(0, workspace.data, size);
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE);
+ PageSetLSN(page, recptr);
+ }
+
+ /* get free space before releasing buffer */
+ freesize = PageGetExactFreeSpace(page);
+
+ UnlockReleaseBuffer(buffer);
+
+ END_CRIT_SECTION();
+
+ return freesize;
+}
+
+static void
+makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
+ GinMetaPageData *res)
+{
+ Buffer curBuffer = InvalidBuffer;
+ Buffer prevBuffer = InvalidBuffer;
+ int i,
+ size = 0,
+ tupsize;
+ int startTuple = 0;
+
+ Assert(ntuples > 0);
+
+ /*
+ * Split tuples into pages
+ */
+ for (i = 0; i < ntuples; i++)
+ {
+ if (curBuffer == InvalidBuffer)
+ {
+ curBuffer = GinNewBuffer(index);
+
+ if (prevBuffer != InvalidBuffer)
+ {
+ res->nPendingPages++;
+ writeListPage(index, prevBuffer,
+ tuples + startTuple,
+ i - startTuple,
+ BufferGetBlockNumber(curBuffer));
+ }
+ else
+ {
+ res->head = BufferGetBlockNumber(curBuffer);
+ }
+
+ prevBuffer = curBuffer;
+ startTuple = i;
+ size = 0;
+ }
+
+ tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
+
+ if (size + tupsize > GinListPageSize)
+ {
+ /* won't fit, force a new page and reprocess */
+ i--;
+ curBuffer = InvalidBuffer;
+ }
+ else
+ {
+ size += tupsize;
+ }
+ }
+
+ /*
+ * Write last page
+ */
+ res->tail = BufferGetBlockNumber(curBuffer);
+ res->tailFreeSize = writeListPage(index, curBuffer,
+ tuples + startTuple,
+ ntuples - startTuple,
+ InvalidBlockNumber);
+ res->nPendingPages++;
+ /* that was only one heap tuple */
+ res->nPendingHeapTuples = 1;
+}
+
+/*
+ * Write the index tuples contained in *collector into the index's
+ * pending list.
+ *
+ * Function guarantees that all these tuples will be inserted consecutively,
+ * preserving order
+ */
+void
+ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
+{
+ Relation index = ginstate->index;
+ Buffer metabuffer;
+ Page metapage;
+ GinMetaPageData *metadata = NULL;
+ Buffer buffer = InvalidBuffer;
+ Page page = NULL;
+ ginxlogUpdateMeta data;
+ bool separateList = false;
+ bool needCleanup = false;
+ int cleanupSize;
+ bool needWal;
+
+ if (collector->ntuples == 0)
+ return;
+
+ needWal = RelationNeedsWAL(index);
+
+ data.node = index->rd_node;
+ data.ntuples = 0;
+ data.newRightlink = data.prevTail = InvalidBlockNumber;
+
+ metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
+ metapage = BufferGetPage(metabuffer);
+
+ /*
+ * An insertion to the pending list could logically belong anywhere in the
+ * tree, so it conflicts with all serializable scans. All scans acquire a
+ * predicate lock on the metabuffer to represent that.
+ */
+ CheckForSerializableConflictIn(index, NULL, GIN_METAPAGE_BLKNO);
+
+ if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
+ {
+ /*
+ * Total size is greater than one page => make sublist
+ */
+ separateList = true;
+ }
+ else
+ {
+ LockBuffer(metabuffer, GIN_EXCLUSIVE);
+ metadata = GinPageGetMeta(metapage);
+
+ if (metadata->head == InvalidBlockNumber ||
+ collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
+ {
+ /*
+ * Pending list is empty or total size is greater than freespace
+ * on tail page => make sublist
+ *
+ * We unlock metabuffer to keep high concurrency
+ */
+ separateList = true;
+ LockBuffer(metabuffer, GIN_UNLOCK);
+ }
+ }
+
+ if (separateList)
+ {
+ /*
+ * We should make sublist separately and append it to the tail
+ */
+ GinMetaPageData sublist;
+
+ memset(&sublist, 0, sizeof(GinMetaPageData));
+ makeSublist(index, collector->tuples, collector->ntuples, &sublist);
+
+ if (needWal)
+ XLogBeginInsert();
+
+ /*
+ * metapage was unlocked, see above
+ */
+ LockBuffer(metabuffer, GIN_EXCLUSIVE);
+ metadata = GinPageGetMeta(metapage);
+
+ if (metadata->head == InvalidBlockNumber)
+ {
+ /*
+ * Main list is empty, so just insert sublist as main list
+ */
+ START_CRIT_SECTION();
+
+ metadata->head = sublist.head;
+ metadata->tail = sublist.tail;
+ metadata->tailFreeSize = sublist.tailFreeSize;
+
+ metadata->nPendingPages = sublist.nPendingPages;
+ metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
+ }
+ else
+ {
+ /*
+ * Merge lists
+ */
+ data.prevTail = metadata->tail;
+ data.newRightlink = sublist.head;
+
+ buffer = ReadBuffer(index, metadata->tail);
+ LockBuffer(buffer, GIN_EXCLUSIVE);
+ page = BufferGetPage(buffer);
+
+ Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
+
+ START_CRIT_SECTION();
+
+ GinPageGetOpaque(page)->rightlink = sublist.head;
+
+ MarkBufferDirty(buffer);
+
+ metadata->tail = sublist.tail;
+ metadata->tailFreeSize = sublist.tailFreeSize;
+
+ metadata->nPendingPages += sublist.nPendingPages;
+ metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
+
+ if (needWal)
+ XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
+ }
+ }
+ else
+ {
+ /*
+ * Insert into tail page. Metapage is already locked
+ */
+ OffsetNumber l,
+ off;
+ int i,
+ tupsize;
+ char *ptr;
+ char *collectordata;
+
+ buffer = ReadBuffer(index, metadata->tail);
+ LockBuffer(buffer, GIN_EXCLUSIVE);
+ page = BufferGetPage(buffer);
+
+ off = (PageIsEmpty(page)) ? FirstOffsetNumber :
+ OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+ collectordata = ptr = (char *) palloc(collector->sumsize);
+
+ data.ntuples = collector->ntuples;
+
+ if (needWal)
+ XLogBeginInsert();
+
+ START_CRIT_SECTION();
+
+ /*
+ * Increase counter of heap tuples
+ */
+ Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
+ GinPageGetOpaque(page)->maxoff++;
+ metadata->nPendingHeapTuples++;
+
+ for (i = 0; i < collector->ntuples; i++)
+ {
+ tupsize = IndexTupleSize(collector->tuples[i]);
+ l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
+
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to index page in \"%s\"",
+ RelationGetRelationName(index));
+
+ memcpy(ptr, collector->tuples[i], tupsize);
+ ptr += tupsize;
+
+ off++;
+ }
+
+ Assert((ptr - collectordata) <= collector->sumsize);
+ if (needWal)
+ {
+ XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
+ XLogRegisterBufData(1, collectordata, collector->sumsize);
+ }
+
+ metadata->tailFreeSize = PageGetExactFreeSpace(page);
+
+ MarkBufferDirty(buffer);
+ }
+
+ /*
+ * Set pd_lower just past the end of the metadata. This is essential,
+ * because without doing so, metadata will be lost if xlog.c compresses
+ * the page. (We must do this here because pre-v11 versions of PG did not
+ * set the metapage's pd_lower correctly, so a pg_upgraded index might
+ * contain the wrong value.)
+ */
+ ((PageHeader) metapage)->pd_lower =
+ ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
+
+ /*
+ * Write metabuffer, make xlog entry
+ */
+ MarkBufferDirty(metabuffer);
+
+ if (needWal)
+ {
+ XLogRecPtr recptr;
+
+ memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+
+ XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
+ XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
+ PageSetLSN(metapage, recptr);
+
+ if (buffer != InvalidBuffer)
+ {
+ PageSetLSN(page, recptr);
+ }
+ }
+
+ if (buffer != InvalidBuffer)
+ UnlockReleaseBuffer(buffer);
+
+ /*
+ * Force pending list cleanup when it becomes too long. And,
+ * ginInsertCleanup could take significant amount of time, so we prefer to
+ * call it when it can do all the work in a single collection cycle. In
+ * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
+ * while pending list is still small enough to fit into
+ * gin_pending_list_limit.
+ *
+ * ginInsertCleanup() should not be called inside our CRIT_SECTION.
+ */
+ cleanupSize = GinGetPendingListCleanupSize(index);
+ if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L)
+ needCleanup = true;
+
+ UnlockReleaseBuffer(metabuffer);
+
+ END_CRIT_SECTION();
+
+ /*
+ * Since it could contend with concurrent cleanup process we cleanup
+ * pending list not forcibly.
+ */
+ if (needCleanup)
+ ginInsertCleanup(ginstate, false, true, false, NULL);
+}
+
+/*
+ * Create temporary index tuples for a single indexable item (one index column
+ * for the heap tuple specified by ht_ctid), and append them to the array
+ * in *collector. They will subsequently be written out using
+ * ginHeapTupleFastInsert. Note that to guarantee consistent state, all
+ * temp tuples for a given heap tuple must be written in one call to
+ * ginHeapTupleFastInsert.
+ */
+void
+ginHeapTupleFastCollect(GinState *ginstate,
+ GinTupleCollector *collector,
+ OffsetNumber attnum, Datum value, bool isNull,
+ ItemPointer ht_ctid)
+{
+ Datum *entries;
+ GinNullCategory *categories;
+ int32 i,
+ nentries;
+
+ /*
+ * Extract the key values that need to be inserted in the index
+ */
+ entries = ginExtractEntries(ginstate, attnum, value, isNull,
+ &nentries, &categories);
+
+ /*
+ * Protect against integer overflow in allocation calculations
+ */
+ if (nentries < 0 ||
+ collector->ntuples + nentries > MaxAllocSize / sizeof(IndexTuple))
+ elog(ERROR, "too many entries for GIN index");
+
+ /*
+ * Allocate/reallocate memory for storing collected tuples
+ */
+ if (collector->tuples == NULL)
+ {
+ /*
+ * Determine the number of elements to allocate in the tuples array
+ * initially. Make it a power of 2 to avoid wasting memory when
+ * resizing (since palloc likes powers of 2).
+ */
+ collector->lentuples = pg_nextpower2_32(Max(16, nentries));
+ collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
+ }
+ else if (collector->lentuples < collector->ntuples + nentries)
+ {
+ /*
+ * Advance lentuples to the next suitable power of 2. This won't
+ * overflow, though we could get to a value that exceeds
+ * MaxAllocSize/sizeof(IndexTuple), causing an error in repalloc.
+ */
+ collector->lentuples = pg_nextpower2_32(collector->ntuples + nentries);
+ collector->tuples = (IndexTuple *) repalloc(collector->tuples,
+ sizeof(IndexTuple) * collector->lentuples);
+ }
+
+ /*
+ * Build an index tuple for each key value, and add to array. In pending
+ * tuples we just stick the heap TID into t_tid.
+ */
+ for (i = 0; i < nentries; i++)
+ {
+ IndexTuple itup;
+
+ itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
+ NULL, 0, 0, true);
+ itup->t_tid = *ht_ctid;
+ collector->tuples[collector->ntuples++] = itup;
+ collector->sumsize += IndexTupleSize(itup);
+ }
+}
+
+/*
+ * Deletes pending list pages up to (not including) newHead page.
+ * If newHead == InvalidBlockNumber then function drops the whole list.
+ *
+ * metapage is pinned and exclusive-locked throughout this function.
+ */
+static void
+shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
+ bool fill_fsm, IndexBulkDeleteResult *stats)
+{
+ Page metapage;
+ GinMetaPageData *metadata;
+ BlockNumber blknoToDelete;
+
+ metapage = BufferGetPage(metabuffer);
+ metadata = GinPageGetMeta(metapage);
+ blknoToDelete = metadata->head;
+
+ do
+ {
+ Page page;
+ int i;
+ int64 nDeletedHeapTuples = 0;
+ ginxlogDeleteListPages data;
+ Buffer buffers[GIN_NDELETE_AT_ONCE];
+ BlockNumber freespace[GIN_NDELETE_AT_ONCE];
+
+ data.ndeleted = 0;
+ while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
+ {
+ freespace[data.ndeleted] = blknoToDelete;
+ buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
+ LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
+ page = BufferGetPage(buffers[data.ndeleted]);
+
+ data.ndeleted++;
+
+ Assert(!GinPageIsDeleted(page));
+
+ nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
+ blknoToDelete = GinPageGetOpaque(page)->rightlink;
+ }
+
+ if (stats)
+ stats->pages_deleted += data.ndeleted;
+
+ /*
+ * This operation touches an unusually large number of pages, so
+ * prepare the XLogInsert machinery for that before entering the
+ * critical section.
+ */
+ if (RelationNeedsWAL(index))
+ XLogEnsureRecordSpace(data.ndeleted, 0);
+
+ START_CRIT_SECTION();
+
+ metadata->head = blknoToDelete;
+
+ Assert(metadata->nPendingPages >= data.ndeleted);
+ metadata->nPendingPages -= data.ndeleted;
+ Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
+ metadata->nPendingHeapTuples -= nDeletedHeapTuples;
+
+ if (blknoToDelete == InvalidBlockNumber)
+ {
+ metadata->tail = InvalidBlockNumber;
+ metadata->tailFreeSize = 0;
+ metadata->nPendingPages = 0;
+ metadata->nPendingHeapTuples = 0;
+ }
+
+ /*
+ * Set pd_lower just past the end of the metadata. This is essential,
+ * because without doing so, metadata will be lost if xlog.c
+ * compresses the page. (We must do this here because pre-v11
+ * versions of PG did not set the metapage's pd_lower correctly, so a
+ * pg_upgraded index might contain the wrong value.)
+ */
+ ((PageHeader) metapage)->pd_lower =
+ ((char *) metadata + sizeof(GinMetaPageData)) - (char *) metapage;
+
+ MarkBufferDirty(metabuffer);
+
+ for (i = 0; i < data.ndeleted; i++)
+ {
+ page = BufferGetPage(buffers[i]);
+ GinPageGetOpaque(page)->flags = GIN_DELETED;
+ MarkBufferDirty(buffers[i]);
+ }
+
+ if (RelationNeedsWAL(index))
+ {
+ XLogRecPtr recptr;
+
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, metabuffer,
+ REGBUF_WILL_INIT | REGBUF_STANDARD);
+ for (i = 0; i < data.ndeleted; i++)
+ XLogRegisterBuffer(i + 1, buffers[i], REGBUF_WILL_INIT);
+
+ memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+
+ XLogRegisterData((char *) &data,
+ sizeof(ginxlogDeleteListPages));
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE);
+ PageSetLSN(metapage, recptr);
+
+ for (i = 0; i < data.ndeleted; i++)
+ {
+ page = BufferGetPage(buffers[i]);
+ PageSetLSN(page, recptr);
+ }
+ }
+
+ for (i = 0; i < data.ndeleted; i++)
+ UnlockReleaseBuffer(buffers[i]);
+
+ END_CRIT_SECTION();
+
+ for (i = 0; fill_fsm && i < data.ndeleted; i++)
+ RecordFreeIndexPage(index, freespace[i]);
+
+ } while (blknoToDelete != newHead);
+}
+
+/* Initialize empty KeyArray */
+static void
+initKeyArray(KeyArray *keys, int32 maxvalues)
+{
+ keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
+ keys->categories = (GinNullCategory *)
+ palloc(sizeof(GinNullCategory) * maxvalues);
+ keys->nvalues = 0;
+ keys->maxvalues = maxvalues;
+}
+
+/* Add datum to KeyArray, resizing if needed */
+static void
+addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
+{
+ if (keys->nvalues >= keys->maxvalues)
+ {
+ keys->maxvalues *= 2;
+ keys->keys = (Datum *)
+ repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
+ keys->categories = (GinNullCategory *)
+ repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
+ }
+
+ keys->keys[keys->nvalues] = datum;
+ keys->categories[keys->nvalues] = category;
+ keys->nvalues++;
+}
+
+/*
+ * Collect data from a pending-list page in preparation for insertion into
+ * the main index.
+ *
+ * Go through all tuples >= startoff on page and collect values in accum
+ *
+ * Note that ka is just workspace --- it does not carry any state across
+ * calls.
+ */
+static void
+processPendingPage(BuildAccumulator *accum, KeyArray *ka,
+ Page page, OffsetNumber startoff)
+{
+ ItemPointerData heapptr;
+ OffsetNumber i,
+ maxoff;
+ OffsetNumber attrnum;
+
+ /* reset *ka to empty */
+ ka->nvalues = 0;
+
+ maxoff = PageGetMaxOffsetNumber(page);
+ Assert(maxoff >= FirstOffsetNumber);
+ ItemPointerSetInvalid(&heapptr);
+ attrnum = 0;
+
+ for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+ OffsetNumber curattnum;
+ Datum curkey;
+ GinNullCategory curcategory;
+
+ /* Check for change of heap TID or attnum */
+ curattnum = gintuple_get_attrnum(accum->ginstate, itup);
+
+ if (!ItemPointerIsValid(&heapptr))
+ {
+ heapptr = itup->t_tid;
+ attrnum = curattnum;
+ }
+ else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
+ curattnum == attrnum))
+ {
+ /*
+ * ginInsertBAEntries can insert several datums per call, but only
+ * for one heap tuple and one column. So call it at a boundary,
+ * and reset ka.
+ */
+ ginInsertBAEntries(accum, &heapptr, attrnum,
+ ka->keys, ka->categories, ka->nvalues);
+ ka->nvalues = 0;
+ heapptr = itup->t_tid;
+ attrnum = curattnum;
+ }
+
+ /* Add key to KeyArray */
+ curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
+ addDatum(ka, curkey, curcategory);
+ }
+
+ /* Dump out all remaining keys */
+ ginInsertBAEntries(accum, &heapptr, attrnum,
+ ka->keys, ka->categories, ka->nvalues);
+}
+
+/*
+ * Move tuples from pending pages into regular GIN structure.
+ *
+ * On first glance it looks completely not crash-safe. But if we crash
+ * after posting entries to the main index and before removing them from the
+ * pending list, it's okay because when we redo the posting later on, nothing
+ * bad will happen.
+ *
+ * fill_fsm indicates that ginInsertCleanup should add deleted pages
+ * to FSM otherwise caller is responsible to put deleted pages into
+ * FSM.
+ *
+ * If stats isn't null, we count deleted pending pages into the counts.
+ */
+void
+ginInsertCleanup(GinState *ginstate, bool full_clean,
+ bool fill_fsm, bool forceCleanup,
+ IndexBulkDeleteResult *stats)
+{
+ Relation index = ginstate->index;
+ Buffer metabuffer,
+ buffer;
+ Page metapage,
+ page;
+ GinMetaPageData *metadata;
+ MemoryContext opCtx,
+ oldCtx;
+ BuildAccumulator accum;
+ KeyArray datums;
+ BlockNumber blkno,
+ blknoFinish;
+ bool cleanupFinish = false;
+ bool fsm_vac = false;
+ Size workMemory;
+
+ /*
+ * We would like to prevent concurrent cleanup process. For that we will
+ * lock metapage in exclusive mode using LockPage() call. Nobody other
+ * will use that lock for metapage, so we keep possibility of concurrent
+ * insertion into pending list
+ */
+
+ if (forceCleanup)
+ {
+ /*
+ * We are called from [auto]vacuum/analyze or gin_clean_pending_list()
+ * and we would like to wait concurrent cleanup to finish.
+ */
+ LockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
+ workMemory =
+ (IsAutoVacuumWorkerProcess() && autovacuum_work_mem != -1) ?
+ autovacuum_work_mem : maintenance_work_mem;
+ }
+ else
+ {
+ /*
+ * We are called from regular insert and if we see concurrent cleanup
+ * just exit in hope that concurrent process will clean up pending
+ * list.
+ */
+ if (!ConditionalLockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock))
+ return;
+ workMemory = work_mem;
+ }
+
+ metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
+ LockBuffer(metabuffer, GIN_SHARE);
+ metapage = BufferGetPage(metabuffer);
+ metadata = GinPageGetMeta(metapage);
+
+ if (metadata->head == InvalidBlockNumber)
+ {
+ /* Nothing to do */
+ UnlockReleaseBuffer(metabuffer);
+ UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
+ return;
+ }
+
+ /*
+ * Remember a tail page to prevent infinite cleanup if other backends add
+ * new tuples faster than we can cleanup.
+ */
+ blknoFinish = metadata->tail;
+
+ /*
+ * Read and lock head of pending list
+ */
+ blkno = metadata->head;
+ buffer = ReadBuffer(index, blkno);
+ LockBuffer(buffer, GIN_SHARE);
+ page = BufferGetPage(buffer);
+
+ LockBuffer(metabuffer, GIN_UNLOCK);
+
+ /*
+ * Initialize. All temporary space will be in opCtx
+ */
+ opCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "GIN insert cleanup temporary context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldCtx = MemoryContextSwitchTo(opCtx);
+
+ initKeyArray(&datums, 128);
+ ginInitBA(&accum);
+ accum.ginstate = ginstate;
+
+ /*
+ * At the top of this loop, we have pin and lock on the current page of
+ * the pending list. However, we'll release that before exiting the loop.
+ * Note we also have pin but not lock on the metapage.
+ */
+ for (;;)
+ {
+ Assert(!GinPageIsDeleted(page));
+
+ /*
+ * Are we walk through the page which as we remember was a tail when
+ * we start our cleanup? But if caller asks us to clean up whole
+ * pending list then ignore old tail, we will work until list becomes
+ * empty.
+ */
+ if (blkno == blknoFinish && full_clean == false)
+ cleanupFinish = true;
+
+ /*
+ * read page's datums into accum
+ */
+ processPendingPage(&accum, &datums, page, FirstOffsetNumber);
+
+ vacuum_delay_point();
+
+ /*
+ * Is it time to flush memory to disk? Flush if we are at the end of
+ * the pending list, or if we have a full row and memory is getting
+ * full.
+ */
+ if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
+ (GinPageHasFullRow(page) &&
+ (accum.allocatedMemory >= workMemory * 1024L)))
+ {
+ ItemPointerData *list;
+ uint32 nlist;
+ Datum key;
+ GinNullCategory category;
+ OffsetNumber maxoff,
+ attnum;
+
+ /*
+ * Unlock current page to increase performance. Changes of page
+ * will be checked later by comparing maxoff after completion of
+ * memory flush.
+ */
+ maxoff = PageGetMaxOffsetNumber(page);
+ LockBuffer(buffer, GIN_UNLOCK);
+
+ /*
+ * Moving collected data into regular structure can take
+ * significant amount of time - so, run it without locking pending
+ * list.
+ */
+ ginBeginBAScan(&accum);
+ while ((list = ginGetBAEntry(&accum,
+ &attnum, &key, &category, &nlist)) != NULL)
+ {
+ ginEntryInsert(ginstate, attnum, key, category,
+ list, nlist, NULL);
+ vacuum_delay_point();
+ }
+
+ /*
+ * Lock the whole list to remove pages
+ */
+ LockBuffer(metabuffer, GIN_EXCLUSIVE);
+ LockBuffer(buffer, GIN_SHARE);
+
+ Assert(!GinPageIsDeleted(page));
+
+ /*
+ * While we left the page unlocked, more stuff might have gotten
+ * added to it. If so, process those entries immediately. There
+ * shouldn't be very many, so we don't worry about the fact that
+ * we're doing this with exclusive lock. Insertion algorithm
+ * guarantees that inserted row(s) will not continue on next page.
+ * NOTE: intentionally no vacuum_delay_point in this loop.
+ */
+ if (PageGetMaxOffsetNumber(page) != maxoff)
+ {
+ ginInitBA(&accum);
+ processPendingPage(&accum, &datums, page, maxoff + 1);
+
+ ginBeginBAScan(&accum);
+ while ((list = ginGetBAEntry(&accum,
+ &attnum, &key, &category, &nlist)) != NULL)
+ ginEntryInsert(ginstate, attnum, key, category,
+ list, nlist, NULL);
+ }
+
+ /*
+ * Remember next page - it will become the new list head
+ */
+ blkno = GinPageGetOpaque(page)->rightlink;
+ UnlockReleaseBuffer(buffer); /* shiftList will do exclusive
+ * locking */
+
+ /*
+ * remove read pages from pending list, at this point all content
+ * of read pages is in regular structure
+ */
+ shiftList(index, metabuffer, blkno, fill_fsm, stats);
+
+ /* At this point, some pending pages have been freed up */
+ fsm_vac = true;
+
+ Assert(blkno == metadata->head);
+ LockBuffer(metabuffer, GIN_UNLOCK);
+
+ /*
+ * if we removed the whole pending list or we cleanup tail (which
+ * we remembered on start our cleanup process) then just exit
+ */
+ if (blkno == InvalidBlockNumber || cleanupFinish)
+ break;
+
+ /*
+ * release memory used so far and reinit state
+ */
+ MemoryContextReset(opCtx);
+ initKeyArray(&datums, datums.maxvalues);
+ ginInitBA(&accum);
+ }
+ else
+ {
+ blkno = GinPageGetOpaque(page)->rightlink;
+ UnlockReleaseBuffer(buffer);
+ }
+
+ /*
+ * Read next page in pending list
+ */
+ vacuum_delay_point();
+ buffer = ReadBuffer(index, blkno);
+ LockBuffer(buffer, GIN_SHARE);
+ page = BufferGetPage(buffer);
+ }
+
+ UnlockPage(index, GIN_METAPAGE_BLKNO, ExclusiveLock);
+ ReleaseBuffer(metabuffer);
+
+ /*
+ * As pending list pages can have a high churn rate, it is desirable to
+ * recycle them immediately to the FreeSpaceMap when ordinary backends
+ * clean the list.
+ */
+ if (fsm_vac && fill_fsm)
+ IndexFreeSpaceMapVacuum(index);
+
+ /* Clean up temporary space */
+ MemoryContextSwitchTo(oldCtx);
+ MemoryContextDelete(opCtx);
+}
+
+/*
+ * SQL-callable function to clean the insert pending list
+ */
+Datum
+gin_clean_pending_list(PG_FUNCTION_ARGS)
+{
+ Oid indexoid = PG_GETARG_OID(0);
+ Relation indexRel = index_open(indexoid, RowExclusiveLock);
+ IndexBulkDeleteResult stats;
+ GinState ginstate;
+
+ if (RecoveryInProgress())
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is in progress"),
+ errhint("GIN pending list cannot be cleaned up during recovery.")));
+
+ /* Must be a GIN index */
+ if (indexRel->rd_rel->relkind != RELKIND_INDEX ||
+ indexRel->rd_rel->relam != GIN_AM_OID)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a GIN index",
+ RelationGetRelationName(indexRel))));
+
+ /*
+ * Reject attempts to read non-local temporary relations; we would be
+ * likely to get wrong data since we have no visibility into the owning
+ * session's local buffers.
+ */
+ if (RELATION_IS_OTHER_TEMP(indexRel))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot access temporary indexes of other sessions")));
+
+ /* User must own the index (comparable to privileges needed for VACUUM) */
+ if (!pg_class_ownercheck(indexoid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
+ RelationGetRelationName(indexRel));
+
+ memset(&stats, 0, sizeof(stats));
+ initGinState(&ginstate, indexRel);
+ ginInsertCleanup(&ginstate, true, true, true, &stats);
+
+ index_close(indexRel, RowExclusiveLock);
+
+ PG_RETURN_INT64((int64) stats.pages_deleted);
+}