diff options
Diffstat (limited to 'src/backend/executor/tqueue.c')
-rw-r--r-- | src/backend/executor/tqueue.c | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c new file mode 100644 index 0000000..3449b80 --- /dev/null +++ b/src/backend/executor/tqueue.c @@ -0,0 +1,210 @@ +/*------------------------------------------------------------------------- + * + * tqueue.c + * Use shm_mq to send & receive tuples between parallel backends + * + * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver + * under the hood, writes tuples from the executor to a shm_mq. + * + * A TupleQueueReader reads tuples from a shm_mq and returns the tuples. + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/tqueue.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "executor/tqueue.h" + +/* + * DestReceiver object's private contents + * + * queue is a pointer to data supplied by DestReceiver's caller. + */ +typedef struct TQueueDestReceiver +{ + DestReceiver pub; /* public fields */ + shm_mq_handle *queue; /* shm_mq to send to */ +} TQueueDestReceiver; + +/* + * TupleQueueReader object's private contents + * + * queue is a pointer to data supplied by reader's caller. + * + * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h + */ +struct TupleQueueReader +{ + shm_mq_handle *queue; /* shm_mq to receive from */ +}; + +/* + * Receive a tuple from a query, and send it to the designated shm_mq. + * + * Returns true if successful, false if shm_mq has been detached. + */ +static bool +tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) +{ + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + MinimalTuple tuple; + shm_mq_result result; + bool should_free; + + /* Send the tuple itself. */ + tuple = ExecFetchSlotMinimalTuple(slot, &should_free); + result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false); + + if (should_free) + pfree(tuple); + + /* Check for failure. */ + if (result == SHM_MQ_DETACHED) + return false; + else if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send tuple to shared-memory queue"))); + + return true; +} + +/* + * Prepare to receive tuples from executor. + */ +static void +tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + /* do nothing */ +} + +/* + * Clean up at end of an executor run + */ +static void +tqueueShutdownReceiver(DestReceiver *self) +{ + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + + if (tqueue->queue != NULL) + shm_mq_detach(tqueue->queue); + tqueue->queue = NULL; +} + +/* + * Destroy receiver when done with it + */ +static void +tqueueDestroyReceiver(DestReceiver *self) +{ + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + + /* We probably already detached from queue, but let's be sure */ + if (tqueue->queue != NULL) + shm_mq_detach(tqueue->queue); + pfree(self); +} + +/* + * Create a DestReceiver that writes tuples to a tuple queue. + */ +DestReceiver * +CreateTupleQueueDestReceiver(shm_mq_handle *handle) +{ + TQueueDestReceiver *self; + + self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver)); + + self->pub.receiveSlot = tqueueReceiveSlot; + self->pub.rStartup = tqueueStartupReceiver; + self->pub.rShutdown = tqueueShutdownReceiver; + self->pub.rDestroy = tqueueDestroyReceiver; + self->pub.mydest = DestTupleQueue; + self->queue = handle; + + return (DestReceiver *) self; +} + +/* + * Create a tuple queue reader. + */ +TupleQueueReader * +CreateTupleQueueReader(shm_mq_handle *handle) +{ + TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader)); + + reader->queue = handle; + + return reader; +} + +/* + * Destroy a tuple queue reader. + * + * Note: cleaning up the underlying shm_mq is the caller's responsibility. + * We won't access it here, as it may be detached already. + */ +void +DestroyTupleQueueReader(TupleQueueReader *reader) +{ + pfree(reader); +} + +/* + * Fetch a tuple from a tuple queue reader. + * + * The return value is NULL if there are no remaining tuples or if + * nowait = true and no tuple is ready to return. *done, if not NULL, + * is set to true when there are no remaining tuples and otherwise to false. + * + * The returned tuple, if any, is either in shared memory or a private buffer + * and should not be freed. The pointer is invalid after the next call to + * TupleQueueReaderNext(). + * + * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still + * accumulate bytes from a partially-read message, so it's useful to call + * this with nowait = true even if nothing is returned. + */ +MinimalTuple +TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) +{ + MinimalTuple tuple; + shm_mq_result result; + Size nbytes; + void *data; + + if (done != NULL) + *done = false; + + /* Attempt to read a message. */ + result = shm_mq_receive(reader->queue, &nbytes, &data, nowait); + + /* If queue is detached, set *done and return NULL. */ + if (result == SHM_MQ_DETACHED) + { + if (done != NULL) + *done = true; + return NULL; + } + + /* In non-blocking mode, bail out if no message ready yet. */ + if (result == SHM_MQ_WOULD_BLOCK) + return NULL; + Assert(result == SHM_MQ_SUCCESS); + + /* + * Return a pointer to the queue memory directly (which had better be + * sufficiently aligned). + */ + tuple = (MinimalTuple) data; + Assert(tuple->t_len == nbytes); + + return tuple; +} |