diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/test/modules/test_shm_mq | |
parent | Initial commit. (diff) | |
download | postgresql-14-46651ce6fe013220ed397add242004d764fc0153.tar.xz postgresql-14-46651ce6fe013220ed397add242004d764fc0153.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/modules/test_shm_mq')
-rw-r--r-- | src/test/modules/test_shm_mq/.gitignore | 4 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/Makefile | 25 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/README | 49 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/expected/test_shm_mq.out | 36 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/setup.c | 316 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/sql/test_shm_mq.sql | 12 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/test.c | 266 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/test_shm_mq--1.0.sql | 19 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/test_shm_mq.control | 4 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/test_shm_mq.h | 45 | ||||
-rw-r--r-- | src/test/modules/test_shm_mq/worker.c | 197 |
11 files changed, 973 insertions, 0 deletions
diff --git a/src/test/modules/test_shm_mq/.gitignore b/src/test/modules/test_shm_mq/.gitignore new file mode 100644 index 0000000..5dcb3ff --- /dev/null +++ b/src/test/modules/test_shm_mq/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_shm_mq/Makefile b/src/test/modules/test_shm_mq/Makefile new file mode 100644 index 0000000..1171ced --- /dev/null +++ b/src/test/modules/test_shm_mq/Makefile @@ -0,0 +1,25 @@ +# src/test/modules/test_shm_mq/Makefile + +MODULE_big = test_shm_mq +OBJS = \ + $(WIN32RES) \ + setup.o \ + test.o \ + worker.o +PGFILEDESC = "test_shm_mq - example use of shared memory message queue" + +EXTENSION = test_shm_mq +DATA = test_shm_mq--1.0.sql + +REGRESS = test_shm_mq + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_shm_mq +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_shm_mq/README b/src/test/modules/test_shm_mq/README new file mode 100644 index 0000000..641407b --- /dev/null +++ b/src/test/modules/test_shm_mq/README @@ -0,0 +1,49 @@ +test_shm_mq is an example of how to use dynamic shared memory +and the shared memory message queue facilities to coordinate a user backend +with the efforts of one or more background workers. It is not intended to +do anything useful on its own; rather, it is a demonstration of how these +facilities can be used, and a unit test of those facilities. + +The function is this extension send the same message repeatedly through +a loop of processes. The message payload, the size of the message queue +through which it is sent, and the number of processes in the loop are +configurable. At the end, the message may be verified to ensure that it +has not been corrupted in transmission. + +Functions +========= + + +test_shm_mq(queue_size int8, message text, + repeat_count int4 default 1, num_workers int4 default 1) + RETURNS void + +This function sends and receives messages synchronously. The user +backend sends the provided message to the first background worker using +a message queue of the given size. The first background worker sends +the message to the second background worker, if the number of workers +is greater than one, and so forth. Eventually, the last background +worker sends the message back to the user backend. If the repeat count +is greater than one, the user backend then sends the message back to +the first worker. Once the message has been sent and received by all +the coordinating processes a number of times equal to the repeat count, +the user backend verifies that the message finally received matches the +one originally sent and throws an error if not. + + +test_shm_mq_pipelined(queue_size int8, message text, + repeat_count int4 default 1, num_workers int4 default 1, + verify bool default true) + RETURNS void + +This function sends the same message multiple times, as specified by the +repeat count, to the first background worker using a queue of the given +size. These messages are then forwarded to each background worker in +turn, in each case using a queue of the given size. Finally, the last +background worker sends the messages back to the user backend. The user +backend uses non-blocking sends and receives, so that it may begin receiving +copies of the message before it has finished sending all copies of the +message. The 'verify' argument controls whether or not the +received copies are checked against the message that was sent. (This +takes nontrivial time so it may be useful to disable it for benchmarking +purposes.) diff --git a/src/test/modules/test_shm_mq/expected/test_shm_mq.out b/src/test/modules/test_shm_mq/expected/test_shm_mq.out new file mode 100644 index 0000000..c4858b0 --- /dev/null +++ b/src/test/modules/test_shm_mq/expected/test_shm_mq.out @@ -0,0 +1,36 @@ +CREATE EXTENSION test_shm_mq; +-- +-- These tests don't produce any interesting output. We're checking that +-- the operations complete without crashing or hanging and that none of their +-- internal sanity tests fail. +-- +SELECT test_shm_mq(1024, '', 2000, 1); + test_shm_mq +------------- + +(1 row) + +SELECT test_shm_mq(1024, 'a', 2001, 1); + test_shm_mq +------------- + +(1 row) + +SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+900*random())::int)), 10000, 1); + test_shm_mq +------------- + +(1 row) + +SELECT test_shm_mq(100, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+200*random())::int)), 10000, 1); + test_shm_mq +------------- + +(1 row) + +SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,270000)), 200, 3); + test_shm_mq_pipelined +----------------------- + +(1 row) + diff --git a/src/test/modules/test_shm_mq/setup.c b/src/test/modules/test_shm_mq/setup.c new file mode 100644 index 0000000..e05e97c --- /dev/null +++ b/src/test/modules/test_shm_mq/setup.c @@ -0,0 +1,316 @@ +/*-------------------------------------------------------------------------- + * + * setup.c + * Code to set up a dynamic shared memory segments and a specified + * number of background workers for shared memory message queue + * testing. + * + * Copyright (c) 2013-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_shm_mq/setup.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "storage/procsignal.h" +#include "storage/shm_toc.h" +#include "test_shm_mq.h" +#include "utils/memutils.h" + +typedef struct +{ + int nworkers; + BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER]; +} worker_state; + +static void setup_dynamic_shared_memory(int64 queue_size, int nworkers, + dsm_segment **segp, + test_shm_mq_header **hdrp, + shm_mq **outp, shm_mq **inp); +static worker_state *setup_background_workers(int nworkers, + dsm_segment *seg); +static void cleanup_background_workers(dsm_segment *seg, Datum arg); +static void wait_for_workers_to_become_ready(worker_state *wstate, + volatile test_shm_mq_header *hdr); +static bool check_worker_status(worker_state *wstate); + +/* + * Set up a dynamic shared memory segment and zero or more background workers + * for a test run. + */ +void +test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, + shm_mq_handle **output, shm_mq_handle **input) +{ + dsm_segment *seg; + test_shm_mq_header *hdr; + shm_mq *outq = NULL; /* placate compiler */ + shm_mq *inq = NULL; /* placate compiler */ + worker_state *wstate; + + /* Set up a dynamic shared memory segment. */ + setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq); + *segp = seg; + + /* Register background workers. */ + wstate = setup_background_workers(nworkers, seg); + + /* Attach the queues. */ + *output = shm_mq_attach(outq, seg, wstate->handle[0]); + *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]); + + /* Wait for workers to become ready. */ + wait_for_workers_to_become_ready(wstate, hdr); + + /* + * Once we reach this point, all workers are ready. We no longer need to + * kill them if we die; they'll die on their own as the message queues + * shut down. + */ + cancel_on_dsm_detach(seg, cleanup_background_workers, + PointerGetDatum(wstate)); + pfree(wstate); +} + +/* + * Set up a dynamic shared memory segment. + * + * We set up a small control region that contains only a test_shm_mq_header, + * plus one region per message queue. There are as many message queues as + * the number of workers, plus one. + */ +static void +setup_dynamic_shared_memory(int64 queue_size, int nworkers, + dsm_segment **segp, test_shm_mq_header **hdrp, + shm_mq **outp, shm_mq **inp) +{ + shm_toc_estimator e; + int i; + Size segsize; + dsm_segment *seg; + shm_toc *toc; + test_shm_mq_header *hdr; + + /* Ensure a valid queue size. */ + if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("queue size must be at least %zu bytes", + shm_mq_minimum_size))); + if (queue_size != ((Size) queue_size)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("queue size overflows size_t"))); + + /* + * Estimate how much shared memory we need. + * + * Because the TOC machinery may choose to insert padding of oddly-sized + * requests, we must estimate each chunk separately. + * + * We need one key to register the location of the header, and we need + * nworkers + 1 keys to track the locations of the message queues. + */ + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header)); + for (i = 0; i <= nworkers; ++i) + shm_toc_estimate_chunk(&e, (Size) queue_size); + shm_toc_estimate_keys(&e, 2 + nworkers); + segsize = shm_toc_estimate(&e); + + /* Create the shared memory segment and establish a table of contents. */ + seg = dsm_create(shm_toc_estimate(&e), 0); + toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg), + segsize); + + /* Set up the header region. */ + hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header)); + SpinLockInit(&hdr->mutex); + hdr->workers_total = nworkers; + hdr->workers_attached = 0; + hdr->workers_ready = 0; + shm_toc_insert(toc, 0, hdr); + + /* Set up one message queue per worker, plus one. */ + for (i = 0; i <= nworkers; ++i) + { + shm_mq *mq; + + mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size), + (Size) queue_size); + shm_toc_insert(toc, i + 1, mq); + + if (i == 0) + { + /* We send messages to the first queue. */ + shm_mq_set_sender(mq, MyProc); + *outp = mq; + } + if (i == nworkers) + { + /* We receive messages from the last queue. */ + shm_mq_set_receiver(mq, MyProc); + *inp = mq; + } + } + + /* Return results to caller. */ + *segp = seg; + *hdrp = hdr; +} + +/* + * Register background workers. + */ +static worker_state * +setup_background_workers(int nworkers, dsm_segment *seg) +{ + MemoryContext oldcontext; + BackgroundWorker worker; + worker_state *wstate; + int i; + + /* + * We need the worker_state object and the background worker handles to + * which it points to be allocated in CurTransactionContext rather than + * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach + * hooks run. + */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + /* Create worker state object. */ + wstate = MemoryContextAlloc(TopTransactionContext, + offsetof(worker_state, handle) + + sizeof(BackgroundWorkerHandle *) * nworkers); + wstate->nworkers = 0; + + /* + * Arrange to kill all the workers if we abort before all workers are + * finished hooking themselves up to the dynamic shared memory segment. + * + * If we die after all the workers have finished hooking themselves up to + * the dynamic shared memory segment, we'll mark the two queues to which + * we're directly connected as detached, and the worker(s) connected to + * those queues will exit, marking any other queues to which they are + * connected as detached. This will cause any as-yet-unaware workers + * connected to those queues to exit in their turn, and so on, until + * everybody exits. + * + * But suppose the workers which are supposed to connect to the queues to + * which we're directly attached exit due to some error before they + * actually attach the queues. The remaining workers will have no way of + * knowing this. From their perspective, they're still waiting for those + * workers to start, when in fact they've already died. + */ + on_dsm_detach(seg, cleanup_background_workers, + PointerGetDatum(wstate)); + + /* Configure a worker. */ + memset(&worker, 0, sizeof(worker)); + worker.bgw_flags = BGWORKER_SHMEM_ACCESS; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + sprintf(worker.bgw_library_name, "test_shm_mq"); + sprintf(worker.bgw_function_name, "test_shm_mq_main"); + snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq"); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + /* set bgw_notify_pid, so we can detect if the worker stops */ + worker.bgw_notify_pid = MyProcPid; + + /* Register the workers. */ + for (i = 0; i < nworkers; ++i) + { + if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i])) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background process"), + errhint("You may need to increase max_worker_processes."))); + ++wstate->nworkers; + } + + /* All done. */ + MemoryContextSwitchTo(oldcontext); + return wstate; +} + +static void +cleanup_background_workers(dsm_segment *seg, Datum arg) +{ + worker_state *wstate = (worker_state *) DatumGetPointer(arg); + + while (wstate->nworkers > 0) + { + --wstate->nworkers; + TerminateBackgroundWorker(wstate->handle[wstate->nworkers]); + } +} + +static void +wait_for_workers_to_become_ready(worker_state *wstate, + volatile test_shm_mq_header *hdr) +{ + bool result = false; + + for (;;) + { + int workers_ready; + + /* If all the workers are ready, we have succeeded. */ + SpinLockAcquire(&hdr->mutex); + workers_ready = hdr->workers_ready; + SpinLockRelease(&hdr->mutex); + if (workers_ready >= wstate->nworkers) + { + result = true; + break; + } + + /* If any workers (or the postmaster) have died, we have failed. */ + if (!check_worker_status(wstate)) + { + result = false; + break; + } + + /* Wait to be signaled. */ + (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, + PG_WAIT_EXTENSION); + + /* Reset the latch so we don't spin. */ + ResetLatch(MyLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + } + + if (!result) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("one or more background workers failed to start"))); +} + +static bool +check_worker_status(worker_state *wstate) +{ + int n; + + /* If any workers (or the postmaster) have died, we have failed. */ + for (n = 0; n < wstate->nworkers; ++n) + { + BgwHandleStatus status; + pid_t pid; + + status = GetBackgroundWorkerPid(wstate->handle[n], &pid); + if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED) + return false; + } + + /* Otherwise, things still look OK. */ + return true; +} diff --git a/src/test/modules/test_shm_mq/sql/test_shm_mq.sql b/src/test/modules/test_shm_mq/sql/test_shm_mq.sql new file mode 100644 index 0000000..9de19d3 --- /dev/null +++ b/src/test/modules/test_shm_mq/sql/test_shm_mq.sql @@ -0,0 +1,12 @@ +CREATE EXTENSION test_shm_mq; + +-- +-- These tests don't produce any interesting output. We're checking that +-- the operations complete without crashing or hanging and that none of their +-- internal sanity tests fail. +-- +SELECT test_shm_mq(1024, '', 2000, 1); +SELECT test_shm_mq(1024, 'a', 2001, 1); +SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+900*random())::int)), 10000, 1); +SELECT test_shm_mq(100, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+200*random())::int)), 10000, 1); +SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,270000)), 200, 3); diff --git a/src/test/modules/test_shm_mq/test.c b/src/test/modules/test_shm_mq/test.c new file mode 100644 index 0000000..2d8d695 --- /dev/null +++ b/src/test/modules/test_shm_mq/test.c @@ -0,0 +1,266 @@ +/*-------------------------------------------------------------------------- + * + * test.c + * Test harness code for shared memory message queues. + * + * Copyright (c) 2013-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_shm_mq/test.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "fmgr.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "test_shm_mq.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(test_shm_mq); +PG_FUNCTION_INFO_V1(test_shm_mq_pipelined); + +void _PG_init(void); + +static void verify_message(Size origlen, char *origdata, Size newlen, + char *newdata); + +/* + * Simple test of the shared memory message queue infrastructure. + * + * We set up a ring of message queues passing through 1 or more background + * processes and eventually looping back to ourselves. We then send a message + * through the ring a number of times indicated by the loop count. At the end, + * we check whether the final message matches the one we started with. + */ +Datum +test_shm_mq(PG_FUNCTION_ARGS) +{ + int64 queue_size = PG_GETARG_INT64(0); + text *message = PG_GETARG_TEXT_PP(1); + char *message_contents = VARDATA_ANY(message); + int message_size = VARSIZE_ANY_EXHDR(message); + int32 loop_count = PG_GETARG_INT32(2); + int32 nworkers = PG_GETARG_INT32(3); + dsm_segment *seg; + shm_mq_handle *outqh; + shm_mq_handle *inqh; + shm_mq_result res; + Size len; + void *data; + + /* A negative loopcount is nonsensical. */ + if (loop_count < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("repeat count size must be an integer value greater than or equal to zero"))); + + /* + * Since this test sends data using the blocking interfaces, it cannot + * send data to itself. Therefore, a minimum of 1 worker is required. Of + * course, a negative worker count is nonsensical. + */ + if (nworkers <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("number of workers must be an integer value greater than zero"))); + + /* Set up dynamic shared memory segment and background workers. */ + test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh); + + /* Send the initial message. */ + res = shm_mq_send(outqh, message_size, message_contents, false); + if (res != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send message"))); + + /* + * Receive a message and send it back out again. Do this a number of + * times equal to the loop count. + */ + for (;;) + { + /* Receive a message. */ + res = shm_mq_receive(inqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not receive message"))); + + /* If this is supposed to be the last iteration, stop here. */ + if (--loop_count <= 0) + break; + + /* Send it back out. */ + res = shm_mq_send(outqh, len, data, false); + if (res != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send message"))); + } + + /* + * Finally, check that we got back the same message from the last + * iteration that we originally sent. + */ + verify_message(message_size, message_contents, len, data); + + /* Clean up. */ + dsm_detach(seg); + + PG_RETURN_VOID(); +} + +/* + * Pipelined test of the shared memory message queue infrastructure. + * + * As in the basic test, we set up a ring of message queues passing through + * 1 or more background processes and eventually looping back to ourselves. + * Then, we send N copies of the user-specified message through the ring and + * receive them all back. Since this might fill up all message queues in the + * ring and then stall, we must be prepared to begin receiving the messages + * back before we've finished sending them. + */ +Datum +test_shm_mq_pipelined(PG_FUNCTION_ARGS) +{ + int64 queue_size = PG_GETARG_INT64(0); + text *message = PG_GETARG_TEXT_PP(1); + char *message_contents = VARDATA_ANY(message); + int message_size = VARSIZE_ANY_EXHDR(message); + int32 loop_count = PG_GETARG_INT32(2); + int32 nworkers = PG_GETARG_INT32(3); + bool verify = PG_GETARG_BOOL(4); + int32 send_count = 0; + int32 receive_count = 0; + dsm_segment *seg; + shm_mq_handle *outqh; + shm_mq_handle *inqh; + shm_mq_result res; + Size len; + void *data; + + /* A negative loopcount is nonsensical. */ + if (loop_count < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("repeat count size must be an integer value greater than or equal to zero"))); + + /* + * Using the nonblocking interfaces, we can even send data to ourselves, + * so the minimum number of workers for this test is zero. + */ + if (nworkers < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("number of workers must be an integer value greater than or equal to zero"))); + + /* Set up dynamic shared memory segment and background workers. */ + test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh); + + /* Main loop. */ + for (;;) + { + bool wait = true; + + /* + * If we haven't yet sent the message the requisite number of times, + * try again to send it now. Note that when shm_mq_send() returns + * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the + * same message size and contents; that's not an issue here because + * we're sending the same message every time. + */ + if (send_count < loop_count) + { + res = shm_mq_send(outqh, message_size, message_contents, true); + if (res == SHM_MQ_SUCCESS) + { + ++send_count; + wait = false; + } + else if (res == SHM_MQ_DETACHED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send message"))); + } + + /* + * If we haven't yet received the message the requisite number of + * times, try to receive it again now. + */ + if (receive_count < loop_count) + { + res = shm_mq_receive(inqh, &len, &data, true); + if (res == SHM_MQ_SUCCESS) + { + ++receive_count; + /* Verifying every time is slow, so it's optional. */ + if (verify) + verify_message(message_size, message_contents, len, data); + wait = false; + } + else if (res == SHM_MQ_DETACHED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not receive message"))); + } + else + { + /* + * Otherwise, we've received the message enough times. This + * shouldn't happen unless we've also sent it enough times. + */ + if (send_count != receive_count) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("message sent %d times, but received %d times", + send_count, receive_count))); + break; + } + + if (wait) + { + /* + * If we made no progress, wait for one of the other processes to + * which we are connected to set our latch, indicating that they + * have read or written data and therefore there may now be work + * for us to do. + */ + (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, + PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } + + /* Clean up. */ + dsm_detach(seg); + + PG_RETURN_VOID(); +} + +/* + * Verify that two messages are the same. + */ +static void +verify_message(Size origlen, char *origdata, Size newlen, char *newdata) +{ + Size i; + + if (origlen != newlen) + ereport(ERROR, + (errmsg("message corrupted"), + errdetail("The original message was %zu bytes but the final message is %zu bytes.", + origlen, newlen))); + + for (i = 0; i < origlen; ++i) + if (origdata[i] != newdata[i]) + ereport(ERROR, + (errmsg("message corrupted"), + errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen))); +} diff --git a/src/test/modules/test_shm_mq/test_shm_mq--1.0.sql b/src/test/modules/test_shm_mq/test_shm_mq--1.0.sql new file mode 100644 index 0000000..56db05d --- /dev/null +++ b/src/test/modules/test_shm_mq/test_shm_mq--1.0.sql @@ -0,0 +1,19 @@ +/* src/test/modules/test_shm_mq/test_shm_mq--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_shm_mq" to load this file. \quit + +CREATE FUNCTION test_shm_mq(queue_size pg_catalog.int8, + message pg_catalog.text, + repeat_count pg_catalog.int4 default 1, + num_workers pg_catalog.int4 default 1) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION test_shm_mq_pipelined(queue_size pg_catalog.int8, + message pg_catalog.text, + repeat_count pg_catalog.int4 default 1, + num_workers pg_catalog.int4 default 1, + verify pg_catalog.bool default true) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_shm_mq/test_shm_mq.control b/src/test/modules/test_shm_mq/test_shm_mq.control new file mode 100644 index 0000000..d9a74c7 --- /dev/null +++ b/src/test/modules/test_shm_mq/test_shm_mq.control @@ -0,0 +1,4 @@ +comment = 'Test code for shared memory message queues' +default_version = '1.0' +module_pathname = '$libdir/test_shm_mq' +relocatable = true diff --git a/src/test/modules/test_shm_mq/test_shm_mq.h b/src/test/modules/test_shm_mq/test_shm_mq.h new file mode 100644 index 0000000..a666121 --- /dev/null +++ b/src/test/modules/test_shm_mq/test_shm_mq.h @@ -0,0 +1,45 @@ +/*-------------------------------------------------------------------------- + * + * test_shm_mq.h + * Definitions for shared memory message queues + * + * Copyright (c) 2013-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_shm_mq/test_shm_mq.h + * + * ------------------------------------------------------------------------- + */ + +#ifndef TEST_SHM_MQ_H +#define TEST_SHM_MQ_H + +#include "storage/dsm.h" +#include "storage/shm_mq.h" +#include "storage/spin.h" + +/* Identifier for shared memory segments used by this extension. */ +#define PG_TEST_SHM_MQ_MAGIC 0x79fb2447 + +/* + * This structure is stored in the dynamic shared memory segment. We use + * it to determine whether all workers started up OK and successfully + * attached to their respective shared message queues. + */ +typedef struct +{ + slock_t mutex; + int workers_total; + int workers_attached; + int workers_ready; +} test_shm_mq_header; + +/* Set up dynamic shared memory and background workers for test run. */ +extern void test_shm_mq_setup(int64 queue_size, int32 nworkers, + dsm_segment **seg, shm_mq_handle **output, + shm_mq_handle **input); + +/* Main entrypoint for a worker. */ +extern void test_shm_mq_main(Datum) pg_attribute_noreturn(); + +#endif diff --git a/src/test/modules/test_shm_mq/worker.c b/src/test/modules/test_shm_mq/worker.c new file mode 100644 index 0000000..2180776 --- /dev/null +++ b/src/test/modules/test_shm_mq/worker.c @@ -0,0 +1,197 @@ +/*-------------------------------------------------------------------------- + * + * worker.c + * Code for sample worker making use of shared memory message queues. + * Our test worker simply reads messages from one message queue and + * writes them back out to another message queue. In a real + * application, you'd presumably want the worker to do some more + * complex calculation rather than simply returning the input, + * but it should be possible to use much of the control logic just + * as presented here. + * + * Copyright (c) 2013-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_shm_mq/worker.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "tcop/tcopprot.h" + +#include "test_shm_mq.h" + +static void attach_to_queues(dsm_segment *seg, shm_toc *toc, + int myworkernumber, shm_mq_handle **inqhp, + shm_mq_handle **outqhp); +static void copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh); + +/* + * Background worker entrypoint. + * + * This is intended to demonstrate how a background worker can be used to + * facilitate a parallel computation. Most of the logic here is fairly + * boilerplate stuff, designed to attach to the shared memory segment, + * notify the user backend that we're alive, and so on. The + * application-specific bits of logic that you'd replace for your own worker + * are attach_to_queues() and copy_messages(). + */ +void +test_shm_mq_main(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + shm_mq_handle *inqh; + shm_mq_handle *outqh; + volatile test_shm_mq_header *hdr; + int myworkernumber; + PGPROC *registrant; + + /* + * Establish signal handlers. + * + * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as + * it would a normal user backend. To make that happen, we use die(). + */ + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Connect to the dynamic shared memory segment. + * + * The backend that registered this worker passed us the ID of a shared + * memory segment to which we must attach for further instructions. Once + * we've mapped the segment in our address space, attach to the table of + * contents so we can locate the various data structures we'll need to + * find within the segment. + * + * Note: at this point, we have not created any ResourceOwner in this + * process. This will result in our DSM mapping surviving until process + * exit, which is fine. If there were a ResourceOwner, it would acquire + * ownership of the mapping, but we have no need for that. + */ + seg = dsm_attach(DatumGetInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* + * Acquire a worker number. + * + * By convention, the process registering this background worker should + * have stored the control structure at key 0. We look up that key to + * find it. Our worker number gives our identity: there may be just one + * worker involved in this parallel operation, or there may be many. + */ + hdr = shm_toc_lookup(toc, 0, false); + SpinLockAcquire(&hdr->mutex); + myworkernumber = ++hdr->workers_attached; + SpinLockRelease(&hdr->mutex); + if (myworkernumber > hdr->workers_total) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("too many message queue testing workers already"))); + + /* + * Attach to the appropriate message queues. + */ + attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh); + + /* + * Indicate that we're fully initialized and ready to begin the main part + * of the parallel operation. + * + * Once we signal that we're ready, the user backend is entitled to assume + * that our on_dsm_detach callbacks will fire before we disconnect from + * the shared memory segment and exit. Generally, that means we must have + * attached to all relevant dynamic shared memory data structures by now. + */ + SpinLockAcquire(&hdr->mutex); + ++hdr->workers_ready; + SpinLockRelease(&hdr->mutex); + registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid); + if (registrant == NULL) + { + elog(DEBUG1, "registrant backend has exited prematurely"); + proc_exit(1); + } + SetLatch(®istrant->procLatch); + + /* Do the work. */ + copy_messages(inqh, outqh); + + /* + * We're done. For cleanliness, explicitly detach from the shared memory + * segment (that would happen anyway during process exit, though). + */ + dsm_detach(seg); + proc_exit(1); +} + +/* + * Attach to shared memory message queues. + * + * We use our worker number to determine to which queue we should attach. + * The queues are registered at keys 1..<number-of-workers>. The user backend + * writes to queue #1 and reads from queue #<number-of-workers>; each worker + * reads from the queue whose number is equal to its worker number and writes + * to the next higher-numbered queue. + */ +static void +attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber, + shm_mq_handle **inqhp, shm_mq_handle **outqhp) +{ + shm_mq *inq; + shm_mq *outq; + + inq = shm_toc_lookup(toc, myworkernumber, false); + shm_mq_set_receiver(inq, MyProc); + *inqhp = shm_mq_attach(inq, seg, NULL); + outq = shm_toc_lookup(toc, myworkernumber + 1, false); + shm_mq_set_sender(outq, MyProc); + *outqhp = shm_mq_attach(outq, seg, NULL); +} + +/* + * Loop, receiving and sending messages, until the connection is broken. + * + * This is the "real work" performed by this worker process. Everything that + * happens before this is initialization of one form or another, and everything + * after this point is cleanup. + */ +static void +copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh) +{ + Size len; + void *data; + shm_mq_result res; + + for (;;) + { + /* Notice any interrupts that have occurred. */ + CHECK_FOR_INTERRUPTS(); + + /* Receive a message. */ + res = shm_mq_receive(inqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS) + break; + + /* Send it back out. */ + res = shm_mq_send(outqh, len, data, false); + if (res != SHM_MQ_SUCCESS) + break; + } +} |