summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/utils.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/cephadm/utils.py
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/cephadm/utils.py136
1 files changed, 136 insertions, 0 deletions
diff --git a/src/pybind/mgr/cephadm/utils.py b/src/pybind/mgr/cephadm/utils.py
new file mode 100644
index 000000000..a4940c361
--- /dev/null
+++ b/src/pybind/mgr/cephadm/utils.py
@@ -0,0 +1,136 @@
+import logging
+import json
+import socket
+from enum import Enum
+from functools import wraps
+from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple
+from orchestrator import OrchestratorError
+
+if TYPE_CHECKING:
+ from cephadm import CephadmOrchestrator
+
+T = TypeVar('T')
+logger = logging.getLogger(__name__)
+
+ConfEntity = NewType('ConfEntity', str)
+
+
+class CephadmNoImage(Enum):
+ token = 1
+
+
+# ceph daemon types that use the ceph container image.
+# NOTE: order important here as these are used for upgrade order
+CEPH_TYPES = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror', 'cephfs-mirror']
+GATEWAY_TYPES = ['iscsi', 'nfs']
+MONITORING_STACK_TYPES = ['node-exporter', 'prometheus', 'alertmanager', 'grafana']
+RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['nfs']
+
+CEPH_UPGRADE_ORDER = CEPH_TYPES + GATEWAY_TYPES + MONITORING_STACK_TYPES
+
+# these daemon types use the ceph container image
+CEPH_IMAGE_TYPES = CEPH_TYPES + ['iscsi', 'nfs']
+
+# Used for _run_cephadm used for check-host etc that don't require an --image parameter
+cephadmNoImage = CephadmNoImage.token
+
+
+class ContainerInspectInfo(NamedTuple):
+ image_id: str
+ ceph_version: Optional[str]
+ repo_digests: Optional[List[str]]
+
+
+def name_to_config_section(name: str) -> ConfEntity:
+ """
+ Map from daemon names to ceph entity names (as seen in config)
+ """
+ daemon_type = name.split('.', 1)[0]
+ if daemon_type in ['rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi']:
+ return ConfEntity('client.' + name)
+ elif daemon_type in ['mon', 'osd', 'mds', 'mgr', 'client']:
+ return ConfEntity(name)
+ else:
+ return ConfEntity('mon')
+
+
+def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
+ @wraps(f)
+ def forall_hosts_wrapper(*args: Any) -> List[T]:
+ from cephadm.module import CephadmOrchestrator
+
+ # Some weired logic to make calling functions with multiple arguments work.
+ if len(args) == 1:
+ vals = args[0]
+ self = None
+ elif len(args) == 2:
+ self, vals = args
+ else:
+ assert 'either f([...]) or self.f([...])'
+
+ def do_work(arg: Any) -> T:
+ if not isinstance(arg, tuple):
+ arg = (arg, )
+ try:
+ if self:
+ return f(self, *arg)
+ return f(*arg)
+ except Exception:
+ logger.exception(f'executing {f.__name__}({args}) failed.')
+ raise
+
+ assert CephadmOrchestrator.instance is not None
+ return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
+
+ return forall_hosts_wrapper
+
+
+def get_cluster_health(mgr: 'CephadmOrchestrator') -> str:
+ # check cluster health
+ ret, out, err = mgr.check_mon_command({
+ 'prefix': 'health',
+ 'format': 'json',
+ })
+ try:
+ j = json.loads(out)
+ except ValueError:
+ msg = 'Failed to parse health status: Cannot decode JSON'
+ logger.exception('%s: \'%s\'' % (msg, out))
+ raise OrchestratorError('failed to parse health status')
+
+ return j['status']
+
+
+def is_repo_digest(image_name: str) -> bool:
+ """
+ repo digest are something like "ceph/ceph@sha256:blablabla"
+ """
+ return '@' in image_name
+
+
+def resolve_ip(hostname: str) -> str:
+ try:
+ r = socket.getaddrinfo(hostname, None, flags=socket.AI_CANONNAME,
+ type=socket.SOCK_STREAM)
+ # pick first v4 IP, if present
+ for a in r:
+ if a[0] == socket.AF_INET:
+ return a[4][0]
+ return r[0][4][0]
+ except socket.gaierror as e:
+ raise OrchestratorError(f"Cannot resolve ip for host {hostname}: {e}")
+
+
+def ceph_release_to_major(release: str) -> int:
+ return ord(release[0]) - ord('a') + 1
+
+
+def file_mode_to_str(mode: int) -> str:
+ r = ''
+ for shift in range(0, 9, 3):
+ r = (
+ f'{"r" if (mode >> shift) & 4 else "-"}'
+ f'{"w" if (mode >> shift) & 2 else "-"}'
+ f'{"x" if (mode >> shift) & 1 else "-"}'
+ ) + r
+ return r