diff options
Diffstat (limited to 'src/compressor')
28 files changed, 1789 insertions, 0 deletions
diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt new file mode 100644 index 00000000..494bcc44 --- /dev/null +++ b/src/compressor/CMakeLists.txt @@ -0,0 +1,39 @@ + +set(compressor_srcs + Compressor.cc) +if (HAVE_QATZIP) + list(APPEND compressor_srcs QatAccel.cc) +endif() +add_library(compressor_objs OBJECT ${compressor_srcs}) + +## compressor plugins + +set(compressor_plugin_dir ${CEPH_INSTALL_PKGLIBDIR}/compressor) + +add_subdirectory(snappy) +add_subdirectory(zlib) +add_subdirectory(zstd) + +if(HAVE_LZ4) + add_subdirectory(lz4) +endif() + +if(HAVE_BROTLI) + add_subdirectory(brotli) +endif() + +set(ceph_compressor_libs + ceph_snappy + ceph_zlib + ceph_zstd) + +if(HAVE_LZ4) + list(APPEND ceph_compressor_libs ceph_lz4) +endif() + +if(HAVE_BROTLI) + list(APPEND ceph_compressor_libs ceph_brotli) +endif() + +add_custom_target(compressor_plugins DEPENDS + ${ceph_compressor_libs}) diff --git a/src/compressor/CompressionPlugin.h b/src/compressor/CompressionPlugin.h new file mode 100644 index 00000000..5e0ed777 --- /dev/null +++ b/src/compressor/CompressionPlugin.h @@ -0,0 +1,47 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph distributed storage system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef COMPRESSION_PLUGIN_H +#define COMPRESSION_PLUGIN_H + +#include <iosfwd> +#include <iostream> + +#include "common/PluginRegistry.h" +#include "Compressor.h" + +namespace ceph { + + class CompressionPlugin : public Plugin { + public: + CompressorRef compressor; + + explicit CompressionPlugin(CephContext *cct) : Plugin(cct), + compressor(0) + {} + + ~CompressionPlugin() override {} + + virtual int factory(CompressorRef *cs, + std::ostream *ss) = 0; + + virtual const char* name() {return "CompressionPlugin";} + }; + +} + +#endif diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc new file mode 100644 index 00000000..3e545d48 --- /dev/null +++ b/src/compressor/Compressor.cc @@ -0,0 +1,102 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <random> +#include <sstream> +#include <iterator> +#include <algorithm> + +#include "CompressionPlugin.h" +#include "Compressor.h" +#include "include/random.h" +#include "common/ceph_context.h" +#include "common/debug.h" +#include "common/dout.h" + +const char* Compressor::get_comp_alg_name(int a) { + + auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms), + [a](const auto& kv) { return kv.second == a; }); + + if (std::cend(compression_algorithms) == p) + return "???"; // It would be nice to revise this... + + return p->first; +} + +boost::optional<Compressor::CompressionAlgorithm> Compressor::get_comp_alg_type(const std::string &s) { + + auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms), + [&s](const auto& kv) { return kv.first == s; }); + if (std::cend(compression_algorithms) == p) + return {}; + + return p->second; +} + +const char *Compressor::get_comp_mode_name(int m) { + switch (m) { + case COMP_NONE: return "none"; + case COMP_PASSIVE: return "passive"; + case COMP_AGGRESSIVE: return "aggressive"; + case COMP_FORCE: return "force"; + default: return "???"; + } +} +boost::optional<Compressor::CompressionMode> Compressor::get_comp_mode_type(const std::string &s) { + if (s == "force") + return COMP_FORCE; + if (s == "aggressive") + return COMP_AGGRESSIVE; + if (s == "passive") + return COMP_PASSIVE; + if (s == "none") + return COMP_NONE; + return boost::optional<CompressionMode>(); +} + +CompressorRef Compressor::create(CephContext *cct, const std::string &type) +{ + // support "random" for teuthology testing + if (type == "random") { + int alg = ceph::util::generate_random_number(0, COMP_ALG_LAST - 1); + if (alg == COMP_ALG_NONE) { + return nullptr; + } + return create(cct, alg); + } + + CompressorRef cs_impl = NULL; + std::stringstream ss; + PluginRegistry *reg = cct->get_plugin_registry(); + CompressionPlugin *factory = dynamic_cast<CompressionPlugin*>(reg->get_with_load("compressor", type)); + if (factory == NULL) { + lderr(cct) << __func__ << " cannot load compressor of type " << type << dendl; + return NULL; + } + int err = factory->factory(&cs_impl, &ss); + if (err) + lderr(cct) << __func__ << " factory return error " << err << dendl; + return cs_impl; +} + +CompressorRef Compressor::create(CephContext *cct, int alg) +{ + if (alg < 0 || alg >= COMP_ALG_LAST) { + lderr(cct) << __func__ << " invalid algorithm value:" << alg << dendl; + return CompressorRef(); + } + std::string type_name = get_comp_alg_name(alg); + return create(cct, type_name); +} diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h new file mode 100644 index 00000000..a696521a --- /dev/null +++ b/src/compressor/Compressor.h @@ -0,0 +1,106 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMPRESSOR_H +#define CEPH_COMPRESSOR_H + +#include <memory> +#include <string> +#include <string_view> +#include <boost/optional.hpp> +#include "include/ceph_assert.h" // boost clobbers this +#include "include/buffer.h" +#include "include/int_types.h" +#ifdef HAVE_QATZIP + #include "QatAccel.h" +#endif + +class Compressor; +typedef std::shared_ptr<Compressor> CompressorRef; +class CephContext; + +class Compressor { +public: + enum CompressionAlgorithm { + COMP_ALG_NONE = 0, + COMP_ALG_SNAPPY = 1, + COMP_ALG_ZLIB = 2, + COMP_ALG_ZSTD = 3, +#ifdef HAVE_LZ4 + COMP_ALG_LZ4 = 4, +#endif +#ifdef HAVE_BROTLI + COMP_ALG_BROTLI = 5, +#endif + COMP_ALG_LAST //the last value for range checks + }; + + using pair_type = std::pair<const char*, CompressionAlgorithm>; + static constexpr std::initializer_list<pair_type> compression_algorithms { + { "none", COMP_ALG_NONE }, + { "snappy", COMP_ALG_SNAPPY }, + { "zlib", COMP_ALG_ZLIB }, + { "zstd", COMP_ALG_ZSTD }, +#ifdef HAVE_LZ4 + { "lz4", COMP_ALG_LZ4 }, +#endif +#ifdef HAVE_BROTLI + { "brotli", COMP_ALG_BROTLI }, +#endif + }; + + // compression options + enum CompressionMode { + COMP_NONE, ///< compress never + COMP_PASSIVE, ///< compress if hinted COMPRESSIBLE + COMP_AGGRESSIVE, ///< compress unless hinted INCOMPRESSIBLE + COMP_FORCE ///< compress always + }; + +#ifdef HAVE_QATZIP + bool qat_enabled; + QatAccel qat_accel; +#endif + + static const char* get_comp_alg_name(int a); + static boost::optional<CompressionAlgorithm> get_comp_alg_type(const std::string &s); + + static const char *get_comp_mode_name(int m); + static boost::optional<CompressionMode> get_comp_mode_type(const std::string &s); + + Compressor(CompressionAlgorithm a, const char* t) : alg(a), type(t) { + } + virtual ~Compressor() {} + const std::string& get_type_name() const { + return type; + } + CompressionAlgorithm get_type() const { + return alg; + } + virtual int compress(const ceph::bufferlist &in, ceph::bufferlist &out) = 0; + virtual int decompress(const ceph::bufferlist &in, ceph::bufferlist &out) = 0; + // this is a bit weird but we need non-const iterator to be in + // alignment with decode methods + virtual int decompress(ceph::bufferlist::const_iterator &p, size_t compressed_len, ceph::bufferlist &out) = 0; + + static CompressorRef create(CephContext *cct, const std::string &type); + static CompressorRef create(CephContext *cct, int alg); + +protected: + CompressionAlgorithm alg; + std::string type; + +}; + +#endif diff --git a/src/compressor/QatAccel.cc b/src/compressor/QatAccel.cc new file mode 100644 index 00000000..7836243b --- /dev/null +++ b/src/compressor/QatAccel.cc @@ -0,0 +1,141 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Intel Corporation + * + * Author: Qiaowei Ren <qiaowei.ren@intel.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "QatAccel.h" + +/* Estimate data expansion after decompression */ +static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200}; + +QatAccel::~QatAccel() { + if (NULL != session.internal) { + qzTeardownSession(&session); + qzClose(&session); + } +} + +bool QatAccel::init(const std::string &alg) { + QzSessionParams_T params = {(QzHuffmanHdr_T)0,}; + int rc; + + rc = qzGetDefaults(¶ms); + if (rc != QZ_OK) + return false; + params.direction = QZ_DIR_BOTH; + if (alg == "snappy") + params.comp_algorithm = QZ_SNAPPY; + else if (alg == "zlib") + params.comp_algorithm = QZ_DEFLATE; + else if (alg == "lz4") + params.comp_algorithm = QZ_LZ4; + else + return false; + + rc = qzSetDefaults(¶ms); + if (rc != QZ_OK) + return false; + + rc = qzInit(&session, QZ_SW_BACKUP_DEFAULT); + if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW) + return false; + + rc = qzSetupSession(&session, ¶ms); + if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW ) { + qzTeardownSession(&session); + qzClose(&session); + return false; + } + + return true; +} + +int QatAccel::compress(const bufferlist &in, bufferlist &out) { + for (auto &i : in.buffers()) { + const unsigned char* c_in = (unsigned char*) i.c_str(); + unsigned int len = i.length(); + unsigned int out_len = qzMaxCompressedLength(len); + + bufferptr ptr = buffer::create_small_page_aligned(out_len); + int rc = qzCompress(&session, c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1); + if (rc != QZ_OK) + return -1; + out.append(ptr, 0, out_len); + } + + return 0; +} + +int QatAccel::decompress(const bufferlist &in, bufferlist &out) { + auto i = in.begin(); + return decompress(i, in.length(), out); +} + +int QatAccel::decompress(bufferlist::const_iterator &p, + size_t compressed_len, + bufferlist &dst) { + unsigned int ratio_idx = 0; + bool read_more = false; + bool joint = false; + int rc = 0; + bufferlist tmp; + size_t remaining = MIN(p.get_remaining(), compressed_len); + + while (remaining) { + if (p.end()) { + return -1; + } + + bufferptr cur_ptr = p.get_current_ptr(); + unsigned int len = cur_ptr.length(); + if (joint) { + if (read_more) + tmp.append(cur_ptr.c_str(), len); + len = tmp.length(); + } + unsigned int out_len = len * expansion_ratio[ratio_idx]; + bufferptr ptr = buffer::create_small_page_aligned(out_len); + + if (joint) + rc = qzDecompress(&session, (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len); + else + rc = qzDecompress(&session, (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len); + if (rc == QZ_DATA_ERROR) { + if (!joint) { + tmp.append(cur_ptr.c_str(), cur_ptr.length()); + p.advance(remaining); + joint = true; + } + read_more = true; + continue; + } else if (rc == QZ_BUF_ERROR) { + if (ratio_idx == std::size(expansion_ratio)) + return -1; + if (joint) + read_more = false; + ratio_idx++; + continue; + } else if (rc != QZ_OK) { + return -1; + } else { + ratio_idx = 0; + joint = false; + read_more = false; + } + + p.advance(remaining); + remaining -= len; + dst.append(ptr, 0, out_len); + } + + return 0; +} diff --git a/src/compressor/QatAccel.h b/src/compressor/QatAccel.h new file mode 100644 index 00000000..295b180e --- /dev/null +++ b/src/compressor/QatAccel.h @@ -0,0 +1,35 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Intel Corporation + * + * Author: Qiaowei Ren <qiaowei.ren@intel.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_QATACCEL_H +#define CEPH_QATACCEL_H + +#include <qatzip.h> +#include "include/buffer.h" + +class QatAccel { + QzSession_T session; + + public: + QatAccel() : session({0}) {} + ~QatAccel(); + + bool init(const std::string &alg); + + int compress(const bufferlist &in, bufferlist &out); + int decompress(const bufferlist &in, bufferlist &out); + int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst); +}; + +#endif diff --git a/src/compressor/brotli/BrotliCompressor.cc b/src/compressor/brotli/BrotliCompressor.cc new file mode 100644 index 00000000..27685da3 --- /dev/null +++ b/src/compressor/brotli/BrotliCompressor.cc @@ -0,0 +1,95 @@ +#include "brotli/encode.h" +#include "brotli/decode.h" +#include "BrotliCompressor.h" +#include "include/scope_guard.h" + +#define MAX_LEN (CEPH_PAGE_SIZE) + +int BrotliCompressor::compress(const bufferlist &in, bufferlist &out) +{ + BrotliEncoderState* s = BrotliEncoderCreateInstance(nullptr, + nullptr, + nullptr); + if (!s) { + return -1; + } + auto sg = make_scope_guard([&s] { BrotliEncoderDestroyInstance(s); }); + BrotliEncoderSetParameter(s, BROTLI_PARAM_QUALITY, (uint32_t)9); + BrotliEncoderSetParameter(s, BROTLI_PARAM_LGWIN, 22); + for (auto i = in.buffers().begin(); i != in.buffers().end();) { + size_t available_in = i->length(); + size_t max_comp_size = BrotliEncoderMaxCompressedSize(available_in); + size_t available_out = max_comp_size; + bufferptr ptr = buffer::create_small_page_aligned(max_comp_size); + uint8_t* next_out = (uint8_t*)ptr.c_str(); + const uint8_t* next_in = (uint8_t*)i->c_str(); + ++i; + BrotliEncoderOperation finish = i != in.buffers().end() ? + BROTLI_OPERATION_PROCESS : + BROTLI_OPERATION_FINISH; + do { + if (!BrotliEncoderCompressStream(s, + finish, + &available_in, + &next_in, + &available_out, + &next_out, + nullptr)) { + return -1; + } + unsigned have = max_comp_size - available_out; + out.append(ptr, 0, have); + } while (available_out == 0); + if (BrotliEncoderIsFinished(s)) { + break; + } + } + return 0; +} + +int BrotliCompressor::decompress(bufferlist::const_iterator &p, + size_t compressed_size, + bufferlist &out) +{ + BrotliDecoderState* s = BrotliDecoderCreateInstance(nullptr, + nullptr, + nullptr); + if (!s) { + return -1; + } + auto sg = make_scope_guard([&s] { BrotliDecoderDestroyInstance(s); }); + size_t remaining = std::min<size_t>(p.get_remaining(), compressed_size); + while (remaining) { + const uint8_t* next_in; + size_t len = p.get_ptr_and_advance(remaining, (const char**)&next_in); + remaining -= len; + size_t available_in = len; + BrotliDecoderResult result = BROTLI_DECODER_RESULT_ERROR; + do { + size_t available_out = MAX_LEN; + bufferptr ptr = buffer::create_page_aligned(MAX_LEN); + uint8_t* next_out = (uint8_t*)ptr.c_str(); + result = BrotliDecoderDecompressStream(s, + &available_in, + &next_in, + &available_out, + &next_out, + 0); + if (!result) { + return -1; + } + unsigned have = MAX_LEN - available_out; + out.append(ptr, 0, have); + } while (result == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT); + if (BrotliDecoderIsFinished(s)) { + break; + } + } + return 0; +} + +int BrotliCompressor::decompress(const bufferlist &in, bufferlist &out) +{ + auto i = std::cbegin(in); + return decompress(i, in.length(), out); +} diff --git a/src/compressor/brotli/BrotliCompressor.h b/src/compressor/brotli/BrotliCompressor.h new file mode 100644 index 00000000..482fe5e2 --- /dev/null +++ b/src/compressor/brotli/BrotliCompressor.h @@ -0,0 +1,31 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 BI SHUN KE <aionshun@livemail.tw> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_BROTLICOMPRESSOR_H +#define CEPH_BROTLICOMPRESSOR_H + + +#include "include/buffer.h" +#include "compressor/Compressor.h" + +class BrotliCompressor : public Compressor +{ + public: + BrotliCompressor() : Compressor(COMP_ALG_BROTLI, "brotli") {} + + int compress(const bufferlist &in, bufferlist &out) override; + int decompress(const bufferlist &in, bufferlist &out) override; + int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out) override; +}; + +#endif //CEPH_BROTLICOMPRESSOR_H + diff --git a/src/compressor/brotli/CMakeLists.txt b/src/compressor/brotli/CMakeLists.txt new file mode 100644 index 00000000..f1992428 --- /dev/null +++ b/src/compressor/brotli/CMakeLists.txt @@ -0,0 +1,39 @@ +# brotli + +set(brotli_sources + CompressionPluginBrotli.cc + BrotliCompressor.cc +) +include(ExternalProject) +ExternalProject_Add(brotli_ext + DOWNLOAD_DIR ${CMAKE_BINARY_DIR}/src/ + GIT_REPOSITORY "https://github.com/google/brotli.git" + GIT_TAG "v1.0.7" + SOURCE_DIR ${CMAKE_BINARY_DIR}/src/brotli + CONFIGURE_COMMAND ./configure-cmake --disable-debug + INSTALL_COMMAND "" + BUILD_COMMAND $(MAKE) + BUILD_IN_SOURCE 1 + INSTALL_COMMAND "") + +ExternalProject_Add_Step(brotli_ext forcebuild + DEPENDEES configure + DEPENDERS build + COMMAND "true" + ALWAYS 1) + +set(bortli_libs enc dec common) +file(MAKE_DIRECTORY "${CMAKE_BINARY_DIR}/src/brotli/c/include") +foreach(lib ${bortli_libs}) + add_library(brotli::${lib} STATIC IMPORTED) + add_dependencies(brotli::${lib} brotli_ext) + set_target_properties(brotli::${lib} PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_BINARY_DIR}/src/brotli/c/include" + IMPORTED_LOCATION "${CMAKE_BINARY_DIR}/src/brotli/libbrotli${lib}-static.a") + list(APPEND BROTLI_LIBRARIES brotli::${lib}) +endforeach() + +add_library(ceph_brotli SHARED ${brotli_sources}) +list(REVERSE bortli_libs) +target_link_libraries(ceph_brotli PRIVATE ${BROTLI_LIBRARIES}) +install(TARGETS ceph_brotli DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/brotli/CompressionPluginBrotli.cc b/src/compressor/brotli/CompressionPluginBrotli.cc new file mode 100644 index 00000000..245f49db --- /dev/null +++ b/src/compressor/brotli/CompressionPluginBrotli.cc @@ -0,0 +1,19 @@ +#include "acconfig.h" +#include "ceph_ver.h" +#include "CompressionPluginBrotli.h" +#include "common/ceph_context.h" + + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + PluginRegistry *instance = cct->get_plugin_registry(); + return instance->add(type, name, new CompressionPluginBrotli(cct)); +} + diff --git a/src/compressor/brotli/CompressionPluginBrotli.h b/src/compressor/brotli/CompressionPluginBrotli.h new file mode 100644 index 00000000..641a6e1c --- /dev/null +++ b/src/compressor/brotli/CompressionPluginBrotli.h @@ -0,0 +1,36 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 BI SHUN KE <aionshun@livemail.tw> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_BROTLI_H +#define CEPH_COMPRESSION_PLUGIN_BROTLI_H + +#include "ceph_ver.h" +#include "compressor/CompressionPlugin.h" +#include "BrotliCompressor.h" + +class CompressionPluginBrotli : public CompressionPlugin { +public: + explicit CompressionPluginBrotli(CephContext *cct) : CompressionPlugin(cct) + {} + + virtual int factory(CompressorRef *cs, std::ostream *ss) + { + if (compressor == nullptr) { + BrotliCompressor *interface = new BrotliCompressor(); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/lz4/CMakeLists.txt b/src/compressor/lz4/CMakeLists.txt new file mode 100644 index 00000000..7a8a15d6 --- /dev/null +++ b/src/compressor/lz4/CMakeLists.txt @@ -0,0 +1,13 @@ +# lz4 + +set(lz4_sources + CompressionPluginLZ4.cc +) + +add_library(ceph_lz4 SHARED ${lz4_sources}) +target_link_libraries(ceph_lz4 PRIVATE LZ4::LZ4) +set_target_properties(ceph_lz4 PROPERTIES + VERSION 2.0.0 + SOVERSION 2 + INSTALL_RPATH "") +install(TARGETS ceph_lz4 DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/lz4/CompressionPluginLZ4.cc b/src/compressor/lz4/CompressionPluginLZ4.cc new file mode 100644 index 00000000..ef3f2406 --- /dev/null +++ b/src/compressor/lz4/CompressionPluginLZ4.cc @@ -0,0 +1,36 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 XSKY Inc. + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#include "acconfig.h" +#include "ceph_ver.h" +#include "common/ceph_context.h" +#include "CompressionPluginLZ4.h" + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + PluginRegistry *instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginLZ4(cct)); +} diff --git a/src/compressor/lz4/CompressionPluginLZ4.h b/src/compressor/lz4/CompressionPluginLZ4.h new file mode 100644 index 00000000..7e8cdf67 --- /dev/null +++ b/src/compressor/lz4/CompressionPluginLZ4.h @@ -0,0 +1,41 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 XSKY Inc. + * + * Author: Haomai Wang <haomaiwang@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_LZ4_H +#define CEPH_COMPRESSION_PLUGIN_LZ4_H + +// ----------------------------------------------------------------------------- +#include "ceph_ver.h" +#include "compressor/CompressionPlugin.h" +#include "LZ4Compressor.h" +// ----------------------------------------------------------------------------- + +class CompressionPluginLZ4 : public CompressionPlugin { + +public: + + explicit CompressionPluginLZ4(CephContext* cct) : CompressionPlugin(cct) + {} + + int factory(CompressorRef *cs, std::ostream *ss) override { + if (compressor == 0) { + LZ4Compressor *interface = new LZ4Compressor(cct); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/lz4/LZ4Compressor.h b/src/compressor/lz4/LZ4Compressor.h new file mode 100644 index 00000000..6e604d5c --- /dev/null +++ b/src/compressor/lz4/LZ4Compressor.h @@ -0,0 +1,144 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_LZ4COMPRESSOR_H +#define CEPH_LZ4COMPRESSOR_H + +#include <lz4.h> + +#include "compressor/Compressor.h" +#include "include/buffer.h" +#include "include/encoding.h" +#include "common/config.h" +#include "common/Tub.h" + + +class LZ4Compressor : public Compressor { + public: + LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") { +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4")) + qat_enabled = true; + else + qat_enabled = false; +#endif + } + + int compress(const bufferlist &src, bufferlist &dst) override { + // older versions of liblz4 introduce bit errors when compressing + // fragmented buffers. this was fixed in lz4 commit + // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first + // appeared in v1.8.2. + // + // workaround: rebuild if not contiguous. + if (!src.is_contiguous()) { + bufferlist new_src = src; + new_src.rebuild(); + return compress(new_src, dst); + } + +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.compress(src, dst); +#endif + bufferptr outptr = buffer::create_small_page_aligned( + LZ4_compressBound(src.length())); + LZ4_stream_t lz4_stream; + LZ4_resetStream(&lz4_stream); + + auto p = src.begin(); + size_t left = src.length(); + int pos = 0; + const char *data; + unsigned num = src.get_num_buffers(); + encode((uint32_t)num, dst); + while (left) { + uint32_t origin_len = p.get_ptr_and_advance(left, &data); + int compressed_len = LZ4_compress_fast_continue( + &lz4_stream, data, outptr.c_str()+pos, origin_len, + outptr.length()-pos, 1); + if (compressed_len <= 0) + return -1; + pos += compressed_len; + left -= origin_len; + encode(origin_len, dst); + encode((uint32_t)compressed_len, dst); + } + ceph_assert(p.end()); + + dst.append(outptr, 0, pos); + return 0; + } + + int decompress(const bufferlist &src, bufferlist &dst) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(src, dst); +#endif + auto i = std::cbegin(src); + return decompress(i, src.length(), dst); + } + + int decompress(bufferlist::const_iterator &p, + size_t compressed_len, + bufferlist &dst) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(p, compressed_len, dst); +#endif + uint32_t count; + std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs; + decode(count, p); + compressed_pairs.resize(count); + uint32_t total_origin = 0; + for (unsigned i = 0; i < count; ++i) { + decode(compressed_pairs[i].first, p); + decode(compressed_pairs[i].second, p); + total_origin += compressed_pairs[i].first; + } + compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2); + + bufferptr dstptr(total_origin); + LZ4_streamDecode_t lz4_stream_decode; + LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0); + + bufferptr cur_ptr = p.get_current_ptr(); + bufferptr *ptr = &cur_ptr; + Tub<bufferptr> data_holder; + if (compressed_len != cur_ptr.length()) { + data_holder.construct(compressed_len); + p.copy_deep(compressed_len, *data_holder); + ptr = data_holder.get(); + } + + char *c_in = ptr->c_str(); + char *c_out = dstptr.c_str(); + for (unsigned i = 0; i < count; ++i) { + int r = LZ4_decompress_safe_continue( + &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first); + if (r == (int)compressed_pairs[i].first) { + c_in += compressed_pairs[i].second; + c_out += compressed_pairs[i].first; + } else if (r < 0) { + return -1; + } else { + return -2; + } + } + dst.push_back(std::move(dstptr)); + return 0; + } +}; + +#endif diff --git a/src/compressor/snappy/CMakeLists.txt b/src/compressor/snappy/CMakeLists.txt new file mode 100644 index 00000000..0bb233d4 --- /dev/null +++ b/src/compressor/snappy/CMakeLists.txt @@ -0,0 +1,13 @@ +# snappy + +set(snappy_sources + CompressionPluginSnappy.cc +) + +add_library(ceph_snappy SHARED ${snappy_sources}) +target_link_libraries(ceph_snappy PRIVATE snappy::snappy) +set_target_properties(ceph_snappy PROPERTIES + VERSION 2.0.0 + SOVERSION 2 + INSTALL_RPATH "") +install(TARGETS ceph_snappy DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/snappy/CompressionPluginSnappy.cc b/src/compressor/snappy/CompressionPluginSnappy.cc new file mode 100644 index 00000000..96abf84f --- /dev/null +++ b/src/compressor/snappy/CompressionPluginSnappy.cc @@ -0,0 +1,38 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + + +// ----------------------------------------------------------------------------- +#include "acconfig.h" +#include "ceph_ver.h" +#include "common/ceph_context.h" +#include "CompressionPluginSnappy.h" + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + PluginRegistry *instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginSnappy(cct)); +} diff --git a/src/compressor/snappy/CompressionPluginSnappy.h b/src/compressor/snappy/CompressionPluginSnappy.h new file mode 100644 index 00000000..19b69765 --- /dev/null +++ b/src/compressor/snappy/CompressionPluginSnappy.h @@ -0,0 +1,42 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_SNAPPY_H +#define CEPH_COMPRESSION_PLUGIN_SNAPPY_H + +// ----------------------------------------------------------------------------- +#include "compressor/CompressionPlugin.h" +#include "SnappyCompressor.h" +// ----------------------------------------------------------------------------- + +class CompressionPluginSnappy : public CompressionPlugin { + +public: + + explicit CompressionPluginSnappy(CephContext* cct) : CompressionPlugin(cct) + {} + + int factory(CompressorRef *cs, + std::ostream *ss) override + { + if (compressor == 0) { + SnappyCompressor *interface = new SnappyCompressor(cct); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/snappy/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h new file mode 100644 index 00000000..0291a923 --- /dev/null +++ b/src/compressor/snappy/SnappyCompressor.h @@ -0,0 +1,115 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_SNAPPYCOMPRESSOR_H +#define CEPH_SNAPPYCOMPRESSOR_H + +#include <snappy.h> +#include <snappy-sinksource.h> +#include "common/config.h" +#include "compressor/Compressor.h" +#include "include/buffer.h" + +class CEPH_BUFFER_API BufferlistSource : public snappy::Source { + bufferlist::const_iterator pb; + size_t remaining; + + public: + explicit BufferlistSource(bufferlist::const_iterator _pb, size_t _input_len) + : pb(_pb), + remaining(_input_len) { + remaining = std::min(remaining, (size_t)pb.get_remaining()); + } + size_t Available() const override { + return remaining; + } + const char *Peek(size_t *len) override { + const char *data = NULL; + *len = 0; + size_t avail = Available(); + if (avail) { + auto ptmp = pb; + *len = ptmp.get_ptr_and_advance(avail, &data); + } + return data; + } + void Skip(size_t n) override { + ceph_assert(n <= remaining); + pb.advance(n); + remaining -= n; + } + + bufferlist::const_iterator get_pos() const { + return pb; + } +}; + +class SnappyCompressor : public Compressor { + public: + SnappyCompressor(CephContext* cct) : Compressor(COMP_ALG_SNAPPY, "snappy") { +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("snappy")) + qat_enabled = true; + else + qat_enabled = false; +#endif + } + + int compress(const bufferlist &src, bufferlist &dst) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.compress(src, dst); +#endif + BufferlistSource source(const_cast<bufferlist&>(src).begin(), src.length()); + bufferptr ptr = buffer::create_small_page_aligned( + snappy::MaxCompressedLength(src.length())); + snappy::UncheckedByteArraySink sink(ptr.c_str()); + snappy::Compress(&source, &sink); + dst.append(ptr, 0, sink.CurrentDestination() - ptr.c_str()); + return 0; + } + + int decompress(const bufferlist &src, bufferlist &dst) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(src, dst); +#endif + auto i = src.begin(); + return decompress(i, src.length(), dst); + } + + int decompress(bufferlist::const_iterator &p, + size_t compressed_len, + bufferlist &dst) override { +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(p, compressed_len, dst); +#endif + snappy::uint32 res_len = 0; + BufferlistSource source_1(p, compressed_len); + if (!snappy::GetUncompressedLength(&source_1, &res_len)) { + return -1; + } + BufferlistSource source_2(p, compressed_len); + bufferptr ptr(res_len); + if (snappy::RawUncompress(&source_2, ptr.c_str())) { + p = source_2.get_pos(); + dst.append(ptr); + return 0; + } + return -2; + } +}; + +#endif diff --git a/src/compressor/zlib/CMakeLists.txt b/src/compressor/zlib/CMakeLists.txt new file mode 100644 index 00000000..51aba083 --- /dev/null +++ b/src/compressor/zlib/CMakeLists.txt @@ -0,0 +1,47 @@ +# zlib + +if(HAVE_INTEL_SSE4_1 AND HAVE_BETTER_YASM_ELF64 AND (NOT APPLE)) + set(zlib_sources + CompressionPluginZlib.cc + ZlibCompressor.cc + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/crc32_gzip.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/crc32_gzip_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/detect_repeated_char.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/encode_df_04.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/flatten_ll.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/huff_codes.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/hufftables_c.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_body_01.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_body_02.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_body_04.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_finish.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_base.c + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body_01.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body_02.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_body_04.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_icf_finish.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_multibinary.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_update_histogram_01.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/igzip_update_histogram_04.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/proc_heap.asm + ${CMAKE_SOURCE_DIR}/src/isa-l/igzip/rfc1951_lookup.asm + ) +else(HAVE_INTEL_SSE4_1 AND HAVE_BETTER_YASM_ELF64 AND (NOT APPLE)) + set(zlib_sources + CompressionPluginZlib.cc + ZlibCompressor.cc + ) +endif(HAVE_INTEL_SSE4_1 AND HAVE_BETTER_YASM_ELF64 AND (NOT APPLE)) + +add_library(ceph_zlib SHARED ${zlib_sources}) +target_link_libraries(ceph_zlib ZLIB::ZLIB) +target_include_directories(ceph_zlib SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/isa-l/include") +set_target_properties(ceph_zlib PROPERTIES + VERSION 2.0.0 + SOVERSION 2 + INSTALL_RPATH "") +install(TARGETS ceph_zlib DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/zlib/CompressionPluginZlib.cc b/src/compressor/zlib/CompressionPluginZlib.cc new file mode 100644 index 00000000..2215b9a4 --- /dev/null +++ b/src/compressor/zlib/CompressionPluginZlib.cc @@ -0,0 +1,38 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + + +// ----------------------------------------------------------------------------- +#include "acconfig.h" +#include "ceph_ver.h" +#include "common/ceph_context.h" +#include "CompressionPluginZlib.h" + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + PluginRegistry *instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginZlib(cct)); +} diff --git a/src/compressor/zlib/CompressionPluginZlib.h b/src/compressor/zlib/CompressionPluginZlib.h new file mode 100644 index 00000000..38320c0c --- /dev/null +++ b/src/compressor/zlib/CompressionPluginZlib.h @@ -0,0 +1,54 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_ZLIB_H +#define CEPH_COMPRESSION_PLUGIN_ZLIB_H + +// ----------------------------------------------------------------------------- +#include "arch/probe.h" +#include "arch/intel.h" +#include "arch/arm.h" +#include "compressor/CompressionPlugin.h" +#include "ZlibCompressor.h" + +// ----------------------------------------------------------------------------- + +class CompressionPluginZlib : public CompressionPlugin { +public: + bool has_isal = false; + + explicit CompressionPluginZlib(CephContext *cct) : CompressionPlugin(cct) + {} + + int factory(CompressorRef *cs, + std::ostream *ss) override + { + bool isal = false; +#if defined(__i386__) || defined(__x86_64__) + // other arches or lack of support result in isal = false + if (cct->_conf->compressor_zlib_isal) { + ceph_arch_probe(); + isal = (ceph_arch_intel_pclmul && ceph_arch_intel_sse41); + } +#endif + if (compressor == 0 || has_isal != isal) { + compressor = std::make_shared<ZlibCompressor>(cct, isal); + has_isal = isal; + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/zlib/ZlibCompressor.cc b/src/compressor/zlib/ZlibCompressor.cc new file mode 100644 index 00000000..e3064d2a --- /dev/null +++ b/src/compressor/zlib/ZlibCompressor.cc @@ -0,0 +1,246 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +// ----------------------------------------------------------------------------- +#include "common/debug.h" +#include "ZlibCompressor.h" +#include "osd/osd_types.h" +#include "isa-l/include/igzip_lib.h" +// ----------------------------------------------------------------------------- + +#include <zlib.h> + +// ----------------------------------------------------------------------------- +#define dout_context cct +#define dout_subsys ceph_subsys_compressor +#undef dout_prefix +#define dout_prefix _prefix(_dout) +// ----------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------- + +static ostream& +_prefix(std::ostream* _dout) +{ + return *_dout << "ZlibCompressor: "; +} +// ----------------------------------------------------------------------------- + +#define MAX_LEN (CEPH_PAGE_SIZE) + +// default window size for Zlib 1.2.8, negated for raw deflate +#define ZLIB_DEFAULT_WIN_SIZE -15 + +// desired memory usage level. increasing to 9 doesn't speed things up +// significantly (helps only on >=16K blocks) and sometimes degrades +// compression ratio. +#define ZLIB_MEMORY_LEVEL 8 + +int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out) +{ + int ret; + unsigned have; + z_stream strm; + unsigned char* c_in; + int begin = 1; + + /* allocate deflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + ret = deflateInit2(&strm, cct->_conf->compressor_zlib_level, Z_DEFLATED, ZLIB_DEFAULT_WIN_SIZE, ZLIB_MEMORY_LEVEL, Z_DEFAULT_STRATEGY); + if (ret != Z_OK) { + dout(1) << "Compression init error: init return " + << ret << " instead of Z_OK" << dendl; + return -1; + } + + for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin(); + i != in.buffers().end();) { + + c_in = (unsigned char*) (*i).c_str(); + long unsigned int len = (*i).length(); + ++i; + + strm.avail_in = len; + int flush = i != in.buffers().end() ? Z_NO_FLUSH : Z_FINISH; + + strm.next_in = c_in; + do { + bufferptr ptr = buffer::create_page_aligned(MAX_LEN); + strm.next_out = (unsigned char*)ptr.c_str() + begin; + strm.avail_out = MAX_LEN - begin; + if (begin) { + // put a compressor variation mark in front of compressed stream, not used at the moment + ptr.c_str()[0] = 0; + begin = 0; + } + ret = deflate(&strm, flush); /* no bad return value */ + if (ret == Z_STREAM_ERROR) { + dout(1) << "Compression error: compress return Z_STREAM_ERROR(" + << ret << ")" << dendl; + deflateEnd(&strm); + return -1; + } + have = MAX_LEN - strm.avail_out; + out.append(ptr, 0, have); + } while (strm.avail_out == 0); + if (strm.avail_in != 0) { + dout(10) << "Compression error: unused input" << dendl; + deflateEnd(&strm); + return -1; + } + } + + deflateEnd(&strm); + return 0; +} + +#if __x86_64__ && defined(HAVE_BETTER_YASM_ELF64) +int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out) +{ + int ret; + unsigned have; + isal_zstream strm; + unsigned char* c_in; + int begin = 1; + + /* allocate deflate state */ + isal_deflate_init(&strm); + strm.end_of_stream = 0; + + for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin(); + i != in.buffers().end();) { + + c_in = (unsigned char*) (*i).c_str(); + long unsigned int len = (*i).length(); + ++i; + + strm.avail_in = len; + strm.end_of_stream = (i == in.buffers().end()); + strm.flush = FINISH_FLUSH; + + strm.next_in = c_in; + + do { + bufferptr ptr = buffer::create_page_aligned(MAX_LEN); + strm.next_out = (unsigned char*)ptr.c_str() + begin; + strm.avail_out = MAX_LEN - begin; + if (begin) { + // put a compressor variation mark in front of compressed stream, not used at the moment + ptr.c_str()[0] = 1; + begin = 0; + } + ret = isal_deflate(&strm); + if (ret != COMP_OK) { + dout(1) << "Compression error: isal_deflate return error (" + << ret << ")" << dendl; + return -1; + } + have = MAX_LEN - strm.avail_out; + out.append(ptr, 0, have); + } while (strm.avail_out == 0); + if (strm.avail_in != 0) { + dout(10) << "Compression error: unused input" << dendl; + return -1; + } + } + + return 0; +} +#endif + +int ZlibCompressor::compress(const bufferlist &in, bufferlist &out) +{ +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.compress(in, out); +#endif +#if __x86_64__ && defined(HAVE_BETTER_YASM_ELF64) + if (isal_enabled) + return isal_compress(in, out); + else + return zlib_compress(in, out); +#else + return zlib_compress(in, out); +#endif +} + +int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out) +{ +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(p, compressed_size, out); +#endif + + int ret; + unsigned have; + z_stream strm; + const char* c_in; + int begin = 1; + + /* allocate inflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + strm.avail_in = 0; + strm.next_in = Z_NULL; + + // choose the variation of compressor + ret = inflateInit2(&strm, ZLIB_DEFAULT_WIN_SIZE); + if (ret != Z_OK) { + dout(1) << "Decompression init error: init return " + << ret << " instead of Z_OK" << dendl; + return -1; + } + + size_t remaining = std::min<size_t>(p.get_remaining(), compressed_size); + + while(remaining) { + long unsigned int len = p.get_ptr_and_advance(remaining, &c_in); + remaining -= len; + strm.avail_in = len - begin; + strm.next_in = (unsigned char*)c_in + begin; + begin = 0; + + do { + strm.avail_out = MAX_LEN; + bufferptr ptr = buffer::create_page_aligned(MAX_LEN); + strm.next_out = (unsigned char*)ptr.c_str(); + ret = inflate(&strm, Z_NO_FLUSH); + if (ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR) { + dout(1) << "Decompression error: decompress return " + << ret << dendl; + inflateEnd(&strm); + return -1; + } + have = MAX_LEN - strm.avail_out; + out.append(ptr, 0, have); + } while (strm.avail_out == 0); + } + + /* clean up and return */ + (void)inflateEnd(&strm); + return 0; +} + +int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out) +{ +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(in, out); +#endif + auto i = std::cbegin(in); + return decompress(i, in.length(), out); +} diff --git a/src/compressor/zlib/ZlibCompressor.h b/src/compressor/zlib/ZlibCompressor.h new file mode 100644 index 00000000..d770547a --- /dev/null +++ b/src/compressor/zlib/ZlibCompressor.h @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_ZLIB_H +#define CEPH_COMPRESSION_ZLIB_H + +#include "common/config.h" +#include "compressor/Compressor.h" + +class ZlibCompressor : public Compressor { + bool isal_enabled; + CephContext *const cct; +public: + ZlibCompressor(CephContext *cct, bool isal) + : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) { +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib")) + qat_enabled = true; + else + qat_enabled = false; +#endif + } + + int compress(const bufferlist &in, bufferlist &out) override; + int decompress(const bufferlist &in, bufferlist &out) override; + int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out) override; +private: + int zlib_compress(const bufferlist &in, bufferlist &out); + int isal_compress(const bufferlist &in, bufferlist &out); + }; + + +#endif diff --git a/src/compressor/zstd/CMakeLists.txt b/src/compressor/zstd/CMakeLists.txt new file mode 100644 index 00000000..76709bbb --- /dev/null +++ b/src/compressor/zstd/CMakeLists.txt @@ -0,0 +1,41 @@ +# zstd + +# libzstd - build it statically +set(ZSTD_C_FLAGS "-fPIC -Wno-unused-variable -O3") + +include(ExternalProject) +ExternalProject_Add(zstd_ext + SOURCE_DIR ${CMAKE_SOURCE_DIR}/src/zstd/build/cmake + CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} + -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} + -DCMAKE_C_FLAGS=${ZSTD_C_FLAGS} + -DCMAKE_AR=${CMAKE_AR} + -DCMAKE_POSITION_INDEPENDENT_CODE=${ENABLE_SHARED} + BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/libzstd + BUILD_COMMAND $(MAKE) libzstd_static + INSTALL_COMMAND "true") + +# force zstd make to be called on each time +ExternalProject_Add_Step(zstd_ext forcebuild + DEPENDEES configure + DEPENDERS build + COMMAND "true" + ALWAYS 1) + +add_library(zstd STATIC IMPORTED) +set_target_properties(zstd PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/src/zstd/lib" + IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/libzstd/lib/libzstd.a") +add_dependencies(zstd zstd_ext) + +set(zstd_sources + CompressionPluginZstd.cc +) + +add_library(ceph_zstd SHARED ${zstd_sources}) +target_link_libraries(ceph_zstd PRIVATE zstd) +set_target_properties(ceph_zstd PROPERTIES + VERSION 2.0.0 + SOVERSION 2 + INSTALL_RPATH "") +install(TARGETS ceph_zstd DESTINATION ${compressor_plugin_dir}) diff --git a/src/compressor/zstd/CompressionPluginZstd.cc b/src/compressor/zstd/CompressionPluginZstd.cc new file mode 100644 index 00000000..62c2cfbf --- /dev/null +++ b/src/compressor/zstd/CompressionPluginZstd.cc @@ -0,0 +1,36 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#include "acconfig.h" +#include "ceph_ver.h" +#include "common/ceph_context.h" +#include "CompressionPluginZstd.h" + +// ----------------------------------------------------------------------------- + +const char *__ceph_plugin_version() +{ + return CEPH_GIT_NICE_VER; +} + +// ----------------------------------------------------------------------------- + +int __ceph_plugin_init(CephContext *cct, + const std::string& type, + const std::string& name) +{ + PluginRegistry *instance = cct->get_plugin_registry(); + + return instance->add(type, name, new CompressionPluginZstd(cct)); +} diff --git a/src/compressor/zstd/CompressionPluginZstd.h b/src/compressor/zstd/CompressionPluginZstd.h new file mode 100644 index 00000000..72ef9c45 --- /dev/null +++ b/src/compressor/zstd/CompressionPluginZstd.h @@ -0,0 +1,43 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Mirantis, Inc. + * + * Author: Alyona Kiseleva <akiselyova@mirantis.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + */ + +#ifndef CEPH_COMPRESSION_PLUGIN_ZSTD_H +#define CEPH_COMPRESSION_PLUGIN_ZSTD_H + +// ----------------------------------------------------------------------------- +#include "ceph_ver.h" +#include "compressor/CompressionPlugin.h" +#include "ZstdCompressor.h" +// ----------------------------------------------------------------------------- + +class CompressionPluginZstd : public CompressionPlugin { + +public: + + explicit CompressionPluginZstd(CephContext* cct) : CompressionPlugin(cct) + {} + + int factory(CompressorRef *cs, + std::ostream *ss) override + { + if (compressor == 0) { + ZstdCompressor *interface = new ZstdCompressor(cct); + compressor = CompressorRef(interface); + } + *cs = compressor; + return 0; + } +}; + +#endif diff --git a/src/compressor/zstd/ZstdCompressor.h b/src/compressor/zstd/ZstdCompressor.h new file mode 100644 index 00000000..83508b93 --- /dev/null +++ b/src/compressor/zstd/ZstdCompressor.h @@ -0,0 +1,106 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ZSTDCOMPRESSOR_H +#define CEPH_ZSTDCOMPRESSOR_H + +#define ZSTD_STATIC_LINKING_ONLY +#include "zstd/lib/zstd.h" + +#include "include/buffer.h" +#include "include/encoding.h" +#include "compressor/Compressor.h" + +class ZstdCompressor : public Compressor { + public: + ZstdCompressor(CephContext *cct) : Compressor(COMP_ALG_ZSTD, "zstd"), cct(cct) {} + + int compress(const bufferlist &src, bufferlist &dst) override { + ZSTD_CStream *s = ZSTD_createCStream(); + ZSTD_initCStream_srcSize(s, cct->_conf->compressor_zstd_level, src.length()); + auto p = src.begin(); + size_t left = src.length(); + + size_t const out_max = ZSTD_compressBound(left); + bufferptr outptr = buffer::create_small_page_aligned(out_max); + ZSTD_outBuffer_s outbuf; + outbuf.dst = outptr.c_str(); + outbuf.size = outptr.length(); + outbuf.pos = 0; + + while (left) { + ceph_assert(!p.end()); + struct ZSTD_inBuffer_s inbuf; + inbuf.pos = 0; + inbuf.size = p.get_ptr_and_advance(left, (const char**)&inbuf.src); + left -= inbuf.size; + ZSTD_EndDirective const zed = (left==0) ? ZSTD_e_end : ZSTD_e_continue; + size_t r = ZSTD_compress_generic(s, &outbuf, &inbuf, zed); + if (ZSTD_isError(r)) { + return -EINVAL; + } + } + ceph_assert(p.end()); + + ZSTD_freeCStream(s); + + // prefix with decompressed length + encode((uint32_t)src.length(), dst); + dst.append(outptr, 0, outbuf.pos); + return 0; + } + + int decompress(const bufferlist &src, bufferlist &dst) override { + auto i = std::cbegin(src); + return decompress(i, src.length(), dst); + } + + int decompress(bufferlist::const_iterator &p, + size_t compressed_len, + bufferlist &dst) override { + if (compressed_len < 4) { + return -1; + } + compressed_len -= 4; + uint32_t dst_len; + decode(dst_len, p); + + bufferptr dstptr(dst_len); + ZSTD_outBuffer_s outbuf; + outbuf.dst = dstptr.c_str(); + outbuf.size = dstptr.length(); + outbuf.pos = 0; + ZSTD_DStream *s = ZSTD_createDStream(); + ZSTD_initDStream(s); + while (compressed_len > 0) { + if (p.end()) { + return -1; + } + ZSTD_inBuffer_s inbuf; + inbuf.pos = 0; + inbuf.size = p.get_ptr_and_advance(compressed_len, + (const char**)&inbuf.src); + ZSTD_decompressStream(s, &outbuf, &inbuf); + compressed_len -= inbuf.size; + } + ZSTD_freeDStream(s); + + dst.append(dstptr, 0, outbuf.pos); + return 0; + } + private: + CephContext *const cct; +}; + +#endif |