summaryrefslogtreecommitdiffstats
path: root/src/fe_utils/parallel_slot.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-16 19:46:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-16 19:46:48 +0000
commit311bcfc6b3acdd6fd152798c7f287ddf74fa2a98 (patch)
tree0ec307299b1dada3701e42f4ca6eda57d708261e /src/fe_utils/parallel_slot.c
parentInitial commit. (diff)
downloadpostgresql-15-upstream.tar.xz
postgresql-15-upstream.zip
Adding upstream version 15.4.upstream/15.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fe_utils/parallel_slot.c')
-rw-r--r--src/fe_utils/parallel_slot.c530
1 files changed, 530 insertions, 0 deletions
diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c
new file mode 100644
index 0000000..4bf0536
--- /dev/null
+++ b/src/fe_utils/parallel_slot.c
@@ -0,0 +1,530 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel_slot.c
+ * Parallel support for front-end parallel database connections
+ *
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/fe_utils/parallel_slot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifdef WIN32
+#define FD_SETSIZE 1024 /* must set before winsock2.h is included */
+#endif
+
+#include "postgres_fe.h"
+
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+#include "common/logging.h"
+#include "fe_utils/cancel.h"
+#include "fe_utils/parallel_slot.h"
+#include "fe_utils/query_utils.h"
+
+#define ERRCODE_UNDEFINED_TABLE "42P01"
+
+static int select_loop(int maxFd, fd_set *workerset);
+static bool processQueryResult(ParallelSlot *slot, PGresult *result);
+
+/*
+ * Process (and delete) a query result. Returns true if there's no problem,
+ * false otherwise. It's up to the handler to decide what constitutes a
+ * problem.
+ */
+static bool
+processQueryResult(ParallelSlot *slot, PGresult *result)
+{
+ Assert(slot->handler != NULL);
+
+ /* On failure, the handler should return NULL after freeing the result */
+ if (!slot->handler(result, slot->connection, slot->handler_context))
+ return false;
+
+ /* Ok, we have to free it ourself */
+ PQclear(result);
+ return true;
+}
+
+/*
+ * Consume all the results generated for the given connection until
+ * nothing remains. If at least one error is encountered, return false.
+ * Note that this will block if the connection is busy.
+ */
+static bool
+consumeQueryResult(ParallelSlot *slot)
+{
+ bool ok = true;
+ PGresult *result;
+
+ SetCancelConn(slot->connection);
+ while ((result = PQgetResult(slot->connection)) != NULL)
+ {
+ if (!processQueryResult(slot, result))
+ ok = false;
+ }
+ ResetCancelConn();
+ return ok;
+}
+
+/*
+ * Wait until a file descriptor from the given set becomes readable.
+ *
+ * Returns the number of ready descriptors, or -1 on failure (including
+ * getting a cancel request).
+ */
+static int
+select_loop(int maxFd, fd_set *workerset)
+{
+ int i;
+ fd_set saveSet = *workerset;
+
+ if (CancelRequested)
+ return -1;
+
+ for (;;)
+ {
+ /*
+ * On Windows, we need to check once in a while for cancel requests;
+ * on other platforms we rely on select() returning when interrupted.
+ */
+ struct timeval *tvp;
+#ifdef WIN32
+ struct timeval tv = {0, 1000000};
+
+ tvp = &tv;
+#else
+ tvp = NULL;
+#endif
+
+ *workerset = saveSet;
+ i = select(maxFd + 1, workerset, NULL, NULL, tvp);
+
+#ifdef WIN32
+ if (i == SOCKET_ERROR)
+ {
+ i = -1;
+
+ if (WSAGetLastError() == WSAEINTR)
+ errno = EINTR;
+ }
+#endif
+
+ if (i < 0 && errno == EINTR)
+ continue; /* ignore this */
+ if (i < 0 || CancelRequested)
+ return -1; /* but not this */
+ if (i == 0)
+ continue; /* timeout (Win32 only) */
+ break;
+ }
+
+ return i;
+}
+
+/*
+ * Return the offset of a suitable idle slot, or -1 if none are available. If
+ * the given dbname is not null, only idle slots connected to the given
+ * database are considered suitable, otherwise all idle connected slots are
+ * considered suitable.
+ */
+static int
+find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
+{
+ int i;
+
+ for (i = 0; i < sa->numslots; i++)
+ {
+ if (sa->slots[i].inUse)
+ continue;
+
+ if (sa->slots[i].connection == NULL)
+ continue;
+
+ if (dbname == NULL ||
+ strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
+ return i;
+ }
+ return -1;
+}
+
+/*
+ * Return the offset of the first slot without a database connection, or -1 if
+ * all slots are connected.
+ */
+static int
+find_unconnected_slot(const ParallelSlotArray *sa)
+{
+ int i;
+
+ for (i = 0; i < sa->numslots; i++)
+ {
+ if (sa->slots[i].inUse)
+ continue;
+
+ if (sa->slots[i].connection == NULL)
+ return i;
+ }
+
+ return -1;
+}
+
+/*
+ * Return the offset of the first idle slot, or -1 if all slots are busy.
+ */
+static int
+find_any_idle_slot(const ParallelSlotArray *sa)
+{
+ int i;
+
+ for (i = 0; i < sa->numslots; i++)
+ if (!sa->slots[i].inUse)
+ return i;
+
+ return -1;
+}
+
+/*
+ * Wait for any slot's connection to have query results, consume the results,
+ * and update the slot's status as appropriate. Returns true on success,
+ * false on cancellation, on error, or if no slots are connected.
+ */
+static bool
+wait_on_slots(ParallelSlotArray *sa)
+{
+ int i;
+ fd_set slotset;
+ int maxFd = 0;
+ PGconn *cancelconn = NULL;
+
+ /* We must reconstruct the fd_set for each call to select_loop */
+ FD_ZERO(&slotset);
+
+ for (i = 0; i < sa->numslots; i++)
+ {
+ int sock;
+
+ /* We shouldn't get here if we still have slots without connections */
+ Assert(sa->slots[i].connection != NULL);
+
+ sock = PQsocket(sa->slots[i].connection);
+
+ /*
+ * We don't really expect any connections to lose their sockets after
+ * startup, but just in case, cope by ignoring them.
+ */
+ if (sock < 0)
+ continue;
+
+ /* Keep track of the first valid connection we see. */
+ if (cancelconn == NULL)
+ cancelconn = sa->slots[i].connection;
+
+ FD_SET(sock, &slotset);
+ if (sock > maxFd)
+ maxFd = sock;
+ }
+
+ /*
+ * If we get this far with no valid connections, processing cannot
+ * continue.
+ */
+ if (cancelconn == NULL)
+ return false;
+
+ SetCancelConn(cancelconn);
+ i = select_loop(maxFd, &slotset);
+ ResetCancelConn();
+
+ /* failure? */
+ if (i < 0)
+ return false;
+
+ for (i = 0; i < sa->numslots; i++)
+ {
+ int sock;
+
+ sock = PQsocket(sa->slots[i].connection);
+
+ if (sock >= 0 && FD_ISSET(sock, &slotset))
+ {
+ /* select() says input is available, so consume it */
+ PQconsumeInput(sa->slots[i].connection);
+ }
+
+ /* Collect result(s) as long as any are available */
+ while (!PQisBusy(sa->slots[i].connection))
+ {
+ PGresult *result = PQgetResult(sa->slots[i].connection);
+
+ if (result != NULL)
+ {
+ /* Handle and discard the command result */
+ if (!processQueryResult(&sa->slots[i], result))
+ return false;
+ }
+ else
+ {
+ /* This connection has become idle */
+ sa->slots[i].inUse = false;
+ ParallelSlotClearHandler(&sa->slots[i]);
+ break;
+ }
+ }
+ }
+ return true;
+}
+
+/*
+ * Open a new database connection using the stored connection parameters and
+ * optionally a given dbname if not null, execute the stored initial command if
+ * any, and associate the new connection with the given slot.
+ */
+static void
+connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
+{
+ const char *old_override;
+ ParallelSlot *slot = &sa->slots[slotno];
+
+ old_override = sa->cparams->override_dbname;
+ if (dbname)
+ sa->cparams->override_dbname = dbname;
+ slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
+ sa->cparams->override_dbname = old_override;
+
+ if (PQsocket(slot->connection) >= FD_SETSIZE)
+ pg_fatal("too many jobs for this platform");
+
+ /* Setup the connection using the supplied command, if any. */
+ if (sa->initcmd)
+ executeCommand(slot->connection, sa->initcmd, sa->echo);
+}
+
+/*
+ * ParallelSlotsGetIdle
+ * Return a connection slot that is ready to execute a command.
+ *
+ * The slot returned is chosen as follows:
+ *
+ * If any idle slot already has an open connection, and if either dbname is
+ * null or the existing connection is to the given database, that slot will be
+ * returned allowing the connection to be reused.
+ *
+ * Otherwise, if any idle slot is not yet connected to any database, the slot
+ * will be returned with it's connection opened using the stored cparams and
+ * optionally the given dbname if not null.
+ *
+ * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
+ * after having it's connection disconnected and reconnected using the stored
+ * cparams and optionally the given dbname if not null.
+ *
+ * Otherwise, if any slots have connections that are busy, we loop on select()
+ * until one socket becomes available. When this happens, we read the whole
+ * set and mark as free all sockets that become available. We then select a
+ * slot using the same rules as above.
+ *
+ * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
+ *
+ * For any connection created, if the stored initcmd is not null, it will be
+ * executed as a command on the newly formed connection before the slot is
+ * returned.
+ *
+ * If an error occurs, NULL is returned.
+ */
+ParallelSlot *
+ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
+{
+ int offset;
+
+ Assert(sa);
+ Assert(sa->numslots > 0);
+
+ while (1)
+ {
+ /* First choice: a slot already connected to the desired database. */
+ offset = find_matching_idle_slot(sa, dbname);
+ if (offset >= 0)
+ {
+ sa->slots[offset].inUse = true;
+ return &sa->slots[offset];
+ }
+
+ /* Second choice: a slot not connected to any database. */
+ offset = find_unconnected_slot(sa);
+ if (offset >= 0)
+ {
+ connect_slot(sa, offset, dbname);
+ sa->slots[offset].inUse = true;
+ return &sa->slots[offset];
+ }
+
+ /* Third choice: a slot connected to the wrong database. */
+ offset = find_any_idle_slot(sa);
+ if (offset >= 0)
+ {
+ disconnectDatabase(sa->slots[offset].connection);
+ sa->slots[offset].connection = NULL;
+ connect_slot(sa, offset, dbname);
+ sa->slots[offset].inUse = true;
+ return &sa->slots[offset];
+ }
+
+ /*
+ * Fourth choice: block until one or more slots become available. If
+ * any slots hit a fatal error, we'll find out about that here and
+ * return NULL.
+ */
+ if (!wait_on_slots(sa))
+ return NULL;
+ }
+}
+
+/*
+ * ParallelSlotsSetup
+ * Prepare a set of parallel slots but do not connect to any database.
+ *
+ * This creates and initializes a set of slots, marking all parallel slots as
+ * free and ready to use. Establishing connections is delayed until requesting
+ * a free slot. The cparams, progname, echo, and initcmd are stored for later
+ * use and must remain valid for the lifetime of the returned array.
+ */
+ParallelSlotArray *
+ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
+ bool echo, const char *initcmd)
+{
+ ParallelSlotArray *sa;
+
+ Assert(numslots > 0);
+ Assert(cparams != NULL);
+ Assert(progname != NULL);
+
+ sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
+ numslots * sizeof(ParallelSlot));
+
+ sa->numslots = numslots;
+ sa->cparams = cparams;
+ sa->progname = progname;
+ sa->echo = echo;
+ sa->initcmd = initcmd;
+
+ return sa;
+}
+
+/*
+ * ParallelSlotsAdoptConn
+ * Assign an open connection to the slots array for reuse.
+ *
+ * This turns over ownership of an open connection to a slots array. The
+ * caller should not further use or close the connection. All the connection's
+ * parameters (user, host, port, etc.) except possibly dbname should match
+ * those of the slots array's cparams, as given in ParallelSlotsSetup. If
+ * these parameters differ, subsequent behavior is undefined.
+ */
+void
+ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
+{
+ int offset;
+
+ offset = find_unconnected_slot(sa);
+ if (offset >= 0)
+ sa->slots[offset].connection = conn;
+ else
+ disconnectDatabase(conn);
+}
+
+/*
+ * ParallelSlotsTerminate
+ * Clean up a set of parallel slots
+ *
+ * Iterate through all connections in a given set of ParallelSlots and
+ * terminate all connections.
+ */
+void
+ParallelSlotsTerminate(ParallelSlotArray *sa)
+{
+ int i;
+
+ for (i = 0; i < sa->numslots; i++)
+ {
+ PGconn *conn = sa->slots[i].connection;
+
+ if (conn == NULL)
+ continue;
+
+ disconnectDatabase(conn);
+ }
+}
+
+/*
+ * ParallelSlotsWaitCompletion
+ *
+ * Wait for all connections to finish, returning false if at least one
+ * error has been found on the way.
+ */
+bool
+ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
+{
+ int i;
+
+ for (i = 0; i < sa->numslots; i++)
+ {
+ if (sa->slots[i].connection == NULL)
+ continue;
+ if (!consumeQueryResult(&sa->slots[i]))
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * TableCommandResultHandler
+ *
+ * ParallelSlotResultHandler for results of commands (not queries) against
+ * tables.
+ *
+ * Requires that the result status is either PGRES_COMMAND_OK or an error about
+ * a missing table. This is useful for utilities that compile a list of tables
+ * to process and then run commands (vacuum, reindex, or whatever) against
+ * those tables, as there is a race condition between the time the list is
+ * compiled and the time the command attempts to open the table.
+ *
+ * For missing tables, logs an error but allows processing to continue.
+ *
+ * For all other errors, logs an error and terminates further processing.
+ *
+ * res: PGresult from the query executed on the slot's connection
+ * conn: connection belonging to the slot
+ * context: unused
+ */
+bool
+TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
+{
+ Assert(res != NULL);
+ Assert(conn != NULL);
+
+ /*
+ * If it's an error, report it. Errors about a missing table are harmless
+ * so we continue processing; but die for other errors.
+ */
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+ pg_log_error("processing of database \"%s\" failed: %s",
+ PQdb(conn), PQerrorMessage(conn));
+
+ if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+ {
+ PQclear(res);
+ return false;
+ }
+ }
+
+ return true;
+}