/*------------------------------------------------------------------------- * * parallel_slot.c * Parallel support for front-end parallel database connections * * * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * src/fe_utils/parallel_slot.c * *------------------------------------------------------------------------- */ #if defined(WIN32) && FD_SETSIZE < 1024 #error FD_SETSIZE needs to have been increased #endif #include "postgres_fe.h" #include #include "common/logging.h" #include "fe_utils/cancel.h" #include "fe_utils/parallel_slot.h" #include "fe_utils/query_utils.h" #define ERRCODE_UNDEFINED_TABLE "42P01" static int select_loop(int maxFd, fd_set *workerset); static bool processQueryResult(ParallelSlot *slot, PGresult *result); /* * Process (and delete) a query result. Returns true if there's no problem, * false otherwise. It's up to the handler to decide what constitutes a * problem. */ static bool processQueryResult(ParallelSlot *slot, PGresult *result) { Assert(slot->handler != NULL); /* On failure, the handler should return NULL after freeing the result */ if (!slot->handler(result, slot->connection, slot->handler_context)) return false; /* Ok, we have to free it ourself */ PQclear(result); return true; } /* * Consume all the results generated for the given connection until * nothing remains. If at least one error is encountered, return false. * Note that this will block if the connection is busy. */ static bool consumeQueryResult(ParallelSlot *slot) { bool ok = true; PGresult *result; SetCancelConn(slot->connection); while ((result = PQgetResult(slot->connection)) != NULL) { if (!processQueryResult(slot, result)) ok = false; } ResetCancelConn(); return ok; } /* * Wait until a file descriptor from the given set becomes readable. * * Returns the number of ready descriptors, or -1 on failure (including * getting a cancel request). */ static int select_loop(int maxFd, fd_set *workerset) { int i; fd_set saveSet = *workerset; if (CancelRequested) return -1; for (;;) { /* * On Windows, we need to check once in a while for cancel requests; * on other platforms we rely on select() returning when interrupted. */ struct timeval *tvp; #ifdef WIN32 struct timeval tv = {0, 1000000}; tvp = &tv; #else tvp = NULL; #endif *workerset = saveSet; i = select(maxFd + 1, workerset, NULL, NULL, tvp); #ifdef WIN32 if (i == SOCKET_ERROR) { i = -1; if (WSAGetLastError() == WSAEINTR) errno = EINTR; } #endif if (i < 0 && errno == EINTR) continue; /* ignore this */ if (i < 0 || CancelRequested) return -1; /* but not this */ if (i == 0) continue; /* timeout (Win32 only) */ break; } return i; } /* * Return the offset of a suitable idle slot, or -1 if none are available. If * the given dbname is not null, only idle slots connected to the given * database are considered suitable, otherwise all idle connected slots are * considered suitable. */ static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname) { int i; for (i = 0; i < sa->numslots; i++) { if (sa->slots[i].inUse) continue; if (sa->slots[i].connection == NULL) continue; if (dbname == NULL || strcmp(PQdb(sa->slots[i].connection), dbname) == 0) return i; } return -1; } /* * Return the offset of the first slot without a database connection, or -1 if * all slots are connected. */ static int find_unconnected_slot(const ParallelSlotArray *sa) { int i; for (i = 0; i < sa->numslots; i++) { if (sa->slots[i].inUse) continue; if (sa->slots[i].connection == NULL) return i; } return -1; } /* * Return the offset of the first idle slot, or -1 if all slots are busy. */ static int find_any_idle_slot(const ParallelSlotArray *sa) { int i; for (i = 0; i < sa->numslots; i++) if (!sa->slots[i].inUse) return i; return -1; } /* * Wait for any slot's connection to have query results, consume the results, * and update the slot's status as appropriate. Returns true on success, * false on cancellation, on error, or if no slots are connected. */ static bool wait_on_slots(ParallelSlotArray *sa) { int i; fd_set slotset; int maxFd = 0; PGconn *cancelconn = NULL; /* We must reconstruct the fd_set for each call to select_loop */ FD_ZERO(&slotset); for (i = 0; i < sa->numslots; i++) { int sock; /* We shouldn't get here if we still have slots without connections */ Assert(sa->slots[i].connection != NULL); sock = PQsocket(sa->slots[i].connection); /* * We don't really expect any connections to lose their sockets after * startup, but just in case, cope by ignoring them. */ if (sock < 0) continue; /* Keep track of the first valid connection we see. */ if (cancelconn == NULL) cancelconn = sa->slots[i].connection; FD_SET(sock, &slotset); if (sock > maxFd) maxFd = sock; } /* * If we get this far with no valid connections, processing cannot * continue. */ if (cancelconn == NULL) return false; SetCancelConn(cancelconn); i = select_loop(maxFd, &slotset); ResetCancelConn(); /* failure? */ if (i < 0) return false; for (i = 0; i < sa->numslots; i++) { int sock; sock = PQsocket(sa->slots[i].connection); if (sock >= 0 && FD_ISSET(sock, &slotset)) { /* select() says input is available, so consume it */ PQconsumeInput(sa->slots[i].connection); } /* Collect result(s) as long as any are available */ while (!PQisBusy(sa->slots[i].connection)) { PGresult *result = PQgetResult(sa->slots[i].connection); if (result != NULL) { /* Handle and discard the command result */ if (!processQueryResult(&sa->slots[i], result)) return false; } else { /* This connection has become idle */ sa->slots[i].inUse = false; ParallelSlotClearHandler(&sa->slots[i]); break; } } } return true; } /* * Open a new database connection using the stored connection parameters and * optionally a given dbname if not null, execute the stored initial command if * any, and associate the new connection with the given slot. */ static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname) { const char *old_override; ParallelSlot *slot = &sa->slots[slotno]; old_override = sa->cparams->override_dbname; if (dbname) sa->cparams->override_dbname = dbname; slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true); sa->cparams->override_dbname = old_override; /* * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to * FD_SET() and allied macros. Windows defines it as a ceiling on the * count of file descriptors in the set, not a ceiling on the value of * each file descriptor; see * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select * and * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set. * We can't ignore that, because Windows starts file descriptors at a * higher value, delays reuse, and skips values. With less than ten * concurrent file descriptors, opened and closed rapidly, one can reach * file descriptor 1024. * * Doing a hard exit here is a bit grotty, but it doesn't seem worth * complicating the API to make it less grotty. */ #ifdef WIN32 if (slotno >= FD_SETSIZE) { pg_log_error("too many jobs for this platform: %d", slotno); exit(1); } #else { int fd = PQsocket(slot->connection); if (fd >= FD_SETSIZE) { pg_log_error("socket file descriptor out of range for select(): %d", fd); pg_log_error_hint("Try fewer jobs."); exit(1); } } #endif /* Setup the connection using the supplied command, if any. */ if (sa->initcmd) executeCommand(slot->connection, sa->initcmd, sa->echo); } /* * ParallelSlotsGetIdle * Return a connection slot that is ready to execute a command. * * The slot returned is chosen as follows: * * If any idle slot already has an open connection, and if either dbname is * null or the existing connection is to the given database, that slot will be * returned allowing the connection to be reused. * * Otherwise, if any idle slot is not yet connected to any database, the slot * will be returned with it's connection opened using the stored cparams and * optionally the given dbname if not null. * * Otherwise, if any idle slot exists, an idle slot will be chosen and returned * after having it's connection disconnected and reconnected using the stored * cparams and optionally the given dbname if not null. * * Otherwise, if any slots have connections that are busy, we loop on select() * until one socket becomes available. When this happens, we read the whole * set and mark as free all sockets that become available. We then select a * slot using the same rules as above. * * Otherwise, we cannot return a slot, which is an error, and NULL is returned. * * For any connection created, if the stored initcmd is not null, it will be * executed as a command on the newly formed connection before the slot is * returned. * * If an error occurs, NULL is returned. */ ParallelSlot * ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname) { int offset; Assert(sa); Assert(sa->numslots > 0); while (1) { /* First choice: a slot already connected to the desired database. */ offset = find_matching_idle_slot(sa, dbname); if (offset >= 0) { sa->slots[offset].inUse = true; return &sa->slots[offset]; } /* Second choice: a slot not connected to any database. */ offset = find_unconnected_slot(sa); if (offset >= 0) { connect_slot(sa, offset, dbname); sa->slots[offset].inUse = true; return &sa->slots[offset]; } /* Third choice: a slot connected to the wrong database. */ offset = find_any_idle_slot(sa); if (offset >= 0) { disconnectDatabase(sa->slots[offset].connection); sa->slots[offset].connection = NULL; connect_slot(sa, offset, dbname); sa->slots[offset].inUse = true; return &sa->slots[offset]; } /* * Fourth choice: block until one or more slots become available. If * any slots hit a fatal error, we'll find out about that here and * return NULL. */ if (!wait_on_slots(sa)) return NULL; } } /* * ParallelSlotsSetup * Prepare a set of parallel slots but do not connect to any database. * * This creates and initializes a set of slots, marking all parallel slots as * free and ready to use. Establishing connections is delayed until requesting * a free slot. The cparams, progname, echo, and initcmd are stored for later * use and must remain valid for the lifetime of the returned array. */ ParallelSlotArray * ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd) { ParallelSlotArray *sa; Assert(numslots > 0); Assert(cparams != NULL); Assert(progname != NULL); sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) + numslots * sizeof(ParallelSlot)); sa->numslots = numslots; sa->cparams = cparams; sa->progname = progname; sa->echo = echo; sa->initcmd = initcmd; return sa; } /* * ParallelSlotsAdoptConn * Assign an open connection to the slots array for reuse. * * This turns over ownership of an open connection to a slots array. The * caller should not further use or close the connection. All the connection's * parameters (user, host, port, etc.) except possibly dbname should match * those of the slots array's cparams, as given in ParallelSlotsSetup. If * these parameters differ, subsequent behavior is undefined. */ void ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn) { int offset; offset = find_unconnected_slot(sa); if (offset >= 0) sa->slots[offset].connection = conn; else disconnectDatabase(conn); } /* * ParallelSlotsTerminate * Clean up a set of parallel slots * * Iterate through all connections in a given set of ParallelSlots and * terminate all connections. */ void ParallelSlotsTerminate(ParallelSlotArray *sa) { int i; for (i = 0; i < sa->numslots; i++) { PGconn *conn = sa->slots[i].connection; if (conn == NULL) continue; disconnectDatabase(conn); } } /* * ParallelSlotsWaitCompletion * * Wait for all connections to finish, returning false if at least one * error has been found on the way. */ bool ParallelSlotsWaitCompletion(ParallelSlotArray *sa) { int i; for (i = 0; i < sa->numslots; i++) { if (sa->slots[i].connection == NULL) continue; if (!consumeQueryResult(&sa->slots[i])) return false; /* Mark connection as idle */ sa->slots[i].inUse = false; ParallelSlotClearHandler(&sa->slots[i]); } return true; } /* * TableCommandResultHandler * * ParallelSlotResultHandler for results of commands (not queries) against * tables. * * Requires that the result status is either PGRES_COMMAND_OK or an error about * a missing table. This is useful for utilities that compile a list of tables * to process and then run commands (vacuum, reindex, or whatever) against * those tables, as there is a race condition between the time the list is * compiled and the time the command attempts to open the table. * * For missing tables, logs an error but allows processing to continue. * * For all other errors, logs an error and terminates further processing. * * res: PGresult from the query executed on the slot's connection * conn: connection belonging to the slot * context: unused */ bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context) { Assert(res != NULL); Assert(conn != NULL); /* * If it's an error, report it. Errors about a missing table are harmless * so we continue processing; but die for other errors. */ if (PQresultStatus(res) != PGRES_COMMAND_OK) { char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE); pg_log_error("processing of database \"%s\" failed: %s", PQdb(conn), PQerrorMessage(conn)); if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0) { PQclear(res); return false; } } return true; }