summaryrefslogtreecommitdiffstats
path: root/src/lib-compression/ostream-zstd.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib-compression/ostream-zstd.c')
-rw-r--r--src/lib-compression/ostream-zstd.c243
1 files changed, 243 insertions, 0 deletions
diff --git a/src/lib-compression/ostream-zstd.c b/src/lib-compression/ostream-zstd.c
new file mode 100644
index 0000000..e40380b
--- /dev/null
+++ b/src/lib-compression/ostream-zstd.c
@@ -0,0 +1,243 @@
+/* Copyright (c) 2020 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+
+#ifdef HAVE_ZSTD
+
+#include "ostream.h"
+#include "ostream-private.h"
+#include "ostream-zlib.h"
+
+#include "zstd.h"
+#include "zstd_errors.h"
+#include "iostream-zstd-private.h"
+
+struct zstd_ostream {
+ struct ostream_private ostream;
+
+ ZSTD_CStream *cstream;
+ ZSTD_outBuffer output;
+
+ unsigned char *outbuf;
+
+ bool flushed:1;
+ bool closed:1;
+ bool finished:1;
+};
+
+int compression_get_min_level_zstd(void)
+{
+#if HAVE_DECL_ZSTD_MINCLEVEL == 1
+ return ZSTD_minCLevel();
+#else
+ return 1;
+#endif
+}
+
+int compression_get_default_level_zstd(void)
+{
+#ifdef ZSTD_CLEVEL_DEFAULT
+ return ZSTD_CLEVEL_DEFAULT;
+#else
+ /* Documentation says 3 is default */
+ return 3;
+#endif
+}
+
+int compression_get_max_level_zstd(void)
+{
+ return ZSTD_maxCLevel();
+}
+
+static void o_stream_zstd_write_error(struct zstd_ostream *zstream, size_t err)
+{
+ ZSTD_ErrorCode errcode = zstd_version_errcode(ZSTD_getErrorCode(err));
+ const char *error = ZSTD_getErrorName(err);
+ if (errcode == ZSTD_error_memory_allocation)
+ i_fatal_status(FATAL_OUTOFMEM, "zstd.write(%s): Out of memory",
+ o_stream_get_name(&zstream->ostream.ostream));
+ else if (errcode == ZSTD_error_prefix_unknown ||
+#if HAVE_DECL_ZSTD_ERROR_PARAMETER_UNSUPPORTED == 1
+ errcode == ZSTD_error_parameter_unsupported ||
+#endif
+ errcode == ZSTD_error_dictionary_wrong ||
+ errcode == ZSTD_error_init_missing)
+ zstream->ostream.ostream.stream_errno = EINVAL;
+ else
+ zstream->ostream.ostream.stream_errno = EIO;
+
+ io_stream_set_error(&zstream->ostream.iostream,
+ "zstd.write(%s): %s at %"PRIuUOFF_T,
+ o_stream_get_name(&zstream->ostream.ostream), error,
+ zstream->ostream.ostream.offset);
+}
+
+static ssize_t o_stream_zstd_send_outbuf(struct zstd_ostream *zstream)
+{
+ ssize_t ret;
+ /* nothing to send */
+ if (zstream->output.pos == 0)
+ return 1;
+ ret = o_stream_send(zstream->ostream.parent, zstream->output.dst,
+ zstream->output.pos);
+ if (ret < 0) {
+ o_stream_copy_error_from_parent(&zstream->ostream);
+ return -1;
+ } else {
+ memmove(zstream->outbuf, zstream->outbuf+ret, zstream->output.pos-ret);
+ zstream->output.pos -= ret;
+ }
+ if (zstream->output.pos > 0)
+ return 0;
+ return 1;
+}
+
+static ssize_t
+o_stream_zstd_sendv(struct ostream_private *stream,
+ const struct const_iovec *iov, unsigned int iov_count)
+{
+ struct zstd_ostream *zstream =
+ container_of(stream, struct zstd_ostream, ostream);
+ ssize_t total = 0;
+ size_t ret;
+
+ for (unsigned int i = 0; i < iov_count; i++) {
+ /* does it actually fit there */
+ ZSTD_inBuffer input = {
+ .src = iov[i].iov_base,
+ .pos = 0,
+ .size = iov[i].iov_len
+ };
+ bool flush_attempted = FALSE;
+ for (;;) {
+ size_t prev_pos = input.pos;
+ ret = ZSTD_compressStream(zstream->cstream, &zstream->output,
+ &input);
+ if (ZSTD_isError(ret) != 0) {
+ o_stream_zstd_write_error(zstream, ret);
+ return -1;
+ }
+ size_t new_input_size = input.pos - prev_pos;
+ if (new_input_size == 0 && flush_attempted) {
+ /* non-blocking output buffer full */
+ return total;
+ }
+ stream->ostream.offset += new_input_size;
+ total += new_input_size;
+ if (input.pos == input.size)
+ break;
+ /* output buffer full. try to flush it. */
+ if (o_stream_zstd_send_outbuf(zstream) < 0)
+ return -1;
+ flush_attempted = TRUE;
+ }
+ }
+ if (o_stream_zstd_send_outbuf(zstream) < 0)
+ return -1;
+ return total;
+}
+
+static int o_stream_zstd_send_flush(struct zstd_ostream *zstream, bool final)
+{
+ int ret;
+
+ if (zstream->flushed) {
+ i_assert(zstream->output.pos == 0);
+ return 1;
+ }
+
+ if ((ret = o_stream_flush_parent_if_needed(&zstream->ostream)) <= 0)
+ return ret;
+
+ if (zstream->output.pos == 0)
+ ZSTD_flushStream(zstream->cstream, &zstream->output);
+
+ if ((ret = o_stream_zstd_send_outbuf(zstream)) <= 0)
+ return ret;
+
+ if (!final)
+ return 1;
+
+ if (!zstream->finished) {
+ ret = ZSTD_endStream(zstream->cstream, &zstream->output);
+ if (ZSTD_isError(ret) != 0) {
+ o_stream_zstd_write_error(zstream, ret);
+ return -1;
+ }
+ zstream->finished = TRUE;
+ }
+
+ if ((ret = o_stream_zstd_send_outbuf(zstream)) <= 0)
+ return ret;
+
+ if (final)
+ zstream->flushed = TRUE;
+ i_assert(zstream->output.pos == 0);
+ return 1;
+}
+
+static int o_stream_zstd_flush(struct ostream_private *stream)
+{
+ struct zstd_ostream *zstream =
+ container_of(stream, struct zstd_ostream, ostream);
+
+ int ret;
+ if ((ret = o_stream_zstd_send_flush(zstream, stream->finished)) < 0)
+ return -1;
+ else if (ret > 0)
+ return o_stream_flush_parent(stream);
+ return ret;
+}
+
+static void o_stream_zstd_close(struct iostream_private *stream,
+ bool close_parent)
+{
+ struct ostream_private *_ostream =
+ container_of(stream, struct ostream_private, iostream);
+ struct zstd_ostream *zstream =
+ container_of(_ostream, struct zstd_ostream, ostream);
+
+ i_assert(zstream->ostream.finished ||
+ zstream->ostream.ostream.stream_errno != 0 ||
+ zstream->ostream.error_handling_disabled);
+ if (zstream->cstream != NULL) {
+ ZSTD_freeCStream(zstream->cstream);
+ zstream->cstream = NULL;
+ }
+ i_free(zstream->outbuf);
+ i_zero(&zstream->output);
+ if (close_parent)
+ o_stream_close(zstream->ostream.parent);
+}
+
+struct ostream *
+o_stream_create_zstd(struct ostream *output, int level)
+{
+ struct zstd_ostream *zstream;
+ size_t ret;
+
+ i_assert(level >= compression_get_min_level_zstd() &&
+ level <= compression_get_max_level_zstd());
+
+ zstd_version_check();
+
+ zstream = i_new(struct zstd_ostream, 1);
+ zstream->ostream.sendv = o_stream_zstd_sendv;
+ zstream->ostream.flush = o_stream_zstd_flush;
+ zstream->ostream.iostream.close = o_stream_zstd_close;
+ zstream->cstream = ZSTD_createCStream();
+ if (zstream->cstream == NULL)
+ i_fatal_status(FATAL_OUTOFMEM, "zstd: Out of memory");
+ ret = ZSTD_initCStream(zstream->cstream, level);
+ if (ZSTD_isError(ret) != 0)
+ o_stream_zstd_write_error(zstream, ret);
+ else {
+ zstream->outbuf = i_malloc(ZSTD_CStreamOutSize());
+ zstream->output.dst = zstream->outbuf;
+ zstream->output.size = ZSTD_CStreamOutSize();
+ }
+ return o_stream_create(&zstream->ostream, output,
+ o_stream_get_fd(output));
+}
+
+#endif