summaryrefslogtreecommitdiffstats
path: root/src/compressor
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/compressor
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/compressor')
-rw-r--r--src/compressor/CMakeLists.txt44
-rw-r--r--src/compressor/CompressionPlugin.h48
-rw-r--r--src/compressor/Compressor.cc108
-rw-r--r--src/compressor/Compressor.h109
-rw-r--r--src/compressor/QatAccel.cc142
-rw-r--r--src/compressor/QatAccel.h36
-rw-r--r--src/compressor/brotli/BrotliCompressor.cc96
-rw-r--r--src/compressor/brotli/BrotliCompressor.h31
-rw-r--r--src/compressor/brotli/CMakeLists.txt34
-rw-r--r--src/compressor/brotli/CompressionPluginBrotli.cc19
-rw-r--r--src/compressor/brotli/CompressionPluginBrotli.h36
-rw-r--r--src/compressor/lz4/CMakeLists.txt14
-rw-r--r--src/compressor/lz4/CompressionPluginLZ4.cc36
-rw-r--r--src/compressor/lz4/CompressionPluginLZ4.h41
-rw-r--r--src/compressor/lz4/LZ4Compressor.h148
-rw-r--r--src/compressor/snappy/CMakeLists.txt14
-rw-r--r--src/compressor/snappy/CompressionPluginSnappy.cc38
-rw-r--r--src/compressor/snappy/CompressionPluginSnappy.h42
-rw-r--r--src/compressor/snappy/SnappyCompressor.h116
-rw-r--r--src/compressor/zlib/CMakeLists.txt59
-rw-r--r--src/compressor/zlib/CompressionPluginZlib.cc38
-rw-r--r--src/compressor/zlib/CompressionPluginZlib.h55
-rw-r--r--src/compressor/zlib/ZlibCompressor.cc255
-rw-r--r--src/compressor/zlib/ZlibCompressor.h46
-rw-r--r--src/compressor/zstd/CMakeLists.txt36
-rw-r--r--src/compressor/zstd/CompressionPluginZstd.cc36
-rw-r--r--src/compressor/zstd/CompressionPluginZstd.h43
-rw-r--r--src/compressor/zstd/ZstdCompressor.h107
28 files changed, 1827 insertions, 0 deletions
diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt
new file mode 100644
index 000000000..d8b9fd83c
--- /dev/null
+++ b/src/compressor/CMakeLists.txt
@@ -0,0 +1,44 @@
+
+set(compressor_srcs
+ Compressor.cc)
+if (HAVE_QATZIP)
+ list(APPEND compressor_srcs QatAccel.cc)
+endif()
+add_library(compressor_objs OBJECT ${compressor_srcs})
+
+## compressor plugins
+
+set(compressor_plugin_dir ${CEPH_INSTALL_PKGLIBDIR}/compressor)
+
+add_subdirectory(snappy)
+add_subdirectory(zlib)
+add_subdirectory(zstd)
+
+if(HAVE_LZ4)
+ add_subdirectory(lz4)
+endif()
+
+if(HAVE_BROTLI)
+ add_subdirectory(brotli)
+endif()
+
+add_library(compressor STATIC ${compressor_srcs})
+if(HAVE_QATZIP)
+ target_link_libraries(compressor PRIVATE ${QATZIP_LIBRARIES})
+endif()
+
+set(ceph_compressor_libs
+ ceph_snappy
+ ceph_zlib
+ ceph_zstd)
+
+if(HAVE_LZ4)
+ list(APPEND ceph_compressor_libs ceph_lz4)
+endif()
+
+if(HAVE_BROTLI)
+ list(APPEND ceph_compressor_libs ceph_brotli)
+endif()
+
+add_custom_target(compressor_plugins DEPENDS
+ ${ceph_compressor_libs})
diff --git a/src/compressor/CompressionPlugin.h b/src/compressor/CompressionPlugin.h
new file mode 100644
index 000000000..2a21f2fef
--- /dev/null
+++ b/src/compressor/CompressionPlugin.h
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph distributed storage system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef COMPRESSION_PLUGIN_H
+#define COMPRESSION_PLUGIN_H
+
+#include <iosfwd>
+#include <iostream>
+
+#include "common/PluginRegistry.h"
+#include "include/common_fwd.h"
+#include "Compressor.h"
+
+namespace ceph {
+
+ class CompressionPlugin : public Plugin {
+ public:
+ TOPNSPC::CompressorRef compressor;
+
+ explicit CompressionPlugin(CephContext *cct)
+ : Plugin(cct)
+ {}
+
+ ~CompressionPlugin() override {}
+
+ virtual int factory(TOPNSPC::CompressorRef *cs,
+ std::ostream *ss) = 0;
+
+ virtual const char* name() {return "CompressionPlugin";}
+ };
+
+}
+
+#endif
diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc
new file mode 100644
index 000000000..fa0f052f6
--- /dev/null
+++ b/src/compressor/Compressor.cc
@@ -0,0 +1,108 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <random>
+#include <sstream>
+#include <iterator>
+#include <algorithm>
+
+#include "CompressionPlugin.h"
+#include "Compressor.h"
+#include "include/random.h"
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/dout.h"
+
+namespace TOPNSPC {
+
+const char* Compressor::get_comp_alg_name(int a) {
+
+ auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms),
+ [a](const auto& kv) { return kv.second == a; });
+
+ if (std::cend(compression_algorithms) == p)
+ return "???"; // It would be nice to revise this...
+
+ return p->first;
+}
+
+boost::optional<Compressor::CompressionAlgorithm>
+Compressor::get_comp_alg_type(std::string_view s) {
+
+ auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms),
+ [&s](const auto& kv) { return kv.first == s; });
+ if (std::cend(compression_algorithms) == p)
+ return {};
+
+ return p->second;
+}
+
+const char *Compressor::get_comp_mode_name(int m) {
+ switch (m) {
+ case COMP_NONE: return "none";
+ case COMP_PASSIVE: return "passive";
+ case COMP_AGGRESSIVE: return "aggressive";
+ case COMP_FORCE: return "force";
+ default: return "???";
+ }
+}
+boost::optional<Compressor::CompressionMode>
+Compressor::get_comp_mode_type(std::string_view s) {
+ if (s == "force")
+ return COMP_FORCE;
+ if (s == "aggressive")
+ return COMP_AGGRESSIVE;
+ if (s == "passive")
+ return COMP_PASSIVE;
+ if (s == "none")
+ return COMP_NONE;
+ return boost::optional<CompressionMode>();
+}
+
+CompressorRef Compressor::create(CephContext *cct, const std::string &type)
+{
+ // support "random" for teuthology testing
+ if (type == "random") {
+ int alg = ceph::util::generate_random_number(0, COMP_ALG_LAST - 1);
+ if (alg == COMP_ALG_NONE) {
+ return nullptr;
+ }
+ return create(cct, alg);
+ }
+
+ CompressorRef cs_impl = NULL;
+ std::stringstream ss;
+ auto reg = cct->get_plugin_registry();
+ auto factory = dynamic_cast<ceph::CompressionPlugin*>(reg->get_with_load("compressor", type));
+ if (factory == NULL) {
+ lderr(cct) << __func__ << " cannot load compressor of type " << type << dendl;
+ return NULL;
+ }
+ int err = factory->factory(&cs_impl, &ss);
+ if (err)
+ lderr(cct) << __func__ << " factory return error " << err << dendl;
+ return cs_impl;
+}
+
+CompressorRef Compressor::create(CephContext *cct, int alg)
+{
+ if (alg < 0 || alg >= COMP_ALG_LAST) {
+ lderr(cct) << __func__ << " invalid algorithm value:" << alg << dendl;
+ return CompressorRef();
+ }
+ std::string type_name = get_comp_alg_name(alg);
+ return create(cct, type_name);
+}
+
+} // namespace TOPNSPC
diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h
new file mode 100644
index 000000000..0a45a990a
--- /dev/null
+++ b/src/compressor/Compressor.h
@@ -0,0 +1,109 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMPRESSOR_H
+#define CEPH_COMPRESSOR_H
+
+#include <memory>
+#include <string>
+#include <string_view>
+#include <boost/optional.hpp>
+#include "include/ceph_assert.h" // boost clobbers this
+#include "include/common_fwd.h"
+#include "include/buffer.h"
+#include "include/int_types.h"
+#ifdef HAVE_QATZIP
+ #include "QatAccel.h"
+#endif
+
+namespace TOPNSPC {
+
+class Compressor;
+typedef std::shared_ptr<Compressor> CompressorRef;
+
+class Compressor {
+public:
+ enum CompressionAlgorithm {
+ COMP_ALG_NONE = 0,
+ COMP_ALG_SNAPPY = 1,
+ COMP_ALG_ZLIB = 2,
+ COMP_ALG_ZSTD = 3,
+#ifdef HAVE_LZ4
+ COMP_ALG_LZ4 = 4,
+#endif
+#ifdef HAVE_BROTLI
+ COMP_ALG_BROTLI = 5,
+#endif
+ COMP_ALG_LAST //the last value for range checks
+ };
+
+ using pair_type = std::pair<const char*, CompressionAlgorithm>;
+ static constexpr std::initializer_list<pair_type> compression_algorithms {
+ { "none", COMP_ALG_NONE },
+ { "snappy", COMP_ALG_SNAPPY },
+ { "zlib", COMP_ALG_ZLIB },
+ { "zstd", COMP_ALG_ZSTD },
+#ifdef HAVE_LZ4
+ { "lz4", COMP_ALG_LZ4 },
+#endif
+#ifdef HAVE_BROTLI
+ { "brotli", COMP_ALG_BROTLI },
+#endif
+ };
+
+ // compression options
+ enum CompressionMode {
+ COMP_NONE, ///< compress never
+ COMP_PASSIVE, ///< compress if hinted COMPRESSIBLE
+ COMP_AGGRESSIVE, ///< compress unless hinted INCOMPRESSIBLE
+ COMP_FORCE ///< compress always
+ };
+
+#ifdef HAVE_QATZIP
+ bool qat_enabled;
+ QatAccel qat_accel;
+#endif
+
+ static const char* get_comp_alg_name(int a);
+ static boost::optional<CompressionAlgorithm> get_comp_alg_type(std::string_view s);
+
+ static const char *get_comp_mode_name(int m);
+ static boost::optional<CompressionMode> get_comp_mode_type(std::string_view s);
+
+ Compressor(CompressionAlgorithm a, const char* t) : alg(a), type(t) {
+ }
+ virtual ~Compressor() {}
+ const std::string& get_type_name() const {
+ return type;
+ }
+ CompressionAlgorithm get_type() const {
+ return alg;
+ }
+ virtual int compress(const ceph::bufferlist &in, ceph::bufferlist &out, boost::optional<int32_t> &compressor_message) = 0;
+ virtual int decompress(const ceph::bufferlist &in, ceph::bufferlist &out, boost::optional<int32_t> compressor_message) = 0;
+ // this is a bit weird but we need non-const iterator to be in
+ // alignment with decode methods
+ virtual int decompress(ceph::bufferlist::const_iterator &p, size_t compressed_len, ceph::bufferlist &out, boost::optional<int32_t> compressor_message) = 0;
+
+ static CompressorRef create(CephContext *cct, const std::string &type);
+ static CompressorRef create(CephContext *cct, int alg);
+
+protected:
+ CompressionAlgorithm alg;
+ std::string type;
+
+};
+
+} // namespace TOPNSPC
+#endif
diff --git a/src/compressor/QatAccel.cc b/src/compressor/QatAccel.cc
new file mode 100644
index 000000000..28e86e94e
--- /dev/null
+++ b/src/compressor/QatAccel.cc
@@ -0,0 +1,142 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Intel Corporation
+ *
+ * Author: Qiaowei Ren <qiaowei.ren@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "QatAccel.h"
+
+/* Estimate data expansion after decompression */
+static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200};
+
+QatAccel::~QatAccel() {
+ if (NULL != session.internal) {
+ qzTeardownSession(&session);
+ qzClose(&session);
+ }
+}
+
+bool QatAccel::init(const std::string &alg) {
+ QzSessionParams_T params = {(QzHuffmanHdr_T)0,};
+ int rc;
+
+ rc = qzGetDefaults(&params);
+ if (rc != QZ_OK)
+ return false;
+ params.direction = QZ_DIR_BOTH;
+ if (alg == "snappy")
+ params.comp_algorithm = QZ_SNAPPY;
+ else if (alg == "zlib")
+ params.comp_algorithm = QZ_DEFLATE;
+ else if (alg == "lz4")
+ params.comp_algorithm = QZ_LZ4;
+ else
+ return false;
+
+ rc = qzSetDefaults(&params);
+ if (rc != QZ_OK)
+ return false;
+
+ rc = qzInit(&session, QZ_SW_BACKUP_DEFAULT);
+ if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW)
+ return false;
+
+ rc = qzSetupSession(&session, &params);
+ if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW ) {
+ qzTeardownSession(&session);
+ qzClose(&session);
+ return false;
+ }
+
+ return true;
+}
+
+int QatAccel::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) {
+ for (auto &i : in.buffers()) {
+ const unsigned char* c_in = (unsigned char*) i.c_str();
+ unsigned int len = i.length();
+ unsigned int out_len = qzMaxCompressedLength(len);
+
+ bufferptr ptr = buffer::create_small_page_aligned(out_len);
+ int rc = qzCompress(&session, c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1);
+ if (rc != QZ_OK)
+ return -1;
+ out.append(ptr, 0, out_len);
+ }
+
+ return 0;
+}
+
+int QatAccel::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) {
+ auto i = in.begin();
+ return decompress(i, in.length(), out, compressor_message);
+}
+
+int QatAccel::decompress(bufferlist::const_iterator &p,
+ size_t compressed_len,
+ bufferlist &dst,
+ boost::optional<int32_t> compressor_message) {
+ unsigned int ratio_idx = 0;
+ bool read_more = false;
+ bool joint = false;
+ int rc = 0;
+ bufferlist tmp;
+ size_t remaining = MIN(p.get_remaining(), compressed_len);
+
+ while (remaining) {
+ if (p.end()) {
+ return -1;
+ }
+
+ bufferptr cur_ptr = p.get_current_ptr();
+ unsigned int len = cur_ptr.length();
+ if (joint) {
+ if (read_more)
+ tmp.append(cur_ptr.c_str(), len);
+ len = tmp.length();
+ }
+ unsigned int out_len = len * expansion_ratio[ratio_idx];
+ bufferptr ptr = buffer::create_small_page_aligned(out_len);
+
+ if (joint)
+ rc = qzDecompress(&session, (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
+ else
+ rc = qzDecompress(&session, (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
+ if (rc == QZ_DATA_ERROR) {
+ if (!joint) {
+ tmp.append(cur_ptr.c_str(), cur_ptr.length());
+ p += remaining;
+ joint = true;
+ }
+ read_more = true;
+ continue;
+ } else if (rc == QZ_BUF_ERROR) {
+ if (ratio_idx == std::size(expansion_ratio))
+ return -1;
+ if (joint)
+ read_more = false;
+ ratio_idx++;
+ continue;
+ } else if (rc != QZ_OK) {
+ return -1;
+ } else {
+ ratio_idx = 0;
+ joint = false;
+ read_more = false;
+ }
+
+ p += remaining;
+ remaining -= len;
+ dst.append(ptr, 0, out_len);
+ }
+
+ return 0;
+}
diff --git a/src/compressor/QatAccel.h b/src/compressor/QatAccel.h
new file mode 100644
index 000000000..3f7ccd25d
--- /dev/null
+++ b/src/compressor/QatAccel.h
@@ -0,0 +1,36 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Intel Corporation
+ *
+ * Author: Qiaowei Ren <qiaowei.ren@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_QATACCEL_H
+#define CEPH_QATACCEL_H
+
+#include <qatzip.h>
+#include <boost/optional.hpp>
+#include "include/buffer.h"
+
+class QatAccel {
+ QzSession_T session;
+
+ public:
+ QatAccel() : session({0}) {}
+ ~QatAccel();
+
+ bool init(const std::string &alg);
+
+ int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message);
+ int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message);
+ int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst, boost::optional<int32_t> compressor_message);
+};
+
+#endif
diff --git a/src/compressor/brotli/BrotliCompressor.cc b/src/compressor/brotli/BrotliCompressor.cc
new file mode 100644
index 000000000..ed4abef4b
--- /dev/null
+++ b/src/compressor/brotli/BrotliCompressor.cc
@@ -0,0 +1,96 @@
+#include "brotli/encode.h"
+#include "brotli/decode.h"
+#include "BrotliCompressor.h"
+#include "include/scope_guard.h"
+
+#define MAX_LEN (CEPH_PAGE_SIZE)
+
+int BrotliCompressor::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
+{
+ BrotliEncoderState* s = BrotliEncoderCreateInstance(nullptr,
+ nullptr,
+ nullptr);
+ if (!s) {
+ return -1;
+ }
+ auto sg = make_scope_guard([&s] { BrotliEncoderDestroyInstance(s); });
+ BrotliEncoderSetParameter(s, BROTLI_PARAM_QUALITY, (uint32_t)9);
+ BrotliEncoderSetParameter(s, BROTLI_PARAM_LGWIN, 22);
+ for (auto i = in.buffers().begin(); i != in.buffers().end();) {
+ size_t available_in = i->length();
+ size_t max_comp_size = BrotliEncoderMaxCompressedSize(available_in);
+ size_t available_out = max_comp_size;
+ bufferptr ptr = buffer::create_small_page_aligned(max_comp_size);
+ uint8_t* next_out = (uint8_t*)ptr.c_str();
+ const uint8_t* next_in = (uint8_t*)i->c_str();
+ ++i;
+ BrotliEncoderOperation finish = i != in.buffers().end() ?
+ BROTLI_OPERATION_PROCESS :
+ BROTLI_OPERATION_FINISH;
+ do {
+ if (!BrotliEncoderCompressStream(s,
+ finish,
+ &available_in,
+ &next_in,
+ &available_out,
+ &next_out,
+ nullptr)) {
+ return -1;
+ }
+ unsigned have = max_comp_size - available_out;
+ out.append(ptr, 0, have);
+ } while (available_out == 0);
+ if (BrotliEncoderIsFinished(s)) {
+ break;
+ }
+ }
+ return 0;
+}
+
+int BrotliCompressor::decompress(bufferlist::const_iterator &p,
+ size_t compressed_size,
+ bufferlist &out,
+ boost::optional<int32_t> compressor_message)
+{
+ BrotliDecoderState* s = BrotliDecoderCreateInstance(nullptr,
+ nullptr,
+ nullptr);
+ if (!s) {
+ return -1;
+ }
+ auto sg = make_scope_guard([&s] { BrotliDecoderDestroyInstance(s); });
+ size_t remaining = std::min<size_t>(p.get_remaining(), compressed_size);
+ while (remaining) {
+ const uint8_t* next_in;
+ size_t len = p.get_ptr_and_advance(remaining, (const char**)&next_in);
+ remaining -= len;
+ size_t available_in = len;
+ BrotliDecoderResult result = BROTLI_DECODER_RESULT_ERROR;
+ do {
+ size_t available_out = MAX_LEN;
+ bufferptr ptr = buffer::create_page_aligned(MAX_LEN);
+ uint8_t* next_out = (uint8_t*)ptr.c_str();
+ result = BrotliDecoderDecompressStream(s,
+ &available_in,
+ &next_in,
+ &available_out,
+ &next_out,
+ 0);
+ if (!result) {
+ return -1;
+ }
+ unsigned have = MAX_LEN - available_out;
+ out.append(ptr, 0, have);
+ } while (result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT);
+ if (BrotliDecoderIsFinished(s)) {
+ break;
+ }
+ }
+ return 0;
+}
+
+int BrotliCompressor::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message)
+{
+ auto i = std::cbegin(in);
+ return decompress(i, in.length(), out, compressor_message);
+}
diff --git a/src/compressor/brotli/BrotliCompressor.h b/src/compressor/brotli/BrotliCompressor.h
new file mode 100644
index 000000000..373300645
--- /dev/null
+++ b/src/compressor/brotli/BrotliCompressor.h
@@ -0,0 +1,31 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 BI SHUN KE <aionshun@livemail.tw>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BROTLICOMPRESSOR_H
+#define CEPH_BROTLICOMPRESSOR_H
+
+
+#include "include/buffer.h"
+#include "compressor/Compressor.h"
+
+class BrotliCompressor : public Compressor
+{
+ public:
+ BrotliCompressor() : Compressor(COMP_ALG_BROTLI, "brotli") {}
+
+ int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) override;
+ int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) override;
+ int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out, boost::optional<int32_t> compressor_message) override;
+};
+
+#endif //CEPH_BROTLICOMPRESSOR_H
+
diff --git a/src/compressor/brotli/CMakeLists.txt b/src/compressor/brotli/CMakeLists.txt
new file mode 100644
index 000000000..31a376289
--- /dev/null
+++ b/src/compressor/brotli/CMakeLists.txt
@@ -0,0 +1,34 @@
+# brotli
+
+set(brotli_sources
+ CompressionPluginBrotli.cc
+ BrotliCompressor.cc
+)
+include(ExternalProject)
+ExternalProject_Add(brotli_ext
+ DOWNLOAD_DIR ${CMAKE_BINARY_DIR}/src/
+ GIT_REPOSITORY "https://github.com/google/brotli.git"
+ GIT_TAG "v1.0.7"
+ GIT_SHALLOW TRUE
+ SOURCE_DIR ${CMAKE_BINARY_DIR}/src/brotli
+ CONFIGURE_COMMAND ./configure-cmake --disable-debug
+ INSTALL_COMMAND ""
+ BUILD_COMMAND $(MAKE)
+ BUILD_IN_SOURCE 1
+ INSTALL_COMMAND "")
+
+set(brotli_libs enc dec common)
+file(MAKE_DIRECTORY "${CMAKE_BINARY_DIR}/src/brotli/c/include")
+foreach(lib ${brotli_libs})
+ add_library(brotli::${lib} STATIC IMPORTED)
+ add_dependencies(brotli::${lib} brotli_ext)
+ set_target_properties(brotli::${lib} PROPERTIES
+ INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_BINARY_DIR}/src/brotli/c/include"
+ IMPORTED_LOCATION "${CMAKE_BINARY_DIR}/src/brotli/libbrotli${lib}-static.a")
+ list(APPEND BROTLI_LIBRARIES brotli::${lib})
+endforeach()
+
+add_library(ceph_brotli SHARED ${brotli_sources})
+list(REVERSE brotli_libs)
+target_link_libraries(ceph_brotli PRIVATE ${BROTLI_LIBRARIES})
+install(TARGETS ceph_brotli DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/brotli/CompressionPluginBrotli.cc b/src/compressor/brotli/CompressionPluginBrotli.cc
new file mode 100644
index 000000000..245f49dbb
--- /dev/null
+++ b/src/compressor/brotli/CompressionPluginBrotli.cc
@@ -0,0 +1,19 @@
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "CompressionPluginBrotli.h"
+#include "common/ceph_context.h"
+
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ PluginRegistry *instance = cct->get_plugin_registry();
+ return instance->add(type, name, new CompressionPluginBrotli(cct));
+}
+
diff --git a/src/compressor/brotli/CompressionPluginBrotli.h b/src/compressor/brotli/CompressionPluginBrotli.h
new file mode 100644
index 000000000..641a6e1c9
--- /dev/null
+++ b/src/compressor/brotli/CompressionPluginBrotli.h
@@ -0,0 +1,36 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 BI SHUN KE <aionshun@livemail.tw>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_BROTLI_H
+#define CEPH_COMPRESSION_PLUGIN_BROTLI_H
+
+#include "ceph_ver.h"
+#include "compressor/CompressionPlugin.h"
+#include "BrotliCompressor.h"
+
+class CompressionPluginBrotli : public CompressionPlugin {
+public:
+ explicit CompressionPluginBrotli(CephContext *cct) : CompressionPlugin(cct)
+ {}
+
+ virtual int factory(CompressorRef *cs, std::ostream *ss)
+ {
+ if (compressor == nullptr) {
+ BrotliCompressor *interface = new BrotliCompressor();
+ compressor = CompressorRef(interface);
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/lz4/CMakeLists.txt b/src/compressor/lz4/CMakeLists.txt
new file mode 100644
index 000000000..ff8e14c29
--- /dev/null
+++ b/src/compressor/lz4/CMakeLists.txt
@@ -0,0 +1,14 @@
+# lz4
+
+set(lz4_sources
+ CompressionPluginLZ4.cc
+)
+
+add_library(ceph_lz4 SHARED ${lz4_sources})
+target_link_libraries(ceph_lz4
+ PRIVATE LZ4::LZ4 compressor $<$<PLATFORM_ID:Windows>:ceph-common>)
+set_target_properties(ceph_lz4 PROPERTIES
+ VERSION 2.0.0
+ SOVERSION 2
+ INSTALL_RPATH "")
+install(TARGETS ceph_lz4 DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/lz4/CompressionPluginLZ4.cc b/src/compressor/lz4/CompressionPluginLZ4.cc
new file mode 100644
index 000000000..0d05500e4
--- /dev/null
+++ b/src/compressor/lz4/CompressionPluginLZ4.cc
@@ -0,0 +1,36 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 XSKY Inc.
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "common/ceph_context.h"
+#include "CompressionPluginLZ4.h"
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ auto instance = cct->get_plugin_registry();
+
+ return instance->add(type, name, new CompressionPluginLZ4(cct));
+}
diff --git a/src/compressor/lz4/CompressionPluginLZ4.h b/src/compressor/lz4/CompressionPluginLZ4.h
new file mode 100644
index 000000000..38e5b9cc5
--- /dev/null
+++ b/src/compressor/lz4/CompressionPluginLZ4.h
@@ -0,0 +1,41 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 XSKY Inc.
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_LZ4_H
+#define CEPH_COMPRESSION_PLUGIN_LZ4_H
+
+// -----------------------------------------------------------------------------
+#include "ceph_ver.h"
+#include "compressor/CompressionPlugin.h"
+#include "LZ4Compressor.h"
+// -----------------------------------------------------------------------------
+
+class CompressionPluginLZ4 : public ceph::CompressionPlugin {
+
+public:
+
+ explicit CompressionPluginLZ4(CephContext* cct) : CompressionPlugin(cct)
+ {}
+
+ int factory(CompressorRef *cs, std::ostream *ss) override {
+ if (compressor == 0) {
+ LZ4Compressor *interface = new LZ4Compressor(cct);
+ compressor = CompressorRef(interface);
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/lz4/LZ4Compressor.h b/src/compressor/lz4/LZ4Compressor.h
new file mode 100644
index 000000000..b97d91f2b
--- /dev/null
+++ b/src/compressor/lz4/LZ4Compressor.h
@@ -0,0 +1,148 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_LZ4COMPRESSOR_H
+#define CEPH_LZ4COMPRESSOR_H
+
+#include <lz4.h>
+
+#include "compressor/Compressor.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "common/config.h"
+#include "common/Tub.h"
+
+
+class LZ4Compressor : public Compressor {
+ public:
+ LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") {
+#ifdef HAVE_QATZIP
+ if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
+ qat_enabled = true;
+ else
+ qat_enabled = false;
+#endif
+ }
+
+ int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> &compressor_message) override {
+ // older versions of liblz4 introduce bit errors when compressing
+ // fragmented buffers. this was fixed in lz4 commit
+ // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first
+ // appeared in v1.8.2.
+ //
+ // workaround: rebuild if not contiguous.
+ if (!src.is_contiguous()) {
+ ceph::buffer::list new_src = src;
+ new_src.rebuild();
+ return compress(new_src, dst, compressor_message);
+ }
+
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.compress(src, dst, compressor_message);
+#endif
+ ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(
+ LZ4_compressBound(src.length()));
+ LZ4_stream_t lz4_stream;
+ LZ4_resetStream(&lz4_stream);
+
+ using ceph::encode;
+
+ auto p = src.begin();
+ size_t left = src.length();
+ int pos = 0;
+ const char *data;
+ unsigned num = src.get_num_buffers();
+ encode((uint32_t)num, dst);
+ while (left) {
+ uint32_t origin_len = p.get_ptr_and_advance(left, &data);
+ int compressed_len = LZ4_compress_fast_continue(
+ &lz4_stream, data, outptr.c_str()+pos, origin_len,
+ outptr.length()-pos, 1);
+ if (compressed_len <= 0)
+ return -1;
+ pos += compressed_len;
+ left -= origin_len;
+ encode(origin_len, dst);
+ encode((uint32_t)compressed_len, dst);
+ }
+ ceph_assert(p.end());
+
+ dst.append(outptr, 0, pos);
+ return 0;
+ }
+
+ int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(src, dst, compressor_message);
+#endif
+ auto i = std::cbegin(src);
+ return decompress(i, src.length(), dst, compressor_message);
+ }
+
+ int decompress(ceph::buffer::list::const_iterator &p,
+ size_t compressed_len,
+ ceph::buffer::list &dst,
+ boost::optional<int32_t> compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(p, compressed_len, dst, compressor_message);
+#endif
+ using ceph::decode;
+ uint32_t count;
+ std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs;
+ decode(count, p);
+ compressed_pairs.resize(count);
+ uint32_t total_origin = 0;
+ for (unsigned i = 0; i < count; ++i) {
+ decode(compressed_pairs[i].first, p);
+ decode(compressed_pairs[i].second, p);
+ total_origin += compressed_pairs[i].first;
+ }
+ compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2);
+
+ ceph::buffer::ptr dstptr(total_origin);
+ LZ4_streamDecode_t lz4_stream_decode;
+ LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0);
+
+ ceph::buffer::ptr cur_ptr = p.get_current_ptr();
+ ceph::buffer::ptr *ptr = &cur_ptr;
+ Tub<ceph::buffer::ptr> data_holder;
+ if (compressed_len != cur_ptr.length()) {
+ data_holder.construct(compressed_len);
+ p.copy_deep(compressed_len, *data_holder);
+ ptr = data_holder.get();
+ }
+
+ char *c_in = ptr->c_str();
+ char *c_out = dstptr.c_str();
+ for (unsigned i = 0; i < count; ++i) {
+ int r = LZ4_decompress_safe_continue(
+ &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first);
+ if (r == (int)compressed_pairs[i].first) {
+ c_in += compressed_pairs[i].second;
+ c_out += compressed_pairs[i].first;
+ } else if (r < 0) {
+ return -1;
+ } else {
+ return -2;
+ }
+ }
+ dst.push_back(std::move(dstptr));
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/snappy/CMakeLists.txt b/src/compressor/snappy/CMakeLists.txt
new file mode 100644
index 000000000..d1ba3b2e7
--- /dev/null
+++ b/src/compressor/snappy/CMakeLists.txt
@@ -0,0 +1,14 @@
+# snappy
+
+set(snappy_sources
+ CompressionPluginSnappy.cc
+)
+
+add_library(ceph_snappy SHARED ${snappy_sources})
+target_link_libraries(ceph_snappy
+ PRIVATE snappy::snappy compressor $<$<PLATFORM_ID:Windows>:ceph-common>)
+set_target_properties(ceph_snappy PROPERTIES
+ VERSION 2.0.0
+ SOVERSION 2
+ INSTALL_RPATH "")
+install(TARGETS ceph_snappy DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/snappy/CompressionPluginSnappy.cc b/src/compressor/snappy/CompressionPluginSnappy.cc
new file mode 100644
index 000000000..85b6cf94a
--- /dev/null
+++ b/src/compressor/snappy/CompressionPluginSnappy.cc
@@ -0,0 +1,38 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+
+// -----------------------------------------------------------------------------
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "common/ceph_context.h"
+#include "CompressionPluginSnappy.h"
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ auto instance = cct->get_plugin_registry();
+
+ return instance->add(type, name, new CompressionPluginSnappy(cct));
+}
diff --git a/src/compressor/snappy/CompressionPluginSnappy.h b/src/compressor/snappy/CompressionPluginSnappy.h
new file mode 100644
index 000000000..a9bc98f73
--- /dev/null
+++ b/src/compressor/snappy/CompressionPluginSnappy.h
@@ -0,0 +1,42 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_SNAPPY_H
+#define CEPH_COMPRESSION_PLUGIN_SNAPPY_H
+
+// -----------------------------------------------------------------------------
+#include "compressor/CompressionPlugin.h"
+#include "SnappyCompressor.h"
+// -----------------------------------------------------------------------------
+
+class CompressionPluginSnappy : public ceph::CompressionPlugin {
+
+public:
+
+ explicit CompressionPluginSnappy(CephContext* cct) : CompressionPlugin(cct)
+ {}
+
+ int factory(CompressorRef *cs,
+ std::ostream *ss) override
+ {
+ if (compressor == 0) {
+ SnappyCompressor *interface = new SnappyCompressor(cct);
+ compressor = CompressorRef(interface);
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/snappy/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h
new file mode 100644
index 000000000..93206c400
--- /dev/null
+++ b/src/compressor/snappy/SnappyCompressor.h
@@ -0,0 +1,116 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_SNAPPYCOMPRESSOR_H
+#define CEPH_SNAPPYCOMPRESSOR_H
+
+#include <snappy.h>
+#include <snappy-sinksource.h>
+#include "common/config.h"
+#include "compressor/Compressor.h"
+#include "include/buffer.h"
+
+class CEPH_BUFFER_API BufferlistSource : public snappy::Source {
+ ceph::bufferlist::const_iterator pb;
+ size_t remaining;
+
+ public:
+ explicit BufferlistSource(ceph::bufferlist::const_iterator _pb, size_t _input_len)
+ : pb(_pb),
+ remaining(_input_len) {
+ remaining = std::min(remaining, (size_t)pb.get_remaining());
+ }
+ size_t Available() const override {
+ return remaining;
+ }
+ const char *Peek(size_t *len) override {
+ const char *data = NULL;
+ *len = 0;
+ size_t avail = Available();
+ if (avail) {
+ auto ptmp = pb;
+ *len = ptmp.get_ptr_and_advance(avail, &data);
+ }
+ return data;
+ }
+ void Skip(size_t n) override {
+ ceph_assert(n <= remaining);
+ pb += n;
+ remaining -= n;
+ }
+
+ ceph::bufferlist::const_iterator get_pos() const {
+ return pb;
+ }
+};
+
+class SnappyCompressor : public Compressor {
+ public:
+ SnappyCompressor(CephContext* cct) : Compressor(COMP_ALG_SNAPPY, "snappy") {
+#ifdef HAVE_QATZIP
+ if (cct->_conf->qat_compressor_enabled && qat_accel.init("snappy"))
+ qat_enabled = true;
+ else
+ qat_enabled = false;
+#endif
+ }
+
+ int compress(const ceph::bufferlist &src, ceph::bufferlist &dst, boost::optional<int32_t> &compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.compress(src, dst, compressor_message);
+#endif
+ BufferlistSource source(const_cast<ceph::bufferlist&>(src).begin(), src.length());
+ ceph::bufferptr ptr = ceph::buffer::create_small_page_aligned(
+ snappy::MaxCompressedLength(src.length()));
+ snappy::UncheckedByteArraySink sink(ptr.c_str());
+ snappy::Compress(&source, &sink);
+ dst.append(ptr, 0, sink.CurrentDestination() - ptr.c_str());
+ return 0;
+ }
+
+ int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst, boost::optional<int32_t> compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(src, dst, compressor_message);
+#endif
+ auto i = src.begin();
+ return decompress(i, src.length(), dst, compressor_message);
+ }
+
+ int decompress(ceph::bufferlist::const_iterator &p,
+ size_t compressed_len,
+ ceph::bufferlist &dst,
+ boost::optional<int32_t> compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(p, compressed_len, dst, compressor_message);
+#endif
+ BufferlistSource source_1(p, compressed_len);
+ uint32_t res_len = 0;
+ if (!snappy::GetUncompressedLength(&source_1, &res_len)) {
+ return -1;
+ }
+ BufferlistSource source_2(p, compressed_len);
+ ceph::bufferptr ptr(res_len);
+ if (snappy::RawUncompress(&source_2, ptr.c_str())) {
+ p = source_2.get_pos();
+ dst.append(ptr);
+ return 0;
+ }
+ return -2;
+ }
+};
+
+#endif
diff --git a/src/compressor/zlib/CMakeLists.txt b/src/compressor/zlib/CMakeLists.txt
new file mode 100644
index 000000000..ade57cb08
--- /dev/null
+++ b/src/compressor/zlib/CMakeLists.txt
@@ -0,0 +1,59 @@
+# zlib
+
+if(HAVE_INTEL_SSE4_1 AND HAVE_NASM_X64_AVX2 AND (NOT APPLE))
+ set(CMAKE_ASM_FLAGS "-i ${PROJECT_SOURCE_DIR}/src/isa-l/igzip/ -i ${PROJECT_SOURCE_DIR}/src/isa-l/include/ ${CMAKE_ASM_FLAGS}")
+ set(zlib_sources
+ CompressionPluginZlib.cc
+ ZlibCompressor.cc
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/hufftables_c.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/flatten_ll.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_inflate.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/huff_codes.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base_aliases.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc64_base.c
+ )
+ list(APPEND zlib_sources
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_body.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_finish.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body_h1_gr_bt.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_finish.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/rfc1951_lookup.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_sse.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_avx2_4.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_multibinary.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_update_histogram_01.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_update_histogram_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_decode_block_stateless_01.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_decode_block_stateless_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_inflate_multibinary.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df_06.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/proc_heap.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_deflate_hash.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_gen_icf_map_lh1_06.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_gen_icf_map_lh1_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_set_long_icf_fg_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_set_long_icf_fg_06.asm
+ )
+else(HAVE_INTEL_SSE4_1 AND HAVE_NASM_X64_AVX2 AND (NOT APPLE))
+ set(zlib_sources
+ CompressionPluginZlib.cc
+ ZlibCompressor.cc
+ )
+endif(HAVE_INTEL_SSE4_1 AND HAVE_NASM_X64_AVX2 AND (NOT APPLE))
+
+add_library(ceph_zlib SHARED ${zlib_sources})
+target_link_libraries(ceph_zlib ZLIB::ZLIB compressor $<$<PLATFORM_ID:Windows>:ceph-common>)
+target_include_directories(ceph_zlib SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/isa-l/include")
+set_target_properties(ceph_zlib PROPERTIES
+ VERSION 2.0.0
+ SOVERSION 2
+ INSTALL_RPATH "")
+install(TARGETS ceph_zlib DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/zlib/CompressionPluginZlib.cc b/src/compressor/zlib/CompressionPluginZlib.cc
new file mode 100644
index 000000000..c0a53fa77
--- /dev/null
+++ b/src/compressor/zlib/CompressionPluginZlib.cc
@@ -0,0 +1,38 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+
+// -----------------------------------------------------------------------------
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "common/ceph_context.h"
+#include "CompressionPluginZlib.h"
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ auto instance = cct->get_plugin_registry();
+
+ return instance->add(type, name, new CompressionPluginZlib(cct));
+}
diff --git a/src/compressor/zlib/CompressionPluginZlib.h b/src/compressor/zlib/CompressionPluginZlib.h
new file mode 100644
index 000000000..fbdb635c6
--- /dev/null
+++ b/src/compressor/zlib/CompressionPluginZlib.h
@@ -0,0 +1,55 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_ZLIB_H
+#define CEPH_COMPRESSION_PLUGIN_ZLIB_H
+
+// -----------------------------------------------------------------------------
+#include "arch/probe.h"
+#include "arch/intel.h"
+#include "arch/arm.h"
+#include "common/ceph_context.h"
+#include "compressor/CompressionPlugin.h"
+#include "ZlibCompressor.h"
+
+// -----------------------------------------------------------------------------
+
+class CompressionPluginZlib : public ceph::CompressionPlugin {
+public:
+ bool has_isal = false;
+
+ explicit CompressionPluginZlib(CephContext *cct) : CompressionPlugin(cct)
+ {}
+
+ int factory(CompressorRef *cs,
+ std::ostream *ss) override
+ {
+ bool isal = false;
+#if defined(__i386__) || defined(__x86_64__)
+ // other arches or lack of support result in isal = false
+ if (cct->_conf->compressor_zlib_isal) {
+ ceph_arch_probe();
+ isal = (ceph_arch_intel_pclmul && ceph_arch_intel_sse41);
+ }
+#endif
+ if (compressor == 0 || has_isal != isal) {
+ compressor = std::make_shared<ZlibCompressor>(cct, isal);
+ has_isal = isal;
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/zlib/ZlibCompressor.cc b/src/compressor/zlib/ZlibCompressor.cc
new file mode 100644
index 000000000..27b43c49c
--- /dev/null
+++ b/src/compressor/zlib/ZlibCompressor.cc
@@ -0,0 +1,255 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+// -----------------------------------------------------------------------------
+#include "common/debug.h"
+#include "ZlibCompressor.h"
+#include "osd/osd_types.h"
+#include "isa-l/include/igzip_lib.h"
+// -----------------------------------------------------------------------------
+
+#include <zlib.h>
+
+// -----------------------------------------------------------------------------
+#define dout_context cct
+#define dout_subsys ceph_subsys_compressor
+#undef dout_prefix
+#define dout_prefix _prefix(_dout)
+// -----------------------------------------------------------------------------
+
+// -----------------------------------------------------------------------------
+
+using std::ostream;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+
+static ostream&
+_prefix(std::ostream* _dout)
+{
+ return *_dout << "ZlibCompressor: ";
+}
+// -----------------------------------------------------------------------------
+
+#define MAX_LEN (CEPH_PAGE_SIZE)
+
+// default window size for Zlib 1.2.8, negated for raw deflate
+#define ZLIB_DEFAULT_WIN_SIZE -15
+
+// desired memory usage level. increasing to 9 doesn't speed things up
+// significantly (helps only on >=16K blocks) and sometimes degrades
+// compression ratio.
+#define ZLIB_MEMORY_LEVEL 8
+
+int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
+{
+ int ret;
+ unsigned have;
+ z_stream strm;
+ unsigned char* c_in;
+ int begin = 1;
+
+ /* allocate deflate state */
+ strm.zalloc = Z_NULL;
+ strm.zfree = Z_NULL;
+ strm.opaque = Z_NULL;
+ ret = deflateInit2(&strm, cct->_conf->compressor_zlib_level, Z_DEFLATED, cct->_conf->compressor_zlib_winsize, ZLIB_MEMORY_LEVEL, Z_DEFAULT_STRATEGY);
+ if (ret != Z_OK) {
+ dout(1) << "Compression init error: init return "
+ << ret << " instead of Z_OK" << dendl;
+ return -1;
+ }
+ compressor_message = cct->_conf->compressor_zlib_winsize;
+
+ for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
+ i != in.buffers().end();) {
+
+ c_in = (unsigned char*) (*i).c_str();
+ long unsigned int len = (*i).length();
+ ++i;
+
+ strm.avail_in = len;
+ int flush = i != in.buffers().end() ? Z_NO_FLUSH : Z_FINISH;
+
+ strm.next_in = c_in;
+ do {
+ bufferptr ptr = ceph::buffer::create_page_aligned(MAX_LEN);
+ strm.next_out = (unsigned char*)ptr.c_str() + begin;
+ strm.avail_out = MAX_LEN - begin;
+ if (begin) {
+ // put a compressor variation mark in front of compressed stream, not used at the moment
+ ptr.c_str()[0] = 0;
+ begin = 0;
+ }
+ ret = deflate(&strm, flush); /* no bad return value */
+ if (ret == Z_STREAM_ERROR) {
+ dout(1) << "Compression error: compress return Z_STREAM_ERROR("
+ << ret << ")" << dendl;
+ deflateEnd(&strm);
+ return -1;
+ }
+ have = MAX_LEN - strm.avail_out;
+ out.append(ptr, 0, have);
+ } while (strm.avail_out == 0);
+ if (strm.avail_in != 0) {
+ dout(10) << "Compression error: unused input" << dendl;
+ deflateEnd(&strm);
+ return -1;
+ }
+ }
+
+ deflateEnd(&strm);
+ return 0;
+}
+
+#if __x86_64__ && defined(HAVE_NASM_X64_AVX2)
+int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
+{
+ int ret;
+ unsigned have;
+ isal_zstream strm;
+ unsigned char* c_in;
+ int begin = 1;
+
+ /* allocate deflate state */
+ isal_deflate_init(&strm);
+ strm.end_of_stream = 0;
+ compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+
+ for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
+ i != in.buffers().end();) {
+
+ c_in = (unsigned char*) (*i).c_str();
+ long unsigned int len = (*i).length();
+ ++i;
+
+ strm.avail_in = len;
+ strm.end_of_stream = (i == in.buffers().end());
+ strm.flush = FINISH_FLUSH;
+
+ strm.next_in = c_in;
+
+ do {
+ bufferptr ptr = ceph::buffer::create_page_aligned(MAX_LEN);
+ strm.next_out = (unsigned char*)ptr.c_str() + begin;
+ strm.avail_out = MAX_LEN - begin;
+ if (begin) {
+ // put a compressor variation mark in front of compressed stream, not used at the moment
+ ptr.c_str()[0] = 1;
+ begin = 0;
+ }
+ ret = isal_deflate(&strm);
+ if (ret != COMP_OK) {
+ dout(1) << "Compression error: isal_deflate return error ("
+ << ret << ")" << dendl;
+ return -1;
+ }
+ have = MAX_LEN - strm.avail_out;
+ out.append(ptr, 0, have);
+ } while (strm.avail_out == 0);
+ if (strm.avail_in != 0) {
+ dout(10) << "Compression error: unused input" << dendl;
+ return -1;
+ }
+ }
+
+ return 0;
+}
+#endif
+
+int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
+{
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.compress(in, out, compressor_message);
+#endif
+#if __x86_64__ && defined(HAVE_NASM_X64_AVX2)
+ if (isal_enabled)
+ return isal_compress(in, out, compressor_message);
+ else
+ return zlib_compress(in, out, compressor_message);
+#else
+ return zlib_compress(in, out, compressor_message);
+#endif
+}
+
+int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, boost::optional<int32_t> compressor_message)
+{
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(p, compressed_size, out, compressor_message);
+#endif
+
+ int ret;
+ unsigned have;
+ z_stream strm;
+ const char* c_in;
+ int begin = 1;
+
+ /* allocate inflate state */
+ strm.zalloc = Z_NULL;
+ strm.zfree = Z_NULL;
+ strm.opaque = Z_NULL;
+ strm.avail_in = 0;
+ strm.next_in = Z_NULL;
+
+ // choose the variation of compressor
+ if (!compressor_message)
+ compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+ ret = inflateInit2(&strm, *compressor_message);
+ if (ret != Z_OK) {
+ dout(1) << "Decompression init error: init return "
+ << ret << " instead of Z_OK" << dendl;
+ return -1;
+ }
+
+ size_t remaining = std::min<size_t>(p.get_remaining(), compressed_size);
+
+ while(remaining) {
+ long unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
+ remaining -= len;
+ strm.avail_in = len - begin;
+ strm.next_in = (unsigned char*)c_in + begin;
+ begin = 0;
+
+ do {
+ strm.avail_out = MAX_LEN;
+ bufferptr ptr = ceph::buffer::create_page_aligned(MAX_LEN);
+ strm.next_out = (unsigned char*)ptr.c_str();
+ ret = inflate(&strm, Z_NO_FLUSH);
+ if (ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR) {
+ dout(1) << "Decompression error: decompress return "
+ << ret << dendl;
+ inflateEnd(&strm);
+ return -1;
+ }
+ have = MAX_LEN - strm.avail_out;
+ out.append(ptr, 0, have);
+ } while (strm.avail_out == 0);
+ }
+
+ /* clean up and return */
+ (void)inflateEnd(&strm);
+ return 0;
+}
+
+int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message)
+{
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(in, out, compressor_message);
+#endif
+ auto i = std::cbegin(in);
+ return decompress(i, in.length(), out, compressor_message);
+}
diff --git a/src/compressor/zlib/ZlibCompressor.h b/src/compressor/zlib/ZlibCompressor.h
new file mode 100644
index 000000000..b8e984110
--- /dev/null
+++ b/src/compressor/zlib/ZlibCompressor.h
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_ZLIB_H
+#define CEPH_COMPRESSION_ZLIB_H
+
+#include "common/config.h"
+#include "compressor/Compressor.h"
+
+class ZlibCompressor : public Compressor {
+ bool isal_enabled;
+ CephContext *const cct;
+public:
+ ZlibCompressor(CephContext *cct, bool isal)
+ : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) {
+#ifdef HAVE_QATZIP
+ if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib"))
+ qat_enabled = true;
+ else
+ qat_enabled = false;
+#endif
+ }
+
+ int compress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> &compressor_message) override;
+ int decompress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> compressor_message) override;
+ int decompress(ceph::buffer::list::const_iterator &p, size_t compressed_len, ceph::buffer::list &out, boost::optional<int32_t> compressor_message) override;
+private:
+ int zlib_compress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> &compressor_message);
+ int isal_compress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> &compressor_message);
+ };
+
+
+#endif
diff --git a/src/compressor/zstd/CMakeLists.txt b/src/compressor/zstd/CMakeLists.txt
new file mode 100644
index 000000000..b53b0d944
--- /dev/null
+++ b/src/compressor/zstd/CMakeLists.txt
@@ -0,0 +1,36 @@
+# zstd
+
+# libzstd - build it statically
+set(ZSTD_C_FLAGS "-fPIC -Wno-unused-variable -O3")
+
+include(ExternalProject)
+ExternalProject_Add(zstd_ext
+ SOURCE_DIR ${CMAKE_SOURCE_DIR}/src/zstd/build/cmake
+ CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
+ -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
+ -DCMAKE_C_FLAGS=${ZSTD_C_FLAGS}
+ -DCMAKE_AR=${CMAKE_AR}
+ -DCMAKE_POSITION_INDEPENDENT_CODE=${ENABLE_SHARED}
+ -G${CMAKE_GENERATOR}
+ BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/libzstd
+ BUILD_COMMAND ${CMAKE_COMMAND} --build <BINARY_DIR> --target libzstd_static
+ BUILD_BYPRODUCTS "${CMAKE_CURRENT_BINARY_DIR}/libzstd/lib/libzstd.a"
+ INSTALL_COMMAND "true")
+
+add_library(zstd STATIC IMPORTED)
+set_target_properties(zstd PROPERTIES
+ INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/src/zstd/lib"
+ IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/libzstd/lib/libzstd.a")
+add_dependencies(zstd zstd_ext)
+
+set(zstd_sources
+ CompressionPluginZstd.cc
+)
+
+add_library(ceph_zstd SHARED ${zstd_sources})
+target_link_libraries(ceph_zstd PRIVATE zstd $<$<PLATFORM_ID:Windows>:ceph-common>)
+set_target_properties(ceph_zstd PROPERTIES
+ VERSION 2.0.0
+ SOVERSION 2
+ INSTALL_RPATH "")
+install(TARGETS ceph_zstd DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/zstd/CompressionPluginZstd.cc b/src/compressor/zstd/CompressionPluginZstd.cc
new file mode 100644
index 000000000..92aee7343
--- /dev/null
+++ b/src/compressor/zstd/CompressionPluginZstd.cc
@@ -0,0 +1,36 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "common/ceph_context.h"
+#include "CompressionPluginZstd.h"
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ auto instance = cct->get_plugin_registry();
+
+ return instance->add(type, name, new CompressionPluginZstd(cct));
+}
diff --git a/src/compressor/zstd/CompressionPluginZstd.h b/src/compressor/zstd/CompressionPluginZstd.h
new file mode 100644
index 000000000..632e7c6dc
--- /dev/null
+++ b/src/compressor/zstd/CompressionPluginZstd.h
@@ -0,0 +1,43 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_ZSTD_H
+#define CEPH_COMPRESSION_PLUGIN_ZSTD_H
+
+// -----------------------------------------------------------------------------
+#include "ceph_ver.h"
+#include "compressor/CompressionPlugin.h"
+#include "ZstdCompressor.h"
+// -----------------------------------------------------------------------------
+
+class CompressionPluginZstd : public ceph::CompressionPlugin {
+
+public:
+
+ explicit CompressionPluginZstd(CephContext* cct) : CompressionPlugin(cct)
+ {}
+
+ int factory(CompressorRef *cs,
+ std::ostream *ss) override
+ {
+ if (compressor == 0) {
+ ZstdCompressor *interface = new ZstdCompressor(cct);
+ compressor = CompressorRef(interface);
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/zstd/ZstdCompressor.h b/src/compressor/zstd/ZstdCompressor.h
new file mode 100644
index 000000000..95b492deb
--- /dev/null
+++ b/src/compressor/zstd/ZstdCompressor.h
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ZSTDCOMPRESSOR_H
+#define CEPH_ZSTDCOMPRESSOR_H
+
+#define ZSTD_STATIC_LINKING_ONLY
+#include "zstd/lib/zstd.h"
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "compressor/Compressor.h"
+
+class ZstdCompressor : public Compressor {
+ public:
+ ZstdCompressor(CephContext *cct) : Compressor(COMP_ALG_ZSTD, "zstd"), cct(cct) {}
+
+ int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> &compressor_message) override {
+ ZSTD_CStream *s = ZSTD_createCStream();
+ ZSTD_initCStream_srcSize(s, cct->_conf->compressor_zstd_level, src.length());
+ auto p = src.begin();
+ size_t left = src.length();
+
+ size_t const out_max = ZSTD_compressBound(left);
+ ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(out_max);
+ ZSTD_outBuffer_s outbuf;
+ outbuf.dst = outptr.c_str();
+ outbuf.size = outptr.length();
+ outbuf.pos = 0;
+
+ while (left) {
+ ceph_assert(!p.end());
+ struct ZSTD_inBuffer_s inbuf;
+ inbuf.pos = 0;
+ inbuf.size = p.get_ptr_and_advance(left, (const char**)&inbuf.src);
+ left -= inbuf.size;
+ ZSTD_EndDirective const zed = (left==0) ? ZSTD_e_end : ZSTD_e_continue;
+ size_t r = ZSTD_compressStream2(s, &outbuf, &inbuf, zed);
+ if (ZSTD_isError(r)) {
+ return -EINVAL;
+ }
+ }
+ ceph_assert(p.end());
+
+ ZSTD_freeCStream(s);
+
+ // prefix with decompressed length
+ ceph::encode((uint32_t)src.length(), dst);
+ dst.append(outptr, 0, outbuf.pos);
+ return 0;
+ }
+
+ int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> compressor_message) override {
+ auto i = std::cbegin(src);
+ return decompress(i, src.length(), dst, compressor_message);
+ }
+
+ int decompress(ceph::buffer::list::const_iterator &p,
+ size_t compressed_len,
+ ceph::buffer::list &dst,
+ boost::optional<int32_t> compressor_message) override {
+ if (compressed_len < 4) {
+ return -1;
+ }
+ compressed_len -= 4;
+ uint32_t dst_len;
+ ceph::decode(dst_len, p);
+
+ ceph::buffer::ptr dstptr(dst_len);
+ ZSTD_outBuffer_s outbuf;
+ outbuf.dst = dstptr.c_str();
+ outbuf.size = dstptr.length();
+ outbuf.pos = 0;
+ ZSTD_DStream *s = ZSTD_createDStream();
+ ZSTD_initDStream(s);
+ while (compressed_len > 0) {
+ if (p.end()) {
+ return -1;
+ }
+ ZSTD_inBuffer_s inbuf;
+ inbuf.pos = 0;
+ inbuf.size = p.get_ptr_and_advance(compressed_len,
+ (const char**)&inbuf.src);
+ ZSTD_decompressStream(s, &outbuf, &inbuf);
+ compressed_len -= inbuf.size;
+ }
+ ZSTD_freeDStream(s);
+
+ dst.append(dstptr, 0, outbuf.pos);
+ return 0;
+ }
+ private:
+ CephContext *const cct;
+};
+
+#endif