diff options
Diffstat (limited to 'src/backend/backup/basebackup_copy.c')
-rw-r--r-- | src/backend/backup/basebackup_copy.c | 473 |
1 files changed, 473 insertions, 0 deletions
diff --git a/src/backend/backup/basebackup_copy.c b/src/backend/backup/basebackup_copy.c new file mode 100644 index 0000000..62b518f --- /dev/null +++ b/src/backend/backup/basebackup_copy.c @@ -0,0 +1,473 @@ +/*------------------------------------------------------------------------- + * + * basebackup_copy.c + * send basebackup archives using COPY OUT + * + * We send a result set with information about the tabelspaces to be included + * in the backup before starting COPY OUT. Then, we start a single COPY OUT + * operation and transmits all the archives and the manifest if present during + * the course of that single COPY OUT. Each CopyData message begins with a + * type byte, allowing us to signal the start of a new archive, or the + * manifest, by some means other than ending the COPY stream. This also allows + * for future protocol extensions, since we can include arbitrary information + * in the message stream as long as we're certain that the client will know + * what to do with it. + * + * An older method that sent each archive using a separate COPY OUT + * operation is no longer supported. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/backup/basebackup_copy.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "backup/basebackup.h" +#include "backup/basebackup_sink.h" +#include "catalog/pg_type_d.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "utils/timestamp.h" + +typedef struct bbsink_copystream +{ + /* Common information for all types of sink. */ + bbsink base; + + /* Are we sending the archives to the client, or somewhere else? */ + bool send_to_client; + + /* + * Protocol message buffer. We assemble CopyData protocol messages by + * setting the first character of this buffer to 'd' (archive or manifest + * data) and then making base.bbs_buffer point to the second character so + * that the rest of the data gets copied into the message just where we + * want it. + */ + char *msgbuffer; + + /* + * When did we last report progress to the client, and how much progress + * did we report? + */ + TimestampTz last_progress_report_time; + uint64 bytes_done_at_last_time_check; +} bbsink_copystream; + +/* + * We don't want to send progress messages to the client excessively + * frequently. Ideally, we'd like to send a message when the time since the + * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking + * the system time every time we send a tiny bit of data seems too expensive. + * So we only check it after the number of bytes sine the last check reaches + * PROGRESS_REPORT_BYTE_INTERVAL. + */ +#define PROGRESS_REPORT_BYTE_INTERVAL 65536 +#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000 + +static void bbsink_copystream_begin_backup(bbsink *sink); +static void bbsink_copystream_begin_archive(bbsink *sink, + const char *archive_name); +static void bbsink_copystream_archive_contents(bbsink *sink, size_t len); +static void bbsink_copystream_end_archive(bbsink *sink); +static void bbsink_copystream_begin_manifest(bbsink *sink); +static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len); +static void bbsink_copystream_end_manifest(bbsink *sink); +static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); +static void bbsink_copystream_cleanup(bbsink *sink); + +static void SendCopyOutResponse(void); +static void SendCopyDone(void); +static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); +static void SendTablespaceList(List *tablespaces); +static void send_int8_string(StringInfoData *buf, int64 intval); + +static const bbsink_ops bbsink_copystream_ops = { + .begin_backup = bbsink_copystream_begin_backup, + .begin_archive = bbsink_copystream_begin_archive, + .archive_contents = bbsink_copystream_archive_contents, + .end_archive = bbsink_copystream_end_archive, + .begin_manifest = bbsink_copystream_begin_manifest, + .manifest_contents = bbsink_copystream_manifest_contents, + .end_manifest = bbsink_copystream_end_manifest, + .end_backup = bbsink_copystream_end_backup, + .cleanup = bbsink_copystream_cleanup +}; + +/* + * Create a new 'copystream' bbsink. + */ +bbsink * +bbsink_copystream_new(bool send_to_client) +{ + bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream)); + + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops; + sink->send_to_client = send_to_client; + + /* Set up for periodic progress reporting. */ + sink->last_progress_report_time = GetCurrentTimestamp(); + sink->bytes_done_at_last_time_check = UINT64CONST(0); + + return &sink->base; +} + +/* + * Send start-of-backup wire protocol messages. + */ +static void +bbsink_copystream_begin_backup(bbsink *sink) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = sink->bbs_state; + char *buf; + + /* + * Initialize buffer. We ultimately want to send the archive and manifest + * data by means of CopyData messages where the payload portion of each + * message begins with a type byte. However, basebackup.c expects the + * buffer to be aligned, so we can't just allocate one extra byte for the + * type byte. Instead, allocate enough extra bytes that the portion of the + * buffer we reveal to our callers can be aligned, while leaving room to + * slip the type byte in just beforehand. That will allow us to ship the + * data with a single call to pq_putmessage and without needing any extra + * copying. + */ + buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF); + mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1); + mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF; + mysink->msgbuffer[0] = 'd'; /* archive or manifest data */ + + /* Tell client the backup start location. */ + SendXlogRecPtrResult(state->startptr, state->starttli); + + /* Send client a list of tablespaces. */ + SendTablespaceList(state->tablespaces); + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); + + /* Begin COPY stream. This will be used for all archives + manifest. */ + SendCopyOutResponse(); +} + +/* + * Send a CopyData message announcing the beginning of a new archive. + */ +static void +bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name) +{ + bbsink_state *state = sink->bbs_state; + tablespaceinfo *ti; + StringInfoData buf; + + ti = list_nth(state->tablespaces, state->tablespace_num); + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'n'); /* New archive */ + pq_sendstring(&buf, archive_name); + pq_sendstring(&buf, ti->path == NULL ? "" : ti->path); + pq_endmessage(&buf); +} + +/* + * Send a CopyData message containing a chunk of archive content. + */ +static void +bbsink_copystream_archive_contents(bbsink *sink, size_t len) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = mysink->base.bbs_state; + StringInfoData buf; + uint64 targetbytes; + + /* Send the archive content to the client, if appropriate. */ + if (mysink->send_to_client) + { + /* Add one because we're also sending a leading type byte. */ + pq_putmessage('d', mysink->msgbuffer, len + 1); + } + + /* Consider whether to send a progress report to the client. */ + targetbytes = mysink->bytes_done_at_last_time_check + + PROGRESS_REPORT_BYTE_INTERVAL; + if (targetbytes <= state->bytes_done) + { + TimestampTz now = GetCurrentTimestamp(); + long ms; + + /* + * OK, we've sent a decent number of bytes, so check the system time + * to see whether we're due to send a progress report. + */ + mysink->bytes_done_at_last_time_check = state->bytes_done; + ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time, + now); + + /* + * Send a progress report if enough time has passed. Also send one if + * the system clock was set backward, so that such occurrences don't + * have the effect of suppressing further progress messages. + */ + if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD) + { + mysink->last_progress_report_time = now; + + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'p'); /* Progress report */ + pq_sendint64(&buf, state->bytes_done); + pq_endmessage(&buf); + pq_flush_if_writable(); + } + } +} + +/* + * We don't need to explicitly signal the end of the archive; the client + * will figure out that we've reached the end when we begin the next one, + * or begin the manifest, or end the COPY stream. However, this seems like + * a good time to force out a progress report. One reason for that is that + * if this is the last archive, and we don't force a progress report now, + * the client will never be told that we sent all the bytes. + */ +static void +bbsink_copystream_end_archive(bbsink *sink) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = mysink->base.bbs_state; + StringInfoData buf; + + mysink->bytes_done_at_last_time_check = state->bytes_done; + mysink->last_progress_report_time = GetCurrentTimestamp(); + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'p'); /* Progress report */ + pq_sendint64(&buf, state->bytes_done); + pq_endmessage(&buf); + pq_flush_if_writable(); +} + +/* + * Send a CopyData message announcing the beginning of the backup manifest. + */ +static void +bbsink_copystream_begin_manifest(bbsink *sink) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'm'); /* Manifest */ + pq_endmessage(&buf); +} + +/* + * Each chunk of manifest data is sent using a CopyData message. + */ +static void +bbsink_copystream_manifest_contents(bbsink *sink, size_t len) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + + if (mysink->send_to_client) + { + /* Add one because we're also sending a leading type byte. */ + pq_putmessage('d', mysink->msgbuffer, len + 1); + } +} + +/* + * We don't need an explicit terminator for the backup manifest. + */ +static void +bbsink_copystream_end_manifest(bbsink *sink) +{ + /* Do nothing. */ +} + +/* + * Send end-of-backup wire protocol messages. + */ +static void +bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli) +{ + SendCopyDone(); + SendXlogRecPtrResult(endptr, endtli); +} + +/* + * Cleanup. + */ +static void +bbsink_copystream_cleanup(bbsink *sink) +{ + /* Nothing to do. */ +} + +/* + * Send a CopyOutResponse message. + */ +static void +SendCopyOutResponse(void) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, 0); /* overall format */ + pq_sendint16(&buf, 0); /* natts */ + pq_endmessage(&buf); +} + +/* + * Send a CopyDone message. + */ +static void +SendCopyDone(void) +{ + pq_putemptymessage('c'); +} + +/* + * Send a single resultset containing just a single + * XLogRecPtr record (in text format) + */ +static void +SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) +{ + StringInfoData buf; + char str[MAXFNAMELEN]; + Size len; + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint16(&buf, 2); /* 2 fields */ + + /* Field headers */ + pq_sendstring(&buf, "recptr"); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + pq_sendint32(&buf, TEXTOID); /* type oid */ + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + + pq_sendstring(&buf, "tli"); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + + /* + * int8 may seem like a surprising data type for this, but in theory int4 + * would not be wide enough for this, as TimeLineID is unsigned. + */ + pq_sendint32(&buf, INT8OID); /* type oid */ + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + + /* Data row */ + pq_beginmessage(&buf, 'D'); + pq_sendint16(&buf, 2); /* number of columns */ + + len = snprintf(str, sizeof(str), + "%X/%X", LSN_FORMAT_ARGS(ptr)); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, str, len); + + len = snprintf(str, sizeof(str), "%u", tli); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, str, len); + + pq_endmessage(&buf); + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); +} + +/* + * Send a result set via libpq describing the tablespace list. + */ +static void +SendTablespaceList(List *tablespaces) +{ + StringInfoData buf; + ListCell *lc; + + /* Construct and send the directory information */ + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint16(&buf, 3); /* 3 fields */ + + /* First field - spcoid */ + pq_sendstring(&buf, "spcoid"); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + pq_sendint32(&buf, OIDOID); /* type oid */ + pq_sendint16(&buf, 4); /* typlen */ + pq_sendint32(&buf, 0); /* typmod */ + pq_sendint16(&buf, 0); /* format code */ + + /* Second field - spclocation */ + pq_sendstring(&buf, "spclocation"); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_sendint32(&buf, TEXTOID); + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + + /* Third field - size */ + pq_sendstring(&buf, "size"); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_sendint32(&buf, INT8OID); + pq_sendint16(&buf, 8); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + + foreach(lc, tablespaces) + { + tablespaceinfo *ti = lfirst(lc); + + /* Send one datarow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint16(&buf, 3); /* number of columns */ + if (ti->path == NULL) + { + pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */ + pq_sendint32(&buf, -1); + } + else + { + Size len; + + len = strlen(ti->oid); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, ti->oid, len); + + len = strlen(ti->path); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, ti->path, len); + } + if (ti->size >= 0) + send_int8_string(&buf, ti->size / 1024); + else + pq_sendint32(&buf, -1); /* NULL */ + + pq_endmessage(&buf); + } +} + +/* + * Send a 64-bit integer as a string via the wire protocol. + */ +static void +send_int8_string(StringInfoData *buf, int64 intval) +{ + char is[32]; + + sprintf(is, INT64_FORMAT, intval); + pq_sendint32(buf, strlen(is)); + pq_sendbytes(buf, is, strlen(is)); +} |