summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/utils.py
blob: a4940c361a48f7433464e8e1eb5ff985aba62cd5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import logging
import json
import socket
from enum import Enum
from functools import wraps
from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple
from orchestrator import OrchestratorError

if TYPE_CHECKING:
    from cephadm import CephadmOrchestrator

# Generic type variable: the per-item result type of a function wrapped by
# forall_hosts().
T = TypeVar('T')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Distinct (for type checkers) str type for ceph config entity names, as
# returned by name_to_config_section().
ConfEntity = NewType('ConfEntity', str)


class CephadmNoImage(Enum):
    """
    Single-member enum whose only member, ``token``, is exposed at module
    level as ``cephadmNoImage``.
    """
    # The one and only member; the numeric value itself carries no meaning
    # within this module.
    token = 1


# Ceph daemon types that use the ceph container image.
# NOTE: the order is significant, because it is used as the upgrade order.
CEPH_TYPES = [
    'mgr',
    'mon',
    'crash',
    'osd',
    'mds',
    'rgw',
    'rbd-mirror',
    'cephfs-mirror',
]
GATEWAY_TYPES = [
    'iscsi',
    'nfs',
]
MONITORING_STACK_TYPES = [
    'node-exporter',
    'prometheus',
    'alertmanager',
    'grafana',
]
RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = [
    'nfs',
]

# Full upgrade sequence: ceph daemons first, then gateways, then monitoring.
CEPH_UPGRADE_ORDER = [*CEPH_TYPES, *GATEWAY_TYPES, *MONITORING_STACK_TYPES]

# All daemon types that use the ceph container image.
CEPH_IMAGE_TYPES = [*CEPH_TYPES, 'iscsi', 'nfs']

# Used with _run_cephadm for commands (check-host etc.) that don't require an
# --image parameter.
cephadmNoImage = CephadmNoImage.token


class ContainerInspectInfo(NamedTuple):
    """
    Immutable record of three fields describing a container image.

    NOTE(review): presumably filled from the output of a container/image
    inspect call; the producer is not part of this module -- confirm.
    """
    # Identifier of the image.
    image_id: str
    # Ceph version string, or None when not available.
    ceph_version: Optional[str]
    # List of repo digest strings, or None when not available.
    repo_digests: Optional[List[str]]


def name_to_config_section(name: str) -> ConfEntity:
    """
    Translate a daemon name into the ceph entity name used as its config
    section.

    The daemon type is the part of ``name`` before the first ``.``:

    * rgw, rbd-mirror, nfs, crash, iscsi -> ``client.<name>``
    * mon, osd, mds, mgr, client         -> ``<name>`` unchanged
    * anything else                      -> ``mon``
    """
    daemon_type, _sep, _daemon_id = name.partition('.')

    if daemon_type in ('rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi'):
        section = 'client.' + name
    elif daemon_type in ('mon', 'osd', 'mds', 'mgr', 'client'):
        section = name
    else:
        section = 'mon'
    return ConfEntity(section)


def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
    """
    Decorator that applies ``f`` to every item of a list via the worker pool
    of the active CephadmOrchestrator instance.

    The decorated callable is invoked either as ``f([...])`` or, for methods,
    as ``self.f([...])``. Each list item is handed to ``f`` as its
    argument(s); an item that is a tuple is unpacked into several positional
    arguments.

    :param f: the function to run once per list item
    :return: a wrapper that returns the result of mapping ``f`` over the list
    :raises AssertionError: if the wrapper is called with anything other than
        one or two positional arguments, or if no CephadmOrchestrator
        instance is set
    """
    @wraps(f)
    def forall_hosts_wrapper(*args: Any) -> List[T]:
        # Imported at call time rather than at module import time.
        from cephadm.module import CephadmOrchestrator

        # Some weird logic to make calling functions with multiple arguments work.
        if len(args) == 1:
            vals = args[0]
            self = None
        elif len(args) == 2:
            self, vals = args
        else:
            # Asserting a non-empty string literal is always true and would
            # let execution continue with `vals` unbound; fail explicitly.
            raise AssertionError('either f([...]) or self.f([...])')

        def do_work(arg: Any) -> T:
            # Normalize a single value to a 1-tuple so it can be unpacked.
            if not isinstance(arg, tuple):
                arg = (arg, )
            try:
                # Test for presence, not truthiness, of the bound instance.
                if self is not None:
                    return f(self, *arg)
                return f(*arg)
            except Exception:
                # Log with traceback here, then let the error propagate.
                logger.exception(f'executing {f.__name__}({args}) failed.')
                raise

        assert CephadmOrchestrator.instance is not None
        return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)

    return forall_hosts_wrapper


def get_cluster_health(mgr: 'CephadmOrchestrator') -> str:
    """
    Run the ``health`` mon command (JSON format) through
    ``mgr.check_mon_command`` and return the ``status`` field of the reply.

    :raises OrchestratorError: if the command output is not valid JSON
    """
    health_cmd = {
        'prefix': 'health',
        'format': 'json',
    }
    _ret, out, _err = mgr.check_mon_command(health_cmd)

    try:
        health = json.loads(out)
    except ValueError:
        msg = 'Failed to parse health status: Cannot decode JSON'
        # Keep the raw output in the log to help diagnose the bad reply.
        logger.exception("%s: '%s'" % (msg, out))
        raise OrchestratorError('failed to parse health status')
    return health['status']


def is_repo_digest(image_name: str) -> bool:
    """
    Tell whether ``image_name`` looks like a repo digest, i.e. contains an
    ``@`` as in "ceph/ceph@sha256:blablabla".
    """
    digest_marker = '@'
    return digest_marker in image_name


def resolve_ip(hostname: str) -> str:
    """
    Resolve ``hostname`` to an IP address string.

    The first IPv4 address among the results is preferred; if there is none,
    the address of the first result is returned.

    :raises OrchestratorError: if the name lookup fails
    """
    try:
        addr_infos = socket.getaddrinfo(hostname, None,
                                        type=socket.SOCK_STREAM,
                                        flags=socket.AI_CANONNAME)
    except socket.gaierror as e:
        raise OrchestratorError(f"Cannot resolve ip for host {hostname}: {e}")

    # Each entry is (family, type, proto, canonname, sockaddr); the address
    # string is the first element of sockaddr.
    first_v4 = next(
        (sockaddr[0] for family, _type, _proto, _canon, sockaddr in addr_infos
         if family == socket.AF_INET),
        None,
    )
    if first_v4 is not None:
        return first_v4
    return addr_infos[0][4][0]


def ceph_release_to_major(release: str) -> int:
    """
    Convert a release name to a number from its first character:
    'a' -> 1, 'b' -> 2, and so on (e.g. 'octopus' -> 15).
    """
    initial = release[0]
    return 1 + ord(initial) - ord('a')


def file_mode_to_str(mode: int) -> str:
    """
    Render the low nine permission bits of ``mode`` as an ``rwxrwxrwx``
    style string, using ``-`` for every bit that is not set.
    """
    permission_bits = (('r', 4), ('w', 2), ('x', 1))
    # Walk the three 3-bit groups from the highest (shift 6) to the lowest
    # (shift 0) so the characters come out in display order.
    return ''.join(
        letter if (mode >> shift) & bit else '-'
        for shift in (6, 3, 0)
        for letter, bit in permission_bits
    )