diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/compressor | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
28 files changed, 1980 insertions, 0 deletions
diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt new file mode 100644 index 000000000..3e99f8b73 --- /dev/null +++ b/src/compressor/CMakeLists.txt @@ -0,0 +1,51 @@ + +set(compressor_srcs + Compressor.cc) +if (HAVE_QATZIP) + list(APPEND compressor_srcs QatAccel.cc) +endif() +add_library(compressor_objs OBJECT ${compressor_srcs}) +add_dependencies(compressor_objs common-objs) +if(HAVE_QATZIP AND HAVE_QATDRV) + target_link_libraries(compressor_objs PRIVATE + QatDrv::qat_s + QatDrv::usdm_drv_s + qatzip::qatzip + ) +endif() +add_dependencies(compressor_objs legacy-option-headers) + +## compressor plugins + +set(compressor_plugin_dir ${CEPH_INSTALL_PKGLIBDIR}/compressor) + +add_subdirectory(snappy) +add_subdirectory(zlib) +add_subdirectory(zstd) + +if(HAVE_LZ4) + add_subdirectory(lz4) +endif() + +if(HAVE_BROTLI) + add_subdirectory(brotli) +endif() + +add_library(compressor STATIC $<TARGET_OBJECTS:compressor_objs>) +target_link_libraries(compressor PRIVATE compressor_objs) + +set(ceph_compressor_libs + ceph_snappy + ceph_zlib + ceph_zstd) + +if(HAVE_LZ4) + list(APPEND ceph_compressor_libs ceph_lz4) +endif() + +if(HAVE_BROTLI) + list(APPEND ceph_compressor_libs ceph_brotli) +endif() + +add_custom_target(compressor_plugins DEPENDS + ${ceph_compressor_libs}) diff --git a/src/compressor/CompressionPlugin.h b/src/compressor/CompressionPlugin.h new file mode 100644 index 000000000..2a21f2fef --- /dev/null +++ b/src/compressor/CompressionPlugin.h @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph distributed storage system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef COMPRESSION_PLUGIN_H +#define COMPRESSION_PLUGIN_H + +#include <iosfwd> +#include <iostream> + +#include "common/PluginRegistry.h" +#include "include/common_fwd.h" +#include "Compressor.h" + +namespace ceph { + + class CompressionPlugin : public Plugin { + public: + TOPNSPC::CompressorRef compressor; + + explicit CompressionPlugin(CephContext *cct) + : Plugin(cct) + {} + + ~CompressionPlugin() override {} + + virtual int factory(TOPNSPC::CompressorRef *cs, + std::ostream *ss) = 0; + + virtual const char* name() {return "CompressionPlugin";} + }; + +} + +#endif diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc new file mode 100644 index 000000000..43d34c8eb --- /dev/null +++ b/src/compressor/Compressor.cc @@ -0,0 +1,112 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <random> +#include <sstream> +#include <iterator> +#include <algorithm> + +#include "CompressionPlugin.h" +#include "Compressor.h" +#include "include/random.h" +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/dout.h" + +namespace TOPNSPC { + +#ifdef HAVE_QATZIP + QatAccel Compressor::qat_accel; +#endif + +const char* Compressor::get_comp_alg_name(int a) { + + auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms), + [a](const auto& kv) { return kv.second == a; }); + + if (std::cend(compression_algorithms) == p) + return "???"; // It would be nice to revise this... + + return p->first; +} + +std::optional<Compressor::CompressionAlgorithm> +Compressor::get_comp_alg_type(std::string_view s) { + + auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms), + [&s](const auto& kv) { return kv.first == s; }); + if (std::cend(compression_algorithms) == p) + return {}; + + return p->second; +} + +const char *Compressor::get_comp_mode_name(int m) { + switch (m) { + case COMP_NONE: return "none"; + case COMP_PASSIVE: return "passive"; + case COMP_AGGRESSIVE: return "aggressive"; + case COMP_FORCE: return "force"; + default: return "???"; + } +} +std::optional<Compressor::CompressionMode> +Compressor::get_comp_mode_type(std::string_view s) { + if (s == "force") + return COMP_FORCE; + if (s == "aggressive") + return COMP_AGGRESSIVE; + if (s == "passive") + return COMP_PASSIVE; + if (s == "none") + return COMP_NONE; + return {}; +} + +CompressorRef Compressor::create(CephContext *cct, const std::string &type) +{ + // support "random" for teuthology testing + if (type == "random") { + int alg = ceph::util::generate_random_number(0, COMP_ALG_LAST - 1); + if (alg == COMP_ALG_NONE) { + return nullptr; + } + return create(cct, alg); + } + + CompressorRef cs_impl = NULL; + std::stringstream ss; + auto reg = cct->get_plugin_registry(); + auto factory = dynamic_cast<ceph::CompressionPlugin*>(reg->get_with_load("compressor", type)); + if (factory == NULL) { + lderr(cct) << __func__ << " cannot load compressor of type " << type << dendl; + return NULL; + } + int err = factory->factory(&cs_impl, &ss); + if (err) + lderr(cct) << __func__ << " factory return error " << err << dendl; + return cs_impl; +} + +CompressorRef Compressor::create(CephContext *cct, int alg) +{ + if (alg < 0 || alg >= COMP_ALG_LAST) { + lderr(cct) << __func__ << " invalid algorithm value:" << alg << dendl; + return CompressorRef(); + } + std::string type_name = get_comp_alg_name(alg); + return create(cct, type_name); +} + +} // namespace TOPNSPC diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h new file mode 100644 index 000000000..276cd875a --- /dev/null +++ b/src/compressor/Compressor.h @@ -0,0 +1,109 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMPRESSOR_H +#define CEPH_COMPRESSOR_H + +#include <memory> +#include <optional> +#include <string> +#include <string_view> +#include "include/ceph_assert.h" // boost clobbers this +#include "include/common_fwd.h" +#include "include/buffer.h" +#include "include/int_types.h" +#ifdef HAVE_QATZIP + #include "QatAccel.h" +#endif + +namespace TOPNSPC { + +class Compressor; +typedef std::shared_ptr<Compressor> CompressorRef; + +class Compressor { +public: + enum CompressionAlgorithm { + COMP_ALG_NONE = 0, + COMP_ALG_SNAPPY = 1, + COMP_ALG_ZLIB = 2, + COMP_ALG_ZSTD = 3, +#ifdef HAVE_LZ4 + COMP_ALG_LZ4 = 4, +#endif +#ifdef HAVE_BROTLI + COMP_ALG_BROTLI = 5, +#endif + COMP_ALG_LAST //the last value for range checks + }; + + using pair_type = std::pair<const char*, CompressionAlgorithm>; + static constexpr std::initializer_list<pair_type> compression_algorithms { + { "none", COMP_ALG_NONE }, + { "snappy", COMP_ALG_SNAPPY }, + { "zlib", COMP_ALG_ZLIB }, + { "zstd", COMP_ALG_ZSTD }, +#ifdef HAVE_LZ4 + { "lz4", COMP_ALG_LZ4 }, +#endif +#ifdef HAVE_BROTLI + { "brotli", COMP_ALG_BROTLI }, +#endif + }; + + // compression options + enum CompressionMode { + COMP_NONE, ///< compress never + COMP_PASSIVE, ///< compress if hinted COMPRESSIBLE + COMP_AGGRESSIVE, ///< compress unless hinted INCOMPRESSIBLE + COMP_FORCE ///< compress always + }; + +#ifdef HAVE_QATZIP + bool qat_enabled; + static QatAccel qat_accel; +#endif + + static const char* get_comp_alg_name(int a); + static std::optional<CompressionAlgorithm> get_comp_alg_type(std::string_view s); + + static const char *get_comp_mode_name(int m); + static std::optional<CompressionMode> get_comp_mode_type(std::string_view s); + + Compressor(CompressionAlgorithm a, const char* t) : alg(a), type(t) { + } + virtual ~Compressor() {} + const std::string& get_type_name() const { + return type; + } + CompressionAlgorithm get_type() const { + return alg; + } + virtual int compress(const ceph::bufferlist &in, ceph::bufferlist &out, std::optional<int32_t> &compressor_message) = 0; + virtual int decompress(const ceph::bufferlist &in, ceph::bufferlist &out, std::optional<int32_t> compressor_message) = 0; + // this is a bit weird but we need non-const iterator to be in + // alignment with decode methods + virtual int decompress(ceph::bufferlist::const_iterator &p, size_t compressed_len, ceph::bufferlist &out, std::optional<int32_t> compressor_message) = 0; + + static CompressorRef create(CephContext *cct, const std::string &type); + static CompressorRef create(CephContext *cct, int alg); + +protected: + CompressionAlgorithm alg; + std::string type; + +}; + +} // namespace TOPNSPC +#endif diff --git a/src/compressor/QatAccel.cc b/src/compressor/QatAccel.cc new file mode 100644 index 000000000..561fcc996 --- /dev/null +++ b/src/compressor/QatAccel.cc @@ -0,0 +1,240 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Intel Corporation + * + * Author: Qiaowei Ren <qiaowei.ren@intel.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include <qatzip.h> + +#include "common/ceph_context.h" +#include "common/common_init.h" +#include "common/debug.h" +#include "common/dout.h" +#include "common/errno.h" +#include "QatAccel.h" + +// ----------------------------------------------------------------------------- +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_compressor +#undef dout_prefix +#define dout_prefix _prefix(_dout) + +static std::ostream& _prefix(std::ostream* _dout) +{ + return *_dout << "QatAccel: "; +} +// ----------------------------------------------------------------------------- +// default window size for Zlib 1.2.8, negated for raw deflate +#define ZLIB_DEFAULT_WIN_SIZE -15 + +/* Estimate data expansion after decompression */ +static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000}; + +void QzSessionDeleter::operator() (struct QzSession_S *session) { + qzTeardownSession(session); + delete session; +} + +static bool get_qz_params(const std::string &alg, QzSessionParams_T ¶ms) { + int rc; + rc = qzGetDefaults(¶ms); + if (rc != QZ_OK) + return false; + params.direction = QZ_DIR_BOTH; + params.is_busy_polling = true; + if (alg == "zlib") { + params.comp_algorithm = QZ_DEFLATE; + params.data_fmt = QZ_DEFLATE_RAW; + params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level; + } + else { + // later, there also has lz4. + return false; + } + + rc = qzSetDefaults(¶ms); + if (rc != QZ_OK) + return false; + return true; +} + +static bool setup_session(QatAccel::session_ptr &session, QzSessionParams_T ¶ms) { + int rc; + rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT); + if (rc != QZ_OK && rc != QZ_DUPLICATE) + return false; + rc = qzSetupSession(session.get(), ¶ms); + if (rc != QZ_OK) { + return false; + } + return true; +} + +// put the session back to the session pool in a RAII manner +struct cached_session_t { + cached_session_t(QatAccel* accel, QatAccel::session_ptr&& sess) + : accel{accel}, session{std::move(sess)} {} + + ~cached_session_t() { + std::scoped_lock lock{accel->mutex}; + // if the cache size is still under its upper bound, the current session is put into + // accel->sessions. otherwise it's released right + uint64_t sessions_num = g_ceph_context->_conf.get_val<uint64_t>("qat_compressor_session_max_number"); + if (accel->sessions.size() < sessions_num) { + accel->sessions.push_back(std::move(session)); + } + } + + struct QzSession_S* get() { + assert(static_cast<bool>(session)); + return session.get(); + } + + QatAccel* accel; + QatAccel::session_ptr session; +}; + +QatAccel::session_ptr QatAccel::get_session() { + { + std::scoped_lock lock{mutex}; + if (!sessions.empty()) { + auto session = std::move(sessions.back()); + sessions.pop_back(); + return session; + } + } + + // If there are no available session to use, we try allocate a new + // session. + QzSessionParams_T params = {(QzHuffmanHdr_T)0,}; + session_ptr session(new struct QzSession_S()); + memset(session.get(), 0, sizeof(struct QzSession_S)); + if (get_qz_params(alg_name, params) && setup_session(session, params)) { + return session; + } else { + return nullptr; + } +} + +QatAccel::QatAccel() {} + +QatAccel::~QatAccel() { + // First, we should uninitialize all QATzip session that disconnects all session + // from a hardware instance and deallocates buffers. + sessions.clear(); + // Then we close the connection with QAT. + // where the value of the parameter passed to qzClose() does not matter. as long as + // it is not nullptr. + qzClose((QzSession_T*)1); +} + +bool QatAccel::init(const std::string &alg) { + std::scoped_lock lock(mutex); + if (!alg_name.empty()) { + return true; + } + + dout(15) << "First use for QAT compressor" << dendl; + if (alg != "zlib") { + return false; + } + + alg_name = alg; + return true; +} + +int QatAccel::compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message) { + auto s = get_session(); // get a session from the pool + if (!s) { + return -1; // session initialization failed + } + auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction + compressor_message = ZLIB_DEFAULT_WIN_SIZE; + int begin = 1; + for (auto &i : in.buffers()) { + const unsigned char* c_in = (unsigned char*) i.c_str(); + unsigned int len = i.length(); + unsigned int out_len = qzMaxCompressedLength(len, session.get()) + begin; + + bufferptr ptr = buffer::create_small_page_aligned(out_len); + unsigned char* c_out = (unsigned char*)ptr.c_str() + begin; + int rc = qzCompress(session.get(), c_in, &len, c_out, &out_len, 1); + if (rc != QZ_OK) + return -1; + if (begin) { + // put a compressor variation mark in front of compressed stream, not used at the moment + ptr.c_str()[0] = 0; + out_len += begin; + begin = 0; + } + out.append(ptr, 0, out_len); + + } + + return 0; +} + +int QatAccel::decompress(const bufferlist &in, bufferlist &out, std::optional<int32_t> compressor_message) { + auto i = in.begin(); + return decompress(i, in.length(), out, compressor_message); +} + +int QatAccel::decompress(bufferlist::const_iterator &p, + size_t compressed_len, + bufferlist &dst, + std::optional<int32_t> compressor_message) { + auto s = get_session(); // get a session from the pool + if (!s) { + return -1; // session initialization failed + } + auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction + int begin = 1; + + int rc = 0; + bufferlist tmp; + size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len); + + while (remaining) { + unsigned int ratio_idx = 0; + const char* c_in = nullptr; + unsigned int len = p.get_ptr_and_advance(remaining, &c_in); + remaining -= len; + len -= begin; + c_in += begin; + begin = 0; + unsigned int out_len = QZ_HW_BUFF_SZ; + + bufferptr ptr; + do { + while (out_len <= len * expansion_ratio[ratio_idx]) { + out_len *= 2; + } + + ptr = buffer::create_small_page_aligned(out_len); + rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len); + ratio_idx++; + } while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio)); + + if (rc == QZ_OK) { + dst.append(ptr, 0, out_len); + } else if (rc == QZ_DATA_ERROR) { + dout(1) << "QAT compressor DATA ERROR" << dendl; + return -1; + } else if (rc == QZ_BUF_ERROR) { + dout(1) << "QAT compressor BUF ERROR" << dendl; + return -1; + } else if (rc != QZ_OK) { + dout(1) << "QAT compressor NOT OK" << dendl; + return -1; + } + } + + return 0; +} diff --git a/src/compressor/QatAccel.h b/src/compressor/QatAccel.h new file mode 100644 index 000000000..3533eff9b --- /dev/null +++ b/src/compressor/QatAccel.h @@ -0,0 +1,54 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Intel Corporation + * + * Author: Qiaowei Ren <qiaowei.ren@intel.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_QATACCEL_H +#define CEPH_QATACCEL_H + +#include <condition_variable> +#include <memory> +#include <mutex> +#include <optional> +#include <vector> + +#include "include/buffer.h" + +extern "C" struct QzSession_S; // typedef struct QzSession_S QzSession_T; + +struct QzSessionDeleter { + void operator() (struct QzSession_S *session); +}; + +class QatAccel { + public: + using session_ptr = std::unique_ptr<struct QzSession_S, QzSessionDeleter>; + QatAccel(); + ~QatAccel(); + + bool init(const std::string &alg); + + int compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message); + int decompress(const bufferlist &in, bufferlist &out, std::optional<int32_t> compressor_message); + int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst, std::optional<int32_t> compressor_message); + + private: + // get a session from the pool or create a new one. returns null if session init fails + session_ptr get_session(); + + friend struct cached_session_t; + std::vector<session_ptr> sessions; + std::mutex mutex; + std::string alg_name; +}; + +#endif diff --git a/src/compressor/brotli/BrotliCompressor.cc b/src/compressor/brotli/BrotliCompressor.cc new file mode 100644 index 000000000..ed4abef4b --- /dev/null +++ b/src/compressor/brotli/BrotliCompressor.cc @@ -0,0 +1,96 @@ +#include "brotli/encode.h" +#include "brotli/decode.h" +#include "BrotliCompressor.h" +#include "include/scope_guard.h" + +#define MAX_LEN (CEPH_PAGE_SIZE) + +int BrotliCompressor::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) +{ + BrotliEncoderState* s = BrotliEncoderCreateInstance(nullptr, + nullptr, + nullptr); + if (!s) { + return -1; + } + auto sg = make_scope_guard([&s] { BrotliEncoderDestroyInstance(s); }); + BrotliEncoderSetParameter(s, BROTLI_PARAM_QUALITY, (uint32_t)9); + BrotliEncoderSetParameter(s, BROTLI_PARAM_LGWIN, 22); + for (auto i = in.buffers().begin(); i != in.buffers().end();) { + size_t available_in = i->length(); + size_t max_comp_size = BrotliEncoderMaxCompressedSize(available_in); + size_t available_out = max_comp_size; + bufferptr ptr = buffer::create_small_page_aligned(max_comp_size); + uint8_t* next_out = (uint8_t*)ptr.c_str(); + const uint8_t* next_in = (uint8_t*)i->c_str(); + ++i; + BrotliEncoderOperation finish = i != in.buffers().end() ? + BROTLI_OPERATION_PROCESS : + BROTLI_OPERATION_FINISH; + do { + if (!BrotliEncoderCompressStream(s, + finish, + &available_in, + &next_in, + &available_out, + &next_out, + nullptr)) { + return -1; + } + unsigned have = max_comp_size - available_out; + out.append(ptr, 0, have); + } while (available_out == 0); + if (BrotliEncoderIsFinished(s)) { + break; + } + } + return 0; +} + +int BrotliCompressor::decompress(bufferlist::const_iterator &p, + size_t compressed_size, + bufferlist &out, + boost::optional<int32_t> compressor_message) +{ + BrotliDecoderState* s = BrotliDecoderCreateInstance(nullptr, + nullptr, + nullptr); + if (!s) { + return -1; + } + auto sg = make_scope_guard([&s] { BrotliDecoderDestroyInstance(s); }); + size_t remaining = std::min<size_t>(p.get_remaining(), compressed_size); + while (remaining) { + const uint8_t* next_in; + size_t len = p.get_ptr_and_advance(remaining, (const char**)&next_in); + remaining -= len; + size_t available_in = len; + BrotliDecoderResult result = BROTLI_DECODER_RESULT_ERROR; + do { + size_t available_out = MAX_LEN; + bufferptr ptr = buffer::create_page_aligned(MAX_LEN); + uint8_t* next_out = (uint8_t*)ptr.c_str(); + result = BrotliDecoderDecompressStream(s, + &available_in, + &next_in, + &available_out, + &next_out, + 0); + if (!result) { + return -1; + } + unsigned have = MAX_LEN - available_out; + out.append(ptr, 0, have); + } while (result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT); + if (BrotliDecoderIsFinished(s)) { + break; + } + } + return 0; +} + +int BrotliCompressor::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) +{ + auto i = std::cbegin(in); + return decompress(i, in.length(), out, compressor_message); +} diff --git a/src/compressor/brotli/BrotliCompressor.h b/src/compressor/brotli/BrotliCompressor.h new file mode 100644 index 000000000..373300645 --- /dev/null +++ b/src/compressor/brotli/BrotliCompressor.h @@ -0,0 +1,31 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 BI SHUN KE <aionshun@livemail.tw> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_BROTLICOMPRESSOR_H +#define CEPH_BROTLICOMPRESSOR_H + + +#include "include/buffer.h" +#include "compressor/Compressor.h" + +class BrotliCompressor : public Compressor +{ + public: + BrotliCompressor() : Compressor(COMP_ALG_BROTLI, "brotli") {} + + int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) override; + int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) override; + int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out, boost::optional<int32_t> compressor_message) override; +}; + +#endif //CEPH_BROTLICOMPRESSOR_H + diff --git a/src/compressor/brotli/CMakeLists.txt b/src/compressor/brotli/CMakeLists.txt new file mode 100644 index 000000000..31a376289 --- /dev/null +++ b/src/compressor/brotli/CMakeLists.txt @@ -0,0 +1,34 @@ +# brotli + +set(brotli_sources + CompressionPluginBrotli.cc + BrotliCompressor.cc +) +include(ExternalProject) +ExternalProject_Add(brotli_ext + DOWNLOAD_DIR ${CMAKE_BINARY_DIR}/src/ + GIT_REPOSITORY "https://github.com/google/brotli.git" + GIT_TAG "v1.0.7" + GIT_SHALLOW TRUE + SOURCE_DIR ${CMAKE_BINARY_DIR}/src/brotli + CONFIGURE_COMMAND ./configure-cmake --disable-debug + INSTALL_COMMAND "" + BUILD_COMMAND $(MAKE) + BUILD_IN_SOURCE 1 + INSTALL_COMMAND "") + +set(brotli_libs enc dec common) +file(MAKE_DIRECTORY "${CMAKE_BINARY_DIR}/src/brotli/c/include") +foreach(lib ${brotli_libs}) + add_library(brotli::${lib} STATIC IMPORTED) + add_dependencies(brotli::${lib} brotli_ext) + set_target_properties(brotli::${lib} PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_BINARY_DIR}/src/brotli/c/include" + IMPORTED_LOCATION "${CMAKE_BINARY_DIR}/src/brotli/libbrotli${lib}-static.a") + list(APPEND BROTLI_LIBRARIES brotli::${lib}) +endforeach() + +add_library(ceph_brotli SHARED ${brotli_sources}) +list(REVERSE brotli_libs) +target_link_libraries(ceph_brotli PRIVATE ${BROTLI_LIBRARIES}) +install(TARGETS ceph_brotli DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/brotli/CompressionPluginBrotli.cc b/src/compressor/brotli/CompressionPluginBrotli.cc new file mode 100644 index 000000000..245f49dbb --- /dev/null +++ b/src/compressor/brotli/CompressionPluginBrotli.cc @@ -0,0 +1,19 @@ +#include "acconfig.h" +#include "ceph_ver.h" +#include "CompressionPluginBrotli.h" +#include "common/ceph_context.h" + + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + PluginRegistry *instance = cct->get_plugin_registry(); + return instance->add(type, name, new CompressionPluginBrotli(cct)); +} + diff --git a/src/compressor/brotli/CompressionPluginBrotli.h b/src/compressor/brotli/CompressionPluginBrotli.h new file mode 100644 index 000000000..641a6e1c9 --- /dev/null +++ b/src/compressor/brotli/CompressionPluginBrotli.h @@ -0,0 +1,36 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 BI SHUN KE <aionshun@livemail.tw> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_BROTLI_H +#define CEPH_COMPRESSION_PLUGIN_BROTLI_H + +#include "ceph_ver.h" +#include "compressor/CompressionPlugin.h" +#include "BrotliCompressor.h" + +class CompressionPluginBrotli : public CompressionPlugin { +public: + explicit CompressionPluginBrotli(CephContext *cct) : CompressionPlugin(cct) + {} + + virtual int factory(CompressorRef *cs, std::ostream *ss) + { + if (compressor == nullptr) { + BrotliCompressor *interface = new BrotliCompressor(); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/lz4/CMakeLists.txt b/src/compressor/lz4/CMakeLists.txt new file mode 100644 index 000000000..ff8e14c29 --- /dev/null +++ b/src/compressor/lz4/CMakeLists.txt @@ -0,0 +1,14 @@ +# lz4 + +set(lz4_sources + CompressionPluginLZ4.cc +) + +add_library(ceph_lz4 SHARED ${lz4_sources}) +target_link_libraries(ceph_lz4 + PRIVATE LZ4::LZ4 compressor $<$<PLATFORM_ID:Windows>:ceph-common>) +set_target_properties(ceph_lz4 PROPERTIES + VERSION 2.0.0 + SOVERSION 2 + INSTALL_RPATH "") +install(TARGETS ceph_lz4 DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/lz4/CompressionPluginLZ4.cc b/src/compressor/lz4/CompressionPluginLZ4.cc new file mode 100644 index 000000000..0d05500e4 --- /dev/null +++ b/src/compressor/lz4/CompressionPluginLZ4.cc @@ -0,0 +1,36 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 XSKY Inc. + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#include "acconfig.h" +#include "ceph_ver.h" +#include "common/ceph_context.h" +#include "CompressionPluginLZ4.h" + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + auto instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginLZ4(cct)); +} diff --git a/src/compressor/lz4/CompressionPluginLZ4.h b/src/compressor/lz4/CompressionPluginLZ4.h new file mode 100644 index 000000000..38e5b9cc5 --- /dev/null +++ b/src/compressor/lz4/CompressionPluginLZ4.h @@ -0,0 +1,41 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 XSKY Inc. + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_LZ4_H +#define CEPH_COMPRESSION_PLUGIN_LZ4_H + +// ----------------------------------------------------------------------------- +#include "ceph_ver.h" +#include "compressor/CompressionPlugin.h" +#include "LZ4Compressor.h" +// ----------------------------------------------------------------------------- + +class CompressionPluginLZ4 : public ceph::CompressionPlugin { + +public: + + explicit CompressionPluginLZ4(CephContext* cct) : CompressionPlugin(cct) + {} + + int factory(CompressorRef *cs, std::ostream *ss) override { + if (compressor == 0) { + LZ4Compressor *interface = new LZ4Compressor(cct); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/lz4/LZ4Compressor.h b/src/compressor/lz4/LZ4Compressor.h new file mode 100644 index 000000000..eca08e1a5 --- /dev/null +++ b/src/compressor/lz4/LZ4Compressor.h @@ -0,0 +1,147 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_LZ4COMPRESSOR_H +#define CEPH_LZ4COMPRESSOR_H + +#include <optional> +#include <lz4.h> + +#include "compressor/Compressor.h" +#include "include/buffer.h" +#include "include/encoding.h" +#include "common/config.h" + + +class LZ4Compressor : public Compressor { + public: + LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") { +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4")) + qat_enabled = true; + else + qat_enabled = false; +#endif + } + + int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> &compressor_message) override { + // older versions of liblz4 introduce bit errors when compressing + // fragmented buffers. this was fixed in lz4 commit + // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first + // appeared in v1.8.2. + // + // workaround: rebuild if not contiguous. + if (!src.is_contiguous()) { + ceph::buffer::list new_src = src; + new_src.rebuild(); + return compress(new_src, dst, compressor_message); + } + +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.compress(src, dst, compressor_message); +#endif + ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned( + LZ4_compressBound(src.length())); + LZ4_stream_t lz4_stream; + LZ4_resetStream(&lz4_stream); + + using ceph::encode; + + auto p = src.begin(); + size_t left = src.length(); + int pos = 0; + const char *data; + unsigned num = src.get_num_buffers(); + encode((uint32_t)num, dst); + while (left) { + uint32_t origin_len = p.get_ptr_and_advance(left, &data); + int compressed_len = LZ4_compress_fast_continue( + &lz4_stream, data, outptr.c_str()+pos, origin_len, + outptr.length()-pos, 1); + if (compressed_len <= 0) + return -1; + pos += compressed_len; + left -= origin_len; + encode(origin_len, dst); + encode((uint32_t)compressed_len, dst); + } + ceph_assert(p.end()); + + dst.append(outptr, 0, pos); + return 0; + } + + int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> compressor_message) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(src, dst, compressor_message); +#endif + auto i = std::cbegin(src); + return decompress(i, src.length(), dst, compressor_message); + } + + int decompress(ceph::buffer::list::const_iterator &p, + size_t compressed_len, + ceph::buffer::list &dst, + std::optional<int32_t> compressor_message) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(p, compressed_len, dst, compressor_message); +#endif + using ceph::decode; + uint32_t count; + decode(count, p); + std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs(count); + uint32_t total_origin = 0; + for (auto& [dst_size, src_size] : compressed_pairs) { + decode(dst_size, p); + decode(src_size, p); + total_origin += dst_size; + } + compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2); + + ceph::buffer::ptr dstptr(total_origin); + LZ4_streamDecode_t lz4_stream_decode; + LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0); + + ceph::buffer::ptr cur_ptr = p.get_current_ptr(); + ceph::buffer::ptr *ptr = &cur_ptr; + std::optional<ceph::buffer::ptr> data_holder; + if (compressed_len != cur_ptr.length()) { + data_holder.emplace(compressed_len); + p.copy_deep(compressed_len, *data_holder); + ptr = &*data_holder; + } + + char *c_in = ptr->c_str(); + char *c_out = dstptr.c_str(); + for (unsigned i = 0; i < count; ++i) { + int r = LZ4_decompress_safe_continue( + &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first); + if (r == (int)compressed_pairs[i].first) { + c_in += compressed_pairs[i].second; + c_out += compressed_pairs[i].first; + } else if (r < 0) { + return -1; + } else { + return -2; + } + } + dst.push_back(std::move(dstptr)); + return 0; + } +}; + +#endif diff --git a/src/compressor/snappy/CMakeLists.txt b/src/compressor/snappy/CMakeLists.txt new file mode 100644 index 000000000..d1ba3b2e7 --- /dev/null +++ b/src/compressor/snappy/CMakeLists.txt @@ -0,0 +1,14 @@ +# snappy + +set(snappy_sources + CompressionPluginSnappy.cc +) + +add_library(ceph_snappy SHARED ${snappy_sources}) +target_link_libraries(ceph_snappy + PRIVATE snappy::snappy compressor $<$<PLATFORM_ID:Windows>:ceph-common>) +set_target_properties(ceph_snappy PROPERTIES + VERSION 2.0.0 + SOVERSION 2 + INSTALL_RPATH "") +install(TARGETS ceph_snappy DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/snappy/CompressionPluginSnappy.cc b/src/compressor/snappy/CompressionPluginSnappy.cc new file mode 100644 index 000000000..85b6cf94a --- /dev/null +++ b/src/compressor/snappy/CompressionPluginSnappy.cc @@ -0,0 +1,38 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + + +// ----------------------------------------------------------------------------- +#include "acconfig.h" +#include "ceph_ver.h" +#include "common/ceph_context.h" +#include "CompressionPluginSnappy.h" + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + auto instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginSnappy(cct)); +} diff --git a/src/compressor/snappy/CompressionPluginSnappy.h b/src/compressor/snappy/CompressionPluginSnappy.h new file mode 100644 index 000000000..a9bc98f73 --- /dev/null +++ b/src/compressor/snappy/CompressionPluginSnappy.h @@ -0,0 +1,42 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_SNAPPY_H +#define CEPH_COMPRESSION_PLUGIN_SNAPPY_H + +// ----------------------------------------------------------------------------- +#include "compressor/CompressionPlugin.h" +#include "SnappyCompressor.h" +// ----------------------------------------------------------------------------- + +class CompressionPluginSnappy : public ceph::CompressionPlugin { + +public: + + explicit CompressionPluginSnappy(CephContext* cct) : CompressionPlugin(cct) + {} + + int factory(CompressorRef *cs, + std::ostream *ss) override + { + if (compressor == 0) { + SnappyCompressor *interface = new SnappyCompressor(cct); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/snappy/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h new file mode 100644 index 000000000..8150f783c --- /dev/null +++ b/src/compressor/snappy/SnappyCompressor.h @@ -0,0 +1,116 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_SNAPPYCOMPRESSOR_H +#define CEPH_SNAPPYCOMPRESSOR_H + +#include <snappy.h> +#include <snappy-sinksource.h> +#include "common/config.h" +#include "compressor/Compressor.h" +#include "include/buffer.h" + +class CEPH_BUFFER_API BufferlistSource : public snappy::Source { + ceph::bufferlist::const_iterator pb; + size_t remaining; + + public: + explicit BufferlistSource(ceph::bufferlist::const_iterator _pb, size_t _input_len) + : pb(_pb), + remaining(_input_len) { + remaining = std::min(remaining, (size_t)pb.get_remaining()); + } + size_t Available() const override { + return remaining; + } + const char *Peek(size_t *len) override { + const char *data = NULL; + *len = 0; + size_t avail = Available(); + if (avail) { + auto ptmp = pb; + *len = ptmp.get_ptr_and_advance(avail, &data); + } + return data; + } + void Skip(size_t n) override { + ceph_assert(n <= remaining); + pb += n; + remaining -= n; + } + + ceph::bufferlist::const_iterator get_pos() const { + return pb; + } +}; + +class SnappyCompressor : public Compressor { + public: + SnappyCompressor(CephContext* cct) : Compressor(COMP_ALG_SNAPPY, "snappy") { +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("snappy")) + qat_enabled = true; + else + qat_enabled = false; +#endif + } + + int compress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional<int32_t> &compressor_message) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.compress(src, dst, compressor_message); +#endif + BufferlistSource source(const_cast<ceph::bufferlist&>(src).begin(), src.length()); + ceph::bufferptr ptr = ceph::buffer::create_small_page_aligned( + snappy::MaxCompressedLength(src.length())); + snappy::UncheckedByteArraySink sink(ptr.c_str()); + snappy::Compress(&source, &sink); + dst.append(ptr, 0, sink.CurrentDestination() - ptr.c_str()); + return 0; + } + + int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional<int32_t> compressor_message) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(src, dst, compressor_message); +#endif + auto i = src.begin(); + return decompress(i, src.length(), dst, compressor_message); + } + + int decompress(ceph::bufferlist::const_iterator &p, + size_t compressed_len, + ceph::bufferlist &dst, + std::optional<int32_t> compressor_message) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(p, compressed_len, dst, compressor_message); +#endif + BufferlistSource source_1(p, compressed_len); + uint32_t res_len = 0; + if (!snappy::GetUncompressedLength(&source_1, &res_len)) { + return -1; + } + BufferlistSource source_2(p, compressed_len); + ceph::bufferptr ptr(res_len); + if (snappy::RawUncompress(&source_2, ptr.c_str())) { + p = source_2.get_pos(); + dst.append(ptr); + return 0; + } + return -2; + } +}; + +#endif diff --git a/src/compressor/zlib/CMakeLists.txt b/src/compressor/zlib/CMakeLists.txt new file mode 100644 index 000000000..050ff03fa --- /dev/null +++ b/src/compressor/zlib/CMakeLists.txt @@ -0,0 +1,99 @@ +# zlib + +if(HAVE_INTEL_SSE4_1 AND HAVE_NASM_X64_AVX2 AND (NOT APPLE)) + set(CMAKE_ASM_FLAGS "-i ${PROJECT_SOURCE_DIR}/src/isa-l/igzip/ -i ${PROJECT_SOURCE_DIR}/src/isa-l/include/ ${CMAKE_ASM_FLAGS}") + set(zlib_sources + CompressionPluginZlib.cc + ZlibCompressor.cc + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/hufftables_c.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/flatten_ll.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_inflate.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/huff_codes.c + ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base_aliases.c + ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc64_base.c + ) + list(APPEND zlib_sources + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_body.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_finish.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body_h1_gr_bt.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_finish.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/rfc1951_lookup.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_sse.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_avx2_4.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_multibinary.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_update_histogram_01.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_update_histogram_04.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_decode_block_stateless_01.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_decode_block_stateless_04.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_inflate_multibinary.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df_04.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df_06.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/proc_heap.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_deflate_hash.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_gen_icf_map_lh1_06.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_gen_icf_map_lh1_04.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_set_long_icf_fg_04.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_set_long_icf_fg_06.asm + ) +elseif(HAVE_ARMV8_SIMD) + set(zlib_asm_sources + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_inflate_multibinary_arm64.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_multibinary_arm64.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_deflate_body_aarch64.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_deflate_finish_aarch64.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/isal_deflate_icf_body_hash_hist.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/isal_deflate_icf_finish_hash_hist.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_set_long_icf_fg.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/isal_update_histogram.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_deflate_hash_aarch64.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_decode_huffman_code_block_aarch64.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_isal_adler32_neon.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/encode_df.S + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/gen_icf_map.S + ) + set(zlib_sources + CompressionPluginZlib.cc + ZlibCompressor.cc + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/hufftables_c.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/adler32_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/flatten_ll.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_inflate.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/huff_codes.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/proc_heap_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/aarch64/igzip_multibinary_aarch64_dispatcher.c + ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base_aliases.c + ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/crc/crc64_base.c + ${zlib_asm_sources} + ) + set_source_files_properties(${zlib_asm_sources} PROPERTIES + COMPILE_DEFINITIONS "__ASSEMBLY__" + INCLUDE_DIRECTORIES "${PROJECT_SOURCE_DIR}/src/isa-l/igzip;${PROJECT_SOURCE_DIR}/src/isa-l/igzip/aarch64" + ) +else() + set(zlib_sources + CompressionPluginZlib.cc + ZlibCompressor.cc + ) +endif() + +add_library(ceph_zlib SHARED ${zlib_sources}) +target_link_libraries(ceph_zlib ZLIB::ZLIB compressor $<$<PLATFORM_ID:Windows>:ceph-common>) +target_include_directories(ceph_zlib SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/isa-l/include") +set_target_properties(ceph_zlib PROPERTIES + VERSION 2.0.0 + SOVERSION 2 + INSTALL_RPATH "") +install(TARGETS ceph_zlib DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/zlib/CompressionPluginZlib.cc b/src/compressor/zlib/CompressionPluginZlib.cc new file mode 100644 index 000000000..c0a53fa77 --- /dev/null +++ b/src/compressor/zlib/CompressionPluginZlib.cc @@ -0,0 +1,38 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + + +// ----------------------------------------------------------------------------- +#include "acconfig.h" +#include "ceph_ver.h" +#include "common/ceph_context.h" +#include "CompressionPluginZlib.h" + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + auto instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginZlib(cct)); +} diff --git a/src/compressor/zlib/CompressionPluginZlib.h b/src/compressor/zlib/CompressionPluginZlib.h new file mode 100644 index 000000000..597bc02a5 --- /dev/null +++ b/src/compressor/zlib/CompressionPluginZlib.h @@ -0,0 +1,60 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_ZLIB_H +#define CEPH_COMPRESSION_PLUGIN_ZLIB_H + +// ----------------------------------------------------------------------------- +#include "arch/probe.h" +#include "arch/intel.h" +#include "arch/arm.h" +#include "common/ceph_context.h" +#include "compressor/CompressionPlugin.h" +#include "ZlibCompressor.h" + +// ----------------------------------------------------------------------------- + +class CompressionPluginZlib : public ceph::CompressionPlugin { +public: + bool has_isal = false; + + explicit CompressionPluginZlib(CephContext *cct) : CompressionPlugin(cct) + {} + + int factory(CompressorRef *cs, + std::ostream *ss) override + { + bool isal = false; +#if defined(__i386__) || defined(__x86_64__) + // other arches or lack of support result in isal = false + if (cct->_conf->compressor_zlib_isal) { + ceph_arch_probe(); + isal = (ceph_arch_intel_pclmul && ceph_arch_intel_sse41); + } +#elif defined(__aarch64__) + if (cct->_conf->compressor_zlib_isal) { + ceph_arch_probe(); + isal = (ceph_arch_aarch64_pmull && ceph_arch_neon); + } +#endif + if (compressor == 0 || has_isal != isal) { + compressor = std::make_shared<ZlibCompressor>(cct, isal); + has_isal = isal; + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/zlib/ZlibCompressor.cc b/src/compressor/zlib/ZlibCompressor.cc new file mode 100644 index 000000000..9795d79b3 --- /dev/null +++ b/src/compressor/zlib/ZlibCompressor.cc @@ -0,0 +1,252 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +// ----------------------------------------------------------------------------- +#include "common/debug.h" +#include "ZlibCompressor.h" +#include "osd/osd_types.h" +#include "isa-l/include/igzip_lib.h" +// ----------------------------------------------------------------------------- + +#include <zlib.h> + +// ----------------------------------------------------------------------------- +#define dout_context cct +#define dout_subsys ceph_subsys_compressor +#undef dout_prefix +#define dout_prefix _prefix(_dout) +// ----------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------- + +using std::ostream; + +using ceph::bufferlist; +using ceph::bufferptr; + +static ostream& +_prefix(std::ostream* _dout) +{ + return *_dout << "ZlibCompressor: "; +} +// ----------------------------------------------------------------------------- + +#define MAX_LEN (CEPH_PAGE_SIZE) + +// default window size for Zlib 1.2.8, negated for raw deflate +#define ZLIB_DEFAULT_WIN_SIZE -15 + +// desired memory usage level. increasing to 9 doesn't speed things up +// significantly (helps only on >=16K blocks) and sometimes degrades +// compression ratio. +#define ZLIB_MEMORY_LEVEL 8 + +int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message) +{ + int ret; + unsigned have; + z_stream strm; + unsigned char* c_in; + int begin = 1; + + /* allocate deflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + ret = deflateInit2(&strm, cct->_conf->compressor_zlib_level, Z_DEFLATED, cct->_conf->compressor_zlib_winsize, ZLIB_MEMORY_LEVEL, Z_DEFAULT_STRATEGY); + if (ret != Z_OK) { + dout(1) << "Compression init error: init return " + << ret << " instead of Z_OK" << dendl; + return -1; + } + compressor_message = cct->_conf->compressor_zlib_winsize; + + for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin(); + i != in.buffers().end();) { + + c_in = (unsigned char*) (*i).c_str(); + long unsigned int len = (*i).length(); + ++i; + + strm.avail_in = len; + int flush = i != in.buffers().end() ? Z_NO_FLUSH : Z_FINISH; + + strm.next_in = c_in; + do { + bufferptr ptr = ceph::buffer::create_page_aligned(MAX_LEN); + strm.next_out = (unsigned char*)ptr.c_str() + begin; + strm.avail_out = MAX_LEN - begin; + if (begin) { + // put a compressor variation mark in front of compressed stream, not used at the moment + ptr.c_str()[0] = 0; + begin = 0; + } + ret = deflate(&strm, flush); /* no bad return value */ + if (ret == Z_STREAM_ERROR) { + dout(1) << "Compression error: compress return Z_STREAM_ERROR(" + << ret << ")" << dendl; + deflateEnd(&strm); + return -1; + } + have = MAX_LEN - strm.avail_out; + out.append(ptr, 0, have); + } while (strm.avail_out == 0); + if (strm.avail_in != 0) { + dout(10) << "Compression error: unused input" << dendl; + deflateEnd(&strm); + return -1; + } + } + + deflateEnd(&strm); + return 0; +} + +#if (__x86_64__ && defined(HAVE_NASM_X64_AVX2)) || defined(__aarch64__) +int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message) +{ + int ret; + unsigned have; + isal_zstream strm; + unsigned char* c_in; + int begin = 1; + + /* allocate deflate state */ + isal_deflate_init(&strm); + strm.end_of_stream = 0; + compressor_message = ZLIB_DEFAULT_WIN_SIZE; + + for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin(); + i != in.buffers().end();) { + + c_in = (unsigned char*) (*i).c_str(); + long unsigned int len = (*i).length(); + ++i; + + strm.avail_in = len; + strm.end_of_stream = (i == in.buffers().end()); + strm.flush = FINISH_FLUSH; + + strm.next_in = c_in; + + do { + bufferptr ptr = ceph::buffer::create_page_aligned(MAX_LEN); + strm.next_out = (unsigned char*)ptr.c_str() + begin; + strm.avail_out = MAX_LEN - begin; + if (begin) { + // put a compressor variation mark in front of compressed stream, not used at the moment + ptr.c_str()[0] = 1; + begin = 0; + } + ret = isal_deflate(&strm); + if (ret != COMP_OK) { + dout(1) << "Compression error: isal_deflate return error (" + << ret << ")" << dendl; + return -1; + } + have = MAX_LEN - strm.avail_out; + out.append(ptr, 0, have); + } while (strm.avail_out == 0); + if (strm.avail_in != 0) { + dout(10) << "Compression error: unused input" << dendl; + return -1; + } + } + + return 0; +} +#endif + +int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message) +{ +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.compress(in, out, compressor_message); +#endif +#if (__x86_64__ && defined(HAVE_NASM_X64_AVX2)) || defined(__aarch64__) + if (isal_enabled) + return isal_compress(in, out, compressor_message); + else + return zlib_compress(in, out, compressor_message); +#else + return zlib_compress(in, out, compressor_message); +#endif +} + +int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, std::optional<int32_t> compressor_message) +{ +#ifdef HAVE_QATZIP + // QAT can only decompress with the default window size + if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE)) + return qat_accel.decompress(p, compressed_size, out, compressor_message); +#endif + + int ret; + unsigned have; + z_stream strm; + const char* c_in; + int begin = 1; + + /* allocate inflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + strm.avail_in = 0; + strm.next_in = Z_NULL; + + // choose the variation of compressor + if (!compressor_message) + compressor_message = ZLIB_DEFAULT_WIN_SIZE; + ret = inflateInit2(&strm, *compressor_message); + if (ret != Z_OK) { + dout(1) << "Decompression init error: init return " + << ret << " instead of Z_OK" << dendl; + return -1; + } + + size_t remaining = std::min<size_t>(p.get_remaining(), compressed_size); + + while(remaining) { + long unsigned int len = p.get_ptr_and_advance(remaining, &c_in); + remaining -= len; + strm.avail_in = len - begin; + strm.next_in = (unsigned char*)c_in + begin; + begin = 0; + + do { + strm.avail_out = MAX_LEN; + bufferptr ptr = ceph::buffer::create_page_aligned(MAX_LEN); + strm.next_out = (unsigned char*)ptr.c_str(); + ret = inflate(&strm, Z_NO_FLUSH); + if (ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR) { + dout(1) << "Decompression error: decompress return " + << ret << dendl; + inflateEnd(&strm); + return -1; + } + have = MAX_LEN - strm.avail_out; + out.append(ptr, 0, have); + } while (strm.avail_out == 0); + } + + /* clean up and return */ + (void)inflateEnd(&strm); + return 0; +} + +int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out, std::optional<int32_t> compressor_message) +{ + auto i = std::cbegin(in); + return decompress(i, in.length(), out, compressor_message); +} diff --git a/src/compressor/zlib/ZlibCompressor.h b/src/compressor/zlib/ZlibCompressor.h new file mode 100644 index 000000000..da1c8117e --- /dev/null +++ b/src/compressor/zlib/ZlibCompressor.h @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_ZLIB_H +#define CEPH_COMPRESSION_ZLIB_H + +#include "common/config.h" +#include "compressor/Compressor.h" + +class ZlibCompressor : public Compressor { + bool isal_enabled; + CephContext *const cct; +public: + ZlibCompressor(CephContext *cct, bool isal) + : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) { +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib")) + qat_enabled = true; + else + qat_enabled = false; +#endif + } + + int compress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> &compressor_message) override; + int decompress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> compressor_message) override; + int decompress(ceph::buffer::list::const_iterator &p, size_t compressed_len, ceph::buffer::list &out, std::optional<int32_t> compressor_message) override; +private: + int zlib_compress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> &compressor_message); + int isal_compress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> &compressor_message); + }; + + +#endif diff --git a/src/compressor/zstd/CMakeLists.txt b/src/compressor/zstd/CMakeLists.txt new file mode 100644 index 000000000..5a522840a --- /dev/null +++ b/src/compressor/zstd/CMakeLists.txt @@ -0,0 +1,21 @@ +# zstd + +option(WITH_SYSTEM_ZSTD "use prebuilt libzstd in system" OFF) + +if(WITH_SYSTEM_ZSTD) + find_package(Zstd 1.4.4 REQUIRED) +else() + include(BuildZstd) + build_Zstd() +endif() + +set(zstd_sources + CompressionPluginZstd.cc) + +add_library(ceph_zstd SHARED ${zstd_sources}) +target_link_libraries(ceph_zstd PRIVATE Zstd::Zstd $<$<PLATFORM_ID:Windows>:ceph-common>) +set_target_properties(ceph_zstd PROPERTIES + VERSION 2.0.0 + SOVERSION 2 + INSTALL_RPATH "") +install(TARGETS ceph_zstd DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/zstd/CompressionPluginZstd.cc b/src/compressor/zstd/CompressionPluginZstd.cc new file mode 100644 index 000000000..92aee7343 --- /dev/null +++ b/src/compressor/zstd/CompressionPluginZstd.cc @@ -0,0 +1,36 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#include "acconfig.h" +#include "ceph_ver.h" +#include "common/ceph_context.h" +#include "CompressionPluginZstd.h" + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + auto instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginZstd(cct)); +} diff --git a/src/compressor/zstd/CompressionPluginZstd.h b/src/compressor/zstd/CompressionPluginZstd.h new file mode 100644 index 000000000..632e7c6dc --- /dev/null +++ b/src/compressor/zstd/CompressionPluginZstd.h @@ -0,0 +1,43 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_ZSTD_H +#define CEPH_COMPRESSION_PLUGIN_ZSTD_H + +// ----------------------------------------------------------------------------- +#include "ceph_ver.h" +#include "compressor/CompressionPlugin.h" +#include "ZstdCompressor.h" +// ----------------------------------------------------------------------------- + +class CompressionPluginZstd : public ceph::CompressionPlugin { + +public: + + explicit CompressionPluginZstd(CephContext* cct) : CompressionPlugin(cct) + {} + + int factory(CompressorRef *cs, + std::ostream *ss) override + { + if (compressor == 0) { + ZstdCompressor *interface = new ZstdCompressor(cct); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/zstd/ZstdCompressor.h b/src/compressor/zstd/ZstdCompressor.h new file mode 100644 index 000000000..859bd6b57 --- /dev/null +++ b/src/compressor/zstd/ZstdCompressor.h @@ -0,0 +1,107 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ZSTDCOMPRESSOR_H +#define CEPH_ZSTDCOMPRESSOR_H + +#define ZSTD_STATIC_LINKING_ONLY +#include "zstd/lib/zstd.h" + +#include "include/buffer.h" +#include "include/encoding.h" +#include "compressor/Compressor.h" + +class ZstdCompressor : public Compressor { + public: + ZstdCompressor(CephContext *cct) : Compressor(COMP_ALG_ZSTD, "zstd"), cct(cct) {} + + int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> &compressor_message) override { + ZSTD_CStream *s = ZSTD_createCStream(); + ZSTD_initCStream_srcSize(s, cct->_conf->compressor_zstd_level, src.length()); + auto p = src.begin(); + size_t left = src.length(); + + size_t const out_max = ZSTD_compressBound(left); + ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(out_max); + ZSTD_outBuffer_s outbuf; + outbuf.dst = outptr.c_str(); + outbuf.size = outptr.length(); + outbuf.pos = 0; + + while (left) { + ceph_assert(!p.end()); + struct ZSTD_inBuffer_s inbuf; + inbuf.pos = 0; + inbuf.size = p.get_ptr_and_advance(left, (const char**)&inbuf.src); + left -= inbuf.size; + ZSTD_EndDirective const zed = (left==0) ? ZSTD_e_end : ZSTD_e_continue; + size_t r = ZSTD_compressStream2(s, &outbuf, &inbuf, zed); + if (ZSTD_isError(r)) { + return -EINVAL; + } + } + ceph_assert(p.end()); + + ZSTD_freeCStream(s); + + // prefix with decompressed length + ceph::encode((uint32_t)src.length(), dst); + dst.append(outptr, 0, outbuf.pos); + return 0; + } + + int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> compressor_message) override { + auto i = std::cbegin(src); + return decompress(i, src.length(), dst, compressor_message); + } + + int decompress(ceph::buffer::list::const_iterator &p, + size_t compressed_len, + ceph::buffer::list &dst, + std::optional<int32_t> compressor_message) override { + if (compressed_len < 4) { + return -1; + } + compressed_len -= 4; + uint32_t dst_len; + ceph::decode(dst_len, p); + + ceph::buffer::ptr dstptr(dst_len); + ZSTD_outBuffer_s outbuf; + outbuf.dst = dstptr.c_str(); + outbuf.size = dstptr.length(); + outbuf.pos = 0; + ZSTD_DStream *s = ZSTD_createDStream(); + ZSTD_initDStream(s); + while (compressed_len > 0) { + if (p.end()) { + return -1; + } + ZSTD_inBuffer_s inbuf; + inbuf.pos = 0; + inbuf.size = p.get_ptr_and_advance(compressed_len, + (const char**)&inbuf.src); + ZSTD_decompressStream(s, &outbuf, &inbuf); + compressed_len -= inbuf.size; + } + ZSTD_freeDStream(s); + + dst.append(dstptr, 0, outbuf.pos); + return 0; + } + private: + CephContext *const cct; +}; + +#endif |