summaryrefslogtreecommitdiffstats
path: root/src/bin/pg_basebackup/walmethods.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
commit5e45211a64149b3c659b90ff2de6fa982a5a93ed (patch)
tree739caf8c461053357daa9f162bef34516c7bf452 /src/bin/pg_basebackup/walmethods.c
parentInitial commit. (diff)
downloadpostgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.tar.xz
postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.zip
Adding upstream version 15.5.upstream/15.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/bin/pg_basebackup/walmethods.c')
-rw-r--r--src/bin/pg_basebackup/walmethods.c1386
1 files changed, 1386 insertions, 0 deletions
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
new file mode 100644
index 0000000..602727f
--- /dev/null
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -0,0 +1,1386 @@
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.c - implementations of different ways to write received wal
+ *
+ * NOTE! The caller must ensure that only one method is instantiated in
+ * any given program, and that it's only instantiated once!
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/walmethods.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/stat.h>
+#include <time.h>
+#include <unistd.h>
+
+#ifdef USE_LZ4
+#include <lz4frame.h>
+#endif
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "common/file_perm.h"
+#include "common/file_utils.h"
+#include "common/logging.h"
+#include "pgtar.h"
+#include "receivelog.h"
+#include "streamutil.h"
+
+/* Size of zlib buffer for .tar.gz */
+#define ZLIB_OUT_SIZE 4096
+
+/* Size of LZ4 input chunk for .lz4 */
+#define LZ4_IN_SIZE 4096
+
+/*-------------------------------------------------------------------------
+ * WalDirectoryMethod - write wal to a directory looking like pg_wal
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * Global static data for this method
+ */
+typedef struct DirectoryMethodData
+{
+ char *basedir;
+ pg_compress_algorithm compression_algorithm;
+ int compression_level;
+ bool sync;
+ const char *lasterrstring; /* if set, takes precedence over lasterrno */
+ int lasterrno;
+} DirectoryMethodData;
+static DirectoryMethodData *dir_data = NULL;
+
+/*
+ * Local file handle
+ */
+typedef struct DirectoryMethodFile
+{
+ int fd;
+ off_t currpos;
+ char *pathname;
+ char *fullpath;
+ char *temp_suffix;
+#ifdef HAVE_LIBZ
+ gzFile gzfp;
+#endif
+#ifdef USE_LZ4
+ LZ4F_compressionContext_t ctx;
+ size_t lz4bufsize;
+ void *lz4buf;
+#endif
+} DirectoryMethodFile;
+
+#define dir_clear_error() \
+ (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0)
+#define dir_set_error(msg) \
+ (dir_data->lasterrstring = _(msg))
+
+static const char *
+dir_getlasterror(void)
+{
+ if (dir_data->lasterrstring)
+ return dir_data->lasterrstring;
+ return strerror(dir_data->lasterrno);
+}
+
+static char *
+dir_get_file_name(const char *pathname, const char *temp_suffix)
+{
+ char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
+
+ snprintf(filename, MAXPGPATH, "%s%s%s",
+ pathname,
+ dir_data->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
+ dir_data->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
+ temp_suffix ? temp_suffix : "");
+
+ return filename;
+}
+
+static Walfile
+dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+ char tmppath[MAXPGPATH];
+ char *filename;
+ int fd;
+ DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+ gzFile gzfp = NULL;
+#endif
+#ifdef USE_LZ4
+ LZ4F_compressionContext_t ctx = NULL;
+ size_t lz4bufsize = 0;
+ void *lz4buf = NULL;
+#endif
+
+ dir_clear_error();
+
+ filename = dir_get_file_name(pathname, temp_suffix);
+ snprintf(tmppath, sizeof(tmppath), "%s/%s",
+ dir_data->basedir, filename);
+ pg_free(filename);
+
+ /*
+ * Open a file for non-compressed as well as compressed files. Tracking
+ * the file descriptor is important for dir_sync() method as gzflush()
+ * does not do any system calls to fsync() to make changes permanent on
+ * disk.
+ */
+ fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
+ if (fd < 0)
+ {
+ dir_data->lasterrno = errno;
+ return NULL;
+ }
+
+#ifdef HAVE_LIBZ
+ if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ gzfp = gzdopen(fd, "wb");
+ if (gzfp == NULL)
+ {
+ dir_data->lasterrno = errno;
+ close(fd);
+ return NULL;
+ }
+
+ if (gzsetparams(gzfp, dir_data->compression_level,
+ Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ dir_data->lasterrno = errno;
+ gzclose(gzfp);
+ return NULL;
+ }
+ }
+#endif
+#ifdef USE_LZ4
+ if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ {
+ size_t ctx_out;
+ size_t header_size;
+ LZ4F_preferences_t prefs;
+
+ ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
+ if (LZ4F_isError(ctx_out))
+ {
+ dir_data->lasterrstring = LZ4F_getErrorName(ctx_out);
+ close(fd);
+ return NULL;
+ }
+
+ lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
+ lz4buf = pg_malloc0(lz4bufsize);
+
+ /* assign the compression level, default is 0 */
+ memset(&prefs, 0, sizeof(prefs));
+ prefs.compressionLevel = dir_data->compression_level;
+
+ /* add the header */
+ header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
+ if (LZ4F_isError(header_size))
+ {
+ dir_data->lasterrstring = LZ4F_getErrorName(header_size);
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+ return NULL;
+ }
+
+ errno = 0;
+ if (write(fd, lz4buf, header_size) != header_size)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ dir_data->lasterrno = errno ? errno : ENOSPC;
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+ return NULL;
+ }
+ }
+#endif
+
+ /* Do pre-padding on non-compressed files */
+ if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE)
+ {
+ PGAlignedXLogBlock zerobuf;
+ int bytes;
+
+ memset(zerobuf.data, 0, XLOG_BLCKSZ);
+ for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
+ {
+ errno = 0;
+ if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ dir_data->lasterrno = errno ? errno : ENOSPC;
+ close(fd);
+ return NULL;
+ }
+ }
+
+ if (lseek(fd, 0, SEEK_SET) != 0)
+ {
+ dir_data->lasterrno = errno;
+ close(fd);
+ return NULL;
+ }
+ }
+
+ /*
+ * fsync WAL file and containing directory, to ensure the file is
+ * persistently created and zeroed (if padded). That's particularly
+ * important when using synchronous mode, where the file is modified and
+ * fsynced in-place, without a directory fsync.
+ */
+ if (dir_data->sync)
+ {
+ if (fsync_fname(tmppath, false) != 0 ||
+ fsync_parent_path(tmppath) != 0)
+ {
+ dir_data->lasterrno = errno;
+#ifdef HAVE_LIBZ
+ if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ gzclose(gzfp);
+ else
+#endif
+#ifdef USE_LZ4
+ if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ {
+ (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+ }
+ else
+#endif
+ close(fd);
+ return NULL;
+ }
+ }
+
+ f = pg_malloc0(sizeof(DirectoryMethodFile));
+#ifdef HAVE_LIBZ
+ if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ f->gzfp = gzfp;
+#endif
+#ifdef USE_LZ4
+ if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ {
+ f->ctx = ctx;
+ f->lz4buf = lz4buf;
+ f->lz4bufsize = lz4bufsize;
+ }
+#endif
+
+ f->fd = fd;
+ f->currpos = 0;
+ f->pathname = pg_strdup(pathname);
+ f->fullpath = pg_strdup(tmppath);
+ if (temp_suffix)
+ f->temp_suffix = pg_strdup(temp_suffix);
+
+ return f;
+}
+
+static ssize_t
+dir_write(Walfile f, const void *buf, size_t count)
+{
+ ssize_t r;
+ DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+
+ Assert(f != NULL);
+ dir_clear_error();
+
+#ifdef HAVE_LIBZ
+ if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ errno = 0;
+ r = (ssize_t) gzwrite(df->gzfp, buf, count);
+ if (r != count)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ dir_data->lasterrno = errno ? errno : ENOSPC;
+ }
+ }
+ else
+#endif
+#ifdef USE_LZ4
+ if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ {
+ size_t chunk;
+ size_t remaining;
+ const void *inbuf = buf;
+
+ remaining = count;
+ while (remaining > 0)
+ {
+ size_t compressed;
+
+ if (remaining > LZ4_IN_SIZE)
+ chunk = LZ4_IN_SIZE;
+ else
+ chunk = remaining;
+
+ remaining -= chunk;
+ compressed = LZ4F_compressUpdate(df->ctx,
+ df->lz4buf, df->lz4bufsize,
+ inbuf, chunk,
+ NULL);
+
+ if (LZ4F_isError(compressed))
+ {
+ dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ return -1;
+ }
+
+ errno = 0;
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ dir_data->lasterrno = errno ? errno : ENOSPC;
+ return -1;
+ }
+
+ inbuf = ((char *) inbuf) + chunk;
+ }
+
+ /* Our caller keeps track of the uncompressed size. */
+ r = (ssize_t) count;
+ }
+ else
+#endif
+ {
+ errno = 0;
+ r = write(df->fd, buf, count);
+ if (r != count)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ dir_data->lasterrno = errno ? errno : ENOSPC;
+ }
+ }
+ if (r > 0)
+ df->currpos += r;
+ return r;
+}
+
+static off_t
+dir_get_current_pos(Walfile f)
+{
+ Assert(f != NULL);
+ dir_clear_error();
+
+ /* Use a cached value to prevent lots of reseeks */
+ return ((DirectoryMethodFile *) f)->currpos;
+}
+
+static int
+dir_close(Walfile f, WalCloseMethod method)
+{
+ int r;
+ DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ char tmppath[MAXPGPATH];
+ char tmppath2[MAXPGPATH];
+
+ Assert(f != NULL);
+ dir_clear_error();
+
+#ifdef HAVE_LIBZ
+ if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ errno = 0; /* in case gzclose() doesn't set it */
+ r = gzclose(df->gzfp);
+ }
+ else
+#endif
+#ifdef USE_LZ4
+ if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ {
+ size_t compressed;
+
+ compressed = LZ4F_compressEnd(df->ctx,
+ df->lz4buf, df->lz4bufsize,
+ NULL);
+
+ if (LZ4F_isError(compressed))
+ {
+ dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ return -1;
+ }
+
+ errno = 0;
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ dir_data->lasterrno = errno ? errno : ENOSPC;
+ return -1;
+ }
+
+ r = close(df->fd);
+ }
+ else
+#endif
+ r = close(df->fd);
+
+ if (r == 0)
+ {
+ /* Build path to the current version of the file */
+ if (method == CLOSE_NORMAL && df->temp_suffix)
+ {
+ char *filename;
+ char *filename2;
+
+ /*
+ * If we have a temp prefix, normal operation is to rename the
+ * file.
+ */
+ filename = dir_get_file_name(df->pathname, df->temp_suffix);
+ snprintf(tmppath, sizeof(tmppath), "%s/%s",
+ dir_data->basedir, filename);
+ pg_free(filename);
+
+ /* permanent name, so no need for the prefix */
+ filename2 = dir_get_file_name(df->pathname, NULL);
+ snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
+ dir_data->basedir, filename2);
+ pg_free(filename2);
+ if (dir_data->sync)
+ r = durable_rename(tmppath, tmppath2);
+ else
+ {
+ if (rename(tmppath, tmppath2) != 0)
+ {
+ pg_log_error("could not rename file \"%s\" to \"%s\": %m",
+ tmppath, tmppath2);
+ r = -1;
+ }
+ }
+ }
+ else if (method == CLOSE_UNLINK)
+ {
+ char *filename;
+
+ /* Unlink the file once it's closed */
+ filename = dir_get_file_name(df->pathname, df->temp_suffix);
+ snprintf(tmppath, sizeof(tmppath), "%s/%s",
+ dir_data->basedir, filename);
+ pg_free(filename);
+ r = unlink(tmppath);
+ }
+ else
+ {
+ /*
+ * Else either CLOSE_NORMAL and no temp suffix, or
+ * CLOSE_NO_RENAME. In this case, fsync the file and containing
+ * directory if sync mode is requested.
+ */
+ if (dir_data->sync)
+ {
+ r = fsync_fname(df->fullpath, false);
+ if (r == 0)
+ r = fsync_parent_path(df->fullpath);
+ }
+ }
+ }
+
+ if (r != 0)
+ dir_data->lasterrno = errno;
+
+#ifdef USE_LZ4
+ pg_free(df->lz4buf);
+ /* supports free on NULL */
+ LZ4F_freeCompressionContext(df->ctx);
+#endif
+
+ pg_free(df->pathname);
+ pg_free(df->fullpath);
+ if (df->temp_suffix)
+ pg_free(df->temp_suffix);
+ pg_free(df);
+
+ return r;
+}
+
+static int
+dir_sync(Walfile f)
+{
+ int r;
+
+ Assert(f != NULL);
+ dir_clear_error();
+
+ if (!dir_data->sync)
+ return 0;
+
+#ifdef HAVE_LIBZ
+ if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
+ {
+ dir_data->lasterrno = errno;
+ return -1;
+ }
+ }
+#endif
+#ifdef USE_LZ4
+ if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ {
+ DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ size_t compressed;
+
+ /* Flush any internal buffers */
+ compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
+ if (LZ4F_isError(compressed))
+ {
+ dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ return -1;
+ }
+
+ errno = 0;
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ dir_data->lasterrno = errno ? errno : ENOSPC;
+ return -1;
+ }
+ }
+#endif
+
+ r = fsync(((DirectoryMethodFile *) f)->fd);
+ if (r < 0)
+ dir_data->lasterrno = errno;
+ return r;
+}
+
+static ssize_t
+dir_get_file_size(const char *pathname)
+{
+ struct stat statbuf;
+ char tmppath[MAXPGPATH];
+
+ snprintf(tmppath, sizeof(tmppath), "%s/%s",
+ dir_data->basedir, pathname);
+
+ if (stat(tmppath, &statbuf) != 0)
+ {
+ dir_data->lasterrno = errno;
+ return -1;
+ }
+
+ return statbuf.st_size;
+}
+
+static pg_compress_algorithm
+dir_compression_algorithm(void)
+{
+ return dir_data->compression_algorithm;
+}
+
+static bool
+dir_existsfile(const char *pathname)
+{
+ char tmppath[MAXPGPATH];
+ int fd;
+
+ dir_clear_error();
+
+ snprintf(tmppath, sizeof(tmppath), "%s/%s",
+ dir_data->basedir, pathname);
+
+ fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
+ if (fd < 0)
+ return false;
+ close(fd);
+ return true;
+}
+
+static bool
+dir_finish(void)
+{
+ dir_clear_error();
+
+ if (dir_data->sync)
+ {
+ /*
+ * Files are fsynced when they are closed, but we need to fsync the
+ * directory entry here as well.
+ */
+ if (fsync_fname(dir_data->basedir, true) != 0)
+ {
+ dir_data->lasterrno = errno;
+ return false;
+ }
+ }
+ return true;
+}
+
+
+WalWriteMethod *
+CreateWalDirectoryMethod(const char *basedir,
+ pg_compress_algorithm compression_algorithm,
+ int compression_level, bool sync)
+{
+ WalWriteMethod *method;
+
+ method = pg_malloc0(sizeof(WalWriteMethod));
+ method->open_for_write = dir_open_for_write;
+ method->write = dir_write;
+ method->get_current_pos = dir_get_current_pos;
+ method->get_file_size = dir_get_file_size;
+ method->get_file_name = dir_get_file_name;
+ method->compression_algorithm = dir_compression_algorithm;
+ method->close = dir_close;
+ method->sync = dir_sync;
+ method->existsfile = dir_existsfile;
+ method->finish = dir_finish;
+ method->getlasterror = dir_getlasterror;
+
+ dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+ dir_data->compression_algorithm = compression_algorithm;
+ dir_data->compression_level = compression_level;
+ dir_data->basedir = pg_strdup(basedir);
+ dir_data->sync = sync;
+
+ return method;
+}
+
+void
+FreeWalDirectoryMethod(void)
+{
+ pg_free(dir_data->basedir);
+ pg_free(dir_data);
+ dir_data = NULL;
+}
+
+
+/*-------------------------------------------------------------------------
+ * WalTarMethod - write wal to a tar file containing pg_wal contents
+ *-------------------------------------------------------------------------
+ */
+
+typedef struct TarMethodFile
+{
+ off_t ofs_start; /* Where does the *header* for this file start */
+ off_t currpos;
+ char header[TAR_BLOCK_SIZE];
+ char *pathname;
+ size_t pad_to_size;
+} TarMethodFile;
+
+typedef struct TarMethodData
+{
+ char *tarfilename;
+ int fd;
+ pg_compress_algorithm compression_algorithm;
+ int compression_level;
+ bool sync;
+ TarMethodFile *currentfile;
+ const char *lasterrstring; /* if set, takes precedence over lasterrno */
+ int lasterrno;
+#ifdef HAVE_LIBZ
+ z_streamp zp;
+ void *zlibOut;
+#endif
+} TarMethodData;
+static TarMethodData *tar_data = NULL;
+
+#define tar_clear_error() \
+ (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0)
+#define tar_set_error(msg) \
+ (tar_data->lasterrstring = _(msg))
+
+static const char *
+tar_getlasterror(void)
+{
+ if (tar_data->lasterrstring)
+ return tar_data->lasterrstring;
+ return strerror(tar_data->lasterrno);
+}
+
+#ifdef HAVE_LIBZ
+static bool
+tar_write_compressed_data(void *buf, size_t count, bool flush)
+{
+ tar_data->zp->next_in = buf;
+ tar_data->zp->avail_in = count;
+
+ while (tar_data->zp->avail_in || flush)
+ {
+ int r;
+
+ r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
+ if (r == Z_STREAM_ERROR)
+ {
+ tar_set_error("could not compress data");
+ return false;
+ }
+
+ if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+ {
+ size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+ errno = 0;
+ if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ tar_data->lasterrno = errno ? errno : ENOSPC;
+ return false;
+ }
+
+ tar_data->zp->next_out = tar_data->zlibOut;
+ tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+ }
+
+ if (r == Z_STREAM_END)
+ break;
+ }
+
+ if (flush)
+ {
+ /* Reset the stream for writing */
+ if (deflateReset(tar_data->zp) != Z_OK)
+ {
+ tar_set_error("could not reset compression stream");
+ return false;
+ }
+ }
+
+ return true;
+}
+#endif
+
+static ssize_t
+tar_write(Walfile f, const void *buf, size_t count)
+{
+ ssize_t r;
+
+ Assert(f != NULL);
+ tar_clear_error();
+
+ /* Tarfile will always be positioned at the end */
+ if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ {
+ errno = 0;
+ r = write(tar_data->fd, buf, count);
+ if (r != count)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ tar_data->lasterrno = errno ? errno : ENOSPC;
+ return -1;
+ }
+ ((TarMethodFile *) f)->currpos += r;
+ return r;
+ }
+#ifdef HAVE_LIBZ
+ else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
+ return -1;
+ ((TarMethodFile *) f)->currpos += count;
+ return count;
+ }
+#endif
+ else
+ {
+ /* Can't happen - compression enabled with no method set */
+ tar_data->lasterrno = ENOSYS;
+ return -1;
+ }
+}
+
+static bool
+tar_write_padding_data(TarMethodFile *f, size_t bytes)
+{
+ PGAlignedXLogBlock zerobuf;
+ size_t bytesleft = bytes;
+
+ memset(zerobuf.data, 0, XLOG_BLCKSZ);
+ while (bytesleft)
+ {
+ size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
+ ssize_t r = tar_write(f, zerobuf.data, bytestowrite);
+
+ if (r < 0)
+ return false;
+ bytesleft -= r;
+ }
+
+ return true;
+}
+
+static char *
+tar_get_file_name(const char *pathname, const char *temp_suffix)
+{
+ char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
+
+ snprintf(filename, MAXPGPATH, "%s%s",
+ pathname, temp_suffix ? temp_suffix : "");
+
+ return filename;
+}
+
+static Walfile
+tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+ char *tmppath;
+
+ tar_clear_error();
+
+ if (tar_data->fd < 0)
+ {
+ /*
+ * We open the tar file only when we first try to write to it.
+ */
+ tar_data->fd = open(tar_data->tarfilename,
+ O_WRONLY | O_CREAT | PG_BINARY,
+ pg_file_create_mode);
+ if (tar_data->fd < 0)
+ {
+ tar_data->lasterrno = errno;
+ return NULL;
+ }
+
+#ifdef HAVE_LIBZ
+ if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
+ tar_data->zp->zalloc = Z_NULL;
+ tar_data->zp->zfree = Z_NULL;
+ tar_data->zp->opaque = Z_NULL;
+ tar_data->zp->next_out = tar_data->zlibOut;
+ tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+
+ /*
+ * Initialize deflation library. Adding the magic value 16 to the
+ * default 15 for the windowBits parameter makes the output be
+ * gzip instead of zlib.
+ */
+ if (deflateInit2(tar_data->zp, tar_data->compression_level,
+ Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ pg_free(tar_data->zp);
+ tar_data->zp = NULL;
+ tar_set_error("could not initialize compression library");
+ return NULL;
+ }
+ }
+#endif
+
+ /* There's no tar header itself, the file starts with regular files */
+ }
+
+ if (tar_data->currentfile != NULL)
+ {
+ tar_set_error("implementation error: tar files can't have more than one open file");
+ return NULL;
+ }
+
+ tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+
+ tmppath = tar_get_file_name(pathname, temp_suffix);
+
+ /* Create a header with size set to 0 - we will fill out the size on close */
+ if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
+ {
+ pg_free(tar_data->currentfile);
+ pg_free(tmppath);
+ tar_data->currentfile = NULL;
+ tar_set_error("could not create tar header");
+ return NULL;
+ }
+
+ pg_free(tmppath);
+
+#ifdef HAVE_LIBZ
+ if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ /* Flush existing data */
+ if (!tar_write_compressed_data(NULL, 0, true))
+ return NULL;
+
+ /* Turn off compression for header */
+ if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ tar_set_error("could not change compression parameters");
+ return NULL;
+ }
+ }
+#endif
+
+ tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
+ if (tar_data->currentfile->ofs_start == -1)
+ {
+ tar_data->lasterrno = errno;
+ pg_free(tar_data->currentfile);
+ tar_data->currentfile = NULL;
+ return NULL;
+ }
+ tar_data->currentfile->currpos = 0;
+
+ if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ {
+ errno = 0;
+ if (write(tar_data->fd, tar_data->currentfile->header,
+ TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ tar_data->lasterrno = errno ? errno : ENOSPC;
+ pg_free(tar_data->currentfile);
+ tar_data->currentfile = NULL;
+ return NULL;
+ }
+ }
+#ifdef HAVE_LIBZ
+ else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ /* Write header through the zlib APIs but with no compression */
+ if (!tar_write_compressed_data(tar_data->currentfile->header,
+ TAR_BLOCK_SIZE, true))
+ return NULL;
+
+ /* Re-enable compression for the rest of the file */
+ if (deflateParams(tar_data->zp, tar_data->compression_level,
+ Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ tar_set_error("could not change compression parameters");
+ return NULL;
+ }
+ }
+#endif
+ else
+ {
+ /* not reachable */
+ Assert(false);
+ }
+
+ tar_data->currentfile->pathname = pg_strdup(pathname);
+
+ /*
+ * Uncompressed files are padded on creation, but for compression we can't
+ * do that
+ */
+ if (pad_to_size)
+ {
+ tar_data->currentfile->pad_to_size = pad_to_size;
+ if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ {
+ /* Uncompressed, so pad now */
+ if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
+ return NULL;
+ /* Seek back to start */
+ if (lseek(tar_data->fd,
+ tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
+ SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
+ {
+ tar_data->lasterrno = errno;
+ return NULL;
+ }
+
+ tar_data->currentfile->currpos = 0;
+ }
+ }
+
+ return tar_data->currentfile;
+}
+
+static ssize_t
+tar_get_file_size(const char *pathname)
+{
+ tar_clear_error();
+
+ /* Currently not used, so not supported */
+ tar_data->lasterrno = ENOSYS;
+ return -1;
+}
+
+static pg_compress_algorithm
+tar_compression_algorithm(void)
+{
+ return tar_data->compression_algorithm;
+}
+
+static off_t
+tar_get_current_pos(Walfile f)
+{
+ Assert(f != NULL);
+ tar_clear_error();
+
+ return ((TarMethodFile *) f)->currpos;
+}
+
+static int
+tar_sync(Walfile f)
+{
+ int r;
+
+ Assert(f != NULL);
+ tar_clear_error();
+
+ if (!tar_data->sync)
+ return 0;
+
+ /*
+ * Always sync the whole tarfile, because that's all we can do. This makes
+ * no sense on compressed files, so just ignore those.
+ */
+ if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ return 0;
+
+ r = fsync(tar_data->fd);
+ if (r < 0)
+ tar_data->lasterrno = errno;
+ return r;
+}
+
+static int
+tar_close(Walfile f, WalCloseMethod method)
+{
+ ssize_t filesize;
+ int padding;
+ TarMethodFile *tf = (TarMethodFile *) f;
+
+ Assert(f != NULL);
+ tar_clear_error();
+
+ if (method == CLOSE_UNLINK)
+ {
+ if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ {
+ tar_set_error("unlink not supported with compression");
+ return -1;
+ }
+
+ /*
+ * Unlink the file that we just wrote to the tar. We do this by
+ * truncating it to the start of the header. This is safe as we only
+ * allow writing of the very last file.
+ */
+ if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
+ {
+ tar_data->lasterrno = errno;
+ return -1;
+ }
+
+ pg_free(tf->pathname);
+ pg_free(tf);
+ tar_data->currentfile = NULL;
+
+ return 0;
+ }
+
+ /*
+ * Pad the file itself with zeroes if necessary. Note that this is
+ * different from the tar format padding -- this is the padding we asked
+ * for when the file was opened.
+ */
+ if (tf->pad_to_size)
+ {
+ if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ /*
+ * A compressed tarfile is padded on close since we cannot know
+ * the size of the compressed output until the end.
+ */
+ size_t sizeleft = tf->pad_to_size - tf->currpos;
+
+ if (sizeleft)
+ {
+ if (!tar_write_padding_data(tf, sizeleft))
+ return -1;
+ }
+ }
+ else
+ {
+ /*
+ * An uncompressed tarfile was padded on creation, so just adjust
+ * the current position as if we seeked to the end.
+ */
+ tf->currpos = tf->pad_to_size;
+ }
+ }
+
+ /*
+ * Get the size of the file, and pad out to a multiple of the tar block
+ * size.
+ */
+ filesize = tar_get_current_pos(f);
+ padding = tarPaddingBytesRequired(filesize);
+ if (padding)
+ {
+ char zerobuf[TAR_BLOCK_SIZE];
+
+ MemSet(zerobuf, 0, padding);
+ if (tar_write(f, zerobuf, padding) != padding)
+ return -1;
+ }
+
+
+#ifdef HAVE_LIBZ
+ if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ /* Flush the current buffer */
+ if (!tar_write_compressed_data(NULL, 0, true))
+ return -1;
+ }
+#endif
+
+ /*
+ * Now go back and update the header with the correct filesize and
+ * possibly also renaming the file. We overwrite the entire current header
+ * when done, including the checksum.
+ */
+ print_tar_number(&(tf->header[124]), 12, filesize);
+
+ if (method == CLOSE_NORMAL)
+
+ /*
+ * We overwrite it with what it was before if we have no tempname,
+ * since we're going to write the buffer anyway.
+ */
+ strlcpy(&(tf->header[0]), tf->pathname, 100);
+
+ print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
+ if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
+ {
+ tar_data->lasterrno = errno;
+ return -1;
+ }
+ if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ {
+ errno = 0;
+ if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ tar_data->lasterrno = errno ? errno : ENOSPC;
+ return -1;
+ }
+ }
+#ifdef HAVE_LIBZ
+ else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ /* Turn off compression */
+ if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ tar_set_error("could not change compression parameters");
+ return -1;
+ }
+
+ /* Overwrite the header, assuming the size will be the same */
+ if (!tar_write_compressed_data(tar_data->currentfile->header,
+ TAR_BLOCK_SIZE, true))
+ return -1;
+
+ /* Turn compression back on */
+ if (deflateParams(tar_data->zp, tar_data->compression_level,
+ Z_DEFAULT_STRATEGY) != Z_OK)
+ {
+ tar_set_error("could not change compression parameters");
+ return -1;
+ }
+ }
+#endif
+ else
+ {
+ /* not reachable */
+ Assert(false);
+ }
+
+ /* Move file pointer back down to end, so we can write the next file */
+ if (lseek(tar_data->fd, 0, SEEK_END) < 0)
+ {
+ tar_data->lasterrno = errno;
+ return -1;
+ }
+
+ /* Always fsync on close, so the padding gets fsynced */
+ if (tar_sync(f) < 0)
+ {
+ /* XXX this seems pretty bogus; why is only this case fatal? */
+ pg_fatal("could not fsync file \"%s\": %s",
+ tf->pathname, tar_getlasterror());
+ }
+
+ /* Clean up and done */
+ pg_free(tf->pathname);
+ pg_free(tf);
+ tar_data->currentfile = NULL;
+
+ return 0;
+}
+
+static bool
+tar_existsfile(const char *pathname)
+{
+ tar_clear_error();
+ /* We only deal with new tarfiles, so nothing externally created exists */
+ return false;
+}
+
+static bool
+tar_finish(void)
+{
+ char zerobuf[1024];
+
+ tar_clear_error();
+
+ if (tar_data->currentfile)
+ {
+ if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
+ return false;
+ }
+
+ /* A tarfile always ends with two empty blocks */
+ MemSet(zerobuf, 0, sizeof(zerobuf));
+ if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ {
+ errno = 0;
+ if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
+ {
+ /* If write didn't set errno, assume problem is no disk space */
+ tar_data->lasterrno = errno ? errno : ENOSPC;
+ return false;
+ }
+ }
+#ifdef HAVE_LIBZ
+ else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ {
+ if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+ return false;
+
+ /* Also flush all data to make sure the gzip stream is finished */
+ tar_data->zp->next_in = NULL;
+ tar_data->zp->avail_in = 0;
+ while (true)
+ {
+ int r;
+
+ r = deflate(tar_data->zp, Z_FINISH);
+
+ if (r == Z_STREAM_ERROR)
+ {
+ tar_set_error("could not compress data");
+ return false;
+ }
+ if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+ {
+ size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+ errno = 0;
+ if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+ {
+ /*
+ * If write didn't set errno, assume problem is no disk
+ * space.
+ */
+ tar_data->lasterrno = errno ? errno : ENOSPC;
+ return false;
+ }
+ }
+ if (r == Z_STREAM_END)
+ break;
+ }
+
+ if (deflateEnd(tar_data->zp) != Z_OK)
+ {
+ tar_set_error("could not close compression stream");
+ return false;
+ }
+ }
+#endif
+ else
+ {
+ /* not reachable */
+ Assert(false);
+ }
+
+ /* sync the empty blocks as well, since they're after the last file */
+ if (tar_data->sync)
+ {
+ if (fsync(tar_data->fd) != 0)
+ {
+ tar_data->lasterrno = errno;
+ return false;
+ }
+ }
+
+ if (close(tar_data->fd) != 0)
+ {
+ tar_data->lasterrno = errno;
+ return false;
+ }
+
+ tar_data->fd = -1;
+
+ if (tar_data->sync)
+ {
+ if (fsync_fname(tar_data->tarfilename, false) != 0 ||
+ fsync_parent_path(tar_data->tarfilename) != 0)
+ {
+ tar_data->lasterrno = errno;
+ return false;
+ }
+ }
+
+ return true;
+}
+
+/*
+ * The argument compression_algorithm is currently ignored. It is in place for
+ * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
+ * between the different compression methods. CreateWalTarMethod and its family
+ * of functions handle only zlib compression.
+ */
+WalWriteMethod *
+CreateWalTarMethod(const char *tarbase,
+ pg_compress_algorithm compression_algorithm,
+ int compression_level, bool sync)
+{
+ WalWriteMethod *method;
+ const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
+ ".tar.gz" : ".tar";
+
+ method = pg_malloc0(sizeof(WalWriteMethod));
+ method->open_for_write = tar_open_for_write;
+ method->write = tar_write;
+ method->get_current_pos = tar_get_current_pos;
+ method->get_file_size = tar_get_file_size;
+ method->get_file_name = tar_get_file_name;
+ method->compression_algorithm = tar_compression_algorithm;
+ method->close = tar_close;
+ method->sync = tar_sync;
+ method->existsfile = tar_existsfile;
+ method->finish = tar_finish;
+ method->getlasterror = tar_getlasterror;
+
+ tar_data = pg_malloc0(sizeof(TarMethodData));
+ tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+ sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
+ tar_data->fd = -1;
+ tar_data->compression_algorithm = compression_algorithm;
+ tar_data->compression_level = compression_level;
+ tar_data->sync = sync;
+#ifdef HAVE_LIBZ
+ if (compression_algorithm == PG_COMPRESSION_GZIP)
+ tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+#endif
+
+ return method;
+}
+
+void
+FreeWalTarMethod(void)
+{
+ pg_free(tar_data->tarfilename);
+#ifdef HAVE_LIBZ
+ if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ pg_free(tar_data->zlibOut);
+#endif
+ pg_free(tar_data);
+ tar_data = NULL;
+}