author		Daniel Baumann <daniel.baumann@progress-linux.org>	2024-05-04 12:15:05 +0000
committer	Daniel Baumann <daniel.baumann@progress-linux.org>	2024-05-04 12:15:05 +0000
commit		46651ce6fe013220ed397add242004d764fc0153 (patch)
tree		6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/storage/ipc/shm_mq.c
parent		Initial commit. (diff)
Adding upstream version 14.5.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/storage/ipc/shm_mq.c')
-rw-r--r--	src/backend/storage/ipc/shm_mq.c	1288
1 file changed, 1288 insertions, 0 deletions
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
new file mode 100644
index 0000000..3240af4
--- /dev/null
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -0,0 +1,1288 @@
+/*-------------------------------------------------------------------------
+ *
+ * shm_mq.c
+ * single-reader, single-writer shared memory message queue
+ *
+ * Both the sender and the receiver must have a PGPROC; their respective
+ * process latches are used for synchronization. Only the sender may send,
+ * and only the receiver may receive. This is intended to allow a user
+ * backend to communicate with worker backends that it has registered.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/ipc/shm_mq.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+#include "utils/memutils.h"
+
+/*
+ * This structure represents the actual queue, stored in shared memory.
+ *
+ * Some notes on synchronization:
+ *
+ * mq_receiver and mq_bytes_read can only be changed by the receiver; and
+ * mq_sender and mq_bytes_written can only be changed by the sender.
+ * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
+ * they cannot change once set, and thus may be read without a lock once this
+ * is known to be the case.
+ *
+ * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
+ * they are written atomically using 8 byte loads and stores. Memory barriers
+ * must be carefully used to synchronize reads and writes of these values with
+ * reads and writes of the actual data in mq_ring.
+ *
+ * mq_detached needs no locking. It can be set by either the sender or the
+ * receiver, but only ever from false to true, so redundant writes don't
+ * matter. It is important that if we set mq_detached and then set the
+ * counterparty's latch, the counterparty must be certain to see the change
+ * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
+ * ends with one, this should be OK.
+ *
+ * mq_ring_size and mq_ring_offset never change after initialization, and
+ * can therefore be read without the lock.
+ *
+ * Importantly, mq_ring can be safely read and written without a lock.
+ * At any given time, the difference between mq_bytes_read and
+ * mq_bytes_written defines the number of bytes within mq_ring that contain
+ * unread data, and mq_bytes_read defines the position where those bytes
+ * begin. The sender can increase the number of unread bytes at any time,
+ * but only the receiver can give license to overwrite those bytes, by
+ * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
+ * the unread bytes it knows to be present without the lock. Conversely,
+ * the sender can write to the unused portion of the ring buffer without
+ * the lock, because nobody else can be reading or writing those bytes. The
+ * receiver could be making more bytes unused by incrementing mq_bytes_read,
+ * but that's OK. Note that it would be unsafe for the receiver to read any
+ * data it's already marked as read, or to write any data; and it would be
+ * unsafe for the sender to reread any data after incrementing
+ * mq_bytes_written, but fortunately there's no need for any of that.
+ */
+struct shm_mq
+{
+ slock_t mq_mutex;
+ PGPROC *mq_receiver;
+ PGPROC *mq_sender;
+ pg_atomic_uint64 mq_bytes_read;
+ pg_atomic_uint64 mq_bytes_written;
+ Size mq_ring_size;
+ bool mq_detached;
+ uint8 mq_ring_offset;
+ char mq_ring[FLEXIBLE_ARRAY_MEMBER];
+};
+
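+#ifdef NOT_USED
+/*
+ * Illustrative sketch only, not part of the upstream file: the byte
+ * accounting described above written out as a helper.  The function name is
+ * hypothetical.  At any instant the unread bytes are the difference between
+ * the two atomic counters, and the position of those bytes within mq_ring is
+ * the read counter taken modulo the ring size.
+ */
+static Size
+shm_mq_example_unread_bytes(shm_mq *mq, Size *start_offset)
+{
+	uint64		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
+	uint64		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+
+	/* Only the sender advances wb, and rb never passes it. */
+	Assert(wb >= rb);
+
+	*start_offset = (Size) (rb % (uint64) mq->mq_ring_size);
+	return (Size) (wb - rb);
+}
+#endif
+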
+/*
+ * This structure is a backend-private handle for access to a queue.
+ *
+ * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
+ * an optional pointer to the dynamic shared memory segment that contains it.
+ * (If mqh_segment is provided, we register an on_dsm_detach callback to
+ * make sure we detach from the queue before detaching from DSM.)
+ *
+ * If this queue is intended to connect the current process with a background
+ * worker that started it, the user can pass a pointer to the worker handle
+ * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
+ * is to allow us to begin sending to or receiving from that queue before the
+ * process we'll be communicating with has even been started. If it fails
+ * to start, the handle will allow us to notice that and fail cleanly, rather
+ * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
+ * simple cases - e.g. where there are just 2 processes communicating; in
+ * more complex scenarios, every process may not have a BackgroundWorkerHandle
+ * available, or may need to watch for the failure of more than one other
+ * process at a time.
+ *
+ * When a message exists as a contiguous chunk of bytes in the queue - that is,
+ * it is smaller than the size of the ring buffer and does not wrap around
+ * the end - we return the message to the caller as a pointer into the buffer.
+ * For messages that are larger or happen to wrap, we reassemble the message
+ * locally by copying the chunks into a backend-local buffer. mqh_buffer is
+ * the buffer, and mqh_buflen is the number of bytes allocated for it.
+ *
+ * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
+ * are used to track the state of non-blocking operations. When the caller
+ * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
+ * are expected to retry the call at a later time with the same argument;
+ * we need to retain enough state to pick up where we left off.
+ * mqh_length_word_complete tracks whether we are done sending or receiving
+ * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
+ * the number of bytes read or written for either the length word or the
+ * message itself, and mqh_expected_bytes - which is used only for reads -
+ * tracks the expected total size of the payload.
+ *
+ * mqh_counterparty_attached tracks whether we know the counterparty to have
+ * attached to the queue at some previous point. This lets us avoid some
+ * mutex acquisitions.
+ *
+ * mqh_context is the memory context in effect at the time we attached to
+ * the shm_mq. The shm_mq_handle itself is allocated in this context, and
+ * we make sure any other allocations we do happen in this context as well,
+ * to avoid nasty surprises.
+ */
+struct shm_mq_handle
+{
+ shm_mq *mqh_queue;
+ dsm_segment *mqh_segment;
+ BackgroundWorkerHandle *mqh_handle;
+ char *mqh_buffer;
+ Size mqh_buflen;
+ Size mqh_consume_pending;
+ Size mqh_partial_bytes;
+ Size mqh_expected_bytes;
+ bool mqh_length_word_complete;
+ bool mqh_counterparty_attached;
+ MemoryContext mqh_context;
+};
+
+static void shm_mq_detach_internal(shm_mq *mq);
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
+ const void *data, bool nowait, Size *bytes_written);
+static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
+ Size bytes_needed, bool nowait, Size *nbytesp,
+ void **datap);
+static bool shm_mq_counterparty_gone(shm_mq *mq,
+ BackgroundWorkerHandle *handle);
+static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
+ BackgroundWorkerHandle *handle);
+static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
+static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
+static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
+
+/* Minimum queue size is enough for header and at least one chunk of data. */
+const Size shm_mq_minimum_size =
+MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
+
+#define MQH_INITIAL_BUFSIZE 8192
+
+/*
+ * Initialize a new shared message queue.
+ */
+shm_mq *
+shm_mq_create(void *address, Size size)
+{
+ shm_mq *mq = address;
+ Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+
+ /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
+ size = MAXALIGN_DOWN(size);
+
+ /* Queue size must be large enough to hold some data. */
+ Assert(size > data_offset);
+
+ /* Initialize queue header. */
+ SpinLockInit(&mq->mq_mutex);
+ mq->mq_receiver = NULL;
+ mq->mq_sender = NULL;
+ pg_atomic_init_u64(&mq->mq_bytes_read, 0);
+ pg_atomic_init_u64(&mq->mq_bytes_written, 0);
+ mq->mq_ring_size = size - data_offset;
+ mq->mq_detached = false;
+ mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
+
+ return mq;
+}
+
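+#ifdef NOT_USED
+/*
+ * Illustrative sketch only, not part of the upstream file: a minimal
+ * leader-side setup.  The queue is carved out of a dynamic shared memory
+ * segment, the current process declares itself the receiver, and the
+ * returned handle is what shm_mq_receive() is later called on.  The function
+ * name and queue size are hypothetical.
+ */
+static shm_mq_handle *
+shm_mq_example_setup_receiver(void)
+{
+	Size		queue_size = 16384;
+	dsm_segment *seg = dsm_create(queue_size, 0);
+	shm_mq	   *mq = shm_mq_create(dsm_segment_address(seg), queue_size);
+
+	/* Only the receiver is known yet; the worker will set the sender. */
+	shm_mq_set_receiver(mq, MyProc);
+
+	/* Passing seg ensures we detach from the queue before the DSM goes away. */
+	return shm_mq_attach(mq, seg, NULL);
+}
+#endif
+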
+/*
+ * Set the identity of the process that will receive from a shared message
+ * queue.
+ */
+void
+shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
+{
+ PGPROC *sender;
+
+ SpinLockAcquire(&mq->mq_mutex);
+ Assert(mq->mq_receiver == NULL);
+ mq->mq_receiver = proc;
+ sender = mq->mq_sender;
+ SpinLockRelease(&mq->mq_mutex);
+
+ if (sender != NULL)
+ SetLatch(&sender->procLatch);
+}
+
+/*
+ * Set the identity of the process that will send to a shared message queue.
+ */
+void
+shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
+{
+ PGPROC *receiver;
+
+ SpinLockAcquire(&mq->mq_mutex);
+ Assert(mq->mq_sender == NULL);
+ mq->mq_sender = proc;
+ receiver = mq->mq_receiver;
+ SpinLockRelease(&mq->mq_mutex);
+
+ if (receiver != NULL)
+ SetLatch(&receiver->procLatch);
+}
+
+/*
+ * Get the configured receiver.
+ */
+PGPROC *
+shm_mq_get_receiver(shm_mq *mq)
+{
+ PGPROC *receiver;
+
+ SpinLockAcquire(&mq->mq_mutex);
+ receiver = mq->mq_receiver;
+ SpinLockRelease(&mq->mq_mutex);
+
+ return receiver;
+}
+
+/*
+ * Get the configured sender.
+ */
+PGPROC *
+shm_mq_get_sender(shm_mq *mq)
+{
+ PGPROC *sender;
+
+ SpinLockAcquire(&mq->mq_mutex);
+ sender = mq->mq_sender;
+ SpinLockRelease(&mq->mq_mutex);
+
+ return sender;
+}
+
+/*
+ * Attach to a shared message queue so we can send or receive messages.
+ *
+ * The memory context in effect at the time this function is called should
+ * be one which will last for at least as long as the message queue itself.
+ * We'll allocate the handle in that context, and future allocations that
+ * are needed to buffer incoming data will happen in that context as well.
+ *
+ * If seg != NULL, the queue will be automatically detached when that dynamic
+ * shared memory segment is detached.
+ *
+ * If handle != NULL, the queue can be read or written even before the
+ * other process has attached. We'll wait for it to do so if needed. The
+ * handle must be for a background worker initialized with bgw_notify_pid
+ * equal to our PID.
+ *
+ * shm_mq_detach() should be called when done. This will free the
+ * shm_mq_handle and mark the queue itself as detached, so that our
+ * counterpart won't get stuck waiting for us to fill or drain the queue
+ * after we've already lost interest.
+ */
+shm_mq_handle *
+shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
+{
+ shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
+
+ Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
+ mqh->mqh_queue = mq;
+ mqh->mqh_segment = seg;
+ mqh->mqh_handle = handle;
+ mqh->mqh_buffer = NULL;
+ mqh->mqh_buflen = 0;
+ mqh->mqh_consume_pending = 0;
+ mqh->mqh_partial_bytes = 0;
+ mqh->mqh_expected_bytes = 0;
+ mqh->mqh_length_word_complete = false;
+ mqh->mqh_counterparty_attached = false;
+ mqh->mqh_context = CurrentMemoryContext;
+
+ if (seg != NULL)
+ on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
+
+ return mqh;
+}
+
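+#ifdef NOT_USED
+/*
+ * Illustrative sketch only, not part of the upstream file: the matching
+ * worker-side attach.  The worker maps the DSM segment it was told about,
+ * declares itself the sender, and attaches; shm_mq_set_sender() sets the
+ * leader's latch so that it notices the sender has arrived.  The function
+ * name is hypothetical.
+ */
+static shm_mq_handle *
+shm_mq_example_setup_sender(dsm_handle segment_handle)
+{
+	dsm_segment *seg = dsm_attach(segment_handle);
+	shm_mq	   *mq;
+
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not map dynamic shared memory segment")));
+
+	mq = (shm_mq *) dsm_segment_address(seg);
+	shm_mq_set_sender(mq, MyProc);
+	return shm_mq_attach(mq, seg, NULL);
+}
+#endif
+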
+/*
+ * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
+ * been passed to shm_mq_attach.
+ */
+void
+shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
+{
+ Assert(mqh->mqh_handle == NULL);
+ mqh->mqh_handle = handle;
+}
+
+/*
+ * Write a message into a shared message queue.
+ */
+shm_mq_result
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+{
+ shm_mq_iovec iov;
+
+ iov.data = data;
+ iov.len = nbytes;
+
+ return shm_mq_sendv(mqh, &iov, 1, nowait);
+}
+
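+#ifdef NOT_USED
+/*
+ * Illustrative sketch only, not part of the upstream file: the simplest use
+ * of shm_mq_send(), a blocking send of a NUL-terminated string.  The
+ * function name is hypothetical.
+ */
+static void
+shm_mq_example_send_string(shm_mq_handle *mqh, const char *str)
+{
+	shm_mq_result res = shm_mq_send(mqh, strlen(str) + 1, str, false);
+
+	/* With nowait = false the only failure mode is a detached receiver. */
+	if (res != SHM_MQ_SUCCESS)
+		elog(ERROR, "could not send message: receiver has detached");
+}
+#endif
+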
+/*
+ * Write a message into a shared message queue, gathered from multiple
+ * addresses.
+ *
+ * When nowait = false, we'll wait on our process latch when the ring buffer
+ * fills up, and then continue writing once the receiver has drained some data.
+ * The process latch is reset after each wait.
+ *
+ * When nowait = true, we do not manipulate the state of the process latch;
+ * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
+ * this case, the caller should call this function again, with the same
+ * arguments, each time the process latch is set. (Once begun, the sending
+ * of a message cannot be aborted except by detaching from the queue; changing
+ * the length or payload will corrupt the queue.)
+ */
+shm_mq_result
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
+{
+ shm_mq_result res;
+ shm_mq *mq = mqh->mqh_queue;
+ PGPROC *receiver;
+ Size nbytes = 0;
+ Size bytes_written;
+ int i;
+ int which_iov = 0;
+ Size offset;
+
+ Assert(mq->mq_sender == MyProc);
+
+ /* Compute total size of write. */
+ for (i = 0; i < iovcnt; ++i)
+ nbytes += iov[i].len;
+
+ /* Prevent writing messages overwhelming the receiver. */
+ if (nbytes > MaxAllocSize)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("cannot send a message of size %zu via shared memory queue",
+ nbytes)));
+
+ /* Try to write, or finish writing, the length word into the buffer. */
+ while (!mqh->mqh_length_word_complete)
+ {
+ Assert(mqh->mqh_partial_bytes < sizeof(Size));
+ res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
+ ((char *) &nbytes) + mqh->mqh_partial_bytes,
+ nowait, &bytes_written);
+
+ if (res == SHM_MQ_DETACHED)
+ {
+ /* Reset state in case caller tries to send another message. */
+ mqh->mqh_partial_bytes = 0;
+ mqh->mqh_length_word_complete = false;
+ return res;
+ }
+ mqh->mqh_partial_bytes += bytes_written;
+
+ if (mqh->mqh_partial_bytes >= sizeof(Size))
+ {
+ Assert(mqh->mqh_partial_bytes == sizeof(Size));
+
+ mqh->mqh_partial_bytes = 0;
+ mqh->mqh_length_word_complete = true;
+ }
+
+ if (res != SHM_MQ_SUCCESS)
+ return res;
+
+ /* Length word can't be split unless bigger than required alignment. */
+ Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
+ }
+
+ /* Write the actual data bytes into the buffer. */
+ Assert(mqh->mqh_partial_bytes <= nbytes);
+ offset = mqh->mqh_partial_bytes;
+ do
+ {
+ Size chunksize;
+
+ /* Figure out which bytes need to be sent next. */
+ if (offset >= iov[which_iov].len)
+ {
+ offset -= iov[which_iov].len;
+ ++which_iov;
+ if (which_iov >= iovcnt)
+ break;
+ continue;
+ }
+
+ /*
+ * We want to avoid copying the data if at all possible, but every
+ * chunk of bytes we write into the queue has to be MAXALIGN'd, except
+ * the last. Thus, if a chunk other than the last one ends on a
+ * non-MAXALIGN'd boundary, we have to combine the tail end of its
+ * data with data from one or more following chunks until we either
+ * reach the last chunk or accumulate a number of bytes which is
+ * MAXALIGN'd.
+ */
+ if (which_iov + 1 < iovcnt &&
+ offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
+ {
+ char tmpbuf[MAXIMUM_ALIGNOF];
+ int j = 0;
+
+ for (;;)
+ {
+ if (offset < iov[which_iov].len)
+ {
+ tmpbuf[j] = iov[which_iov].data[offset];
+ j++;
+ offset++;
+ if (j == MAXIMUM_ALIGNOF)
+ break;
+ }
+ else
+ {
+ offset -= iov[which_iov].len;
+ which_iov++;
+ if (which_iov >= iovcnt)
+ break;
+ }
+ }
+
+ res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
+
+ if (res == SHM_MQ_DETACHED)
+ {
+ /* Reset state in case caller tries to send another message. */
+ mqh->mqh_partial_bytes = 0;
+ mqh->mqh_length_word_complete = false;
+ return res;
+ }
+
+ mqh->mqh_partial_bytes += bytes_written;
+ if (res != SHM_MQ_SUCCESS)
+ return res;
+ continue;
+ }
+
+ /*
+ * If this is the last chunk, we can write all the data, even if it
+ * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
+ * MAXALIGN_DOWN the write size.
+ */
+ chunksize = iov[which_iov].len - offset;
+ if (which_iov + 1 < iovcnt)
+ chunksize = MAXALIGN_DOWN(chunksize);
+ res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
+ nowait, &bytes_written);
+
+ if (res == SHM_MQ_DETACHED)
+ {
+ /* Reset state in case caller tries to send another message. */
+ mqh->mqh_length_word_complete = false;
+ mqh->mqh_partial_bytes = 0;
+ return res;
+ }
+
+ mqh->mqh_partial_bytes += bytes_written;
+ offset += bytes_written;
+ if (res != SHM_MQ_SUCCESS)
+ return res;
+ } while (mqh->mqh_partial_bytes < nbytes);
+
+ /* Reset for next message. */
+ mqh->mqh_partial_bytes = 0;
+ mqh->mqh_length_word_complete = false;
+
+ /* If queue has been detached, let caller know. */
+ if (mq->mq_detached)
+ return SHM_MQ_DETACHED;
+
+ /*
+ * If the counterparty is known to have attached, we can read mq_receiver
+ * without acquiring the spinlock and assume it isn't NULL. Otherwise,
+ * more caution is needed.
+ */
+ if (mqh->mqh_counterparty_attached)
+ receiver = mq->mq_receiver;
+ else
+ {
+ SpinLockAcquire(&mq->mq_mutex);
+ receiver = mq->mq_receiver;
+ SpinLockRelease(&mq->mq_mutex);
+ if (receiver == NULL)
+ return SHM_MQ_SUCCESS;
+ mqh->mqh_counterparty_attached = true;
+ }
+
+ /* Notify receiver of the newly-written data, and return. */
+ SetLatch(&receiver->procLatch);
+ return SHM_MQ_SUCCESS;
+}
+
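+#ifdef NOT_USED
+/*
+ * Illustrative sketch only, not part of the upstream file: how a caller is
+ * expected to drive shm_mq_sendv() with nowait = true, retrying with the
+ * same arguments each time its process latch is set, as described above.
+ * The function name is hypothetical; WAIT_EVENT_MQ_SEND is reused purely for
+ * illustration.
+ */
+static shm_mq_result
+shm_mq_example_sendv_nowait(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt)
+{
+	for (;;)
+	{
+		shm_mq_result res = shm_mq_sendv(mqh, iov, iovcnt, true);
+
+		if (res != SHM_MQ_WOULD_BLOCK)
+			return res;			/* SHM_MQ_SUCCESS or SHM_MQ_DETACHED */
+
+		/* Queue is full; wait for the receiver to set our latch. */
+		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+						 WAIT_EVENT_MQ_SEND);
+		ResetLatch(MyLatch);
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+#endif
+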
+/*
+ * Receive a message from a shared message queue.
+ *
+ * We set *nbytes to the message length and *data to point to the message
+ * payload. If the entire message exists in the queue as a single,
+ * contiguous chunk, *data will point directly into shared memory; otherwise,
+ * it will point to a temporary buffer. This mostly avoids data copying in
+ * the hoped-for case where messages are short compared to the buffer size,
+ * while still allowing longer messages. In either case, the return value
+ * remains valid until the next receive operation is performed on the queue.
+ *
+ * When nowait = false, we'll wait on our process latch when the ring buffer
+ * is empty and we have not yet received a full message. The sender will
+ * set our process latch after more data has been written, and we'll resume
+ * processing. Each call will therefore return a complete message
+ * (unless the sender detaches the queue).
+ *
+ * When nowait = true, we do not manipulate the state of the process latch;
+ * instead, whenever the buffer is empty and we need to read from it, we
+ * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
+ * function again after the process latch has been set.
+ */
+shm_mq_result
+shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
+{
+ shm_mq *mq = mqh->mqh_queue;
+ shm_mq_result res;
+ Size rb = 0;
+ Size nbytes;
+ void *rawdata;
+
+ Assert(mq->mq_receiver == MyProc);
+
+ /* We can't receive data until the sender has attached. */
+ if (!mqh->mqh_counterparty_attached)
+ {
+ if (nowait)
+ {
+ int counterparty_gone;
+
+ /*
+ * We shouldn't return at this point at all unless the sender
+ * hasn't attached yet. However, the correct return value depends
+ * on whether the sender is still attached. If we first test
+ * whether the sender has ever attached and then test whether the
+ * sender has detached, there's a race condition: a sender that
+ * attaches and detaches very quickly might fool us into thinking
+ * the sender never attached at all. So, test whether our
+ * counterparty is definitively gone first, and only afterwards
+ * check whether the sender ever attached in the first place.
+ */
+ counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
+ if (shm_mq_get_sender(mq) == NULL)
+ {
+ if (counterparty_gone)
+ return SHM_MQ_DETACHED;
+ else
+ return SHM_MQ_WOULD_BLOCK;
+ }
+ }
+ else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
+ && shm_mq_get_sender(mq) == NULL)
+ {
+ mq->mq_detached = true;
+ return SHM_MQ_DETACHED;
+ }
+ mqh->mqh_counterparty_attached = true;
+ }
+
+ /*
+ * If we've consumed an amount of data greater than 1/4th of the ring
+ * size, mark it consumed in shared memory. We try to avoid doing this
+ * unnecessarily when only a small amount of data has been consumed,
+ * because SetLatch() is fairly expensive and we don't want to do it too
+ * often.
+ */
+ if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
+ {
+ shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
+ mqh->mqh_consume_pending = 0;
+ }
+
+ /* Try to read, or finish reading, the length word from the buffer. */
+ while (!mqh->mqh_length_word_complete)
+ {
+ /* Try to receive the message length word. */
+ Assert(mqh->mqh_partial_bytes < sizeof(Size));
+ res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
+ nowait, &rb, &rawdata);
+ if (res != SHM_MQ_SUCCESS)
+ return res;
+
+ /*
+ * Hopefully, we'll receive the entire message length word at once.
+ * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
+ * multiple reads.
+ */
+ if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
+ {
+ Size needed;
+
+ nbytes = *(Size *) rawdata;
+
+ /* If we've already got the whole message, we're done. */
+ needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
+ if (rb >= needed)
+ {
+ mqh->mqh_consume_pending += needed;
+ *nbytesp = nbytes;
+ *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
+ return SHM_MQ_SUCCESS;
+ }
+
+ /*
+ * We don't have the whole message, but we at least have the whole
+ * length word.
+ */
+ mqh->mqh_expected_bytes = nbytes;
+ mqh->mqh_length_word_complete = true;
+ mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
+ rb -= MAXALIGN(sizeof(Size));
+ }
+ else
+ {
+ Size lengthbytes;
+
+ /* Can't be split unless bigger than required alignment. */
+ Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
+
+ /* Message word is split; need buffer to reassemble. */
+ if (mqh->mqh_buffer == NULL)
+ {
+ mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
+ MQH_INITIAL_BUFSIZE);
+ mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
+ }
+ Assert(mqh->mqh_buflen >= sizeof(Size));
+
+ /* Copy partial length word; remember to consume it. */
+ if (mqh->mqh_partial_bytes + rb > sizeof(Size))
+ lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
+ else
+ lengthbytes = rb;
+ memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
+ lengthbytes);
+ mqh->mqh_partial_bytes += lengthbytes;
+ mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
+ rb -= lengthbytes;
+
+ /* If we now have the whole word, we're ready to read payload. */
+ if (mqh->mqh_partial_bytes >= sizeof(Size))
+ {
+ Assert(mqh->mqh_partial_bytes == sizeof(Size));
+ mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
+ mqh->mqh_length_word_complete = true;
+ mqh->mqh_partial_bytes = 0;
+ }
+ }
+ }
+ nbytes = mqh->mqh_expected_bytes;
+
+ /*
+ * Should be disallowed on the sending side already, but better check and
+ * error out on the receiver side as well rather than trying to read a
+ * prohibitively large message.
+ */
+ if (nbytes > MaxAllocSize)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("invalid message size %zu in shared memory queue",
+ nbytes)));
+
+ if (mqh->mqh_partial_bytes == 0)
+ {
+ /*
+ * Try to obtain the whole message in a single chunk. If this works,
+ * we need not copy the data and can return a pointer directly into
+ * shared memory.
+ */
+ res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
+ if (res != SHM_MQ_SUCCESS)
+ return res;
+ if (rb >= nbytes)
+ {
+ mqh->mqh_length_word_complete = false;
+ mqh->mqh_consume_pending += MAXALIGN(nbytes);
+ *nbytesp = nbytes;
+ *datap = rawdata;
+ return SHM_MQ_SUCCESS;
+ }
+
+ /*
+ * The message has wrapped the buffer. We'll need to copy it in order
+ * to return it to the client in one chunk. First, make sure we have
+ * a large enough buffer available.
+ */
+ if (mqh->mqh_buflen < nbytes)
+ {
+ Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
+
+ /*
+ * Double the buffer size until the payload fits, but limit to
+ * MaxAllocSize.
+ */
+ while (newbuflen < nbytes)
+ newbuflen *= 2;
+ newbuflen = Min(newbuflen, MaxAllocSize);
+
+ if (mqh->mqh_buffer != NULL)
+ {
+ pfree(mqh->mqh_buffer);
+ mqh->mqh_buffer = NULL;
+ mqh->mqh_buflen = 0;
+ }
+ mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
+ mqh->mqh_buflen = newbuflen;
+ }
+ }
+
+ /* Loop until we've copied the entire message. */
+ for (;;)
+ {
+ Size still_needed;
+
+ /* Copy as much as we can. */
+ Assert(mqh->mqh_partial_bytes + rb <= nbytes);
+ if (rb > 0)
+ {
+ memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
+ mqh->mqh_partial_bytes += rb;
+ }
+
+ /*
+ * Update count of bytes that can be consumed, accounting for
+ * alignment padding. Note that this will never actually insert any
+ * padding except at the end of a message, because the buffer size is
+ * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
+ */
+ Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
+ mqh->mqh_consume_pending += MAXALIGN(rb);
+
+ /* If we got all the data, exit the loop. */
+ if (mqh->mqh_partial_bytes >= nbytes)
+ break;
+
+ /* Wait for some more data. */
+ still_needed = nbytes - mqh->mqh_partial_bytes;
+ res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
+ if (res != SHM_MQ_SUCCESS)
+ return res;
+ if (rb > still_needed)
+ rb = still_needed;
+ }
+
+ /* Return the complete message, and reset for next message. */
+ *nbytesp = nbytes;
+ *datap = mqh->mqh_buffer;
+ mqh->mqh_length_word_complete = false;
+ mqh->mqh_partial_bytes = 0;
+ return SHM_MQ_SUCCESS;
+}
+
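+#ifdef NOT_USED
+/*
+ * Illustrative sketch only, not part of the upstream file: how a caller is
+ * expected to drive shm_mq_receive() with nowait = true, retrying after its
+ * process latch is set until a whole message arrives or the sender detaches.
+ * The function name is hypothetical; WAIT_EVENT_MQ_RECEIVE is reused purely
+ * for illustration.
+ */
+static shm_mq_result
+shm_mq_example_receive_nowait(shm_mq_handle *mqh, Size *nbytesp, void **datap)
+{
+	for (;;)
+	{
+		shm_mq_result res = shm_mq_receive(mqh, nbytesp, datap, true);
+
+		if (res != SHM_MQ_WOULD_BLOCK)
+			return res;			/* SHM_MQ_SUCCESS or SHM_MQ_DETACHED */
+
+		/* No complete message yet; wait for the sender to set our latch. */
+		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+						 WAIT_EVENT_MQ_RECEIVE);
+		ResetLatch(MyLatch);
+		CHECK_FOR_INTERRUPTS();
+	}
+}
+#endif
+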
+/*
+ * Wait for the other process that's supposed to use this queue to attach
+ * to it.
+ *
+ * The return value is SHM_MQ_DETACHED if the worker has already detached or
+ * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
+ * Note that we will only be able to detect that the worker has died before
+ * attaching if a background worker handle was passed to shm_mq_attach().
+ */
+shm_mq_result
+shm_mq_wait_for_attach(shm_mq_handle *mqh)
+{
+ shm_mq *mq = mqh->mqh_queue;
+ PGPROC **victim;
+
+ if (shm_mq_get_receiver(mq) == MyProc)
+ victim = &mq->mq_sender;
+ else
+ {
+ Assert(shm_mq_get_sender(mq) == MyProc);
+ victim = &mq->mq_receiver;
+ }
+
+ if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
+ return SHM_MQ_SUCCESS;
+ else
+ return SHM_MQ_DETACHED;
+}
+
+/*
+ * Detach from a shared message queue, and destroy the shm_mq_handle.
+ */
+void
+shm_mq_detach(shm_mq_handle *mqh)
+{
+ /* Notify counterparty that we're outta here. */
+ shm_mq_detach_internal(mqh->mqh_queue);
+
+ /* Cancel on_dsm_detach callback, if any. */
+ if (mqh->mqh_segment)
+ cancel_on_dsm_detach(mqh->mqh_segment,
+ shm_mq_detach_callback,
+ PointerGetDatum(mqh->mqh_queue));
+
+ /* Release local memory associated with handle. */
+ if (mqh->mqh_buffer != NULL)
+ pfree(mqh->mqh_buffer);
+ pfree(mqh);
+}
+
+/*
+ * Notify counterparty that we're detaching from shared message queue.
+ *
+ * The purpose of this function is to make sure that the process
+ * with which we're communicating doesn't block forever waiting for us to
+ * fill or drain the queue once we've lost interest. When the sender
+ * detaches, the receiver can read any messages remaining in the queue;
+ * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
+ * further attempts to send messages will likewise return SHM_MQ_DETACHED.
+ *
+ * This is separated out from shm_mq_detach() because if the on_dsm_detach
+ * callback fires, we only want to do this much. We do not try to touch
+ * the local shm_mq_handle, as it may have been pfree'd already.
+ */
+static void
+shm_mq_detach_internal(shm_mq *mq)
+{
+ PGPROC *victim;
+
+ SpinLockAcquire(&mq->mq_mutex);
+ if (mq->mq_sender == MyProc)
+ victim = mq->mq_receiver;
+ else
+ {
+ Assert(mq->mq_receiver == MyProc);
+ victim = mq->mq_sender;
+ }
+ mq->mq_detached = true;
+ SpinLockRelease(&mq->mq_mutex);
+
+ if (victim != NULL)
+ SetLatch(&victim->procLatch);
+}
+
+/*
+ * Get the shm_mq from handle.
+ */
+shm_mq *
+shm_mq_get_queue(shm_mq_handle *mqh)
+{
+ return mqh->mqh_queue;
+}
+
+/*
+ * Write bytes into a shared message queue.
+ */
+static shm_mq_result
+shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
+ bool nowait, Size *bytes_written)
+{
+ shm_mq *mq = mqh->mqh_queue;
+ Size sent = 0;
+ uint64 used;
+ Size ringsize = mq->mq_ring_size;
+ Size available;
+
+ while (sent < nbytes)
+ {
+ uint64 rb;
+ uint64 wb;
+
+ /* Compute number of ring buffer bytes used and available. */
+ rb = pg_atomic_read_u64(&mq->mq_bytes_read);
+ wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+ Assert(wb >= rb);
+ used = wb - rb;
+ Assert(used <= ringsize);
+ available = Min(ringsize - used, nbytes - sent);
+
+ /*
+ * Bail out if the queue has been detached. Note that we would be in
+ * trouble if the compiler decided to cache the value of
+ * mq->mq_detached in a register or on the stack across loop
+ * iterations. It probably shouldn't do that anyway since we'll
+ * always return, call an external function that performs a system
+ * call, or reach a memory barrier at some point later in the loop,
+ * but just to be sure, insert a compiler barrier here.
+ */
+ pg_compiler_barrier();
+ if (mq->mq_detached)
+ {
+ *bytes_written = sent;
+ return SHM_MQ_DETACHED;
+ }
+
+ if (available == 0 && !mqh->mqh_counterparty_attached)
+ {
+ /*
+ * The queue is full, so if the receiver isn't yet known to be
+ * attached, we must wait for that to happen.
+ */
+ if (nowait)
+ {
+ if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
+ {
+ *bytes_written = sent;
+ return SHM_MQ_DETACHED;
+ }
+ if (shm_mq_get_receiver(mq) == NULL)
+ {
+ *bytes_written = sent;
+ return SHM_MQ_WOULD_BLOCK;
+ }
+ }
+ else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
+ mqh->mqh_handle))
+ {
+ mq->mq_detached = true;
+ *bytes_written = sent;
+ return SHM_MQ_DETACHED;
+ }
+ mqh->mqh_counterparty_attached = true;
+
+ /*
+ * The receiver may have read some data after attaching, so we
+ * must not wait without rechecking the queue state.
+ */
+ }
+ else if (available == 0)
+ {
+ /*
+ * Since mqh->mqh_counterparty_attached is known to be true at this
+ * point, mq_receiver has been set, and it can't change once set.
+ * Therefore, we can read it without acquiring the spinlock.
+ */
+ Assert(mqh->mqh_counterparty_attached);
+ SetLatch(&mq->mq_receiver->procLatch);
+
+ /* Skip manipulation of our latch if nowait = true. */
+ if (nowait)
+ {
+ *bytes_written = sent;
+ return SHM_MQ_WOULD_BLOCK;
+ }
+
+ /*
+ * Wait for our latch to be set. It might already be set for some
+ * unrelated reason, but that'll just result in one extra trip
+ * through the loop. It's worth it to avoid resetting the latch
+ * at top of loop, because setting an already-set latch is much
+ * cheaper than setting one that has been reset.
+ */
+ (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+ WAIT_EVENT_MQ_SEND);
+
+ /* Reset the latch so we don't spin. */
+ ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+ }
+ else
+ {
+ Size offset;
+ Size sendnow;
+
+ offset = wb % (uint64) ringsize;
+ sendnow = Min(available, ringsize - offset);
+
+ /*
+ * Write as much data as we can via a single memcpy(). Make sure
+ * these writes happen after the read of mq_bytes_read, above.
+ * This barrier pairs with the one in shm_mq_inc_bytes_read.
+ * (Since we're separating the read of mq_bytes_read from a
+ * subsequent write to mq_ring, we need a full barrier here.)
+ */
+ pg_memory_barrier();
+ memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
+ (char *) data + sent, sendnow);
+ sent += sendnow;
+
+ /*
+ * Update count of bytes written, with alignment padding. Note
+ * that this will never actually insert any padding except at the
+ * end of a run of bytes, because the buffer size is a multiple of
+ * MAXIMUM_ALIGNOF, and each read and write is as well.
+ */
+ Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+ shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
+
+ /*
+ * For efficiency, we don't set the reader's latch here. We'll do
+ * that only when the buffer fills up or after writing an entire
+ * message.
+ */
+ }
+ }
+
+ *bytes_written = sent;
+ return SHM_MQ_SUCCESS;
+}
+
+/*
+ * Wait until at least *nbytesp bytes are available to be read from the
+ * shared message queue, or until the buffer wraps around. If the queue is
+ * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
+ * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
+ * to the location at which data bytes can be read, *nbytesp is set to the
+ * number of bytes which can be read at that address, and the return value
+ * is SHM_MQ_SUCCESS.
+ */
+static shm_mq_result
+shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
+ Size *nbytesp, void **datap)
+{
+ shm_mq *mq = mqh->mqh_queue;
+ Size ringsize = mq->mq_ring_size;
+ uint64 used;
+ uint64 written;
+
+ for (;;)
+ {
+ Size offset;
+ uint64 read;
+
+ /* Get bytes written, so we can compute what's available to read. */
+ written = pg_atomic_read_u64(&mq->mq_bytes_written);
+
+ /*
+ * Get bytes read. Include bytes we could consume but have not yet
+ * consumed.
+ */
+ read = pg_atomic_read_u64(&mq->mq_bytes_read) +
+ mqh->mqh_consume_pending;
+ used = written - read;
+ Assert(used <= ringsize);
+ offset = read % (uint64) ringsize;
+
+ /* If we have enough data or buffer has wrapped, we're done. */
+ if (used >= bytes_needed || offset + used >= ringsize)
+ {
+ *nbytesp = Min(used, ringsize - offset);
+ *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
+
+ /*
+ * Separate the read of mq_bytes_written, above, from caller's
+ * attempt to read the data itself. Pairs with the barrier in
+ * shm_mq_inc_bytes_written.
+ */
+ pg_read_barrier();
+ return SHM_MQ_SUCCESS;
+ }
+
+ /*
+ * Fall out before waiting if the queue has been detached.
+ *
+ * Note that we don't check for this until *after* considering whether
+ * the data already available is enough, since the receiver can finish
+ * receiving a message stored in the buffer even after the sender has
+ * detached.
+ */
+ if (mq->mq_detached)
+ {
+ /*
+ * If the writer advanced mq_bytes_written and then set
+ * mq_detached, we might not have read the final value of
+ * mq_bytes_written above. Insert a read barrier and then check
+ * again if mq_bytes_written has advanced.
+ */
+ pg_read_barrier();
+ if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
+ continue;
+
+ return SHM_MQ_DETACHED;
+ }
+
+ /*
+ * We didn't get enough data to satisfy the request, so mark any data
+ * previously-consumed as read to make more buffer space.
+ */
+ if (mqh->mqh_consume_pending > 0)
+ {
+ shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
+ mqh->mqh_consume_pending = 0;
+ }
+
+ /* Skip manipulation of our latch if nowait = true. */
+ if (nowait)
+ return SHM_MQ_WOULD_BLOCK;
+
+ /*
+ * Wait for our latch to be set. It might already be set for some
+ * unrelated reason, but that'll just result in one extra trip through
+ * the loop. It's worth it to avoid resetting the latch at top of
+ * loop, because setting an already-set latch is much cheaper than
+ * setting one that has been reset.
+ */
+ (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+ WAIT_EVENT_MQ_RECEIVE);
+
+ /* Reset the latch so we don't spin. */
+ ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+ }
+}
+
+/*
+ * Test whether a counterparty who may not even be alive yet is definitely gone.
+ */
+static bool
+shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
+{
+ pid_t pid;
+
+ /* If the queue has been detached, counterparty is definitely gone. */
+ if (mq->mq_detached)
+ return true;
+
+ /* If there's a handle, check worker status. */
+ if (handle != NULL)
+ {
+ BgwHandleStatus status;
+
+ /* Check for unexpected worker death. */
+ status = GetBackgroundWorkerPid(handle, &pid);
+ if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
+ {
+ /* Mark it detached, just to make it official. */
+ mq->mq_detached = true;
+ return true;
+ }
+ }
+
+ /* Counterparty is not definitively gone. */
+ return false;
+}
+
+/*
+ * This is used when a process is waiting for its counterpart to attach to the
+ * queue. We exit when the other process attaches as expected, or, if
+ * handle != NULL, when the referenced background process or the postmaster
+ * dies. Note that if handle == NULL, and the process fails to attach, we'll
+ * potentially get stuck here forever waiting for a process that may never
+ * start. We do check for interrupts, though.
+ *
+ * ptr is a pointer to the memory address that we're expecting to become
+ * non-NULL when our counterpart attaches to the queue.
+ */
+static bool
+shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
+{
+ bool result = false;
+
+ for (;;)
+ {
+ BgwHandleStatus status;
+ pid_t pid;
+
+ /* Acquire the lock just long enough to check the pointer. */
+ SpinLockAcquire(&mq->mq_mutex);
+ result = (*ptr != NULL);
+ SpinLockRelease(&mq->mq_mutex);
+
+ /* Fail if detached; else succeed if initialized. */
+ if (mq->mq_detached)
+ {
+ result = false;
+ break;
+ }
+ if (result)
+ break;
+
+ if (handle != NULL)
+ {
+ /* Check for unexpected worker death. */
+ status = GetBackgroundWorkerPid(handle, &pid);
+ if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
+ {
+ result = false;
+ break;
+ }
+ }
+
+ /* Wait to be signaled. */
+ (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+ WAIT_EVENT_MQ_INTERNAL);
+
+ /* Reset the latch so we don't spin. */
+ ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ return result;
+}
+
+/*
+ * Increment the number of bytes read.
+ */
+static void
+shm_mq_inc_bytes_read(shm_mq *mq, Size n)
+{
+ PGPROC *sender;
+
+ /*
+ * Separate prior reads of mq_ring from the increment of mq_bytes_read
+ * which follows. This pairs with the full barrier in
+ * shm_mq_send_bytes(). We only need a read barrier here because the
+ * increment of mq_bytes_read is actually a read followed by a dependent
+ * write.
+ */
+ pg_read_barrier();
+
+ /*
+ * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+ * else can be changing this value. This method should be cheaper.
+ */
+ pg_atomic_write_u64(&mq->mq_bytes_read,
+ pg_atomic_read_u64(&mq->mq_bytes_read) + n);
+
+ /*
+ * We shouldn't have any bytes to read without a sender, so we can read
+ * mq_sender here without a lock. Once it's initialized, it can't change.
+ */
+ sender = mq->mq_sender;
+ Assert(sender != NULL);
+ SetLatch(&sender->procLatch);
+}
+
+/*
+ * Increment the number of bytes written.
+ */
+static void
+shm_mq_inc_bytes_written(shm_mq *mq, Size n)
+{
+ /*
+ * Separate prior reads of mq_ring from the write of mq_bytes_written
+ * which we're about to do. Pairs with the read barrier found in
+ * shm_mq_receive_bytes.
+ */
+ pg_write_barrier();
+
+ /*
+ * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+ * else can be changing this value. This method avoids taking the bus
+ * lock unnecessarily.
+ */
+ pg_atomic_write_u64(&mq->mq_bytes_written,
+ pg_atomic_read_u64(&mq->mq_bytes_written) + n);
+}
+
+/* Shim for on_dsm_detach callback. */
+static void
+shm_mq_detach_callback(dsm_segment *seg, Datum arg)
+{
+ shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
+
+ shm_mq_detach_internal(mq);
+}