summaryrefslogtreecommitdiffstats
path: root/src/compressor
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/compressor
parentInitial commit. (diff)
downloadceph-upstream/18.2.2.tar.xz
ceph-upstream/18.2.2.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/compressor')
-rw-r--r--src/compressor/CMakeLists.txt51
-rw-r--r--src/compressor/CompressionPlugin.h48
-rw-r--r--src/compressor/Compressor.cc112
-rw-r--r--src/compressor/Compressor.h109
-rw-r--r--src/compressor/QatAccel.cc240
-rw-r--r--src/compressor/QatAccel.h54
-rw-r--r--src/compressor/brotli/BrotliCompressor.cc96
-rw-r--r--src/compressor/brotli/BrotliCompressor.h31
-rw-r--r--src/compressor/brotli/CMakeLists.txt34
-rw-r--r--src/compressor/brotli/CompressionPluginBrotli.cc19
-rw-r--r--src/compressor/brotli/CompressionPluginBrotli.h36
-rw-r--r--src/compressor/lz4/CMakeLists.txt14
-rw-r--r--src/compressor/lz4/CompressionPluginLZ4.cc36
-rw-r--r--src/compressor/lz4/CompressionPluginLZ4.h41
-rw-r--r--src/compressor/lz4/LZ4Compressor.h147
-rw-r--r--src/compressor/snappy/CMakeLists.txt14
-rw-r--r--src/compressor/snappy/CompressionPluginSnappy.cc38
-rw-r--r--src/compressor/snappy/CompressionPluginSnappy.h42
-rw-r--r--src/compressor/snappy/SnappyCompressor.h116
-rw-r--r--src/compressor/zlib/CMakeLists.txt99
-rw-r--r--src/compressor/zlib/CompressionPluginZlib.cc38
-rw-r--r--src/compressor/zlib/CompressionPluginZlib.h60
-rw-r--r--src/compressor/zlib/ZlibCompressor.cc252
-rw-r--r--src/compressor/zlib/ZlibCompressor.h46
-rw-r--r--src/compressor/zstd/CMakeLists.txt21
-rw-r--r--src/compressor/zstd/CompressionPluginZstd.cc36
-rw-r--r--src/compressor/zstd/CompressionPluginZstd.h43
-rw-r--r--src/compressor/zstd/ZstdCompressor.h107
28 files changed, 1980 insertions, 0 deletions
diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt
new file mode 100644
index 000000000..3e99f8b73
--- /dev/null
+++ b/src/compressor/CMakeLists.txt
@@ -0,0 +1,51 @@
+
+set(compressor_srcs
+ Compressor.cc)
+if (HAVE_QATZIP)
+ list(APPEND compressor_srcs QatAccel.cc)
+endif()
+add_library(compressor_objs OBJECT ${compressor_srcs})
+add_dependencies(compressor_objs common-objs)
+if(HAVE_QATZIP AND HAVE_QATDRV)
+ target_link_libraries(compressor_objs PRIVATE
+ QatDrv::qat_s
+ QatDrv::usdm_drv_s
+ qatzip::qatzip
+ )
+endif()
+add_dependencies(compressor_objs legacy-option-headers)
+
+## compressor plugins
+
+set(compressor_plugin_dir ${CEPH_INSTALL_PKGLIBDIR}/compressor)
+
+add_subdirectory(snappy)
+add_subdirectory(zlib)
+add_subdirectory(zstd)
+
+if(HAVE_LZ4)
+ add_subdirectory(lz4)
+endif()
+
+if(HAVE_BROTLI)
+ add_subdirectory(brotli)
+endif()
+
+add_library(compressor STATIC $<TARGET_OBJECTS:compressor_objs>)
+target_link_libraries(compressor PRIVATE compressor_objs)
+
+set(ceph_compressor_libs
+ ceph_snappy
+ ceph_zlib
+ ceph_zstd)
+
+if(HAVE_LZ4)
+ list(APPEND ceph_compressor_libs ceph_lz4)
+endif()
+
+if(HAVE_BROTLI)
+ list(APPEND ceph_compressor_libs ceph_brotli)
+endif()
+
+add_custom_target(compressor_plugins DEPENDS
+ ${ceph_compressor_libs})
diff --git a/src/compressor/CompressionPlugin.h b/src/compressor/CompressionPlugin.h
new file mode 100644
index 000000000..2a21f2fef
--- /dev/null
+++ b/src/compressor/CompressionPlugin.h
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph distributed storage system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef COMPRESSION_PLUGIN_H
+#define COMPRESSION_PLUGIN_H
+
+#include <iosfwd>
+#include <iostream>
+
+#include "common/PluginRegistry.h"
+#include "include/common_fwd.h"
+#include "Compressor.h"
+
+namespace ceph {
+
+ class CompressionPlugin : public Plugin {
+ public:
+ TOPNSPC::CompressorRef compressor;
+
+ explicit CompressionPlugin(CephContext *cct)
+ : Plugin(cct)
+ {}
+
+ ~CompressionPlugin() override {}
+
+ virtual int factory(TOPNSPC::CompressorRef *cs,
+ std::ostream *ss) = 0;
+
+ virtual const char* name() {return "CompressionPlugin";}
+ };
+
+}
+
+#endif
diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc
new file mode 100644
index 000000000..43d34c8eb
--- /dev/null
+++ b/src/compressor/Compressor.cc
@@ -0,0 +1,112 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <random>
+#include <sstream>
+#include <iterator>
+#include <algorithm>
+
+#include "CompressionPlugin.h"
+#include "Compressor.h"
+#include "include/random.h"
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/dout.h"
+
+namespace TOPNSPC {
+
+#ifdef HAVE_QATZIP
+ QatAccel Compressor::qat_accel;
+#endif
+
+const char* Compressor::get_comp_alg_name(int a) {
+
+ auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms),
+ [a](const auto& kv) { return kv.second == a; });
+
+ if (std::cend(compression_algorithms) == p)
+ return "???"; // It would be nice to revise this...
+
+ return p->first;
+}
+
+std::optional<Compressor::CompressionAlgorithm>
+Compressor::get_comp_alg_type(std::string_view s) {
+
+ auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms),
+ [&s](const auto& kv) { return kv.first == s; });
+ if (std::cend(compression_algorithms) == p)
+ return {};
+
+ return p->second;
+}
+
+const char *Compressor::get_comp_mode_name(int m) {
+ switch (m) {
+ case COMP_NONE: return "none";
+ case COMP_PASSIVE: return "passive";
+ case COMP_AGGRESSIVE: return "aggressive";
+ case COMP_FORCE: return "force";
+ default: return "???";
+ }
+}
+std::optional<Compressor::CompressionMode>
+Compressor::get_comp_mode_type(std::string_view s) {
+ if (s == "force")
+ return COMP_FORCE;
+ if (s == "aggressive")
+ return COMP_AGGRESSIVE;
+ if (s == "passive")
+ return COMP_PASSIVE;
+ if (s == "none")
+ return COMP_NONE;
+ return {};
+}
+
+CompressorRef Compressor::create(CephContext *cct, const std::string &type)
+{
+ // support "random" for teuthology testing
+ if (type == "random") {
+ int alg = ceph::util::generate_random_number(0, COMP_ALG_LAST - 1);
+ if (alg == COMP_ALG_NONE) {
+ return nullptr;
+ }
+ return create(cct, alg);
+ }
+
+ CompressorRef cs_impl = NULL;
+ std::stringstream ss;
+ auto reg = cct->get_plugin_registry();
+ auto factory = dynamic_cast<ceph::CompressionPlugin*>(reg->get_with_load("compressor", type));
+ if (factory == NULL) {
+ lderr(cct) << __func__ << " cannot load compressor of type " << type << dendl;
+ return NULL;
+ }
+ int err = factory->factory(&cs_impl, &ss);
+ if (err)
+ lderr(cct) << __func__ << " factory return error " << err << dendl;
+ return cs_impl;
+}
+
+CompressorRef Compressor::create(CephContext *cct, int alg)
+{
+ if (alg < 0 || alg >= COMP_ALG_LAST) {
+ lderr(cct) << __func__ << " invalid algorithm value:" << alg << dendl;
+ return CompressorRef();
+ }
+ std::string type_name = get_comp_alg_name(alg);
+ return create(cct, type_name);
+}
+
+} // namespace TOPNSPC
diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h
new file mode 100644
index 000000000..276cd875a
--- /dev/null
+++ b/src/compressor/Compressor.h
@@ -0,0 +1,109 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMPRESSOR_H
+#define CEPH_COMPRESSOR_H
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+#include "include/ceph_assert.h" // boost clobbers this
+#include "include/common_fwd.h"
+#include "include/buffer.h"
+#include "include/int_types.h"
+#ifdef HAVE_QATZIP
+ #include "QatAccel.h"
+#endif
+
+namespace TOPNSPC {
+
+class Compressor;
+typedef std::shared_ptr<Compressor> CompressorRef;
+
+class Compressor {
+public:
+ enum CompressionAlgorithm {
+ COMP_ALG_NONE = 0,
+ COMP_ALG_SNAPPY = 1,
+ COMP_ALG_ZLIB = 2,
+ COMP_ALG_ZSTD = 3,
+#ifdef HAVE_LZ4
+ COMP_ALG_LZ4 = 4,
+#endif
+#ifdef HAVE_BROTLI
+ COMP_ALG_BROTLI = 5,
+#endif
+ COMP_ALG_LAST //the last value for range checks
+ };
+
+ using pair_type = std::pair<const char*, CompressionAlgorithm>;
+ static constexpr std::initializer_list<pair_type> compression_algorithms {
+ { "none", COMP_ALG_NONE },
+ { "snappy", COMP_ALG_SNAPPY },
+ { "zlib", COMP_ALG_ZLIB },
+ { "zstd", COMP_ALG_ZSTD },
+#ifdef HAVE_LZ4
+ { "lz4", COMP_ALG_LZ4 },
+#endif
+#ifdef HAVE_BROTLI
+ { "brotli", COMP_ALG_BROTLI },
+#endif
+ };
+
+ // compression options
+ enum CompressionMode {
+ COMP_NONE, ///< compress never
+ COMP_PASSIVE, ///< compress if hinted COMPRESSIBLE
+ COMP_AGGRESSIVE, ///< compress unless hinted INCOMPRESSIBLE
+ COMP_FORCE ///< compress always
+ };
+
+#ifdef HAVE_QATZIP
+ bool qat_enabled;
+ static QatAccel qat_accel;
+#endif
+
+ static const char* get_comp_alg_name(int a);
+ static std::optional<CompressionAlgorithm> get_comp_alg_type(std::string_view s);
+
+ static const char *get_comp_mode_name(int m);
+ static std::optional<CompressionMode> get_comp_mode_type(std::string_view s);
+
+ Compressor(CompressionAlgorithm a, const char* t) : alg(a), type(t) {
+ }
+ virtual ~Compressor() {}
+ const std::string& get_type_name() const {
+ return type;
+ }
+ CompressionAlgorithm get_type() const {
+ return alg;
+ }
+ virtual int compress(const ceph::bufferlist &in, ceph::bufferlist &out, std::optional<int32_t> &compressor_message) = 0;
+ virtual int decompress(const ceph::bufferlist &in, ceph::bufferlist &out, std::optional<int32_t> compressor_message) = 0;
+ // this is a bit weird but we need non-const iterator to be in
+ // alignment with decode methods
+ virtual int decompress(ceph::bufferlist::const_iterator &p, size_t compressed_len, ceph::bufferlist &out, std::optional<int32_t> compressor_message) = 0;
+
+ static CompressorRef create(CephContext *cct, const std::string &type);
+ static CompressorRef create(CephContext *cct, int alg);
+
+protected:
+ CompressionAlgorithm alg;
+ std::string type;
+
+};
+
+} // namespace TOPNSPC
+#endif
diff --git a/src/compressor/QatAccel.cc b/src/compressor/QatAccel.cc
new file mode 100644
index 000000000..561fcc996
--- /dev/null
+++ b/src/compressor/QatAccel.cc
@@ -0,0 +1,240 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Intel Corporation
+ *
+ * Author: Qiaowei Ren <qiaowei.ren@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include <qatzip.h>
+
+#include "common/ceph_context.h"
+#include "common/common_init.h"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "QatAccel.h"
+
+// -----------------------------------------------------------------------------
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_compressor
+#undef dout_prefix
+#define dout_prefix _prefix(_dout)
+
+static std::ostream& _prefix(std::ostream* _dout)
+{
+ return *_dout << "QatAccel: ";
+}
+// -----------------------------------------------------------------------------
+// default window size for Zlib 1.2.8, negated for raw deflate
+#define ZLIB_DEFAULT_WIN_SIZE -15
+
+/* Estimate data expansion after decompression */
+static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000};
+
+void QzSessionDeleter::operator() (struct QzSession_S *session) {
+ qzTeardownSession(session);
+ delete session;
+}
+
+static bool get_qz_params(const std::string &alg, QzSessionParams_T &params) {
+ int rc;
+ rc = qzGetDefaults(&params);
+ if (rc != QZ_OK)
+ return false;
+ params.direction = QZ_DIR_BOTH;
+ params.is_busy_polling = true;
+ if (alg == "zlib") {
+ params.comp_algorithm = QZ_DEFLATE;
+ params.data_fmt = QZ_DEFLATE_RAW;
+ params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level;
+ }
+ else {
+ // later, there also has lz4.
+ return false;
+ }
+
+ rc = qzSetDefaults(&params);
+ if (rc != QZ_OK)
+ return false;
+ return true;
+}
+
+static bool setup_session(QatAccel::session_ptr &session, QzSessionParams_T &params) {
+ int rc;
+ rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT);
+ if (rc != QZ_OK && rc != QZ_DUPLICATE)
+ return false;
+ rc = qzSetupSession(session.get(), &params);
+ if (rc != QZ_OK) {
+ return false;
+ }
+ return true;
+}
+
+// put the session back to the session pool in a RAII manner
+struct cached_session_t {
+ cached_session_t(QatAccel* accel, QatAccel::session_ptr&& sess)
+ : accel{accel}, session{std::move(sess)} {}
+
+ ~cached_session_t() {
+ std::scoped_lock lock{accel->mutex};
+ // if the cache size is still under its upper bound, the current session is put into
+ // accel->sessions. otherwise it's released right
+ uint64_t sessions_num = g_ceph_context->_conf.get_val<uint64_t>("qat_compressor_session_max_number");
+ if (accel->sessions.size() < sessions_num) {
+ accel->sessions.push_back(std::move(session));
+ }
+ }
+
+ struct QzSession_S* get() {
+ assert(static_cast<bool>(session));
+ return session.get();
+ }
+
+ QatAccel* accel;
+ QatAccel::session_ptr session;
+};
+
+QatAccel::session_ptr QatAccel::get_session() {
+ {
+ std::scoped_lock lock{mutex};
+ if (!sessions.empty()) {
+ auto session = std::move(sessions.back());
+ sessions.pop_back();
+ return session;
+ }
+ }
+
+ // If there are no available session to use, we try allocate a new
+ // session.
+ QzSessionParams_T params = {(QzHuffmanHdr_T)0,};
+ session_ptr session(new struct QzSession_S());
+ memset(session.get(), 0, sizeof(struct QzSession_S));
+ if (get_qz_params(alg_name, params) && setup_session(session, params)) {
+ return session;
+ } else {
+ return nullptr;
+ }
+}
+
+QatAccel::QatAccel() {}
+
+QatAccel::~QatAccel() {
+ // First, we should uninitialize all QATzip session that disconnects all session
+ // from a hardware instance and deallocates buffers.
+ sessions.clear();
+ // Then we close the connection with QAT.
+ // where the value of the parameter passed to qzClose() does not matter. as long as
+ // it is not nullptr.
+ qzClose((QzSession_T*)1);
+}
+
+bool QatAccel::init(const std::string &alg) {
+ std::scoped_lock lock(mutex);
+ if (!alg_name.empty()) {
+ return true;
+ }
+
+ dout(15) << "First use for QAT compressor" << dendl;
+ if (alg != "zlib") {
+ return false;
+ }
+
+ alg_name = alg;
+ return true;
+}
+
+int QatAccel::compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message) {
+ auto s = get_session(); // get a session from the pool
+ if (!s) {
+ return -1; // session initialization failed
+ }
+ auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
+ compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+ int begin = 1;
+ for (auto &i : in.buffers()) {
+ const unsigned char* c_in = (unsigned char*) i.c_str();
+ unsigned int len = i.length();
+ unsigned int out_len = qzMaxCompressedLength(len, session.get()) + begin;
+
+ bufferptr ptr = buffer::create_small_page_aligned(out_len);
+ unsigned char* c_out = (unsigned char*)ptr.c_str() + begin;
+ int rc = qzCompress(session.get(), c_in, &len, c_out, &out_len, 1);
+ if (rc != QZ_OK)
+ return -1;
+ if (begin) {
+ // put a compressor variation mark in front of compressed stream, not used at the moment
+ ptr.c_str()[0] = 0;
+ out_len += begin;
+ begin = 0;
+ }
+ out.append(ptr, 0, out_len);
+
+ }
+
+ return 0;
+}
+
+int QatAccel::decompress(const bufferlist &in, bufferlist &out, std::optional<int32_t> compressor_message) {
+ auto i = in.begin();
+ return decompress(i, in.length(), out, compressor_message);
+}
+
+int QatAccel::decompress(bufferlist::const_iterator &p,
+ size_t compressed_len,
+ bufferlist &dst,
+ std::optional<int32_t> compressor_message) {
+ auto s = get_session(); // get a session from the pool
+ if (!s) {
+ return -1; // session initialization failed
+ }
+ auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
+ int begin = 1;
+
+ int rc = 0;
+ bufferlist tmp;
+ size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len);
+
+ while (remaining) {
+ unsigned int ratio_idx = 0;
+ const char* c_in = nullptr;
+ unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
+ remaining -= len;
+ len -= begin;
+ c_in += begin;
+ begin = 0;
+ unsigned int out_len = QZ_HW_BUFF_SZ;
+
+ bufferptr ptr;
+ do {
+ while (out_len <= len * expansion_ratio[ratio_idx]) {
+ out_len *= 2;
+ }
+
+ ptr = buffer::create_small_page_aligned(out_len);
+ rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len);
+ ratio_idx++;
+ } while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio));
+
+ if (rc == QZ_OK) {
+ dst.append(ptr, 0, out_len);
+ } else if (rc == QZ_DATA_ERROR) {
+ dout(1) << "QAT compressor DATA ERROR" << dendl;
+ return -1;
+ } else if (rc == QZ_BUF_ERROR) {
+ dout(1) << "QAT compressor BUF ERROR" << dendl;
+ return -1;
+ } else if (rc != QZ_OK) {
+ dout(1) << "QAT compressor NOT OK" << dendl;
+ return -1;
+ }
+ }
+
+ return 0;
+}
diff --git a/src/compressor/QatAccel.h b/src/compressor/QatAccel.h
new file mode 100644
index 000000000..3533eff9b
--- /dev/null
+++ b/src/compressor/QatAccel.h
@@ -0,0 +1,54 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Intel Corporation
+ *
+ * Author: Qiaowei Ren <qiaowei.ren@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_QATACCEL_H
+#define CEPH_QATACCEL_H
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <vector>
+
+#include "include/buffer.h"
+
+extern "C" struct QzSession_S; // typedef struct QzSession_S QzSession_T;
+
+struct QzSessionDeleter {
+ void operator() (struct QzSession_S *session);
+};
+
+class QatAccel {
+ public:
+ using session_ptr = std::unique_ptr<struct QzSession_S, QzSessionDeleter>;
+ QatAccel();
+ ~QatAccel();
+
+ bool init(const std::string &alg);
+
+ int compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message);
+ int decompress(const bufferlist &in, bufferlist &out, std::optional<int32_t> compressor_message);
+ int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst, std::optional<int32_t> compressor_message);
+
+ private:
+ // get a session from the pool or create a new one. returns null if session init fails
+ session_ptr get_session();
+
+ friend struct cached_session_t;
+ std::vector<session_ptr> sessions;
+ std::mutex mutex;
+ std::string alg_name;
+};
+
+#endif
diff --git a/src/compressor/brotli/BrotliCompressor.cc b/src/compressor/brotli/BrotliCompressor.cc
new file mode 100644
index 000000000..ed4abef4b
--- /dev/null
+++ b/src/compressor/brotli/BrotliCompressor.cc
@@ -0,0 +1,96 @@
+#include "brotli/encode.h"
+#include "brotli/decode.h"
+#include "BrotliCompressor.h"
+#include "include/scope_guard.h"
+
+#define MAX_LEN (CEPH_PAGE_SIZE)
+
+int BrotliCompressor::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
+{
+ BrotliEncoderState* s = BrotliEncoderCreateInstance(nullptr,
+ nullptr,
+ nullptr);
+ if (!s) {
+ return -1;
+ }
+ auto sg = make_scope_guard([&s] { BrotliEncoderDestroyInstance(s); });
+ BrotliEncoderSetParameter(s, BROTLI_PARAM_QUALITY, (uint32_t)9);
+ BrotliEncoderSetParameter(s, BROTLI_PARAM_LGWIN, 22);
+ for (auto i = in.buffers().begin(); i != in.buffers().end();) {
+ size_t available_in = i->length();
+ size_t max_comp_size = BrotliEncoderMaxCompressedSize(available_in);
+ size_t available_out = max_comp_size;
+ bufferptr ptr = buffer::create_small_page_aligned(max_comp_size);
+ uint8_t* next_out = (uint8_t*)ptr.c_str();
+ const uint8_t* next_in = (uint8_t*)i->c_str();
+ ++i;
+ BrotliEncoderOperation finish = i != in.buffers().end() ?
+ BROTLI_OPERATION_PROCESS :
+ BROTLI_OPERATION_FINISH;
+ do {
+ if (!BrotliEncoderCompressStream(s,
+ finish,
+ &available_in,
+ &next_in,
+ &available_out,
+ &next_out,
+ nullptr)) {
+ return -1;
+ }
+ unsigned have = max_comp_size - available_out;
+ out.append(ptr, 0, have);
+ } while (available_out == 0);
+ if (BrotliEncoderIsFinished(s)) {
+ break;
+ }
+ }
+ return 0;
+}
+
+int BrotliCompressor::decompress(bufferlist::const_iterator &p,
+ size_t compressed_size,
+ bufferlist &out,
+ boost::optional<int32_t> compressor_message)
+{
+ BrotliDecoderState* s = BrotliDecoderCreateInstance(nullptr,
+ nullptr,
+ nullptr);
+ if (!s) {
+ return -1;
+ }
+ auto sg = make_scope_guard([&s] { BrotliDecoderDestroyInstance(s); });
+ size_t remaining = std::min<size_t>(p.get_remaining(), compressed_size);
+ while (remaining) {
+ const uint8_t* next_in;
+ size_t len = p.get_ptr_and_advance(remaining, (const char**)&next_in);
+ remaining -= len;
+ size_t available_in = len;
+ BrotliDecoderResult result = BROTLI_DECODER_RESULT_ERROR;
+ do {
+ size_t available_out = MAX_LEN;
+ bufferptr ptr = buffer::create_page_aligned(MAX_LEN);
+ uint8_t* next_out = (uint8_t*)ptr.c_str();
+ result = BrotliDecoderDecompressStream(s,
+ &available_in,
+ &next_in,
+ &available_out,
+ &next_out,
+ 0);
+ if (!result) {
+ return -1;
+ }
+ unsigned have = MAX_LEN - available_out;
+ out.append(ptr, 0, have);
+ } while (result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT);
+ if (BrotliDecoderIsFinished(s)) {
+ break;
+ }
+ }
+ return 0;
+}
+
+int BrotliCompressor::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message)
+{
+ auto i = std::cbegin(in);
+ return decompress(i, in.length(), out, compressor_message);
+}
diff --git a/src/compressor/brotli/BrotliCompressor.h b/src/compressor/brotli/BrotliCompressor.h
new file mode 100644
index 000000000..373300645
--- /dev/null
+++ b/src/compressor/brotli/BrotliCompressor.h
@@ -0,0 +1,31 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 BI SHUN KE <aionshun@livemail.tw>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BROTLICOMPRESSOR_H
+#define CEPH_BROTLICOMPRESSOR_H
+
+
+#include "include/buffer.h"
+#include "compressor/Compressor.h"
+
+class BrotliCompressor : public Compressor
+{
+ public:
+ BrotliCompressor() : Compressor(COMP_ALG_BROTLI, "brotli") {}
+
+ int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) override;
+ int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) override;
+ int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out, boost::optional<int32_t> compressor_message) override;
+};
+
+#endif //CEPH_BROTLICOMPRESSOR_H
+
diff --git a/src/compressor/brotli/CMakeLists.txt b/src/compressor/brotli/CMakeLists.txt
new file mode 100644
index 000000000..31a376289
--- /dev/null
+++ b/src/compressor/brotli/CMakeLists.txt
@@ -0,0 +1,34 @@
+# brotli
+
+set(brotli_sources
+ CompressionPluginBrotli.cc
+ BrotliCompressor.cc
+)
+include(ExternalProject)
+ExternalProject_Add(brotli_ext
+ DOWNLOAD_DIR ${CMAKE_BINARY_DIR}/src/
+ GIT_REPOSITORY "https://github.com/google/brotli.git"
+ GIT_TAG "v1.0.7"
+ GIT_SHALLOW TRUE
+ SOURCE_DIR ${CMAKE_BINARY_DIR}/src/brotli
+ CONFIGURE_COMMAND ./configure-cmake --disable-debug
+ INSTALL_COMMAND ""
+ BUILD_COMMAND $(MAKE)
+ BUILD_IN_SOURCE 1
+ INSTALL_COMMAND "")
+
+set(brotli_libs enc dec common)
+file(MAKE_DIRECTORY "${CMAKE_BINARY_DIR}/src/brotli/c/include")
+foreach(lib ${brotli_libs})
+ add_library(brotli::${lib} STATIC IMPORTED)
+ add_dependencies(brotli::${lib} brotli_ext)
+ set_target_properties(brotli::${lib} PROPERTIES
+ INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_BINARY_DIR}/src/brotli/c/include"
+ IMPORTED_LOCATION "${CMAKE_BINARY_DIR}/src/brotli/libbrotli${lib}-static.a")
+ list(APPEND BROTLI_LIBRARIES brotli::${lib})
+endforeach()
+
+add_library(ceph_brotli SHARED ${brotli_sources})
+list(REVERSE brotli_libs)
+target_link_libraries(ceph_brotli PRIVATE ${BROTLI_LIBRARIES})
+install(TARGETS ceph_brotli DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/brotli/CompressionPluginBrotli.cc b/src/compressor/brotli/CompressionPluginBrotli.cc
new file mode 100644
index 000000000..245f49dbb
--- /dev/null
+++ b/src/compressor/brotli/CompressionPluginBrotli.cc
@@ -0,0 +1,19 @@
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "CompressionPluginBrotli.h"
+#include "common/ceph_context.h"
+
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ PluginRegistry *instance = cct->get_plugin_registry();
+ return instance->add(type, name, new CompressionPluginBrotli(cct));
+}
+
diff --git a/src/compressor/brotli/CompressionPluginBrotli.h b/src/compressor/brotli/CompressionPluginBrotli.h
new file mode 100644
index 000000000..641a6e1c9
--- /dev/null
+++ b/src/compressor/brotli/CompressionPluginBrotli.h
@@ -0,0 +1,36 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 BI SHUN KE <aionshun@livemail.tw>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_BROTLI_H
+#define CEPH_COMPRESSION_PLUGIN_BROTLI_H
+
+#include "ceph_ver.h"
+#include "compressor/CompressionPlugin.h"
+#include "BrotliCompressor.h"
+
+class CompressionPluginBrotli : public CompressionPlugin {
+public:
+ explicit CompressionPluginBrotli(CephContext *cct) : CompressionPlugin(cct)
+ {}
+
+ virtual int factory(CompressorRef *cs, std::ostream *ss)
+ {
+ if (compressor == nullptr) {
+ BrotliCompressor *interface = new BrotliCompressor();
+ compressor = CompressorRef(interface);
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/lz4/CMakeLists.txt b/src/compressor/lz4/CMakeLists.txt
new file mode 100644
index 000000000..ff8e14c29
--- /dev/null
+++ b/src/compressor/lz4/CMakeLists.txt
@@ -0,0 +1,14 @@
+# lz4
+
+set(lz4_sources
+ CompressionPluginLZ4.cc
+)
+
+add_library(ceph_lz4 SHARED ${lz4_sources})
+target_link_libraries(ceph_lz4
+ PRIVATE LZ4::LZ4 compressor $<$<PLATFORM_ID:Windows>:ceph-common>)
+set_target_properties(ceph_lz4 PROPERTIES
+ VERSION 2.0.0
+ SOVERSION 2
+ INSTALL_RPATH "")
+install(TARGETS ceph_lz4 DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/lz4/CompressionPluginLZ4.cc b/src/compressor/lz4/CompressionPluginLZ4.cc
new file mode 100644
index 000000000..0d05500e4
--- /dev/null
+++ b/src/compressor/lz4/CompressionPluginLZ4.cc
@@ -0,0 +1,36 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 XSKY Inc.
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "common/ceph_context.h"
+#include "CompressionPluginLZ4.h"
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ auto instance = cct->get_plugin_registry();
+
+ return instance->add(type, name, new CompressionPluginLZ4(cct));
+}
diff --git a/src/compressor/lz4/CompressionPluginLZ4.h b/src/compressor/lz4/CompressionPluginLZ4.h
new file mode 100644
index 000000000..38e5b9cc5
--- /dev/null
+++ b/src/compressor/lz4/CompressionPluginLZ4.h
@@ -0,0 +1,41 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 XSKY Inc.
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_LZ4_H
+#define CEPH_COMPRESSION_PLUGIN_LZ4_H
+
+// -----------------------------------------------------------------------------
+#include "ceph_ver.h"
+#include "compressor/CompressionPlugin.h"
+#include "LZ4Compressor.h"
+// -----------------------------------------------------------------------------
+
+class CompressionPluginLZ4 : public ceph::CompressionPlugin {
+
+public:
+
+ explicit CompressionPluginLZ4(CephContext* cct) : CompressionPlugin(cct)
+ {}
+
+ int factory(CompressorRef *cs, std::ostream *ss) override {
+ if (compressor == 0) {
+ LZ4Compressor *interface = new LZ4Compressor(cct);
+ compressor = CompressorRef(interface);
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/lz4/LZ4Compressor.h b/src/compressor/lz4/LZ4Compressor.h
new file mode 100644
index 000000000..eca08e1a5
--- /dev/null
+++ b/src/compressor/lz4/LZ4Compressor.h
@@ -0,0 +1,147 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_LZ4COMPRESSOR_H
+#define CEPH_LZ4COMPRESSOR_H
+
+#include <optional>
+#include <lz4.h>
+
+#include "compressor/Compressor.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "common/config.h"
+
+
+class LZ4Compressor : public Compressor {
+ public:
+ LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") {
+#ifdef HAVE_QATZIP
+ if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
+ qat_enabled = true;
+ else
+ qat_enabled = false;
+#endif
+ }
+
+ int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> &compressor_message) override {
+ // older versions of liblz4 introduce bit errors when compressing
+ // fragmented buffers. this was fixed in lz4 commit
+ // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first
+ // appeared in v1.8.2.
+ //
+ // workaround: rebuild if not contiguous.
+ if (!src.is_contiguous()) {
+ ceph::buffer::list new_src = src;
+ new_src.rebuild();
+ return compress(new_src, dst, compressor_message);
+ }
+
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.compress(src, dst, compressor_message);
+#endif
+ ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(
+ LZ4_compressBound(src.length()));
+ LZ4_stream_t lz4_stream;
+ LZ4_resetStream(&lz4_stream);
+
+ using ceph::encode;
+
+ auto p = src.begin();
+ size_t left = src.length();
+ int pos = 0;
+ const char *data;
+ unsigned num = src.get_num_buffers();
+ encode((uint32_t)num, dst);
+ while (left) {
+ uint32_t origin_len = p.get_ptr_and_advance(left, &data);
+ int compressed_len = LZ4_compress_fast_continue(
+ &lz4_stream, data, outptr.c_str()+pos, origin_len,
+ outptr.length()-pos, 1);
+ if (compressed_len <= 0)
+ return -1;
+ pos += compressed_len;
+ left -= origin_len;
+ encode(origin_len, dst);
+ encode((uint32_t)compressed_len, dst);
+ }
+ ceph_assert(p.end());
+
+ dst.append(outptr, 0, pos);
+ return 0;
+ }
+
+ int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(src, dst, compressor_message);
+#endif
+ auto i = std::cbegin(src);
+ return decompress(i, src.length(), dst, compressor_message);
+ }
+
+ int decompress(ceph::buffer::list::const_iterator &p,
+ size_t compressed_len,
+ ceph::buffer::list &dst,
+ std::optional<int32_t> compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(p, compressed_len, dst, compressor_message);
+#endif
+ using ceph::decode;
+ uint32_t count;
+ decode(count, p);
+ std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs(count);
+ uint32_t total_origin = 0;
+ for (auto& [dst_size, src_size] : compressed_pairs) {
+ decode(dst_size, p);
+ decode(src_size, p);
+ total_origin += dst_size;
+ }
+ compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2);
+
+ ceph::buffer::ptr dstptr(total_origin);
+ LZ4_streamDecode_t lz4_stream_decode;
+ LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0);
+
+ ceph::buffer::ptr cur_ptr = p.get_current_ptr();
+ ceph::buffer::ptr *ptr = &cur_ptr;
+ std::optional<ceph::buffer::ptr> data_holder;
+ if (compressed_len != cur_ptr.length()) {
+ data_holder.emplace(compressed_len);
+ p.copy_deep(compressed_len, *data_holder);
+ ptr = &*data_holder;
+ }
+
+ char *c_in = ptr->c_str();
+ char *c_out = dstptr.c_str();
+ for (unsigned i = 0; i < count; ++i) {
+ int r = LZ4_decompress_safe_continue(
+ &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first);
+ if (r == (int)compressed_pairs[i].first) {
+ c_in += compressed_pairs[i].second;
+ c_out += compressed_pairs[i].first;
+ } else if (r < 0) {
+ return -1;
+ } else {
+ return -2;
+ }
+ }
+ dst.push_back(std::move(dstptr));
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/snappy/CMakeLists.txt b/src/compressor/snappy/CMakeLists.txt
new file mode 100644
index 000000000..d1ba3b2e7
--- /dev/null
+++ b/src/compressor/snappy/CMakeLists.txt
@@ -0,0 +1,14 @@
+# snappy
+
+set(snappy_sources
+ CompressionPluginSnappy.cc
+)
+
+add_library(ceph_snappy SHARED ${snappy_sources})
+target_link_libraries(ceph_snappy
+ PRIVATE snappy::snappy compressor $<$<PLATFORM_ID:Windows>:ceph-common>)
+set_target_properties(ceph_snappy PROPERTIES
+ VERSION 2.0.0
+ SOVERSION 2
+ INSTALL_RPATH "")
+install(TARGETS ceph_snappy DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/snappy/CompressionPluginSnappy.cc b/src/compressor/snappy/CompressionPluginSnappy.cc
new file mode 100644
index 000000000..85b6cf94a
--- /dev/null
+++ b/src/compressor/snappy/CompressionPluginSnappy.cc
@@ -0,0 +1,38 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+
+// -----------------------------------------------------------------------------
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "common/ceph_context.h"
+#include "CompressionPluginSnappy.h"
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ auto instance = cct->get_plugin_registry();
+
+ return instance->add(type, name, new CompressionPluginSnappy(cct));
+}
diff --git a/src/compressor/snappy/CompressionPluginSnappy.h b/src/compressor/snappy/CompressionPluginSnappy.h
new file mode 100644
index 000000000..a9bc98f73
--- /dev/null
+++ b/src/compressor/snappy/CompressionPluginSnappy.h
@@ -0,0 +1,42 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_SNAPPY_H
+#define CEPH_COMPRESSION_PLUGIN_SNAPPY_H
+
+// -----------------------------------------------------------------------------
+#include "compressor/CompressionPlugin.h"
+#include "SnappyCompressor.h"
+// -----------------------------------------------------------------------------
+
+class CompressionPluginSnappy : public ceph::CompressionPlugin {
+
+public:
+
+ explicit CompressionPluginSnappy(CephContext* cct) : CompressionPlugin(cct)
+ {}
+
+ int factory(CompressorRef *cs,
+ std::ostream *ss) override
+ {
+ if (compressor == 0) {
+ SnappyCompressor *interface = new SnappyCompressor(cct);
+ compressor = CompressorRef(interface);
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/snappy/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h
new file mode 100644
index 000000000..8150f783c
--- /dev/null
+++ b/src/compressor/snappy/SnappyCompressor.h
@@ -0,0 +1,116 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_SNAPPYCOMPRESSOR_H
+#define CEPH_SNAPPYCOMPRESSOR_H
+
+#include <snappy.h>
+#include <snappy-sinksource.h>
+#include "common/config.h"
+#include "compressor/Compressor.h"
+#include "include/buffer.h"
+
+class CEPH_BUFFER_API BufferlistSource : public snappy::Source {
+ ceph::bufferlist::const_iterator pb;
+ size_t remaining;
+
+ public:
+ explicit BufferlistSource(ceph::bufferlist::const_iterator _pb, size_t _input_len)
+ : pb(_pb),
+ remaining(_input_len) {
+ remaining = std::min(remaining, (size_t)pb.get_remaining());
+ }
+ size_t Available() const override {
+ return remaining;
+ }
+ const char *Peek(size_t *len) override {
+ const char *data = NULL;
+ *len = 0;
+ size_t avail = Available();
+ if (avail) {
+ auto ptmp = pb;
+ *len = ptmp.get_ptr_and_advance(avail, &data);
+ }
+ return data;
+ }
+ void Skip(size_t n) override {
+ ceph_assert(n <= remaining);
+ pb += n;
+ remaining -= n;
+ }
+
+ ceph::bufferlist::const_iterator get_pos() const {
+ return pb;
+ }
+};
+
+class SnappyCompressor : public Compressor {
+ public:
+ SnappyCompressor(CephContext* cct) : Compressor(COMP_ALG_SNAPPY, "snappy") {
+#ifdef HAVE_QATZIP
+ if (cct->_conf->qat_compressor_enabled && qat_accel.init("snappy"))
+ qat_enabled = true;
+ else
+ qat_enabled = false;
+#endif
+ }
+
+ int compress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional<int32_t> &compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.compress(src, dst, compressor_message);
+#endif
+ BufferlistSource source(const_cast<ceph::bufferlist&>(src).begin(), src.length());
+ ceph::bufferptr ptr = ceph::buffer::create_small_page_aligned(
+ snappy::MaxCompressedLength(src.length()));
+ snappy::UncheckedByteArraySink sink(ptr.c_str());
+ snappy::Compress(&source, &sink);
+ dst.append(ptr, 0, sink.CurrentDestination() - ptr.c_str());
+ return 0;
+ }
+
+ int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional<int32_t> compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(src, dst, compressor_message);
+#endif
+ auto i = src.begin();
+ return decompress(i, src.length(), dst, compressor_message);
+ }
+
+ int decompress(ceph::bufferlist::const_iterator &p,
+ size_t compressed_len,
+ ceph::bufferlist &dst,
+ std::optional<int32_t> compressor_message) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(p, compressed_len, dst, compressor_message);
+#endif
+ BufferlistSource source_1(p, compressed_len);
+ uint32_t res_len = 0;
+ if (!snappy::GetUncompressedLength(&source_1, &res_len)) {
+ return -1;
+ }
+ BufferlistSource source_2(p, compressed_len);
+ ceph::bufferptr ptr(res_len);
+ if (snappy::RawUncompress(&source_2, ptr.c_str())) {
+ p = source_2.get_pos();
+ dst.append(ptr);
+ return 0;
+ }
+ return -2;
+ }
+};
+
+#endif
diff --git a/src/compressor/zlib/CMakeLists.txt b/src/compressor/zlib/CMakeLists.txt
new file mode 100644
index 000000000..050ff03fa
--- /dev/null
+++ b/src/compressor/zlib/CMakeLists.txt
@@ -0,0 +1,99 @@
+# zlib
+
+if(HAVE_INTEL_SSE4_1 AND HAVE_NASM_X64_AVX2 AND (NOT APPLE))
+ set(CMAKE_ASM_FLAGS "-i ${PROJECT_SOURCE_DIR}/src/isa-l/igzip/ -i ${PROJECT_SOURCE_DIR}/src/isa-l/include/ ${CMAKE_ASM_FLAGS}")
+ set(zlib_sources
+ CompressionPluginZlib.cc
+ ZlibCompressor.cc
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/hufftables_c.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/flatten_ll.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_inflate.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/huff_codes.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base_aliases.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc64_base.c
+ )
+ list(APPEND zlib_sources
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_body.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_finish.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body_h1_gr_bt.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_finish.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/rfc1951_lookup.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_sse.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_avx2_4.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_multibinary.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_update_histogram_01.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_update_histogram_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_decode_block_stateless_01.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_decode_block_stateless_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_inflate_multibinary.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df_06.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/proc_heap.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_deflate_hash.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_gen_icf_map_lh1_06.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_gen_icf_map_lh1_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_set_long_icf_fg_04.asm
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_set_long_icf_fg_06.asm
+ )
+elseif(HAVE_ARMV8_SIMD)
+ set(zlib_asm_sources
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_inflate_multibinary_arm64.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_multibinary_arm64.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_deflate_body_aarch64.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_deflate_finish_aarch64.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/isal_deflate_icf_body_hash_hist.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/isal_deflate_icf_finish_hash_hist.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_set_long_icf_fg.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/isal_update_histogram.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_deflate_hash_aarch64.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_decode_huffman_code_block_aarch64.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_isal_adler32_neon.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/encode_df.S
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/gen_icf_map.S
+ )
+ set(zlib_sources
+ CompressionPluginZlib.cc
+ ZlibCompressor.cc
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/hufftables_c.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/flatten_ll.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_inflate.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/huff_codes.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/proc_heap_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_multibinary_aarch64_dispatcher.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base_aliases.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base.c
+ ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc64_base.c
+ ${zlib_asm_sources}
+ )
+ set_source_files_properties(${zlib_asm_sources} PROPERTIES
+ COMPILE_DEFINITIONS "__ASSEMBLY__"
+ INCLUDE_DIRECTORIES "${PROJECT_SOURCE_DIR}/src/isa-l/igzip;${PROJECT_SOURCE_DIR}/src/isa-l/igzip/aarch64"
+ )
+else()
+ set(zlib_sources
+ CompressionPluginZlib.cc
+ ZlibCompressor.cc
+ )
+endif()
+
+add_library(ceph_zlib SHARED ${zlib_sources})
+target_link_libraries(ceph_zlib ZLIB::ZLIB compressor $<$<PLATFORM_ID:Windows>:ceph-common>)
+target_include_directories(ceph_zlib SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/isa-l/include")
+set_target_properties(ceph_zlib PROPERTIES
+ VERSION 2.0.0
+ SOVERSION 2
+ INSTALL_RPATH "")
+install(TARGETS ceph_zlib DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/zlib/CompressionPluginZlib.cc b/src/compressor/zlib/CompressionPluginZlib.cc
new file mode 100644
index 000000000..c0a53fa77
--- /dev/null
+++ b/src/compressor/zlib/CompressionPluginZlib.cc
@@ -0,0 +1,38 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+
+// -----------------------------------------------------------------------------
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "common/ceph_context.h"
+#include "CompressionPluginZlib.h"
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ auto instance = cct->get_plugin_registry();
+
+ return instance->add(type, name, new CompressionPluginZlib(cct));
+}
diff --git a/src/compressor/zlib/CompressionPluginZlib.h b/src/compressor/zlib/CompressionPluginZlib.h
new file mode 100644
index 000000000..597bc02a5
--- /dev/null
+++ b/src/compressor/zlib/CompressionPluginZlib.h
@@ -0,0 +1,60 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_ZLIB_H
+#define CEPH_COMPRESSION_PLUGIN_ZLIB_H
+
+// -----------------------------------------------------------------------------
+#include "arch/probe.h"
+#include "arch/intel.h"
+#include "arch/arm.h"
+#include "common/ceph_context.h"
+#include "compressor/CompressionPlugin.h"
+#include "ZlibCompressor.h"
+
+// -----------------------------------------------------------------------------
+
+class CompressionPluginZlib : public ceph::CompressionPlugin {
+public:
+ bool has_isal = false;
+
+ explicit CompressionPluginZlib(CephContext *cct) : CompressionPlugin(cct)
+ {}
+
+ int factory(CompressorRef *cs,
+ std::ostream *ss) override
+ {
+ bool isal = false;
+#if defined(__i386__) || defined(__x86_64__)
+ // other arches or lack of support result in isal = false
+ if (cct->_conf->compressor_zlib_isal) {
+ ceph_arch_probe();
+ isal = (ceph_arch_intel_pclmul && ceph_arch_intel_sse41);
+ }
+#elif defined(__aarch64__)
+ if (cct->_conf->compressor_zlib_isal) {
+ ceph_arch_probe();
+ isal = (ceph_arch_aarch64_pmull && ceph_arch_neon);
+ }
+#endif
+ if (compressor == 0 || has_isal != isal) {
+ compressor = std::make_shared<ZlibCompressor>(cct, isal);
+ has_isal = isal;
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/zlib/ZlibCompressor.cc b/src/compressor/zlib/ZlibCompressor.cc
new file mode 100644
index 000000000..9795d79b3
--- /dev/null
+++ b/src/compressor/zlib/ZlibCompressor.cc
@@ -0,0 +1,252 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+// -----------------------------------------------------------------------------
+#include "common/debug.h"
+#include "ZlibCompressor.h"
+#include "osd/osd_types.h"
+#include "isa-l/include/igzip_lib.h"
+// -----------------------------------------------------------------------------
+
+#include <zlib.h>
+
+// -----------------------------------------------------------------------------
+#define dout_context cct
+#define dout_subsys ceph_subsys_compressor
+#undef dout_prefix
+#define dout_prefix _prefix(_dout)
+// -----------------------------------------------------------------------------
+
+// -----------------------------------------------------------------------------
+
+using std::ostream;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+
+static ostream&
+_prefix(std::ostream* _dout)
+{
+ return *_dout << "ZlibCompressor: ";
+}
+// -----------------------------------------------------------------------------
+
+#define MAX_LEN (CEPH_PAGE_SIZE)
+
+// default window size for Zlib 1.2.8, negated for raw deflate
+#define ZLIB_DEFAULT_WIN_SIZE -15
+
+// desired memory usage level. increasing to 9 doesn't speed things up
+// significantly (helps only on >=16K blocks) and sometimes degrades
+// compression ratio.
+#define ZLIB_MEMORY_LEVEL 8
+
+int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message)
+{
+ int ret;
+ unsigned have;
+ z_stream strm;
+ unsigned char* c_in;
+ int begin = 1;
+
+ /* allocate deflate state */
+ strm.zalloc = Z_NULL;
+ strm.zfree = Z_NULL;
+ strm.opaque = Z_NULL;
+ ret = deflateInit2(&strm, cct->_conf->compressor_zlib_level, Z_DEFLATED, cct->_conf->compressor_zlib_winsize, ZLIB_MEMORY_LEVEL, Z_DEFAULT_STRATEGY);
+ if (ret != Z_OK) {
+ dout(1) << "Compression init error: init return "
+ << ret << " instead of Z_OK" << dendl;
+ return -1;
+ }
+ compressor_message = cct->_conf->compressor_zlib_winsize;
+
+ for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
+ i != in.buffers().end();) {
+
+ c_in = (unsigned char*) (*i).c_str();
+ long unsigned int len = (*i).length();
+ ++i;
+
+ strm.avail_in = len;
+ int flush = i != in.buffers().end() ? Z_NO_FLUSH : Z_FINISH;
+
+ strm.next_in = c_in;
+ do {
+ bufferptr ptr = ceph::buffer::create_page_aligned(MAX_LEN);
+ strm.next_out = (unsigned char*)ptr.c_str() + begin;
+ strm.avail_out = MAX_LEN - begin;
+ if (begin) {
+ // put a compressor variation mark in front of compressed stream, not used at the moment
+ ptr.c_str()[0] = 0;
+ begin = 0;
+ }
+ ret = deflate(&strm, flush); /* no bad return value */
+ if (ret == Z_STREAM_ERROR) {
+ dout(1) << "Compression error: compress return Z_STREAM_ERROR("
+ << ret << ")" << dendl;
+ deflateEnd(&strm);
+ return -1;
+ }
+ have = MAX_LEN - strm.avail_out;
+ out.append(ptr, 0, have);
+ } while (strm.avail_out == 0);
+ if (strm.avail_in != 0) {
+ dout(10) << "Compression error: unused input" << dendl;
+ deflateEnd(&strm);
+ return -1;
+ }
+ }
+
+ deflateEnd(&strm);
+ return 0;
+}
+
+#if (__x86_64__ && defined(HAVE_NASM_X64_AVX2)) || defined(__aarch64__)
+int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message)
+{
+ int ret;
+ unsigned have;
+ isal_zstream strm;
+ unsigned char* c_in;
+ int begin = 1;
+
+ /* allocate deflate state */
+ isal_deflate_init(&strm);
+ strm.end_of_stream = 0;
+ compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+
+ for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
+ i != in.buffers().end();) {
+
+ c_in = (unsigned char*) (*i).c_str();
+ long unsigned int len = (*i).length();
+ ++i;
+
+ strm.avail_in = len;
+ strm.end_of_stream = (i == in.buffers().end());
+ strm.flush = FINISH_FLUSH;
+
+ strm.next_in = c_in;
+
+ do {
+ bufferptr ptr = ceph::buffer::create_page_aligned(MAX_LEN);
+ strm.next_out = (unsigned char*)ptr.c_str() + begin;
+ strm.avail_out = MAX_LEN - begin;
+ if (begin) {
+ // put a compressor variation mark in front of compressed stream, not used at the moment
+ ptr.c_str()[0] = 1;
+ begin = 0;
+ }
+ ret = isal_deflate(&strm);
+ if (ret != COMP_OK) {
+ dout(1) << "Compression error: isal_deflate return error ("
+ << ret << ")" << dendl;
+ return -1;
+ }
+ have = MAX_LEN - strm.avail_out;
+ out.append(ptr, 0, have);
+ } while (strm.avail_out == 0);
+ if (strm.avail_in != 0) {
+ dout(10) << "Compression error: unused input" << dendl;
+ return -1;
+ }
+ }
+
+ return 0;
+}
+#endif
+
+int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message)
+{
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.compress(in, out, compressor_message);
+#endif
+#if (__x86_64__ && defined(HAVE_NASM_X64_AVX2)) || defined(__aarch64__)
+ if (isal_enabled)
+ return isal_compress(in, out, compressor_message);
+ else
+ return zlib_compress(in, out, compressor_message);
+#else
+ return zlib_compress(in, out, compressor_message);
+#endif
+}
+
+int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, std::optional<int32_t> compressor_message)
+{
+#ifdef HAVE_QATZIP
+ // QAT can only decompress with the default window size
+ if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE))
+ return qat_accel.decompress(p, compressed_size, out, compressor_message);
+#endif
+
+ int ret;
+ unsigned have;
+ z_stream strm;
+ const char* c_in;
+ int begin = 1;
+
+ /* allocate inflate state */
+ strm.zalloc = Z_NULL;
+ strm.zfree = Z_NULL;
+ strm.opaque = Z_NULL;
+ strm.avail_in = 0;
+ strm.next_in = Z_NULL;
+
+ // choose the variation of compressor
+ if (!compressor_message)
+ compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+ ret = inflateInit2(&strm, *compressor_message);
+ if (ret != Z_OK) {
+ dout(1) << "Decompression init error: init return "
+ << ret << " instead of Z_OK" << dendl;
+ return -1;
+ }
+
+ size_t remaining = std::min<size_t>(p.get_remaining(), compressed_size);
+
+ while(remaining) {
+ long unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
+ remaining -= len;
+ strm.avail_in = len - begin;
+ strm.next_in = (unsigned char*)c_in + begin;
+ begin = 0;
+
+ do {
+ strm.avail_out = MAX_LEN;
+ bufferptr ptr = ceph::buffer::create_page_aligned(MAX_LEN);
+ strm.next_out = (unsigned char*)ptr.c_str();
+ ret = inflate(&strm, Z_NO_FLUSH);
+ if (ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR) {
+ dout(1) << "Decompression error: decompress return "
+ << ret << dendl;
+ inflateEnd(&strm);
+ return -1;
+ }
+ have = MAX_LEN - strm.avail_out;
+ out.append(ptr, 0, have);
+ } while (strm.avail_out == 0);
+ }
+
+ /* clean up and return */
+ (void)inflateEnd(&strm);
+ return 0;
+}
+
+int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out, std::optional<int32_t> compressor_message)
+{
+ auto i = std::cbegin(in);
+ return decompress(i, in.length(), out, compressor_message);
+}
diff --git a/src/compressor/zlib/ZlibCompressor.h b/src/compressor/zlib/ZlibCompressor.h
new file mode 100644
index 000000000..da1c8117e
--- /dev/null
+++ b/src/compressor/zlib/ZlibCompressor.h
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_ZLIB_H
+#define CEPH_COMPRESSION_ZLIB_H
+
+#include "common/config.h"
+#include "compressor/Compressor.h"
+
+class ZlibCompressor : public Compressor {
+ bool isal_enabled;
+ CephContext *const cct;
+public:
+ ZlibCompressor(CephContext *cct, bool isal)
+ : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) {
+#ifdef HAVE_QATZIP
+ if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib"))
+ qat_enabled = true;
+ else
+ qat_enabled = false;
+#endif
+ }
+
+ int compress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> &compressor_message) override;
+ int decompress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> compressor_message) override;
+ int decompress(ceph::buffer::list::const_iterator &p, size_t compressed_len, ceph::buffer::list &out, std::optional<int32_t> compressor_message) override;
+private:
+ int zlib_compress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> &compressor_message);
+ int isal_compress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> &compressor_message);
+ };
+
+
+#endif
diff --git a/src/compressor/zstd/CMakeLists.txt b/src/compressor/zstd/CMakeLists.txt
new file mode 100644
index 000000000..5a522840a
--- /dev/null
+++ b/src/compressor/zstd/CMakeLists.txt
@@ -0,0 +1,21 @@
+# zstd
+
+option(WITH_SYSTEM_ZSTD "use prebuilt libzstd in system" OFF)
+
+if(WITH_SYSTEM_ZSTD)
+ find_package(Zstd 1.4.4 REQUIRED)
+else()
+ include(BuildZstd)
+ build_Zstd()
+endif()
+
+set(zstd_sources
+ CompressionPluginZstd.cc)
+
+add_library(ceph_zstd SHARED ${zstd_sources})
+target_link_libraries(ceph_zstd PRIVATE Zstd::Zstd $<$<PLATFORM_ID:Windows>:ceph-common>)
+set_target_properties(ceph_zstd PROPERTIES
+ VERSION 2.0.0
+ SOVERSION 2
+ INSTALL_RPATH "")
+install(TARGETS ceph_zstd DESTINATION ${compressor_plugin_dir})
diff --git a/src/compressor/zstd/CompressionPluginZstd.cc b/src/compressor/zstd/CompressionPluginZstd.cc
new file mode 100644
index 000000000..92aee7343
--- /dev/null
+++ b/src/compressor/zstd/CompressionPluginZstd.cc
@@ -0,0 +1,36 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#include "acconfig.h"
+#include "ceph_ver.h"
+#include "common/ceph_context.h"
+#include "CompressionPluginZstd.h"
+
+// -----------------------------------------------------------------------------
+
+const char *__ceph_plugin_version()
+{
+ return CEPH_GIT_NICE_VER;
+}
+
+// -----------------------------------------------------------------------------
+
+int __ceph_plugin_init(CephContext *cct,
+ const std::string& type,
+ const std::string& name)
+{
+ auto instance = cct->get_plugin_registry();
+
+ return instance->add(type, name, new CompressionPluginZstd(cct));
+}
diff --git a/src/compressor/zstd/CompressionPluginZstd.h b/src/compressor/zstd/CompressionPluginZstd.h
new file mode 100644
index 000000000..632e7c6dc
--- /dev/null
+++ b/src/compressor/zstd/CompressionPluginZstd.h
@@ -0,0 +1,43 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Mirantis, Inc.
+ *
+ * Author: Alyona Kiseleva <akiselyova@mirantis.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ */
+
+#ifndef CEPH_COMPRESSION_PLUGIN_ZSTD_H
+#define CEPH_COMPRESSION_PLUGIN_ZSTD_H
+
+// -----------------------------------------------------------------------------
+#include "ceph_ver.h"
+#include "compressor/CompressionPlugin.h"
+#include "ZstdCompressor.h"
+// -----------------------------------------------------------------------------
+
+class CompressionPluginZstd : public ceph::CompressionPlugin {
+
+public:
+
+ explicit CompressionPluginZstd(CephContext* cct) : CompressionPlugin(cct)
+ {}
+
+ int factory(CompressorRef *cs,
+ std::ostream *ss) override
+ {
+ if (compressor == 0) {
+ ZstdCompressor *interface = new ZstdCompressor(cct);
+ compressor = CompressorRef(interface);
+ }
+ *cs = compressor;
+ return 0;
+ }
+};
+
+#endif
diff --git a/src/compressor/zstd/ZstdCompressor.h b/src/compressor/zstd/ZstdCompressor.h
new file mode 100644
index 000000000..859bd6b57
--- /dev/null
+++ b/src/compressor/zstd/ZstdCompressor.h
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_ZSTDCOMPRESSOR_H
+#define CEPH_ZSTDCOMPRESSOR_H
+
+#define ZSTD_STATIC_LINKING_ONLY
+#include "zstd/lib/zstd.h"
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "compressor/Compressor.h"
+
+class ZstdCompressor : public Compressor {
+ public:
+ ZstdCompressor(CephContext *cct) : Compressor(COMP_ALG_ZSTD, "zstd"), cct(cct) {}
+
+ int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> &compressor_message) override {
+ ZSTD_CStream *s = ZSTD_createCStream();
+ ZSTD_initCStream_srcSize(s, cct->_conf->compressor_zstd_level, src.length());
+ auto p = src.begin();
+ size_t left = src.length();
+
+ size_t const out_max = ZSTD_compressBound(left);
+ ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(out_max);
+ ZSTD_outBuffer_s outbuf;
+ outbuf.dst = outptr.c_str();
+ outbuf.size = outptr.length();
+ outbuf.pos = 0;
+
+ while (left) {
+ ceph_assert(!p.end());
+ struct ZSTD_inBuffer_s inbuf;
+ inbuf.pos = 0;
+ inbuf.size = p.get_ptr_and_advance(left, (const char**)&inbuf.src);
+ left -= inbuf.size;
+ ZSTD_EndDirective const zed = (left==0) ? ZSTD_e_end : ZSTD_e_continue;
+ size_t r = ZSTD_compressStream2(s, &outbuf, &inbuf, zed);
+ if (ZSTD_isError(r)) {
+ return -EINVAL;
+ }
+ }
+ ceph_assert(p.end());
+
+ ZSTD_freeCStream(s);
+
+ // prefix with decompressed length
+ ceph::encode((uint32_t)src.length(), dst);
+ dst.append(outptr, 0, outbuf.pos);
+ return 0;
+ }
+
+ int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> compressor_message) override {
+ auto i = std::cbegin(src);
+ return decompress(i, src.length(), dst, compressor_message);
+ }
+
+ int decompress(ceph::buffer::list::const_iterator &p,
+ size_t compressed_len,
+ ceph::buffer::list &dst,
+ std::optional<int32_t> compressor_message) override {
+ if (compressed_len < 4) {
+ return -1;
+ }
+ compressed_len -= 4;
+ uint32_t dst_len;
+ ceph::decode(dst_len, p);
+
+ ceph::buffer::ptr dstptr(dst_len);
+ ZSTD_outBuffer_s outbuf;
+ outbuf.dst = dstptr.c_str();
+ outbuf.size = dstptr.length();
+ outbuf.pos = 0;
+ ZSTD_DStream *s = ZSTD_createDStream();
+ ZSTD_initDStream(s);
+ while (compressed_len > 0) {
+ if (p.end()) {
+ return -1;
+ }
+ ZSTD_inBuffer_s inbuf;
+ inbuf.pos = 0;
+ inbuf.size = p.get_ptr_and_advance(compressed_len,
+ (const char**)&inbuf.src);
+ ZSTD_decompressStream(s, &outbuf, &inbuf);
+ compressed_len -= inbuf.size;
+ }
+ ZSTD_freeDStream(s);
+
+ dst.append(dstptr, 0, outbuf.pos);
+ return 0;
+ }
+ private:
+ CephContext *const cct;
+};
+
+#endif