1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
import logging
import json
import socket
from enum import Enum
from functools import wraps
from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple
from orchestrator import OrchestratorError
if TYPE_CHECKING:
    # Imported only for static type checking (string annotations such as
    # 'CephadmOrchestrator' below); this import is not executed at runtime.
    from cephadm import CephadmOrchestrator

# Generic result type; forall_hosts() uses it to type the per-item result.
T = TypeVar('T')
logger = logging.getLogger(__name__)
# Distinct static type for config entity names such as 'client.rgw.foo' or
# 'mon' (see name_to_config_section); at runtime a ConfEntity is a plain str.
ConfEntity = NewType('ConfEntity', str)
class CephadmNoImage(Enum):
    """Single-member enum used purely as a sentinel value.

    Its only member is aliased at module level as ``cephadmNoImage``.
    """
    token = 1
# Ceph daemon types that run from the ceph container image.
# NOTE: order matters -- this list leads CEPH_UPGRADE_ORDER below, so its
# sequence is the order in which these daemon types are upgraded.
CEPH_TYPES = [
    'mgr',
    'mon',
    'crash',
    'osd',
    'mds',
    'rgw',
    'rbd-mirror',
    'cephfs-mirror',
    'ceph-exporter',
]
# Gateway daemon types; placed after CEPH_TYPES in the upgrade order.
GATEWAY_TYPES = ['iscsi', 'nfs', 'nvmeof']
# Monitoring stack daemon types; placed last in the upgrade order.
MONITORING_STACK_TYPES = [
    'node-exporter',
    'prometheus',
    'alertmanager',
    'grafana',
    'loki',
    'promtail',
]
# NOTE(review): judging by the name, daemon types that may be rescheduled
# away from offline hosts -- confirm against the callers.
RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['haproxy', 'nfs']
# Complete upgrade order: ceph daemons, then gateways, then monitoring.
CEPH_UPGRADE_ORDER = [*CEPH_TYPES, *GATEWAY_TYPES, *MONITORING_STACK_TYPES]

# Daemon types that use the ceph container image.
CEPH_IMAGE_TYPES = [*CEPH_TYPES, 'iscsi', 'nfs', 'node-proxy']
# Daemon types in the upgrade order that do NOT use the ceph image. Other
# daemon types outside the upgrade order also use different images, but
# only the ones that take part in the upgrade order are listed here.
NON_CEPH_IMAGE_TYPES = [*MONITORING_STACK_TYPES, 'nvmeof']
# Sentinel passed to _run_cephadm for commands (such as check-host) that do
# not require an --image parameter.
cephadmNoImage = CephadmNoImage.token
class ContainerInspectInfo(NamedTuple):
    """Immutable record describing a container image.

    Holds the image ID, the Ceph version string, and the repository digests.
    The last two may be None.
    """
    image_id: str
    # May be None -- NOTE(review): presumably when the version could not be
    # determined; confirm with the code that builds this record.
    ceph_version: Optional[str]
    # May be None; otherwise a list of repo digest strings.
    repo_digests: Optional[List[str]]
class SpecialHostLabels(str, Enum):
    """Host label strings that cephadm handles specially.

    Every value starts with an underscore. Because ``str`` is mixed in, each
    member compares equal to its raw label string.
    """

    ADMIN: str = '_admin'
    NO_MEMORY_AUTOTUNE: str = '_no_autotune_memory'
    DRAIN_DAEMONS: str = '_no_schedule'
    DRAIN_CONF_KEYRING: str = '_no_conf_keyring'

    def to_json(self) -> str:
        """Return the member's raw label string for serialization."""
        label: str = self.value
        return label
def name_to_config_section(name: str) -> ConfEntity:
    """
    Translate a daemon name (e.g. ``rgw.foo``) into the entity name used
    for it in the ceph config.

    The daemon type is the part of *name* before the first '.':

    - client-style daemon types get a ``client.`` prefix (``client.rgw.foo``);
    - native entity types (mon/osd/mds/mgr/client) are returned unchanged;
    - any other daemon type maps to ``mon``.
    """
    daemon_type, _, _ = name.partition('.')
    client_types = ('rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi',
                    'ceph-exporter', 'nvmeof')
    native_types = ('mon', 'osd', 'mds', 'mgr', 'client')

    if daemon_type in client_types:
        return ConfEntity('client.' + name)
    if daemon_type in native_types:
        return ConfEntity(name)
    return ConfEntity('mon')
def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
    """Decorator: apply *f* to every element of a list via the worker pool.

    The decorated callable can be used either as ``g(vals)`` (plain
    function) or as ``obj.g(vals)`` (method). In the method form, ``obj`` is
    passed as the first argument of *f*. Each element of ``vals`` becomes
    one call of *f*. A tuple element is unpacked into positional arguments;
    any other element is passed as a single argument.

    The calls are dispatched through
    ``CephadmOrchestrator.instance._worker_pool.map``, and the list it
    returns is the result. NOTE(review): presumably the pool runs calls
    concurrently; confirm against the pool type.

    Any exception raised by *f* is logged and re-raised.

    :raises TypeError: if the decorated callable is not invoked with exactly
        one (``vals``) or two (``self, vals``) positional arguments.
    """
    @wraps(f)
    def forall_hosts_wrapper(*args: Any) -> List[T]:
        from cephadm.module import CephadmOrchestrator

        # Some weird logic to make calling functions with multiple arguments work.
        if len(args) == 1:
            vals = args[0]
            self = None
        elif len(args) == 2:
            self, vals = args
        else:
            # Raise explicitly: asserting a non-empty string literal is always
            # true, and asserts are stripped under -O anyway.
            raise TypeError('either f([...]) or self.f([...])')

        def do_work(arg: Any) -> T:
            # Normalize each element into a positional-argument tuple.
            if not isinstance(arg, tuple):
                arg = (arg, )
            try:
                # Identity check, so a receiver that happens to be falsy is
                # still passed through.
                if self is not None:
                    return f(self, *arg)
                return f(*arg)
            except Exception:
                logger.exception(f'executing {f.__name__}({args}) failed.')
                raise

        assert CephadmOrchestrator.instance is not None
        return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)

    return forall_hosts_wrapper
def get_cluster_health(mgr: 'CephadmOrchestrator') -> str:
    """Return the ``status`` field of the cluster ``health`` report.

    Issues the ``health`` mon command with JSON output through
    ``mgr.check_mon_command`` and decodes the returned output.

    :raises OrchestratorError: if the command output is not valid JSON.
    """
    health_cmd = {
        'prefix': 'health',
        'format': 'json',
    }
    _, raw_health, _ = mgr.check_mon_command(health_cmd)
    try:
        health = json.loads(raw_health)
    except ValueError:
        reason = 'Failed to parse health status: Cannot decode JSON'
        logger.exception("%s: '%s'" % (reason, raw_health))
        raise OrchestratorError('failed to parse health status')
    return health['status']
def is_repo_digest(image_name: str) -> bool:
    """
    Return True when *image_name* is a repo digest reference.

    Repo digests look like ``ceph/ceph@sha256:<hash>``; the ``@`` separator
    is what identifies them.
    """
    return image_name.find('@') >= 0
def resolve_ip(hostname: str) -> str:
    """Resolve *hostname* to a single IP address string.

    Prefers the first IPv4 (AF_INET) address returned by
    ``socket.getaddrinfo``. If none is present, returns the address of the
    first result, whatever its family.

    :raises OrchestratorError: if ``getaddrinfo`` raises ``socket.gaierror``.
    """
    try:
        infos = socket.getaddrinfo(hostname, None,
                                   type=socket.SOCK_STREAM,
                                   flags=socket.AI_CANONNAME)
        # Lazily scan for IPv4 sockaddrs; fall back to the first entry.
        ipv4_addrs = (info[4][0] for info in infos if info[0] == socket.AF_INET)
        return next(ipv4_addrs, infos[0][4][0])
    except socket.gaierror as e:
        raise OrchestratorError(f"Cannot resolve ip for host {hostname}: {e}")
def ceph_release_to_major(release: str) -> int:
    """Return the major version number for a Ceph release code name.

    The major version is the 1-based alphabet position of the name's first
    letter (assumes release names are assigned alphabetically), e.g.
    ``'quincy'`` -> 17, ``'reef'`` -> 18. The first letter is matched
    case-insensitively, so ``'Reef'`` also gives 18.

    :raises ValueError: if *release* is empty or does not start with an
        ASCII letter.
    """
    initial = release[:1].lower()
    # len check rejects '' and non-ASCII letters whose lowercase form expands
    # to more than one character.
    if len(initial) != 1 or not 'a' <= initial <= 'z':
        raise ValueError(f'invalid ceph release name: {release!r}')
    return ord(initial) - ord('a') + 1
def file_mode_to_str(mode: int) -> str:
    """Render the nine permission bits of *mode* as an ``ls``-style string.

    Produces the owner, group and other ``rwx`` triplets in that order,
    e.g. ``0o754`` -> ``'rwxr-xr--'``. Bits above the lowest nine
    (setuid/setgid/sticky, file type) are ignored.
    """
    triplets = []
    for shift in (6, 3, 0):
        bits = (mode >> shift) & 0o7
        triplets.append(''.join(
            flag if bits & mask else '-'
            for flag, mask in (('r', 4), ('w', 2), ('x', 1))
        ))
    return ''.join(triplets)
|