diff options
Diffstat (limited to '')
-rw-r--r-- | src/backend/replication/walreceiver.c | 1460 |
1 files changed, 1460 insertions, 0 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c new file mode 100644 index 0000000..4831a25 --- /dev/null +++ b/src/backend/replication/walreceiver.c @@ -0,0 +1,1460 @@ +/*------------------------------------------------------------------------- + * + * walreceiver.c + * + * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It + * is the process in the standby server that takes charge of receiving + * XLOG records from a primary server during streaming replication. + * + * When the startup process determines that it's time to start streaming, + * it instructs postmaster to start walreceiver. Walreceiver first connects + * to the primary server (it will be served by a walsender process + * in the primary server), and then keeps receiving XLOG records and + * writing them to the disk as long as the connection is alive. As XLOG + * records are received and flushed to disk, it updates the + * WalRcv->flushedUpto variable in shared memory, to inform the startup + * process of how far it can proceed with XLOG replay. + * + * A WAL receiver cannot directly load GUC parameters used when establishing + * its connection to the primary. Instead it relies on parameter values + * that are passed down by the startup process when streaming is requested. + * This applies, for example, to the replication slot and the connection + * string to be used for the connection with the primary. + * + * If the primary server ends streaming, but doesn't disconnect, walreceiver + * goes into "waiting" mode, and waits for the startup process to give new + * instructions. The startup process will treat that the same as + * disconnection, and will rescan the archive/pg_wal directory. But when the + * startup process wants to try streaming replication again, it will just + * nudge the existing walreceiver process that's waiting, instead of launching + * a new one. + * + * Normal termination is by SIGTERM, which instructs the walreceiver to + * exit(0). Emergency termination is by SIGQUIT; like any postmaster child + * process, the walreceiver will simply abort and exit on SIGQUIT. A close + * of the connection and a FATAL error are treated not as a crash but as + * normal operation. + * + * This file contains the server-facing parts of walreceiver. The libpq- + * specific parts are in the libpqwalreceiver module. It's loaded + * dynamically to avoid linking the server with libpq. + * + * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/replication/walreceiver.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> + +#include "access/htup_details.h" +#include "access/timeline.h" +#include "access/transam.h" +#include "access/xlog_internal.h" +#include "access/xlogarchive.h" +#include "catalog/pg_authid.h" +#include "catalog/pg_type.h" +#include "common/ip.h" +#include "funcapi.h" +#include "libpq/pqformat.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/interrupt.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" +#include "storage/ipc.h" +#include "storage/pmsignal.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/pg_lsn.h" +#include "utils/ps_status.h" +#include "utils/resowner.h" +#include "utils/timestamp.h" + + +/* + * GUC variables. (Other variables that affect walreceiver are in xlog.c + * because they're passed down from the startup process, for better + * synchronization.) + */ +int wal_receiver_status_interval; +int wal_receiver_timeout; +bool hot_standby_feedback; + +/* libpqwalreceiver connection */ +static WalReceiverConn *wrconn = NULL; +WalReceiverFunctionsType *WalReceiverFunctions = NULL; + +#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ + +/* + * These variables are used similarly to openLogFile/SegNo, + * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID + * corresponding the filename of recvFile. + */ +static int recvFile = -1; +static TimeLineID recvFileTLI = 0; +static XLogSegNo recvSegNo = 0; + +/* + * LogstreamResult indicates the byte positions that we have already + * written/fsynced. + */ +static struct +{ + XLogRecPtr Write; /* last byte + 1 written out in the standby */ + XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ +} LogstreamResult; + +static StringInfoData reply_message; +static StringInfoData incoming_message; + +/* Prototypes for private functions */ +static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); +static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); +static void WalRcvDie(int code, Datum arg); +static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); +static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); +static void XLogWalRcvFlush(bool dying); +static void XLogWalRcvClose(XLogRecPtr recptr); +static void XLogWalRcvSendReply(bool force, bool requestReply); +static void XLogWalRcvSendHSFeedback(bool immed); +static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); + +/* + * Process any interrupts the walreceiver process may have received. + * This should be called any time the process's latch has become set. + * + * Currently, only SIGTERM is of interest. We can't just exit(1) within the + * SIGTERM signal handler, because the signal might arrive in the middle of + * some critical operation, like while we're holding a spinlock. Instead, the + * signal handler sets a flag variable as well as setting the process's latch. + * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the + * latch has become set. Operations that could block for a long time, such as + * reading from a remote server, must pay attention to the latch too; see + * libpqrcv_PQgetResult for example. + */ +void +ProcessWalRcvInterrupts(void) +{ + /* + * Although walreceiver interrupt handling doesn't use the same scheme as + * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive + * any incoming signals on Win32, and also to make sure we process any + * barrier events. + */ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating walreceiver process due to administrator command"))); + } +} + + +/* Main entry point for walreceiver process */ +void +WalReceiverMain(void) +{ + char conninfo[MAXCONNINFO]; + char *tmp_conninfo; + char slotname[NAMEDATALEN]; + bool is_temp_slot; + XLogRecPtr startpoint; + TimeLineID startpointTLI; + TimeLineID primaryTLI; + bool first_stream; + WalRcvData *walrcv = WalRcv; + TimestampTz last_recv_timestamp; + TimestampTz now; + bool ping_sent; + char *err; + char *sender_host = NULL; + int sender_port = 0; + + /* + * WalRcv should be set up already (if we are a backend, we inherit this + * by fork() or EXEC_BACKEND mechanism from the postmaster). + */ + Assert(walrcv != NULL); + + now = GetCurrentTimestamp(); + + /* + * Mark walreceiver as running in shared memory. + * + * Do this as early as possible, so that if we fail later on, we'll set + * state to STOPPED. If we die before this, the startup process will keep + * waiting for us to start up, until it times out. + */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->pid == 0); + switch (walrcv->walRcvState) + { + case WALRCV_STOPPING: + /* If we've already been requested to stop, don't start up. */ + walrcv->walRcvState = WALRCV_STOPPED; + /* fall through */ + + case WALRCV_STOPPED: + SpinLockRelease(&walrcv->mutex); + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); + proc_exit(1); + break; + + case WALRCV_STARTING: + /* The usual case */ + break; + + case WALRCV_WAITING: + case WALRCV_STREAMING: + case WALRCV_RESTARTING: + default: + /* Shouldn't happen */ + SpinLockRelease(&walrcv->mutex); + elog(PANIC, "walreceiver still running according to shared memory state"); + } + /* Advertise our PID so that the startup process can kill us */ + walrcv->pid = MyProcPid; + walrcv->walRcvState = WALRCV_STREAMING; + + /* Fetch information required to start streaming */ + walrcv->ready_to_display = false; + strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); + strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN); + is_temp_slot = walrcv->is_temp_slot; + startpoint = walrcv->receiveStart; + startpointTLI = walrcv->receiveStartTLI; + + /* + * At most one of is_temp_slot and slotname can be set; otherwise, + * RequestXLogStreaming messed up. + */ + Assert(!is_temp_slot || (slotname[0] == '\0')); + + /* Initialise to a sanish value */ + walrcv->lastMsgSendTime = + walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now; + + /* Report the latch to use to awaken this process */ + walrcv->latch = &MyProc->procLatch; + + SpinLockRelease(&walrcv->mutex); + + pg_atomic_write_u64(&WalRcv->writtenUpto, 0); + + /* Arrange to clean up at walreceiver exit */ + on_shmem_exit(WalRcvDie, 0); + + /* Properly accept or ignore signals the postmaster might send us */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config + * file */ + pqsignal(SIGINT, SIG_IGN); + pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */ + /* SIGQUIT handler was already set up by InitPostmasterChild */ + pqsignal(SIGALRM, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, SIG_IGN); + + /* Reset some signals that are accepted by postmaster but not here */ + pqsignal(SIGCHLD, SIG_DFL); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + if (WalReceiverFunctions == NULL) + elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + + /* Unblock signals (they were blocked when the postmaster forked us) */ + PG_SETMASK(&UnBlockSig); + + /* Establish the connection to the primary for XLOG streaming */ + wrconn = walrcv_connect(conninfo, false, + cluster_name[0] ? cluster_name : "walreceiver", + &err); + if (!wrconn) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err))); + + /* + * Save user-visible connection string. This clobbers the original + * conninfo, for security. Also save host and port of the sender server + * this walreceiver is connected to. + */ + tmp_conninfo = walrcv_get_conninfo(wrconn); + walrcv_get_senderinfo(wrconn, &sender_host, &sender_port); + SpinLockAcquire(&walrcv->mutex); + memset(walrcv->conninfo, 0, MAXCONNINFO); + if (tmp_conninfo) + strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO); + + memset(walrcv->sender_host, 0, NI_MAXHOST); + if (sender_host) + strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST); + + walrcv->sender_port = sender_port; + walrcv->ready_to_display = true; + SpinLockRelease(&walrcv->mutex); + + if (tmp_conninfo) + pfree(tmp_conninfo); + + if (sender_host) + pfree(sender_host); + + first_stream = true; + for (;;) + { + char *primary_sysid; + char standby_sysid[32]; + WalRcvStreamOptions options; + + /* + * Check that we're connected to a valid server using the + * IDENTIFY_SYSTEM replication command. + */ + primary_sysid = walrcv_identify_system(wrconn, &primaryTLI); + + snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, + GetSystemIdentifier()); + if (strcmp(primary_sysid, standby_sysid) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("database system identifier differs between the primary and standby"), + errdetail("The primary's identifier is %s, the standby's identifier is %s.", + primary_sysid, standby_sysid))); + } + + /* + * Confirm that the current timeline of the primary is the same or + * ahead of ours. + */ + if (primaryTLI < startpointTLI) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("highest timeline %u of the primary is behind recovery timeline %u", + primaryTLI, startpointTLI))); + + /* + * Get any missing history files. We do this always, even when we're + * not interested in that timeline, so that if we're promoted to + * become the primary later on, we don't select the same timeline that + * was already used in the current primary. This isn't bullet-proof - + * you'll need some external software to manage your cluster if you + * need to ensure that a unique timeline id is chosen in every case, + * but let's avoid the confusion of timeline id collisions where we + * can. + */ + WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); + + /* + * Create temporary replication slot if requested, and update slot + * name in shared memory. (Note the slot name cannot already be set + * in this case.) + */ + if (is_temp_slot) + { + snprintf(slotname, sizeof(slotname), + "pg_walreceiver_%lld", + (long long int) walrcv_get_backend_pid(wrconn)); + + walrcv_create_slot(wrconn, slotname, true, 0, NULL); + + SpinLockAcquire(&walrcv->mutex); + strlcpy(walrcv->slotname, slotname, NAMEDATALEN); + SpinLockRelease(&walrcv->mutex); + } + + /* + * Start streaming. + * + * We'll try to start at the requested starting point and timeline, + * even if it's different from the server's latest timeline. In case + * we've already reached the end of the old timeline, the server will + * finish the streaming immediately, and we will go back to await + * orders from the startup process. If recovery_target_timeline is + * 'latest', the startup process will scan pg_wal and find the new + * history file, bump recovery target timeline, and ask us to restart + * on the new timeline. + */ + options.logical = false; + options.startpoint = startpoint; + options.slotname = slotname[0] != '\0' ? slotname : NULL; + options.proto.physical.startpointTLI = startpointTLI; + ThisTimeLineID = startpointTLI; + if (walrcv_startstreaming(wrconn, &options)) + { + if (first_stream) + ereport(LOG, + (errmsg("started streaming WAL from primary at %X/%X on timeline %u", + LSN_FORMAT_ARGS(startpoint), startpointTLI))); + else + ereport(LOG, + (errmsg("restarted WAL streaming at %X/%X on timeline %u", + LSN_FORMAT_ARGS(startpoint), startpointTLI))); + first_stream = false; + + /* Initialize LogstreamResult and buffers for processing messages */ + LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); + initStringInfo(&reply_message); + initStringInfo(&incoming_message); + + /* Initialize the last recv timestamp */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; + + /* Loop until end-of-streaming or error */ + for (;;) + { + char *buf; + int len; + bool endofwal = false; + pgsocket wait_fd = PGINVALID_SOCKET; + int rc; + + /* + * Exit walreceiver if we're not in recovery. This should not + * happen, but cross-check the status here. + */ + if (!RecoveryInProgress()) + ereport(FATAL, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot continue WAL streaming, recovery has already ended"))); + + /* Process any requests or signals received recently */ + ProcessWalRcvInterrupts(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + XLogWalRcvSendHSFeedback(true); + } + + /* See if we can read data immediately */ + len = walrcv_receive(wrconn, &buf, &wait_fd); + if (len != 0) + { + /* + * Process the received data, and any subsequent data we + * can read without blocking. + */ + for (;;) + { + if (len > 0) + { + /* + * Something was received from primary, so reset + * timeout + */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; + XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); + } + else if (len == 0) + break; + else if (len < 0) + { + ereport(LOG, + (errmsg("replication terminated by primary server"), + errdetail("End of WAL reached on timeline %u at %X/%X.", + startpointTLI, + LSN_FORMAT_ARGS(LogstreamResult.Write)))); + endofwal = true; + break; + } + len = walrcv_receive(wrconn, &buf, &wait_fd); + } + + /* Let the primary know that we received some data. */ + XLogWalRcvSendReply(false, false); + + /* + * If we've written some records, flush them to disk and + * let the startup process and primary server know about + * them. + */ + XLogWalRcvFlush(false); + } + + /* Check if we need to exit the streaming loop. */ + if (endofwal) + break; + + /* + * Ideally we would reuse a WaitEventSet object repeatedly + * here to avoid the overheads of WaitLatchOrSocket on epoll + * systems, but we can't be sure that libpq (or any other + * walreceiver implementation) has the same socket (even if + * the fd is the same number, it may have been closed and + * reopened since the last time). In future, if there is a + * function for removing sockets from WaitEventSet, then we + * could add and remove just the socket each time, potentially + * avoiding some system calls. + */ + Assert(wait_fd != PGINVALID_SOCKET); + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | + WL_TIMEOUT | WL_LATCH_SET, + wait_fd, + NAPTIME_PER_CYCLE, + WAIT_EVENT_WAL_RECEIVER_MAIN); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + ProcessWalRcvInterrupts(); + + if (walrcv->force_reply) + { + /* + * The recovery process has asked us to send apply + * feedback now. Make sure the flag is really set to + * false in shared memory before sending the reply, so + * we don't miss a new request for a reply. + */ + walrcv->force_reply = false; + pg_memory_barrier(); + XLogWalRcvSendReply(true, false); + } + } + if (rc & WL_TIMEOUT) + { + /* + * We didn't receive anything new. If we haven't heard + * anything from the server for more than + * wal_receiver_timeout / 2, ping the server. Also, if + * it's been longer than wal_receiver_status_interval + * since the last update we sent, send a status update to + * the primary anyway, to report any progress in applying + * WAL. + */ + bool requestReply = false; + + /* + * Check if time since last receive from primary has + * reached the configured limit. + */ + if (wal_receiver_timeout > 0) + { + TimestampTz now = GetCurrentTimestamp(); + TimestampTz timeout; + + timeout = + TimestampTzPlusMilliseconds(last_recv_timestamp, + wal_receiver_timeout); + + if (now >= timeout) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("terminating walreceiver due to timeout"))); + + /* + * We didn't receive anything new, for half of + * receiver replication timeout. Ping the server. + */ + if (!ping_sent) + { + timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, + (wal_receiver_timeout / 2)); + if (now >= timeout) + { + requestReply = true; + ping_sent = true; + } + } + } + + XLogWalRcvSendReply(requestReply, requestReply); + XLogWalRcvSendHSFeedback(false); + } + } + + /* + * The backend finished streaming. Exit streaming COPY-mode from + * our side, too. + */ + walrcv_endstreaming(wrconn, &primaryTLI); + + /* + * If the server had switched to a new timeline that we didn't + * know about when we began streaming, fetch its timeline history + * file now. + */ + WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); + } + else + ereport(LOG, + (errmsg("primary server contains no more WAL on requested timeline %u", + startpointTLI))); + + /* + * End of WAL reached on the requested timeline. Close the last + * segment, and await for new orders from the startup process. + */ + if (recvFile >= 0) + { + char xlogfname[MAXFNAMELEN]; + + XLogWalRcvFlush(false); + XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + xlogfname))); + + /* + * Create .done file forcibly to prevent the streamed segment from + * being archived later. + */ + if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) + XLogArchiveForceDone(xlogfname); + else + XLogArchiveNotify(xlogfname); + } + recvFile = -1; + + elog(DEBUG1, "walreceiver ended streaming and awaits new instructions"); + WalRcvWaitForStartPosition(&startpoint, &startpointTLI); + } + /* not reached */ +} + +/* + * Wait for startup process to set receiveStart and receiveStartTLI. + */ +static void +WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) +{ + WalRcvData *walrcv = WalRcv; + int state; + + SpinLockAcquire(&walrcv->mutex); + state = walrcv->walRcvState; + if (state != WALRCV_STREAMING) + { + SpinLockRelease(&walrcv->mutex); + if (state == WALRCV_STOPPING) + proc_exit(0); + else + elog(FATAL, "unexpected walreceiver state"); + } + walrcv->walRcvState = WALRCV_WAITING; + walrcv->receiveStart = InvalidXLogRecPtr; + walrcv->receiveStartTLI = 0; + SpinLockRelease(&walrcv->mutex); + + set_ps_display("idle"); + + /* + * nudge startup process to notice that we've stopped streaming and are + * now waiting for instructions. + */ + WakeupRecovery(); + for (;;) + { + ResetLatch(MyLatch); + + ProcessWalRcvInterrupts(); + + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->walRcvState == WALRCV_RESTARTING || + walrcv->walRcvState == WALRCV_WAITING || + walrcv->walRcvState == WALRCV_STOPPING); + if (walrcv->walRcvState == WALRCV_RESTARTING) + { + /* + * No need to handle changes in primary_conninfo or + * primary_slotname here. Startup process will signal us to + * terminate in case those change. + */ + *startpoint = walrcv->receiveStart; + *startpointTLI = walrcv->receiveStartTLI; + walrcv->walRcvState = WALRCV_STREAMING; + SpinLockRelease(&walrcv->mutex); + break; + } + if (walrcv->walRcvState == WALRCV_STOPPING) + { + /* + * We should've received SIGTERM if the startup process wants us + * to die, but might as well check it here too. + */ + SpinLockRelease(&walrcv->mutex); + exit(1); + } + SpinLockRelease(&walrcv->mutex); + + (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, + WAIT_EVENT_WAL_RECEIVER_WAIT_START); + } + + if (update_process_title) + { + char activitymsg[50]; + + snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X", + LSN_FORMAT_ARGS(*startpoint)); + set_ps_display(activitymsg); + } +} + +/* + * Fetch any missing timeline history files between 'first' and 'last' + * (inclusive) from the server. + */ +static void +WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) +{ + TimeLineID tli; + + for (tli = first; tli <= last; tli++) + { + /* there's no history file for timeline 1 */ + if (tli != 1 && !existsTimeLineHistory(tli)) + { + char *fname; + char *content; + int len; + char expectedfname[MAXFNAMELEN]; + + ereport(LOG, + (errmsg("fetching timeline history file for timeline %u from primary server", + tli))); + + walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len); + + /* + * Check that the filename on the primary matches what we + * calculated ourselves. This is just a sanity check, it should + * always match. + */ + TLHistoryFileName(expectedfname, tli); + if (strcmp(fname, expectedfname) != 0) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u", + tli))); + + /* + * Write the file to pg_wal. + */ + writeTimeLineHistoryFile(tli, content, len); + + /* + * Mark the streamed history file as ready for archiving if + * archive_mode is always. + */ + if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) + XLogArchiveForceDone(fname); + else + XLogArchiveNotify(fname); + + pfree(fname); + pfree(content); + } + } +} + +/* + * Mark us as STOPPED in shared memory at exit. + */ +static void +WalRcvDie(int code, Datum arg) +{ + WalRcvData *walrcv = WalRcv; + + /* Ensure that all WAL records received are flushed to disk */ + XLogWalRcvFlush(true); + + /* Mark ourselves inactive in shared memory */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->walRcvState == WALRCV_STREAMING || + walrcv->walRcvState == WALRCV_RESTARTING || + walrcv->walRcvState == WALRCV_STARTING || + walrcv->walRcvState == WALRCV_WAITING || + walrcv->walRcvState == WALRCV_STOPPING); + Assert(walrcv->pid == MyProcPid); + walrcv->walRcvState = WALRCV_STOPPED; + walrcv->pid = 0; + walrcv->ready_to_display = false; + walrcv->latch = NULL; + SpinLockRelease(&walrcv->mutex); + + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); + + /* Terminate the connection gracefully. */ + if (wrconn != NULL) + walrcv_disconnect(wrconn); + + /* Wake up the startup process to notice promptly that we're gone */ + WakeupRecovery(); +} + +/* + * Accept the message from XLOG stream, and process it. + */ +static void +XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) +{ + int hdrlen; + XLogRecPtr dataStart; + XLogRecPtr walEnd; + TimestampTz sendTime; + bool replyRequested; + + resetStringInfo(&incoming_message); + + switch (type) + { + case 'w': /* WAL records */ + { + /* copy message to StringInfo */ + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); + if (len < hdrlen) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid WAL message received from primary"))); + appendBinaryStringInfo(&incoming_message, buf, hdrlen); + + /* read the fields */ + dataStart = pq_getmsgint64(&incoming_message); + walEnd = pq_getmsgint64(&incoming_message); + sendTime = pq_getmsgint64(&incoming_message); + ProcessWalSndrMessage(walEnd, sendTime); + + buf += hdrlen; + len -= hdrlen; + XLogWalRcvWrite(buf, len, dataStart); + break; + } + case 'k': /* Keepalive */ + { + /* copy message to StringInfo */ + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); + if (len != hdrlen) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid keepalive message received from primary"))); + appendBinaryStringInfo(&incoming_message, buf, hdrlen); + + /* read the fields */ + walEnd = pq_getmsgint64(&incoming_message); + sendTime = pq_getmsgint64(&incoming_message); + replyRequested = pq_getmsgbyte(&incoming_message); + + ProcessWalSndrMessage(walEnd, sendTime); + + /* If the primary requested a reply, send one immediately */ + if (replyRequested) + XLogWalRcvSendReply(true, false); + break; + } + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid replication message type %d", + type))); + } +} + +/* + * Write XLOG data to disk. + */ +static void +XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) +{ + int startoff; + int byteswritten; + + while (nbytes > 0) + { + int segbytes; + + /* Close the current segment if it's completed */ + if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) + XLogWalRcvClose(recptr); + + if (recvFile < 0) + { + bool use_existent = true; + + /* Create/use new log file */ + XLByteToSeg(recptr, recvSegNo, wal_segment_size); + recvFile = XLogFileInit(recvSegNo, &use_existent, true); + recvFileTLI = ThisTimeLineID; + } + + /* Calculate the start offset of the received logs */ + startoff = XLogSegmentOffset(recptr, wal_segment_size); + + if (startoff + nbytes > wal_segment_size) + segbytes = wal_segment_size - startoff; + else + segbytes = nbytes; + + /* OK to write the logs */ + errno = 0; + + byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff); + if (byteswritten <= 0) + { + char xlogfname[MAXFNAMELEN]; + int save_errno; + + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + + save_errno = errno; + XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); + errno = save_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log segment %s " + "at offset %u, length %lu: %m", + xlogfname, startoff, (unsigned long) segbytes))); + } + + /* Update state for write */ + recptr += byteswritten; + + nbytes -= byteswritten; + buf += byteswritten; + + LogstreamResult.Write = recptr; + } + + /* Update shared-memory status */ + pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); + + /* + * Close the current segment if it's fully written up in the last cycle of + * the loop, to create its archive notification file soon. Otherwise WAL + * archiving of the segment will be delayed until any data in the next + * segment is received and written. + */ + if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) + XLogWalRcvClose(recptr); +} + +/* + * Flush the log to disk. + * + * If we're in the midst of dying, it's unwise to do anything that might throw + * an error, so we skip sending a reply in that case. + */ +static void +XLogWalRcvFlush(bool dying) +{ + if (LogstreamResult.Flush < LogstreamResult.Write) + { + WalRcvData *walrcv = WalRcv; + + issue_xlog_fsync(recvFile, recvSegNo); + + LogstreamResult.Flush = LogstreamResult.Write; + + /* Update shared-memory status */ + SpinLockAcquire(&walrcv->mutex); + if (walrcv->flushedUpto < LogstreamResult.Flush) + { + walrcv->latestChunkStart = walrcv->flushedUpto; + walrcv->flushedUpto = LogstreamResult.Flush; + walrcv->receivedTLI = ThisTimeLineID; + } + SpinLockRelease(&walrcv->mutex); + + /* Signal the startup process and walsender that new WAL has arrived */ + WakeupRecovery(); + if (AllowCascadeReplication()) + WalSndWakeup(); + + /* Report XLOG streaming progress in PS display */ + if (update_process_title) + { + char activitymsg[50]; + + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + LSN_FORMAT_ARGS(LogstreamResult.Write)); + set_ps_display(activitymsg); + } + + /* Also let the primary know that we made some progress */ + if (!dying) + { + XLogWalRcvSendReply(false, false); + XLogWalRcvSendHSFeedback(false); + } + } +} + +/* + * Close the current segment. + * + * Flush the segment to disk before closing it. Otherwise we have to + * reopen and fsync it later. + * + * Create an archive notification file since the segment is known completed. + */ +static void +XLogWalRcvClose(XLogRecPtr recptr) +{ + char xlogfname[MAXFNAMELEN]; + + Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)); + + /* + * fsync() and close current file before we switch to next one. We would + * otherwise have to reopen this file to fsync it later + */ + XLogWalRcvFlush(false); + + XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); + + /* + * XLOG segment files will be re-read by recovery in startup process soon, + * so we don't advise the OS to release cache pages associated with the + * file like XLogFileClose() does. + */ + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + xlogfname))); + + /* + * Create .done file forcibly to prevent the streamed segment from being + * archived later. + */ + if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) + XLogArchiveForceDone(xlogfname); + else + XLogArchiveNotify(xlogfname); + + recvFile = -1; +} + +/* + * Send reply message to primary, indicating our current WAL locations, oldest + * xmin and the current time. + * + * If 'force' is not set, the message is only sent if enough time has + * passed since last status update to reach wal_receiver_status_interval. + * If wal_receiver_status_interval is disabled altogether and 'force' is + * false, this is a no-op. + * + * If 'requestReply' is true, requests the server to reply immediately upon + * receiving this message. This is used for heartbeats, when approaching + * wal_receiver_timeout. + */ +static void +XLogWalRcvSendReply(bool force, bool requestReply) +{ + static XLogRecPtr writePtr = 0; + static XLogRecPtr flushPtr = 0; + XLogRecPtr applyPtr; + static TimestampTz sendTime = 0; + TimestampTz now; + + /* + * If the user doesn't want status to be reported to the primary, be sure + * to exit before doing anything at all. + */ + if (!force && wal_receiver_status_interval <= 0) + return; + + /* Get current timestamp. */ + now = GetCurrentTimestamp(); + + /* + * We can compare the write and flush positions to the last message we + * sent without taking any lock, but the apply position requires a spin + * lock, so we don't check that unless something else has changed or 10 + * seconds have passed. This means that the apply WAL location will + * appear, from the primary's point of view, to lag slightly, but since + * this is only for reporting purposes and only on idle systems, that's + * probably OK. + */ + if (!force + && writePtr == LogstreamResult.Write + && flushPtr == LogstreamResult.Flush + && !TimestampDifferenceExceeds(sendTime, now, + wal_receiver_status_interval * 1000)) + return; + sendTime = now; + + /* Construct a new message */ + writePtr = LogstreamResult.Write; + flushPtr = LogstreamResult.Flush; + applyPtr = GetXLogReplayRecPtr(NULL); + + resetStringInfo(&reply_message); + pq_sendbyte(&reply_message, 'r'); + pq_sendint64(&reply_message, writePtr); + pq_sendint64(&reply_message, flushPtr); + pq_sendint64(&reply_message, applyPtr); + pq_sendint64(&reply_message, GetCurrentTimestamp()); + pq_sendbyte(&reply_message, requestReply ? 1 : 0); + + /* Send it */ + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s", + LSN_FORMAT_ARGS(writePtr), + LSN_FORMAT_ARGS(flushPtr), + LSN_FORMAT_ARGS(applyPtr), + requestReply ? " (reply requested)" : ""); + + walrcv_send(wrconn, reply_message.data, reply_message.len); +} + +/* + * Send hot standby feedback message to primary, plus the current time, + * in case they don't have a watch. + * + * If the user disables feedback, send one final message to tell sender + * to forget about the xmin on this standby. We also send this message + * on first connect because a previous connection might have set xmin + * on a replication slot. (If we're not using a slot it's harmless to + * send a feedback message explicitly setting InvalidTransactionId). + */ +static void +XLogWalRcvSendHSFeedback(bool immed) +{ + TimestampTz now; + FullTransactionId nextFullXid; + TransactionId nextXid; + uint32 xmin_epoch, + catalog_xmin_epoch; + TransactionId xmin, + catalog_xmin; + static TimestampTz sendTime = 0; + + /* initially true so we always send at least one feedback message */ + static bool primary_has_standby_xmin = true; + + /* + * If the user doesn't want status to be reported to the primary, be sure + * to exit before doing anything at all. + */ + if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) && + !primary_has_standby_xmin) + return; + + /* Get current timestamp. */ + now = GetCurrentTimestamp(); + + if (!immed) + { + /* + * Send feedback at most once per wal_receiver_status_interval. + */ + if (!TimestampDifferenceExceeds(sendTime, now, + wal_receiver_status_interval * 1000)) + return; + sendTime = now; + } + + /* + * If Hot Standby is not yet accepting connections there is nothing to + * send. Check this after the interval has expired to reduce number of + * calls. + * + * Bailing out here also ensures that we don't send feedback until we've + * read our own replication slot state, so we don't tell the primary to + * discard needed xmin or catalog_xmin from any slots that may exist on + * this replica. + */ + if (!HotStandbyActive()) + return; + + /* + * Make the expensive call to get the oldest xmin once we are certain + * everything else has been checked. + */ + if (hot_standby_feedback) + { + GetReplicationHorizons(&xmin, &catalog_xmin); + } + else + { + xmin = InvalidTransactionId; + catalog_xmin = InvalidTransactionId; + } + + /* + * Get epoch and adjust if nextXid and oldestXmin are different sides of + * the epoch boundary. + */ + nextFullXid = ReadNextFullTransactionId(); + nextXid = XidFromFullTransactionId(nextFullXid); + xmin_epoch = EpochFromFullTransactionId(nextFullXid); + catalog_xmin_epoch = xmin_epoch; + if (nextXid < xmin) + xmin_epoch--; + if (nextXid < catalog_xmin) + catalog_xmin_epoch--; + + elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u", + xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch); + + /* Construct the message and send it. */ + resetStringInfo(&reply_message); + pq_sendbyte(&reply_message, 'h'); + pq_sendint64(&reply_message, GetCurrentTimestamp()); + pq_sendint32(&reply_message, xmin); + pq_sendint32(&reply_message, xmin_epoch); + pq_sendint32(&reply_message, catalog_xmin); + pq_sendint32(&reply_message, catalog_xmin_epoch); + walrcv_send(wrconn, reply_message.data, reply_message.len); + if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin)) + primary_has_standby_xmin = true; + else + primary_has_standby_xmin = false; +} + +/* + * Update shared memory status upon receiving a message from primary. + * + * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest + * message, reported by primary. + */ +static void +ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) +{ + WalRcvData *walrcv = WalRcv; + + TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); + + /* Update shared-memory status */ + SpinLockAcquire(&walrcv->mutex); + if (walrcv->latestWalEnd < walEnd) + walrcv->latestWalEndTime = sendTime; + walrcv->latestWalEnd = walEnd; + walrcv->lastMsgSendTime = sendTime; + walrcv->lastMsgReceiptTime = lastMsgReceiptTime; + SpinLockRelease(&walrcv->mutex); + + if (message_level_is_interesting(DEBUG2)) + { + char *sendtime; + char *receipttime; + int applyDelay; + + /* Copy because timestamptz_to_str returns a static buffer */ + sendtime = pstrdup(timestamptz_to_str(sendTime)); + receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime)); + applyDelay = GetReplicationApplyDelay(); + + /* apply delay is not available */ + if (applyDelay == -1) + elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms", + sendtime, + receipttime, + GetReplicationTransferLatency()); + else + elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms", + sendtime, + receipttime, + applyDelay, + GetReplicationTransferLatency()); + + pfree(sendtime); + pfree(receipttime); + } +} + +/* + * Wake up the walreceiver main loop. + * + * This is called by the startup process whenever interesting xlog records + * are applied, so that walreceiver can check if it needs to send an apply + * notification back to the primary which may be waiting in a COMMIT with + * synchronous_commit = remote_apply. + */ +void +WalRcvForceReply(void) +{ + Latch *latch; + + WalRcv->force_reply = true; + /* fetching the latch pointer might not be atomic, so use spinlock */ + SpinLockAcquire(&WalRcv->mutex); + latch = WalRcv->latch; + SpinLockRelease(&WalRcv->mutex); + if (latch) + SetLatch(latch); +} + +/* + * Return a string constant representing the state. This is used + * in system functions and views, and should *not* be translated. + */ +static const char * +WalRcvGetStateString(WalRcvState state) +{ + switch (state) + { + case WALRCV_STOPPED: + return "stopped"; + case WALRCV_STARTING: + return "starting"; + case WALRCV_STREAMING: + return "streaming"; + case WALRCV_WAITING: + return "waiting"; + case WALRCV_RESTARTING: + return "restarting"; + case WALRCV_STOPPING: + return "stopping"; + } + return "UNKNOWN"; +} + +/* + * Returns activity of WAL receiver, including pid, state and xlog locations + * received from the WAL sender of another server. + */ +Datum +pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) +{ + TupleDesc tupdesc; + Datum *values; + bool *nulls; + int pid; + bool ready_to_display; + WalRcvState state; + XLogRecPtr receive_start_lsn; + TimeLineID receive_start_tli; + XLogRecPtr written_lsn; + XLogRecPtr flushed_lsn; + TimeLineID received_tli; + TimestampTz last_send_time; + TimestampTz last_receipt_time; + XLogRecPtr latest_end_lsn; + TimestampTz latest_end_time; + char sender_host[NI_MAXHOST]; + int sender_port = 0; + char slotname[NAMEDATALEN]; + char conninfo[MAXCONNINFO]; + + /* Take a lock to ensure value consistency */ + SpinLockAcquire(&WalRcv->mutex); + pid = (int) WalRcv->pid; + ready_to_display = WalRcv->ready_to_display; + state = WalRcv->walRcvState; + receive_start_lsn = WalRcv->receiveStart; + receive_start_tli = WalRcv->receiveStartTLI; + flushed_lsn = WalRcv->flushedUpto; + received_tli = WalRcv->receivedTLI; + last_send_time = WalRcv->lastMsgSendTime; + last_receipt_time = WalRcv->lastMsgReceiptTime; + latest_end_lsn = WalRcv->latestWalEnd; + latest_end_time = WalRcv->latestWalEndTime; + strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname)); + strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host)); + sender_port = WalRcv->sender_port; + strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo)); + SpinLockRelease(&WalRcv->mutex); + + /* + * No WAL receiver (or not ready yet), just return a tuple with NULL + * values + */ + if (pid == 0 || !ready_to_display) + PG_RETURN_NULL(); + + /* + * Read "writtenUpto" without holding a spinlock. Note that it may not be + * consistent with the other shared variables of the WAL receiver + * protected by a spinlock, but this should not be used for data integrity + * checks. + */ + written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto); + + /* determine result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + values = palloc0(sizeof(Datum) * tupdesc->natts); + nulls = palloc0(sizeof(bool) * tupdesc->natts); + + /* Fetch values */ + values[0] = Int32GetDatum(pid); + + if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS)) + { + /* + * Only superusers and members of pg_read_all_stats can see details. + * Other users only get the pid value to know whether it is a WAL + * receiver, but no details. + */ + MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1)); + } + else + { + values[1] = CStringGetTextDatum(WalRcvGetStateString(state)); + + if (XLogRecPtrIsInvalid(receive_start_lsn)) + nulls[2] = true; + else + values[2] = LSNGetDatum(receive_start_lsn); + values[3] = Int32GetDatum(receive_start_tli); + if (XLogRecPtrIsInvalid(written_lsn)) + nulls[4] = true; + else + values[4] = LSNGetDatum(written_lsn); + if (XLogRecPtrIsInvalid(flushed_lsn)) + nulls[5] = true; + else + values[5] = LSNGetDatum(flushed_lsn); + values[6] = Int32GetDatum(received_tli); + if (last_send_time == 0) + nulls[7] = true; + else + values[7] = TimestampTzGetDatum(last_send_time); + if (last_receipt_time == 0) + nulls[8] = true; + else + values[8] = TimestampTzGetDatum(last_receipt_time); + if (XLogRecPtrIsInvalid(latest_end_lsn)) + nulls[9] = true; + else + values[9] = LSNGetDatum(latest_end_lsn); + if (latest_end_time == 0) + nulls[10] = true; + else + values[10] = TimestampTzGetDatum(latest_end_time); + if (*slotname == '\0') + nulls[11] = true; + else + values[11] = CStringGetTextDatum(slotname); + if (*sender_host == '\0') + nulls[12] = true; + else + values[12] = CStringGetTextDatum(sender_host); + if (sender_port == 0) + nulls[13] = true; + else + values[13] = Int32GetDatum(sender_port); + if (*conninfo == '\0') + nulls[14] = true; + else + values[14] = CStringGetTextDatum(conninfo); + } + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} |