summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/rbd_support/common.py
blob: a6c041bf72948c41aa1277127b854a442e709f30 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import re

from typing import Dict, Optional, Tuple, TYPE_CHECKING, Union


GLOBAL_POOL_KEY = (None, None)


class NotAuthorizedError(Exception):
    pass


if TYPE_CHECKING:
    from rbd_support.module import Module


def is_authorized(module: 'Module',
                  pool: Optional[str],
                  namespace: Optional[str]) -> bool:
    return module.is_authorized({"pool": pool or '',
                                 "namespace": namespace or ''})


def authorize_request(module: 'Module',
                      pool: Optional[str],
                      namespace: Optional[str]) -> None:
    if not is_authorized(module, pool, namespace):
        raise NotAuthorizedError("not authorized on pool={}, namespace={}".format(
            pool, namespace))


PoolKeyT = Union[Tuple[str, str], Tuple[None, None]]


def extract_pool_key(pool_spec: Optional[str]) -> PoolKeyT:
    if not pool_spec:
        return GLOBAL_POOL_KEY

    match = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
    if not match:
        raise ValueError("Invalid pool spec: {}".format(pool_spec))
    return (match.group(1), match.group(2) or '')


def get_rbd_pools(module: 'Module') -> Dict[int, str]:
    osd_map = module.get('osd_map')
    return {pool['pool']: pool['pool_name'] for pool in osd_map['pools']
            if 'rbd' in pool.get('application_metadata', {})}