summaryrefslogtreecommitdiffstats
path: root/src/bin/scripts/scripts_parallel.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:19:15 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:19:15 +0000
commit6eb9c5a5657d1fe77b55cc261450f3538d35a94d (patch)
tree657d8194422a5daccecfd42d654b8a245ef7b4c8 /src/bin/scripts/scripts_parallel.c
parentInitial commit. (diff)
downloadpostgresql-13-upstream.tar.xz
postgresql-13-upstream.zip
Adding upstream version 13.4.upstream/13.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/bin/scripts/scripts_parallel.c')
-rw-r--r--src/bin/scripts/scripts_parallel.c284
1 files changed, 284 insertions, 0 deletions
diff --git a/src/bin/scripts/scripts_parallel.c b/src/bin/scripts/scripts_parallel.c
new file mode 100644
index 0000000..ec264a2
--- /dev/null
+++ b/src/bin/scripts/scripts_parallel.c
@@ -0,0 +1,284 @@
+/*-------------------------------------------------------------------------
+ *
+ * scripts_parallel.c
+ * Parallel support for bin/scripts/
+ *
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/bin/scripts/scripts_parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifdef WIN32
+#define FD_SETSIZE 1024 /* must set before winsock2.h is included */
+#endif
+
+#include "postgres_fe.h"
+
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+#include "common.h"
+#include "common/logging.h"
+#include "fe_utils/cancel.h"
+#include "scripts_parallel.h"
+
+static void init_slot(ParallelSlot *slot, PGconn *conn);
+static int select_loop(int maxFd, fd_set *workerset);
+
+static void
+init_slot(ParallelSlot *slot, PGconn *conn)
+{
+ slot->connection = conn;
+ /* Initially assume connection is idle */
+ slot->isFree = true;
+}
+
+/*
+ * Wait until a file descriptor from the given set becomes readable.
+ *
+ * Returns the number of ready descriptors, or -1 on failure (including
+ * getting a cancel request).
+ */
+static int
+select_loop(int maxFd, fd_set *workerset)
+{
+ int i;
+ fd_set saveSet = *workerset;
+
+ if (CancelRequested)
+ return -1;
+
+ for (;;)
+ {
+ /*
+ * On Windows, we need to check once in a while for cancel requests;
+ * on other platforms we rely on select() returning when interrupted.
+ */
+ struct timeval *tvp;
+#ifdef WIN32
+ struct timeval tv = {0, 1000000};
+
+ tvp = &tv;
+#else
+ tvp = NULL;
+#endif
+
+ *workerset = saveSet;
+ i = select(maxFd + 1, workerset, NULL, NULL, tvp);
+
+#ifdef WIN32
+ if (i == SOCKET_ERROR)
+ {
+ i = -1;
+
+ if (WSAGetLastError() == WSAEINTR)
+ errno = EINTR;
+ }
+#endif
+
+ if (i < 0 && errno == EINTR)
+ continue; /* ignore this */
+ if (i < 0 || CancelRequested)
+ return -1; /* but not this */
+ if (i == 0)
+ continue; /* timeout (Win32 only) */
+ break;
+ }
+
+ return i;
+}
+
+/*
+ * ParallelSlotsGetIdle
+ * Return a connection slot that is ready to execute a command.
+ *
+ * This returns the first slot we find that is marked isFree, if one is;
+ * otherwise, we loop on select() until one socket becomes available. When
+ * this happens, we read the whole set and mark as free all sockets that
+ * become available. If an error occurs, NULL is returned.
+ */
+ParallelSlot *
+ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
+{
+ int i;
+ int firstFree = -1;
+
+ /*
+ * Look for any connection currently free. If there is one, mark it as
+ * taken and let the caller know the slot to use.
+ */
+ for (i = 0; i < numslots; i++)
+ {
+ if (slots[i].isFree)
+ {
+ slots[i].isFree = false;
+ return slots + i;
+ }
+ }
+
+ /*
+ * No free slot found, so wait until one of the connections has finished
+ * its task and return the available slot.
+ */
+ while (firstFree < 0)
+ {
+ fd_set slotset;
+ int maxFd = 0;
+
+ /* We must reconstruct the fd_set for each call to select_loop */
+ FD_ZERO(&slotset);
+
+ for (i = 0; i < numslots; i++)
+ {
+ int sock = PQsocket(slots[i].connection);
+
+ /*
+ * We don't really expect any connections to lose their sockets
+ * after startup, but just in case, cope by ignoring them.
+ */
+ if (sock < 0)
+ continue;
+
+ FD_SET(sock, &slotset);
+ if (sock > maxFd)
+ maxFd = sock;
+ }
+
+ SetCancelConn(slots->connection);
+ i = select_loop(maxFd, &slotset);
+ ResetCancelConn();
+
+ /* failure? */
+ if (i < 0)
+ return NULL;
+
+ for (i = 0; i < numslots; i++)
+ {
+ int sock = PQsocket(slots[i].connection);
+
+ if (sock >= 0 && FD_ISSET(sock, &slotset))
+ {
+ /* select() says input is available, so consume it */
+ PQconsumeInput(slots[i].connection);
+ }
+
+ /* Collect result(s) as long as any are available */
+ while (!PQisBusy(slots[i].connection))
+ {
+ PGresult *result = PQgetResult(slots[i].connection);
+
+ if (result != NULL)
+ {
+ /* Check and discard the command result */
+ if (!processQueryResult(slots[i].connection, result))
+ return NULL;
+ }
+ else
+ {
+ /* This connection has become idle */
+ slots[i].isFree = true;
+ if (firstFree < 0)
+ firstFree = i;
+ break;
+ }
+ }
+ }
+ }
+
+ slots[firstFree].isFree = false;
+ return slots + firstFree;
+}
+
+/*
+ * ParallelSlotsSetup
+ * Prepare a set of parallel slots to use on a given database.
+ *
+ * This creates and initializes a set of connections to the database
+ * using the information given by the caller, marking all parallel slots
+ * as free and ready to use. "conn" is an initial connection set up
+ * by the caller and is associated with the first slot in the parallel
+ * set.
+ */
+ParallelSlot *
+ParallelSlotsSetup(const ConnParams *cparams,
+ const char *progname, bool echo,
+ PGconn *conn, int numslots)
+{
+ ParallelSlot *slots;
+ int i;
+
+ Assert(conn != NULL);
+
+ slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
+ init_slot(slots, conn);
+ if (numslots > 1)
+ {
+ for (i = 1; i < numslots; i++)
+ {
+ conn = connectDatabase(cparams, progname, echo, false, true);
+
+ /*
+ * Fail and exit immediately if trying to use a socket in an
+ * unsupported range. POSIX requires open(2) to use the lowest
+ * unused file descriptor and the hint given relies on that.
+ */
+ if (PQsocket(conn) >= FD_SETSIZE)
+ {
+ pg_log_fatal("too many jobs for this platform -- try %d", i);
+ exit(1);
+ }
+
+ init_slot(slots + i, conn);
+ }
+ }
+
+ return slots;
+}
+
+/*
+ * ParallelSlotsTerminate
+ * Clean up a set of parallel slots
+ *
+ * Iterate through all connections in a given set of ParallelSlots and
+ * terminate all connections.
+ */
+void
+ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
+{
+ int i;
+
+ for (i = 0; i < numslots; i++)
+ {
+ PGconn *conn = slots[i].connection;
+
+ if (conn == NULL)
+ continue;
+
+ disconnectDatabase(conn);
+ }
+}
+
+/*
+ * ParallelSlotsWaitCompletion
+ *
+ * Wait for all connections to finish, returning false if at least one
+ * error has been found on the way.
+ */
+bool
+ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
+{
+ int i;
+
+ for (i = 0; i < numslots; i++)
+ {
+ if (!consumeQueryResult((slots + i)->connection))
+ return false;
+ }
+
+ return true;
+}