/*------------------------------------------------------------------------- * * walreceiver.c * * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It * is the process in the standby server that takes charge of receiving * XLOG records from a primary server during streaming replication. * * When the startup process determines that it's time to start streaming, * it instructs postmaster to start walreceiver. Walreceiver first connects * to the primary server (it will be served by a walsender process * in the primary server), and then keeps receiving XLOG records and * writing them to the disk as long as the connection is alive. As XLOG * records are received and flushed to disk, it updates the * WalRcv->flushedUpto variable in shared memory, to inform the startup * process of how far it can proceed with XLOG replay. * * A WAL receiver cannot directly load GUC parameters used when establishing * its connection to the primary. Instead it relies on parameter values * that are passed down by the startup process when streaming is requested. * This applies, for example, to the replication slot and the connection * string to be used for the connection with the primary. * * If the primary server ends streaming, but doesn't disconnect, walreceiver * goes into "waiting" mode, and waits for the startup process to give new * instructions. The startup process will treat that the same as * disconnection, and will rescan the archive/pg_wal directory. But when the * startup process wants to try streaming replication again, it will just * nudge the existing walreceiver process that's waiting, instead of launching * a new one. * * Normal termination is by SIGTERM, which instructs the walreceiver to * exit(0). Emergency termination is by SIGQUIT; like any postmaster child * process, the walreceiver will simply abort and exit on SIGQUIT. A close * of the connection and a FATAL error are treated not as a crash but as * normal operation. 
* * This file contains the server-facing parts of walreceiver. The libpq- * specific parts are in the libpqwalreceiver module. It's loaded * dynamically to avoid linking the server with libpq. * * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group * * * IDENTIFICATION * src/backend/replication/walreceiver.c * *------------------------------------------------------------------------- */
#include "postgres.h"

#include <unistd.h>

#include "access/htup_details.h"
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"

/*
 * GUC variables.  (Other variables that affect walreceiver are in xlog.c
 * because they're passed down from the startup process, for better
 * synchronization.)
 */
int			wal_receiver_status_interval;
int			wal_receiver_timeout;
bool		hot_standby_feedback;

/* libpqwalreceiver connection */
static WalReceiverConn *wrconn = NULL;
WalReceiverFunctionsType *WalReceiverFunctions = NULL;

#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */

/* * These variables are used similarly to openLogFile/SegNo, * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID * corresponding the filename of recvFile. 
*/
static int	recvFile = -1;		/* fd of the segment being written; -1 when
								 * no segment is open */
static TimeLineID recvFileTLI = 0;	/* timeline used in recvFile's name */
static XLogSegNo recvSegNo = 0; /* segment number of recvFile */

/*
 * Flags set by interrupt handlers of walreceiver for later service in the
 * main loop.
 */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.
 */
static struct
{
	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
}			LogstreamResult;

/*
 * Message buffers, initialized each time streaming starts.  reply_message
 * is reused for every status-update and hot-standby-feedback message sent
 * to the primary; incoming_message receives a copy of the header of each
 * message being parsed.
 */
static StringInfoData reply_message;
static StringInfoData incoming_message;

/* Prototypes for private functions */
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);

/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);

/* * Process any interrupts the walreceiver process may have received. * This should be called any time the process's latch has become set. * * Currently, only SIGTERM is of interest. We can't just exit(1) within the * SIGTERM signal handler, because the signal might arrive in the middle of * some critical operation, like while we're holding a spinlock. Instead, the * signal handler sets a flag variable as well as setting the process's latch. * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the * latch has become set. 
Operations that could block for a long time, such as * reading from a remote server, must pay attention to the latch too; see * libpqrcv_PQgetResult for example. */
void
ProcessWalRcvInterrupts(void)
{
	/*
	 * Although walreceiver interrupt handling doesn't use the same scheme as
	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
	 * any incoming signals on Win32, and also to make sure we process any
	 * barrier events.
	 */
	CHECK_FOR_INTERRUPTS();

	/* got_SIGTERM is set asynchronously by WalRcvShutdownHandler(). */
	if (got_SIGTERM)
	{
		ereport(FATAL,
				(errcode(ERRCODE_ADMIN_SHUTDOWN),
				 errmsg("terminating walreceiver process due to administrator command")));
	}
}


/*
 * Main entry point for walreceiver process.
 *
 * Outline of what the code below does:
 *
 * 1. Under walrcv->mutex, inspect the requested state, publish our PID and
 *	  latch, switch the state to WALRCV_STREAMING, and copy the connection
 *	  string, slot name and start position out of shared memory.
 * 2. Register WalRcvDie() as a shmem-exit callback, install the signal
 *	  handlers and load the libpqwalreceiver module.
 * 3. Connect to the primary, then loop forever: validate the primary (system
 *	  identifier and timeline), fetch missing timeline history files,
 *	  optionally create a temporary slot, and stream WAL until the primary
 *	  ends the stream.  Afterwards close the current segment and block in
 *	  WalRcvWaitForStartPosition() until a new start position is provided.
 *
 * The outer for(;;) loop contains no break, so this function does not return
 * normally.
 */
void
WalReceiverMain(void)
{
	char		conninfo[MAXCONNINFO];
	char	   *tmp_conninfo;
	char		slotname[NAMEDATALEN];
	bool		is_temp_slot;
	XLogRecPtr	startpoint;
	TimeLineID	startpointTLI;
	TimeLineID	primaryTLI;
	bool		first_stream;
	WalRcvData *walrcv = WalRcv;
	TimestampTz last_recv_timestamp;
	TimestampTz now;
	bool		ping_sent;
	char	   *err;
	char	   *sender_host = NULL;
	int			sender_port = 0;

	/*
	 * WalRcv should be set up already (if we are a backend, we inherit this
	 * by fork() or EXEC_BACKEND mechanism from the postmaster).
	 */
	Assert(walrcv != NULL);

	now = GetCurrentTimestamp();

	/*
	 * Mark walreceiver as running in shared memory.
	 *
	 * Do this as early as possible, so that if we fail later on, we'll set
	 * state to STOPPED. If we die before this, the startup process will keep
	 * waiting for us to start up, until it times out.
	 */
	SpinLockAcquire(&walrcv->mutex);
	Assert(walrcv->pid == 0);
	switch (walrcv->walRcvState)
	{
		case WALRCV_STOPPING:
			/* If we've already been requested to stop, don't start up. */
			walrcv->walRcvState = WALRCV_STOPPED;
			/* fall through */

		case WALRCV_STOPPED:
			SpinLockRelease(&walrcv->mutex);
			proc_exit(1);
			break;

		case WALRCV_STARTING:
			/* The usual case */
			break;

		case WALRCV_WAITING:
		case WALRCV_STREAMING:
		case WALRCV_RESTARTING:
		default:
			/* Shouldn't happen */
			SpinLockRelease(&walrcv->mutex);
			elog(PANIC, "walreceiver still running according to shared memory state");
	}
	/* Advertise our PID so that the startup process can kill us */
	walrcv->pid = MyProcPid;
	walrcv->walRcvState = WALRCV_STREAMING;

	/* Fetch information required to start streaming */
	walrcv->ready_to_display = false;
	strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
	strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
	is_temp_slot = walrcv->is_temp_slot;
	startpoint = walrcv->receiveStart;
	startpointTLI = walrcv->receiveStartTLI;

	/*
	 * At most one of is_temp_slot and slotname can be set; otherwise,
	 * RequestXLogStreaming messed up.
	 */
	Assert(!is_temp_slot || (slotname[0] == '\0'));

	/* Initialise to a sanish value */
	walrcv->lastMsgSendTime =
		walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;

	/* Report the latch to use to awaken this process */
	walrcv->latch = &MyProc->procLatch;

	SpinLockRelease(&walrcv->mutex);

	/* Reset the written-up-to position advertised in shared memory */
	pg_atomic_write_u64(&WalRcv->writtenUpto, 0);

	/* Arrange to clean up at walreceiver exit */
	on_shmem_exit(WalRcvDie, 0);

	/* Properly accept or ignore signals the postmaster might send us */
	pqsignal(SIGHUP, WalRcvSigHupHandler);	/* set flag to read config file */
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, WalRcvShutdownHandler);	/* request shutdown */
	pqsignal(SIGQUIT, SignalHandlerForCrashExit);
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
	pqsignal(SIGUSR2, SIG_IGN);

	/* Reset some signals that are accepted by postmaster but not here */
	pqsignal(SIGCHLD, SIG_DFL);

	/* We allow SIGQUIT (quickdie) at all times */
	sigdelset(&BlockSig, SIGQUIT);

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);
	if (WalReceiverFunctions == NULL)
		elog(ERROR, "libpqwalreceiver didn't initialize correctly");

	/* Unblock signals (they were blocked when the postmaster forked us) */
	PG_SETMASK(&UnBlockSig);

	/* Establish the connection to the primary for XLOG streaming */
	wrconn = walrcv_connect(conninfo, false,
							cluster_name[0] ? cluster_name : "walreceiver",
							&err);
	if (!wrconn)
		ereport(ERROR,
				(errmsg("could not connect to the primary server: %s", err)));

	/*
	 * Save user-visible connection string.  This clobbers the original
	 * conninfo, for security. Also save host and port of the sender server
	 * this walreceiver is connected to.
	 */
	tmp_conninfo = walrcv_get_conninfo(wrconn);
	walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
	SpinLockAcquire(&walrcv->mutex);
	memset(walrcv->conninfo, 0, MAXCONNINFO);
	if (tmp_conninfo)
		strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);

	memset(walrcv->sender_host, 0, NI_MAXHOST);
	if (sender_host)
		strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);

	walrcv->sender_port = sender_port;
	walrcv->ready_to_display = true;
	SpinLockRelease(&walrcv->mutex);

	/* The local copies are no longer needed once stored in shared memory */
	if (tmp_conninfo)
		pfree(tmp_conninfo);

	if (sender_host)
		pfree(sender_host);

	first_stream = true;
	for (;;)
	{
		char	   *primary_sysid;
		char		standby_sysid[32];
		WalRcvStreamOptions options;

		/*
		 * Check that we're connected to a valid server using the
		 * IDENTIFY_SYSTEM replication command.
		 */
		primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);

		/* Compare the identifiers in their decimal string form */
		snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
				 GetSystemIdentifier());
		if (strcmp(primary_sysid, standby_sysid) != 0)
		{
			ereport(ERROR,
					(errmsg("database system identifier differs between the primary and standby"),
					 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
							   primary_sysid, standby_sysid)));
		}

		/*
		 * Confirm that the current timeline of the primary is the same or
		 * ahead of ours.
		 */
		if (primaryTLI < startpointTLI)
			ereport(ERROR,
					(errmsg("highest timeline %u of the primary is behind recovery timeline %u",
							primaryTLI, startpointTLI)));

		/*
		 * Get any missing history files. We do this always, even when we're
		 * not interested in that timeline, so that if we're promoted to
		 * become the master later on, we don't select the same timeline that
		 * was already used in the current master. This isn't bullet-proof -
		 * you'll need some external software to manage your cluster if you
		 * need to ensure that a unique timeline id is chosen in every case,
		 * but let's avoid the confusion of timeline id collisions where we
		 * can.
		 */
		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);

		/*
		 * Create temporary replication slot if requested, and update slot
		 * name in shared memory.  (Note the slot name cannot already be set
		 * in this case.)
		 */
		if (is_temp_slot)
		{
			snprintf(slotname, sizeof(slotname),
					 "pg_walreceiver_%lld",
					 (long long int) walrcv_get_backend_pid(wrconn));

			walrcv_create_slot(wrconn, slotname, true, 0, NULL);

			SpinLockAcquire(&walrcv->mutex);
			strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
			SpinLockRelease(&walrcv->mutex);
		}

		/*
		 * Start streaming.
		 *
		 * We'll try to start at the requested starting point and timeline,
		 * even if it's different from the server's latest timeline. In case
		 * we've already reached the end of the old timeline, the server will
		 * finish the streaming immediately, and we will go back to await
		 * orders from the startup process. If recovery_target_timeline is
		 * 'latest', the startup process will scan pg_wal and find the new
		 * history file, bump recovery target timeline, and ask us to restart
		 * on the new timeline.
		 */
		options.logical = false;
		options.startpoint = startpoint;
		options.slotname = slotname[0] != '\0' ? slotname : NULL;
		options.proto.physical.startpointTLI = startpointTLI;
		ThisTimeLineID = startpointTLI;
		if (walrcv_startstreaming(wrconn, &options))
		{
			if (first_stream)
				ereport(LOG,
						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
								(uint32) (startpoint >> 32), (uint32) startpoint,
								startpointTLI)));
			else
				ereport(LOG,
						(errmsg("restarted WAL streaming at %X/%X on timeline %u",
								(uint32) (startpoint >> 32), (uint32) startpoint,
								startpointTLI)));
			first_stream = false;

			/* Initialize LogstreamResult and buffers for processing messages */
			LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
			initStringInfo(&reply_message);
			initStringInfo(&incoming_message);

			/* Initialize the last recv timestamp */
			last_recv_timestamp = GetCurrentTimestamp();
			ping_sent = false;

			/* Loop until end-of-streaming or error */
			for (;;)
			{
				char	   *buf;
				int			len;
				bool		endofwal = false;
				pgsocket	wait_fd = PGINVALID_SOCKET;
				int			rc;

				/*
				 * Exit walreceiver if we're not in recovery. This should not
				 * happen, but cross-check the status here.
				 */
				if (!RecoveryInProgress())
					ereport(FATAL,
							(errmsg("cannot continue WAL streaming, recovery has already ended")));

				/* Process any requests or signals received recently */
				ProcessWalRcvInterrupts();

				if (got_SIGHUP)
				{
					got_SIGHUP = false;
					ProcessConfigFile(PGC_SIGHUP);
					/* Send feedback right away after a config reload */
					XLogWalRcvSendHSFeedback(true);
				}

				/* See if we can read data immediately */
				len = walrcv_receive(wrconn, &buf, &wait_fd);
				if (len != 0)
				{
					/*
					 * Process the received data, and any subsequent data we
					 * can read without blocking.
					 */
					for (;;)
					{
						if (len > 0)
						{
							/*
							 * Something was received from master, so reset
							 * timeout
							 */
							last_recv_timestamp = GetCurrentTimestamp();
							ping_sent = false;
							/* First byte is the message type; rest is payload */
							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
						}
						else if (len == 0)
							break;
						else if (len < 0)
						{
							ereport(LOG,
									(errmsg("replication terminated by primary server"),
									 errdetail("End of WAL reached on timeline %u at %X/%X.",
											   startpointTLI,
											   (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
							endofwal = true;
							break;
						}
						len = walrcv_receive(wrconn, &buf, &wait_fd);
					}

					/* Let the master know that we received some data. */
					XLogWalRcvSendReply(false, false);

					/*
					 * If we've written some records, flush them to disk and
					 * let the startup process and primary server know about
					 * them.
					 */
					XLogWalRcvFlush(false);
				}

				/* Check if we need to exit the streaming loop. */
				if (endofwal)
					break;

				/*
				 * Ideally we would reuse a WaitEventSet object repeatedly
				 * here to avoid the overheads of WaitLatchOrSocket on epoll
				 * systems, but we can't be sure that libpq (or any other
				 * walreceiver implementation) has the same socket (even if
				 * the fd is the same number, it may have been closed and
				 * reopened since the last time).  In future, if there is a
				 * function for removing sockets from WaitEventSet, then we
				 * could add and remove just the socket each time, potentially
				 * avoiding some system calls.
				 */
				Assert(wait_fd != PGINVALID_SOCKET);
				rc = WaitLatchOrSocket(walrcv->latch,
									   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
									   WL_TIMEOUT | WL_LATCH_SET,
									   wait_fd,
									   NAPTIME_PER_CYCLE,
									   WAIT_EVENT_WAL_RECEIVER_MAIN);
				if (rc & WL_LATCH_SET)
				{
					ResetLatch(walrcv->latch);
					ProcessWalRcvInterrupts();

					if (walrcv->force_reply)
					{
						/*
						 * The recovery process has asked us to send apply
						 * feedback now.  Make sure the flag is really set to
						 * false in shared memory before sending the reply, so
						 * we don't miss a new request for a reply.
						 */
						walrcv->force_reply = false;
						pg_memory_barrier();
						XLogWalRcvSendReply(true, false);
					}
				}
				if (rc & WL_TIMEOUT)
				{
					/*
					 * We didn't receive anything new. If we haven't heard
					 * anything from the server for more than
					 * wal_receiver_timeout / 2, ping the server. Also, if
					 * it's been longer than wal_receiver_status_interval
					 * since the last update we sent, send a status update to
					 * the master anyway, to report any progress in applying
					 * WAL.
					 */
					bool		requestReply = false;

					/*
					 * Check if time since last receive from primary has
					 * reached the configured limit.
					 */
					if (wal_receiver_timeout > 0)
					{
						/*
						 * Note: this block-local 'now' shadows the
						 * function-level variable of the same name.
						 */
						TimestampTz now = GetCurrentTimestamp();
						TimestampTz timeout;

						timeout =
							TimestampTzPlusMilliseconds(last_recv_timestamp,
														wal_receiver_timeout);

						if (now >= timeout)
							ereport(ERROR,
									(errmsg("terminating walreceiver due to timeout")));

						/*
						 * We didn't receive anything new, for half of
						 * receiver replication timeout. Ping the server.
						 */
						if (!ping_sent)
						{
							timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
																  (wal_receiver_timeout / 2));
							if (now >= timeout)
							{
								requestReply = true;
								ping_sent = true;
							}
						}
					}

					/*
					 * When pinging, requestReply is passed as both 'force'
					 * and 'requestReply', so the message is sent
					 * unconditionally and asks the primary to answer.
					 */
					XLogWalRcvSendReply(requestReply, requestReply);
					XLogWalRcvSendHSFeedback(false);
				}
			}

			/*
			 * The backend finished streaming. Exit streaming COPY-mode from
			 * our side, too.
			 */
			walrcv_endstreaming(wrconn, &primaryTLI);

			/*
			 * If the server had switched to a new timeline that we didn't
			 * know about when we began streaming, fetch its timeline history
			 * file now.
			 */
			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
		}
		else
			ereport(LOG,
					(errmsg("primary server contains no more WAL on requested timeline %u",
							startpointTLI)));

		/*
		 * End of WAL reached on the requested timeline. Close the last
		 * segment, and await for new orders from the startup process.
		 */
		if (recvFile >= 0)
		{
			char		xlogfname[MAXFNAMELEN];

			XLogWalRcvFlush(false);
			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
			if (close(recvFile) != 0)
				ereport(PANIC,
						(errcode_for_file_access(),
						 errmsg("could not close log segment %s: %m",
								xlogfname)));

			/*
			 * Create .done file forcibly to prevent the streamed segment from
			 * being archived later.
			 */
			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
				XLogArchiveForceDone(xlogfname);
			else
				XLogArchiveNotify(xlogfname);
		}
		recvFile = -1;

		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
	}
	/* not reached */
}

/*
 * Wait for startup process to set receiveStart and receiveStartTLI.
 *
 * Switches the shared state from WALRCV_STREAMING to WALRCV_WAITING, wakes
 * the startup process, and then sleeps on the latch until the shared state
 * becomes WALRCV_RESTARTING.  At that point the new start position is copied
 * into *startpoint and *startpointTLI and the state is set back to
 * WALRCV_STREAMING.
 *
 * The process exits from here instead of returning if the state is
 * WALRCV_STOPPING (on entry or while waiting), or if the state on entry is
 * anything other than WALRCV_STREAMING.
 */
static void
WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
{
	WalRcvData *walrcv = WalRcv;
	int			state;

	SpinLockAcquire(&walrcv->mutex);
	state = walrcv->walRcvState;
	if (state != WALRCV_STREAMING)
	{
		SpinLockRelease(&walrcv->mutex);
		if (state == WALRCV_STOPPING)
			proc_exit(0);
		else
			elog(FATAL, "unexpected walreceiver state");
	}
	walrcv->walRcvState = WALRCV_WAITING;
	walrcv->receiveStart = InvalidXLogRecPtr;
	walrcv->receiveStartTLI = 0;
	SpinLockRelease(&walrcv->mutex);

	set_ps_display("idle");

	/*
	 * nudge startup process to notice that we've stopped streaming and are
	 * now waiting for instructions.
	 */
	WakeupRecovery();
	for (;;)
	{
		/* Reset the latch before checking state, so no wakeup is missed */
		ResetLatch(walrcv->latch);

		ProcessWalRcvInterrupts();

		SpinLockAcquire(&walrcv->mutex);
		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
			   walrcv->walRcvState == WALRCV_WAITING ||
			   walrcv->walRcvState == WALRCV_STOPPING);
		if (walrcv->walRcvState == WALRCV_RESTARTING)
		{
			/*
			 * No need to handle changes in primary_conninfo or
			 * primary_slotname here. Startup process will signal us to
			 * terminate in case those change.
			 */
			*startpoint = walrcv->receiveStart;
			*startpointTLI = walrcv->receiveStartTLI;
			walrcv->walRcvState = WALRCV_STREAMING;
			SpinLockRelease(&walrcv->mutex);
			break;
		}
		if (walrcv->walRcvState == WALRCV_STOPPING)
		{
			/*
			 * We should've received SIGTERM if the startup process wants us
			 * to die, but might as well check it here too.
			 */
			SpinLockRelease(&walrcv->mutex);
			exit(1);
		}
		SpinLockRelease(&walrcv->mutex);

		/* Still WALRCV_WAITING: sleep (no timeout) until the latch is set */
		(void) WaitLatch(walrcv->latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
						 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
	}

	if (update_process_title)
	{
		char		activitymsg[50];

		snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
				 (uint32) (*startpoint >> 32),
				 (uint32) *startpoint);
		set_ps_display(activitymsg);
	}
}

/*
 * Fetch any missing timeline history files between 'first' and 'last'
 * (inclusive) from the server.
 *
 * Timeline 1 is skipped, as are timelines whose history file already exists
 * locally.  An ERROR is raised if the primary reports a file name other than
 * the one computed locally for that timeline.
 */
static void
WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
{
	TimeLineID	tli;

	for (tli = first; tli <= last; tli++)
	{
		/* there's no history file for timeline 1 */
		if (tli != 1 && !existsTimeLineHistory(tli))
		{
			char	   *fname;
			char	   *content;
			int			len;
			char		expectedfname[MAXFNAMELEN];

			ereport(LOG,
					(errmsg("fetching timeline history file for timeline %u from primary server",
							tli)));

			walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);

			/*
			 * Check that the filename on the master matches what we
			 * calculated ourselves. This is just a sanity check, it should
			 * always match.
			 */
			TLHistoryFileName(expectedfname, tli);
			if (strcmp(fname, expectedfname) != 0)
				ereport(ERROR,
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
										 tli)));

			/*
			 * Write the file to pg_wal.
			 */
			writeTimeLineHistoryFile(tli, content, len);

			/*
			 * Mark the streamed history file as ready for archiving if
			 * archive_mode is always; in every other mode, force it to be
			 * treated as already done instead.
			 */
			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
				XLogArchiveForceDone(fname);
			else
				XLogArchiveNotify(fname);

			pfree(fname);
			pfree(content);
		}
	}
}

/*
 * Mark us as STOPPED in shared memory at exit.
 *
 * Registered with on_shmem_exit() by WalReceiverMain().  Neither 'code' nor
 * 'arg' is used.
 */
static void
WalRcvDie(int code, Datum arg)
{
	WalRcvData *walrcv = WalRcv;

	/* Ensure that all WAL records received are flushed to disk */
	XLogWalRcvFlush(true);

	/* Mark ourselves inactive in shared memory */
	SpinLockAcquire(&walrcv->mutex);
	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
		   walrcv->walRcvState == WALRCV_RESTARTING ||
		   walrcv->walRcvState == WALRCV_STARTING ||
		   walrcv->walRcvState == WALRCV_WAITING ||
		   walrcv->walRcvState == WALRCV_STOPPING);
	Assert(walrcv->pid == MyProcPid);
	walrcv->walRcvState = WALRCV_STOPPED;
	walrcv->pid = 0;
	walrcv->ready_to_display = false;
	walrcv->latch = NULL;
	SpinLockRelease(&walrcv->mutex);

	/* Terminate the connection gracefully. */
	if (wrconn != NULL)
		walrcv_disconnect(wrconn);

	/* Wake up the startup process to notice promptly that we're gone */
	WakeupRecovery();
}

/*
 * SIGHUP: set flag to re-read config file at next convenient time.
 *
 * Only the flag is set here; the streaming loop in WalReceiverMain() checks
 * it and calls ProcessConfigFile().
 */
static void
WalRcvSigHupHandler(SIGNAL_ARGS)
{
	got_SIGHUP = true;
}


/*
 * SIGTERM: set flag for ProcessWalRcvInterrupts, and set the process latch
 * (if one has been advertised) so that a waiting main loop wakes up.
 */
static void
WalRcvShutdownHandler(SIGNAL_ARGS)
{
	/* Preserve errno for the code that was interrupted */
	int			save_errno = errno;

	got_SIGTERM = true;

	if (WalRcv->latch)
		SetLatch(WalRcv->latch);

	errno = save_errno;
}

/* * Accept the message from XLOG stream, and process it. 
*/ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) { int hdrlen; XLogRecPtr dataStart; XLogRecPtr walEnd; TimestampTz sendTime; bool replyRequested; resetStringInfo(&incoming_message); switch (type) { case 'w': /* WAL records */ { /* copy message to StringInfo */ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); if (len < hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid WAL message received from primary"))); appendBinaryStringInfo(&incoming_message, buf, hdrlen); /* read the fields */ dataStart = pq_getmsgint64(&incoming_message); walEnd = pq_getmsgint64(&incoming_message); sendTime = pq_getmsgint64(&incoming_message); ProcessWalSndrMessage(walEnd, sendTime); buf += hdrlen; len -= hdrlen; XLogWalRcvWrite(buf, len, dataStart); break; } case 'k': /* Keepalive */ { /* copy message to StringInfo */ hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid keepalive message received from primary"))); appendBinaryStringInfo(&incoming_message, buf, hdrlen); /* read the fields */ walEnd = pq_getmsgint64(&incoming_message); sendTime = pq_getmsgint64(&incoming_message); replyRequested = pq_getmsgbyte(&incoming_message); ProcessWalSndrMessage(walEnd, sendTime); /* If the primary requested a reply, send one immediately */ if (replyRequested) XLogWalRcvSendReply(true, false); break; } default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid replication message type %d", type))); } } /* * Write XLOG data to disk. */ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { int startoff; int byteswritten; while (nbytes > 0) { int segbytes; if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) { bool use_existent; /* * fsync() and close current file before we switch to next one. 
We * would otherwise have to reopen this file to fsync it later */ if (recvFile >= 0) { char xlogfname[MAXFNAMELEN]; XLogWalRcvFlush(false); XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); /* * XLOG segment files will be re-read by recovery in startup * process soon, so we don't advise the OS to release cache * pages associated with the file like XLogFileClose() does. */ if (close(recvFile) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log segment %s: %m", xlogfname))); /* * Create .done file forcibly to prevent the streamed segment * from being archived later. */ if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) XLogArchiveForceDone(xlogfname); else XLogArchiveNotify(xlogfname); } recvFile = -1; /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo, wal_segment_size); use_existent = true; recvFile = XLogFileInit(recvSegNo, &use_existent, true); recvFileTLI = ThisTimeLineID; } /* Calculate the start offset of the received logs */ startoff = XLogSegmentOffset(recptr, wal_segment_size); if (startoff + nbytes > wal_segment_size) segbytes = wal_segment_size - startoff; else segbytes = nbytes; /* OK to write the logs */ errno = 0; byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff); if (byteswritten <= 0) { char xlogfname[MAXFNAMELEN]; int save_errno; /* if write didn't set errno, assume no disk space */ if (errno == 0) errno = ENOSPC; save_errno = errno; XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); errno = save_errno; ereport(PANIC, (errcode_for_file_access(), errmsg("could not write to log segment %s " "at offset %u, length %lu: %m", xlogfname, startoff, (unsigned long) segbytes))); } /* Update state for write */ recptr += byteswritten; nbytes -= byteswritten; buf += byteswritten; LogstreamResult.Write = recptr; } /* Update shared-memory status */ pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); } /* * Flush the log to disk. 
* * If we're in the midst of dying, it's unwise to do anything that might throw * an error, so we skip sending a reply in that case. */ static void XLogWalRcvFlush(bool dying) { if (LogstreamResult.Flush < LogstreamResult.Write) { WalRcvData *walrcv = WalRcv; issue_xlog_fsync(recvFile, recvSegNo); LogstreamResult.Flush = LogstreamResult.Write; /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); if (walrcv->flushedUpto < LogstreamResult.Flush) { walrcv->latestChunkStart = walrcv->flushedUpto; walrcv->flushedUpto = LogstreamResult.Flush; walrcv->receivedTLI = ThisTimeLineID; } SpinLockRelease(&walrcv->mutex); /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); if (AllowCascadeReplication()) WalSndWakeup(); /* Report XLOG streaming progress in PS display */ if (update_process_title) { char activitymsg[50]; snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write); set_ps_display(activitymsg); } /* Also let the master know that we made some progress */ if (!dying) { XLogWalRcvSendReply(false, false); XLogWalRcvSendHSFeedback(false); } } } /* * Send reply message to primary, indicating our current WAL locations, oldest * xmin and the current time. * * If 'force' is not set, the message is only sent if enough time has * passed since last status update to reach wal_receiver_status_interval. * If wal_receiver_status_interval is disabled altogether and 'force' is * false, this is a no-op. * * If 'requestReply' is true, requests the server to reply immediately upon * receiving this message. This is used for heartbeats, when approaching * wal_receiver_timeout. 
*/
static void
XLogWalRcvSendReply(bool force, bool requestReply)
{
	/* Positions and time of the most recently sent status message */
	static XLogRecPtr last_write = 0;
	static XLogRecPtr last_flush = 0;
	static TimestampTz last_send = 0;

	XLogRecPtr	apply_pos;
	TimestampTz current_time;

	/*
	 * With status reporting disabled, an unforced call must return before
	 * doing any work at all.
	 */
	if (!force)
	{
		if (wal_receiver_status_interval <= 0)
			return;
	}

	current_time = GetCurrentTimestamp();

	/*
	 * The write and flush positions can be compared with the last message
	 * without any lock, whereas reading the apply position needs a spinlock.
	 * So an unforced call is skipped while neither position has moved and
	 * the status interval has not yet elapsed.  The primary may therefore
	 * see a slightly stale apply position on an idle system, which is
	 * acceptable for a value used only for reporting.
	 */
	if (!force)
	{
		bool		unchanged;

		unchanged = (last_write == LogstreamResult.Write &&
					 last_flush == LogstreamResult.Flush);
		if (unchanged &&
			!TimestampDifferenceExceeds(last_send, current_time,
										wal_receiver_status_interval * 1000))
			return;
	}

	/* Remember what is about to be reported */
	last_send = current_time;
	last_write = LogstreamResult.Write;
	last_flush = LogstreamResult.Flush;
	apply_pos = GetXLogReplayRecPtr(NULL);

	/* Build the 'r' message: write, flush, apply, timestamp, reply flag */
	resetStringInfo(&reply_message);
	pq_sendbyte(&reply_message, 'r');
	pq_sendint64(&reply_message, last_write);
	pq_sendint64(&reply_message, last_flush);
	pq_sendint64(&reply_message, apply_pos);
	pq_sendint64(&reply_message, GetCurrentTimestamp());
	pq_sendbyte(&reply_message, requestReply ? 1 : 0);

	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
		 (uint32) (last_write >> 32), (uint32) last_write,
		 (uint32) (last_flush >> 32), (uint32) last_flush,
		 (uint32) (apply_pos >> 32), (uint32) apply_pos,
		 requestReply ? " (reply requested)" : "");

	/* Hand the finished message to the connection */
	walrcv_send(wrconn, reply_message.data, reply_message.len);
}

/* * Send hot standby feedback message to primary, plus the current time, * in case they don't have a watch. 
* * If the user disables feedback, send one final message to tell sender * to forget about the xmin on this standby. We also send this message * on first connect because a previous connection might have set xmin * on a replication slot. (If we're not using a slot it's harmless to * send a feedback message explicitly setting InvalidTransactionId). */
static void
XLogWalRcvSendHSFeedback(bool immed)
{
	/* Time of the last feedback that passed the rate limit below */
	static TimestampTz last_feedback_time = 0;

	/* Starts out true so that at least one feedback message is always sent */
	static bool primary_has_xmin = true;

	TimestampTz current_time;
	FullTransactionId next_full_xid;
	TransactionId next_xid;
	TransactionId xmin = InvalidTransactionId;
	TransactionId catalog_xmin = InvalidTransactionId;
	uint32		xmin_epoch;
	uint32		catalog_xmin_epoch;
	bool		feedback_enabled;

	/*
	 * With feedback disabled (by either setting) and no xmin left on the
	 * primary to retract, return before doing any work.
	 */
	feedback_enabled = (wal_receiver_status_interval > 0 &&
						hot_standby_feedback);
	if (!feedback_enabled && !primary_has_xmin)
		return;

	current_time = GetCurrentTimestamp();

	/* Unless asked to send immediately, send at most once per interval */
	if (!immed)
	{
		if (!TimestampDifferenceExceeds(last_feedback_time, current_time,
										wal_receiver_status_interval * 1000))
			return;
		last_feedback_time = current_time;
	}

	/*
	 * Until Hot Standby accepts connections there is nothing to report.
	 * Testing this only after the interval check keeps the number of calls
	 * down.  Returning here also means no feedback goes out before our own
	 * replication slot state has been read, so the primary is never told to
	 * discard an xmin or catalog_xmin that a slot on this replica needs.
	 */
	if (!HotStandbyActive())
		return;

	/*
	 * Compute the real xmins only when feedback is on; otherwise both stay
	 * invalid, which tells the primary to forget them.  The expensive
	 * lookup is deliberately left until every cheaper check has passed.
	 */
	if (hot_standby_feedback)
	{
		TransactionId slot_xmin;

		/*
		 * GetOldestXmin() normally folds in both the slots' xmin and their
		 * catalog_xmin; we want them separately, so ask for an xmin that
		 * ignores the slots and fetch the slot values on their own.
		 */
		xmin = GetOldestXmin(NULL,
							 PROCARRAY_FLAGS_DEFAULT | PROCARRAY_SLOTS_XMIN);
		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);

		/* Use the slots' xmin when it is valid and older */
		if (TransactionIdIsValid(slot_xmin) &&
			TransactionIdPrecedes(slot_xmin, xmin))
			xmin = slot_xmin;
	}

	/*
	 * Both epochs start as that of the next XID; each is stepped back by
	 * one when its xmin lies on the other side of the epoch boundary.
	 */
	next_full_xid = ReadNextFullTransactionId();
	next_xid = XidFromFullTransactionId(next_full_xid);
	xmin_epoch = EpochFromFullTransactionId(next_full_xid);
	catalog_xmin_epoch = xmin_epoch;
	if (next_xid < xmin)
		xmin_epoch--;
	if (next_xid < catalog_xmin)
		catalog_xmin_epoch--;

	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);

	/* Build the 'h' message and send it */
	resetStringInfo(&reply_message);
	pq_sendbyte(&reply_message, 'h');
	pq_sendint64(&reply_message, GetCurrentTimestamp());
	pq_sendint32(&reply_message, xmin);
	pq_sendint32(&reply_message, xmin_epoch);
	pq_sendint32(&reply_message, catalog_xmin);
	pq_sendint32(&reply_message, catalog_xmin_epoch);
	walrcv_send(wrconn, reply_message.data, reply_message.len);

	/* Remember whether the primary now holds a valid xmin from us */
	primary_has_xmin = (TransactionIdIsValid(xmin) ||
						TransactionIdIsValid(catalog_xmin));
}

/* * Update shared memory status upon receiving a message from primary. * * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest * message, reported by primary. 
*/ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) { WalRcvData *walrcv = WalRcv; TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); if (walrcv->latestWalEnd < walEnd) walrcv->latestWalEndTime = sendTime; walrcv->latestWalEnd = walEnd; walrcv->lastMsgSendTime = sendTime; walrcv->lastMsgReceiptTime = lastMsgReceiptTime; SpinLockRelease(&walrcv->mutex); if (log_min_messages <= DEBUG2) { char *sendtime; char *receipttime; int applyDelay; /* Copy because timestamptz_to_str returns a static buffer */ sendtime = pstrdup(timestamptz_to_str(sendTime)); receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime)); applyDelay = GetReplicationApplyDelay(); /* apply delay is not available */ if (applyDelay == -1) elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms", sendtime, receipttime, GetReplicationTransferLatency()); else elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms", sendtime, receipttime, applyDelay, GetReplicationTransferLatency()); pfree(sendtime); pfree(receipttime); } } /* * Wake up the walreceiver main loop. * * This is called by the startup process whenever interesting xlog records * are applied, so that walreceiver can check if it needs to send an apply * notification back to the master which may be waiting in a COMMIT with * synchronous_commit = remote_apply. */ void WalRcvForceReply(void) { Latch *latch; WalRcv->force_reply = true; /* fetching the latch pointer might not be atomic, so use spinlock */ SpinLockAcquire(&WalRcv->mutex); latch = WalRcv->latch; SpinLockRelease(&WalRcv->mutex); if (latch) SetLatch(latch); } /* * Return a string constant representing the state. This is used * in system functions and views, and should *not* be translated. 
*/ static const char * WalRcvGetStateString(WalRcvState state) { switch (state) { case WALRCV_STOPPED: return "stopped"; case WALRCV_STARTING: return "starting"; case WALRCV_STREAMING: return "streaming"; case WALRCV_WAITING: return "waiting"; case WALRCV_RESTARTING: return "restarting"; case WALRCV_STOPPING: return "stopping"; } return "UNKNOWN"; } /* * Returns activity of WAL receiver, including pid, state and xlog locations * received from the WAL sender of another server. */ Datum pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) { TupleDesc tupdesc; Datum *values; bool *nulls; int pid; bool ready_to_display; WalRcvState state; XLogRecPtr receive_start_lsn; TimeLineID receive_start_tli; XLogRecPtr written_lsn; XLogRecPtr flushed_lsn; TimeLineID received_tli; TimestampTz last_send_time; TimestampTz last_receipt_time; XLogRecPtr latest_end_lsn; TimestampTz latest_end_time; char sender_host[NI_MAXHOST]; int sender_port = 0; char slotname[NAMEDATALEN]; char conninfo[MAXCONNINFO]; /* Take a lock to ensure value consistency */ SpinLockAcquire(&WalRcv->mutex); pid = (int) WalRcv->pid; ready_to_display = WalRcv->ready_to_display; state = WalRcv->walRcvState; receive_start_lsn = WalRcv->receiveStart; receive_start_tli = WalRcv->receiveStartTLI; flushed_lsn = WalRcv->flushedUpto; received_tli = WalRcv->receivedTLI; last_send_time = WalRcv->lastMsgSendTime; last_receipt_time = WalRcv->lastMsgReceiptTime; latest_end_lsn = WalRcv->latestWalEnd; latest_end_time = WalRcv->latestWalEndTime; strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname)); strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host)); sender_port = WalRcv->sender_port; strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo)); SpinLockRelease(&WalRcv->mutex); /* * No WAL receiver (or not ready yet), just return a tuple with NULL * values */ if (pid == 0 || !ready_to_display) PG_RETURN_NULL(); /* * Read "writtenUpto" without holding a spinlock. 
Note that it may not be * consistent with the other shared variables of the WAL receiver * protected by a spinlock, but this should not be used for data integrity * checks. */ written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto); /* determine result type */ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); values = palloc0(sizeof(Datum) * tupdesc->natts); nulls = palloc0(sizeof(bool) * tupdesc->natts); /* Fetch values */ values[0] = Int32GetDatum(pid); if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS)) { /* * Only superusers and members of pg_read_all_stats can see details. * Other users only get the pid value to know whether it is a WAL * receiver, but no details. */ MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1)); } else { values[1] = CStringGetTextDatum(WalRcvGetStateString(state)); if (XLogRecPtrIsInvalid(receive_start_lsn)) nulls[2] = true; else values[2] = LSNGetDatum(receive_start_lsn); values[3] = Int32GetDatum(receive_start_tli); if (XLogRecPtrIsInvalid(written_lsn)) nulls[4] = true; else values[4] = LSNGetDatum(written_lsn); if (XLogRecPtrIsInvalid(flushed_lsn)) nulls[5] = true; else values[5] = LSNGetDatum(flushed_lsn); values[6] = Int32GetDatum(received_tli); if (last_send_time == 0) nulls[7] = true; else values[7] = TimestampTzGetDatum(last_send_time); if (last_receipt_time == 0) nulls[8] = true; else values[8] = TimestampTzGetDatum(last_receipt_time); if (XLogRecPtrIsInvalid(latest_end_lsn)) nulls[9] = true; else values[9] = LSNGetDatum(latest_end_lsn); if (latest_end_time == 0) nulls[10] = true; else values[10] = TimestampTzGetDatum(latest_end_time); if (*slotname == '\0') nulls[11] = true; else values[11] = CStringGetTextDatum(slotname); if (*sender_host == '\0') nulls[12] = true; else values[12] = CStringGetTextDatum(sender_host); if (sender_port == 0) nulls[13] = true; else values[13] = Int32GetDatum(sender_port); if (*conninfo == '\0') nulls[14] = 
true; else values[14] = CStringGetTextDatum(conninfo); } /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); }