diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/replication/logical/launcher.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-upstream.tar.xz postgresql-14-upstream.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 1024 |
1 files changed, 1024 insertions, 0 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c new file mode 100644 index 0000000..3c69817 --- /dev/null +++ b/src/backend/replication/logical/launcher.c @@ -0,0 +1,1024 @@ +/*------------------------------------------------------------------------- + * launcher.c + * PostgreSQL logical replication worker launcher process + * + * Copyright (c) 2016-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/launcher.c + * + * NOTES + * This module contains the logical replication worker launcher which + * uses the background worker infrastructure to start the logical + * replication workers for every enabled subscription. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/htup.h" +#include "access/htup_details.h" +#include "access/tableam.h" +#include "access/xact.h" +#include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" +#include "funcapi.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/fork_process.h" +#include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/slot.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" +#include "tcop/tcopprot.h" +#include "utils/memutils.h" +#include "utils/pg_lsn.h" +#include "utils/ps_status.h" +#include "utils/snapmgr.h" +#include "utils/timeout.h" + +/* max sleep time between cycles (3min) */ +#define DEFAULT_NAPTIME_PER_CYCLE 180000L + +int max_logical_replication_workers = 4; +int max_sync_workers_per_subscription = 2; + +LogicalRepWorker *MyLogicalRepWorker = NULL; + +typedef struct LogicalRepCtxStruct +{ + /* Supervisor process. */ + pid_t launcher_pid; + + /* Background workers. */ + LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; +} LogicalRepCtxStruct; + +LogicalRepCtxStruct *LogicalRepCtx; + +static void ApplyLauncherWakeup(void); +static void logicalrep_launcher_onexit(int code, Datum arg); +static void logicalrep_worker_onexit(int code, Datum arg); +static void logicalrep_worker_detach(void); +static void logicalrep_worker_cleanup(LogicalRepWorker *worker); + +static bool on_commit_launcher_wakeup = false; + +Datum pg_stat_get_subscription(PG_FUNCTION_ARGS); + + +/* + * Load the list of subscriptions. + * + * Only the fields interesting for worker start/stop functions are filled for + * each subscription. + */ +static List * +get_subscription_list(void) +{ + List *res = NIL; + Relation rel; + TableScanDesc scan; + HeapTuple tup; + MemoryContext resultcxt; + + /* This is the context that we will allocate our output data in */ + resultcxt = CurrentMemoryContext; + + /* + * Start a transaction so we can access pg_database, and get a snapshot. + * We don't have a use for the snapshot itself, but we're interested in + * the secondary effect that it sets RecentGlobalXmin. (This is critical + * for anything that reads heap pages, because HOT may decide to prune + * them even if the process doesn't attempt to modify any tuples.) + * + * FIXME: This comment is inaccurate / the code buggy. A snapshot that is + * not pushed/active does not reliably prevent HOT pruning (->xmin could + * e.g. be cleared when cache invalidations are processed). + */ + StartTransactionCommand(); + (void) GetTransactionSnapshot(); + + rel = table_open(SubscriptionRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) + { + Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup); + Subscription *sub; + MemoryContext oldcxt; + + /* + * Allocate our results in the caller's context, not the + * transaction's. We do this inside the loop, and restore the original + * context at the end, so that leaky things like heap_getnext() are + * not called in a potentially long-lived context. + */ + oldcxt = MemoryContextSwitchTo(resultcxt); + + sub = (Subscription *) palloc0(sizeof(Subscription)); + sub->oid = subform->oid; + sub->dbid = subform->subdbid; + sub->owner = subform->subowner; + sub->enabled = subform->subenabled; + sub->name = pstrdup(NameStr(subform->subname)); + /* We don't fill fields we are not interested in. */ + + res = lappend(res, sub); + MemoryContextSwitchTo(oldcxt); + } + + table_endscan(scan); + table_close(rel, AccessShareLock); + + CommitTransactionCommand(); + + return res; +} + +/* + * Wait for a background worker to start up and attach to the shmem context. + * + * This is only needed for cleaning up the shared memory in case the worker + * fails to attach. + */ +static void +WaitForReplicationWorkerAttach(LogicalRepWorker *worker, + uint16 generation, + BackgroundWorkerHandle *handle) +{ + BgwHandleStatus status; + int rc; + + for (;;) + { + pid_t pid; + + CHECK_FOR_INTERRUPTS(); + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Worker either died or has started; no need to do anything. */ + if (!worker->in_use || worker->proc) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + LWLockRelease(LogicalRepWorkerLock); + + /* Check if worker has died before attaching, and clean up after it. */ + status = GetBackgroundWorkerPid(handle, &pid); + + if (status == BGWH_STOPPED) + { + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + /* Ensure that this was indeed the worker we waited for. */ + if (generation == worker->generation) + logicalrep_worker_cleanup(worker); + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * We need timeout because we generally don't get notified via latch + * about the worker attach. But we don't expect to have to wait long. + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_STARTUP); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } +} + +/* + * Walks the workers array and searches for one that matches given + * subscription id and relid. + */ +LogicalRepWorker * +logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +{ + int i; + LogicalRepWorker *res = NULL; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (w->in_use && w->subid == subid && w->relid == relid && + (!only_running || w->proc)) + { + res = w; + break; + } + } + + return res; +} + +/* + * Similar to logicalrep_worker_find(), but returns list of all workers for + * the subscription, instead just one. + */ +List * +logicalrep_workers_find(Oid subid, bool only_running) +{ + int i; + List *res = NIL; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (w->in_use && w->subid == subid && (!only_running || w->proc)) + res = lappend(res, w); + } + + return res; +} + +/* + * Start new apply background worker, if possible. + */ +void +logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, + Oid relid) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + uint16 generation; + int i; + int slot = 0; + LogicalRepWorker *worker = NULL; + int nsyncworkers; + TimestampTz now; + + ereport(DEBUG1, + (errmsg_internal("starting logical replication worker for subscription \"%s\"", + subname))); + + /* Report this after the initial starting message for consistency. */ + if (max_replication_slots == 0) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("cannot start logical replication workers when max_replication_slots = 0"))); + + /* + * We need to do the modification of the shared memory under lock so that + * we have consistent view. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + +retry: + /* Find unused worker slot. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (!w->in_use) + { + worker = w; + slot = i; + break; + } + } + + nsyncworkers = logicalrep_sync_worker_count(subid); + + now = GetCurrentTimestamp(); + + /* + * If we didn't find a free slot, try to do garbage collection. The + * reason we do this is because if some worker failed to start up and its + * parent has crashed while waiting, the in_use state was never cleared. + */ + if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + { + bool did_cleanup = false; + + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + /* + * If the worker was marked in use but didn't manage to attach in + * time, clean it up. + */ + if (w->in_use && !w->proc && + TimestampDifferenceExceeds(w->launch_time, now, + wal_receiver_timeout)) + { + elog(WARNING, + "logical replication worker for subscription %u took too long to start; canceled", + w->subid); + + logicalrep_worker_cleanup(w); + did_cleanup = true; + } + } + + if (did_cleanup) + goto retry; + } + + /* + * We don't allow to invoke more sync workers once we have reached the sync + * worker limit per subscription. So, just return silently as we might get + * here because of an otherwise harmless race condition. + */ + if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * However if there are no more free worker slots, inform user about it + * before exiting. + */ + if (worker == NULL) + { + LWLockRelease(LogicalRepWorkerLock); + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of logical replication worker slots"), + errhint("You might need to increase max_logical_replication_workers."))); + return; + } + + /* Prepare the worker slot. */ + worker->launch_time = now; + worker->in_use = true; + worker->generation++; + worker->proc = NULL; + worker->dbid = dbid; + worker->userid = userid; + worker->subid = subid; + worker->relid = relid; + worker->relstate = SUBREL_STATE_UNKNOWN; + worker->relstate_lsn = InvalidXLogRecPtr; + worker->last_lsn = InvalidXLogRecPtr; + TIMESTAMP_NOBEGIN(worker->last_send_time); + TIMESTAMP_NOBEGIN(worker->last_recv_time); + worker->reply_lsn = InvalidXLogRecPtr; + TIMESTAMP_NOBEGIN(worker->reply_time); + + /* Before releasing lock, remember generation for future identification. */ + generation = worker->generation; + + LWLockRelease(LogicalRepWorkerLock); + + /* Register the new dynamic worker. */ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); + if (OidIsValid(relid)) + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication worker for subscription %u sync %u", subid, relid); + else + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication worker for subscription %u", subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker"); + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(slot); + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + Assert(generation == worker->generation); + logicalrep_worker_cleanup(worker); + LWLockRelease(LogicalRepWorkerLock); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase max_worker_processes."))); + return; + } + + /* Now wait until it attaches. */ + WaitForReplicationWorkerAttach(worker, generation, bgw_handle); +} + +/* + * Stop the logical replication worker for subid/relid, if any, and wait until + * it detaches from the slot. + */ +void +logicalrep_worker_stop(Oid subid, Oid relid) +{ + LogicalRepWorker *worker; + uint16 generation; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = logicalrep_worker_find(subid, relid, false); + + /* No worker, nothing to do. */ + if (!worker) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * Remember which generation was our worker so we can check if what we see + * is still the same one. + */ + generation = worker->generation; + + /* + * If we found a worker but it does not have proc set then it is still + * starting up; wait for it to finish starting and then kill it. + */ + while (worker->in_use && !worker->proc) + { + int rc; + + LWLockRelease(LogicalRepWorkerLock); + + /* Wait a bit --- we don't expect to have to wait long. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_STARTUP); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* Recheck worker status. */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* + * Check whether the worker slot is no longer used, which would mean + * that the worker has exited, or whether the worker generation is + * different, meaning that a different worker has taken the slot. + */ + if (!worker->in_use || worker->generation != generation) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* Worker has assigned proc, so it has started. */ + if (worker->proc) + break; + } + + /* Now terminate the worker ... */ + kill(worker->proc->pid, SIGTERM); + + /* ... and wait for it to die. */ + for (;;) + { + int rc; + + /* is it gone? */ + if (!worker->proc || worker->generation != generation) + break; + + LWLockRelease(LogicalRepWorkerLock); + + /* Wait a bit --- we don't expect to have to wait long. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + } + + LWLockRelease(LogicalRepWorkerLock); +} + +/* + * Wake up (using latch) any logical replication worker for specified sub/rel. + */ +void +logicalrep_worker_wakeup(Oid subid, Oid relid) +{ + LogicalRepWorker *worker; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = logicalrep_worker_find(subid, relid, true); + + if (worker) + logicalrep_worker_wakeup_ptr(worker); + + LWLockRelease(LogicalRepWorkerLock); +} + +/* + * Wake up (using latch) the specified logical replication worker. + * + * Caller must hold lock, else worker->proc could change under us. + */ +void +logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker) +{ + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + SetLatch(&worker->proc->procLatch); +} + +/* + * Attach to a slot. + */ +void +logicalrep_worker_attach(int slot) +{ + /* Block concurrent access. */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + + Assert(slot >= 0 && slot < max_logical_replication_workers); + MyLogicalRepWorker = &LogicalRepCtx->workers[slot]; + + if (!MyLogicalRepWorker->in_use) + { + LWLockRelease(LogicalRepWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication worker slot %d is empty, cannot attach", + slot))); + } + + if (MyLogicalRepWorker->proc) + { + LWLockRelease(LogicalRepWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication worker slot %d is already used by " + "another worker, cannot attach", slot))); + } + + MyLogicalRepWorker->proc = MyProc; + before_shmem_exit(logicalrep_worker_onexit, (Datum) 0); + + LWLockRelease(LogicalRepWorkerLock); +} + +/* + * Detach the worker (cleans up the worker info). + */ +static void +logicalrep_worker_detach(void) +{ + /* Block concurrent access. */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + + logicalrep_worker_cleanup(MyLogicalRepWorker); + + LWLockRelease(LogicalRepWorkerLock); +} + +/* + * Clean up worker info. + */ +static void +logicalrep_worker_cleanup(LogicalRepWorker *worker) +{ + Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE)); + + worker->in_use = false; + worker->proc = NULL; + worker->dbid = InvalidOid; + worker->userid = InvalidOid; + worker->subid = InvalidOid; + worker->relid = InvalidOid; +} + +/* + * Cleanup function for logical replication launcher. + * + * Called on logical replication launcher exit. + */ +static void +logicalrep_launcher_onexit(int code, Datum arg) +{ + LogicalRepCtx->launcher_pid = 0; +} + +/* + * Cleanup function. + * + * Called on logical replication worker exit. + */ +static void +logicalrep_worker_onexit(int code, Datum arg) +{ + /* Disconnect gracefully from the remote side. */ + if (LogRepWorkerWalRcvConn) + walrcv_disconnect(LogRepWorkerWalRcvConn); + + logicalrep_worker_detach(); + + ApplyLauncherWakeup(); +} + +/* + * Count the number of registered (not necessarily running) sync workers + * for a subscription. + */ +int +logicalrep_sync_worker_count(Oid subid) +{ + int i; + int res = 0; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (w->subid == subid && OidIsValid(w->relid)) + res++; + } + + return res; +} + +/* + * ApplyLauncherShmemSize + * Compute space needed for replication launcher shared memory + */ +Size +ApplyLauncherShmemSize(void) +{ + Size size; + + /* + * Need the fixed struct and the array of LogicalRepWorker. + */ + size = sizeof(LogicalRepCtxStruct); + size = MAXALIGN(size); + size = add_size(size, mul_size(max_logical_replication_workers, + sizeof(LogicalRepWorker))); + return size; +} + +/* + * ApplyLauncherRegister + * Register a background worker running the logical replication launcher. + */ +void +ApplyLauncherRegister(void) +{ + BackgroundWorker bgw; + + if (max_logical_replication_workers == 0) + return; + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication launcher"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "logical replication launcher"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +/* + * ApplyLauncherShmemInit + * Allocate and initialize replication launcher shared memory + */ +void +ApplyLauncherShmemInit(void) +{ + bool found; + + LogicalRepCtx = (LogicalRepCtxStruct *) + ShmemInitStruct("Logical Replication Launcher Data", + ApplyLauncherShmemSize(), + &found); + + if (!found) + { + int slot; + + memset(LogicalRepCtx, 0, ApplyLauncherShmemSize()); + + /* Initialize memory and spin locks for each worker slot. */ + for (slot = 0; slot < max_logical_replication_workers; slot++) + { + LogicalRepWorker *worker = &LogicalRepCtx->workers[slot]; + + memset(worker, 0, sizeof(LogicalRepWorker)); + SpinLockInit(&worker->relmutex); + } + } +} + +/* + * Wakeup the launcher on commit if requested. + */ +void +AtEOXact_ApplyLauncher(bool isCommit) +{ + if (isCommit) + { + if (on_commit_launcher_wakeup) + ApplyLauncherWakeup(); + } + + on_commit_launcher_wakeup = false; +} + +/* + * Request wakeup of the launcher on commit of the transaction. + * + * This is used to send launcher signal to stop sleeping and process the + * subscriptions when current transaction commits. Should be used when new + * tuple was added to the pg_subscription catalog. +*/ +void +ApplyLauncherWakeupAtCommit(void) +{ + if (!on_commit_launcher_wakeup) + on_commit_launcher_wakeup = true; +} + +static void +ApplyLauncherWakeup(void) +{ + if (LogicalRepCtx->launcher_pid != 0) + kill(LogicalRepCtx->launcher_pid, SIGUSR1); +} + +/* + * Main loop for the apply launcher process. + */ +void +ApplyLauncherMain(Datum main_arg) +{ + TimestampTz last_start_time = 0; + + ereport(DEBUG1, + (errmsg_internal("logical replication launcher started"))); + + before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); + + Assert(LogicalRepCtx->launcher_pid == 0); + LogicalRepCtx->launcher_pid = MyProcPid; + + /* Establish signal handlers. */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Establish connection to nailed catalogs (we only ever access + * pg_subscription). + */ + BackgroundWorkerInitializeConnection(NULL, NULL, 0); + + /* Enter main loop */ + for (;;) + { + int rc; + List *sublist; + ListCell *lc; + MemoryContext subctx; + MemoryContext oldctx; + TimestampTz now; + long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + + CHECK_FOR_INTERRUPTS(); + + now = GetCurrentTimestamp(); + + /* Limit the start retry to once a wal_retrieve_retry_interval */ + if (TimestampDifferenceExceeds(last_start_time, now, + wal_retrieve_retry_interval)) + { + /* Use temporary context for the database list and worker info. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + /* search for subscriptions to start or stop. */ + sublist = get_subscription_list(); + + /* Start the missing workers for enabled subscriptions. */ + foreach(lc, sublist) + { + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + + if (!sub->enabled) + continue; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + + if (w == NULL) + { + last_start_time = now; + wait_time = wal_retrieve_retry_interval; + + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); + } + else + { + /* + * The wait in previous cycle was interrupted in less than + * wal_retrieve_retry_interval since last worker was started, this + * usually means crash of the worker, so we should retry in + * wal_retrieve_retry_interval again. + */ + wait_time = wal_retrieve_retry_interval; + } + + /* Wait for more work. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + wait_time, + WAIT_EVENT_LOGICAL_LAUNCHER_MAIN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + + /* Not reachable */ +} + +/* + * Is current process the logical replication launcher? + */ +bool +IsLogicalLauncher(void) +{ + return LogicalRepCtx->launcher_pid == MyProcPid; +} + +/* + * Returns state of the subscriptions. + */ +Datum +pg_stat_get_subscription(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_SUBSCRIPTION_COLS 8 + Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); + int i; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + /* Make sure we get consistent view of the workers. */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + for (i = 0; i < max_logical_replication_workers; i++) + { + /* for each row */ + Datum values[PG_STAT_GET_SUBSCRIPTION_COLS]; + bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS]; + int worker_pid; + LogicalRepWorker worker; + + memcpy(&worker, &LogicalRepCtx->workers[i], + sizeof(LogicalRepWorker)); + if (!worker.proc || !IsBackendPid(worker.proc->pid)) + continue; + + if (OidIsValid(subid) && worker.subid != subid) + continue; + + worker_pid = worker.proc->pid; + + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(worker.subid); + if (OidIsValid(worker.relid)) + values[1] = ObjectIdGetDatum(worker.relid); + else + nulls[1] = true; + values[2] = Int32GetDatum(worker_pid); + if (XLogRecPtrIsInvalid(worker.last_lsn)) + nulls[3] = true; + else + values[3] = LSNGetDatum(worker.last_lsn); + if (worker.last_send_time == 0) + nulls[4] = true; + else + values[4] = TimestampTzGetDatum(worker.last_send_time); + if (worker.last_recv_time == 0) + nulls[5] = true; + else + values[5] = TimestampTzGetDatum(worker.last_recv_time); + if (XLogRecPtrIsInvalid(worker.reply_lsn)) + nulls[6] = true; + else + values[6] = LSNGetDatum(worker.reply_lsn); + if (worker.reply_time == 0) + nulls[7] = true; + else + values[7] = TimestampTzGetDatum(worker.reply_time); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + + /* + * If only a single subscription was requested, and we found it, + * break. + */ + if (OidIsValid(subid)) + break; + } + + LWLockRelease(LogicalRepWorkerLock); + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} |