author:    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 12:17:33 +0000
committer: Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-04 12:17:33 +0000
commit:    5e45211a64149b3c659b90ff2de6fa982a5a93ed (patch)
tree:      739caf8c461053357daa9f162bef34516c7bf452 /src/backend/utils/sort
parent:    Initial commit. (diff)
Adding upstream version 15.5. (upstream/15.5)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/utils/sort')
-rw-r--r--   src/backend/utils/sort/Makefile              |   25
-rw-r--r--   src/backend/utils/sort/logtape.c             | 1193
-rw-r--r--   src/backend/utils/sort/qsort_interruptible.c |   16
-rw-r--r--   src/backend/utils/sort/sharedtuplestore.c    |  631
-rw-r--r--   src/backend/utils/sort/sortsupport.c         |  211
-rw-r--r--   src/backend/utils/sort/tuplesort.c           | 4938
-rw-r--r--   src/backend/utils/sort/tuplestore.c          | 1552
7 files changed, 8566 insertions(+), 0 deletions(-)
diff --git a/src/backend/utils/sort/Makefile b/src/backend/utils/sort/Makefile new file mode 100644 index 0000000..2c31fd4 --- /dev/null +++ b/src/backend/utils/sort/Makefile @@ -0,0 +1,25 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for utils/sort +# +# IDENTIFICATION +# src/backend/utils/sort/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/utils/sort +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS) + +OBJS = \ + logtape.o \ + qsort_interruptible.o \ + sharedtuplestore.o \ + sortsupport.o \ + tuplesort.o \ + tuplestore.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c new file mode 100644 index 0000000..31db75d --- /dev/null +++ b/src/backend/utils/sort/logtape.c @@ -0,0 +1,1193 @@ +/*------------------------------------------------------------------------- + * + * logtape.c + * Management of "logical tapes" within temporary files. + * + * This module exists to support sorting via multiple merge passes (see + * tuplesort.c). Merging is an ideal algorithm for tape devices, but if + * we implement it on disk by creating a separate file for each "tape", + * there is an annoying problem: the peak space usage is at least twice + * the volume of actual data to be sorted. (This must be so because each + * datum will appear in both the input and output tapes of the final + * merge pass.) + * + * We can work around this problem by recognizing that any one tape + * dataset (with the possible exception of the final output) is written + * and read exactly once in a perfectly sequential manner. Therefore, + * a datum once read will not be required again, and we can recycle its + * space for use by the new tape dataset(s) being generated. In this way, + * the total space usage is essentially just the actual data volume, plus + * insignificant bookkeeping and start/stop overhead. + * + * Few OSes allow arbitrary parts of a file to be released back to the OS, + * so we have to implement this space-recycling ourselves within a single + * logical file. logtape.c exists to perform this bookkeeping and provide + * the illusion of N independent tape devices to tuplesort.c. Note that + * logtape.c itself depends on buffile.c to provide a "logical file" of + * larger size than the underlying OS may support. + * + * For simplicity, we allocate and release space in the underlying file + * in BLCKSZ-size blocks. Space allocation boils down to keeping track + * of which blocks in the underlying file belong to which logical tape, + * plus any blocks that are free (recycled and not yet reused). + * The blocks in each logical tape form a chain, with a prev- and next- + * pointer in each block. + * + * The initial write pass is guaranteed to fill the underlying file + * perfectly sequentially, no matter how data is divided into logical tapes. + * Once we begin merge passes, the access pattern becomes considerably + * less predictable --- but the seeking involved should be comparable to + * what would happen if we kept each logical tape in a separate file, + * so there's no serious performance penalty paid to obtain the space + * savings of recycling. We try to localize the write accesses by always + * writing to the lowest-numbered free block when we have a choice; it's + * not clear this helps much, but it can't hurt. 
(XXX perhaps a LIFO + * policy for free blocks would be better?) + * + * To further make the I/Os more sequential, we can use a larger buffer + * when reading, and read multiple blocks from the same tape in one go, + * whenever the buffer becomes empty. + * + * To support the above policy of writing to the lowest free block, the + * freelist is a min heap. + * + * Since all the bookkeeping and buffer memory is allocated with palloc(), + * and the underlying file(s) are made with OpenTemporaryFile, all resources + * for a logical tape set are certain to be cleaned up even if processing + * is aborted by ereport(ERROR). To avoid confusion, the caller should take + * care that all calls for a single LogicalTapeSet are made in the same + * palloc context. + * + * To support parallel sort operations involving coordinated callers to + * tuplesort.c routines across multiple workers, it is necessary to + * concatenate each worker BufFile/tapeset into one single logical tapeset + * managed by the leader. Workers should have produced one final + * materialized tape (their entire output) when this happens in leader. + * There will always be the same number of runs as input tapes, and the same + * number of input tapes as participants (worker Tuplesortstates). + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/utils/sort/logtape.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <fcntl.h> + +#include "storage/buffile.h" +#include "utils/builtins.h" +#include "utils/logtape.h" +#include "utils/memdebug.h" +#include "utils/memutils.h" + +/* + * A TapeBlockTrailer is stored at the end of each BLCKSZ block. + * + * The first block of a tape has prev == -1. The last block of a tape + * stores the number of valid bytes on the block, inverted, in 'next' + * Therefore next < 0 indicates the last block. + */ +typedef struct TapeBlockTrailer +{ + long prev; /* previous block on this tape, or -1 on first + * block */ + long next; /* next block on this tape, or # of valid + * bytes on last block (if < 0) */ +} TapeBlockTrailer; + +#define TapeBlockPayloadSize (BLCKSZ - sizeof(TapeBlockTrailer)) +#define TapeBlockGetTrailer(buf) \ + ((TapeBlockTrailer *) ((char *) buf + TapeBlockPayloadSize)) + +#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0) +#define TapeBlockGetNBytes(buf) \ + (TapeBlockIsLast(buf) ? \ + (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize) +#define TapeBlockSetNBytes(buf, nbytes) \ + (TapeBlockGetTrailer(buf)->next = -(nbytes)) + +/* + * When multiple tapes are being written to concurrently (as in HashAgg), + * avoid excessive fragmentation by preallocating block numbers to individual + * tapes. Each preallocation doubles in size starting at + * TAPE_WRITE_PREALLOC_MIN blocks up to TAPE_WRITE_PREALLOC_MAX blocks. + * + * No filesystem operations are performed for preallocation; only the block + * numbers are reserved. This may lead to sparse writes, which will cause + * ltsWriteBlock() to fill in holes with zeros. + */ +#define TAPE_WRITE_PREALLOC_MIN 8 +#define TAPE_WRITE_PREALLOC_MAX 128 + +/* + * This data structure represents a single "logical tape" within the set + * of logical tapes stored in the same file. + * + * While writing, we hold the current partially-written data block in the + * buffer. While reading, we can hold multiple blocks in the buffer. 
Note + * that we don't retain the trailers of a block when it's read into the + * buffer. The buffer therefore contains one large contiguous chunk of data + * from the tape. + */ +struct LogicalTape +{ + LogicalTapeSet *tapeSet; /* tape set this tape is part of */ + + bool writing; /* T while in write phase */ + bool frozen; /* T if blocks should not be freed when read */ + bool dirty; /* does buffer need to be written? */ + + /* + * Block numbers of the first, current, and next block of the tape. + * + * The "current" block number is only valid when writing, or reading from + * a frozen tape. (When reading from an unfrozen tape, we use a larger + * read buffer that holds multiple blocks, so the "current" block is + * ambiguous.) + * + * When concatenation of worker tape BufFiles is performed, an offset to + * the first block in the unified BufFile space is applied during reads. + */ + long firstBlockNumber; + long curBlockNumber; + long nextBlockNumber; + long offsetBlockNumber; + + /* + * Buffer for current data block(s). + */ + char *buffer; /* physical buffer (separately palloc'd) */ + int buffer_size; /* allocated size of the buffer */ + int max_size; /* highest useful, safe buffer_size */ + int pos; /* next read/write position in buffer */ + int nbytes; /* total # of valid bytes in buffer */ + + /* + * Preallocated block numbers are held in an array sorted in descending + * order; blocks are consumed from the end of the array (lowest block + * numbers first). + */ + long *prealloc; + int nprealloc; /* number of elements in list */ + int prealloc_size; /* number of elements list can hold */ +}; + +/* + * This data structure represents a set of related "logical tapes" sharing + * space in a single underlying file. (But that "file" may be multiple files + * if needed to escape OS limits on file size; buffile.c handles that for us.) + * Tapes belonging to a tape set can be created and destroyed on-the-fly, on + * demand. + */ +struct LogicalTapeSet +{ + BufFile *pfile; /* underlying file for whole tape set */ + SharedFileSet *fileset; + int worker; /* worker # if shared, -1 for leader/serial */ + + /* + * File size tracking. nBlocksWritten is the size of the underlying file, + * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated + * by ltsReleaseBlock(), and it is always greater than or equal to + * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are + * blocks that have been allocated for a tape, but have not been written + * to the underlying file yet. nHoleBlocks tracks the total number of + * blocks that are in unused holes between worker spaces following BufFile + * concatenation. + */ + long nBlocksAllocated; /* # of blocks allocated */ + long nBlocksWritten; /* # of blocks used in underlying file */ + long nHoleBlocks; /* # of "hole" blocks left */ + + /* + * We store the numbers of recycled-and-available blocks in freeBlocks[]. + * When there are no such blocks, we extend the underlying file. + * + * If forgetFreeSpace is true then any freed blocks are simply forgotten + * rather than being remembered in freeBlocks[]. See notes for + * LogicalTapeSetForgetFreeSpace(). + */ + bool forgetFreeSpace; /* are we remembering free blocks? */ + long *freeBlocks; /* resizable array holding minheap */ + long nFreeBlocks; /* # of currently free blocks */ + Size freeBlocksLen; /* current allocated length of freeBlocks[] */ + bool enable_prealloc; /* preallocate write blocks? 
*/ +}; + +static LogicalTape *ltsCreateTape(LogicalTapeSet *lts); +static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer); +static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer); +static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt); +static long ltsGetFreeBlock(LogicalTapeSet *lts); +static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt); +static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum); +static void ltsInitReadBuffer(LogicalTape *lt); + + +/* + * Write a block-sized buffer to the specified block of the underlying file. + * + * No need for an error return convention; we ereport() on any error. + */ +static void +ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer) +{ + /* + * BufFile does not support "holes", so if we're about to write a block + * that's past the current end of file, fill the space between the current + * end of file and the target block with zeros. + * + * This can happen either when tapes preallocate blocks; or for the last + * block of a tape which might not have been flushed. + * + * Note that BufFile concatenation can leave "holes" in BufFile between + * worker-owned block ranges. These are tracked for reporting purposes + * only. We never read from nor write to these hole blocks, and so they + * are not considered here. + */ + while (blocknum > lts->nBlocksWritten) + { + PGAlignedBlock zerobuf; + + MemSet(zerobuf.data, 0, sizeof(zerobuf)); + + ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data); + } + + /* Write the requested block */ + if (BufFileSeekBlock(lts->pfile, blocknum) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to block %ld of temporary file", + blocknum))); + BufFileWrite(lts->pfile, buffer, BLCKSZ); + + /* Update nBlocksWritten, if we extended the file */ + if (blocknum == lts->nBlocksWritten) + lts->nBlocksWritten++; +} + +/* + * Read a block-sized buffer from the specified block of the underlying file. + * + * No need for an error return convention; we ereport() on any error. This + * module should never attempt to read a block it doesn't know is there. + */ +static void +ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer) +{ + size_t nread; + + if (BufFileSeekBlock(lts->pfile, blocknum) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to block %ld of temporary file", + blocknum))); + nread = BufFileRead(lts->pfile, buffer, BLCKSZ); + if (nread != BLCKSZ) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read block %ld of temporary file: read only %zu of %zu bytes", + blocknum, nread, (size_t) BLCKSZ))); +} + +/* + * Read as many blocks as we can into the per-tape buffer. + * + * Returns true if anything was read, 'false' on EOF. 
+ */ +static bool +ltsReadFillBuffer(LogicalTape *lt) +{ + lt->pos = 0; + lt->nbytes = 0; + + do + { + char *thisbuf = lt->buffer + lt->nbytes; + long datablocknum = lt->nextBlockNumber; + + /* Fetch next block number */ + if (datablocknum == -1L) + break; /* EOF */ + /* Apply worker offset, needed for leader tapesets */ + datablocknum += lt->offsetBlockNumber; + + /* Read the block */ + ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf); + if (!lt->frozen) + ltsReleaseBlock(lt->tapeSet, datablocknum); + lt->curBlockNumber = lt->nextBlockNumber; + + lt->nbytes += TapeBlockGetNBytes(thisbuf); + if (TapeBlockIsLast(thisbuf)) + { + lt->nextBlockNumber = -1L; + /* EOF */ + break; + } + else + lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next; + + /* Advance to next block, if we have buffer space left */ + } while (lt->buffer_size - lt->nbytes > BLCKSZ); + + return (lt->nbytes > 0); +} + +static inline unsigned long +left_offset(unsigned long i) +{ + return 2 * i + 1; +} + +static inline unsigned long +right_offset(unsigned long i) +{ + return 2 * i + 2; +} + +static inline unsigned long +parent_offset(unsigned long i) +{ + return (i - 1) / 2; +} + +/* + * Get the next block for writing. + */ +static long +ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt) +{ + if (lts->enable_prealloc) + return ltsGetPreallocBlock(lts, lt); + else + return ltsGetFreeBlock(lts); +} + +/* + * Select the lowest currently unused block from the tape set's global free + * list min heap. + */ +static long +ltsGetFreeBlock(LogicalTapeSet *lts) +{ + long *heap = lts->freeBlocks; + long blocknum; + long heapsize; + long holeval; + unsigned long holepos; + + /* freelist empty; allocate a new block */ + if (lts->nFreeBlocks == 0) + return lts->nBlocksAllocated++; + + /* easy if heap contains one element */ + if (lts->nFreeBlocks == 1) + { + lts->nFreeBlocks--; + return lts->freeBlocks[0]; + } + + /* remove top of minheap */ + blocknum = heap[0]; + + /* we'll replace it with end of minheap array */ + holeval = heap[--lts->nFreeBlocks]; + + /* sift down */ + holepos = 0; /* holepos is where the "hole" is */ + heapsize = lts->nFreeBlocks; + while (true) + { + unsigned long left = left_offset(holepos); + unsigned long right = right_offset(holepos); + unsigned long min_child; + + if (left < heapsize && right < heapsize) + min_child = (heap[left] < heap[right]) ? left : right; + else if (left < heapsize) + min_child = left; + else if (right < heapsize) + min_child = right; + else + break; + + if (heap[min_child] >= holeval) + break; + + heap[holepos] = heap[min_child]; + holepos = min_child; + } + heap[holepos] = holeval; + + return blocknum; +} + +/* + * Return the lowest free block number from the tape's preallocation list. + * Refill the preallocation list with blocks from the tape set's free list if + * necessary. 
+ */ +static long +ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt) +{ + /* sorted in descending order, so return the last element */ + if (lt->nprealloc > 0) + return lt->prealloc[--lt->nprealloc]; + + if (lt->prealloc == NULL) + { + lt->prealloc_size = TAPE_WRITE_PREALLOC_MIN; + lt->prealloc = (long *) palloc(sizeof(long) * lt->prealloc_size); + } + else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX) + { + /* when the preallocation list runs out, double the size */ + lt->prealloc_size *= 2; + if (lt->prealloc_size > TAPE_WRITE_PREALLOC_MAX) + lt->prealloc_size = TAPE_WRITE_PREALLOC_MAX; + lt->prealloc = (long *) repalloc(lt->prealloc, + sizeof(long) * lt->prealloc_size); + } + + /* refill preallocation list */ + lt->nprealloc = lt->prealloc_size; + for (int i = lt->nprealloc; i > 0; i--) + { + lt->prealloc[i - 1] = ltsGetFreeBlock(lts); + + /* verify descending order */ + Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]); + } + + return lt->prealloc[--lt->nprealloc]; +} + +/* + * Return a block# to the freelist. + */ +static void +ltsReleaseBlock(LogicalTapeSet *lts, long blocknum) +{ + long *heap; + unsigned long holepos; + + /* + * Do nothing if we're no longer interested in remembering free space. + */ + if (lts->forgetFreeSpace) + return; + + /* + * Enlarge freeBlocks array if full. + */ + if (lts->nFreeBlocks >= lts->freeBlocksLen) + { + /* + * If the freelist becomes very large, just return and leak this free + * block. + */ + if (lts->freeBlocksLen * 2 * sizeof(long) > MaxAllocSize) + return; + + lts->freeBlocksLen *= 2; + lts->freeBlocks = (long *) repalloc(lts->freeBlocks, + lts->freeBlocksLen * sizeof(long)); + } + + /* create a "hole" at end of minheap array */ + heap = lts->freeBlocks; + holepos = lts->nFreeBlocks; + lts->nFreeBlocks++; + + /* sift up to insert blocknum */ + while (holepos != 0) + { + unsigned long parent = parent_offset(holepos); + + if (heap[parent] < blocknum) + break; + + heap[holepos] = heap[parent]; + holepos = parent; + } + heap[holepos] = blocknum; +} + +/* + * Lazily allocate and initialize the read buffer. This avoids waste when many + * tapes are open at once, but not all are active between rewinding and + * reading. + */ +static void +ltsInitReadBuffer(LogicalTape *lt) +{ + Assert(lt->buffer_size > 0); + lt->buffer = palloc(lt->buffer_size); + + /* Read the first block, or reset if tape is empty */ + lt->nextBlockNumber = lt->firstBlockNumber; + lt->pos = 0; + lt->nbytes = 0; + ltsReadFillBuffer(lt); +} + +/* + * Create a tape set, backed by a temporary underlying file. + * + * The tape set is initially empty. Use LogicalTapeCreate() to create + * tapes in it. + * + * In a single-process sort, pass NULL argument for fileset, and -1 for + * worker. + * + * In a parallel sort, parallel workers pass the shared fileset handle and + * their own worker number. After the workers have finished, create the + * tape set in the leader, passing the shared fileset handle and -1 for + * worker, and use LogicalTapeImport() to import the worker tapes into it. + * + * Currently, the leader will only import worker tapes into the set, it does + * not create tapes of its own, although in principle that should work. + * + * If preallocate is true, blocks for each individual tape are allocated in + * batches. This avoids fragmentation when writing multiple tapes at the + * same time. 
+ */ +LogicalTapeSet * +LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker) +{ + LogicalTapeSet *lts; + + /* + * Create top-level struct including per-tape LogicalTape structs. + */ + lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet)); + lts->nBlocksAllocated = 0L; + lts->nBlocksWritten = 0L; + lts->nHoleBlocks = 0L; + lts->forgetFreeSpace = false; + lts->freeBlocksLen = 32; /* reasonable initial guess */ + lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long)); + lts->nFreeBlocks = 0; + lts->enable_prealloc = preallocate; + + lts->fileset = fileset; + lts->worker = worker; + + /* + * Create temp BufFile storage as required. + * + * In leader, we hijack the BufFile of the first tape that's imported, and + * concatenate the BufFiles of any subsequent tapes to that. Hence don't + * create a BufFile here. Things are simpler for the worker case and the + * serial case, though. They are generally very similar -- workers use a + * shared fileset, whereas serial sorts use a conventional serial BufFile. + */ + if (fileset && worker == -1) + lts->pfile = NULL; + else if (fileset) + { + char filename[MAXPGPATH]; + + pg_itoa(worker, filename); + lts->pfile = BufFileCreateFileSet(&fileset->fs, filename); + } + else + lts->pfile = BufFileCreateTemp(false); + + return lts; +} + +/* + * Claim ownership of a logical tape from an existing shared BufFile. + * + * Caller should be leader process. Though tapes are marked as frozen in + * workers, they are not frozen when opened within leader, since unfrozen tapes + * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized + * for random access.) + */ +LogicalTape * +LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared) +{ + LogicalTape *lt; + long tapeblocks; + char filename[MAXPGPATH]; + BufFile *file; + int64 filesize; + + lt = ltsCreateTape(lts); + + /* + * build concatenated view of all buffiles, remembering the block number + * where each source file begins. + */ + pg_itoa(worker, filename); + file = BufFileOpenFileSet(<s->fileset->fs, filename, O_RDONLY, false); + filesize = BufFileSize(file); + + /* + * Stash first BufFile, and concatenate subsequent BufFiles to that. Store + * block offset into each tape as we go. + */ + lt->firstBlockNumber = shared->firstblocknumber; + if (lts->pfile == NULL) + { + lts->pfile = file; + lt->offsetBlockNumber = 0L; + } + else + { + lt->offsetBlockNumber = BufFileAppend(lts->pfile, file); + } + /* Don't allocate more for read buffer than could possibly help */ + lt->max_size = Min(MaxAllocSize, filesize); + tapeblocks = filesize / BLCKSZ; + + /* + * Update # of allocated blocks and # blocks written to reflect the + * imported BufFile. Allocated/written blocks include space used by holes + * left between concatenated BufFiles. Also track the number of hole + * blocks so that we can later work backwards to calculate the number of + * physical blocks for instrumentation. + */ + lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated; + + lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks; + lts->nBlocksWritten = lts->nBlocksAllocated; + + return lt; +} + +/* + * Close a logical tape set and release all resources. + * + * NOTE: This doesn't close any of the tapes! You must close them + * first, or you can let them be destroyed along with the memory context. 
+ */ +void +LogicalTapeSetClose(LogicalTapeSet *lts) +{ + BufFileClose(lts->pfile); + pfree(lts->freeBlocks); + pfree(lts); +} + +/* + * Create a logical tape in the given tapeset. + * + * The tape is initialized in write state. + */ +LogicalTape * +LogicalTapeCreate(LogicalTapeSet *lts) +{ + /* + * The only thing that currently prevents creating new tapes in leader is + * the fact that BufFiles opened using BufFileOpenShared() are read-only + * by definition, but that could be changed if it seemed worthwhile. For + * now, writing to the leader tape will raise a "Bad file descriptor" + * error, so tuplesort must avoid writing to the leader tape altogether. + */ + if (lts->fileset && lts->worker == -1) + elog(ERROR, "cannot create new tapes in leader process"); + + return ltsCreateTape(lts); +} + +static LogicalTape * +ltsCreateTape(LogicalTapeSet *lts) +{ + LogicalTape *lt; + + /* + * Create per-tape struct. Note we allocate the I/O buffer lazily. + */ + lt = palloc(sizeof(LogicalTape)); + lt->tapeSet = lts; + lt->writing = true; + lt->frozen = false; + lt->dirty = false; + lt->firstBlockNumber = -1L; + lt->curBlockNumber = -1L; + lt->nextBlockNumber = -1L; + lt->offsetBlockNumber = 0L; + lt->buffer = NULL; + lt->buffer_size = 0; + /* palloc() larger than MaxAllocSize would fail */ + lt->max_size = MaxAllocSize; + lt->pos = 0; + lt->nbytes = 0; + lt->prealloc = NULL; + lt->nprealloc = 0; + lt->prealloc_size = 0; + + return lt; +} + +/* + * Close a logical tape. + * + * Note: This doesn't return any blocks to the free list! You must read + * the tape to the end first, to reuse the space. In current use, though, + * we only close tapes after fully reading them. + */ +void +LogicalTapeClose(LogicalTape *lt) +{ + if (lt->buffer) + pfree(lt->buffer); + pfree(lt); +} + +/* + * Mark a logical tape set as not needing management of free space anymore. + * + * This should be called if the caller does not intend to write any more data + * into the tape set, but is reading from un-frozen tapes. Since no more + * writes are planned, remembering free blocks is no longer useful. Setting + * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which + * is not designed to handle large numbers of free blocks. + */ +void +LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts) +{ + lts->forgetFreeSpace = true; +} + +/* + * Write to a logical tape. + * + * There are no error returns; we ereport() on failure. + */ +void +LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size) +{ + LogicalTapeSet *lts = lt->tapeSet; + size_t nthistime; + + Assert(lt->writing); + Assert(lt->offsetBlockNumber == 0L); + + /* Allocate data buffer and first block on first write */ + if (lt->buffer == NULL) + { + lt->buffer = (char *) palloc(BLCKSZ); + lt->buffer_size = BLCKSZ; + } + if (lt->curBlockNumber == -1) + { + Assert(lt->firstBlockNumber == -1); + Assert(lt->pos == 0); + + lt->curBlockNumber = ltsGetBlock(lts, lt); + lt->firstBlockNumber = lt->curBlockNumber; + + TapeBlockGetTrailer(lt->buffer)->prev = -1L; + } + + Assert(lt->buffer_size == BLCKSZ); + while (size > 0) + { + if (lt->pos >= (int) TapeBlockPayloadSize) + { + /* Buffer full, dump it out */ + long nextBlockNumber; + + if (!lt->dirty) + { + /* Hmm, went directly from reading to writing? */ + elog(ERROR, "invalid logtape state: should be dirty"); + } + + /* + * First allocate the next block, so that we can store it in the + * 'next' pointer of this block. 
+ */ + nextBlockNumber = ltsGetBlock(lt->tapeSet, lt); + + /* set the next-pointer and dump the current block. */ + TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber; + ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer); + + /* initialize the prev-pointer of the next block */ + TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber; + lt->curBlockNumber = nextBlockNumber; + lt->pos = 0; + lt->nbytes = 0; + } + + nthistime = TapeBlockPayloadSize - lt->pos; + if (nthistime > size) + nthistime = size; + Assert(nthistime > 0); + + memcpy(lt->buffer + lt->pos, ptr, nthistime); + + lt->dirty = true; + lt->pos += nthistime; + if (lt->nbytes < lt->pos) + lt->nbytes = lt->pos; + ptr = (void *) ((char *) ptr + nthistime); + size -= nthistime; + } +} + +/* + * Rewind logical tape and switch from writing to reading. + * + * The tape must currently be in writing state, or "frozen" in read state. + * + * 'buffer_size' specifies how much memory to use for the read buffer. + * Regardless of the argument, the actual amount of memory used is between + * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is + * rounded down and truncated to fit those constraints, if necessary. If the + * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ + * byte buffer is used. + */ +void +LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size) +{ + LogicalTapeSet *lts = lt->tapeSet; + + /* + * Round and cap buffer_size if needed. + */ + if (lt->frozen) + buffer_size = BLCKSZ; + else + { + /* need at least one block */ + if (buffer_size < BLCKSZ) + buffer_size = BLCKSZ; + + /* palloc() larger than max_size is unlikely to be helpful */ + if (buffer_size > lt->max_size) + buffer_size = lt->max_size; + + /* round down to BLCKSZ boundary */ + buffer_size -= buffer_size % BLCKSZ; + } + + if (lt->writing) + { + /* + * Completion of a write phase. Flush last partial data block, and + * rewind for normal (destructive) read. + */ + if (lt->dirty) + { + /* + * As long as we've filled the buffer at least once, its contents + * are entirely defined from valgrind's point of view, even though + * contents beyond the current end point may be stale. But it's + * possible - at least in the case of a parallel sort - to sort + * such small amount of data that we do not fill the buffer even + * once. Tell valgrind that its contents are defined, so it + * doesn't bleat. + */ + VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes, + lt->buffer_size - lt->nbytes); + + TapeBlockSetNBytes(lt->buffer, lt->nbytes); + ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer); + } + lt->writing = false; + } + else + { + /* + * This is only OK if tape is frozen; we rewind for (another) read + * pass. + */ + Assert(lt->frozen); + } + + if (lt->buffer) + pfree(lt->buffer); + + /* the buffer is lazily allocated, but set the size here */ + lt->buffer = NULL; + lt->buffer_size = buffer_size; + + /* free the preallocation list, and return unused block numbers */ + if (lt->prealloc != NULL) + { + for (int i = lt->nprealloc; i > 0; i--) + ltsReleaseBlock(lts, lt->prealloc[i - 1]); + pfree(lt->prealloc); + lt->prealloc = NULL; + lt->nprealloc = 0; + lt->prealloc_size = 0; + } +} + +/* + * Read from a logical tape. + * + * Early EOF is indicated by return value less than #bytes requested. 
+ */ +size_t +LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size) +{ + size_t nread = 0; + size_t nthistime; + + Assert(!lt->writing); + + if (lt->buffer == NULL) + ltsInitReadBuffer(lt); + + while (size > 0) + { + if (lt->pos >= lt->nbytes) + { + /* Try to load more data into buffer. */ + if (!ltsReadFillBuffer(lt)) + break; /* EOF */ + } + + nthistime = lt->nbytes - lt->pos; + if (nthistime > size) + nthistime = size; + Assert(nthistime > 0); + + memcpy(ptr, lt->buffer + lt->pos, nthistime); + + lt->pos += nthistime; + ptr = (void *) ((char *) ptr + nthistime); + size -= nthistime; + nread += nthistime; + } + + return nread; +} + +/* + * "Freeze" the contents of a tape so that it can be read multiple times + * and/or read backwards. Once a tape is frozen, its contents will not + * be released until the LogicalTapeSet is destroyed. This is expected + * to be used only for the final output pass of a merge. + * + * This *must* be called just at the end of a write pass, before the + * tape is rewound (after rewind is too late!). It performs a rewind + * and switch to read mode "for free". An immediately following rewind- + * for-read call is OK but not necessary. + * + * share output argument is set with details of storage used for tape after + * freezing, which may be passed to LogicalTapeSetCreate within leader + * process later. This metadata is only of interest to worker callers + * freezing their final output for leader (single materialized tape). + * Serial sorts should set share to NULL. + */ +void +LogicalTapeFreeze(LogicalTape *lt, TapeShare *share) +{ + LogicalTapeSet *lts = lt->tapeSet; + + Assert(lt->writing); + Assert(lt->offsetBlockNumber == 0L); + + /* + * Completion of a write phase. Flush last partial data block, and rewind + * for nondestructive read. + */ + if (lt->dirty) + { + /* + * As long as we've filled the buffer at least once, its contents are + * entirely defined from valgrind's point of view, even though + * contents beyond the current end point may be stale. But it's + * possible - at least in the case of a parallel sort - to sort such + * small amount of data that we do not fill the buffer even once. Tell + * valgrind that its contents are defined, so it doesn't bleat. + */ + VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes, + lt->buffer_size - lt->nbytes); + + TapeBlockSetNBytes(lt->buffer, lt->nbytes); + ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer); + } + lt->writing = false; + lt->frozen = true; + + /* + * The seek and backspace functions assume a single block read buffer. + * That's OK with current usage. A larger buffer is helpful to make the + * read pattern of the backing file look more sequential to the OS, when + * we're reading from multiple tapes. But at the end of a sort, when a + * tape is frozen, we only read from a single tape anyway. 
+ */ + if (!lt->buffer || lt->buffer_size != BLCKSZ) + { + if (lt->buffer) + pfree(lt->buffer); + lt->buffer = palloc(BLCKSZ); + lt->buffer_size = BLCKSZ; + } + + /* Read the first block, or reset if tape is empty */ + lt->curBlockNumber = lt->firstBlockNumber; + lt->pos = 0; + lt->nbytes = 0; + + if (lt->firstBlockNumber == -1L) + lt->nextBlockNumber = -1L; + ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer); + if (TapeBlockIsLast(lt->buffer)) + lt->nextBlockNumber = -1L; + else + lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next; + lt->nbytes = TapeBlockGetNBytes(lt->buffer); + + /* Handle extra steps when caller is to share its tapeset */ + if (share) + { + BufFileExportFileSet(lts->pfile); + share->firstblocknumber = lt->firstBlockNumber; + } +} + +/* + * Backspace the tape a given number of bytes. (We also support a more + * general seek interface, see below.) + * + * *Only* a frozen-for-read tape can be backed up; we don't support + * random access during write, and an unfrozen read tape may have + * already discarded the desired data! + * + * Returns the number of bytes backed up. It can be less than the + * requested amount, if there isn't that much data before the current + * position. The tape is positioned to the beginning of the tape in + * that case. + */ +size_t +LogicalTapeBackspace(LogicalTape *lt, size_t size) +{ + size_t seekpos = 0; + + Assert(lt->frozen); + Assert(lt->buffer_size == BLCKSZ); + + if (lt->buffer == NULL) + ltsInitReadBuffer(lt); + + /* + * Easy case for seek within current block. + */ + if (size <= (size_t) lt->pos) + { + lt->pos -= (int) size; + return size; + } + + /* + * Not-so-easy case, have to walk back the chain of blocks. This + * implementation would be pretty inefficient for long seeks, but we + * really aren't doing that (a seek over one tuple is typical). + */ + seekpos = (size_t) lt->pos; /* part within this block */ + while (size > seekpos) + { + long prev = TapeBlockGetTrailer(lt->buffer)->prev; + + if (prev == -1L) + { + /* Tried to back up beyond the beginning of tape. */ + if (lt->curBlockNumber != lt->firstBlockNumber) + elog(ERROR, "unexpected end of tape"); + lt->pos = 0; + return seekpos; + } + + ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer); + + if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber) + elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld", + prev, + TapeBlockGetTrailer(lt->buffer)->next, + lt->curBlockNumber); + + lt->nbytes = TapeBlockPayloadSize; + lt->curBlockNumber = prev; + lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next; + + seekpos += TapeBlockPayloadSize; + } + + /* + * 'seekpos' can now be greater than 'size', because it points to the + * beginning the target block. The difference is the position within the + * page. + */ + lt->pos = seekpos - size; + return size; +} + +/* + * Seek to an arbitrary position in a logical tape. + * + * *Only* a frozen-for-read tape can be seeked. + * + * Must be called with a block/offset previously returned by + * LogicalTapeTell(). 
+ */ +void +LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset) +{ + Assert(lt->frozen); + Assert(offset >= 0 && offset <= TapeBlockPayloadSize); + Assert(lt->buffer_size == BLCKSZ); + + if (lt->buffer == NULL) + ltsInitReadBuffer(lt); + + if (blocknum != lt->curBlockNumber) + { + ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer); + lt->curBlockNumber = blocknum; + lt->nbytes = TapeBlockPayloadSize; + lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next; + } + + if (offset > lt->nbytes) + elog(ERROR, "invalid tape seek position"); + lt->pos = offset; +} + +/* + * Obtain current position in a form suitable for a later LogicalTapeSeek. + * + * NOTE: it'd be OK to do this during write phase with intention of using + * the position for a seek after freezing. Not clear if anyone needs that. + */ +void +LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset) +{ + if (lt->buffer == NULL) + ltsInitReadBuffer(lt); + + Assert(lt->offsetBlockNumber == 0L); + + /* With a larger buffer, 'pos' wouldn't be the same as offset within page */ + Assert(lt->buffer_size == BLCKSZ); + + *blocknum = lt->curBlockNumber; + *offset = lt->pos; +} + +/* + * Obtain total disk space currently used by a LogicalTapeSet, in blocks. + * + * This should not be called while there are open write buffers; otherwise it + * may not account for buffered data. + */ +long +LogicalTapeSetBlocks(LogicalTapeSet *lts) +{ + return lts->nBlocksWritten - lts->nHoleBlocks; +} diff --git a/src/backend/utils/sort/qsort_interruptible.c b/src/backend/utils/sort/qsort_interruptible.c new file mode 100644 index 0000000..f179b25 --- /dev/null +++ b/src/backend/utils/sort/qsort_interruptible.c @@ -0,0 +1,16 @@ +/* + * qsort_interruptible.c: qsort_arg that includes CHECK_FOR_INTERRUPTS + */ + +#include "postgres.h" +#include "miscadmin.h" + +#define ST_SORT qsort_interruptible +#define ST_ELEMENT_TYPE_VOID +#define ST_COMPARATOR_TYPE_NAME qsort_arg_comparator +#define ST_COMPARE_RUNTIME_POINTER +#define ST_COMPARE_ARG_TYPE void +#define ST_SCOPE +#define ST_DEFINE +#define ST_CHECK_FOR_INTERRUPTS +#include "lib/sort_template.h" diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c new file mode 100644 index 0000000..464d4c5 --- /dev/null +++ b/src/backend/utils/sort/sharedtuplestore.c @@ -0,0 +1,631 @@ +/*------------------------------------------------------------------------- + * + * sharedtuplestore.c + * Simple mechanism for sharing tuples between backends. + * + * This module contains a shared temporary tuple storage mechanism providing + * a parallel-aware subset of the features of tuplestore.c. Multiple backends + * can write to a SharedTuplestore, and then multiple backends can later scan + * the stored tuples. Currently, the only scan type supported is a parallel + * scan where each backend reads an arbitrary subset of the tuples that were + * written. + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/utils/sort/sharedtuplestore.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup.h" +#include "access/htup_details.h" +#include "miscadmin.h" +#include "storage/buffile.h" +#include "storage/lwlock.h" +#include "storage/sharedfileset.h" +#include "utils/sharedtuplestore.h" + +/* + * The size of chunks, in pages. 
This is somewhat arbitrarily set to match + * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples + * at approximately the same rate as it allocates new chunks of memory to + * insert them into. + */ +#define STS_CHUNK_PAGES 4 +#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data) +#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE) + +/* Chunk written to disk. */ +typedef struct SharedTuplestoreChunk +{ + int ntuples; /* Number of tuples in this chunk. */ + int overflow; /* If overflow, how many including this one? */ + char data[FLEXIBLE_ARRAY_MEMBER]; +} SharedTuplestoreChunk; + +/* Per-participant shared state. */ +typedef struct SharedTuplestoreParticipant +{ + LWLock lock; + BlockNumber read_page; /* Page number for next read. */ + BlockNumber npages; /* Number of pages written. */ + bool writing; /* Used only for assertions. */ +} SharedTuplestoreParticipant; + +/* The control object that lives in shared memory. */ +struct SharedTuplestore +{ + int nparticipants; /* Number of participants that can write. */ + int flags; /* Flag bits from SHARED_TUPLESTORE_XXX */ + size_t meta_data_size; /* Size of per-tuple header. */ + char name[NAMEDATALEN]; /* A name for this tuplestore. */ + + /* Followed by per-participant shared state. */ + SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* Per-participant state that lives in backend-local memory. */ +struct SharedTuplestoreAccessor +{ + int participant; /* My participant number. */ + SharedTuplestore *sts; /* The shared state. */ + SharedFileSet *fileset; /* The SharedFileSet holding files. */ + MemoryContext context; /* Memory context for buffers. */ + + /* State for reading. */ + int read_participant; /* The current participant to read from. */ + BufFile *read_file; /* The current file to read from. */ + int read_ntuples_available; /* The number of tuples in chunk. */ + int read_ntuples; /* How many tuples have we read from chunk? */ + size_t read_bytes; /* How many bytes have we read from chunk? */ + char *read_buffer; /* A buffer for loading tuples. */ + size_t read_buffer_size; + BlockNumber read_next_page; /* Lowest block we'll consider reading. */ + + /* State for writing. */ + SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */ + BufFile *write_file; /* The current file to write to. */ + BlockNumber write_page; /* The next page to write to. */ + char *write_pointer; /* Current write pointer within chunk. */ + char *write_end; /* One past the end of the current chunk. */ +}; + +static void sts_filename(char *name, SharedTuplestoreAccessor *accessor, + int participant); + +/* + * Return the amount of shared memory required to hold SharedTuplestore for a + * given number of participants. + */ +size_t +sts_estimate(int participants) +{ + return offsetof(SharedTuplestore, participants) + + sizeof(SharedTuplestoreParticipant) * participants; +} + +/* + * Initialize a SharedTuplestore in existing shared memory. There must be + * space for sts_estimate(participants) bytes. If flags includes the value + * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more + * eagerly (but this isn't yet implemented). + * + * Tuples that are stored may optionally carry a piece of fixed sized + * meta-data which will be retrieved along with the tuple. This is useful for + * the hash values used in multi-batch hash joins, but could have other + * applications. 
+ * + * The caller must supply a SharedFileSet, which is essentially a directory + * that will be cleaned up automatically, and a name which must be unique + * across all SharedTuplestores created in the same SharedFileSet. + */ +SharedTuplestoreAccessor * +sts_initialize(SharedTuplestore *sts, int participants, + int my_participant_number, + size_t meta_data_size, + int flags, + SharedFileSet *fileset, + const char *name) +{ + SharedTuplestoreAccessor *accessor; + int i; + + Assert(my_participant_number < participants); + + sts->nparticipants = participants; + sts->meta_data_size = meta_data_size; + sts->flags = flags; + + if (strlen(name) > sizeof(sts->name) - 1) + elog(ERROR, "SharedTuplestore name too long"); + strcpy(sts->name, name); + + /* + * Limit meta-data so it + tuple size always fits into a single chunk. + * sts_puttuple() and sts_read_tuple() could be made to support scenarios + * where that's not the case, but it's not currently required. If so, + * meta-data size probably should be made variable, too. + */ + if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE) + elog(ERROR, "meta-data too long"); + + for (i = 0; i < participants; ++i) + { + LWLockInitialize(&sts->participants[i].lock, + LWTRANCHE_SHARED_TUPLESTORE); + sts->participants[i].read_page = 0; + sts->participants[i].npages = 0; + sts->participants[i].writing = false; + } + + accessor = palloc0(sizeof(SharedTuplestoreAccessor)); + accessor->participant = my_participant_number; + accessor->sts = sts; + accessor->fileset = fileset; + accessor->context = CurrentMemoryContext; + + return accessor; +} + +/* + * Attach to a SharedTuplestore that has been initialized by another backend, + * so that this backend can read and write tuples. + */ +SharedTuplestoreAccessor * +sts_attach(SharedTuplestore *sts, + int my_participant_number, + SharedFileSet *fileset) +{ + SharedTuplestoreAccessor *accessor; + + Assert(my_participant_number < sts->nparticipants); + + accessor = palloc0(sizeof(SharedTuplestoreAccessor)); + accessor->participant = my_participant_number; + accessor->sts = sts; + accessor->fileset = fileset; + accessor->context = CurrentMemoryContext; + + return accessor; +} + +static void +sts_flush_chunk(SharedTuplestoreAccessor *accessor) +{ + size_t size; + + size = STS_CHUNK_PAGES * BLCKSZ; + BufFileWrite(accessor->write_file, accessor->write_chunk, size); + memset(accessor->write_chunk, 0, size); + accessor->write_pointer = &accessor->write_chunk->data[0]; + accessor->sts->participants[accessor->participant].npages += + STS_CHUNK_PAGES; +} + +/* + * Finish writing tuples. This must be called by all backends that have + * written data before any backend begins reading it. + */ +void +sts_end_write(SharedTuplestoreAccessor *accessor) +{ + if (accessor->write_file != NULL) + { + sts_flush_chunk(accessor); + BufFileClose(accessor->write_file); + pfree(accessor->write_chunk); + accessor->write_chunk = NULL; + accessor->write_file = NULL; + accessor->sts->participants[accessor->participant].writing = false; + } +} + +/* + * Prepare to rescan. Only one participant must call this. After it returns, + * all participants may call sts_begin_parallel_scan() and then loop over + * sts_parallel_scan_next(). This function must not be called concurrently + * with a scan, and synchronization to avoid that is the caller's + * responsibility. + */ +void +sts_reinitialize(SharedTuplestoreAccessor *accessor) +{ + int i; + + /* + * Reset the shared read head for all participants' files. 
Also set the + * initial chunk size to the minimum (any increases from that size will be + * recorded in chunk_expansion_log). + */ + for (i = 0; i < accessor->sts->nparticipants; ++i) + { + accessor->sts->participants[i].read_page = 0; + } +} + +/* + * Begin scanning the contents in parallel. + */ +void +sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor) +{ + int i PG_USED_FOR_ASSERTS_ONLY; + + /* End any existing scan that was in progress. */ + sts_end_parallel_scan(accessor); + + /* + * Any backend that might have written into this shared tuplestore must + * have called sts_end_write(), so that all buffers are flushed and the + * files have stopped growing. + */ + for (i = 0; i < accessor->sts->nparticipants; ++i) + Assert(!accessor->sts->participants[i].writing); + + /* + * We will start out reading the file that THIS backend wrote. There may + * be some caching locality advantage to that. + */ + accessor->read_participant = accessor->participant; + accessor->read_file = NULL; + accessor->read_next_page = 0; +} + +/* + * Finish a parallel scan, freeing associated backend-local resources. + */ +void +sts_end_parallel_scan(SharedTuplestoreAccessor *accessor) +{ + /* + * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but + * we'd probably need a reference count of current parallel scanners so we + * could safely do it only when the reference count reaches zero. + */ + if (accessor->read_file != NULL) + { + BufFileClose(accessor->read_file); + accessor->read_file = NULL; + } +} + +/* + * Write a tuple. If a meta-data size was provided to sts_initialize, then a + * pointer to meta data of that size must be provided. + */ +void +sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data, + MinimalTuple tuple) +{ + size_t size; + + /* Do we have our own file yet? */ + if (accessor->write_file == NULL) + { + SharedTuplestoreParticipant *participant; + char name[MAXPGPATH]; + + /* Create one. Only this backend will write into it. */ + sts_filename(name, accessor, accessor->participant); + accessor->write_file = + BufFileCreateFileSet(&accessor->fileset->fs, name); + + /* Set up the shared state for this backend's file. */ + participant = &accessor->sts->participants[accessor->participant]; + participant->writing = true; /* for assertions only */ + } + + /* Do we have space? */ + size = accessor->sts->meta_data_size + tuple->t_len; + if (accessor->write_pointer + size > accessor->write_end) + { + if (accessor->write_chunk == NULL) + { + /* First time through. Allocate chunk. */ + accessor->write_chunk = (SharedTuplestoreChunk *) + MemoryContextAllocZero(accessor->context, + STS_CHUNK_PAGES * BLCKSZ); + accessor->write_chunk->ntuples = 0; + accessor->write_pointer = &accessor->write_chunk->data[0]; + accessor->write_end = (char *) + accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ; + } + else + { + /* See if flushing helps. */ + sts_flush_chunk(accessor); + } + + /* It may still not be enough in the case of a gigantic tuple. */ + if (accessor->write_pointer + size > accessor->write_end) + { + size_t written; + + /* + * We'll write the beginning of the oversized tuple, and then + * write the rest in some number of 'overflow' chunks. + * + * sts_initialize() verifies that the size of the tuple + + * meta-data always fits into a chunk. Because the chunk has been + * flushed above, we can be sure to have all of a chunk's usable + * space available. 
+ */ + Assert(accessor->write_pointer + accessor->sts->meta_data_size + + sizeof(uint32) < accessor->write_end); + + /* Write the meta-data as one chunk. */ + if (accessor->sts->meta_data_size > 0) + memcpy(accessor->write_pointer, meta_data, + accessor->sts->meta_data_size); + + /* + * Write as much of the tuple as we can fit. This includes the + * tuple's size at the start. + */ + written = accessor->write_end - accessor->write_pointer - + accessor->sts->meta_data_size; + memcpy(accessor->write_pointer + accessor->sts->meta_data_size, + tuple, written); + ++accessor->write_chunk->ntuples; + size -= accessor->sts->meta_data_size; + size -= written; + /* Now write as many overflow chunks as we need for the rest. */ + while (size > 0) + { + size_t written_this_chunk; + + sts_flush_chunk(accessor); + + /* + * How many overflow chunks to go? This will allow readers to + * skip all of them at once instead of reading each one. + */ + accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) / + STS_CHUNK_DATA_SIZE; + written_this_chunk = + Min(accessor->write_end - accessor->write_pointer, size); + memcpy(accessor->write_pointer, (char *) tuple + written, + written_this_chunk); + accessor->write_pointer += written_this_chunk; + size -= written_this_chunk; + written += written_this_chunk; + } + return; + } + } + + /* Copy meta-data and tuple into buffer. */ + if (accessor->sts->meta_data_size > 0) + memcpy(accessor->write_pointer, meta_data, + accessor->sts->meta_data_size); + memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple, + tuple->t_len); + accessor->write_pointer += size; + ++accessor->write_chunk->ntuples; +} + +static MinimalTuple +sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data) +{ + MinimalTuple tuple; + uint32 size; + size_t remaining_size; + size_t this_chunk_size; + char *destination; + + /* + * We'll keep track of bytes read from this chunk so that we can detect an + * overflowing tuple and switch to reading overflow pages. 
+ */ + if (accessor->sts->meta_data_size > 0) + { + if (BufFileRead(accessor->read_file, + meta_data, + accessor->sts->meta_data_size) != + accessor->sts->meta_data_size) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from shared tuplestore temporary file"), + errdetail_internal("Short read while reading meta-data."))); + accessor->read_bytes += accessor->sts->meta_data_size; + } + if (BufFileRead(accessor->read_file, + &size, + sizeof(size)) != sizeof(size)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from shared tuplestore temporary file"), + errdetail_internal("Short read while reading size."))); + accessor->read_bytes += sizeof(size); + if (size > accessor->read_buffer_size) + { + size_t new_read_buffer_size; + + if (accessor->read_buffer != NULL) + pfree(accessor->read_buffer); + new_read_buffer_size = Max(size, accessor->read_buffer_size * 2); + accessor->read_buffer = + MemoryContextAlloc(accessor->context, new_read_buffer_size); + accessor->read_buffer_size = new_read_buffer_size; + } + remaining_size = size - sizeof(uint32); + this_chunk_size = Min(remaining_size, + BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes); + destination = accessor->read_buffer + sizeof(uint32); + if (BufFileRead(accessor->read_file, + destination, + this_chunk_size) != this_chunk_size) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from shared tuplestore temporary file"), + errdetail_internal("Short read while reading tuple."))); + accessor->read_bytes += this_chunk_size; + remaining_size -= this_chunk_size; + destination += this_chunk_size; + ++accessor->read_ntuples; + + /* Check if we need to read any overflow chunks. */ + while (remaining_size > 0) + { + /* We are now positioned at the start of an overflow chunk. */ + SharedTuplestoreChunk chunk_header; + + if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) != + STS_CHUNK_HEADER_SIZE) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from shared tuplestore temporary file"), + errdetail_internal("Short read while reading overflow chunk header."))); + accessor->read_bytes = STS_CHUNK_HEADER_SIZE; + if (chunk_header.overflow == 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected chunk in shared tuplestore temporary file"), + errdetail_internal("Expected overflow chunk."))); + accessor->read_next_page += STS_CHUNK_PAGES; + this_chunk_size = Min(remaining_size, + BLCKSZ * STS_CHUNK_PAGES - + STS_CHUNK_HEADER_SIZE); + if (BufFileRead(accessor->read_file, + destination, + this_chunk_size) != this_chunk_size) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from shared tuplestore temporary file"), + errdetail_internal("Short read while reading tuple."))); + accessor->read_bytes += this_chunk_size; + remaining_size -= this_chunk_size; + destination += this_chunk_size; + + /* + * These will be used to count regular tuples following the oversized + * tuple that spilled into this overflow chunk. + */ + accessor->read_ntuples = 0; + accessor->read_ntuples_available = chunk_header.ntuples; + } + + tuple = (MinimalTuple) accessor->read_buffer; + tuple->t_len = size; + + return tuple; +} + +/* + * Get the next tuple in the current parallel scan. + */ +MinimalTuple +sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data) +{ + SharedTuplestoreParticipant *p; + BlockNumber read_page; + bool eof; + + for (;;) + { + /* Can we read more tuples from the current chunk? 
*/ + if (accessor->read_ntuples < accessor->read_ntuples_available) + return sts_read_tuple(accessor, meta_data); + + /* Find the location of a new chunk to read. */ + p = &accessor->sts->participants[accessor->read_participant]; + + LWLockAcquire(&p->lock, LW_EXCLUSIVE); + /* We can skip directly past overflow pages we know about. */ + if (p->read_page < accessor->read_next_page) + p->read_page = accessor->read_next_page; + eof = p->read_page >= p->npages; + if (!eof) + { + /* Claim the next chunk. */ + read_page = p->read_page; + /* Advance the read head for the next reader. */ + p->read_page += STS_CHUNK_PAGES; + accessor->read_next_page = p->read_page; + } + LWLockRelease(&p->lock); + + if (!eof) + { + SharedTuplestoreChunk chunk_header; + size_t nread; + + /* Make sure we have the file open. */ + if (accessor->read_file == NULL) + { + char name[MAXPGPATH]; + + sts_filename(name, accessor, accessor->read_participant); + accessor->read_file = + BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY, + false); + } + + /* Seek and load the chunk header. */ + if (BufFileSeekBlock(accessor->read_file, read_page) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to block %u in shared tuplestore temporary file", + read_page))); + nread = BufFileRead(accessor->read_file, &chunk_header, + STS_CHUNK_HEADER_SIZE); + if (nread != STS_CHUNK_HEADER_SIZE) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from shared tuplestore temporary file: read only %zu of %zu bytes", + nread, STS_CHUNK_HEADER_SIZE))); + + /* + * If this is an overflow chunk, we skip it and any following + * overflow chunks all at once. + */ + if (chunk_header.overflow > 0) + { + accessor->read_next_page = read_page + + chunk_header.overflow * STS_CHUNK_PAGES; + continue; + } + + accessor->read_ntuples = 0; + accessor->read_ntuples_available = chunk_header.ntuples; + accessor->read_bytes = STS_CHUNK_HEADER_SIZE; + + /* Go around again, so we can get a tuple from this chunk. */ + } + else + { + if (accessor->read_file != NULL) + { + BufFileClose(accessor->read_file); + accessor->read_file = NULL; + } + + /* + * Try the next participant's file. If we've gone full circle, + * we're done. + */ + accessor->read_participant = (accessor->read_participant + 1) % + accessor->sts->nparticipants; + if (accessor->read_participant == accessor->participant) + break; + accessor->read_next_page = 0; + + /* Go around again, so we can get a chunk from this file. */ + } + } + + return NULL; +} + +/* + * Create the name used for the BufFile that a given participant will write. + */ +static void +sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant) +{ + snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant); +} diff --git a/src/backend/utils/sort/sortsupport.c b/src/backend/utils/sort/sortsupport.c new file mode 100644 index 0000000..b38bcd8 --- /dev/null +++ b/src/backend/utils/sort/sortsupport.c @@ -0,0 +1,211 @@ +/*------------------------------------------------------------------------- + * + * sortsupport.c + * Support routines for accelerated sorting. 
+ * + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/utils/sort/sortsupport.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/gist.h" +#include "access/nbtree.h" +#include "catalog/pg_am.h" +#include "fmgr.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/sortsupport.h" + + +/* Info needed to use an old-style comparison function as a sort comparator */ +typedef struct +{ + FmgrInfo flinfo; /* lookup data for comparison function */ + FunctionCallInfoBaseData fcinfo; /* reusable callinfo structure */ +} SortShimExtra; + +#define SizeForSortShimExtra(nargs) (offsetof(SortShimExtra, fcinfo) + SizeForFunctionCallInfo(nargs)) + +/* + * Shim function for calling an old-style comparator + * + * This is essentially an inlined version of FunctionCall2Coll(), except + * we assume that the FunctionCallInfoBaseData was already mostly set up by + * PrepareSortSupportComparisonShim. + */ +static int +comparison_shim(Datum x, Datum y, SortSupport ssup) +{ + SortShimExtra *extra = (SortShimExtra *) ssup->ssup_extra; + Datum result; + + extra->fcinfo.args[0].value = x; + extra->fcinfo.args[1].value = y; + + /* just for paranoia's sake, we reset isnull each time */ + extra->fcinfo.isnull = false; + + result = FunctionCallInvoke(&extra->fcinfo); + + /* Check for null result, since caller is clearly not expecting one */ + if (extra->fcinfo.isnull) + elog(ERROR, "function %u returned NULL", extra->flinfo.fn_oid); + + return result; +} + +/* + * Set up a shim function to allow use of an old-style btree comparison + * function as if it were a sort support comparator. + */ +void +PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup) +{ + SortShimExtra *extra; + + extra = (SortShimExtra *) MemoryContextAlloc(ssup->ssup_cxt, + SizeForSortShimExtra(2)); + + /* Lookup the comparison function */ + fmgr_info_cxt(cmpFunc, &extra->flinfo, ssup->ssup_cxt); + + /* We can initialize the callinfo just once and re-use it */ + InitFunctionCallInfoData(extra->fcinfo, &extra->flinfo, 2, + ssup->ssup_collation, NULL, NULL); + extra->fcinfo.args[0].isnull = false; + extra->fcinfo.args[1].isnull = false; + + ssup->ssup_extra = extra; + ssup->comparator = comparison_shim; +} + +/* + * Look up and call sortsupport function to setup SortSupport comparator; + * or if no such function exists or it declines to set up the appropriate + * state, prepare a suitable shim. + */ +static void +FinishSortSupportFunction(Oid opfamily, Oid opcintype, SortSupport ssup) +{ + Oid sortSupportFunction; + + /* Look for a sort support function */ + sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype, + BTSORTSUPPORT_PROC); + if (OidIsValid(sortSupportFunction)) + { + /* + * The sort support function can provide a comparator, but it can also + * choose not to so (e.g. based on the selected collation). 
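PrepareSortSupportComparisonShim above keeps the old-style comparison function's lookup data in ssup_extra and installs comparison_shim as the comparator, so callers that only understand the newer comparator slot can still use a legacy three-way compare. The sketch below reproduces that adapter pattern with plain C types; SortSupportSketch, finish_sort_support_sketch and the integer comparator are invented stand-ins rather than the real SortSupport structures.

#include <stdio.h>

struct SortSupportSketch;

typedef int (*three_way_cmp) (int a, int b);    /* "old-style" comparator */
typedef int (*ssup_cmp) (int a, int b, struct SortSupportSketch *ssup);

typedef struct SortSupportSketch
{
    ssup_cmp      comparator;   /* what the sort code actually calls */
    three_way_cmp shim_target;  /* old-style function wrapped by the shim */
} SortSupportSketch;

/* An existing three-way comparator we would like to reuse. */
static int
int_three_way_cmp(int a, int b)
{
    return (a > b) - (a < b);
}

/* Shim: forward to the old-style comparator kept in the support struct. */
static int
comparison_shim_sketch(int a, int b, SortSupportSketch *ssup)
{
    return ssup->shim_target(a, b);
}

/*
 * Prefer a natively supplied comparator; if none was installed, fall back
 * to wrapping the old-style comparator with the shim.
 */
static void
finish_sort_support_sketch(SortSupportSketch *ssup, three_way_cmp fallback)
{
    if (ssup->comparator == NULL)
    {
        ssup->shim_target = fallback;
        ssup->comparator = comparison_shim_sketch;
    }
}

int
main(void)
{
    SortSupportSketch ssup = {NULL, NULL};

    finish_sort_support_sketch(&ssup, int_three_way_cmp);
    printf("compare(3, 7) = %d\n", ssup.comparator(3, 7, &ssup));
    return 0;
}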
+ */ + OidFunctionCall1(sortSupportFunction, PointerGetDatum(ssup)); + } + + if (ssup->comparator == NULL) + { + Oid sortFunction; + + sortFunction = get_opfamily_proc(opfamily, opcintype, opcintype, + BTORDER_PROC); + + if (!OidIsValid(sortFunction)) + elog(ERROR, "missing support function %d(%u,%u) in opfamily %u", + BTORDER_PROC, opcintype, opcintype, opfamily); + + /* We'll use a shim to call the old-style btree comparator */ + PrepareSortSupportComparisonShim(sortFunction, ssup); + } +} + +/* + * Fill in SortSupport given an ordering operator (btree "<" or ">" operator). + * + * Caller must previously have zeroed the SortSupportData structure and then + * filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill + * in ssup_reverse as well as the comparator function pointer. + */ +void +PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup) +{ + Oid opfamily; + Oid opcintype; + int16 strategy; + + Assert(ssup->comparator == NULL); + + /* Find the operator in pg_amop */ + if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype, + &strategy)) + elog(ERROR, "operator %u is not a valid ordering operator", + orderingOp); + ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber); + + FinishSortSupportFunction(opfamily, opcintype, ssup); +} + +/* + * Fill in SortSupport given an index relation, attribute, and strategy. + * + * Caller must previously have zeroed the SortSupportData structure and then + * filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first. This + * will fill in ssup_reverse (based on the supplied strategy), as well as the + * comparator function pointer. + */ +void +PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy, + SortSupport ssup) +{ + Oid opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1]; + Oid opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1]; + + Assert(ssup->comparator == NULL); + + if (indexRel->rd_rel->relam != BTREE_AM_OID) + elog(ERROR, "unexpected non-btree AM: %u", indexRel->rd_rel->relam); + if (strategy != BTGreaterStrategyNumber && + strategy != BTLessStrategyNumber) + elog(ERROR, "unexpected sort support strategy: %d", strategy); + ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber); + + FinishSortSupportFunction(opfamily, opcintype, ssup); +} + +/* + * Fill in SortSupport given a GiST index relation + * + * Caller must previously have zeroed the SortSupportData structure and then + * filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first. This + * will fill in ssup_reverse (always false for GiST index build), as well as + * the comparator function pointer. + */ +void +PrepareSortSupportFromGistIndexRel(Relation indexRel, SortSupport ssup) +{ + Oid opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1]; + Oid opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1]; + Oid sortSupportFunction; + + Assert(ssup->comparator == NULL); + + if (indexRel->rd_rel->relam != GIST_AM_OID) + elog(ERROR, "unexpected non-gist AM: %u", indexRel->rd_rel->relam); + ssup->ssup_reverse = false; + + /* + * Look up the sort support function. This is simpler than for B-tree + * indexes because we don't support the old-style btree comparators. 
+ */ + sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype, + GIST_SORTSUPPORT_PROC); + if (!OidIsValid(sortSupportFunction)) + elog(ERROR, "missing support function %d(%u,%u) in opfamily %u", + GIST_SORTSUPPORT_PROC, opcintype, opcintype, opfamily); + OidFunctionCall1(sortSupportFunction, PointerGetDatum(ssup)); +} diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c new file mode 100644 index 0000000..421afcf --- /dev/null +++ b/src/backend/utils/sort/tuplesort.c @@ -0,0 +1,4938 @@ +/*------------------------------------------------------------------------- + * + * tuplesort.c + * Generalized tuple sorting routines. + * + * This module handles sorting of heap tuples, index tuples, or single + * Datums (and could easily support other kinds of sortable objects, + * if necessary). It works efficiently for both small and large amounts + * of data. Small amounts are sorted in-memory using qsort(). Large + * amounts are sorted using temporary files and a standard external sort + * algorithm. + * + * See Knuth, volume 3, for more than you want to know about external + * sorting algorithms. The algorithm we use is a balanced k-way merge. + * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's + * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced + * merge is better. Knuth is assuming that tape drives are expensive + * beasts, and in particular that there will always be many more runs than + * tape drives. The polyphase merge algorithm was good at keeping all the + * tape drives busy, but in our implementation a "tape drive" doesn't cost + * much more than a few Kb of memory buffers, so we can afford to have + * lots of them. In particular, if we can have as many tape drives as + * sorted runs, we can eliminate any repeated I/O at all. + * + * Historically, we divided the input into sorted runs using replacement + * selection, in the form of a priority tree implemented as a heap + * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort + * for run generation. + * + * The approximate amount of memory allowed for any one sort operation + * is specified in kilobytes by the caller (most pass work_mem). Initially, + * we absorb tuples and simply store them in an unsorted array as long as + * we haven't exceeded workMem. If we reach the end of the input without + * exceeding workMem, we sort the array using qsort() and subsequently return + * tuples just by scanning the tuple array sequentially. If we do exceed + * workMem, we begin to emit tuples into sorted runs in temporary tapes. + * When tuples are dumped in batch after quicksorting, we begin a new run + * with a new output tape. If we reach the max number of tapes, we write + * subsequent runs on the existing tapes in a round-robin fashion. We will + * need multiple merge passes to finish the merge in that case. After the + * end of the input is reached, we dump out remaining tuples in memory into + * a final run, then merge the runs. + * + * When merging runs, we use a heap containing just the frontmost tuple from + * each source run; we repeatedly output the smallest tuple and replace it + * with the next tuple from its source tape (if any). When the heap empties, + * the merge is complete. The basic merge algorithm thus needs very little + * memory --- only M tuples for an M-way merge, and M is constrained to a + * small number. 
However, we can still make good use of our full workMem + * allocation by pre-reading additional blocks from each source tape. Without + * prereading, our access pattern to the temporary file would be very erratic; + * on average we'd read one block from each of M source tapes during the same + * time that we're writing M blocks to the output tape, so there is no + * sequentiality of access at all, defeating the read-ahead methods used by + * most Unix kernels. Worse, the output tape gets written into a very random + * sequence of blocks of the temp file, ensuring that things will be even + * worse when it comes time to read that tape. A straightforward merge pass + * thus ends up doing a lot of waiting for disk seeks. We can improve matters + * by prereading from each source tape sequentially, loading about workMem/M + * bytes from each tape in turn, and making the sequential blocks immediately + * available for reuse. This approach helps to localize both read and write + * accesses. The pre-reading is handled by logtape.c, we just tell it how + * much memory to use for the buffers. + * + * In the current code we determine the number of input tapes M on the basis + * of workMem: we want workMem/M to be large enough that we read a fair + * amount of data each time we read from a tape, so as to maintain the + * locality of access described above. Nonetheless, with large workMem we + * can have many tapes. The logical "tapes" are implemented by logtape.c, + * which avoids space wastage by recycling disk space as soon as each block + * is read from its "tape". + * + * When the caller requests random access to the sort result, we form + * the final sorted run on a logical tape which is then "frozen", so + * that we can access it randomly. When the caller does not need random + * access, we return from tuplesort_performsort() as soon as we are down + * to one run per logical tape. The final merge is then performed + * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this + * saves one cycle of writing all the data out to disk and reading it in. + * + * This module supports parallel sorting. Parallel sorts involve coordination + * among one or more worker processes, and a leader process, each with its own + * tuplesort state. The leader process (or, more accurately, the + * Tuplesortstate associated with a leader process) creates a full tapeset + * consisting of worker tapes with one run to merge; a run for every + * worker process. This is then merged. Worker processes are guaranteed to + * produce exactly one output run from their partial input. 
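The merge described above never needs more than the frontmost tuple of each input run: a small heap repeatedly yields the overall minimum and is refilled from whichever run supplied it. The following self-contained sketch demonstrates that balanced k-way merge on plain sorted integer arrays; Run, sift_down and kway_merge are illustrative names, and none of the logical tape, prereading or parallel machinery is modelled.

#include <stdio.h>
#include <stdlib.h>

/* One in-memory "run": a sorted array standing in for a sorted tape. */
typedef struct
{
    const int *vals;
    size_t     len;
    size_t     pos;
} Run;

/* Restore the min-heap property at position i; runs are ordered by the
 * value at their current read position. */
static void
sift_down(Run **heap, size_t nheap, size_t i)
{
    for (;;)
    {
        size_t l = 2 * i + 1;
        size_t r = 2 * i + 2;
        size_t smallest = i;

        if (l < nheap &&
            heap[l]->vals[heap[l]->pos] < heap[smallest]->vals[heap[smallest]->pos])
            smallest = l;
        if (r < nheap &&
            heap[r]->vals[heap[r]->pos] < heap[smallest]->vals[heap[smallest]->pos])
            smallest = r;
        if (smallest == i)
            return;
        Run *tmp = heap[i];
        heap[i] = heap[smallest];
        heap[smallest] = tmp;
        i = smallest;
    }
}

/* Merge nruns sorted runs into 'out', keeping just one front element per run. */
static void
kway_merge(Run *runs, size_t nruns, int *out)
{
    Run  **heap = malloc(nruns * sizeof(Run *));
    size_t nheap = 0;
    size_t nout = 0;

    if (heap == NULL)
        return;
    for (size_t i = 0; i < nruns; i++)
        if (runs[i].len > 0)
            heap[nheap++] = &runs[i];
    for (size_t i = nheap; i-- > 0;)
        sift_down(heap, nheap, i);

    while (nheap > 0)
    {
        Run *top = heap[0];

        out[nout++] = top->vals[top->pos++];
        if (top->pos == top->len)       /* run exhausted: shrink the heap */
            heap[0] = heap[--nheap];
        sift_down(heap, nheap, 0);
    }
    free(heap);
}

int
main(void)
{
    const int a[] = {1, 4, 9};
    const int b[] = {2, 3, 10};
    const int c[] = {0, 8};
    Run runs[] = {{a, 3, 0}, {b, 3, 0}, {c, 2, 0}};
    int out[8];

    kway_merge(runs, 3, out);
    for (int i = 0; i < 8; i++)
        printf("%d ", out[i]);
    printf("\n");
    return 0;
}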
+ * + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/utils/sort/tuplesort.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <limits.h> + +#include "access/hash.h" +#include "access/htup_details.h" +#include "access/nbtree.h" +#include "catalog/index.h" +#include "catalog/pg_am.h" +#include "commands/tablespace.h" +#include "executor/executor.h" +#include "miscadmin.h" +#include "pg_trace.h" +#include "utils/datum.h" +#include "utils/logtape.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/pg_rusage.h" +#include "utils/rel.h" +#include "utils/sortsupport.h" +#include "utils/tuplesort.h" + + +/* sort-type codes for sort__start probes */ +#define HEAP_SORT 0 +#define INDEX_SORT 1 +#define DATUM_SORT 2 +#define CLUSTER_SORT 3 + +/* Sort parallel code from state for sort__start probes */ +#define PARALLEL_SORT(state) ((state)->shared == NULL ? 0 : \ + (state)->worker >= 0 ? 1 : 2) + +/* + * Initial size of memtuples array. We're trying to select this size so that + * array doesn't exceed ALLOCSET_SEPARATE_THRESHOLD and so that the overhead of + * allocation might possibly be lowered. However, we don't consider array sizes + * less than 1024. + * + */ +#define INITIAL_MEMTUPSIZE Max(1024, \ + ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1) + +/* GUC variables */ +#ifdef TRACE_SORT +bool trace_sort = false; +#endif + +#ifdef DEBUG_BOUNDED_SORT +bool optimize_bounded_sort = true; +#endif + + +/* + * The objects we actually sort are SortTuple structs. These contain + * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple), + * which is a separate palloc chunk --- we assume it is just one chunk and + * can be freed by a simple pfree() (except during merge, when we use a + * simple slab allocator). SortTuples also contain the tuple's first key + * column in Datum/nullflag format, and a source/input tape number that + * tracks which tape each heap element/slot belongs to during merging. + * + * Storing the first key column lets us save heap_getattr or index_getattr + * calls during tuple comparisons. We could extract and save all the key + * columns not just the first, but this would increase code complexity and + * overhead, and wouldn't actually save any comparison cycles in the common + * case where the first key determines the comparison result. Note that + * for a pass-by-reference datatype, datum1 points into the "tuple" storage. + * + * There is one special case: when the sort support infrastructure provides an + * "abbreviated key" representation, where the key is (typically) a pass by + * value proxy for a pass by reference type. In this case, the abbreviated key + * is stored in datum1 in place of the actual first key column. + * + * When sorting single Datums, the data value is represented directly by + * datum1/isnull1 for pass by value types (or null values). If the datatype is + * pass-by-reference and isnull1 is false, then "tuple" points to a separately + * palloc'd data value, otherwise "tuple" is NULL. The value of datum1 is then + * either the same pointer as "tuple", or is an abbreviated key value as + * described above. Accordingly, "tuple" is always used in preference to + * datum1 as the authoritative value for pass-by-reference cases. 
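As the comment above explains, each SortTuple caches the leading key, or an abbreviation of it, in datum1 so that most comparisons never have to chase the tuple pointer, while the full value stays authoritative when abbreviations tie. The standalone sketch below shows that pattern for C strings, using a 4-byte big-endian prefix as the abbreviated key; SortEntrySketch and its helpers are invented for illustration and do not reflect the real datum1/abbreviation interfaces.

#include <stdio.h>
#include <string.h>

/* Sketch of a sort entry: pointer to the real value plus a cached key. */
typedef struct
{
    const char  *tuple;   /* authoritative value */
    unsigned int datum1;  /* abbreviated key: first 4 bytes, big-endian */
} SortEntrySketch;

static unsigned int
abbreviate_string(const char *s)
{
    unsigned int abbrev = 0;

    for (int i = 0; i < 4; i++)
    {
        abbrev <<= 8;
        if (*s)
            abbrev |= (unsigned char) *s++;
    }
    return abbrev;
}

/* Compare the cheap abbreviated keys first; break ties with the full value. */
static int
compare_entries(const SortEntrySketch *a, const SortEntrySketch *b)
{
    if (a->datum1 != b->datum1)
        return a->datum1 < b->datum1 ? -1 : 1;
    return strcmp(a->tuple, b->tuple);
}

int
main(void)
{
    SortEntrySketch x = {"abcdef", 0};
    SortEntrySketch y = {"abcdxy", 0};

    x.datum1 = abbreviate_string(x.tuple);
    y.datum1 = abbreviate_string(y.tuple);
    /* abbreviations tie on "abcd", so the full strcmp decides */
    printf("%d\n", compare_entries(&x, &y));
    return 0;
}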
+ */ +typedef struct +{ + void *tuple; /* the tuple itself */ + Datum datum1; /* value of first key column */ + bool isnull1; /* is first key column NULL? */ + int srctape; /* source tape number */ +} SortTuple; + +/* + * During merge, we use a pre-allocated set of fixed-size slots to hold + * tuples. To avoid palloc/pfree overhead. + * + * Merge doesn't require a lot of memory, so we can afford to waste some, + * by using gratuitously-sized slots. If a tuple is larger than 1 kB, the + * palloc() overhead is not significant anymore. + * + * 'nextfree' is valid when this chunk is in the free list. When in use, the + * slot holds a tuple. + */ +#define SLAB_SLOT_SIZE 1024 + +typedef union SlabSlot +{ + union SlabSlot *nextfree; + char buffer[SLAB_SLOT_SIZE]; +} SlabSlot; + +/* + * Possible states of a Tuplesort object. These denote the states that + * persist between calls of Tuplesort routines. + */ +typedef enum +{ + TSS_INITIAL, /* Loading tuples; still within memory limit */ + TSS_BOUNDED, /* Loading tuples into bounded-size heap */ + TSS_BUILDRUNS, /* Loading tuples; writing to tape */ + TSS_SORTEDINMEM, /* Sort completed entirely in memory */ + TSS_SORTEDONTAPE, /* Sort completed, final run is on tape */ + TSS_FINALMERGE /* Performing final merge on-the-fly */ +} TupSortStatus; + +/* + * Parameters for calculation of number of tapes to use --- see inittapes() + * and tuplesort_merge_order(). + * + * In this calculation we assume that each tape will cost us about 1 blocks + * worth of buffer space. This ignores the overhead of all the other data + * structures needed for each tape, but it's probably close enough. + * + * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each + * input tape, for pre-reading (see discussion at top of file). This is *in + * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD. + */ +#define MINORDER 6 /* minimum merge order */ +#define MAXORDER 500 /* maximum merge order */ +#define TAPE_BUFFER_OVERHEAD BLCKSZ +#define MERGE_BUFFER_SIZE (BLCKSZ * 32) + +typedef int (*SortTupleComparator) (const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); + +/* + * Private state of a Tuplesort operation. + */ +struct Tuplesortstate +{ + TupSortStatus status; /* enumerated value as shown above */ + int nKeys; /* number of columns in sort key */ + int sortopt; /* Bitmask of flags used to setup sort */ + bool bounded; /* did caller specify a maximum number of + * tuples to return? */ + bool boundUsed; /* true if we made use of a bounded heap */ + int bound; /* if bounded, the maximum number of tuples */ + bool tuples; /* Can SortTuple.tuple ever be set? 
*/ + int64 availMem; /* remaining memory available, in bytes */ + int64 allowedMem; /* total memory allowed, in bytes */ + int maxTapes; /* max number of input tapes to merge in each + * pass */ + int64 maxSpace; /* maximum amount of space occupied among sort + * of groups, either in-memory or on-disk */ + bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk + * space, false when it's value for in-memory + * space */ + TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */ + MemoryContext maincontext; /* memory context for tuple sort metadata that + * persists across multiple batches */ + MemoryContext sortcontext; /* memory context holding most sort data */ + MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */ + LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */ + + /* + * These function pointers decouple the routines that must know what kind + * of tuple we are sorting from the routines that don't need to know it. + * They are set up by the tuplesort_begin_xxx routines. + * + * Function to compare two tuples; result is per qsort() convention, ie: + * <0, 0, >0 according as a<b, a=b, a>b. The API must match + * qsort_arg_comparator. + */ + SortTupleComparator comparetup; + + /* + * Function to copy a supplied input tuple into palloc'd space and set up + * its SortTuple representation (ie, set tuple/datum1/isnull1). Also, + * state->availMem must be decreased by the amount of space used for the + * tuple copy (note the SortTuple struct itself is not counted). + */ + void (*copytup) (Tuplesortstate *state, SortTuple *stup, void *tup); + + /* + * Function to write a stored tuple onto tape. The representation of the + * tuple on tape need not be the same as it is in memory; requirements on + * the tape representation are given below. Unless the slab allocator is + * used, after writing the tuple, pfree() the out-of-line data (not the + * SortTuple struct!), and increase state->availMem by the amount of + * memory space thereby released. + */ + void (*writetup) (Tuplesortstate *state, LogicalTape *tape, + SortTuple *stup); + + /* + * Function to read a stored tuple from tape back into memory. 'len' is + * the already-read length of the stored tuple. The tuple is allocated + * from the slab memory arena, or is palloc'd, see readtup_alloc(). + */ + void (*readtup) (Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len); + + /* + * Whether SortTuple's datum1 and isnull1 members are maintained by the + * above routines. If not, some sort specializations are disabled. + */ + bool haveDatum1; + + /* + * This array holds the tuples now in sort memory. If we are in state + * INITIAL, the tuples are in no particular order; if we are in state + * SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS + * and FINALMERGE, the tuples are organized in "heap" order per Algorithm + * H. In state SORTEDONTAPE, the array is not used. + */ + SortTuple *memtuples; /* array of SortTuple structs */ + int memtupcount; /* number of tuples currently present */ + int memtupsize; /* allocated length of memtuples array */ + bool growmemtuples; /* memtuples' growth still underway? */ + + /* + * Memory for tuples is sometimes allocated using a simple slab allocator, + * rather than with palloc(). Currently, we switch to slab allocation + * when we start merging. 
Merging only needs to keep a small, fixed + * number of tuples in memory at any time, so we can avoid the + * palloc/pfree overhead by recycling a fixed number of fixed-size slots + * to hold the tuples. + * + * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE + * slots. The allocation is sized to have one slot per tape, plus one + * additional slot. We need that many slots to hold all the tuples kept + * in the heap during merge, plus the one we have last returned from the + * sort, with tuplesort_gettuple. + * + * Initially, all the slots are kept in a linked list of free slots. When + * a tuple is read from a tape, it is put to the next available slot, if + * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd + * instead. + * + * When we're done processing a tuple, we return the slot back to the free + * list, or pfree() if it was palloc'd. We know that a tuple was + * allocated from the slab, if its pointer value is between + * slabMemoryBegin and -End. + * + * When the slab allocator is used, the USEMEM/LACKMEM mechanism of + * tracking memory usage is not used. + */ + bool slabAllocatorUsed; + + char *slabMemoryBegin; /* beginning of slab memory arena */ + char *slabMemoryEnd; /* end of slab memory arena */ + SlabSlot *slabFreeHead; /* head of free list */ + + /* Memory used for input and output tape buffers. */ + size_t tape_buffer_mem; + + /* + * When we return a tuple to the caller in tuplesort_gettuple_XXX, that + * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE + * modes), we remember the tuple in 'lastReturnedTuple', so that we can + * recycle the memory on next gettuple call. + */ + void *lastReturnedTuple; + + /* + * While building initial runs, this is the current output run number. + * Afterwards, it is the number of initial runs we made. + */ + int currentRun; + + /* + * Logical tapes, for merging. + * + * The initial runs are written in the output tapes. In each merge pass, + * the output tapes of the previous pass become the input tapes, and new + * output tapes are created as needed. When nInputTapes equals + * nInputRuns, there is only one merge pass left. + */ + LogicalTape **inputTapes; + int nInputTapes; + int nInputRuns; + + LogicalTape **outputTapes; + int nOutputTapes; + int nOutputRuns; + + LogicalTape *destTape; /* current output tape */ + + /* + * These variables are used after completion of sorting to keep track of + * the next tuple to return. (In the tape case, the tape's current read + * position is also critical state.) + */ + LogicalTape *result_tape; /* actual tape of finished output */ + int current; /* array index (only used if SORTEDINMEM) */ + bool eof_reached; /* reached EOF (needed for cursors) */ + + /* markpos_xxx holds marked position for mark and restore */ + long markpos_block; /* tape block# (only used if SORTEDONTAPE) */ + int markpos_offset; /* saved "current", or offset in tape block */ + bool markpos_eof; /* saved "eof_reached" */ + + /* + * These variables are used during parallel sorting. + * + * worker is our worker identifier. Follows the general convention that + * -1 value relates to a leader tuplesort, and values >= 0 worker + * tuplesorts. (-1 can also be a serial tuplesort.) + * + * shared is mutable shared memory state, which is used to coordinate + * parallel sorts. + * + * nParticipants is the number of worker Tuplesortstates known by the + * leader to have actually been launched, which implies that they must + * finish a run that the leader needs to merge. 
Typically includes a + * worker state held by the leader process itself. Set in the leader + * Tuplesortstate only. + */ + int worker; + Sharedsort *shared; + int nParticipants; + + /* + * The sortKeys variable is used by every case other than the hash index + * case; it is set by tuplesort_begin_xxx. tupDesc is only used by the + * MinimalTuple and CLUSTER routines, though. + */ + TupleDesc tupDesc; + SortSupport sortKeys; /* array of length nKeys */ + + /* + * This variable is shared by the single-key MinimalTuple case and the + * Datum case (which both use qsort_ssup()). Otherwise, it's NULL. The + * presence of a value in this field is also checked by various sort + * specialization functions as an optimization when comparing the leading + * key in a tiebreak situation to determine if there are any subsequent + * keys to sort on. + */ + SortSupport onlyKey; + + /* + * Additional state for managing "abbreviated key" sortsupport routines + * (which currently may be used by all cases except the hash index case). + * Tracks the intervals at which the optimization's effectiveness is + * tested. + */ + int64 abbrevNext; /* Tuple # at which to next check + * applicability */ + + /* + * These variables are specific to the CLUSTER case; they are set by + * tuplesort_begin_cluster. + */ + IndexInfo *indexInfo; /* info about index being used for reference */ + EState *estate; /* for evaluating index expressions */ + + /* + * These variables are specific to the IndexTuple case; they are set by + * tuplesort_begin_index_xxx and used only by the IndexTuple routines. + */ + Relation heapRel; /* table the index is being built on */ + Relation indexRel; /* index being built */ + + /* These are specific to the index_btree subcase: */ + bool enforceUnique; /* complain if we find duplicate tuples */ + bool uniqueNullsNotDistinct; /* unique constraint null treatment */ + + /* These are specific to the index_hash subcase: */ + uint32 high_mask; /* masks for sortable part of hash code */ + uint32 low_mask; + uint32 max_buckets; + + /* + * These variables are specific to the Datum case; they are set by + * tuplesort_begin_datum and used only by the DatumTuple routines. + */ + Oid datumType; + /* we need typelen in order to know how to copy the Datums. */ + int datumTypeLen; + + /* + * Resource snapshot for time of sort start. + */ +#ifdef TRACE_SORT + PGRUsage ru_start; +#endif +}; + +/* + * Private mutable state of tuplesort-parallel-operation. This is allocated + * in shared memory. + */ +struct Sharedsort +{ + /* mutex protects all fields prior to tapes */ + slock_t mutex; + + /* + * currentWorker generates ordinal identifier numbers for parallel sort + * workers. These start from 0, and are always gapless. + * + * Workers increment workersFinished to indicate having finished. If this + * is equal to state.nParticipants within the leader, leader is ready to + * merge worker runs. + */ + int currentWorker; + int workersFinished; + + /* Temporary file space */ + SharedFileSet fileset; + + /* Size of tapes flexible array */ + int nTapes; + + /* + * Tapes array used by workers to report back information needed by the + * leader to concatenate all worker tapes into one for merging + */ + TapeShare tapes[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* + * Is the given tuple allocated from the slab memory arena? 
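The slab allocator described in the comments above keeps a fixed pool of SLAB_SLOT_SIZE slots and threads the free ones into a list through the very memory that otherwise holds tuple bytes, which is why SlabSlot is a union. Below is a minimal standalone model of that union-plus-free-list idea; SlotSketch, SlabSketch and the slot count are invented for illustration, and the palloc fallback for oversized tuples is only hinted at in the comments.

#include <stdio.h>
#include <stdlib.h>

#define SLOT_SIZE 1024
#define NSLOTS    8

/* A slot either links into the free list or holds opaque tuple bytes. */
typedef union SlotSketch
{
    union SlotSketch *nextfree;
    char              buffer[SLOT_SIZE];
} SlotSketch;

typedef struct
{
    SlotSketch *arena;     /* one big allocation, carved into slots */
    SlotSketch *freehead;  /* head of the free list */
} SlabSketch;

static void
slab_init(SlabSketch *slab)
{
    slab->arena = malloc(NSLOTS * sizeof(SlotSketch));
    slab->freehead = NULL;
    for (int i = 0; i < NSLOTS; i++)
    {
        slab->arena[i].nextfree = slab->freehead;
        slab->freehead = &slab->arena[i];
    }
}

/* Pop a slot from the free list; in the real scheme, oversized tuples
 * fall back to the general-purpose allocator instead. */
static void *
slab_get(SlabSketch *slab)
{
    SlotSketch *slot = slab->freehead;

    if (slot == NULL)
        return NULL;
    slab->freehead = slot->nextfree;
    return slot->buffer;
}

/* Push a slot back; a pointer-range check (as in IS_SLAB_SLOT) would tell
 * slab slots apart from separately allocated oversized tuples. */
static void
slab_release(SlabSketch *slab, void *p)
{
    SlotSketch *slot = (SlotSketch *) p;

    slot->nextfree = slab->freehead;
    slab->freehead = slot;
}

int
main(void)
{
    SlabSketch slab;

    slab_init(&slab);
    void *a = slab_get(&slab);
    void *b = slab_get(&slab);
    slab_release(&slab, a);
    printf("reused slot: %s\n", slab_get(&slab) == a ? "yes" : "no");
    slab_release(&slab, b);
    free(slab.arena);
    return 0;
}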
+ */ +#define IS_SLAB_SLOT(state, tuple) \ + ((char *) (tuple) >= (state)->slabMemoryBegin && \ + (char *) (tuple) < (state)->slabMemoryEnd) + +/* + * Return the given tuple to the slab memory free list, or free it + * if it was palloc'd. + */ +#define RELEASE_SLAB_SLOT(state, tuple) \ + do { \ + SlabSlot *buf = (SlabSlot *) tuple; \ + \ + if (IS_SLAB_SLOT((state), buf)) \ + { \ + buf->nextfree = (state)->slabFreeHead; \ + (state)->slabFreeHead = buf; \ + } else \ + pfree(buf); \ + } while(0) + +#define COMPARETUP(state,a,b) ((*(state)->comparetup) (a, b, state)) +#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup)) +#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup)) +#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len)) +#define LACKMEM(state) ((state)->availMem < 0 && !(state)->slabAllocatorUsed) +#define USEMEM(state,amt) ((state)->availMem -= (amt)) +#define FREEMEM(state,amt) ((state)->availMem += (amt)) +#define SERIAL(state) ((state)->shared == NULL) +#define WORKER(state) ((state)->shared && (state)->worker != -1) +#define LEADER(state) ((state)->shared && (state)->worker == -1) + +/* + * NOTES about on-tape representation of tuples: + * + * We require the first "unsigned int" of a stored tuple to be the total size + * on-tape of the tuple, including itself (so it is never zero; an all-zero + * unsigned int is used to delimit runs). The remainder of the stored tuple + * may or may not match the in-memory representation of the tuple --- + * any conversion needed is the job of the writetup and readtup routines. + * + * If state->sortopt contains TUPLESORT_RANDOMACCESS, then the stored + * representation of the tuple must be followed by another "unsigned int" that + * is a copy of the length --- so the total tape space used is actually + * sizeof(unsigned int) more than the stored length value. This allows + * read-backwards. When the random access flag was not specified, the + * write/read routines may omit the extra length word. + * + * writetup is expected to write both length words as well as the tuple + * data. When readtup is called, the tape is positioned just after the + * front length word; readtup must read the tuple data and advance past + * the back length word (if present). + * + * The write/read routines can make use of the tuple description data + * stored in the Tuplesortstate record, if needed. They are also expected + * to adjust state->availMem by the amount of memory space (not tape space!) + * released or consumed. There is no error return from either writetup + * or readtup; they should ereport() on failure. + * + * + * NOTES about memory consumption calculations: + * + * We count space allocated for tuples against the workMem limit, plus + * the space used by the variable-size memtuples array. Fixed-size space + * is not counted; it's small enough to not be interesting. + * + * Note that we count actual space used (as shown by GetMemoryChunkSpace) + * rather than the originally-requested size. This is important since + * palloc can add substantial overhead. It's not a complete answer since + * we won't count any wasted space in palloc allocation blocks, but it's + * a lot better than what we were doing before 7.3. As of 9.6, a + * separate memory context is used for caller passed tuples. Resetting + * it at certain key increments significantly ameliorates fragmentation. 
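The on-tape notes above frame every stored tuple with a leading length word that counts itself, plus a trailing copy of that word when random access is requested, so a reader can also locate the tuple while scanning backwards. Here is a small self-contained sketch of that framing against an in-memory buffer; write_framed and last_payload are made-up helpers, not logtape or tuplesort routines, and run delimiters and error handling are left out.

#include <stdio.h>
#include <string.h>

/*
 * Append one tuple to buf at *off: a leading length word covering itself
 * plus the payload, and (when random access is wanted) a trailing copy of
 * the length word so the tuple can also be found by scanning backwards.
 */
static void
write_framed(char *buf, size_t *off, const char *payload, int random_access)
{
    unsigned int len = (unsigned int) (sizeof(unsigned int) + strlen(payload));

    memcpy(buf + *off, &len, sizeof(len));
    *off += sizeof(len);
    memcpy(buf + *off, payload, strlen(payload));
    *off += strlen(payload);
    if (random_access)
    {
        memcpy(buf + *off, &len, sizeof(len));
        *off += sizeof(len);
    }
}

/* Walk backwards from 'end' to the start of the last tuple's payload. */
static const char *
last_payload(const char *buf, size_t end, unsigned int *payload_len)
{
    unsigned int len;

    memcpy(&len, buf + end - sizeof(len), sizeof(len));    /* trailing copy */
    *payload_len = len - (unsigned int) sizeof(len);
    return buf + end - sizeof(len) - *payload_len;
}

int
main(void)
{
    char         buf[256];
    size_t       off = 0;
    unsigned int plen;

    write_framed(buf, &off, "first", 1);
    write_framed(buf, &off, "second", 1);

    const char *p = last_payload(buf, off, &plen);
    printf("last tuple: %.*s\n", (int) plen, p);
    return 0;
}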
+ * Note that this places a responsibility on copytup routines to use the + * correct memory context for these tuples (and to not use the reset + * context for anything whose lifetime needs to span multiple external + * sort runs). readtup routines use the slab allocator (they cannot use + * the reset context because it gets deleted at the point that merging + * begins). + */ + +/* When using this macro, beware of double evaluation of len */ +#define LogicalTapeReadExact(tape, ptr, len) \ + do { \ + if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \ + elog(ERROR, "unexpected end of data"); \ + } while(0) + + +static Tuplesortstate *tuplesort_begin_common(int workMem, + SortCoordinate coordinate, + int sortopt); +static void tuplesort_begin_batch(Tuplesortstate *state); +static void puttuple_common(Tuplesortstate *state, SortTuple *tuple); +static bool consider_abort_common(Tuplesortstate *state); +static void inittapes(Tuplesortstate *state, bool mergeruns); +static void inittapestate(Tuplesortstate *state, int maxTapes); +static void selectnewtape(Tuplesortstate *state); +static void init_slab_allocator(Tuplesortstate *state, int numSlots); +static void mergeruns(Tuplesortstate *state); +static void mergeonerun(Tuplesortstate *state); +static void beginmerge(Tuplesortstate *state); +static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup); +static void dumptuples(Tuplesortstate *state, bool alltuples); +static void make_bounded_heap(Tuplesortstate *state); +static void sort_bounded_heap(Tuplesortstate *state); +static void tuplesort_sort_memtuples(Tuplesortstate *state); +static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple); +static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple); +static void tuplesort_heap_delete_top(Tuplesortstate *state); +static void reversedirection(Tuplesortstate *state); +static unsigned int getlen(LogicalTape *tape, bool eofOK); +static void markrunend(LogicalTape *tape); +static void *readtup_alloc(Tuplesortstate *state, Size tuplen); +static int comparetup_heap(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); +static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup); +static void writetup_heap(Tuplesortstate *state, LogicalTape *tape, + SortTuple *stup); +static void readtup_heap(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len); +static int comparetup_cluster(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); +static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); +static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape, + SortTuple *stup); +static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len); +static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); +static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); +static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup); +static void writetup_index(Tuplesortstate *state, LogicalTape *tape, + SortTuple *stup); +static void readtup_index(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len); +static int comparetup_datum(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); +static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); +static void writetup_datum(Tuplesortstate *state, LogicalTape *tape, + 
SortTuple *stup); +static void readtup_datum(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len); +static int worker_get_identifier(Tuplesortstate *state); +static void worker_freeze_result_tape(Tuplesortstate *state); +static void worker_nomergeruns(Tuplesortstate *state); +static void leader_takeover_tapes(Tuplesortstate *state); +static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup); +static void tuplesort_free(Tuplesortstate *state); +static void tuplesort_updatemax(Tuplesortstate *state); + +/* + * Specialized comparators that we can inline into specialized sorts. The goal + * is to try to sort two tuples without having to follow the pointers to the + * comparator or the tuple. + * + * XXX: For now, these fall back to comparator functions that will compare the + * leading datum a second time. + * + * XXX: For now, there is no specialization for cases where datum1 is + * authoritative and we don't even need to fall back to a callback at all (that + * would be true for types like int4/int8/timestamp/date, but not true for + * abbreviations of text or multi-key sorts. There could be! Is it worth it? + */ + +/* Used if first key's comparator is ssup_datum_unsigned_compare */ +static pg_attribute_always_inline int +qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state) +{ + int compare; + + compare = ApplyUnsignedSortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + &state->sortKeys[0]); + if (compare != 0) + return compare; + + /* + * No need to waste effort calling the tiebreak function when there are no + * other keys to sort on. + */ + if (state->onlyKey != NULL) + return 0; + + return state->comparetup(a, b, state); +} + +#if SIZEOF_DATUM >= 8 +/* Used if first key's comparator is ssup_datum_signed_compare */ +static pg_attribute_always_inline int +qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state) +{ + int compare; + + compare = ApplySignedSortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + &state->sortKeys[0]); + + if (compare != 0) + return compare; + + /* + * No need to waste effort calling the tiebreak function when there are no + * other keys to sort on. + */ + if (state->onlyKey != NULL) + return 0; + + return state->comparetup(a, b, state); +} +#endif + +/* Used if first key's comparator is ssup_datum_int32_compare */ +static pg_attribute_always_inline int +qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state) +{ + int compare; + + compare = ApplyInt32SortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + &state->sortKeys[0]); + + if (compare != 0) + return compare; + + /* + * No need to waste effort calling the tiebreak function when there are no + * other keys to sort on. + */ + if (state->onlyKey != NULL) + return 0; + + return state->comparetup(a, b, state); +} + +/* + * Special versions of qsort just for SortTuple objects. qsort_tuple() sorts + * any variant of SortTuples, using the appropriate comparetup function. + * qsort_ssup() is specialized for the case where the comparetup function + * reduces to ApplySortComparator(), that is single-key MinimalTuple sorts + * and Datum sorts. qsort_tuple_{unsigned,signed,int32} are specialized for + * common comparison functions on pass-by-value leading datums. 
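The specialized qsort variants declared above get their speed from fixing the comparison at compile time: each expansion of lib/sort_template.h emits a sort routine whose comparator can be inlined rather than reached through a function pointer, with the full tiebreak comparator invoked only when the leading keys compare equal. The sketch below imitates only the template aspect, using a much simpler insertion sort; DEFINE_SORT and the comparison macros are invented for illustration and are not the sort_template.h interface.

#include <stdio.h>

/*
 * Tiny stand-in for the sort-template idea: expanding this macro defines a
 * specialized insertion sort whose element type and comparison expression
 * are fixed at compile time, so the comparison can be inlined instead of
 * being reached through a function pointer as with qsort().
 */
#define DEFINE_SORT(name, type, cmp_lt)                          \
    static void name(type *a, int n)                             \
    {                                                            \
        for (int i = 1; i < n; i++)                              \
        {                                                        \
            type key = a[i];                                     \
            int  j = i - 1;                                      \
            while (j >= 0 && cmp_lt(key, a[j]))                  \
            {                                                    \
                a[j + 1] = a[j];                                 \
                j--;                                             \
            }                                                    \
            a[j + 1] = key;                                      \
        }                                                        \
    }

#define INT_LT(x, y)    ((x) < (y))
#define DESC_LT(x, y)   ((x) > (y))

DEFINE_SORT(sort_int_asc, int, INT_LT)
DEFINE_SORT(sort_int_desc, int, DESC_LT)

int
main(void)
{
    int v[] = {3, 1, 2};

    sort_int_asc(v, 3);
    printf("%d %d %d\n", v[0], v[1], v[2]);
    sort_int_desc(v, 3);
    printf("%d %d %d\n", v[0], v[1], v[2]);
    return 0;
}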
+ */ + +#define ST_SORT qsort_tuple_unsigned +#define ST_ELEMENT_TYPE SortTuple +#define ST_COMPARE(a, b, state) qsort_tuple_unsigned_compare(a, b, state) +#define ST_COMPARE_ARG_TYPE Tuplesortstate +#define ST_CHECK_FOR_INTERRUPTS +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" + +#if SIZEOF_DATUM >= 8 +#define ST_SORT qsort_tuple_signed +#define ST_ELEMENT_TYPE SortTuple +#define ST_COMPARE(a, b, state) qsort_tuple_signed_compare(a, b, state) +#define ST_COMPARE_ARG_TYPE Tuplesortstate +#define ST_CHECK_FOR_INTERRUPTS +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" +#endif + +#define ST_SORT qsort_tuple_int32 +#define ST_ELEMENT_TYPE SortTuple +#define ST_COMPARE(a, b, state) qsort_tuple_int32_compare(a, b, state) +#define ST_COMPARE_ARG_TYPE Tuplesortstate +#define ST_CHECK_FOR_INTERRUPTS +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" + +#define ST_SORT qsort_tuple +#define ST_ELEMENT_TYPE SortTuple +#define ST_COMPARE_RUNTIME_POINTER +#define ST_COMPARE_ARG_TYPE Tuplesortstate +#define ST_CHECK_FOR_INTERRUPTS +#define ST_SCOPE static +#define ST_DECLARE +#define ST_DEFINE +#include "lib/sort_template.h" + +#define ST_SORT qsort_ssup +#define ST_ELEMENT_TYPE SortTuple +#define ST_COMPARE(a, b, ssup) \ + ApplySortComparator((a)->datum1, (a)->isnull1, \ + (b)->datum1, (b)->isnull1, (ssup)) +#define ST_COMPARE_ARG_TYPE SortSupportData +#define ST_CHECK_FOR_INTERRUPTS +#define ST_SCOPE static +#define ST_DEFINE +#include "lib/sort_template.h" + +/* + * tuplesort_begin_xxx + * + * Initialize for a tuple sort operation. + * + * After calling tuplesort_begin, the caller should call tuplesort_putXXX + * zero or more times, then call tuplesort_performsort when all the tuples + * have been supplied. After performsort, retrieve the tuples in sorted + * order by calling tuplesort_getXXX until it returns false/NULL. (If random + * access was requested, rescan, markpos, and restorepos can also be called.) + * Call tuplesort_end to terminate the operation and release memory/disk space. + * + * Each variant of tuplesort_begin has a workMem parameter specifying the + * maximum number of kilobytes of RAM to use before spilling data to disk. + * (The normal value of this parameter is work_mem, but some callers use + * other values.) Each variant also has a sortopt which is a bitmask of + * sort options. See TUPLESORT_* definitions in tuplesort.h + */ + +static Tuplesortstate * +tuplesort_begin_common(int workMem, SortCoordinate coordinate, int sortopt) +{ + Tuplesortstate *state; + MemoryContext maincontext; + MemoryContext sortcontext; + MemoryContext oldcontext; + + /* See leader_takeover_tapes() remarks on random access support */ + if (coordinate && (sortopt & TUPLESORT_RANDOMACCESS)) + elog(ERROR, "random access disallowed under parallel sort"); + + /* + * Memory context surviving tuplesort_reset. This memory context holds + * data which is useful to keep while sorting multiple similar batches. + */ + maincontext = AllocSetContextCreate(CurrentMemoryContext, + "TupleSort main", + ALLOCSET_DEFAULT_SIZES); + + /* + * Create a working memory context for one sort operation. The content of + * this context is deleted by tuplesort_reset. + */ + sortcontext = AllocSetContextCreate(maincontext, + "TupleSort sort", + ALLOCSET_DEFAULT_SIZES); + + /* + * Additionally a working memory context for tuples is setup in + * tuplesort_begin_batch. + */ + + /* + * Make the Tuplesortstate within the per-sortstate context. 
This way, we + * don't need a separate pfree() operation for it at shutdown. + */ + oldcontext = MemoryContextSwitchTo(maincontext); + + state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate)); + +#ifdef TRACE_SORT + if (trace_sort) + pg_rusage_init(&state->ru_start); +#endif + + state->sortopt = sortopt; + state->tuples = true; + + /* + * workMem is forced to be at least 64KB, the current minimum valid value + * for the work_mem GUC. This is a defense against parallel sort callers + * that divide out memory among many workers in a way that leaves each + * with very little memory. + */ + state->allowedMem = Max(workMem, 64) * (int64) 1024; + state->sortcontext = sortcontext; + state->maincontext = maincontext; + + /* + * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD; + * see comments in grow_memtuples(). + */ + state->memtupsize = INITIAL_MEMTUPSIZE; + state->memtuples = NULL; + + /* + * After all of the other non-parallel-related state, we setup all of the + * state needed for each batch. + */ + tuplesort_begin_batch(state); + + /* + * Initialize parallel-related state based on coordination information + * from caller + */ + if (!coordinate) + { + /* Serial sort */ + state->shared = NULL; + state->worker = -1; + state->nParticipants = -1; + } + else if (coordinate->isWorker) + { + /* Parallel worker produces exactly one final run from all input */ + state->shared = coordinate->sharedsort; + state->worker = worker_get_identifier(state); + state->nParticipants = -1; + } + else + { + /* Parallel leader state only used for final merge */ + state->shared = coordinate->sharedsort; + state->worker = -1; + state->nParticipants = coordinate->nParticipants; + Assert(state->nParticipants >= 1); + } + + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * tuplesort_begin_batch + * + * Setup, or reset, all state need for processing a new set of tuples with this + * sort state. Called both from tuplesort_begin_common (the first time sorting + * with this sort state) and tuplesort_reset (for subsequent usages). + */ +static void +tuplesort_begin_batch(Tuplesortstate *state) +{ + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(state->maincontext); + + /* + * Caller tuple (e.g. IndexTuple) memory context. + * + * A dedicated child context used exclusively for caller passed tuples + * eases memory management. Resetting at key points reduces + * fragmentation. Note that the memtuples array of SortTuples is allocated + * in the parent context, not this context, because there is no need to + * free memtuples early. For bounded sorts, tuples may be pfreed in any + * order, so we use a regular aset.c context so that it can make use of + * free'd memory. When the sort is not bounded, we make use of a + * generation.c context as this keeps allocations more compact with less + * wastage. Allocations are also slightly more CPU efficient. + */ + if (state->sortopt & TUPLESORT_ALLOWBOUNDED) + state->tuplecontext = AllocSetContextCreate(state->sortcontext, + "Caller tuples", + ALLOCSET_DEFAULT_SIZES); + else + state->tuplecontext = GenerationContextCreate(state->sortcontext, + "Caller tuples", + ALLOCSET_DEFAULT_SIZES); + + + state->status = TSS_INITIAL; + state->bounded = false; + state->boundUsed = false; + + state->availMem = state->allowedMem; + + state->tapeset = NULL; + + state->memtupcount = 0; + + /* + * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD; + * see comments in grow_memtuples(). 
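tuplesort_begin_batch above keeps caller tuples in a dedicated child memory context precisely so a whole batch can be dropped with one cheap reset instead of many individual pfree() calls. The following standalone sketch illustrates that reset-the-whole-region idea with a trivial bump allocator; RegionSketch is a toy and stands in for neither aset.c nor generation.c, which additionally do block chaining, freelists and accounting.

#include <stdio.h>
#include <stdlib.h>

/* A bump-allocated region: allocations are never freed individually and
 * the whole region is recycled in one cheap reset, which is the property
 * the dedicated per-batch tuple context is after. */
typedef struct
{
    char  *base;
    size_t size;
    size_t used;
} RegionSketch;

static void
region_init(RegionSketch *r, size_t size)
{
    r->base = malloc(size);
    r->size = size;
    r->used = 0;
}

static void *
region_alloc(RegionSketch *r, size_t n)
{
    n = (n + 7) & ~(size_t) 7;          /* keep allocations 8-byte aligned */
    if (r->base == NULL || r->used + n > r->size)
        return NULL;
    void *p = r->base + r->used;
    r->used += n;
    return p;
}

/* Resetting discards every allocation at once, with no per-chunk bookkeeping. */
static void
region_reset(RegionSketch *r)
{
    r->used = 0;
}

int
main(void)
{
    RegionSketch tuples;

    region_init(&tuples, 4096);
    for (int batch = 0; batch < 3; batch++)
    {
        for (int i = 0; i < 10; i++)
            (void) region_alloc(&tuples, 100);
        printf("batch %d used %zu bytes\n", batch, tuples.used);
        region_reset(&tuples);          /* start the next batch from scratch */
    }
    free(tuples.base);
    return 0;
}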
+ */ + state->growmemtuples = true; + state->slabAllocatorUsed = false; + if (state->memtuples != NULL && state->memtupsize != INITIAL_MEMTUPSIZE) + { + pfree(state->memtuples); + state->memtuples = NULL; + state->memtupsize = INITIAL_MEMTUPSIZE; + } + if (state->memtuples == NULL) + { + state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple)); + USEMEM(state, GetMemoryChunkSpace(state->memtuples)); + } + + /* workMem must be large enough for the minimal memtuples array */ + if (LACKMEM(state)) + elog(ERROR, "insufficient memory allowed for sort"); + + state->currentRun = 0; + + /* + * Tape variables (inputTapes, outputTapes, etc.) will be initialized by + * inittapes(), if needed. + */ + + state->result_tape = NULL; /* flag that result tape has not been formed */ + + MemoryContextSwitchTo(oldcontext); +} + +Tuplesortstate * +tuplesort_begin_heap(TupleDesc tupDesc, + int nkeys, AttrNumber *attNums, + Oid *sortOperators, Oid *sortCollations, + bool *nullsFirstFlags, + int workMem, SortCoordinate coordinate, int sortopt) +{ + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + sortopt); + MemoryContext oldcontext; + int i; + + oldcontext = MemoryContextSwitchTo(state->maincontext); + + AssertArg(nkeys > 0); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, + "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c", + nkeys, workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f'); +#endif + + state->nKeys = nkeys; + + TRACE_POSTGRESQL_SORT_START(HEAP_SORT, + false, /* no unique check */ + nkeys, + workMem, + sortopt & TUPLESORT_RANDOMACCESS, + PARALLEL_SORT(state)); + + state->comparetup = comparetup_heap; + state->copytup = copytup_heap; + state->writetup = writetup_heap; + state->readtup = readtup_heap; + state->haveDatum1 = true; + + state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ + state->abbrevNext = 10; + + /* Prepare SortSupport data for each column */ + state->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData)); + + for (i = 0; i < nkeys; i++) + { + SortSupport sortKey = state->sortKeys + i; + + AssertArg(attNums[i] != 0); + AssertArg(sortOperators[i] != 0); + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = sortCollations[i]; + sortKey->ssup_nulls_first = nullsFirstFlags[i]; + sortKey->ssup_attno = attNums[i]; + /* Convey if abbreviation optimization is applicable in principle */ + sortKey->abbreviate = (i == 0 && state->haveDatum1); + + PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey); + } + + /* + * The "onlyKey" optimization cannot be used with abbreviated keys, since + * tie-breaker comparisons may be required. Typically, the optimization + * is only of value to pass-by-value types anyway, whereas abbreviated + * keys are typically only of value to pass-by-reference types. 
+ */ + if (nkeys == 1 && !state->sortKeys->abbrev_converter) + state->onlyKey = state->sortKeys; + + MemoryContextSwitchTo(oldcontext); + + return state; +} + +Tuplesortstate * +tuplesort_begin_cluster(TupleDesc tupDesc, + Relation indexRel, + int workMem, + SortCoordinate coordinate, int sortopt) +{ + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + sortopt); + BTScanInsert indexScanKey; + MemoryContext oldcontext; + int i; + + Assert(indexRel->rd_rel->relam == BTREE_AM_OID); + + oldcontext = MemoryContextSwitchTo(state->maincontext); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, + "begin tuple sort: nkeys = %d, workMem = %d, randomAccess = %c", + RelationGetNumberOfAttributes(indexRel), + workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f'); +#endif + + state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel); + + TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT, + false, /* no unique check */ + state->nKeys, + workMem, + sortopt & TUPLESORT_RANDOMACCESS, + PARALLEL_SORT(state)); + + state->comparetup = comparetup_cluster; + state->copytup = copytup_cluster; + state->writetup = writetup_cluster; + state->readtup = readtup_cluster; + state->abbrevNext = 10; + + state->indexInfo = BuildIndexInfo(indexRel); + + /* + * If we don't have a simple leading attribute, we don't currently + * initialize datum1, so disable optimizations that require it. + */ + if (state->indexInfo->ii_IndexAttrNumbers[0] == 0) + state->haveDatum1 = false; + else + state->haveDatum1 = true; + + state->tupDesc = tupDesc; /* assume we need not copy tupDesc */ + + indexScanKey = _bt_mkscankey(indexRel, NULL); + + if (state->indexInfo->ii_Expressions != NULL) + { + TupleTableSlot *slot; + ExprContext *econtext; + + /* + * We will need to use FormIndexDatum to evaluate the index + * expressions. To do that, we need an EState, as well as a + * TupleTableSlot to put the table tuples into. The econtext's + * scantuple has to point to that slot, too. + */ + state->estate = CreateExecutorState(); + slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsHeapTuple); + econtext = GetPerTupleExprContext(state->estate); + econtext->ecxt_scantuple = slot; + } + + /* Prepare SortSupport data for each column */ + state->sortKeys = (SortSupport) palloc0(state->nKeys * + sizeof(SortSupportData)); + + for (i = 0; i < state->nKeys; i++) + { + SortSupport sortKey = state->sortKeys + i; + ScanKey scanKey = indexScanKey->scankeys + i; + int16 strategy; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = scanKey->sk_collation; + sortKey->ssup_nulls_first = + (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; + sortKey->ssup_attno = scanKey->sk_attno; + /* Convey if abbreviation optimization is applicable in principle */ + sortKey->abbreviate = (i == 0 && state->haveDatum1); + + AssertState(sortKey->ssup_attno != 0); + + strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? 
+ BTGreaterStrategyNumber : BTLessStrategyNumber; + + PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); + } + + pfree(indexScanKey); + + MemoryContextSwitchTo(oldcontext); + + return state; +} + +Tuplesortstate * +tuplesort_begin_index_btree(Relation heapRel, + Relation indexRel, + bool enforceUnique, + bool uniqueNullsNotDistinct, + int workMem, + SortCoordinate coordinate, + int sortopt) +{ + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + sortopt); + BTScanInsert indexScanKey; + MemoryContext oldcontext; + int i; + + oldcontext = MemoryContextSwitchTo(state->maincontext); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, + "begin index sort: unique = %c, workMem = %d, randomAccess = %c", + enforceUnique ? 't' : 'f', + workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f'); +#endif + + state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel); + + TRACE_POSTGRESQL_SORT_START(INDEX_SORT, + enforceUnique, + state->nKeys, + workMem, + sortopt & TUPLESORT_RANDOMACCESS, + PARALLEL_SORT(state)); + + state->comparetup = comparetup_index_btree; + state->copytup = copytup_index; + state->writetup = writetup_index; + state->readtup = readtup_index; + state->abbrevNext = 10; + state->haveDatum1 = true; + + state->heapRel = heapRel; + state->indexRel = indexRel; + state->enforceUnique = enforceUnique; + state->uniqueNullsNotDistinct = uniqueNullsNotDistinct; + + indexScanKey = _bt_mkscankey(indexRel, NULL); + + /* Prepare SortSupport data for each column */ + state->sortKeys = (SortSupport) palloc0(state->nKeys * + sizeof(SortSupportData)); + + for (i = 0; i < state->nKeys; i++) + { + SortSupport sortKey = state->sortKeys + i; + ScanKey scanKey = indexScanKey->scankeys + i; + int16 strategy; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = scanKey->sk_collation; + sortKey->ssup_nulls_first = + (scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0; + sortKey->ssup_attno = scanKey->sk_attno; + /* Convey if abbreviation optimization is applicable in principle */ + sortKey->abbreviate = (i == 0 && state->haveDatum1); + + AssertState(sortKey->ssup_attno != 0); + + strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ? + BTGreaterStrategyNumber : BTLessStrategyNumber; + + PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey); + } + + pfree(indexScanKey); + + MemoryContextSwitchTo(oldcontext); + + return state; +} + +Tuplesortstate * +tuplesort_begin_index_hash(Relation heapRel, + Relation indexRel, + uint32 high_mask, + uint32 low_mask, + uint32 max_buckets, + int workMem, + SortCoordinate coordinate, + int sortopt) +{ + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + sortopt); + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(state->maincontext); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, + "begin index sort: high_mask = 0x%x, low_mask = 0x%x, " + "max_buckets = 0x%x, workMem = %d, randomAccess = %c", + high_mask, + low_mask, + max_buckets, + workMem, + sortopt & TUPLESORT_RANDOMACCESS ? 
't' : 'f'); +#endif + + state->nKeys = 1; /* Only one sort column, the hash code */ + + state->comparetup = comparetup_index_hash; + state->copytup = copytup_index; + state->writetup = writetup_index; + state->readtup = readtup_index; + state->haveDatum1 = true; + + state->heapRel = heapRel; + state->indexRel = indexRel; + + state->high_mask = high_mask; + state->low_mask = low_mask; + state->max_buckets = max_buckets; + + MemoryContextSwitchTo(oldcontext); + + return state; +} + +Tuplesortstate * +tuplesort_begin_index_gist(Relation heapRel, + Relation indexRel, + int workMem, + SortCoordinate coordinate, + int sortopt) +{ + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + sortopt); + MemoryContext oldcontext; + int i; + + oldcontext = MemoryContextSwitchTo(state->sortcontext); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, + "begin index sort: workMem = %d, randomAccess = %c", + workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f'); +#endif + + state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel); + + state->comparetup = comparetup_index_btree; + state->copytup = copytup_index; + state->writetup = writetup_index; + state->readtup = readtup_index; + state->haveDatum1 = true; + + state->heapRel = heapRel; + state->indexRel = indexRel; + + /* Prepare SortSupport data for each column */ + state->sortKeys = (SortSupport) palloc0(state->nKeys * + sizeof(SortSupportData)); + + for (i = 0; i < state->nKeys; i++) + { + SortSupport sortKey = state->sortKeys + i; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = indexRel->rd_indcollation[i]; + sortKey->ssup_nulls_first = false; + sortKey->ssup_attno = i + 1; + /* Convey if abbreviation optimization is applicable in principle */ + sortKey->abbreviate = (i == 0 && state->haveDatum1); + + AssertState(sortKey->ssup_attno != 0); + + /* Look for a sort support function */ + PrepareSortSupportFromGistIndexRel(indexRel, sortKey); + } + + MemoryContextSwitchTo(oldcontext); + + return state; +} + +Tuplesortstate * +tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, + bool nullsFirstFlag, int workMem, + SortCoordinate coordinate, int sortopt) +{ + Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate, + sortopt); + MemoryContext oldcontext; + int16 typlen; + bool typbyval; + + oldcontext = MemoryContextSwitchTo(state->maincontext); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, + "begin datum sort: workMem = %d, randomAccess = %c", + workMem, sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f'); +#endif + + state->nKeys = 1; /* always a one-column sort */ + + TRACE_POSTGRESQL_SORT_START(DATUM_SORT, + false, /* no unique check */ + 1, + workMem, + sortopt & TUPLESORT_RANDOMACCESS, + PARALLEL_SORT(state)); + + state->comparetup = comparetup_datum; + state->copytup = copytup_datum; + state->writetup = writetup_datum; + state->readtup = readtup_datum; + state->abbrevNext = 10; + state->haveDatum1 = true; + + state->datumType = datumType; + + /* lookup necessary attributes of the datum type */ + get_typlenbyval(datumType, &typlen, &typbyval); + state->datumTypeLen = typlen; + state->tuples = !typbyval; + + /* Prepare SortSupport data */ + state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData)); + + state->sortKeys->ssup_cxt = CurrentMemoryContext; + state->sortKeys->ssup_collation = sortCollation; + state->sortKeys->ssup_nulls_first = nullsFirstFlag; + + /* + * Abbreviation is possible here only for by-reference types. 
In theory, + * a pass-by-value datatype could have an abbreviated form that is cheaper + * to compare. In a tuple sort, we could support that, because we can + * always extract the original datum from the tuple as needed. Here, we + * can't, because a datum sort only stores a single copy of the datum; the + * "tuple" field of each SortTuple is NULL. + */ + state->sortKeys->abbreviate = !typbyval; + + PrepareSortSupportFromOrderingOp(sortOperator, state->sortKeys); + + /* + * The "onlyKey" optimization cannot be used with abbreviated keys, since + * tie-breaker comparisons may be required. Typically, the optimization + * is only of value to pass-by-value types anyway, whereas abbreviated + * keys are typically only of value to pass-by-reference types. + */ + if (!state->sortKeys->abbrev_converter) + state->onlyKey = state->sortKeys; + + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * tuplesort_set_bound + * + * Advise tuplesort that at most the first N result tuples are required. + * + * Must be called before inserting any tuples. (Actually, we could allow it + * as long as the sort hasn't spilled to disk, but there seems no need for + * delayed calls at the moment.) + * + * This is a hint only. The tuplesort may still return more tuples than + * requested. Parallel leader tuplesorts will always ignore the hint. + */ +void +tuplesort_set_bound(Tuplesortstate *state, int64 bound) +{ + /* Assert we're called before loading any tuples */ + Assert(state->status == TSS_INITIAL && state->memtupcount == 0); + /* Assert we allow bounded sorts */ + Assert(state->sortopt & TUPLESORT_ALLOWBOUNDED); + /* Can't set the bound twice, either */ + Assert(!state->bounded); + /* Also, this shouldn't be called in a parallel worker */ + Assert(!WORKER(state)); + + /* Parallel leader allows but ignores hint */ + if (LEADER(state)) + return; + +#ifdef DEBUG_BOUNDED_SORT + /* Honor GUC setting that disables the feature (for easy testing) */ + if (!optimize_bounded_sort) + return; +#endif + + /* We want to be able to compute bound * 2, so limit the setting */ + if (bound > (int64) (INT_MAX / 2)) + return; + + state->bounded = true; + state->bound = (int) bound; + + /* + * Bounded sorts are not an effective target for abbreviated key + * optimization. Disable by setting state to be consistent with no + * abbreviation support. + */ + state->sortKeys->abbrev_converter = NULL; + if (state->sortKeys->abbrev_full_comparator) + state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator; + + /* Not strictly necessary, but be tidy */ + state->sortKeys->abbrev_abort = NULL; + state->sortKeys->abbrev_full_comparator = NULL; +} + +/* + * tuplesort_used_bound + * + * Allow callers to find out if the sort state was able to use a bound. + */ +bool +tuplesort_used_bound(Tuplesortstate *state) +{ + return state->boundUsed; +} + +/* + * tuplesort_free + * + * Internal routine for freeing resources of tuplesort. + */ +static void +tuplesort_free(Tuplesortstate *state) +{ + /* context swap probably not needed, but let's be safe */ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + +#ifdef TRACE_SORT + long spaceUsed; + + if (state->tapeset) + spaceUsed = LogicalTapeSetBlocks(state->tapeset); + else + spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024; +#endif + + /* + * Delete temporary "tape" files, if any. + * + * Note: want to include this in reported total cost of sort, hence need + * for two #ifdef TRACE_SORT sections. 
+ * + * We don't bother to destroy the individual tapes here. They will go away + * with the sortcontext. (In TSS_FINALMERGE state, we have closed + * finished tapes already.) + */ + if (state->tapeset) + LogicalTapeSetClose(state->tapeset); + +#ifdef TRACE_SORT + if (trace_sort) + { + if (state->tapeset) + elog(LOG, "%s of worker %d ended, %ld disk blocks used: %s", + SERIAL(state) ? "external sort" : "parallel external sort", + state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); + else + elog(LOG, "%s of worker %d ended, %ld KB used: %s", + SERIAL(state) ? "internal sort" : "unperformed parallel sort", + state->worker, spaceUsed, pg_rusage_show(&state->ru_start)); + } + + TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, spaceUsed); +#else + + /* + * If you disabled TRACE_SORT, you can still probe sort__done, but you + * ain't getting space-used stats. + */ + TRACE_POSTGRESQL_SORT_DONE(state->tapeset != NULL, 0L); +#endif + + /* Free any execution state created for CLUSTER case */ + if (state->estate != NULL) + { + ExprContext *econtext = GetPerTupleExprContext(state->estate); + + ExecDropSingleTupleTableSlot(econtext->ecxt_scantuple); + FreeExecutorState(state->estate); + } + + MemoryContextSwitchTo(oldcontext); + + /* + * Free the per-sort memory context, thereby releasing all working memory. + */ + MemoryContextReset(state->sortcontext); +} + +/* + * tuplesort_end + * + * Release resources and clean up. + * + * NOTE: after calling this, any pointers returned by tuplesort_getXXX are + * pointing to garbage. Be careful not to attempt to use or free such + * pointers afterwards! + */ +void +tuplesort_end(Tuplesortstate *state) +{ + tuplesort_free(state); + + /* + * Free the main memory context, including the Tuplesortstate struct + * itself. + */ + MemoryContextDelete(state->maincontext); +} + +/* + * tuplesort_updatemax + * + * Update maximum resource usage statistics. + */ +static void +tuplesort_updatemax(Tuplesortstate *state) +{ + int64 spaceUsed; + bool isSpaceDisk; + + /* + * Note: it might seem we should provide both memory and disk usage for a + * disk-based sort. However, the current code doesn't track memory space + * accurately once we have begun to return tuples to the caller (since we + * don't account for pfree's the caller is expected to do), so we cannot + * rely on availMem in a disk sort. This does not seem worth the overhead + * to fix. Is it worth creating an API for the memory context code to + * tell us how much is actually used in sortcontext? + */ + if (state->tapeset) + { + isSpaceDisk = true; + spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ; + } + else + { + isSpaceDisk = false; + spaceUsed = state->allowedMem - state->availMem; + } + + /* + * Sort evicts data to the disk when it wasn't able to fit that data into + * main memory. This is why we assume space used on the disk to be more + * important for tracking resource usage than space used in memory. Note + * that the amount of space occupied by some tupleset on the disk might be + * less than amount of space occupied by the same tupleset in memory due + * to more compact representation. + */ + if ((isSpaceDisk && !state->isMaxSpaceDisk) || + (isSpaceDisk == state->isMaxSpaceDisk && spaceUsed > state->maxSpace)) + { + state->maxSpace = spaceUsed; + state->isMaxSpaceDisk = isSpaceDisk; + state->maxSpaceStatus = state->status; + } +} + +/* + * tuplesort_reset + * + * Reset the tuplesort. Reset all the data in the tuplesort, but leave the + * meta-information in. 
After tuplesort_reset, tuplesort is ready to start + * a new sort. This allows avoiding recreation of tuple sort states (and + * save resources) when sorting multiple small batches. + */ +void +tuplesort_reset(Tuplesortstate *state) +{ + tuplesort_updatemax(state); + tuplesort_free(state); + + /* + * After we've freed up per-batch memory, re-setup all of the state common + * to both the first batch and any subsequent batch. + */ + tuplesort_begin_batch(state); + + state->lastReturnedTuple = NULL; + state->slabMemoryBegin = NULL; + state->slabMemoryEnd = NULL; + state->slabFreeHead = NULL; +} + +/* + * Grow the memtuples[] array, if possible within our memory constraint. We + * must not exceed INT_MAX tuples in memory or the caller-provided memory + * limit. Return true if we were able to enlarge the array, false if not. + * + * Normally, at each increment we double the size of the array. When doing + * that would exceed a limit, we attempt one last, smaller increase (and then + * clear the growmemtuples flag so we don't try any more). That allows us to + * use memory as fully as permitted; sticking to the pure doubling rule could + * result in almost half going unused. Because availMem moves around with + * tuple addition/removal, we need some rule to prevent making repeated small + * increases in memtupsize, which would just be useless thrashing. The + * growmemtuples flag accomplishes that and also prevents useless + * recalculations in this function. + */ +static bool +grow_memtuples(Tuplesortstate *state) +{ + int newmemtupsize; + int memtupsize = state->memtupsize; + int64 memNowUsed = state->allowedMem - state->availMem; + + /* Forget it if we've already maxed out memtuples, per comment above */ + if (!state->growmemtuples) + return false; + + /* Select new value of memtupsize */ + if (memNowUsed <= state->availMem) + { + /* + * We've used no more than half of allowedMem; double our usage, + * clamping at INT_MAX tuples. + */ + if (memtupsize < INT_MAX / 2) + newmemtupsize = memtupsize * 2; + else + { + newmemtupsize = INT_MAX; + state->growmemtuples = false; + } + } + else + { + /* + * This will be the last increment of memtupsize. Abandon doubling + * strategy and instead increase as much as we safely can. + * + * To stay within allowedMem, we can't increase memtupsize by more + * than availMem / sizeof(SortTuple) elements. In practice, we want + * to increase it by considerably less, because we need to leave some + * space for the tuples to which the new array slots will refer. We + * assume the new tuples will be about the same size as the tuples + * we've already seen, and thus we can extrapolate from the space + * consumption so far to estimate an appropriate new size for the + * memtuples array. The optimal value might be higher or lower than + * this estimate, but it's hard to know that in advance. We again + * clamp at INT_MAX tuples. + * + * This calculation is safe against enlarging the array so much that + * LACKMEM becomes true, because the memory currently used includes + * the present array; thus, there would be enough allowedMem for the + * new array elements even if no other memory were currently used. + * + * We do the arithmetic in float8, because otherwise the product of + * memtupsize and allowedMem could overflow. Any inaccuracy in the + * result should be insignificant; but even if we computed a + * completely insane result, the checks below will prevent anything + * really bad from happening. 
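+	 *
+	 * As a purely illustrative sketch (hypothetical numbers, not drawn from
+	 * the code or from any measurement): if allowedMem is 64MB, memNowUsed
+	 * is 48MB and memtupsize is 100,000, then grow_ratio is about 1.33 and
+	 * the array is enlarged to roughly 133,000 slots, rather than doubled to
+	 * 200,000 slots for whose tuples there would be no room.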
+ */ + double grow_ratio; + + grow_ratio = (double) state->allowedMem / (double) memNowUsed; + if (memtupsize * grow_ratio < INT_MAX) + newmemtupsize = (int) (memtupsize * grow_ratio); + else + newmemtupsize = INT_MAX; + + /* We won't make any further enlargement attempts */ + state->growmemtuples = false; + } + + /* Must enlarge array by at least one element, else report failure */ + if (newmemtupsize <= memtupsize) + goto noalloc; + + /* + * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp + * to ensure our request won't be rejected. Note that we can easily + * exhaust address space before facing this outcome. (This is presently + * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but + * don't rely on that at this distance.) + */ + if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(SortTuple)) + { + newmemtupsize = (int) (MaxAllocHugeSize / sizeof(SortTuple)); + state->growmemtuples = false; /* can't grow any more */ + } + + /* + * We need to be sure that we do not cause LACKMEM to become true, else + * the space management algorithm will go nuts. The code above should + * never generate a dangerous request, but to be safe, check explicitly + * that the array growth fits within availMem. (We could still cause + * LACKMEM if the memory chunk overhead associated with the memtuples + * array were to increase. That shouldn't happen because we chose the + * initial array size large enough to ensure that palloc will be treating + * both old and new arrays as separate chunks. But we'll check LACKMEM + * explicitly below just in case.) + */ + if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(SortTuple))) + goto noalloc; + + /* OK, do it */ + FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); + state->memtupsize = newmemtupsize; + state->memtuples = (SortTuple *) + repalloc_huge(state->memtuples, + state->memtupsize * sizeof(SortTuple)); + USEMEM(state, GetMemoryChunkSpace(state->memtuples)); + if (LACKMEM(state)) + elog(ERROR, "unexpected out-of-memory situation in tuplesort"); + return true; + +noalloc: + /* If for any reason we didn't realloc, shut off future attempts */ + state->growmemtuples = false; + return false; +} + +/* + * Accept one tuple while collecting input data for sort. + * + * Note that the input data is always copied; the caller need not save it. + */ +void +tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + SortTuple stup; + + /* + * Copy the given tuple into memory we control, and decrease availMem. + * Then call the common code. + */ + COPYTUP(state, &stup, (void *) slot); + + puttuple_common(state, &stup); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Accept one tuple while collecting input data for sort. + * + * Note that the input data is always copied; the caller need not save it. + */ +void +tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + SortTuple stup; + + /* + * Copy the given tuple into memory we control, and decrease availMem. + * Then call the common code. + */ + COPYTUP(state, &stup, (void *) tup); + + puttuple_common(state, &stup); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Collect one index tuple while collecting input data for sort, building + * it from caller-supplied values. 
+ */ +void +tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel, + ItemPointer self, Datum *values, + bool *isnull) +{ + MemoryContext oldcontext; + SortTuple stup; + Datum original; + IndexTuple tuple; + + stup.tuple = index_form_tuple_context(RelationGetDescr(rel), values, + isnull, state->tuplecontext); + tuple = ((IndexTuple) stup.tuple); + tuple->t_tid = *self; + USEMEM(state, GetMemoryChunkSpace(stup.tuple)); + /* set up first-column key value */ + original = index_getattr(tuple, + 1, + RelationGetDescr(state->indexRel), + &stup.isnull1); + + oldcontext = MemoryContextSwitchTo(state->sortcontext); + + if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1) + { + /* + * Store ordinary Datum representation, or NULL value. If there is a + * converter it won't expect NULL values, and cost model is not + * required to account for NULL, so in that case we avoid calling + * converter and just set datum1 to zeroed representation (to be + * consistent, and to support cheap inequality tests for NULL + * abbreviated keys). + */ + stup.datum1 = original; + } + else if (!consider_abort_common(state)) + { + /* Store abbreviated key representation */ + stup.datum1 = state->sortKeys->abbrev_converter(original, + state->sortKeys); + } + else + { + /* Abort abbreviation */ + int i; + + stup.datum1 = original; + + /* + * Set state to be consistent with never trying abbreviation. + * + * Alter datum1 representation in already-copied tuples, so as to + * ensure a consistent representation (current tuple was just + * handled). It does not matter if some dumped tuples are already + * sorted on tape, since serialized tuples lack abbreviated keys + * (TSS_BUILDRUNS state prevents control reaching here in any case). + */ + for (i = 0; i < state->memtupcount; i++) + { + SortTuple *mtup = &state->memtuples[i]; + + tuple = mtup->tuple; + mtup->datum1 = index_getattr(tuple, + 1, + RelationGetDescr(state->indexRel), + &mtup->isnull1); + } + } + + puttuple_common(state, &stup); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Accept one Datum while collecting input data for sort. + * + * If the Datum is pass-by-ref type, the value will be copied. + */ +void +tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); + SortTuple stup; + + /* + * Pass-by-value types or null values are just stored directly in + * stup.datum1 (and stup.tuple is not used and set to NULL). + * + * Non-null pass-by-reference values need to be copied into memory we + * control, and possibly abbreviated. The copied value is pointed to by + * stup.tuple and is treated as the canonical copy (e.g. to return via + * tuplesort_getdatum or when writing to tape); stup.datum1 gets the + * abbreviated value if abbreviation is happening, otherwise it's + * identical to stup.tuple. + */ + + if (isNull || !state->tuples) + { + /* + * Set datum1 to zeroed representation for NULLs (to be consistent, + * and to support cheap inequality tests for NULL abbreviated keys). + */ + stup.datum1 = !isNull ? 
val : (Datum) 0; + stup.isnull1 = isNull; + stup.tuple = NULL; /* no separate storage */ + MemoryContextSwitchTo(state->sortcontext); + } + else + { + Datum original = datumCopy(val, false, state->datumTypeLen); + + stup.isnull1 = false; + stup.tuple = DatumGetPointer(original); + USEMEM(state, GetMemoryChunkSpace(stup.tuple)); + MemoryContextSwitchTo(state->sortcontext); + + if (!state->sortKeys->abbrev_converter) + { + stup.datum1 = original; + } + else if (!consider_abort_common(state)) + { + /* Store abbreviated key representation */ + stup.datum1 = state->sortKeys->abbrev_converter(original, + state->sortKeys); + } + else + { + /* Abort abbreviation */ + int i; + + stup.datum1 = original; + + /* + * Set state to be consistent with never trying abbreviation. + * + * Alter datum1 representation in already-copied tuples, so as to + * ensure a consistent representation (current tuple was just + * handled). It does not matter if some dumped tuples are already + * sorted on tape, since serialized tuples lack abbreviated keys + * (TSS_BUILDRUNS state prevents control reaching here in any + * case). + */ + for (i = 0; i < state->memtupcount; i++) + { + SortTuple *mtup = &state->memtuples[i]; + + mtup->datum1 = PointerGetDatum(mtup->tuple); + } + } + } + + puttuple_common(state, &stup); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Shared code for tuple and datum cases. + */ +static void +puttuple_common(Tuplesortstate *state, SortTuple *tuple) +{ + Assert(!LEADER(state)); + + switch (state->status) + { + case TSS_INITIAL: + + /* + * Save the tuple into the unsorted array. First, grow the array + * as needed. Note that we try to grow the array when there is + * still one free slot remaining --- if we fail, there'll still be + * room to store the incoming tuple, and then we'll switch to + * tape-based operation. + */ + if (state->memtupcount >= state->memtupsize - 1) + { + (void) grow_memtuples(state); + Assert(state->memtupcount < state->memtupsize); + } + state->memtuples[state->memtupcount++] = *tuple; + + /* + * Check if it's time to switch over to a bounded heapsort. We do + * so if the input tuple count exceeds twice the desired tuple + * count (this is a heuristic for where heapsort becomes cheaper + * than a quicksort), or if we've just filled workMem and have + * enough tuples to meet the bound. + * + * Note that once we enter TSS_BOUNDED state we will always try to + * complete the sort that way. In the worst case, if later input + * tuples are larger than earlier ones, this might cause us to + * exceed workMem significantly. + */ + if (state->bounded && + (state->memtupcount > state->bound * 2 || + (state->memtupcount > state->bound && LACKMEM(state)))) + { +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "switching to bounded heapsort at %d tuples: %s", + state->memtupcount, + pg_rusage_show(&state->ru_start)); +#endif + make_bounded_heap(state); + return; + } + + /* + * Done if we still fit in available memory and have array slots. + */ + if (state->memtupcount < state->memtupsize && !LACKMEM(state)) + return; + + /* + * Nope; time to switch to tape-based operation. + */ + inittapes(state, true); + + /* + * Dump all tuples. + */ + dumptuples(state, false); + break; + + case TSS_BOUNDED: + + /* + * We don't want to grow the array here, so check whether the new + * tuple can be discarded before putting it in. 
This should be a + * good speed optimization, too, since when there are many more + * input tuples than the bound, most input tuples can be discarded + * with just this one comparison. Note that because we currently + * have the sort direction reversed, we must check for <= not >=. + */ + if (COMPARETUP(state, tuple, &state->memtuples[0]) <= 0) + { + /* new tuple <= top of the heap, so we can discard it */ + free_sort_tuple(state, tuple); + CHECK_FOR_INTERRUPTS(); + } + else + { + /* discard top of heap, replacing it with the new tuple */ + free_sort_tuple(state, &state->memtuples[0]); + tuplesort_heap_replace_top(state, tuple); + } + break; + + case TSS_BUILDRUNS: + + /* + * Save the tuple into the unsorted array (there must be space) + */ + state->memtuples[state->memtupcount++] = *tuple; + + /* + * If we are over the memory limit, dump all tuples. + */ + dumptuples(state, false); + break; + + default: + elog(ERROR, "invalid tuplesort state"); + break; + } +} + +static bool +consider_abort_common(Tuplesortstate *state) +{ + Assert(state->sortKeys[0].abbrev_converter != NULL); + Assert(state->sortKeys[0].abbrev_abort != NULL); + Assert(state->sortKeys[0].abbrev_full_comparator != NULL); + + /* + * Check effectiveness of abbreviation optimization. Consider aborting + * when still within memory limit. + */ + if (state->status == TSS_INITIAL && + state->memtupcount >= state->abbrevNext) + { + state->abbrevNext *= 2; + + /* + * Check opclass-supplied abbreviation abort routine. It may indicate + * that abbreviation should not proceed. + */ + if (!state->sortKeys->abbrev_abort(state->memtupcount, + state->sortKeys)) + return false; + + /* + * Finally, restore authoritative comparator, and indicate that + * abbreviation is not in play by setting abbrev_converter to NULL + */ + state->sortKeys[0].comparator = state->sortKeys[0].abbrev_full_comparator; + state->sortKeys[0].abbrev_converter = NULL; + /* Not strictly necessary, but be tidy */ + state->sortKeys[0].abbrev_abort = NULL; + state->sortKeys[0].abbrev_full_comparator = NULL; + + /* Give up - expect original pass-by-value representation */ + return true; + } + + return false; +} + +/* + * All tuples have been provided; finish the sort. + */ +void +tuplesort_performsort(Tuplesortstate *state) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "performsort of worker %d starting: %s", + state->worker, pg_rusage_show(&state->ru_start)); +#endif + + switch (state->status) + { + case TSS_INITIAL: + + /* + * We were able to accumulate all the tuples within the allowed + * amount of memory, or leader to take over worker tapes + */ + if (SERIAL(state)) + { + /* Just qsort 'em and we're done */ + tuplesort_sort_memtuples(state); + state->status = TSS_SORTEDINMEM; + } + else if (WORKER(state)) + { + /* + * Parallel workers must still dump out tuples to tape. No + * merge is required to produce single output run, though. + */ + inittapes(state, false); + dumptuples(state, true); + worker_nomergeruns(state); + state->status = TSS_SORTEDONTAPE; + } + else + { + /* + * Leader will take over worker tapes and merge worker runs. + * Note that mergeruns sets the correct state->status. 
+ */ + leader_takeover_tapes(state); + mergeruns(state); + } + state->current = 0; + state->eof_reached = false; + state->markpos_block = 0L; + state->markpos_offset = 0; + state->markpos_eof = false; + break; + + case TSS_BOUNDED: + + /* + * We were able to accumulate all the tuples required for output + * in memory, using a heap to eliminate excess tuples. Now we + * have to transform the heap to a properly-sorted array. + */ + sort_bounded_heap(state); + state->current = 0; + state->eof_reached = false; + state->markpos_offset = 0; + state->markpos_eof = false; + state->status = TSS_SORTEDINMEM; + break; + + case TSS_BUILDRUNS: + + /* + * Finish tape-based sort. First, flush all tuples remaining in + * memory out to tape; then merge until we have a single remaining + * run (or, if !randomAccess and !WORKER(), one run per tape). + * Note that mergeruns sets the correct state->status. + */ + dumptuples(state, true); + mergeruns(state); + state->eof_reached = false; + state->markpos_block = 0L; + state->markpos_offset = 0; + state->markpos_eof = false; + break; + + default: + elog(ERROR, "invalid tuplesort state"); + break; + } + +#ifdef TRACE_SORT + if (trace_sort) + { + if (state->status == TSS_FINALMERGE) + elog(LOG, "performsort of worker %d done (except %d-way final merge): %s", + state->worker, state->nInputTapes, + pg_rusage_show(&state->ru_start)); + else + elog(LOG, "performsort of worker %d done: %s", + state->worker, pg_rusage_show(&state->ru_start)); + } +#endif + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Internal routine to fetch the next tuple in either forward or back + * direction into *stup. Returns false if no more tuples. + * Returned tuple belongs to tuplesort memory context, and must not be freed + * by caller. Note that fetched tuple is stored in memory that may be + * recycled by any future fetch. + */ +static bool +tuplesort_gettuple_common(Tuplesortstate *state, bool forward, + SortTuple *stup) +{ + unsigned int tuplen; + size_t nmoved; + + Assert(!WORKER(state)); + + switch (state->status) + { + case TSS_SORTEDINMEM: + Assert(forward || state->sortopt & TUPLESORT_RANDOMACCESS); + Assert(!state->slabAllocatorUsed); + if (forward) + { + if (state->current < state->memtupcount) + { + *stup = state->memtuples[state->current++]; + return true; + } + state->eof_reached = true; + + /* + * Complain if caller tries to retrieve more tuples than + * originally asked for in a bounded sort. This is because + * returning EOF here might be the wrong thing. + */ + if (state->bounded && state->current >= state->bound) + elog(ERROR, "retrieved too many tuples in a bounded sort"); + + return false; + } + else + { + if (state->current <= 0) + return false; + + /* + * if all tuples are fetched already then we return last + * tuple, else - tuple before last returned. + */ + if (state->eof_reached) + state->eof_reached = false; + else + { + state->current--; /* last returned tuple */ + if (state->current <= 0) + return false; + } + *stup = state->memtuples[state->current - 1]; + return true; + } + break; + + case TSS_SORTEDONTAPE: + Assert(forward || state->sortopt & TUPLESORT_RANDOMACCESS); + Assert(state->slabAllocatorUsed); + + /* + * The slot that held the tuple that we returned in previous + * gettuple call can now be reused. 
+ */ + if (state->lastReturnedTuple) + { + RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); + state->lastReturnedTuple = NULL; + } + + if (forward) + { + if (state->eof_reached) + return false; + + if ((tuplen = getlen(state->result_tape, true)) != 0) + { + READTUP(state, stup, state->result_tape, tuplen); + + /* + * Remember the tuple we return, so that we can recycle + * its memory on next call. (This can be NULL, in the + * !state->tuples case). + */ + state->lastReturnedTuple = stup->tuple; + + return true; + } + else + { + state->eof_reached = true; + return false; + } + } + + /* + * Backward. + * + * if all tuples are fetched already then we return last tuple, + * else - tuple before last returned. + */ + if (state->eof_reached) + { + /* + * Seek position is pointing just past the zero tuplen at the + * end of file; back up to fetch last tuple's ending length + * word. If seek fails we must have a completely empty file. + */ + nmoved = LogicalTapeBackspace(state->result_tape, + 2 * sizeof(unsigned int)); + if (nmoved == 0) + return false; + else if (nmoved != 2 * sizeof(unsigned int)) + elog(ERROR, "unexpected tape position"); + state->eof_reached = false; + } + else + { + /* + * Back up and fetch previously-returned tuple's ending length + * word. If seek fails, assume we are at start of file. + */ + nmoved = LogicalTapeBackspace(state->result_tape, + sizeof(unsigned int)); + if (nmoved == 0) + return false; + else if (nmoved != sizeof(unsigned int)) + elog(ERROR, "unexpected tape position"); + tuplen = getlen(state->result_tape, false); + + /* + * Back up to get ending length word of tuple before it. + */ + nmoved = LogicalTapeBackspace(state->result_tape, + tuplen + 2 * sizeof(unsigned int)); + if (nmoved == tuplen + sizeof(unsigned int)) + { + /* + * We backed up over the previous tuple, but there was no + * ending length word before it. That means that the prev + * tuple is the first tuple in the file. It is now the + * next to read in forward direction (not obviously right, + * but that is what in-memory case does). + */ + return false; + } + else if (nmoved != tuplen + 2 * sizeof(unsigned int)) + elog(ERROR, "bogus tuple length in backward scan"); + } + + tuplen = getlen(state->result_tape, false); + + /* + * Now we have the length of the prior tuple, back up and read it. + * Note: READTUP expects we are positioned after the initial + * length word of the tuple, so back up to that point. + */ + nmoved = LogicalTapeBackspace(state->result_tape, + tuplen); + if (nmoved != tuplen) + elog(ERROR, "bogus tuple length in backward scan"); + READTUP(state, stup, state->result_tape, tuplen); + + /* + * Remember the tuple we return, so that we can recycle its memory + * on next call. (This can be NULL, in the Datum case). + */ + state->lastReturnedTuple = stup->tuple; + + return true; + + case TSS_FINALMERGE: + Assert(forward); + /* We are managing memory ourselves, with the slab allocator. */ + Assert(state->slabAllocatorUsed); + + /* + * The slab slot holding the tuple that we returned in previous + * gettuple call can now be reused. + */ + if (state->lastReturnedTuple) + { + RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); + state->lastReturnedTuple = NULL; + } + + /* + * This code should match the inner loop of mergeonerun(). 
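+	 *
+	 * That is: hand back the tuple currently at the top of the merge heap
+	 * (the next tuple in sorted order across all input runs), then refill
+	 * the heap from the same source tape, or shrink the heap when that run
+	 * turns out to be exhausted.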
+ */ + if (state->memtupcount > 0) + { + int srcTapeIndex = state->memtuples[0].srctape; + LogicalTape *srcTape = state->inputTapes[srcTapeIndex]; + SortTuple newtup; + + *stup = state->memtuples[0]; + + /* + * Remember the tuple we return, so that we can recycle its + * memory on next call. (This can be NULL, in the Datum case). + */ + state->lastReturnedTuple = stup->tuple; + + /* + * Pull next tuple from tape, and replace the returned tuple + * at top of the heap with it. + */ + if (!mergereadnext(state, srcTape, &newtup)) + { + /* + * If no more data, we've reached end of run on this tape. + * Remove the top node from the heap. + */ + tuplesort_heap_delete_top(state); + state->nInputRuns--; + + /* + * Close the tape. It'd go away at the end of the sort + * anyway, but better to release the memory early. + */ + LogicalTapeClose(srcTape); + return true; + } + newtup.srctape = srcTapeIndex; + tuplesort_heap_replace_top(state, &newtup); + return true; + } + return false; + + default: + elog(ERROR, "invalid tuplesort state"); + return false; /* keep compiler quiet */ + } +} + +/* + * Fetch the next tuple in either forward or back direction. + * If successful, put tuple in slot and return true; else, clear the slot + * and return false. + * + * Caller may optionally be passed back abbreviated value (on true return + * value) when abbreviation was used, which can be used to cheaply avoid + * equality checks that might otherwise be required. Caller can safely make a + * determination of "non-equal tuple" based on simple binary inequality. A + * NULL value in leading attribute will set abbreviated value to zeroed + * representation, which caller may rely on in abbreviated inequality check. + * + * If copy is true, the slot receives a tuple that's been copied into the + * caller's memory context, so that it will stay valid regardless of future + * manipulations of the tuplesort's state (up to and including deleting the + * tuplesort). If copy is false, the slot will just receive a pointer to a + * tuple held within the tuplesort, which is more efficient, but only safe for + * callers that are prepared to have any subsequent manipulation of the + * tuplesort's state invalidate slot contents. + */ +bool +tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, + TupleTableSlot *slot, Datum *abbrev) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + SortTuple stup; + + if (!tuplesort_gettuple_common(state, forward, &stup)) + stup.tuple = NULL; + + MemoryContextSwitchTo(oldcontext); + + if (stup.tuple) + { + /* Record abbreviated key for caller */ + if (state->sortKeys->abbrev_converter && abbrev) + *abbrev = stup.datum1; + + if (copy) + stup.tuple = heap_copy_minimal_tuple((MinimalTuple) stup.tuple); + + ExecStoreMinimalTuple((MinimalTuple) stup.tuple, slot, copy); + return true; + } + else + { + ExecClearTuple(slot); + return false; + } +} + +/* + * Fetch the next tuple in either forward or back direction. + * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory + * context, and must not be freed by caller. Caller may not rely on tuple + * remaining valid after any further manipulation of tuplesort. 
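+ *
+ * A rough caller-side sketch (simplified pseudocode; the exact
+ * tuplesort_begin_* routine and its arguments depend on the caller, e.g.
+ * the CLUSTER code path uses tuplesort_begin_cluster):
+ *
+ *		state = tuplesort_begin_cluster(tupdesc, indexRel,
+ *										maintenance_work_mem, NULL,
+ *										TUPLESORT_NONE);
+ *		while (... more input ...)
+ *			tuplesort_putheaptuple(state, tup);
+ *		tuplesort_performsort(state);
+ *		while ((tup = tuplesort_getheaptuple(state, true)) != NULL)
+ *			... use tup before the next fetch; do not pfree it ...
+ *		tuplesort_end(state);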
+ */ +HeapTuple +tuplesort_getheaptuple(Tuplesortstate *state, bool forward) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + SortTuple stup; + + if (!tuplesort_gettuple_common(state, forward, &stup)) + stup.tuple = NULL; + + MemoryContextSwitchTo(oldcontext); + + return stup.tuple; +} + +/* + * Fetch the next index tuple in either forward or back direction. + * Returns NULL if no more tuples. Returned tuple belongs to tuplesort memory + * context, and must not be freed by caller. Caller may not rely on tuple + * remaining valid after any further manipulation of tuplesort. + */ +IndexTuple +tuplesort_getindextuple(Tuplesortstate *state, bool forward) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + SortTuple stup; + + if (!tuplesort_gettuple_common(state, forward, &stup)) + stup.tuple = NULL; + + MemoryContextSwitchTo(oldcontext); + + return (IndexTuple) stup.tuple; +} + +/* + * Fetch the next Datum in either forward or back direction. + * Returns false if no more datums. + * + * If the Datum is pass-by-ref type, the returned value is freshly palloc'd + * in caller's context, and is now owned by the caller (this differs from + * similar routines for other types of tuplesorts). + * + * Caller may optionally be passed back abbreviated value (on true return + * value) when abbreviation was used, which can be used to cheaply avoid + * equality checks that might otherwise be required. Caller can safely make a + * determination of "non-equal tuple" based on simple binary inequality. A + * NULL value will have a zeroed abbreviated value representation, which caller + * may rely on in abbreviated inequality check. + */ +bool +tuplesort_getdatum(Tuplesortstate *state, bool forward, + Datum *val, bool *isNull, Datum *abbrev) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + SortTuple stup; + + if (!tuplesort_gettuple_common(state, forward, &stup)) + { + MemoryContextSwitchTo(oldcontext); + return false; + } + + /* Ensure we copy into caller's memory context */ + MemoryContextSwitchTo(oldcontext); + + /* Record abbreviated key for caller */ + if (state->sortKeys->abbrev_converter && abbrev) + *abbrev = stup.datum1; + + if (stup.isnull1 || !state->tuples) + { + *val = stup.datum1; + *isNull = stup.isnull1; + } + else + { + /* use stup.tuple because stup.datum1 may be an abbreviation */ + *val = datumCopy(PointerGetDatum(stup.tuple), false, state->datumTypeLen); + *isNull = false; + } + + return true; +} + +/* + * Advance over N tuples in either forward or back direction, + * without returning any data. N==0 is a no-op. + * Returns true if successful, false if ran out of tuples. + */ +bool +tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples, bool forward) +{ + MemoryContext oldcontext; + + /* + * We don't actually support backwards skip yet, because no callers need + * it. The API is designed to allow for that later, though. + */ + Assert(forward); + Assert(ntuples >= 0); + Assert(!WORKER(state)); + + switch (state->status) + { + case TSS_SORTEDINMEM: + if (state->memtupcount - state->current >= ntuples) + { + state->current += ntuples; + return true; + } + state->current = state->memtupcount; + state->eof_reached = true; + + /* + * Complain if caller tries to retrieve more tuples than + * originally asked for in a bounded sort. This is because + * returning EOF here might be the wrong thing. 
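+	 *
+	 * (In a bounded sort, any tuples beyond the bound were discarded while
+	 * sorting, so quietly returning EOF here would be indistinguishable
+	 * from a genuine end of data.)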
+ */ + if (state->bounded && state->current >= state->bound) + elog(ERROR, "retrieved too many tuples in a bounded sort"); + + return false; + + case TSS_SORTEDONTAPE: + case TSS_FINALMERGE: + + /* + * We could probably optimize these cases better, but for now it's + * not worth the trouble. + */ + oldcontext = MemoryContextSwitchTo(state->sortcontext); + while (ntuples-- > 0) + { + SortTuple stup; + + if (!tuplesort_gettuple_common(state, forward, &stup)) + { + MemoryContextSwitchTo(oldcontext); + return false; + } + CHECK_FOR_INTERRUPTS(); + } + MemoryContextSwitchTo(oldcontext); + return true; + + default: + elog(ERROR, "invalid tuplesort state"); + return false; /* keep compiler quiet */ + } +} + +/* + * tuplesort_merge_order - report merge order we'll use for given memory + * (note: "merge order" just means the number of input tapes in the merge). + * + * This is exported for use by the planner. allowedMem is in bytes. + */ +int +tuplesort_merge_order(int64 allowedMem) +{ + int mOrder; + + /*---------- + * In the merge phase, we need buffer space for each input and output tape. + * Each pass in the balanced merge algorithm reads from M input tapes, and + * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes + * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per + * input tape. + * + * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) + + * N * TAPE_BUFFER_OVERHEAD + * + * Except for the last and next-to-last merge passes, where there can be + * fewer tapes left to process, M = N. We choose M so that we have the + * desired amount of memory available for the input buffers + * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory + * available for the tape buffers (allowedMem). + * + * Note: you might be thinking we need to account for the memtuples[] + * array in this calculation, but we effectively treat that as part of the + * MERGE_BUFFER_SIZE workspace. + *---------- + */ + mOrder = allowedMem / + (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE); + + /* + * Even in minimum memory, use at least a MINORDER merge. On the other + * hand, even when we have lots of memory, do not use more than a MAXORDER + * merge. Tapes are pretty cheap, but they're not entirely free. Each + * additional tape reduces the amount of memory available to build runs, + * which in turn can cause the same sort to need more runs, which makes + * merging slower even if it can still be done in a single pass. Also, + * high order merges are quite slow due to CPU cache effects; it can be + * faster to pay the I/O cost of a multi-pass merge than to perform a + * single merge pass across many hundreds of tapes. + */ + mOrder = Max(mOrder, MINORDER); + mOrder = Min(mOrder, MAXORDER); + + return mOrder; +} + +/* + * Helper function to calculate how much memory to allocate for the read buffer + * of each input tape in a merge pass. + * + * 'avail_mem' is the amount of memory available for the buffers of all the + * tapes, both input and output. + * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs. + * 'maxOutputTapes' is the max. number of output tapes we should produce. + */ +static int64 +merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns, + int maxOutputTapes) +{ + int nOutputRuns; + int nOutputTapes; + + /* + * How many output tapes will we produce in this pass? + * + * This is nInputRuns / nInputTapes, rounded up. 
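+	 *
+	 * For example (hypothetical numbers): merging nInputRuns = 7 runs that
+	 * are spread over nInputTapes = 3 tapes yields nOutputRuns = 3, so at
+	 * most three output tapes are needed for this pass, subject to the
+	 * maxOutputTapes cap applied just below.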
+ */ + nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes; + + nOutputTapes = Min(nOutputRuns, maxOutputTapes); + + /* + * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All + * remaining memory is divided evenly between the input tapes. + * + * This also follows from the formula in tuplesort_merge_order, but here + * we derive the input buffer size from the amount of memory available, + * and M and N. + */ + return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0); +} + +/* + * inittapes - initialize for tape sorting. + * + * This is called only if we have found we won't sort in memory. + */ +static void +inittapes(Tuplesortstate *state, bool mergeruns) +{ + Assert(!LEADER(state)); + + if (mergeruns) + { + /* Compute number of input tapes to use when merging */ + state->maxTapes = tuplesort_merge_order(state->allowedMem); + } + else + { + /* Workers can sometimes produce single run, output without merge */ + Assert(WORKER(state)); + state->maxTapes = MINORDER; + } + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "worker %d switching to external sort with %d tapes: %s", + state->worker, state->maxTapes, pg_rusage_show(&state->ru_start)); +#endif + + /* Create the tape set */ + inittapestate(state, state->maxTapes); + state->tapeset = + LogicalTapeSetCreate(false, + state->shared ? &state->shared->fileset : NULL, + state->worker); + + state->currentRun = 0; + + /* + * Initialize logical tape arrays. + */ + state->inputTapes = NULL; + state->nInputTapes = 0; + state->nInputRuns = 0; + + state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *)); + state->nOutputTapes = 0; + state->nOutputRuns = 0; + + state->status = TSS_BUILDRUNS; + + selectnewtape(state); +} + +/* + * inittapestate - initialize generic tape management state + */ +static void +inittapestate(Tuplesortstate *state, int maxTapes) +{ + int64 tapeSpace; + + /* + * Decrease availMem to reflect the space needed for tape buffers; but + * don't decrease it to the point that we have no room for tuples. (That + * case is only likely to occur if sorting pass-by-value Datums; in all + * other scenarios the memtuples[] array is unlikely to occupy more than + * half of allowedMem. In the pass-by-value case it's not important to + * account for tuple space, so we don't care if LACKMEM becomes + * inaccurate.) + */ + tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD; + + if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem) + USEMEM(state, tapeSpace); + + /* + * Make sure that the temp file(s) underlying the tape set are created in + * suitable temp tablespaces. For parallel sorts, this should have been + * called already, but it doesn't matter if it is called a second time. + */ + PrepareTempTablespaces(); +} + +/* + * selectnewtape -- select next tape to output to. + * + * This is called after finishing a run when we know another run + * must be started. This is used both when building the initial + * runs, and during merge passes. + */ +static void +selectnewtape(Tuplesortstate *state) +{ + /* + * At the beginning of each merge pass, nOutputTapes and nOutputRuns are + * both zero. On each call, we create a new output tape to hold the next + * run, until maxTapes is reached. After that, we assign new runs to the + * existing tapes in a round robin fashion. 
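+	 *
+	 * For instance (hypothetical numbers), with maxTapes = 4 the first four
+	 * runs each get a freshly created tape, and runs 5, 6, 7, 8 are then
+	 * appended to the first, second, third and fourth tape again, and so on.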
+ */ + if (state->nOutputTapes < state->maxTapes) + { + /* Create a new tape to hold the next run */ + Assert(state->outputTapes[state->nOutputRuns] == NULL); + Assert(state->nOutputRuns == state->nOutputTapes); + state->destTape = LogicalTapeCreate(state->tapeset); + state->outputTapes[state->nOutputTapes] = state->destTape; + state->nOutputTapes++; + state->nOutputRuns++; + } + else + { + /* + * We have reached the max number of tapes. Append to an existing + * tape. + */ + state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes]; + state->nOutputRuns++; + } +} + +/* + * Initialize the slab allocation arena, for the given number of slots. + */ +static void +init_slab_allocator(Tuplesortstate *state, int numSlots) +{ + if (numSlots > 0) + { + char *p; + int i; + + state->slabMemoryBegin = palloc(numSlots * SLAB_SLOT_SIZE); + state->slabMemoryEnd = state->slabMemoryBegin + + numSlots * SLAB_SLOT_SIZE; + state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin; + USEMEM(state, numSlots * SLAB_SLOT_SIZE); + + p = state->slabMemoryBegin; + for (i = 0; i < numSlots - 1; i++) + { + ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE); + p += SLAB_SLOT_SIZE; + } + ((SlabSlot *) p)->nextfree = NULL; + } + else + { + state->slabMemoryBegin = state->slabMemoryEnd = NULL; + state->slabFreeHead = NULL; + } + state->slabAllocatorUsed = true; +} + +/* + * mergeruns -- merge all the completed initial runs. + * + * This implements the Balanced k-Way Merge Algorithm. All input data has + * already been written to initial runs on tape (see dumptuples). + */ +static void +mergeruns(Tuplesortstate *state) +{ + int tapenum; + + Assert(state->status == TSS_BUILDRUNS); + Assert(state->memtupcount == 0); + + if (state->sortKeys != NULL && state->sortKeys->abbrev_converter != NULL) + { + /* + * If there are multiple runs to be merged, when we go to read back + * tuples from disk, abbreviated keys will not have been stored, and + * we don't care to regenerate them. Disable abbreviation from this + * point on. + */ + state->sortKeys->abbrev_converter = NULL; + state->sortKeys->comparator = state->sortKeys->abbrev_full_comparator; + + /* Not strictly necessary, but be tidy */ + state->sortKeys->abbrev_abort = NULL; + state->sortKeys->abbrev_full_comparator = NULL; + } + + /* + * Reset tuple memory. We've freed all the tuples that we previously + * allocated. We will use the slab allocator from now on. + */ + MemoryContextResetOnly(state->tuplecontext); + + /* + * We no longer need a large memtuples array. (We will allocate a smaller + * one for the heap later.) + */ + FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); + pfree(state->memtuples); + state->memtuples = NULL; + + /* + * Initialize the slab allocator. We need one slab slot per input tape, + * for the tuples in the heap, plus one to hold the tuple last returned + * from tuplesort_gettuple. (If we're sorting pass-by-val Datums, + * however, we don't need to do allocate anything.) + * + * In a multi-pass merge, we could shrink this allocation for the last + * merge pass, if it has fewer tapes than previous passes, but we don't + * bother. + * + * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism + * to track memory usage of individual tuples. + */ + if (state->tuples) + init_slab_allocator(state, state->nOutputTapes + 1); + else + init_slab_allocator(state, 0); + + /* + * Allocate a new 'memtuples' array, for the heap. It will hold one tuple + * from each input tape. 
+ * + * We could shrink this, too, between passes in a multi-pass merge, but we + * don't bother. (The initial input tapes are still in outputTapes. The + * number of input tapes will not increase between passes.) + */ + state->memtupsize = state->nOutputTapes; + state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext, + state->nOutputTapes * sizeof(SortTuple)); + USEMEM(state, GetMemoryChunkSpace(state->memtuples)); + + /* + * Use all the remaining memory we have available for tape buffers among + * all the input tapes. At the beginning of each merge pass, we will + * divide this memory between the input and output tapes in the pass. + */ + state->tape_buffer_mem = state->availMem; + USEMEM(state, state->tape_buffer_mem); +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "worker %d using %zu KB of memory for tape buffers", + state->worker, state->tape_buffer_mem / 1024); +#endif + + for (;;) + { + /* + * On the first iteration, or if we have read all the runs from the + * input tapes in a multi-pass merge, it's time to start a new pass. + * Rewind all the output tapes, and make them inputs for the next + * pass. + */ + if (state->nInputRuns == 0) + { + int64 input_buffer_size; + + /* Close the old, emptied, input tapes */ + if (state->nInputTapes > 0) + { + for (tapenum = 0; tapenum < state->nInputTapes; tapenum++) + LogicalTapeClose(state->inputTapes[tapenum]); + pfree(state->inputTapes); + } + + /* Previous pass's outputs become next pass's inputs. */ + state->inputTapes = state->outputTapes; + state->nInputTapes = state->nOutputTapes; + state->nInputRuns = state->nOutputRuns; + + /* + * Reset output tape variables. The actual LogicalTapes will be + * created as needed, here we only allocate the array to hold + * them. + */ + state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *)); + state->nOutputTapes = 0; + state->nOutputRuns = 0; + + /* + * Redistribute the memory allocated for tape buffers, among the + * new input and output tapes. + */ + input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem, + state->nInputTapes, + state->nInputRuns, + state->maxTapes); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s", + state->nInputRuns, state->nInputTapes, input_buffer_size / 1024, + pg_rusage_show(&state->ru_start)); +#endif + + /* Prepare the new input tapes for merge pass. */ + for (tapenum = 0; tapenum < state->nInputTapes; tapenum++) + LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size); + + /* + * If there's just one run left on each input tape, then only one + * merge pass remains. If we don't have to produce a materialized + * sorted tape, we can stop at this point and do the final merge + * on-the-fly. + */ + if ((state->sortopt & TUPLESORT_RANDOMACCESS) == 0 + && state->nInputRuns <= state->nInputTapes + && !WORKER(state)) + { + /* Tell logtape.c we won't be writing anymore */ + LogicalTapeSetForgetFreeSpace(state->tapeset); + /* Initialize for the final merge pass */ + beginmerge(state); + state->status = TSS_FINALMERGE; + return; + } + } + + /* Select an output tape */ + selectnewtape(state); + + /* Merge one run from each input tape. */ + mergeonerun(state); + + /* + * If the input tapes are empty, and we output only one output run, + * we're done. The current output tape contains the final result. + */ + if (state->nInputRuns == 0 && state->nOutputRuns <= 1) + break; + } + + /* + * Done. 
The result is on a single run on a single tape. + */ + state->result_tape = state->outputTapes[0]; + if (!WORKER(state)) + LogicalTapeFreeze(state->result_tape, NULL); + else + worker_freeze_result_tape(state); + state->status = TSS_SORTEDONTAPE; + + /* Close all the now-empty input tapes, to release their read buffers. */ + for (tapenum = 0; tapenum < state->nInputTapes; tapenum++) + LogicalTapeClose(state->inputTapes[tapenum]); +} + +/* + * Merge one run from each input tape. + */ +static void +mergeonerun(Tuplesortstate *state) +{ + int srcTapeIndex; + LogicalTape *srcTape; + + /* + * Start the merge by loading one tuple from each active source tape into + * the heap. + */ + beginmerge(state); + + /* + * Execute merge by repeatedly extracting lowest tuple in heap, writing it + * out, and replacing it with next tuple from same tape (if there is + * another one). + */ + while (state->memtupcount > 0) + { + SortTuple stup; + + /* write the tuple to destTape */ + srcTapeIndex = state->memtuples[0].srctape; + srcTape = state->inputTapes[srcTapeIndex]; + WRITETUP(state, state->destTape, &state->memtuples[0]); + + /* recycle the slot of the tuple we just wrote out, for the next read */ + if (state->memtuples[0].tuple) + RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple); + + /* + * pull next tuple from the tape, and replace the written-out tuple in + * the heap with it. + */ + if (mergereadnext(state, srcTape, &stup)) + { + stup.srctape = srcTapeIndex; + tuplesort_heap_replace_top(state, &stup); + } + else + { + tuplesort_heap_delete_top(state); + state->nInputRuns--; + } + } + + /* + * When the heap empties, we're done. Write an end-of-run marker on the + * output tape. + */ + markrunend(state->destTape); +} + +/* + * beginmerge - initialize for a merge pass + * + * Fill the merge heap with the first tuple from each input tape. + */ +static void +beginmerge(Tuplesortstate *state) +{ + int activeTapes; + int srcTapeIndex; + + /* Heap should be empty here */ + Assert(state->memtupcount == 0); + + activeTapes = Min(state->nInputTapes, state->nInputRuns); + + for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++) + { + SortTuple tup; + + if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup)) + { + tup.srctape = srcTapeIndex; + tuplesort_heap_insert(state, &tup); + } + } +} + +/* + * mergereadnext - read next tuple from one merge input tape + * + * Returns false on EOF. + */ +static bool +mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup) +{ + unsigned int tuplen; + + /* read next tuple, if any */ + if ((tuplen = getlen(srcTape, true)) == 0) + return false; + READTUP(state, stup, srcTape, tuplen); + + return true; +} + +/* + * dumptuples - remove tuples from memtuples and write initial run to tape + * + * When alltuples = true, dump everything currently in memory. (This case is + * only used at end of input data.) + */ +static void +dumptuples(Tuplesortstate *state, bool alltuples) +{ + int memtupwrite; + int i; + + /* + * Nothing to do if we still fit in available memory and have array slots, + * unless this is the final call during initial run generation. + */ + if (state->memtupcount < state->memtupsize && !LACKMEM(state) && + !alltuples) + return; + + /* + * Final call might require no sorting, in rare cases where we just so + * happen to have previously LACKMEM()'d at the point where exactly all + * remaining tuples are loaded into memory, just before input was + * exhausted. 
In general, short final runs are quite possible, but avoid + * creating a completely empty run. In a worker, though, we must produce + * at least one tape, even if it's empty. + */ + if (state->memtupcount == 0 && state->currentRun > 0) + return; + + Assert(state->status == TSS_BUILDRUNS); + + /* + * It seems unlikely that this limit will ever be exceeded, but take no + * chances + */ + if (state->currentRun == INT_MAX) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("cannot have more than %d runs for an external sort", + INT_MAX))); + + if (state->currentRun > 0) + selectnewtape(state); + + state->currentRun++; + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "worker %d starting quicksort of run %d: %s", + state->worker, state->currentRun, + pg_rusage_show(&state->ru_start)); +#endif + + /* + * Sort all tuples accumulated within the allowed amount of memory for + * this run using quicksort + */ + tuplesort_sort_memtuples(state); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "worker %d finished quicksort of run %d: %s", + state->worker, state->currentRun, + pg_rusage_show(&state->ru_start)); +#endif + + memtupwrite = state->memtupcount; + for (i = 0; i < memtupwrite; i++) + { + WRITETUP(state, state->destTape, &state->memtuples[i]); + state->memtupcount--; + } + + /* + * Reset tuple memory. We've freed all of the tuples that we previously + * allocated. It's important to avoid fragmentation when there is a stark + * change in the sizes of incoming tuples. Fragmentation due to + * AllocSetFree's bucketing by size class might be particularly bad if + * this step wasn't taken. + */ + MemoryContextReset(state->tuplecontext); + + markrunend(state->destTape); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "worker %d finished writing run %d to tape %d: %s", + state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1, + pg_rusage_show(&state->ru_start)); +#endif +} + +/* + * tuplesort_rescan - rewind and replay the scan + */ +void +tuplesort_rescan(Tuplesortstate *state) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + + Assert(state->sortopt & TUPLESORT_RANDOMACCESS); + + switch (state->status) + { + case TSS_SORTEDINMEM: + state->current = 0; + state->eof_reached = false; + state->markpos_offset = 0; + state->markpos_eof = false; + break; + case TSS_SORTEDONTAPE: + LogicalTapeRewindForRead(state->result_tape, 0); + state->eof_reached = false; + state->markpos_block = 0L; + state->markpos_offset = 0; + state->markpos_eof = false; + break; + default: + elog(ERROR, "invalid tuplesort state"); + break; + } + + MemoryContextSwitchTo(oldcontext); +} + +/* + * tuplesort_markpos - saves current position in the merged sort file + */ +void +tuplesort_markpos(Tuplesortstate *state) +{ + MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + + Assert(state->sortopt & TUPLESORT_RANDOMACCESS); + + switch (state->status) + { + case TSS_SORTEDINMEM: + state->markpos_offset = state->current; + state->markpos_eof = state->eof_reached; + break; + case TSS_SORTEDONTAPE: + LogicalTapeTell(state->result_tape, + &state->markpos_block, + &state->markpos_offset); + state->markpos_eof = state->eof_reached; + break; + default: + elog(ERROR, "invalid tuplesort state"); + break; + } + + MemoryContextSwitchTo(oldcontext); +} + +/* + * tuplesort_restorepos - restores current position in merged sort file to + * last saved position + */ +void +tuplesort_restorepos(Tuplesortstate *state) +{ + MemoryContext oldcontext = 
MemoryContextSwitchTo(state->sortcontext); + + Assert(state->sortopt & TUPLESORT_RANDOMACCESS); + + switch (state->status) + { + case TSS_SORTEDINMEM: + state->current = state->markpos_offset; + state->eof_reached = state->markpos_eof; + break; + case TSS_SORTEDONTAPE: + LogicalTapeSeek(state->result_tape, + state->markpos_block, + state->markpos_offset); + state->eof_reached = state->markpos_eof; + break; + default: + elog(ERROR, "invalid tuplesort state"); + break; + } + + MemoryContextSwitchTo(oldcontext); +} + +/* + * tuplesort_get_stats - extract summary statistics + * + * This can be called after tuplesort_performsort() finishes to obtain + * printable summary information about how the sort was performed. + */ +void +tuplesort_get_stats(Tuplesortstate *state, + TuplesortInstrumentation *stats) +{ + /* + * Note: it might seem we should provide both memory and disk usage for a + * disk-based sort. However, the current code doesn't track memory space + * accurately once we have begun to return tuples to the caller (since we + * don't account for pfree's the caller is expected to do), so we cannot + * rely on availMem in a disk sort. This does not seem worth the overhead + * to fix. Is it worth creating an API for the memory context code to + * tell us how much is actually used in sortcontext? + */ + tuplesort_updatemax(state); + + if (state->isMaxSpaceDisk) + stats->spaceType = SORT_SPACE_TYPE_DISK; + else + stats->spaceType = SORT_SPACE_TYPE_MEMORY; + stats->spaceUsed = (state->maxSpace + 1023) / 1024; + + switch (state->maxSpaceStatus) + { + case TSS_SORTEDINMEM: + if (state->boundUsed) + stats->sortMethod = SORT_TYPE_TOP_N_HEAPSORT; + else + stats->sortMethod = SORT_TYPE_QUICKSORT; + break; + case TSS_SORTEDONTAPE: + stats->sortMethod = SORT_TYPE_EXTERNAL_SORT; + break; + case TSS_FINALMERGE: + stats->sortMethod = SORT_TYPE_EXTERNAL_MERGE; + break; + default: + stats->sortMethod = SORT_TYPE_STILL_IN_PROGRESS; + break; + } +} + +/* + * Convert TuplesortMethod to a string. + */ +const char * +tuplesort_method_name(TuplesortMethod m) +{ + switch (m) + { + case SORT_TYPE_STILL_IN_PROGRESS: + return "still in progress"; + case SORT_TYPE_TOP_N_HEAPSORT: + return "top-N heapsort"; + case SORT_TYPE_QUICKSORT: + return "quicksort"; + case SORT_TYPE_EXTERNAL_SORT: + return "external sort"; + case SORT_TYPE_EXTERNAL_MERGE: + return "external merge"; + } + + return "unknown"; +} + +/* + * Convert TuplesortSpaceType to a string. + */ +const char * +tuplesort_space_type_name(TuplesortSpaceType t) +{ + Assert(t == SORT_SPACE_TYPE_DISK || t == SORT_SPACE_TYPE_MEMORY); + return t == SORT_SPACE_TYPE_DISK ? "Disk" : "Memory"; +} + + +/* + * Heap manipulation routines, per Knuth's Algorithm 5.2.3H. + */ + +/* + * Convert the existing unordered array of SortTuples to a bounded heap, + * discarding all but the smallest "state->bound" tuples. + * + * When working with a bounded heap, we want to keep the largest entry + * at the root (array entry zero), instead of the smallest as in the normal + * sort case. This allows us to discard the largest entry cheaply. + * Therefore, we temporarily reverse the sort direction. 
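+ *
+ * (Editor's illustrative aside, not part of the upstream file: the same
+ * bounded "top-N" idea in miniature, for plain ints.  Assume the max-heap
+ * heap[0..k-1] is already full with the k smallest values seen so far, so
+ * heap[0] is the largest, i.e. worst, of them:
+ *
+ *    static void
+ *    top_n_accept(int *heap, int k, int newval)
+ *    {
+ *        int    i = 0;
+ *
+ *        if (newval >= heap[0])
+ *            return;            // not among the k smallest; discard it
+ *
+ *        // replace the root with newval and sift it down
+ *        for (;;)
+ *        {
+ *            int    j = 2 * i + 1;
+ *
+ *            if (j >= k)
+ *                break;
+ *            if (j + 1 < k && heap[j + 1] > heap[j])
+ *                j++;           // descend toward the larger child
+ *            if (newval >= heap[j])
+ *                break;
+ *            heap[i] = heap[j];
+ *            i = j;
+ *        }
+ *        heap[i] = newval;
+ *    }
+ *
+ * make_bounded_heap() below applies the same replace-or-discard rule, with
+ * "smaller" defined by the temporarily reversed sort keys.)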
+ */ +static void +make_bounded_heap(Tuplesortstate *state) +{ + int tupcount = state->memtupcount; + int i; + + Assert(state->status == TSS_INITIAL); + Assert(state->bounded); + Assert(tupcount >= state->bound); + Assert(SERIAL(state)); + + /* Reverse sort direction so largest entry will be at root */ + reversedirection(state); + + state->memtupcount = 0; /* make the heap empty */ + for (i = 0; i < tupcount; i++) + { + if (state->memtupcount < state->bound) + { + /* Insert next tuple into heap */ + /* Must copy source tuple to avoid possible overwrite */ + SortTuple stup = state->memtuples[i]; + + tuplesort_heap_insert(state, &stup); + } + else + { + /* + * The heap is full. Replace the largest entry with the new + * tuple, or just discard it, if it's larger than anything already + * in the heap. + */ + if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0) + { + free_sort_tuple(state, &state->memtuples[i]); + CHECK_FOR_INTERRUPTS(); + } + else + tuplesort_heap_replace_top(state, &state->memtuples[i]); + } + } + + Assert(state->memtupcount == state->bound); + state->status = TSS_BOUNDED; +} + +/* + * Convert the bounded heap to a properly-sorted array + */ +static void +sort_bounded_heap(Tuplesortstate *state) +{ + int tupcount = state->memtupcount; + + Assert(state->status == TSS_BOUNDED); + Assert(state->bounded); + Assert(tupcount == state->bound); + Assert(SERIAL(state)); + + /* + * We can unheapify in place because each delete-top call will remove the + * largest entry, which we can promptly store in the newly freed slot at + * the end. Once we're down to a single-entry heap, we're done. + */ + while (state->memtupcount > 1) + { + SortTuple stup = state->memtuples[0]; + + /* this sifts-up the next-largest entry and decreases memtupcount */ + tuplesort_heap_delete_top(state); + state->memtuples[state->memtupcount] = stup; + } + state->memtupcount = tupcount; + + /* + * Reverse sort direction back to the original state. This is not + * actually necessary but seems like a good idea for tidiness. + */ + reversedirection(state); + + state->status = TSS_SORTEDINMEM; + state->boundUsed = true; +} + +/* + * Sort all memtuples using specialized qsort() routines. + * + * Quicksort is used for small in-memory sorts, and external sort runs. + */ +static void +tuplesort_sort_memtuples(Tuplesortstate *state) +{ + Assert(!LEADER(state)); + + if (state->memtupcount > 1) + { + /* + * Do we have the leading column's value or abbreviation in datum1, + * and is there a specialization for its comparator? + */ + if (state->haveDatum1 && state->sortKeys) + { + if (state->sortKeys[0].comparator == ssup_datum_unsigned_cmp) + { + qsort_tuple_unsigned(state->memtuples, + state->memtupcount, + state); + return; + } +#if SIZEOF_DATUM >= 8 + else if (state->sortKeys[0].comparator == ssup_datum_signed_cmp) + { + qsort_tuple_signed(state->memtuples, + state->memtupcount, + state); + return; + } +#endif + else if (state->sortKeys[0].comparator == ssup_datum_int32_cmp) + { + qsort_tuple_int32(state->memtuples, + state->memtupcount, + state); + return; + } + } + + /* Can we use the single-key sort function? */ + if (state->onlyKey != NULL) + { + qsort_ssup(state->memtuples, state->memtupcount, + state->onlyKey); + } + else + { + qsort_tuple(state->memtuples, + state->memtupcount, + state->comparetup, + state); + } + } +} + +/* + * Insert a new tuple into an empty or existing heap, maintaining the + * heap invariant. Caller is responsible for ensuring there's room. 
+ * + * Note: For some callers, tuple points to a memtuples[] entry above the + * end of the heap. This is safe as long as it's not immediately adjacent + * to the end of the heap (ie, in the [memtupcount] array entry) --- if it + * is, it might get overwritten before being moved into the heap! + */ +static void +tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple) +{ + SortTuple *memtuples; + int j; + + memtuples = state->memtuples; + Assert(state->memtupcount < state->memtupsize); + + CHECK_FOR_INTERRUPTS(); + + /* + * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is + * using 1-based array indexes, not 0-based. + */ + j = state->memtupcount++; + while (j > 0) + { + int i = (j - 1) >> 1; + + if (COMPARETUP(state, tuple, &memtuples[i]) >= 0) + break; + memtuples[j] = memtuples[i]; + j = i; + } + memtuples[j] = *tuple; +} + +/* + * Remove the tuple at state->memtuples[0] from the heap. Decrement + * memtupcount, and sift up to maintain the heap invariant. + * + * The caller has already free'd the tuple the top node points to, + * if necessary. + */ +static void +tuplesort_heap_delete_top(Tuplesortstate *state) +{ + SortTuple *memtuples = state->memtuples; + SortTuple *tuple; + + if (--state->memtupcount <= 0) + return; + + /* + * Remove the last tuple in the heap, and re-insert it, by replacing the + * current top node with it. + */ + tuple = &memtuples[state->memtupcount]; + tuplesort_heap_replace_top(state, tuple); +} + +/* + * Replace the tuple at state->memtuples[0] with a new tuple. Sift up to + * maintain the heap invariant. + * + * This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H, + * Heapsort, steps H3-H8). + */ +static void +tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple) +{ + SortTuple *memtuples = state->memtuples; + unsigned int i, + n; + + Assert(state->memtupcount >= 1); + + CHECK_FOR_INTERRUPTS(); + + /* + * state->memtupcount is "int", but we use "unsigned int" for i, j, n. + * This prevents overflow in the "2 * i + 1" calculation, since at the top + * of the loop we must have i < n <= INT_MAX <= UINT_MAX/2. + */ + n = state->memtupcount; + i = 0; /* i is where the "hole" is */ + for (;;) + { + unsigned int j = 2 * i + 1; + + if (j >= n) + break; + if (j + 1 < n && + COMPARETUP(state, &memtuples[j], &memtuples[j + 1]) > 0) + j++; + if (COMPARETUP(state, tuple, &memtuples[j]) <= 0) + break; + memtuples[i] = memtuples[j]; + i = j; + } + memtuples[i] = *tuple; +} + +/* + * Function to reverse the sort direction from its current state + * + * It is not safe to call this when performing hash tuplesorts + */ +static void +reversedirection(Tuplesortstate *state) +{ + SortSupport sortKey = state->sortKeys; + int nkey; + + for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++) + { + sortKey->ssup_reverse = !sortKey->ssup_reverse; + sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first; + } +} + + +/* + * Tape interface routines + */ + +static unsigned int +getlen(LogicalTape *tape, bool eofOK) +{ + unsigned int len; + + if (LogicalTapeRead(tape, + &len, sizeof(len)) != sizeof(len)) + elog(ERROR, "unexpected end of tape"); + if (len == 0 && !eofOK) + elog(ERROR, "unexpected end of data"); + return len; +} + +static void +markrunend(LogicalTape *tape) +{ + unsigned int len = 0; + + LogicalTapeWrite(tape, (void *) &len, sizeof(len)); +} + +/* + * Get memory for tuple from within READTUP() routine. + * + * We use next free slot from the slab allocator, or palloc() if the tuple + * is too large for that. 
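+ *
+ * (Editor's illustrative aside, not part of the upstream file: the slab
+ * free list is the classic "thread the free list through the free slots"
+ * pattern.  A generic standalone sketch, with hypothetical names:
+ *
+ *    #define SLOT_SIZE 1024
+ *
+ *    typedef union slab_slot
+ *    {
+ *        union slab_slot *nextfree;    // valid only while the slot is free
+ *        char        space[SLOT_SIZE];
+ *    } slab_slot;
+ *
+ *    static void *
+ *    slab_alloc(slab_slot **freehead, size_t len)
+ *    {
+ *        slab_slot  *slot;
+ *
+ *        if (len > SLOT_SIZE || *freehead == NULL)
+ *            return malloc(len);        // oversized, or slab exhausted
+ *        slot = *freehead;
+ *        *freehead = slot->nextfree;    // pop the head of the free list
+ *        return slot;
+ *    }
+ *
+ * Freeing is the mirror image: either free() the chunk or push it back onto
+ * the free list, which is essentially what RELEASE_SLAB_SLOT() does.)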
+ */ +static void * +readtup_alloc(Tuplesortstate *state, Size tuplen) +{ + SlabSlot *buf; + + /* + * We pre-allocate enough slots in the slab arena that we should never run + * out. + */ + Assert(state->slabFreeHead); + + if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead) + return MemoryContextAlloc(state->sortcontext, tuplen); + else + { + buf = state->slabFreeHead; + /* Reuse this slot */ + state->slabFreeHead = buf->nextfree; + + return buf; + } +} + + +/* + * Routines specialized for HeapTuple (actually MinimalTuple) case + */ + +static int +comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) +{ + SortSupport sortKey = state->sortKeys; + HeapTupleData ltup; + HeapTupleData rtup; + TupleDesc tupDesc; + int nkey; + int32 compare; + AttrNumber attno; + Datum datum1, + datum2; + bool isnull1, + isnull2; + + + /* Compare the leading sort key */ + compare = ApplySortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + sortKey); + if (compare != 0) + return compare; + + /* Compare additional sort keys */ + ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET; + ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET); + rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET; + rtup.t_data = (HeapTupleHeader) ((char *) b->tuple - MINIMAL_TUPLE_OFFSET); + tupDesc = state->tupDesc; + + if (sortKey->abbrev_converter) + { + attno = sortKey->ssup_attno; + + datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1); + datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2); + + compare = ApplySortAbbrevFullComparator(datum1, isnull1, + datum2, isnull2, + sortKey); + if (compare != 0) + return compare; + } + + sortKey++; + for (nkey = 1; nkey < state->nKeys; nkey++, sortKey++) + { + attno = sortKey->ssup_attno; + + datum1 = heap_getattr(&ltup, attno, tupDesc, &isnull1); + datum2 = heap_getattr(&rtup, attno, tupDesc, &isnull2); + + compare = ApplySortComparator(datum1, isnull1, + datum2, isnull2, + sortKey); + if (compare != 0) + return compare; + } + + return 0; +} + +static void +copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup) +{ + /* + * We expect the passed "tup" to be a TupleTableSlot, and form a + * MinimalTuple using the exported interface for that. + */ + TupleTableSlot *slot = (TupleTableSlot *) tup; + Datum original; + MinimalTuple tuple; + HeapTupleData htup; + MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); + + /* copy the tuple into sort storage */ + tuple = ExecCopySlotMinimalTuple(slot); + stup->tuple = (void *) tuple; + USEMEM(state, GetMemoryChunkSpace(tuple)); + /* set up first-column key value */ + htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; + htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET); + original = heap_getattr(&htup, + state->sortKeys[0].ssup_attno, + state->tupDesc, + &stup->isnull1); + + MemoryContextSwitchTo(oldcontext); + + if (!state->sortKeys->abbrev_converter || stup->isnull1) + { + /* + * Store ordinary Datum representation, or NULL value. If there is a + * converter it won't expect NULL values, and cost model is not + * required to account for NULL, so in that case we avoid calling + * converter and just set datum1 to zeroed representation (to be + * consistent, and to support cheap inequality tests for NULL + * abbreviated keys). 
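+ *
+ * (Editor's illustrative aside, not part of the upstream file: for
+ * intuition, an abbreviated key is just a cheap-to-compare proxy for the
+ * full value.  A toy converter for NUL-terminated strings could pack the
+ * leading bytes, most significant first, so that unsigned comparison of
+ * the results agrees with memcmp() on that prefix:
+ *
+ *    static uint64
+ *    toy_abbrev(const char *s)
+ *    {
+ *        uint64    key = 0;
+ *
+ *        for (int i = 0; i < (int) sizeof(key) && s[i] != '\0'; i++)
+ *            key |= (uint64) (unsigned char) s[i] << (8 * (sizeof(key) - 1 - i));
+ *        return key;
+ *    }
+ *
+ * Equal abbreviations prove nothing, which is why the full comparator is
+ * still consulted on ties, as in comparetup_heap() above.)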
+ */ + stup->datum1 = original; + } + else if (!consider_abort_common(state)) + { + /* Store abbreviated key representation */ + stup->datum1 = state->sortKeys->abbrev_converter(original, + state->sortKeys); + } + else + { + /* Abort abbreviation */ + int i; + + stup->datum1 = original; + + /* + * Set state to be consistent with never trying abbreviation. + * + * Alter datum1 representation in already-copied tuples, so as to + * ensure a consistent representation (current tuple was just + * handled). It does not matter if some dumped tuples are already + * sorted on tape, since serialized tuples lack abbreviated keys + * (TSS_BUILDRUNS state prevents control reaching here in any case). + */ + for (i = 0; i < state->memtupcount; i++) + { + SortTuple *mtup = &state->memtuples[i]; + + htup.t_len = ((MinimalTuple) mtup->tuple)->t_len + + MINIMAL_TUPLE_OFFSET; + htup.t_data = (HeapTupleHeader) ((char *) mtup->tuple - + MINIMAL_TUPLE_OFFSET); + + mtup->datum1 = heap_getattr(&htup, + state->sortKeys[0].ssup_attno, + state->tupDesc, + &mtup->isnull1); + } + } +} + +static void +writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) +{ + MinimalTuple tuple = (MinimalTuple) stup->tuple; + + /* the part of the MinimalTuple we'll write: */ + char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; + unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET; + + /* total on-disk footprint: */ + unsigned int tuplen = tupbodylen + sizeof(int); + + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) tupbody, tupbodylen); + if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length + * word? */ + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); + + if (!state->slabAllocatorUsed) + { + FREEMEM(state, GetMemoryChunkSpace(tuple)); + heap_free_minimal_tuple(tuple); + } +} + +static void +readtup_heap(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len) +{ + unsigned int tupbodylen = len - sizeof(int); + unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET; + MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen); + char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; + HeapTupleData htup; + + /* read in the tuple proper */ + tuple->t_len = tuplen; + LogicalTapeReadExact(tape, tupbody, tupbodylen); + if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length + * word? 
*/ + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); + stup->tuple = (void *) tuple; + /* set up first-column key value */ + htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; + htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET); + stup->datum1 = heap_getattr(&htup, + state->sortKeys[0].ssup_attno, + state->tupDesc, + &stup->isnull1); +} + +/* + * Routines specialized for the CLUSTER case (HeapTuple data, with + * comparisons per a btree index definition) + */ + +static int +comparetup_cluster(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state) +{ + SortSupport sortKey = state->sortKeys; + HeapTuple ltup; + HeapTuple rtup; + TupleDesc tupDesc; + int nkey; + int32 compare; + Datum datum1, + datum2; + bool isnull1, + isnull2; + + /* Be prepared to compare additional sort keys */ + ltup = (HeapTuple) a->tuple; + rtup = (HeapTuple) b->tuple; + tupDesc = state->tupDesc; + + /* Compare the leading sort key, if it's simple */ + if (state->haveDatum1) + { + compare = ApplySortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + sortKey); + if (compare != 0) + return compare; + + if (sortKey->abbrev_converter) + { + AttrNumber leading = state->indexInfo->ii_IndexAttrNumbers[0]; + + datum1 = heap_getattr(ltup, leading, tupDesc, &isnull1); + datum2 = heap_getattr(rtup, leading, tupDesc, &isnull2); + + compare = ApplySortAbbrevFullComparator(datum1, isnull1, + datum2, isnull2, + sortKey); + } + if (compare != 0 || state->nKeys == 1) + return compare; + /* Compare additional columns the hard way */ + sortKey++; + nkey = 1; + } + else + { + /* Must compare all keys the hard way */ + nkey = 0; + } + + if (state->indexInfo->ii_Expressions == NULL) + { + /* If not expression index, just compare the proper heap attrs */ + + for (; nkey < state->nKeys; nkey++, sortKey++) + { + AttrNumber attno = state->indexInfo->ii_IndexAttrNumbers[nkey]; + + datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1); + datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2); + + compare = ApplySortComparator(datum1, isnull1, + datum2, isnull2, + sortKey); + if (compare != 0) + return compare; + } + } + else + { + /* + * In the expression index case, compute the whole index tuple and + * then compare values. It would perhaps be faster to compute only as + * many columns as we need to compare, but that would require + * duplicating all the logic in FormIndexDatum. 
+ */ + Datum l_index_values[INDEX_MAX_KEYS]; + bool l_index_isnull[INDEX_MAX_KEYS]; + Datum r_index_values[INDEX_MAX_KEYS]; + bool r_index_isnull[INDEX_MAX_KEYS]; + TupleTableSlot *ecxt_scantuple; + + /* Reset context each time to prevent memory leakage */ + ResetPerTupleExprContext(state->estate); + + ecxt_scantuple = GetPerTupleExprContext(state->estate)->ecxt_scantuple; + + ExecStoreHeapTuple(ltup, ecxt_scantuple, false); + FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, + l_index_values, l_index_isnull); + + ExecStoreHeapTuple(rtup, ecxt_scantuple, false); + FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate, + r_index_values, r_index_isnull); + + for (; nkey < state->nKeys; nkey++, sortKey++) + { + compare = ApplySortComparator(l_index_values[nkey], + l_index_isnull[nkey], + r_index_values[nkey], + r_index_isnull[nkey], + sortKey); + if (compare != 0) + return compare; + } + } + + return 0; +} + +static void +copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup) +{ + HeapTuple tuple = (HeapTuple) tup; + Datum original; + MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); + + /* copy the tuple into sort storage */ + tuple = heap_copytuple(tuple); + stup->tuple = (void *) tuple; + USEMEM(state, GetMemoryChunkSpace(tuple)); + + MemoryContextSwitchTo(oldcontext); + + /* + * set up first-column key value, and potentially abbreviate, if it's a + * simple column + */ + if (!state->haveDatum1) + return; + + original = heap_getattr(tuple, + state->indexInfo->ii_IndexAttrNumbers[0], + state->tupDesc, + &stup->isnull1); + + if (!state->sortKeys->abbrev_converter || stup->isnull1) + { + /* + * Store ordinary Datum representation, or NULL value. If there is a + * converter it won't expect NULL values, and cost model is not + * required to account for NULL, so in that case we avoid calling + * converter and just set datum1 to zeroed representation (to be + * consistent, and to support cheap inequality tests for NULL + * abbreviated keys). + */ + stup->datum1 = original; + } + else if (!consider_abort_common(state)) + { + /* Store abbreviated key representation */ + stup->datum1 = state->sortKeys->abbrev_converter(original, + state->sortKeys); + } + else + { + /* Abort abbreviation */ + int i; + + stup->datum1 = original; + + /* + * Set state to be consistent with never trying abbreviation. + * + * Alter datum1 representation in already-copied tuples, so as to + * ensure a consistent representation (current tuple was just + * handled). It does not matter if some dumped tuples are already + * sorted on tape, since serialized tuples lack abbreviated keys + * (TSS_BUILDRUNS state prevents control reaching here in any case). + */ + for (i = 0; i < state->memtupcount; i++) + { + SortTuple *mtup = &state->memtuples[i]; + + tuple = (HeapTuple) mtup->tuple; + mtup->datum1 = heap_getattr(tuple, + state->indexInfo->ii_IndexAttrNumbers[0], + state->tupDesc, + &mtup->isnull1); + } + } +} + +static void +writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) +{ + HeapTuple tuple = (HeapTuple) stup->tuple; + unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int); + + /* We need to store t_self, but not other fields of HeapTupleData */ + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData)); + LogicalTapeWrite(tape, tuple->t_data, tuple->t_len); + if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length + * word? 
*/ + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); + + if (!state->slabAllocatorUsed) + { + FREEMEM(state, GetMemoryChunkSpace(tuple)); + heap_freetuple(tuple); + } +} + +static void +readtup_cluster(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int tuplen) +{ + unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int); + HeapTuple tuple = (HeapTuple) readtup_alloc(state, + t_len + HEAPTUPLESIZE); + + /* Reconstruct the HeapTupleData header */ + tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE); + tuple->t_len = t_len; + LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData)); + /* We don't currently bother to reconstruct t_tableOid */ + tuple->t_tableOid = InvalidOid; + /* Read in the tuple body */ + LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len); + if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length + * word? */ + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); + stup->tuple = (void *) tuple; + /* set up first-column key value, if it's a simple column */ + if (state->haveDatum1) + stup->datum1 = heap_getattr(tuple, + state->indexInfo->ii_IndexAttrNumbers[0], + state->tupDesc, + &stup->isnull1); +} + +/* + * Routines specialized for IndexTuple case + * + * The btree and hash cases require separate comparison functions, but the + * IndexTuple representation is the same so the copy/write/read support + * functions can be shared. + */ + +static int +comparetup_index_btree(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state) +{ + /* + * This is similar to comparetup_heap(), but expects index tuples. There + * is also special handling for enforcing uniqueness, and special + * treatment for equal keys at the end. + */ + SortSupport sortKey = state->sortKeys; + IndexTuple tuple1; + IndexTuple tuple2; + int keysz; + TupleDesc tupDes; + bool equal_hasnull = false; + int nkey; + int32 compare; + Datum datum1, + datum2; + bool isnull1, + isnull2; + + + /* Compare the leading sort key */ + compare = ApplySortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + sortKey); + if (compare != 0) + return compare; + + /* Compare additional sort keys */ + tuple1 = (IndexTuple) a->tuple; + tuple2 = (IndexTuple) b->tuple; + keysz = state->nKeys; + tupDes = RelationGetDescr(state->indexRel); + + if (sortKey->abbrev_converter) + { + datum1 = index_getattr(tuple1, 1, tupDes, &isnull1); + datum2 = index_getattr(tuple2, 1, tupDes, &isnull2); + + compare = ApplySortAbbrevFullComparator(datum1, isnull1, + datum2, isnull2, + sortKey); + if (compare != 0) + return compare; + } + + /* they are equal, so we only need to examine one null flag */ + if (a->isnull1) + equal_hasnull = true; + + sortKey++; + for (nkey = 2; nkey <= keysz; nkey++, sortKey++) + { + datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1); + datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2); + + compare = ApplySortComparator(datum1, isnull1, + datum2, isnull2, + sortKey); + if (compare != 0) + return compare; /* done when we find unequal attributes */ + + /* they are equal, so we only need to examine one null flag */ + if (isnull1) + equal_hasnull = true; + } + + /* + * If btree has asked us to enforce uniqueness, complain if two equal + * tuples are detected (unless there was at least one NULL field and NULLS + * NOT DISTINCT was not set). 
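+ *
+ * (Editor's note, as a concrete example: under the default NULLS DISTINCT
+ * behavior, two index entries such as (1, NULL) are not treated as
+ * duplicates and do not reach the error below, whereas with a NULLS NOT
+ * DISTINCT unique index they do.)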
+ * + * It is sufficient to make the test here, because if two tuples are equal + * they *must* get compared at some stage of the sort --- otherwise the + * sort algorithm wouldn't have checked whether one must appear before the + * other. + */ + if (state->enforceUnique && !(!state->uniqueNullsNotDistinct && equal_hasnull)) + { + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + char *key_desc; + + /* + * Some rather brain-dead implementations of qsort (such as the one in + * QNX 4) will sometimes call the comparison routine to compare a + * value to itself, but we always use our own implementation, which + * does not. + */ + Assert(tuple1 != tuple2); + + index_deform_tuple(tuple1, tupDes, values, isnull); + + key_desc = BuildIndexValueDescription(state->indexRel, values, isnull); + + ereport(ERROR, + (errcode(ERRCODE_UNIQUE_VIOLATION), + errmsg("could not create unique index \"%s\"", + RelationGetRelationName(state->indexRel)), + key_desc ? errdetail("Key %s is duplicated.", key_desc) : + errdetail("Duplicate keys exist."), + errtableconstraint(state->heapRel, + RelationGetRelationName(state->indexRel)))); + } + + /* + * If key values are equal, we sort on ItemPointer. This is required for + * btree indexes, since heap TID is treated as an implicit last key + * attribute in order to ensure that all keys in the index are physically + * unique. + */ + { + BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid); + BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid); + + if (blk1 != blk2) + return (blk1 < blk2) ? -1 : 1; + } + { + OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid); + OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid); + + if (pos1 != pos2) + return (pos1 < pos2) ? -1 : 1; + } + + /* ItemPointer values should never be equal */ + Assert(false); + + return 0; +} + +static int +comparetup_index_hash(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state) +{ + Bucket bucket1; + Bucket bucket2; + IndexTuple tuple1; + IndexTuple tuple2; + + /* + * Fetch hash keys and mask off bits we don't want to sort by. We know + * that the first column of the index tuple is the hash key. + */ + Assert(!a->isnull1); + bucket1 = _hash_hashkey2bucket(DatumGetUInt32(a->datum1), + state->max_buckets, state->high_mask, + state->low_mask); + Assert(!b->isnull1); + bucket2 = _hash_hashkey2bucket(DatumGetUInt32(b->datum1), + state->max_buckets, state->high_mask, + state->low_mask); + if (bucket1 > bucket2) + return 1; + else if (bucket1 < bucket2) + return -1; + + /* + * If hash values are equal, we sort on ItemPointer. This does not affect + * validity of the finished index, but it may be useful to have index + * scans in physical order. + */ + tuple1 = (IndexTuple) a->tuple; + tuple2 = (IndexTuple) b->tuple; + + { + BlockNumber blk1 = ItemPointerGetBlockNumber(&tuple1->t_tid); + BlockNumber blk2 = ItemPointerGetBlockNumber(&tuple2->t_tid); + + if (blk1 != blk2) + return (blk1 < blk2) ? -1 : 1; + } + { + OffsetNumber pos1 = ItemPointerGetOffsetNumber(&tuple1->t_tid); + OffsetNumber pos2 = ItemPointerGetOffsetNumber(&tuple2->t_tid); + + if (pos1 != pos2) + return (pos1 < pos2) ? 
-1 : 1; + } + + /* ItemPointer values should never be equal */ + Assert(false); + + return 0; +} + +static void +copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup) +{ + /* Not currently needed */ + elog(ERROR, "copytup_index() should not be called"); +} + +static void +writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) +{ + IndexTuple tuple = (IndexTuple) stup->tuple; + unsigned int tuplen; + + tuplen = IndexTupleSize(tuple) + sizeof(tuplen); + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple)); + if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length + * word? */ + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); + + if (!state->slabAllocatorUsed) + { + FREEMEM(state, GetMemoryChunkSpace(tuple)); + pfree(tuple); + } +} + +static void +readtup_index(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len) +{ + unsigned int tuplen = len - sizeof(unsigned int); + IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen); + + LogicalTapeReadExact(tape, tuple, tuplen); + if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length + * word? */ + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); + stup->tuple = (void *) tuple; + /* set up first-column key value */ + stup->datum1 = index_getattr(tuple, + 1, + RelationGetDescr(state->indexRel), + &stup->isnull1); +} + +/* + * Routines specialized for DatumTuple case + */ + +static int +comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) +{ + int compare; + + compare = ApplySortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + state->sortKeys); + if (compare != 0) + return compare; + + /* if we have abbreviations, then "tuple" has the original value */ + + if (state->sortKeys->abbrev_converter) + compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1, + PointerGetDatum(b->tuple), b->isnull1, + state->sortKeys); + + return compare; +} + +static void +copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup) +{ + /* Not currently needed */ + elog(ERROR, "copytup_datum() should not be called"); +} + +static void +writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) +{ + void *waddr; + unsigned int tuplen; + unsigned int writtenlen; + + if (stup->isnull1) + { + waddr = NULL; + tuplen = 0; + } + else if (!state->tuples) + { + waddr = &stup->datum1; + tuplen = sizeof(Datum); + } + else + { + waddr = stup->tuple; + tuplen = datumGetSize(PointerGetDatum(stup->tuple), false, state->datumTypeLen); + Assert(tuplen != 0); + } + + writtenlen = tuplen + sizeof(unsigned int); + + LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen)); + LogicalTapeWrite(tape, waddr, tuplen); + if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length + * word? 
*/ + LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen)); + + if (!state->slabAllocatorUsed && stup->tuple) + { + FREEMEM(state, GetMemoryChunkSpace(stup->tuple)); + pfree(stup->tuple); + } +} + +static void +readtup_datum(Tuplesortstate *state, SortTuple *stup, + LogicalTape *tape, unsigned int len) +{ + unsigned int tuplen = len - sizeof(unsigned int); + + if (tuplen == 0) + { + /* it's NULL */ + stup->datum1 = (Datum) 0; + stup->isnull1 = true; + stup->tuple = NULL; + } + else if (!state->tuples) + { + Assert(tuplen == sizeof(Datum)); + LogicalTapeReadExact(tape, &stup->datum1, tuplen); + stup->isnull1 = false; + stup->tuple = NULL; + } + else + { + void *raddr = readtup_alloc(state, tuplen); + + LogicalTapeReadExact(tape, raddr, tuplen); + stup->datum1 = PointerGetDatum(raddr); + stup->isnull1 = false; + stup->tuple = raddr; + } + + if (state->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length + * word? */ + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); +} + +/* + * Parallel sort routines + */ + +/* + * tuplesort_estimate_shared - estimate required shared memory allocation + * + * nWorkers is an estimate of the number of workers (it's the number that + * will be requested). + */ +Size +tuplesort_estimate_shared(int nWorkers) +{ + Size tapesSize; + + Assert(nWorkers > 0); + + /* Make sure that BufFile shared state is MAXALIGN'd */ + tapesSize = mul_size(sizeof(TapeShare), nWorkers); + tapesSize = MAXALIGN(add_size(tapesSize, offsetof(Sharedsort, tapes))); + + return tapesSize; +} + +/* + * tuplesort_initialize_shared - initialize shared tuplesort state + * + * Must be called from leader process before workers are launched, to + * establish state needed up-front for worker tuplesortstates. nWorkers + * should match the argument passed to tuplesort_estimate_shared(). + */ +void +tuplesort_initialize_shared(Sharedsort *shared, int nWorkers, dsm_segment *seg) +{ + int i; + + Assert(nWorkers > 0); + + SpinLockInit(&shared->mutex); + shared->currentWorker = 0; + shared->workersFinished = 0; + SharedFileSetInit(&shared->fileset, seg); + shared->nTapes = nWorkers; + for (i = 0; i < nWorkers; i++) + { + shared->tapes[i].firstblocknumber = 0L; + } +} + +/* + * tuplesort_attach_shared - attach to shared tuplesort state + * + * Must be called by all worker processes. + */ +void +tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg) +{ + /* Attach to SharedFileSet */ + SharedFileSetAttach(&shared->fileset, seg); +} + +/* + * worker_get_identifier - Assign and return ordinal identifier for worker + * + * The order in which these are assigned is not well defined, and should not + * matter; worker numbers across parallel sort participants need only be + * distinct and gapless. logtape.c requires this. + * + * Note that the identifiers assigned from here have no relation to + * ParallelWorkerNumber number, to avoid making any assumption about + * caller's requirements. However, we do follow the ParallelWorkerNumber + * convention of representing a non-worker with worker number -1. This + * includes the leader, as well as serial Tuplesort processes. 
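+ *
+ * (Editor's illustrative aside, not part of the upstream file: sketched
+ * schematically, the shared-state routines above are used in this order;
+ * the real callers, e.g. the parallel CREATE INDEX path, hand the
+ * resulting Sharedsort to tuplesort_begin_*() through a SortCoordinate:
+ *
+ *    size = tuplesort_estimate_shared(nworkers);         // leader sizes the chunk
+ *    shared = ... allocate size bytes in the DSM segment ...
+ *    tuplesort_initialize_shared(shared, nworkers, seg); // leader, before launch
+ *    ... launch workers ...
+ *    tuplesort_attach_shared(shared, seg);               // each worker, at startup
+ *
+ * after which every participant builds its own Tuplesortstate as usual.)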
+ */ +static int +worker_get_identifier(Tuplesortstate *state) +{ + Sharedsort *shared = state->shared; + int worker; + + Assert(WORKER(state)); + + SpinLockAcquire(&shared->mutex); + worker = shared->currentWorker++; + SpinLockRelease(&shared->mutex); + + return worker; +} + +/* + * worker_freeze_result_tape - freeze worker's result tape for leader + * + * This is called by workers just after the result tape has been determined, + * instead of calling LogicalTapeFreeze() directly. They do so because + * workers require a few additional steps over similar serial + * TSS_SORTEDONTAPE external sort cases, which also happen here. The extra + * steps are around freeing now unneeded resources, and representing to + * leader that worker's input run is available for its merge. + * + * There should only be one final output run for each worker, which consists + * of all tuples that were originally input into worker. + */ +static void +worker_freeze_result_tape(Tuplesortstate *state) +{ + Sharedsort *shared = state->shared; + TapeShare output; + + Assert(WORKER(state)); + Assert(state->result_tape != NULL); + Assert(state->memtupcount == 0); + + /* + * Free most remaining memory, in case caller is sensitive to our holding + * on to it. memtuples may not be a tiny merge heap at this point. + */ + pfree(state->memtuples); + /* Be tidy */ + state->memtuples = NULL; + state->memtupsize = 0; + + /* + * Parallel worker requires result tape metadata, which is to be stored in + * shared memory for leader + */ + LogicalTapeFreeze(state->result_tape, &output); + + /* Store properties of output tape, and update finished worker count */ + SpinLockAcquire(&shared->mutex); + shared->tapes[state->worker] = output; + shared->workersFinished++; + SpinLockRelease(&shared->mutex); +} + +/* + * worker_nomergeruns - dump memtuples in worker, without merging + * + * This called as an alternative to mergeruns() with a worker when no + * merging is required. + */ +static void +worker_nomergeruns(Tuplesortstate *state) +{ + Assert(WORKER(state)); + Assert(state->result_tape == NULL); + Assert(state->nOutputRuns == 1); + + state->result_tape = state->destTape; + worker_freeze_result_tape(state); +} + +/* + * leader_takeover_tapes - create tapeset for leader from worker tapes + * + * So far, leader Tuplesortstate has performed no actual sorting. By now, all + * sorting has occurred in workers, all of which must have already returned + * from tuplesort_performsort(). + * + * When this returns, leader process is left in a state that is virtually + * indistinguishable from it having generated runs as a serial external sort + * might have. + */ +static void +leader_takeover_tapes(Tuplesortstate *state) +{ + Sharedsort *shared = state->shared; + int nParticipants = state->nParticipants; + int workersFinished; + int j; + + Assert(LEADER(state)); + Assert(nParticipants >= 1); + + SpinLockAcquire(&shared->mutex); + workersFinished = shared->workersFinished; + SpinLockRelease(&shared->mutex); + + if (nParticipants != workersFinished) + elog(ERROR, "cannot take over tapes before all workers finish"); + + /* + * Create the tapeset from worker tapes, including a leader-owned tape at + * the end. Parallel workers are far more expensive than logical tapes, + * so the number of tapes allocated here should never be excessive. 
+ */ + inittapestate(state, nParticipants); + state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1); + + /* + * Set currentRun to reflect the number of runs we will merge (it's not + * used for anything, this is just pro forma) + */ + state->currentRun = nParticipants; + + /* + * Initialize the state to look the same as after building the initial + * runs. + * + * There will always be exactly 1 run per worker, and exactly one input + * tape per run, because workers always output exactly 1 run, even when + * there were no input tuples for workers to sort. + */ + state->inputTapes = NULL; + state->nInputTapes = 0; + state->nInputRuns = 0; + + state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *)); + state->nOutputTapes = nParticipants; + state->nOutputRuns = nParticipants; + + for (j = 0; j < nParticipants; j++) + { + state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]); + } + + state->status = TSS_BUILDRUNS; +} + +/* + * Convenience routine to free a tuple previously loaded into sort memory + */ +static void +free_sort_tuple(Tuplesortstate *state, SortTuple *stup) +{ + if (stup->tuple) + { + FREEMEM(state, GetMemoryChunkSpace(stup->tuple)); + pfree(stup->tuple); + stup->tuple = NULL; + } +} + +int +ssup_datum_unsigned_cmp(Datum x, Datum y, SortSupport ssup) +{ + if (x < y) + return -1; + else if (x > y) + return 1; + else + return 0; +} + +#if SIZEOF_DATUM >= 8 +int +ssup_datum_signed_cmp(Datum x, Datum y, SortSupport ssup) +{ + int64 xx = DatumGetInt64(x); + int64 yy = DatumGetInt64(y); + + if (xx < yy) + return -1; + else if (xx > yy) + return 1; + else + return 0; +} +#endif + +int +ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup) +{ + int32 xx = DatumGetInt32(x); + int32 yy = DatumGetInt32(y); + + if (xx < yy) + return -1; + else if (xx > yy) + return 1; + else + return 0; +} diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c new file mode 100644 index 0000000..f605ece --- /dev/null +++ b/src/backend/utils/sort/tuplestore.c @@ -0,0 +1,1552 @@ +/*------------------------------------------------------------------------- + * + * tuplestore.c + * Generalized routines for temporary tuple storage. + * + * This module handles temporary storage of tuples for purposes such + * as Materialize nodes, hashjoin batch files, etc. It is essentially + * a dumbed-down version of tuplesort.c; it does no sorting of tuples + * but can only store and regurgitate a sequence of tuples. However, + * because no sort is required, it is allowed to start reading the sequence + * before it has all been written. This is particularly useful for cursors, + * because it allows random access within the already-scanned portion of + * a query without having to process the underlying scan to completion. + * Also, it is possible to support multiple independent read pointers. + * + * A temporary file is used to handle the data if it exceeds the + * space limit specified by the caller. + * + * The (approximate) amount of memory allowed to the tuplestore is specified + * in kilobytes by the caller. We absorb tuples and simply store them in an + * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed + * maxKBytes, we dump all the tuples into a temp file and then read from that + * when needed. + * + * Upon creation, a tuplestore supports a single read pointer, numbered 0. + * Additional read pointers can be created using tuplestore_alloc_read_pointer. 
+ * Mark/restore behavior is supported by copying read pointers. + * + * When the caller requests backward-scan capability, we write the temp file + * in a format that allows either forward or backward scan. Otherwise, only + * forward scan is allowed. A request for backward scan must be made before + * putting any tuples into the tuplestore. Rewind is normally allowed but + * can be turned off via tuplestore_set_eflags; turning off rewind for all + * read pointers enables truncation of the tuplestore at the oldest read point + * for minimal memory usage. (The caller must explicitly call tuplestore_trim + * at appropriate times for truncation to actually happen.) + * + * Note: in TSS_WRITEFILE state, the temp file's seek position is the + * current write position, and the write-position variables in the tuplestore + * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's + * seek position is the active read pointer's position, and that read pointer + * isn't kept up to date. We update the appropriate variables using ftell() + * before switching to the other state or activating a different read pointer. + * + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/utils/sort/tuplestore.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <limits.h> + +#include "access/htup_details.h" +#include "commands/tablespace.h" +#include "executor/executor.h" +#include "miscadmin.h" +#include "storage/buffile.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + + +/* + * Possible states of a Tuplestore object. These denote the states that + * persist between calls of Tuplestore routines. + */ +typedef enum +{ + TSS_INMEM, /* Tuples still fit in memory */ + TSS_WRITEFILE, /* Writing to temp file */ + TSS_READFILE /* Reading from temp file */ +} TupStoreStatus; + +/* + * State for a single read pointer. If we are in state INMEM then all the + * read pointers' "current" fields denote the read positions. In state + * WRITEFILE, the file/offset fields denote the read positions. In state + * READFILE, inactive read pointers have valid file/offset, but the active + * read pointer implicitly has position equal to the temp file's seek position. + * + * Special case: if eof_reached is true, then the pointer's read position is + * implicitly equal to the write position, and current/file/offset aren't + * maintained. This way we need not update all the read pointers each time + * we write. + */ +typedef struct +{ + int eflags; /* capability flags */ + bool eof_reached; /* read has reached EOF */ + int current; /* next array index to read */ + int file; /* temp file# */ + off_t offset; /* byte offset in file */ +} TSReadPointer; + +/* + * Private state of a Tuplestore operation. + */ +struct Tuplestorestate +{ + TupStoreStatus status; /* enumerated value as shown above */ + int eflags; /* capability flags (OR of pointers' flags) */ + bool backward; /* store extra length words in file? */ + bool interXact; /* keep open through transactions? */ + bool truncated; /* tuplestore_trim has removed tuples? 
*/ + int64 availMem; /* remaining memory available, in bytes */ + int64 allowedMem; /* total memory allowed, in bytes */ + int64 tuples; /* number of tuples added */ + BufFile *myfile; /* underlying file, or NULL if none */ + MemoryContext context; /* memory context for holding tuples */ + ResourceOwner resowner; /* resowner for holding temp files */ + + /* + * These function pointers decouple the routines that must know what kind + * of tuple we are handling from the routines that don't need to know it. + * They are set up by the tuplestore_begin_xxx routines. + * + * (Although tuplestore.c currently only supports heap tuples, I've copied + * this part of tuplesort.c so that extension to other kinds of objects + * will be easy if it's ever needed.) + * + * Function to copy a supplied input tuple into palloc'd space. (NB: we + * assume that a single pfree() is enough to release the tuple later, so + * the representation must be "flat" in one palloc chunk.) state->availMem + * must be decreased by the amount of space used. + */ + void *(*copytup) (Tuplestorestate *state, void *tup); + + /* + * Function to write a stored tuple onto tape. The representation of the + * tuple on tape need not be the same as it is in memory; requirements on + * the tape representation are given below. After writing the tuple, + * pfree() it, and increase state->availMem by the amount of memory space + * thereby released. + */ + void (*writetup) (Tuplestorestate *state, void *tup); + + /* + * Function to read a stored tuple from tape back into memory. 'len' is + * the already-read length of the stored tuple. Create and return a + * palloc'd copy, and decrease state->availMem by the amount of memory + * space consumed. + */ + void *(*readtup) (Tuplestorestate *state, unsigned int len); + + /* + * This array holds pointers to tuples in memory if we are in state INMEM. + * In states WRITEFILE and READFILE it's not used. + * + * When memtupdeleted > 0, the first memtupdeleted pointers are already + * released due to a tuplestore_trim() operation, but we haven't expended + * the effort to slide the remaining pointers down. These unused pointers + * are set to NULL to catch any invalid accesses. Note that memtupcount + * includes the deleted pointers. + */ + void **memtuples; /* array of pointers to palloc'd tuples */ + int memtupdeleted; /* the first N slots are currently unused */ + int memtupcount; /* number of tuples currently present */ + int memtupsize; /* allocated length of memtuples array */ + bool growmemtuples; /* memtuples' growth still underway? */ + + /* + * These variables are used to keep track of the current positions. + * + * In state WRITEFILE, the current file seek position is the write point; + * in state READFILE, the write position is remembered in writepos_xxx. + * (The write position is the same as EOF, but since BufFileSeek doesn't + * currently implement SEEK_END, we have to remember it explicitly.) 
+ */ + TSReadPointer *readptrs; /* array of read pointers */ + int activeptr; /* index of the active read pointer */ + int readptrcount; /* number of pointers currently valid */ + int readptrsize; /* allocated length of readptrs array */ + + int writepos_file; /* file# (valid if READFILE state) */ + off_t writepos_offset; /* offset (valid if READFILE state) */ +}; + +#define COPYTUP(state,tup) ((*(state)->copytup) (state, tup)) +#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup)) +#define READTUP(state,len) ((*(state)->readtup) (state, len)) +#define LACKMEM(state) ((state)->availMem < 0) +#define USEMEM(state,amt) ((state)->availMem -= (amt)) +#define FREEMEM(state,amt) ((state)->availMem += (amt)) + +/*-------------------- + * + * NOTES about on-tape representation of tuples: + * + * We require the first "unsigned int" of a stored tuple to be the total size + * on-tape of the tuple, including itself (so it is never zero). + * The remainder of the stored tuple + * may or may not match the in-memory representation of the tuple --- + * any conversion needed is the job of the writetup and readtup routines. + * + * If state->backward is true, then the stored representation of + * the tuple must be followed by another "unsigned int" that is a copy of the + * length --- so the total tape space used is actually sizeof(unsigned int) + * more than the stored length value. This allows read-backwards. When + * state->backward is not set, the write/read routines may omit the extra + * length word. + * + * writetup is expected to write both length words as well as the tuple + * data. When readtup is called, the tape is positioned just after the + * front length word; readtup must read the tuple data and advance past + * the back length word (if present). + * + * The write/read routines can make use of the tuple description data + * stored in the Tuplestorestate record, if needed. They are also expected + * to adjust state->availMem by the amount of memory space (not tape space!) + * released or consumed. There is no error return from either writetup + * or readtup; they should ereport() on failure. + * + * + * NOTES about memory consumption calculations: + * + * We count space allocated for tuples against the maxKBytes limit, + * plus the space used by the variable-size array memtuples. + * Fixed-size space (primarily the BufFile I/O buffer) is not counted. + * We don't worry about the size of the read pointer array, either. + * + * Note that we count actual space used (as shown by GetMemoryChunkSpace) + * rather than the originally-requested size. This is important since + * palloc can add substantial overhead. It's not a complete answer since + * we won't count any wasted space in palloc allocation blocks, but it's + * a lot better than what we were doing before 7.3. + * + *-------------------- + */ + + +static Tuplestorestate *tuplestore_begin_common(int eflags, + bool interXact, + int maxKBytes); +static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple); +static void dumptuples(Tuplestorestate *state); +static unsigned int getlen(Tuplestorestate *state, bool eofOK); +static void *copytup_heap(Tuplestorestate *state, void *tup); +static void writetup_heap(Tuplestorestate *state, void *tup); +static void *readtup_heap(Tuplestorestate *state, unsigned int len); + + +/* + * tuplestore_begin_xxx + * + * Initialize for a tuple store operation. 
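+ *
+ * (Editor's illustrative aside, not part of the upstream file: a typical
+ * caller, assuming the usual executor slot helpers, looks roughly like
+ *
+ *    Tuplestorestate *ts = tuplestore_begin_heap(true, false, work_mem);
+ *
+ *    tuplestore_puttupleslot(ts, slot);          // once per input tuple
+ *    ...
+ *    while (tuplestore_gettupleslot(ts, true, false, slot))
+ *        ... process the slot ...
+ *
+ *    tuplestore_end(ts);
+ *
+ * with randomAccess = true so the store can also be rescanned or read
+ * backwards.)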
+ */ +static Tuplestorestate * +tuplestore_begin_common(int eflags, bool interXact, int maxKBytes) +{ + Tuplestorestate *state; + + state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate)); + + state->status = TSS_INMEM; + state->eflags = eflags; + state->interXact = interXact; + state->truncated = false; + state->allowedMem = maxKBytes * 1024L; + state->availMem = state->allowedMem; + state->myfile = NULL; + state->context = CurrentMemoryContext; + state->resowner = CurrentResourceOwner; + + state->memtupdeleted = 0; + state->memtupcount = 0; + state->tuples = 0; + + /* + * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD; + * see comments in grow_memtuples(). + */ + state->memtupsize = Max(16384 / sizeof(void *), + ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1); + + state->growmemtuples = true; + state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *)); + + USEMEM(state, GetMemoryChunkSpace(state->memtuples)); + + state->activeptr = 0; + state->readptrcount = 1; + state->readptrsize = 8; /* arbitrary */ + state->readptrs = (TSReadPointer *) + palloc(state->readptrsize * sizeof(TSReadPointer)); + + state->readptrs[0].eflags = eflags; + state->readptrs[0].eof_reached = false; + state->readptrs[0].current = 0; + + return state; +} + +/* + * tuplestore_begin_heap + * + * Create a new tuplestore; other types of tuple stores (other than + * "heap" tuple stores, for heap tuples) are possible, but not presently + * implemented. + * + * randomAccess: if true, both forward and backward accesses to the + * tuple store are allowed. + * + * interXact: if true, the files used for on-disk storage persist beyond the + * end of the current transaction. NOTE: It's the caller's responsibility to + * create such a tuplestore in a memory context and resource owner that will + * also survive transaction boundaries, and to ensure the tuplestore is closed + * when it's no longer wanted. + * + * maxKBytes: how much data to store in memory (any data beyond this + * amount is paged to disk). When in doubt, use work_mem. + */ +Tuplestorestate * +tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes) +{ + Tuplestorestate *state; + int eflags; + + /* + * This interpretation of the meaning of randomAccess is compatible with + * the pre-8.3 behavior of tuplestores. + */ + eflags = randomAccess ? + (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) : + (EXEC_FLAG_REWIND); + + state = tuplestore_begin_common(eflags, interXact, maxKBytes); + + state->copytup = copytup_heap; + state->writetup = writetup_heap; + state->readtup = readtup_heap; + + return state; +} + +/* + * tuplestore_set_eflags + * + * Set the capability flags for read pointer 0 at a finer grain than is + * allowed by tuplestore_begin_xxx. This must be called before inserting + * any data into the tuplestore. + * + * eflags is a bitmask following the meanings used for executor node + * startup flags (see executor.h). tuplestore pays attention to these bits: + * EXEC_FLAG_REWIND need rewind to start + * EXEC_FLAG_BACKWARD need backward fetch + * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD + * is set per "randomAccess" in the tuplestore_begin_xxx call. + * + * NOTE: setting BACKWARD without REWIND means the pointer can read backwards, + * but not further than the truncation point (the furthest-back read pointer + * position at the time of the last tuplestore_trim call). 
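+ *
+ * (Editor's illustrative aside, not part of the upstream file: a caller
+ * that wants a cheap "mark" alongside its main scan can allocate an extra
+ * read pointer with no capabilities of its own and copy positions around,
+ * ts being the Tuplestorestate:
+ *
+ *    int    mark = tuplestore_alloc_read_pointer(ts, 0);
+ *
+ *    ... read via pointer 0 ...
+ *    tuplestore_copy_read_pointer(ts, 0, mark);  // remember this position
+ *    ... read further via pointer 0 ...
+ *    tuplestore_copy_read_pointer(ts, mark, 0);  // and come back to it
+ *
+ * tuplestore_copy_read_pointer() is defined later in this file.)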
+ */ +void +tuplestore_set_eflags(Tuplestorestate *state, int eflags) +{ + int i; + + if (state->status != TSS_INMEM || state->memtupcount != 0) + elog(ERROR, "too late to call tuplestore_set_eflags"); + + state->readptrs[0].eflags = eflags; + for (i = 1; i < state->readptrcount; i++) + eflags |= state->readptrs[i].eflags; + state->eflags = eflags; +} + +/* + * tuplestore_alloc_read_pointer - allocate another read pointer. + * + * Returns the pointer's index. + * + * The new pointer initially copies the position of read pointer 0. + * It can have its own eflags, but if any data has been inserted into + * the tuplestore, these eflags must not represent an increase in + * requirements. + */ +int +tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags) +{ + /* Check for possible increase of requirements */ + if (state->status != TSS_INMEM || state->memtupcount != 0) + { + if ((state->eflags | eflags) != state->eflags) + elog(ERROR, "too late to require new tuplestore eflags"); + } + + /* Make room for another read pointer if needed */ + if (state->readptrcount >= state->readptrsize) + { + int newcnt = state->readptrsize * 2; + + state->readptrs = (TSReadPointer *) + repalloc(state->readptrs, newcnt * sizeof(TSReadPointer)); + state->readptrsize = newcnt; + } + + /* And set it up */ + state->readptrs[state->readptrcount] = state->readptrs[0]; + state->readptrs[state->readptrcount].eflags = eflags; + + state->eflags |= eflags; + + return state->readptrcount++; +} + +/* + * tuplestore_clear + * + * Delete all the contents of a tuplestore, and reset its read pointers + * to the start. + */ +void +tuplestore_clear(Tuplestorestate *state) +{ + int i; + TSReadPointer *readptr; + + if (state->myfile) + BufFileClose(state->myfile); + state->myfile = NULL; + if (state->memtuples) + { + for (i = state->memtupdeleted; i < state->memtupcount; i++) + { + FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i])); + pfree(state->memtuples[i]); + } + } + state->status = TSS_INMEM; + state->truncated = false; + state->memtupdeleted = 0; + state->memtupcount = 0; + state->tuples = 0; + readptr = state->readptrs; + for (i = 0; i < state->readptrcount; readptr++, i++) + { + readptr->eof_reached = false; + readptr->current = 0; + } +} + +/* + * tuplestore_end + * + * Release resources and clean up. + */ +void +tuplestore_end(Tuplestorestate *state) +{ + int i; + + if (state->myfile) + BufFileClose(state->myfile); + if (state->memtuples) + { + for (i = state->memtupdeleted; i < state->memtupcount; i++) + pfree(state->memtuples[i]); + pfree(state->memtuples); + } + pfree(state->readptrs); + pfree(state); +} + +/* + * tuplestore_select_read_pointer - make the specified read pointer active + */ +void +tuplestore_select_read_pointer(Tuplestorestate *state, int ptr) +{ + TSReadPointer *readptr; + TSReadPointer *oldptr; + + Assert(ptr >= 0 && ptr < state->readptrcount); + + /* No work if already active */ + if (ptr == state->activeptr) + return; + + readptr = &state->readptrs[ptr]; + oldptr = &state->readptrs[state->activeptr]; + + switch (state->status) + { + case TSS_INMEM: + case TSS_WRITEFILE: + /* no work */ + break; + case TSS_READFILE: + + /* + * First, save the current read position in the pointer about to + * become inactive. + */ + if (!oldptr->eof_reached) + BufFileTell(state->myfile, + &oldptr->file, + &oldptr->offset); + + /* + * We have to make the temp file's seek position equal to the + * logical position of the new read pointer. 
In eof_reached + * state, that's the EOF, which we have available from the saved + * write position. + */ + if (readptr->eof_reached) + { + if (BufFileSeek(state->myfile, + state->writepos_file, + state->writepos_offset, + SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in tuplestore temporary file"))); + } + else + { + if (BufFileSeek(state->myfile, + readptr->file, + readptr->offset, + SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in tuplestore temporary file"))); + } + break; + default: + elog(ERROR, "invalid tuplestore state"); + break; + } + + state->activeptr = ptr; +} + +/* + * tuplestore_tuple_count + * + * Returns the number of tuples added since creation or the last + * tuplestore_clear(). + */ +int64 +tuplestore_tuple_count(Tuplestorestate *state) +{ + return state->tuples; +} + +/* + * tuplestore_ateof + * + * Returns the active read pointer's eof_reached state. + */ +bool +tuplestore_ateof(Tuplestorestate *state) +{ + return state->readptrs[state->activeptr].eof_reached; +} + +/* + * Grow the memtuples[] array, if possible within our memory constraint. We + * must not exceed INT_MAX tuples in memory or the caller-provided memory + * limit. Return true if we were able to enlarge the array, false if not. + * + * Normally, at each increment we double the size of the array. When doing + * that would exceed a limit, we attempt one last, smaller increase (and then + * clear the growmemtuples flag so we don't try any more). That allows us to + * use memory as fully as permitted; sticking to the pure doubling rule could + * result in almost half going unused. Because availMem moves around with + * tuple addition/removal, we need some rule to prevent making repeated small + * increases in memtupsize, which would just be useless thrashing. The + * growmemtuples flag accomplishes that and also prevents useless + * recalculations in this function. + */ +static bool +grow_memtuples(Tuplestorestate *state) +{ + int newmemtupsize; + int memtupsize = state->memtupsize; + int64 memNowUsed = state->allowedMem - state->availMem; + + /* Forget it if we've already maxed out memtuples, per comment above */ + if (!state->growmemtuples) + return false; + + /* Select new value of memtupsize */ + if (memNowUsed <= state->availMem) + { + /* + * We've used no more than half of allowedMem; double our usage, + * clamping at INT_MAX tuples. + */ + if (memtupsize < INT_MAX / 2) + newmemtupsize = memtupsize * 2; + else + { + newmemtupsize = INT_MAX; + state->growmemtuples = false; + } + } + else + { + /* + * This will be the last increment of memtupsize. Abandon doubling + * strategy and instead increase as much as we safely can. + * + * To stay within allowedMem, we can't increase memtupsize by more + * than availMem / sizeof(void *) elements. In practice, we want to + * increase it by considerably less, because we need to leave some + * space for the tuples to which the new array slots will refer. We + * assume the new tuples will be about the same size as the tuples + * we've already seen, and thus we can extrapolate from the space + * consumption so far to estimate an appropriate new size for the + * memtuples array. The optimal value might be higher or lower than + * this estimate, but it's hard to know that in advance. We again + * clamp at INT_MAX tuples. 
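For instance (illustrative numbers only): if allowedMem is 64 MB, memNowUsed is 48 MB, and memtupsize is 100,000 slots, then grow_ratio = 64/48 ≈ 1.33 and the array is enlarged one last time to roughly 133,000 slots; doubling to 200,000 slots would extrapolate to about 96 MB of tuple data, well past the budget.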
+ * + * This calculation is safe against enlarging the array so much that + * LACKMEM becomes true, because the memory currently used includes + * the present array; thus, there would be enough allowedMem for the + * new array elements even if no other memory were currently used. + * + * We do the arithmetic in float8, because otherwise the product of + * memtupsize and allowedMem could overflow. Any inaccuracy in the + * result should be insignificant; but even if we computed a + * completely insane result, the checks below will prevent anything + * really bad from happening. + */ + double grow_ratio; + + grow_ratio = (double) state->allowedMem / (double) memNowUsed; + if (memtupsize * grow_ratio < INT_MAX) + newmemtupsize = (int) (memtupsize * grow_ratio); + else + newmemtupsize = INT_MAX; + + /* We won't make any further enlargement attempts */ + state->growmemtuples = false; + } + + /* Must enlarge array by at least one element, else report failure */ + if (newmemtupsize <= memtupsize) + goto noalloc; + + /* + * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp + * to ensure our request won't be rejected. Note that we can easily + * exhaust address space before facing this outcome. (This is presently + * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but + * don't rely on that at this distance.) + */ + if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *)) + { + newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *)); + state->growmemtuples = false; /* can't grow any more */ + } + + /* + * We need to be sure that we do not cause LACKMEM to become true, else + * the space management algorithm will go nuts. The code above should + * never generate a dangerous request, but to be safe, check explicitly + * that the array growth fits within availMem. (We could still cause + * LACKMEM if the memory chunk overhead associated with the memtuples + * array were to increase. That shouldn't happen because we chose the + * initial array size large enough to ensure that palloc will be treating + * both old and new arrays as separate chunks. But we'll check LACKMEM + * explicitly below just in case.) + */ + if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *))) + goto noalloc; + + /* OK, do it */ + FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); + state->memtupsize = newmemtupsize; + state->memtuples = (void **) + repalloc_huge(state->memtuples, + state->memtupsize * sizeof(void *)); + USEMEM(state, GetMemoryChunkSpace(state->memtuples)); + if (LACKMEM(state)) + elog(ERROR, "unexpected out-of-memory situation in tuplestore"); + return true; + +noalloc: + /* If for any reason we didn't realloc, shut off future attempts */ + state->growmemtuples = false; + return false; +} + +/* + * Accept one tuple and append it to the tuplestore. + * + * Note that the input tuple is always copied; the caller need not save it. + * + * If the active read pointer is currently "at EOF", it remains so (the read + * pointer implicitly advances along with the write pointer); otherwise the + * read pointer is unchanged. Non-active read pointers do not move, which + * means they are certain to not be "at EOF" immediately after puttuple. + * This curious-seeming behavior is for the convenience of nodeMaterial.c and + * nodeCtescan.c, which would otherwise need to do extra pointer repositioning + * steps. + * + * tuplestore_puttupleslot() is a convenience routine to collect data from + * a TupleTableSlot without an extra copy operation. 
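As a usage sketch for the insertion routines documented here (hypothetical loop; ts, tupdesc, values, nulls and nrows are assumed to be set up by the caller), a function that materializes computed rows typically appends them like this:

	for (int i = 0; i < nrows; i++)
	{
		/* fill values[] / nulls[] for row i, then append the row */
		tuplestore_putvalues(ts, tupdesc, values, nulls);
	}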
+ */ +void +tuplestore_puttupleslot(Tuplestorestate *state, + TupleTableSlot *slot) +{ + MinimalTuple tuple; + MemoryContext oldcxt = MemoryContextSwitchTo(state->context); + + /* + * Form a MinimalTuple in working memory + */ + tuple = ExecCopySlotMinimalTuple(slot); + USEMEM(state, GetMemoryChunkSpace(tuple)); + + tuplestore_puttuple_common(state, (void *) tuple); + + MemoryContextSwitchTo(oldcxt); +} + +/* + * "Standard" case to copy from a HeapTuple. This is actually now somewhat + * deprecated, but not worth getting rid of in view of the number of callers. + */ +void +tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple) +{ + MemoryContext oldcxt = MemoryContextSwitchTo(state->context); + + /* + * Copy the tuple. (Must do this even in WRITEFILE case. Note that + * COPYTUP includes USEMEM, so we needn't do that here.) + */ + tuple = COPYTUP(state, tuple); + + tuplestore_puttuple_common(state, (void *) tuple); + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Similar to tuplestore_puttuple(), but work from values + nulls arrays. + * This avoids an extra tuple-construction operation. + */ +void +tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, + Datum *values, bool *isnull) +{ + MinimalTuple tuple; + MemoryContext oldcxt = MemoryContextSwitchTo(state->context); + + tuple = heap_form_minimal_tuple(tdesc, values, isnull); + USEMEM(state, GetMemoryChunkSpace(tuple)); + + tuplestore_puttuple_common(state, (void *) tuple); + + MemoryContextSwitchTo(oldcxt); +} + +static void +tuplestore_puttuple_common(Tuplestorestate *state, void *tuple) +{ + TSReadPointer *readptr; + int i; + ResourceOwner oldowner; + + state->tuples++; + + switch (state->status) + { + case TSS_INMEM: + + /* + * Update read pointers as needed; see API spec above. + */ + readptr = state->readptrs; + for (i = 0; i < state->readptrcount; readptr++, i++) + { + if (readptr->eof_reached && i != state->activeptr) + { + readptr->eof_reached = false; + readptr->current = state->memtupcount; + } + } + + /* + * Grow the array as needed. Note that we try to grow the array + * when there is still one free slot remaining --- if we fail, + * there'll still be room to store the incoming tuple, and then + * we'll switch to tape-based operation. + */ + if (state->memtupcount >= state->memtupsize - 1) + { + (void) grow_memtuples(state); + Assert(state->memtupcount < state->memtupsize); + } + + /* Stash the tuple in the in-memory array */ + state->memtuples[state->memtupcount++] = tuple; + + /* + * Done if we still fit in available memory and have array slots. + */ + if (state->memtupcount < state->memtupsize && !LACKMEM(state)) + return; + + /* + * Nope; time to switch to tape-based operation. Make sure that + * the temp file(s) are created in suitable temp tablespaces. + */ + PrepareTempTablespaces(); + + /* associate the file with the store's resource owner */ + oldowner = CurrentResourceOwner; + CurrentResourceOwner = state->resowner; + + state->myfile = BufFileCreateTemp(state->interXact); + + CurrentResourceOwner = oldowner; + + /* + * Freeze the decision about whether trailing length words will be + * used. We can't change this choice once data is on tape, even + * though callers might drop the requirement. + */ + state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0; + state->status = TSS_WRITEFILE; + dumptuples(state); + break; + case TSS_WRITEFILE: + + /* + * Update read pointers as needed; see API spec above. Note: + * BufFileTell is quite cheap, so not worth trying to avoid + * multiple calls. 
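The switch to tape above is driven purely by the maxKBytes budget; as a hedged aside (real functions, invented scenario), a caller can check afterwards whether its working set stayed in memory:

	if (!tuplestore_in_memory(ts))
		elog(DEBUG1, "tuplestore spilled to disk after %lld tuples",
			 (long long) tuplestore_tuple_count(ts));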
+ */ + readptr = state->readptrs; + for (i = 0; i < state->readptrcount; readptr++, i++) + { + if (readptr->eof_reached && i != state->activeptr) + { + readptr->eof_reached = false; + BufFileTell(state->myfile, + &readptr->file, + &readptr->offset); + } + } + + WRITETUP(state, tuple); + break; + case TSS_READFILE: + + /* + * Switch from reading to writing. + */ + if (!state->readptrs[state->activeptr].eof_reached) + BufFileTell(state->myfile, + &state->readptrs[state->activeptr].file, + &state->readptrs[state->activeptr].offset); + if (BufFileSeek(state->myfile, + state->writepos_file, state->writepos_offset, + SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in tuplestore temporary file"))); + state->status = TSS_WRITEFILE; + + /* + * Update read pointers as needed; see API spec above. + */ + readptr = state->readptrs; + for (i = 0; i < state->readptrcount; readptr++, i++) + { + if (readptr->eof_reached && i != state->activeptr) + { + readptr->eof_reached = false; + readptr->file = state->writepos_file; + readptr->offset = state->writepos_offset; + } + } + + WRITETUP(state, tuple); + break; + default: + elog(ERROR, "invalid tuplestore state"); + break; + } +} + +/* + * Fetch the next tuple in either forward or back direction. + * Returns NULL if no more tuples. If should_free is set, the + * caller must pfree the returned tuple when done with it. + * + * Backward scan is only allowed if randomAccess was set true or + * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags(). + */ +static void * +tuplestore_gettuple(Tuplestorestate *state, bool forward, + bool *should_free) +{ + TSReadPointer *readptr = &state->readptrs[state->activeptr]; + unsigned int tuplen; + void *tup; + + Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD)); + + switch (state->status) + { + case TSS_INMEM: + *should_free = false; + if (forward) + { + if (readptr->eof_reached) + return NULL; + if (readptr->current < state->memtupcount) + { + /* We have another tuple, so return it */ + return state->memtuples[readptr->current++]; + } + readptr->eof_reached = true; + return NULL; + } + else + { + /* + * if all tuples are fetched already then we return last + * tuple, else tuple before last returned. + */ + if (readptr->eof_reached) + { + readptr->current = state->memtupcount; + readptr->eof_reached = false; + } + else + { + if (readptr->current <= state->memtupdeleted) + { + Assert(!state->truncated); + return NULL; + } + readptr->current--; /* last returned tuple */ + } + if (readptr->current <= state->memtupdeleted) + { + Assert(!state->truncated); + return NULL; + } + return state->memtuples[readptr->current - 1]; + } + break; + + case TSS_WRITEFILE: + /* Skip state change if we'll just return NULL */ + if (readptr->eof_reached && forward) + return NULL; + + /* + * Switch from writing to reading. + */ + BufFileTell(state->myfile, + &state->writepos_file, &state->writepos_offset); + if (!readptr->eof_reached) + if (BufFileSeek(state->myfile, + readptr->file, readptr->offset, + SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in tuplestore temporary file"))); + state->status = TSS_READFILE; + /* FALLTHROUGH */ + + case TSS_READFILE: + *should_free = true; + if (forward) + { + if ((tuplen = getlen(state, true)) != 0) + { + tup = READTUP(state, tuplen); + return tup; + } + else + { + readptr->eof_reached = true; + return NULL; + } + } + + /* + * Backward. 
+ * + * if all tuples are fetched already then we return last tuple, + * else tuple before last returned. + * + * Back up to fetch previously-returned tuple's ending length + * word. If seek fails, assume we are at start of file. + */ + if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int), + SEEK_CUR) != 0) + { + /* even a failed backwards fetch gets you out of eof state */ + readptr->eof_reached = false; + Assert(!state->truncated); + return NULL; + } + tuplen = getlen(state, false); + + if (readptr->eof_reached) + { + readptr->eof_reached = false; + /* We will return the tuple returned before returning NULL */ + } + else + { + /* + * Back up to get ending length word of tuple before it. + */ + if (BufFileSeek(state->myfile, 0, + -(long) (tuplen + 2 * sizeof(unsigned int)), + SEEK_CUR) != 0) + { + /* + * If that fails, presumably the prev tuple is the first + * in the file. Back up so that it becomes next to read + * in forward direction (not obviously right, but that is + * what in-memory case does). + */ + if (BufFileSeek(state->myfile, 0, + -(long) (tuplen + sizeof(unsigned int)), + SEEK_CUR) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in tuplestore temporary file"))); + Assert(!state->truncated); + return NULL; + } + tuplen = getlen(state, false); + } + + /* + * Now we have the length of the prior tuple, back up and read it. + * Note: READTUP expects we are positioned after the initial + * length word of the tuple, so back up to that point. + */ + if (BufFileSeek(state->myfile, 0, + -(long) tuplen, + SEEK_CUR) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in tuplestore temporary file"))); + tup = READTUP(state, tuplen); + return tup; + + default: + elog(ERROR, "invalid tuplestore state"); + return NULL; /* keep compiler quiet */ + } +} + +/* + * tuplestore_gettupleslot - exported function to fetch a MinimalTuple + * + * If successful, put tuple in slot and return true; else, clear the slot + * and return false. + * + * If copy is true, the slot receives a copied tuple (allocated in current + * memory context) that will stay valid regardless of future manipulations of + * the tuplestore's state. If copy is false, the slot may just receive a + * pointer to a tuple held within the tuplestore. The latter is more + * efficient but the slot contents may be corrupted if additional writes to + * the tuplestore occur. (If using tuplestore_trim, see comments therein.) + */ +bool +tuplestore_gettupleslot(Tuplestorestate *state, bool forward, + bool copy, TupleTableSlot *slot) +{ + MinimalTuple tuple; + bool should_free; + + tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free); + + if (tuple) + { + if (copy && !should_free) + { + tuple = heap_copy_minimal_tuple(tuple); + should_free = true; + } + ExecStoreMinimalTuple(tuple, slot, should_free); + return true; + } + else + { + ExecClearTuple(slot); + return false; + } +} + +/* + * tuplestore_advance - exported function to adjust position without fetching + * + * We could optimize this case to avoid palloc/pfree overhead, but for the + * moment it doesn't seem worthwhile. + */ +bool +tuplestore_advance(Tuplestorestate *state, bool forward) +{ + void *tuple; + bool should_free; + + tuple = tuplestore_gettuple(state, forward, &should_free); + + if (tuple) + { + if (should_free) + pfree(tuple); + return true; + } + else + { + return false; + } +} + +/* + * Advance over N tuples in either forward or back direction, + * without returning any data. 
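As a reading-side sketch of the slot interface described above (tupdesc assumed available; slot creation shown the way a typical caller would do it), a forward scan that accepts in-place tuples, per the copy = false caveat, looks like this:

	TupleTableSlot *slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple);

	while (tuplestore_gettupleslot(ts, true, false, slot))
	{
		/* use slot; contents may be invalidated by further writes to ts */
	}
	ExecDropSingleTupleTableSlot(slot);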
N<=0 is a no-op. + * Returns true if successful, false if ran out of tuples. + */ +bool +tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward) +{ + TSReadPointer *readptr = &state->readptrs[state->activeptr]; + + Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD)); + + if (ntuples <= 0) + return true; + + switch (state->status) + { + case TSS_INMEM: + if (forward) + { + if (readptr->eof_reached) + return false; + if (state->memtupcount - readptr->current >= ntuples) + { + readptr->current += ntuples; + return true; + } + readptr->current = state->memtupcount; + readptr->eof_reached = true; + return false; + } + else + { + if (readptr->eof_reached) + { + readptr->current = state->memtupcount; + readptr->eof_reached = false; + ntuples--; + } + if (readptr->current - state->memtupdeleted > ntuples) + { + readptr->current -= ntuples; + return true; + } + Assert(!state->truncated); + readptr->current = state->memtupdeleted; + return false; + } + break; + + default: + /* We don't currently try hard to optimize other cases */ + while (ntuples-- > 0) + { + void *tuple; + bool should_free; + + tuple = tuplestore_gettuple(state, forward, &should_free); + + if (tuple == NULL) + return false; + if (should_free) + pfree(tuple); + CHECK_FOR_INTERRUPTS(); + } + return true; + } +} + +/* + * dumptuples - remove tuples from memory and write to tape + * + * As a side effect, we must convert each read pointer's position from + * "current" to file/offset format. But eof_reached pointers don't + * need to change state. + */ +static void +dumptuples(Tuplestorestate *state) +{ + int i; + + for (i = state->memtupdeleted;; i++) + { + TSReadPointer *readptr = state->readptrs; + int j; + + for (j = 0; j < state->readptrcount; readptr++, j++) + { + if (i == readptr->current && !readptr->eof_reached) + BufFileTell(state->myfile, + &readptr->file, &readptr->offset); + } + if (i >= state->memtupcount) + break; + WRITETUP(state, state->memtuples[i]); + } + state->memtupdeleted = 0; + state->memtupcount = 0; +} + +/* + * tuplestore_rescan - rewind the active read pointer to start + */ +void +tuplestore_rescan(Tuplestorestate *state) +{ + TSReadPointer *readptr = &state->readptrs[state->activeptr]; + + Assert(readptr->eflags & EXEC_FLAG_REWIND); + Assert(!state->truncated); + + switch (state->status) + { + case TSS_INMEM: + readptr->eof_reached = false; + readptr->current = 0; + break; + case TSS_WRITEFILE: + readptr->eof_reached = false; + readptr->file = 0; + readptr->offset = 0L; + break; + case TSS_READFILE: + readptr->eof_reached = false; + if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in tuplestore temporary file"))); + break; + default: + elog(ERROR, "invalid tuplestore state"); + break; + } +} + +/* + * tuplestore_copy_read_pointer - copy a read pointer's state to another + */ +void +tuplestore_copy_read_pointer(Tuplestorestate *state, + int srcptr, int destptr) +{ + TSReadPointer *sptr = &state->readptrs[srcptr]; + TSReadPointer *dptr = &state->readptrs[destptr]; + + Assert(srcptr >= 0 && srcptr < state->readptrcount); + Assert(destptr >= 0 && destptr < state->readptrcount); + + /* Assigning to self is a no-op */ + if (srcptr == destptr) + return; + + if (dptr->eflags != sptr->eflags) + { + /* Possible change of overall eflags, so copy and then recompute */ + int eflags; + int i; + + *dptr = *sptr; + eflags = state->readptrs[0].eflags; + for (i = 1; i < state->readptrcount; i++) + eflags |= 
state->readptrs[i].eflags; + state->eflags = eflags; + } + else + *dptr = *sptr; + + switch (state->status) + { + case TSS_INMEM: + case TSS_WRITEFILE: + /* no work */ + break; + case TSS_READFILE: + + /* + * This case is a bit tricky since the active read pointer's + * position corresponds to the seek point, not what is in its + * variables. Assigning to the active requires a seek, and + * assigning from the active requires a tell, except when + * eof_reached. + */ + if (destptr == state->activeptr) + { + if (dptr->eof_reached) + { + if (BufFileSeek(state->myfile, + state->writepos_file, + state->writepos_offset, + SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in tuplestore temporary file"))); + } + else + { + if (BufFileSeek(state->myfile, + dptr->file, dptr->offset, + SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek in tuplestore temporary file"))); + } + } + else if (srcptr == state->activeptr) + { + if (!dptr->eof_reached) + BufFileTell(state->myfile, + &dptr->file, + &dptr->offset); + } + break; + default: + elog(ERROR, "invalid tuplestore state"); + break; + } +} + +/* + * tuplestore_trim - remove all no-longer-needed tuples + * + * Calling this function authorizes the tuplestore to delete all tuples + * before the oldest read pointer, if no read pointer is marked as requiring + * REWIND capability. + * + * Note: this is obviously safe if no pointer has BACKWARD capability either. + * If a pointer is marked as BACKWARD but not REWIND capable, it means that + * the pointer can be moved backward but not before the oldest other read + * pointer. + */ +void +tuplestore_trim(Tuplestorestate *state) +{ + int oldest; + int nremove; + int i; + + /* + * Truncation is disallowed if any read pointer requires rewind + * capability. + */ + if (state->eflags & EXEC_FLAG_REWIND) + return; + + /* + * We don't bother trimming temp files since it usually would mean more + * work than just letting them sit in kernel buffers until they age out. + */ + if (state->status != TSS_INMEM) + return; + + /* Find the oldest read pointer */ + oldest = state->memtupcount; + for (i = 0; i < state->readptrcount; i++) + { + if (!state->readptrs[i].eof_reached) + oldest = Min(oldest, state->readptrs[i].current); + } + + /* + * Note: you might think we could remove all the tuples before the oldest + * "current", since that one is the next to be returned. However, since + * tuplestore_gettuple returns a direct pointer to our internal copy of + * the tuple, it's likely that the caller has still got the tuple just + * before "current" referenced in a slot. So we keep one extra tuple + * before the oldest "current". (Strictly speaking, we could require such + * callers to use the "copy" flag to tuplestore_gettupleslot, but for + * efficiency we allow this one case to not use "copy".) + */ + nremove = oldest - 1; + if (nremove <= 0) + return; /* nothing to do */ + + Assert(nremove >= state->memtupdeleted); + Assert(nremove <= state->memtupcount); + + /* Release no-longer-needed tuples */ + for (i = state->memtupdeleted; i < nremove; i++) + { + FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i])); + pfree(state->memtuples[i]); + state->memtuples[i] = NULL; + } + state->memtupdeleted = nremove; + + /* mark tuplestore as truncated (used for Assert crosschecks only) */ + state->truncated = true; + + /* + * If nremove is less than 1/8th memtupcount, just stop here, leaving the + * "deleted" slots as NULL. 
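For example (illustrative numbers): if the oldest read pointer sits at current = 10 while memtupcount = 100, then nremove = 9; since 9 is below 100/8, the nine slots are simply set to NULL here and no memmove takes place.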
This prevents us from expending O(N^2) time + * repeatedly memmove-ing a large pointer array. The worst case space + * wastage is pretty small, since it's just pointers and not whole tuples. + */ + if (nremove < state->memtupcount / 8) + return; + + /* + * Slide the array down and readjust pointers. + * + * In mergejoin's current usage, it's demonstrable that there will always + * be exactly one non-removed tuple; so optimize that case. + */ + if (nremove + 1 == state->memtupcount) + state->memtuples[0] = state->memtuples[nremove]; + else + memmove(state->memtuples, state->memtuples + nremove, + (state->memtupcount - nremove) * sizeof(void *)); + + state->memtupdeleted = 0; + state->memtupcount -= nremove; + for (i = 0; i < state->readptrcount; i++) + { + if (!state->readptrs[i].eof_reached) + state->readptrs[i].current -= nremove; + } +} + +/* + * tuplestore_in_memory + * + * Returns true if the tuplestore has not spilled to disk. + * + * XXX exposing this is a violation of modularity ... should get rid of it. + */ +bool +tuplestore_in_memory(Tuplestorestate *state) +{ + return (state->status == TSS_INMEM); +} + + +/* + * Tape interface routines + */ + +static unsigned int +getlen(Tuplestorestate *state, bool eofOK) +{ + unsigned int len; + size_t nbytes; + + nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len)); + if (nbytes == sizeof(len)) + return len; + if (nbytes != 0 || !eofOK) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes", + nbytes, sizeof(len)))); + return 0; +} + + +/* + * Routines specialized for HeapTuple case + * + * The stored form is actually a MinimalTuple, but for largely historical + * reasons we allow COPYTUP to work from a HeapTuple. + * + * Since MinimalTuple already has length in its first word, we don't need + * to write that separately. + */ + +static void * +copytup_heap(Tuplestorestate *state, void *tup) +{ + MinimalTuple tuple; + + tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup); + USEMEM(state, GetMemoryChunkSpace(tuple)); + return (void *) tuple; +} + +static void +writetup_heap(Tuplestorestate *state, void *tup) +{ + MinimalTuple tuple = (MinimalTuple) tup; + + /* the part of the MinimalTuple we'll write: */ + char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; + unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET; + + /* total on-disk footprint: */ + unsigned int tuplen = tupbodylen + sizeof(int); + + BufFileWrite(state->myfile, (void *) &tuplen, sizeof(tuplen)); + BufFileWrite(state->myfile, (void *) tupbody, tupbodylen); + if (state->backward) /* need trailing length word? 
*/ + BufFileWrite(state->myfile, (void *) &tuplen, sizeof(tuplen)); + + FREEMEM(state, GetMemoryChunkSpace(tuple)); + heap_free_minimal_tuple(tuple); +} + +static void * +readtup_heap(Tuplestorestate *state, unsigned int len) +{ + unsigned int tupbodylen = len - sizeof(int); + unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET; + MinimalTuple tuple = (MinimalTuple) palloc(tuplen); + char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET; + size_t nread; + + USEMEM(state, GetMemoryChunkSpace(tuple)); + /* read in the tuple proper */ + tuple->t_len = tuplen; + nread = BufFileRead(state->myfile, (void *) tupbody, tupbodylen); + if (nread != (size_t) tupbodylen) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes", + nread, (size_t) tupbodylen))); + if (state->backward) /* need trailing length word? */ + { + nread = BufFileRead(state->myfile, (void *) &tuplen, sizeof(tuplen)); + if (nread != sizeof(tuplen)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from tuplestore temporary file: read only %zu of %zu bytes", + nread, sizeof(tuplen)))); + } + return (void *) tuple; +} |
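To tie the exported interface together, here is a minimal end-to-end sketch of the caller pattern this module serves (illustrative only, not part of the commit; assumes backend context inside a transaction, with a TupleDesc and an input slot supplied by the caller; the function name is invented):

	#include "postgres.h"

	#include "executor/tuptable.h"
	#include "miscadmin.h"
	#include "utils/tuplestore.h"

	static void
	materialize_and_rescan(TupleDesc tupdesc, TupleTableSlot *inslot)
	{
		/* randomAccess = true so we may rewind; spills past work_mem kilobytes */
		Tuplestorestate *ts = tuplestore_begin_heap(true, false, work_mem);
		TupleTableSlot *outslot = MakeSingleTupleTableSlot(tupdesc,
														   &TTSOpsMinimalTuple);

		/* store a copy of each input row (one shown here) */
		tuplestore_puttupleslot(ts, inslot);

		/* first pass over the stored rows */
		while (tuplestore_gettupleslot(ts, true, false, outslot))
		{
			/* ... consume outslot ... */
		}

		/* rewind and read the same rows again */
		tuplestore_rescan(ts);
		while (tuplestore_gettupleslot(ts, true, false, outslot))
		{
			/* ... consume outslot ... */
		}

		ExecDropSingleTupleTableSlot(outslot);
		tuplestore_end(ts);
	}

The same store could instead be emptied with tuplestore_clear() and refilled, which reuses the existing state and read pointers rather than building a new tuplestore from scratch.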