summaryrefslogtreecommitdiffstats
path: root/src/msg/compressor_registry.cc
blob: 80b155fbdf8eddc1fb56aad9decd3847e9221b99 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "compressor_registry.h"
#include "common/dout.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "CompressorRegistry(" << this << ") "

CompressorRegistry::CompressorRegistry(CephContext *cct)
  : cct(cct)
{
  cct->_conf.add_observer(this);
}

// Unsubscribe from configuration changes; this undoes the add_observer()
// call made in the constructor.
CompressorRegistry::~CompressorRegistry()
{
  this->cct->_conf.remove_observer(this);
}

// Return the nullptr-terminated list of configuration option names this
// observer tracks. The array has static storage duration, so the returned
// pointer stays valid after the call.
const char** CompressorRegistry::get_tracked_conf_keys() const
{
  static const char *tracked_keys[] = {
    "ms_osd_compress_mode",
    "ms_osd_compression_algorithm",
    "ms_osd_compress_min_size",
    "ms_compress_secure",
    nullptr  // terminator
  };

  return tracked_keys;
}

// Configuration-change callback. Neither `conf` nor `changed` is inspected:
// every cached setting is re-read (via _refresh_config()) while `lock` is
// held, regardless of which key triggered the notification.
void CompressorRegistry::handle_conf_change(
  const ConfigProxy& conf,
  const std::set<std::string>& changed)
{
  std::scoped_lock guard(lock);

  _refresh_config();
}

// Convert a delimited list of compression algorithm names into algorithm ids.
//
// `s` is split on any of the characters ';', ',', '=', ' ' and '\t'. Each
// token is looked up with Compressor::get_comp_alg_type(); tokens that do not
// resolve are logged at level 5 and skipped. If nothing resolves, the result
// holds the single entry Compressor::COMP_ALG_NONE.
std::vector<uint32_t> CompressorRegistry::_parse_method_list(const std::string& s)
{
  std::vector<uint32_t> parsed;

  // Handles one token; appends to `parsed` only when the name is recognised.
  auto add_method = [&] (auto name) {
    ldout(cct,20) << "adding algorithm method: " << name << dendl;

    const auto alg = Compressor::get_comp_alg_type(name);
    if (!alg) {
      ldout(cct,5) << "WARNING: unknown algorithm method " << name << dendl;
      return;
    }
    parsed.push_back(*alg);
  };
  for_each_substr(s, ";,= \t", add_method);

  // Never hand back an empty list: fall back to "no compression".
  if (parsed.empty()) {
    parsed.push_back(Compressor::COMP_ALG_NONE);
  }

  ldout(cct,20) << __func__ << " " << s << " -> " << parsed << dendl;
  return parsed;
}

// Reload every cached messenger-compression setting from cct->_conf.
//
// Reads ms_osd_compress_mode, ms_osd_compression_algorithm,
// ms_osd_compress_min_size and ms_compress_secure and stores them in the
// corresponding members. This function takes no lock itself;
// handle_conf_change() holds `lock` around its call.
//
// If the configured mode string is not recognised by
// Compressor::get_comp_mode_type(), the failure is logged at level 1 and the
// mode falls back to Compressor::COMP_NONE.
void CompressorRegistry::_refresh_config()
{
  // Keep the raw string so a parse failure can report what was configured.
  const auto mode_name = cct->_conf.get_val<std::string>("ms_osd_compress_mode");
  auto c_mode = Compressor::get_comp_mode_type(mode_name);

  if (c_mode) {
    ms_osd_compress_mode = *c_mode;
  } else {
    // Log the offending configured value, not the previously cached member.
    ldout(cct,1) << __func__ << " failed to identify ms_osd_compress_mode "
      << mode_name << dendl;

    ms_osd_compress_mode = Compressor::COMP_NONE;
  }

  ms_osd_compression_methods = _parse_method_list(cct->_conf.get_val<std::string>("ms_osd_compression_algorithm"));
  ms_osd_compress_min_size = cct->_conf.get_val<std::uint64_t>("ms_osd_compress_min_size");

  ms_compress_secure = cct->_conf.get_val<bool>("ms_compress_secure");

  ldout(cct,10) << __func__ << " ms_osd_compression_mode " << ms_osd_compress_mode
    << " ms_osd_compression_methods " << ms_osd_compression_methods
    << " ms_osd_compress_above_min_size " << ms_osd_compress_min_size
    << " ms_compress_secure " << ms_compress_secure
    << dendl;
}

// Choose a compression algorithm for a connection to `peer_type`.
//
// Walks `preferred_methods` in order and returns the first entry that also
// appears in the list returned by get_methods(peer_type). When the two lists
// share no entry, the mismatch is logged at level 1 and
// Compressor::COMP_ALG_NONE is returned.
Compressor::CompressionAlgorithm
CompressorRegistry::pick_method(uint32_t peer_type,
                                const std::vector<uint32_t>& preferred_methods)
{
  const std::vector<uint32_t> ours = get_methods(peer_type);

  const auto match = std::find_first_of(preferred_methods.cbegin(),
                                        preferred_methods.cend(),
                                        ours.cbegin(),
                                        ours.cend());
  if (match != preferred_methods.cend()) {
    return static_cast<Compressor::CompressionAlgorithm>(*match);
  }

  ldout(cct,1) << "failed to pick compression method from client's "
               << preferred_methods
               << " and our " << ours << dendl;
  return Compressor::COMP_ALG_NONE;
}

// Report the compression mode to use for a connection, read under `lock`.
//
// Returns Compressor::COMP_NONE when the connection is secure and
// ms_compress_secure is false, and for every peer type other than
// CEPH_ENTITY_TYPE_OSD. Otherwise returns the cached ms_osd_compress_mode.
Compressor::CompressionMode
CompressorRegistry::get_mode(uint32_t peer_type, bool is_secure)
{
  std::scoped_lock guard(lock);
  ldout(cct, 20) << __func__ << " peer_type " << peer_type 
    << " is_secure " << is_secure << dendl;

  // Secure connections are compressed only when explicitly permitted.
  const bool blocked_by_secure_mode = is_secure && !ms_compress_secure;
  if (blocked_by_secure_mode) {
    return Compressor::COMP_NONE;
  }

  // Only OSD peers have a configurable mode here.
  if (peer_type == CEPH_ENTITY_TYPE_OSD) {
    return static_cast<Compressor::CompressionMode>(ms_osd_compress_mode);
  }
  return Compressor::COMP_NONE;
}