summaryrefslogtreecommitdiffstats
path: root/src/backend/backup
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/backup')
-rw-r--r--src/backend/backup/Makefile30
-rw-r--r--src/backend/backup/backup_manifest.c402
-rw-r--r--src/backend/backup/basebackup.c1852
-rw-r--r--src/backend/backup/basebackup_copy.c473
-rw-r--r--src/backend/backup/basebackup_gzip.c304
-rw-r--r--src/backend/backup/basebackup_lz4.c296
-rw-r--r--src/backend/backup/basebackup_progress.c246
-rw-r--r--src/backend/backup/basebackup_server.c309
-rw-r--r--src/backend/backup/basebackup_sink.c125
-rw-r--r--src/backend/backup/basebackup_target.c241
-rw-r--r--src/backend/backup/basebackup_throttle.c199
-rw-r--r--src/backend/backup/basebackup_zstd.c313
12 files changed, 4790 insertions, 0 deletions
diff --git a/src/backend/backup/Makefile b/src/backend/backup/Makefile
new file mode 100644
index 0000000..b21bd8f
--- /dev/null
+++ b/src/backend/backup/Makefile
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for src/backend/backup
+#
+# IDENTIFICATION
+# src/backend/backup/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/backup
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
+
+OBJS = \
+ backup_manifest.o \
+ basebackup.o \
+ basebackup_copy.o \
+ basebackup_gzip.o \
+ basebackup_lz4.o \
+ basebackup_zstd.o \
+ basebackup_progress.o \
+ basebackup_server.o \
+ basebackup_sink.o \
+ basebackup_target.o \
+ basebackup_throttle.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/backup/backup_manifest.c b/src/backend/backup/backup_manifest.c
new file mode 100644
index 0000000..618bd0d
--- /dev/null
+++ b/src/backend/backup/backup_manifest.c
@@ -0,0 +1,402 @@
+/*-------------------------------------------------------------------------
+ *
+ * backup_manifest.c
+ * code for generating and sending a backup manifest
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/backup/backup_manifest.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/timeline.h"
+#include "backup/backup_manifest.h"
+#include "backup/basebackup_sink.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "mb/pg_wchar.h"
+#include "utils/builtins.h"
+#include "utils/json.h"
+
+static void AppendStringToManifest(backup_manifest_info *manifest, char *s);
+
+/*
+ * Does the user want a backup manifest?
+ *
+ * It's simplest to always have a manifest_info object, so that we don't need
+ * checks for NULL pointers in too many places. However, if the user doesn't
+ * want a manifest, we set manifest->buffile to NULL.
+ */
+static inline bool
+IsManifestEnabled(backup_manifest_info *manifest)
+{
+ return (manifest->buffile != NULL);
+}
+
+/*
+ * Convenience macro for appending data to the backup manifest.
+ */
+#define AppendToManifest(manifest, ...) \
+ { \
+ char *_manifest_s = psprintf(__VA_ARGS__); \
+ AppendStringToManifest(manifest, _manifest_s); \
+ pfree(_manifest_s); \
+ }
+
+/*
+ * Initialize state so that we can construct a backup manifest.
+ *
+ * NB: Although the checksum type for the data files is configurable, the
+ * checksum for the manifest itself always uses SHA-256. See comments in
+ * SendBackupManifest.
+ */
+void
+InitializeBackupManifest(backup_manifest_info *manifest,
+ backup_manifest_option want_manifest,
+ pg_checksum_type manifest_checksum_type)
+{
+ memset(manifest, 0, sizeof(backup_manifest_info));
+ manifest->checksum_type = manifest_checksum_type;
+
+ if (want_manifest == MANIFEST_OPTION_NO)
+ manifest->buffile = NULL;
+ else
+ {
+ manifest->buffile = BufFileCreateTemp(false);
+ manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
+ if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
+ elog(ERROR, "failed to initialize checksum of backup manifest: %s",
+ pg_cryptohash_error(manifest->manifest_ctx));
+ }
+
+ manifest->manifest_size = UINT64CONST(0);
+ manifest->force_encode = (want_manifest == MANIFEST_OPTION_FORCE_ENCODE);
+ manifest->first_file = true;
+ manifest->still_checksumming = true;
+
+ if (want_manifest != MANIFEST_OPTION_NO)
+ AppendToManifest(manifest,
+ "{ \"PostgreSQL-Backup-Manifest-Version\": 1,\n"
+ "\"Files\": [");
+}
+
+/*
+ * Free resources assigned to a backup manifest constructed.
+ */
+void
+FreeBackupManifest(backup_manifest_info *manifest)
+{
+ pg_cryptohash_free(manifest->manifest_ctx);
+ manifest->manifest_ctx = NULL;
+}
+
+/*
+ * Add an entry to the backup manifest for a file.
+ */
+void
+AddFileToBackupManifest(backup_manifest_info *manifest, const char *spcoid,
+ const char *pathname, size_t size, pg_time_t mtime,
+ pg_checksum_context *checksum_ctx)
+{
+ char pathbuf[MAXPGPATH];
+ int pathlen;
+ StringInfoData buf;
+
+ if (!IsManifestEnabled(manifest))
+ return;
+
+ /*
+ * If this file is part of a tablespace, the pathname passed to this
+ * function will be relative to the tar file that contains it. We want the
+ * pathname relative to the data directory (ignoring the intermediate
+ * symlink traversal).
+ */
+ if (spcoid != NULL)
+ {
+ snprintf(pathbuf, sizeof(pathbuf), "pg_tblspc/%s/%s", spcoid,
+ pathname);
+ pathname = pathbuf;
+ }
+
+ /*
+ * Each file's entry needs to be separated from any entry that follows by
+ * a comma, but there's no comma before the first one or after the last
+ * one. To make that work, adding a file to the manifest starts by
+ * terminating the most recently added line, with a comma if appropriate,
+ * but does not terminate the line inserted for this file.
+ */
+ initStringInfo(&buf);
+ if (manifest->first_file)
+ {
+ appendStringInfoChar(&buf, '\n');
+ manifest->first_file = false;
+ }
+ else
+ appendStringInfoString(&buf, ",\n");
+
+ /*
+ * Write the relative pathname to this file out to the manifest. The
+ * manifest is always stored in UTF-8, so we have to encode paths that are
+ * not valid in that encoding.
+ */
+ pathlen = strlen(pathname);
+ if (!manifest->force_encode &&
+ pg_verify_mbstr(PG_UTF8, pathname, pathlen, true))
+ {
+ appendStringInfoString(&buf, "{ \"Path\": ");
+ escape_json(&buf, pathname);
+ appendStringInfoString(&buf, ", ");
+ }
+ else
+ {
+ appendStringInfoString(&buf, "{ \"Encoded-Path\": \"");
+ enlargeStringInfo(&buf, 2 * pathlen);
+ buf.len += hex_encode(pathname, pathlen,
+ &buf.data[buf.len]);
+ appendStringInfoString(&buf, "\", ");
+ }
+
+ appendStringInfo(&buf, "\"Size\": %zu, ", size);
+
+ /*
+ * Convert last modification time to a string and append it to the
+ * manifest. Since it's not clear what time zone to use and since time
+ * zone definitions can change, possibly causing confusion, use GMT
+ * always.
+ */
+ appendStringInfoString(&buf, "\"Last-Modified\": \"");
+ enlargeStringInfo(&buf, 128);
+ buf.len += pg_strftime(&buf.data[buf.len], 128, "%Y-%m-%d %H:%M:%S %Z",
+ pg_gmtime(&mtime));
+ appendStringInfoChar(&buf, '"');
+
+ /* Add checksum information. */
+ if (checksum_ctx->type != CHECKSUM_TYPE_NONE)
+ {
+ uint8 checksumbuf[PG_CHECKSUM_MAX_LENGTH];
+ int checksumlen;
+
+ checksumlen = pg_checksum_final(checksum_ctx, checksumbuf);
+ if (checksumlen < 0)
+ elog(ERROR, "could not finalize checksum of file \"%s\"",
+ pathname);
+
+ appendStringInfo(&buf,
+ ", \"Checksum-Algorithm\": \"%s\", \"Checksum\": \"",
+ pg_checksum_type_name(checksum_ctx->type));
+ enlargeStringInfo(&buf, 2 * checksumlen);
+ buf.len += hex_encode((char *) checksumbuf, checksumlen,
+ &buf.data[buf.len]);
+ appendStringInfoChar(&buf, '"');
+ }
+
+ /* Close out the object. */
+ appendStringInfoString(&buf, " }");
+
+ /* OK, add it to the manifest. */
+ AppendStringToManifest(manifest, buf.data);
+
+ /* Avoid leaking memory. */
+ pfree(buf.data);
+}
+
+/*
+ * Add information about the WAL that will need to be replayed when restoring
+ * this backup to the manifest.
+ */
+void
+AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr,
+ TimeLineID starttli, XLogRecPtr endptr,
+ TimeLineID endtli)
+{
+ List *timelines;
+ ListCell *lc;
+ bool first_wal_range = true;
+ bool found_start_timeline = false;
+
+ if (!IsManifestEnabled(manifest))
+ return;
+
+ /* Terminate the list of files. */
+ AppendStringToManifest(manifest, "\n],\n");
+
+ /* Read the timeline history for the ending timeline. */
+ timelines = readTimeLineHistory(endtli);
+
+ /* Start a list of LSN ranges. */
+ AppendStringToManifest(manifest, "\"WAL-Ranges\": [\n");
+
+ foreach(lc, timelines)
+ {
+ TimeLineHistoryEntry *entry = lfirst(lc);
+ XLogRecPtr tl_beginptr;
+
+ /*
+ * We only care about timelines that were active during the backup.
+ * Skip any that ended before the backup started. (Note that if
+ * entry->end is InvalidXLogRecPtr, it means that the timeline has not
+ * yet ended.)
+ */
+ if (!XLogRecPtrIsInvalid(entry->end) && entry->end < startptr)
+ continue;
+
+ /*
+ * Because the timeline history file lists newer timelines before
+ * older ones, the first timeline we encounter that is new enough to
+ * matter ought to match the ending timeline of the backup.
+ */
+ if (first_wal_range && endtli != entry->tli)
+ ereport(ERROR,
+ errmsg("expected end timeline %u but found timeline %u",
+ starttli, entry->tli));
+
+ /*
+ * If this timeline entry matches with the timeline on which the
+ * backup started, WAL needs to be checked from the start LSN of the
+ * backup. If this entry refers to a newer timeline, WAL needs to be
+ * checked since the beginning of this timeline, so use the LSN where
+ * the timeline began.
+ */
+ if (starttli == entry->tli)
+ tl_beginptr = startptr;
+ else
+ {
+ tl_beginptr = entry->begin;
+
+ /*
+ * If we reach a TLI that has no valid beginning LSN, there can't
+ * be any more timelines in the history after this point, so we'd
+ * better have arrived at the expected starting TLI. If not,
+ * something's gone horribly wrong.
+ */
+ if (XLogRecPtrIsInvalid(entry->begin))
+ ereport(ERROR,
+ errmsg("expected start timeline %u but found timeline %u",
+ starttli, entry->tli));
+ }
+
+ AppendToManifest(manifest,
+ "%s{ \"Timeline\": %u, \"Start-LSN\": \"%X/%X\", \"End-LSN\": \"%X/%X\" }",
+ first_wal_range ? "" : ",\n",
+ entry->tli,
+ LSN_FORMAT_ARGS(tl_beginptr),
+ LSN_FORMAT_ARGS(endptr));
+
+ if (starttli == entry->tli)
+ {
+ found_start_timeline = true;
+ break;
+ }
+
+ endptr = entry->begin;
+ first_wal_range = false;
+ }
+
+ /*
+ * The last entry in the timeline history for the ending timeline should
+ * be the ending timeline itself. Verify that this is what we observed.
+ */
+ if (!found_start_timeline)
+ ereport(ERROR,
+ errmsg("start timeline %u not found in history of timeline %u",
+ starttli, endtli));
+
+ /* Terminate the list of WAL ranges. */
+ AppendStringToManifest(manifest, "\n],\n");
+}
+
+/*
+ * Finalize the backup manifest, and send it to the client.
+ */
+void
+SendBackupManifest(backup_manifest_info *manifest, bbsink *sink)
+{
+ uint8 checksumbuf[PG_SHA256_DIGEST_LENGTH];
+ char checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH];
+ size_t manifest_bytes_done = 0;
+
+ if (!IsManifestEnabled(manifest))
+ return;
+
+ /*
+ * Append manifest checksum, so that the problems with the manifest itself
+ * can be detected.
+ *
+ * We always use SHA-256 for this, regardless of what algorithm is chosen
+ * for checksumming the files. If we ever want to make the checksum
+ * algorithm used for the manifest file variable, the client will need a
+ * way to figure out which algorithm to use as close to the beginning of
+ * the manifest file as possible, to avoid having to read the whole thing
+ * twice.
+ */
+ manifest->still_checksumming = false;
+ if (pg_cryptohash_final(manifest->manifest_ctx, checksumbuf,
+ sizeof(checksumbuf)) < 0)
+ elog(ERROR, "failed to finalize checksum of backup manifest: %s",
+ pg_cryptohash_error(manifest->manifest_ctx));
+ AppendStringToManifest(manifest, "\"Manifest-Checksum\": \"");
+
+ hex_encode((char *) checksumbuf, sizeof checksumbuf, checksumstringbuf);
+ checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH - 1] = '\0';
+
+ AppendStringToManifest(manifest, checksumstringbuf);
+ AppendStringToManifest(manifest, "\"}\n");
+
+ /*
+ * We've written all the data to the manifest file. Rewind the file so
+ * that we can read it all back.
+ */
+ if (BufFileSeek(manifest->buffile, 0, 0L, SEEK_SET))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rewind temporary file")));
+
+
+ /*
+ * Send the backup manifest.
+ */
+ bbsink_begin_manifest(sink);
+ while (manifest_bytes_done < manifest->manifest_size)
+ {
+ size_t bytes_to_read;
+ size_t rc;
+
+ bytes_to_read = Min(sink->bbs_buffer_length,
+ manifest->manifest_size - manifest_bytes_done);
+ rc = BufFileRead(manifest->buffile, sink->bbs_buffer,
+ bytes_to_read);
+ if (rc != bytes_to_read)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from temporary file: read only %zu of %zu bytes",
+ rc, bytes_to_read)));
+ bbsink_manifest_contents(sink, bytes_to_read);
+ manifest_bytes_done += bytes_to_read;
+ }
+ bbsink_end_manifest(sink);
+
+ /* Release resources */
+ BufFileClose(manifest->buffile);
+}
+
+/*
+ * Append a cstring to the manifest.
+ */
+static void
+AppendStringToManifest(backup_manifest_info *manifest, char *s)
+{
+ int len = strlen(s);
+
+ Assert(manifest != NULL);
+ if (manifest->still_checksumming)
+ {
+ if (pg_cryptohash_update(manifest->manifest_ctx, (uint8 *) s, len) < 0)
+ elog(ERROR, "failed to update checksum of backup manifest: %s",
+ pg_cryptohash_error(manifest->manifest_ctx));
+ }
+ BufFileWrite(manifest->buffile, s, len);
+ manifest->manifest_size += len;
+}
diff --git a/src/backend/backup/basebackup.c b/src/backend/backup/basebackup.c
new file mode 100644
index 0000000..078a3fe
--- /dev/null
+++ b/src/backend/backup/basebackup.c
@@ -0,0 +1,1852 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup.c
+ * code for taking a base backup and streaming it to a standby
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/backup/basebackup.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <sys/stat.h>
+#include <unistd.h>
+#include <time.h>
+
+#include "access/xlog_internal.h"
+#include "backup/backup_manifest.h"
+#include "backup/basebackup.h"
+#include "backup/basebackup_sink.h"
+#include "backup/basebackup_target.h"
+#include "commands/defrem.h"
+#include "common/compression.h"
+#include "common/file_perm.h"
+#include "lib/stringinfo.h"
+#include "miscadmin.h"
+#include "nodes/pg_list.h"
+#include "pgstat.h"
+#include "pgtar.h"
+#include "port.h"
+#include "postmaster/syslogger.h"
+#include "replication/walsender.h"
+#include "replication/walsender_private.h"
+#include "storage/bufpage.h"
+#include "storage/checksum.h"
+#include "storage/dsm_impl.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/reinit.h"
+#include "utils/builtins.h"
+#include "utils/ps_status.h"
+#include "utils/relcache.h"
+#include "utils/resowner.h"
+#include "utils/timestamp.h"
+
+/*
+ * How much data do we want to send in one CopyData message? Note that
+ * this may also result in reading the underlying files in chunks of this
+ * size.
+ *
+ * NB: The buffer size is required to be a multiple of the system block
+ * size, so use that value instead if it's bigger than our preference.
+ */
+#define SINK_BUFFER_LENGTH Max(32768, BLCKSZ)
+
+typedef struct
+{
+ const char *label;
+ bool progress;
+ bool fastcheckpoint;
+ bool nowait;
+ bool includewal;
+ uint32 maxrate;
+ bool sendtblspcmapfile;
+ bool send_to_client;
+ bool use_copytblspc;
+ BaseBackupTargetHandle *target_handle;
+ backup_manifest_option manifest;
+ pg_compress_algorithm compression;
+ pg_compress_specification compression_specification;
+ pg_checksum_type manifest_checksum_type;
+} basebackup_options;
+
+static int64 sendTablespace(bbsink *sink, char *path, char *oid, bool sizeonly,
+ struct backup_manifest_info *manifest);
+static int64 sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks,
+ backup_manifest_info *manifest, const char *spcoid);
+static bool sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
+ struct stat *statbuf, bool missing_ok, Oid dboid,
+ backup_manifest_info *manifest, const char *spcoid);
+static void sendFileWithContent(bbsink *sink, const char *filename,
+ const char *content,
+ backup_manifest_info *manifest);
+static int64 _tarWriteHeader(bbsink *sink, const char *filename,
+ const char *linktarget, struct stat *statbuf,
+ bool sizeonly);
+static void _tarWritePadding(bbsink *sink, int len);
+static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf);
+static void perform_base_backup(basebackup_options *opt, bbsink *sink);
+static void parse_basebackup_options(List *options, basebackup_options *opt);
+static int compareWalFileNames(const ListCell *a, const ListCell *b);
+static bool is_checksummed_file(const char *fullpath, const char *filename);
+static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
+ const char *filename, bool partial_read_ok);
+
+/* Was the backup currently in-progress initiated in recovery mode? */
+static bool backup_started_in_recovery = false;
+
+/* Total number of checksum failures during base backup. */
+static long long int total_checksum_failures;
+
+/* Do not verify checksums. */
+static bool noverify_checksums = false;
+
+/*
+ * Definition of one element part of an exclusion list, used for paths part
+ * of checksum validation or base backups. "name" is the name of the file
+ * or path to check for exclusion. If "match_prefix" is true, any items
+ * matching the name as prefix are excluded.
+ */
+struct exclude_list_item
+{
+ const char *name;
+ bool match_prefix;
+};
+
+/*
+ * The contents of these directories are removed or recreated during server
+ * start so they are not included in backups. The directories themselves are
+ * kept and included as empty to preserve access permissions.
+ *
+ * Note: this list should be kept in sync with the filter lists in pg_rewind's
+ * filemap.c.
+ */
+static const char *const excludeDirContents[] =
+{
+ /*
+ * Skip temporary statistics files. PG_STAT_TMP_DIR must be skipped
+ * because extensions like pg_stat_statements store data there.
+ */
+ PG_STAT_TMP_DIR,
+
+ /*
+ * It is generally not useful to backup the contents of this directory
+ * even if the intention is to restore to another primary. See backup.sgml
+ * for a more detailed description.
+ */
+ "pg_replslot",
+
+ /* Contents removed on startup, see dsm_cleanup_for_mmap(). */
+ PG_DYNSHMEM_DIR,
+
+ /* Contents removed on startup, see AsyncShmemInit(). */
+ "pg_notify",
+
+ /*
+ * Old contents are loaded for possible debugging but are not required for
+ * normal operation, see SerialInit().
+ */
+ "pg_serial",
+
+ /* Contents removed on startup, see DeleteAllExportedSnapshotFiles(). */
+ "pg_snapshots",
+
+ /* Contents zeroed on startup, see StartupSUBTRANS(). */
+ "pg_subtrans",
+
+ /* end of list */
+ NULL
+};
+
+/*
+ * List of files excluded from backups.
+ */
+static const struct exclude_list_item excludeFiles[] =
+{
+ /* Skip auto conf temporary file. */
+ {PG_AUTOCONF_FILENAME ".tmp", false},
+
+ /* Skip current log file temporary file */
+ {LOG_METAINFO_DATAFILE_TMP, false},
+
+ /*
+ * Skip relation cache because it is rebuilt on startup. This includes
+ * temporary files.
+ */
+ {RELCACHE_INIT_FILENAME, true},
+
+ /*
+ * backup_label and tablespace_map should not exist in a running cluster
+ * capable of doing an online backup, but exclude them just in case.
+ */
+ {BACKUP_LABEL_FILE, false},
+ {TABLESPACE_MAP, false},
+
+ /*
+ * If there's a backup_manifest, it belongs to a backup that was used to
+ * start this server. It is *not* correct for this backup. Our
+ * backup_manifest is injected into the backup separately if users want
+ * it.
+ */
+ {"backup_manifest", false},
+
+ {"postmaster.pid", false},
+ {"postmaster.opts", false},
+
+ /* end of list */
+ {NULL, false}
+};
+
+/*
+ * List of files excluded from checksum validation.
+ *
+ * Note: this list should be kept in sync with what pg_checksums.c
+ * includes.
+ */
+static const struct exclude_list_item noChecksumFiles[] = {
+ {"pg_control", false},
+ {"pg_filenode.map", false},
+ {"pg_internal.init", true},
+ {"PG_VERSION", false},
+#ifdef EXEC_BACKEND
+ {"config_exec_params", true},
+#endif
+ {NULL, false}
+};
+
+/*
+ * Actually do a base backup for the specified tablespaces.
+ *
+ * This is split out mainly to avoid complaints about "variable might be
+ * clobbered by longjmp" from stupider versions of gcc.
+ */
+static void
+perform_base_backup(basebackup_options *opt, bbsink *sink)
+{
+ bbsink_state state;
+ XLogRecPtr endptr;
+ TimeLineID endtli;
+ StringInfo labelfile;
+ StringInfo tblspc_map_file;
+ backup_manifest_info manifest;
+
+ /* Initial backup state, insofar as we know it now. */
+ state.tablespaces = NIL;
+ state.tablespace_num = 0;
+ state.bytes_done = 0;
+ state.bytes_total = 0;
+ state.bytes_total_is_valid = false;
+
+ /* we're going to use a BufFile, so we need a ResourceOwner */
+ Assert(CurrentResourceOwner == NULL);
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "base backup");
+
+ backup_started_in_recovery = RecoveryInProgress();
+
+ labelfile = makeStringInfo();
+ tblspc_map_file = makeStringInfo();
+ InitializeBackupManifest(&manifest, opt->manifest,
+ opt->manifest_checksum_type);
+
+ total_checksum_failures = 0;
+
+ basebackup_progress_wait_checkpoint();
+ state.startptr = do_pg_backup_start(opt->label, opt->fastcheckpoint,
+ &state.starttli,
+ labelfile, &state.tablespaces,
+ tblspc_map_file);
+
+ /*
+ * Once do_pg_backup_start has been called, ensure that any failure causes
+ * us to abort the backup so we don't "leak" a backup counter. For this
+ * reason, *all* functionality between do_pg_backup_start() and the end of
+ * do_pg_backup_stop() should be inside the error cleanup block!
+ */
+
+ PG_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false));
+ {
+ ListCell *lc;
+ tablespaceinfo *ti;
+
+ /* Add a node for the base directory at the end */
+ ti = palloc0(sizeof(tablespaceinfo));
+ ti->size = -1;
+ state.tablespaces = lappend(state.tablespaces, ti);
+
+ /*
+ * Calculate the total backup size by summing up the size of each
+ * tablespace
+ */
+ if (opt->progress)
+ {
+ basebackup_progress_estimate_backup_size();
+
+ foreach(lc, state.tablespaces)
+ {
+ tablespaceinfo *tmp = (tablespaceinfo *) lfirst(lc);
+
+ if (tmp->path == NULL)
+ tmp->size = sendDir(sink, ".", 1, true, state.tablespaces,
+ true, NULL, NULL);
+ else
+ tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true,
+ NULL);
+ state.bytes_total += tmp->size;
+ }
+ state.bytes_total_is_valid = true;
+ }
+
+ /* notify basebackup sink about start of backup */
+ bbsink_begin_backup(sink, &state, SINK_BUFFER_LENGTH);
+
+ /* Send off our tablespaces one by one */
+ foreach(lc, state.tablespaces)
+ {
+ tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
+
+ if (ti->path == NULL)
+ {
+ struct stat statbuf;
+ bool sendtblspclinks = true;
+
+ bbsink_begin_archive(sink, "base.tar");
+
+ /* In the main tar, include the backup_label first... */
+ sendFileWithContent(sink, BACKUP_LABEL_FILE, labelfile->data,
+ &manifest);
+
+ /* Then the tablespace_map file, if required... */
+ if (opt->sendtblspcmapfile)
+ {
+ sendFileWithContent(sink, TABLESPACE_MAP, tblspc_map_file->data,
+ &manifest);
+ sendtblspclinks = false;
+ }
+
+ /* Then the bulk of the files... */
+ sendDir(sink, ".", 1, false, state.tablespaces,
+ sendtblspclinks, &manifest, NULL);
+
+ /* ... and pg_control after everything else. */
+ if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ XLOG_CONTROL_FILE)));
+ sendFile(sink, XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf,
+ false, InvalidOid, &manifest, NULL);
+ }
+ else
+ {
+ char *archive_name = psprintf("%s.tar", ti->oid);
+
+ bbsink_begin_archive(sink, archive_name);
+
+ sendTablespace(sink, ti->path, ti->oid, false, &manifest);
+ }
+
+ /*
+ * If we're including WAL, and this is the main data directory we
+ * don't treat this as the end of the tablespace. Instead, we will
+ * include the xlog files below and stop afterwards. This is safe
+ * since the main data directory is always sent *last*.
+ */
+ if (opt->includewal && ti->path == NULL)
+ {
+ Assert(lnext(state.tablespaces, lc) == NULL);
+ }
+ else
+ {
+ /* Properly terminate the tarfile. */
+ StaticAssertStmt(2 * TAR_BLOCK_SIZE <= BLCKSZ,
+ "BLCKSZ too small for 2 tar blocks");
+ memset(sink->bbs_buffer, 0, 2 * TAR_BLOCK_SIZE);
+ bbsink_archive_contents(sink, 2 * TAR_BLOCK_SIZE);
+
+ /* OK, that's the end of the archive. */
+ bbsink_end_archive(sink);
+ }
+ }
+
+ basebackup_progress_wait_wal_archive(&state);
+ endptr = do_pg_backup_stop(labelfile->data, !opt->nowait, &endtli);
+ }
+ PG_END_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false));
+
+
+ if (opt->includewal)
+ {
+ /*
+ * We've left the last tar file "open", so we can now append the
+ * required WAL files to it.
+ */
+ char pathbuf[MAXPGPATH];
+ XLogSegNo segno;
+ XLogSegNo startsegno;
+ XLogSegNo endsegno;
+ struct stat statbuf;
+ List *historyFileList = NIL;
+ List *walFileList = NIL;
+ char firstoff[MAXFNAMELEN];
+ char lastoff[MAXFNAMELEN];
+ DIR *dir;
+ struct dirent *de;
+ ListCell *lc;
+ TimeLineID tli;
+
+ basebackup_progress_transfer_wal();
+
+ /*
+ * I'd rather not worry about timelines here, so scan pg_wal and
+ * include all WAL files in the range between 'startptr' and 'endptr',
+ * regardless of the timeline the file is stamped with. If there are
+ * some spurious WAL files belonging to timelines that don't belong in
+ * this server's history, they will be included too. Normally there
+ * shouldn't be such files, but if there are, there's little harm in
+ * including them.
+ */
+ XLByteToSeg(state.startptr, startsegno, wal_segment_size);
+ XLogFileName(firstoff, state.starttli, startsegno, wal_segment_size);
+ XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
+ XLogFileName(lastoff, endtli, endsegno, wal_segment_size);
+
+ dir = AllocateDir("pg_wal");
+ while ((de = ReadDir(dir, "pg_wal")) != NULL)
+ {
+ /* Does it look like a WAL segment, and is it in the range? */
+ if (IsXLogFileName(de->d_name) &&
+ strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
+ strcmp(de->d_name + 8, lastoff + 8) <= 0)
+ {
+ walFileList = lappend(walFileList, pstrdup(de->d_name));
+ }
+ /* Does it look like a timeline history file? */
+ else if (IsTLHistoryFileName(de->d_name))
+ {
+ historyFileList = lappend(historyFileList, pstrdup(de->d_name));
+ }
+ }
+ FreeDir(dir);
+
+ /*
+ * Before we go any further, check that none of the WAL segments we
+ * need were removed.
+ */
+ CheckXLogRemoved(startsegno, state.starttli);
+
+ /*
+ * Sort the WAL filenames. We want to send the files in order from
+ * oldest to newest, to reduce the chance that a file is recycled
+ * before we get a chance to send it over.
+ */
+ list_sort(walFileList, compareWalFileNames);
+
+ /*
+ * There must be at least one xlog file in the pg_wal directory, since
+ * we are doing backup-including-xlog.
+ */
+ if (walFileList == NIL)
+ ereport(ERROR,
+ (errmsg("could not find any WAL files")));
+
+ /*
+ * Sanity check: the first and last segment should cover startptr and
+ * endptr, with no gaps in between.
+ */
+ XLogFromFileName((char *) linitial(walFileList),
+ &tli, &segno, wal_segment_size);
+ if (segno != startsegno)
+ {
+ char startfname[MAXFNAMELEN];
+
+ XLogFileName(startfname, state.starttli, startsegno,
+ wal_segment_size);
+ ereport(ERROR,
+ (errmsg("could not find WAL file \"%s\"", startfname)));
+ }
+ foreach(lc, walFileList)
+ {
+ char *walFileName = (char *) lfirst(lc);
+ XLogSegNo currsegno = segno;
+ XLogSegNo nextsegno = segno + 1;
+
+ XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
+ if (!(nextsegno == segno || currsegno == segno))
+ {
+ char nextfname[MAXFNAMELEN];
+
+ XLogFileName(nextfname, tli, nextsegno, wal_segment_size);
+ ereport(ERROR,
+ (errmsg("could not find WAL file \"%s\"", nextfname)));
+ }
+ }
+ if (segno != endsegno)
+ {
+ char endfname[MAXFNAMELEN];
+
+ XLogFileName(endfname, endtli, endsegno, wal_segment_size);
+ ereport(ERROR,
+ (errmsg("could not find WAL file \"%s\"", endfname)));
+ }
+
+ /* Ok, we have everything we need. Send the WAL files. */
+ foreach(lc, walFileList)
+ {
+ char *walFileName = (char *) lfirst(lc);
+ int fd;
+ size_t cnt;
+ pgoff_t len = 0;
+
+ snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
+ XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
+
+ fd = OpenTransientFile(pathbuf, O_RDONLY | PG_BINARY);
+ if (fd < 0)
+ {
+ int save_errno = errno;
+
+ /*
+ * Most likely reason for this is that the file was already
+ * removed by a checkpoint, so check for that to get a better
+ * error message.
+ */
+ CheckXLogRemoved(segno, tli);
+
+ errno = save_errno;
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", pathbuf)));
+ }
+
+ if (fstat(fd, &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ pathbuf)));
+ if (statbuf.st_size != wal_segment_size)
+ {
+ CheckXLogRemoved(segno, tli);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected WAL file size \"%s\"", walFileName)));
+ }
+
+ /* send the WAL file itself */
+ _tarWriteHeader(sink, pathbuf, NULL, &statbuf, false);
+
+ while ((cnt = basebackup_read_file(fd, sink->bbs_buffer,
+ Min(sink->bbs_buffer_length,
+ wal_segment_size - len),
+ len, pathbuf, true)) > 0)
+ {
+ CheckXLogRemoved(segno, tli);
+ bbsink_archive_contents(sink, cnt);
+
+ len += cnt;
+
+ if (len == wal_segment_size)
+ break;
+ }
+
+ if (len != wal_segment_size)
+ {
+ CheckXLogRemoved(segno, tli);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected WAL file size \"%s\"", walFileName)));
+ }
+
+ /*
+ * wal_segment_size is a multiple of TAR_BLOCK_SIZE, so no need
+ * for padding.
+ */
+ Assert(wal_segment_size % TAR_BLOCK_SIZE == 0);
+
+ CloseTransientFile(fd);
+
+ /*
+ * Mark file as archived, otherwise files can get archived again
+ * after promotion of a new node. This is in line with
+ * walreceiver.c always doing an XLogArchiveForceDone() after a
+ * complete segment.
+ */
+ StatusFilePath(pathbuf, walFileName, ".done");
+ sendFileWithContent(sink, pathbuf, "", &manifest);
+ }
+
+ /*
+ * Send timeline history files too. Only the latest timeline history
+ * file is required for recovery, and even that only if there happens
+ * to be a timeline switch in the first WAL segment that contains the
+ * checkpoint record, or if we're taking a base backup from a standby
+ * server and the target timeline changes while the backup is taken.
+ * But they are small and highly useful for debugging purposes, so
+ * better include them all, always.
+ */
+ foreach(lc, historyFileList)
+ {
+ char *fname = lfirst(lc);
+
+ snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
+
+ if (lstat(pathbuf, &statbuf) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m", pathbuf)));
+
+ sendFile(sink, pathbuf, pathbuf, &statbuf, false, InvalidOid,
+ &manifest, NULL);
+
+ /* unconditionally mark file as archived */
+ StatusFilePath(pathbuf, fname, ".done");
+ sendFileWithContent(sink, pathbuf, "", &manifest);
+ }
+
+ /* Properly terminate the tar file. */
+ StaticAssertStmt(2 * TAR_BLOCK_SIZE <= BLCKSZ,
+ "BLCKSZ too small for 2 tar blocks");
+ memset(sink->bbs_buffer, 0, 2 * TAR_BLOCK_SIZE);
+ bbsink_archive_contents(sink, 2 * TAR_BLOCK_SIZE);
+
+ /* OK, that's the end of the archive. */
+ bbsink_end_archive(sink);
+ }
+
+ AddWALInfoToBackupManifest(&manifest, state.startptr, state.starttli,
+ endptr, endtli);
+
+ SendBackupManifest(&manifest, sink);
+
+ bbsink_end_backup(sink, endptr, endtli);
+
+ if (total_checksum_failures)
+ {
+ if (total_checksum_failures > 1)
+ ereport(WARNING,
+ (errmsg_plural("%lld total checksum verification failure",
+ "%lld total checksum verification failures",
+ total_checksum_failures,
+ total_checksum_failures)));
+
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("checksum verification failure during base backup")));
+ }
+
+ /*
+ * Make sure to free the manifest before the resource owners as manifests
+ * use cryptohash contexts that may depend on resource owners (like
+ * OpenSSL).
+ */
+ FreeBackupManifest(&manifest);
+
+ /* clean up the resource owner we created */
+ WalSndResourceCleanup(true);
+
+ basebackup_progress_done();
+}
+
+/*
+ * list_sort comparison function, to compare log/seg portion of WAL segment
+ * filenames, ignoring the timeline portion.
+ */
+static int
+compareWalFileNames(const ListCell *a, const ListCell *b)
+{
+ char *fna = (char *) lfirst(a);
+ char *fnb = (char *) lfirst(b);
+
+ return strcmp(fna + 8, fnb + 8);
+}
+
+/*
+ * Parse the base backup options passed down by the parser
+ */
+static void
+parse_basebackup_options(List *options, basebackup_options *opt)
+{
+ ListCell *lopt;
+ bool o_label = false;
+ bool o_progress = false;
+ bool o_checkpoint = false;
+ bool o_nowait = false;
+ bool o_wal = false;
+ bool o_maxrate = false;
+ bool o_tablespace_map = false;
+ bool o_noverify_checksums = false;
+ bool o_manifest = false;
+ bool o_manifest_checksums = false;
+ bool o_target = false;
+ bool o_target_detail = false;
+ char *target_str = NULL;
+ char *target_detail_str = NULL;
+ bool o_compression = false;
+ bool o_compression_detail = false;
+ char *compression_detail_str = NULL;
+
+ MemSet(opt, 0, sizeof(*opt));
+ opt->manifest = MANIFEST_OPTION_NO;
+ opt->manifest_checksum_type = CHECKSUM_TYPE_CRC32C;
+ opt->compression = PG_COMPRESSION_NONE;
+ opt->compression_specification.algorithm = PG_COMPRESSION_NONE;
+
+ foreach(lopt, options)
+ {
+ DefElem *defel = (DefElem *) lfirst(lopt);
+
+ if (strcmp(defel->defname, "label") == 0)
+ {
+ if (o_label)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ opt->label = defGetString(defel);
+ o_label = true;
+ }
+ else if (strcmp(defel->defname, "progress") == 0)
+ {
+ if (o_progress)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ opt->progress = defGetBoolean(defel);
+ o_progress = true;
+ }
+ else if (strcmp(defel->defname, "checkpoint") == 0)
+ {
+ char *optval = defGetString(defel);
+
+ if (o_checkpoint)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ if (pg_strcasecmp(optval, "fast") == 0)
+ opt->fastcheckpoint = true;
+ else if (pg_strcasecmp(optval, "spread") == 0)
+ opt->fastcheckpoint = false;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized checkpoint type: \"%s\"",
+ optval)));
+ o_checkpoint = true;
+ }
+ else if (strcmp(defel->defname, "wait") == 0)
+ {
+ if (o_nowait)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ opt->nowait = !defGetBoolean(defel);
+ o_nowait = true;
+ }
+ else if (strcmp(defel->defname, "wal") == 0)
+ {
+ if (o_wal)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ opt->includewal = defGetBoolean(defel);
+ o_wal = true;
+ }
+ else if (strcmp(defel->defname, "max_rate") == 0)
+ {
+ int64 maxrate;
+
+ if (o_maxrate)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+
+ maxrate = defGetInt64(defel);
+ if (maxrate < MAX_RATE_LOWER || maxrate > MAX_RATE_UPPER)
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("%d is outside the valid range for parameter \"%s\" (%d .. %d)",
+ (int) maxrate, "MAX_RATE", MAX_RATE_LOWER, MAX_RATE_UPPER)));
+
+ opt->maxrate = (uint32) maxrate;
+ o_maxrate = true;
+ }
+ else if (strcmp(defel->defname, "tablespace_map") == 0)
+ {
+ if (o_tablespace_map)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ opt->sendtblspcmapfile = defGetBoolean(defel);
+ o_tablespace_map = true;
+ }
+ else if (strcmp(defel->defname, "verify_checksums") == 0)
+ {
+ if (o_noverify_checksums)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ noverify_checksums = !defGetBoolean(defel);
+ o_noverify_checksums = true;
+ }
+ else if (strcmp(defel->defname, "manifest") == 0)
+ {
+ char *optval = defGetString(defel);
+ bool manifest_bool;
+
+ if (o_manifest)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ if (parse_bool(optval, &manifest_bool))
+ {
+ if (manifest_bool)
+ opt->manifest = MANIFEST_OPTION_YES;
+ else
+ opt->manifest = MANIFEST_OPTION_NO;
+ }
+ else if (pg_strcasecmp(optval, "force-encode") == 0)
+ opt->manifest = MANIFEST_OPTION_FORCE_ENCODE;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized manifest option: \"%s\"",
+ optval)));
+ o_manifest = true;
+ }
+ else if (strcmp(defel->defname, "manifest_checksums") == 0)
+ {
+ char *optval = defGetString(defel);
+
+ if (o_manifest_checksums)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ if (!pg_checksum_parse_type(optval,
+ &opt->manifest_checksum_type))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized checksum algorithm: \"%s\"",
+ optval)));
+ o_manifest_checksums = true;
+ }
+ else if (strcmp(defel->defname, "target") == 0)
+ {
+ if (o_target)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ target_str = defGetString(defel);
+ o_target = true;
+ }
+ else if (strcmp(defel->defname, "target_detail") == 0)
+ {
+ char *optval = defGetString(defel);
+
+ if (o_target_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ target_detail_str = optval;
+ o_target_detail = true;
+ }
+ else if (strcmp(defel->defname, "compression") == 0)
+ {
+ char *optval = defGetString(defel);
+
+ if (o_compression)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ if (!parse_compress_algorithm(optval, &opt->compression))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized compression algorithm: \"%s\"",
+ optval)));
+ o_compression = true;
+ }
+ else if (strcmp(defel->defname, "compression_detail") == 0)
+ {
+ if (o_compression_detail)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ compression_detail_str = defGetString(defel);
+ o_compression_detail = true;
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized base backup option: \"%s\"",
+ defel->defname)));
+ }
+
+ if (opt->label == NULL)
+ opt->label = "base backup";
+ if (opt->manifest == MANIFEST_OPTION_NO)
+ {
+ if (o_manifest_checksums)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("manifest checksums require a backup manifest")));
+ opt->manifest_checksum_type = CHECKSUM_TYPE_NONE;
+ }
+
+ if (target_str == NULL)
+ {
+ if (target_detail_str != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("target detail cannot be used without target")));
+ opt->use_copytblspc = true;
+ opt->send_to_client = true;
+ }
+ else if (strcmp(target_str, "client") == 0)
+ {
+ if (target_detail_str != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("target \"%s\" does not accept a target detail",
+ target_str)));
+ opt->send_to_client = true;
+ }
+ else
+ opt->target_handle =
+ BaseBackupGetTargetHandle(target_str, target_detail_str);
+
+ if (o_compression_detail && !o_compression)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("compression detail cannot be specified unless compression is enabled")));
+
+ if (o_compression)
+ {
+ char *error_detail;
+
+ parse_compress_specification(opt->compression, compression_detail_str,
+ &opt->compression_specification);
+ error_detail =
+ validate_compress_specification(&opt->compression_specification);
+ if (error_detail != NULL)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid compression specification: %s",
+ error_detail));
+ }
+}
+
+
+/*
+ * SendBaseBackup() - send a complete base backup.
+ *
+ * The function will put the system into backup mode like pg_backup_start()
+ * does, so that the backup is consistent even though we read directly from
+ * the filesystem, bypassing the buffer cache.
+ */
+void
+SendBaseBackup(BaseBackupCmd *cmd)
+{
+ basebackup_options opt;
+ bbsink *sink;
+ SessionBackupState status = get_backup_status();
+
+ if (status == SESSION_BACKUP_RUNNING)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("a backup is already in progress in this session")));
+
+ parse_basebackup_options(cmd->options, &opt);
+
+ WalSndSetState(WALSNDSTATE_BACKUP);
+
+ if (update_process_title)
+ {
+ char activitymsg[50];
+
+ snprintf(activitymsg, sizeof(activitymsg), "sending backup \"%s\"",
+ opt.label);
+ set_ps_display(activitymsg);
+ }
+
+ /*
+ * If the target is specifically 'client' then set up to stream the backup
+ * to the client; otherwise, it's being sent someplace else and should not
+ * be sent to the client. BaseBackupGetSink has the job of setting up a
+ * sink to send the backup data wherever it needs to go.
+ */
+ sink = bbsink_copystream_new(opt.send_to_client);
+ if (opt.target_handle != NULL)
+ sink = BaseBackupGetSink(opt.target_handle, sink);
+
+ /* Set up network throttling, if client requested it */
+ if (opt.maxrate > 0)
+ sink = bbsink_throttle_new(sink, opt.maxrate);
+
+ /* Set up server-side compression, if client requested it */
+ if (opt.compression == PG_COMPRESSION_GZIP)
+ sink = bbsink_gzip_new(sink, &opt.compression_specification);
+ else if (opt.compression == PG_COMPRESSION_LZ4)
+ sink = bbsink_lz4_new(sink, &opt.compression_specification);
+ else if (opt.compression == PG_COMPRESSION_ZSTD)
+ sink = bbsink_zstd_new(sink, &opt.compression_specification);
+
+ /* Set up progress reporting. */
+ sink = bbsink_progress_new(sink, opt.progress);
+
+ /*
+ * Perform the base backup, but make sure we clean up the bbsink even if
+ * an error occurs.
+ */
+ PG_TRY();
+ {
+ perform_base_backup(&opt, sink);
+ }
+ PG_FINALLY();
+ {
+ bbsink_cleanup(sink);
+ }
+ PG_END_TRY();
+}
+
+/*
+ * Inject a file with given name and content in the output tar stream.
+ */
+static void
+sendFileWithContent(bbsink *sink, const char *filename, const char *content,
+ backup_manifest_info *manifest)
+{
+ struct stat statbuf;
+ int bytes_done = 0,
+ len;
+ pg_checksum_context checksum_ctx;
+
+ if (pg_checksum_init(&checksum_ctx, manifest->checksum_type) < 0)
+ elog(ERROR, "could not initialize checksum of file \"%s\"",
+ filename);
+
+ len = strlen(content);
+
+ /*
+ * Construct a stat struct for the backup_label file we're injecting in
+ * the tar.
+ */
+ /* Windows doesn't have the concept of uid and gid */
+#ifdef WIN32
+ statbuf.st_uid = 0;
+ statbuf.st_gid = 0;
+#else
+ statbuf.st_uid = geteuid();
+ statbuf.st_gid = getegid();
+#endif
+ statbuf.st_mtime = time(NULL);
+ statbuf.st_mode = pg_file_create_mode;
+ statbuf.st_size = len;
+
+ _tarWriteHeader(sink, filename, NULL, &statbuf, false);
+
+ if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0)
+ elog(ERROR, "could not update checksum of file \"%s\"",
+ filename);
+
+ while (bytes_done < len)
+ {
+ size_t remaining = len - bytes_done;
+ size_t nbytes = Min(sink->bbs_buffer_length, remaining);
+
+ memcpy(sink->bbs_buffer, content, nbytes);
+ bbsink_archive_contents(sink, nbytes);
+ bytes_done += nbytes;
+ content += nbytes;
+ }
+
+ _tarWritePadding(sink, len);
+
+ AddFileToBackupManifest(manifest, NULL, filename, len,
+ (pg_time_t) statbuf.st_mtime, &checksum_ctx);
+}
+
+/*
+ * Include the tablespace directory pointed to by 'path' in the output tar
+ * stream. If 'sizeonly' is true, we just calculate a total length and return
+ * it, without actually sending anything.
+ *
+ * Only used to send auxiliary tablespaces, not PGDATA.
+ */
+static int64
+sendTablespace(bbsink *sink, char *path, char *spcoid, bool sizeonly,
+ backup_manifest_info *manifest)
+{
+ int64 size;
+ char pathbuf[MAXPGPATH];
+ struct stat statbuf;
+
+ /*
+ * 'path' points to the tablespace location, but we only want to include
+ * the version directory in it that belongs to us.
+ */
+ snprintf(pathbuf, sizeof(pathbuf), "%s/%s", path,
+ TABLESPACE_VERSION_DIRECTORY);
+
+ /*
+ * Store a directory entry in the tar file so we get the permissions
+ * right.
+ */
+ if (lstat(pathbuf, &statbuf) != 0)
+ {
+ if (errno != ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file or directory \"%s\": %m",
+ pathbuf)));
+
+ /* If the tablespace went away while scanning, it's no error. */
+ return 0;
+ }
+
+ size = _tarWriteHeader(sink, TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
+ sizeonly);
+
+ /* Send all the files in the tablespace version directory */
+ size += sendDir(sink, pathbuf, strlen(path), sizeonly, NIL, true, manifest,
+ spcoid);
+
+ return size;
+}
+
+/*
+ * Include all files from the given directory in the output tar stream. If
+ * 'sizeonly' is true, we just calculate a total length and return it, without
+ * actually sending anything.
+ *
+ * Omit any directory in the tablespaces list, to avoid backing up
+ * tablespaces twice when they were created inside PGDATA.
+ *
+ * If sendtblspclinks is true, we need to include symlink
+ * information in the tar file. If not, we can skip that
+ * as it will be sent separately in the tablespace_map file.
+ */
+static int64
+sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest,
+ const char *spcoid)
+{
+ DIR *dir;
+ struct dirent *de;
+ char pathbuf[MAXPGPATH * 2];
+ struct stat statbuf;
+ int64 size = 0;
+ const char *lastDir; /* Split last dir from parent path. */
+ bool isDbDir = false; /* Does this directory contain relations? */
+
+ /*
+ * Determine if the current path is a database directory that can contain
+ * relations.
+ *
+ * Start by finding the location of the delimiter between the parent path
+ * and the current path.
+ */
+ lastDir = last_dir_separator(path);
+
+ /* Does this path look like a database path (i.e. all digits)? */
+ if (lastDir != NULL &&
+ strspn(lastDir + 1, "0123456789") == strlen(lastDir + 1))
+ {
+ /* Part of path that contains the parent directory. */
+ int parentPathLen = lastDir - path;
+
+ /*
+ * Mark path as a database directory if the parent path is either
+ * $PGDATA/base or a tablespace version path.
+ */
+ if (strncmp(path, "./base", parentPathLen) == 0 ||
+ (parentPathLen >= (sizeof(TABLESPACE_VERSION_DIRECTORY) - 1) &&
+ strncmp(lastDir - (sizeof(TABLESPACE_VERSION_DIRECTORY) - 1),
+ TABLESPACE_VERSION_DIRECTORY,
+ sizeof(TABLESPACE_VERSION_DIRECTORY) - 1) == 0))
+ isDbDir = true;
+ }
+
+ dir = AllocateDir(path);
+ while ((de = ReadDir(dir, path)) != NULL)
+ {
+ int excludeIdx;
+ bool excludeFound;
+ ForkNumber relForkNum; /* Type of fork if file is a relation */
+ int relOidChars; /* Chars in filename that are the rel oid */
+
+ /* Skip special stuff */
+ if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
+ continue;
+
+ /* Skip temporary files */
+ if (strncmp(de->d_name,
+ PG_TEMP_FILE_PREFIX,
+ strlen(PG_TEMP_FILE_PREFIX)) == 0)
+ continue;
+
+ /*
+ * Check if the postmaster has signaled us to exit, and abort with an
+ * error in that case. The error handler further up will call
+ * do_pg_abort_backup() for us. Also check that if the backup was
+ * started while still in recovery, the server wasn't promoted.
+ * do_pg_backup_stop() will check that too, but it's better to stop
+ * the backup early than continue to the end and fail there.
+ */
+ CHECK_FOR_INTERRUPTS();
+ if (RecoveryInProgress() != backup_started_in_recovery)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("the standby was promoted during online backup"),
+ errhint("This means that the backup being taken is corrupt "
+ "and should not be used. "
+ "Try taking another online backup.")));
+
+ /* Scan for files that should be excluded */
+ excludeFound = false;
+ for (excludeIdx = 0; excludeFiles[excludeIdx].name != NULL; excludeIdx++)
+ {
+ int cmplen = strlen(excludeFiles[excludeIdx].name);
+
+ if (!excludeFiles[excludeIdx].match_prefix)
+ cmplen++;
+ if (strncmp(de->d_name, excludeFiles[excludeIdx].name, cmplen) == 0)
+ {
+ elog(DEBUG1, "file \"%s\" excluded from backup", de->d_name);
+ excludeFound = true;
+ break;
+ }
+ }
+
+ if (excludeFound)
+ continue;
+
+ /* Exclude all forks for unlogged tables except the init fork */
+ if (isDbDir &&
+ parse_filename_for_nontemp_relation(de->d_name, &relOidChars,
+ &relForkNum))
+ {
+ /* Never exclude init forks */
+ if (relForkNum != INIT_FORKNUM)
+ {
+ char initForkFile[MAXPGPATH];
+ char relOid[OIDCHARS + 1];
+
+ /*
+ * If any other type of fork, check if there is an init fork
+ * with the same OID. If so, the file can be excluded.
+ */
+ memcpy(relOid, de->d_name, relOidChars);
+ relOid[relOidChars] = '\0';
+ snprintf(initForkFile, sizeof(initForkFile), "%s/%s_init",
+ path, relOid);
+
+ if (lstat(initForkFile, &statbuf) == 0)
+ {
+ elog(DEBUG2,
+ "unlogged relation file \"%s\" excluded from backup",
+ de->d_name);
+
+ continue;
+ }
+ }
+ }
+
+ /* Exclude temporary relations */
+ if (isDbDir && looks_like_temp_rel_name(de->d_name))
+ {
+ elog(DEBUG2,
+ "temporary relation file \"%s\" excluded from backup",
+ de->d_name);
+
+ continue;
+ }
+
+ snprintf(pathbuf, sizeof(pathbuf), "%s/%s", path, de->d_name);
+
+ /* Skip pg_control here to back up it last */
+ if (strcmp(pathbuf, "./global/pg_control") == 0)
+ continue;
+
+ if (lstat(pathbuf, &statbuf) != 0)
+ {
+ if (errno != ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file or directory \"%s\": %m",
+ pathbuf)));
+
+ /* If the file went away while scanning, it's not an error. */
+ continue;
+ }
+
+ /* Scan for directories whose contents should be excluded */
+ excludeFound = false;
+ for (excludeIdx = 0; excludeDirContents[excludeIdx] != NULL; excludeIdx++)
+ {
+ if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
+ {
+ elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+ convert_link_to_directory(pathbuf, &statbuf);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
+ excludeFound = true;
+ break;
+ }
+ }
+
+ if (excludeFound)
+ continue;
+
+ /*
+ * We can skip pg_wal, the WAL segments need to be fetched from the
+ * WAL archive anyway. But include it as an empty directory anyway, so
+ * we get permissions right.
+ */
+ if (strcmp(pathbuf, "./pg_wal") == 0)
+ {
+ /* If pg_wal is a symlink, write it as a directory anyway */
+ convert_link_to_directory(pathbuf, &statbuf);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
+
+ /*
+ * Also send archive_status directory (by hackishly reusing
+ * statbuf from above ...).
+ */
+ size += _tarWriteHeader(sink, "./pg_wal/archive_status", NULL,
+ &statbuf, sizeonly);
+
+ continue; /* don't recurse into pg_wal */
+ }
+
+ /* Allow symbolic links in pg_tblspc only */
+ if (strcmp(path, "./pg_tblspc") == 0 &&
+#ifndef WIN32
+ S_ISLNK(statbuf.st_mode)
+#else
+ pgwin32_is_junction(pathbuf)
+#endif
+ )
+ {
+#if defined(HAVE_READLINK) || defined(WIN32)
+ char linkpath[MAXPGPATH];
+ int rllen;
+
+ rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
+ if (rllen < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read symbolic link \"%s\": %m",
+ pathbuf)));
+ if (rllen >= sizeof(linkpath))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("symbolic link \"%s\" target is too long",
+ pathbuf)));
+ linkpath[rllen] = '\0';
+
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, linkpath,
+ &statbuf, sizeonly);
+#else
+
+ /*
+ * If the platform does not have symbolic links, it should not be
+ * possible to have tablespaces - clearly somebody else created
+ * them. Warn about it and ignore.
+ */
+ ereport(WARNING,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("tablespaces are not supported on this platform")));
+ continue;
+#endif /* HAVE_READLINK */
+ }
+ else if (S_ISDIR(statbuf.st_mode))
+ {
+ bool skip_this_dir = false;
+ ListCell *lc;
+
+ /*
+ * Store a directory entry in the tar file so we can get the
+ * permissions right.
+ */
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, &statbuf,
+ sizeonly);
+
+ /*
+ * Call ourselves recursively for a directory, unless it happens
+ * to be a separate tablespace located within PGDATA.
+ */
+ foreach(lc, tablespaces)
+ {
+ tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
+
+ /*
+ * ti->rpath is the tablespace relative path within PGDATA, or
+ * NULL if the tablespace has been properly located somewhere
+ * else.
+ *
+ * Skip past the leading "./" in pathbuf when comparing.
+ */
+ if (ti->rpath && strcmp(ti->rpath, pathbuf + 2) == 0)
+ {
+ skip_this_dir = true;
+ break;
+ }
+ }
+
+ /*
+ * skip sending directories inside pg_tblspc, if not required.
+ */
+ if (strcmp(pathbuf, "./pg_tblspc") == 0 && !sendtblspclinks)
+ skip_this_dir = true;
+
+ if (!skip_this_dir)
+ size += sendDir(sink, pathbuf, basepathlen, sizeonly, tablespaces,
+ sendtblspclinks, manifest, spcoid);
+ }
+ else if (S_ISREG(statbuf.st_mode))
+ {
+ bool sent = false;
+
+ if (!sizeonly)
+ sent = sendFile(sink, pathbuf, pathbuf + basepathlen + 1, &statbuf,
+ true, isDbDir ? atooid(lastDir + 1) : InvalidOid,
+ manifest, spcoid);
+
+ if (sent || sizeonly)
+ {
+ /* Add size. */
+ size += statbuf.st_size;
+
+ /* Pad to a multiple of the tar block size. */
+ size += tarPaddingBytesRequired(statbuf.st_size);
+
+ /* Size of the header for the file. */
+ size += TAR_BLOCK_SIZE;
+ }
+ }
+ else
+ ereport(WARNING,
+ (errmsg("skipping special file \"%s\"", pathbuf)));
+ }
+ FreeDir(dir);
+ return size;
+}
+
+/*
+ * Check if a file should have its checksum validated.
+ * We validate checksums on files in regular tablespaces
+ * (including global and default) only, and in those there
+ * are some files that are explicitly excluded.
+ */
+static bool
+is_checksummed_file(const char *fullpath, const char *filename)
+{
+ /* Check that the file is in a tablespace */
+ if (strncmp(fullpath, "./global/", 9) == 0 ||
+ strncmp(fullpath, "./base/", 7) == 0 ||
+ strncmp(fullpath, "/", 1) == 0)
+ {
+ int excludeIdx;
+
+ /* Compare file against noChecksumFiles skip list */
+ for (excludeIdx = 0; noChecksumFiles[excludeIdx].name != NULL; excludeIdx++)
+ {
+ int cmplen = strlen(noChecksumFiles[excludeIdx].name);
+
+ if (!noChecksumFiles[excludeIdx].match_prefix)
+ cmplen++;
+ if (strncmp(filename, noChecksumFiles[excludeIdx].name,
+ cmplen) == 0)
+ return false;
+ }
+
+ return true;
+ }
+ else
+ return false;
+}
+
+/*****
+ * Functions for handling tar file format
+ *
+ * Copied from pg_dump, but modified to work with libpq for sending
+ */
+
+
+/*
+ * Given the member, write the TAR header & send the file.
+ *
+ * If 'missing_ok' is true, will not throw an error if the file is not found.
+ *
+ * If dboid is anything other than InvalidOid then any checksum failures
+ * detected will get reported to the cumulative stats system.
+ *
+ * Returns true if the file was successfully sent, false if 'missing_ok',
+ * and the file did not exist.
+ */
+static bool
+sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
+ struct stat *statbuf, bool missing_ok, Oid dboid,
+ backup_manifest_info *manifest, const char *spcoid)
+{
+ int fd;
+ BlockNumber blkno = 0;
+ bool block_retry = false;
+ uint16 checksum;
+ int checksum_failures = 0;
+ off_t cnt;
+ int i;
+ pgoff_t len = 0;
+ char *page;
+ PageHeader phdr;
+ int segmentno = 0;
+ char *segmentpath;
+ bool verify_checksum = false;
+ pg_checksum_context checksum_ctx;
+
+ if (pg_checksum_init(&checksum_ctx, manifest->checksum_type) < 0)
+ elog(ERROR, "could not initialize checksum of file \"%s\"",
+ readfilename);
+
+ fd = OpenTransientFile(readfilename, O_RDONLY | PG_BINARY);
+ if (fd < 0)
+ {
+ if (errno == ENOENT && missing_ok)
+ return false;
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", readfilename)));
+ }
+
+ _tarWriteHeader(sink, tarfilename, NULL, statbuf, false);
+
+ if (!noverify_checksums && DataChecksumsEnabled())
+ {
+ char *filename;
+
+ /*
+ * Get the filename (excluding path). As last_dir_separator()
+ * includes the last directory separator, we chop that off by
+ * incrementing the pointer.
+ */
+ filename = last_dir_separator(readfilename) + 1;
+
+ if (is_checksummed_file(readfilename, filename))
+ {
+ verify_checksum = true;
+
+ /*
+ * Cut off at the segment boundary (".") to get the segment number
+ * in order to mix it into the checksum.
+ */
+ segmentpath = strstr(filename, ".");
+ if (segmentpath != NULL)
+ {
+ segmentno = atoi(segmentpath + 1);
+ if (segmentno == 0)
+ ereport(ERROR,
+ (errmsg("invalid segment number %d in file \"%s\"",
+ segmentno, filename)));
+ }
+ }
+ }
+
+ /*
+ * Loop until we read the amount of data the caller told us to expect. The
+ * file could be longer, if it was extended while we were sending it, but
+ * for a base backup we can ignore such extended data. It will be restored
+ * from WAL.
+ */
+ while (len < statbuf->st_size)
+ {
+ size_t remaining = statbuf->st_size - len;
+
+ /* Try to read some more data. */
+ cnt = basebackup_read_file(fd, sink->bbs_buffer,
+ Min(sink->bbs_buffer_length, remaining),
+ len, readfilename, true);
+
+ /*
+ * The checksums are verified at block level, so we iterate over the
+ * buffer in chunks of BLCKSZ, after making sure that
+ * TAR_SEND_SIZE/buf is divisible by BLCKSZ and we read a multiple of
+ * BLCKSZ bytes.
+ */
+ Assert((sink->bbs_buffer_length % BLCKSZ) == 0);
+
+ if (verify_checksum && (cnt % BLCKSZ != 0))
+ {
+ ereport(WARNING,
+ (errmsg("could not verify checksum in file \"%s\", block "
+ "%u: read buffer size %d and page size %d "
+ "differ",
+ readfilename, blkno, (int) cnt, BLCKSZ)));
+ verify_checksum = false;
+ }
+
+ if (verify_checksum)
+ {
+ for (i = 0; i < cnt / BLCKSZ; i++)
+ {
+ page = sink->bbs_buffer + BLCKSZ * i;
+
+ /*
+ * Only check pages which have not been modified since the
+ * start of the base backup. Otherwise, they might have been
+ * written only halfway and the checksum would not be valid.
+ * However, replaying WAL would reinstate the correct page in
+ * this case. We also skip completely new pages, since they
+ * don't have a checksum yet.
+ */
+ if (!PageIsNew(page) && PageGetLSN(page) < sink->bbs_state->startptr)
+ {
+ checksum = pg_checksum_page((char *) page, blkno + segmentno * RELSEG_SIZE);
+ phdr = (PageHeader) page;
+ if (phdr->pd_checksum != checksum)
+ {
+ /*
+ * Retry the block on the first failure. It's
+ * possible that we read the first 4K page of the
+ * block just before postgres updated the entire block
+ * so it ends up looking torn to us. We only need to
+ * retry once because the LSN should be updated to
+ * something we can ignore on the next pass. If the
+ * error happens again then it is a true validation
+ * failure.
+ */
+ if (block_retry == false)
+ {
+ int reread_cnt;
+
+ /* Reread the failed block */
+ reread_cnt =
+ basebackup_read_file(fd,
+ sink->bbs_buffer + BLCKSZ * i,
+ BLCKSZ, len + BLCKSZ * i,
+ readfilename,
+ false);
+ if (reread_cnt == 0)
+ {
+ /*
+ * If we hit end-of-file, a concurrent
+ * truncation must have occurred, so break out
+ * of this loop just as if the initial fread()
+ * returned 0. We'll drop through to the same
+ * code that handles that case. (We must fix
+ * up cnt first, though.)
+ */
+ cnt = BLCKSZ * i;
+ break;
+ }
+
+ /* Set flag so we know a retry was attempted */
+ block_retry = true;
+
+ /* Reset loop to validate the block again */
+ i--;
+ continue;
+ }
+
+ checksum_failures++;
+
+ if (checksum_failures <= 5)
+ ereport(WARNING,
+ (errmsg("checksum verification failed in "
+ "file \"%s\", block %u: calculated "
+ "%X but expected %X",
+ readfilename, blkno, checksum,
+ phdr->pd_checksum)));
+ if (checksum_failures == 5)
+ ereport(WARNING,
+ (errmsg("further checksum verification "
+ "failures in file \"%s\" will not "
+ "be reported", readfilename)));
+ }
+ }
+ block_retry = false;
+ blkno++;
+ }
+ }
+
+ /*
+ * If we hit end-of-file, a concurrent truncation must have occurred.
+ * That's not an error condition, because WAL replay will fix things
+ * up.
+ */
+ if (cnt == 0)
+ break;
+
+ /* Archive the data we just read. */
+ bbsink_archive_contents(sink, cnt);
+
+ /* Also feed it to the checksum machinery. */
+ if (pg_checksum_update(&checksum_ctx,
+ (uint8 *) sink->bbs_buffer, cnt) < 0)
+ elog(ERROR, "could not update checksum of base backup");
+
+ len += cnt;
+ }
+
+ /* If the file was truncated while we were sending it, pad it with zeros */
+ while (len < statbuf->st_size)
+ {
+ size_t remaining = statbuf->st_size - len;
+ size_t nbytes = Min(sink->bbs_buffer_length, remaining);
+
+ MemSet(sink->bbs_buffer, 0, nbytes);
+ if (pg_checksum_update(&checksum_ctx,
+ (uint8 *) sink->bbs_buffer,
+ nbytes) < 0)
+ elog(ERROR, "could not update checksum of base backup");
+ bbsink_archive_contents(sink, nbytes);
+ len += nbytes;
+ }
+
+ /*
+ * Pad to a block boundary, per tar format requirements. (This small piece
+ * of data is probably not worth throttling, and is not checksummed
+ * because it's not actually part of the file.)
+ */
+ _tarWritePadding(sink, len);
+
+ CloseTransientFile(fd);
+
+ if (checksum_failures > 1)
+ {
+ ereport(WARNING,
+ (errmsg_plural("file \"%s\" has a total of %d checksum verification failure",
+ "file \"%s\" has a total of %d checksum verification failures",
+ checksum_failures,
+ readfilename, checksum_failures)));
+
+ pgstat_report_checksum_failures_in_db(dboid, checksum_failures);
+ }
+
+ total_checksum_failures += checksum_failures;
+
+ AddFileToBackupManifest(manifest, spcoid, tarfilename, statbuf->st_size,
+ (pg_time_t) statbuf->st_mtime, &checksum_ctx);
+
+ return true;
+}
+
+static int64
+_tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget,
+ struct stat *statbuf, bool sizeonly)
+{
+ enum tarError rc;
+
+ if (!sizeonly)
+ {
+ /*
+ * As of this writing, the smallest supported block size is 1kB, which
+ * is twice TAR_BLOCK_SIZE. Since the buffer size is required to be a
+ * multiple of BLCKSZ, it should be safe to assume that the buffer is
+ * large enough to fit an entire tar block. We double-check by means
+ * of these assertions.
+ */
+ StaticAssertStmt(TAR_BLOCK_SIZE <= BLCKSZ,
+ "BLCKSZ too small for tar block");
+ Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
+
+ rc = tarCreateHeader(sink->bbs_buffer, filename, linktarget,
+ statbuf->st_size, statbuf->st_mode,
+ statbuf->st_uid, statbuf->st_gid,
+ statbuf->st_mtime);
+
+ switch (rc)
+ {
+ case TAR_OK:
+ break;
+ case TAR_NAME_TOO_LONG:
+ ereport(ERROR,
+ (errmsg("file name too long for tar format: \"%s\"",
+ filename)));
+ break;
+ case TAR_SYMLINK_TOO_LONG:
+ ereport(ERROR,
+ (errmsg("symbolic link target too long for tar format: "
+ "file name \"%s\", target \"%s\"",
+ filename, linktarget)));
+ break;
+ default:
+ elog(ERROR, "unrecognized tar error: %d", rc);
+ }
+
+ bbsink_archive_contents(sink, TAR_BLOCK_SIZE);
+ }
+
+ return TAR_BLOCK_SIZE;
+}
+
+/*
+ * Pad with zero bytes out to a multiple of TAR_BLOCK_SIZE.
+ */
+static void
+_tarWritePadding(bbsink *sink, int len)
+{
+ int pad = tarPaddingBytesRequired(len);
+
+ /*
+ * As in _tarWriteHeader, it should be safe to assume that the buffer is
+ * large enough that we don't need to do this in multiple chunks.
+ */
+ Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
+ Assert(pad <= TAR_BLOCK_SIZE);
+
+ if (pad > 0)
+ {
+ MemSet(sink->bbs_buffer, 0, pad);
+ bbsink_archive_contents(sink, pad);
+ }
+}
+
+/*
+ * If the entry in statbuf is a link, then adjust statbuf to make it look like a
+ * directory, so that it will be written that way.
+ */
+static void
+convert_link_to_directory(const char *pathbuf, struct stat *statbuf)
+{
+ /* If symlink, write it as a directory anyway */
+#ifndef WIN32
+ if (S_ISLNK(statbuf->st_mode))
+#else
+ if (pgwin32_is_junction(pathbuf))
+#endif
+ statbuf->st_mode = S_IFDIR | pg_dir_create_mode;
+}
+
+/*
+ * Read some data from a file, setting a wait event and reporting any error
+ * encountered.
+ *
+ * If partial_read_ok is false, also report an error if the number of bytes
+ * read is not equal to the number of bytes requested.
+ *
+ * Returns the number of bytes read.
+ */
+static int
+basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
+ const char *filename, bool partial_read_ok)
+{
+ int rc;
+
+ pgstat_report_wait_start(WAIT_EVENT_BASEBACKUP_READ);
+ rc = pg_pread(fd, buf, nbytes, offset);
+ pgstat_report_wait_end();
+
+ if (rc < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m", filename)));
+ if (!partial_read_ok && rc > 0 && rc != nbytes)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": read %d of %zu",
+ filename, rc, nbytes)));
+
+ return rc;
+}
diff --git a/src/backend/backup/basebackup_copy.c b/src/backend/backup/basebackup_copy.c
new file mode 100644
index 0000000..62b518f
--- /dev/null
+++ b/src/backend/backup/basebackup_copy.c
@@ -0,0 +1,473 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_copy.c
+ * send basebackup archives using COPY OUT
+ *
+ * We send a result set with information about the tabelspaces to be included
+ * in the backup before starting COPY OUT. Then, we start a single COPY OUT
+ * operation and transmits all the archives and the manifest if present during
+ * the course of that single COPY OUT. Each CopyData message begins with a
+ * type byte, allowing us to signal the start of a new archive, or the
+ * manifest, by some means other than ending the COPY stream. This also allows
+ * for future protocol extensions, since we can include arbitrary information
+ * in the message stream as long as we're certain that the client will know
+ * what to do with it.
+ *
+ * An older method that sent each archive using a separate COPY OUT
+ * operation is no longer supported.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/backup/basebackup_copy.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "backup/basebackup.h"
+#include "backup/basebackup_sink.h"
+#include "catalog/pg_type_d.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "utils/timestamp.h"
+
+typedef struct bbsink_copystream
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /* Are we sending the archives to the client, or somewhere else? */
+ bool send_to_client;
+
+ /*
+ * Protocol message buffer. We assemble CopyData protocol messages by
+ * setting the first character of this buffer to 'd' (archive or manifest
+ * data) and then making base.bbs_buffer point to the second character so
+ * that the rest of the data gets copied into the message just where we
+ * want it.
+ */
+ char *msgbuffer;
+
+ /*
+ * When did we last report progress to the client, and how much progress
+ * did we report?
+ */
+ TimestampTz last_progress_report_time;
+ uint64 bytes_done_at_last_time_check;
+} bbsink_copystream;
+
+/*
+ * We don't want to send progress messages to the client excessively
+ * frequently. Ideally, we'd like to send a message when the time since the
+ * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
+ * the system time every time we send a tiny bit of data seems too expensive.
+ * So we only check it after the number of bytes sine the last check reaches
+ * PROGRESS_REPORT_BYTE_INTERVAL.
+ */
+#define PROGRESS_REPORT_BYTE_INTERVAL 65536
+#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000
+
+static void bbsink_copystream_begin_backup(bbsink *sink);
+static void bbsink_copystream_begin_archive(bbsink *sink,
+ const char *archive_name);
+static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
+static void bbsink_copystream_end_archive(bbsink *sink);
+static void bbsink_copystream_begin_manifest(bbsink *sink);
+static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_copystream_end_manifest(bbsink *sink);
+static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli);
+static void bbsink_copystream_cleanup(bbsink *sink);
+
+static void SendCopyOutResponse(void);
+static void SendCopyDone(void);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
+static void SendTablespaceList(List *tablespaces);
+static void send_int8_string(StringInfoData *buf, int64 intval);
+
+static const bbsink_ops bbsink_copystream_ops = {
+ .begin_backup = bbsink_copystream_begin_backup,
+ .begin_archive = bbsink_copystream_begin_archive,
+ .archive_contents = bbsink_copystream_archive_contents,
+ .end_archive = bbsink_copystream_end_archive,
+ .begin_manifest = bbsink_copystream_begin_manifest,
+ .manifest_contents = bbsink_copystream_manifest_contents,
+ .end_manifest = bbsink_copystream_end_manifest,
+ .end_backup = bbsink_copystream_end_backup,
+ .cleanup = bbsink_copystream_cleanup
+};
+
+/*
+ * Create a new 'copystream' bbsink.
+ */
+bbsink *
+bbsink_copystream_new(bool send_to_client)
+{
+ bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
+
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
+ sink->send_to_client = send_to_client;
+
+ /* Set up for periodic progress reporting. */
+ sink->last_progress_report_time = GetCurrentTimestamp();
+ sink->bytes_done_at_last_time_check = UINT64CONST(0);
+
+ return &sink->base;
+}
+
+/*
+ * Send start-of-backup wire protocol messages.
+ */
+static void
+bbsink_copystream_begin_backup(bbsink *sink)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+ bbsink_state *state = sink->bbs_state;
+ char *buf;
+
+ /*
+ * Initialize buffer. We ultimately want to send the archive and manifest
+ * data by means of CopyData messages where the payload portion of each
+ * message begins with a type byte. However, basebackup.c expects the
+ * buffer to be aligned, so we can't just allocate one extra byte for the
+ * type byte. Instead, allocate enough extra bytes that the portion of the
+ * buffer we reveal to our callers can be aligned, while leaving room to
+ * slip the type byte in just beforehand. That will allow us to ship the
+ * data with a single call to pq_putmessage and without needing any extra
+ * copying.
+ */
+ buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
+ mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
+ mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
+ mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
+
+ /* Tell client the backup start location. */
+ SendXlogRecPtrResult(state->startptr, state->starttli);
+
+ /* Send client a list of tablespaces. */
+ SendTablespaceList(state->tablespaces);
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+
+ /* Begin COPY stream. This will be used for all archives + manifest. */
+ SendCopyOutResponse();
+}
+
+/*
+ * Send a CopyData message announcing the beginning of a new archive.
+ */
+static void
+bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
+{
+ bbsink_state *state = sink->bbs_state;
+ tablespaceinfo *ti;
+ StringInfoData buf;
+
+ ti = list_nth(state->tablespaces, state->tablespace_num);
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'n'); /* New archive */
+ pq_sendstring(&buf, archive_name);
+ pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
+ pq_endmessage(&buf);
+}
+
+/*
+ * Send a CopyData message containing a chunk of archive content.
+ */
+static void
+bbsink_copystream_archive_contents(bbsink *sink, size_t len)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+ bbsink_state *state = mysink->base.bbs_state;
+ StringInfoData buf;
+ uint64 targetbytes;
+
+ /* Send the archive content to the client, if appropriate. */
+ if (mysink->send_to_client)
+ {
+ /* Add one because we're also sending a leading type byte. */
+ pq_putmessage('d', mysink->msgbuffer, len + 1);
+ }
+
+ /* Consider whether to send a progress report to the client. */
+ targetbytes = mysink->bytes_done_at_last_time_check
+ + PROGRESS_REPORT_BYTE_INTERVAL;
+ if (targetbytes <= state->bytes_done)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ long ms;
+
+ /*
+ * OK, we've sent a decent number of bytes, so check the system time
+ * to see whether we're due to send a progress report.
+ */
+ mysink->bytes_done_at_last_time_check = state->bytes_done;
+ ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
+ now);
+
+ /*
+ * Send a progress report if enough time has passed. Also send one if
+ * the system clock was set backward, so that such occurrences don't
+ * have the effect of suppressing further progress messages.
+ */
+ if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
+ {
+ mysink->last_progress_report_time = now;
+
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendint64(&buf, state->bytes_done);
+ pq_endmessage(&buf);
+ pq_flush_if_writable();
+ }
+ }
+}
+
+/*
+ * We don't need to explicitly signal the end of the archive; the client
+ * will figure out that we've reached the end when we begin the next one,
+ * or begin the manifest, or end the COPY stream. However, this seems like
+ * a good time to force out a progress report. One reason for that is that
+ * if this is the last archive, and we don't force a progress report now,
+ * the client will never be told that we sent all the bytes.
+ */
+static void
+bbsink_copystream_end_archive(bbsink *sink)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+ bbsink_state *state = mysink->base.bbs_state;
+ StringInfoData buf;
+
+ mysink->bytes_done_at_last_time_check = state->bytes_done;
+ mysink->last_progress_report_time = GetCurrentTimestamp();
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendint64(&buf, state->bytes_done);
+ pq_endmessage(&buf);
+ pq_flush_if_writable();
+}
+
+/*
+ * Send a CopyData message announcing the beginning of the backup manifest.
+ */
+static void
+bbsink_copystream_begin_manifest(bbsink *sink)
+{
+ StringInfoData buf;
+
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'm'); /* Manifest */
+ pq_endmessage(&buf);
+}
+
+/*
+ * Each chunk of manifest data is sent using a CopyData message.
+ */
+static void
+bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+
+ if (mysink->send_to_client)
+ {
+ /* Add one because we're also sending a leading type byte. */
+ pq_putmessage('d', mysink->msgbuffer, len + 1);
+ }
+}
+
+/*
+ * We don't need an explicit terminator for the backup manifest.
+ */
+static void
+bbsink_copystream_end_manifest(bbsink *sink)
+{
+ /* Do nothing. */
+}
+
+/*
+ * Send end-of-backup wire protocol messages.
+ */
+static void
+bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli)
+{
+ SendCopyDone();
+ SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * Cleanup.
+ */
+static void
+bbsink_copystream_cleanup(bbsink *sink)
+{
+ /* Nothing to do. */
+}
+
+/*
+ * Send a CopyOutResponse message.
+ */
+static void
+SendCopyOutResponse(void)
+{
+ StringInfoData buf;
+
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0); /* overall format */
+ pq_sendint16(&buf, 0); /* natts */
+ pq_endmessage(&buf);
+}
+
+/*
+ * Send a CopyDone message.
+ */
+static void
+SendCopyDone(void)
+{
+ pq_putemptymessage('c');
+}
+
+/*
+ * Send a single resultset containing just a single
+ * XLogRecPtr record (in text format)
+ */
+static void
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
+{
+ StringInfoData buf;
+ char str[MAXFNAMELEN];
+ Size len;
+
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint16(&buf, 2); /* 2 fields */
+
+ /* Field headers */
+ pq_sendstring(&buf, "recptr");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+ pq_sendint32(&buf, TEXTOID); /* type oid */
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ pq_sendstring(&buf, "tli");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+
+ /*
+ * int8 may seem like a surprising data type for this, but in theory int4
+ * would not be wide enough for this, as TimeLineID is unsigned.
+ */
+ pq_sendint32(&buf, INT8OID); /* type oid */
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_endmessage(&buf);
+
+ /* Data row */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint16(&buf, 2); /* number of columns */
+
+ len = snprintf(str, sizeof(str),
+ "%X/%X", LSN_FORMAT_ARGS(ptr));
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, str, len);
+
+ len = snprintf(str, sizeof(str), "%u", tli);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, str, len);
+
+ pq_endmessage(&buf);
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * Send a result set via libpq describing the tablespace list.
+ */
+static void
+SendTablespaceList(List *tablespaces)
+{
+ StringInfoData buf;
+ ListCell *lc;
+
+ /* Construct and send the directory information */
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint16(&buf, 3); /* 3 fields */
+
+ /* First field - spcoid */
+ pq_sendstring(&buf, "spcoid");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+ pq_sendint32(&buf, OIDOID); /* type oid */
+ pq_sendint16(&buf, 4); /* typlen */
+ pq_sendint32(&buf, 0); /* typmod */
+ pq_sendint16(&buf, 0); /* format code */
+
+ /* Second field - spclocation */
+ pq_sendstring(&buf, "spclocation");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, TEXTOID);
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ /* Third field - size */
+ pq_sendstring(&buf, "size");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, INT8OID);
+ pq_sendint16(&buf, 8);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_endmessage(&buf);
+
+ foreach(lc, tablespaces)
+ {
+ tablespaceinfo *ti = lfirst(lc);
+
+ /* Send one datarow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint16(&buf, 3); /* number of columns */
+ if (ti->path == NULL)
+ {
+ pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
+ pq_sendint32(&buf, -1);
+ }
+ else
+ {
+ Size len;
+
+ len = strlen(ti->oid);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, ti->oid, len);
+
+ len = strlen(ti->path);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, ti->path, len);
+ }
+ if (ti->size >= 0)
+ send_int8_string(&buf, ti->size / 1024);
+ else
+ pq_sendint32(&buf, -1); /* NULL */
+
+ pq_endmessage(&buf);
+ }
+}
+
+/*
+ * Send a 64-bit integer as a string via the wire protocol.
+ */
+static void
+send_int8_string(StringInfoData *buf, int64 intval)
+{
+ char is[32];
+
+ sprintf(is, INT64_FORMAT, intval);
+ pq_sendint32(buf, strlen(is));
+ pq_sendbytes(buf, is, strlen(is));
+}
diff --git a/src/backend/backup/basebackup_gzip.c b/src/backend/backup/basebackup_gzip.c
new file mode 100644
index 0000000..1ec4279
--- /dev/null
+++ b/src/backend/backup/basebackup_gzip.c
@@ -0,0 +1,304 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_gzip.c
+ * Basebackup sink implementing gzip compression.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/backup/basebackup_gzip.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "backup/basebackup_sink.h"
+
+#ifdef HAVE_LIBZ
+typedef struct bbsink_gzip
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /* Compression level. */
+ int compresslevel;
+
+ /* Compressed data stream. */
+ z_stream zstream;
+
+ /* Number of bytes staged in output buffer. */
+ size_t bytes_written;
+} bbsink_gzip;
+
+static void bbsink_gzip_begin_backup(bbsink *sink);
+static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
+static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
+static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_gzip_end_archive(bbsink *sink);
+static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
+static void gzip_pfree(void *opaque, void *address);
+
+static const bbsink_ops bbsink_gzip_ops = {
+ .begin_backup = bbsink_gzip_begin_backup,
+ .begin_archive = bbsink_gzip_begin_archive,
+ .archive_contents = bbsink_gzip_archive_contents,
+ .end_archive = bbsink_gzip_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_gzip_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_forward_cleanup
+};
+#endif
+
+/*
+ * Create a new basebackup sink that performs gzip compression.
+ */
+bbsink *
+bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
+{
+#ifndef HAVE_LIBZ
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("gzip compression is not supported by this build")));
+ return NULL; /* keep compiler quiet */
+#else
+ bbsink_gzip *sink;
+ int compresslevel;
+
+ Assert(next != NULL);
+
+ compresslevel = compress->level;
+ Assert((compresslevel >= 1 && compresslevel <= 9) ||
+ compresslevel == Z_DEFAULT_COMPRESSION);
+
+ sink = palloc0(sizeof(bbsink_gzip));
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
+ sink->base.bbs_next = next;
+ sink->compresslevel = compresslevel;
+
+ return &sink->base;
+#endif
+}
+
+#ifdef HAVE_LIBZ
+
+/*
+ * Begin backup.
+ */
+static void
+bbsink_gzip_begin_backup(bbsink *sink)
+{
+ /*
+ * We need our own buffer, because we're going to pass different data to
+ * the next sink than what gets passed to us.
+ */
+ sink->bbs_buffer = palloc(sink->bbs_buffer_length);
+
+ /*
+ * Since deflate() doesn't require the output buffer to be of any
+ * particular size, we can just make it the same size as the input buffer.
+ */
+ bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
+ sink->bbs_buffer_length);
+}
+
+/*
+ * Prepare to compress the next archive.
+ */
+static void
+bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
+{
+ bbsink_gzip *mysink = (bbsink_gzip *) sink;
+ char *gz_archive_name;
+ z_stream *zs = &mysink->zstream;
+
+ /* Initialize compressor object. */
+ memset(zs, 0, sizeof(z_stream));
+ zs->zalloc = gzip_palloc;
+ zs->zfree = gzip_pfree;
+ zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
+ zs->avail_out = sink->bbs_next->bbs_buffer_length;
+
+ /*
+ * We need to use deflateInit2() rather than deflateInit() here so that we
+ * can request a gzip header rather than a zlib header. Otherwise, we want
+ * to supply the same values that would have been used by default if we
+ * had just called deflateInit().
+ *
+ * Per the documentation for deflateInit2, the third argument must be
+ * Z_DEFLATED; the fourth argument is the number of "window bits", by
+ * default 15, but adding 16 gets you a gzip header rather than a zlib
+ * header; the fifth argument controls memory usage, and 8 is the default;
+ * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
+ */
+ if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
+ Z_DEFAULT_STRATEGY) != Z_OK)
+ ereport(ERROR,
+ errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("could not initialize compression library"));
+
+ /*
+ * Add ".gz" to the archive name. Note that the pg_basebackup -z produces
+ * archives named ".tar.gz" rather than ".tgz", so we match that here.
+ */
+ gz_archive_name = psprintf("%s.gz", archive_name);
+ Assert(sink->bbs_next != NULL);
+ bbsink_begin_archive(sink->bbs_next, gz_archive_name);
+ pfree(gz_archive_name);
+}
+
+/*
+ * Compress the input data to the output buffer until we run out of input
+ * data. Each time the output buffer fills up, invoke the archive_contents()
+ * method for then next sink.
+ *
+ * Note that since we're compressing the input, it may very commonly happen
+ * that we consume all the input data without filling the output buffer. In
+ * that case, the compressed representation of the current input data won't
+ * actually be sent to the next bbsink until a later call to this function,
+ * or perhaps even not until bbsink_gzip_end_archive() is invoked.
+ */
+static void
+bbsink_gzip_archive_contents(bbsink *sink, size_t len)
+{
+ bbsink_gzip *mysink = (bbsink_gzip *) sink;
+ z_stream *zs = &mysink->zstream;
+
+ /* Compress data from input buffer. */
+ zs->next_in = (uint8 *) mysink->base.bbs_buffer;
+ zs->avail_in = len;
+
+ while (zs->avail_in > 0)
+ {
+ int res;
+
+ /* Write output data into unused portion of output buffer. */
+ Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
+ zs->next_out = (uint8 *)
+ mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
+ zs->avail_out =
+ mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
+
+ /*
+ * Try to compress. Note that this will update zs->next_in and
+ * zs->avail_in according to how much input data was consumed, and
+ * zs->next_out and zs->avail_out according to how many output bytes
+ * were produced.
+ *
+ * According to the zlib documentation, Z_STREAM_ERROR should only
+ * occur if we've made a programming error, or if say there's been a
+ * memory clobber; we use elog() rather than Assert() here out of an
+ * abundance of caution.
+ */
+ res = deflate(zs, Z_NO_FLUSH);
+ if (res == Z_STREAM_ERROR)
+ elog(ERROR, "could not compress data: %s", zs->msg);
+
+ /* Update our notion of how many bytes we've written. */
+ mysink->bytes_written =
+ mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
+
+ /*
+ * If the output buffer is full, it's time for the next sink to
+ * process the contents.
+ */
+ if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
+ {
+ bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
+ mysink->bytes_written = 0;
+ }
+ }
+}
+
+/*
+ * There might be some data inside zlib's internal buffers; we need to get
+ * that flushed out and forwarded to the successor sink as archive content.
+ *
+ * Then we can end processing for this archive.
+ */
+static void
+bbsink_gzip_end_archive(bbsink *sink)
+{
+ bbsink_gzip *mysink = (bbsink_gzip *) sink;
+ z_stream *zs = &mysink->zstream;
+
+ /* There is no more data available. */
+ zs->next_in = (uint8 *) mysink->base.bbs_buffer;
+ zs->avail_in = 0;
+
+ while (1)
+ {
+ int res;
+
+ /* Write output data into unused portion of output buffer. */
+ Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
+ zs->next_out = (uint8 *)
+ mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
+ zs->avail_out =
+ mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
+
+ /*
+ * As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
+ * no more input.
+ */
+ res = deflate(zs, Z_FINISH);
+ if (res == Z_STREAM_ERROR)
+ elog(ERROR, "could not compress data: %s", zs->msg);
+
+ /* Update our notion of how many bytes we've written. */
+ mysink->bytes_written =
+ mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
+
+ /*
+ * Apparently we had no data in the output buffer and deflate() was
+ * not able to add any. We must be done.
+ */
+ if (mysink->bytes_written == 0)
+ break;
+
+ /* Send whatever accumulated output bytes we have. */
+ bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
+ mysink->bytes_written = 0;
+ }
+
+ /* Must also pass on the information that this archive has ended. */
+ bbsink_forward_end_archive(sink);
+}
+
+/*
+ * Manifest contents are not compressed, but we do need to copy them into
+ * the successor sink's buffer, because we have our own.
+ */
+static void
+bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
+{
+ memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
+ bbsink_manifest_contents(sink->bbs_next, len);
+}
+
+/*
+ * Wrapper function to adjust the signature of palloc to match what libz
+ * expects.
+ */
+static void *
+gzip_palloc(void *opaque, unsigned items, unsigned size)
+{
+ return palloc(items * size);
+}
+
+/*
+ * Wrapper function to adjust the signature of pfree to match what libz
+ * expects.
+ */
+static void
+gzip_pfree(void *opaque, void *address)
+{
+ pfree(address);
+}
+
+#endif
diff --git a/src/backend/backup/basebackup_lz4.c b/src/backend/backup/basebackup_lz4.c
new file mode 100644
index 0000000..986272a
--- /dev/null
+++ b/src/backend/backup/basebackup_lz4.c
@@ -0,0 +1,296 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_lz4.c
+ * Basebackup sink implementing lz4 compression.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/backup/basebackup_lz4.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#ifdef USE_LZ4
+#include <lz4frame.h>
+#endif
+
+#include "backup/basebackup_sink.h"
+
+#ifdef USE_LZ4
+
+typedef struct bbsink_lz4
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /* Compression level. */
+ int compresslevel;
+
+ LZ4F_compressionContext_t ctx;
+ LZ4F_preferences_t prefs;
+
+ /* Number of bytes staged in output buffer. */
+ size_t bytes_written;
+} bbsink_lz4;
+
+static void bbsink_lz4_begin_backup(bbsink *sink);
+static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
+static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
+static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_lz4_end_archive(bbsink *sink);
+static void bbsink_lz4_cleanup(bbsink *sink);
+
+static const bbsink_ops bbsink_lz4_ops = {
+ .begin_backup = bbsink_lz4_begin_backup,
+ .begin_archive = bbsink_lz4_begin_archive,
+ .archive_contents = bbsink_lz4_archive_contents,
+ .end_archive = bbsink_lz4_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_lz4_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_lz4_cleanup
+};
+#endif
+
+/*
+ * Create a new basebackup sink that performs lz4 compression.
+ */
+bbsink *
+bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
+{
+#ifndef USE_LZ4
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("lz4 compression is not supported by this build")));
+ return NULL; /* keep compiler quiet */
+#else
+ bbsink_lz4 *sink;
+ int compresslevel;
+
+ Assert(next != NULL);
+
+ compresslevel = compress->level;
+ Assert(compresslevel >= 0 && compresslevel <= 12);
+
+ sink = palloc0(sizeof(bbsink_lz4));
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
+ sink->base.bbs_next = next;
+ sink->compresslevel = compresslevel;
+
+ return &sink->base;
+#endif
+}
+
+#ifdef USE_LZ4
+
+/*
+ * Begin backup.
+ */
+static void
+bbsink_lz4_begin_backup(bbsink *sink)
+{
+ bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+ size_t output_buffer_bound;
+ LZ4F_preferences_t *prefs = &mysink->prefs;
+
+ /* Initialize compressor object. */
+ memset(prefs, 0, sizeof(LZ4F_preferences_t));
+ prefs->frameInfo.blockSizeID = LZ4F_max256KB;
+ prefs->compressionLevel = mysink->compresslevel;
+
+ /*
+ * We need our own buffer, because we're going to pass different data to
+ * the next sink than what gets passed to us.
+ */
+ mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
+
+ /*
+ * Since LZ4F_compressUpdate() requires the output buffer of size equal or
+ * greater than that of LZ4F_compressBound(), make sure we have the next
+ * sink's bbs_buffer of length that can accommodate the compressed input
+ * buffer.
+ */
+ output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
+ &mysink->prefs);
+
+ /*
+ * The buffer length is expected to be a multiple of BLCKSZ, so round up.
+ */
+ output_buffer_bound = output_buffer_bound + BLCKSZ -
+ (output_buffer_bound % BLCKSZ);
+
+ bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
+}
+
+/*
+ * Prepare to compress the next archive.
+ */
+static void
+bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
+{
+ bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+ char *lz4_archive_name;
+ LZ4F_errorCode_t ctxError;
+ size_t headerSize;
+
+ ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
+ if (LZ4F_isError(ctxError))
+ elog(ERROR, "could not create lz4 compression context: %s",
+ LZ4F_getErrorName(ctxError));
+
+ /* First of all write the frame header to destination buffer. */
+ headerSize = LZ4F_compressBegin(mysink->ctx,
+ mysink->base.bbs_next->bbs_buffer,
+ mysink->base.bbs_next->bbs_buffer_length,
+ &mysink->prefs);
+
+ if (LZ4F_isError(headerSize))
+ elog(ERROR, "could not write lz4 header: %s",
+ LZ4F_getErrorName(headerSize));
+
+ /*
+ * We need to write the compressed data after the header in the output
+ * buffer. So, make sure to update the notion of bytes written to output
+ * buffer.
+ */
+ mysink->bytes_written += headerSize;
+
+ /* Add ".lz4" to the archive name. */
+ lz4_archive_name = psprintf("%s.lz4", archive_name);
+ Assert(sink->bbs_next != NULL);
+ bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
+ pfree(lz4_archive_name);
+}
+
+/*
+ * Compress the input data to the output buffer until we run out of input
+ * data. Each time the output buffer falls below the compression bound for
+ * the input buffer, invoke the archive_contents() method for then next sink.
+ *
+ * Note that since we're compressing the input, it may very commonly happen
+ * that we consume all the input data without filling the output buffer. In
+ * that case, the compressed representation of the current input data won't
+ * actually be sent to the next bbsink until a later call to this function,
+ * or perhaps even not until bbsink_lz4_end_archive() is invoked.
+ */
+static void
+bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
+{
+ bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+ size_t compressedSize;
+ size_t avail_in_bound;
+
+ avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
+
+ /*
+ * If the number of available bytes has fallen below the value computed by
+ * LZ4F_compressBound(), ask the next sink to process the data so that we
+ * can empty the buffer.
+ */
+ if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
+ avail_in_bound)
+ {
+ bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
+ mysink->bytes_written = 0;
+ }
+
+ /*
+ * Compress the input buffer and write it into the output buffer.
+ */
+ compressedSize = LZ4F_compressUpdate(mysink->ctx,
+ mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
+ mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
+ (uint8 *) mysink->base.bbs_buffer,
+ avail_in,
+ NULL);
+
+ if (LZ4F_isError(compressedSize))
+ elog(ERROR, "could not compress data: %s",
+ LZ4F_getErrorName(compressedSize));
+
+ /*
+ * Update our notion of how many bytes we've written into output buffer.
+ */
+ mysink->bytes_written += compressedSize;
+}
+
+/*
+ * There might be some data inside lz4's internal buffers; we need to get
+ * that flushed out and also finalize the lz4 frame and then get that forwarded
+ * to the successor sink as archive content.
+ *
+ * Then we can end processing for this archive.
+ */
+static void
+bbsink_lz4_end_archive(bbsink *sink)
+{
+ bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+ size_t compressedSize;
+ size_t lz4_footer_bound;
+
+ lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);
+
+ Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
+
+ if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
+ lz4_footer_bound)
+ {
+ bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
+ mysink->bytes_written = 0;
+ }
+
+ compressedSize = LZ4F_compressEnd(mysink->ctx,
+ mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
+ mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
+ NULL);
+
+ if (LZ4F_isError(compressedSize))
+ elog(ERROR, "could not end lz4 compression: %s",
+ LZ4F_getErrorName(compressedSize));
+
+ /* Update our notion of how many bytes we've written. */
+ mysink->bytes_written += compressedSize;
+
+ /* Send whatever accumulated output bytes we have. */
+ bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
+ mysink->bytes_written = 0;
+
+ /* Release the resources. */
+ LZ4F_freeCompressionContext(mysink->ctx);
+ mysink->ctx = NULL;
+
+ /* Pass on the information that this archive has ended. */
+ bbsink_forward_end_archive(sink);
+}
+
+/*
+ * Manifest contents are not compressed, but we do need to copy them into
+ * the successor sink's buffer, because we have our own.
+ */
+static void
+bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
+{
+ memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
+ bbsink_manifest_contents(sink->bbs_next, len);
+}
+
+/*
+ * In case the backup fails, make sure we free the compression context by
+ * calling LZ4F_freeCompressionContext() if needed to avoid memory leak.
+ */
+static void
+bbsink_lz4_cleanup(bbsink *sink)
+{
+ bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
+
+ if (mysink->ctx)
+ {
+ LZ4F_freeCompressionContext(mysink->ctx);
+ mysink->ctx = NULL;
+ }
+}
+
+#endif
diff --git a/src/backend/backup/basebackup_progress.c b/src/backend/backup/basebackup_progress.c
new file mode 100644
index 0000000..6d4b5a2
--- /dev/null
+++ b/src/backend/backup/basebackup_progress.c
@@ -0,0 +1,246 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_progress.c
+ * Basebackup sink implementing progress tracking, including but not
+ * limited to command progress reporting.
+ *
+ * This should be used even if the PROGRESS option to the replication
+ * command BASE_BACKUP is not specified. Without that option, we won't
+ * have tallied up the size of the files that are going to need to be
+ * backed up, but we can still report to the command progress reporting
+ * facility how much data we've processed.
+ *
+ * Moreover, we also use this as a convenient place to update certain
+ * fields of the bbsink_state. That work is accurately described as
+ * keeping track of our progress, but it's not just for introspection.
+ * We need those fields to be updated properly in order for base backups
+ * to work.
+ *
+ * This particular basebackup sink requires extra callbacks that most base
+ * backup sinks don't. Rather than cramming those into the interface, we just
+ * have a few extra functions here that basebackup.c can call. (We could put
+ * the logic directly into that file as it's fairly simple, but it seems
+ * cleaner to have everything related to progress reporting in one place.)
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/backup/basebackup_progress.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "backup/basebackup.h"
+#include "backup/basebackup_sink.h"
+#include "commands/progress.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "utils/timestamp.h"
+
+static void bbsink_progress_begin_backup(bbsink *sink);
+static void bbsink_progress_archive_contents(bbsink *sink, size_t len);
+static void bbsink_progress_end_archive(bbsink *sink);
+
+static const bbsink_ops bbsink_progress_ops = {
+ .begin_backup = bbsink_progress_begin_backup,
+ .begin_archive = bbsink_forward_begin_archive,
+ .archive_contents = bbsink_progress_archive_contents,
+ .end_archive = bbsink_progress_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_forward_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_forward_cleanup
+};
+
+/*
+ * Create a new basebackup sink that performs progress tracking functions and
+ * forwards data to a successor sink.
+ */
+bbsink *
+bbsink_progress_new(bbsink *next, bool estimate_backup_size)
+{
+ bbsink *sink;
+
+ Assert(next != NULL);
+
+ sink = palloc0(sizeof(bbsink));
+ *((const bbsink_ops **) &sink->bbs_ops) = &bbsink_progress_ops;
+ sink->bbs_next = next;
+
+ /*
+ * Report that a base backup is in progress, and set the total size of the
+ * backup to -1, which will get translated to NULL. If we're estimating
+ * the backup size, we'll insert the real estimate when we have it.
+ */
+ pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL, -1);
+
+ return sink;
+}
+
+/*
+ * Progress reporting at start of backup.
+ */
+static void
+bbsink_progress_begin_backup(bbsink *sink)
+{
+ const int index[] = {
+ PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_BACKUP_TOTAL,
+ PROGRESS_BASEBACKUP_TBLSPC_TOTAL
+ };
+ int64 val[3];
+
+ /*
+ * Report that we are now streaming database files as a base backup. Also
+ * advertise the number of tablespaces, and, if known, the estimated total
+ * backup size.
+ */
+ val[0] = PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP;
+ if (sink->bbs_state->bytes_total_is_valid)
+ val[1] = sink->bbs_state->bytes_total;
+ else
+ val[1] = -1;
+ val[2] = list_length(sink->bbs_state->tablespaces);
+ pgstat_progress_update_multi_param(3, index, val);
+
+ /* Delegate to next sink. */
+ bbsink_forward_begin_backup(sink);
+}
+
+/*
+ * End-of archive progress reporting.
+ */
+static void
+bbsink_progress_end_archive(bbsink *sink)
+{
+ /*
+ * We expect one archive per tablespace, so reaching the end of an archive
+ * also means reaching the end of a tablespace. (Some day we might have a
+ * reason to decouple these concepts.)
+ *
+ * If WAL is included in the backup, we'll mark the last tablespace
+ * complete before the last archive is complete, so we need a guard here
+ * to ensure that the number of tablespaces streamed doesn't exceed the
+ * total.
+ */
+ if (sink->bbs_state->tablespace_num < list_length(sink->bbs_state->tablespaces))
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
+ sink->bbs_state->tablespace_num + 1);
+
+ /* Delegate to next sink. */
+ bbsink_forward_end_archive(sink);
+
+ /*
+ * This is a convenient place to update the bbsink_state's notion of which
+ * is the current tablespace. Note that the bbsink_state object is shared
+ * across all bbsink objects involved, but we're the outermost one and
+ * this is the very last thing we do.
+ */
+ sink->bbs_state->tablespace_num++;
+}
+
+/*
+ * Handle progress tracking for new archive contents.
+ *
+ * Increment the counter for the amount of data already streamed
+ * by the given number of bytes, and update the progress report for
+ * pg_stat_progress_basebackup.
+ */
+static void
+bbsink_progress_archive_contents(bbsink *sink, size_t len)
+{
+ bbsink_state *state = sink->bbs_state;
+ const int index[] = {
+ PROGRESS_BASEBACKUP_BACKUP_STREAMED,
+ PROGRESS_BASEBACKUP_BACKUP_TOTAL
+ };
+ int64 val[2];
+ int nparam = 0;
+
+ /* First update bbsink_state with # of bytes done. */
+ state->bytes_done += len;
+
+ /* Now forward to next sink. */
+ bbsink_forward_archive_contents(sink, len);
+
+ /* Prepare to set # of bytes done for command progress reporting. */
+ val[nparam++] = state->bytes_done;
+
+ /*
+ * We may also want to update # of total bytes, to avoid overflowing past
+ * 100% or the full size. This may make the total size number change as we
+ * approach the end of the backup (the estimate will always be wrong if
+ * WAL is included), but that's better than having the done column be
+ * bigger than the total.
+ */
+ if (state->bytes_total_is_valid && state->bytes_done > state->bytes_total)
+ val[nparam++] = state->bytes_done;
+
+ pgstat_progress_update_multi_param(nparam, index, val);
+}
+
+/*
+ * Advertise that we are waiting for the start-of-backup checkpoint.
+ */
+void
+basebackup_progress_wait_checkpoint(void)
+{
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
+}
+
+/*
+ * Advertise that we are estimating the backup size.
+ */
+void
+basebackup_progress_estimate_backup_size(void)
+{
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
+}
+
+/*
+ * Advertise that we are waiting for WAL archiving at end-of-backup.
+ */
+void
+basebackup_progress_wait_wal_archive(bbsink_state *state)
+{
+ const int index[] = {
+ PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_TBLSPC_STREAMED
+ };
+ int64 val[2];
+
+ /*
+ * We report having finished all tablespaces at this point, even if the
+ * archive for the main tablespace is still open, because what's going to
+ * be added is WAL files, not files that are really from the main
+ * tablespace.
+ */
+ val[0] = PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE;
+ val[1] = list_length(state->tablespaces);
+ pgstat_progress_update_multi_param(2, index, val);
+}
+
+/*
+ * Advertise that we are transferring WAL files into the final archive.
+ */
+void
+basebackup_progress_transfer_wal(void)
+{
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
+}
+
+/*
+ * Advertise that we are no longer performing a backup.
+ */
+void
+basebackup_progress_done(void)
+{
+ pgstat_progress_end_command();
+}
diff --git a/src/backend/backup/basebackup_server.c b/src/backend/backup/basebackup_server.c
new file mode 100644
index 0000000..0258d7a
--- /dev/null
+++ b/src/backend/backup/basebackup_server.c
@@ -0,0 +1,309 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_server.c
+ * store basebackup archives on the server
+ *
+ * IDENTIFICATION
+ * src/backend/backup/basebackup_server.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "backup/basebackup.h"
+#include "backup/basebackup_sink.h"
+#include "catalog/pg_authid.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "utils/acl.h"
+#include "utils/timestamp.h"
+#include "utils/wait_event.h"
+
+typedef struct bbsink_server
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /* Directory in which backup is to be stored. */
+ char *pathname;
+
+ /* Currently open file (or 0 if nothing open). */
+ File file;
+
+ /* Current file position. */
+ off_t filepos;
+} bbsink_server;
+
+static void bbsink_server_begin_archive(bbsink *sink,
+ const char *archive_name);
+static void bbsink_server_archive_contents(bbsink *sink, size_t len);
+static void bbsink_server_end_archive(bbsink *sink);
+static void bbsink_server_begin_manifest(bbsink *sink);
+static void bbsink_server_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_server_end_manifest(bbsink *sink);
+
+static const bbsink_ops bbsink_server_ops = {
+ .begin_backup = bbsink_forward_begin_backup,
+ .begin_archive = bbsink_server_begin_archive,
+ .archive_contents = bbsink_server_archive_contents,
+ .end_archive = bbsink_server_end_archive,
+ .begin_manifest = bbsink_server_begin_manifest,
+ .manifest_contents = bbsink_server_manifest_contents,
+ .end_manifest = bbsink_server_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_forward_cleanup
+};
+
+/*
+ * Create a new 'server' bbsink.
+ */
+bbsink *
+bbsink_server_new(bbsink *next, char *pathname)
+{
+ bbsink_server *sink = palloc0(sizeof(bbsink_server));
+
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_server_ops;
+ sink->pathname = pathname;
+ sink->base.bbs_next = next;
+
+ /* Replication permission is not sufficient in this case. */
+ StartTransactionCommand();
+ if (!has_privs_of_role(GetUserId(), ROLE_PG_WRITE_SERVER_FILES))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser or a role with privileges of the pg_write_server_files role to create backup stored on server")));
+ CommitTransactionCommand();
+
+ /*
+ * It's not a good idea to store your backups in the same directory that
+ * you're backing up. If we allowed a relative path here, that could
+ * easily happen accidentally, so we don't. The user could still
+ * accomplish the same thing by including the absolute path to $PGDATA in
+ * the pathname, but that's likely an intentional bad decision rather than
+ * an accident.
+ */
+ if (!is_absolute_path(pathname))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("relative path not allowed for backup stored on server")));
+
+ switch (pg_check_dir(pathname))
+ {
+ case 0:
+
+ /*
+ * Does not exist, so create it using the same permissions we'd
+ * use for a new subdirectory of the data directory itself.
+ */
+ if (MakePGDirectory(pathname) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create directory \"%s\": %m", pathname)));
+ break;
+
+ case 1:
+ /* Exists, empty. */
+ break;
+
+ case 2:
+ case 3:
+ case 4:
+ /* Exists, not empty. */
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_FILE),
+ errmsg("directory \"%s\" exists but is not empty",
+ pathname)));
+ break;
+
+ default:
+ /* Access problem. */
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not access directory \"%s\": %m",
+ pathname)));
+ }
+
+ return &sink->base;
+}
+
+/*
+ * Open the correct output file for this archive.
+ */
+static void
+bbsink_server_begin_archive(bbsink *sink, const char *archive_name)
+{
+ bbsink_server *mysink = (bbsink_server *) sink;
+ char *filename;
+
+ Assert(mysink->file == 0);
+ Assert(mysink->filepos == 0);
+
+ filename = psprintf("%s/%s", mysink->pathname, archive_name);
+
+ mysink->file = PathNameOpenFile(filename,
+ O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
+ if (mysink->file <= 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m", filename)));
+
+ pfree(filename);
+
+ bbsink_forward_begin_archive(sink, archive_name);
+}
+
+/*
+ * Write the data to the output file.
+ */
+static void
+bbsink_server_archive_contents(bbsink *sink, size_t len)
+{
+ bbsink_server *mysink = (bbsink_server *) sink;
+ int nbytes;
+
+ nbytes = FileWrite(mysink->file, mysink->base.bbs_buffer, len,
+ mysink->filepos, WAIT_EVENT_BASEBACKUP_WRITE);
+
+ if (nbytes != len)
+ {
+ if (nbytes < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write file \"%s\": %m",
+ FilePathName(mysink->file)),
+ errhint("Check free disk space.")));
+ /* short write: complain appropriately */
+ ereport(ERROR,
+ (errcode(ERRCODE_DISK_FULL),
+ errmsg("could not write file \"%s\": wrote only %d of %d bytes at offset %u",
+ FilePathName(mysink->file),
+ nbytes, (int) len, (unsigned) mysink->filepos),
+ errhint("Check free disk space.")));
+ }
+
+ mysink->filepos += nbytes;
+
+ bbsink_forward_archive_contents(sink, len);
+}
+
+/*
+ * fsync and close the current output file.
+ */
+static void
+bbsink_server_end_archive(bbsink *sink)
+{
+ bbsink_server *mysink = (bbsink_server *) sink;
+
+ /*
+ * We intentionally don't use data_sync_elevel here, because the server
+ * shouldn't PANIC just because we can't guarantee that the backup has
+ * been written down to disk. Running recovery won't fix anything in this
+ * case anyway.
+ */
+ if (FileSync(mysink->file, WAIT_EVENT_BASEBACKUP_SYNC) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m",
+ FilePathName(mysink->file))));
+
+
+ /* We're done with this file now. */
+ FileClose(mysink->file);
+ mysink->file = 0;
+ mysink->filepos = 0;
+
+ bbsink_forward_end_archive(sink);
+}
+
+/*
+ * Open the output file to which we will write the manifest.
+ *
+ * Just like pg_basebackup, we write the manifest first under a temporary
+ * name and then rename it into place after fsync. That way, if the manifest
+ * is there and under the correct name, the user can be sure that the backup
+ * completed.
+ */
+static void
+bbsink_server_begin_manifest(bbsink *sink)
+{
+ bbsink_server *mysink = (bbsink_server *) sink;
+ char *tmp_filename;
+
+ Assert(mysink->file == 0);
+
+ tmp_filename = psprintf("%s/backup_manifest.tmp", mysink->pathname);
+
+ mysink->file = PathNameOpenFile(tmp_filename,
+ O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
+ if (mysink->file <= 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m", tmp_filename)));
+
+ pfree(tmp_filename);
+
+ bbsink_forward_begin_manifest(sink);
+}
+
+/*
+ * Each chunk of manifest data is sent using a CopyData message.
+ */
+static void
+bbsink_server_manifest_contents(bbsink *sink, size_t len)
+{
+ bbsink_server *mysink = (bbsink_server *) sink;
+ int nbytes;
+
+ nbytes = FileWrite(mysink->file, mysink->base.bbs_buffer, len,
+ mysink->filepos, WAIT_EVENT_BASEBACKUP_WRITE);
+
+ if (nbytes != len)
+ {
+ if (nbytes < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write file \"%s\": %m",
+ FilePathName(mysink->file)),
+ errhint("Check free disk space.")));
+ /* short write: complain appropriately */
+ ereport(ERROR,
+ (errcode(ERRCODE_DISK_FULL),
+ errmsg("could not write file \"%s\": wrote only %d of %d bytes at offset %u",
+ FilePathName(mysink->file),
+ nbytes, (int) len, (unsigned) mysink->filepos),
+ errhint("Check free disk space.")));
+ }
+
+ mysink->filepos += nbytes;
+
+ bbsink_forward_manifest_contents(sink, len);
+}
+
+/*
+ * fsync the backup manifest, close the file, and then rename it into place.
+ */
+static void
+bbsink_server_end_manifest(bbsink *sink)
+{
+ bbsink_server *mysink = (bbsink_server *) sink;
+ char *tmp_filename;
+ char *filename;
+
+ /* We're done with this file now. */
+ FileClose(mysink->file);
+ mysink->file = 0;
+
+ /*
+ * Rename it into place. This also fsyncs the temporary file, so we don't
+ * need to do that here. We don't use data_sync_elevel here for the same
+ * reasons as in bbsink_server_end_archive.
+ */
+ tmp_filename = psprintf("%s/backup_manifest.tmp", mysink->pathname);
+ filename = psprintf("%s/backup_manifest", mysink->pathname);
+ durable_rename(tmp_filename, filename, ERROR);
+ pfree(filename);
+ pfree(tmp_filename);
+
+ bbsink_forward_end_manifest(sink);
+}
diff --git a/src/backend/backup/basebackup_sink.c b/src/backend/backup/basebackup_sink.c
new file mode 100644
index 0000000..4536029
--- /dev/null
+++ b/src/backend/backup/basebackup_sink.c
@@ -0,0 +1,125 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_sink.c
+ * Default implementations for bbsink (basebackup sink) callbacks.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/backend/backup/basebackup_sink.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "backup/basebackup_sink.h"
+
+/*
+ * Forward begin_backup callback.
+ *
+ * Only use this implementation if you want the bbsink you're implementing to
+ * share a buffer with the successor bbsink.
+ */
+void
+bbsink_forward_begin_backup(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ Assert(sink->bbs_state != NULL);
+ bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
+ sink->bbs_buffer_length);
+ sink->bbs_buffer = sink->bbs_next->bbs_buffer;
+}
+
+/*
+ * Forward begin_archive callback.
+ */
+void
+bbsink_forward_begin_archive(bbsink *sink, const char *archive_name)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_begin_archive(sink->bbs_next, archive_name);
+}
+
+/*
+ * Forward archive_contents callback.
+ *
+ * Code that wants to use this should initialize its own bbs_buffer and
+ * bbs_buffer_length fields to the values from the successor sink. In cases
+ * where the buffer isn't shared, the data needs to be copied before forwarding
+ * the callback. We don't do try to do that here, because there's really no
+ * reason to have separately allocated buffers containing the same identical
+ * data.
+ */
+void
+bbsink_forward_archive_contents(bbsink *sink, size_t len)
+{
+ Assert(sink->bbs_next != NULL);
+ Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
+ Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
+ bbsink_archive_contents(sink->bbs_next, len);
+}
+
+/*
+ * Forward end_archive callback.
+ */
+void
+bbsink_forward_end_archive(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_end_archive(sink->bbs_next);
+}
+
+/*
+ * Forward begin_manifest callback.
+ */
+void
+bbsink_forward_begin_manifest(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_begin_manifest(sink->bbs_next);
+}
+
+/*
+ * Forward manifest_contents callback.
+ *
+ * As with the archive_contents callback, it's expected that the buffer is
+ * shared.
+ */
+void
+bbsink_forward_manifest_contents(bbsink *sink, size_t len)
+{
+ Assert(sink->bbs_next != NULL);
+ Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
+ Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
+ bbsink_manifest_contents(sink->bbs_next, len);
+}
+
+/*
+ * Forward end_manifest callback.
+ */
+void
+bbsink_forward_end_manifest(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_end_manifest(sink->bbs_next);
+}
+
+/*
+ * Forward end_backup callback.
+ */
+void
+bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_end_backup(sink->bbs_next, endptr, endtli);
+}
+
+/*
+ * Forward cleanup callback.
+ */
+void
+bbsink_forward_cleanup(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_cleanup(sink->bbs_next);
+}
diff --git a/src/backend/backup/basebackup_target.c b/src/backend/backup/basebackup_target.c
new file mode 100644
index 0000000..4d15ca4
--- /dev/null
+++ b/src/backend/backup/basebackup_target.c
@@ -0,0 +1,241 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_target.c
+ * Base backups can be "targeted", which means that they can be sent
+ * somewhere other than to the client which requested the backup.
+ * Furthermore, new targets can be defined by extensions. This file
+ * contains code to support that functionality.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/backup/basebackup_target.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "backup/basebackup_target.h"
+#include "utils/memutils.h"
+
+typedef struct BaseBackupTargetType
+{
+ char *name;
+ void *(*check_detail) (char *, char *);
+ bbsink *(*get_sink) (bbsink *, void *);
+} BaseBackupTargetType;
+
+struct BaseBackupTargetHandle
+{
+ BaseBackupTargetType *type;
+ void *detail_arg;
+};
+
+static void initialize_target_list(void);
+static bbsink *blackhole_get_sink(bbsink *next_sink, void *detail_arg);
+static bbsink *server_get_sink(bbsink *next_sink, void *detail_arg);
+static void *reject_target_detail(char *target, char *target_detail);
+static void *server_check_detail(char *target, char *target_detail);
+
+static BaseBackupTargetType builtin_backup_targets[] =
+{
+ {
+ "blackhole", reject_target_detail, blackhole_get_sink
+ },
+ {
+ "server", server_check_detail, server_get_sink
+ },
+ {
+ NULL
+ }
+};
+
+static List *BaseBackupTargetTypeList = NIL;
+
+/*
+ * Add a new base backup target type.
+ *
+ * This is intended for use by server extensions.
+ */
+void
+BaseBackupAddTarget(char *name,
+ void *(*check_detail) (char *, char *),
+ bbsink *(*get_sink) (bbsink *, void *))
+{
+ BaseBackupTargetType *newtype;
+ MemoryContext oldcontext;
+ ListCell *lc;
+
+ /* If the target list is not yet initialized, do that first. */
+ if (BaseBackupTargetTypeList == NIL)
+ initialize_target_list();
+
+ /* Search the target type list for an existing entry with this name. */
+ foreach(lc, BaseBackupTargetTypeList)
+ {
+ BaseBackupTargetType *ttype = lfirst(lc);
+
+ if (strcmp(ttype->name, name) == 0)
+ {
+ /*
+ * We found one, so update it.
+ *
+ * It is probably not a great idea to call BaseBackupAddTarget for
+ * the same name multiple times, but if it happens, this seems
+ * like the sanest behavior.
+ */
+ ttype->check_detail = check_detail;
+ ttype->get_sink = get_sink;
+ return;
+ }
+ }
+
+ /*
+ * We use TopMemoryContext for allocations here to make sure that the data
+ * we need doesn't vanish under us; that's also why we copy the target
+ * name into a newly-allocated chunk of memory.
+ */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ newtype = palloc(sizeof(BaseBackupTargetType));
+ newtype->name = pstrdup(name);
+ newtype->check_detail = check_detail;
+ newtype->get_sink = get_sink;
+ BaseBackupTargetTypeList = lappend(BaseBackupTargetTypeList, newtype);
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Look up a base backup target and validate the target_detail.
+ *
+ * Extensions that define new backup targets will probably define a new
+ * type of bbsink to match. Validation of the target_detail can be performed
+ * either in the check_detail routine called here, or in the bbsink
+ * constructor, which will be called from BaseBackupGetSink. It's mostly
+ * a matter of taste, but the check_detail function runs somewhat earlier.
+ */
+BaseBackupTargetHandle *
+BaseBackupGetTargetHandle(char *target, char *target_detail)
+{
+ ListCell *lc;
+
+ /* If the target list is not yet initialized, do that first. */
+ if (BaseBackupTargetTypeList == NIL)
+ initialize_target_list();
+
+ /* Search the target type list for a match. */
+ foreach(lc, BaseBackupTargetTypeList)
+ {
+ BaseBackupTargetType *ttype = lfirst(lc);
+
+ if (strcmp(ttype->name, target) == 0)
+ {
+ BaseBackupTargetHandle *handle;
+
+ /* Found the target. */
+ handle = palloc(sizeof(BaseBackupTargetHandle));
+ handle->type = ttype;
+ handle->detail_arg = ttype->check_detail(target, target_detail);
+
+ return handle;
+ }
+ }
+
+ /* Did not find the target. */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("unrecognized target: \"%s\"", target)));
+
+ /* keep compiler quiet */
+ return NULL;
+}
+
+/*
+ * Construct a bbsink that will implement the backup target.
+ *
+ * The get_sink function does all the real work, so all we have to do here
+ * is call it with the correct arguments. Whatever the check_detail function
+ * returned is here passed through to the get_sink function. This lets those
+ * two functions communicate with each other, if they wish. If not, the
+ * check_detail function can simply return the target_detail and let the
+ * get_sink function take it from there.
+ */
+bbsink *
+BaseBackupGetSink(BaseBackupTargetHandle *handle, bbsink *next_sink)
+{
+ return handle->type->get_sink(next_sink, handle->detail_arg);
+}
+
+/*
+ * Load predefined target types into BaseBackupTargetTypeList.
+ */
+static void
+initialize_target_list(void)
+{
+ BaseBackupTargetType *ttype = builtin_backup_targets;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ while (ttype->name != NULL)
+ {
+ BaseBackupTargetTypeList = lappend(BaseBackupTargetTypeList, ttype);
+ ++ttype;
+ }
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Normally, a get_sink function should construct and return a new bbsink that
+ * implements the backup target, but the 'blackhole' target just throws the
+ * data away. We could implement that by adding a bbsink that does nothing
+ * but forward, but it's even cheaper to implement that by not adding a bbsink
+ * at all.
+ */
+static bbsink *
+blackhole_get_sink(bbsink *next_sink, void *detail_arg)
+{
+ return next_sink;
+}
+
+/*
+ * Create a bbsink implementing a server-side backup.
+ */
+static bbsink *
+server_get_sink(bbsink *next_sink, void *detail_arg)
+{
+ return bbsink_server_new(next_sink, detail_arg);
+}
+
+/*
+ * Implement target-detail checking for a target that does not accept a
+ * detail.
+ */
+static void *
+reject_target_detail(char *target, char *target_detail)
+{
+ if (target_detail != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("target \"%s\" does not accept a target detail",
+ target)));
+
+ return NULL;
+}
+
+/*
+ * Implement target-detail checking for a server-side backup.
+ *
+ * target_detail should be the name of the directory to which the backup
+ * should be written, but we don't check that here. Rather, that check,
+ * as well as the necessary permissions checking, happens in bbsink_server_new.
+ */
+static void *
+server_check_detail(char *target, char *target_detail)
+{
+ if (target_detail == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("target \"%s\" requires a target detail",
+ target)));
+
+ return target_detail;
+}
diff --git a/src/backend/backup/basebackup_throttle.c b/src/backend/backup/basebackup_throttle.c
new file mode 100644
index 0000000..62ba732
--- /dev/null
+++ b/src/backend/backup/basebackup_throttle.c
@@ -0,0 +1,199 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_throttle.c
+ * Basebackup sink implementing throttling. Data is forwarded to the
+ * next base backup sink in the chain at a rate no greater than the
+ * configured maximum.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/backup/basebackup_throttle.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "backup/basebackup_sink.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "utils/timestamp.h"
+
+typedef struct bbsink_throttle
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /* The actual number of bytes, transfer of which may cause sleep. */
+ uint64 throttling_sample;
+
+ /* Amount of data already transferred but not yet throttled. */
+ int64 throttling_counter;
+
+ /* The minimum time required to transfer throttling_sample bytes. */
+ TimeOffset elapsed_min_unit;
+
+ /* The last check of the transfer rate. */
+ TimestampTz throttled_last;
+} bbsink_throttle;
+
+static void bbsink_throttle_begin_backup(bbsink *sink);
+static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
+static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
+static void throttle(bbsink_throttle *sink, size_t increment);
+
+static const bbsink_ops bbsink_throttle_ops = {
+ .begin_backup = bbsink_throttle_begin_backup,
+ .begin_archive = bbsink_forward_begin_archive,
+ .archive_contents = bbsink_throttle_archive_contents,
+ .end_archive = bbsink_forward_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_throttle_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_forward_cleanup
+};
+
+/*
+ * How frequently to throttle, as a fraction of the specified rate-second.
+ */
+#define THROTTLING_FREQUENCY 8
+
+/*
+ * Create a new basebackup sink that performs throttling and forwards data
+ * to a successor sink.
+ */
+bbsink *
+bbsink_throttle_new(bbsink *next, uint32 maxrate)
+{
+ bbsink_throttle *sink;
+
+ Assert(next != NULL);
+ Assert(maxrate > 0);
+
+ sink = palloc0(sizeof(bbsink_throttle));
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
+ sink->base.bbs_next = next;
+
+ sink->throttling_sample =
+ (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
+
+ /*
+ * The minimum amount of time for throttling_sample bytes to be
+ * transferred.
+ */
+ sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+ return &sink->base;
+}
+
+/*
+ * There's no real work to do here, but we need to record the current time so
+ * that it can be used for future calculations.
+ */
+static void
+bbsink_throttle_begin_backup(bbsink *sink)
+{
+ bbsink_throttle *mysink = (bbsink_throttle *) sink;
+
+ bbsink_forward_begin_backup(sink);
+
+ /* The 'real data' starts now (header was ignored). */
+ mysink->throttled_last = GetCurrentTimestamp();
+}
+
+/*
+ * First throttle, and then pass archive contents to next sink.
+ */
+static void
+bbsink_throttle_archive_contents(bbsink *sink, size_t len)
+{
+ throttle((bbsink_throttle *) sink, len);
+
+ bbsink_forward_archive_contents(sink, len);
+}
+
+/*
+ * First throttle, and then pass manifest contents to next sink.
+ */
+static void
+bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
+{
+ throttle((bbsink_throttle *) sink, len);
+
+ bbsink_forward_manifest_contents(sink, len);
+}
+
+/*
+ * Increment the network transfer counter by the given number of bytes,
+ * and sleep if necessary to comply with the requested network transfer
+ * rate.
+ */
+static void
+throttle(bbsink_throttle *sink, size_t increment)
+{
+ TimeOffset elapsed_min;
+
+ Assert(sink->throttling_counter >= 0);
+
+ sink->throttling_counter += increment;
+ if (sink->throttling_counter < sink->throttling_sample)
+ return;
+
+ /* How much time should have elapsed at minimum? */
+ elapsed_min = sink->elapsed_min_unit *
+ (sink->throttling_counter / sink->throttling_sample);
+
+ /*
+ * Since the latch could be set repeatedly because of concurrently WAL
+ * activity, sleep in a loop to ensure enough time has passed.
+ */
+ for (;;)
+ {
+ TimeOffset elapsed,
+ sleep;
+ int wait_result;
+
+ /* Time elapsed since the last measurement (and possible wake up). */
+ elapsed = GetCurrentTimestamp() - sink->throttled_last;
+
+ /* sleep if the transfer is faster than it should be */
+ sleep = elapsed_min - elapsed;
+ if (sleep <= 0)
+ break;
+
+ ResetLatch(MyLatch);
+
+ /* We're eating a potentially set latch, so check for interrupts */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
+ * the maximum time to sleep. Thus the cast to long is safe.
+ */
+ wait_result = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ (long) (sleep / 1000),
+ WAIT_EVENT_BASE_BACKUP_THROTTLE);
+
+ if (wait_result & WL_LATCH_SET)
+ CHECK_FOR_INTERRUPTS();
+
+ /* Done waiting? */
+ if (wait_result & WL_TIMEOUT)
+ break;
+ }
+
+ /*
+ * As we work with integers, only whole multiple of throttling_sample was
+ * processed. The rest will be done during the next call of this function.
+ */
+ sink->throttling_counter %= sink->throttling_sample;
+
+ /*
+ * Time interval for the remaining amount and possible next increments
+ * starts now.
+ */
+ sink->throttled_last = GetCurrentTimestamp();
+}
diff --git a/src/backend/backup/basebackup_zstd.c b/src/backend/backup/basebackup_zstd.c
new file mode 100644
index 0000000..84256e3
--- /dev/null
+++ b/src/backend/backup/basebackup_zstd.c
@@ -0,0 +1,313 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_zstd.c
+ * Basebackup sink implementing zstd compression.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/backup/basebackup_zstd.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#ifdef USE_ZSTD
+#include <zstd.h>
+#endif
+
+#include "backup/basebackup_sink.h"
+
+#ifdef USE_ZSTD
+
+typedef struct bbsink_zstd
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /* Compression options */
+ pg_compress_specification *compress;
+
+ ZSTD_CCtx *cctx;
+ ZSTD_outBuffer zstd_outBuf;
+} bbsink_zstd;
+
+static void bbsink_zstd_begin_backup(bbsink *sink);
+static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
+static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
+static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_zstd_end_archive(bbsink *sink);
+static void bbsink_zstd_cleanup(bbsink *sink);
+static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli);
+
+static const bbsink_ops bbsink_zstd_ops = {
+ .begin_backup = bbsink_zstd_begin_backup,
+ .begin_archive = bbsink_zstd_begin_archive,
+ .archive_contents = bbsink_zstd_archive_contents,
+ .end_archive = bbsink_zstd_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_zstd_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_zstd_end_backup,
+ .cleanup = bbsink_zstd_cleanup
+};
+#endif
+
+/*
+ * Create a new basebackup sink that performs zstd compression.
+ */
+bbsink *
+bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
+{
+#ifndef USE_ZSTD
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("zstd compression is not supported by this build")));
+ return NULL; /* keep compiler quiet */
+#else
+ bbsink_zstd *sink;
+
+ Assert(next != NULL);
+
+ sink = palloc0(sizeof(bbsink_zstd));
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
+ sink->base.bbs_next = next;
+ sink->compress = compress;
+
+ return &sink->base;
+#endif
+}
+
+#ifdef USE_ZSTD
+
+/*
+ * Begin backup.
+ */
+static void
+bbsink_zstd_begin_backup(bbsink *sink)
+{
+ bbsink_zstd *mysink = (bbsink_zstd *) sink;
+ size_t output_buffer_bound;
+ size_t ret;
+ pg_compress_specification *compress = mysink->compress;
+
+ mysink->cctx = ZSTD_createCCtx();
+ if (!mysink->cctx)
+ elog(ERROR, "could not create zstd compression context");
+
+ ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
+ compress->level);
+ if (ZSTD_isError(ret))
+ elog(ERROR, "could not set zstd compression level to %d: %s",
+ compress->level, ZSTD_getErrorName(ret));
+
+ if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
+ {
+ /*
+ * On older versions of libzstd, this option does not exist, and
+ * trying to set it will fail. Similarly for newer versions if they
+ * are compiled without threading support.
+ */
+ ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+ compress->workers);
+ if (ZSTD_isError(ret))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not set compression worker count to %d: %s",
+ compress->workers, ZSTD_getErrorName(ret)));
+ }
+
+ /*
+ * We need our own buffer, because we're going to pass different data to
+ * the next sink than what gets passed to us.
+ */
+ mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
+
+ /*
+ * Make sure that the next sink's bbs_buffer is big enough to accommodate
+ * the compressed input buffer.
+ */
+ output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
+
+ /*
+ * The buffer length is expected to be a multiple of BLCKSZ, so round up.
+ */
+ output_buffer_bound = output_buffer_bound + BLCKSZ -
+ (output_buffer_bound % BLCKSZ);
+
+ bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
+}
+
+/*
+ * Prepare to compress the next archive.
+ */
+static void
+bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
+{
+ bbsink_zstd *mysink = (bbsink_zstd *) sink;
+ char *zstd_archive_name;
+
+ /*
+ * At the start of each archive we reset the state to start a new
+ * compression operation. The parameters are sticky and they will stick
+ * around as we are resetting with option ZSTD_reset_session_only.
+ */
+ ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
+
+ mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
+ mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
+ mysink->zstd_outBuf.pos = 0;
+
+ /* Add ".zst" to the archive name. */
+ zstd_archive_name = psprintf("%s.zst", archive_name);
+ Assert(sink->bbs_next != NULL);
+ bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
+ pfree(zstd_archive_name);
+}
+
+/*
+ * Compress the input data to the output buffer until we run out of input
+ * data. Each time the output buffer falls below the compression bound for
+ * the input buffer, invoke the archive_contents() method for the next sink.
+ *
+ * Note that since we're compressing the input, it may very commonly happen
+ * that we consume all the input data without filling the output buffer. In
+ * that case, the compressed representation of the current input data won't
+ * actually be sent to the next bbsink until a later call to this function,
+ * or perhaps even not until bbsink_zstd_end_archive() is invoked.
+ */
+static void
+bbsink_zstd_archive_contents(bbsink *sink, size_t len)
+{
+ bbsink_zstd *mysink = (bbsink_zstd *) sink;
+ ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
+
+ while (inBuf.pos < inBuf.size)
+ {
+ size_t yet_to_flush;
+ size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
+
+ /*
+ * If the out buffer is not left with enough space, send the output
+ * buffer to the next sink, and reset it.
+ */
+ if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
+ {
+ bbsink_archive_contents(mysink->base.bbs_next,
+ mysink->zstd_outBuf.pos);
+ mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
+ mysink->zstd_outBuf.size =
+ mysink->base.bbs_next->bbs_buffer_length;
+ mysink->zstd_outBuf.pos = 0;
+ }
+
+ yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
+ &inBuf, ZSTD_e_continue);
+
+ if (ZSTD_isError(yet_to_flush))
+ elog(ERROR,
+ "could not compress data: %s",
+ ZSTD_getErrorName(yet_to_flush));
+ }
+}
+
+/*
+ * There might be some data inside zstd's internal buffers; we need to get that
+ * flushed out, also end the zstd frame and then get that forwarded to the
+ * successor sink as archive content.
+ *
+ * Then we can end processing for this archive.
+ */
+static void
+bbsink_zstd_end_archive(bbsink *sink)
+{
+ bbsink_zstd *mysink = (bbsink_zstd *) sink;
+ size_t yet_to_flush;
+
+ do
+ {
+ ZSTD_inBuffer in = {NULL, 0, 0};
+ size_t max_needed = ZSTD_compressBound(0);
+
+ /*
+ * If the out buffer is not left with enough space, send the output
+ * buffer to the next sink, and reset it.
+ */
+ if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
+ {
+ bbsink_archive_contents(mysink->base.bbs_next,
+ mysink->zstd_outBuf.pos);
+ mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
+ mysink->zstd_outBuf.size =
+ mysink->base.bbs_next->bbs_buffer_length;
+ mysink->zstd_outBuf.pos = 0;
+ }
+
+ yet_to_flush = ZSTD_compressStream2(mysink->cctx,
+ &mysink->zstd_outBuf,
+ &in, ZSTD_e_end);
+
+ if (ZSTD_isError(yet_to_flush))
+ elog(ERROR, "could not compress data: %s",
+ ZSTD_getErrorName(yet_to_flush));
+
+ } while (yet_to_flush > 0);
+
+ /* Make sure to pass any remaining bytes to the next sink. */
+ if (mysink->zstd_outBuf.pos > 0)
+ bbsink_archive_contents(mysink->base.bbs_next,
+ mysink->zstd_outBuf.pos);
+
+ /* Pass on the information that this archive has ended. */
+ bbsink_forward_end_archive(sink);
+}
+
+/*
+ * Free the resources and context.
+ */
+static void
+bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli)
+{
+ bbsink_zstd *mysink = (bbsink_zstd *) sink;
+
+ /* Release the context. */
+ if (mysink->cctx)
+ {
+ ZSTD_freeCCtx(mysink->cctx);
+ mysink->cctx = NULL;
+ }
+
+ bbsink_forward_end_backup(sink, endptr, endtli);
+}
+
+/*
+ * Manifest contents are not compressed, but we do need to copy them into
+ * the successor sink's buffer, because we have our own.
+ */
+static void
+bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
+{
+ memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
+ bbsink_manifest_contents(sink->bbs_next, len);
+}
+
+/*
+ * In case the backup fails, make sure we free any compression context that
+ * got allocated, so that we don't leak memory.
+ */
+static void
+bbsink_zstd_cleanup(bbsink *sink)
+{
+ bbsink_zstd *mysink = (bbsink_zstd *) sink;
+
+ /* Release the context if not already released. */
+ if (mysink->cctx)
+ {
+ ZSTD_freeCCtx(mysink->cctx);
+ mysink->cctx = NULL;
+ }
+}
+
+#endif