diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-16 19:46:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-16 19:46:48 +0000 |
commit | 311bcfc6b3acdd6fd152798c7f287ddf74fa2a98 (patch) | |
tree | 0ec307299b1dada3701e42f4ca6eda57d708261e /src/bin/pg_basebackup/pg_recvlogical.c | |
parent | Initial commit. (diff) | |
download | postgresql-15-311bcfc6b3acdd6fd152798c7f287ddf74fa2a98.tar.xz postgresql-15-311bcfc6b3acdd6fd152798c7f287ddf74fa2a98.zip |
Adding upstream version 15.4.upstream/15.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/bin/pg_basebackup/pg_recvlogical.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_recvlogical.c | 1039 |
1 files changed, 1039 insertions, 0 deletions
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c new file mode 100644 index 0000000..02b8e27 --- /dev/null +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -0,0 +1,1039 @@ +/*------------------------------------------------------------------------- + * + * pg_recvlogical.c - receive data from a logical decoding slot in a streaming + * fashion and write it to a local file. + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/pg_recvlogical.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <dirent.h> +#include <limits.h> +#include <sys/stat.h> +#include <unistd.h> +#ifdef HAVE_SYS_SELECT_H +#include <sys/select.h> +#endif + +#include "access/xlog_internal.h" +#include "common/fe_memutils.h" +#include "common/file_perm.h" +#include "common/logging.h" +#include "fe_utils/option_utils.h" +#include "getopt_long.h" +#include "libpq-fe.h" +#include "libpq/pqsignal.h" +#include "pqexpbuffer.h" +#include "streamutil.h" + +/* Time to sleep between reconnection attempts */ +#define RECONNECT_SLEEP_TIME 5 + +/* Global Options */ +static char *outfile = NULL; +static int verbose = 0; +static bool two_phase = false; +static int noloop = 0; +static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ +static int fsync_interval = 10 * 1000; /* 10 sec = default */ +static XLogRecPtr startpos = InvalidXLogRecPtr; +static XLogRecPtr endpos = InvalidXLogRecPtr; +static bool do_create_slot = false; +static bool slot_exists_ok = false; +static bool do_start_slot = false; +static bool do_drop_slot = false; +static char *replication_slot = NULL; + +/* filled pairwise with option, value. value may be NULL */ +static char **options; +static size_t noptions = 0; +static const char *plugin = "test_decoding"; + +/* Global State */ +static int outfd = -1; +static volatile sig_atomic_t time_to_abort = false; +static volatile sig_atomic_t output_reopen = false; +static bool output_isfile; +static TimestampTz output_last_fsync = -1; +static bool output_needs_fsync = false; +static XLogRecPtr output_written_lsn = InvalidXLogRecPtr; +static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; + +static void usage(void); +static void StreamLogicalLog(void); +static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now); +static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, + bool keepalive, XLogRecPtr lsn); + +static void +usage(void) +{ + printf(_("%s controls PostgreSQL logical decoding streams.\n\n"), + progname); + printf(_("Usage:\n")); + printf(_(" %s [OPTION]...\n"), progname); + printf(_("\nAction to be performed:\n")); + printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n")); + printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n")); + printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n")); + printf(_("\nOptions:\n")); + printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n")); + printf(_(" -f, --file=FILE receive log into this file, - for stdout\n")); + printf(_(" -F --fsync-interval=SECS\n" + " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000)); + printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n")); + printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n")); + printf(_(" -n, --no-loop do not loop on connection lost\n")); + printf(_(" -o, --option=NAME[=VALUE]\n" + " pass option NAME with optional value VALUE to the\n" + " output plugin\n")); + printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin); + printf(_(" -s, --status-interval=SECS\n" + " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); + printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n")); + printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n")); + printf(_(" -v, --verbose output verbose messages\n")); + printf(_(" -V, --version output version information, then exit\n")); + printf(_(" -?, --help show this help, then exit\n")); + printf(_("\nConnection options:\n")); + printf(_(" -d, --dbname=DBNAME database to connect to\n")); + printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); + printf(_(" -p, --port=PORT database server port number\n")); + printf(_(" -U, --username=NAME connect as specified database user\n")); + printf(_(" -w, --no-password never prompt for password\n")); + printf(_(" -W, --password force password prompt (should happen automatically)\n")); + printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT); + printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL); +} + +/* + * Send a Standby Status Update message to server. + */ +static bool +sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested) +{ + static XLogRecPtr last_written_lsn = InvalidXLogRecPtr; + static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr; + + char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + int len = 0; + + /* + * we normally don't want to send superfluous feedback, but if it's + * because of a timeout we need to, otherwise wal_sender_timeout will kill + * us. + */ + if (!force && + last_written_lsn == output_written_lsn && + last_fsync_lsn == output_fsync_lsn) + return true; + + if (verbose) + pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)", + LSN_FORMAT_ARGS(output_written_lsn), + LSN_FORMAT_ARGS(output_fsync_lsn), + replication_slot); + + replybuf[len] = 'r'; + len += 1; + fe_sendint64(output_written_lsn, &replybuf[len]); /* write */ + len += 8; + fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */ + len += 8; + fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ + len += 8; + fe_sendint64(now, &replybuf[len]); /* sendTime */ + len += 8; + replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ + len += 1; + + startpos = output_written_lsn; + last_written_lsn = output_written_lsn; + last_fsync_lsn = output_fsync_lsn; + + if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) + { + pg_log_error("could not send feedback packet: %s", + PQerrorMessage(conn)); + return false; + } + + return true; +} + +static void +disconnect_atexit(void) +{ + if (conn != NULL) + PQfinish(conn); +} + +static bool +OutputFsync(TimestampTz now) +{ + output_last_fsync = now; + + output_fsync_lsn = output_written_lsn; + + if (fsync_interval <= 0) + return true; + + if (!output_needs_fsync) + return true; + + output_needs_fsync = false; + + /* can only fsync if it's a regular file */ + if (!output_isfile) + return true; + + if (fsync(outfd) != 0) + pg_fatal("could not fsync file \"%s\": %m", outfile); + + return true; +} + +/* + * Start the log streaming + */ +static void +StreamLogicalLog(void) +{ + PGresult *res; + char *copybuf = NULL; + TimestampTz last_status = -1; + int i; + PQExpBuffer query; + + output_written_lsn = InvalidXLogRecPtr; + output_fsync_lsn = InvalidXLogRecPtr; + + /* + * Connect in replication mode to the server + */ + if (!conn) + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + return; + + /* + * Start the replication + */ + if (verbose) + pg_log_info("starting log streaming at %X/%X (slot %s)", + LSN_FORMAT_ARGS(startpos), + replication_slot); + + /* Initiate the replication stream at specified location */ + query = createPQExpBuffer(); + appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X", + replication_slot, LSN_FORMAT_ARGS(startpos)); + + /* print options if there are any */ + if (noptions) + appendPQExpBufferStr(query, " ("); + + for (i = 0; i < noptions; i++) + { + /* separator */ + if (i > 0) + appendPQExpBufferStr(query, ", "); + + /* write option name */ + appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]); + + /* write option value if specified */ + if (options[(i * 2) + 1] != NULL) + appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]); + } + + if (noptions) + appendPQExpBufferChar(query, ')'); + + res = PQexec(conn, query->data); + if (PQresultStatus(res) != PGRES_COPY_BOTH) + { + pg_log_error("could not send replication command \"%s\": %s", + query->data, PQresultErrorMessage(res)); + PQclear(res); + goto error; + } + PQclear(res); + resetPQExpBuffer(query); + + if (verbose) + pg_log_info("streaming initiated"); + + while (!time_to_abort) + { + int r; + int bytes_left; + int bytes_written; + TimestampTz now; + int hdr_len; + XLogRecPtr cur_record_lsn = InvalidXLogRecPtr; + + if (copybuf != NULL) + { + PQfreemem(copybuf); + copybuf = NULL; + } + + /* + * Potentially send a status message to the primary. + */ + now = feGetCurrentTimestamp(); + + if (outfd != -1 && + feTimestampDifferenceExceeds(output_last_fsync, now, + fsync_interval)) + { + if (!OutputFsync(now)) + goto error; + } + + if (standby_message_timeout > 0 && + feTimestampDifferenceExceeds(last_status, now, + standby_message_timeout)) + { + /* Time to send feedback! */ + if (!sendFeedback(conn, now, true, false)) + goto error; + + last_status = now; + } + + /* got SIGHUP, close output file */ + if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0) + { + now = feGetCurrentTimestamp(); + if (!OutputFsync(now)) + goto error; + close(outfd); + outfd = -1; + } + output_reopen = false; + + /* open the output file, if not open yet */ + if (outfd == -1) + { + struct stat statbuf; + + if (strcmp(outfile, "-") == 0) + outfd = fileno(stdout); + else + outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (outfd == -1) + { + pg_log_error("could not open log file \"%s\": %m", outfile); + goto error; + } + + if (fstat(outfd, &statbuf) != 0) + { + pg_log_error("could not stat file \"%s\": %m", outfile); + goto error; + } + + output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd); + } + + r = PQgetCopyData(conn, ©buf, 1); + if (r == 0) + { + /* + * In async mode, and no data available. We block on reading but + * not more than the specified timeout, so that we can send a + * response back to the client. + */ + fd_set input_mask; + TimestampTz message_target = 0; + TimestampTz fsync_target = 0; + struct timeval timeout; + struct timeval *timeoutptr = NULL; + + if (PQsocket(conn) < 0) + { + pg_log_error("invalid socket: %s", PQerrorMessage(conn)); + goto error; + } + + FD_ZERO(&input_mask); + FD_SET(PQsocket(conn), &input_mask); + + /* Compute when we need to wakeup to send a keepalive message. */ + if (standby_message_timeout) + message_target = last_status + (standby_message_timeout - 1) * + ((int64) 1000); + + /* Compute when we need to wakeup to fsync the output file. */ + if (fsync_interval > 0 && output_needs_fsync) + fsync_target = output_last_fsync + (fsync_interval - 1) * + ((int64) 1000); + + /* Now compute when to wakeup. */ + if (message_target > 0 || fsync_target > 0) + { + TimestampTz targettime; + long secs; + int usecs; + + targettime = message_target; + + if (fsync_target > 0 && fsync_target < targettime) + targettime = fsync_target; + + feTimestampDifference(now, + targettime, + &secs, + &usecs); + if (secs <= 0) + timeout.tv_sec = 1; /* Always sleep at least 1 sec */ + else + timeout.tv_sec = secs; + timeout.tv_usec = usecs; + timeoutptr = &timeout; + } + + r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); + if (r == 0 || (r < 0 && errno == EINTR)) + { + /* + * Got a timeout or signal. Continue the loop and either + * deliver a status packet to the server or just go back into + * blocking. + */ + continue; + } + else if (r < 0) + { + pg_log_error("%s() failed: %m", "select"); + goto error; + } + + /* Else there is actually data on the socket */ + if (PQconsumeInput(conn) == 0) + { + pg_log_error("could not receive data from WAL stream: %s", + PQerrorMessage(conn)); + goto error; + } + continue; + } + + /* End of copy stream */ + if (r == -1) + break; + + /* Failure while reading the copy stream */ + if (r == -2) + { + pg_log_error("could not read COPY data: %s", + PQerrorMessage(conn)); + goto error; + } + + /* Check the message type. */ + if (copybuf[0] == 'k') + { + int pos; + bool replyRequested; + XLogRecPtr walEnd; + bool endposReached = false; + + /* + * Parse the keepalive message, enclosed in the CopyData message. + * We just check if the server requested a reply, and ignore the + * rest. + */ + pos = 1; /* skip msgtype 'k' */ + walEnd = fe_recvint64(©buf[pos]); + output_written_lsn = Max(walEnd, output_written_lsn); + + pos += 8; /* read walEnd */ + + pos += 8; /* skip sendTime */ + + if (r < pos + 1) + { + pg_log_error("streaming header too small: %d", r); + goto error; + } + replyRequested = copybuf[pos]; + + if (endpos != InvalidXLogRecPtr && walEnd >= endpos) + { + /* + * If there's nothing to read on the socket until a keepalive + * we know that the server has nothing to send us; and if + * walEnd has passed endpos, we know nothing else can have + * committed before endpos. So we can bail out now. + */ + endposReached = true; + } + + /* Send a reply, if necessary */ + if (replyRequested || endposReached) + { + if (!flushAndSendFeedback(conn, &now)) + goto error; + last_status = now; + } + + if (endposReached) + { + prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr); + time_to_abort = true; + break; + } + + continue; + } + else if (copybuf[0] != 'w') + { + pg_log_error("unrecognized streaming header: \"%c\"", + copybuf[0]); + goto error; + } + + /* + * Read the header of the XLogData message, enclosed in the CopyData + * message. We only need the WAL location field (dataStart), the rest + * of the header is ignored. + */ + hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ + if (r < hdr_len + 1) + { + pg_log_error("streaming header too small: %d", r); + goto error; + } + + /* Extract WAL location for this block */ + cur_record_lsn = fe_recvint64(©buf[1]); + + if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos) + { + /* + * We've read past our endpoint, so prepare to go away being + * cautious about what happens to our output data. + */ + if (!flushAndSendFeedback(conn, &now)) + goto error; + prepareToTerminate(conn, endpos, false, cur_record_lsn); + time_to_abort = true; + break; + } + + output_written_lsn = Max(cur_record_lsn, output_written_lsn); + + bytes_left = r - hdr_len; + bytes_written = 0; + + /* signal that a fsync is needed */ + output_needs_fsync = true; + + while (bytes_left) + { + int ret; + + ret = write(outfd, + copybuf + hdr_len + bytes_written, + bytes_left); + + if (ret < 0) + { + pg_log_error("could not write %d bytes to log file \"%s\": %m", + bytes_left, outfile); + goto error; + } + + /* Write was successful, advance our position */ + bytes_written += ret; + bytes_left -= ret; + } + + if (write(outfd, "\n", 1) != 1) + { + pg_log_error("could not write %d bytes to log file \"%s\": %m", + 1, outfile); + goto error; + } + + if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos) + { + /* endpos was exactly the record we just processed, we're done */ + if (!flushAndSendFeedback(conn, &now)) + goto error; + prepareToTerminate(conn, endpos, false, cur_record_lsn); + time_to_abort = true; + break; + } + } + + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_COPY_OUT) + { + PQclear(res); + + /* + * We're doing a client-initiated clean exit and have sent CopyDone to + * the server. Drain any messages, so we don't miss a last-minute + * ErrorResponse. The walsender stops generating XLogData records once + * it sees CopyDone, so expect this to finish quickly. After CopyDone, + * it's too late for sendFeedback(), even if this were to take a long + * time. Hence, use synchronous-mode PQgetCopyData(). + */ + while (1) + { + int r; + + if (copybuf != NULL) + { + PQfreemem(copybuf); + copybuf = NULL; + } + r = PQgetCopyData(conn, ©buf, 0); + if (r == -1) + break; + if (r == -2) + { + pg_log_error("could not read COPY data: %s", + PQerrorMessage(conn)); + time_to_abort = false; /* unclean exit */ + goto error; + } + } + + res = PQgetResult(conn); + } + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("unexpected termination of replication stream: %s", + PQresultErrorMessage(res)); + goto error; + } + PQclear(res); + + if (outfd != -1 && strcmp(outfile, "-") != 0) + { + TimestampTz t = feGetCurrentTimestamp(); + + /* no need to jump to error on failure here, we're finishing anyway */ + OutputFsync(t); + + if (close(outfd) != 0) + pg_log_error("could not close file \"%s\": %m", outfile); + } + outfd = -1; +error: + if (copybuf != NULL) + { + PQfreemem(copybuf); + copybuf = NULL; + } + destroyPQExpBuffer(query); + PQfinish(conn); + conn = NULL; +} + +/* + * Unfortunately we can't do sensible signal handling on windows... + */ +#ifndef WIN32 + +/* + * When sigint is called, just tell the system to exit at the next possible + * moment. + */ +static void +sigint_handler(int signum) +{ + time_to_abort = true; +} + +/* + * Trigger the output file to be reopened. + */ +static void +sighup_handler(int signum) +{ + output_reopen = true; +} +#endif + + +int +main(int argc, char **argv) +{ + static struct option long_options[] = { +/* general options */ + {"file", required_argument, NULL, 'f'}, + {"fsync-interval", required_argument, NULL, 'F'}, + {"no-loop", no_argument, NULL, 'n'}, + {"verbose", no_argument, NULL, 'v'}, + {"two-phase", no_argument, NULL, 't'}, + {"version", no_argument, NULL, 'V'}, + {"help", no_argument, NULL, '?'}, +/* connection options */ + {"dbname", required_argument, NULL, 'd'}, + {"host", required_argument, NULL, 'h'}, + {"port", required_argument, NULL, 'p'}, + {"username", required_argument, NULL, 'U'}, + {"no-password", no_argument, NULL, 'w'}, + {"password", no_argument, NULL, 'W'}, +/* replication options */ + {"startpos", required_argument, NULL, 'I'}, + {"endpos", required_argument, NULL, 'E'}, + {"option", required_argument, NULL, 'o'}, + {"plugin", required_argument, NULL, 'P'}, + {"status-interval", required_argument, NULL, 's'}, + {"slot", required_argument, NULL, 'S'}, +/* action */ + {"create-slot", no_argument, NULL, 1}, + {"start", no_argument, NULL, 2}, + {"drop-slot", no_argument, NULL, 3}, + {"if-not-exists", no_argument, NULL, 4}, + {NULL, 0, NULL, 0} + }; + int c; + int option_index; + uint32 hi, + lo; + char *db_name; + + pg_logging_init(argv[0]); + progname = get_progname(argv[0]); + set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup")); + + if (argc > 1) + { + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) + { + usage(); + exit(0); + } + else if (strcmp(argv[1], "-V") == 0 || + strcmp(argv[1], "--version") == 0) + { + puts("pg_recvlogical (PostgreSQL) " PG_VERSION); + exit(0); + } + } + + while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:", + long_options, &option_index)) != -1) + { + switch (c) + { +/* general options */ + case 'f': + outfile = pg_strdup(optarg); + break; + case 'F': + if (!option_parse_int(optarg, "-F/--fsync-interval", 0, + INT_MAX / 1000, + &fsync_interval)) + exit(1); + fsync_interval *= 1000; + break; + case 'n': + noloop = 1; + break; + case 'v': + verbose++; + break; + case 't': + two_phase = true; + break; +/* connection options */ + case 'd': + dbname = pg_strdup(optarg); + break; + case 'h': + dbhost = pg_strdup(optarg); + break; + case 'p': + dbport = pg_strdup(optarg); + break; + case 'U': + dbuser = pg_strdup(optarg); + break; + case 'w': + dbgetpassword = -1; + break; + case 'W': + dbgetpassword = 1; + break; +/* replication options */ + case 'I': + if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) + pg_fatal("could not parse start position \"%s\"", optarg); + startpos = ((uint64) hi) << 32 | lo; + break; + case 'E': + if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) + pg_fatal("could not parse end position \"%s\"", optarg); + endpos = ((uint64) hi) << 32 | lo; + break; + case 'o': + { + char *data = pg_strdup(optarg); + char *val = strchr(data, '='); + + if (val != NULL) + { + /* remove =; separate data from val */ + *val = '\0'; + val++; + } + + noptions += 1; + options = pg_realloc(options, sizeof(char *) * noptions * 2); + + options[(noptions - 1) * 2] = data; + options[(noptions - 1) * 2 + 1] = val; + } + + break; + case 'P': + plugin = pg_strdup(optarg); + break; + case 's': + if (!option_parse_int(optarg, "-s/--status-interval", 0, + INT_MAX / 1000, + &standby_message_timeout)) + exit(1); + standby_message_timeout *= 1000; + break; + case 'S': + replication_slot = pg_strdup(optarg); + break; +/* action */ + case 1: + do_create_slot = true; + break; + case 2: + do_start_slot = true; + break; + case 3: + do_drop_slot = true; + break; + case 4: + slot_exists_ok = true; + break; + + default: + /* getopt_long already emitted a complaint */ + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + } + + /* + * Any non-option arguments? + */ + if (optind < argc) + { + pg_log_error("too many command-line arguments (first is \"%s\")", + argv[optind]); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + /* + * Required arguments + */ + if (replication_slot == NULL) + { + pg_log_error("no slot specified"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + if (do_start_slot && outfile == NULL) + { + pg_log_error("no target file specified"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + if (!do_drop_slot && dbname == NULL) + { + pg_log_error("no database specified"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + if (!do_drop_slot && !do_create_slot && !do_start_slot) + { + pg_log_error("at least one action needs to be specified"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + if (do_drop_slot && (do_create_slot || do_start_slot)) + { + pg_log_error("cannot use --create-slot or --start together with --drop-slot"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot)) + { + pg_log_error("cannot use --create-slot or --drop-slot together with --startpos"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + if (endpos != InvalidXLogRecPtr && !do_start_slot) + { + pg_log_error("--endpos may only be specified with --start"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + if (two_phase && !do_create_slot) + { + pg_log_error("--two-phase may only be specified with --create-slot"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + /* + * Obtain a connection to server. Notably, if we need a password, we want + * to collect it from the user immediately. + */ + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + exit(1); + atexit(disconnect_atexit); + + /* + * Trap signals. (Don't do this until after the initial password prompt, + * if one is needed, in GetConnection.) + */ +#ifndef WIN32 + pqsignal(SIGINT, sigint_handler); + pqsignal(SIGHUP, sighup_handler); +#endif + + /* + * Run IDENTIFY_SYSTEM to make sure we connected using a database specific + * replication connection. + */ + if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name)) + exit(1); + + if (db_name == NULL) + pg_fatal("could not establish database-specific replication connection"); + + /* + * Set umask so that directories/files are created with the same + * permissions as directories/files in the source data directory. + * + * pg_mode_mask is set to owner-only by default and then updated in + * GetConnection() where we get the mode from the server-side with + * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm(). + */ + umask(pg_mode_mask); + + /* Drop a replication slot. */ + if (do_drop_slot) + { + if (verbose) + pg_log_info("dropping replication slot \"%s\"", replication_slot); + + if (!DropReplicationSlot(conn, replication_slot)) + exit(1); + } + + /* Create a replication slot. */ + if (do_create_slot) + { + if (verbose) + pg_log_info("creating replication slot \"%s\"", replication_slot); + + if (!CreateReplicationSlot(conn, replication_slot, plugin, false, + false, false, slot_exists_ok, two_phase)) + exit(1); + startpos = InvalidXLogRecPtr; + } + + if (!do_start_slot) + exit(0); + + /* Stream loop */ + while (true) + { + StreamLogicalLog(); + if (time_to_abort) + { + /* + * We've been Ctrl-C'ed or reached an exit limit condition. That's + * not an error, so exit without an errorcode. + */ + exit(0); + } + else if (noloop) + pg_fatal("disconnected"); + else + { + /* translator: check source for value for %d */ + pg_log_info("disconnected; waiting %d seconds to try again", + RECONNECT_SLEEP_TIME); + pg_usleep(RECONNECT_SLEEP_TIME * 1000000); + } + } +} + +/* + * Fsync our output data, and send a feedback message to the server. Returns + * true if successful, false otherwise. + * + * If successful, *now is updated to the current timestamp just before sending + * feedback. + */ +static bool +flushAndSendFeedback(PGconn *conn, TimestampTz *now) +{ + /* flush data to disk, so that we send a recent flush pointer */ + if (!OutputFsync(*now)) + return false; + *now = feGetCurrentTimestamp(); + if (!sendFeedback(conn, *now, true, false)) + return false; + + return true; +} + +/* + * Try to inform the server about our upcoming demise, but don't wait around or + * retry on failure. + */ +static void +prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn) +{ + (void) PQputCopyEnd(conn, NULL); + (void) PQflush(conn); + + if (verbose) + { + if (keepalive) + pg_log_info("end position %X/%X reached by keepalive", + LSN_FORMAT_ARGS(endpos)); + else + pg_log_info("end position %X/%X reached by WAL record at %X/%X", + LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn)); + } +} |