From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/msg/async/compression_onwire.cc | 86 +++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 src/msg/async/compression_onwire.cc (limited to 'src/msg/async/compression_onwire.cc') diff --git a/src/msg/async/compression_onwire.cc b/src/msg/async/compression_onwire.cc new file mode 100644 index 000000000..9e6d07cfd --- /dev/null +++ b/src/msg/async/compression_onwire.cc @@ -0,0 +1,86 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "compression_onwire.h" +#include "compression_meta.h" +#include "common/dout.h" + +#define dout_subsys ceph_subsys_ms + +namespace ceph::compression::onwire { + +rxtx_t rxtx_t::create_handler_pair( + CephContext* ctx, + const CompConnectionMeta& comp_meta, + std::uint64_t compress_min_size) +{ + if (comp_meta.is_compress()) { + CompressorRef compressor = Compressor::create(ctx, comp_meta.get_method()); + if (compressor) { + return {std::make_unique(ctx, compressor), + std::make_unique(ctx, compressor, + comp_meta.get_mode(), + compress_min_size)}; + } + } + return {}; +} + +std::optional TxHandler::compress(const ceph::bufferlist &input) +{ + if (m_init_onwire_size < m_min_size) { + ldout(m_cct, 20) << __func__ + << " discovered frame that is smaller than threshold, aborting compression" + << dendl; + return {}; + } + + m_compress_potential -= input.length(); + + ceph::bufferlist out; + if (input.length() == 0) { + ldout(m_cct, 20) << __func__ + << " discovered an empty segment, skipping compression without aborting" + << dendl; + out.clear(); + return out; + } + + std::optional compressor_message; + if (m_compressor->compress(input, out, compressor_message)) { + return {}; + } else { + ldout(m_cct, 20) << __func__ << " uncompressed.length()=" << input.length() + << " compressed.length()=" << out.length() << dendl; + m_onwire_size += out.length(); + return out; + } +} + +std::optional RxHandler::decompress(const ceph::bufferlist &input) +{ + ceph::bufferlist out; + if (input.length() == 0) { + ldout(m_cct, 20) << __func__ + << " discovered an empty segment, skipping decompression without aborting" + << dendl; + out.clear(); + return out; + } + + std::optional compressor_message; + if (m_compressor->decompress(input, out, compressor_message)) { + return {}; + } else { + ldout(m_cct, 20) << __func__ << " compressed.length()=" << input.length() + << " uncompressed.length()=" << out.length() << dendl; + return out; + } +} + +void TxHandler::done() +{ + ldout(m_cct, 25) << __func__ << " compression ratio=" << get_ratio() << dendl; +} + +} // namespace ceph::compression::onwire -- cgit v1.2.3