path: root/src/backend/utils/sort
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 12:17:33 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 12:17:33 +0000
commit     5e45211a64149b3c659b90ff2de6fa982a5a93ed (patch)
tree       739caf8c461053357daa9f162bef34516c7bf452 /src/backend/utils/sort
parent     Initial commit. (diff)
download   postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.tar.xz
           postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.zip

Adding upstream version 15.5. (tag: upstream/15.5)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/utils/sort')
-rw-r--r--  src/backend/utils/sort/Makefile               |   25
-rw-r--r--  src/backend/utils/sort/logtape.c              | 1193
-rw-r--r--  src/backend/utils/sort/qsort_interruptible.c  |   16
-rw-r--r--  src/backend/utils/sort/sharedtuplestore.c     |  631
-rw-r--r--  src/backend/utils/sort/sortsupport.c          |  211
-rw-r--r--  src/backend/utils/sort/tuplesort.c            | 4938
-rw-r--r--  src/backend/utils/sort/tuplestore.c           | 1552
7 files changed, 8566 insertions, 0 deletions
diff --git a/src/backend/utils/sort/Makefile b/src/backend/utils/sort/Makefile
new file mode 100644
index 0000000..2c31fd4
--- /dev/null
+++ b/src/backend/utils/sort/Makefile
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for utils/sort
+#
+# IDENTIFICATION
+# src/backend/utils/sort/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/utils/sort
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
+
+OBJS = \
+ logtape.o \
+ qsort_interruptible.o \
+ sharedtuplestore.o \
+ sortsupport.o \
+ tuplesort.o \
+ tuplestore.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
new file mode 100644
index 0000000..31db75d
--- /dev/null
+++ b/src/backend/utils/sort/logtape.c
@@ -0,0 +1,1193 @@
+/*-------------------------------------------------------------------------
+ *
+ * logtape.c
+ * Management of "logical tapes" within temporary files.
+ *
+ * This module exists to support sorting via multiple merge passes (see
+ * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
+ * we implement it on disk by creating a separate file for each "tape",
+ * there is an annoying problem: the peak space usage is at least twice
+ * the volume of actual data to be sorted. (This must be so because each
+ * datum will appear in both the input and output tapes of the final
+ * merge pass.)
+ *
+ * We can work around this problem by recognizing that any one tape
+ * dataset (with the possible exception of the final output) is written
+ * and read exactly once in a perfectly sequential manner. Therefore,
+ * a datum once read will not be required again, and we can recycle its
+ * space for use by the new tape dataset(s) being generated. In this way,
+ * the total space usage is essentially just the actual data volume, plus
+ * insignificant bookkeeping and start/stop overhead.
+ *
+ * Few OSes allow arbitrary parts of a file to be released back to the OS,
+ * so we have to implement this space-recycling ourselves within a single
+ * logical file. logtape.c exists to perform this bookkeeping and provide
+ * the illusion of N independent tape devices to tuplesort.c. Note that
+ * logtape.c itself depends on buffile.c to provide a "logical file" of
+ * larger size than the underlying OS may support.
+ *
+ * For simplicity, we allocate and release space in the underlying file
+ * in BLCKSZ-size blocks. Space allocation boils down to keeping track
+ * of which blocks in the underlying file belong to which logical tape,
+ * plus any blocks that are free (recycled and not yet reused).
+ * The blocks in each logical tape form a chain, with a prev- and next-
+ * pointer in each block.
+ *
+ * The initial write pass is guaranteed to fill the underlying file
+ * perfectly sequentially, no matter how data is divided into logical tapes.
+ * Once we begin merge passes, the access pattern becomes considerably
+ * less predictable --- but the seeking involved should be comparable to
+ * what would happen if we kept each logical tape in a separate file,
+ * so there's no serious performance penalty paid to obtain the space
+ * savings of recycling. We try to localize the write accesses by always
+ * writing to the lowest-numbered free block when we have a choice; it's
+ * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
+ * policy for free blocks would be better?)
+ *
+ * To further make the I/Os more sequential, we can use a larger buffer
+ * when reading, and read multiple blocks from the same tape in one go,
+ * whenever the buffer becomes empty.
+ *
+ * To support the above policy of writing to the lowest free block, the
+ * freelist is a min heap.
+ *
+ * Since all the bookkeeping and buffer memory is allocated with palloc(),
+ * and the underlying file(s) are made with OpenTemporaryFile, all resources
+ * for a logical tape set are certain to be cleaned up even if processing
+ * is aborted by ereport(ERROR). To avoid confusion, the caller should take
+ * care that all calls for a single LogicalTapeSet are made in the same
+ * palloc context.
+ *
+ * To support parallel sort operations involving coordinated callers to
+ * tuplesort.c routines across multiple workers, it is necessary to
+ * concatenate each worker BufFile/tapeset into one single logical tapeset
+ * managed by the leader. Workers should have produced one final
+ * materialized tape (their entire output) when this happens in leader.
+ * There will always be the same number of runs as input tapes, and the same
+ * number of input tapes as participants (worker Tuplesortstates).
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/utils/sort/logtape.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <fcntl.h>
+
+#include "storage/buffile.h"
+#include "utils/builtins.h"
+#include "utils/logtape.h"
+#include "utils/memdebug.h"
+#include "utils/memutils.h"
+
+/*
+ * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
+ *
+ * The first block of a tape has prev == -1. The last block of a tape
+ * stores the number of valid bytes on the block, inverted, in 'next'.
+ * Therefore next < 0 indicates the last block.
+ */
+typedef struct TapeBlockTrailer
+{
+ long prev; /* previous block on this tape, or -1 on first
+ * block */
+ long next; /* next block on this tape, or # of valid
+ * bytes on last block (if < 0) */
+} TapeBlockTrailer;
+
+#define TapeBlockPayloadSize (BLCKSZ - sizeof(TapeBlockTrailer))
+#define TapeBlockGetTrailer(buf) \
+ ((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize))
+
+#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)
+#define TapeBlockGetNBytes(buf) \
+ (TapeBlockIsLast(buf) ? \
+ (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)
+#define TapeBlockSetNBytes(buf, nbytes) \
+ (TapeBlockGetTrailer(buf)->next = -(nbytes))
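+
+/*
+ * For example, the last block of a tape holding 100 valid bytes stores
+ * next == -100 in its trailer, so TapeBlockIsLast() is true and
+ * TapeBlockGetNBytes() returns 100.
+ */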
+
+/*
+ * When multiple tapes are being written to concurrently (as in HashAgg),
+ * avoid excessive fragmentation by preallocating block numbers to individual
+ * tapes. Each preallocation doubles in size starting at
+ * TAPE_WRITE_PREALLOC_MIN blocks up to TAPE_WRITE_PREALLOC_MAX blocks.
+ *
+ * No filesystem operations are performed for preallocation; only the block
+ * numbers are reserved. This may lead to sparse writes, which will cause
+ * ltsWriteBlock() to fill in holes with zeros.
+ */
+#define TAPE_WRITE_PREALLOC_MIN 8
+#define TAPE_WRITE_PREALLOC_MAX 128
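+
+/*
+ * With these settings, successive preallocations for a given tape reserve
+ * 8, 16, 32, 64, and then repeatedly 128 blocks at a time.
+ */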
+
+/*
+ * This data structure represents a single "logical tape" within the set
+ * of logical tapes stored in the same file.
+ *
+ * While writing, we hold the current partially-written data block in the
+ * buffer. While reading, we can hold multiple blocks in the buffer. Note
+ * that we don't retain the trailers of a block when it's read into the
+ * buffer. The buffer therefore contains one large contiguous chunk of data
+ * from the tape.
+ */
+struct LogicalTape
+{
+ LogicalTapeSet *tapeSet; /* tape set this tape is part of */
+
+ bool writing; /* T while in write phase */
+ bool frozen; /* T if blocks should not be freed when read */
+ bool dirty; /* does buffer need to be written? */
+
+ /*
+ * Block numbers of the first, current, and next block of the tape.
+ *
+ * The "current" block number is only valid when writing, or reading from
+ * a frozen tape. (When reading from an unfrozen tape, we use a larger
+ * read buffer that holds multiple blocks, so the "current" block is
+ * ambiguous.)
+ *
+ * When concatenation of worker tape BufFiles is performed, an offset to
+ * the first block in the unified BufFile space is applied during reads.
+ */
+ long firstBlockNumber;
+ long curBlockNumber;
+ long nextBlockNumber;
+ long offsetBlockNumber;
+
+ /*
+ * Buffer for current data block(s).
+ */
+ char *buffer; /* physical buffer (separately palloc'd) */
+ int buffer_size; /* allocated size of the buffer */
+ int max_size; /* highest useful, safe buffer_size */
+ int pos; /* next read/write position in buffer */
+ int nbytes; /* total # of valid bytes in buffer */
+
+ /*
+ * Preallocated block numbers are held in an array sorted in descending
+ * order; blocks are consumed from the end of the array (lowest block
+ * numbers first).
+ */
+ long *prealloc;
+ int nprealloc; /* number of elements in list */
+ int prealloc_size; /* number of elements list can hold */
+};
+
+/*
+ * This data structure represents a set of related "logical tapes" sharing
+ * space in a single underlying file. (But that "file" may be multiple files
+ * if needed to escape OS limits on file size; buffile.c handles that for us.)
+ * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
+ * demand.
+ */
+struct LogicalTapeSet
+{
+ BufFile *pfile; /* underlying file for whole tape set */
+ SharedFileSet *fileset;
+ int worker; /* worker # if shared, -1 for leader/serial */
+
+ /*
+ * File size tracking. nBlocksWritten is the size of the underlying file,
+ * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
+ * by ltsReleaseBlock(), and it is always greater than or equal to
+ * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
+ * blocks that have been allocated for a tape, but have not been written
+ * to the underlying file yet. nHoleBlocks tracks the total number of
+ * blocks that are in unused holes between worker spaces following BufFile
+ * concatenation.
+ */
+ long nBlocksAllocated; /* # of blocks allocated */
+ long nBlocksWritten; /* # of blocks used in underlying file */
+ long nHoleBlocks; /* # of "hole" blocks left */
+
+ /*
+ * We store the numbers of recycled-and-available blocks in freeBlocks[].
+ * When there are no such blocks, we extend the underlying file.
+ *
+ * If forgetFreeSpace is true then any freed blocks are simply forgotten
+ * rather than being remembered in freeBlocks[]. See notes for
+ * LogicalTapeSetForgetFreeSpace().
+ */
+ bool forgetFreeSpace; /* are we remembering free blocks? */
+ long *freeBlocks; /* resizable array holding minheap */
+ long nFreeBlocks; /* # of currently free blocks */
+ Size freeBlocksLen; /* current allocated length of freeBlocks[] */
+ bool enable_prealloc; /* preallocate write blocks? */
+};
+
+static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
+static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
+static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
+static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
+static long ltsGetFreeBlock(LogicalTapeSet *lts);
+static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
+static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
+static void ltsInitReadBuffer(LogicalTape *lt);
+
+
+/*
+ * Write a block-sized buffer to the specified block of the underlying file.
+ *
+ * No need for an error return convention; we ereport() on any error.
+ */
+static void
+ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
+{
+ /*
+ * BufFile does not support "holes", so if we're about to write a block
+ * that's past the current end of file, fill the space between the current
+ * end of file and the target block with zeros.
+ *
+ * This can happen either when tapes preallocate blocks, or for the last
+ * block of a tape, which might not have been flushed.
+ *
+ * Note that BufFile concatenation can leave "holes" in BufFile between
+ * worker-owned block ranges. These are tracked for reporting purposes
+ * only. We never read from nor write to these hole blocks, and so they
+ * are not considered here.
+ */
+ while (blocknum > lts->nBlocksWritten)
+ {
+ PGAlignedBlock zerobuf;
+
+ MemSet(zerobuf.data, 0, sizeof(zerobuf));
+
+ ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
+ }
+
+ /* Write the requested block */
+ if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek to block %ld of temporary file",
+ blocknum)));
+ BufFileWrite(lts->pfile, buffer, BLCKSZ);
+
+ /* Update nBlocksWritten, if we extended the file */
+ if (blocknum == lts->nBlocksWritten)
+ lts->nBlocksWritten++;
+}
+
+/*
+ * Read a block-sized buffer from the specified block of the underlying file.
+ *
+ * No need for an error return convention; we ereport() on any error. This
+ * module should never attempt to read a block it doesn't know is there.
+ */
+static void
+ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
+{
+ size_t nread;
+
+ if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek to block %ld of temporary file",
+ blocknum)));
+ nread = BufFileRead(lts->pfile, buffer, BLCKSZ);
+ if (nread != BLCKSZ)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes",
+ blocknum, nread, (size_t) BLCKSZ)));
+}
+
+/*
+ * Read as many blocks as we can into the per-tape buffer.
+ *
+ * Returns true if anything was read, 'false' on EOF.
+ */
+static bool
+ltsReadFillBuffer(LogicalTape *lt)
+{
+ lt->pos = 0;
+ lt->nbytes = 0;
+
+ do
+ {
+ char *thisbuf = lt->buffer + lt->nbytes;
+ long datablocknum = lt->nextBlockNumber;
+
+ /* Fetch next block number */
+ if (datablocknum == -1L)
+ break; /* EOF */
+ /* Apply worker offset, needed for leader tapesets */
+ datablocknum += lt->offsetBlockNumber;
+
+ /* Read the block */
+ ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf);
+ if (!lt->frozen)
+ ltsReleaseBlock(lt->tapeSet, datablocknum);
+ lt->curBlockNumber = lt->nextBlockNumber;
+
+ lt->nbytes += TapeBlockGetNBytes(thisbuf);
+ if (TapeBlockIsLast(thisbuf))
+ {
+ lt->nextBlockNumber = -1L;
+ /* EOF */
+ break;
+ }
+ else
+ lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
+
+ /* Advance to next block, if we have buffer space left */
+ } while (lt->buffer_size - lt->nbytes > BLCKSZ);
+
+ return (lt->nbytes > 0);
+}
+
+static inline unsigned long
+left_offset(unsigned long i)
+{
+ return 2 * i + 1;
+}
+
+static inline unsigned long
+right_offset(unsigned long i)
+{
+ return 2 * i + 2;
+}
+
+static inline unsigned long
+parent_offset(unsigned long i)
+{
+ return (i - 1) / 2;
+}
+
+/*
+ * Get the next block for writing.
+ */
+static long
+ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt)
+{
+ if (lts->enable_prealloc)
+ return ltsGetPreallocBlock(lts, lt);
+ else
+ return ltsGetFreeBlock(lts);
+}
+
+/*
+ * Select the lowest currently unused block from the tape set's global free
+ * list min heap.
+ */
+static long
+ltsGetFreeBlock(LogicalTapeSet *lts)
+{
+ long *heap = lts->freeBlocks;
+ long blocknum;
+ long heapsize;
+ long holeval;
+ unsigned long holepos;
+
+ /* freelist empty; allocate a new block */
+ if (lts->nFreeBlocks == 0)
+ return lts->nBlocksAllocated++;
+
+ /* easy if heap contains one element */
+ if (lts->nFreeBlocks == 1)
+ {
+ lts->nFreeBlocks--;
+ return lts->freeBlocks[0];
+ }
+
+ /* remove top of minheap */
+ blocknum = heap[0];
+
+ /* we'll replace it with end of minheap array */
+ holeval = heap[--lts->nFreeBlocks];
+
+ /* sift down */
+ holepos = 0; /* holepos is where the "hole" is */
+ heapsize = lts->nFreeBlocks;
+ while (true)
+ {
+ unsigned long left = left_offset(holepos);
+ unsigned long right = right_offset(holepos);
+ unsigned long min_child;
+
+ if (left < heapsize && right < heapsize)
+ min_child = (heap[left] < heap[right]) ? left : right;
+ else if (left < heapsize)
+ min_child = left;
+ else if (right < heapsize)
+ min_child = right;
+ else
+ break;
+
+ if (heap[min_child] >= holeval)
+ break;
+
+ heap[holepos] = heap[min_child];
+ holepos = min_child;
+ }
+ heap[holepos] = holeval;
+
+ return blocknum;
+}
+
+/*
+ * Return the lowest free block number from the tape's preallocation list.
+ * Refill the preallocation list with blocks from the tape set's free list if
+ * necessary.
+ */
+static long
+ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
+{
+ /* sorted in descending order, so return the last element */
+ if (lt->nprealloc > 0)
+ return lt->prealloc[--lt->nprealloc];
+
+ if (lt->prealloc == NULL)
+ {
+ lt->prealloc_size = TAPE_WRITE_PREALLOC_MIN;
+ lt->prealloc = (long *) palloc(sizeof(long) * lt->prealloc_size);
+ }
+ else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX)
+ {
+ /* when the preallocation list runs out, double the size */
+ lt->prealloc_size *= 2;
+ if (lt->prealloc_size > TAPE_WRITE_PREALLOC_MAX)
+ lt->prealloc_size = TAPE_WRITE_PREALLOC_MAX;
+ lt->prealloc = (long *) repalloc(lt->prealloc,
+ sizeof(long) * lt->prealloc_size);
+ }
+
+ /* refill preallocation list */
+ lt->nprealloc = lt->prealloc_size;
+ for (int i = lt->nprealloc; i > 0; i--)
+ {
+ lt->prealloc[i - 1] = ltsGetFreeBlock(lts);
+
+ /* verify descending order */
+ Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]);
+ }
+
+ return lt->prealloc[--lt->nprealloc];
+}
+
+/*
+ * Return a block# to the freelist.
+ */
+static void
+ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
+{
+ long *heap;
+ unsigned long holepos;
+
+ /*
+ * Do nothing if we're no longer interested in remembering free space.
+ */
+ if (lts->forgetFreeSpace)
+ return;
+
+ /*
+ * Enlarge freeBlocks array if full.
+ */
+ if (lts->nFreeBlocks >= lts->freeBlocksLen)
+ {
+ /*
+ * If the freelist becomes very large, just return and leak this free
+ * block.
+ */
+ if (lts->freeBlocksLen * 2 * sizeof(long) > MaxAllocSize)
+ return;
+
+ lts->freeBlocksLen *= 2;
+ lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
+ lts->freeBlocksLen * sizeof(long));
+ }
+
+ /* create a "hole" at end of minheap array */
+ heap = lts->freeBlocks;
+ holepos = lts->nFreeBlocks;
+ lts->nFreeBlocks++;
+
+ /* sift up to insert blocknum */
+ while (holepos != 0)
+ {
+ unsigned long parent = parent_offset(holepos);
+
+ if (heap[parent] < blocknum)
+ break;
+
+ heap[holepos] = heap[parent];
+ holepos = parent;
+ }
+ heap[holepos] = blocknum;
+}
+
+/*
+ * Lazily allocate and initialize the read buffer. This avoids waste when many
+ * tapes are open at once, but not all are active between rewinding and
+ * reading.
+ */
+static void
+ltsInitReadBuffer(LogicalTape *lt)
+{
+ Assert(lt->buffer_size > 0);
+ lt->buffer = palloc(lt->buffer_size);
+
+ /* Read the first block, or reset if tape is empty */
+ lt->nextBlockNumber = lt->firstBlockNumber;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ ltsReadFillBuffer(lt);
+}
+
+/*
+ * Create a tape set, backed by a temporary underlying file.
+ *
+ * The tape set is initially empty. Use LogicalTapeCreate() to create
+ * tapes in it.
+ *
+ * In a single-process sort, pass NULL argument for fileset, and -1 for
+ * worker.
+ *
+ * In a parallel sort, parallel workers pass the shared fileset handle and
+ * their own worker number. After the workers have finished, create the
+ * tape set in the leader, passing the shared fileset handle and -1 for
+ * worker, and use LogicalTapeImport() to import the worker tapes into it.
+ *
+ * Currently, the leader will only import worker tapes into the set; it does
+ * not create tapes of its own, although in principle that should work.
+ *
+ * If preallocate is true, blocks for each individual tape are allocated in
+ * batches. This avoids fragmentation when writing multiple tapes at the
+ * same time.
+ */
+LogicalTapeSet *
+LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
+{
+ LogicalTapeSet *lts;
+
+ /*
+ * Create top-level struct including per-tape LogicalTape structs.
+ */
+ lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
+ lts->nBlocksAllocated = 0L;
+ lts->nBlocksWritten = 0L;
+ lts->nHoleBlocks = 0L;
+ lts->forgetFreeSpace = false;
+ lts->freeBlocksLen = 32; /* reasonable initial guess */
+ lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
+ lts->nFreeBlocks = 0;
+ lts->enable_prealloc = preallocate;
+
+ lts->fileset = fileset;
+ lts->worker = worker;
+
+ /*
+ * Create temp BufFile storage as required.
+ *
+ * In leader, we hijack the BufFile of the first tape that's imported, and
+ * concatenate the BufFiles of any subsequent tapes to that. Hence don't
+ * create a BufFile here. Things are simpler for the worker case and the
+ * serial case, though. They are generally very similar -- workers use a
+ * shared fileset, whereas serial sorts use a conventional serial BufFile.
+ */
+ if (fileset && worker == -1)
+ lts->pfile = NULL;
+ else if (fileset)
+ {
+ char filename[MAXPGPATH];
+
+ pg_itoa(worker, filename);
+ lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
+ }
+ else
+ lts->pfile = BufFileCreateTemp(false);
+
+ return lts;
+}
+
+/*
+ * Claim ownership of a logical tape from an existing shared BufFile.
+ *
+ * Caller should be leader process. Though tapes are marked as frozen in
+ * workers, they are not frozen when opened within leader, since unfrozen tapes
+ * use a larger read buffer. (Frozen tapes have a smaller read buffer, optimized
+ * for random access.)
+ */
+LogicalTape *
+LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
+{
+ LogicalTape *lt;
+ long tapeblocks;
+ char filename[MAXPGPATH];
+ BufFile *file;
+ int64 filesize;
+
+ lt = ltsCreateTape(lts);
+
+ /*
+ * build concatenated view of all buffiles, remembering the block number
+ * where each source file begins.
+ */
+ pg_itoa(worker, filename);
+ file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
+ filesize = BufFileSize(file);
+
+ /*
+ * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
+ * block offset into each tape as we go.
+ */
+ lt->firstBlockNumber = shared->firstblocknumber;
+ if (lts->pfile == NULL)
+ {
+ lts->pfile = file;
+ lt->offsetBlockNumber = 0L;
+ }
+ else
+ {
+ lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
+ }
+ /* Don't allocate more for read buffer than could possibly help */
+ lt->max_size = Min(MaxAllocSize, filesize);
+ tapeblocks = filesize / BLCKSZ;
+
+ /*
+ * Update # of allocated blocks and # blocks written to reflect the
+ * imported BufFile. Allocated/written blocks include space used by holes
+ * left between concatenated BufFiles. Also track the number of hole
+ * blocks so that we can later work backwards to calculate the number of
+ * physical blocks for instrumentation.
+ */
+ lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
+
+ lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
+ lts->nBlocksWritten = lts->nBlocksAllocated;
+
+ return lt;
+}
+
+/*
+ * Close a logical tape set and release all resources.
+ *
+ * NOTE: This doesn't close any of the tapes! You must close them
+ * first, or you can let them be destroyed along with the memory context.
+ */
+void
+LogicalTapeSetClose(LogicalTapeSet *lts)
+{
+ BufFileClose(lts->pfile);
+ pfree(lts->freeBlocks);
+ pfree(lts);
+}
+
+/*
+ * Create a logical tape in the given tapeset.
+ *
+ * The tape is initialized in write state.
+ */
+LogicalTape *
+LogicalTapeCreate(LogicalTapeSet *lts)
+{
+ /*
+ * The only thing that currently prevents creating new tapes in leader is
+ * the fact that BufFiles opened using BufFileOpenFileSet() are read-only
+ * by definition, but that could be changed if it seemed worthwhile. For
+ * now, writing to the leader tape will raise a "Bad file descriptor"
+ * error, so tuplesort must avoid writing to the leader tape altogether.
+ */
+ if (lts->fileset && lts->worker == -1)
+ elog(ERROR, "cannot create new tapes in leader process");
+
+ return ltsCreateTape(lts);
+}
+
+static LogicalTape *
+ltsCreateTape(LogicalTapeSet *lts)
+{
+ LogicalTape *lt;
+
+ /*
+ * Create per-tape struct. Note we allocate the I/O buffer lazily.
+ */
+ lt = palloc(sizeof(LogicalTape));
+ lt->tapeSet = lts;
+ lt->writing = true;
+ lt->frozen = false;
+ lt->dirty = false;
+ lt->firstBlockNumber = -1L;
+ lt->curBlockNumber = -1L;
+ lt->nextBlockNumber = -1L;
+ lt->offsetBlockNumber = 0L;
+ lt->buffer = NULL;
+ lt->buffer_size = 0;
+ /* palloc() larger than MaxAllocSize would fail */
+ lt->max_size = MaxAllocSize;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ lt->prealloc = NULL;
+ lt->nprealloc = 0;
+ lt->prealloc_size = 0;
+
+ return lt;
+}
+
+/*
+ * Close a logical tape.
+ *
+ * Note: This doesn't return any blocks to the free list! You must read
+ * the tape to the end first, to reuse the space. In current use, though,
+ * we only close tapes after fully reading them.
+ */
+void
+LogicalTapeClose(LogicalTape *lt)
+{
+ if (lt->buffer)
+ pfree(lt->buffer);
+ pfree(lt);
+}
+
+/*
+ * Mark a logical tape set as not needing management of free space anymore.
+ *
+ * This should be called if the caller does not intend to write any more data
+ * into the tape set, but is reading from un-frozen tapes. Since no more
+ * writes are planned, remembering free blocks is no longer useful. Setting
+ * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
+ * is not designed to handle large numbers of free blocks.
+ */
+void
+LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
+{
+ lts->forgetFreeSpace = true;
+}
+
+/*
+ * Write to a logical tape.
+ *
+ * There are no error returns; we ereport() on failure.
+ */
+void
+LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
+{
+ LogicalTapeSet *lts = lt->tapeSet;
+ size_t nthistime;
+
+ Assert(lt->writing);
+ Assert(lt->offsetBlockNumber == 0L);
+
+ /* Allocate data buffer and first block on first write */
+ if (lt->buffer == NULL)
+ {
+ lt->buffer = (char *) palloc(BLCKSZ);
+ lt->buffer_size = BLCKSZ;
+ }
+ if (lt->curBlockNumber == -1)
+ {
+ Assert(lt->firstBlockNumber == -1);
+ Assert(lt->pos == 0);
+
+ lt->curBlockNumber = ltsGetBlock(lts, lt);
+ lt->firstBlockNumber = lt->curBlockNumber;
+
+ TapeBlockGetTrailer(lt->buffer)->prev = -1L;
+ }
+
+ Assert(lt->buffer_size == BLCKSZ);
+ while (size > 0)
+ {
+ if (lt->pos >= (int) TapeBlockPayloadSize)
+ {
+ /* Buffer full, dump it out */
+ long nextBlockNumber;
+
+ if (!lt->dirty)
+ {
+ /* Hmm, went directly from reading to writing? */
+ elog(ERROR, "invalid logtape state: should be dirty");
+ }
+
+ /*
+ * First allocate the next block, so that we can store it in the
+ * 'next' pointer of this block.
+ */
+ nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
+
+ /* set the next-pointer and dump the current block. */
+ TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
+
+ /* initialize the prev-pointer of the next block */
+ TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
+ lt->curBlockNumber = nextBlockNumber;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ }
+
+ nthistime = TapeBlockPayloadSize - lt->pos;
+ if (nthistime > size)
+ nthistime = size;
+ Assert(nthistime > 0);
+
+ memcpy(lt->buffer + lt->pos, ptr, nthistime);
+
+ lt->dirty = true;
+ lt->pos += nthistime;
+ if (lt->nbytes < lt->pos)
+ lt->nbytes = lt->pos;
+ ptr = (void *) ((char *) ptr + nthistime);
+ size -= nthistime;
+ }
+}
+
+/*
+ * Rewind logical tape and switch from writing to reading.
+ *
+ * The tape must currently be in writing state, or "frozen" in read state.
+ *
+ * 'buffer_size' specifies how much memory to use for the read buffer.
+ * Regardless of the argument, the actual amount of memory used is between
+ * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
+ * rounded down and truncated to fit those constraints, if necessary. If the
+ * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
+ * byte buffer is used.
+ */
+void
+LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
+{
+ LogicalTapeSet *lts = lt->tapeSet;
+
+ /*
+ * Round and cap buffer_size if needed.
+ */
+ if (lt->frozen)
+ buffer_size = BLCKSZ;
+ else
+ {
+ /* need at least one block */
+ if (buffer_size < BLCKSZ)
+ buffer_size = BLCKSZ;
+
+ /* palloc() larger than max_size is unlikely to be helpful */
+ if (buffer_size > lt->max_size)
+ buffer_size = lt->max_size;
+
+ /* round down to BLCKSZ boundary */
+ buffer_size -= buffer_size % BLCKSZ;
+ }
+
+ if (lt->writing)
+ {
+ /*
+ * Completion of a write phase. Flush last partial data block, and
+ * rewind for normal (destructive) read.
+ */
+ if (lt->dirty)
+ {
+ /*
+ * As long as we've filled the buffer at least once, its contents
+ * are entirely defined from valgrind's point of view, even though
+ * contents beyond the current end point may be stale. But it's
+ * possible - at least in the case of a parallel sort - to sort
+ * such a small amount of data that we do not fill the buffer even
+ * once. Tell valgrind that its contents are defined, so it
+ * doesn't bleat.
+ */
+ VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
+ lt->buffer_size - lt->nbytes);
+
+ TapeBlockSetNBytes(lt->buffer, lt->nbytes);
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
+ }
+ lt->writing = false;
+ }
+ else
+ {
+ /*
+ * This is only OK if tape is frozen; we rewind for (another) read
+ * pass.
+ */
+ Assert(lt->frozen);
+ }
+
+ if (lt->buffer)
+ pfree(lt->buffer);
+
+ /* the buffer is lazily allocated, but set the size here */
+ lt->buffer = NULL;
+ lt->buffer_size = buffer_size;
+
+ /* free the preallocation list, and return unused block numbers */
+ if (lt->prealloc != NULL)
+ {
+ for (int i = lt->nprealloc; i > 0; i--)
+ ltsReleaseBlock(lts, lt->prealloc[i - 1]);
+ pfree(lt->prealloc);
+ lt->prealloc = NULL;
+ lt->nprealloc = 0;
+ lt->prealloc_size = 0;
+ }
+}
+
+/*
+ * Read from a logical tape.
+ *
+ * Early EOF is indicated by return value less than #bytes requested.
+ */
+size_t
+LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
+{
+ size_t nread = 0;
+ size_t nthistime;
+
+ Assert(!lt->writing);
+
+ if (lt->buffer == NULL)
+ ltsInitReadBuffer(lt);
+
+ while (size > 0)
+ {
+ if (lt->pos >= lt->nbytes)
+ {
+ /* Try to load more data into buffer. */
+ if (!ltsReadFillBuffer(lt))
+ break; /* EOF */
+ }
+
+ nthistime = lt->nbytes - lt->pos;
+ if (nthistime > size)
+ nthistime = size;
+ Assert(nthistime > 0);
+
+ memcpy(ptr, lt->buffer + lt->pos, nthistime);
+
+ lt->pos += nthistime;
+ ptr = (void *) ((char *) ptr + nthistime);
+ size -= nthistime;
+ nread += nthistime;
+ }
+
+ return nread;
+}
+
+/*
+ * "Freeze" the contents of a tape so that it can be read multiple times
+ * and/or read backwards. Once a tape is frozen, its contents will not
+ * be released until the LogicalTapeSet is destroyed. This is expected
+ * to be used only for the final output pass of a merge.
+ *
+ * This *must* be called just at the end of a write pass, before the
+ * tape is rewound (after rewind is too late!). It performs a rewind
+ * and switch to read mode "for free". An immediately following rewind-
+ * for-read call is OK but not necessary.
+ *
+ * share output argument is set with details of storage used for tape after
+ * freezing, which may be passed to LogicalTapeSetCreate within leader
+ * process later. This metadata is only of interest to worker callers
+ * freezing their final output for leader (single materialized tape).
+ * Serial sorts should set share to NULL.
+ */
+void
+LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
+{
+ LogicalTapeSet *lts = lt->tapeSet;
+
+ Assert(lt->writing);
+ Assert(lt->offsetBlockNumber == 0L);
+
+ /*
+ * Completion of a write phase. Flush last partial data block, and rewind
+ * for nondestructive read.
+ */
+ if (lt->dirty)
+ {
+ /*
+ * As long as we've filled the buffer at least once, its contents are
+ * entirely defined from valgrind's point of view, even though
+ * contents beyond the current end point may be stale. But it's
+ * possible - at least in the case of a parallel sort - to sort such
+ * a small amount of data that we do not fill the buffer even once. Tell
+ * valgrind that its contents are defined, so it doesn't bleat.
+ */
+ VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
+ lt->buffer_size - lt->nbytes);
+
+ TapeBlockSetNBytes(lt->buffer, lt->nbytes);
+ ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
+ }
+ lt->writing = false;
+ lt->frozen = true;
+
+ /*
+ * The seek and backspace functions assume a single block read buffer.
+ * That's OK with current usage. A larger buffer is helpful to make the
+ * read pattern of the backing file look more sequential to the OS, when
+ * we're reading from multiple tapes. But at the end of a sort, when a
+ * tape is frozen, we only read from a single tape anyway.
+ */
+ if (!lt->buffer || lt->buffer_size != BLCKSZ)
+ {
+ if (lt->buffer)
+ pfree(lt->buffer);
+ lt->buffer = palloc(BLCKSZ);
+ lt->buffer_size = BLCKSZ;
+ }
+
+ /* Read the first block, or reset if tape is empty */
+ lt->curBlockNumber = lt->firstBlockNumber;
+ lt->pos = 0;
+ lt->nbytes = 0;
+
+ if (lt->firstBlockNumber == -1L)
+ lt->nextBlockNumber = -1L;
+ ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
+ if (TapeBlockIsLast(lt->buffer))
+ lt->nextBlockNumber = -1L;
+ else
+ lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
+ lt->nbytes = TapeBlockGetNBytes(lt->buffer);
+
+ /* Handle extra steps when caller is to share its tapeset */
+ if (share)
+ {
+ BufFileExportFileSet(lts->pfile);
+ share->firstblocknumber = lt->firstBlockNumber;
+ }
+}
+
+/*
+ * Backspace the tape a given number of bytes. (We also support a more
+ * general seek interface, see below.)
+ *
+ * *Only* a frozen-for-read tape can be backed up; we don't support
+ * random access during write, and an unfrozen read tape may have
+ * already discarded the desired data!
+ *
+ * Returns the number of bytes backed up. It can be less than the
+ * requested amount, if there isn't that much data before the current
+ * position. The tape is positioned to the beginning of the tape in
+ * that case.
+ */
+size_t
+LogicalTapeBackspace(LogicalTape *lt, size_t size)
+{
+ size_t seekpos = 0;
+
+ Assert(lt->frozen);
+ Assert(lt->buffer_size == BLCKSZ);
+
+ if (lt->buffer == NULL)
+ ltsInitReadBuffer(lt);
+
+ /*
+ * Easy case for seek within current block.
+ */
+ if (size <= (size_t) lt->pos)
+ {
+ lt->pos -= (int) size;
+ return size;
+ }
+
+ /*
+ * Not-so-easy case, have to walk back the chain of blocks. This
+ * implementation would be pretty inefficient for long seeks, but we
+ * really aren't doing that (a seek over one tuple is typical).
+ */
+ seekpos = (size_t) lt->pos; /* part within this block */
+ while (size > seekpos)
+ {
+ long prev = TapeBlockGetTrailer(lt->buffer)->prev;
+
+ if (prev == -1L)
+ {
+ /* Tried to back up beyond the beginning of tape. */
+ if (lt->curBlockNumber != lt->firstBlockNumber)
+ elog(ERROR, "unexpected end of tape");
+ lt->pos = 0;
+ return seekpos;
+ }
+
+ ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer);
+
+ if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
+ elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
+ prev,
+ TapeBlockGetTrailer(lt->buffer)->next,
+ lt->curBlockNumber);
+
+ lt->nbytes = TapeBlockPayloadSize;
+ lt->curBlockNumber = prev;
+ lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
+
+ seekpos += TapeBlockPayloadSize;
+ }
+
+ /*
+ * 'seekpos' can now be greater than 'size', because it points to the
+ * beginning of the target block. The difference is the position within the
+ * page.
+ */
+ lt->pos = seekpos - size;
+ return size;
+}
+
+/*
+ * Seek to an arbitrary position in a logical tape.
+ *
+ * *Only* a frozen-for-read tape can be seeked.
+ *
+ * Must be called with a block/offset previously returned by
+ * LogicalTapeTell().
+ */
+void
+LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
+{
+ Assert(lt->frozen);
+ Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
+ Assert(lt->buffer_size == BLCKSZ);
+
+ if (lt->buffer == NULL)
+ ltsInitReadBuffer(lt);
+
+ if (blocknum != lt->curBlockNumber)
+ {
+ ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer);
+ lt->curBlockNumber = blocknum;
+ lt->nbytes = TapeBlockPayloadSize;
+ lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
+ }
+
+ if (offset > lt->nbytes)
+ elog(ERROR, "invalid tape seek position");
+ lt->pos = offset;
+}
+
+/*
+ * Obtain current position in a form suitable for a later LogicalTapeSeek.
+ *
+ * NOTE: it'd be OK to do this during write phase with intention of using
+ * the position for a seek after freezing. Not clear if anyone needs that.
+ */
+void
+LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
+{
+ if (lt->buffer == NULL)
+ ltsInitReadBuffer(lt);
+
+ Assert(lt->offsetBlockNumber == 0L);
+
+ /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
+ Assert(lt->buffer_size == BLCKSZ);
+
+ *blocknum = lt->curBlockNumber;
+ *offset = lt->pos;
+}
+
+/*
+ * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
+ *
+ * This should not be called while there are open write buffers; otherwise it
+ * may not account for buffered data.
+ */
+long
+LogicalTapeSetBlocks(LogicalTapeSet *lts)
+{
+ return lts->nBlocksWritten - lts->nHoleBlocks;
+}
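
As a minimal sketch of how a serial caller such as tuplesort.c might drive the API above (illustrative only, not part of the patch; the payload and buffer size are arbitrary): create a tape set and a tape, write some bytes, flush and rewind for a destructive read, then release everything.

    #include "postgres.h"
    #include "utils/logtape.h"

    /* Illustrative sketch; not part of the commit above. */
    static void
    logtape_roundtrip_example(void)
    {
        /* Serial sort: no shared fileset, worker number -1, no preallocation. */
        LogicalTapeSet *lts = LogicalTapeSetCreate(false, NULL, -1);
        LogicalTape *lt = LogicalTapeCreate(lts);
        int         value = 42;
        int         readback = 0;

        /* Write phase: data accumulates in a private BLCKSZ block buffer. */
        LogicalTapeWrite(lt, &value, sizeof(value));

        /* Flush the last partial block and switch to (destructive) read mode. */
        LogicalTapeRewindForRead(lt, BLCKSZ);

        if (LogicalTapeRead(lt, &readback, sizeof(readback)) != sizeof(readback))
            elog(ERROR, "unexpected end of tape");

        /* Tapes are closed individually; closing the set releases the BufFile. */
        LogicalTapeClose(lt);
        LogicalTapeSetClose(lts);
    }

A parallel caller would instead pass a SharedFileSet and worker number to LogicalTapeSetCreate(), call LogicalTapeFreeze() on its final tape, and let the leader pull the result in with LogicalTapeImport().
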
diff --git a/src/backend/utils/sort/qsort_interruptible.c b/src/backend/utils/sort/qsort_interruptible.c
new file mode 100644
index 0000000..f179b25
--- /dev/null
+++ b/src/backend/utils/sort/qsort_interruptible.c
@@ -0,0 +1,16 @@
+/*
+ * qsort_interruptible.c: qsort_arg that includes CHECK_FOR_INTERRUPTS
+ */
+
+#include "postgres.h"
+#include "miscadmin.h"
+
+#define ST_SORT qsort_interruptible
+#define ST_ELEMENT_TYPE_VOID
+#define ST_COMPARATOR_TYPE_NAME qsort_arg_comparator
+#define ST_COMPARE_RUNTIME_POINTER
+#define ST_COMPARE_ARG_TYPE void
+#define ST_SCOPE
+#define ST_DEFINE
+#define ST_CHECK_FOR_INTERRUPTS
+#include "lib/sort_template.h"
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
new file mode 100644
index 0000000..464d4c5
--- /dev/null
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -0,0 +1,631 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedtuplestore.c
+ * Simple mechanism for sharing tuples between backends.
+ *
+ * This module contains a shared temporary tuple storage mechanism providing
+ * a parallel-aware subset of the features of tuplestore.c. Multiple backends
+ * can write to a SharedTuplestore, and then multiple backends can later scan
+ * the stored tuples. Currently, the only scan type supported is a parallel
+ * scan where each backend reads an arbitrary subset of the tuples that were
+ * written.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/utils/sort/sharedtuplestore.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup.h"
+#include "access/htup_details.h"
+#include "miscadmin.h"
+#include "storage/buffile.h"
+#include "storage/lwlock.h"
+#include "storage/sharedfileset.h"
+#include "utils/sharedtuplestore.h"
+
+/*
+ * The size of chunks, in pages. This is somewhat arbitrarily set to match
+ * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
+ * at approximately the same rate as it allocates new chunks of memory to
+ * insert them into.
+ */
+#define STS_CHUNK_PAGES 4
+#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
+#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
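+
+/*
+ * With the default BLCKSZ of 8192, each chunk occupies 32 kB on disk, all of
+ * it available for tuple data except the STS_CHUNK_HEADER_SIZE bytes at the
+ * start of the chunk.
+ */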
+
+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+ int ntuples; /* Number of tuples in this chunk. */
+ int overflow; /* If overflow, how many including this one? */
+ char data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;
+
+/* Per-participant shared state. */
+typedef struct SharedTuplestoreParticipant
+{
+ LWLock lock;
+ BlockNumber read_page; /* Page number for next read. */
+ BlockNumber npages; /* Number of pages written. */
+ bool writing; /* Used only for assertions. */
+} SharedTuplestoreParticipant;
+
+/* The control object that lives in shared memory. */
+struct SharedTuplestore
+{
+ int nparticipants; /* Number of participants that can write. */
+ int flags; /* Flag bits from SHARED_TUPLESTORE_XXX */
+ size_t meta_data_size; /* Size of per-tuple header. */
+ char name[NAMEDATALEN]; /* A name for this tuplestore. */
+
+ /* Followed by per-participant shared state. */
+ SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/* Per-participant state that lives in backend-local memory. */
+struct SharedTuplestoreAccessor
+{
+ int participant; /* My participant number. */
+ SharedTuplestore *sts; /* The shared state. */
+ SharedFileSet *fileset; /* The SharedFileSet holding files. */
+ MemoryContext context; /* Memory context for buffers. */
+
+ /* State for reading. */
+ int read_participant; /* The current participant to read from. */
+ BufFile *read_file; /* The current file to read from. */
+ int read_ntuples_available; /* The number of tuples in chunk. */
+ int read_ntuples; /* How many tuples have we read from chunk? */
+ size_t read_bytes; /* How many bytes have we read from chunk? */
+ char *read_buffer; /* A buffer for loading tuples. */
+ size_t read_buffer_size;
+ BlockNumber read_next_page; /* Lowest block we'll consider reading. */
+
+ /* State for writing. */
+ SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
+ BufFile *write_file; /* The current file to write to. */
+ BlockNumber write_page; /* The next page to write to. */
+ char *write_pointer; /* Current write pointer within chunk. */
+ char *write_end; /* One past the end of the current chunk. */
+};
+
+static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
+ int participant);
+
+/*
+ * Return the amount of shared memory required to hold SharedTuplestore for a
+ * given number of participants.
+ */
+size_t
+sts_estimate(int participants)
+{
+ return offsetof(SharedTuplestore, participants) +
+ sizeof(SharedTuplestoreParticipant) * participants;
+}
+
+/*
+ * Initialize a SharedTuplestore in existing shared memory. There must be
+ * space for sts_estimate(participants) bytes. If flags includes the value
+ * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
+ * eagerly (but this isn't yet implemented).
+ *
+ * Tuples that are stored may optionally carry a piece of fixed sized
+ * meta-data which will be retrieved along with the tuple. This is useful for
+ * the hash values used in multi-batch hash joins, but could have other
+ * applications.
+ *
+ * The caller must supply a SharedFileSet, which is essentially a directory
+ * that will be cleaned up automatically, and a name which must be unique
+ * across all SharedTuplestores created in the same SharedFileSet.
+ */
+SharedTuplestoreAccessor *
+sts_initialize(SharedTuplestore *sts, int participants,
+ int my_participant_number,
+ size_t meta_data_size,
+ int flags,
+ SharedFileSet *fileset,
+ const char *name)
+{
+ SharedTuplestoreAccessor *accessor;
+ int i;
+
+ Assert(my_participant_number < participants);
+
+ sts->nparticipants = participants;
+ sts->meta_data_size = meta_data_size;
+ sts->flags = flags;
+
+ if (strlen(name) > sizeof(sts->name) - 1)
+ elog(ERROR, "SharedTuplestore name too long");
+ strcpy(sts->name, name);
+
+ /*
+ * Limit meta-data so it + tuple size always fits into a single chunk.
+ * sts_puttuple() and sts_read_tuple() could be made to support scenarios
+ * where that's not the case, but it's not currently required. If so,
+ * meta-data size probably should be made variable, too.
+ */
+ if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
+ elog(ERROR, "meta-data too long");
+
+ for (i = 0; i < participants; ++i)
+ {
+ LWLockInitialize(&sts->participants[i].lock,
+ LWTRANCHE_SHARED_TUPLESTORE);
+ sts->participants[i].read_page = 0;
+ sts->participants[i].npages = 0;
+ sts->participants[i].writing = false;
+ }
+
+ accessor = palloc0(sizeof(SharedTuplestoreAccessor));
+ accessor->participant = my_participant_number;
+ accessor->sts = sts;
+ accessor->fileset = fileset;
+ accessor->context = CurrentMemoryContext;
+
+ return accessor;
+}
+
+/*
+ * Attach to a SharedTuplestore that has been initialized by another backend,
+ * so that this backend can read and write tuples.
+ */
+SharedTuplestoreAccessor *
+sts_attach(SharedTuplestore *sts,
+ int my_participant_number,
+ SharedFileSet *fileset)
+{
+ SharedTuplestoreAccessor *accessor;
+
+ Assert(my_participant_number < sts->nparticipants);
+
+ accessor = palloc0(sizeof(SharedTuplestoreAccessor));
+ accessor->participant = my_participant_number;
+ accessor->sts = sts;
+ accessor->fileset = fileset;
+ accessor->context = CurrentMemoryContext;
+
+ return accessor;
+}
+
+static void
+sts_flush_chunk(SharedTuplestoreAccessor *accessor)
+{
+ size_t size;
+
+ size = STS_CHUNK_PAGES * BLCKSZ;
+ BufFileWrite(accessor->write_file, accessor->write_chunk, size);
+ memset(accessor->write_chunk, 0, size);
+ accessor->write_pointer = &accessor->write_chunk->data[0];
+ accessor->sts->participants[accessor->participant].npages +=
+ STS_CHUNK_PAGES;
+}
+
+/*
+ * Finish writing tuples. This must be called by all backends that have
+ * written data before any backend begins reading it.
+ */
+void
+sts_end_write(SharedTuplestoreAccessor *accessor)
+{
+ if (accessor->write_file != NULL)
+ {
+ sts_flush_chunk(accessor);
+ BufFileClose(accessor->write_file);
+ pfree(accessor->write_chunk);
+ accessor->write_chunk = NULL;
+ accessor->write_file = NULL;
+ accessor->sts->participants[accessor->participant].writing = false;
+ }
+}
+
+/*
+ * Prepare to rescan. Only one participant must call this. After it returns,
+ * all participants may call sts_begin_parallel_scan() and then loop over
+ * sts_parallel_scan_next(). This function must not be called concurrently
+ * with a scan, and synchronization to avoid that is the caller's
+ * responsibility.
+ */
+void
+sts_reinitialize(SharedTuplestoreAccessor *accessor)
+{
+ int i;
+
+ /*
+ * Reset the shared read head for all participants' files, so that a new
+ * parallel scan will start again from the beginning of each file.
+ */
+ for (i = 0; i < accessor->sts->nparticipants; ++i)
+ {
+ accessor->sts->participants[i].read_page = 0;
+ }
+}
+
+/*
+ * Begin scanning the contents in parallel.
+ */
+void
+sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
+{
+ int i PG_USED_FOR_ASSERTS_ONLY;
+
+ /* End any existing scan that was in progress. */
+ sts_end_parallel_scan(accessor);
+
+ /*
+ * Any backend that might have written into this shared tuplestore must
+ * have called sts_end_write(), so that all buffers are flushed and the
+ * files have stopped growing.
+ */
+ for (i = 0; i < accessor->sts->nparticipants; ++i)
+ Assert(!accessor->sts->participants[i].writing);
+
+ /*
+ * We will start out reading the file that THIS backend wrote. There may
+ * be some caching locality advantage to that.
+ */
+ accessor->read_participant = accessor->participant;
+ accessor->read_file = NULL;
+ accessor->read_next_page = 0;
+}
+
+/*
+ * Finish a parallel scan, freeing associated backend-local resources.
+ */
+void
+sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
+{
+ /*
+ * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
+ * we'd probably need a reference count of current parallel scanners so we
+ * could safely do it only when the reference count reaches zero.
+ */
+ if (accessor->read_file != NULL)
+ {
+ BufFileClose(accessor->read_file);
+ accessor->read_file = NULL;
+ }
+}
+
+/*
+ * Write a tuple. If a meta-data size was provided to sts_initialize, then a
+ * pointer to meta data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+ MinimalTuple tuple)
+{
+ size_t size;
+
+ /* Do we have our own file yet? */
+ if (accessor->write_file == NULL)
+ {
+ SharedTuplestoreParticipant *participant;
+ char name[MAXPGPATH];
+
+ /* Create one. Only this backend will write into it. */
+ sts_filename(name, accessor, accessor->participant);
+ accessor->write_file =
+ BufFileCreateFileSet(&accessor->fileset->fs, name);
+
+ /* Set up the shared state for this backend's file. */
+ participant = &accessor->sts->participants[accessor->participant];
+ participant->writing = true; /* for assertions only */
+ }
+
+ /* Do we have space? */
+ size = accessor->sts->meta_data_size + tuple->t_len;
+ if (accessor->write_pointer + size > accessor->write_end)
+ {
+ if (accessor->write_chunk == NULL)
+ {
+ /* First time through. Allocate chunk. */
+ accessor->write_chunk = (SharedTuplestoreChunk *)
+ MemoryContextAllocZero(accessor->context,
+ STS_CHUNK_PAGES * BLCKSZ);
+ accessor->write_chunk->ntuples = 0;
+ accessor->write_pointer = &accessor->write_chunk->data[0];
+ accessor->write_end = (char *)
+ accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
+ }
+ else
+ {
+ /* See if flushing helps. */
+ sts_flush_chunk(accessor);
+ }
+
+ /* It may still not be enough in the case of a gigantic tuple. */
+ if (accessor->write_pointer + size > accessor->write_end)
+ {
+ size_t written;
+
+ /*
+ * We'll write the beginning of the oversized tuple, and then
+ * write the rest in some number of 'overflow' chunks.
+ *
+ * sts_initialize() verifies that the size of the tuple +
+ * meta-data always fits into a chunk. Because the chunk has been
+ * flushed above, we can be sure to have all of a chunk's usable
+ * space available.
+ */
+ Assert(accessor->write_pointer + accessor->sts->meta_data_size +
+ sizeof(uint32) < accessor->write_end);
+
+ /* Write the meta-data as one chunk. */
+ if (accessor->sts->meta_data_size > 0)
+ memcpy(accessor->write_pointer, meta_data,
+ accessor->sts->meta_data_size);
+
+ /*
+ * Write as much of the tuple as we can fit. This includes the
+ * tuple's size at the start.
+ */
+ written = accessor->write_end - accessor->write_pointer -
+ accessor->sts->meta_data_size;
+ memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
+ tuple, written);
+ ++accessor->write_chunk->ntuples;
+ size -= accessor->sts->meta_data_size;
+ size -= written;
+ /* Now write as many overflow chunks as we need for the rest. */
+ while (size > 0)
+ {
+ size_t written_this_chunk;
+
+ sts_flush_chunk(accessor);
+
+ /*
+ * How many overflow chunks to go? This will allow readers to
+ * skip all of them at once instead of reading each one.
+ */
+ accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
+ STS_CHUNK_DATA_SIZE;
+ written_this_chunk =
+ Min(accessor->write_end - accessor->write_pointer, size);
+ memcpy(accessor->write_pointer, (char *) tuple + written,
+ written_this_chunk);
+ accessor->write_pointer += written_this_chunk;
+ size -= written_this_chunk;
+ written += written_this_chunk;
+ }
+ return;
+ }
+ }
+
+ /* Copy meta-data and tuple into buffer. */
+ if (accessor->sts->meta_data_size > 0)
+ memcpy(accessor->write_pointer, meta_data,
+ accessor->sts->meta_data_size);
+ memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
+ tuple->t_len);
+ accessor->write_pointer += size;
+ ++accessor->write_chunk->ntuples;
+}
+
+static MinimalTuple
+sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+ MinimalTuple tuple;
+ uint32 size;
+ size_t remaining_size;
+ size_t this_chunk_size;
+ char *destination;
+
+ /*
+ * We'll keep track of bytes read from this chunk so that we can detect an
+ * overflowing tuple and switch to reading overflow pages.
+ */
+ if (accessor->sts->meta_data_size > 0)
+ {
+ if (BufFileRead(accessor->read_file,
+ meta_data,
+ accessor->sts->meta_data_size) !=
+ accessor->sts->meta_data_size)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading meta-data.")));
+ accessor->read_bytes += accessor->sts->meta_data_size;
+ }
+ if (BufFileRead(accessor->read_file,
+ &size,
+ sizeof(size)) != sizeof(size))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading size.")));
+ accessor->read_bytes += sizeof(size);
+ if (size > accessor->read_buffer_size)
+ {
+ size_t new_read_buffer_size;
+
+ if (accessor->read_buffer != NULL)
+ pfree(accessor->read_buffer);
+ new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
+ accessor->read_buffer =
+ MemoryContextAlloc(accessor->context, new_read_buffer_size);
+ accessor->read_buffer_size = new_read_buffer_size;
+ }
+ remaining_size = size - sizeof(uint32);
+ this_chunk_size = Min(remaining_size,
+ BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
+ destination = accessor->read_buffer + sizeof(uint32);
+ if (BufFileRead(accessor->read_file,
+ destination,
+ this_chunk_size) != this_chunk_size)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading tuple.")));
+ accessor->read_bytes += this_chunk_size;
+ remaining_size -= this_chunk_size;
+ destination += this_chunk_size;
+ ++accessor->read_ntuples;
+
+ /* Check if we need to read any overflow chunks. */
+ while (remaining_size > 0)
+ {
+ /* We are now positioned at the start of an overflow chunk. */
+ SharedTuplestoreChunk chunk_header;
+
+ if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
+ STS_CHUNK_HEADER_SIZE)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading overflow chunk header.")));
+ accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
+ if (chunk_header.overflow == 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected chunk in shared tuplestore temporary file"),
+ errdetail_internal("Expected overflow chunk.")));
+ accessor->read_next_page += STS_CHUNK_PAGES;
+ this_chunk_size = Min(remaining_size,
+ BLCKSZ * STS_CHUNK_PAGES -
+ STS_CHUNK_HEADER_SIZE);
+ if (BufFileRead(accessor->read_file,
+ destination,
+ this_chunk_size) != this_chunk_size)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail_internal("Short read while reading tuple.")));
+ accessor->read_bytes += this_chunk_size;
+ remaining_size -= this_chunk_size;
+ destination += this_chunk_size;
+
+ /*
+ * These will be used to count regular tuples following the oversized
+ * tuple that spilled into this overflow chunk.
+ */
+ accessor->read_ntuples = 0;
+ accessor->read_ntuples_available = chunk_header.ntuples;
+ }
+
+ tuple = (MinimalTuple) accessor->read_buffer;
+ tuple->t_len = size;
+
+ return tuple;
+}
+
+/*
+ * Get the next tuple in the current parallel scan.
+ */
+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+ SharedTuplestoreParticipant *p;
+ BlockNumber read_page;
+ bool eof;
+
+ for (;;)
+ {
+ /* Can we read more tuples from the current chunk? */
+ if (accessor->read_ntuples < accessor->read_ntuples_available)
+ return sts_read_tuple(accessor, meta_data);
+
+ /* Find the location of a new chunk to read. */
+ p = &accessor->sts->participants[accessor->read_participant];
+
+ LWLockAcquire(&p->lock, LW_EXCLUSIVE);
+ /* We can skip directly past overflow pages we know about. */
+ if (p->read_page < accessor->read_next_page)
+ p->read_page = accessor->read_next_page;
+ eof = p->read_page >= p->npages;
+ if (!eof)
+ {
+ /* Claim the next chunk. */
+ read_page = p->read_page;
+ /* Advance the read head for the next reader. */
+ p->read_page += STS_CHUNK_PAGES;
+ accessor->read_next_page = p->read_page;
+ }
+ LWLockRelease(&p->lock);
+
+ if (!eof)
+ {
+ SharedTuplestoreChunk chunk_header;
+ size_t nread;
+
+ /* Make sure we have the file open. */
+ if (accessor->read_file == NULL)
+ {
+ char name[MAXPGPATH];
+
+ sts_filename(name, accessor, accessor->read_participant);
+ accessor->read_file =
+ BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
+ false);
+ }
+
+ /* Seek and load the chunk header. */
+ if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek to block %u in shared tuplestore temporary file",
+ read_page)));
+ nread = BufFileRead(accessor->read_file, &chunk_header,
+ STS_CHUNK_HEADER_SIZE);
+ if (nread != STS_CHUNK_HEADER_SIZE)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file: read only %zu of %zu bytes",
+ nread, STS_CHUNK_HEADER_SIZE)));
+
+ /*
+ * If this is an overflow chunk, we skip it and any following
+ * overflow chunks all at once.
+ */
+ if (chunk_header.overflow > 0)
+ {
+ accessor->read_next_page = read_page +
+ chunk_header.overflow * STS_CHUNK_PAGES;
+ continue;
+ }
+
+ accessor->read_ntuples = 0;
+ accessor->read_ntuples_available = chunk_header.ntuples;
+ accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
+
+ /* Go around again, so we can get a tuple from this chunk. */
+ }
+ else
+ {
+ if (accessor->read_file != NULL)
+ {
+ BufFileClose(accessor->read_file);
+ accessor->read_file = NULL;
+ }
+
+ /*
+ * Try the next participant's file. If we've gone full circle,
+ * we're done.
+ */
+ accessor->read_participant = (accessor->read_participant + 1) %
+ accessor->sts->nparticipants;
+ if (accessor->read_participant == accessor->participant)
+ break;
+ accessor->read_next_page = 0;
+
+ /* Go around again, so we can get a chunk from this file. */
+ }
+ }
+
+ return NULL;
+}
+
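+/*
+ * A schematic reader loop over a shared tuplestore (illustrative only; the
+ * accessor is assumed to have been obtained via sts_initialize() or
+ * sts_attach(), the store is assumed to have been created without per-tuple
+ * meta-data, and process_tuple() is a placeholder for the caller's work):
+ *
+ *		MinimalTuple tuple;
+ *
+ *		sts_begin_parallel_scan(accessor);
+ *		while ((tuple = sts_parallel_scan_next(accessor, NULL)) != NULL)
+ *			process_tuple(tuple);
+ *		sts_end_parallel_scan(accessor);
+ */
+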
+/*
+ * Create the name used for the BufFile that a given participant will write.
+ */
+static void
+sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
+{
+ snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
+}
diff --git a/src/backend/utils/sort/sortsupport.c b/src/backend/utils/sort/sortsupport.c
new file mode 100644
index 0000000..b38bcd8
--- /dev/null
+++ b/src/backend/utils/sort/sortsupport.c
@@ -0,0 +1,211 @@
+/*-------------------------------------------------------------------------
+ *
+ * sortsupport.c
+ * Support routines for accelerated sorting.
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/utils/sort/sortsupport.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/gist.h"
+#include "access/nbtree.h"
+#include "catalog/pg_am.h"
+#include "fmgr.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+#include "utils/sortsupport.h"
+
+
+/* Info needed to use an old-style comparison function as a sort comparator */
+typedef struct
+{
+ FmgrInfo flinfo; /* lookup data for comparison function */
+ FunctionCallInfoBaseData fcinfo; /* reusable callinfo structure */
+} SortShimExtra;
+
+#define SizeForSortShimExtra(nargs) (offsetof(SortShimExtra, fcinfo) + SizeForFunctionCallInfo(nargs))
+
+/*
+ * Shim function for calling an old-style comparator
+ *
+ * This is essentially an inlined version of FunctionCall2Coll(), except
+ * we assume that the FunctionCallInfoBaseData was already mostly set up by
+ * PrepareSortSupportComparisonShim.
+ */
+static int
+comparison_shim(Datum x, Datum y, SortSupport ssup)
+{
+ SortShimExtra *extra = (SortShimExtra *) ssup->ssup_extra;
+ Datum result;
+
+ extra->fcinfo.args[0].value = x;
+ extra->fcinfo.args[1].value = y;
+
+ /* just for paranoia's sake, we reset isnull each time */
+ extra->fcinfo.isnull = false;
+
+ result = FunctionCallInvoke(&extra->fcinfo);
+
+ /* Check for null result, since caller is clearly not expecting one */
+ if (extra->fcinfo.isnull)
+ elog(ERROR, "function %u returned NULL", extra->flinfo.fn_oid);
+
+ return result;
+}
+
+/*
+ * Set up a shim function to allow use of an old-style btree comparison
+ * function as if it were a sort support comparator.
+ */
+void
+PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup)
+{
+ SortShimExtra *extra;
+
+ extra = (SortShimExtra *) MemoryContextAlloc(ssup->ssup_cxt,
+ SizeForSortShimExtra(2));
+
+ /* Lookup the comparison function */
+ fmgr_info_cxt(cmpFunc, &extra->flinfo, ssup->ssup_cxt);
+
+ /* We can initialize the callinfo just once and re-use it */
+ InitFunctionCallInfoData(extra->fcinfo, &extra->flinfo, 2,
+ ssup->ssup_collation, NULL, NULL);
+ extra->fcinfo.args[0].isnull = false;
+ extra->fcinfo.args[1].isnull = false;
+
+ ssup->ssup_extra = extra;
+ ssup->comparator = comparison_shim;
+}
+
+/*
+ * Look up and call the sortsupport function to set up the SortSupport comparator;
+ * or if no such function exists or it declines to set up the appropriate
+ * state, prepare a suitable shim.
+ */
+static void
+FinishSortSupportFunction(Oid opfamily, Oid opcintype, SortSupport ssup)
+{
+ Oid sortSupportFunction;
+
+ /* Look for a sort support function */
+ sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype,
+ BTSORTSUPPORT_PROC);
+ if (OidIsValid(sortSupportFunction))
+ {
+ /*
+ * The sort support function can provide a comparator, but it can also
+ * choose not to do so (e.g. based on the selected collation).
+ */
+ OidFunctionCall1(sortSupportFunction, PointerGetDatum(ssup));
+ }
+
+ if (ssup->comparator == NULL)
+ {
+ Oid sortFunction;
+
+ sortFunction = get_opfamily_proc(opfamily, opcintype, opcintype,
+ BTORDER_PROC);
+
+ if (!OidIsValid(sortFunction))
+ elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
+ BTORDER_PROC, opcintype, opcintype, opfamily);
+
+ /* We'll use a shim to call the old-style btree comparator */
+ PrepareSortSupportComparisonShim(sortFunction, ssup);
+ }
+}
+
+/*
+ * Fill in SortSupport given an ordering operator (btree "<" or ">" operator).
+ *
+ * Caller must previously have zeroed the SortSupportData structure and then
+ * filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill
+ * in ssup_reverse as well as the comparator function pointer.
+ */
+void
+PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
+{
+ Oid opfamily;
+ Oid opcintype;
+ int16 strategy;
+
+ Assert(ssup->comparator == NULL);
+
+ /* Find the operator in pg_amop */
+ if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype,
+ &strategy))
+ elog(ERROR, "operator %u is not a valid ordering operator",
+ orderingOp);
+ ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
+
+ FinishSortSupportFunction(opfamily, opcintype, ssup);
+}
+
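+/*
+ * A minimal usage sketch of the protocol described above (illustrative; the
+ * operator lookup is left as a hypothetical variable, lt_opr_oid, holding a
+ * btree "<" operator OID):
+ *
+ *		SortSupportData ssup;
+ *
+ *		memset(&ssup, 0, sizeof(ssup));
+ *		ssup.ssup_cxt = CurrentMemoryContext;
+ *		ssup.ssup_collation = InvalidOid;
+ *		ssup.ssup_nulls_first = false;
+ *		PrepareSortSupportFromOrderingOp(lt_opr_oid, &ssup);
+ *
+ *		result = ApplySortComparator(datumA, isnullA, datumB, isnullB, &ssup);
+ *
+ * ApplySortComparator() is the sortsupport.h inline that consults the
+ * comparator installed here (possibly the shim set up above).
+ */
+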
+/*
+ * Fill in SortSupport given an index relation, attribute, and strategy.
+ *
+ * Caller must previously have zeroed the SortSupportData structure and then
+ * filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first. This
+ * will fill in ssup_reverse (based on the supplied strategy), as well as the
+ * comparator function pointer.
+ */
+void
+PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy,
+ SortSupport ssup)
+{
+ Oid opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1];
+ Oid opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1];
+
+ Assert(ssup->comparator == NULL);
+
+ if (indexRel->rd_rel->relam != BTREE_AM_OID)
+ elog(ERROR, "unexpected non-btree AM: %u", indexRel->rd_rel->relam);
+ if (strategy != BTGreaterStrategyNumber &&
+ strategy != BTLessStrategyNumber)
+ elog(ERROR, "unexpected sort support strategy: %d", strategy);
+ ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
+
+ FinishSortSupportFunction(opfamily, opcintype, ssup);
+}
+
+/*
+ * Fill in SortSupport given a GiST index relation
+ *
+ * Caller must previously have zeroed the SortSupportData structure and then
+ * filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first. This
+ * will fill in ssup_reverse (always false for GiST index build), as well as
+ * the comparator function pointer.
+ */
+void
+PrepareSortSupportFromGistIndexRel(Relation indexRel, SortSupport ssup)
+{
+ Oid opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1];
+ Oid opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1];
+ Oid sortSupportFunction;
+
+ Assert(ssup->comparator == NULL);
+
+ if (indexRel->rd_rel->relam != GIST_AM_OID)
+ elog(ERROR, "unexpected non-gist AM: %u", indexRel->rd_rel->relam);
+ ssup->ssup_reverse = false;
+
+ /*
+ * Look up the sort support function. This is simpler than for B-tree
+ * indexes because we don't support the old-style btree comparators.
+ */
+ sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype,
+ GIST_SORTSUPPORT_PROC);
+ if (!OidIsValid(sortSupportFunction))
+ elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
+ GIST_SORTSUPPORT_PROC, opcintype, opcintype, opfamily);
+ OidFunctionCall1(sortSupportFunction, PointerGetDatum(ssup));
+}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
new file mode 100644
index 0000000..421afcf
--- /dev/null
+++ b/src/backend/utils/sort/tuplesort.c
@@ -0,0 +1,4938 @@
+/*-------------------------------------------------------------------------
+ *
+ * tuplesort.c
+ * Generalized tuple sorting routines.
+ *
+ * This module handles sorting of heap tuples, index tuples, or single
+ * Datums (and could easily support other kinds of sortable objects,
+ * if necessary). It works efficiently for both small and large amounts
+ * of data. Small amounts are sorted in-memory using qsort(). Large
+ * amounts are sorted using temporary files and a standard external sort
+ * algorithm.
+ *
+ * See Knuth, volume 3, for more than you want to know about external
+ * sorting algorithms. The algorithm we use is a balanced k-way merge.
+ * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's
+ * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
+ * merge is better. Knuth is assuming that tape drives are expensive
+ * beasts, and in particular that there will always be many more runs than
+ * tape drives. The polyphase merge algorithm was good at keeping all the
+ * tape drives busy, but in our implementation a "tape drive" doesn't cost
+ * much more than a few Kb of memory buffers, so we can afford to have
+ * lots of them. In particular, if we can have as many tape drives as
+ * sorted runs, we can eliminate any repeated I/O at all.
+ *
+ * Historically, we divided the input into sorted runs using replacement
+ * selection, in the form of a priority tree implemented as a heap
+ * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
+ * for run generation.
+ *
+ * The approximate amount of memory allowed for any one sort operation
+ * is specified in kilobytes by the caller (most pass work_mem). Initially,
+ * we absorb tuples and simply store them in an unsorted array as long as
+ * we haven't exceeded workMem. If we reach the end of the input without
+ * exceeding workMem, we sort the array using qsort() and subsequently return
+ * tuples just by scanning the tuple array sequentially. If we do exceed
+ * workMem, we begin to emit tuples into sorted runs in temporary tapes.
+ * When tuples are dumped in batch after quicksorting, we begin a new run
+ * with a new output tape. If we reach the max number of tapes, we write
+ * subsequent runs on the existing tapes in a round-robin fashion. We will
+ * need multiple merge passes to finish the merge in that case. After the
+ * end of the input is reached, we dump out remaining tuples in memory into
+ * a final run, then merge the runs.
+ *
+ * When merging runs, we use a heap containing just the frontmost tuple from
+ * each source run; we repeatedly output the smallest tuple and replace it
+ * with the next tuple from its source tape (if any). When the heap empties,
+ * the merge is complete. The basic merge algorithm thus needs very little
+ * memory --- only M tuples for an M-way merge, and M is constrained to a
+ * small number. However, we can still make good use of our full workMem
+ * allocation by pre-reading additional blocks from each source tape. Without
+ * prereading, our access pattern to the temporary file would be very erratic;
+ * on average we'd read one block from each of M source tapes during the same
+ * time that we're writing M blocks to the output tape, so there is no
+ * sequentiality of access at all, defeating the read-ahead methods used by
+ * most Unix kernels. Worse, the output tape gets written into a very random
+ * sequence of blocks of the temp file, ensuring that things will be even
+ * worse when it comes time to read that tape. A straightforward merge pass
+ * thus ends up doing a lot of waiting for disk seeks. We can improve matters
+ * by prereading from each source tape sequentially, loading about workMem/M
+ * bytes from each tape in turn, and making the sequential blocks immediately
+ * available for reuse. This approach helps to localize both read and write
+ * accesses. The pre-reading is handled by logtape.c; we just tell it how
+ * much memory to use for the buffers.
+ *
+ * In the current code we determine the number of input tapes M on the basis
+ * of workMem: we want workMem/M to be large enough that we read a fair
+ * amount of data each time we read from a tape, so as to maintain the
+ * locality of access described above. Nonetheless, with large workMem we
+ * can have many tapes. The logical "tapes" are implemented by logtape.c,
+ * which avoids space wastage by recycling disk space as soon as each block
+ * is read from its "tape".
+ *
+ * When the caller requests random access to the sort result, we form
+ * the final sorted run on a logical tape which is then "frozen", so
+ * that we can access it randomly. When the caller does not need random
+ * access, we return from tuplesort_performsort() as soon as we are down
+ * to one run per logical tape. The final merge is then performed
+ * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
+ * saves one cycle of writing all the data out to disk and reading it in.
+ *
+ * This module supports parallel sorting. Parallel sorts involve coordination
+ * among one or more worker processes, and a leader process, each with its own
+ * tuplesort state. The leader process (or, more accurately, the
+ * Tuplesortstate associated with a leader process) creates a full tapeset
+ * consisting of worker tapes with one run to merge; a run for every
+ * worker process. This is then merged. Worker processes are guaranteed to
+ * produce exactly one output run from their partial input.
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/utils/sort/tuplesort.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
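+/*
+ * As a rough illustration of the merge-pass arithmetic above: a balanced
+ * M-way merge over R initial runs needs about ceil(log_M(R)) passes.  For
+ * example, 400 runs merged 20 ways take two passes (400 -> 20 -> 1), while
+ * the same 400 runs merged only 6 ways take four (400 -> 67 -> 12 -> 2 -> 1).
+ */
+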
+#include "postgres.h"
+
+#include <limits.h>
+
+#include "access/hash.h"
+#include "access/htup_details.h"
+#include "access/nbtree.h"
+#include "catalog/index.h"
+#include "catalog/pg_am.h"
+#include "commands/tablespace.h"
+#include "executor/executor.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "utils/datum.h"
+#include "utils/logtape.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/pg_rusage.h"
+#include "utils/rel.h"
+#include "utils/sortsupport.h"
+#include "utils/tuplesort.h"
+
+
+/* sort-type codes for sort__start probes */
+#define HEAP_SORT 0
+#define INDEX_SORT 1
+#define DATUM_SORT 2
+#define CLUSTER_SORT 3
+
+/* Sort parallel code from state for sort__start probes */
+#define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \
+ (state)->worker >= 0 ? 1 : 2)
+
+/*
+ * Initial size of memtuples array. We're trying to select this size so that
+ * the array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the
+ * allocation overhead is kept low. However, we don't consider array sizes
+ * less than 1024.
+ *
+ */
+#define INITIAL_MEMTUPSIZE Max(1024, \
+ ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
+
+/* GUC variables */
+#ifdef TRACE_SORT
+bool trace_sort = false;
+#endif
+
+#ifdef DEBUG_BOUNDED_SORT
+bool optimize_bounded_sort = true;
+#endif
+
+
+/*
+ * The objects we actually sort are SortTuple structs. These contain
+ * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
+ * which is a separate palloc chunk --- we assume it is just one chunk and
+ * can be freed by a simple pfree() (except during merge, when we use a
+ * simple slab allocator). SortTuples also contain the tuple's first key
+ * column in Datum/nullflag format, and a source/input tape number that
+ * tracks which tape each heap element/slot belongs to during merging.
+ *
+ * Storing the first key column lets us save heap_getattr or index_getattr
+ * calls during tuple comparisons. We could extract and save all the key
+ * columns not just the first, but this would increase code complexity and
+ * overhead, and wouldn't actually save any comparison cycles in the common
+ * case where the first key determines the comparison result. Note that
+ * for a pass-by-reference datatype, datum1 points into the "tuple" storage.
+ *
+ * There is one special case: when the sort support infrastructure provides an
+ * "abbreviated key" representation, where the key is (typically) a pass by
+ * value proxy for a pass by reference type. In this case, the abbreviated key
+ * is stored in datum1 in place of the actual first key column.
+ *
+ * When sorting single Datums, the data value is represented directly by
+ * datum1/isnull1 for pass by value types (or null values). If the datatype is
+ * pass-by-reference and isnull1 is false, then "tuple" points to a separately
+ * palloc'd data value, otherwise "tuple" is NULL. The value of datum1 is then
+ * either the same pointer as "tuple", or is an abbreviated key value as
+ * described above. Accordingly, "tuple" is always used in preference to
+ * datum1 as the authoritative value for pass-by-reference cases.
+ */
+typedef struct
+{
+ void *tuple; /* the tuple itself */
+ Datum datum1; /* value of first key column */
+ bool isnull1; /* is first key column NULL? */
+ int srctape; /* source tape number */
+} SortTuple;
+
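+/*
+ * To make the datum1 rules above concrete (illustrative only): in a Datum
+ * sort of a pass-by-value type such as int8, datum1 holds the value itself
+ * and "tuple" is NULL; in a Datum sort of text with abbreviation enabled,
+ * "tuple" points to a palloc'd copy of the value while datum1 holds the
+ * abbreviated key; and in a heap-tuple sort, "tuple" points to a
+ * MinimalTuple and datum1/isnull1 cache its first key column.
+ */
+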
+/*
+ * During merge, we use a pre-allocated set of fixed-size slots to hold
+ * tuples, to avoid palloc/pfree overhead.
+ *
+ * Merge doesn't require a lot of memory, so we can afford to waste some,
+ * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the
+ * palloc() overhead is not significant anymore.
+ *
+ * 'nextfree' is valid when this chunk is in the free list. When in use, the
+ * slot holds a tuple.
+ */
+#define SLAB_SLOT_SIZE 1024
+
+typedef union SlabSlot
+{
+ union SlabSlot *nextfree;
+ char buffer[SLAB_SLOT_SIZE];
+} SlabSlot;
+
+/*
+ * Possible states of a Tuplesort object. These denote the states that
+ * persist between calls of Tuplesort routines.
+ */
+typedef enum
+{
+ TSS_INITIAL, /* Loading tuples; still within memory limit */
+ TSS_BOUNDED, /* Loading tuples into bounded-size heap */
+ TSS_BUILDRUNS, /* Loading tuples; writing to tape */
+ TSS_SORTEDINMEM, /* Sort completed entirely in memory */
+ TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */
+ TSS_FINALMERGE /* Performing final merge on-the-fly */
+} TupSortStatus;
+
+/*
+ * Parameters for calculation of number of tapes to use --- see inittapes()
+ * and tuplesort_merge_order().
+ *
+ * In this calculation we assume that each tape will cost us about 1 block's
+ * worth of buffer space. This ignores the overhead of all the other data
+ * structures needed for each tape, but it's probably close enough.
+ *
+ * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
+ * input tape, for pre-reading (see discussion at top of file). This is *in
+ * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
+ */
+#define MINORDER 6 /* minimum merge order */
+#define MAXORDER 500 /* maximum merge order */
+#define TAPE_BUFFER_OVERHEAD BLCKSZ
+#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
+
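+/*
+ * Putting rough numbers on the above (assuming the default 8kB BLCKSZ):
+ * each input tape wants about MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD =
+ * 256kB + 8kB of workMem, so a 4MB sort can feed on the order of a dozen
+ * input tapes, with the merge order always clamped to the
+ * MINORDER..MAXORDER range.
+ */
+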
+typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state);
+
+/*
+ * Private state of a Tuplesort operation.
+ */
+struct Tuplesortstate
+{
+ TupSortStatus status; /* enumerated value as shown above */
+ int nKeys; /* number of columns in sort key */
+ int sortopt; /* Bitmask of flags used to setup sort */
+ bool bounded; /* did caller specify a maximum number of
+ * tuples to return? */
+ bool boundUsed; /* true if we made use of a bounded heap */
+ int bound; /* if bounded, the maximum number of tuples */
+ bool tuples; /* Can SortTuple.tuple ever be set? */
+ int64 availMem; /* remaining memory available, in bytes */
+ int64 allowedMem; /* total memory allowed, in bytes */
+ int maxTapes; /* max number of input tapes to merge in each
+ * pass */
+ int64 maxSpace; /* maximum amount of space occupied among sort
+ * groups, either in-memory or on-disk */
+ bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk
+ * space, false when it's value for in-memory
+ * space */
+ TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
+ MemoryContext maincontext; /* memory context for tuple sort metadata that
+ * persists across multiple batches */
+ MemoryContext sortcontext; /* memory context holding most sort data */
+ MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
+ LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
+
+ /*
+ * These function pointers decouple the routines that must know what kind
+ * of tuple we are sorting from the routines that don't need to know it.
+ * They are set up by the tuplesort_begin_xxx routines.
+ *
+ * Function to compare two tuples; result is per qsort() convention, ie:
+ * <0, 0, >0 according as a<b, a=b, a>b. The API must match
+ * qsort_arg_comparator.
+ */
+ SortTupleComparator comparetup;
+
+ /*
+ * Function to copy a supplied input tuple into palloc'd space and set up
+ * its SortTuple representation (ie, set tuple/datum1/isnull1). Also,
+ * state->availMem must be decreased by the amount of space used for the
+ * tuple copy (note the SortTuple struct itself is not counted).
+ */
+ void (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup);
+
+ /*
+ * Function to write a stored tuple onto tape. The representation of the
+ * tuple on tape need not be the same as it is in memory; requirements on
+ * the tape representation are given below. Unless the slab allocator is
+ * used, after writing the tuple, pfree() the out-of-line data (not the
+ * SortTuple struct!), and increase state->availMem by the amount of
+ * memory space thereby released.
+ */
+ void (*writetup) (Tuplesortstate *state, LogicalTape *tape,
+ SortTuple *stup);
+
+ /*
+ * Function to read a stored tuple from tape back into memory. 'len' is
+ * the already-read length of the stored tuple. The tuple is allocated
+ * from the slab memory arena, or is palloc'd, see readtup_alloc().
+ */
+ void (*readtup) (Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len);
+
+ /*
+ * Whether SortTuple's datum1 and isnull1 members are maintained by the
+ * above routines. If not, some sort specializations are disabled.
+ */
+ bool haveDatum1;
+
+ /*
+ * This array holds the tuples now in sort memory. If we are in state
+ * INITIAL, the tuples are in no particular order; if we are in state
+ * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
+ * and FINALMERGE, the tuples are organized in "heap" order per Algorithm
+ * H. In state SORTEDONTAPE, the array is not used.
+ */
+ SortTuple *memtuples; /* array of SortTuple structs */
+ int memtupcount; /* number of tuples currently present */
+ int memtupsize; /* allocated length of memtuples array */
+ bool growmemtuples; /* memtuples' growth still underway? */
+
+ /*
+ * Memory for tuples is sometimes allocated using a simple slab allocator,
+ * rather than with palloc(). Currently, we switch to slab allocation
+ * when we start merging. Merging only needs to keep a small, fixed
+ * number of tuples in memory at any time, so we can avoid the
+ * palloc/pfree overhead by recycling a fixed number of fixed-size slots
+ * to hold the tuples.
+ *
+ * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE
+ * slots. The allocation is sized to have one slot per tape, plus one
+ * additional slot. We need that many slots to hold all the tuples kept
+ * in the heap during merge, plus the one most recently returned from the
+ * sort by tuplesort_gettuple.
+ *
+ * Initially, all the slots are kept in a linked list of free slots. When
+ * a tuple is read from a tape, it is put into the next available slot, if
+ * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd
+ * instead.
+ *
+ * When we're done processing a tuple, we return the slot back to the free
+ * list, or pfree() if it was palloc'd. We know that a tuple was
+ * allocated from the slab if its pointer value is between
+ * slabMemoryBegin and slabMemoryEnd.
+ *
+ * When the slab allocator is used, the USEMEM/LACKMEM mechanism of
+ * tracking memory usage is not used.
+ */
+ bool slabAllocatorUsed;
+
+ char *slabMemoryBegin; /* beginning of slab memory arena */
+ char *slabMemoryEnd; /* end of slab memory arena */
+ SlabSlot *slabFreeHead; /* head of free list */
+
+ /* Memory used for input and output tape buffers. */
+ size_t tape_buffer_mem;
+
+ /*
+ * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
+ * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
+ * modes), we remember the tuple in 'lastReturnedTuple', so that we can
+ * recycle the memory on next gettuple call.
+ */
+ void *lastReturnedTuple;
+
+ /*
+ * While building initial runs, this is the current output run number.
+ * Afterwards, it is the number of initial runs we made.
+ */
+ int currentRun;
+
+ /*
+ * Logical tapes, for merging.
+ *
+ * The initial runs are written in the output tapes. In each merge pass,
+ * the output tapes of the previous pass become the input tapes, and new
+ * output tapes are created as needed. When nInputTapes equals
+ * nInputRuns, there is only one merge pass left.
+ */
+ LogicalTape **inputTapes;
+ int nInputTapes;
+ int nInputRuns;
+
+ LogicalTape **outputTapes;
+ int nOutputTapes;
+ int nOutputRuns;
+
+ LogicalTape *destTape; /* current output tape */
+
+ /*
+ * These variables are used after completion of sorting to keep track of
+ * the next tuple to return. (In the tape case, the tape's current read
+ * position is also critical state.)
+ */
+ LogicalTape *result_tape; /* actual tape of finished output */
+ int current; /* array index (only used if SORTEDINMEM) */
+ bool eof_reached; /* reached EOF (needed for cursors) */
+
+ /* markpos_xxx holds marked position for mark and restore */
+ long markpos_block; /* tape block# (only used if SORTEDONTAPE) */
+ int markpos_offset; /* saved "current", or offset in tape block */
+ bool markpos_eof; /* saved "eof_reached" */
+
+ /*
+ * These variables are used during parallel sorting.
+ *
+ * worker is our worker identifier. Follows the general convention that
+ * a value of -1 denotes a leader tuplesort, and values >= 0 denote worker
+ * tuplesorts. (-1 can also be a serial tuplesort.)
+ *
+ * shared is mutable shared memory state, which is used to coordinate
+ * parallel sorts.
+ *
+ * nParticipants is the number of worker Tuplesortstates known by the
+ * leader to have actually been launched, which implies that they must
+ * finish a run that the leader needs to merge. Typically includes a
+ * worker state held by the leader process itself. Set in the leader
+ * Tuplesortstate only.
+ */
+ int worker;
+ Sharedsort *shared;
+ int nParticipants;
+
+ /*
+ * The sortKeys variable is used by every case other than the hash index
+ * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
+ * MinimalTuple and CLUSTER routines, though.
+ */
+ TupleDesc tupDesc;
+ SortSupport sortKeys; /* array of length nKeys */
+
+ /*
+ * This variable is shared by the single-key MinimalTuple case and the
+ * Datum case (which both use qsort_ssup()). Otherwise, it's NULL. The
+ * presence of a value in this field is also checked by various sort
+ * specialization functions as an optimization when comparing the leading
+ * key in a tiebreak situation to determine if there are any subsequent
+ * keys to sort on.
+ */
+ SortSupport onlyKey;
+
+ /*
+ * Additional state for managing "abbreviated key" sortsupport routines
+ * (which currently may be used by all cases except the hash index case).
+ * Tracks the intervals at which the optimization's effectiveness is
+ * tested.
+ */
+ int64 abbrevNext; /* Tuple # at which to next check
+ * applicability */
+
+ /*
+ * These variables are specific to the CLUSTER case; they are set by
+ * tuplesort_begin_cluster.
+ */
+ IndexInfo *indexInfo; /* info about index being used for reference */
+ EState *estate; /* for evaluating index expressions */
+
+ /*
+ * These variables are specific to the IndexTuple case; they are set by
+ * tuplesort_begin_index_xxx and used only by the IndexTuple routines.
+ */
+ Relation heapRel; /* table the index is being built on */
+ Relation indexRel; /* index being built */
+
+ /* These are specific to the index_btree subcase: */
+ bool enforceUnique; /* complain if we find duplicate tuples */
+ bool uniqueNullsNotDistinct; /* unique constraint null treatment */
+
+ /* These are specific to the index_hash subcase: */
+ uint32 high_mask; /* masks for sortable part of hash code */
+ uint32 low_mask;
+ uint32 max_buckets;
+
+ /*
+ * These variables are specific to the Datum case; they are set by
+ * tuplesort_begin_datum and used only by the DatumTuple routines.
+ */
+ Oid datumType;
+ /* we need typelen in order to know how to copy the Datums. */
+ int datumTypeLen;
+
+ /*
+ * Resource snapshot for time of sort start.
+ */
+#ifdef TRACE_SORT
+ PGRUsage ru_start;
+#endif
+};
+
+/*
+ * Private mutable state of a parallel tuplesort operation. This is allocated
+ * in shared memory.
+ */
+struct Sharedsort
+{
+ /* mutex protects all fields prior to tapes */
+ slock_t mutex;
+
+ /*
+ * currentWorker generates ordinal identifier numbers for parallel sort
+ * workers. These start from 0, and are always gapless.
+ *
+ * Workers increment workersFinished to indicate having finished. If this
+ * is equal to state.nParticipants within the leader, leader is ready to
+ * merge worker runs.
+ */
+ int currentWorker;
+ int workersFinished;
+
+ /* Temporary file space */
+ SharedFileSet fileset;
+
+ /* Size of tapes flexible array */
+ int nTapes;
+
+ /*
+ * Tapes array used by workers to report back information needed by the
+ * leader to concatenate all worker tapes into one for merging
+ */
+ TapeShare tapes[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/*
+ * Is the given tuple allocated from the slab memory arena?
+ */
+#define IS_SLAB_SLOT(state, tuple) \
+ ((char *) (tuple) >= (state)->slabMemoryBegin && \
+ (char *) (tuple) < (state)->slabMemoryEnd)
+
+/*
+ * Return the given tuple to the slab memory free list, or free it
+ * if it was palloc'd.
+ */
+#define RELEASE_SLAB_SLOT(state, tuple) \
+ do { \
+ SlabSlot *buf = (SlabSlot *) tuple; \
+ \
+ if (IS_SLAB_SLOT((state), buf)) \
+ { \
+ buf->nextfree = (state)->slabFreeHead; \
+ (state)->slabFreeHead = buf; \
+ } else \
+ pfree(buf); \
+ } while(0)
+
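+/*
+ * A minimal sketch of the intended free-list discipline (illustrative only;
+ * see readtup_alloc() for the real logic): to obtain space for a tuple of
+ * tuplen bytes during merge, one would do roughly
+ *
+ *		SlabSlot   *buf = state->slabFreeHead;
+ *		void	   *ptr;
+ *
+ *		if (tuplen > SLAB_SLOT_SIZE || buf == NULL)
+ *			ptr = MemoryContextAlloc(state->sortcontext, tuplen);
+ *		else
+ *		{
+ *			state->slabFreeHead = buf->nextfree;
+ *			ptr = buf;
+ *		}
+ *
+ * and, when the tuple is no longer needed, RELEASE_SLAB_SLOT() either pushes
+ * the slot back onto the free list or pfree()s an oversized allocation.
+ */
+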
+#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state))
+#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
+#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
+#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
+#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed)
+#define USEMEM(state,amt) ((state)->availMem -= (amt))
+#define FREEMEM(state,amt) ((state)->availMem += (amt))
+#define SERIAL(state) ((state)->shared == NULL)
+#define WORKER(state) ((state)->shared && (state)->worker != -1)
+#define LEADER(state) ((state)->shared && (state)->worker == -1)
+
+/*
+ * NOTES about on-tape representation of tuples:
+ *
+ * We require the first "unsigned int" of a stored tuple to be the total size
+ * on-tape of the tuple, including itself (so it is never zero; an all-zero
+ * unsigned int is used to delimit runs). The remainder of the stored tuple
+ * may or may not match the in-memory representation of the tuple ---
+ * any conversion needed is the job of the writetup and readtup routines.
+ *
+ * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored
+ * representation of the tuple must be followed by another "unsigned int" that
+ * is a copy of the length --- so the total tape space used is actually
+ * sizeof(unsigned int) more than the stored length value. This allows
+ * read-backwards. When the random access flag was not specified, the
+ * write/read routines may omit the extra length word.
+ *
+ * writetup is expected to write both length words as well as the tuple
+ * data. When readtup is called, the tape is positioned just after the
+ * front length word; readtup must read the tuple data and advance past
+ * the back length word (if present).
+ *
+ * The write/read routines can make use of the tuple description data
+ * stored in the Tuplesortstate record, if needed. They are also expected
+ * to adjust state->availMem by the amount of memory space (not tape space!)
+ * released or consumed. There is no error return from either writetup
+ * or readtup; they should ereport() on failure.
+ *
+ *
+ * NOTES about memory consumption calculations:
+ *
+ * We count space allocated for tuples against the workMem limit, plus
+ * the space used by the variable-size memtuples array. Fixed-size space
+ * is not counted; it's small enough to not be interesting.
+ *
+ * Note that we count actual space used (as shown by GetMemoryChunkSpace)
+ * rather than the originally-requested size. This is important since
+ * palloc can add substantial overhead. It's not a complete answer since
+ * we won't count any wasted space in palloc allocation blocks, but it's
+ * a lot better than what we were doing before 7.3. As of 9.6, a
+ * separate memory context is used for caller passed tuples. Resetting
+ * it at certain key increments significantly ameliorates fragmentation.
+ * Note that this places a responsibility on copytup routines to use the
+ * correct memory context for these tuples (and to not use the reset
+ * context for anything whose lifetime needs to span multiple external
+ * sort runs). readtup routines use the slab allocator (they cannot use
+ * the reset context because it gets deleted at the point that merging
+ * begins).
+ */
+
+/* When using this macro, beware of double evaluation of len */
+#define LogicalTapeReadExact(tape, ptr, len) \
+ do { \
+ if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
+ elog(ERROR, "unexpected end of data"); \
+ } while(0)
+
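+/*
+ * A schematic writetup routine following the on-tape conventions described
+ * above (illustrative only; data/datalen stand for the tuple's on-tape body,
+ * and the real writetup_* routines below handle the per-tuple-kind details):
+ *
+ *		unsigned int tuplen = datalen + sizeof(unsigned int);
+ *
+ *		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+ *		LogicalTapeWrite(tape, data, datalen);
+ *		if (state->sortopt & TUPLESORT_RANDOMACCESS)
+ *			LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+ *
+ * followed by freeing the in-memory copy and crediting state->availMem
+ * (via FREEMEM) when the slab allocator is not in use.
+ */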
+
+static Tuplesortstate *tuplesort_begin_common(int workMem,
+ SortCoordinate coordinate,
+ int sortopt);
+static void tuplesort_begin_batch(Tuplesortstate *state);
+static void puttuple_common(Tuplesortstate *state, SortTuple *tuple);
+static bool consider_abort_common(Tuplesortstate *state);
+static void inittapes(Tuplesortstate *state, bool mergeruns);
+static void inittapestate(Tuplesortstate *state, int maxTapes);
+static void selectnewtape(Tuplesortstate *state);
+static void init_slab_allocator(Tuplesortstate *state, int numSlots);
+static void mergeruns(Tuplesortstate *state);
+static void mergeonerun(Tuplesortstate *state);
+static void beginmerge(Tuplesortstate *state);
+static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
+static void dumptuples(Tuplesortstate *state, bool alltuples);
+static void make_bounded_heap(Tuplesortstate *state);
+static void sort_bounded_heap(Tuplesortstate *state);
+static void tuplesort_sort_memtuples(Tuplesortstate *state);
+static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
+static void tuplesort_heap_delete_top(Tuplesortstate *state);
+static void reversedirection(Tuplesortstate *state);
+static unsigned int getlen(LogicalTape *tape, bool eofOK);
+static void markrunend(LogicalTape *tape);
+static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
+static int comparetup_heap(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state);
+static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
+static void writetup_heap(Tuplesortstate *state, LogicalTape *tape,
+ SortTuple *stup);
+static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len);
+static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state);
+static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
+static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape,
+ SortTuple *stup);
+static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len);
+static int comparetup_index_btree(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state);
+static int comparetup_index_hash(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state);
+static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
+static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
+ SortTuple *stup);
+static void readtup_index(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len);
+static int comparetup_datum(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state);
+static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
+static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
+ SortTuple *stup);
+static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len);
+static int worker_get_identifier(Tuplesortstate *state);
+static void worker_freeze_result_tape(Tuplesortstate *state);
+static void worker_nomergeruns(Tuplesortstate *state);
+static void leader_takeover_tapes(Tuplesortstate *state);
+static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
+static void tuplesort_free(Tuplesortstate *state);
+static void tuplesort_updatemax(Tuplesortstate *state);
+
+/*
+ * Specialized comparators that we can inline into specialized sorts. The goal
+ * is to try to sort two tuples without having to follow the pointers to the
+ * comparator or the tuple.
+ *
+ * XXX: For now, these fall back to comparator functions that will compare the
+ * leading datum a second time.
+ *
+ * XXX: For now, there is no specialization for cases where datum1 is
+ * authoritative and we don't even need to fall back to a callback at all (that
+ * would be true for types like int4/int8/timestamp/date, but not true for
+ * abbreviations of text or multi-key sorts). There could be! Is it worth it?
+ */
+
+/* Used if first key's comparator is ssup_datum_unsigned_compare */
+static pg_attribute_always_inline int
+qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
+{
+ int compare;
+
+ compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ &state->sortKeys[0]);
+ if (compare != 0)
+ return compare;
+
+ /*
+ * No need to waste effort calling the tiebreak function when there are no
+ * other keys to sort on.
+ */
+ if (state->onlyKey != NULL)
+ return 0;
+
+ return state->comparetup(a, b, state);
+}
+
+#if SIZEOF_DATUM >= 8
+/* Used if first key's comparator is ssup_datum_signed_compare */
+static pg_attribute_always_inline int
+qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
+{
+ int compare;
+
+ compare = ApplySignedSortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ &state->sortKeys[0]);
+
+ if (compare != 0)
+ return compare;
+
+ /*
+ * No need to waste effort calling the tiebreak function when there are no
+ * other keys to sort on.
+ */
+ if (state->onlyKey != NULL)
+ return 0;
+
+ return state->comparetup(a, b, state);
+}
+#endif
+
+/* Used if first key's comparator is ssup_datum_int32_compare */
+static pg_attribute_always_inline int
+qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state)
+{
+ int compare;
+
+ compare = ApplyInt32SortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ &state->sortKeys[0]);
+
+ if (compare != 0)
+ return compare;
+
+ /*
+ * No need to waste effort calling the tiebreak function when there are no
+ * other keys to sort on.
+ */
+ if (state->onlyKey != NULL)
+ return 0;
+
+ return state->comparetup(a, b, state);
+}
+
+/*
+ * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts
+ * any variant of SortTuples, using the appropriate comparetup function.
+ * qsort_ssup() is specialized for the case where the comparetup function
+ * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts
+ * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for
+ * common comparison functions on pass-by-value leading datums.
+ */
+
+#define ST_SORT qsort_tuple_unsigned
+#define ST_ELEMENT_TYPE SortTuple
+#define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state)
+#define ST_COMPARE_ARG_TYPE Tuplesortstate
+#define ST_CHECK_FOR_INTERRUPTS
+#define ST_SCOPE static
+#define ST_DEFINE
+#include "lib/sort_template.h"
+
+#if SIZEOF_DATUM >= 8
+#define ST_SORT qsort_tuple_signed
+#define ST_ELEMENT_TYPE SortTuple
+#define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state)
+#define ST_COMPARE_ARG_TYPE Tuplesortstate
+#define ST_CHECK_FOR_INTERRUPTS
+#define ST_SCOPE static
+#define ST_DEFINE
+#include "lib/sort_template.h"
+#endif
+
+#define ST_SORT qsort_tuple_int32
+#define ST_ELEMENT_TYPE SortTuple
+#define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state)
+#define ST_COMPARE_ARG_TYPE Tuplesortstate
+#define ST_CHECK_FOR_INTERRUPTS
+#define ST_SCOPE static
+#define ST_DEFINE
+#include "lib/sort_template.h"
+
+#define ST_SORT qsort_tuple
+#define ST_ELEMENT_TYPE SortTuple
+#define ST_COMPARE_RUNTIME_POINTER
+#define ST_COMPARE_ARG_TYPE Tuplesortstate
+#define ST_CHECK_FOR_INTERRUPTS
+#define ST_SCOPE static
+#define ST_DECLARE
+#define ST_DEFINE
+#include "lib/sort_template.h"
+
+#define ST_SORT qsort_ssup
+#define ST_ELEMENT_TYPE SortTuple
+#define ST_COMPARE(a, b, ssup) \
+ ApplySortComparator((a)->datum1, (a)->isnull1, \
+ (b)->datum1, (b)->isnull1, (ssup))
+#define ST_COMPARE_ARG_TYPE SortSupportData
+#define ST_CHECK_FOR_INTERRUPTS
+#define ST_SCOPE static
+#define ST_DEFINE
+#include "lib/sort_template.h"
+
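+/*
+ * Each inclusion of sort_template.h above expands to a specialized static
+ * sort routine named by ST_SORT.  Roughly as they are invoked from
+ * tuplesort_sort_memtuples() later in this file:
+ *
+ *		qsort_ssup(state->memtuples, state->memtupcount, state->onlyKey);
+ *		qsort_tuple(state->memtuples, state->memtupcount,
+ *					state->comparetup, state);
+ *
+ * where the variant built with ST_COMPARE_RUNTIME_POINTER additionally takes
+ * the comparator function pointer as an argument.
+ */
+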
+/*
+ * tuplesort_begin_xxx
+ *
+ * Initialize for a tuple sort operation.
+ *
+ * After calling tuplesort_begin, the caller should call tuplesort_putXXX
+ * zero or more times, then call tuplesort_performsort when all the tuples
+ * have been supplied. After performsort, retrieve the tuples in sorted
+ * order by calling tuplesort_getXXX until it returns false/NULL. (If random
+ * access was requested, rescan, markpos, and restorepos can also be called.)
+ * Call tuplesort_end to terminate the operation and release memory/disk space.
+ *
+ * Each variant of tuplesort_begin has a workMem parameter specifying the
+ * maximum number of kilobytes of RAM to use before spilling data to disk.
+ * (The normal value of this parameter is work_mem, but some callers use
+ * other values.) Each variant also has a sortopt which is a bitmask of
+ * sort options. See TUPLESORT_* definitions in tuplesort.h
+ */
+
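+/*
+ * A schematic caller following the protocol above (illustrative; tupDesc,
+ * slot, the key arrays, and the fetch_next_input()/process_output() steps
+ * are assumed placeholders, not real functions):
+ *
+ *		Tuplesortstate *sortstate;
+ *
+ *		sortstate = tuplesort_begin_heap(tupDesc, nkeys, attNums,
+ *										 sortOperators, sortCollations,
+ *										 nullsFirstFlags, work_mem,
+ *										 NULL, TUPLESORT_NONE);
+ *		while (fetch_next_input(slot))
+ *			tuplesort_puttupleslot(sortstate, slot);
+ *		tuplesort_performsort(sortstate);
+ *		while (tuplesort_gettupleslot(sortstate, true, false, slot, NULL))
+ *			process_output(slot);
+ *		tuplesort_end(sortstate);
+ */
+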
+static Tuplesortstate *
+tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt)
+{
+ Tuplesortstate *state;
+ MemoryContext maincontext;
+ MemoryContext sortcontext;
+ MemoryContext oldcontext;
+
+ /* See leader_takeover_tapes() remarks on random access support */
+ if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS))
+ elog(ERROR, "random access disallowed under parallel sort");
+
+ /*
+ * Memory context surviving tuplesort_reset. This memory context holds
+ * data which is useful to keep while sorting multiple similar batches.
+ */
+ maincontext = AllocSetContextCreate(CurrentMemoryContext,
+ "TupleSort main",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * Create a working memory context for one sort operation. The content of
+ * this context is deleted by tuplesort_reset.
+ */
+ sortcontext = AllocSetContextCreate(maincontext,
+ "TupleSort sort",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * Additionally, a working memory context for tuples is set up in
+ * tuplesort_begin_batch.
+ */
+
+ /*
+ * Make the Tuplesortstate within the per-sortstate context. This way, we
+ * don't need a separate pfree() operation for it at shutdown.
+ */
+ oldcontext = MemoryContextSwitchTo(maincontext);
+
+ state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ pg_rusage_init(&state->ru_start);
+#endif
+
+ state->sortopt = sortopt;
+ state->tuples = true;
+
+ /*
+ * workMem is forced to be at least 64KB, the current minimum valid value
+ * for the work_mem GUC. This is a defense against parallel sort callers
+ * that divide out memory among many workers in a way that leaves each
+ * with very little memory.
+ */
+ state->allowedMem = Max(workMem, 64) * (int64) 1024;
+ state->sortcontext = sortcontext;
+ state->maincontext = maincontext;
+
+ /*
+ * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
+ * see comments in grow_memtuples().
+ */
+ state->memtupsize = INITIAL_MEMTUPSIZE;
+ state->memtuples = NULL;
+
+ /*
+ * After all of the other non-parallel-related state, we set up all of the
+ * state needed for each batch.
+ */
+ tuplesort_begin_batch(state);
+
+ /*
+ * Initialize parallel-related state based on coordination information
+ * from caller
+ */
+ if (!coordinate)
+ {
+ /* Serial sort */
+ state->shared = NULL;
+ state->worker = -1;
+ state->nParticipants = -1;
+ }
+ else if (coordinate->isWorker)
+ {
+ /* Parallel worker produces exactly one final run from all input */
+ state->shared = coordinate->sharedsort;
+ state->worker = worker_get_identifier(state);
+ state->nParticipants = -1;
+ }
+ else
+ {
+ /* Parallel leader state only used for final merge */
+ state->shared = coordinate->sharedsort;
+ state->worker = -1;
+ state->nParticipants = coordinate->nParticipants;
+ Assert(state->nParticipants >= 1);
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+/*
+ * tuplesort_begin_batch
+ *
+ * Set up, or reset, all state needed for processing a new set of tuples with this
+ * sort state. Called both from tuplesort_begin_common (the first time sorting
+ * with this sort state) and tuplesort_reset (for subsequent usages).
+ */
+static void
+tuplesort_begin_batch(Tuplesortstate *state)
+{
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(state->maincontext);
+
+ /*
+ * Caller tuple (e.g. IndexTuple) memory context.
+ *
+ * A dedicated child context used exclusively for caller passed tuples
+ * eases memory management. Resetting at key points reduces
+ * fragmentation. Note that the memtuples array of SortTuples is allocated
+ * in the parent context, not this context, because there is no need to
+ * free memtuples early. For bounded sorts, tuples may be pfreed in any
+ * order, so we use a regular aset.c context so that it can make use of
+ * free'd memory. When the sort is not bounded, we make use of a
+ * generation.c context as this keeps allocations more compact with less
+ * wastage. Allocations are also slightly more CPU efficient.
+ */
+ if (state->sortopt & TUPLESORT_ALLOWBOUNDED)
+ state->tuplecontext = AllocSetContextCreate(state->sortcontext,
+ "Caller tuples",
+ ALLOCSET_DEFAULT_SIZES);
+ else
+ state->tuplecontext = GenerationContextCreate(state->sortcontext,
+ "Caller tuples",
+ ALLOCSET_DEFAULT_SIZES);
+
+
+ state->status = TSS_INITIAL;
+ state->bounded = false;
+ state->boundUsed = false;
+
+ state->availMem = state->allowedMem;
+
+ state->tapeset = NULL;
+
+ state->memtupcount = 0;
+
+ /*
+ * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
+ * see comments in grow_memtuples().
+ */
+ state->growmemtuples = true;
+ state->slabAllocatorUsed = false;
+ if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE)
+ {
+ pfree(state->memtuples);
+ state->memtuples = NULL;
+ state->memtupsize = INITIAL_MEMTUPSIZE;
+ }
+ if (state->memtuples == NULL)
+ {
+ state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
+ USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+ }
+
+ /* workMem must be large enough for the minimal memtuples array */
+ if (LACKMEM(state))
+ elog(ERROR, "insufficient memory allowed for sort");
+
+ state->currentRun = 0;
+
+ /*
+ * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
+ * inittapes(), if needed.
+ */
+
+ state->result_tape = NULL; /* flag that result tape has not been formed */
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+Tuplesortstate *
+tuplesort_begin_heap(TupleDesc tupDesc,
+ int nkeys, AttrNumber *attNums,
+ Oid *sortOperators, Oid *sortCollations,
+ bool *nullsFirstFlags,
+ int workMem, SortCoordinate coordinate, int sortopt)
+{
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ sortopt);
+ MemoryContext oldcontext;
+ int i;
+
+ oldcontext = MemoryContextSwitchTo(state->maincontext);
+
+ AssertArg(nkeys > 0);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG,
+ "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
+ nkeys, workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+ state->nKeys = nkeys;
+
+ TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
+ false, /* no unique check */
+ nkeys,
+ workMem,
+ sortopt & TUPLESORT_RANDOMACCESS,
+ PARALLEL_SORT(state));
+
+ state->comparetup = comparetup_heap;
+ state->copytup = copytup_heap;
+ state->writetup = writetup_heap;
+ state->readtup = readtup_heap;
+ state->haveDatum1 = true;
+
+ state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
+ state->abbrevNext = 10;
+
+ /* Prepare SortSupport data for each column */
+ state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
+
+ for (i = 0; i < nkeys; i++)
+ {
+ SortSupport sortKey = state->sortKeys + i;
+
+ AssertArg(attNums[i] != 0);
+ AssertArg(sortOperators[i] != 0);
+
+ sortKey->ssup_cxt = CurrentMemoryContext;
+ sortKey->ssup_collation = sortCollations[i];
+ sortKey->ssup_nulls_first = nullsFirstFlags[i];
+ sortKey->ssup_attno = attNums[i];
+ /* Convey if abbreviation optimization is applicable in principle */
+ sortKey->abbreviate = (i == 0 && state->haveDatum1);
+
+ PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
+ }
+
+ /*
+ * The "onlyKey" optimization cannot be used with abbreviated keys, since
+ * tie-breaker comparisons may be required. Typically, the optimization
+ * is only of value to pass-by-value types anyway, whereas abbreviated
+ * keys are typically only of value to pass-by-reference types.
+ */
+ if (nkeys == 1 && !state->sortKeys->abbrev_converter)
+ state->onlyKey = state->sortKeys;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+Tuplesortstate *
+tuplesort_begin_cluster(TupleDesc tupDesc,
+ Relation indexRel,
+ int workMem,
+ SortCoordinate coordinate, int sortopt)
+{
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ sortopt);
+ BTScanInsert indexScanKey;
+ MemoryContext oldcontext;
+ int i;
+
+ Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
+
+ oldcontext = MemoryContextSwitchTo(state->maincontext);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG,
+ "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c",
+ RelationGetNumberOfAttributes(indexRel),
+ workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
+
+ TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
+ false, /* no unique check */
+ state->nKeys,
+ workMem,
+ sortopt & TUPLESORT_RANDOMACCESS,
+ PARALLEL_SORT(state));
+
+ state->comparetup = comparetup_cluster;
+ state->copytup = copytup_cluster;
+ state->writetup = writetup_cluster;
+ state->readtup = readtup_cluster;
+ state->abbrevNext = 10;
+
+ state->indexInfo = BuildIndexInfo(indexRel);
+
+ /*
+ * If we don't have a simple leading attribute, we don't currently
+ * initialize datum1, so disable optimizations that require it.
+ */
+ if (state->indexInfo->ii_IndexAttrNumbers[0] == 0)
+ state->haveDatum1 = false;
+ else
+ state->haveDatum1 = true;
+
+ state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
+
+ indexScanKey = _bt_mkscankey(indexRel, NULL);
+
+ if (state->indexInfo->ii_Expressions != NULL)
+ {
+ TupleTableSlot *slot;
+ ExprContext *econtext;
+
+ /*
+ * We will need to use FormIndexDatum to evaluate the index
+ * expressions. To do that, we need an EState, as well as a
+ * TupleTableSlot to put the table tuples into. The econtext's
+ * scantuple has to point to that slot, too.
+ */
+ state->estate = CreateExecutorState();
+ slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsHeapTuple);
+ econtext = GetPerTupleExprContext(state->estate);
+ econtext->ecxt_scantuple = slot;
+ }
+
+ /* Prepare SortSupport data for each column */
+ state->sortKeys = (SortSupport) palloc0(state->nKeys *
+ sizeof(SortSupportData));
+
+ for (i = 0; i < state->nKeys; i++)
+ {
+ SortSupport sortKey = state->sortKeys + i;
+ ScanKey scanKey = indexScanKey->scankeys + i;
+ int16 strategy;
+
+ sortKey->ssup_cxt = CurrentMemoryContext;
+ sortKey->ssup_collation = scanKey->sk_collation;
+ sortKey->ssup_nulls_first =
+ (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+ sortKey->ssup_attno = scanKey->sk_attno;
+ /* Convey if abbreviation optimization is applicable in principle */
+ sortKey->abbreviate = (i == 0 && state->haveDatum1);
+
+ AssertState(sortKey->ssup_attno != 0);
+
+ strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+ BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+ PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+ }
+
+ pfree(indexScanKey);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+Tuplesortstate *
+tuplesort_begin_index_btree(Relation heapRel,
+ Relation indexRel,
+ bool enforceUnique,
+ bool uniqueNullsNotDistinct,
+ int workMem,
+ SortCoordinate coordinate,
+ int sortopt)
+{
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ sortopt);
+ BTScanInsert indexScanKey;
+ MemoryContext oldcontext;
+ int i;
+
+ oldcontext = MemoryContextSwitchTo(state->maincontext);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG,
+ "begin index sort: unique = %c, workMem = %d, randomAccess = %c",
+ enforceUnique ? 't' : 'f',
+ workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
+
+ TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
+ enforceUnique,
+ state->nKeys,
+ workMem,
+ sortopt & TUPLESORT_RANDOMACCESS,
+ PARALLEL_SORT(state));
+
+ state->comparetup = comparetup_index_btree;
+ state->copytup = copytup_index;
+ state->writetup = writetup_index;
+ state->readtup = readtup_index;
+ state->abbrevNext = 10;
+ state->haveDatum1 = true;
+
+ state->heapRel = heapRel;
+ state->indexRel = indexRel;
+ state->enforceUnique = enforceUnique;
+ state->uniqueNullsNotDistinct = uniqueNullsNotDistinct;
+
+ indexScanKey = _bt_mkscankey(indexRel, NULL);
+
+ /* Prepare SortSupport data for each column */
+ state->sortKeys = (SortSupport) palloc0(state->nKeys *
+ sizeof(SortSupportData));
+
+ for (i = 0; i < state->nKeys; i++)
+ {
+ SortSupport sortKey = state->sortKeys + i;
+ ScanKey scanKey = indexScanKey->scankeys + i;
+ int16 strategy;
+
+ sortKey->ssup_cxt = CurrentMemoryContext;
+ sortKey->ssup_collation = scanKey->sk_collation;
+ sortKey->ssup_nulls_first =
+ (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
+ sortKey->ssup_attno = scanKey->sk_attno;
+ /* Convey if abbreviation optimization is applicable in principle */
+ sortKey->abbreviate = (i == 0 && state->haveDatum1);
+
+ AssertState(sortKey->ssup_attno != 0);
+
+ strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
+ BTGreaterStrategyNumber : BTLessStrategyNumber;
+
+ PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
+ }
+
+ pfree(indexScanKey);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+Tuplesortstate *
+tuplesort_begin_index_hash(Relation heapRel,
+ Relation indexRel,
+ uint32 high_mask,
+ uint32 low_mask,
+ uint32 max_buckets,
+ int workMem,
+ SortCoordinate coordinate,
+ int sortopt)
+{
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ sortopt);
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(state->maincontext);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG,
+ "begin index sort: high_mask = 0x%x, low_mask = 0x%x, "
+ "max_buckets = 0x%x, workMem = %d, randomAccess = %c",
+ high_mask,
+ low_mask,
+ max_buckets,
+ workMem,
+ sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+ state->nKeys = 1; /* Only one sort column, the hash code */
+
+ state->comparetup = comparetup_index_hash;
+ state->copytup = copytup_index;
+ state->writetup = writetup_index;
+ state->readtup = readtup_index;
+ state->haveDatum1 = true;
+
+ state->heapRel = heapRel;
+ state->indexRel = indexRel;
+
+ state->high_mask = high_mask;
+ state->low_mask = low_mask;
+ state->max_buckets = max_buckets;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+Tuplesortstate *
+tuplesort_begin_index_gist(Relation heapRel,
+ Relation indexRel,
+ int workMem,
+ SortCoordinate coordinate,
+ int sortopt)
+{
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ sortopt);
+ MemoryContext oldcontext;
+ int i;
+
+ oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG,
+ "begin index sort: workMem = %d, randomAccess = %c",
+ workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
+
+ state->comparetup = comparetup_index_btree;
+ state->copytup = copytup_index;
+ state->writetup = writetup_index;
+ state->readtup = readtup_index;
+ state->haveDatum1 = true;
+
+ state->heapRel = heapRel;
+ state->indexRel = indexRel;
+
+ /* Prepare SortSupport data for each column */
+ state->sortKeys = (SortSupport) palloc0(state->nKeys *
+ sizeof(SortSupportData));
+
+ for (i = 0; i < state->nKeys; i++)
+ {
+ SortSupport sortKey = state->sortKeys + i;
+
+ sortKey->ssup_cxt = CurrentMemoryContext;
+ sortKey->ssup_collation = indexRel->rd_indcollation[i];
+ sortKey->ssup_nulls_first = false;
+ sortKey->ssup_attno = i + 1;
+ /* Convey if abbreviation optimization is applicable in principle */
+ sortKey->abbreviate = (i == 0 && state->haveDatum1);
+
+ AssertState(sortKey->ssup_attno != 0);
+
+ /* Look for a sort support function */
+ PrepareSortSupportFromGistIndexRel(indexRel, sortKey);
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
+
+Tuplesortstate *
+tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
+ bool nullsFirstFlag, int workMem,
+ SortCoordinate coordinate, int sortopt)
+{
+ Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+ sortopt);
+ MemoryContext oldcontext;
+ int16 typlen;
+ bool typbyval;
+
+ oldcontext = MemoryContextSwitchTo(state->maincontext);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG,
+ "begin datum sort: workMem = %d, randomAccess = %c",
+ workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+ state->nKeys = 1; /* always a one-column sort */
+
+ TRACE_POSTGRESQL_SORT_START(DATUM_SORT,
+ false, /* no unique check */
+ 1,
+ workMem,
+ sortopt & TUPLESORT_RANDOMACCESS,
+ PARALLEL_SORT(state));
+
+ state->comparetup = comparetup_datum;
+ state->copytup = copytup_datum;
+ state->writetup = writetup_datum;
+ state->readtup = readtup_datum;
+ state->abbrevNext = 10;
+ state->haveDatum1 = true;
+
+ state->datumType = datumType;
+
+ /* lookup necessary attributes of the datum type */
+ get_typlenbyval(datumType, &typlen, &typbyval);
+ state->datumTypeLen = typlen;
+ state->tuples = !typbyval;
+
+ /* Prepare SortSupport data */
+ state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
+
+ state->sortKeys->ssup_cxt = CurrentMemoryContext;
+ state->sortKeys->ssup_collation = sortCollation;
+ state->sortKeys->ssup_nulls_first = nullsFirstFlag;
+
+ /*
+ * Abbreviation is possible here only for by-reference types. In theory,
+ * a pass-by-value datatype could have an abbreviated form that is cheaper
+ * to compare. In a tuple sort, we could support that, because we can
+ * always extract the original datum from the tuple as needed. Here, we
+ * can't, because a datum sort only stores a single copy of the datum; the
+ * "tuple" field of each SortTuple is NULL.
+ */
+ state->sortKeys->abbreviate = !typbyval;
+
+ PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys);
+
+ /*
+ * The "onlyKey" optimization cannot be used with abbreviated keys, since
+ * tie-breaker comparisons may be required. Typically, the optimization
+ * is only of value to pass-by-value types anyway, whereas abbreviated
+ * keys are typically only of value to pass-by-reference types.
+ */
+ if (!state->sortKeys->abbrev_converter)
+ state->onlyKey = state->sortKeys;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return state;
+}
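+
+/*
+ * A minimal usage sketch for a datum sort (illustrative only; "datumType",
+ * "ltOpr", "collation" and "someDatum" are placeholders the caller must
+ * supply, e.g. from the type cache):
+ *
+ *	Tuplesortstate *sortstate;
+ *	Datum		val;
+ *	bool		isnull;
+ *
+ *	sortstate = tuplesort_begin_datum(datumType, ltOpr, collation, false,
+ *									  work_mem, NULL, TUPLESORT_NONE);
+ *	tuplesort_putdatum(sortstate, someDatum, false);
+ *	... more tuplesort_putdatum() calls ...
+ *	tuplesort_performsort(sortstate);
+ *	while (tuplesort_getdatum(sortstate, true, &val, &isnull, NULL))
+ *		... use val ...
+ *	tuplesort_end(sortstate);
+ */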
+
+/*
+ * tuplesort_set_bound
+ *
+ * Advise tuplesort that at most the first N result tuples are required.
+ *
+ * Must be called before inserting any tuples. (Actually, we could allow it
+ * as long as the sort hasn't spilled to disk, but there seems no need for
+ * delayed calls at the moment.)
+ *
+ * This is a hint only. The tuplesort may still return more tuples than
+ * requested. Parallel leader tuplesorts will always ignore the hint.
+ */
+void
+tuplesort_set_bound(Tuplesortstate *state, int64 bound)
+{
+ /* Assert we're called before loading any tuples */
+ Assert(state->status == TSS_INITIAL && state->memtupcount == 0);
+ /* Assert we allow bounded sorts */
+ Assert(state->sortopt & TUPLESORT_ALLOWBOUNDED);
+ /* Can't set the bound twice, either */
+ Assert(!state->bounded);
+ /* Also, this shouldn't be called in a parallel worker */
+ Assert(!WORKER(state));
+
+ /* Parallel leader allows but ignores hint */
+ if (LEADER(state))
+ return;
+
+#ifdef DEBUG_BOUNDED_SORT
+ /* Honor GUC setting that disables the feature (for easy testing) */
+ if (!optimize_bounded_sort)
+ return;
+#endif
+
+ /* We want to be able to compute bound * 2, so limit the setting */
+ if (bound > (int64) (INT_MAX / 2))
+ return;
+
+ state->bounded = true;
+ state->bound = (int) bound;
+
+ /*
+ * Bounded sorts are not an effective target for abbreviated key
+ * optimization. Disable by setting state to be consistent with no
+ * abbreviation support.
+ */
+ state->sortKeys->abbrev_converter = NULL;
+ if (state->sortKeys->abbrev_full_comparator)
+ state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
+
+ /* Not strictly necessary, but be tidy */
+ state->sortKeys->abbrev_abort = NULL;
+ state->sortKeys->abbrev_full_comparator = NULL;
+}
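+
+/*
+ * For example, a caller that needs only the first 100 tuples (say, for a
+ * LIMIT clause) can request a bounded sort like this (a sketch; the
+ * heap-tuple begin routine is shown, but the other begin routines accept
+ * the same sortopt flag):
+ *
+ *	state = tuplesort_begin_heap(tupDesc, nkeys, attNums, sortOperators,
+ *								 sortCollations, nullsFirstFlags,
+ *								 work_mem, NULL, TUPLESORT_ALLOWBOUNDED);
+ *	tuplesort_set_bound(state, 100);
+ */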
+
+/*
+ * tuplesort_used_bound
+ *
+ * Allow callers to find out if the sort state was able to use a bound.
+ */
+bool
+tuplesort_used_bound(Tuplesortstate *state)
+{
+ return state->boundUsed;
+}
+
+/*
+ * tuplesort_free
+ *
+ * Internal routine for freeing resources of tuplesort.
+ */
+static void
+tuplesort_free(Tuplesortstate *state)
+{
+ /* context swap probably not needed, but let's be safe */
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
+#ifdef TRACE_SORT
+ long spaceUsed;
+
+ if (state->tapeset)
+ spaceUsed = LogicalTapeSetBlocks(state->tapeset);
+ else
+ spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
+#endif
+
+ /*
+ * Delete temporary "tape" files, if any.
+ *
+ * Note: want to include this in reported total cost of sort, hence need
+ * for two #ifdef TRACE_SORT sections.
+ *
+ * We don't bother to destroy the individual tapes here. They will go away
+ * with the sortcontext. (In TSS_FINALMERGE state, we have closed
+ * finished tapes already.)
+ */
+ if (state->tapeset)
+ LogicalTapeSetClose(state->tapeset);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ {
+ if (state->tapeset)
+ elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s",
+ SERIAL(state) ? "external sort" : "parallel external sort",
+ state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
+ else
+ elog(LOG, "%s of worker %d ended, %ld KB used: %s",
+ SERIAL(state) ? "internal sort" : "unperformed parallel sort",
+ state->worker, spaceUsed, pg_rusage_show(&state->ru_start));
+ }
+
+ TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed);
+#else
+
+ /*
+ * If TRACE_SORT is disabled, the sort__done probe can still be used, but
+ * no space-used statistics are reported.
+ */
+ TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L);
+#endif
+
+ /* Free any execution state created for CLUSTER case */
+ if (state->estate != NULL)
+ {
+ ExprContext *econtext = GetPerTupleExprContext(state->estate);
+
+ ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple);
+ FreeExecutorState(state->estate);
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * Free the per-sort memory context, thereby releasing all working memory.
+ */
+ MemoryContextReset(state->sortcontext);
+}
+
+/*
+ * tuplesort_end
+ *
+ * Release resources and clean up.
+ *
+ * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
+ * pointing to garbage. Be careful not to attempt to use or free such
+ * pointers afterwards!
+ */
+void
+tuplesort_end(Tuplesortstate *state)
+{
+ tuplesort_free(state);
+
+ /*
+ * Free the main memory context, including the Tuplesortstate struct
+ * itself.
+ */
+ MemoryContextDelete(state->maincontext);
+}
+
+/*
+ * tuplesort_updatemax
+ *
+ * Update maximum resource usage statistics.
+ */
+static void
+tuplesort_updatemax(Tuplesortstate *state)
+{
+ int64 spaceUsed;
+ bool isSpaceDisk;
+
+ /*
+ * Note: it might seem we should provide both memory and disk usage for a
+ * disk-based sort. However, the current code doesn't track memory space
+ * accurately once we have begun to return tuples to the caller (since we
+ * don't account for pfree's the caller is expected to do), so we cannot
+ * rely on availMem in a disk sort. This does not seem worth the overhead
+ * to fix. Is it worth creating an API for the memory context code to
+ * tell us how much is actually used in sortcontext?
+ */
+ if (state->tapeset)
+ {
+ isSpaceDisk = true;
+ spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
+ }
+ else
+ {
+ isSpaceDisk = false;
+ spaceUsed = state->allowedMem - state->availMem;
+ }
+
+ /*
+ * A sort evicts data to disk only when it cannot fit that data in main
+ * memory, so we treat space used on disk as more important for tracking
+ * resource usage than space used in memory. Note that the space occupied
+ * by a tuple set on disk might be less than the space occupied by the same
+ * tuple set in memory, because the on-disk representation is more compact.
+ */
+ if ((isSpaceDisk && !state->isMaxSpaceDisk) ||
+ (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace))
+ {
+ state->maxSpace = spaceUsed;
+ state->isMaxSpaceDisk = isSpaceDisk;
+ state->maxSpaceStatus = state->status;
+ }
+}
+
+/*
+ * tuplesort_reset
+ *
+ * Reset the tuplesort. All of the tuplesort's data is discarded, but the
+ * meta-information is kept. After tuplesort_reset, the tuplesort is ready
+ * to start a new sort. This avoids recreating tuplesort states (and saves
+ * resources) when sorting multiple small batches.
+ */
+void
+tuplesort_reset(Tuplesortstate *state)
+{
+ tuplesort_updatemax(state);
+ tuplesort_free(state);
+
+ /*
+ * After we've freed up per-batch memory, re-setup all of the state common
+ * to both the first batch and any subsequent batch.
+ */
+ tuplesort_begin_batch(state);
+
+ state->lastReturnedTuple = NULL;
+ state->slabMemoryBegin = NULL;
+ state->slabMemoryEnd = NULL;
+ state->slabFreeHead = NULL;
+}
+
+/*
+ * Grow the memtuples[] array, if possible within our memory constraint. We
+ * must not exceed INT_MAX tuples in memory or the caller-provided memory
+ * limit. Return true if we were able to enlarge the array, false if not.
+ *
+ * Normally, at each increment we double the size of the array. When doing
+ * that would exceed a limit, we attempt one last, smaller increase (and then
+ * clear the growmemtuples flag so we don't try any more). That allows us to
+ * use memory as fully as permitted; sticking to the pure doubling rule could
+ * result in almost half going unused. Because availMem moves around with
+ * tuple addition/removal, we need some rule to prevent making repeated small
+ * increases in memtupsize, which would just be useless thrashing. The
+ * growmemtuples flag accomplishes that and also prevents useless
+ * recalculations in this function.
+ */
+static bool
+grow_memtuples(Tuplesortstate *state)
+{
+ int newmemtupsize;
+ int memtupsize = state->memtupsize;
+ int64 memNowUsed = state->allowedMem - state->availMem;
+
+ /* Forget it if we've already maxed out memtuples, per comment above */
+ if (!state->growmemtuples)
+ return false;
+
+ /* Select new value of memtupsize */
+ if (memNowUsed <= state->availMem)
+ {
+ /*
+ * We've used no more than half of allowedMem; double our usage,
+ * clamping at INT_MAX tuples.
+ */
+ if (memtupsize < INT_MAX / 2)
+ newmemtupsize = memtupsize * 2;
+ else
+ {
+ newmemtupsize = INT_MAX;
+ state->growmemtuples = false;
+ }
+ }
+ else
+ {
+ /*
+ * This will be the last increment of memtupsize. Abandon doubling
+ * strategy and instead increase as much as we safely can.
+ *
+ * To stay within allowedMem, we can't increase memtupsize by more
+ * than availMem / sizeof(SortTuple) elements. In practice, we want
+ * to increase it by considerably less, because we need to leave some
+ * space for the tuples to which the new array slots will refer. We
+ * assume the new tuples will be about the same size as the tuples
+ * we've already seen, and thus we can extrapolate from the space
+ * consumption so far to estimate an appropriate new size for the
+ * memtuples array. The optimal value might be higher or lower than
+ * this estimate, but it's hard to know that in advance. We again
+ * clamp at INT_MAX tuples.
+ *
+ * This calculation is safe against enlarging the array so much that
+ * LACKMEM becomes true, because the memory currently used includes
+ * the present array; thus, there would be enough allowedMem for the
+ * new array elements even if no other memory were currently used.
+ *
+ * We do the arithmetic in float8, because otherwise the product of
+ * memtupsize and allowedMem could overflow. Any inaccuracy in the
+ * result should be insignificant; but even if we computed a
+ * completely insane result, the checks below will prevent anything
+ * really bad from happening.
+ */
+ double grow_ratio;
+
+ grow_ratio = (double) state->allowedMem / (double) memNowUsed;
+ if (memtupsize * grow_ratio < INT_MAX)
+ newmemtupsize = (int) (memtupsize * grow_ratio);
+ else
+ newmemtupsize = INT_MAX;
+
+ /* We won't make any further enlargement attempts */
+ state->growmemtuples = false;
+ }
+
+ /* Must enlarge array by at least one element, else report failure */
+ if (newmemtupsize <= memtupsize)
+ goto noalloc;
+
+ /*
+ * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
+ * to ensure our request won't be rejected. Note that we can easily
+ * exhaust address space before facing this outcome. (This is presently
+ * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
+ * don't rely on that at this distance.)
+ */
+ if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple))
+ {
+ newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple));
+ state->growmemtuples = false; /* can't grow any more */
+ }
+
+ /*
+ * We need to be sure that we do not cause LACKMEM to become true, else
+ * the space management algorithm will go nuts. The code above should
+ * never generate a dangerous request, but to be safe, check explicitly
+ * that the array growth fits within availMem. (We could still cause
+ * LACKMEM if the memory chunk overhead associated with the memtuples
+ * array were to increase. That shouldn't happen because we chose the
+ * initial array size large enough to ensure that palloc will be treating
+ * both old and new arrays as separate chunks. But we'll check LACKMEM
+ * explicitly below just in case.)
+ */
+ if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple)))
+ goto noalloc;
+
+ /* OK, do it */
+ FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+ state->memtupsize = newmemtupsize;
+ state->memtuples = (SortTuple *)
+ repalloc_huge(state->memtuples,
+ state->memtupsize * sizeof(SortTuple));
+ USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+ if (LACKMEM(state))
+ elog(ERROR, "unexpected out-of-memory situation in tuplesort");
+ return true;
+
+noalloc:
+ /* If for any reason we didn't realloc, shut off future attempts */
+ state->growmemtuples = false;
+ return false;
+}
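+
+/*
+ * A worked example of the final growth step above, with illustrative
+ * numbers: if allowedMem = 64MB and memNowUsed = 48MB, grow_ratio is 64/48,
+ * about 1.33, so a memtuples array of 100000 slots would grow to roughly
+ * 133000 slots, subject to the INT_MAX, MaxAllocHugeSize and availMem
+ * clamps applied above.
+ */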
+
+/*
+ * Accept one tuple while collecting input data for sort.
+ *
+ * Note that the input data is always copied; the caller need not save it.
+ */
+void
+tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ SortTuple stup;
+
+ /*
+ * Copy the given tuple into memory we control, and decrease availMem.
+ * Then call the common code.
+ */
+ COPYTUP(state, &stup, (void *) slot);
+
+ puttuple_common(state, &stup);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accept one tuple while collecting input data for sort.
+ *
+ * Note that the input data is always copied; the caller need not save it.
+ */
+void
+tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ SortTuple stup;
+
+ /*
+ * Copy the given tuple into memory we control, and decrease availMem.
+ * Then call the common code.
+ */
+ COPYTUP(state, &stup, (void *) tup);
+
+ puttuple_common(state, &stup);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Collect one index tuple while collecting input data for sort, building
+ * it from caller-supplied values.
+ */
+void
+tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
+ ItemPointer self, Datum *values,
+ bool *isnull)
+{
+ MemoryContext oldcontext;
+ SortTuple stup;
+ Datum original;
+ IndexTuple tuple;
+
+ stup.tuple = index_form_tuple_context(RelationGetDescr(rel), values,
+ isnull, state->tuplecontext);
+ tuple = ((IndexTuple) stup.tuple);
+ tuple->t_tid = *self;
+ USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+ /* set up first-column key value */
+ original = index_getattr(tuple,
+ 1,
+ RelationGetDescr(state->indexRel),
+ &stup.isnull1);
+
+ oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
+ if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
+ {
+ /*
+ * Store the ordinary Datum representation, or the NULL value. If
+ * there is a converter, it won't expect NULL values, and the cost
+ * model need not account for NULLs, so in that case we skip the
+ * converter and just set datum1 to a zeroed representation (to be
+ * consistent, and to support cheap inequality tests for NULL
+ * abbreviated keys).
+ */
+ stup.datum1 = original;
+ }
+ else if (!consider_abort_common(state))
+ {
+ /* Store abbreviated key representation */
+ stup.datum1 = state->sortKeys->abbrev_converter(original,
+ state->sortKeys);
+ }
+ else
+ {
+ /* Abort abbreviation */
+ int i;
+
+ stup.datum1 = original;
+
+ /*
+ * Set state to be consistent with never trying abbreviation.
+ *
+ * Alter datum1 representation in already-copied tuples, so as to
+ * ensure a consistent representation (current tuple was just
+ * handled). It does not matter if some dumped tuples are already
+ * sorted on tape, since serialized tuples lack abbreviated keys
+ * (TSS_BUILDRUNS state prevents control reaching here in any case).
+ */
+ for (i = 0; i < state->memtupcount; i++)
+ {
+ SortTuple *mtup = &state->memtuples[i];
+
+ tuple = mtup->tuple;
+ mtup->datum1 = index_getattr(tuple,
+ 1,
+ RelationGetDescr(state->indexRel),
+ &mtup->isnull1);
+ }
+ }
+
+ puttuple_common(state, &stup);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accept one Datum while collecting input data for sort.
+ *
+ * If the Datum is pass-by-ref type, the value will be copied.
+ */
+void
+tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
+ SortTuple stup;
+
+ /*
+ * Pass-by-value types or null values are just stored directly in
+ * stup.datum1 (and stup.tuple is not used and set to NULL).
+ *
+ * Non-null pass-by-reference values need to be copied into memory we
+ * control, and possibly abbreviated. The copied value is pointed to by
+ * stup.tuple and is treated as the canonical copy (e.g. to return via
+ * tuplesort_getdatum or when writing to tape); stup.datum1 gets the
+ * abbreviated value if abbreviation is happening, otherwise it's
+ * identical to stup.tuple.
+ */
+
+ if (isNull || !state->tuples)
+ {
+ /*
+ * Set datum1 to zeroed representation for NULLs (to be consistent,
+ * and to support cheap inequality tests for NULL abbreviated keys).
+ */
+ stup.datum1 = !isNull ? val : (Datum) 0;
+ stup.isnull1 = isNull;
+ stup.tuple = NULL; /* no separate storage */
+ MemoryContextSwitchTo(state->sortcontext);
+ }
+ else
+ {
+ Datum original = datumCopy(val, false, state->datumTypeLen);
+
+ stup.isnull1 = false;
+ stup.tuple = DatumGetPointer(original);
+ USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+ MemoryContextSwitchTo(state->sortcontext);
+
+ if (!state->sortKeys->abbrev_converter)
+ {
+ stup.datum1 = original;
+ }
+ else if (!consider_abort_common(state))
+ {
+ /* Store abbreviated key representation */
+ stup.datum1 = state->sortKeys->abbrev_converter(original,
+ state->sortKeys);
+ }
+ else
+ {
+ /* Abort abbreviation */
+ int i;
+
+ stup.datum1 = original;
+
+ /*
+ * Set state to be consistent with never trying abbreviation.
+ *
+ * Alter datum1 representation in already-copied tuples, so as to
+ * ensure a consistent representation (current tuple was just
+ * handled). It does not matter if some dumped tuples are already
+ * sorted on tape, since serialized tuples lack abbreviated keys
+ * (TSS_BUILDRUNS state prevents control reaching here in any
+ * case).
+ */
+ for (i = 0; i < state->memtupcount; i++)
+ {
+ SortTuple *mtup = &state->memtuples[i];
+
+ mtup->datum1 = PointerGetDatum(mtup->tuple);
+ }
+ }
+ }
+
+ puttuple_common(state, &stup);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Shared code for tuple and datum cases.
+ */
+static void
+puttuple_common(Tuplesortstate *state, SortTuple *tuple)
+{
+ Assert(!LEADER(state));
+
+ switch (state->status)
+ {
+ case TSS_INITIAL:
+
+ /*
+ * Save the tuple into the unsorted array. First, grow the array
+ * as needed. Note that we try to grow the array when there is
+ * still one free slot remaining --- if we fail, there'll still be
+ * room to store the incoming tuple, and then we'll switch to
+ * tape-based operation.
+ */
+ if (state->memtupcount >= state->memtupsize - 1)
+ {
+ (void) grow_memtuples(state);
+ Assert(state->memtupcount < state->memtupsize);
+ }
+ state->memtuples[state->memtupcount++] = *tuple;
+
+ /*
+ * Check if it's time to switch over to a bounded heapsort. We do
+ * so if the input tuple count exceeds twice the desired tuple
+ * count (this is a heuristic for where heapsort becomes cheaper
+ * than a quicksort), or if we've just filled workMem and have
+ * enough tuples to meet the bound.
+ *
+ * Note that once we enter TSS_BOUNDED state we will always try to
+ * complete the sort that way. In the worst case, if later input
+ * tuples are larger than earlier ones, this might cause us to
+ * exceed workMem significantly.
+ */
+ if (state->bounded &&
+ (state->memtupcount > state->bound * 2 ||
+ (state->memtupcount > state->bound && LACKMEM(state))))
+ {
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "switching to bounded heapsort at %d tuples: %s",
+ state->memtupcount,
+ pg_rusage_show(&state->ru_start));
+#endif
+ make_bounded_heap(state);
+ return;
+ }
+
+ /*
+ * Done if we still fit in available memory and have array slots.
+ */
+ if (state->memtupcount < state->memtupsize && !LACKMEM(state))
+ return;
+
+ /*
+ * Nope; time to switch to tape-based operation.
+ */
+ inittapes(state, true);
+
+ /*
+ * Dump all tuples.
+ */
+ dumptuples(state, false);
+ break;
+
+ case TSS_BOUNDED:
+
+ /*
+ * We don't want to grow the array here, so check whether the new
+ * tuple can be discarded before putting it in. This should be a
+ * good speed optimization, too, since when there are many more
+ * input tuples than the bound, most input tuples can be discarded
+ * with just this one comparison. Note that because we currently
+ * have the sort direction reversed, we must check for <= not >=.
+ */
+ if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0)
+ {
+ /* new tuple <= top of the heap, so we can discard it */
+ free_sort_tuple(state, tuple);
+ CHECK_FOR_INTERRUPTS();
+ }
+ else
+ {
+ /* discard top of heap, replacing it with the new tuple */
+ free_sort_tuple(state, &state->memtuples[0]);
+ tuplesort_heap_replace_top(state, tuple);
+ }
+ break;
+
+ case TSS_BUILDRUNS:
+
+ /*
+ * Save the tuple into the unsorted array (there must be space)
+ */
+ state->memtuples[state->memtupcount++] = *tuple;
+
+ /*
+ * If we are over the memory limit, dump all tuples.
+ */
+ dumptuples(state, false);
+ break;
+
+ default:
+ elog(ERROR, "invalid tuplesort state");
+ break;
+ }
+}
+
+static bool
+consider_abort_common(Tuplesortstate *state)
+{
+ Assert(state->sortKeys[0].abbrev_converter != NULL);
+ Assert(state->sortKeys[0].abbrev_abort != NULL);
+ Assert(state->sortKeys[0].abbrev_full_comparator != NULL);
+
+ /*
+ * Check effectiveness of abbreviation optimization. Consider aborting
+ * when still within memory limit.
+ */
+ if (state->status == TSS_INITIAL &&
+ state->memtupcount >= state->abbrevNext)
+ {
+ state->abbrevNext *= 2;
+
+ /*
+ * Check opclass-supplied abbreviation abort routine. It may indicate
+ * that abbreviation should not proceed.
+ */
+ if (!state->sortKeys->abbrev_abort(state->memtupcount,
+ state->sortKeys))
+ return false;
+
+ /*
+ * Finally, restore authoritative comparator, and indicate that
+ * abbreviation is not in play by setting abbrev_converter to NULL
+ */
+ state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator;
+ state->sortKeys[0].abbrev_converter = NULL;
+ /* Not strictly necessary, but be tidy */
+ state->sortKeys[0].abbrev_abort = NULL;
+ state->sortKeys[0].abbrev_full_comparator = NULL;
+
+ /* Give up - expect original pass-by-value representation */
+ return true;
+ }
+
+ return false;
+}
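+
+/*
+ * Since abbrevNext starts at 10 and doubles after each check, the abort
+ * logic above is consulted when the in-memory tuple count reaches 10, 20,
+ * 40, 80, and so on, for as long as the sort remains in TSS_INITIAL state.
+ */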
+
+/*
+ * All tuples have been provided; finish the sort.
+ */
+void
+tuplesort_performsort(Tuplesortstate *state)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "performsort of worker %d starting: %s",
+ state->worker, pg_rusage_show(&state->ru_start));
+#endif
+
+ switch (state->status)
+ {
+ case TSS_INITIAL:
+
+ /*
+ * We were able to accumulate all the tuples within the allowed
+ * amount of memory, or we are the leader, about to take over the
+ * worker tapes.
+ */
+ if (SERIAL(state))
+ {
+ /* Just qsort 'em and we're done */
+ tuplesort_sort_memtuples(state);
+ state->status = TSS_SORTEDINMEM;
+ }
+ else if (WORKER(state))
+ {
+ /*
+ * Parallel workers must still dump out tuples to tape. No
+ * merge is required to produce single output run, though.
+ */
+ inittapes(state, false);
+ dumptuples(state, true);
+ worker_nomergeruns(state);
+ state->status = TSS_SORTEDONTAPE;
+ }
+ else
+ {
+ /*
+ * Leader will take over worker tapes and merge worker runs.
+ * Note that mergeruns sets the correct state->status.
+ */
+ leader_takeover_tapes(state);
+ mergeruns(state);
+ }
+ state->current = 0;
+ state->eof_reached = false;
+ state->markpos_block = 0L;
+ state->markpos_offset = 0;
+ state->markpos_eof = false;
+ break;
+
+ case TSS_BOUNDED:
+
+ /*
+ * We were able to accumulate all the tuples required for output
+ * in memory, using a heap to eliminate excess tuples. Now we
+ * have to transform the heap to a properly-sorted array.
+ */
+ sort_bounded_heap(state);
+ state->current = 0;
+ state->eof_reached = false;
+ state->markpos_offset = 0;
+ state->markpos_eof = false;
+ state->status = TSS_SORTEDINMEM;
+ break;
+
+ case TSS_BUILDRUNS:
+
+ /*
+ * Finish tape-based sort. First, flush all tuples remaining in
+ * memory out to tape; then merge until we have a single remaining
+ * run (or, if !randomAccess and !WORKER(), one run per tape).
+ * Note that mergeruns sets the correct state->status.
+ */
+ dumptuples(state, true);
+ mergeruns(state);
+ state->eof_reached = false;
+ state->markpos_block = 0L;
+ state->markpos_offset = 0;
+ state->markpos_eof = false;
+ break;
+
+ default:
+ elog(ERROR, "invalid tuplesort state");
+ break;
+ }
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ {
+ if (state->status == TSS_FINALMERGE)
+ elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
+ state->worker, state->nInputTapes,
+ pg_rusage_show(&state->ru_start));
+ else
+ elog(LOG, "performsort of worker %d done: %s",
+ state->worker, pg_rusage_show(&state->ru_start));
+ }
+#endif
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Internal routine to fetch the next tuple in either forward or back
+ * direction into *stup. Returns false if no more tuples.
+ * Returned tuple belongs to tuplesort memory context, and must not be freed
+ * by caller. Note that fetched tuple is stored in memory that may be
+ * recycled by any future fetch.
+ */
+static bool
+tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
+ SortTuple *stup)
+{
+ unsigned int tuplen;
+ size_t nmoved;
+
+ Assert(!WORKER(state));
+
+ switch (state->status)
+ {
+ case TSS_SORTEDINMEM:
+ Assert(forward || state->sortopt & TUPLESORT_RANDOMACCESS);
+ Assert(!state->slabAllocatorUsed);
+ if (forward)
+ {
+ if (state->current < state->memtupcount)
+ {
+ *stup = state->memtuples[state->current++];
+ return true;
+ }
+ state->eof_reached = true;
+
+ /*
+ * Complain if caller tries to retrieve more tuples than
+ * originally asked for in a bounded sort. This is because
+ * returning EOF here might be the wrong thing.
+ */
+ if (state->bounded && state->current >= state->bound)
+ elog(ERROR, "retrieved too many tuples in a bounded sort");
+
+ return false;
+ }
+ else
+ {
+ if (state->current <= 0)
+ return false;
+
+ /*
+ * If all tuples have already been fetched, return the last
+ * tuple; otherwise return the tuple before the last one
+ * returned.
+ */
+ if (state->eof_reached)
+ state->eof_reached = false;
+ else
+ {
+ state->current--; /* last returned tuple */
+ if (state->current <= 0)
+ return false;
+ }
+ *stup = state->memtuples[state->current - 1];
+ return true;
+ }
+ break;
+
+ case TSS_SORTEDONTAPE:
+ Assert(forward || state->sortopt & TUPLESORT_RANDOMACCESS);
+ Assert(state->slabAllocatorUsed);
+
+ /*
+ * The slot that held the tuple that we returned in previous
+ * gettuple call can now be reused.
+ */
+ if (state->lastReturnedTuple)
+ {
+ RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+ state->lastReturnedTuple = NULL;
+ }
+
+ if (forward)
+ {
+ if (state->eof_reached)
+ return false;
+
+ if ((tuplen = getlen(state->result_tape, true)) != 0)
+ {
+ READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle
+ * its memory on next call. (This can be NULL, in the
+ * !state->tuples case).
+ */
+ state->lastReturnedTuple = stup->tuple;
+
+ return true;
+ }
+ else
+ {
+ state->eof_reached = true;
+ return false;
+ }
+ }
+
+ /*
+ * Backward.
+ *
+ * If all tuples have already been fetched, return the last tuple;
+ * otherwise return the tuple before the last one returned.
+ */
+ if (state->eof_reached)
+ {
+ /*
+ * Seek position is pointing just past the zero tuplen at the
+ * end of file; back up to fetch last tuple's ending length
+ * word. If seek fails we must have a completely empty file.
+ */
+ nmoved = LogicalTapeBackspace(state->result_tape,
+ 2 * sizeof(unsigned int));
+ if (nmoved == 0)
+ return false;
+ else if (nmoved != 2 * sizeof(unsigned int))
+ elog(ERROR, "unexpected tape position");
+ state->eof_reached = false;
+ }
+ else
+ {
+ /*
+ * Back up and fetch previously-returned tuple's ending length
+ * word. If seek fails, assume we are at start of file.
+ */
+ nmoved = LogicalTapeBackspace(state->result_tape,
+ sizeof(unsigned int));
+ if (nmoved == 0)
+ return false;
+ else if (nmoved != sizeof(unsigned int))
+ elog(ERROR, "unexpected tape position");
+ tuplen = getlen(state->result_tape, false);
+
+ /*
+ * Back up to get ending length word of tuple before it.
+ */
+ nmoved = LogicalTapeBackspace(state->result_tape,
+ tuplen + 2 * sizeof(unsigned int));
+ if (nmoved == tuplen + sizeof(unsigned int))
+ {
+ /*
+ * We backed up over the previous tuple, but there was no
+ * ending length word before it. That means that the prev
+ * tuple is the first tuple in the file. It is now the
+ * next one to read in the forward direction (not obviously
+ * right, but that is what the in-memory case does).
+ */
+ return false;
+ }
+ else if (nmoved != tuplen + 2 * sizeof(unsigned int))
+ elog(ERROR, "bogus tuple length in backward scan");
+ }
+
+ tuplen = getlen(state->result_tape, false);
+
+ /*
+ * Now we have the length of the prior tuple, back up and read it.
+ * Note: READTUP expects we are positioned after the initial
+ * length word of the tuple, so back up to that point.
+ */
+ nmoved = LogicalTapeBackspace(state->result_tape,
+ tuplen);
+ if (nmoved != tuplen)
+ elog(ERROR, "bogus tuple length in backward scan");
+ READTUP(state, stup, state->result_tape, tuplen);
+
+ /*
+ * Remember the tuple we return, so that we can recycle its memory
+ * on next call. (This can be NULL, in the Datum case).
+ */
+ state->lastReturnedTuple = stup->tuple;
+
+ return true;
+
+ case TSS_FINALMERGE:
+ Assert(forward);
+ /* We are managing memory ourselves, with the slab allocator. */
+ Assert(state->slabAllocatorUsed);
+
+ /*
+ * The slab slot holding the tuple that we returned in previous
+ * gettuple call can now be reused.
+ */
+ if (state->lastReturnedTuple)
+ {
+ RELEASE_SLAB_SLOT(state, state->lastReturnedTuple);
+ state->lastReturnedTuple = NULL;
+ }
+
+ /*
+ * This code should match the inner loop of mergeonerun().
+ */
+ if (state->memtupcount > 0)
+ {
+ int srcTapeIndex = state->memtuples[0].srctape;
+ LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
+ SortTuple newtup;
+
+ *stup = state->memtuples[0];
+
+ /*
+ * Remember the tuple we return, so that we can recycle its
+ * memory on next call. (This can be NULL, in the Datum case).
+ */
+ state->lastReturnedTuple = stup->tuple;
+
+ /*
+ * Pull next tuple from tape, and replace the returned tuple
+ * at top of the heap with it.
+ */
+ if (!mergereadnext(state, srcTape, &newtup))
+ {
+ /*
+ * If no more data, we've reached end of run on this tape.
+ * Remove the top node from the heap.
+ */
+ tuplesort_heap_delete_top(state);
+ state->nInputRuns--;
+
+ /*
+ * Close the tape. It'd go away at the end of the sort
+ * anyway, but better to release the memory early.
+ */
+ LogicalTapeClose(srcTape);
+ return true;
+ }
+ newtup.srctape = srcTapeIndex;
+ tuplesort_heap_replace_top(state, &newtup);
+ return true;
+ }
+ return false;
+
+ default:
+ elog(ERROR, "invalid tuplesort state");
+ return false; /* keep compiler quiet */
+ }
+}
+
+/*
+ * Fetch the next tuple in either forward or back direction.
+ * If successful, put tuple in slot and return true; else, clear the slot
+ * and return false.
+ *
+ * Caller may optionally be passed back abbreviated value (on true return
+ * value) when abbreviation was used, which can be used to cheaply avoid
+ * equality checks that might otherwise be required. Caller can safely make a
+ * determination of "non-equal tuple" based on simple binary inequality. A
+ * NULL value in leading attribute will set abbreviated value to zeroed
+ * representation, which caller may rely on in abbreviated inequality check.
+ *
+ * If copy is true, the slot receives a tuple that's been copied into the
+ * caller's memory context, so that it will stay valid regardless of future
+ * manipulations of the tuplesort's state (up to and including deleting the
+ * tuplesort). If copy is false, the slot will just receive a pointer to a
+ * tuple held within the tuplesort, which is more efficient, but only safe for
+ * callers that are prepared to have any subsequent manipulation of the
+ * tuplesort's state invalidate slot contents.
+ */
+bool
+tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy,
+ TupleTableSlot *slot, Datum *abbrev)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ SortTuple stup;
+
+ if (!tuplesort_gettuple_common(state, forward, &stup))
+ stup.tuple = NULL;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (stup.tuple)
+ {
+ /* Record abbreviated key for caller */
+ if (state->sortKeys->abbrev_converter && abbrev)
+ *abbrev = stup.datum1;
+
+ if (copy)
+ stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple);
+
+ ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy);
+ return true;
+ }
+ else
+ {
+ ExecClearTuple(slot);
+ return false;
+ }
+}
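+
+/*
+ * A typical retrieval loop (a sketch; "slot" must be a TupleTableSlot
+ * compatible with the sorted tuples, and copy = false is only safe if the
+ * caller finishes with each tuple before touching the tuplesort again):
+ *
+ *	while (tuplesort_gettupleslot(state, true, false, slot, NULL))
+ *		... process the tuple in slot ...
+ */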
+
+/*
+ * Fetch the next tuple in either forward or back direction.
+ * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
+ * context, and must not be freed by caller. Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
+ */
+HeapTuple
+tuplesort_getheaptuple(Tuplesortstate *state, bool forward)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ SortTuple stup;
+
+ if (!tuplesort_gettuple_common(state, forward, &stup))
+ stup.tuple = NULL;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return stup.tuple;
+}
+
+/*
+ * Fetch the next index tuple in either forward or back direction.
+ * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory
+ * context, and must not be freed by caller. Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
+ */
+IndexTuple
+tuplesort_getindextuple(Tuplesortstate *state, bool forward)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ SortTuple stup;
+
+ if (!tuplesort_gettuple_common(state, forward, &stup))
+ stup.tuple = NULL;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return (IndexTuple) stup.tuple;
+}
+
+/*
+ * Fetch the next Datum in either forward or back direction.
+ * Returns false if no more datums.
+ *
+ * If the Datum is pass-by-ref type, the returned value is freshly palloc'd
+ * in caller's context, and is now owned by the caller (this differs from
+ * similar routines for other types of tuplesorts).
+ *
+ * Caller may optionally be passed back abbreviated value (on true return
+ * value) when abbreviation was used, which can be used to cheaply avoid
+ * equality checks that might otherwise be required. Caller can safely make a
+ * determination of "non-equal tuple" based on simple binary inequality. A
+ * NULL value will have a zeroed abbreviated value representation, which caller
+ * may rely on in abbreviated inequality check.
+ */
+bool
+tuplesort_getdatum(Tuplesortstate *state, bool forward,
+ Datum *val, bool *isNull, Datum *abbrev)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ SortTuple stup;
+
+ if (!tuplesort_gettuple_common(state, forward, &stup))
+ {
+ MemoryContextSwitchTo(oldcontext);
+ return false;
+ }
+
+ /* Ensure we copy into caller's memory context */
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Record abbreviated key for caller */
+ if (state->sortKeys->abbrev_converter && abbrev)
+ *abbrev = stup.datum1;
+
+ if (stup.isnull1 || !state->tuples)
+ {
+ *val = stup.datum1;
+ *isNull = stup.isnull1;
+ }
+ else
+ {
+ /* use stup.tuple because stup.datum1 may be an abbreviation */
+ *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen);
+ *isNull = false;
+ }
+
+ return true;
+}
+
+/*
+ * Advance over N tuples in either forward or back direction,
+ * without returning any data. N==0 is a no-op.
+ * Returns true if successful, false if ran out of tuples.
+ */
+bool
+tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward)
+{
+ MemoryContext oldcontext;
+
+ /*
+ * We don't actually support backwards skip yet, because no callers need
+ * it. The API is designed to allow for that later, though.
+ */
+ Assert(forward);
+ Assert(ntuples >= 0);
+ Assert(!WORKER(state));
+
+ switch (state->status)
+ {
+ case TSS_SORTEDINMEM:
+ if (state->memtupcount - state->current >= ntuples)
+ {
+ state->current += ntuples;
+ return true;
+ }
+ state->current = state->memtupcount;
+ state->eof_reached = true;
+
+ /*
+ * Complain if caller tries to retrieve more tuples than
+ * originally asked for in a bounded sort. This is because
+ * returning EOF here might be the wrong thing.
+ */
+ if (state->bounded && state->current >= state->bound)
+ elog(ERROR, "retrieved too many tuples in a bounded sort");
+
+ return false;
+
+ case TSS_SORTEDONTAPE:
+ case TSS_FINALMERGE:
+
+ /*
+ * We could probably optimize these cases better, but for now it's
+ * not worth the trouble.
+ */
+ oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ while (ntuples-- > 0)
+ {
+ SortTuple stup;
+
+ if (!tuplesort_gettuple_common(state, forward, &stup))
+ {
+ MemoryContextSwitchTo(oldcontext);
+ return false;
+ }
+ CHECK_FOR_INTERRUPTS();
+ }
+ MemoryContextSwitchTo(oldcontext);
+ return true;
+
+ default:
+ elog(ERROR, "invalid tuplesort state");
+ return false; /* keep compiler quiet */
+ }
+}
+
+/*
+ * tuplesort_merge_order - report merge order we'll use for given memory
+ * (note: "merge order" just means the number of input tapes in the merge).
+ *
+ * This is exported for use by the planner. allowedMem is in bytes.
+ */
+int
+tuplesort_merge_order(int64 allowedMem)
+{
+ int mOrder;
+
+ /*----------
+ * In the merge phase, we need buffer space for each input and output tape.
+ * Each pass in the balanced merge algorithm reads from M input tapes, and
+ * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes
+ * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per
+ * input tape.
+ *
+ * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
+ * N * TAPE_BUFFER_OVERHEAD
+ *
+ * Except for the last and next-to-last merge passes, where there can be
+ * fewer tapes left to process, M = N. We choose M so that we have the
+ * desired amount of memory available for the input buffers
+ * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
+ * available for the tape buffers (allowedMem).
+ *
+ * Note: you might be thinking we need to account for the memtuples[]
+ * array in this calculation, but we effectively treat that as part of the
+ * MERGE_BUFFER_SIZE workspace.
+ *----------
+ */
+ mOrder = allowedMem /
+ (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
+
+ /*
+ * Even in minimum memory, use at least a MINORDER merge. On the other
+ * hand, even when we have lots of memory, do not use more than a MAXORDER
+ * merge. Tapes are pretty cheap, but they're not entirely free. Each
+ * additional tape reduces the amount of memory available to build runs,
+ * which in turn can cause the same sort to need more runs, which makes
+ * merging slower even if it can still be done in a single pass. Also,
+ * high order merges are quite slow due to CPU cache effects; it can be
+ * faster to pay the I/O cost of a multi-pass merge than to perform a
+ * single merge pass across many hundreds of tapes.
+ */
+ mOrder = Max(mOrder, MINORDER);
+ mOrder = Min(mOrder, MAXORDER);
+
+ return mOrder;
+}
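+
+/*
+ * A worked example with hypothetical constants: if TAPE_BUFFER_OVERHEAD
+ * were 8kB and MERGE_BUFFER_SIZE were 256kB, then allowedMem = 4MB would
+ * give 4194304 / (2 * 8192 + 262144) = 15 input tapes, which falls between
+ * MINORDER and MAXORDER and so would be used as-is.
+ */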
+
+/*
+ * Helper function to calculate how much memory to allocate for the read buffer
+ * of each input tape in a merge pass.
+ *
+ * 'avail_mem' is the amount of memory available for the buffers of all the
+ * tapes, both input and output.
+ * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
+ * 'maxOutputTapes' is the max. number of output tapes we should produce.
+ */
+static int64
+merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
+ int maxOutputTapes)
+{
+ int nOutputRuns;
+ int nOutputTapes;
+
+ /*
+ * How many output tapes will we produce in this pass?
+ *
+ * This is nInputRuns / nInputTapes, rounded up.
+ */
+ nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
+
+ nOutputTapes = Min(nOutputRuns, maxOutputTapes);
+
+ /*
+ * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All
+ * remaining memory is divided evenly between the input tapes.
+ *
+ * This also follows from the formula in tuplesort_merge_order, but here
+ * we derive the input buffer size from the amount of memory available,
+ * and M and N.
+ */
+ return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
+}
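+
+/*
+ * Continuing the hypothetical numbers above: with avail_mem = 4MB, an 8kB
+ * TAPE_BUFFER_OVERHEAD, nInputTapes = 15 and nInputRuns = 30, we would get
+ * nOutputRuns = 2, charge two output tapes 8kB each, and divide the
+ * remaining memory evenly, giving each input tape a read buffer of about
+ * 272kB.
+ */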
+
+/*
+ * inittapes - initialize for tape sorting.
+ *
+ * This is called only if we have found we won't sort in memory.
+ */
+static void
+inittapes(Tuplesortstate *state, bool mergeruns)
+{
+ Assert(!LEADER(state));
+
+ if (mergeruns)
+ {
+ /* Compute number of input tapes to use when merging */
+ state->maxTapes = tuplesort_merge_order(state->allowedMem);
+ }
+ else
+ {
+ /* Workers can sometimes produce single run, output without merge */
+ Assert(WORKER(state));
+ state->maxTapes = MINORDER;
+ }
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "worker %d switching to external sort with %d tapes: %s",
+ state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
+#endif
+
+ /* Create the tape set */
+ inittapestate(state, state->maxTapes);
+ state->tapeset =
+ LogicalTapeSetCreate(false,
+ state->shared ? &state->shared->fileset : NULL,
+ state->worker);
+
+ state->currentRun = 0;
+
+ /*
+ * Initialize logical tape arrays.
+ */
+ state->inputTapes = NULL;
+ state->nInputTapes = 0;
+ state->nInputRuns = 0;
+
+ state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
+ state->nOutputTapes = 0;
+ state->nOutputRuns = 0;
+
+ state->status = TSS_BUILDRUNS;
+
+ selectnewtape(state);
+}
+
+/*
+ * inittapestate - initialize generic tape management state
+ */
+static void
+inittapestate(Tuplesortstate *state, int maxTapes)
+{
+ int64 tapeSpace;
+
+ /*
+ * Decrease availMem to reflect the space needed for tape buffers; but
+ * don't decrease it to the point that we have no room for tuples. (That
+ * case is only likely to occur if sorting pass-by-value Datums; in all
+ * other scenarios the memtuples[] array is unlikely to occupy more than
+ * half of allowedMem. In the pass-by-value case it's not important to
+ * account for tuple space, so we don't care if LACKMEM becomes
+ * inaccurate.)
+ */
+ tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
+
+ if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
+ USEMEM(state, tapeSpace);
+
+ /*
+ * Make sure that the temp file(s) underlying the tape set are created in
+ * suitable temp tablespaces. For parallel sorts, this should have been
+ * called already, but it doesn't matter if it is called a second time.
+ */
+ PrepareTempTablespaces();
+}
+
+/*
+ * selectnewtape -- select next tape to output to.
+ *
+ * This is called after finishing a run when we know another run
+ * must be started. This is used both when building the initial
+ * runs, and during merge passes.
+ */
+static void
+selectnewtape(Tuplesortstate *state)
+{
+ /*
+ * At the beginning of each merge pass, nOutputTapes and nOutputRuns are
+ * both zero. On each call, we create a new output tape to hold the next
+ * run, until maxTapes is reached. After that, we assign new runs to the
+ * existing tapes in a round robin fashion.
+ */
+ if (state->nOutputTapes < state->maxTapes)
+ {
+ /* Create a new tape to hold the next run */
+ Assert(state->outputTapes[state->nOutputRuns] == NULL);
+ Assert(state->nOutputRuns == state->nOutputTapes);
+ state->destTape = LogicalTapeCreate(state->tapeset);
+ state->outputTapes[state->nOutputTapes] = state->destTape;
+ state->nOutputTapes++;
+ state->nOutputRuns++;
+ }
+ else
+ {
+ /*
+ * We have reached the max number of tapes. Append to an existing
+ * tape.
+ */
+ state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
+ state->nOutputRuns++;
+ }
+}
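+
+/*
+ * For example, with maxTapes = 4, runs 1 through 4 each get a freshly
+ * created tape; run 5 is then appended to the first tape, run 6 to the
+ * second, and so on, cycling round-robin through the existing tapes.
+ */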
+
+/*
+ * Initialize the slab allocation arena, for the given number of slots.
+ */
+static void
+init_slab_allocator(Tuplesortstate *state, int numSlots)
+{
+ if (numSlots > 0)
+ {
+ char *p;
+ int i;
+
+ state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE);
+ state->slabMemoryEnd = state->slabMemoryBegin +
+ numSlots * SLAB_SLOT_SIZE;
+ state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin;
+ USEMEM(state, numSlots * SLAB_SLOT_SIZE);
+
+ p = state->slabMemoryBegin;
+ for (i = 0; i < numSlots - 1; i++)
+ {
+ ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
+ p += SLAB_SLOT_SIZE;
+ }
+ ((SlabSlot *) p)->nextfree = NULL;
+ }
+ else
+ {
+ state->slabMemoryBegin = state->slabMemoryEnd = NULL;
+ state->slabFreeHead = NULL;
+ }
+ state->slabAllocatorUsed = true;
+}
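+
+/*
+ * The arena is carved into fixed-size SLAB_SLOT_SIZE chunks chained into a
+ * free list: slot 0 points at slot 1, slot 1 at slot 2, and so on, with the
+ * last slot's nextfree set to NULL.
+ */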
+
+/*
+ * mergeruns -- merge all the completed initial runs.
+ *
+ * This implements the Balanced k-Way Merge Algorithm. All input data has
+ * already been written to initial runs on tape (see dumptuples).
+ */
+static void
+mergeruns(Tuplesortstate *state)
+{
+ int tapenum;
+
+ Assert(state->status == TSS_BUILDRUNS);
+ Assert(state->memtupcount == 0);
+
+ if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL)
+ {
+ /*
+ * If there are multiple runs to be merged, when we go to read back
+ * tuples from disk, abbreviated keys will not have been stored, and
+ * we don't care to regenerate them. Disable abbreviation from this
+ * point on.
+ */
+ state->sortKeys->abbrev_converter = NULL;
+ state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator;
+
+ /* Not strictly necessary, but be tidy */
+ state->sortKeys->abbrev_abort = NULL;
+ state->sortKeys->abbrev_full_comparator = NULL;
+ }
+
+ /*
+ * Reset tuple memory. We've freed all the tuples that we previously
+ * allocated. We will use the slab allocator from now on.
+ */
+ MemoryContextResetOnly(state->tuplecontext);
+
+ /*
+ * We no longer need a large memtuples array. (We will allocate a smaller
+ * one for the heap later.)
+ */
+ FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+ pfree(state->memtuples);
+ state->memtuples = NULL;
+
+ /*
+ * Initialize the slab allocator. We need one slab slot per input tape,
+ * for the tuples in the heap, plus one to hold the tuple last returned
+ * from tuplesort_gettuple. (If we're sorting pass-by-val Datums,
+ * however, we don't need to allocate anything.)
+ *
+ * In a multi-pass merge, we could shrink this allocation for the last
+ * merge pass, if it has fewer tapes than previous passes, but we don't
+ * bother.
+ *
+ * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
+ * to track memory usage of individual tuples.
+ */
+ if (state->tuples)
+ init_slab_allocator(state, state->nOutputTapes + 1);
+ else
+ init_slab_allocator(state, 0);
+
+ /*
+ * Allocate a new 'memtuples' array, for the heap. It will hold one tuple
+ * from each input tape.
+ *
+ * We could shrink this, too, between passes in a multi-pass merge, but we
+ * don't bother. (The initial input tapes are still in outputTapes. The
+ * number of input tapes will not increase between passes.)
+ */
+ state->memtupsize = state->nOutputTapes;
+ state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
+ state->nOutputTapes * sizeof(SortTuple));
+ USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+
+ /*
+ * Use all the remaining memory we have available for tape buffers among
+ * all the input tapes. At the beginning of each merge pass, we will
+ * divide this memory between the input and output tapes in the pass.
+ */
+ state->tape_buffer_mem = state->availMem;
+ USEMEM(state, state->tape_buffer_mem);
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "worker %d using %zu KB of memory for tape buffers",
+ state->worker, state->tape_buffer_mem / 1024);
+#endif
+
+ for (;;)
+ {
+ /*
+ * On the first iteration, or if we have read all the runs from the
+ * input tapes in a multi-pass merge, it's time to start a new pass.
+ * Rewind all the output tapes, and make them inputs for the next
+ * pass.
+ */
+ if (state->nInputRuns == 0)
+ {
+ int64 input_buffer_size;
+
+ /* Close the old, emptied, input tapes */
+ if (state->nInputTapes > 0)
+ {
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeClose(state->inputTapes[tapenum]);
+ pfree(state->inputTapes);
+ }
+
+ /* Previous pass's outputs become next pass's inputs. */
+ state->inputTapes = state->outputTapes;
+ state->nInputTapes = state->nOutputTapes;
+ state->nInputRuns = state->nOutputRuns;
+
+ /*
+ * Reset output tape variables. The actual LogicalTapes will be
+ * created as needed, here we only allocate the array to hold
+ * them.
+ */
+ state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
+ state->nOutputTapes = 0;
+ state->nOutputRuns = 0;
+
+ /*
+ * Redistribute the memory allocated for tape buffers, among the
+ * new input and output tapes.
+ */
+ input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
+ state->nInputTapes,
+ state->nInputRuns,
+ state->maxTapes);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
+ state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
+ pg_rusage_show(&state->ru_start));
+#endif
+
+ /* Prepare the new input tapes for merge pass. */
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
+
+ /*
+ * If there's just one run left on each input tape, then only one
+ * merge pass remains. If we don't have to produce a materialized
+ * sorted tape, we can stop at this point and do the final merge
+ * on-the-fly.
+ */
+ if ((state->sortopt & TUPLESORT_RANDOMACCESS) == 0
+ && state->nInputRuns <= state->nInputTapes
+ && !WORKER(state))
+ {
+ /* Tell logtape.c we won't be writing anymore */
+ LogicalTapeSetForgetFreeSpace(state->tapeset);
+ /* Initialize for the final merge pass */
+ beginmerge(state);
+ state->status = TSS_FINALMERGE;
+ return;
+ }
+ }
+
+ /* Select an output tape */
+ selectnewtape(state);
+
+ /* Merge one run from each input tape. */
+ mergeonerun(state);
+
+ /*
+ * If the input tapes are empty, and we output only one output run,
+ * we're done. The current output tape contains the final result.
+ */
+ if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
+ break;
+ }
+
+ /*
+ * Done. The result is on a single run on a single tape.
+ */
+ state->result_tape = state->outputTapes[0];
+ if (!WORKER(state))
+ LogicalTapeFreeze(state->result_tape, NULL);
+ else
+ worker_freeze_result_tape(state);
+ state->status = TSS_SORTEDONTAPE;
+
+ /* Close all the now-empty input tapes, to release their read buffers. */
+ for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+ LogicalTapeClose(state->inputTapes[tapenum]);
+}
+
+/*
+ * Merge one run from each input tape.
+ */
+static void
+mergeonerun(Tuplesortstate *state)
+{
+ int srcTapeIndex;
+ LogicalTape *srcTape;
+
+ /*
+ * Start the merge by loading one tuple from each active source tape into
+ * the heap.
+ */
+ beginmerge(state);
+
+ /*
+ * Execute merge by repeatedly extracting lowest tuple in heap, writing it
+ * out, and replacing it with next tuple from same tape (if there is
+ * another one).
+ */
+ while (state->memtupcount > 0)
+ {
+ SortTuple stup;
+
+ /* write the tuple to destTape */
+ srcTapeIndex = state->memtuples[0].srctape;
+ srcTape = state->inputTapes[srcTapeIndex];
+ WRITETUP(state, state->destTape, &state->memtuples[0]);
+
+ /* recycle the slot of the tuple we just wrote out, for the next read */
+ if (state->memtuples[0].tuple)
+ RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
+
+ /*
+ * pull next tuple from the tape, and replace the written-out tuple in
+ * the heap with it.
+ */
+ if (mergereadnext(state, srcTape, &stup))
+ {
+ stup.srctape = srcTapeIndex;
+ tuplesort_heap_replace_top(state, &stup);
+ }
+ else
+ {
+ tuplesort_heap_delete_top(state);
+ state->nInputRuns--;
+ }
+ }
+
+ /*
+ * When the heap empties, we're done. Write an end-of-run marker on the
+ * output tape.
+ */
+ markrunend(state->destTape);
+}
+
+/*
+ * beginmerge - initialize for a merge pass
+ *
+ * Fill the merge heap with the first tuple from each input tape.
+ */
+static void
+beginmerge(Tuplesortstate *state)
+{
+ int activeTapes;
+ int srcTapeIndex;
+
+ /* Heap should be empty here */
+ Assert(state->memtupcount == 0);
+
+ activeTapes = Min(state->nInputTapes, state->nInputRuns);
+
+ for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
+ {
+ SortTuple tup;
+
+ if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
+ {
+ tup.srctape = srcTapeIndex;
+ tuplesort_heap_insert(state, &tup);
+ }
+ }
+}
+
+/*
+ * mergereadnext - read next tuple from one merge input tape
+ *
+ * Returns false on EOF.
+ */
+static bool
+mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
+{
+ unsigned int tuplen;
+
+ /* read next tuple, if any */
+ if ((tuplen = getlen(srcTape, true)) == 0)
+ return false;
+ READTUP(state, stup, srcTape, tuplen);
+
+ return true;
+}
+
+/*
+ * dumptuples - remove tuples from memtuples and write initial run to tape
+ *
+ * When alltuples = true, dump everything currently in memory. (This case is
+ * only used at end of input data.)
+ */
+static void
+dumptuples(Tuplesortstate *state, bool alltuples)
+{
+ int memtupwrite;
+ int i;
+
+ /*
+ * Nothing to do if we still fit in available memory and have array slots,
+ * unless this is the final call during initial run generation.
+ */
+ if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&
+ !alltuples)
+ return;
+
+ /*
+ * Final call might require no sorting, in rare cases where we just so
+ * happen to have previously LACKMEM()'d at the point where exactly all
+ * remaining tuples are loaded into memory, just before input was
+ * exhausted. In general, short final runs are quite possible, but avoid
+ * creating a completely empty run. In a worker, though, we must produce
+ * at least one tape, even if it's empty.
+ */
+ if (state->memtupcount == 0 && state->currentRun > 0)
+ return;
+
+ Assert(state->status == TSS_BUILDRUNS);
+
+ /*
+ * It seems unlikely that this limit will ever be exceeded, but take no
+ * chances
+ */
+ if (state->currentRun == INT_MAX)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("cannot have more than %d runs for an external sort",
+ INT_MAX)));
+
+ if (state->currentRun > 0)
+ selectnewtape(state);
+
+ state->currentRun++;
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "worker %d starting quicksort of run %d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
+#endif
+
+ /*
+ * Sort all tuples accumulated within the allowed amount of memory for
+ * this run using quicksort
+ */
+ tuplesort_sort_memtuples(state);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "worker %d finished quicksort of run %d: %s",
+ state->worker, state->currentRun,
+ pg_rusage_show(&state->ru_start));
+#endif
+
+ memtupwrite = state->memtupcount;
+ for (i = 0; i < memtupwrite; i++)
+ {
+ WRITETUP(state, state->destTape, &state->memtuples[i]);
+ state->memtupcount--;
+ }
+
+ /*
+ * Reset tuple memory. We've freed all of the tuples that we previously
+ * allocated. It's important to avoid fragmentation when there is a stark
+ * change in the sizes of incoming tuples. Fragmentation due to
+ * AllocSetFree's bucketing by size class might be particularly bad if
+ * this step wasn't taken.
+ */
+ MemoryContextReset(state->tuplecontext);
+
+ markrunend(state->destTape);
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "worker %d finished writing run %d to tape %d: %s",
+ state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
+ pg_rusage_show(&state->ru_start));
+#endif
+}
+
+/*
+ * tuplesort_rescan - rewind and replay the scan
+ */
+void
+tuplesort_rescan(Tuplesortstate *state)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
+ Assert(state->sortopt & TUPLESORT_RANDOMACCESS);
+
+ switch (state->status)
+ {
+ case TSS_SORTEDINMEM:
+ state->current = 0;
+ state->eof_reached = false;
+ state->markpos_offset = 0;
+ state->markpos_eof = false;
+ break;
+ case TSS_SORTEDONTAPE:
+ LogicalTapeRewindForRead(state->result_tape, 0);
+ state->eof_reached = false;
+ state->markpos_block = 0L;
+ state->markpos_offset = 0;
+ state->markpos_eof = false;
+ break;
+ default:
+ elog(ERROR, "invalid tuplesort state");
+ break;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * tuplesort_markpos - saves current position in the merged sort file
+ */
+void
+tuplesort_markpos(Tuplesortstate *state)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
+ Assert(state->sortopt & TUPLESORT_RANDOMACCESS);
+
+ switch (state->status)
+ {
+ case TSS_SORTEDINMEM:
+ state->markpos_offset = state->current;
+ state->markpos_eof = state->eof_reached;
+ break;
+ case TSS_SORTEDONTAPE:
+ LogicalTapeTell(state->result_tape,
+ &state->markpos_block,
+ &state->markpos_offset);
+ state->markpos_eof = state->eof_reached;
+ break;
+ default:
+ elog(ERROR, "invalid tuplesort state");
+ break;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * tuplesort_restorepos - restores current position in merged sort file to
+ * last saved position
+ */
+void
+tuplesort_restorepos(Tuplesortstate *state)
+{
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+
+ Assert(state->sortopt & TUPLESORT_RANDOMACCESS);
+
+ switch (state->status)
+ {
+ case TSS_SORTEDINMEM:
+ state->current = state->markpos_offset;
+ state->eof_reached = state->markpos_eof;
+ break;
+ case TSS_SORTEDONTAPE:
+ LogicalTapeSeek(state->result_tape,
+ state->markpos_block,
+ state->markpos_offset);
+ state->eof_reached = state->markpos_eof;
+ break;
+ default:
+ elog(ERROR, "invalid tuplesort state");
+ break;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * tuplesort_get_stats - extract summary statistics
+ *
+ * This can be called after tuplesort_performsort() finishes to obtain
+ * printable summary information about how the sort was performed.
+ */
+void
+tuplesort_get_stats(Tuplesortstate *state,
+ TuplesortInstrumentation *stats)
+{
+ /*
+ * Note: it might seem we should provide both memory and disk usage for a
+ * disk-based sort. However, the current code doesn't track memory space
+ * accurately once we have begun to return tuples to the caller (since we
+ * don't account for pfree's the caller is expected to do), so we cannot
+ * rely on availMem in a disk sort. This does not seem worth the overhead
+ * to fix. Is it worth creating an API for the memory context code to
+ * tell us how much is actually used in sortcontext?
+ */
+ tuplesort_updatemax(state);
+
+ if (state->isMaxSpaceDisk)
+ stats->spaceType = SORT_SPACE_TYPE_DISK;
+ else
+ stats->spaceType = SORT_SPACE_TYPE_MEMORY;
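+ /* maxSpace is tracked in bytes; report it in kilobytes, rounding up */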
+ stats->spaceUsed = (state->maxSpace + 1023) / 1024;
+
+ switch (state->maxSpaceStatus)
+ {
+ case TSS_SORTEDINMEM:
+ if (state->boundUsed)
+ stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT;
+ else
+ stats->sortMethod = SORT_TYPE_QUICKSORT;
+ break;
+ case TSS_SORTEDONTAPE:
+ stats->sortMethod = SORT_TYPE_EXTERNAL_SORT;
+ break;
+ case TSS_FINALMERGE:
+ stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE;
+ break;
+ default:
+ stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS;
+ break;
+ }
+}
+
+/*
+ * Convert TuplesortMethod to a string.
+ */
+const char *
+tuplesort_method_name(TuplesortMethod m)
+{
+ switch (m)
+ {
+ case SORT_TYPE_STILL_IN_PROGRESS:
+ return "still in progress";
+ case SORT_TYPE_TOP_N_HEAPSORT:
+ return "top-N heapsort";
+ case SORT_TYPE_QUICKSORT:
+ return "quicksort";
+ case SORT_TYPE_EXTERNAL_SORT:
+ return "external sort";
+ case SORT_TYPE_EXTERNAL_MERGE:
+ return "external merge";
+ }
+
+ return "unknown";
+}
+
+/*
+ * Convert TuplesortSpaceType to a string.
+ */
+const char *
+tuplesort_space_type_name(TuplesortSpaceType t)
+{
+ Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY);
+ return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory";
+}
+
+
+/*
+ * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
+ */
+
+/*
+ * Convert the existing unordered array of SortTuples to a bounded heap,
+ * discarding all but the smallest "state->bound" tuples.
+ *
+ * When working with a bounded heap, we want to keep the largest entry
+ * at the root (array entry zero), instead of the smallest as in the normal
+ * sort case. This allows us to discard the largest entry cheaply.
+ * Therefore, we temporarily reverse the sort direction.
+ */
+static void
+make_bounded_heap(Tuplesortstate *state)
+{
+ int tupcount = state->memtupcount;
+ int i;
+
+ Assert(state->status == TSS_INITIAL);
+ Assert(state->bounded);
+ Assert(tupcount >= state->bound);
+ Assert(SERIAL(state));
+
+ /* Reverse sort direction so largest entry will be at root */
+ reversedirection(state);
+
+ state->memtupcount = 0; /* make the heap empty */
+ for (i = 0; i < tupcount; i++)
+ {
+ if (state->memtupcount < state->bound)
+ {
+ /* Insert next tuple into heap */
+ /* Must copy source tuple to avoid possible overwrite */
+ SortTuple stup = state->memtuples[i];
+
+ tuplesort_heap_insert(state, &stup);
+ }
+ else
+ {
+ /*
+ * The heap is full. Replace the largest entry with the new
+ * tuple, or just discard the new tuple if it's at least as large
+ * as everything already in the heap.
+ */
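+ /*
+ * Note that the sort direction is currently reversed, so a result
+ * <= 0 here means the new tuple is greater than or equal to the
+ * heap's root (the largest retained tuple) under the original
+ * ordering, and can be freed immediately.
+ */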
+ if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
+ {
+ free_sort_tuple(state, &state->memtuples[i]);
+ CHECK_FOR_INTERRUPTS();
+ }
+ else
+ tuplesort_heap_replace_top(state, &state->memtuples[i]);
+ }
+ }
+
+ Assert(state->memtupcount == state->bound);
+ state->status = TSS_BOUNDED;
+}
+
+/*
+ * Convert the bounded heap to a properly-sorted array
+ */
+static void
+sort_bounded_heap(Tuplesortstate *state)
+{
+ int tupcount = state->memtupcount;
+
+ Assert(state->status == TSS_BOUNDED);
+ Assert(state->bounded);
+ Assert(tupcount == state->bound);
+ Assert(SERIAL(state));
+
+ /*
+ * We can unheapify in place because each delete-top call will remove the
+ * largest entry, which we can promptly store in the newly freed slot at
+ * the end. Once we're down to a single-entry heap, we're done.
+ */
+ while (state->memtupcount > 1)
+ {
+ SortTuple stup = state->memtuples[0];
+
+ /* this sifts-up the next-largest entry and decreases memtupcount */
+ tuplesort_heap_delete_top(state);
+ state->memtuples[state->memtupcount] = stup;
+ }
+ state->memtupcount = tupcount;
+
+ /*
+ * Reverse sort direction back to the original state. This is not
+ * actually necessary but seems like a good idea for tidiness.
+ */
+ reversedirection(state);
+
+ state->status = TSS_SORTEDINMEM;
+ state->boundUsed = true;
+}
+
+/*
+ * Sort all memtuples using specialized qsort() routines.
+ *
+ * Quicksort is used for small in-memory sorts, and external sort runs.
+ */
+static void
+tuplesort_sort_memtuples(Tuplesortstate *state)
+{
+ Assert(!LEADER(state));
+
+ if (state->memtupcount > 1)
+ {
+ /*
+ * Do we have the leading column's value or abbreviation in datum1,
+ * and is there a specialization for its comparator?
+ */
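+ /*
+ * The specialized qsort variants compare datum1 inline and fall
+ * back to the full comparetup routine only when the leading
+ * values compare equal and a tiebreak is still needed.
+ */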
+ if (state->haveDatum1 && state->sortKeys)
+ {
+ if (state->sortKeys[0].comparator == ssup_datum_unsigned_cmp)
+ {
+ qsort_tuple_unsigned(state->memtuples,
+ state->memtupcount,
+ state);
+ return;
+ }
+#if SIZEOF_DATUM >= 8
+ else if (state->sortKeys[0].comparator == ssup_datum_signed_cmp)
+ {
+ qsort_tuple_signed(state->memtuples,
+ state->memtupcount,
+ state);
+ return;
+ }
+#endif
+ else if (state->sortKeys[0].comparator == ssup_datum_int32_cmp)
+ {
+ qsort_tuple_int32(state->memtuples,
+ state->memtupcount,
+ state);
+ return;
+ }
+ }
+
+ /* Can we use the single-key sort function? */
+ if (state->onlyKey != NULL)
+ {
+ qsort_ssup(state->memtuples, state->memtupcount,
+ state->onlyKey);
+ }
+ else
+ {
+ qsort_tuple(state->memtuples,
+ state->memtupcount,
+ state->comparetup,
+ state);
+ }
+ }
+}
+
+/*
+ * Insert a new tuple into an empty or existing heap, maintaining the
+ * heap invariant. Caller is responsible for ensuring there's room.
+ *
+ * Note: For some callers, tuple points to a memtuples[] entry above the
+ * end of the heap. This is safe as long as it's not immediately adjacent
+ * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
+ * is, it might get overwritten before being moved into the heap!
+ */
+static void
+tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple)
+{
+ SortTuple *memtuples;
+ int j;
+
+ memtuples = state->memtuples;
+ Assert(state->memtupcount < state->memtupsize);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
+ * using 1-based array indexes, not 0-based.
+ */
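+ /*
+ * In 0-based terms, the parent of slot j is slot (j - 1) / 2 and the
+ * children of slot i are slots 2*i + 1 and 2*i + 2; for example,
+ * slot 4's parent is slot 1 and its children are slots 9 and 10.
+ */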
+ j = state->memtupcount++;
+ while (j > 0)
+ {
+ int i = (j - 1) >> 1;
+
+ if (COMPARETUP(state, tuple, &memtuples[i]) >= 0)
+ break;
+ memtuples[j] = memtuples[i];
+ j = i;
+ }
+ memtuples[j] = *tuple;
+}
+
+/*
+ * Remove the tuple at state->memtuples[0] from the heap. Decrement
+ * memtupcount, and sift up to maintain the heap invariant.
+ *
+ * The caller has already free'd the tuple the top node points to,
+ * if necessary.
+ */
+static void
+tuplesort_heap_delete_top(Tuplesortstate *state)
+{
+ SortTuple *memtuples = state->memtuples;
+ SortTuple *tuple;
+
+ if (--state->memtupcount <= 0)
+ return;
+
+ /*
+ * Remove the last tuple in the heap, and re-insert it, by replacing the
+ * current top node with it.
+ */
+ tuple = &memtuples[state->memtupcount];
+ tuplesort_heap_replace_top(state, tuple);
+}
+
+/*
+ * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
+ * maintain the heap invariant.
+ *
+ * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
+ * Heapsort, steps H3-H8).
+ */
+static void
+tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple)
+{
+ SortTuple *memtuples = state->memtuples;
+ unsigned int i,
+ n;
+
+ Assert(state->memtupcount >= 1);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * state->memtupcount is "int", but we use "unsigned int" for i, j, n.
+ * This prevents overflow in the "2 * i + 1" calculation, since at the top
+ * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2.
+ */
+ n = state->memtupcount;
+ i = 0; /* i is where the "hole" is */
+ for (;;)
+ {
+ unsigned int j = 2 * i + 1;
+
+ if (j >= n)
+ break;
+ if (j + 1 < n &&
+ COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0)
+ j++;
+ if (COMPARETUP(state, tuple, &memtuples[j]) <= 0)
+ break;
+ memtuples[i] = memtuples[j];
+ i = j;
+ }
+ memtuples[i] = *tuple;
+}
+
+/*
+ * Function to reverse the sort direction from its current state
+ *
+ * It is not safe to call this when performing hash tuplesorts
+ */
+static void
+reversedirection(Tuplesortstate *state)
+{
+ SortSupport sortKey = state->sortKeys;
+ int nkey;
+
+ for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
+ {
+ sortKey->ssup_reverse = !sortKey->ssup_reverse;
+ sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
+ }
+}
+
+
+/*
+ * Tape interface routines
+ */
+
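+/*
+ * Read the next tuple length word from a tape.
+ *
+ * A zero length word marks the end of a run (see markrunend); it is returned
+ * to the caller only when eofOK is true, and is reported as an error
+ * otherwise.
+ */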
+static unsigned int
+getlen(LogicalTape *tape, bool eofOK)
+{
+ unsigned int len;
+
+ if (LogicalTapeRead(tape,
+ &len, sizeof(len)) != sizeof(len))
+ elog(ERROR, "unexpected end of tape");
+ if (len == 0 && !eofOK)
+ elog(ERROR, "unexpected end of data");
+ return len;
+}
+
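+/*
+ * Write an end-of-run marker: a zero length word, which getlen() above
+ * recognizes as the end of the run.
+ */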
+static void
+markrunend(LogicalTape *tape)
+{
+ unsigned int len = 0;
+
+ LogicalTapeWrite(tape, (void *) &len, sizeof(len));
+}
+
+/*
+ * Get memory for tuple from within READTUP() routine.
+ *
+ * We use next free slot from the slab allocator, or palloc() if the tuple
+ * is too large for that.
+ */
+static void *
+readtup_alloc(Tuplesortstate *state, Size tuplen)
+{
+ SlabSlot *buf;
+
+ /*
+ * We pre-allocate enough slots in the slab arena that we should never run
+ * out.
+ */
+ Assert(state->slabFreeHead);
+
+ if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead)
+ return MemoryContextAlloc(state->sortcontext, tuplen);
+ else
+ {
+ buf = state->slabFreeHead;
+ /* Reuse this slot */
+ state->slabFreeHead = buf->nextfree;
+
+ return buf;
+ }
+}
+
+
+/*
+ * Routines specialized for HeapTuple (actually MinimalTuple) case
+ */
+
+static int
+comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
+{
+ SortSupport sortKey = state->sortKeys;
+ HeapTupleData ltup;
+ HeapTupleData rtup;
+ TupleDesc tupDesc;
+ int nkey;
+ int32 compare;
+ AttrNumber attno;
+ Datum datum1,
+ datum2;
+ bool isnull1,
+ isnull2;
+
+
+ /* Compare the leading sort key */
+ compare = ApplySortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ sortKey);
+ if (compare != 0)
+ return compare;
+
+ /* Compare additional sort keys */
+ ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
+ ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET);
+ rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET;
+ rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET);
+ tupDesc = state->tupDesc;
+
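+ /*
+ * When an abbreviation converter is in use, datum1 holds only the
+ * abbreviated key, so a tie on datum1 must be re-checked against the
+ * authoritative leading-column values before moving on to the
+ * remaining sort keys.
+ */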
+ if (sortKey->abbrev_converter)
+ {
+ attno = sortKey->ssup_attno;
+
+ datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
+ datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
+
+ compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ if (compare != 0)
+ return compare;
+ }
+
+ sortKey++;
+ for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++)
+ {
+ attno = sortKey->ssup_attno;
+
+ datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1);
+ datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2);
+
+ compare = ApplySortComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ if (compare != 0)
+ return compare;
+ }
+
+ return 0;
+}
+
+static void
+copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
+{
+ /*
+ * We expect the passed "tup" to be a TupleTableSlot, and form a
+ * MinimalTuple using the exported interface for that.
+ */
+ TupleTableSlot *slot = (TupleTableSlot *) tup;
+ Datum original;
+ MinimalTuple tuple;
+ HeapTupleData htup;
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
+
+ /* copy the tuple into sort storage */
+ tuple = ExecCopySlotMinimalTuple(slot);
+ stup->tuple = (void *) tuple;
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+ /* set up first-column key value */
+ htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
+ htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
+ original = heap_getattr(&htup,
+ state->sortKeys[0].ssup_attno,
+ state->tupDesc,
+ &stup->isnull1);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (!state->sortKeys->abbrev_converter || stup->isnull1)
+ {
+ /*
+ * Store ordinary Datum representation, or NULL value. If there is a
+ * converter it won't expect NULL values, and cost model is not
+ * required to account for NULL, so in that case we avoid calling
+ * converter and just set datum1 to zeroed representation (to be
+ * consistent, and to support cheap inequality tests for NULL
+ * abbreviated keys).
+ */
+ stup->datum1 = original;
+ }
+ else if (!consider_abort_common(state))
+ {
+ /* Store abbreviated key representation */
+ stup->datum1 = state->sortKeys->abbrev_converter(original,
+ state->sortKeys);
+ }
+ else
+ {
+ /* Abort abbreviation */
+ int i;
+
+ stup->datum1 = original;
+
+ /*
+ * Set state to be consistent with never trying abbreviation.
+ *
+ * Alter datum1 representation in already-copied tuples, so as to
+ * ensure a consistent representation (current tuple was just
+ * handled). It does not matter if some dumped tuples are already
+ * sorted on tape, since serialized tuples lack abbreviated keys
+ * (TSS_BUILDRUNS state prevents control reaching here in any case).
+ */
+ for (i = 0; i < state->memtupcount; i++)
+ {
+ SortTuple *mtup = &state->memtuples[i];
+
+ htup.t_len = ((MinimalTuple) mtup->tuple)->t_len +
+ MINIMAL_TUPLE_OFFSET;
+ htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple -
+ MINIMAL_TUPLE_OFFSET);
+
+ mtup->datum1 = heap_getattr(&htup,
+ state->sortKeys[0].ssup_attno,
+ state->tupDesc,
+ &mtup->isnull1);
+ }
+ }
+}
+
+static void
+writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+ MinimalTuple tuple = (MinimalTuple) stup->tuple;
+
+ /* the part of the MinimalTuple we'll write: */
+ char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
+ unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
+
+ /* total on-disk footprint: */
+ unsigned int tuplen = tupbodylen + sizeof(int);
+
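+ /*
+ * On-tape layout: a length word that covers itself plus the tuple body,
+ * then the body bytes, then (for random-access sorts only) a trailing
+ * copy of the length word so the tuple can also be read backwards.
+ */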
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) tupbody, tupbodylen);
+ if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length
+ * word? */
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+
+ if (!state->slabAllocatorUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_free_minimal_tuple(tuple);
+ }
+}
+
+static void
+readtup_heap(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len)
+{
+ unsigned int tupbodylen = len - sizeof(int);
+ unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
+ MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
+ char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
+ HeapTupleData htup;
+
+ /* read in the tuple proper */
+ tuple->t_len = tuplen;
+ LogicalTapeReadExact(tape, tupbody, tupbodylen);
+ if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length
+ * word? */
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
+ stup->tuple = (void *) tuple;
+ /* set up first-column key value */
+ htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
+ htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
+ stup->datum1 = heap_getattr(&htup,
+ state->sortKeys[0].ssup_attno,
+ state->tupDesc,
+ &stup->isnull1);
+}
+
+/*
+ * Routines specialized for the CLUSTER case (HeapTuple data, with
+ * comparisons per a btree index definition)
+ */
+
+static int
+comparetup_cluster(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state)
+{
+ SortSupport sortKey = state->sortKeys;
+ HeapTuple ltup;
+ HeapTuple rtup;
+ TupleDesc tupDesc;
+ int nkey;
+ int32 compare;
+ Datum datum1,
+ datum2;
+ bool isnull1,
+ isnull2;
+
+ /* Be prepared to compare additional sort keys */
+ ltup = (HeapTuple) a->tuple;
+ rtup = (HeapTuple) b->tuple;
+ tupDesc = state->tupDesc;
+
+ /* Compare the leading sort key, if it's simple */
+ if (state->haveDatum1)
+ {
+ compare = ApplySortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ sortKey);
+ if (compare != 0)
+ return compare;
+
+ if (sortKey->abbrev_converter)
+ {
+ AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0];
+
+ datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1);
+ datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2);
+
+ compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ }
+ if (compare != 0 || state->nKeys == 1)
+ return compare;
+ /* Compare additional columns the hard way */
+ sortKey++;
+ nkey = 1;
+ }
+ else
+ {
+ /* Must compare all keys the hard way */
+ nkey = 0;
+ }
+
+ if (state->indexInfo->ii_Expressions == NULL)
+ {
+ /* If not expression index, just compare the proper heap attrs */
+
+ for (; nkey < state->nKeys; nkey++, sortKey++)
+ {
+ AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey];
+
+ datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
+ datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
+
+ compare = ApplySortComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ if (compare != 0)
+ return compare;
+ }
+ }
+ else
+ {
+ /*
+ * In the expression index case, compute the whole index tuple and
+ * then compare values. It would perhaps be faster to compute only as
+ * many columns as we need to compare, but that would require
+ * duplicating all the logic in FormIndexDatum.
+ */
+ Datum l_index_values[INDEX_MAX_KEYS];
+ bool l_index_isnull[INDEX_MAX_KEYS];
+ Datum r_index_values[INDEX_MAX_KEYS];
+ bool r_index_isnull[INDEX_MAX_KEYS];
+ TupleTableSlot *ecxt_scantuple;
+
+ /* Reset context each time to prevent memory leakage */
+ ResetPerTupleExprContext(state->estate);
+
+ ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple;
+
+ ExecStoreHeapTuple(ltup, ecxt_scantuple, false);
+ FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
+ l_index_values, l_index_isnull);
+
+ ExecStoreHeapTuple(rtup, ecxt_scantuple, false);
+ FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
+ r_index_values, r_index_isnull);
+
+ for (; nkey < state->nKeys; nkey++, sortKey++)
+ {
+ compare = ApplySortComparator(l_index_values[nkey],
+ l_index_isnull[nkey],
+ r_index_values[nkey],
+ r_index_isnull[nkey],
+ sortKey);
+ if (compare != 0)
+ return compare;
+ }
+ }
+
+ return 0;
+}
+
+static void
+copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
+{
+ HeapTuple tuple = (HeapTuple) tup;
+ Datum original;
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
+
+ /* copy the tuple into sort storage */
+ tuple = heap_copytuple(tuple);
+ stup->tuple = (void *) tuple;
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /*
+ * set up first-column key value, and potentially abbreviate, if it's a
+ * simple column
+ */
+ if (!state->haveDatum1)
+ return;
+
+ original = heap_getattr(tuple,
+ state->indexInfo->ii_IndexAttrNumbers[0],
+ state->tupDesc,
+ &stup->isnull1);
+
+ if (!state->sortKeys->abbrev_converter || stup->isnull1)
+ {
+ /*
+ * Store ordinary Datum representation, or NULL value. If there is a
+ * converter it won't expect NULL values, and cost model is not
+ * required to account for NULL, so in that case we avoid calling
+ * converter and just set datum1 to zeroed representation (to be
+ * consistent, and to support cheap inequality tests for NULL
+ * abbreviated keys).
+ */
+ stup->datum1 = original;
+ }
+ else if (!consider_abort_common(state))
+ {
+ /* Store abbreviated key representation */
+ stup->datum1 = state->sortKeys->abbrev_converter(original,
+ state->sortKeys);
+ }
+ else
+ {
+ /* Abort abbreviation */
+ int i;
+
+ stup->datum1 = original;
+
+ /*
+ * Set state to be consistent with never trying abbreviation.
+ *
+ * Alter datum1 representation in already-copied tuples, so as to
+ * ensure a consistent representation (current tuple was just
+ * handled). It does not matter if some dumped tuples are already
+ * sorted on tape, since serialized tuples lack abbreviated keys
+ * (TSS_BUILDRUNS state prevents control reaching here in any case).
+ */
+ for (i = 0; i < state->memtupcount; i++)
+ {
+ SortTuple *mtup = &state->memtuples[i];
+
+ tuple = (HeapTuple) mtup->tuple;
+ mtup->datum1 = heap_getattr(tuple,
+ state->indexInfo->ii_IndexAttrNumbers[0],
+ state->tupDesc,
+ &mtup->isnull1);
+ }
+ }
+}
+
+static void
+writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+ HeapTuple tuple = (HeapTuple) stup->tuple;
+ unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
+
+ /* We need to store t_self, but not other fields of HeapTupleData */
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData));
+ LogicalTapeWrite(tape, tuple->t_data, tuple->t_len);
+ if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length
+ * word? */
+ LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+
+ if (!state->slabAllocatorUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_freetuple(tuple);
+ }
+}
+
+static void
+readtup_cluster(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int tuplen)
+{
+ unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
+ HeapTuple tuple = (HeapTuple) readtup_alloc(state,
+ t_len + HEAPTUPLESIZE);
+
+ /* Reconstruct the HeapTupleData header */
+ tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
+ tuple->t_len = t_len;
+ LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData));
+ /* We don't currently bother to reconstruct t_tableOid */
+ tuple->t_tableOid = InvalidOid;
+ /* Read in the tuple body */
+ LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len);
+ if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length
+ * word? */
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
+ stup->tuple = (void *) tuple;
+ /* set up first-column key value, if it's a simple column */
+ if (state->haveDatum1)
+ stup->datum1 = heap_getattr(tuple,
+ state->indexInfo->ii_IndexAttrNumbers[0],
+ state->tupDesc,
+ &stup->isnull1);
+}
+
+/*
+ * Routines specialized for IndexTuple case
+ *
+ * The btree and hash cases require separate comparison functions, but the
+ * IndexTuple representation is the same so the copy/write/read support
+ * functions can be shared.
+ */
+
+static int
+comparetup_index_btree(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state)
+{
+ /*
+ * This is similar to comparetup_heap(), but expects index tuples. There
+ * is also special handling for enforcing uniqueness, and special
+ * treatment for equal keys at the end.
+ */
+ SortSupport sortKey = state->sortKeys;
+ IndexTuple tuple1;
+ IndexTuple tuple2;
+ int keysz;
+ TupleDesc tupDes;
+ bool equal_hasnull = false;
+ int nkey;
+ int32 compare;
+ Datum datum1,
+ datum2;
+ bool isnull1,
+ isnull2;
+
+
+ /* Compare the leading sort key */
+ compare = ApplySortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ sortKey);
+ if (compare != 0)
+ return compare;
+
+ /* Compare additional sort keys */
+ tuple1 = (IndexTuple) a->tuple;
+ tuple2 = (IndexTuple) b->tuple;
+ keysz = state->nKeys;
+ tupDes = RelationGetDescr(state->indexRel);
+
+ if (sortKey->abbrev_converter)
+ {
+ datum1 = index_getattr(tuple1, 1, tupDes, &isnull1);
+ datum2 = index_getattr(tuple2, 1, tupDes, &isnull2);
+
+ compare = ApplySortAbbrevFullComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ if (compare != 0)
+ return compare;
+ }
+
+ /* they are equal, so we only need to examine one null flag */
+ if (a->isnull1)
+ equal_hasnull = true;
+
+ sortKey++;
+ for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
+ {
+ datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
+ datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
+
+ compare = ApplySortComparator(datum1, isnull1,
+ datum2, isnull2,
+ sortKey);
+ if (compare != 0)
+ return compare; /* done when we find unequal attributes */
+
+ /* they are equal, so we only need to examine one null flag */
+ if (isnull1)
+ equal_hasnull = true;
+ }
+
+ /*
+ * If btree has asked us to enforce uniqueness, complain if two equal
+ * tuples are detected (unless there was at least one NULL field and NULLS
+ * NOT DISTINCT was not set).
+ *
+ * It is sufficient to make the test here, because if two tuples are equal
+ * they *must* get compared at some stage of the sort --- otherwise the
+ * sort algorithm wouldn't have checked whether one must appear before the
+ * other.
+ */
+ if (state->enforceUnique && !(!state->uniqueNullsNotDistinct && equal_hasnull))
+ {
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ char *key_desc;
+
+ /*
+ * Some rather brain-dead implementations of qsort (such as the one in
+ * QNX 4) will sometimes call the comparison routine to compare a
+ * value to itself, but we always use our own implementation, which
+ * does not.
+ */
+ Assert(tuple1 != tuple2);
+
+ index_deform_tuple(tuple1, tupDes, values, isnull);
+
+ key_desc = BuildIndexValueDescription(state->indexRel, values, isnull);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNIQUE_VIOLATION),
+ errmsg("could not create unique index \"%s\"",
+ RelationGetRelationName(state->indexRel)),
+ key_desc ? errdetail("Key %s is duplicated.", key_desc) :
+ errdetail("Duplicate keys exist."),
+ errtableconstraint(state->heapRel,
+ RelationGetRelationName(state->indexRel))));
+ }
+
+ /*
+ * If key values are equal, we sort on ItemPointer. This is required for
+ * btree indexes, since heap TID is treated as an implicit last key
+ * attribute in order to ensure that all keys in the index are physically
+ * unique.
+ */
+ {
+ BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
+ BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
+
+ if (blk1 != blk2)
+ return (blk1 < blk2) ? -1 : 1;
+ }
+ {
+ OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
+ OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
+
+ if (pos1 != pos2)
+ return (pos1 < pos2) ? -1 : 1;
+ }
+
+ /* ItemPointer values should never be equal */
+ Assert(false);
+
+ return 0;
+}
+
+static int
+comparetup_index_hash(const SortTuple *a, const SortTuple *b,
+ Tuplesortstate *state)
+{
+ Bucket bucket1;
+ Bucket bucket2;
+ IndexTuple tuple1;
+ IndexTuple tuple2;
+
+ /*
+ * Fetch hash keys and mask off bits we don't want to sort by. We know
+ * that the first column of the index tuple is the hash key.
+ */
+ Assert(!a->isnull1);
+ bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1),
+ state->max_buckets, state->high_mask,
+ state->low_mask);
+ Assert(!b->isnull1);
+ bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1),
+ state->max_buckets, state->high_mask,
+ state->low_mask);
+ if (bucket1 > bucket2)
+ return 1;
+ else if (bucket1 < bucket2)
+ return -1;
+
+ /*
+ * If hash values are equal, we sort on ItemPointer. This does not affect
+ * validity of the finished index, but it may be useful to have index
+ * scans in physical order.
+ */
+ tuple1 = (IndexTuple) a->tuple;
+ tuple2 = (IndexTuple) b->tuple;
+
+ {
+ BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid);
+ BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid);
+
+ if (blk1 != blk2)
+ return (blk1 < blk2) ? -1 : 1;
+ }
+ {
+ OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid);
+ OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid);
+
+ if (pos1 != pos2)
+ return (pos1 < pos2) ? -1 : 1;
+ }
+
+ /* ItemPointer values should never be equal */
+ Assert(false);
+
+ return 0;
+}
+
+static void
+copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
+{
+ /* Not currently needed */
+ elog(ERROR, "copytup_index() should not be called");
+}
+
+static void
+writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+ IndexTuple tuple = (IndexTuple) stup->tuple;
+ unsigned int tuplen;
+
+ tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+ LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple));
+ if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length
+ * word? */
+ LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+
+ if (!state->slabAllocatorUsed)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ pfree(tuple);
+ }
+}
+
+static void
+readtup_index(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len)
+{
+ unsigned int tuplen = len - sizeof(unsigned int);
+ IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen);
+
+ LogicalTapeReadExact(tape, tuple, tuplen);
+ if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length
+ * word? */
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
+ stup->tuple = (void *) tuple;
+ /* set up first-column key value */
+ stup->datum1 = index_getattr(tuple,
+ 1,
+ RelationGetDescr(state->indexRel),
+ &stup->isnull1);
+}
+
+/*
+ * Routines specialized for DatumTuple case
+ */
+
+static int
+comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state)
+{
+ int compare;
+
+ compare = ApplySortComparator(a->datum1, a->isnull1,
+ b->datum1, b->isnull1,
+ state->sortKeys);
+ if (compare != 0)
+ return compare;
+
+ /* if we have abbreviations, then "tuple" has the original value */
+
+ if (state->sortKeys->abbrev_converter)
+ compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1,
+ PointerGetDatum(b->tuple), b->isnull1,
+ state->sortKeys);
+
+ return compare;
+}
+
+static void
+copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
+{
+ /* Not currently needed */
+ elog(ERROR, "copytup_datum() should not be called");
+}
+
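+/*
+ * Write a Datum to tape. Three cases are handled: a NULL is written as a
+ * zero-length payload, a pass-by-value Datum is written as the Datum itself,
+ * and a pass-by-reference value is written in full, per datumGetSize().
+ */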
+static void
+writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+ void *waddr;
+ unsigned int tuplen;
+ unsigned int writtenlen;
+
+ if (stup->isnull1)
+ {
+ waddr = NULL;
+ tuplen = 0;
+ }
+ else if (!state->tuples)
+ {
+ waddr = &stup->datum1;
+ tuplen = sizeof(Datum);
+ }
+ else
+ {
+ waddr = stup->tuple;
+ tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen);
+ Assert(tuplen != 0);
+ }
+
+ writtenlen = tuplen + sizeof(unsigned int);
+
+ LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
+ LogicalTapeWrite(tape, waddr, tuplen);
+ if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length
+ * word? */
+ LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
+
+ if (!state->slabAllocatorUsed && stup->tuple)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
+ pfree(stup->tuple);
+ }
+}
+
+static void
+readtup_datum(Tuplesortstate *state, SortTuple *stup,
+ LogicalTape *tape, unsigned int len)
+{
+ unsigned int tuplen = len - sizeof(unsigned int);
+
+ if (tuplen == 0)
+ {
+ /* it's NULL */
+ stup->datum1 = (Datum) 0;
+ stup->isnull1 = true;
+ stup->tuple = NULL;
+ }
+ else if (!state->tuples)
+ {
+ Assert(tuplen == sizeof(Datum));
+ LogicalTapeReadExact(tape, &stup->datum1, tuplen);
+ stup->isnull1 = false;
+ stup->tuple = NULL;
+ }
+ else
+ {
+ void *raddr = readtup_alloc(state, tuplen);
+
+ LogicalTapeReadExact(tape, raddr, tuplen);
+ stup->datum1 = PointerGetDatum(raddr);
+ stup->isnull1 = false;
+ stup->tuple = raddr;
+ }
+
+ if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length
+ * word? */
+ LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
+}
+
+/*
+ * Parallel sort routines
+ */
+
+/*
+ * tuplesort_estimate_shared - estimate required shared memory allocation
+ *
+ * nWorkers is an estimate of the number of workers (it's the number that
+ * will be requested).
+ */
+Size
+tuplesort_estimate_shared(int nWorkers)
+{
+ Size tapesSize;
+
+ Assert(nWorkers > 0);
+
+ /* Make sure that BufFile shared state is MAXALIGN'd */
+ tapesSize = mul_size(sizeof(TapeShare), nWorkers);
+ tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes)));
+
+ return tapesSize;
+}
+
+/*
+ * tuplesort_initialize_shared - initialize shared tuplesort state
+ *
+ * Must be called from leader process before workers are launched, to
+ * establish state needed up-front for worker tuplesortstates. nWorkers
+ * should match the argument passed to tuplesort_estimate_shared().
+ */
+void
+tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg)
+{
+ int i;
+
+ Assert(nWorkers > 0);
+
+ SpinLockInit(&shared->mutex);
+ shared->currentWorker = 0;
+ shared->workersFinished = 0;
+ SharedFileSetInit(&shared->fileset, seg);
+ shared->nTapes = nWorkers;
+ for (i = 0; i < nWorkers; i++)
+ {
+ shared->tapes[i].firstblocknumber = 0L;
+ }
+}
+
+/*
+ * tuplesort_attach_shared - attach to shared tuplesort state
+ *
+ * Must be called by all worker processes.
+ */
+void
+tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg)
+{
+ /* Attach to SharedFileSet */
+ SharedFileSetAttach(&shared->fileset, seg);
+}
+
+/*
+ * worker_get_identifier - Assign and return ordinal identifier for worker
+ *
+ * The order in which these are assigned is not well defined, and should not
+ * matter; worker numbers across parallel sort participants need only be
+ * distinct and gapless. logtape.c requires this.
+ *
+ * Note that the identifiers assigned from here have no relation to
+ * ParallelWorkerNumber number, to avoid making any assumption about
+ * caller's requirements. However, we do follow the ParallelWorkerNumber
+ * convention of representing a non-worker with worker number -1. This
+ * includes the leader, as well as serial Tuplesort processes.
+ */
+static int
+worker_get_identifier(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int worker;
+
+ Assert(WORKER(state));
+
+ SpinLockAcquire(&shared->mutex);
+ worker = shared->currentWorker++;
+ SpinLockRelease(&shared->mutex);
+
+ return worker;
+}
+
+/*
+ * worker_freeze_result_tape - freeze worker's result tape for leader
+ *
+ * This is called by workers just after the result tape has been determined,
+ * instead of calling LogicalTapeFreeze() directly. They do so because
+ * workers require a few additional steps over similar serial
+ * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra
+ * steps are around freeing now unneeded resources, and representing to
+ * leader that worker's input run is available for its merge.
+ *
+ * There should only be one final output run for each worker, which consists
+ * of all tuples that were originally input into worker.
+ */
+static void
+worker_freeze_result_tape(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ TapeShare output;
+
+ Assert(WORKER(state));
+ Assert(state->result_tape != NULL);
+ Assert(state->memtupcount == 0);
+
+ /*
+ * Free most remaining memory, in case caller is sensitive to our holding
+ * on to it. memtuples may not be a tiny merge heap at this point.
+ */
+ pfree(state->memtuples);
+ /* Be tidy */
+ state->memtuples = NULL;
+ state->memtupsize = 0;
+
+ /*
+ * Parallel worker requires result tape metadata, which is to be stored in
+ * shared memory for leader
+ */
+ LogicalTapeFreeze(state->result_tape, &output);
+
+ /* Store properties of output tape, and update finished worker count */
+ SpinLockAcquire(&shared->mutex);
+ shared->tapes[state->worker] = output;
+ shared->workersFinished++;
+ SpinLockRelease(&shared->mutex);
+}
+
+/*
+ * worker_nomergeruns - dump memtuples in worker, without merging
+ *
+ * This is called as an alternative to mergeruns() in a worker when no
+ * merging is required.
+ */
+static void
+worker_nomergeruns(Tuplesortstate *state)
+{
+ Assert(WORKER(state));
+ Assert(state->result_tape == NULL);
+ Assert(state->nOutputRuns == 1);
+
+ state->result_tape = state->destTape;
+ worker_freeze_result_tape(state);
+}
+
+/*
+ * leader_takeover_tapes - create tapeset for leader from worker tapes
+ *
+ * So far, leader Tuplesortstate has performed no actual sorting. By now, all
+ * sorting has occurred in workers, all of which must have already returned
+ * from tuplesort_performsort().
+ *
+ * When this returns, leader process is left in a state that is virtually
+ * indistinguishable from it having generated runs as a serial external sort
+ * might have.
+ */
+static void
+leader_takeover_tapes(Tuplesortstate *state)
+{
+ Sharedsort *shared = state->shared;
+ int nParticipants = state->nParticipants;
+ int workersFinished;
+ int j;
+
+ Assert(LEADER(state));
+ Assert(nParticipants >= 1);
+
+ SpinLockAcquire(&shared->mutex);
+ workersFinished = shared->workersFinished;
+ SpinLockRelease(&shared->mutex);
+
+ if (nParticipants != workersFinished)
+ elog(ERROR, "cannot take over tapes before all workers finish");
+
+ /*
+ * Create the tapeset from worker tapes, including a leader-owned tape at
+ * the end. Parallel workers are far more expensive than logical tapes,
+ * so the number of tapes allocated here should never be excessive.
+ */
+ inittapestate(state, nParticipants);
+ state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
+
+ /*
+ * Set currentRun to reflect the number of runs we will merge (it's not
+ * used for anything, this is just pro forma)
+ */
+ state->currentRun = nParticipants;
+
+ /*
+ * Initialize the state to look the same as after building the initial
+ * runs.
+ *
+ * There will always be exactly 1 run per worker, and exactly one input
+ * tape per run, because workers always output exactly 1 run, even when
+ * there were no input tuples for workers to sort.
+ */
+ state->inputTapes = NULL;
+ state->nInputTapes = 0;
+ state->nInputRuns = 0;
+
+ state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
+ state->nOutputTapes = nParticipants;
+ state->nOutputRuns = nParticipants;
+
+ for (j = 0; j < nParticipants; j++)
+ {
+ state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
+ }
+
+ state->status = TSS_BUILDRUNS;
+}
+
+/*
+ * Convenience routine to free a tuple previously loaded into sort memory
+ */
+static void
+free_sort_tuple(Tuplesortstate *state, SortTuple *stup)
+{
+ if (stup->tuple)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(stup->tuple));
+ pfree(stup->tuple);
+ stup->tuple = NULL;
+ }
+}
+
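+/*
+ * Comparators that tuplesort_sort_memtuples() recognizes by address so that
+ * it can dispatch to the specialized qsort variants. Each performs a simple
+ * comparison on the Datum representation (unsigned, signed 64-bit, and
+ * signed 32-bit, respectively).
+ */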
+int
+ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup)
+{
+ if (x < y)
+ return -1;
+ else if (x > y)
+ return 1;
+ else
+ return 0;
+}
+
+#if SIZEOF_DATUM >= 8
+int
+ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup)
+{
+ int64 xx = DatumGetInt64(x);
+ int64 yy = DatumGetInt64(y);
+
+ if (xx < yy)
+ return -1;
+ else if (xx > yy)
+ return 1;
+ else
+ return 0;
+}
+#endif
+
+int
+ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
+{
+ int32 xx = DatumGetInt32(x);
+ int32 yy = DatumGetInt32(y);
+
+ if (xx < yy)
+ return -1;
+ else if (xx > yy)
+ return 1;
+ else
+ return 0;
+}
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
new file mode 100644
index 0000000..f605ece
--- /dev/null
+++ b/src/backend/utils/sort/tuplestore.c
@@ -0,0 +1,1552 @@
+/*-------------------------------------------------------------------------
+ *
+ * tuplestore.c
+ * Generalized routines for temporary tuple storage.
+ *
+ * This module handles temporary storage of tuples for purposes such
+ * as Materialize nodes, hashjoin batch files, etc. It is essentially
+ * a dumbed-down version of tuplesort.c; it does no sorting of tuples
+ * but can only store and regurgitate a sequence of tuples. However,
+ * because no sort is required, it is allowed to start reading the sequence
+ * before it has all been written. This is particularly useful for cursors,
+ * because it allows random access within the already-scanned portion of
+ * a query without having to process the underlying scan to completion.
+ * Also, it is possible to support multiple independent read pointers.
+ *
+ * A temporary file is used to handle the data if it exceeds the
+ * space limit specified by the caller.
+ *
+ * The (approximate) amount of memory allowed to the tuplestore is specified
+ * in kilobytes by the caller. We absorb tuples and simply store them in an
+ * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
+ * maxKBytes, we dump all the tuples into a temp file and then read from that
+ * when needed.
+ *
+ * Upon creation, a tuplestore supports a single read pointer, numbered 0.
+ * Additional read pointers can be created using tuplestore_alloc_read_pointer.
+ * Mark/restore behavior is supported by copying read pointers.
+ *
+ * When the caller requests backward-scan capability, we write the temp file
+ * in a format that allows either forward or backward scan. Otherwise, only
+ * forward scan is allowed. A request for backward scan must be made before
+ * putting any tuples into the tuplestore. Rewind is normally allowed but
+ * can be turned off via tuplestore_set_eflags; turning off rewind for all
+ * read pointers enables truncation of the tuplestore at the oldest read point
+ * for minimal memory usage. (The caller must explicitly call tuplestore_trim
+ * at appropriate times for truncation to actually happen.)
+ *
+ * Note: in TSS_WRITEFILE state, the temp file's seek position is the
+ * current write position, and the write-position variables in the tuplestore
+ * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's
+ * seek position is the active read pointer's position, and that read pointer
+ * isn't kept up to date. We update the appropriate variables using ftell()
+ * before switching to the other state or activating a different read pointer.
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/utils/sort/tuplestore.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <limits.h>
+
+#include "access/htup_details.h"
+#include "commands/tablespace.h"
+#include "executor/executor.h"
+#include "miscadmin.h"
+#include "storage/buffile.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+
+/*
+ * Possible states of a Tuplestore object. These denote the states that
+ * persist between calls of Tuplestore routines.
+ */
+typedef enum
+{
+ TSS_INMEM, /* Tuples still fit in memory */
+ TSS_WRITEFILE, /* Writing to temp file */
+ TSS_READFILE /* Reading from temp file */
+} TupStoreStatus;
+
+/*
+ * State for a single read pointer. If we are in state INMEM then all the
+ * read pointers' "current" fields denote the read positions. In state
+ * WRITEFILE, the file/offset fields denote the read positions. In state
+ * READFILE, inactive read pointers have valid file/offset, but the active
+ * read pointer implicitly has position equal to the temp file's seek position.
+ *
+ * Special case: if eof_reached is true, then the pointer's read position is
+ * implicitly equal to the write position, and current/file/offset aren't
+ * maintained. This way we need not update all the read pointers each time
+ * we write.
+ */
+typedef struct
+{
+ int eflags; /* capability flags */
+ bool eof_reached; /* read has reached EOF */
+ int current; /* next array index to read */
+ int file; /* temp file# */
+ off_t offset; /* byte offset in file */
+} TSReadPointer;
+
+/*
+ * Private state of a Tuplestore operation.
+ */
+struct Tuplestorestate
+{
+ TupStoreStatus status; /* enumerated value as shown above */
+ int eflags; /* capability flags (OR of pointers' flags) */
+ bool backward; /* store extra length words in file? */
+ bool interXact; /* keep open through transactions? */
+ bool truncated; /* tuplestore_trim has removed tuples? */
+ int64 availMem; /* remaining memory available, in bytes */
+ int64 allowedMem; /* total memory allowed, in bytes */
+ int64 tuples; /* number of tuples added */
+ BufFile *myfile; /* underlying file, or NULL if none */
+ MemoryContext context; /* memory context for holding tuples */
+ ResourceOwner resowner; /* resowner for holding temp files */
+
+ /*
+ * These function pointers decouple the routines that must know what kind
+ * of tuple we are handling from the routines that don't need to know it.
+ * They are set up by the tuplestore_begin_xxx routines.
+ *
+ * (Although tuplestore.c currently only supports heap tuples, I've copied
+ * this part of tuplesort.c so that extension to other kinds of objects
+ * will be easy if it's ever needed.)
+ *
+ * Function to copy a supplied input tuple into palloc'd space. (NB: we
+ * assume that a single pfree() is enough to release the tuple later, so
+ * the representation must be "flat" in one palloc chunk.) state->availMem
+ * must be decreased by the amount of space used.
+ */
+ void *(*copytup) (Tuplestorestate *state, void *tup);
+
+ /*
+ * Function to write a stored tuple onto tape. The representation of the
+ * tuple on tape need not be the same as it is in memory; requirements on
+ * the tape representation are given below. After writing the tuple,
+ * pfree() it, and increase state->availMem by the amount of memory space
+ * thereby released.
+ */
+ void (*writetup) (Tuplestorestate *state, void *tup);
+
+ /*
+ * Function to read a stored tuple from tape back into memory. 'len' is
+ * the already-read length of the stored tuple. Create and return a
+ * palloc'd copy, and decrease state->availMem by the amount of memory
+ * space consumed.
+ */
+ void *(*readtup) (Tuplestorestate *state, unsigned int len);
+
+ /*
+ * This array holds pointers to tuples in memory if we are in state INMEM.
+ * In states WRITEFILE and READFILE it's not used.
+ *
+ * When memtupdeleted > 0, the first memtupdeleted pointers are already
+ * released due to a tuplestore_trim() operation, but we haven't expended
+ * the effort to slide the remaining pointers down. These unused pointers
+ * are set to NULL to catch any invalid accesses. Note that memtupcount
+ * includes the deleted pointers.
+ */
+ void **memtuples; /* array of pointers to palloc'd tuples */
+ int memtupdeleted; /* the first N slots are currently unused */
+ int memtupcount; /* number of tuples currently present */
+ int memtupsize; /* allocated length of memtuples array */
+ bool growmemtuples; /* memtuples' growth still underway? */
+
+ /*
+ * These variables are used to keep track of the current positions.
+ *
+ * In state WRITEFILE, the current file seek position is the write point;
+ * in state READFILE, the write position is remembered in writepos_xxx.
+ * (The write position is the same as EOF, but since BufFileSeek doesn't
+ * currently implement SEEK_END, we have to remember it explicitly.)
+ */
+ TSReadPointer *readptrs; /* array of read pointers */
+ int activeptr; /* index of the active read pointer */
+ int readptrcount; /* number of pointers currently valid */
+ int readptrsize; /* allocated length of readptrs array */
+
+ int writepos_file; /* file# (valid if READFILE state) */
+ off_t writepos_offset; /* offset (valid if READFILE state) */
+};
+
+#define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
+#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
+#define READTUP(state,len) ((*(state)->readtup) (state, len))
+#define LACKMEM(state) ((state)->availMem < 0)
+#define USEMEM(state,amt) ((state)->availMem -= (amt))
+#define FREEMEM(state,amt) ((state)->availMem += (amt))
+
+/*--------------------
+ *
+ * NOTES about on-tape representation of tuples:
+ *
+ * We require the first "unsigned int" of a stored tuple to be the total size
+ * on-tape of the tuple, including itself (so it is never zero).
+ * The remainder of the stored tuple
+ * may or may not match the in-memory representation of the tuple ---
+ * any conversion needed is the job of the writetup and readtup routines.
+ *
+ * If state->backward is true, then the stored representation of
+ * the tuple must be followed by another "unsigned int" that is a copy of the
+ * length --- so the total tape space used is actually sizeof(unsigned int)
+ * more than the stored length value. This allows read-backwards. When
+ * state->backward is not set, the write/read routines may omit the extra
+ * length word.
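+ *
+ * For example (assuming a 4-byte "unsigned int"), a tuple whose on-tape data
+ * takes 40 bytes is stored as the length word 44 (the data plus the word
+ * itself) followed by the 40 data bytes, 44 bytes in all; if state->backward
+ * is set, a trailing copy of 44 brings the total to 48 bytes.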
+ *
+ * writetup is expected to write both length words as well as the tuple
+ * data. When readtup is called, the tape is positioned just after the
+ * front length word; readtup must read the tuple data and advance past
+ * the back length word (if present).
+ *
+ * The write/read routines can make use of the tuple description data
+ * stored in the Tuplestorestate record, if needed. They are also expected
+ * to adjust state->availMem by the amount of memory space (not tape space!)
+ * released or consumed. There is no error return from either writetup
+ * or readtup; they should ereport() on failure.
+ *
+ *
+ * NOTES about memory consumption calculations:
+ *
+ * We count space allocated for tuples against the maxKBytes limit,
+ * plus the space used by the variable-size array memtuples.
+ * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
+ * We don't worry about the size of the read pointer array, either.
+ *
+ * Note that we count actual space used (as shown by GetMemoryChunkSpace)
+ * rather than the originally-requested size. This is important since
+ * palloc can add substantial overhead. It's not a complete answer since
+ * we won't count any wasted space in palloc allocation blocks, but it's
+ * a lot better than what we were doing before 7.3.
+ *
+ *--------------------
+ */
+
+
+static Tuplestorestate *tuplestore_begin_common(int eflags,
+ bool interXact,
+ int maxKBytes);
+static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
+static void dumptuples(Tuplestorestate *state);
+static unsigned int getlen(Tuplestorestate *state, bool eofOK);
+static void *copytup_heap(Tuplestorestate *state, void *tup);
+static void writetup_heap(Tuplestorestate *state, void *tup);
+static void *readtup_heap(Tuplestorestate *state, unsigned int len);
+
+
+/*
+ * tuplestore_begin_xxx
+ *
+ * Initialize for a tuple store operation.
+ */
+static Tuplestorestate *
+tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
+{
+ Tuplestorestate *state;
+
+ state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
+
+ state->status = TSS_INMEM;
+ state->eflags = eflags;
+ state->interXact = interXact;
+ state->truncated = false;
+ state->allowedMem = maxKBytes * 1024L;
+ state->availMem = state->allowedMem;
+ state->myfile = NULL;
+ state->context = CurrentMemoryContext;
+ state->resowner = CurrentResourceOwner;
+
+ state->memtupdeleted = 0;
+ state->memtupcount = 0;
+ state->tuples = 0;
+
+ /*
+ * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
+ * see comments in grow_memtuples().
+ */
+ state->memtupsize = Max(16384 / sizeof(void *),
+ ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
+
+ state->growmemtuples = true;
+ state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
+
+ USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+
+ state->activeptr = 0;
+ state->readptrcount = 1;
+ state->readptrsize = 8; /* arbitrary */
+ state->readptrs = (TSReadPointer *)
+ palloc(state->readptrsize * sizeof(TSReadPointer));
+
+ state->readptrs[0].eflags = eflags;
+ state->readptrs[0].eof_reached = false;
+ state->readptrs[0].current = 0;
+
+ return state;
+}
+
+/*
+ * tuplestore_begin_heap
+ *
+ * Create a new tuplestore.  Tuple stores other than "heap" tuple stores
+ * (for heap tuples) are possible, but not presently implemented.
+ *
+ * randomAccess: if true, both forward and backward accesses to the
+ * tuple store are allowed.
+ *
+ * interXact: if true, the files used for on-disk storage persist beyond the
+ * end of the current transaction. NOTE: It's the caller's responsibility to
+ * create such a tuplestore in a memory context and resource owner that will
+ * also survive transaction boundaries, and to ensure the tuplestore is closed
+ * when it's no longer wanted.
+ *
+ * maxKBytes: how much data to store in memory (any data beyond this
+ * amount is paged to disk). When in doubt, use work_mem.
+ */
+Tuplestorestate *
+tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
+{
+ Tuplestorestate *state;
+ int eflags;
+
+ /*
+ * This interpretation of the meaning of randomAccess is compatible with
+ * the pre-8.3 behavior of tuplestores.
+ */
+ eflags = randomAccess ?
+ (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
+ (EXEC_FLAG_REWIND);
+
+ state = tuplestore_begin_common(eflags, interXact, maxKBytes);
+
+ state->copytup = copytup_heap;
+ state->writetup = writetup_heap;
+ state->readtup = readtup_heap;
+
+ return state;
+}
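+
+/*
+ * A minimal lifecycle sketch (hypothetical caller; "slot" is assumed to be
+ * a TupleTableSlot supplied by the surrounding executor code):
+ *
+ *	Tuplestorestate *ts = tuplestore_begin_heap(true, false, work_mem);
+ *
+ *	tuplestore_puttupleslot(ts, slot);	-- buffer rows as they arrive
+ *	tuplestore_rescan(ts);			-- rewind the active read pointer
+ *	while (tuplestore_gettupleslot(ts, true, false, slot))
+ *		... process the slot ...
+ *	tuplestore_end(ts);			-- free memory and any temp file
+ */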
+
+/*
+ * tuplestore_set_eflags
+ *
+ * Set the capability flags for read pointer 0 at a finer grain than is
+ * allowed by tuplestore_begin_xxx. This must be called before inserting
+ * any data into the tuplestore.
+ *
+ * eflags is a bitmask following the meanings used for executor node
+ * startup flags (see executor.h). tuplestore pays attention to these bits:
+ * EXEC_FLAG_REWIND need rewind to start
+ * EXEC_FLAG_BACKWARD need backward fetch
+ * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
+ * is set per "randomAccess" in the tuplestore_begin_xxx call.
+ *
+ * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
+ * but not further than the truncation point (the furthest-back read pointer
+ * position at the time of the last tuplestore_trim call).
+ */
+void
+tuplestore_set_eflags(Tuplestorestate *state, int eflags)
+{
+ int i;
+
+ if (state->status != TSS_INMEM || state->memtupcount != 0)
+ elog(ERROR, "too late to call tuplestore_set_eflags");
+
+ state->readptrs[0].eflags = eflags;
+ for (i = 1; i < state->readptrcount; i++)
+ eflags |= state->readptrs[i].eflags;
+ state->eflags = eflags;
+}
+
+/*
+ * tuplestore_alloc_read_pointer - allocate another read pointer.
+ *
+ * Returns the pointer's index.
+ *
+ * The new pointer initially copies the position of read pointer 0.
+ * It can have its own eflags, but if any data has been inserted into
+ * the tuplestore, these eflags must not represent an increase in
+ * requirements.
+ */
+int
+tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
+{
+ /* Check for possible increase of requirements */
+ if (state->status != TSS_INMEM || state->memtupcount != 0)
+ {
+ if ((state->eflags | eflags) != state->eflags)
+ elog(ERROR, "too late to require new tuplestore eflags");
+ }
+
+ /* Make room for another read pointer if needed */
+ if (state->readptrcount >= state->readptrsize)
+ {
+ int newcnt = state->readptrsize * 2;
+
+ state->readptrs = (TSReadPointer *)
+ repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
+ state->readptrsize = newcnt;
+ }
+
+ /* And set it up */
+ state->readptrs[state->readptrcount] = state->readptrs[0];
+ state->readptrs[state->readptrcount].eflags = eflags;
+
+ state->eflags |= eflags;
+
+ return state->readptrcount++;
+}
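+
+/*
+ * Sketch of using an extra read pointer (hypothetical caller; the extra
+ * pointer must be allocated before any tuples are inserted if it needs
+ * capabilities beyond those requested at tuplestore_begin_xxx time):
+ *
+ *	int		mark = tuplestore_alloc_read_pointer(ts, EXEC_FLAG_REWIND);
+ *
+ *	tuplestore_select_read_pointer(ts, mark);	-- read via "mark"
+ *	... fetch some tuples ...
+ *	tuplestore_select_read_pointer(ts, 0);		-- switch back to pointer 0
+ */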
+
+/*
+ * tuplestore_clear
+ *
+ * Delete all the contents of a tuplestore, and reset its read pointers
+ * to the start.
+ */
+void
+tuplestore_clear(Tuplestorestate *state)
+{
+ int i;
+ TSReadPointer *readptr;
+
+ if (state->myfile)
+ BufFileClose(state->myfile);
+ state->myfile = NULL;
+ if (state->memtuples)
+ {
+ for (i = state->memtupdeleted; i < state->memtupcount; i++)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
+ pfree(state->memtuples[i]);
+ }
+ }
+ state->status = TSS_INMEM;
+ state->truncated = false;
+ state->memtupdeleted = 0;
+ state->memtupcount = 0;
+ state->tuples = 0;
+ readptr = state->readptrs;
+ for (i = 0; i < state->readptrcount; readptr++, i++)
+ {
+ readptr->eof_reached = false;
+ readptr->current = 0;
+ }
+}
+
+/*
+ * tuplestore_end
+ *
+ * Release resources and clean up.
+ */
+void
+tuplestore_end(Tuplestorestate *state)
+{
+ int i;
+
+ if (state->myfile)
+ BufFileClose(state->myfile);
+ if (state->memtuples)
+ {
+ for (i = state->memtupdeleted; i < state->memtupcount; i++)
+ pfree(state->memtuples[i]);
+ pfree(state->memtuples);
+ }
+ pfree(state->readptrs);
+ pfree(state);
+}
+
+/*
+ * tuplestore_select_read_pointer - make the specified read pointer active
+ */
+void
+tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
+{
+ TSReadPointer *readptr;
+ TSReadPointer *oldptr;
+
+ Assert(ptr >= 0 && ptr < state->readptrcount);
+
+ /* No work if already active */
+ if (ptr == state->activeptr)
+ return;
+
+ readptr = &state->readptrs[ptr];
+ oldptr = &state->readptrs[state->activeptr];
+
+ switch (state->status)
+ {
+ case TSS_INMEM:
+ case TSS_WRITEFILE:
+ /* no work */
+ break;
+ case TSS_READFILE:
+
+ /*
+ * First, save the current read position in the pointer about to
+ * become inactive.
+ */
+ if (!oldptr->eof_reached)
+ BufFileTell(state->myfile,
+ &oldptr->file,
+ &oldptr->offset);
+
+ /*
+ * We have to make the temp file's seek position equal to the
+ * logical position of the new read pointer. In eof_reached
+ * state, that's the EOF, which we have available from the saved
+ * write position.
+ */
+ if (readptr->eof_reached)
+ {
+ if (BufFileSeek(state->myfile,
+ state->writepos_file,
+ state->writepos_offset,
+ SEEK_SET) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in tuplestore temporary file")));
+ }
+ else
+ {
+ if (BufFileSeek(state->myfile,
+ readptr->file,
+ readptr->offset,
+ SEEK_SET) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in tuplestore temporary file")));
+ }
+ break;
+ default:
+ elog(ERROR, "invalid tuplestore state");
+ break;
+ }
+
+ state->activeptr = ptr;
+}
+
+/*
+ * tuplestore_tuple_count
+ *
+ * Returns the number of tuples added since creation or the last
+ * tuplestore_clear().
+ */
+int64
+tuplestore_tuple_count(Tuplestorestate *state)
+{
+ return state->tuples;
+}
+
+/*
+ * tuplestore_ateof
+ *
+ * Returns the active read pointer's eof_reached state.
+ */
+bool
+tuplestore_ateof(Tuplestorestate *state)
+{
+ return state->readptrs[state->activeptr].eof_reached;
+}
+
+/*
+ * Grow the memtuples[] array, if possible within our memory constraint. We
+ * must not exceed INT_MAX tuples in memory or the caller-provided memory
+ * limit. Return true if we were able to enlarge the array, false if not.
+ *
+ * Normally, at each increment we double the size of the array. When doing
+ * that would exceed a limit, we attempt one last, smaller increase (and then
+ * clear the growmemtuples flag so we don't try any more). That allows us to
+ * use memory as fully as permitted; sticking to the pure doubling rule could
+ * result in almost half going unused. Because availMem moves around with
+ * tuple addition/removal, we need some rule to prevent making repeated small
+ * increases in memtupsize, which would just be useless thrashing. The
+ * growmemtuples flag accomplishes that and also prevents useless
+ * recalculations in this function.
+ */
+static bool
+grow_memtuples(Tuplestorestate *state)
+{
+ int newmemtupsize;
+ int memtupsize = state->memtupsize;
+ int64 memNowUsed = state->allowedMem - state->availMem;
+
+ /* Forget it if we've already maxed out memtuples, per comment above */
+ if (!state->growmemtuples)
+ return false;
+
+ /* Select new value of memtupsize */
+ if (memNowUsed <= state->availMem)
+ {
+ /*
+ * We've used no more than half of allowedMem; double our usage,
+ * clamping at INT_MAX tuples.
+ */
+ if (memtupsize < INT_MAX / 2)
+ newmemtupsize = memtupsize * 2;
+ else
+ {
+ newmemtupsize = INT_MAX;
+ state->growmemtuples = false;
+ }
+ }
+ else
+ {
+ /*
+ * This will be the last increment of memtupsize. Abandon doubling
+ * strategy and instead increase as much as we safely can.
+ *
+ * To stay within allowedMem, we can't increase memtupsize by more
+ * than availMem / sizeof(void *) elements. In practice, we want to
+ * increase it by considerably less, because we need to leave some
+ * space for the tuples to which the new array slots will refer. We
+ * assume the new tuples will be about the same size as the tuples
+ * we've already seen, and thus we can extrapolate from the space
+ * consumption so far to estimate an appropriate new size for the
+ * memtuples array. The optimal value might be higher or lower than
+ * this estimate, but it's hard to know that in advance. We again
+ * clamp at INT_MAX tuples.
+ *
+ * This calculation is safe against enlarging the array so much that
+ * LACKMEM becomes true, because the memory currently used includes
+ * the present array; thus, there would be enough allowedMem for the
+ * new array elements even if no other memory were currently used.
+ *
+ * We do the arithmetic in float8, because otherwise the product of
+ * memtupsize and allowedMem could overflow. Any inaccuracy in the
+ * result should be insignificant; but even if we computed a
+ * completely insane result, the checks below will prevent anything
+ * really bad from happening.
+ */
+ double grow_ratio;
+
+ grow_ratio = (double) state->allowedMem / (double) memNowUsed;
+ if (memtupsize * grow_ratio < INT_MAX)
+ newmemtupsize = (int) (memtupsize * grow_ratio);
+ else
+ newmemtupsize = INT_MAX;
+
+ /* We won't make any further enlargement attempts */
+ state->growmemtuples = false;
+ }
+
+ /* Must enlarge array by at least one element, else report failure */
+ if (newmemtupsize <= memtupsize)
+ goto noalloc;
+
+ /*
+ * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
+ * to ensure our request won't be rejected. Note that we can easily
+ * exhaust address space before facing this outcome. (This is presently
+ * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
+ * don't rely on that at this distance.)
+ */
+ if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
+ {
+ newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
+ state->growmemtuples = false; /* can't grow any more */
+ }
+
+ /*
+ * We need to be sure that we do not cause LACKMEM to become true, else
+ * the space management algorithm will go nuts. The code above should
+ * never generate a dangerous request, but to be safe, check explicitly
+ * that the array growth fits within availMem. (We could still cause
+ * LACKMEM if the memory chunk overhead associated with the memtuples
+ * array were to increase. That shouldn't happen because we chose the
+ * initial array size large enough to ensure that palloc will be treating
+ * both old and new arrays as separate chunks. But we'll check LACKMEM
+ * explicitly below just in case.)
+ */
+ if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
+ goto noalloc;
+
+ /* OK, do it */
+ FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+ state->memtupsize = newmemtupsize;
+ state->memtuples = (void **)
+ repalloc_huge(state->memtuples,
+ state->memtupsize * sizeof(void *));
+ USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+ if (LACKMEM(state))
+ elog(ERROR, "unexpected out-of-memory situation in tuplestore");
+ return true;
+
+noalloc:
+ /* If for any reason we didn't realloc, shut off future attempts */
+ state->growmemtuples = false;
+ return false;
+}
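+
+/*
+ * Worked example of the final growth step above (illustrative numbers only):
+ * with allowedMem = 4MB, memNowUsed = 3MB and memtupsize = 1024, grow_ratio
+ * is 4.0/3.0 and newmemtupsize becomes (int) (1024 * 1.333...) = 1365; the
+ * growmemtuples flag is then cleared so no further enlargement is attempted.
+ */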
+
+/*
+ * Accept one tuple and append it to the tuplestore.
+ *
+ * Note that the input tuple is always copied; the caller need not save it.
+ *
+ * If the active read pointer is currently "at EOF", it remains so (the read
+ * pointer implicitly advances along with the write pointer); otherwise the
+ * read pointer is unchanged. Non-active read pointers do not move, which
+ * means they are certain to not be "at EOF" immediately after puttuple.
+ * This curious-seeming behavior is for the convenience of nodeMaterial.c and
+ * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
+ * steps.
+ *
+ * tuplestore_puttupleslot() is a convenience routine to collect data from
+ * a TupleTableSlot without an extra copy operation.
+ */
+void
+tuplestore_puttupleslot(Tuplestorestate *state,
+ TupleTableSlot *slot)
+{
+ MinimalTuple tuple;
+ MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
+
+ /*
+ * Form a MinimalTuple in working memory
+ */
+ tuple = ExecCopySlotMinimalTuple(slot);
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+
+ tuplestore_puttuple_common(state, (void *) tuple);
+
+ MemoryContextSwitchTo(oldcxt);
+}
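+
+/*
+ * The read-pointer behavior described above supports the usual
+ * materialize-while-scanning pattern (a sketch, not code lifted from
+ * nodeMaterial.c):
+ *
+ *	if (tuplestore_gettupleslot(ts, true, false, slot))
+ *		return slot;			-- replay a stored row
+ *	... pull the next row from the subplan into slot ...
+ *	tuplestore_puttupleslot(ts, slot);	-- active pointer stays at EOF,
+ *	return slot;				-- so the new row is handed back directly
+ */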
+
+/*
+ * "Standard" case to copy from a HeapTuple. This is actually now somewhat
+ * deprecated, but not worth getting rid of in view of the number of callers.
+ */
+void
+tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
+{
+ MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
+
+ /*
+ * Copy the tuple. (Must do this even in WRITEFILE case. Note that
+ * COPYTUP includes USEMEM, so we needn't do that here.)
+ */
+ tuple = COPYTUP(state, tuple);
+
+ tuplestore_puttuple_common(state, (void *) tuple);
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
+ * This avoids an extra tuple-construction operation.
+ */
+void
+tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
+ Datum *values, bool *isnull)
+{
+ MinimalTuple tuple;
+ MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
+
+ tuple = heap_form_minimal_tuple(tdesc, values, isnull);
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+
+ tuplestore_puttuple_common(state, (void *) tuple);
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+static void
+tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
+{
+ TSReadPointer *readptr;
+ int i;
+ ResourceOwner oldowner;
+
+ state->tuples++;
+
+ switch (state->status)
+ {
+ case TSS_INMEM:
+
+ /*
+ * Update read pointers as needed; see API spec above.
+ */
+ readptr = state->readptrs;
+ for (i = 0; i < state->readptrcount; readptr++, i++)
+ {
+ if (readptr->eof_reached && i != state->activeptr)
+ {
+ readptr->eof_reached = false;
+ readptr->current = state->memtupcount;
+ }
+ }
+
+ /*
+ * Grow the array as needed. Note that we try to grow the array
+ * when there is still one free slot remaining --- if we fail,
+ * there'll still be room to store the incoming tuple, and then
+ * we'll switch to tape-based operation.
+ */
+ if (state->memtupcount >= state->memtupsize - 1)
+ {
+ (void) grow_memtuples(state);
+ Assert(state->memtupcount < state->memtupsize);
+ }
+
+ /* Stash the tuple in the in-memory array */
+ state->memtuples[state->memtupcount++] = tuple;
+
+ /*
+ * Done if we still fit in available memory and have array slots.
+ */
+ if (state->memtupcount < state->memtupsize && !LACKMEM(state))
+ return;
+
+ /*
+ * Nope; time to switch to tape-based operation. Make sure that
+ * the temp file(s) are created in suitable temp tablespaces.
+ */
+ PrepareTempTablespaces();
+
+ /* associate the file with the store's resource owner */
+ oldowner = CurrentResourceOwner;
+ CurrentResourceOwner = state->resowner;
+
+ state->myfile = BufFileCreateTemp(state->interXact);
+
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * Freeze the decision about whether trailing length words will be
+ * used. We can't change this choice once data is on tape, even
+ * though callers might drop the requirement.
+ */
+ state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
+ state->status = TSS_WRITEFILE;
+ dumptuples(state);
+ break;
+ case TSS_WRITEFILE:
+
+ /*
+ * Update read pointers as needed; see API spec above. Note:
+ * BufFileTell is quite cheap, so not worth trying to avoid
+ * multiple calls.
+ */
+ readptr = state->readptrs;
+ for (i = 0; i < state->readptrcount; readptr++, i++)
+ {
+ if (readptr->eof_reached && i != state->activeptr)
+ {
+ readptr->eof_reached = false;
+ BufFileTell(state->myfile,
+ &readptr->file,
+ &readptr->offset);
+ }
+ }
+
+ WRITETUP(state, tuple);
+ break;
+ case TSS_READFILE:
+
+ /*
+ * Switch from reading to writing.
+ */
+ if (!state->readptrs[state->activeptr].eof_reached)
+ BufFileTell(state->myfile,
+ &state->readptrs[state->activeptr].file,
+ &state->readptrs[state->activeptr].offset);
+ if (BufFileSeek(state->myfile,
+ state->writepos_file, state->writepos_offset,
+ SEEK_SET) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in tuplestore temporary file")));
+ state->status = TSS_WRITEFILE;
+
+ /*
+ * Update read pointers as needed; see API spec above.
+ */
+ readptr = state->readptrs;
+ for (i = 0; i < state->readptrcount; readptr++, i++)
+ {
+ if (readptr->eof_reached && i != state->activeptr)
+ {
+ readptr->eof_reached = false;
+ readptr->file = state->writepos_file;
+ readptr->offset = state->writepos_offset;
+ }
+ }
+
+ WRITETUP(state, tuple);
+ break;
+ default:
+ elog(ERROR, "invalid tuplestore state");
+ break;
+ }
+}
+
+/*
+ * Fetch the next tuple in either forward or back direction.
+ * Returns NULL if no more tuples. If should_free is set, the
+ * caller must pfree the returned tuple when done with it.
+ *
+ * Backward scan is only allowed if randomAccess was set true or
+ * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
+ */
+static void *
+tuplestore_gettuple(Tuplestorestate *state, bool forward,
+ bool *should_free)
+{
+ TSReadPointer *readptr = &state->readptrs[state->activeptr];
+ unsigned int tuplen;
+ void *tup;
+
+ Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
+
+ switch (state->status)
+ {
+ case TSS_INMEM:
+ *should_free = false;
+ if (forward)
+ {
+ if (readptr->eof_reached)
+ return NULL;
+ if (readptr->current < state->memtupcount)
+ {
+ /* We have another tuple, so return it */
+ return state->memtuples[readptr->current++];
+ }
+ readptr->eof_reached = true;
+ return NULL;
+ }
+ else
+ {
+ /*
+ * If all tuples have already been fetched, return the last
+ * tuple; otherwise return the tuple just before the last one
+ * returned.
+ */
+ if (readptr->eof_reached)
+ {
+ readptr->current = state->memtupcount;
+ readptr->eof_reached = false;
+ }
+ else
+ {
+ if (readptr->current <= state->memtupdeleted)
+ {
+ Assert(!state->truncated);
+ return NULL;
+ }
+ readptr->current--; /* last returned tuple */
+ }
+ if (readptr->current <= state->memtupdeleted)
+ {
+ Assert(!state->truncated);
+ return NULL;
+ }
+ return state->memtuples[readptr->current - 1];
+ }
+ break;
+
+ case TSS_WRITEFILE:
+ /* Skip state change if we'll just return NULL */
+ if (readptr->eof_reached && forward)
+ return NULL;
+
+ /*
+ * Switch from writing to reading.
+ */
+ BufFileTell(state->myfile,
+ &state->writepos_file, &state->writepos_offset);
+ if (!readptr->eof_reached)
+ if (BufFileSeek(state->myfile,
+ readptr->file, readptr->offset,
+ SEEK_SET) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in tuplestore temporary file")));
+ state->status = TSS_READFILE;
+ /* FALLTHROUGH */
+
+ case TSS_READFILE:
+ *should_free = true;
+ if (forward)
+ {
+ if ((tuplen = getlen(state, true)) != 0)
+ {
+ tup = READTUP(state, tuplen);
+ return tup;
+ }
+ else
+ {
+ readptr->eof_reached = true;
+ return NULL;
+ }
+ }
+
+ /*
+ * Backward.
+ *
+ * If all tuples have already been fetched, return the last tuple;
+ * otherwise return the tuple just before the last one returned.
+ *
+ * Back up to fetch previously-returned tuple's ending length
+ * word. If seek fails, assume we are at start of file.
+ */
+ if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
+ SEEK_CUR) != 0)
+ {
+ /* even a failed backwards fetch gets you out of eof state */
+ readptr->eof_reached = false;
+ Assert(!state->truncated);
+ return NULL;
+ }
+ tuplen = getlen(state, false);
+
+ if (readptr->eof_reached)
+ {
+ readptr->eof_reached = false;
+ /* We will return the tuple that was returned just before the NULL */
+ }
+ else
+ {
+ /*
+ * Back up to get ending length word of tuple before it.
+ */
+ if (BufFileSeek(state->myfile, 0,
+ -(long) (tuplen + 2 * sizeof(unsigned int)),
+ SEEK_CUR) != 0)
+ {
+ /*
+ * If that fails, presumably the prev tuple is the first
+ * in the file.  Back up so that it becomes the next tuple to
+ * read in the forward direction (not obviously right, but that
+ * is what the in-memory case does).
+ */
+ if (BufFileSeek(state->myfile, 0,
+ -(long) (tuplen + sizeof(unsigned int)),
+ SEEK_CUR) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in tuplestore temporary file")));
+ Assert(!state->truncated);
+ return NULL;
+ }
+ tuplen = getlen(state, false);
+ }
+
+ /*
+ * Now we have the length of the prior tuple, back up and read it.
+ * Note: READTUP expects we are positioned after the initial
+ * length word of the tuple, so back up to that point.
+ */
+ if (BufFileSeek(state->myfile, 0,
+ -(long) tuplen,
+ SEEK_CUR) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in tuplestore temporary file")));
+ tup = READTUP(state, tuplen);
+ return tup;
+
+ default:
+ elog(ERROR, "invalid tuplestore state");
+ return NULL; /* keep compiler quiet */
+ }
+}
+
+/*
+ * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
+ *
+ * If successful, put tuple in slot and return true; else, clear the slot
+ * and return false.
+ *
+ * If copy is true, the slot receives a copied tuple (allocated in current
+ * memory context) that will stay valid regardless of future manipulations of
+ * the tuplestore's state. If copy is false, the slot may just receive a
+ * pointer to a tuple held within the tuplestore. The latter is more
+ * efficient but the slot contents may be corrupted if additional writes to
+ * the tuplestore occur. (If using tuplestore_trim, see comments therein.)
+ */
+bool
+tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
+ bool copy, TupleTableSlot *slot)
+{
+ MinimalTuple tuple;
+ bool should_free;
+
+ tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
+
+ if (tuple)
+ {
+ if (copy && !should_free)
+ {
+ tuple = heap_copy_minimal_tuple(tuple);
+ should_free = true;
+ }
+ ExecStoreMinimalTuple(tuple, slot, should_free);
+ return true;
+ }
+ else
+ {
+ ExecClearTuple(slot);
+ return false;
+ }
+}
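+
+/*
+ * Sketch of the copy flag's tradeoff (hypothetical caller): copy = false is
+ * cheaper but the slot may point into the tuplestore, so it is only safe
+ * until the store is written to or trimmed again; copy = true hands back a
+ * private copy that survives later tuplestore activity.
+ *
+ *	tuplestore_gettupleslot(ts, true, false, slot);	-- borrowed pointer
+ *	tuplestore_gettupleslot(ts, true, true, slot);	-- private copy
+ */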
+
+/*
+ * tuplestore_advance - exported function to adjust position without fetching
+ *
+ * We could optimize this case to avoid palloc/pfree overhead, but for the
+ * moment it doesn't seem worthwhile.
+ */
+bool
+tuplestore_advance(Tuplestorestate *state, bool forward)
+{
+ void *tuple;
+ bool should_free;
+
+ tuple = tuplestore_gettuple(state, forward, &should_free);
+
+ if (tuple)
+ {
+ if (should_free)
+ pfree(tuple);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+}
+
+/*
+ * Advance over N tuples in either forward or back direction,
+ * without returning any data. N<=0 is a no-op.
+ * Returns true if successful, false if ran out of tuples.
+ */
+bool
+tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
+{
+ TSReadPointer *readptr = &state->readptrs[state->activeptr];
+
+ Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
+
+ if (ntuples <= 0)
+ return true;
+
+ switch (state->status)
+ {
+ case TSS_INMEM:
+ if (forward)
+ {
+ if (readptr->eof_reached)
+ return false;
+ if (state->memtupcount - readptr->current >= ntuples)
+ {
+ readptr->current += ntuples;
+ return true;
+ }
+ readptr->current = state->memtupcount;
+ readptr->eof_reached = true;
+ return false;
+ }
+ else
+ {
+ if (readptr->eof_reached)
+ {
+ readptr->current = state->memtupcount;
+ readptr->eof_reached = false;
+ ntuples--;
+ }
+ if (readptr->current - state->memtupdeleted > ntuples)
+ {
+ readptr->current -= ntuples;
+ return true;
+ }
+ Assert(!state->truncated);
+ readptr->current = state->memtupdeleted;
+ return false;
+ }
+ break;
+
+ default:
+ /* We don't currently try hard to optimize other cases */
+ while (ntuples-- > 0)
+ {
+ void *tuple;
+ bool should_free;
+
+ tuple = tuplestore_gettuple(state, forward, &should_free);
+
+ if (tuple == NULL)
+ return false;
+ if (should_free)
+ pfree(tuple);
+ CHECK_FOR_INTERRUPTS();
+ }
+ return true;
+ }
+}
+
+/*
+ * dumptuples - remove tuples from memory and write to tape
+ *
+ * As a side effect, we must convert each read pointer's position from
+ * "current" to file/offset format. But eof_reached pointers don't
+ * need to change state.
+ */
+static void
+dumptuples(Tuplestorestate *state)
+{
+ int i;
+
+ for (i = state->memtupdeleted;; i++)
+ {
+ TSReadPointer *readptr = state->readptrs;
+ int j;
+
+ for (j = 0; j < state->readptrcount; readptr++, j++)
+ {
+ if (i == readptr->current && !readptr->eof_reached)
+ BufFileTell(state->myfile,
+ &readptr->file, &readptr->offset);
+ }
+ if (i >= state->memtupcount)
+ break;
+ WRITETUP(state, state->memtuples[i]);
+ }
+ state->memtupdeleted = 0;
+ state->memtupcount = 0;
+}
+
+/*
+ * tuplestore_rescan - rewind the active read pointer to start
+ */
+void
+tuplestore_rescan(Tuplestorestate *state)
+{
+ TSReadPointer *readptr = &state->readptrs[state->activeptr];
+
+ Assert(readptr->eflags & EXEC_FLAG_REWIND);
+ Assert(!state->truncated);
+
+ switch (state->status)
+ {
+ case TSS_INMEM:
+ readptr->eof_reached = false;
+ readptr->current = 0;
+ break;
+ case TSS_WRITEFILE:
+ readptr->eof_reached = false;
+ readptr->file = 0;
+ readptr->offset = 0L;
+ break;
+ case TSS_READFILE:
+ readptr->eof_reached = false;
+ if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in tuplestore temporary file")));
+ break;
+ default:
+ elog(ERROR, "invalid tuplestore state");
+ break;
+ }
+}
+
+/*
+ * tuplestore_copy_read_pointer - copy a read pointer's state to another
+ */
+void
+tuplestore_copy_read_pointer(Tuplestorestate *state,
+ int srcptr, int destptr)
+{
+ TSReadPointer *sptr = &state->readptrs[srcptr];
+ TSReadPointer *dptr = &state->readptrs[destptr];
+
+ Assert(srcptr >= 0 && srcptr < state->readptrcount);
+ Assert(destptr >= 0 && destptr < state->readptrcount);
+
+ /* Assigning to self is a no-op */
+ if (srcptr == destptr)
+ return;
+
+ if (dptr->eflags != sptr->eflags)
+ {
+ /* Possible change of overall eflags, so copy and then recompute */
+ int eflags;
+ int i;
+
+ *dptr = *sptr;
+ eflags = state->readptrs[0].eflags;
+ for (i = 1; i < state->readptrcount; i++)
+ eflags |= state->readptrs[i].eflags;
+ state->eflags = eflags;
+ }
+ else
+ *dptr = *sptr;
+
+ switch (state->status)
+ {
+ case TSS_INMEM:
+ case TSS_WRITEFILE:
+ /* no work */
+ break;
+ case TSS_READFILE:
+
+ /*
+ * This case is a bit tricky since the active read pointer's
+ * position corresponds to the seek point, not what is in its
+ * variables. Assigning to the active requires a seek, and
+ * assigning from the active requires a tell, except when
+ * eof_reached.
+ */
+ if (destptr == state->activeptr)
+ {
+ if (dptr->eof_reached)
+ {
+ if (BufFileSeek(state->myfile,
+ state->writepos_file,
+ state->writepos_offset,
+ SEEK_SET) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in tuplestore temporary file")));
+ }
+ else
+ {
+ if (BufFileSeek(state->myfile,
+ dptr->file, dptr->offset,
+ SEEK_SET) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek in tuplestore temporary file")));
+ }
+ }
+ else if (srcptr == state->activeptr)
+ {
+ if (!dptr->eof_reached)
+ BufFileTell(state->myfile,
+ &dptr->file,
+ &dptr->offset);
+ }
+ break;
+ default:
+ elog(ERROR, "invalid tuplestore state");
+ break;
+ }
+}
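+
+/*
+ * Mark/restore sketch using read-pointer copies (hypothetical usage; "mark"
+ * is a pointer previously obtained from tuplestore_alloc_read_pointer):
+ *
+ *	tuplestore_copy_read_pointer(ts, 0, mark);	-- remember pointer 0's spot
+ *	... read ahead with pointer 0 ...
+ *	tuplestore_copy_read_pointer(ts, mark, 0);	-- restore pointer 0
+ */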
+
+/*
+ * tuplestore_trim - remove all no-longer-needed tuples
+ *
+ * Calling this function authorizes the tuplestore to delete all tuples
+ * before the oldest read pointer, if no read pointer is marked as requiring
+ * REWIND capability.
+ *
+ * Note: this is obviously safe if no pointer has BACKWARD capability either.
+ * If a pointer is marked as BACKWARD but not REWIND capable, it means that
+ * the pointer can be moved backward but not before the oldest other read
+ * pointer.
+ */
+void
+tuplestore_trim(Tuplestorestate *state)
+{
+ int oldest;
+ int nremove;
+ int i;
+
+ /*
+ * Truncation is disallowed if any read pointer requires rewind
+ * capability.
+ */
+ if (state->eflags & EXEC_FLAG_REWIND)
+ return;
+
+ /*
+ * We don't bother trimming temp files since it usually would mean more
+ * work than just letting them sit in kernel buffers until they age out.
+ */
+ if (state->status != TSS_INMEM)
+ return;
+
+ /* Find the oldest read pointer */
+ oldest = state->memtupcount;
+ for (i = 0; i < state->readptrcount; i++)
+ {
+ if (!state->readptrs[i].eof_reached)
+ oldest = Min(oldest, state->readptrs[i].current);
+ }
+
+ /*
+ * Note: you might think we could remove all the tuples before the oldest
+ * "current", since that one is the next to be returned. However, since
+ * tuplestore_gettuple returns a direct pointer to our internal copy of
+ * the tuple, it's likely that the caller has still got the tuple just
+ * before "current" referenced in a slot. So we keep one extra tuple
+ * before the oldest "current". (Strictly speaking, we could require such
+ * callers to use the "copy" flag to tuplestore_gettupleslot, but for
+ * efficiency we allow this one case to not use "copy".)
+ */
+ nremove = oldest - 1;
+ if (nremove <= 0)
+ return; /* nothing to do */
+
+ Assert(nremove >= state->memtupdeleted);
+ Assert(nremove <= state->memtupcount);
+
+ /* Release no-longer-needed tuples */
+ for (i = state->memtupdeleted; i < nremove; i++)
+ {
+ FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
+ pfree(state->memtuples[i]);
+ state->memtuples[i] = NULL;
+ }
+ state->memtupdeleted = nremove;
+
+ /* mark tuplestore as truncated (used for Assert crosschecks only) */
+ state->truncated = true;
+
+ /*
+ * If nremove is less than 1/8th memtupcount, just stop here, leaving the
+ * "deleted" slots as NULL. This prevents us from expending O(N^2) time
+ * repeatedly memmove-ing a large pointer array. The worst case space
+ * wastage is pretty small, since it's just pointers and not whole tuples.
+ */
+ if (nremove < state->memtupcount / 8)
+ return;
+
+ /*
+ * Slide the array down and readjust pointers.
+ *
+ * In mergejoin's current usage, it's demonstrable that there will always
+ * be exactly one non-removed tuple; so optimize that case.
+ */
+ if (nremove + 1 == state->memtupcount)
+ state->memtuples[0] = state->memtuples[nremove];
+ else
+ memmove(state->memtuples, state->memtuples + nremove,
+ (state->memtupcount - nremove) * sizeof(void *));
+
+ state->memtupdeleted = 0;
+ state->memtupcount -= nremove;
+ for (i = 0; i < state->readptrcount; i++)
+ {
+ if (!state->readptrs[i].eof_reached)
+ state->readptrs[i].current -= nremove;
+ }
+}
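+
+/*
+ * Typical trim pattern (a sketch): once every read pointer has moved past a
+ * row, nothing can reach it again, so the caller may release it.  This is a
+ * no-op if any pointer was created with EXEC_FLAG_REWIND, or once the store
+ * has spilled to disk.
+ *
+ *	(void) tuplestore_gettupleslot(ts, true, false, slot);
+ *	tuplestore_trim(ts);		-- may free rows all pointers have passed
+ */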
+
+/*
+ * tuplestore_in_memory
+ *
+ * Returns true if the tuplestore has not spilled to disk.
+ *
+ * XXX exposing this is a violation of modularity ... should get rid of it.
+ */
+bool
+tuplestore_in_memory(Tuplestorestate *state)
+{
+ return (state->status == TSS_INMEM);
+}
+
+
+/*
+ * Tape interface routines
+ */
+
+static unsigned int
+getlen(Tuplestorestate *state, bool eofOK)
+{
+ unsigned int len;
+ size_t nbytes;
+
+ nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
+ if (nbytes == sizeof(len))
+ return len;
+ if (nbytes != 0 || !eofOK)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes",
+ nbytes, sizeof(len))));
+ return 0;
+}
+
+
+/*
+ * Routines specialized for HeapTuple case
+ *
+ * The stored form is actually a MinimalTuple, but for largely historical
+ * reasons we allow COPYTUP to work from a HeapTuple.
+ *
+ * Since MinimalTuple already has length in its first word, we don't need
+ * to write that separately.
+ */
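+
+/*
+ * Concretely (a worked sketch of the arithmetic in the routines below,
+ * assuming 4-byte ints): for a tuple whose on-tape body is B bytes,
+ * writetup_heap emits a length word of B + 4 followed by the B body bytes
+ * (plus a trailing copy of the length word when state->backward), and
+ * readtup_heap rebuilds the in-memory t_len as B + MINIMAL_TUPLE_DATA_OFFSET.
+ */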
+
+static void *
+copytup_heap(Tuplestorestate *state, void *tup)
+{
+ MinimalTuple tuple;
+
+ tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+ return (void *) tuple;
+}
+
+static void
+writetup_heap(Tuplestorestate *state, void *tup)
+{
+ MinimalTuple tuple = (MinimalTuple) tup;
+
+ /* the part of the MinimalTuple we'll write: */
+ char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
+ unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
+
+ /* total on-disk footprint: */
+ unsigned int tuplen = tupbodylen + sizeof(int);
+
+ BufFileWrite(state->myfile, (void *) &tuplen, sizeof(tuplen));
+ BufFileWrite(state->myfile, (void *) tupbody, tupbodylen);
+ if (state->backward) /* need trailing length word? */
+ BufFileWrite(state->myfile, (void *) &tuplen, sizeof(tuplen));
+
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ heap_free_minimal_tuple(tuple);
+}
+
+static void *
+readtup_heap(Tuplestorestate *state, unsigned int len)
+{
+ unsigned int tupbodylen = len - sizeof(int);
+ unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
+ MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
+ char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
+ size_t nread;
+
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+ /* read in the tuple proper */
+ tuple->t_len = tuplen;
+ nread = BufFileRead(state->myfile, (void *) tupbody, tupbodylen);
+ if (nread != (size_t) tupbodylen)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes",
+ nread, (size_t) tupbodylen)));
+ if (state->backward) /* need trailing length word? */
+ {
+ nread = BufFileRead(state->myfile, (void *) &tuplen, sizeof(tuplen));
+ if (nread != sizeof(tuplen))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes",
+ nread, sizeof(tuplen))));
+ }
+ return (void *) tuple;
+}