summaryrefslogtreecommitdiffstats
path: root/src/bin/pg_basebackup/bbstreamer_gzip.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
commit5e45211a64149b3c659b90ff2de6fa982a5a93ed (patch)
tree739caf8c461053357daa9f162bef34516c7bf452 /src/bin/pg_basebackup/bbstreamer_gzip.c
parentInitial commit. (diff)
downloadpostgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.tar.xz
postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.zip
Adding upstream version 15.5.upstream/15.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/bin/pg_basebackup/bbstreamer_gzip.c')
-rw-r--r--src/bin/pg_basebackup/bbstreamer_gzip.c364
1 files changed, 364 insertions, 0 deletions
diff --git a/src/bin/pg_basebackup/bbstreamer_gzip.c b/src/bin/pg_basebackup/bbstreamer_gzip.c
new file mode 100644
index 0000000..c3455ff
--- /dev/null
+++ b/src/bin/pg_basebackup/bbstreamer_gzip.c
@@ -0,0 +1,364 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_gzip.c
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer_gzip.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+#include "common/file_perm.h"
+#include "common/string.h"
+
+#ifdef HAVE_LIBZ
+typedef struct bbstreamer_gzip_writer
+{
+ bbstreamer base;
+ char *pathname;
+ gzFile gzfile;
+} bbstreamer_gzip_writer;
+
+typedef struct bbstreamer_gzip_decompressor
+{
+ bbstreamer base;
+ z_stream zstream;
+ size_t bytes_written;
+} bbstreamer_gzip_decompressor;
+
+static void bbstreamer_gzip_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer);
+static void bbstreamer_gzip_writer_free(bbstreamer *streamer);
+static const char *get_gz_error(gzFile gzf);
+
+const bbstreamer_ops bbstreamer_gzip_writer_ops = {
+ .content = bbstreamer_gzip_writer_content,
+ .finalize = bbstreamer_gzip_writer_finalize,
+ .free = bbstreamer_gzip_writer_free
+};
+
+static void bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer);
+static void bbstreamer_gzip_decompressor_free(bbstreamer *streamer);
+static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
+static void gzip_pfree(void *opaque, void *address);
+
+const bbstreamer_ops bbstreamer_gzip_decompressor_ops = {
+ .content = bbstreamer_gzip_decompressor_content,
+ .finalize = bbstreamer_gzip_decompressor_finalize,
+ .free = bbstreamer_gzip_decompressor_free
+};
+#endif
+
+/*
+ * Create a bbstreamer that just compresses data using gzip, and then writes
+ * it to a file.
+ *
+ * As in the case of bbstreamer_plain_writer_new, pathname is always used
+ * for error reporting purposes; if file is NULL, it is also the opened and
+ * closed so that the data may be written there.
+ */
+bbstreamer *
+bbstreamer_gzip_writer_new(char *pathname, FILE *file,
+ pg_compress_specification *compress)
+{
+#ifdef HAVE_LIBZ
+ bbstreamer_gzip_writer *streamer;
+
+ streamer = palloc0(sizeof(bbstreamer_gzip_writer));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_gzip_writer_ops;
+
+ streamer->pathname = pstrdup(pathname);
+
+ if (file == NULL)
+ {
+ streamer->gzfile = gzopen(pathname, "wb");
+ if (streamer->gzfile == NULL)
+ pg_fatal("could not create compressed file \"%s\": %m",
+ pathname);
+ }
+ else
+ {
+ int fd = dup(fileno(file));
+
+ if (fd < 0)
+ pg_fatal("could not duplicate stdout: %m");
+
+ streamer->gzfile = gzdopen(fd, "wb");
+ if (streamer->gzfile == NULL)
+ pg_fatal("could not open output file: %m");
+ }
+
+ if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
+ pg_fatal("could not set compression level %d: %s",
+ compress->level, get_gz_error(streamer->gzfile));
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support gzip compression");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef HAVE_LIBZ
+/*
+ * Write archive content to gzip file.
+ */
+static void
+bbstreamer_gzip_writer_content(bbstreamer *streamer,
+ bbstreamer_member *member, const char *data,
+ int len, bbstreamer_archive_context context)
+{
+ bbstreamer_gzip_writer *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_writer *) streamer;
+
+ if (len == 0)
+ return;
+
+ errno = 0;
+ if (gzwrite(mystreamer->gzfile, data, len) != len)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_fatal("could not write to compressed file \"%s\": %s",
+ mystreamer->pathname, get_gz_error(mystreamer->gzfile));
+ }
+}
+
+/*
+ * End-of-archive processing when writing to a gzip file consists of just
+ * calling gzclose.
+ *
+ * It makes no difference whether we opened the file or the caller did it,
+ * because libz provides no way of avoiding a close on the underling file
+ * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to
+ * work around this issue, so that the behavior from the caller's viewpoint
+ * is the same as for bbstreamer_plain_writer.
+ */
+static void
+bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
+{
+ bbstreamer_gzip_writer *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_writer *) streamer;
+
+ errno = 0; /* in case gzclose() doesn't set it */
+ if (gzclose(mystreamer->gzfile) != 0)
+ pg_fatal("could not close compressed file \"%s\": %m",
+ mystreamer->pathname);
+
+ mystreamer->gzfile = NULL;
+}
+
+/*
+ * Free memory associated with this bbstreamer.
+ */
+static void
+bbstreamer_gzip_writer_free(bbstreamer *streamer)
+{
+ bbstreamer_gzip_writer *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_writer *) streamer;
+
+ Assert(mystreamer->base.bbs_next == NULL);
+ Assert(mystreamer->gzfile == NULL);
+
+ pfree(mystreamer->pathname);
+ pfree(mystreamer);
+}
+
+/*
+ * Helper function for libz error reporting.
+ */
+static const char *
+get_gz_error(gzFile gzf)
+{
+ int errnum;
+ const char *errmsg;
+
+ errmsg = gzerror(gzf, &errnum);
+ if (errnum == Z_ERRNO)
+ return strerror(errno);
+ else
+ return errmsg;
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of gzip
+ * compressed blocks.
+ */
+bbstreamer *
+bbstreamer_gzip_decompressor_new(bbstreamer *next)
+{
+#ifdef HAVE_LIBZ
+ bbstreamer_gzip_decompressor *streamer;
+ z_stream *zs;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(bbstreamer_gzip_decompressor));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_gzip_decompressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+
+ /* Initialize internal stream state for decompression */
+ zs = &streamer->zstream;
+ zs->zalloc = gzip_palloc;
+ zs->zfree = gzip_pfree;
+ zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
+ zs->avail_out = streamer->base.bbs_buffer.maxlen;
+
+ /*
+ * Data compression was initialized using deflateInit2 to request a gzip
+ * header. Similarly, we are using inflateInit2 to initialize data
+ * decompression.
+ *
+ * Per the documentation for inflateInit2, the second argument is
+ * "windowBits" and its value must be greater than or equal to the value
+ * provided while compressing the data, so we are using the maximum
+ * possible value for safety.
+ */
+ if (inflateInit2(zs, 15 + 16) != Z_OK)
+ pg_fatal("could not initialize compression library");
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support gzip compression");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef HAVE_LIBZ
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_gzip_decompressor *mystreamer;
+ z_stream *zs;
+
+ mystreamer = (bbstreamer_gzip_decompressor *) streamer;
+
+ zs = &mystreamer->zstream;
+ zs->next_in = (uint8 *) data;
+ zs->avail_in = len;
+
+ /* Process the current chunk */
+ while (zs->avail_in > 0)
+ {
+ int res;
+
+ Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
+
+ zs->next_out = (uint8 *)
+ mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+ zs->avail_out =
+ mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+
+ /*
+ * This call decompresses data starting at zs->next_in and updates
+ * zs->next_in * and zs->avail_in. It generates output data starting
+ * at zs->next_out and updates zs->next_out and zs->avail_out
+ * accordingly.
+ */
+ res = inflate(zs, Z_NO_FLUSH);
+
+ if (res == Z_STREAM_ERROR)
+ pg_log_error("could not decompress data: %s", zs->msg);
+
+ mystreamer->bytes_written =
+ mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
+
+ /* If output buffer is full then pass data to next streamer */
+ if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
+ {
+ bbstreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen, context);
+ mystreamer->bytes_written = 0;
+ }
+ }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer)
+{
+ bbstreamer_gzip_decompressor *mystreamer;
+
+ mystreamer = (bbstreamer_gzip_decompressor *) streamer;
+
+ /*
+ * End of the stream, if there is some pending data in output buffers then
+ * we must forward it to next streamer.
+ */
+ bbstreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ BBSTREAMER_UNKNOWN);
+
+ bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_gzip_decompressor_free(bbstreamer *streamer)
+{
+ bbstreamer_free(streamer->bbs_next);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+
+/*
+ * Wrapper function to adjust the signature of palloc to match what libz
+ * expects.
+ */
+static void *
+gzip_palloc(void *opaque, unsigned items, unsigned size)
+{
+ return palloc(items * size);
+}
+
+/*
+ * Wrapper function to adjust the signature of pfree to match what libz
+ * expects.
+ */
+static void
+gzip_pfree(void *opaque, void *address)
+{
+ pfree(address);
+}
+#endif