diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 17:36:47 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 17:36:47 +0000 |
commit | 0441d265f2bb9da249c7abf333f0f771fadb4ab5 (patch) | |
tree | 3f3789daa2f6db22da6e55e92bee0062a7d613fe /src/lib-compression/ostream-zstd.c | |
parent | Initial commit. (diff) | |
download | dovecot-0441d265f2bb9da249c7abf333f0f771fadb4ab5.tar.xz dovecot-0441d265f2bb9da249c7abf333f0f771fadb4ab5.zip |
Adding upstream version 1:2.3.21+dfsg1.upstream/1%2.3.21+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/lib-compression/ostream-zstd.c')
-rw-r--r-- | src/lib-compression/ostream-zstd.c | 243 |
1 files changed, 243 insertions, 0 deletions
diff --git a/src/lib-compression/ostream-zstd.c b/src/lib-compression/ostream-zstd.c new file mode 100644 index 0000000..e40380b --- /dev/null +++ b/src/lib-compression/ostream-zstd.c @@ -0,0 +1,243 @@ +/* Copyright (c) 2020 Dovecot authors, see the included COPYING file */ + +#include "lib.h" + +#ifdef HAVE_ZSTD + +#include "ostream.h" +#include "ostream-private.h" +#include "ostream-zlib.h" + +#include "zstd.h" +#include "zstd_errors.h" +#include "iostream-zstd-private.h" + +struct zstd_ostream { + struct ostream_private ostream; + + ZSTD_CStream *cstream; + ZSTD_outBuffer output; + + unsigned char *outbuf; + + bool flushed:1; + bool closed:1; + bool finished:1; +}; + +int compression_get_min_level_zstd(void) +{ +#if HAVE_DECL_ZSTD_MINCLEVEL == 1 + return ZSTD_minCLevel(); +#else + return 1; +#endif +} + +int compression_get_default_level_zstd(void) +{ +#ifdef ZSTD_CLEVEL_DEFAULT + return ZSTD_CLEVEL_DEFAULT; +#else + /* Documentation says 3 is default */ + return 3; +#endif +} + +int compression_get_max_level_zstd(void) +{ + return ZSTD_maxCLevel(); +} + +static void o_stream_zstd_write_error(struct zstd_ostream *zstream, size_t err) +{ + ZSTD_ErrorCode errcode = zstd_version_errcode(ZSTD_getErrorCode(err)); + const char *error = ZSTD_getErrorName(err); + if (errcode == ZSTD_error_memory_allocation) + i_fatal_status(FATAL_OUTOFMEM, "zstd.write(%s): Out of memory", + o_stream_get_name(&zstream->ostream.ostream)); + else if (errcode == ZSTD_error_prefix_unknown || +#if HAVE_DECL_ZSTD_ERROR_PARAMETER_UNSUPPORTED == 1 + errcode == ZSTD_error_parameter_unsupported || +#endif + errcode == ZSTD_error_dictionary_wrong || + errcode == ZSTD_error_init_missing) + zstream->ostream.ostream.stream_errno = EINVAL; + else + zstream->ostream.ostream.stream_errno = EIO; + + io_stream_set_error(&zstream->ostream.iostream, + "zstd.write(%s): %s at %"PRIuUOFF_T, + o_stream_get_name(&zstream->ostream.ostream), error, + zstream->ostream.ostream.offset); +} + +static ssize_t o_stream_zstd_send_outbuf(struct zstd_ostream *zstream) +{ + ssize_t ret; + /* nothing to send */ + if (zstream->output.pos == 0) + return 1; + ret = o_stream_send(zstream->ostream.parent, zstream->output.dst, + zstream->output.pos); + if (ret < 0) { + o_stream_copy_error_from_parent(&zstream->ostream); + return -1; + } else { + memmove(zstream->outbuf, zstream->outbuf+ret, zstream->output.pos-ret); + zstream->output.pos -= ret; + } + if (zstream->output.pos > 0) + return 0; + return 1; +} + +static ssize_t +o_stream_zstd_sendv(struct ostream_private *stream, + const struct const_iovec *iov, unsigned int iov_count) +{ + struct zstd_ostream *zstream = + container_of(stream, struct zstd_ostream, ostream); + ssize_t total = 0; + size_t ret; + + for (unsigned int i = 0; i < iov_count; i++) { + /* does it actually fit there */ + ZSTD_inBuffer input = { + .src = iov[i].iov_base, + .pos = 0, + .size = iov[i].iov_len + }; + bool flush_attempted = FALSE; + for (;;) { + size_t prev_pos = input.pos; + ret = ZSTD_compressStream(zstream->cstream, &zstream->output, + &input); + if (ZSTD_isError(ret) != 0) { + o_stream_zstd_write_error(zstream, ret); + return -1; + } + size_t new_input_size = input.pos - prev_pos; + if (new_input_size == 0 && flush_attempted) { + /* non-blocking output buffer full */ + return total; + } + stream->ostream.offset += new_input_size; + total += new_input_size; + if (input.pos == input.size) + break; + /* output buffer full. try to flush it. */ + if (o_stream_zstd_send_outbuf(zstream) < 0) + return -1; + flush_attempted = TRUE; + } + } + if (o_stream_zstd_send_outbuf(zstream) < 0) + return -1; + return total; +} + +static int o_stream_zstd_send_flush(struct zstd_ostream *zstream, bool final) +{ + int ret; + + if (zstream->flushed) { + i_assert(zstream->output.pos == 0); + return 1; + } + + if ((ret = o_stream_flush_parent_if_needed(&zstream->ostream)) <= 0) + return ret; + + if (zstream->output.pos == 0) + ZSTD_flushStream(zstream->cstream, &zstream->output); + + if ((ret = o_stream_zstd_send_outbuf(zstream)) <= 0) + return ret; + + if (!final) + return 1; + + if (!zstream->finished) { + ret = ZSTD_endStream(zstream->cstream, &zstream->output); + if (ZSTD_isError(ret) != 0) { + o_stream_zstd_write_error(zstream, ret); + return -1; + } + zstream->finished = TRUE; + } + + if ((ret = o_stream_zstd_send_outbuf(zstream)) <= 0) + return ret; + + if (final) + zstream->flushed = TRUE; + i_assert(zstream->output.pos == 0); + return 1; +} + +static int o_stream_zstd_flush(struct ostream_private *stream) +{ + struct zstd_ostream *zstream = + container_of(stream, struct zstd_ostream, ostream); + + int ret; + if ((ret = o_stream_zstd_send_flush(zstream, stream->finished)) < 0) + return -1; + else if (ret > 0) + return o_stream_flush_parent(stream); + return ret; +} + +static void o_stream_zstd_close(struct iostream_private *stream, + bool close_parent) +{ + struct ostream_private *_ostream = + container_of(stream, struct ostream_private, iostream); + struct zstd_ostream *zstream = + container_of(_ostream, struct zstd_ostream, ostream); + + i_assert(zstream->ostream.finished || + zstream->ostream.ostream.stream_errno != 0 || + zstream->ostream.error_handling_disabled); + if (zstream->cstream != NULL) { + ZSTD_freeCStream(zstream->cstream); + zstream->cstream = NULL; + } + i_free(zstream->outbuf); + i_zero(&zstream->output); + if (close_parent) + o_stream_close(zstream->ostream.parent); +} + +struct ostream * +o_stream_create_zstd(struct ostream *output, int level) +{ + struct zstd_ostream *zstream; + size_t ret; + + i_assert(level >= compression_get_min_level_zstd() && + level <= compression_get_max_level_zstd()); + + zstd_version_check(); + + zstream = i_new(struct zstd_ostream, 1); + zstream->ostream.sendv = o_stream_zstd_sendv; + zstream->ostream.flush = o_stream_zstd_flush; + zstream->ostream.iostream.close = o_stream_zstd_close; + zstream->cstream = ZSTD_createCStream(); + if (zstream->cstream == NULL) + i_fatal_status(FATAL_OUTOFMEM, "zstd: Out of memory"); + ret = ZSTD_initCStream(zstream->cstream, level); + if (ZSTD_isError(ret) != 0) + o_stream_zstd_write_error(zstream, ret); + else { + zstream->outbuf = i_malloc(ZSTD_CStreamOutSize()); + zstream->output.dst = zstream->outbuf; + zstream->output.size = ZSTD_CStreamOutSize(); + } + return o_stream_create(&zstream->ostream, output, + o_stream_get_fd(output)); +} + +#endif |