From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/msg/async/compression_onwire.h | 105 +++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 src/msg/async/compression_onwire.h (limited to 'src/msg/async/compression_onwire.h') diff --git a/src/msg/async/compression_onwire.h b/src/msg/async/compression_onwire.h new file mode 100644 index 000000000..d3b35a465 --- /dev/null +++ b/src/msg/async/compression_onwire.h @@ -0,0 +1,105 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_COMPRESSION_ONWIRE_H +#define CEPH_COMPRESSION_ONWIRE_H + +#include +#include + +#include "compressor/Compressor.h" +#include "include/buffer.h" + +class CompConnectionMeta; + +namespace ceph::compression::onwire { + using Compressor = TOPNSPC::Compressor; + using CompressorRef = TOPNSPC::CompressorRef; + + class Handler { + public: + Handler(CephContext* const cct, CompressorRef compressor) + : m_cct(cct), m_compressor(compressor) {} + + protected: + CephContext* const m_cct; + CompressorRef m_compressor; + }; + + class RxHandler final : private Handler { + public: + RxHandler(CephContext* const cct, CompressorRef compressor) + : Handler(cct, compressor) {} + ~RxHandler() {}; + + /** + * Decompresses a bufferlist + * + * @param input compressed bufferlist + * @param out decompressed bufferlist + * + * @returns true on success, false on failure + */ + std::optional decompress(const ceph::bufferlist &input); + }; + + class TxHandler final : private Handler { + public: + TxHandler(CephContext* const cct, CompressorRef compressor, int mode, std::uint64_t min_size) + : Handler(cct, compressor), + m_min_size(min_size), + m_mode(static_cast(mode)) + {} + ~TxHandler() {} + + void reset_handler(int num_segments, uint64_t size) { + m_init_onwire_size = size; + m_compress_potential = size; + m_onwire_size = 0; + } + + void done(); + + /** + * Compresses a bufferlist + * + * @param input bufferlist to compress + * @param out compressed bufferlist + * + * @returns true on success, false on failure + */ + std::optional compress(const ceph::bufferlist &input); + + double get_ratio() const { + return get_initial_size() / (double) get_final_size(); + } + + uint64_t get_initial_size() const { + return m_init_onwire_size; + } + + uint64_t get_final_size() const { + return m_onwire_size; + } + + private: + uint64_t m_min_size; + Compressor::CompressionMode m_mode; + + uint64_t m_init_onwire_size; + uint64_t m_onwire_size; + uint64_t m_compress_potential; + }; + + struct rxtx_t { + std::unique_ptr rx; + std::unique_ptr tx; + + static rxtx_t create_handler_pair( + CephContext* ctx, + const CompConnectionMeta& comp_meta, + std::uint64_t compress_min_size); + }; +} + +#endif // CEPH_COMPRESSION_ONWIRE_H -- cgit v1.2.3