1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
import json
import logging
from typing import TYPE_CHECKING, List, Dict, Any, Tuple
from orchestrator import OrchestratorError
from mgr_util import ServerConfigException, verify_tls
from .cephadmservice import CephadmService, CephadmDaemonDeploySpec
if TYPE_CHECKING:
    # Only needed for the "CephadmOrchestrator" string annotation used by
    # CephadmExporterConfig.__init__; skipped at runtime
    # (presumably to avoid a circular import with cephadm.module - confirm).
    from cephadm.module import CephadmOrchestrator

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class CephadmExporterConfig:
    """Holds and validates the cephadm-exporter settings (crt, key, token, port).

    Settings can be populated from the constructor, from the mgr's stored
    exporter configuration, or from a JSON document. The validation helpers
    all return an ``(rc, message)`` tuple where ``rc == 0`` means success and
    ``rc == 1`` carries a human-readable reason in ``message``.
    """

    # Keys that a JSON configuration document must provide.
    required_keys = ['crt', 'key', 'token', 'port']
    DEFAULT_PORT = '9443'
    # Highest valid TCP port number (16-bit unsigned).
    _MAX_PORT = 65535

    def __init__(self, mgr: "CephadmOrchestrator", crt: str = "", key: str = "",
                 token: str = "", port: str = "") -> None:
        """Store the orchestrator handle and the (possibly empty) settings."""
        self.mgr = mgr
        self.crt = crt
        self.key = key
        self.token = token
        self.port = port

    @property
    def ready(self) -> bool:
        """True when crt, key, token and port are all set to truthy values."""
        return all([self.crt, self.key, self.token, self.port])

    def load_from_store(self) -> None:
        """Populate the settings from ``mgr._get_exporter_config()``.

        Keys missing from the stored configuration are set to "".
        """
        cfg = self.mgr._get_exporter_config()

        # The stored configuration is expected to be a dict.
        assert isinstance(cfg, dict)
        self.crt = cfg.get('crt', "")
        self.key = cfg.get('key', "")
        self.token = cfg.get('token', "")
        self.port = cfg.get('port', "")

    def load_from_json(self, json_str: str) -> Tuple[int, str]:
        """Populate the settings from a JSON object string.

        The settings are only updated when the document parses and contains
        every entry of ``required_keys``.

        :param json_str: JSON text expected to decode to an object/dict.
        :return: ``(0, "")`` on success, ``(1, reason)`` otherwise.
        """
        try:
            cfg = json.loads(json_str)
        except (TypeError, ValueError):
            # ValueError covers malformed JSON (JSONDecodeError); TypeError is
            # raised by json.loads() when the input is not str/bytes at all.
            return 1, "Invalid JSON provided - unable to load"

        # Valid JSON may still be a list, string or number. Those cannot carry
        # the required keys, and would otherwise fail on the membership test
        # or on .get() below.
        if not isinstance(cfg, dict) or \
                not all(k in cfg for k in CephadmExporterConfig.required_keys):
            return 1, "JSON file must contain crt, key, token and port"

        self.crt = cfg.get('crt')
        self.key = cfg.get('key')
        self.token = cfg.get('token')
        self.port = cfg.get('port')

        return 0, ""

    def validate_config(self) -> Tuple[int, str]:
        """Run every validation check and report the first failure.

        :return: ``(0, "")`` when the configuration is complete and all
            checks pass, ``(1, reason)`` for the first problem found.
        """
        if not self.ready:
            return 1, "Incomplete configuration. cephadm-exporter needs crt, key, token and port to be set"

        # Checks run in order: TLS material, then token, then port.
        for check in [self._validate_tls, self._validate_token, self._validate_port]:
            rc, reason = check()
            if rc:
                return 1, reason

        return 0, ""

    def _validate_tls(self) -> Tuple[int, str]:
        """Check crt/key with ``verify_tls``; its ServerConfigException text is the reason."""
        try:
            verify_tls(self.crt, self.key)
        except ServerConfigException as e:
            return 1, str(e)

        return 0, ""

    def _validate_token(self) -> Tuple[int, str]:
        """Require the token to be a string of at least 8 characters."""
        if not isinstance(self.token, str):
            return 1, "token must be a string"
        if len(self.token) < 8:
            return 1, "Token must be a string of at least 8 chars in length"

        return 0, ""

    def _validate_port(self) -> Tuple[int, str]:
        """Require the port to be an integer in the range 1025..65535."""
        try:
            # str() first so that both int and numeric-string ports are
            # accepted while floats such as 9443.0 are rejected.
            p = int(str(self.port))
        except ValueError:
            return 1, "Port must be a integer (>1024)"

        if p <= 1024:
            return 1, "Port must be a integer (>1024)"
        if p > self._MAX_PORT:
            # Port numbers are 16-bit; anything larger can never be bound.
            return 1, "Port must be a integer (>1024 and <=65535)"

        return 0, ""


class CephadmExporter(CephadmService):
    """Service implementation for daemons of type ``cephadm-exporter``."""

    TYPE = 'cephadm-exporter'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Fill in ports, final config and deps on ``daemon_spec`` before deployment.

        The exporter configuration is loaded from the store. A complete
        configuration is validated; an incomplete one is replaced by the
        defaults via ``mgr._set_exporter_defaults()`` and then reloaded.

        :raises OrchestratorError: when a complete configuration fails validation.
        :return: the same ``daemon_spec`` object, updated in place.
        """
        assert self.TYPE == daemon_spec.daemon_type

        exporter_cfg = CephadmExporterConfig(self.mgr)
        exporter_cfg.load_from_store()

        if not exporter_cfg.ready:
            logger.info(
                "Incomplete/Missing configuration, applying defaults")
            self.mgr._set_exporter_defaults()
            exporter_cfg.load_from_store()
        else:
            status, message = exporter_cfg.validate_config()
            if status:
                raise OrchestratorError(message)

        # Only fall back to the configured port when the spec names none.
        if not daemon_spec.ports:
            daemon_spec.ports = [int(exporter_cfg.port)]

        final_config, dependencies = self.generate_config(daemon_spec)
        daemon_spec.final_config = final_config
        daemon_spec.deps = dependencies

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        """Build the daemon configuration (crt, key, token) and its dependency list.

        Loads the exporter configuration the same way as ``prepare_create``:
        validate when complete, otherwise apply the defaults and reload.

        :raises OrchestratorError: when a complete configuration fails validation.
        :return: ``(config, deps)``; ``deps`` is always an empty list here.
        """
        assert self.TYPE == daemon_spec.daemon_type

        exporter_cfg = CephadmExporterConfig(self.mgr)
        exporter_cfg.load_from_store()

        if not exporter_cfg.ready:
            logger.info("Using default configuration for cephadm-exporter")
            self.mgr._set_exporter_defaults()
            exporter_cfg.load_from_store()
        else:
            status, message = exporter_cfg.validate_config()
            if status:
                raise OrchestratorError(message)

        dependencies: List[str] = []
        settings = {
            "crt": exporter_cfg.crt,
            "key": exporter_cfg.key,
            "token": exporter_cfg.token
        }
        return settings, dependencies

    def purge(self, service_name: str) -> None:
        """Clear the exporter settings via the mgr; ``service_name`` is not used."""
        logger.info("Purging cephadm-exporter settings from mon K/V store")
        self.mgr._clear_exporter_config_settings()
|