diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:19:15 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:19:15 +0000 |
commit | 6eb9c5a5657d1fe77b55cc261450f3538d35a94d (patch) | |
tree | 657d8194422a5daccecfd42d654b8a245ef7b4c8 /src/backend/executor/tqueue.c | |
parent | Initial commit. (diff) | |
download | postgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.tar.xz postgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.zip |
Adding upstream version 13.4.upstream/13.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/executor/tqueue.c')
-rw-r--r-- | src/backend/executor/tqueue.c | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c new file mode 100644 index 0000000..e5656fb --- /dev/null +++ b/src/backend/executor/tqueue.c @@ -0,0 +1,212 @@ +/*------------------------------------------------------------------------- + * + * tqueue.c + * Use shm_mq to send & receive tuples between parallel backends + * + * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver + * under the hood, writes tuples from the executor to a shm_mq. + * + * A TupleQueueReader reads tuples from a shm_mq and returns the tuples. + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/tqueue.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "executor/tqueue.h" + +/* + * DestReceiver object's private contents + * + * queue is a pointer to data supplied by DestReceiver's caller. + */ +typedef struct TQueueDestReceiver +{ + DestReceiver pub; /* public fields */ + shm_mq_handle *queue; /* shm_mq to send to */ +} TQueueDestReceiver; + +/* + * TupleQueueReader object's private contents + * + * queue is a pointer to data supplied by reader's caller. + * + * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h + */ +struct TupleQueueReader +{ + shm_mq_handle *queue; /* shm_mq to receive from */ +}; + +/* + * Receive a tuple from a query, and send it to the designated shm_mq. + * + * Returns true if successful, false if shm_mq has been detached. + */ +static bool +tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) +{ + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + HeapTuple tuple; + shm_mq_result result; + bool should_free; + + /* Send the tuple itself. */ + tuple = ExecFetchSlotHeapTuple(slot, true, &should_free); + result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false); + + if (should_free) + heap_freetuple(tuple); + + /* Check for failure. */ + if (result == SHM_MQ_DETACHED) + return false; + else if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send tuple to shared-memory queue"))); + + return true; +} + +/* + * Prepare to receive tuples from executor. + */ +static void +tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + /* do nothing */ +} + +/* + * Clean up at end of an executor run + */ +static void +tqueueShutdownReceiver(DestReceiver *self) +{ + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + + if (tqueue->queue != NULL) + shm_mq_detach(tqueue->queue); + tqueue->queue = NULL; +} + +/* + * Destroy receiver when done with it + */ +static void +tqueueDestroyReceiver(DestReceiver *self) +{ + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + + /* We probably already detached from queue, but let's be sure */ + if (tqueue->queue != NULL) + shm_mq_detach(tqueue->queue); + pfree(self); +} + +/* + * Create a DestReceiver that writes tuples to a tuple queue. + */ +DestReceiver * +CreateTupleQueueDestReceiver(shm_mq_handle *handle) +{ + TQueueDestReceiver *self; + + self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver)); + + self->pub.receiveSlot = tqueueReceiveSlot; + self->pub.rStartup = tqueueStartupReceiver; + self->pub.rShutdown = tqueueShutdownReceiver; + self->pub.rDestroy = tqueueDestroyReceiver; + self->pub.mydest = DestTupleQueue; + self->queue = handle; + + return (DestReceiver *) self; +} + +/* + * Create a tuple queue reader. + */ +TupleQueueReader * +CreateTupleQueueReader(shm_mq_handle *handle) +{ + TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader)); + + reader->queue = handle; + + return reader; +} + +/* + * Destroy a tuple queue reader. + * + * Note: cleaning up the underlying shm_mq is the caller's responsibility. + * We won't access it here, as it may be detached already. + */ +void +DestroyTupleQueueReader(TupleQueueReader *reader) +{ + pfree(reader); +} + +/* + * Fetch a tuple from a tuple queue reader. + * + * The return value is NULL if there are no remaining tuples or if + * nowait = true and no tuple is ready to return. *done, if not NULL, + * is set to true when there are no remaining tuples and otherwise to false. + * + * The returned tuple, if any, is allocated in CurrentMemoryContext. + * Note that this routine must not leak memory! (We used to allow that, + * but not any more.) + * + * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still + * accumulate bytes from a partially-read message, so it's useful to call + * this with nowait = true even if nothing is returned. + */ +HeapTuple +TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) +{ + HeapTupleData htup; + shm_mq_result result; + Size nbytes; + void *data; + + if (done != NULL) + *done = false; + + /* Attempt to read a message. */ + result = shm_mq_receive(reader->queue, &nbytes, &data, nowait); + + /* If queue is detached, set *done and return NULL. */ + if (result == SHM_MQ_DETACHED) + { + if (done != NULL) + *done = true; + return NULL; + } + + /* In non-blocking mode, bail out if no message ready yet. */ + if (result == SHM_MQ_WOULD_BLOCK) + return NULL; + Assert(result == SHM_MQ_SUCCESS); + + /* + * Set up a dummy HeapTupleData pointing to the data from the shm_mq + * (which had better be sufficiently aligned). + */ + ItemPointerSetInvalid(&htup.t_self); + htup.t_tableOid = InvalidOid; + htup.t_len = nbytes; + htup.t_data = data; + + return heap_copytuple(&htup); +} |