summaryrefslogtreecommitdiffstats
path: root/src/backend/replication/walreceiverfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiverfuncs.c')
-rw-r--r--src/backend/replication/walreceiverfuncs.c408
1 files changed, 408 insertions, 0 deletions
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
new file mode 100644
index 0000000..8305022
--- /dev/null
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -0,0 +1,408 @@
+/*-------------------------------------------------------------------------
+ *
+ * walreceiverfuncs.c
+ *
+ * This file contains functions used by the startup process to communicate
+ * with the walreceiver process. Functions implementing walreceiver itself
+ * are in walreceiver.c.
+ *
+ * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/walreceiverfuncs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <time.h>
+#include <unistd.h>
+#include <signal.h>
+
+#include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
+#include "pgstat.h"
+#include "postmaster/startup.h"
+#include "replication/walreceiver.h"
+#include "storage/pmsignal.h"
+#include "storage/shmem.h"
+#include "utils/timestamp.h"
+
+WalRcvData *WalRcv = NULL;
+
+/*
+ * How long to wait for walreceiver to start up after requesting
+ * postmaster to launch it. In seconds.
+ */
+#define WALRCV_STARTUP_TIMEOUT 10
+
+/* Report shared memory space needed by WalRcvShmemInit */
+Size
+WalRcvShmemSize(void)
+{
+ Size size = 0;
+
+ size = add_size(size, sizeof(WalRcvData));
+
+ return size;
+}
+
+/* Allocate and initialize walreceiver-related shared memory */
+void
+WalRcvShmemInit(void)
+{
+ bool found;
+
+ WalRcv = (WalRcvData *)
+ ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
+
+ if (!found)
+ {
+ /* First time through, so initialize */
+ MemSet(WalRcv, 0, WalRcvShmemSize());
+ WalRcv->walRcvState = WALRCV_STOPPED;
+ ConditionVariableInit(&WalRcv->walRcvStoppedCV);
+ SpinLockInit(&WalRcv->mutex);
+ pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
+ WalRcv->latch = NULL;
+ }
+}
+
+/* Is walreceiver running (or starting up)? */
+bool
+WalRcvRunning(void)
+{
+ WalRcvData *walrcv = WalRcv;
+ WalRcvState state;
+ pg_time_t startTime;
+
+ SpinLockAcquire(&walrcv->mutex);
+
+ state = walrcv->walRcvState;
+ startTime = walrcv->startTime;
+
+ SpinLockRelease(&walrcv->mutex);
+
+ /*
+ * If it has taken too long for walreceiver to start up, give up. Setting
+ * the state to STOPPED ensures that if walreceiver later does start up
+ * after all, it will see that it's not supposed to be running and die
+ * without doing anything.
+ */
+ if (state == WALRCV_STARTING)
+ {
+ pg_time_t now = (pg_time_t) time(NULL);
+
+ if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
+ {
+ bool stopped = false;
+
+ SpinLockAcquire(&walrcv->mutex);
+ if (walrcv->walRcvState == WALRCV_STARTING)
+ {
+ state = walrcv->walRcvState = WALRCV_STOPPED;
+ stopped = true;
+ }
+ SpinLockRelease(&walrcv->mutex);
+
+ if (stopped)
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
+ }
+ }
+
+ if (state != WALRCV_STOPPED)
+ return true;
+ else
+ return false;
+}
+
+/*
+ * Is walreceiver running and streaming (or at least attempting to connect,
+ * or starting up)?
+ */
+bool
+WalRcvStreaming(void)
+{
+ WalRcvData *walrcv = WalRcv;
+ WalRcvState state;
+ pg_time_t startTime;
+
+ SpinLockAcquire(&walrcv->mutex);
+
+ state = walrcv->walRcvState;
+ startTime = walrcv->startTime;
+
+ SpinLockRelease(&walrcv->mutex);
+
+ /*
+ * If it has taken too long for walreceiver to start up, give up. Setting
+ * the state to STOPPED ensures that if walreceiver later does start up
+ * after all, it will see that it's not supposed to be running and die
+ * without doing anything.
+ */
+ if (state == WALRCV_STARTING)
+ {
+ pg_time_t now = (pg_time_t) time(NULL);
+
+ if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
+ {
+ bool stopped = false;
+
+ SpinLockAcquire(&walrcv->mutex);
+ if (walrcv->walRcvState == WALRCV_STARTING)
+ {
+ state = walrcv->walRcvState = WALRCV_STOPPED;
+ stopped = true;
+ }
+ SpinLockRelease(&walrcv->mutex);
+
+ if (stopped)
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
+ }
+ }
+
+ if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
+ state == WALRCV_RESTARTING)
+ return true;
+ else
+ return false;
+}
+
+/*
+ * Stop walreceiver (if running) and wait for it to die.
+ * Executed by the Startup process.
+ */
+void
+ShutdownWalRcv(void)
+{
+ WalRcvData *walrcv = WalRcv;
+ pid_t walrcvpid = 0;
+ bool stopped = false;
+
+ /*
+ * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
+ * mode once it's finished, and will also request postmaster to not
+ * restart itself.
+ */
+ SpinLockAcquire(&walrcv->mutex);
+ switch (walrcv->walRcvState)
+ {
+ case WALRCV_STOPPED:
+ break;
+ case WALRCV_STARTING:
+ walrcv->walRcvState = WALRCV_STOPPED;
+ stopped = true;
+ break;
+
+ case WALRCV_STREAMING:
+ case WALRCV_WAITING:
+ case WALRCV_RESTARTING:
+ walrcv->walRcvState = WALRCV_STOPPING;
+ /* fall through */
+ case WALRCV_STOPPING:
+ walrcvpid = walrcv->pid;
+ break;
+ }
+ SpinLockRelease(&walrcv->mutex);
+
+ /* Unnecessary but consistent. */
+ if (stopped)
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
+
+ /*
+ * Signal walreceiver process if it was still running.
+ */
+ if (walrcvpid != 0)
+ kill(walrcvpid, SIGTERM);
+
+ /*
+ * Wait for walreceiver to acknowledge its death by setting state to
+ * WALRCV_STOPPED.
+ */
+ ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
+ while (WalRcvRunning())
+ ConditionVariableSleep(&walrcv->walRcvStoppedCV,
+ WAIT_EVENT_WAL_RECEIVER_EXIT);
+ ConditionVariableCancelSleep();
+}
+
+/*
+ * Request postmaster to start walreceiver.
+ *
+ * "recptr" indicates the position where streaming should begin. "conninfo"
+ * is a libpq connection string to use. "slotname" is, optionally, the name
+ * of a replication slot to acquire. "create_temp_slot" indicates to create
+ * a temporary slot when no "slotname" is given.
+ *
+ * WAL receivers do not directly load GUC parameters used for the connection
+ * to the primary, and rely on the values passed down by the caller of this
+ * routine instead. Hence, the addition of any new parameters should happen
+ * through this code path.
+ */
+void
+RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
+ const char *slotname, bool create_temp_slot)
+{
+ WalRcvData *walrcv = WalRcv;
+ bool launch = false;
+ pg_time_t now = (pg_time_t) time(NULL);
+ Latch *latch;
+
+ /*
+ * We always start at the beginning of the segment. That prevents a broken
+ * segment (i.e., with no records in the first half of a segment) from
+ * being created by XLOG streaming, which might cause trouble later on if
+ * the segment is e.g archived.
+ */
+ if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
+ recptr -= XLogSegmentOffset(recptr, wal_segment_size);
+
+ SpinLockAcquire(&walrcv->mutex);
+
+ /* It better be stopped if we try to restart it */
+ Assert(walrcv->walRcvState == WALRCV_STOPPED ||
+ walrcv->walRcvState == WALRCV_WAITING);
+
+ if (conninfo != NULL)
+ strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
+ else
+ walrcv->conninfo[0] = '\0';
+
+ /*
+ * Use configured replication slot if present, and ignore the value of
+ * create_temp_slot as the slot name should be persistent. Otherwise, use
+ * create_temp_slot to determine whether this WAL receiver should create a
+ * temporary slot by itself and use it, or not.
+ */
+ if (slotname != NULL && slotname[0] != '\0')
+ {
+ strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
+ walrcv->is_temp_slot = false;
+ }
+ else
+ {
+ walrcv->slotname[0] = '\0';
+ walrcv->is_temp_slot = create_temp_slot;
+ }
+
+ if (walrcv->walRcvState == WALRCV_STOPPED)
+ {
+ launch = true;
+ walrcv->walRcvState = WALRCV_STARTING;
+ }
+ else
+ walrcv->walRcvState = WALRCV_RESTARTING;
+ walrcv->startTime = now;
+
+ /*
+ * If this is the first startup of walreceiver (on this timeline),
+ * initialize flushedUpto and latestChunkStart to the starting point.
+ */
+ if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
+ {
+ walrcv->flushedUpto = recptr;
+ walrcv->receivedTLI = tli;
+ walrcv->latestChunkStart = recptr;
+ }
+ walrcv->receiveStart = recptr;
+ walrcv->receiveStartTLI = tli;
+
+ latch = walrcv->latch;
+
+ SpinLockRelease(&walrcv->mutex);
+
+ if (launch)
+ SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
+ else if (latch)
+ SetLatch(latch);
+}
+
+/*
+ * Returns the last+1 byte position that walreceiver has flushed.
+ *
+ * Optionally, returns the previous chunk start, that is the first byte
+ * written in the most recent walreceiver flush cycle. Callers not
+ * interested in that value may pass NULL for latestChunkStart. Same for
+ * receiveTLI.
+ */
+XLogRecPtr
+GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
+{
+ WalRcvData *walrcv = WalRcv;
+ XLogRecPtr recptr;
+
+ SpinLockAcquire(&walrcv->mutex);
+ recptr = walrcv->flushedUpto;
+ if (latestChunkStart)
+ *latestChunkStart = walrcv->latestChunkStart;
+ if (receiveTLI)
+ *receiveTLI = walrcv->receivedTLI;
+ SpinLockRelease(&walrcv->mutex);
+
+ return recptr;
+}
+
+/*
+ * Returns the last+1 byte position that walreceiver has written.
+ * This returns a recently written value without taking a lock.
+ */
+XLogRecPtr
+GetWalRcvWriteRecPtr(void)
+{
+ WalRcvData *walrcv = WalRcv;
+
+ return pg_atomic_read_u64(&walrcv->writtenUpto);
+}
+
+/*
+ * Returns the replication apply delay in ms or -1
+ * if the apply delay info is not available
+ */
+int
+GetReplicationApplyDelay(void)
+{
+ WalRcvData *walrcv = WalRcv;
+ XLogRecPtr receivePtr;
+ XLogRecPtr replayPtr;
+ TimestampTz chunkReplayStartTime;
+
+ SpinLockAcquire(&walrcv->mutex);
+ receivePtr = walrcv->flushedUpto;
+ SpinLockRelease(&walrcv->mutex);
+
+ replayPtr = GetXLogReplayRecPtr(NULL);
+
+ if (receivePtr == replayPtr)
+ return 0;
+
+ chunkReplayStartTime = GetCurrentChunkReplayStartTime();
+
+ if (chunkReplayStartTime == 0)
+ return -1;
+
+ return TimestampDifferenceMilliseconds(chunkReplayStartTime,
+ GetCurrentTimestamp());
+}
+
+/*
+ * Returns the network latency in ms, note that this includes any
+ * difference in clock settings between the servers, as well as timezone.
+ */
+int
+GetReplicationTransferLatency(void)
+{
+ WalRcvData *walrcv = WalRcv;
+ TimestampTz lastMsgSendTime;
+ TimestampTz lastMsgReceiptTime;
+
+ SpinLockAcquire(&walrcv->mutex);
+ lastMsgSendTime = walrcv->lastMsgSendTime;
+ lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
+ SpinLockRelease(&walrcv->mutex);
+
+ return TimestampDifferenceMilliseconds(lastMsgSendTime,
+ lastMsgReceiptTime);
+}