summaryrefslogtreecommitdiffstats
path: root/src/bin/pg_basebackup/bbstreamer_lz4.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
commit5e45211a64149b3c659b90ff2de6fa982a5a93ed (patch)
tree739caf8c461053357daa9f162bef34516c7bf452 /src/bin/pg_basebackup/bbstreamer_lz4.c
parentInitial commit. (diff)
downloadpostgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.tar.xz
postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.zip
Adding upstream version 15.5.upstream/15.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/bin/pg_basebackup/bbstreamer_lz4.c')
-rw-r--r--src/bin/pg_basebackup/bbstreamer_lz4.c422
1 files changed, 422 insertions, 0 deletions
diff --git a/src/bin/pg_basebackup/bbstreamer_lz4.c b/src/bin/pg_basebackup/bbstreamer_lz4.c
new file mode 100644
index 0000000..ed2aa01
--- /dev/null
+++ b/src/bin/pg_basebackup/bbstreamer_lz4.c
@@ -0,0 +1,422 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_lz4.c
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/bbstreamer_lz4.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef USE_LZ4
+#include <lz4frame.h>
+#endif
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+#include "common/file_perm.h"
+#include "common/string.h"
+
+#ifdef USE_LZ4
+typedef struct bbstreamer_lz4_frame
+{
+ bbstreamer base;
+
+ LZ4F_compressionContext_t cctx;
+ LZ4F_decompressionContext_t dctx;
+ LZ4F_preferences_t prefs;
+
+ size_t bytes_written;
+ bool header_written;
+} bbstreamer_lz4_frame;
+
+static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
+static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
+ .content = bbstreamer_lz4_compressor_content,
+ .finalize = bbstreamer_lz4_compressor_finalize,
+ .free = bbstreamer_lz4_compressor_free
+};
+
+static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context);
+static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
+static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
+ .content = bbstreamer_lz4_decompressor_content,
+ .finalize = bbstreamer_lz4_decompressor_finalize,
+ .free = bbstreamer_lz4_decompressor_free
+};
+#endif
+
+/*
+ * Create a new base backup streamer that performs lz4 compression of tar
+ * blocks.
+ */
+bbstreamer *
+bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
+{
+#ifdef USE_LZ4
+ bbstreamer_lz4_frame *streamer;
+ LZ4F_errorCode_t ctxError;
+ LZ4F_preferences_t *prefs;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(bbstreamer_lz4_frame));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_lz4_compressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+ streamer->header_written = false;
+
+ /* Initialize stream compression preferences */
+ prefs = &streamer->prefs;
+ memset(prefs, 0, sizeof(LZ4F_preferences_t));
+ prefs->frameInfo.blockSizeID = LZ4F_max256KB;
+ prefs->compressionLevel = compress->level;
+
+ ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
+ if (LZ4F_isError(ctxError))
+ pg_log_error("could not create lz4 compression context: %s",
+ LZ4F_getErrorName(ctxError));
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support lz4 compression");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef USE_LZ4
+/*
+ * Compress the input data to output buffer.
+ *
+ * Find out the compression bound based on input data length for each
+ * invocation to make sure that output buffer has enough capacity to
+ * accommodate the compressed data. In case if the output buffer
+ * capacity falls short of compression bound then forward the content
+ * of output buffer to next streamer and empty the buffer.
+ */
+static void
+bbstreamer_lz4_compressor_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_lz4_frame *mystreamer;
+ uint8 *next_in,
+ *next_out;
+ size_t out_bound,
+ compressed_size,
+ avail_out;
+
+ mystreamer = (bbstreamer_lz4_frame *) streamer;
+ next_in = (uint8 *) data;
+
+ /* Write header before processing the first input chunk. */
+ if (!mystreamer->header_written)
+ {
+ compressed_size = LZ4F_compressBegin(mystreamer->cctx,
+ (uint8 *) mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ &mystreamer->prefs);
+
+ if (LZ4F_isError(compressed_size))
+ pg_log_error("could not write lz4 header: %s",
+ LZ4F_getErrorName(compressed_size));
+
+ mystreamer->bytes_written += compressed_size;
+ mystreamer->header_written = true;
+ }
+
+ /*
+ * Update the offset and capacity of output buffer based on number of
+ * bytes written to output buffer.
+ */
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+ avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+
+ /*
+ * Find out the compression bound and make sure that output buffer has the
+ * required capacity for the success of LZ4F_compressUpdate. If needed
+ * forward the content to next streamer and empty the buffer.
+ */
+ out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
+ if (avail_out < out_bound)
+ {
+ bbstreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->bytes_written,
+ context);
+
+ /* Enlarge buffer if it falls short of out bound. */
+ if (mystreamer->base.bbs_buffer.maxlen < out_bound)
+ enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
+
+ avail_out = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->bytes_written = 0;
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+ }
+
+ /*
+ * This call compresses the data starting at next_in and generates the
+ * output starting at next_out. It expects the caller to provide the size
+ * of input buffer and capacity of output buffer by providing parameters
+ * len and avail_out.
+ *
+ * It returns the number of bytes compressed to output buffer.
+ */
+ compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
+ next_out, avail_out,
+ next_in, len, NULL);
+
+ if (LZ4F_isError(compressed_size))
+ pg_log_error("could not compress data: %s",
+ LZ4F_getErrorName(compressed_size));
+
+ mystreamer->bytes_written += compressed_size;
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
+{
+ bbstreamer_lz4_frame *mystreamer;
+ uint8 *next_out;
+ size_t footer_bound,
+ compressed_size,
+ avail_out;
+
+ mystreamer = (bbstreamer_lz4_frame *) streamer;
+
+ /* Find out the footer bound and update the output buffer. */
+ footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
+ if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
+ footer_bound)
+ {
+ bbstreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->bytes_written,
+ BBSTREAMER_UNKNOWN);
+
+ /* Enlarge buffer if it falls short of footer bound. */
+ if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
+ enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
+
+ avail_out = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->bytes_written = 0;
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+ }
+ else
+ {
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+ avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+ }
+
+ /*
+ * Finalize the frame and flush whatever data remaining in compression
+ * context.
+ */
+ compressed_size = LZ4F_compressEnd(mystreamer->cctx,
+ next_out, avail_out, NULL);
+
+ if (LZ4F_isError(compressed_size))
+ pg_log_error("could not end lz4 compression: %s",
+ LZ4F_getErrorName(compressed_size));
+
+ mystreamer->bytes_written += compressed_size;
+
+ bbstreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->bytes_written,
+ BBSTREAMER_UNKNOWN);
+
+ bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_lz4_compressor_free(bbstreamer *streamer)
+{
+ bbstreamer_lz4_frame *mystreamer;
+
+ mystreamer = (bbstreamer_lz4_frame *) streamer;
+ bbstreamer_free(streamer->bbs_next);
+ LZ4F_freeCompressionContext(mystreamer->cctx);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of lz4
+ * compressed blocks.
+ */
+bbstreamer *
+bbstreamer_lz4_decompressor_new(bbstreamer *next)
+{
+#ifdef USE_LZ4
+ bbstreamer_lz4_frame *streamer;
+ LZ4F_errorCode_t ctxError;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(bbstreamer_lz4_frame));
+ *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+ &bbstreamer_lz4_decompressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+
+ /* Initialize internal stream state for decompression */
+ ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
+ if (LZ4F_isError(ctxError))
+ pg_fatal("could not initialize compression library: %s",
+ LZ4F_getErrorName(ctxError));
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support lz4 compression");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef USE_LZ4
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
+ bbstreamer_member *member,
+ const char *data, int len,
+ bbstreamer_archive_context context)
+{
+ bbstreamer_lz4_frame *mystreamer;
+ uint8 *next_in,
+ *next_out;
+ size_t avail_in,
+ avail_out;
+
+ mystreamer = (bbstreamer_lz4_frame *) streamer;
+ next_in = (uint8 *) data;
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+ avail_in = len;
+ avail_out = mystreamer->base.bbs_buffer.maxlen;
+
+ while (avail_in > 0)
+ {
+ size_t ret,
+ read_size,
+ out_size;
+
+ read_size = avail_in;
+ out_size = avail_out;
+
+ /*
+ * This call decompresses the data starting at next_in and generates
+ * the output data starting at next_out. It expects the caller to
+ * provide size of the input buffer and total capacity of the output
+ * buffer by providing the read_size and out_size parameters
+ * respectively.
+ *
+ * Per the documentation of LZ4, parameters read_size and out_size
+ * behaves as dual parameters. On return, the number of bytes consumed
+ * from the input buffer will be written back to read_size and the
+ * number of bytes decompressed to output buffer will be written back
+ * to out_size respectively.
+ */
+ ret = LZ4F_decompress(mystreamer->dctx,
+ next_out, &out_size,
+ next_in, &read_size, NULL);
+
+ if (LZ4F_isError(ret))
+ pg_log_error("could not decompress data: %s",
+ LZ4F_getErrorName(ret));
+
+ /* Update input buffer based on number of bytes consumed */
+ avail_in -= read_size;
+ next_in += read_size;
+
+ mystreamer->bytes_written += out_size;
+
+ /*
+ * If output buffer is full then forward the content to next streamer
+ * and update the output buffer.
+ */
+ if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
+ {
+ bbstreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ context);
+
+ avail_out = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->bytes_written = 0;
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+ }
+ else
+ {
+ avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+ next_out += mystreamer->bytes_written;
+ }
+ }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
+{
+ bbstreamer_lz4_frame *mystreamer;
+
+ mystreamer = (bbstreamer_lz4_frame *) streamer;
+
+ /*
+ * End of the stream, if there is some pending data in output buffers then
+ * we must forward it to next streamer.
+ */
+ bbstreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ BBSTREAMER_UNKNOWN);
+
+ bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
+{
+ bbstreamer_lz4_frame *mystreamer;
+
+ mystreamer = (bbstreamer_lz4_frame *) streamer;
+ bbstreamer_free(streamer->bbs_next);
+ LZ4F_freeDecompressionContext(mystreamer->dctx);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+#endif