diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/replication/walsender.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-upstream.tar.xz postgresql-14-upstream.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 3729 |
1 files changed, 3729 insertions, 0 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c new file mode 100644 index 0000000..28f0a29 --- /dev/null +++ b/src/backend/replication/walsender.c @@ -0,0 +1,3729 @@ +/*------------------------------------------------------------------------- + * + * walsender.c + * + * The WAL sender process (walsender) is new as of Postgres 9.0. It takes + * care of sending XLOG from the primary server to a single recipient. + * (Note that there can be more than one walsender process concurrently.) + * It is started by the postmaster when the walreceiver of a standby server + * connects to the primary server and requests XLOG streaming replication. + * + * A walsender is similar to a regular backend, ie. there is a one-to-one + * relationship between a connection and a walsender process, but instead + * of processing SQL queries, it understands a small set of special + * replication-mode commands. The START_REPLICATION command begins streaming + * WAL to the client. While streaming, the walsender keeps reading XLOG + * records from the disk and sends them to the standby server over the + * COPY protocol, until either side ends the replication by exiting COPY + * mode (or until the connection is closed). + * + * Normal termination is by SIGTERM, which instructs the walsender to + * close the connection and exit(0) at the next convenient moment. Emergency + * termination is by SIGQUIT; like any backend, the walsender will simply + * abort and exit on SIGQUIT. A close of the connection and a FATAL error + * are treated as not a crash but approximately normal termination; + * the walsender will exit quickly without sending any more XLOG records. + * + * If the server is shut down, checkpointer sends us + * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If + * the backend is idle or runs an SQL query this causes the backend to + * shutdown, if logical replication is in progress all existing WAL records + * are processed followed by a shutdown. Otherwise this causes the walsender + * to switch to the "stopping" state. In this state, the walsender will reject + * any further replication commands. The checkpointer begins the shutdown + * checkpoint once all walsenders are confirmed as stopping. When the shutdown + * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs + * walsender to send any outstanding WAL, including the shutdown checkpoint + * record, wait for it to be replicated to the standby, and then exit. + * + * + * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/walsender.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <signal.h> +#include <unistd.h> + +#include "access/printtup.h" +#include "access/timeline.h" +#include "access/transam.h" +#include "access/xact.h" +#include "access/xlog_internal.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "catalog/pg_authid.h" +#include "catalog/pg_type.h" +#include "commands/dbcommands.h" +#include "commands/defrem.h" +#include "funcapi.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "miscadmin.h" +#include "nodes/replnodes.h" +#include "pgstat.h" +#include "postmaster/interrupt.h" +#include "replication/basebackup.h" +#include "replication/decode.h" +#include "replication/logical.h" +#include "replication/slot.h" +#include "replication/snapbuild.h" +#include "replication/syncrep.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" +#include "replication/walsender_private.h" +#include "storage/condition_variable.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/pmsignal.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "tcop/dest.h" +#include "tcop/tcopprot.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/pg_lsn.h" +#include "utils/portal.h" +#include "utils/ps_status.h" +#include "utils/timeout.h" +#include "utils/timestamp.h" + +/* + * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. + * + * We don't have a good idea of what a good value would be; there's some + * overhead per message in both walsender and walreceiver, but on the other + * hand sending large batches makes walsender less responsive to signals + * because signals are checked only between messages. 128kB (with + * default 8k blocks) seems like a reasonable guess for now. + */ +#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) + +/* Array of WalSnds in shared memory */ +WalSndCtlData *WalSndCtl = NULL; + +/* My slot in the shared memory array */ +WalSnd *MyWalSnd = NULL; + +/* Global state */ +bool am_walsender = false; /* Am I a walsender process? */ +bool am_cascading_walsender = false; /* Am I cascading WAL to another + * standby? */ +bool am_db_walsender = false; /* Connected to a database? */ + +/* User-settable parameters for walsender */ +int max_wal_senders = 0; /* the maximum number of concurrent + * walsenders */ +int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL + * data message */ +bool log_replication_commands = false; + +/* + * State for WalSndWakeupRequest + */ +bool wake_wal_senders = false; + +/* + * xlogreader used for replication. Note that a WAL sender doing physical + * replication does not need xlogreader to read WAL, but it needs one to + * keep a state of its work. + */ +static XLogReaderState *xlogreader = NULL; + +/* + * These variables keep track of the state of the timeline we're currently + * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric, + * the timeline is not the latest timeline on this server, and the server's + * history forked off from that timeline at sendTimeLineValidUpto. + */ +static TimeLineID sendTimeLine = 0; +static TimeLineID sendTimeLineNextTLI = 0; +static bool sendTimeLineIsHistoric = false; +static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; + +/* + * How far have we sent WAL already? This is also advertised in + * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) + */ +static XLogRecPtr sentPtr = InvalidXLogRecPtr; + +/* Buffers for constructing outgoing messages and processing reply messages. */ +static StringInfoData output_message; +static StringInfoData reply_message; +static StringInfoData tmpbuf; + +/* Timestamp of last ProcessRepliesIfAny(). */ +static TimestampTz last_processing = 0; + +/* + * Timestamp of last ProcessRepliesIfAny() that saw a reply from the + * standby. Set to 0 if wal_sender_timeout doesn't need to be active. + */ +static TimestampTz last_reply_timestamp = 0; + +/* Have we sent a heartbeat message asking for reply, since last reply? */ +static bool waiting_for_ping_response = false; + +/* + * While streaming WAL in Copy mode, streamingDoneSending is set to true + * after we have sent CopyDone. We should not send any more CopyData messages + * after that. streamingDoneReceiving is set to true when we receive CopyDone + * from the other end. When both become true, it's time to exit Copy mode. + */ +static bool streamingDoneSending; +static bool streamingDoneReceiving; + +/* Are we there yet? */ +static bool WalSndCaughtUp = false; + +/* Flags set by signal handlers for later service in main loop */ +static volatile sig_atomic_t got_SIGUSR2 = false; +static volatile sig_atomic_t got_STOPPING = false; + +/* + * This is set while we are streaming. When not set + * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set, + * the main loop is responsible for checking got_STOPPING and terminating when + * it's set (after streaming any remaining WAL). + */ +static volatile sig_atomic_t replication_active = false; + +static LogicalDecodingContext *logical_decoding_ctx = NULL; + +/* A sample associating a WAL location with the time it was written. */ +typedef struct +{ + XLogRecPtr lsn; + TimestampTz time; +} WalTimeSample; + +/* The size of our buffer of time samples. */ +#define LAG_TRACKER_BUFFER_SIZE 8192 + +/* A mechanism for tracking replication lag. */ +typedef struct +{ + XLogRecPtr last_lsn; + WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]; + int write_head; + int read_heads[NUM_SYNC_REP_WAIT_MODE]; + WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]; +} LagTracker; + +static LagTracker *lag_tracker; + +/* Signal handlers */ +static void WalSndLastCycleHandler(SIGNAL_ARGS); + +/* Prototypes for private functions */ +typedef void (*WalSndSendDataCallback) (void); +static void WalSndLoop(WalSndSendDataCallback send_data); +static void InitWalSenderSlot(void); +static void WalSndKill(int code, Datum arg); +static void WalSndShutdown(void) pg_attribute_noreturn(); +static void XLogSendPhysical(void); +static void XLogSendLogical(void); +static void WalSndDone(WalSndSendDataCallback send_data); +static XLogRecPtr GetStandbyFlushRecPtr(void); +static void IdentifySystem(void); +static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); +static void DropReplicationSlot(DropReplicationSlotCmd *cmd); +static void StartReplication(StartReplicationCmd *cmd); +static void StartLogicalReplication(StartReplicationCmd *cmd); +static void ProcessStandbyMessage(void); +static void ProcessStandbyReplyMessage(void); +static void ProcessStandbyHSFeedbackMessage(void); +static void ProcessRepliesIfAny(void); +static void ProcessPendingWrites(void); +static void WalSndKeepalive(bool requestReply); +static void WalSndKeepaliveIfNecessary(void); +static void WalSndCheckTimeOut(void); +static long WalSndComputeSleeptime(TimestampTz now); +static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event); +static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); +static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); +static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); +static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); +static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); +static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); +static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); + +static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, + TimeLineID *tli_p); + + +/* Initialize walsender process before entering the main command loop */ +void +InitWalSender(void) +{ + am_cascading_walsender = RecoveryInProgress(); + + /* Create a per-walsender data structure in shared memory */ + InitWalSenderSlot(); + + /* + * We don't currently need any ResourceOwner in a walsender process, but + * if we did, we could call CreateAuxProcessResourceOwner here. + */ + + /* + * Let postmaster know that we're a WAL sender. Once we've declared us as + * a WAL sender process, postmaster will let us outlive the bgwriter and + * kill us last in the shutdown sequence, so we get a chance to stream all + * remaining WAL at shutdown, including the shutdown checkpoint. Note that + * there's no going back, and we mustn't write any WAL records after this. + */ + MarkPostmasterChildWalSender(); + SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); + + /* Initialize empty timestamp buffer for lag tracking. */ + lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); +} + +/* + * Clean up after an error. + * + * WAL sender processes don't use transactions like regular backends do. + * This function does any cleanup required after an error in a WAL sender + * process, similar to what transaction abort does in a regular backend. + */ +void +WalSndErrorCleanup(void) +{ + LWLockReleaseAll(); + ConditionVariableCancelSleep(); + pgstat_report_wait_end(); + + if (xlogreader != NULL && xlogreader->seg.ws_file >= 0) + wal_segment_close(xlogreader); + + if (MyReplicationSlot != NULL) + ReplicationSlotRelease(); + + ReplicationSlotCleanup(); + + replication_active = false; + + /* + * If there is a transaction in progress, it will clean up our + * ResourceOwner, but if a replication command set up a resource owner + * without a transaction, we've got to clean that up now. + */ + if (!IsTransactionOrTransactionBlock()) + WalSndResourceCleanup(false); + + if (got_STOPPING || got_SIGUSR2) + proc_exit(0); + + /* Revert back to startup state */ + WalSndSetState(WALSNDSTATE_STARTUP); +} + +/* + * Clean up any ResourceOwner we created. + */ +void +WalSndResourceCleanup(bool isCommit) +{ + ResourceOwner resowner; + + if (CurrentResourceOwner == NULL) + return; + + /* + * Deleting CurrentResourceOwner is not allowed, so we must save a pointer + * in a local variable and clear it first. + */ + resowner = CurrentResourceOwner; + CurrentResourceOwner = NULL; + + /* Now we can release resources and delete it. */ + ResourceOwnerRelease(resowner, + RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true); + ResourceOwnerRelease(resowner, + RESOURCE_RELEASE_LOCKS, isCommit, true); + ResourceOwnerRelease(resowner, + RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true); + ResourceOwnerDelete(resowner); +} + +/* + * Handle a client's connection abort in an orderly manner. + */ +static void +WalSndShutdown(void) +{ + /* + * Reset whereToSendOutput to prevent ereport from attempting to send any + * more messages to the standby. + */ + if (whereToSendOutput == DestRemote) + whereToSendOutput = DestNone; + + proc_exit(0); + abort(); /* keep the compiler quiet */ +} + +/* + * Handle the IDENTIFY_SYSTEM command. + */ +static void +IdentifySystem(void) +{ + char sysid[32]; + char xloc[MAXFNAMELEN]; + XLogRecPtr logptr; + char *dbname = NULL; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[4]; + bool nulls[4]; + + /* + * Reply with a result set with one row, four columns. First col is system + * ID, second is timeline ID, third is current xlog location and the + * fourth contains the database name if we are connected to one. + */ + + snprintf(sysid, sizeof(sysid), UINT64_FORMAT, + GetSystemIdentifier()); + + am_cascading_walsender = RecoveryInProgress(); + if (am_cascading_walsender) + { + /* this also updates ThisTimeLineID */ + logptr = GetStandbyFlushRecPtr(); + } + else + logptr = GetFlushRecPtr(); + + snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr)); + + if (MyDatabaseId != InvalidOid) + { + MemoryContext cur = CurrentMemoryContext; + + /* syscache access needs a transaction env. */ + StartTransactionCommand(); + /* make dbname live outside TX context */ + MemoryContextSwitchTo(cur); + dbname = get_database_name(MyDatabaseId); + CommitTransactionCommand(); + /* CommitTransactionCommand switches to TopMemoryContext */ + MemoryContextSwitchTo(cur); + } + + dest = CreateDestReceiver(DestRemoteSimple); + MemSet(nulls, false, sizeof(nulls)); + + /* need a tuple descriptor representing four columns */ + tupdesc = CreateTemplateTupleDesc(4); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline", + INT4OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname", + TEXTOID, -1, 0); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + /* column 1: system identifier */ + values[0] = CStringGetTextDatum(sysid); + + /* column 2: timeline */ + values[1] = Int32GetDatum(ThisTimeLineID); + + /* column 3: wal location */ + values[2] = CStringGetTextDatum(xloc); + + /* column 4: database name, or NULL if none */ + if (dbname) + values[3] = CStringGetTextDatum(dbname); + else + nulls[3] = true; + + /* send it to dest */ + do_tup_output(tstate, values, nulls); + + end_tup_output(tstate); +} + + +/* + * Handle TIMELINE_HISTORY command. + */ +static void +SendTimeLineHistory(TimeLineHistoryCmd *cmd) +{ + StringInfoData buf; + char histfname[MAXFNAMELEN]; + char path[MAXPGPATH]; + int fd; + off_t histfilelen; + off_t bytesleft; + Size len; + + /* + * Reply with a result set with one row, and two columns. The first col is + * the name of the history file, 2nd is the contents. + */ + + TLHistoryFileName(histfname, cmd->timeline); + TLHistoryFilePath(path, cmd->timeline); + + /* Send a RowDescription message */ + pq_beginmessage(&buf, 'T'); + pq_sendint16(&buf, 2); /* 2 fields */ + + /* first field */ + pq_sendstring(&buf, "filename"); /* col name */ + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + pq_sendint32(&buf, TEXTOID); /* type oid */ + pq_sendint16(&buf, -1); /* typlen */ + pq_sendint32(&buf, 0); /* typmod */ + pq_sendint16(&buf, 0); /* format code */ + + /* second field */ + pq_sendstring(&buf, "content"); /* col name */ + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + pq_sendint32(&buf, TEXTOID); /* type oid */ + pq_sendint16(&buf, -1); /* typlen */ + pq_sendint32(&buf, 0); /* typmod */ + pq_sendint16(&buf, 0); /* format code */ + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint16(&buf, 2); /* # of columns */ + len = strlen(histfname); + pq_sendint32(&buf, len); /* col1 len */ + pq_sendbytes(&buf, histfname, len); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + /* Determine file length and send it to client */ + histfilelen = lseek(fd, 0, SEEK_END); + if (histfilelen < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to end of file \"%s\": %m", path))); + if (lseek(fd, 0, SEEK_SET) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not seek to beginning of file \"%s\": %m", path))); + + pq_sendint32(&buf, histfilelen); /* col2 len */ + + bytesleft = histfilelen; + while (bytesleft > 0) + { + PGAlignedBlock rbuf; + int nread; + + pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ); + nread = read(fd, rbuf.data, sizeof(rbuf)); + pgstat_report_wait_end(); + if (nread < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + path))); + else if (nread == 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read file \"%s\": read %d of %zu", + path, nread, (Size) bytesleft))); + + pq_sendbytes(&buf, rbuf.data, nread); + bytesleft -= nread; + } + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", path))); + + pq_endmessage(&buf); +} + +/* + * Handle START_REPLICATION command. + * + * At the moment, this never returns, but an ereport(ERROR) will take us back + * to the main loop. + */ +static void +StartReplication(StartReplicationCmd *cmd) +{ + StringInfoData buf; + XLogRecPtr FlushPtr; + + if (ThisTimeLineID == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); + + /* create xlogreader for physical replication */ + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), + NULL); + + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + + /* + * We assume here that we're logging enough information in the WAL for + * log-shipping, since this is checked in PostmasterMain(). + * + * NOTE: wal_level can only change at shutdown, so in most cases it is + * difficult for there to be WAL data that we can still see that was + * written at wal_level='minimal'. + */ + + if (cmd->slotname) + { + ReplicationSlotAcquire(cmd->slotname, true); + if (SlotIsLogical(MyReplicationSlot)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use a logical replication slot for physical replication"))); + + /* + * We don't need to verify the slot's restart_lsn here; instead we + * rely on the caller requesting the starting point to use. If the + * WAL segment doesn't exist, we'll fail later. + */ + } + + /* + * Select the timeline. If it was given explicitly by the client, use + * that. Otherwise use the timeline of the last replayed record, which is + * kept in ThisTimeLineID. + */ + if (am_cascading_walsender) + { + /* this also updates ThisTimeLineID */ + FlushPtr = GetStandbyFlushRecPtr(); + } + else + FlushPtr = GetFlushRecPtr(); + + if (cmd->timeline != 0) + { + XLogRecPtr switchpoint; + + sendTimeLine = cmd->timeline; + if (sendTimeLine == ThisTimeLineID) + { + sendTimeLineIsHistoric = false; + sendTimeLineValidUpto = InvalidXLogRecPtr; + } + else + { + List *timeLineHistory; + + sendTimeLineIsHistoric = true; + + /* + * Check that the timeline the client requested exists, and the + * requested start location is on that timeline. + */ + timeLineHistory = readTimeLineHistory(ThisTimeLineID); + switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, + &sendTimeLineNextTLI); + list_free_deep(timeLineHistory); + + /* + * Found the requested timeline in the history. Check that + * requested startpoint is on that timeline in our history. + * + * This is quite loose on purpose. We only check that we didn't + * fork off the requested timeline before the switchpoint. We + * don't check that we switched *to* it before the requested + * starting point. This is because the client can legitimately + * request to start replication from the beginning of the WAL + * segment that contains switchpoint, but on the new timeline, so + * that it doesn't end up with a partial segment. If you ask for + * too old a starting point, you'll get an error later when we + * fail to find the requested WAL segment in pg_wal. + * + * XXX: we could be more strict here and only allow a startpoint + * that's older than the switchpoint, if it's still in the same + * WAL segment. + */ + if (!XLogRecPtrIsInvalid(switchpoint) && + switchpoint < cmd->startpoint) + { + ereport(ERROR, + (errmsg("requested starting point %X/%X on timeline %u is not in this server's history", + LSN_FORMAT_ARGS(cmd->startpoint), + cmd->timeline), + errdetail("This server's history forked from timeline %u at %X/%X.", + cmd->timeline, + LSN_FORMAT_ARGS(switchpoint)))); + } + sendTimeLineValidUpto = switchpoint; + } + } + else + { + sendTimeLine = ThisTimeLineID; + sendTimeLineValidUpto = InvalidXLogRecPtr; + sendTimeLineIsHistoric = false; + } + + streamingDoneSending = streamingDoneReceiving = false; + + /* If there is nothing to stream, don't even enter COPY mode */ + if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) + { + /* + * When we first start replication the standby will be behind the + * primary. For some applications, for example synchronous + * replication, it is important to have a clear state for this initial + * catchup mode, so we can trigger actions when we change streaming + * state later. We may stay in this state for a long time, which is + * exactly why we want to be able to monitor whether or not we are + * still here. + */ + WalSndSetState(WALSNDSTATE_CATCHUP); + + /* Send a CopyBothResponse message, and start streaming */ + pq_beginmessage(&buf, 'W'); + pq_sendbyte(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + pq_flush(); + + /* + * Don't allow a request to stream from a future point in WAL that + * hasn't been flushed to disk in this server yet. + */ + if (FlushPtr < cmd->startpoint) + { + ereport(ERROR, + (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X", + LSN_FORMAT_ARGS(cmd->startpoint), + LSN_FORMAT_ARGS(FlushPtr)))); + } + + /* Start streaming from the requested point */ + sentPtr = cmd->startpoint; + + /* Initialize shared memory status, too */ + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->sentPtr = sentPtr; + SpinLockRelease(&MyWalSnd->mutex); + + SyncRepInitConfig(); + + /* Main loop of walsender */ + replication_active = true; + + WalSndLoop(XLogSendPhysical); + + replication_active = false; + if (got_STOPPING) + proc_exit(0); + WalSndSetState(WALSNDSTATE_STARTUP); + + Assert(streamingDoneSending && streamingDoneReceiving); + } + + if (cmd->slotname) + ReplicationSlotRelease(); + + /* + * Copy is finished now. Send a single-row result set indicating the next + * timeline. + */ + if (sendTimeLineIsHistoric) + { + char startpos_str[8 + 1 + 8 + 1]; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[2]; + bool nulls[2]; + + snprintf(startpos_str, sizeof(startpos_str), "%X/%X", + LSN_FORMAT_ARGS(sendTimeLineValidUpto)); + + dest = CreateDestReceiver(DestRemoteSimple); + MemSet(nulls, false, sizeof(nulls)); + + /* + * Need a tuple descriptor representing two columns. int8 may seem + * like a surprising data type for this, but in theory int4 would not + * be wide enough for this, as TimeLineID is unsigned. + */ + tupdesc = CreateTemplateTupleDesc(2); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli", + INT8OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos", + TEXTOID, -1, 0); + + /* prepare for projection of tuple */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + values[0] = Int64GetDatum((int64) sendTimeLineNextTLI); + values[1] = CStringGetTextDatum(startpos_str); + + /* send it to dest */ + do_tup_output(tstate, values, nulls); + + end_tup_output(tstate); + } + + /* Send CommandComplete message */ + EndReplicationCommand("START_STREAMING"); +} + +/* + * XLogReaderRoutine->page_read callback for logical decoding contexts, as a + * walsender process. + * + * Inside the walsender we can do better than read_local_xlog_page, + * which has to do a plain sleep/busy loop, because the walsender's latch gets + * set every time WAL is flushed. + */ +static int +logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *cur_page) +{ + XLogRecPtr flushptr; + int count; + WALReadError errinfo; + XLogSegNo segno; + + XLogReadDetermineTimeline(state, targetPagePtr, reqLen); + sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); + sendTimeLine = state->currTLI; + sendTimeLineValidUpto = state->currTLIValidUntil; + sendTimeLineNextTLI = state->nextTLI; + + /* make sure we have enough WAL available */ + flushptr = WalSndWaitForWal(targetPagePtr + reqLen); + + /* fail if not (implies we are going to shut down) */ + if (flushptr < targetPagePtr + reqLen) + return -1; + + if (targetPagePtr + XLOG_BLCKSZ <= flushptr) + count = XLOG_BLCKSZ; /* more than one block available */ + else + count = flushptr - targetPagePtr; /* part of the page available */ + + /* now actually read the data, we know it's there */ + if (!WALRead(state, + cur_page, + targetPagePtr, + XLOG_BLCKSZ, + state->seg.ws_tli, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new + * TLI is needed. */ + &errinfo)) + WALReadRaiseError(&errinfo); + + /* + * After reading into the buffer, check that what we read was valid. We do + * this after reading, because even though the segment was present when we + * opened it, it might get recycled or removed while we read it. The + * read() succeeds in that case, but the data we tried to read might + * already have been overwritten with new WAL records. + */ + XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); + CheckXLogRemoved(segno, state->seg.ws_tli); + + return count; +} + +/* + * Process extra options given to CREATE_REPLICATION_SLOT. + */ +static void +parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, + bool *reserve_wal, + CRSSnapshotAction *snapshot_action) +{ + ListCell *lc; + bool snapshot_action_given = false; + bool reserve_wal_given = false; + + /* Parse options */ + foreach(lc, cmd->options) + { + DefElem *defel = (DefElem *) lfirst(lc); + + if (strcmp(defel->defname, "export_snapshot") == 0) + { + if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + snapshot_action_given = true; + *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT : + CRS_NOEXPORT_SNAPSHOT; + } + else if (strcmp(defel->defname, "use_snapshot") == 0) + { + if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + snapshot_action_given = true; + *snapshot_action = CRS_USE_SNAPSHOT; + } + else if (strcmp(defel->defname, "reserve_wal") == 0) + { + if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + reserve_wal_given = true; + *reserve_wal = true; + } + else + elog(ERROR, "unrecognized option: %s", defel->defname); + } +} + +/* + * Create a new replication slot. + */ +static void +CreateReplicationSlot(CreateReplicationSlotCmd *cmd) +{ + const char *snapshot_name = NULL; + char xloc[MAXFNAMELEN]; + char *slot_name; + bool reserve_wal = false; + CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[4]; + bool nulls[4]; + + Assert(!MyReplicationSlot); + + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action); + + /* setup state for WalSndSegmentOpen */ + sendTimeLineIsHistoric = false; + sendTimeLine = ThisTimeLineID; + + if (cmd->kind == REPLICATION_KIND_PHYSICAL) + { + ReplicationSlotCreate(cmd->slotname, false, + cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, + false); + } + else + { + CheckLogicalDecodingRequirements(); + + /* + * Initially create persistent slot as ephemeral - that allows us to + * nicely handle errors during initialization because it'll get + * dropped if this transaction fails. We'll make it persistent at the + * end. Temporary slots can be created as temporary from beginning as + * they get dropped on error as well. + */ + ReplicationSlotCreate(cmd->slotname, true, + cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, + false); + } + + if (cmd->kind == REPLICATION_KIND_LOGICAL) + { + LogicalDecodingContext *ctx; + bool need_full_snapshot = false; + + /* + * Do options check early so that we can bail before calling the + * DecodingContextFindStartpoint which can take long time. + */ + if (snapshot_action == CRS_EXPORT_SNAPSHOT) + { + if (IsTransactionBlock()) + ereport(ERROR, + /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ + (errmsg("%s must not be called inside a transaction", + "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT"))); + + need_full_snapshot = true; + } + else if (snapshot_action == CRS_USE_SNAPSHOT) + { + if (!IsTransactionBlock()) + ereport(ERROR, + /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ + (errmsg("%s must be called inside a transaction", + "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + + if (XactIsoLevel != XACT_REPEATABLE_READ) + ereport(ERROR, + /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ + (errmsg("%s must be called in REPEATABLE READ isolation mode transaction", + "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + + if (FirstSnapshotSet) + ereport(ERROR, + /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ + (errmsg("%s must be called before any query", + "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + + if (IsSubTransaction()) + ereport(ERROR, + /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ + (errmsg("%s must not be called in a subtransaction", + "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + + need_full_snapshot = true; + } + + ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, + InvalidXLogRecPtr, + XL_ROUTINE(.page_read = logical_read_xlog_page, + .segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), + WalSndPrepareWrite, WalSndWriteData, + WalSndUpdateProgress); + + /* + * Signal that we don't need the timeout mechanism. We're just + * creating the replication slot and don't yet accept feedback + * messages or send keepalives. As we possibly need to wait for + * further WAL the walsender would otherwise possibly be killed too + * soon. + */ + last_reply_timestamp = 0; + + /* build initial snapshot, might take a while */ + DecodingContextFindStartpoint(ctx); + + /* + * Export or use the snapshot if we've been asked to do so. + * + * NB. We will convert the snapbuild.c kind of snapshot to normal + * snapshot when doing this. + */ + if (snapshot_action == CRS_EXPORT_SNAPSHOT) + { + snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); + } + else if (snapshot_action == CRS_USE_SNAPSHOT) + { + Snapshot snap; + + snap = SnapBuildInitialSnapshot(ctx->snapshot_builder); + RestoreTransactionSnapshot(snap, MyProc); + } + + /* don't need the decoding context anymore */ + FreeDecodingContext(ctx); + + if (!cmd->temporary) + ReplicationSlotPersist(); + } + else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal) + { + ReplicationSlotReserveWal(); + + ReplicationSlotMarkDirty(); + + /* Write this slot to disk if it's a permanent one. */ + if (!cmd->temporary) + ReplicationSlotSave(); + } + + snprintf(xloc, sizeof(xloc), "%X/%X", + LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush)); + + dest = CreateDestReceiver(DestRemoteSimple); + MemSet(nulls, false, sizeof(nulls)); + + /*---------- + * Need a tuple descriptor representing four columns: + * - first field: the slot name + * - second field: LSN at which we became consistent + * - third field: exported snapshot's name + * - fourth field: output plugin + *---------- + */ + tupdesc = CreateTemplateTupleDesc(4); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin", + TEXTOID, -1, 0); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + /* slot_name */ + slot_name = NameStr(MyReplicationSlot->data.name); + values[0] = CStringGetTextDatum(slot_name); + + /* consistent wal location */ + values[1] = CStringGetTextDatum(xloc); + + /* snapshot name, or NULL if none */ + if (snapshot_name != NULL) + values[2] = CStringGetTextDatum(snapshot_name); + else + nulls[2] = true; + + /* plugin, or NULL if none */ + if (cmd->plugin != NULL) + values[3] = CStringGetTextDatum(cmd->plugin); + else + nulls[3] = true; + + /* send it to dest */ + do_tup_output(tstate, values, nulls); + end_tup_output(tstate); + + ReplicationSlotRelease(); +} + +/* + * Get rid of a replication slot that is no longer wanted. + */ +static void +DropReplicationSlot(DropReplicationSlotCmd *cmd) +{ + ReplicationSlotDrop(cmd->slotname, !cmd->wait); +} + +/* + * Load previously initiated logical slot and prepare for sending data (via + * WalSndLoop). + */ +static void +StartLogicalReplication(StartReplicationCmd *cmd) +{ + StringInfoData buf; + QueryCompletion qc; + + /* make sure that our requirements are still fulfilled */ + CheckLogicalDecodingRequirements(); + + Assert(!MyReplicationSlot); + + ReplicationSlotAcquire(cmd->slotname, true); + + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot read from logical replication slot \"%s\"", + cmd->slotname), + errdetail("This slot has been invalidated because it exceeded the maximum reserved size."))); + + /* + * Force a disconnect, so that the decoding code doesn't need to care + * about an eventual switch from running in recovery, to running in a + * normal environment. Client code is expected to handle reconnects. + */ + if (am_cascading_walsender && !RecoveryInProgress()) + { + ereport(LOG, + (errmsg("terminating walsender process after promotion"))); + got_STOPPING = true; + } + + /* + * Create our decoding context, making it start at the previously ack'ed + * position. + * + * Do this before sending a CopyBothResponse message, so that any errors + * are reported early. + */ + logical_decoding_ctx = + CreateDecodingContext(cmd->startpoint, cmd->options, false, + XL_ROUTINE(.page_read = logical_read_xlog_page, + .segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), + WalSndPrepareWrite, WalSndWriteData, + WalSndUpdateProgress); + xlogreader = logical_decoding_ctx->reader; + + WalSndSetState(WALSNDSTATE_CATCHUP); + + /* Send a CopyBothResponse message, and start streaming */ + pq_beginmessage(&buf, 'W'); + pq_sendbyte(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + pq_flush(); + + /* Start reading WAL from the oldest required WAL. */ + XLogBeginRead(logical_decoding_ctx->reader, + MyReplicationSlot->data.restart_lsn); + + /* + * Report the location after which we'll send out further commits as the + * current sentPtr. + */ + sentPtr = MyReplicationSlot->data.confirmed_flush; + + /* Also update the sent position status in shared memory */ + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn; + SpinLockRelease(&MyWalSnd->mutex); + + replication_active = true; + + SyncRepInitConfig(); + + /* Main loop of walsender */ + WalSndLoop(XLogSendLogical); + + FreeDecodingContext(logical_decoding_ctx); + ReplicationSlotRelease(); + + replication_active = false; + if (got_STOPPING) + proc_exit(0); + WalSndSetState(WALSNDSTATE_STARTUP); + + /* Get out of COPY mode (CommandComplete). */ + SetQueryCompletion(&qc, CMDTAG_COPY, 0); + EndCommand(&qc, DestRemote, false); +} + +/* + * LogicalDecodingContext 'prepare_write' callback. + * + * Prepare a write into a StringInfo. + * + * Don't do anything lasting in here, it's quite possible that nothing will be done + * with the data. + */ +static void +WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write) +{ + /* can't have sync rep confused by sending the same LSN several times */ + if (!last_write) + lsn = InvalidXLogRecPtr; + + resetStringInfo(ctx->out); + + pq_sendbyte(ctx->out, 'w'); + pq_sendint64(ctx->out, lsn); /* dataStart */ + pq_sendint64(ctx->out, lsn); /* walEnd */ + + /* + * Fill out the sendtime later, just as it's done in XLogSendPhysical, but + * reserve space here. + */ + pq_sendint64(ctx->out, 0); /* sendtime */ +} + +/* + * LogicalDecodingContext 'write' callback. + * + * Actually write out data previously prepared by WalSndPrepareWrite out to + * the network. Take as long as needed, but process replies from the other + * side and check timeouts during that. + */ +static void +WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, + bool last_write) +{ + TimestampTz now; + + /* + * Fill the send timestamp last, so that it is taken as late as possible. + * This is somewhat ugly, but the protocol is set as it's already used for + * several releases by streaming physical replication. + */ + resetStringInfo(&tmpbuf); + now = GetCurrentTimestamp(); + pq_sendint64(&tmpbuf, now); + memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], + tmpbuf.data, sizeof(int64)); + + /* output previously gathered data in a CopyData packet */ + pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); + + CHECK_FOR_INTERRUPTS(); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + /* Try taking fast path unless we get too close to walsender timeout. */ + if (now < TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2) && + !pq_is_send_pending()) + { + return; + } + + /* If we have pending write here, go to slow path */ + ProcessPendingWrites(); +} + +/* + * Wait until there is no pending write. Also process replies from the other + * side and check timeouts during that. + */ +static void +ProcessPendingWrites(void) +{ + for (;;) + { + long sleeptime; + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* die if timeout was reached */ + WalSndCheckTimeOut(); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(); + + if (!pq_is_send_pending()) + break; + + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + /* Sleep until something happens or we time out */ + WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime, + WAIT_EVENT_WAL_SENDER_WRITE_DATA); + + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Process any requests or signals received recently */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + } + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + } + + /* reactivate latch so WalSndLoop knows to continue */ + SetLatch(MyLatch); +} + +/* + * LogicalDecodingContext 'update_progress' callback. + * + * Write the current position to the lag tracker (see XLogSendPhysical). + */ +static void +WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) +{ + static TimestampTz sendTime = 0; + TimestampTz now = GetCurrentTimestamp(); + bool end_xact = ctx->end_xact; + + /* + * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to + * avoid flooding the lag tracker when we commit frequently. + * + * We don't have a mechanism to get the ack for any LSN other than end + * xact LSN from the downstream. So, we track lag only for end of + * transaction LSN. + */ +#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 + if (end_xact && TimestampDifferenceExceeds(sendTime, now, + WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) + { + LagTrackerWrite(lsn, now); + sendTime = now; + } + + /* + * Try to send a keepalive if required. We don't need to try sending keep + * alive messages at the transaction end as that will be done at a later + * point in time. This is required only for large transactions where we + * don't send any changes to the downstream and the receiver can timeout + * due to that. + */ + if (!end_xact && + now >= TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2)) + ProcessPendingWrites(); +} + +/* + * Wait till WAL < loc is flushed to disk so it can be safely sent to client. + * + * Returns end LSN of flushed WAL. Normally this will be >= loc, but + * if we detect a shutdown request (either from postmaster or client) + * we will return early, so caller must always check. + */ +static XLogRecPtr +WalSndWaitForWal(XLogRecPtr loc) +{ + int wakeEvents; + static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + + /* + * Fast path to avoid acquiring the spinlock in case we already know we + * have enough WAL available. This is particularly interesting if we're + * far behind. + */ + if (RecentFlushPtr != InvalidXLogRecPtr && + loc <= RecentFlushPtr) + return RecentFlushPtr; + + /* Get a more recent flush pointer. */ + if (!RecoveryInProgress()) + RecentFlushPtr = GetFlushRecPtr(); + else + RecentFlushPtr = GetXLogReplayRecPtr(NULL); + + for (;;) + { + long sleeptime; + + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Process any requests or signals received recently */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + } + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* + * If we're shutting down, trigger pending WAL to be written out, + * otherwise we'd possibly end up waiting for WAL that never gets + * written, because walwriter has shut down already. + */ + if (got_STOPPING) + XLogBackgroundFlush(); + + /* Update our idea of the currently flushed position. */ + if (!RecoveryInProgress()) + RecentFlushPtr = GetFlushRecPtr(); + else + RecentFlushPtr = GetXLogReplayRecPtr(NULL); + + /* + * If postmaster asked us to stop, don't wait anymore. + * + * It's important to do this check after the recomputation of + * RecentFlushPtr, so we can send all remaining data before shutting + * down. + */ + if (got_STOPPING) + break; + + /* + * We only send regular messages to the client for full decoded + * transactions, but a synchronous replication and walsender shutdown + * possibly are waiting for a later location. So, before sleeping, we + * send a ping containing the flush location. If the receiver is + * otherwise idle, this keepalive will trigger a reply. Processing the + * reply will update these MyWalSnd locations. + */ + if (MyWalSnd->flush < sentPtr && + MyWalSnd->write < sentPtr && + !waiting_for_ping_response) + WalSndKeepalive(false); + + /* check whether we're done */ + if (loc <= RecentFlushPtr) + break; + + /* Waiting for new WAL. Since we need to wait, we're now caught up. */ + WalSndCaughtUp = true; + + /* + * Try to flush any pending output to the client. + */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + /* + * If we have received CopyDone from the client, sent CopyDone + * ourselves, and the output buffer is empty, it's time to exit + * streaming, so fail the current WAL fetch request. + */ + if (streamingDoneReceiving && streamingDoneSending && + !pq_is_send_pending()) + break; + + /* die if timeout was reached */ + WalSndCheckTimeOut(); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(); + + /* + * Sleep until something happens or we time out. Also wait for the + * socket becoming writable, if there's still pending output. + * Otherwise we might sit on sendable output data while waiting for + * new WAL to be generated. (But if we have nothing to send, we don't + * want to wake on socket-writable.) + */ + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + wakeEvents = WL_SOCKET_READABLE; + + if (pq_is_send_pending()) + wakeEvents |= WL_SOCKET_WRITEABLE; + + WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + } + + /* reactivate latch so WalSndLoop knows to continue */ + SetLatch(MyLatch); + return RecentFlushPtr; +} + +/* + * Execute an incoming replication command. + * + * Returns true if the cmd_string was recognized as WalSender command, false + * if not. + */ +bool +exec_replication_command(const char *cmd_string) +{ + int parse_rc; + Node *cmd_node; + const char *cmdtag; + MemoryContext cmd_context; + MemoryContext old_context; + + /* + * If WAL sender has been told that shutdown is getting close, switch its + * status accordingly to handle the next replication commands correctly. + */ + if (got_STOPPING) + WalSndSetState(WALSNDSTATE_STOPPING); + + /* + * Throw error if in stopping mode. We need prevent commands that could + * generate WAL while the shutdown checkpoint is being written. To be + * safe, we just prohibit all new commands. + */ + if (MyWalSnd->state == WALSNDSTATE_STOPPING) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot execute new commands while WAL sender is in stopping mode"))); + + /* + * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next + * command arrives. Clean up the old stuff if there's anything. + */ + SnapBuildClearExportedSnapshot(); + + CHECK_FOR_INTERRUPTS(); + + /* + * Prepare to parse and execute the command. + */ + cmd_context = AllocSetContextCreate(CurrentMemoryContext, + "Replication command context", + ALLOCSET_DEFAULT_SIZES); + old_context = MemoryContextSwitchTo(cmd_context); + + replication_scanner_init(cmd_string); + + /* + * Is it a WalSender command? + */ + if (!replication_scanner_is_replication_command()) + { + /* Nope; clean up and get out. */ + replication_scanner_finish(); + + MemoryContextSwitchTo(old_context); + MemoryContextDelete(cmd_context); + + /* XXX this is a pretty random place to make this check */ + if (MyDatabaseId == InvalidOid) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot execute SQL commands in WAL sender for physical replication"))); + + /* Tell the caller that this wasn't a WalSender command. */ + return false; + } + + /* + * Looks like a WalSender command, so parse it. + */ + parse_rc = replication_yyparse(); + if (parse_rc != 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg_internal("replication command parser returned %d", + parse_rc))); + replication_scanner_finish(); + + cmd_node = replication_parse_result; + + /* + * Report query to various monitoring facilities. For this purpose, we + * report replication commands just like SQL commands. + */ + debug_query_string = cmd_string; + + pgstat_report_activity(STATE_RUNNING, cmd_string); + + /* + * Log replication command if log_replication_commands is enabled. Even + * when it's disabled, log the command with DEBUG1 level for backward + * compatibility. + */ + ereport(log_replication_commands ? LOG : DEBUG1, + (errmsg("received replication command: %s", cmd_string))); + + /* + * Disallow replication commands in aborted transaction blocks. + */ + if (IsAbortedTransactionBlockState()) + ereport(ERROR, + (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), + errmsg("current transaction is aborted, " + "commands ignored until end of transaction block"))); + + CHECK_FOR_INTERRUPTS(); + + /* + * Allocate buffers that will be used for each outgoing and incoming + * message. We do this just once per command to reduce palloc overhead. + */ + initStringInfo(&output_message); + initStringInfo(&reply_message); + initStringInfo(&tmpbuf); + + switch (cmd_node->type) + { + case T_IdentifySystemCmd: + cmdtag = "IDENTIFY_SYSTEM"; + set_ps_display(cmdtag); + IdentifySystem(); + EndReplicationCommand(cmdtag); + break; + + case T_BaseBackupCmd: + cmdtag = "BASE_BACKUP"; + set_ps_display(cmdtag); + PreventInTransactionBlock(true, cmdtag); + SendBaseBackup((BaseBackupCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + + case T_CreateReplicationSlotCmd: + cmdtag = "CREATE_REPLICATION_SLOT"; + set_ps_display(cmdtag); + CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + + case T_DropReplicationSlotCmd: + cmdtag = "DROP_REPLICATION_SLOT"; + set_ps_display(cmdtag); + DropReplicationSlot((DropReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + + case T_StartReplicationCmd: + { + StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; + + cmdtag = "START_REPLICATION"; + set_ps_display(cmdtag); + PreventInTransactionBlock(true, cmdtag); + + if (cmd->kind == REPLICATION_KIND_PHYSICAL) + StartReplication(cmd); + else + StartLogicalReplication(cmd); + + /* dupe, but necessary per libpqrcv_endstreaming */ + EndReplicationCommand(cmdtag); + + Assert(xlogreader != NULL); + break; + } + + case T_TimeLineHistoryCmd: + cmdtag = "TIMELINE_HISTORY"; + set_ps_display(cmdtag); + PreventInTransactionBlock(true, cmdtag); + SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + + case T_VariableShowStmt: + { + DestReceiver *dest = CreateDestReceiver(DestRemoteSimple); + VariableShowStmt *n = (VariableShowStmt *) cmd_node; + + cmdtag = "SHOW"; + set_ps_display(cmdtag); + + /* syscache access needs a transaction environment */ + StartTransactionCommand(); + GetPGVariable(n->name, dest); + CommitTransactionCommand(); + EndReplicationCommand(cmdtag); + } + break; + + default: + elog(ERROR, "unrecognized replication command node tag: %u", + cmd_node->type); + } + + /* done */ + MemoryContextSwitchTo(old_context); + MemoryContextDelete(cmd_context); + + /* + * We need not update ps display or pg_stat_activity, because PostgresMain + * will reset those to "idle". But we must reset debug_query_string to + * ensure it doesn't become a dangling pointer. + */ + debug_query_string = NULL; + + return true; +} + +/* + * Process any incoming messages while streaming. Also checks if the remote + * end has closed the connection. + */ +static void +ProcessRepliesIfAny(void) +{ + unsigned char firstchar; + int maxmsglen; + int r; + bool received = false; + + last_processing = GetCurrentTimestamp(); + + /* + * If we already received a CopyDone from the frontend, any subsequent + * message is the beginning of a new command, and should be processed in + * the main processing loop. + */ + while (!streamingDoneReceiving) + { + pq_startmsgread(); + r = pq_getbyte_if_available(&firstchar); + if (r < 0) + { + /* unexpected error or EOF */ + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + proc_exit(0); + } + if (r == 0) + { + /* no data available without blocking */ + pq_endmsgread(); + break; + } + + /* Validate message type and set packet size limit */ + switch (firstchar) + { + case 'd': + maxmsglen = PQ_LARGE_MESSAGE_LIMIT; + break; + case 'c': + case 'X': + maxmsglen = PQ_SMALL_MESSAGE_LIMIT; + break; + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby message type \"%c\"", + firstchar))); + maxmsglen = 0; /* keep compiler quiet */ + break; + } + + /* Read the message contents */ + resetStringInfo(&reply_message); + if (pq_getmessage(&reply_message, maxmsglen)) + { + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + proc_exit(0); + } + + /* ... and process it */ + switch (firstchar) + { + /* + * 'd' means a standby reply wrapped in a CopyData packet. + */ + case 'd': + ProcessStandbyMessage(); + received = true; + break; + + /* + * CopyDone means the standby requested to finish streaming. + * Reply with CopyDone, if we had not sent that already. + */ + case 'c': + if (!streamingDoneSending) + { + pq_putmessage_noblock('c', NULL, 0); + streamingDoneSending = true; + } + + streamingDoneReceiving = true; + received = true; + break; + + /* + * 'X' means that the standby is closing down the socket. + */ + case 'X': + proc_exit(0); + + default: + Assert(false); /* NOT REACHED */ + } + } + + /* + * Save the last reply timestamp if we've received at least one reply. + */ + if (received) + { + last_reply_timestamp = last_processing; + waiting_for_ping_response = false; + } +} + +/* + * Process a status update message received from standby. + */ +static void +ProcessStandbyMessage(void) +{ + char msgtype; + + /* + * Check message type from the first byte. + */ + msgtype = pq_getmsgbyte(&reply_message); + + switch (msgtype) + { + case 'r': + ProcessStandbyReplyMessage(); + break; + + case 'h': + ProcessStandbyHSFeedbackMessage(); + break; + + default: + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected message type \"%c\"", msgtype))); + proc_exit(0); + } +} + +/* + * Remember that a walreceiver just confirmed receipt of lsn `lsn`. + */ +static void +PhysicalConfirmReceivedLocation(XLogRecPtr lsn) +{ + bool changed = false; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(lsn != InvalidXLogRecPtr); + SpinLockAcquire(&slot->mutex); + if (slot->data.restart_lsn != lsn) + { + changed = true; + slot->data.restart_lsn = lsn; + } + SpinLockRelease(&slot->mutex); + + if (changed) + { + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredLSN(); + } + + /* + * One could argue that the slot should be saved to disk now, but that'd + * be energy wasted - the worst lost information can do here is give us + * wrong information in a statistics view - we'll just potentially be more + * conservative in removing files. + */ +} + +/* + * Regular reply from standby advising of WAL locations on standby server. + */ +static void +ProcessStandbyReplyMessage(void) +{ + XLogRecPtr writePtr, + flushPtr, + applyPtr; + bool replyRequested; + TimeOffset writeLag, + flushLag, + applyLag; + bool clearLagTimes; + TimestampTz now; + TimestampTz replyTime; + + static bool fullyAppliedLastTime = false; + + /* the caller already consumed the msgtype byte */ + writePtr = pq_getmsgint64(&reply_message); + flushPtr = pq_getmsgint64(&reply_message); + applyPtr = pq_getmsgint64(&reply_message); + replyTime = pq_getmsgint64(&reply_message); + replyRequested = pq_getmsgbyte(&reply_message); + + if (message_level_is_interesting(DEBUG2)) + { + char *replyTimeStr; + + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s", + LSN_FORMAT_ARGS(writePtr), + LSN_FORMAT_ARGS(flushPtr), + LSN_FORMAT_ARGS(applyPtr), + replyRequested ? " (reply requested)" : "", + replyTimeStr); + + pfree(replyTimeStr); + } + + /* See if we can compute the round-trip lag for these positions. */ + now = GetCurrentTimestamp(); + writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now); + flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now); + applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now); + + /* + * If the standby reports that it has fully replayed the WAL in two + * consecutive reply messages, then the second such message must result + * from wal_receiver_status_interval expiring on the standby. This is a + * convenient time to forget the lag times measured when it last + * wrote/flushed/applied a WAL record, to avoid displaying stale lag data + * until more WAL traffic arrives. + */ + clearLagTimes = false; + if (applyPtr == sentPtr) + { + if (fullyAppliedLastTime) + clearLagTimes = true; + fullyAppliedLastTime = true; + } + else + fullyAppliedLastTime = false; + + /* Send a reply if the standby requested one. */ + if (replyRequested) + WalSndKeepalive(false); + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + { + WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->write = writePtr; + walsnd->flush = flushPtr; + walsnd->apply = applyPtr; + if (writeLag != -1 || clearLagTimes) + walsnd->writeLag = writeLag; + if (flushLag != -1 || clearLagTimes) + walsnd->flushLag = flushLag; + if (applyLag != -1 || clearLagTimes) + walsnd->applyLag = applyLag; + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + } + + if (!am_cascading_walsender) + SyncRepReleaseWaiters(); + + /* + * Advance our local xmin horizon when the client confirmed a flush. + */ + if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr) + { + if (SlotIsLogical(MyReplicationSlot)) + LogicalConfirmReceivedLocation(flushPtr); + else + PhysicalConfirmReceivedLocation(flushPtr); + } +} + +/* compute new replication slot xmin horizon if needed */ +static void +PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin) +{ + bool changed = false; + ReplicationSlot *slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + MyProc->xmin = InvalidTransactionId; + + /* + * For physical replication we don't need the interlock provided by xmin + * and effective_xmin since the consequences of a missed increase are + * limited to query cancellations, so set both at once. + */ + if (!TransactionIdIsNormal(slot->data.xmin) || + !TransactionIdIsNormal(feedbackXmin) || + TransactionIdPrecedes(slot->data.xmin, feedbackXmin)) + { + changed = true; + slot->data.xmin = feedbackXmin; + slot->effective_xmin = feedbackXmin; + } + if (!TransactionIdIsNormal(slot->data.catalog_xmin) || + !TransactionIdIsNormal(feedbackCatalogXmin) || + TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin)) + { + changed = true; + slot->data.catalog_xmin = feedbackCatalogXmin; + slot->effective_catalog_xmin = feedbackCatalogXmin; + } + SpinLockRelease(&slot->mutex); + + if (changed) + { + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredXmin(false); + } +} + +/* + * Check that the provided xmin/epoch are sane, that is, not in the future + * and not so far back as to be already wrapped around. + * + * Epoch of nextXid should be same as standby, or if the counter has + * wrapped, then one greater than standby. + * + * This check doesn't care about whether clog exists for these xids + * at all. + */ +static bool +TransactionIdInRecentPast(TransactionId xid, uint32 epoch) +{ + FullTransactionId nextFullXid; + TransactionId nextXid; + uint32 nextEpoch; + + nextFullXid = ReadNextFullTransactionId(); + nextXid = XidFromFullTransactionId(nextFullXid); + nextEpoch = EpochFromFullTransactionId(nextFullXid); + + if (xid <= nextXid) + { + if (epoch != nextEpoch) + return false; + } + else + { + if (epoch + 1 != nextEpoch) + return false; + } + + if (!TransactionIdPrecedesOrEquals(xid, nextXid)) + return false; /* epoch OK, but it's wrapped around */ + + return true; +} + +/* + * Hot Standby feedback + */ +static void +ProcessStandbyHSFeedbackMessage(void) +{ + TransactionId feedbackXmin; + uint32 feedbackEpoch; + TransactionId feedbackCatalogXmin; + uint32 feedbackCatalogEpoch; + TimestampTz replyTime; + + /* + * Decipher the reply message. The caller already consumed the msgtype + * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation + * of this message. + */ + replyTime = pq_getmsgint64(&reply_message); + feedbackXmin = pq_getmsgint(&reply_message, 4); + feedbackEpoch = pq_getmsgint(&reply_message, 4); + feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); + feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); + + if (message_level_is_interesting(DEBUG2)) + { + char *replyTimeStr; + + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s", + feedbackXmin, + feedbackEpoch, + feedbackCatalogXmin, + feedbackCatalogEpoch, + replyTimeStr); + + pfree(replyTimeStr); + } + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + { + WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + } + + /* + * Unset WalSender's xmins if the feedback message values are invalid. + * This happens when the downstream turned hot_standby_feedback off. + */ + if (!TransactionIdIsNormal(feedbackXmin) + && !TransactionIdIsNormal(feedbackCatalogXmin)) + { + MyProc->xmin = InvalidTransactionId; + if (MyReplicationSlot != NULL) + PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); + return; + } + + /* + * Check that the provided xmin/epoch are sane, that is, not in the future + * and not so far back as to be already wrapped around. Ignore if not. + */ + if (TransactionIdIsNormal(feedbackXmin) && + !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch)) + return; + + if (TransactionIdIsNormal(feedbackCatalogXmin) && + !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch)) + return; + + /* + * Set the WalSender's xmin equal to the standby's requested xmin, so that + * the xmin will be taken into account by GetSnapshotData() / + * ComputeXidHorizons(). This will hold back the removal of dead rows and + * thereby prevent the generation of cleanup conflicts on the standby + * server. + * + * There is a small window for a race condition here: although we just + * checked that feedbackXmin precedes nextXid, the nextXid could have + * gotten advanced between our fetching it and applying the xmin below, + * perhaps far enough to make feedbackXmin wrap around. In that case the + * xmin we set here would be "in the future" and have no effect. No point + * in worrying about this since it's too late to save the desired data + * anyway. Assuming that the standby sends us an increasing sequence of + * xmins, this could only happen during the first reply cycle, else our + * own xmin would prevent nextXid from advancing so far. + * + * We don't bother taking the ProcArrayLock here. Setting the xmin field + * is assumed atomic, and there's no real need to prevent concurrent + * horizon determinations. (If we're moving our xmin forward, this is + * obviously safe, and if we're moving it backwards, well, the data is at + * risk already since a VACUUM could already have determined the horizon.) + * + * If we're using a replication slot we reserve the xmin via that, + * otherwise via the walsender's PGPROC entry. We can only track the + * catalog xmin separately when using a slot, so we store the least of the + * two provided when not using a slot. + * + * XXX: It might make sense to generalize the ephemeral slot concept and + * always use the slot mechanism to handle the feedback xmin. + */ + if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ + PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); + else + { + if (TransactionIdIsNormal(feedbackCatalogXmin) + && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin)) + MyProc->xmin = feedbackCatalogXmin; + else + MyProc->xmin = feedbackXmin; + } +} + +/* + * Compute how long send/receive loops should sleep. + * + * If wal_sender_timeout is enabled we want to wake up in time to send + * keepalives and to abort the connection if wal_sender_timeout has been + * reached. + */ +static long +WalSndComputeSleeptime(TimestampTz now) +{ + long sleeptime = 10000; /* 10 s */ + + if (wal_sender_timeout > 0 && last_reply_timestamp > 0) + { + TimestampTz wakeup_time; + + /* + * At the latest stop sleeping once wal_sender_timeout has been + * reached. + */ + wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout); + + /* + * If no ping has been sent yet, wakeup when it's time to do so. + * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of + * the timeout passed without a response. + */ + if (!waiting_for_ping_response) + wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + + /* Compute relative time until wakeup. */ + sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time); + } + + return sleeptime; +} + +/* + * Check whether there have been responses by the client within + * wal_sender_timeout and shutdown if not. Using last_processing as the + * reference point avoids counting server-side stalls against the client. + * However, a long server-side stall can make WalSndKeepaliveIfNecessary() + * postdate last_processing by more than wal_sender_timeout. If that happens, + * the client must reply almost immediately to avoid a timeout. This rarely + * affects the default configuration, under which clients spontaneously send a + * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We + * could eliminate that problem by recognizing timeout expiration at + * wal_sender_timeout/2 after the keepalive. + */ +static void +WalSndCheckTimeOut(void) +{ + TimestampTz timeout; + + /* don't bail out if we're doing something that doesn't require timeouts */ + if (last_reply_timestamp <= 0) + return; + + timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout); + + if (wal_sender_timeout > 0 && last_processing >= timeout) + { + /* + * Since typically expiration of replication timeout means + * communication problem, we don't send the error message to the + * standby. + */ + ereport(COMMERROR, + (errmsg("terminating walsender process due to replication timeout"))); + + WalSndShutdown(); + } +} + +/* Main loop of walsender process that streams the WAL over Copy messages. */ +static void +WalSndLoop(WalSndSendDataCallback send_data) +{ + /* + * Initialize the last reply timestamp. That enables timeout processing + * from hereon. + */ + last_reply_timestamp = GetCurrentTimestamp(); + waiting_for_ping_response = false; + + /* + * Loop until we reach the end of this timeline or the client requests to + * stop streaming. + */ + for (;;) + { + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Process any requests or signals received recently */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + } + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* + * If we have received CopyDone from the client, sent CopyDone + * ourselves, and the output buffer is empty, it's time to exit + * streaming. + */ + if (streamingDoneReceiving && streamingDoneSending && + !pq_is_send_pending()) + break; + + /* + * If we don't have any pending data in the output buffer, try to send + * some more. If there is some, we don't bother to call send_data + * again until we've flushed it ... but we'd better assume we are not + * caught up. + */ + if (!pq_is_send_pending()) + send_data(); + else + WalSndCaughtUp = false; + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + /* If nothing remains to be sent right now ... */ + if (WalSndCaughtUp && !pq_is_send_pending()) + { + /* + * If we're in catchup state, move to streaming. This is an + * important state change for users to know about, since before + * this point data loss might occur if the primary dies and we + * need to failover to the standby. The state change is also + * important for synchronous replication, since commits that + * started to wait at that point might wait for some time. + */ + if (MyWalSnd->state == WALSNDSTATE_CATCHUP) + { + ereport(DEBUG1, + (errmsg_internal("\"%s\" has now caught up with upstream server", + application_name))); + WalSndSetState(WALSNDSTATE_STREAMING); + } + + /* + * When SIGUSR2 arrives, we send any outstanding logs up to the + * shutdown checkpoint record (i.e., the latest record), wait for + * them to be replicated to the standby, and exit. This may be a + * normal termination at shutdown, or a promotion, the walsender + * is not sure which. + */ + if (got_SIGUSR2) + WalSndDone(send_data); + } + + /* Check for replication timeout. */ + WalSndCheckTimeOut(); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(); + + /* + * Block if we have unsent data. XXX For logical replication, let + * WalSndWaitForWal() handle any other blocking; idle receivers need + * its additional actions. For physical replication, also block if + * caught up; its send_data does not block. + */ + if ((WalSndCaughtUp && send_data != XLogSendLogical && + !streamingDoneSending) || + pq_is_send_pending()) + { + long sleeptime; + int wakeEvents; + + if (!streamingDoneReceiving) + wakeEvents = WL_SOCKET_READABLE; + else + wakeEvents = 0; + + /* + * Use fresh timestamp, not last_processing, to reduce the chance + * of reaching wal_sender_timeout before sending a keepalive. + */ + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + if (pq_is_send_pending()) + wakeEvents |= WL_SOCKET_WRITEABLE; + + /* Sleep until something happens or we time out */ + WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN); + } + } +} + +/* Initialize a per-walsender data structure for this walsender process */ +static void +InitWalSenderSlot(void) +{ + int i; + + /* + * WalSndCtl should be set up already (we inherit this by fork() or + * EXEC_BACKEND mechanism from the postmaster). + */ + Assert(WalSndCtl != NULL); + Assert(MyWalSnd == NULL); + + /* + * Find a free walsender slot and reserve it. This must not fail due to + * the prior check for free WAL senders in InitProcess(). + */ + for (i = 0; i < max_wal_senders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + SpinLockAcquire(&walsnd->mutex); + + if (walsnd->pid != 0) + { + SpinLockRelease(&walsnd->mutex); + continue; + } + else + { + /* + * Found a free slot. Reserve it for us. + */ + walsnd->pid = MyProcPid; + walsnd->state = WALSNDSTATE_STARTUP; + walsnd->sentPtr = InvalidXLogRecPtr; + walsnd->needreload = false; + walsnd->write = InvalidXLogRecPtr; + walsnd->flush = InvalidXLogRecPtr; + walsnd->apply = InvalidXLogRecPtr; + walsnd->writeLag = -1; + walsnd->flushLag = -1; + walsnd->applyLag = -1; + walsnd->sync_standby_priority = 0; + walsnd->latch = &MyProc->procLatch; + walsnd->replyTime = 0; + SpinLockRelease(&walsnd->mutex); + /* don't need the lock anymore */ + MyWalSnd = (WalSnd *) walsnd; + + break; + } + } + + Assert(MyWalSnd != NULL); + + /* Arrange to clean up at walsender exit */ + on_shmem_exit(WalSndKill, 0); +} + +/* Destroy the per-walsender data structure for this walsender process */ +static void +WalSndKill(int code, Datum arg) +{ + WalSnd *walsnd = MyWalSnd; + + Assert(walsnd != NULL); + + MyWalSnd = NULL; + + SpinLockAcquire(&walsnd->mutex); + /* clear latch while holding the spinlock, so it can safely be read */ + walsnd->latch = NULL; + /* Mark WalSnd struct as no longer being in use. */ + walsnd->pid = 0; + SpinLockRelease(&walsnd->mutex); +} + +/* XLogReaderRoutine->segment_open callback */ +static void +WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, + TimeLineID *tli_p) +{ + char path[MAXPGPATH]; + + /*------- + * When reading from a historic timeline, and there is a timeline switch + * within this segment, read from the WAL segment belonging to the new + * timeline. + * + * For example, imagine that this server is currently on timeline 5, and + * we're streaming timeline 4. The switch from timeline 4 to 5 happened at + * 0/13002088. In pg_wal, we have these files: + * + * ... + * 000000040000000000000012 + * 000000040000000000000013 + * 000000050000000000000013 + * 000000050000000000000014 + * ... + * + * In this situation, when requested to send the WAL from segment 0x13, on + * timeline 4, we read the WAL from file 000000050000000000000013. Archive + * recovery prefers files from newer timelines, so if the segment was + * restored from the archive on this server, the file belonging to the old + * timeline, 000000040000000000000013, might not exist. Their contents are + * equal up to the switchpoint, because at a timeline switch, the used + * portion of the old segment is copied to the new file. ------- + */ + *tli_p = sendTimeLine; + if (sendTimeLineIsHistoric) + { + XLogSegNo endSegNo; + + XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize); + if (nextSegNo == endSegNo) + *tli_p = sendTimeLineNextTLI; + } + + XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return; + + /* + * If the file is not found, assume it's because the standby asked for a + * too old WAL segment that has already been removed or recycled. + */ + if (errno == ENOENT) + { + char xlogfname[MAXFNAMELEN]; + int save_errno = errno; + + XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + xlogfname))); + } + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); +} + +/* + * Send out the WAL in its normal physical/stored form. + * + * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, + * but not yet sent to the client, and buffer it in the libpq output + * buffer. + * + * If there is no unsent WAL remaining, WalSndCaughtUp is set to true, + * otherwise WalSndCaughtUp is set to false. + */ +static void +XLogSendPhysical(void) +{ + XLogRecPtr SendRqstPtr; + XLogRecPtr startptr; + XLogRecPtr endptr; + Size nbytes; + XLogSegNo segno; + WALReadError errinfo; + + /* If requested switch the WAL sender to the stopping state. */ + if (got_STOPPING) + WalSndSetState(WALSNDSTATE_STOPPING); + + if (streamingDoneSending) + { + WalSndCaughtUp = true; + return; + } + + /* Figure out how far we can safely send the WAL. */ + if (sendTimeLineIsHistoric) + { + /* + * Streaming an old timeline that's in this server's history, but is + * not the one we're currently inserting or replaying. It can be + * streamed up to the point where we switched off that timeline. + */ + SendRqstPtr = sendTimeLineValidUpto; + } + else if (am_cascading_walsender) + { + /* + * Streaming the latest timeline on a standby. + * + * Attempt to send all WAL that has already been replayed, so that we + * know it's valid. If we're receiving WAL through streaming + * replication, it's also OK to send any WAL that has been received + * but not replayed. + * + * The timeline we're recovering from can change, or we can be + * promoted. In either case, the current timeline becomes historic. We + * need to detect that so that we don't try to stream past the point + * where we switched to another timeline. We check for promotion or + * timeline switch after calculating FlushPtr, to avoid a race + * condition: if the timeline becomes historic just after we checked + * that it was still current, it's still be OK to stream it up to the + * FlushPtr that was calculated before it became historic. + */ + bool becameHistoric = false; + + SendRqstPtr = GetStandbyFlushRecPtr(); + + if (!RecoveryInProgress()) + { + /* + * We have been promoted. RecoveryInProgress() updated + * ThisTimeLineID to the new current timeline. + */ + am_cascading_walsender = false; + becameHistoric = true; + } + else + { + /* + * Still a cascading standby. But is the timeline we're sending + * still the one recovery is recovering from? ThisTimeLineID was + * updated by the GetStandbyFlushRecPtr() call above. + */ + if (sendTimeLine != ThisTimeLineID) + becameHistoric = true; + } + + if (becameHistoric) + { + /* + * The timeline we were sending has become historic. Read the + * timeline history file of the new timeline to see where exactly + * we forked off from the timeline we were sending. + */ + List *history; + + history = readTimeLineHistory(ThisTimeLineID); + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); + + Assert(sendTimeLine < sendTimeLineNextTLI); + list_free_deep(history); + + sendTimeLineIsHistoric = true; + + SendRqstPtr = sendTimeLineValidUpto; + } + } + else + { + /* + * Streaming the current timeline on a primary. + * + * Attempt to send all data that's already been written out and + * fsync'd to disk. We cannot go further than what's been written out + * given the current implementation of WALRead(). And in any case + * it's unsafe to send WAL that is not securely down to disk on the + * primary: if the primary subsequently crashes and restarts, standbys + * must not have applied any WAL that got lost on the primary. + */ + SendRqstPtr = GetFlushRecPtr(); + } + + /* + * Record the current system time as an approximation of the time at which + * this WAL location was written for the purposes of lag tracking. + * + * In theory we could make XLogFlush() record a time in shmem whenever WAL + * is flushed and we could get that time as well as the LSN when we call + * GetFlushRecPtr() above (and likewise for the cascading standby + * equivalent), but rather than putting any new code into the hot WAL path + * it seems good enough to capture the time here. We should reach this + * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that + * may take some time, we read the WAL flush pointer and take the time + * very close to together here so that we'll get a later position if it is + * still moving. + * + * Because LagTrackerWrite ignores samples when the LSN hasn't advanced, + * this gives us a cheap approximation for the WAL flush time for this + * LSN. + * + * Note that the LSN is not necessarily the LSN for the data contained in + * the present message; it's the end of the WAL, which might be further + * ahead. All the lag tracking machinery cares about is finding out when + * that arbitrary LSN is eventually reported as written, flushed and + * applied, so that it can measure the elapsed time. + */ + LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp()); + + /* + * If this is a historic timeline and we've reached the point where we + * forked to the next timeline, stop streaming. + * + * Note: We might already have sent WAL > sendTimeLineValidUpto. The + * startup process will normally replay all WAL that has been received + * from the primary, before promoting, but if the WAL streaming is + * terminated at a WAL page boundary, the valid portion of the timeline + * might end in the middle of a WAL record. We might've already sent the + * first half of that partial WAL record to the cascading standby, so that + * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't + * replay the partial WAL record either, so it can still follow our + * timeline switch. + */ + if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) + { + /* close the current file. */ + if (xlogreader->seg.ws_file >= 0) + wal_segment_close(xlogreader); + + /* Send CopyDone */ + pq_putmessage_noblock('c', NULL, 0); + streamingDoneSending = true; + + WalSndCaughtUp = true; + + elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", + LSN_FORMAT_ARGS(sendTimeLineValidUpto), + LSN_FORMAT_ARGS(sentPtr)); + return; + } + + /* Do we have any work to do? */ + Assert(sentPtr <= SendRqstPtr); + if (SendRqstPtr <= sentPtr) + { + WalSndCaughtUp = true; + return; + } + + /* + * Figure out how much to send in one message. If there's no more than + * MAX_SEND_SIZE bytes to send, send everything. Otherwise send + * MAX_SEND_SIZE bytes, but round back to logfile or page boundary. + * + * The rounding is not only for performance reasons. Walreceiver relies on + * the fact that we never split a WAL record across two messages. Since a + * long WAL record is split at page boundary into continuation records, + * page boundary is always a safe cut-off point. We also assume that + * SendRqstPtr never points to the middle of a WAL record. + */ + startptr = sentPtr; + endptr = startptr; + endptr += MAX_SEND_SIZE; + + /* if we went beyond SendRqstPtr, back off */ + if (SendRqstPtr <= endptr) + { + endptr = SendRqstPtr; + if (sendTimeLineIsHistoric) + WalSndCaughtUp = false; + else + WalSndCaughtUp = true; + } + else + { + /* round down to page boundary. */ + endptr -= (endptr % XLOG_BLCKSZ); + WalSndCaughtUp = false; + } + + nbytes = endptr - startptr; + Assert(nbytes <= MAX_SEND_SIZE); + + /* + * OK to read and send the slice. + */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 'w'); + + pq_sendint64(&output_message, startptr); /* dataStart */ + pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ + pq_sendint64(&output_message, 0); /* sendtime, filled in last */ + + /* + * Read the log directly into the output buffer to avoid extra memcpy + * calls. + */ + enlargeStringInfo(&output_message, nbytes); + +retry: + if (!WALRead(xlogreader, + &output_message.data[output_message.len], + startptr, + nbytes, + xlogreader->seg.ws_tli, /* Pass the current TLI because + * only WalSndSegmentOpen controls + * whether new TLI is needed. */ + &errinfo)) + WALReadRaiseError(&errinfo); + + /* See logical_read_xlog_page(). */ + XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); + CheckXLogRemoved(segno, xlogreader->seg.ws_tli); + + /* + * During recovery, the currently-open WAL file might be replaced with the + * file of the same name retrieved from archive. So we always need to + * check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. + */ + if (am_cascading_walsender) + { + WalSnd *walsnd = MyWalSnd; + bool reload; + + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); + + if (reload && xlogreader->seg.ws_file >= 0) + { + wal_segment_close(xlogreader); + + goto retry; + } + } + + output_message.len += nbytes; + output_message.data[output_message.len] = '\0'; + + /* + * Fill the send timestamp last, so that it is taken as late as possible. + */ + resetStringInfo(&tmpbuf); + pq_sendint64(&tmpbuf, GetCurrentTimestamp()); + memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], + tmpbuf.data, sizeof(int64)); + + pq_putmessage_noblock('d', output_message.data, output_message.len); + + sentPtr = endptr; + + /* Update shared memory status */ + { + WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + + /* Report progress of XLOG streaming in PS display */ + if (update_process_title) + { + char activitymsg[50]; + + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + LSN_FORMAT_ARGS(sentPtr)); + set_ps_display(activitymsg); + } +} + +/* + * Stream out logically decoded data. + */ +static void +XLogSendLogical(void) +{ + XLogRecord *record; + char *errm; + + /* + * We'll use the current flush point to determine whether we've caught up. + * This variable is static in order to cache it across calls. Caching is + * helpful because GetFlushRecPtr() needs to acquire a heavily-contended + * spinlock. + */ + static XLogRecPtr flushPtr = InvalidXLogRecPtr; + + /* + * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to + * true in WalSndWaitForWal, if we're actually waiting. We also set to + * true if XLogReadRecord() had to stop reading but WalSndWaitForWal + * didn't wait - i.e. when we're shutting down. + */ + WalSndCaughtUp = false; + + record = XLogReadRecord(logical_decoding_ctx->reader, &errm); + + /* xlog record was invalid */ + if (errm != NULL) + elog(ERROR, "%s", errm); + + if (record != NULL) + { + /* + * Note the lack of any call to LagTrackerWrite() which is handled by + * WalSndUpdateProgress which is called by output plugin through + * logical decoding write api. + */ + LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); + + sentPtr = logical_decoding_ctx->reader->EndRecPtr; + } + + /* + * If first time through in this session, initialize flushPtr. Otherwise, + * we only need to update flushPtr if EndRecPtr is past it. + */ + if (flushPtr == InvalidXLogRecPtr) + flushPtr = GetFlushRecPtr(); + else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + flushPtr = GetFlushRecPtr(); + + /* If EndRecPtr is still past our flushPtr, it means we caught up. */ + if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + WalSndCaughtUp = true; + + /* + * If we're caught up and have been requested to stop, have WalSndLoop() + * terminate the connection in an orderly manner, after writing out all + * the pending data. + */ + if (WalSndCaughtUp && got_STOPPING) + got_SIGUSR2 = true; + + /* Update shared memory status */ + { + WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } +} + +/* + * Shutdown if the sender is caught up. + * + * NB: This should only be called when the shutdown signal has been received + * from postmaster. + * + * Note that if we determine that there's still more data to send, this + * function will return control to the caller. + */ +static void +WalSndDone(WalSndSendDataCallback send_data) +{ + XLogRecPtr replicatedPtr; + + /* ... let's just be real sure we're caught up ... */ + send_data(); + + /* + * To figure out whether all WAL has successfully been replicated, check + * flush location if valid, write otherwise. Tools like pg_receivewal will + * usually (unless in synchronous mode) return an invalid flush location. + */ + replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ? + MyWalSnd->write : MyWalSnd->flush; + + if (WalSndCaughtUp && sentPtr == replicatedPtr && + !pq_is_send_pending()) + { + QueryCompletion qc; + + /* Inform the standby that XLOG streaming is done */ + SetQueryCompletion(&qc, CMDTAG_COPY, 0); + EndCommand(&qc, DestRemote, false); + pq_flush(); + + proc_exit(0); + } + if (!waiting_for_ping_response) + WalSndKeepalive(true); +} + +/* + * Returns the latest point in WAL that has been safely flushed to disk, and + * can be sent to the standby. This should only be called when in recovery, + * ie. we're streaming to a cascaded standby. + * + * As a side-effect, ThisTimeLineID is updated to the TLI of the last + * replayed WAL record. + */ +static XLogRecPtr +GetStandbyFlushRecPtr(void) +{ + XLogRecPtr replayPtr; + TimeLineID replayTLI; + XLogRecPtr receivePtr; + TimeLineID receiveTLI; + XLogRecPtr result; + + /* + * We can safely send what's already been replayed. Also, if walreceiver + * is streaming WAL from the same timeline, we can send anything that it + * has streamed, but hasn't been replayed yet. + */ + + receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); + replayPtr = GetXLogReplayRecPtr(&replayTLI); + + ThisTimeLineID = replayTLI; + + result = replayPtr; + if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr) + result = receivePtr; + + return result; +} + +/* + * Request walsenders to reload the currently-open WAL file + */ +void +WalSndRqstFileReload(void) +{ + int i; + + for (i = 0; i < max_wal_senders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + SpinLockAcquire(&walsnd->mutex); + if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); + continue; + } + walsnd->needreload = true; + SpinLockRelease(&walsnd->mutex); + } +} + +/* + * Handle PROCSIG_WALSND_INIT_STOPPING signal. + */ +void +HandleWalSndInitStopping(void) +{ + Assert(am_walsender); + + /* + * If replication has not yet started, die like with SIGTERM. If + * replication is active, only set a flag and wake up the main loop. It + * will send any outstanding WAL, wait for it to be replicated to the + * standby, and then exit gracefully. + */ + if (!replication_active) + kill(MyProcPid, SIGTERM); + else + got_STOPPING = true; +} + +/* + * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL + * sender should already have been switched to WALSNDSTATE_STOPPING at + * this point. + */ +static void +WalSndLastCycleHandler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGUSR2 = true; + SetLatch(MyLatch); + + errno = save_errno; +} + +/* Set up signal handlers */ +void +WalSndSignals(void) +{ + /* Set up signal handlers */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, StatementCancelHandler); /* query cancel */ + pqsignal(SIGTERM, die); /* request shutdown */ + /* SIGQUIT handler was already set up by InitPostmasterChild */ + InitializeTimeouts(); /* establishes SIGALRM handler */ + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and + * shutdown */ + + /* Reset some signals that are accepted by postmaster but not here */ + pqsignal(SIGCHLD, SIG_DFL); +} + +/* Report shared-memory space needed by WalSndShmemInit */ +Size +WalSndShmemSize(void) +{ + Size size = 0; + + size = offsetof(WalSndCtlData, walsnds); + size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd))); + + return size; +} + +/* Allocate and initialize walsender-related shared memory */ +void +WalSndShmemInit(void) +{ + bool found; + int i; + + WalSndCtl = (WalSndCtlData *) + ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found); + + if (!found) + { + /* First time through, so initialize */ + MemSet(WalSndCtl, 0, WalSndShmemSize()); + + for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) + SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])); + + for (i = 0; i < max_wal_senders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + SpinLockInit(&walsnd->mutex); + } + } +} + +/* + * Wake up all walsenders + * + * This will be called inside critical sections, so throwing an error is not + * advisable. + */ +void +WalSndWakeup(void) +{ + int i; + + for (i = 0; i < max_wal_senders; i++) + { + Latch *latch; + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + /* + * Get latch pointer with spinlock held, for the unlikely case that + * pointer reads aren't atomic (as they're 8 bytes). + */ + SpinLockAcquire(&walsnd->mutex); + latch = walsnd->latch; + SpinLockRelease(&walsnd->mutex); + + if (latch != NULL) + SetLatch(latch); + } +} + +/* + * Wait for readiness on the FeBe socket, or a timeout. The mask should be + * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit + * on postmaster death. + */ +static void +WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) +{ + WaitEvent event; + + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL); + if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 && + (event.events & WL_POSTMASTER_DEATH)) + proc_exit(1); +} + +/* + * Signal all walsenders to move to stopping state. + * + * This will trigger walsenders to move to a state where no further WAL can be + * generated. See this file's header for details. + */ +void +WalSndInitStopping(void) +{ + int i; + + for (i = 0; i < max_wal_senders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + pid_t pid; + + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + SpinLockRelease(&walsnd->mutex); + + if (pid == 0) + continue; + + SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId); + } +} + +/* + * Wait that all the WAL senders have quit or reached the stopping state. This + * is used by the checkpointer to control when the shutdown checkpoint can + * safely be performed. + */ +void +WalSndWaitStopping(void) +{ + for (;;) + { + int i; + bool all_stopped = true; + + for (i = 0; i < max_wal_senders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + SpinLockAcquire(&walsnd->mutex); + + if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); + continue; + } + + if (walsnd->state != WALSNDSTATE_STOPPING) + { + all_stopped = false; + SpinLockRelease(&walsnd->mutex); + break; + } + SpinLockRelease(&walsnd->mutex); + } + + /* safe to leave if confirmation is done for all WAL senders */ + if (all_stopped) + return; + + pg_usleep(10000L); /* wait for 10 msec */ + } +} + +/* Set state for current walsender (only called in walsender) */ +void +WalSndSetState(WalSndState state) +{ + WalSnd *walsnd = MyWalSnd; + + Assert(am_walsender); + + if (walsnd->state == state) + return; + + SpinLockAcquire(&walsnd->mutex); + walsnd->state = state; + SpinLockRelease(&walsnd->mutex); +} + +/* + * Return a string constant representing the state. This is used + * in system views, and should *not* be translated. + */ +static const char * +WalSndGetStateString(WalSndState state) +{ + switch (state) + { + case WALSNDSTATE_STARTUP: + return "startup"; + case WALSNDSTATE_BACKUP: + return "backup"; + case WALSNDSTATE_CATCHUP: + return "catchup"; + case WALSNDSTATE_STREAMING: + return "streaming"; + case WALSNDSTATE_STOPPING: + return "stopping"; + } + return "UNKNOWN"; +} + +static Interval * +offset_to_interval(TimeOffset offset) +{ + Interval *result = palloc(sizeof(Interval)); + + result->month = 0; + result->day = 0; + result->time = offset; + + return result; +} + +/* + * Returns activity of walsenders, including pids and xlog locations sent to + * standby servers. + */ +Datum +pg_stat_get_wal_senders(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_WAL_SENDERS_COLS 12 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + SyncRepStandbyData *sync_standbys; + int num_standbys; + int i; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + /* + * Get the currently active synchronous standbys. This could be out of + * date before we're done, but we'll use the data anyway. + */ + num_standbys = SyncRepGetCandidateStandbys(&sync_standbys); + + for (i = 0; i < max_wal_senders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + XLogRecPtr sentPtr; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + TimeOffset writeLag; + TimeOffset flushLag; + TimeOffset applyLag; + int priority; + int pid; + WalSndState state; + TimestampTz replyTime; + bool is_sync_standby; + Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; + bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + int j; + + /* Collect data from shared memory */ + SpinLockAcquire(&walsnd->mutex); + if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); + continue; + } + pid = walsnd->pid; + sentPtr = walsnd->sentPtr; + state = walsnd->state; + write = walsnd->write; + flush = walsnd->flush; + apply = walsnd->apply; + writeLag = walsnd->writeLag; + flushLag = walsnd->flushLag; + applyLag = walsnd->applyLag; + priority = walsnd->sync_standby_priority; + replyTime = walsnd->replyTime; + SpinLockRelease(&walsnd->mutex); + + /* + * Detect whether walsender is/was considered synchronous. We can + * provide some protection against stale data by checking the PID + * along with walsnd_index. + */ + is_sync_standby = false; + for (j = 0; j < num_standbys; j++) + { + if (sync_standbys[j].walsnd_index == i && + sync_standbys[j].pid == pid) + { + is_sync_standby = true; + break; + } + } + + memset(nulls, 0, sizeof(nulls)); + values[0] = Int32GetDatum(pid); + + if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS)) + { + /* + * Only superusers and members of pg_read_all_stats can see + * details. Other users only get the pid value to know it's a + * walsender, but no details. + */ + MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1); + } + else + { + values[1] = CStringGetTextDatum(WalSndGetStateString(state)); + + if (XLogRecPtrIsInvalid(sentPtr)) + nulls[2] = true; + values[2] = LSNGetDatum(sentPtr); + + if (XLogRecPtrIsInvalid(write)) + nulls[3] = true; + values[3] = LSNGetDatum(write); + + if (XLogRecPtrIsInvalid(flush)) + nulls[4] = true; + values[4] = LSNGetDatum(flush); + + if (XLogRecPtrIsInvalid(apply)) + nulls[5] = true; + values[5] = LSNGetDatum(apply); + + /* + * Treat a standby such as a pg_basebackup background process + * which always returns an invalid flush location, as an + * asynchronous standby. + */ + priority = XLogRecPtrIsInvalid(flush) ? 0 : priority; + + if (writeLag < 0) + nulls[6] = true; + else + values[6] = IntervalPGetDatum(offset_to_interval(writeLag)); + + if (flushLag < 0) + nulls[7] = true; + else + values[7] = IntervalPGetDatum(offset_to_interval(flushLag)); + + if (applyLag < 0) + nulls[8] = true; + else + values[8] = IntervalPGetDatum(offset_to_interval(applyLag)); + + values[9] = Int32GetDatum(priority); + + /* + * More easily understood version of standby state. This is purely + * informational. + * + * In quorum-based sync replication, the role of each standby + * listed in synchronous_standby_names can be changing very + * frequently. Any standbys considered as "sync" at one moment can + * be switched to "potential" ones at the next moment. So, it's + * basically useless to report "sync" or "potential" as their sync + * states. We report just "quorum" for them. + */ + if (priority == 0) + values[10] = CStringGetTextDatum("async"); + else if (is_sync_standby) + values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? + CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); + else + values[10] = CStringGetTextDatum("potential"); + + if (replyTime == 0) + nulls[11] = true; + else + values[11] = TimestampTzGetDatum(replyTime); + } + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + +/* + * Send a keepalive message to standby. + * + * If requestReply is set, the message requests the other party to send + * a message back to us, for heartbeat purposes. We also set a flag to + * let nearby code that we're waiting for that response, to avoid + * repeated requests. + */ +static void +WalSndKeepalive(bool requestReply) +{ + elog(DEBUG2, "sending replication keepalive"); + + /* construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 'k'); + pq_sendint64(&output_message, sentPtr); + pq_sendint64(&output_message, GetCurrentTimestamp()); + pq_sendbyte(&output_message, requestReply ? 1 : 0); + + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock('d', output_message.data, output_message.len); + + /* Set local flag */ + if (requestReply) + waiting_for_ping_response = true; +} + +/* + * Send keepalive message if too much time has elapsed. + */ +static void +WalSndKeepaliveIfNecessary(void) +{ + TimestampTz ping_time; + + /* + * Don't send keepalive messages if timeouts are globally disabled or + * we're doing something not partaking in timeouts. + */ + if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) + return; + + if (waiting_for_ping_response) + return; + + /* + * If half of wal_sender_timeout has lapsed without receiving any reply + * from the standby, send a keep-alive message to the standby requesting + * an immediate reply. + */ + ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, + wal_sender_timeout / 2); + if (last_processing >= ping_time) + { + WalSndKeepalive(true); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + } +} + +/* + * Record the end of the WAL and the time it was flushed locally, so that + * LagTrackerRead can compute the elapsed time (lag) when this WAL location is + * eventually reported to have been written, flushed and applied by the + * standby in a reply message. + */ +static void +LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) +{ + bool buffer_full; + int new_write_head; + int i; + + if (!am_walsender) + return; + + /* + * If the lsn hasn't advanced since last time, then do nothing. This way + * we only record a new sample when new WAL has been written. + */ + if (lag_tracker->last_lsn == lsn) + return; + lag_tracker->last_lsn = lsn; + + /* + * If advancing the write head of the circular buffer would crash into any + * of the read heads, then the buffer is full. In other words, the + * slowest reader (presumably apply) is the one that controls the release + * of space. + */ + new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE; + buffer_full = false; + for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i) + { + if (new_write_head == lag_tracker->read_heads[i]) + buffer_full = true; + } + + /* + * If the buffer is full, for now we just rewind by one slot and overwrite + * the last sample, as a simple (if somewhat uneven) way to lower the + * sampling rate. There may be better adaptive compaction algorithms. + */ + if (buffer_full) + { + new_write_head = lag_tracker->write_head; + if (lag_tracker->write_head > 0) + lag_tracker->write_head--; + else + lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1; + } + + /* Store a sample at the current write head position. */ + lag_tracker->buffer[lag_tracker->write_head].lsn = lsn; + lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time; + lag_tracker->write_head = new_write_head; +} + +/* + * Find out how much time has elapsed between the moment WAL location 'lsn' + * (or the highest known earlier LSN) was flushed locally and the time 'now'. + * We have a separate read head for each of the reported LSN locations we + * receive in replies from standby; 'head' controls which read head is + * used. Whenever a read head crosses an LSN which was written into the + * lag buffer with LagTrackerWrite, we can use the associated timestamp to + * find out the time this LSN (or an earlier one) was flushed locally, and + * therefore compute the lag. + * + * Return -1 if no new sample data is available, and otherwise the elapsed + * time in microseconds. + */ +static TimeOffset +LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) +{ + TimestampTz time = 0; + + /* Read all unread samples up to this LSN or end of buffer. */ + while (lag_tracker->read_heads[head] != lag_tracker->write_head && + lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn) + { + time = lag_tracker->buffer[lag_tracker->read_heads[head]].time; + lag_tracker->last_read[head] = + lag_tracker->buffer[lag_tracker->read_heads[head]]; + lag_tracker->read_heads[head] = + (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE; + } + + /* + * If the lag tracker is empty, that means the standby has processed + * everything we've ever sent so we should now clear 'last_read'. If we + * didn't do that, we'd risk using a stale and irrelevant sample for + * interpolation at the beginning of the next burst of WAL after a period + * of idleness. + */ + if (lag_tracker->read_heads[head] == lag_tracker->write_head) + lag_tracker->last_read[head].time = 0; + + if (time > now) + { + /* If the clock somehow went backwards, treat as not found. */ + return -1; + } + else if (time == 0) + { + /* + * We didn't cross a time. If there is a future sample that we + * haven't reached yet, and we've already reached at least one sample, + * let's interpolate the local flushed time. This is mainly useful + * for reporting a completely stuck apply position as having + * increasing lag, since otherwise we'd have to wait for it to + * eventually start moving again and cross one of our samples before + * we can show the lag increasing. + */ + if (lag_tracker->read_heads[head] == lag_tracker->write_head) + { + /* There are no future samples, so we can't interpolate. */ + return -1; + } + else if (lag_tracker->last_read[head].time != 0) + { + /* We can interpolate between last_read and the next sample. */ + double fraction; + WalTimeSample prev = lag_tracker->last_read[head]; + WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]]; + + if (lsn < prev.lsn) + { + /* + * Reported LSNs shouldn't normally go backwards, but it's + * possible when there is a timeline change. Treat as not + * found. + */ + return -1; + } + + Assert(prev.lsn < next.lsn); + + if (prev.time > next.time) + { + /* If the clock somehow went backwards, treat as not found. */ + return -1; + } + + /* See how far we are between the previous and next samples. */ + fraction = + (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn); + + /* Scale the local flush time proportionally. */ + time = (TimestampTz) + ((double) prev.time + (next.time - prev.time) * fraction); + } + else + { + /* + * We have only a future sample, implying that we were entirely + * caught up but and now there is a new burst of WAL and the + * standby hasn't processed the first sample yet. Until the + * standby reaches the future sample the best we can do is report + * the hypothetical lag if that sample were to be replayed now. + */ + time = lag_tracker->buffer[lag_tracker->read_heads[head]].time; + } + } + + /* Return the elapsed time since local flush time in microseconds. */ + Assert(time != 0); + return now - time; +} |