diff options
Diffstat (limited to 'src/pybind/mgr/rbd_support/common.py')
-rw-r--r-- | src/pybind/mgr/rbd_support/common.py | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/src/pybind/mgr/rbd_support/common.py b/src/pybind/mgr/rbd_support/common.py new file mode 100644 index 000000000..a6c041bf7 --- /dev/null +++ b/src/pybind/mgr/rbd_support/common.py @@ -0,0 +1,48 @@ +import re + +from typing import Dict, Optional, Tuple, TYPE_CHECKING, Union + + +GLOBAL_POOL_KEY = (None, None) + + +class NotAuthorizedError(Exception): + pass + + +if TYPE_CHECKING: + from rbd_support.module import Module + + +def is_authorized(module: 'Module', + pool: Optional[str], + namespace: Optional[str]) -> bool: + return module.is_authorized({"pool": pool or '', + "namespace": namespace or ''}) + + +def authorize_request(module: 'Module', + pool: Optional[str], + namespace: Optional[str]) -> None: + if not is_authorized(module, pool, namespace): + raise NotAuthorizedError("not authorized on pool={}, namespace={}".format( + pool, namespace)) + + +PoolKeyT = Union[Tuple[str, str], Tuple[None, None]] + + +def extract_pool_key(pool_spec: Optional[str]) -> PoolKeyT: + if not pool_spec: + return GLOBAL_POOL_KEY + + match = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec) + if not match: + raise ValueError("Invalid pool spec: {}".format(pool_spec)) + return (match.group(1), match.group(2) or '') + + +def get_rbd_pools(module: 'Module') -> Dict[int, str]: + osd_map = module.get('osd_map') + return {pool['pool']: pool['pool_name'] for pool in osd_map['pools'] + if 'rbd' in pool.get('application_metadata', {})} |