diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/bin/pg_dump/parallel.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-upstream.tar.xz postgresql-14-upstream.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/bin/pg_dump/parallel.c | 1806 |
1 files changed, 1806 insertions, 0 deletions
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c new file mode 100644 index 0000000..f1577e7 --- /dev/null +++ b/src/bin/pg_dump/parallel.c @@ -0,0 +1,1806 @@ +/*------------------------------------------------------------------------- + * + * parallel.c + * + * Parallel support for pg_dump and pg_restore + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/parallel.c + * + *------------------------------------------------------------------------- + */ + +/* + * Parallel operation works like this: + * + * The original, leader process calls ParallelBackupStart(), which forks off + * the desired number of worker processes, which each enter WaitForCommands(). + * + * The leader process dispatches an individual work item to one of the worker + * processes in DispatchJobForTocEntry(). We send a command string such as + * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID. + * The worker process receives and decodes the command and passes it to the + * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr, + * which are routines of the current archive format. That routine performs + * the required action (dump or restore) and returns an integer status code. + * This is passed back to the leader where we pass it to the + * ParallelCompletionPtr callback function that was passed to + * DispatchJobForTocEntry(). The callback function does state updating + * for the leader control logic in pg_backup_archiver.c. + * + * In principle additional archive-format-specific information might be needed + * in commands or worker status responses, but so far that hasn't proved + * necessary, since workers have full copies of the ArchiveHandle/TocEntry + * data structures. Remember that we have forked off the workers only after + * we have read in the catalog. That's why our worker processes can also + * access the catalog information. (In the Windows case, the workers are + * threads in the same process. To avoid problems, they work with cloned + * copies of the Archive data structure; see RunWorker().) + * + * In the leader process, the workerStatus field for each worker has one of + * the following values: + * WRKR_NOT_STARTED: we've not yet forked this worker + * WRKR_IDLE: it's waiting for a command + * WRKR_WORKING: it's working on a command + * WRKR_TERMINATED: process ended + * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING + * state, and must be NULL in other states. + */ + +#include "postgres_fe.h" + +#ifndef WIN32 +#include <sys/wait.h> +#include <signal.h> +#include <unistd.h> +#include <fcntl.h> +#endif +#ifdef HAVE_SYS_SELECT_H +#include <sys/select.h> +#endif + +#include "fe_utils/string_utils.h" +#include "parallel.h" +#include "pg_backup_utils.h" +#include "port/pg_bswap.h" + +/* Mnemonic macros for indexing the fd array returned by pipe(2) */ +#define PIPE_READ 0 +#define PIPE_WRITE 1 + +#define NO_SLOT (-1) /* Failure result for GetIdleWorker() */ + +/* Worker process statuses */ +typedef enum +{ + WRKR_NOT_STARTED = 0, + WRKR_IDLE, + WRKR_WORKING, + WRKR_TERMINATED +} T_WorkerStatus; + +#define WORKER_IS_RUNNING(workerStatus) \ + ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING) + +/* + * Private per-parallel-worker state (typedef for this is in parallel.h). + * + * Much of this is valid only in the leader process (or, on Windows, should + * be touched only by the leader thread). But the AH field should be touched + * only by workers. The pipe descriptors are valid everywhere. + */ +struct ParallelSlot +{ + T_WorkerStatus workerStatus; /* see enum above */ + + /* These fields are valid if workerStatus == WRKR_WORKING: */ + ParallelCompletionPtr callback; /* function to call on completion */ + void *callback_data; /* passthrough data for it */ + + ArchiveHandle *AH; /* Archive data worker is using */ + + int pipeRead; /* leader's end of the pipes */ + int pipeWrite; + int pipeRevRead; /* child's end of the pipes */ + int pipeRevWrite; + + /* Child process/thread identity info: */ +#ifdef WIN32 + uintptr_t hThread; + unsigned int threadId; +#else + pid_t pid; +#endif +}; + +#ifdef WIN32 + +/* + * Structure to hold info passed by _beginthreadex() to the function it calls + * via its single allowed argument. + */ +typedef struct +{ + ArchiveHandle *AH; /* leader database connection */ + ParallelSlot *slot; /* this worker's parallel slot */ +} WorkerInfo; + +/* Windows implementation of pipe access */ +static int pgpipe(int handles[2]); +#define piperead(a,b,c) recv(a,b,c,0) +#define pipewrite(a,b,c) send(a,b,c,0) + +#else /* !WIN32 */ + +/* Non-Windows implementation of pipe access */ +#define pgpipe(a) pipe(a) +#define piperead(a,b,c) read(a,b,c) +#define pipewrite(a,b,c) write(a,b,c) + +#endif /* WIN32 */ + +/* + * State info for archive_close_connection() shutdown callback. + */ +typedef struct ShutdownInformation +{ + ParallelState *pstate; + Archive *AHX; +} ShutdownInformation; + +static ShutdownInformation shutdown_info; + +/* + * State info for signal handling. + * We assume signal_info initializes to zeroes. + * + * On Unix, myAH is the leader DB connection in the leader process, and the + * worker's own connection in worker processes. On Windows, we have only one + * instance of signal_info, so myAH is the leader connection and the worker + * connections must be dug out of pstate->parallelSlot[]. + */ +typedef struct DumpSignalInformation +{ + ArchiveHandle *myAH; /* database connection to issue cancel for */ + ParallelState *pstate; /* parallel state, if any */ + bool handler_set; /* signal handler set up in this process? */ +#ifndef WIN32 + bool am_worker; /* am I a worker process? */ +#endif +} DumpSignalInformation; + +static volatile DumpSignalInformation signal_info; + +#ifdef WIN32 +static CRITICAL_SECTION signal_info_lock; +#endif + +/* + * Write a simple string to stderr --- must be safe in a signal handler. + * We ignore the write() result since there's not much we could do about it. + * Certain compilers make that harder than it ought to be. + */ +#define write_stderr(str) \ + do { \ + const char *str_ = (str); \ + int rc_; \ + rc_ = write(fileno(stderr), str_, strlen(str_)); \ + (void) rc_; \ + } while (0) + + +#ifdef WIN32 +/* file-scope variables */ +static DWORD tls_index; + +/* globally visible variables (needed by exit_nicely) */ +bool parallel_init_done = false; +DWORD mainThreadId; +#endif /* WIN32 */ + +/* Local function prototypes */ +static ParallelSlot *GetMyPSlot(ParallelState *pstate); +static void archive_close_connection(int code, void *arg); +static void ShutdownWorkersHard(ParallelState *pstate); +static void WaitForTerminatingWorkers(ParallelState *pstate); +static void setup_cancel_handler(void); +static void set_cancel_pstate(ParallelState *pstate); +static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH); +static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot); +static int GetIdleWorker(ParallelState *pstate); +static bool HasEveryWorkerTerminated(ParallelState *pstate); +static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te); +static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); +static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, + bool do_wait); +static char *getMessageFromLeader(int pipefd[2]); +static void sendMessageToLeader(int pipefd[2], const char *str); +static int select_loop(int maxFd, fd_set *workerset); +static char *getMessageFromWorker(ParallelState *pstate, + bool do_wait, int *worker); +static void sendMessageToWorker(ParallelState *pstate, + int worker, const char *str); +static char *readMessageFromPipe(int fd); + +#define messageStartsWith(msg, prefix) \ + (strncmp(msg, prefix, strlen(prefix)) == 0) + + +/* + * Initialize parallel dump support --- should be called early in process + * startup. (Currently, this is called whether or not we intend parallel + * activity.) + */ +void +init_parallel_dump_utils(void) +{ +#ifdef WIN32 + if (!parallel_init_done) + { + WSADATA wsaData; + int err; + + /* Prepare for threaded operation */ + tls_index = TlsAlloc(); + mainThreadId = GetCurrentThreadId(); + + /* Initialize socket access */ + err = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (err != 0) + { + pg_log_error("%s() failed: error code %d", "WSAStartup", err); + exit_nicely(1); + } + + parallel_init_done = true; + } +#endif +} + +/* + * Find the ParallelSlot for the current worker process or thread. + * + * Returns NULL if no matching slot is found (this implies we're the leader). + */ +static ParallelSlot * +GetMyPSlot(ParallelState *pstate) +{ + int i; + + for (i = 0; i < pstate->numWorkers; i++) + { +#ifdef WIN32 + if (pstate->parallelSlot[i].threadId == GetCurrentThreadId()) +#else + if (pstate->parallelSlot[i].pid == getpid()) +#endif + return &(pstate->parallelSlot[i]); + } + + return NULL; +} + +/* + * A thread-local version of getLocalPQExpBuffer(). + * + * Non-reentrant but reduces memory leakage: we'll consume one buffer per + * thread, which is much better than one per fmtId/fmtQualifiedId call. + */ +#ifdef WIN32 +static PQExpBuffer +getThreadLocalPQExpBuffer(void) +{ + /* + * The Tls code goes awry if we use a static var, so we provide for both + * static and auto, and omit any use of the static var when using Tls. We + * rely on TlsGetValue() to return 0 if the value is not yet set. + */ + static PQExpBuffer s_id_return = NULL; + PQExpBuffer id_return; + + if (parallel_init_done) + id_return = (PQExpBuffer) TlsGetValue(tls_index); + else + id_return = s_id_return; + + if (id_return) /* first time through? */ + { + /* same buffer, just wipe contents */ + resetPQExpBuffer(id_return); + } + else + { + /* new buffer */ + id_return = createPQExpBuffer(); + if (parallel_init_done) + TlsSetValue(tls_index, id_return); + else + s_id_return = id_return; + } + + return id_return; +} +#endif /* WIN32 */ + +/* + * pg_dump and pg_restore call this to register the cleanup handler + * as soon as they've created the ArchiveHandle. + */ +void +on_exit_close_archive(Archive *AHX) +{ + shutdown_info.AHX = AHX; + on_exit_nicely(archive_close_connection, &shutdown_info); +} + +/* + * on_exit_nicely handler for shutting down database connections and + * worker processes cleanly. + */ +static void +archive_close_connection(int code, void *arg) +{ + ShutdownInformation *si = (ShutdownInformation *) arg; + + if (si->pstate) + { + /* In parallel mode, must figure out who we are */ + ParallelSlot *slot = GetMyPSlot(si->pstate); + + if (!slot) + { + /* + * We're the leader. Forcibly shut down workers, then close our + * own database connection, if any. + */ + ShutdownWorkersHard(si->pstate); + + if (si->AHX) + DisconnectDatabase(si->AHX); + } + else + { + /* + * We're a worker. Shut down our own DB connection if any. On + * Windows, we also have to close our communication sockets, to + * emulate what will happen on Unix when the worker process exits. + * (Without this, if this is a premature exit, the leader would + * fail to detect it because there would be no EOF condition on + * the other end of the pipe.) + */ + if (slot->AH) + DisconnectDatabase(&(slot->AH->public)); + +#ifdef WIN32 + closesocket(slot->pipeRevRead); + closesocket(slot->pipeRevWrite); +#endif + } + } + else + { + /* Non-parallel operation: just kill the leader DB connection */ + if (si->AHX) + DisconnectDatabase(si->AHX); + } +} + +/* + * Forcibly shut down any remaining workers, waiting for them to finish. + * + * Note that we don't expect to come here during normal exit (the workers + * should be long gone, and the ParallelState too). We're only here in a + * fatal() situation, so intervening to cancel active commands is + * appropriate. + */ +static void +ShutdownWorkersHard(ParallelState *pstate) +{ + int i; + + /* + * Close our write end of the sockets so that any workers waiting for + * commands know they can exit. (Note: some of the pipeWrite fields might + * still be zero, if we failed to initialize all the workers. Hence, just + * ignore errors here.) + */ + for (i = 0; i < pstate->numWorkers; i++) + closesocket(pstate->parallelSlot[i].pipeWrite); + + /* + * Force early termination of any commands currently in progress. + */ +#ifndef WIN32 + /* On non-Windows, send SIGTERM to each worker process. */ + for (i = 0; i < pstate->numWorkers; i++) + { + pid_t pid = pstate->parallelSlot[i].pid; + + if (pid != 0) + kill(pid, SIGTERM); + } +#else + + /* + * On Windows, send query cancels directly to the workers' backends. Use + * a critical section to ensure worker threads don't change state. + */ + EnterCriticalSection(&signal_info_lock); + for (i = 0; i < pstate->numWorkers; i++) + { + ArchiveHandle *AH = pstate->parallelSlot[i].AH; + char errbuf[1]; + + if (AH != NULL && AH->connCancel != NULL) + (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf)); + } + LeaveCriticalSection(&signal_info_lock); +#endif + + /* Now wait for them to terminate. */ + WaitForTerminatingWorkers(pstate); +} + +/* + * Wait for all workers to terminate. + */ +static void +WaitForTerminatingWorkers(ParallelState *pstate) +{ + while (!HasEveryWorkerTerminated(pstate)) + { + ParallelSlot *slot = NULL; + int j; + +#ifndef WIN32 + /* On non-Windows, use wait() to wait for next worker to end */ + int status; + pid_t pid = wait(&status); + + /* Find dead worker's slot, and clear the PID field */ + for (j = 0; j < pstate->numWorkers; j++) + { + slot = &(pstate->parallelSlot[j]); + if (slot->pid == pid) + { + slot->pid = 0; + break; + } + } +#else /* WIN32 */ + /* On Windows, we must use WaitForMultipleObjects() */ + HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers); + int nrun = 0; + DWORD ret; + uintptr_t hThread; + + for (j = 0; j < pstate->numWorkers; j++) + { + if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus)) + { + lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread; + nrun++; + } + } + ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE); + Assert(ret != WAIT_FAILED); + hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0]; + free(lpHandles); + + /* Find dead worker's slot, and clear the hThread field */ + for (j = 0; j < pstate->numWorkers; j++) + { + slot = &(pstate->parallelSlot[j]); + if (slot->hThread == hThread) + { + /* For cleanliness, close handles for dead threads */ + CloseHandle((HANDLE) slot->hThread); + slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE; + break; + } + } +#endif /* WIN32 */ + + /* On all platforms, update workerStatus and te[] as well */ + Assert(j < pstate->numWorkers); + slot->workerStatus = WRKR_TERMINATED; + pstate->te[j] = NULL; + } +} + + +/* + * Code for responding to cancel interrupts (SIGINT, control-C, etc) + * + * This doesn't quite belong in this module, but it needs access to the + * ParallelState data, so there's not really a better place either. + * + * When we get a cancel interrupt, we could just die, but in pg_restore that + * could leave a SQL command (e.g., CREATE INDEX on a large table) running + * for a long time. Instead, we try to send a cancel request and then die. + * pg_dump probably doesn't really need this, but we might as well use it + * there too. Note that sending the cancel directly from the signal handler + * is safe because PQcancel() is written to make it so. + * + * In parallel operation on Unix, each process is responsible for canceling + * its own connection (this must be so because nobody else has access to it). + * Furthermore, the leader process should attempt to forward its signal to + * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't + * needed because typing control-C at the console would deliver SIGINT to + * every member of the terminal process group --- but in other scenarios it + * might be that only the leader gets signaled. + * + * On Windows, the cancel handler runs in a separate thread, because that's + * how SetConsoleCtrlHandler works. We make it stop worker threads, send + * cancels on all active connections, and then return FALSE, which will allow + * the process to die. For safety's sake, we use a critical section to + * protect the PGcancel structures against being changed while the signal + * thread runs. + */ + +#ifndef WIN32 + +/* + * Signal handler (Unix only) + */ +static void +sigTermHandler(SIGNAL_ARGS) +{ + int i; + char errbuf[1]; + + /* + * Some platforms allow delivery of new signals to interrupt an active + * signal handler. That could muck up our attempt to send PQcancel, so + * disable the signals that setup_cancel_handler enabled. + */ + pqsignal(SIGINT, SIG_IGN); + pqsignal(SIGTERM, SIG_IGN); + pqsignal(SIGQUIT, SIG_IGN); + + /* + * If we're in the leader, forward signal to all workers. (It seems best + * to do this before PQcancel; killing the leader transaction will result + * in invalid-snapshot errors from active workers, which maybe we can + * quiet by killing workers first.) Ignore any errors. + */ + if (signal_info.pstate != NULL) + { + for (i = 0; i < signal_info.pstate->numWorkers; i++) + { + pid_t pid = signal_info.pstate->parallelSlot[i].pid; + + if (pid != 0) + kill(pid, SIGTERM); + } + } + + /* + * Send QueryCancel if we have a connection to send to. Ignore errors, + * there's not much we can do about them anyway. + */ + if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL) + (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf)); + + /* + * Report we're quitting, using nothing more complicated than write(2). + * When in parallel operation, only the leader process should do this. + */ + if (!signal_info.am_worker) + { + if (progname) + { + write_stderr(progname); + write_stderr(": "); + } + write_stderr("terminated by user\n"); + } + + /* + * And die, using _exit() not exit() because the latter will invoke atexit + * handlers that can fail if we interrupted related code. + */ + _exit(1); +} + +/* + * Enable cancel interrupt handler, if not already done. + */ +static void +setup_cancel_handler(void) +{ + /* + * When forking, signal_info.handler_set will propagate into the new + * process, but that's fine because the signal handler state does too. + */ + if (!signal_info.handler_set) + { + signal_info.handler_set = true; + + pqsignal(SIGINT, sigTermHandler); + pqsignal(SIGTERM, sigTermHandler); + pqsignal(SIGQUIT, sigTermHandler); + } +} + +#else /* WIN32 */ + +/* + * Console interrupt handler --- runs in a newly-started thread. + * + * After stopping other threads and sending cancel requests on all open + * connections, we return FALSE which will allow the default ExitProcess() + * action to be taken. + */ +static BOOL WINAPI +consoleHandler(DWORD dwCtrlType) +{ + int i; + char errbuf[1]; + + if (dwCtrlType == CTRL_C_EVENT || + dwCtrlType == CTRL_BREAK_EVENT) + { + /* Critical section prevents changing data we look at here */ + EnterCriticalSection(&signal_info_lock); + + /* + * If in parallel mode, stop worker threads and send QueryCancel to + * their connected backends. The main point of stopping the worker + * threads is to keep them from reporting the query cancels as errors, + * which would clutter the user's screen. We needn't stop the leader + * thread since it won't be doing much anyway. Do this before + * canceling the main transaction, else we might get invalid-snapshot + * errors reported before we can stop the workers. Ignore errors, + * there's not much we can do about them anyway. + */ + if (signal_info.pstate != NULL) + { + for (i = 0; i < signal_info.pstate->numWorkers; i++) + { + ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]); + ArchiveHandle *AH = slot->AH; + HANDLE hThread = (HANDLE) slot->hThread; + + /* + * Using TerminateThread here may leave some resources leaked, + * but it doesn't matter since we're about to end the whole + * process. + */ + if (hThread != INVALID_HANDLE_VALUE) + TerminateThread(hThread, 0); + + if (AH != NULL && AH->connCancel != NULL) + (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf)); + } + } + + /* + * Send QueryCancel to leader connection, if enabled. Ignore errors, + * there's not much we can do about them anyway. + */ + if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL) + (void) PQcancel(signal_info.myAH->connCancel, + errbuf, sizeof(errbuf)); + + LeaveCriticalSection(&signal_info_lock); + + /* + * Report we're quitting, using nothing more complicated than + * write(2). (We might be able to get away with using pg_log_*() + * here, but since we terminated other threads uncleanly above, it + * seems better to assume as little as possible.) + */ + if (progname) + { + write_stderr(progname); + write_stderr(": "); + } + write_stderr("terminated by user\n"); + } + + /* Always return FALSE to allow signal handling to continue */ + return FALSE; +} + +/* + * Enable cancel interrupt handler, if not already done. + */ +static void +setup_cancel_handler(void) +{ + if (!signal_info.handler_set) + { + signal_info.handler_set = true; + + InitializeCriticalSection(&signal_info_lock); + + SetConsoleCtrlHandler(consoleHandler, TRUE); + } +} + +#endif /* WIN32 */ + + +/* + * set_archive_cancel_info + * + * Fill AH->connCancel with cancellation info for the specified database + * connection; or clear it if conn is NULL. + */ +void +set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn) +{ + PGcancel *oldConnCancel; + + /* + * Activate the interrupt handler if we didn't yet in this process. On + * Windows, this also initializes signal_info_lock; therefore it's + * important that this happen at least once before we fork off any + * threads. + */ + setup_cancel_handler(); + + /* + * On Unix, we assume that storing a pointer value is atomic with respect + * to any possible signal interrupt. On Windows, use a critical section. + */ + +#ifdef WIN32 + EnterCriticalSection(&signal_info_lock); +#endif + + /* Free the old one if we have one */ + oldConnCancel = AH->connCancel; + /* be sure interrupt handler doesn't use pointer while freeing */ + AH->connCancel = NULL; + + if (oldConnCancel != NULL) + PQfreeCancel(oldConnCancel); + + /* Set the new one if specified */ + if (conn) + AH->connCancel = PQgetCancel(conn); + + /* + * On Unix, there's only ever one active ArchiveHandle per process, so we + * can just set signal_info.myAH unconditionally. On Windows, do that + * only in the main thread; worker threads have to make sure their + * ArchiveHandle appears in the pstate data, which is dealt with in + * RunWorker(). + */ +#ifndef WIN32 + signal_info.myAH = AH; +#else + if (mainThreadId == GetCurrentThreadId()) + signal_info.myAH = AH; +#endif + +#ifdef WIN32 + LeaveCriticalSection(&signal_info_lock); +#endif +} + +/* + * set_cancel_pstate + * + * Set signal_info.pstate to point to the specified ParallelState, if any. + * We need this mainly to have an interlock against Windows signal thread. + */ +static void +set_cancel_pstate(ParallelState *pstate) +{ +#ifdef WIN32 + EnterCriticalSection(&signal_info_lock); +#endif + + signal_info.pstate = pstate; + +#ifdef WIN32 + LeaveCriticalSection(&signal_info_lock); +#endif +} + +/* + * set_cancel_slot_archive + * + * Set ParallelSlot's AH field to point to the specified archive, if any. + * We need this mainly to have an interlock against Windows signal thread. + */ +static void +set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH) +{ +#ifdef WIN32 + EnterCriticalSection(&signal_info_lock); +#endif + + slot->AH = AH; + +#ifdef WIN32 + LeaveCriticalSection(&signal_info_lock); +#endif +} + + +/* + * This function is called by both Unix and Windows variants to set up + * and run a worker process. Caller should exit the process (or thread) + * upon return. + */ +static void +RunWorker(ArchiveHandle *AH, ParallelSlot *slot) +{ + int pipefd[2]; + + /* fetch child ends of pipes */ + pipefd[PIPE_READ] = slot->pipeRevRead; + pipefd[PIPE_WRITE] = slot->pipeRevWrite; + + /* + * Clone the archive so that we have our own state to work with, and in + * particular our own database connection. + * + * We clone on Unix as well as Windows, even though technically we don't + * need to because fork() gives us a copy in our own address space + * already. But CloneArchive resets the state information and also clones + * the database connection which both seem kinda helpful. + */ + AH = CloneArchive(AH); + + /* Remember cloned archive where signal handler can find it */ + set_cancel_slot_archive(slot, AH); + + /* + * Call the setup worker function that's defined in the ArchiveHandle. + */ + (AH->SetupWorkerPtr) ((Archive *) AH); + + /* + * Execute commands until done. + */ + WaitForCommands(AH, pipefd); + + /* + * Disconnect from database and clean up. + */ + set_cancel_slot_archive(slot, NULL); + DisconnectDatabase(&(AH->public)); + DeCloneArchive(AH); +} + +/* + * Thread base function for Windows + */ +#ifdef WIN32 +static unsigned __stdcall +init_spawned_worker_win32(WorkerInfo *wi) +{ + ArchiveHandle *AH = wi->AH; + ParallelSlot *slot = wi->slot; + + /* Don't need WorkerInfo anymore */ + free(wi); + + /* Run the worker ... */ + RunWorker(AH, slot); + + /* Exit the thread */ + _endthreadex(0); + return 0; +} +#endif /* WIN32 */ + +/* + * This function starts a parallel dump or restore by spawning off the worker + * processes. For Windows, it creates a number of threads; on Unix the + * workers are created with fork(). + */ +ParallelState * +ParallelBackupStart(ArchiveHandle *AH) +{ + ParallelState *pstate; + int i; + + Assert(AH->public.numWorkers > 0); + + pstate = (ParallelState *) pg_malloc(sizeof(ParallelState)); + + pstate->numWorkers = AH->public.numWorkers; + pstate->te = NULL; + pstate->parallelSlot = NULL; + + if (AH->public.numWorkers == 1) + return pstate; + + /* Create status arrays, being sure to initialize all fields to 0 */ + pstate->te = (TocEntry **) + pg_malloc0(pstate->numWorkers * sizeof(TocEntry *)); + pstate->parallelSlot = (ParallelSlot *) + pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot)); + +#ifdef WIN32 + /* Make fmtId() and fmtQualifiedId() use thread-local storage */ + getLocalPQExpBuffer = getThreadLocalPQExpBuffer; +#endif + + /* + * Set the pstate in shutdown_info, to tell the exit handler that it must + * clean up workers as well as the main database connection. But we don't + * set this in signal_info yet, because we don't want child processes to + * inherit non-NULL signal_info.pstate. + */ + shutdown_info.pstate = pstate; + + /* + * Temporarily disable query cancellation on the leader connection. This + * ensures that child processes won't inherit valid AH->connCancel + * settings and thus won't try to issue cancels against the leader's + * connection. No harm is done if we fail while it's disabled, because + * the leader connection is idle at this point anyway. + */ + set_archive_cancel_info(AH, NULL); + + /* Ensure stdio state is quiesced before forking */ + fflush(NULL); + + /* Create desired number of workers */ + for (i = 0; i < pstate->numWorkers; i++) + { +#ifdef WIN32 + WorkerInfo *wi; + uintptr_t handle; +#else + pid_t pid; +#endif + ParallelSlot *slot = &(pstate->parallelSlot[i]); + int pipeMW[2], + pipeWM[2]; + + /* Create communication pipes for this worker */ + if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0) + fatal("could not create communication channels: %m"); + + /* leader's ends of the pipes */ + slot->pipeRead = pipeWM[PIPE_READ]; + slot->pipeWrite = pipeMW[PIPE_WRITE]; + /* child's ends of the pipes */ + slot->pipeRevRead = pipeMW[PIPE_READ]; + slot->pipeRevWrite = pipeWM[PIPE_WRITE]; + +#ifdef WIN32 + /* Create transient structure to pass args to worker function */ + wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo)); + + wi->AH = AH; + wi->slot = slot; + + handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32, + wi, 0, &(slot->threadId)); + slot->hThread = handle; + slot->workerStatus = WRKR_IDLE; +#else /* !WIN32 */ + pid = fork(); + if (pid == 0) + { + /* we are the worker */ + int j; + + /* this is needed for GetMyPSlot() */ + slot->pid = getpid(); + + /* instruct signal handler that we're in a worker now */ + signal_info.am_worker = true; + + /* close read end of Worker -> Leader */ + closesocket(pipeWM[PIPE_READ]); + /* close write end of Leader -> Worker */ + closesocket(pipeMW[PIPE_WRITE]); + + /* + * Close all inherited fds for communication of the leader with + * previously-forked workers. + */ + for (j = 0; j < i; j++) + { + closesocket(pstate->parallelSlot[j].pipeRead); + closesocket(pstate->parallelSlot[j].pipeWrite); + } + + /* Run the worker ... */ + RunWorker(AH, slot); + + /* We can just exit(0) when done */ + exit(0); + } + else if (pid < 0) + { + /* fork failed */ + fatal("could not create worker process: %m"); + } + + /* In Leader after successful fork */ + slot->pid = pid; + slot->workerStatus = WRKR_IDLE; + + /* close read end of Leader -> Worker */ + closesocket(pipeMW[PIPE_READ]); + /* close write end of Worker -> Leader */ + closesocket(pipeWM[PIPE_WRITE]); +#endif /* WIN32 */ + } + + /* + * Having forked off the workers, disable SIGPIPE so that leader isn't + * killed if it tries to send a command to a dead worker. We don't want + * the workers to inherit this setting, though. + */ +#ifndef WIN32 + pqsignal(SIGPIPE, SIG_IGN); +#endif + + /* + * Re-establish query cancellation on the leader connection. + */ + set_archive_cancel_info(AH, AH->connection); + + /* + * Tell the cancel signal handler to forward signals to worker processes, + * too. (As with query cancel, we did not need this earlier because the + * workers have not yet been given anything to do; if we die before this + * point, any already-started workers will see EOF and quit promptly.) + */ + set_cancel_pstate(pstate); + + return pstate; +} + +/* + * Close down a parallel dump or restore. + */ +void +ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) +{ + int i; + + /* No work if non-parallel */ + if (pstate->numWorkers == 1) + return; + + /* There should not be any unfinished jobs */ + Assert(IsEveryWorkerIdle(pstate)); + + /* Close the sockets so that the workers know they can exit */ + for (i = 0; i < pstate->numWorkers; i++) + { + closesocket(pstate->parallelSlot[i].pipeRead); + closesocket(pstate->parallelSlot[i].pipeWrite); + } + + /* Wait for them to exit */ + WaitForTerminatingWorkers(pstate); + + /* + * Unlink pstate from shutdown_info, so the exit handler will not try to + * use it; and likewise unlink from signal_info. + */ + shutdown_info.pstate = NULL; + set_cancel_pstate(NULL); + + /* Release state (mere neatnik-ism, since we're about to terminate) */ + free(pstate->te); + free(pstate->parallelSlot); + free(pstate); +} + +/* + * These next four functions handle construction and parsing of the command + * strings and response strings for parallel workers. + * + * Currently, these can be the same regardless of which archive format we are + * processing. In future, we might want to let format modules override these + * functions to add format-specific data to a command or response. + */ + +/* + * buildWorkerCommand: format a command string to send to a worker. + * + * The string is built in the caller-supplied buffer of size buflen. + */ +static void +buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, + char *buf, int buflen) +{ + if (act == ACT_DUMP) + snprintf(buf, buflen, "DUMP %d", te->dumpId); + else if (act == ACT_RESTORE) + snprintf(buf, buflen, "RESTORE %d", te->dumpId); + else + Assert(false); +} + +/* + * parseWorkerCommand: interpret a command string in a worker. + */ +static void +parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, + const char *msg) +{ + DumpId dumpId; + int nBytes; + + if (messageStartsWith(msg, "DUMP ")) + { + *act = ACT_DUMP; + sscanf(msg, "DUMP %d%n", &dumpId, &nBytes); + Assert(nBytes == strlen(msg)); + *te = getTocEntryByDumpId(AH, dumpId); + Assert(*te != NULL); + } + else if (messageStartsWith(msg, "RESTORE ")) + { + *act = ACT_RESTORE; + sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes); + Assert(nBytes == strlen(msg)); + *te = getTocEntryByDumpId(AH, dumpId); + Assert(*te != NULL); + } + else + fatal("unrecognized command received from leader: \"%s\"", + msg); +} + +/* + * buildWorkerResponse: format a response string to send to the leader. + * + * The string is built in the caller-supplied buffer of size buflen. + */ +static void +buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, + char *buf, int buflen) +{ + snprintf(buf, buflen, "OK %d %d %d", + te->dumpId, + status, + status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); +} + +/* + * parseWorkerResponse: parse the status message returned by a worker. + * + * Returns the integer status code, and may update fields of AH and/or te. + */ +static int +parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, + const char *msg) +{ + DumpId dumpId; + int nBytes, + n_errors; + int status = 0; + + if (messageStartsWith(msg, "OK ")) + { + sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes); + + Assert(dumpId == te->dumpId); + Assert(nBytes == strlen(msg)); + + AH->public.n_errors += n_errors; + } + else + fatal("invalid message received from worker: \"%s\"", + msg); + + return status; +} + +/* + * Dispatch a job to some free worker. + * + * te is the TocEntry to be processed, act is the action to be taken on it. + * callback is the function to call on completion of the job. + * + * If no worker is currently available, this will block, and previously + * registered callback functions may be called. + */ +void +DispatchJobForTocEntry(ArchiveHandle *AH, + ParallelState *pstate, + TocEntry *te, + T_Action act, + ParallelCompletionPtr callback, + void *callback_data) +{ + int worker; + char buf[256]; + + /* Get a worker, waiting if none are idle */ + while ((worker = GetIdleWorker(pstate)) == NO_SLOT) + WaitForWorkers(AH, pstate, WFW_ONE_IDLE); + + /* Construct and send command string */ + buildWorkerCommand(AH, te, act, buf, sizeof(buf)); + + sendMessageToWorker(pstate, worker, buf); + + /* Remember worker is busy, and which TocEntry it's working on */ + pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; + pstate->parallelSlot[worker].callback = callback; + pstate->parallelSlot[worker].callback_data = callback_data; + pstate->te[worker] = te; +} + +/* + * Find an idle worker and return its slot number. + * Return NO_SLOT if none are idle. + */ +static int +GetIdleWorker(ParallelState *pstate) +{ + int i; + + for (i = 0; i < pstate->numWorkers; i++) + { + if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE) + return i; + } + return NO_SLOT; +} + +/* + * Return true iff no worker is running. + */ +static bool +HasEveryWorkerTerminated(ParallelState *pstate) +{ + int i; + + for (i = 0; i < pstate->numWorkers; i++) + { + if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) + return false; + } + return true; +} + +/* + * Return true iff every worker is in the WRKR_IDLE state. + */ +bool +IsEveryWorkerIdle(ParallelState *pstate) +{ + int i; + + for (i = 0; i < pstate->numWorkers; i++) + { + if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE) + return false; + } + return true; +} + +/* + * Acquire lock on a table to be dumped by a worker process. + * + * The leader process is already holding an ACCESS SHARE lock. Ordinarily + * it's no problem for a worker to get one too, but if anything else besides + * pg_dump is running, there's a possible deadlock: + * + * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode. + * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted + * because the leader holds a conflicting ACCESS SHARE lock). + * 3) A worker process also requests an ACCESS SHARE lock to read the table. + * The worker is enqueued behind the ACCESS EXCLUSIVE lock request. + * 4) Now we have a deadlock, since the leader is effectively waiting for + * the worker. The server cannot detect that, however. + * + * To prevent an infinite wait, prior to touching a table in a worker, request + * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock, + * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and + * so we have a deadlock. We must fail the backup in that case. + */ +static void +lockTableForWorker(ArchiveHandle *AH, TocEntry *te) +{ + const char *qualId; + PQExpBuffer query; + PGresult *res; + + /* Nothing to do for BLOBS */ + if (strcmp(te->desc, "BLOBS") == 0) + return; + + query = createPQExpBuffer(); + + qualId = fmtQualifiedId(te->namespace, te->tag); + + appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT", + qualId); + + res = PQexec(AH->connection, query->data); + + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) + fatal("could not obtain lock on relation \"%s\"\n" + "This usually means that someone requested an ACCESS EXCLUSIVE lock " + "on the table after the pg_dump parent process had gotten the " + "initial ACCESS SHARE lock on the table.", qualId); + + PQclear(res); + destroyPQExpBuffer(query); +} + +/* + * WaitForCommands: main routine for a worker process. + * + * Read and execute commands from the leader until we see EOF on the pipe. + */ +static void +WaitForCommands(ArchiveHandle *AH, int pipefd[2]) +{ + char *command; + TocEntry *te; + T_Action act; + int status = 0; + char buf[256]; + + for (;;) + { + if (!(command = getMessageFromLeader(pipefd))) + { + /* EOF, so done */ + return; + } + + /* Decode the command */ + parseWorkerCommand(AH, &te, &act, command); + + if (act == ACT_DUMP) + { + /* Acquire lock on this table within the worker's session */ + lockTableForWorker(AH, te); + + /* Perform the dump command */ + status = (AH->WorkerJobDumpPtr) (AH, te); + } + else if (act == ACT_RESTORE) + { + /* Perform the restore command */ + status = (AH->WorkerJobRestorePtr) (AH, te); + } + else + Assert(false); + + /* Return status to leader */ + buildWorkerResponse(AH, te, act, status, buf, sizeof(buf)); + + sendMessageToLeader(pipefd, buf); + + /* command was pg_malloc'd and we are responsible for free()ing it. */ + free(command); + } +} + +/* + * Check for status messages from workers. + * + * If do_wait is true, wait to get a status message; otherwise, just return + * immediately if there is none available. + * + * When we get a status message, we pass the status code to the callback + * function that was specified to DispatchJobForTocEntry, then reset the + * worker status to IDLE. + * + * Returns true if we collected a status message, else false. + * + * XXX is it worth checking for more than one status message per call? + * It seems somewhat unlikely that multiple workers would finish at exactly + * the same time. + */ +static bool +ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) +{ + int worker; + char *msg; + + /* Try to collect a status message */ + msg = getMessageFromWorker(pstate, do_wait, &worker); + + if (!msg) + { + /* If do_wait is true, we must have detected EOF on some socket */ + if (do_wait) + fatal("a worker process died unexpectedly"); + return false; + } + + /* Process it and update our idea of the worker's status */ + if (messageStartsWith(msg, "OK ")) + { + ParallelSlot *slot = &pstate->parallelSlot[worker]; + TocEntry *te = pstate->te[worker]; + int status; + + status = parseWorkerResponse(AH, te, msg); + slot->callback(AH, te, status, slot->callback_data); + slot->workerStatus = WRKR_IDLE; + pstate->te[worker] = NULL; + } + else + fatal("invalid message received from worker: \"%s\"", + msg); + + /* Free the string returned from getMessageFromWorker */ + free(msg); + + return true; +} + +/* + * Check for status results from workers, waiting if necessary. + * + * Available wait modes are: + * WFW_NO_WAIT: reap any available status, but don't block + * WFW_GOT_STATUS: wait for at least one more worker to finish + * WFW_ONE_IDLE: wait for at least one worker to be idle + * WFW_ALL_IDLE: wait for all workers to be idle + * + * Any received results are passed to the callback specified to + * DispatchJobForTocEntry. + * + * This function is executed in the leader process. + */ +void +WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode) +{ + bool do_wait = false; + + /* + * In GOT_STATUS mode, always block waiting for a message, since we can't + * return till we get something. In other modes, we don't block the first + * time through the loop. + */ + if (mode == WFW_GOT_STATUS) + { + /* Assert that caller knows what it's doing */ + Assert(!IsEveryWorkerIdle(pstate)); + do_wait = true; + } + + for (;;) + { + /* + * Check for status messages, even if we don't need to block. We do + * not try very hard to reap all available messages, though, since + * there's unlikely to be more than one. + */ + if (ListenToWorkers(AH, pstate, do_wait)) + { + /* + * If we got a message, we are done by definition for GOT_STATUS + * mode, and we can also be certain that there's at least one idle + * worker. So we're done in all but ALL_IDLE mode. + */ + if (mode != WFW_ALL_IDLE) + return; + } + + /* Check whether we must wait for new status messages */ + switch (mode) + { + case WFW_NO_WAIT: + return; /* never wait */ + case WFW_GOT_STATUS: + Assert(false); /* can't get here, because we waited */ + break; + case WFW_ONE_IDLE: + if (GetIdleWorker(pstate) != NO_SLOT) + return; + break; + case WFW_ALL_IDLE: + if (IsEveryWorkerIdle(pstate)) + return; + break; + } + + /* Loop back, and this time wait for something to happen */ + do_wait = true; + } +} + +/* + * Read one command message from the leader, blocking if necessary + * until one is available, and return it as a malloc'd string. + * On EOF, return NULL. + * + * This function is executed in worker processes. + */ +static char * +getMessageFromLeader(int pipefd[2]) +{ + return readMessageFromPipe(pipefd[PIPE_READ]); +} + +/* + * Send a status message to the leader. + * + * This function is executed in worker processes. + */ +static void +sendMessageToLeader(int pipefd[2], const char *str) +{ + int len = strlen(str) + 1; + + if (pipewrite(pipefd[PIPE_WRITE], str, len) != len) + fatal("could not write to the communication channel: %m"); +} + +/* + * Wait until some descriptor in "workerset" becomes readable. + * Returns -1 on error, else the number of readable descriptors. + */ +static int +select_loop(int maxFd, fd_set *workerset) +{ + int i; + fd_set saveSet = *workerset; + + for (;;) + { + *workerset = saveSet; + i = select(maxFd + 1, workerset, NULL, NULL, NULL); + +#ifndef WIN32 + if (i < 0 && errno == EINTR) + continue; +#else + if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR) + continue; +#endif + break; + } + + return i; +} + + +/* + * Check for messages from worker processes. + * + * If a message is available, return it as a malloc'd string, and put the + * index of the sending worker in *worker. + * + * If nothing is available, wait if "do_wait" is true, else return NULL. + * + * If we detect EOF on any socket, we'll return NULL. It's not great that + * that's hard to distinguish from the no-data-available case, but for now + * our one caller is okay with that. + * + * This function is executed in the leader process. + */ +static char * +getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) +{ + int i; + fd_set workerset; + int maxFd = -1; + struct timeval nowait = {0, 0}; + + /* construct bitmap of socket descriptors for select() */ + FD_ZERO(&workerset); + for (i = 0; i < pstate->numWorkers; i++) + { + if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) + continue; + FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); + if (pstate->parallelSlot[i].pipeRead > maxFd) + maxFd = pstate->parallelSlot[i].pipeRead; + } + + if (do_wait) + { + i = select_loop(maxFd, &workerset); + Assert(i != 0); + } + else + { + if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0) + return NULL; + } + + if (i < 0) + fatal("%s() failed: %m", "select"); + + for (i = 0; i < pstate->numWorkers; i++) + { + char *msg; + + if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) + continue; + if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) + continue; + + /* + * Read the message if any. If the socket is ready because of EOF, + * we'll return NULL instead (and the socket will stay ready, so the + * condition will persist). + * + * Note: because this is a blocking read, we'll wait if only part of + * the message is available. Waiting a long time would be bad, but + * since worker status messages are short and are always sent in one + * operation, it shouldn't be a problem in practice. + */ + msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead); + *worker = i; + return msg; + } + Assert(false); + return NULL; +} + +/* + * Send a command message to the specified worker process. + * + * This function is executed in the leader process. + */ +static void +sendMessageToWorker(ParallelState *pstate, int worker, const char *str) +{ + int len = strlen(str) + 1; + + if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len) + { + fatal("could not write to the communication channel: %m"); + } +} + +/* + * Read one message from the specified pipe (fd), blocking if necessary + * until one is available, and return it as a malloc'd string. + * On EOF, return NULL. + * + * A "message" on the channel is just a null-terminated string. + */ +static char * +readMessageFromPipe(int fd) +{ + char *msg; + int msgsize, + bufsize; + int ret; + + /* + * In theory, if we let piperead() read multiple bytes, it might give us + * back fragments of multiple messages. (That can't actually occur, since + * neither leader nor workers send more than one message without waiting + * for a reply, but we don't wish to assume that here.) For simplicity, + * read a byte at a time until we get the terminating '\0'. This method + * is a bit inefficient, but since this is only used for relatively short + * command and status strings, it shouldn't matter. + */ + bufsize = 64; /* could be any number */ + msg = (char *) pg_malloc(bufsize); + msgsize = 0; + for (;;) + { + Assert(msgsize < bufsize); + ret = piperead(fd, msg + msgsize, 1); + if (ret <= 0) + break; /* error or connection closure */ + + Assert(ret == 1); + + if (msg[msgsize] == '\0') + return msg; /* collected whole message */ + + msgsize++; + if (msgsize == bufsize) /* enlarge buffer if needed */ + { + bufsize += 16; /* could be any number */ + msg = (char *) pg_realloc(msg, bufsize); + } + } + + /* Other end has closed the connection */ + pg_free(msg); + return NULL; +} + +#ifdef WIN32 + +/* + * This is a replacement version of pipe(2) for Windows which allows the pipe + * handles to be used in select(). + * + * Reads and writes on the pipe must go through piperead()/pipewrite(). + * + * For consistency with Unix we declare the returned handles as "int". + * This is okay even on WIN64 because system handles are not more than + * 32 bits wide, but we do have to do some casting. + */ +static int +pgpipe(int handles[2]) +{ + pgsocket s, + tmp_sock; + struct sockaddr_in serv_addr; + int len = sizeof(serv_addr); + + /* We have to use the Unix socket invalid file descriptor value here. */ + handles[0] = handles[1] = -1; + + /* + * setup listen socket + */ + if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET) + { + pg_log_error("pgpipe: could not create socket: error code %d", + WSAGetLastError()); + return -1; + } + + memset((void *) &serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = pg_hton16(0); + serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK); + if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) + { + pg_log_error("pgpipe: could not bind: error code %d", + WSAGetLastError()); + closesocket(s); + return -1; + } + if (listen(s, 1) == SOCKET_ERROR) + { + pg_log_error("pgpipe: could not listen: error code %d", + WSAGetLastError()); + closesocket(s); + return -1; + } + if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR) + { + pg_log_error("pgpipe: %s() failed: error code %d", "getsockname", + WSAGetLastError()); + closesocket(s); + return -1; + } + + /* + * setup pipe handles + */ + if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET) + { + pg_log_error("pgpipe: could not create second socket: error code %d", + WSAGetLastError()); + closesocket(s); + return -1; + } + handles[1] = (int) tmp_sock; + + if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) + { + pg_log_error("pgpipe: could not connect socket: error code %d", + WSAGetLastError()); + closesocket(handles[1]); + handles[1] = -1; + closesocket(s); + return -1; + } + if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET) + { + pg_log_error("pgpipe: could not accept connection: error code %d", + WSAGetLastError()); + closesocket(handles[1]); + handles[1] = -1; + closesocket(s); + return -1; + } + handles[0] = (int) tmp_sock; + + closesocket(s); + return 0; +} + +#endif /* WIN32 */ |