summaryrefslogtreecommitdiffstats
path: root/src/msg/async/compression_onwire.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/msg/async/compression_onwire.h')
-rw-r--r--src/msg/async/compression_onwire.h105
1 files changed, 105 insertions, 0 deletions
diff --git a/src/msg/async/compression_onwire.h b/src/msg/async/compression_onwire.h
new file mode 100644
index 000000000..d3b35a465
--- /dev/null
+++ b/src/msg/async/compression_onwire.h
@@ -0,0 +1,105 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_COMPRESSION_ONWIRE_H
+#define CEPH_COMPRESSION_ONWIRE_H
+
+#include <cstdint>
+#include <optional>
+
+#include "compressor/Compressor.h"
+#include "include/buffer.h"
+
+class CompConnectionMeta;
+
+namespace ceph::compression::onwire {
+ using Compressor = TOPNSPC::Compressor;
+ using CompressorRef = TOPNSPC::CompressorRef;
+
+ class Handler {
+ public:
+ Handler(CephContext* const cct, CompressorRef compressor)
+ : m_cct(cct), m_compressor(compressor) {}
+
+ protected:
+ CephContext* const m_cct;
+ CompressorRef m_compressor;
+ };
+
+ class RxHandler final : private Handler {
+ public:
+ RxHandler(CephContext* const cct, CompressorRef compressor)
+ : Handler(cct, compressor) {}
+ ~RxHandler() {};
+
+ /**
+ * Decompresses a bufferlist
+ *
+ * @param input compressed bufferlist
+ * @param out decompressed bufferlist
+ *
+ * @returns true on success, false on failure
+ */
+ std::optional<ceph::bufferlist> decompress(const ceph::bufferlist &input);
+ };
+
+ class TxHandler final : private Handler {
+ public:
+ TxHandler(CephContext* const cct, CompressorRef compressor, int mode, std::uint64_t min_size)
+ : Handler(cct, compressor),
+ m_min_size(min_size),
+ m_mode(static_cast<Compressor::CompressionMode>(mode))
+ {}
+ ~TxHandler() {}
+
+ void reset_handler(int num_segments, uint64_t size) {
+ m_init_onwire_size = size;
+ m_compress_potential = size;
+ m_onwire_size = 0;
+ }
+
+ void done();
+
+ /**
+ * Compresses a bufferlist
+ *
+ * @param input bufferlist to compress
+ * @param out compressed bufferlist
+ *
+ * @returns true on success, false on failure
+ */
+ std::optional<ceph::bufferlist> compress(const ceph::bufferlist &input);
+
+ double get_ratio() const {
+ return get_initial_size() / (double) get_final_size();
+ }
+
+ uint64_t get_initial_size() const {
+ return m_init_onwire_size;
+ }
+
+ uint64_t get_final_size() const {
+ return m_onwire_size;
+ }
+
+ private:
+ uint64_t m_min_size;
+ Compressor::CompressionMode m_mode;
+
+ uint64_t m_init_onwire_size;
+ uint64_t m_onwire_size;
+ uint64_t m_compress_potential;
+ };
+
+ struct rxtx_t {
+ std::unique_ptr<RxHandler> rx;
+ std::unique_ptr<TxHandler> tx;
+
+ static rxtx_t create_handler_pair(
+ CephContext* ctx,
+ const CompConnectionMeta& comp_meta,
+ std::uint64_t compress_min_size);
+ };
+}
+
+#endif // CEPH_COMPRESSION_ONWIRE_H