diff options
Diffstat (limited to 'src/python-common')
37 files changed, 8056 insertions, 0 deletions
diff --git a/src/python-common/.gitignore b/src/python-common/.gitignore new file mode 100644 index 000000000..c2de8195a --- /dev/null +++ b/src/python-common/.gitignore @@ -0,0 +1,3 @@ +ceph.egg-info +build +setup.cfg diff --git a/src/python-common/CMakeLists.txt b/src/python-common/CMakeLists.txt new file mode 100644 index 000000000..e89bbe2fe --- /dev/null +++ b/src/python-common/CMakeLists.txt @@ -0,0 +1,7 @@ +include(Distutils) +distutils_install_module(ceph) + +if(WITH_TESTS) + include(AddCephTest) + add_tox_test(python-common TOX_ENVS py3 lint) +endif() diff --git a/src/python-common/README.rst b/src/python-common/README.rst new file mode 100644 index 000000000..3900ec4f9 --- /dev/null +++ b/src/python-common/README.rst @@ -0,0 +1,22 @@ +ceph-python-common +================== + +This library is meant to be used to keep common data structures and +functions usable throughout the Ceph project. + +Like for example: + +- All different Cython bindings. +- MGR modules. +- ``ceph`` command line interface and other Ceph tools. +- Also external tools. + +Usage +===== + +From within the Ceph git, just import it: + +.. code:: python + + from ceph.deployment_utils import DriveGroupSpec + from ceph.exceptions import OSError diff --git a/src/python-common/ceph/__init__.py b/src/python-common/ceph/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/python-common/ceph/__init__.py diff --git a/src/python-common/ceph/deployment/__init__.py b/src/python-common/ceph/deployment/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/python-common/ceph/deployment/__init__.py diff --git a/src/python-common/ceph/deployment/drive_group.py b/src/python-common/ceph/deployment/drive_group.py new file mode 100644 index 000000000..cf24fc0ef --- /dev/null +++ b/src/python-common/ceph/deployment/drive_group.py @@ -0,0 +1,385 @@ +import enum +import yaml + +from ceph.deployment.inventory import Device +from ceph.deployment.service_spec import ( + CustomConfig, + GeneralArgList, + PlacementSpec, + ServiceSpec, +) +from ceph.deployment.hostspec import SpecValidationError + +try: + from typing import Optional, List, Dict, Any, Union +except ImportError: + pass + + +class OSDMethod(str, enum.Enum): + raw = 'raw' + lvm = 'lvm' + + def to_json(self) -> str: + return self.value + + +class DeviceSelection(object): + """ + Used within :class:`ceph.deployment.drive_group.DriveGroupSpec` to specify the devices + used by the Drive Group. + + Any attributes (even none) can be included in the device + specification structure. + """ + + _supported_filters = [ + "actuators", "paths", "size", "vendor", "model", "rotational", "limit", "all" + ] + + def __init__(self, + actuators=None, # type: Optional[int] + paths=None, # type: Optional[List[Dict[str, str]]] + model=None, # type: Optional[str] + size=None, # type: Optional[str] + rotational=None, # type: Optional[bool] + limit=None, # type: Optional[int] + vendor=None, # type: Optional[str] + all=False, # type: bool + ): + """ + ephemeral drive group device specification + """ + self.actuators = actuators + + #: List of Device objects for devices paths. + + self.paths = [] + + if paths is not None: + for device in paths: + if isinstance(device, dict): + path: str = device.get("path", '') + self.paths.append(Device(path, crush_device_class=device.get("crush_device_class", None))) # noqa E501 + else: + self.paths.append(Device(str(device))) + + #: A wildcard string. e.g: "SDD*" or "SanDisk SD8SN8U5" + self.model = model + + #: Match on the VENDOR property of the drive + self.vendor = vendor + + #: Size specification of format LOW:HIGH. + #: Can also take the form :HIGH, LOW: + #: or an exact value (as ceph-volume inventory reports) + self.size: Optional[str] = size + + #: is the drive rotating or not + self.rotational = rotational + + #: Limit the number of devices added to this Drive Group. Devices + #: are used from top to bottom in the output of ``ceph-volume inventory`` + self.limit = limit + + #: Matches all devices. Can only be used for data devices + self.all = all + + def validate(self, name: str) -> None: + props = [self.actuators, self.model, self.vendor, self.size, + self.rotational] # type: List[Any] + if self.paths and any(p is not None for p in props): + raise DriveGroupValidationError( + name, + 'device selection: `paths` and other parameters are mutually exclusive') + is_empty = not any(p is not None and p != [] for p in [self.paths] + props) + if not self.all and is_empty: + raise DriveGroupValidationError(name, 'device selection cannot be empty') + + if self.all and not is_empty: + raise DriveGroupValidationError( + name, + 'device selection: `all` and other parameters are mutually exclusive. {}'.format( + repr(self))) + + @classmethod + def from_json(cls, device_spec): + # type: (dict) -> Optional[DeviceSelection] + if not device_spec: + return None + for applied_filter in list(device_spec.keys()): + if applied_filter not in cls._supported_filters: + raise KeyError(applied_filter) + + return cls(**device_spec) + + def to_json(self): + # type: () -> Dict[str, Any] + ret: Dict[str, Any] = {} + if self.paths: + ret['paths'] = [p.path for p in self.paths] + if self.model: + ret['model'] = self.model + if self.vendor: + ret['vendor'] = self.vendor + if self.size: + ret['size'] = self.size + if self.rotational is not None: + ret['rotational'] = self.rotational + if self.limit: + ret['limit'] = self.limit + if self.all: + ret['all'] = self.all + + return ret + + def __repr__(self) -> str: + keys = [ + key for key in self._supported_filters + ['limit'] if getattr(self, key) is not None + ] + if 'paths' in keys and self.paths == []: + keys.remove('paths') + return "DeviceSelection({})".format( + ', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys) + ) + + def __eq__(self, other: Any) -> bool: + return repr(self) == repr(other) + + +class DriveGroupValidationError(SpecValidationError): + """ + Defining an exception here is a bit problematic, cause you cannot properly catch it, + if it was raised in a different mgr module. + """ + + def __init__(self, name: Optional[str], msg: str): + name = name or "<unnamed>" + super(DriveGroupValidationError, self).__init__( + f'Failed to validate OSD spec "{name}": {msg}') + + +class DriveGroupSpec(ServiceSpec): + """ + Describe a drive group in the same form that ceph-volume + understands. + """ + + _supported_features = [ + "encrypted", "block_wal_size", "osds_per_device", + "db_slots", "wal_slots", "block_db_size", "placement", "service_id", "service_type", + "data_devices", "db_devices", "wal_devices", "journal_devices", + "data_directories", "osds_per_device", "objectstore", "osd_id_claims", + "journal_size", "unmanaged", "filter_logic", "preview_only", "extra_container_args", + "extra_entrypoint_args", "data_allocate_fraction", "method", "crush_device_class", "config", + ] + + def __init__(self, + placement=None, # type: Optional[PlacementSpec] + service_id=None, # type: Optional[str] + data_devices=None, # type: Optional[DeviceSelection] + db_devices=None, # type: Optional[DeviceSelection] + wal_devices=None, # type: Optional[DeviceSelection] + journal_devices=None, # type: Optional[DeviceSelection] + data_directories=None, # type: Optional[List[str]] + osds_per_device=None, # type: Optional[int] + objectstore='bluestore', # type: str + encrypted=False, # type: bool + db_slots=None, # type: Optional[int] + wal_slots=None, # type: Optional[int] + osd_id_claims=None, # type: Optional[Dict[str, List[str]]] + block_db_size=None, # type: Union[int, str, None] + block_wal_size=None, # type: Union[int, str, None] + journal_size=None, # type: Union[int, str, None] + service_type=None, # type: Optional[str] + unmanaged=False, # type: bool + filter_logic='AND', # type: str + preview_only=False, # type: bool + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + data_allocate_fraction=None, # type: Optional[float] + method=None, # type: Optional[OSDMethod] + config=None, # type: Optional[Dict[str, str]] + custom_configs=None, # type: Optional[List[CustomConfig]] + crush_device_class=None, # type: Optional[str] + ): + assert service_type is None or service_type == 'osd' + super(DriveGroupSpec, self).__init__('osd', service_id=service_id, + placement=placement, + config=config, + unmanaged=unmanaged, + preview_only=preview_only, + extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.data_devices = data_devices + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.db_devices = db_devices + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.wal_devices = wal_devices + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.journal_devices = journal_devices + + #: Set (or override) the "bluestore_block_wal_size" value, in bytes + self.block_wal_size: Union[int, str, None] = block_wal_size + + #: Set (or override) the "bluestore_block_db_size" value, in bytes + self.block_db_size: Union[int, str, None] = block_db_size + + #: set journal_size in bytes + self.journal_size: Union[int, str, None] = journal_size + + #: Number of osd daemons per "DATA" device. + #: To fully utilize nvme devices multiple osds are required. + #: Can be used to split dual-actuator devices across 2 OSDs, by setting the option to 2. + self.osds_per_device = osds_per_device + + #: A list of strings, containing paths which should back OSDs + self.data_directories = data_directories + + #: ``filestore`` or ``bluestore`` + self.objectstore = objectstore + + #: ``true`` or ``false`` + self.encrypted = encrypted + + #: How many OSDs per DB device + self.db_slots = db_slots + + #: How many OSDs per WAL device + self.wal_slots = wal_slots + + #: Optional: mapping of host -> List of osd_ids that should be replaced + #: See :ref:`orchestrator-osd-replace` + self.osd_id_claims = osd_id_claims or dict() + + #: The logic gate we use to match disks with filters. + #: defaults to 'AND' + self.filter_logic = filter_logic.upper() + + #: If this should be treated as a 'preview' spec + self.preview_only = preview_only + + #: Allocate a fraction of the data device (0,1.0] + self.data_allocate_fraction = data_allocate_fraction + + self.method = method + + #: Crush device class to assign to OSDs + self.crush_device_class = crush_device_class + + @classmethod + def _from_json_impl(cls, json_drive_group): + # type: (dict) -> DriveGroupSpec + """ + Initialize 'Drive group' structure + + :param json_drive_group: A valid json string with a Drive Group + specification + """ + args: Dict[str, Any] = json_drive_group.copy() + # legacy json (pre Octopus) + if 'host_pattern' in args and 'placement' not in args: + args['placement'] = {'host_pattern': args['host_pattern']} + del args['host_pattern'] + + s_id = args.get('service_id', '<unnamed>') + + # spec: was not mandatory in octopus + if 'spec' in args: + args['spec'].update(cls._drive_group_spec_from_json(s_id, args['spec'])) + args.update(cls._drive_group_spec_from_json( + s_id, {k: v for k, v in args.items() if k != 'spec'})) + + return super(DriveGroupSpec, cls)._from_json_impl(args) + + @classmethod + def _drive_group_spec_from_json(cls, name: str, json_drive_group: dict) -> dict: + for applied_filter in list(json_drive_group.keys()): + if applied_filter not in cls._supported_features: + raise DriveGroupValidationError( + name, + "Feature `{}` is not supported".format(applied_filter)) + + try: + def to_selection(key: str, vals: dict) -> Optional[DeviceSelection]: + try: + return DeviceSelection.from_json(vals) + except KeyError as e: + raise DriveGroupValidationError( + f'{name}.{key}', + f"Filtering for `{e.args[0]}` is not supported") + + args = {k: (to_selection(k, v) if k.endswith('_devices') else v) for k, v in + json_drive_group.items()} + if not args: + raise DriveGroupValidationError(name, "Didn't find drive selections") + return args + except (KeyError, TypeError) as e: + raise DriveGroupValidationError(name, str(e)) + + def validate(self): + # type: () -> None + super(DriveGroupSpec, self).validate() + + if self.placement.is_empty(): + raise DriveGroupValidationError(self.service_id, '`placement` required') + + if self.data_devices is None: + raise DriveGroupValidationError(self.service_id, "`data_devices` element is required.") + + specs_names = "data_devices db_devices wal_devices journal_devices".split() + specs = dict(zip(specs_names, [getattr(self, k) for k in specs_names])) + for k, s in [ks for ks in specs.items() if ks[1] is not None]: + assert s is not None + s.validate(f'{self.service_id}.{k}') + for s in filter(None, [self.db_devices, self.wal_devices, self.journal_devices]): + if s.all: + raise DriveGroupValidationError( + self.service_id, + "`all` is only allowed for data_devices") + + if self.objectstore not in ('bluestore'): + raise DriveGroupValidationError(self.service_id, + f"{self.objectstore} is not supported. Must be " + f"one of ('bluestore')") + + if self.block_wal_size is not None and type(self.block_wal_size) not in [int, str]: + raise DriveGroupValidationError( + self.service_id, + 'block_wal_size must be of type int or string') + if self.block_db_size is not None and type(self.block_db_size) not in [int, str]: + raise DriveGroupValidationError( + self.service_id, + 'block_db_size must be of type int or string') + if self.journal_size is not None and type(self.journal_size) not in [int, str]: + raise DriveGroupValidationError( + self.service_id, + 'journal_size must be of type int or string') + + if self.filter_logic not in ['AND', 'OR']: + raise DriveGroupValidationError( + self.service_id, + 'filter_logic must be either <AND> or <OR>') + + if self.method not in [None, 'lvm', 'raw']: + raise DriveGroupValidationError( + self.service_id, + 'method must be one of None, lvm, raw') + if self.method == 'raw' and self.objectstore == 'filestore': + raise DriveGroupValidationError( + self.service_id, + 'method raw only supports bluestore') + + if self.data_devices.paths is not None: + for device in list(self.data_devices.paths): + if not device.path: + raise DriveGroupValidationError(self.service_id, 'Device path cannot be empty') # noqa E501 + + +yaml.add_representer(DriveGroupSpec, DriveGroupSpec.yaml_representer) diff --git a/src/python-common/ceph/deployment/drive_selection/__init__.py b/src/python-common/ceph/deployment/drive_selection/__init__.py new file mode 100644 index 000000000..994e2f2da --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/__init__.py @@ -0,0 +1,2 @@ +from .selector import DriveSelection # NOQA +from .matchers import Matcher, SubstringMatcher, EqualityMatcher, AllMatcher, SizeMatcher # NOQA diff --git a/src/python-common/ceph/deployment/drive_selection/example.yaml b/src/python-common/ceph/deployment/drive_selection/example.yaml new file mode 100644 index 000000000..2851e7dbb --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/example.yaml @@ -0,0 +1,21 @@ +# default: +# target: 'data*' +# data_devices: +# size: 20G +# db_devices: +# size: 10G +# rotational: 1 +# allflash: +# target: 'fast_nodes*' +# data_devices: +# size: 100G +# db_devices: +# size: 50G +# rotational: 0 + +# This is the default configuration and +# will create an OSD on all available drives +default: + target: 'fnmatch_target' + data_devices: + all: true diff --git a/src/python-common/ceph/deployment/drive_selection/filter.py b/src/python-common/ceph/deployment/drive_selection/filter.py new file mode 100644 index 000000000..0da1b5c39 --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/filter.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- + +import logging + +from ceph.deployment.drive_group import DeviceSelection + +try: + from typing import Generator +except ImportError: + pass + +from .matchers import Matcher, SubstringMatcher, AllMatcher, SizeMatcher, EqualityMatcher + +logger = logging.getLogger(__name__) + + +class FilterGenerator(object): + def __init__(self, device_filter): + # type: (DeviceSelection) -> None + self.device_filter = device_filter + + def __iter__(self): + # type: () -> Generator[Matcher, None, None] + if self.device_filter.actuators: + yield EqualityMatcher('actuators', self.device_filter.actuators) + if self.device_filter.size: + yield SizeMatcher('size', self.device_filter.size) + if self.device_filter.model: + yield SubstringMatcher('model', self.device_filter.model) + if self.device_filter.vendor: + yield SubstringMatcher('vendor', self.device_filter.vendor) + if self.device_filter.rotational is not None: + val = '1' if self.device_filter.rotational else '0' + yield EqualityMatcher('rotational', val) + if self.device_filter.all: + yield AllMatcher('all', str(self.device_filter.all)) diff --git a/src/python-common/ceph/deployment/drive_selection/matchers.py b/src/python-common/ceph/deployment/drive_selection/matchers.py new file mode 100644 index 000000000..df502410a --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/matchers.py @@ -0,0 +1,412 @@ +# -*- coding: utf-8 -*- + +from typing import Tuple, Optional, Any, Union, Iterator + +from ceph.deployment.inventory import Device + +import re +import logging + +logger = logging.getLogger(__name__) + + +class _MatchInvalid(Exception): + pass + + +# pylint: disable=too-few-public-methods +class Matcher(object): + """ The base class to all Matchers + + It holds utility methods such as _get_disk_key + and handles the initialization. + + """ + + def __init__(self, key, value): + # type: (str, Any) -> None + """ Initialization of Base class + + :param str key: Attribute like 'model, size or vendor' + :param str value: Value of attribute like 'X123, 5G or samsung' + """ + self.key = key + self.value = value + self.fallback_key = '' # type: Optional[str] + + def _get_disk_key(self, device): + # type: (Device) -> Any + """ Helper method to safely extract values form the disk dict + + There is a 'key' and a _optional_ 'fallback' key that can be used. + The reason for this is that the output of ceph-volume is not always + consistent (due to a bug currently, but you never know). + There is also a safety measure for a disk_key not existing on + virtual environments. ceph-volume apparently sources its information + from udev which seems to not populate certain fields on VMs. + + :raises: A generic Exception when no disk_key could be found. + :return: A disk value + :rtype: str + """ + # using the . notation, but some keys are nested, and hidden behind + # a different hierarchy, which makes it harder to access programatically + # hence, make it a dict. + disk = device.to_json() + + def findkeys(node: Union[list, dict], key_val: str) -> Iterator[str]: + """ Find keys in non-flat dict recursively """ + if isinstance(node, list): + for i in node: + for key in findkeys(i, key_val): + yield key + elif isinstance(node, dict): + if key_val in node: + yield node[key_val] + for j in node.values(): + for key in findkeys(j, key_val): + yield key + + disk_value = list(findkeys(disk, self.key)) + if not disk_value and self.fallback_key: + disk_value = list(findkeys(disk, self.fallback_key)) + + if disk_value: + return disk_value[0] + else: + raise _MatchInvalid("No value found for {} or {}".format( + self.key, self.fallback_key)) + + def compare(self, disk): + # type: (Device) -> bool + """ Implements a valid comparison method for a SubMatcher + This will get overwritten by the individual classes + + :param dict disk: A disk representation + """ + raise NotImplementedError + + +# pylint: disable=too-few-public-methods +class SubstringMatcher(Matcher): + """ Substring matcher subclass + """ + + def __init__(self, key, value, fallback_key=None): + # type: (str, str, Optional[str]) -> None + Matcher.__init__(self, key, value) + self.fallback_key = fallback_key + + def compare(self, disk): + # type: (Device) -> bool + """ Overwritten method to match substrings + + This matcher does substring matching + :param dict disk: A disk representation (see base for examples) + :return: True/False if the match succeeded + :rtype: bool + """ + if not disk: + return False + disk_value = self._get_disk_key(disk) + if str(self.value) in disk_value: + return True + return False + + +# pylint: disable=too-few-public-methods +class AllMatcher(Matcher): + """ All matcher subclass + """ + + def __init__(self, key, value, fallback_key=None): + # type: (str, Any, Optional[str]) -> None + + Matcher.__init__(self, key, value) + self.fallback_key = fallback_key + + def compare(self, disk): + # type: (Device) -> bool + + """ Overwritten method to match all + + A rather dumb matcher that just accepts all disks + (regardless of the value) + :param dict disk: A disk representation (see base for examples) + :return: always True + :rtype: bool + """ + if not disk: + return False + return True + + +# pylint: disable=too-few-public-methods +class EqualityMatcher(Matcher): + """ Equality matcher subclass + """ + + def __init__(self, key, value): + # type: (str, Any) -> None + + Matcher.__init__(self, key, value) + + def compare(self, disk): + # type: (Device) -> bool + + """ Overwritten method to match equality + + This matcher does value comparison + :param dict disk: A disk representation + :return: True/False if the match succeeded + :rtype: bool + """ + if not disk: + return False + disk_value = self._get_disk_key(disk) + ret = disk_value == self.value + if not ret: + logger.debug('{} != {}'.format(disk_value, self.value)) + return ret + + +class SizeMatcher(Matcher): + """ Size matcher subclass + """ + + SUFFIXES = ( + ["KB", "MB", "GB", "TB"], + ["K", "M", "G", "T"], + [1e+3, 1e+6, 1e+9, 1e+12] + ) + + supported_suffixes = SUFFIXES[0] + SUFFIXES[1] + + # pylint: disable=too-many-instance-attributes + def __init__(self, key, value): + # type: (str, str) -> None + + # The 'key' value is overwritten here because + # the user_defined attribute does not necessarily + # correspond to the desired attribute + # requested from the inventory output + Matcher.__init__(self, key, value) + self.key = "human_readable_size" + self.fallback_key = "size" + self._high = None # type: Optional[str] + self._high_suffix = None # type: Optional[str] + self._low = None # type: Optional[str] + self._low_suffix = None # type: Optional[str] + self._exact = None # type: Optional[str] + self._exact_suffix = None # type: Optional[str] + self._parse_filter() + + @property + def low(self): + # type: () -> Tuple[Optional[str], Optional[str]] + """ Getter for 'low' matchers + """ + return self._low, self._low_suffix + + @low.setter + def low(self, low): + # type: (Tuple[str, str]) -> None + """ Setter for 'low' matchers + """ + self._low, self._low_suffix = low + + @property + def high(self): + # type: () -> Tuple[Optional[str], Optional[str]] + """ Getter for 'high' matchers + """ + return self._high, self._high_suffix + + @high.setter + def high(self, high): + # type: (Tuple[str, str]) -> None + """ Setter for 'high' matchers + """ + self._high, self._high_suffix = high + + @property + def exact(self): + # type: () -> Tuple[Optional[str], Optional[str]] + """ Getter for 'exact' matchers + """ + return self._exact, self._exact_suffix + + @exact.setter + def exact(self, exact): + # type: (Tuple[str, str]) -> None + """ Setter for 'exact' matchers + """ + self._exact, self._exact_suffix = exact + + @classmethod + def _normalize_suffix(cls, suffix): + # type: (str) -> str + """ Normalize any supported suffix + + Since the Drive Groups are user facing, we simply + can't make sure that all users type in the requested + form. That's why we have to internally agree on one format. + It also checks if any of the supported suffixes was used + and raises an Exception otherwise. + + :param str suffix: A suffix ('G') or ('M') + :return: A normalized output + :rtype: str + """ + suffix = suffix.upper() + if suffix not in cls.supported_suffixes: + raise _MatchInvalid("Unit '{}' not supported".format(suffix)) + return dict(zip( + cls.SUFFIXES[1], + cls.SUFFIXES[0], + )).get(suffix, suffix) + + @classmethod + def _parse_suffix(cls, obj): + # type: (str) -> str + """ Wrapper method to find and normalize a prefix + + :param str obj: A size filtering string ('10G') + :return: A normalized unit ('GB') + :rtype: str + """ + return cls._normalize_suffix(re.findall(r"[a-zA-Z]+", obj)[0]) + + @classmethod + def _get_k_v(cls, data): + # type: (str) -> Tuple[str, str] + """ Helper method to extract data from a string + + It uses regex to extract all digits and calls _parse_suffix + which also uses a regex to extract all letters and normalizes + the resulting suffix. + + :param str data: A size filtering string ('10G') + :return: A Tuple with normalized output (10, 'GB') + :rtype: tuple + """ + return re.findall(r"\d+\.?\d*", data)[0], cls._parse_suffix(data) + + def _parse_filter(self) -> None: + """ Identifies which type of 'size' filter is applied + + There are four different filtering modes: + + 1) 10G:50G (high-low) + At least 10G but at max 50G of size + + 2) :60G + At max 60G of size + + 3) 50G: + At least 50G of size + + 4) 20G + Exactly 20G in size + + This method uses regex to identify and extract this information + and raises if none could be found. + """ + low_high = re.match(r"\d+[A-Z]{1,2}:\d+[A-Z]{1,2}", self.value) + if low_high is not None: + lowpart, highpart = low_high.group().split(":") + self.low = self._get_k_v(lowpart) + self.high = self._get_k_v(highpart) + + low = re.match(r"\d+[A-Z]{1,2}:$", self.value) + if low: + self.low = self._get_k_v(low.group()) + + high = re.match(r"^:\d+[A-Z]{1,2}", self.value) + if high: + self.high = self._get_k_v(high.group()) + + exact = re.match(r"^\d+\.?\d*[A-Z]{1,2}$", self.value) + if exact: + self.exact = self._get_k_v(exact.group()) + + if not self.low and not self.high and not self.exact: + raise _MatchInvalid("Couldn't parse {}".format(self.value)) + + @staticmethod + # pylint: disable=inconsistent-return-statements + def to_byte(tpl): + # type: (Tuple[Optional[str], Optional[str]]) -> float + + """ Convert any supported unit to bytes + + :param tuple tpl: A tuple with ('10', 'GB') + :return: The converted byte value + :rtype: float + """ + val_str, suffix = tpl + value = float(val_str) if val_str is not None else 0.0 + return dict(zip( + SizeMatcher.SUFFIXES[0], + SizeMatcher.SUFFIXES[2], + )).get(str(suffix), 0.00) * value + + @staticmethod + def str_to_byte(input): + # type: (str) -> float + return SizeMatcher.to_byte(SizeMatcher._get_k_v(input)) + + # pylint: disable=inconsistent-return-statements, too-many-return-statements + def compare(self, disk): + # type: (Device) -> bool + """ Convert MB/GB/TB down to bytes and compare + + 1) Extracts information from the to-be-inspected disk. + 2) Depending on the mode, apply checks and return + + # This doesn't seem very solid and _may_ + be re-factored + + + """ + if not disk: + return False + disk_value = self._get_disk_key(disk) + # This doesn't necessarily have to be a float. + # The current output from ceph-volume gives a float.. + # This may change in the future.. + # todo: harden this paragraph + if not disk_value: + logger.warning("Could not retrieve value for disk") + return False + + disk_size = re.findall(r"\d+\.\d+", disk_value)[0] + disk_suffix = self._parse_suffix(disk_value) + disk_size_in_byte = self.to_byte((disk_size, disk_suffix)) + + if all(self.high) and all(self.low): + if disk_size_in_byte <= self.to_byte( + self.high) and disk_size_in_byte >= self.to_byte(self.low): + return True + # is a else: return False necessary here? + # (and in all other branches) + logger.debug("Disk didn't match for 'high/low' filter") + + elif all(self.low) and not all(self.high): + if disk_size_in_byte >= self.to_byte(self.low): + return True + logger.debug("Disk didn't match for 'low' filter") + + elif all(self.high) and not all(self.low): + if disk_size_in_byte <= self.to_byte(self.high): + return True + logger.debug("Disk didn't match for 'high' filter") + + elif all(self.exact): + if disk_size_in_byte == self.to_byte(self.exact): + return True + logger.debug("Disk didn't match for 'exact' filter") + else: + logger.debug("Neither high, low, nor exact was given") + raise _MatchInvalid("No filters applied") + return False diff --git a/src/python-common/ceph/deployment/drive_selection/selector.py b/src/python-common/ceph/deployment/drive_selection/selector.py new file mode 100644 index 000000000..1b3bfbb4e --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/selector.py @@ -0,0 +1,191 @@ +import logging + +from typing import List, Optional, Dict, Callable + +from ..inventory import Device +from ..drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError + +from .filter import FilterGenerator +from .matchers import _MatchInvalid + +logger = logging.getLogger(__name__) + + +def to_dg_exception(f: Callable) -> Callable[['DriveSelection', str, + Optional['DeviceSelection']], + List['Device']]: + def wrapper(self: 'DriveSelection', name: str, ds: Optional['DeviceSelection']) -> List[Device]: + try: + return f(self, ds) + except _MatchInvalid as e: + raise DriveGroupValidationError(f'{self.spec.service_id}.{name}', e.args[0]) + return wrapper + + +class DriveSelection(object): + def __init__(self, + spec, # type: DriveGroupSpec + disks, # type: List[Device] + existing_daemons=None, # type: Optional[int] + ): + self.disks = disks.copy() + self.spec = spec + self.existing_daemons = existing_daemons or 0 + + self._data = self.assign_devices('data_devices', self.spec.data_devices) + self._wal = self.assign_devices('wal_devices', self.spec.wal_devices) + self._db = self.assign_devices('db_devices', self.spec.db_devices) + self._journal = self.assign_devices('journal_devices', self.spec.journal_devices) + + def data_devices(self): + # type: () -> List[Device] + return self._data + + def wal_devices(self): + # type: () -> List[Device] + return self._wal + + def db_devices(self): + # type: () -> List[Device] + return self._db + + def journal_devices(self): + # type: () -> List[Device] + return self._journal + + def _limit_reached(self, device_filter, len_devices, + disk_path): + # type: (DeviceSelection, int, str) -> bool + """ Check for the <limit> property and apply logic + + If a limit is set in 'device_attrs' we have to stop adding + disks at some point. + + If limit is set (>0) and len(devices) >= limit + + :param int len_devices: Length of the already populated device set/list + :param str disk_path: The disk identifier (for logging purposes) + :return: True/False if the device should be added to the list of devices + :rtype: bool + """ + limit = device_filter.limit or 0 + + if limit > 0 and (len_devices + self.existing_daemons >= limit): + logger.debug("Refuse to add {} due to limit policy of <{}>".format( + disk_path, limit)) + return True + return False + + @staticmethod + def _has_mandatory_idents(disk): + # type: (Device) -> bool + """ Check for mandatory identification fields + """ + if disk.path: + logger.debug("Found matching disk: {}".format(disk.path)) + return True + else: + raise Exception( + "Disk {} doesn't have a 'path' identifier".format(disk)) + + @to_dg_exception + def assign_devices(self, device_filter): + # type: (Optional[DeviceSelection]) -> List[Device] + """ Assign drives based on used filters + + Do not add disks when: + + 1) Filter didn't match + 2) Disk doesn't have a mandatory identification item (path) + 3) The set :limit was reached + + After the disk was added we make sure not to re-assign this disk + for another defined type[wal/db/journal devices] + + return a sorted(by path) list of devices + """ + + if not device_filter: + logger.debug('device_filter is None') + return [] + + if not self.spec.data_devices: + logger.debug('data_devices is None') + return [] + + if device_filter.paths: + logger.debug('device filter is using explicit paths') + return device_filter.paths + + devices = list() # type: List[Device] + for disk in self.disks: + logger.debug("Processing disk {}".format(disk.path)) + + if not disk.available and not disk.ceph_device: + logger.debug( + ("Ignoring disk {}. " + "Disk is unavailable due to {}".format(disk.path, disk.rejected_reasons)) + ) + continue + + if not disk.available and disk.ceph_device and disk.lvs: + other_osdspec_affinity = '' + for lv in disk.lvs: + if 'osdspec_affinity' in lv.keys(): + if lv['osdspec_affinity'] != self.spec.service_id: + other_osdspec_affinity = lv['osdspec_affinity'] + break + if other_osdspec_affinity: + logger.debug("{} is already used in spec {}, " + "skipping it.".format(disk.path, other_osdspec_affinity)) + continue + + if not self._has_mandatory_idents(disk): + logger.debug( + "Ignoring disk {}. Missing mandatory idents".format( + disk.path)) + continue + + # break on this condition. + if self._limit_reached(device_filter, len(devices), disk.path): + logger.debug("Ignoring disk {}. Limit reached".format( + disk.path)) + break + + if disk in devices: + continue + + if self.spec.filter_logic == 'AND': + if not all(m.compare(disk) for m in FilterGenerator(device_filter)): + logger.debug( + "Ignoring disk {}. Not all filter did match the disk".format( + disk.path)) + continue + + if self.spec.filter_logic == 'OR': + if not any(m.compare(disk) for m in FilterGenerator(device_filter)): + logger.debug( + "Ignoring disk {}. No filter matched the disk".format( + disk.path)) + continue + + logger.debug('Adding disk {}'.format(disk.path)) + devices.append(disk) + + # This disk is already taken and must not be re-assigned. + for taken_device in devices: + if taken_device in self.disks: + self.disks.remove(taken_device) + + return sorted([x for x in devices], key=lambda dev: dev.path) + + def __repr__(self) -> str: + selection: Dict[str, List[str]] = { + 'data devices': [d.path for d in self._data], + 'wal_devices': [d.path for d in self._wal], + 'db devices': [d.path for d in self._db], + 'journal devices': [d.path for d in self._journal] + } + return "DeviceSelection({})".format( + ', '.join('{}={}'.format(key, selection[key]) for key in selection.keys()) + ) diff --git a/src/python-common/ceph/deployment/hostspec.py b/src/python-common/ceph/deployment/hostspec.py new file mode 100644 index 000000000..0c448bf13 --- /dev/null +++ b/src/python-common/ceph/deployment/hostspec.py @@ -0,0 +1,137 @@ +from collections import OrderedDict +import errno +import re +from typing import Optional, List, Any, Dict + + +def assert_valid_host(name: str) -> None: + p = re.compile('^[a-zA-Z0-9-]+$') + try: + assert len(name) <= 250, 'name is too long (max 250 chars)' + for part in name.split('.'): + assert len(part) > 0, '.-delimited name component must not be empty' + assert len(part) <= 63, '.-delimited name component must not be more than 63 chars' + assert p.match(part), 'name component must include only a-z, 0-9, and -' + except AssertionError as e: + raise SpecValidationError(str(e) + f'. Got "{name}"') + + +class SpecValidationError(Exception): + """ + Defining an exception here is a bit problematic, cause you cannot properly catch it, + if it was raised in a different mgr module. + """ + def __init__(self, + msg: str, + errno: int = -errno.EINVAL): + super(SpecValidationError, self).__init__(msg) + self.errno = errno + + +class HostSpec(object): + """ + Information about hosts. Like e.g. ``kubectl get nodes`` + """ + def __init__(self, + hostname: str, + addr: Optional[str] = None, + labels: Optional[List[str]] = None, + status: Optional[str] = None, + location: Optional[Dict[str, str]] = None, + ): + self.service_type = 'host' + + #: the bare hostname on the host. Not the FQDN. + self.hostname = hostname # type: str + + #: DNS name or IP address to reach it + self.addr = addr or hostname # type: str + + #: label(s), if any + self.labels = labels or [] # type: List[str] + + #: human readable status + self.status = status or '' # type: str + + self.location = location + + def validate(self) -> None: + assert_valid_host(self.hostname) + + def to_json(self) -> Dict[str, Any]: + r: Dict[str, Any] = { + 'hostname': self.hostname, + 'addr': self.addr, + 'labels': list(OrderedDict.fromkeys((self.labels))), + 'status': self.status, + } + if self.location: + r['location'] = self.location + return r + + @classmethod + def from_json(cls, host_spec: dict) -> 'HostSpec': + host_spec = cls.normalize_json(host_spec) + _cls = cls( + host_spec['hostname'], + host_spec['addr'] if 'addr' in host_spec else None, + list(OrderedDict.fromkeys( + host_spec['labels'])) if 'labels' in host_spec else None, + host_spec['status'] if 'status' in host_spec else None, + host_spec.get('location'), + ) + return _cls + + @staticmethod + def normalize_json(host_spec: dict) -> dict: + labels = host_spec.get('labels') + if labels is not None: + if isinstance(labels, str): + host_spec['labels'] = [labels] + elif ( + not isinstance(labels, list) + or any(not isinstance(v, str) for v in labels) + ): + raise SpecValidationError( + f'Labels ({labels}) must be a string or list of strings' + ) + + loc = host_spec.get('location') + if loc is not None: + if ( + not isinstance(loc, dict) + or any(not isinstance(k, str) for k in loc.keys()) + or any(not isinstance(v, str) for v in loc.values()) + ): + raise SpecValidationError( + f'Location ({loc}) must be a dictionary of strings to strings' + ) + + return host_spec + + def __repr__(self) -> str: + args = [self.hostname] # type: List[Any] + if self.addr is not None: + args.append(self.addr) + if self.labels: + args.append(self.labels) + if self.status: + args.append(self.status) + if self.location: + args.append(self.location) + + return "HostSpec({})".format(', '.join(map(repr, args))) + + def __str__(self) -> str: + if self.hostname != self.addr: + return f'{self.hostname} ({self.addr})' + return self.hostname + + def __eq__(self, other: Any) -> bool: + # Let's omit `status` for the moment, as it is still the very same host. + if not isinstance(other, HostSpec): + return NotImplemented + return self.hostname == other.hostname and \ + self.addr == other.addr and \ + sorted(self.labels) == sorted(other.labels) and \ + self.location == other.location diff --git a/src/python-common/ceph/deployment/inventory.py b/src/python-common/ceph/deployment/inventory.py new file mode 100644 index 000000000..a30238821 --- /dev/null +++ b/src/python-common/ceph/deployment/inventory.py @@ -0,0 +1,138 @@ +try: + from typing import List, Optional, Dict, Any, Union +except ImportError: + pass # for type checking + +from ceph.utils import datetime_now, datetime_to_str, str_to_datetime +import datetime +import json + + +class Devices(object): + """ + A container for Device instances with reporting + """ + + def __init__(self, devices): + # type: (List[Device]) -> None + # sort devices by path name so ordering is consistent + self.devices: List[Device] = sorted(devices, key=lambda d: d.path if d.path else '') + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Devices): + return NotImplemented + if len(self.devices) != len(other.devices): + return False + for d1, d2 in zip(other.devices, self.devices): + if d1 != d2: + return False + return True + + def to_json(self): + # type: () -> List[dict] + return [d.to_json() for d in self.devices] + + @classmethod + def from_json(cls, input): + # type: (List[Dict[str, Any]]) -> Devices + return cls([Device.from_json(i) for i in input]) + + def copy(self): + # type: () -> Devices + return Devices(devices=list(self.devices)) + + +class Device(object): + report_fields = [ + 'ceph_device', + 'rejected_reasons', + 'available', + 'path', + 'sys_api', + 'created', + 'lvs', + 'human_readable_type', + 'device_id', + 'lsm_data', + 'crush_device_class' + ] + + def __init__(self, + path, # type: str + sys_api=None, # type: Optional[Dict[str, Any]] + available=None, # type: Optional[bool] + rejected_reasons=None, # type: Optional[List[str]] + lvs=None, # type: Optional[List[Dict[str, str]]] + device_id=None, # type: Optional[str] + lsm_data=None, # type: Optional[Dict[str, Dict[str, str]]] + created=None, # type: Optional[datetime.datetime] + ceph_device=None, # type: Optional[bool] + crush_device_class=None # type: Optional[str] + ): + + self.path = path + self.sys_api = sys_api if sys_api is not None else {} # type: Dict[str, Any] + self.available = available + self.rejected_reasons = rejected_reasons if rejected_reasons is not None else [] + self.lvs = lvs + self.device_id = device_id + self.lsm_data = lsm_data if lsm_data is not None else {} # type: Dict[str, Dict[str, str]] + self.created = created if created is not None else datetime_now() + self.ceph_device = ceph_device + self.crush_device_class = crush_device_class + + def __eq__(self, other): + # type: (Any) -> bool + if not isinstance(other, Device): + return NotImplemented + diff = [k for k in self.report_fields if k != 'created' and (getattr(self, k) + != getattr(other, k))] + return not diff + + def to_json(self): + # type: () -> dict + return { + k: (getattr(self, k) if k != 'created' + or not isinstance(getattr(self, k), datetime.datetime) + else datetime_to_str(getattr(self, k))) + for k in self.report_fields + } + + @classmethod + def from_json(cls, input): + # type: (Dict[str, Any]) -> Device + if not isinstance(input, dict): + raise ValueError('Device: Expected dict. Got `{}...`'.format(json.dumps(input)[:10])) + ret = cls( + **{ + key: (input.get(key, None) if key != 'created' + or not input.get(key, None) + else str_to_datetime(input.get(key, None))) + for key in Device.report_fields + if key != 'human_readable_type' + } + ) + if ret.rejected_reasons: + ret.rejected_reasons = sorted(ret.rejected_reasons) + return ret + + @property + def human_readable_type(self): + # type: () -> str + if self.sys_api is None or 'rotational' not in self.sys_api: + return "unknown" + return 'hdd' if self.sys_api["rotational"] == "1" else 'ssd' + + def __repr__(self) -> str: + device_desc: Dict[str, Union[str, List[str], List[Dict[str, str]]]] = { + 'path': self.path if self.path is not None else 'unknown', + 'lvs': self.lvs if self.lvs else 'None', + 'available': str(self.available), + 'ceph_device': str(self.ceph_device), + 'crush_device_class': str(self.crush_device_class) + } + if not self.available and self.rejected_reasons: + device_desc['rejection reasons'] = self.rejected_reasons + return "Device({})".format( + ', '.join('{}={}'.format(key, device_desc[key]) for key in device_desc.keys()) + ) diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py new file mode 100644 index 000000000..be9f3e8ea --- /dev/null +++ b/src/python-common/ceph/deployment/service_spec.py @@ -0,0 +1,2011 @@ +import fnmatch +import os +import re +import enum +from collections import OrderedDict +from contextlib import contextmanager +from functools import wraps +from ipaddress import ip_network, ip_address +from typing import Optional, Dict, Any, List, Union, Callable, Iterable, Type, TypeVar, cast, \ + NamedTuple, Mapping, Iterator + +import yaml + +from ceph.deployment.hostspec import HostSpec, SpecValidationError, assert_valid_host +from ceph.deployment.utils import unwrap_ipv6, valid_addr +from ceph.utils import is_hex + +ServiceSpecT = TypeVar('ServiceSpecT', bound='ServiceSpec') +FuncT = TypeVar('FuncT', bound=Callable) + + +def handle_type_error(method: FuncT) -> FuncT: + @wraps(method) + def inner(cls: Any, *args: Any, **kwargs: Any) -> Any: + try: + return method(cls, *args, **kwargs) + except (TypeError, AttributeError) as e: + error_msg = '{}: {}'.format(cls.__name__, e) + raise SpecValidationError(error_msg) + return cast(FuncT, inner) + + +class HostPlacementSpec(NamedTuple): + hostname: str + network: str + name: str + + def __str__(self) -> str: + res = '' + res += self.hostname + if self.network: + res += ':' + self.network + if self.name: + res += '=' + self.name + return res + + @classmethod + @handle_type_error + def from_json(cls, data: Union[dict, str]) -> 'HostPlacementSpec': + if isinstance(data, str): + return cls.parse(data) + return cls(**data) + + def to_json(self) -> str: + return str(self) + + @classmethod + def parse(cls, host, require_network=True): + # type: (str, bool) -> HostPlacementSpec + """ + Split host into host, network, and (optional) daemon name parts. The network + part can be an IP, CIDR, or ceph addrvec like '[v2:1.2.3.4:3300,v1:1.2.3.4:6789]'. + e.g., + "myhost" + "myhost=name" + "myhost:1.2.3.4" + "myhost:1.2.3.4=name" + "myhost:1.2.3.0/24" + "myhost:1.2.3.0/24=name" + "myhost:[v2:1.2.3.4:3000]=name" + "myhost:[v2:1.2.3.4:3000,v1:1.2.3.4:6789]=name" + """ + # Matches from start to : or = or until end of string + host_re = r'^(.*?)(:|=|$)' + # Matches from : to = or until end of string + ip_re = r':(.*?)(=|$)' + # Matches from = to end of string + name_re = r'=(.*?)$' + + # assign defaults + host_spec = cls('', '', '') + + match_host = re.search(host_re, host) + if match_host: + host_spec = host_spec._replace(hostname=match_host.group(1)) + + name_match = re.search(name_re, host) + if name_match: + host_spec = host_spec._replace(name=name_match.group(1)) + + ip_match = re.search(ip_re, host) + if ip_match: + host_spec = host_spec._replace(network=ip_match.group(1)) + + if not require_network: + return host_spec + + networks = list() # type: List[str] + network = host_spec.network + # in case we have [v2:1.2.3.4:3000,v1:1.2.3.4:6478] + if ',' in network: + networks = [x for x in network.split(',')] + else: + if network != '': + networks.append(network) + + for network in networks: + # only if we have versioned network configs + if network.startswith('v') or network.startswith('[v'): + # if this is ipv6 we can't just simply split on ':' so do + # a split once and rsplit once to leave us with just ipv6 addr + network = network.split(':', 1)[1] + network = network.rsplit(':', 1)[0] + try: + # if subnets are defined, also verify the validity + if '/' in network: + ip_network(network) + else: + ip_address(unwrap_ipv6(network)) + except ValueError as e: + # logging? + raise e + host_spec.validate() + return host_spec + + def validate(self) -> None: + assert_valid_host(self.hostname) + + +class PlacementSpec(object): + """ + For APIs that need to specify a host subset + """ + + def __init__(self, + label=None, # type: Optional[str] + hosts=None, # type: Union[List[str],List[HostPlacementSpec], None] + count=None, # type: Optional[int] + count_per_host=None, # type: Optional[int] + host_pattern=None, # type: Optional[str] + ): + # type: (...) -> None + self.label = label + self.hosts = [] # type: List[HostPlacementSpec] + + if hosts: + self.set_hosts(hosts) + + self.count = count # type: Optional[int] + self.count_per_host = count_per_host # type: Optional[int] + + #: fnmatch patterns to select hosts. Can also be a single host. + self.host_pattern = host_pattern # type: Optional[str] + + self.validate() + + def is_empty(self) -> bool: + return ( + self.label is None + and not self.hosts + and not self.host_pattern + and self.count is None + and self.count_per_host is None + ) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, PlacementSpec): + return self.label == other.label \ + and self.hosts == other.hosts \ + and self.count == other.count \ + and self.host_pattern == other.host_pattern \ + and self.count_per_host == other.count_per_host + return NotImplemented + + def set_hosts(self, hosts: Union[List[str], List[HostPlacementSpec]]) -> None: + # To backpopulate the .hosts attribute when using labels or count + # in the orchestrator backend. + if all([isinstance(host, HostPlacementSpec) for host in hosts]): + self.hosts = hosts # type: ignore + else: + self.hosts = [HostPlacementSpec.parse(x, require_network=False) # type: ignore + for x in hosts if x] + + # deprecated + def filter_matching_hosts(self, _get_hosts_func: Callable) -> List[str]: + return self.filter_matching_hostspecs(_get_hosts_func(as_hostspec=True)) + + def filter_matching_hostspecs(self, hostspecs: Iterable[HostSpec]) -> List[str]: + if self.hosts: + all_hosts = [hs.hostname for hs in hostspecs] + return [h.hostname for h in self.hosts if h.hostname in all_hosts] + if self.label: + return [hs.hostname for hs in hostspecs if self.label in hs.labels] + all_hosts = [hs.hostname for hs in hostspecs] + if self.host_pattern: + return fnmatch.filter(all_hosts, self.host_pattern) + return all_hosts + + def get_target_count(self, hostspecs: Iterable[HostSpec]) -> int: + if self.count: + return self.count + return len(self.filter_matching_hostspecs(hostspecs)) * (self.count_per_host or 1) + + def pretty_str(self) -> str: + """ + >>> #doctest: +SKIP + ... ps = PlacementSpec(...) # For all placement specs: + ... PlacementSpec.from_string(ps.pretty_str()) == ps + """ + kv = [] + if self.hosts: + kv.append(';'.join([str(h) for h in self.hosts])) + if self.count: + kv.append('count:%d' % self.count) + if self.count_per_host: + kv.append('count-per-host:%d' % self.count_per_host) + if self.label: + kv.append('label:%s' % self.label) + if self.host_pattern: + kv.append(self.host_pattern) + return ';'.join(kv) + + def __repr__(self) -> str: + kv = [] + if self.count: + kv.append('count=%d' % self.count) + if self.count_per_host: + kv.append('count_per_host=%d' % self.count_per_host) + if self.label: + kv.append('label=%s' % repr(self.label)) + if self.hosts: + kv.append('hosts={!r}'.format(self.hosts)) + if self.host_pattern: + kv.append('host_pattern={!r}'.format(self.host_pattern)) + return "PlacementSpec(%s)" % ', '.join(kv) + + @classmethod + @handle_type_error + def from_json(cls, data: dict) -> 'PlacementSpec': + c = data.copy() + hosts = c.get('hosts', []) + if hosts: + c['hosts'] = [] + for host in hosts: + c['hosts'].append(HostPlacementSpec.from_json(host)) + _cls = cls(**c) + _cls.validate() + return _cls + + def to_json(self) -> dict: + r: Dict[str, Any] = {} + if self.label: + r['label'] = self.label + if self.hosts: + r['hosts'] = [host.to_json() for host in self.hosts] + if self.count: + r['count'] = self.count + if self.count_per_host: + r['count_per_host'] = self.count_per_host + if self.host_pattern: + r['host_pattern'] = self.host_pattern + return r + + def validate(self) -> None: + if self.hosts and self.label: + # TODO: a less generic Exception + raise SpecValidationError('Host and label are mutually exclusive') + if self.count is not None: + try: + intval = int(self.count) + except (ValueError, TypeError): + raise SpecValidationError("num/count must be a numeric value") + if self.count != intval: + raise SpecValidationError("num/count must be an integer value") + if self.count < 1: + raise SpecValidationError("num/count must be >= 1") + if self.count_per_host is not None: + try: + intval = int(self.count_per_host) + except (ValueError, TypeError): + raise SpecValidationError("count-per-host must be a numeric value") + if self.count_per_host != intval: + raise SpecValidationError("count-per-host must be an integer value") + if self.count_per_host < 1: + raise SpecValidationError("count-per-host must be >= 1") + if self.count_per_host is not None and not ( + self.label + or self.hosts + or self.host_pattern + ): + raise SpecValidationError( + "count-per-host must be combined with label or hosts or host_pattern" + ) + if self.count is not None and self.count_per_host is not None: + raise SpecValidationError("cannot combine count and count-per-host") + if ( + self.count_per_host is not None + and self.hosts + and any([hs.network or hs.name for hs in self.hosts]) + ): + raise SpecValidationError( + "count-per-host cannot be combined explicit placement with names or networks" + ) + if self.host_pattern: + if not isinstance(self.host_pattern, str): + raise SpecValidationError('host_pattern must be of type string') + if self.hosts: + raise SpecValidationError('cannot combine host patterns and hosts') + + for h in self.hosts: + h.validate() + + @classmethod + def from_string(cls, arg): + # type: (Optional[str]) -> PlacementSpec + """ + A single integer is parsed as a count: + + >>> PlacementSpec.from_string('3') + PlacementSpec(count=3) + + A list of names is parsed as host specifications: + + >>> PlacementSpec.from_string('host1 host2') + PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacemen\ +tSpec(hostname='host2', network='', name='')]) + + You can also prefix the hosts with a count as follows: + + >>> PlacementSpec.from_string('2 host1 host2') + PlacementSpec(count=2, hosts=[HostPlacementSpec(hostname='host1', network='', name=''), Hos\ +tPlacementSpec(hostname='host2', network='', name='')]) + + You can specify labels using `label:<label>` + + >>> PlacementSpec.from_string('label:mon') + PlacementSpec(label='mon') + + Labels also support a count: + + >>> PlacementSpec.from_string('3 label:mon') + PlacementSpec(count=3, label='mon') + + fnmatch is also supported: + + >>> PlacementSpec.from_string('data[1-3]') + PlacementSpec(host_pattern='data[1-3]') + + >>> PlacementSpec.from_string(None) + PlacementSpec() + """ + if arg is None or not arg: + strings = [] + elif isinstance(arg, str): + if ' ' in arg: + strings = arg.split(' ') + elif ';' in arg: + strings = arg.split(';') + elif ',' in arg and '[' not in arg: + # FIXME: this isn't quite right. we want to avoid breaking + # a list of mons with addrvecs... so we're basically allowing + # , most of the time, except when addrvecs are used. maybe + # ok? + strings = arg.split(',') + else: + strings = [arg] + else: + raise SpecValidationError('invalid placement %s' % arg) + + count = None + count_per_host = None + if strings: + try: + count = int(strings[0]) + strings = strings[1:] + except ValueError: + pass + for s in strings: + if s.startswith('count:'): + try: + count = int(s[len('count:'):]) + strings.remove(s) + break + except ValueError: + pass + for s in strings: + if s.startswith('count-per-host:'): + try: + count_per_host = int(s[len('count-per-host:'):]) + strings.remove(s) + break + except ValueError: + pass + + advanced_hostspecs = [h for h in strings if + (':' in h or '=' in h or not any(c in '[]?*:=' for c in h)) and + 'label:' not in h] + for a_h in advanced_hostspecs: + strings.remove(a_h) + + labels = [x for x in strings if 'label:' in x] + if len(labels) > 1: + raise SpecValidationError('more than one label provided: {}'.format(labels)) + for l in labels: + strings.remove(l) + label = labels[0][6:] if labels else None + + host_patterns = strings + if len(host_patterns) > 1: + raise SpecValidationError( + 'more than one host pattern provided: {}'.format(host_patterns)) + + ps = PlacementSpec(count=count, + count_per_host=count_per_host, + hosts=advanced_hostspecs, + label=label, + host_pattern=host_patterns[0] if host_patterns else None) + return ps + + +_service_spec_from_json_validate = True + + +class CustomConfig: + """ + Class to specify custom config files to be mounted in daemon's container + """ + + _fields = ['content', 'mount_path'] + + def __init__(self, content: str, mount_path: str) -> None: + self.content: str = content + self.mount_path: str = mount_path + self.validate() + + def to_json(self) -> Dict[str, Any]: + return { + 'content': self.content, + 'mount_path': self.mount_path, + } + + @classmethod + def from_json(cls, data: Dict[str, Any]) -> "CustomConfig": + for k in cls._fields: + if k not in data: + raise SpecValidationError(f'CustomConfig must have "{k}" field') + for k in data.keys(): + if k not in cls._fields: + raise SpecValidationError(f'CustomConfig got unknown field "{k}"') + return cls(**data) + + @property + def filename(self) -> str: + return os.path.basename(self.mount_path) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, CustomConfig): + return ( + self.content == other.content + and self.mount_path == other.mount_path + ) + return NotImplemented + + def __repr__(self) -> str: + return f'CustomConfig({self.mount_path})' + + def validate(self) -> None: + if not isinstance(self.content, str): + raise SpecValidationError( + f'CustomConfig content must be a string. Got {type(self.content)}') + if not isinstance(self.mount_path, str): + raise SpecValidationError( + f'CustomConfig content must be a string. Got {type(self.mount_path)}') + + +@contextmanager +def service_spec_allow_invalid_from_json() -> Iterator[None]: + """ + I know this is evil, but unfortunately `ceph orch ls` + may return invalid OSD specs for OSDs not associated to + and specs. If you have a better idea, please! + """ + global _service_spec_from_json_validate + _service_spec_from_json_validate = False + yield + _service_spec_from_json_validate = True + + +class ArgumentSpec: + """The ArgumentSpec type represents an argument that can be + passed to an underyling subsystem, like a container engine or + another command line tool. + + The ArgumentSpec aims to be backwards compatible with the previous + form of argument, a single string. The string was always assumed + to be indentended to be split on spaces. For example: + `--cpus 8` becomes `["--cpus", "8"]`. This type is converted from + either a string or an json/yaml object. In the object form you + can choose if the string part should be split so an argument like + `--migrate-from=//192.168.5.22/My Documents` can be expressed. + """ + _fields = ['argument', 'split'] + + class OriginalType(enum.Enum): + OBJECT = 0 + STRING = 1 + + def __init__( + self, + argument: str, + split: bool = False, + *, + origin: OriginalType = OriginalType.OBJECT, + ) -> None: + self.argument = argument + self.split = bool(split) + # origin helps with round-tripping between inputs that + # are simple strings or objects (dicts) + self._origin = origin + self.validate() + + def to_json(self) -> Union[str, Dict[str, Any]]: + """Return a json-safe represenation of the ArgumentSpec.""" + if self._origin == self.OriginalType.STRING: + return self.argument + return { + 'argument': self.argument, + 'split': self.split, + } + + def to_args(self) -> List[str]: + """Convert this ArgumentSpec into a list of arguments suitable for + adding to an argv-style command line. + """ + if not self.split: + return [self.argument] + return [part for part in self.argument.split(" ") if part] + + def __eq__(self, other: Any) -> bool: + if isinstance(other, ArgumentSpec): + return ( + self.argument == other.argument + and self.split == other.split + ) + if isinstance(other, object): + # This is a workaround for silly ceph mgr object/type identity + # mismatches due to multiple python interpreters in use. + try: + argument = getattr(other, 'argument') + split = getattr(other, 'split') + return (self.argument == argument and self.split == split) + except AttributeError: + pass + return NotImplemented + + def __repr__(self) -> str: + return f'ArgumentSpec({self.argument!r}, {self.split!r})' + + def validate(self) -> None: + if not isinstance(self.argument, str): + raise SpecValidationError( + f'ArgumentSpec argument must be a string. Got {type(self.argument)}') + if not isinstance(self.split, bool): + raise SpecValidationError( + f'ArgumentSpec split must be a boolean. Got {type(self.split)}') + + @classmethod + def from_json(cls, data: Union[str, Dict[str, Any]]) -> "ArgumentSpec": + """Convert a json-object (dict) to an ArgumentSpec.""" + if isinstance(data, str): + return cls(data, split=True, origin=cls.OriginalType.STRING) + if 'argument' not in data: + raise SpecValidationError(f'ArgumentSpec must have an "argument" field') + for k in data.keys(): + if k not in cls._fields: + raise SpecValidationError(f'ArgumentSpec got an unknown field {k!r}') + return cls(**data) + + @staticmethod + def map_json( + values: Optional["ArgumentList"] + ) -> Optional[List[Union[str, Dict[str, Any]]]]: + """Given a list of ArgumentSpec objects return a json-safe + representation.of them.""" + if values is None: + return None + return [v.to_json() for v in values] + + @classmethod + def from_general_args(cls, data: "GeneralArgList") -> "ArgumentList": + """Convert a list of strs, dicts, or existing ArgumentSpec objects + to a list of only ArgumentSpec objects. + """ + out: ArgumentList = [] + for item in data: + if isinstance(item, (str, dict)): + out.append(cls.from_json(item)) + elif isinstance(item, cls): + out.append(item) + elif hasattr(item, 'to_json'): + # This is a workaround for silly ceph mgr object/type identity + # mismatches due to multiple python interpreters in use. + # It should be safe because we already have to be able to + # round-trip between json/yaml. + out.append(cls.from_json(item.to_json())) + else: + raise SpecValidationError(f"Unknown type for argument: {type(item)}") + return out + + +ArgumentList = List[ArgumentSpec] +GeneralArgList = List[Union[str, Dict[str, Any], "ArgumentSpec"]] + + +class ServiceSpec(object): + """ + Details of service creation. + + Request to the orchestrator for a cluster of daemons + such as MDS, RGW, iscsi gateway, nvmeof gateway, MONs, MGRs, Prometheus + + This structure is supposed to be enough information to + start the services. + """ + KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi nvmeof loki promtail mds mgr mon nfs ' \ + 'node-exporter osd prometheus rbd-mirror rgw agent ceph-exporter ' \ + 'container ingress cephfs-mirror snmp-gateway jaeger-tracing ' \ + 'elasticsearch jaeger-agent jaeger-collector jaeger-query'.split() + REQUIRES_SERVICE_ID = 'iscsi nvmeof mds nfs rgw container ingress '.split() + MANAGED_CONFIG_OPTIONS = [ + 'mds_join_fs', + ] + + @classmethod + def _cls(cls: Type[ServiceSpecT], service_type: str) -> Type[ServiceSpecT]: + from ceph.deployment.drive_group import DriveGroupSpec + + ret = { + 'mon': MONSpec, + 'rgw': RGWSpec, + 'nfs': NFSServiceSpec, + 'osd': DriveGroupSpec, + 'mds': MDSSpec, + 'iscsi': IscsiServiceSpec, + 'nvmeof': NvmeofServiceSpec, + 'alertmanager': AlertManagerSpec, + 'ingress': IngressSpec, + 'container': CustomContainerSpec, + 'grafana': GrafanaSpec, + 'node-exporter': MonitoringSpec, + 'ceph-exporter': CephExporterSpec, + 'prometheus': PrometheusSpec, + 'loki': MonitoringSpec, + 'promtail': MonitoringSpec, + 'snmp-gateway': SNMPGatewaySpec, + 'elasticsearch': TracingSpec, + 'jaeger-agent': TracingSpec, + 'jaeger-collector': TracingSpec, + 'jaeger-query': TracingSpec, + 'jaeger-tracing': TracingSpec, + }.get(service_type, cls) + if ret == ServiceSpec and not service_type: + raise SpecValidationError('Spec needs a "service_type" key.') + return ret + + def __new__(cls: Type[ServiceSpecT], *args: Any, **kwargs: Any) -> ServiceSpecT: + """ + Some Python foo to make sure, we don't have an object + like `ServiceSpec('rgw')` of type `ServiceSpec`. Now we have: + + >>> type(ServiceSpec('rgw')) == type(RGWSpec('rgw')) + True + + """ + if cls != ServiceSpec: + return object.__new__(cls) + service_type = kwargs.get('service_type', args[0] if args else None) + sub_cls: Any = cls._cls(service_type) + return object.__new__(sub_cls) + + def __init__(self, + service_type: str, + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + count: Optional[int] = None, + config: Optional[Dict[str, str]] = None, + unmanaged: bool = False, + preview_only: bool = False, + networks: Optional[List[str]] = None, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + + #: See :ref:`orchestrator-cli-placement-spec`. + self.placement = PlacementSpec() if placement is None else placement # type: PlacementSpec + + assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES, service_type + #: The type of the service. Needs to be either a Ceph + #: service (``mon``, ``crash``, ``mds``, ``mgr``, ``osd`` or + #: ``rbd-mirror``), a gateway (``nfs`` or ``rgw``), part of the + #: monitoring stack (``alertmanager``, ``grafana``, ``node-exporter`` or + #: ``prometheus``) or (``container``) for custom containers. + self.service_type = service_type + + #: The name of the service. Required for ``iscsi``, ``nvmeof``, ``mds``, ``nfs``, ``osd``, + #: ``rgw``, ``container``, ``ingress`` + self.service_id = None + + if self.service_type in self.REQUIRES_SERVICE_ID or self.service_type == 'osd': + self.service_id = service_id + + #: If set to ``true``, the orchestrator will not deploy nor remove + #: any daemon associated with this service. Placement and all other properties + #: will be ignored. This is useful, if you do not want this service to be + #: managed temporarily. For cephadm, See :ref:`cephadm-spec-unmanaged` + self.unmanaged = unmanaged + self.preview_only = preview_only + + #: A list of network identities instructing the daemons to only bind + #: on the particular networks in that list. In case the cluster is distributed + #: across multiple networks, you can add multiple networks. See + #: :ref:`cephadm-monitoring-networks-ports`, + #: :ref:`cephadm-rgw-networks` and :ref:`cephadm-mgr-networks`. + self.networks: List[str] = networks or [] + + self.config: Optional[Dict[str, str]] = None + if config: + self.config = {k.replace(' ', '_'): v for k, v in config.items()} + + self.extra_container_args: Optional[ArgumentList] = None + self.extra_entrypoint_args: Optional[ArgumentList] = None + if extra_container_args: + self.extra_container_args = ArgumentSpec.from_general_args( + extra_container_args) + if extra_entrypoint_args: + self.extra_entrypoint_args = ArgumentSpec.from_general_args( + extra_entrypoint_args) + self.custom_configs: Optional[List[CustomConfig]] = custom_configs + + @classmethod + @handle_type_error + def from_json(cls: Type[ServiceSpecT], json_spec: Dict) -> ServiceSpecT: + """ + Initialize 'ServiceSpec' object data from a json structure + + There are two valid styles for service specs: + + the "old" style: + + .. code:: yaml + + service_type: nfs + service_id: foo + pool: mypool + namespace: myns + + and the "new" style: + + .. code:: yaml + + service_type: nfs + service_id: foo + config: + some_option: the_value + networks: [10.10.0.0/16] + spec: + pool: mypool + namespace: myns + + In https://tracker.ceph.com/issues/45321 we decided that we'd like to + prefer the new style as it is more readable and provides a better + understanding of what fields are special for a give service type. + + Note, we'll need to stay compatible with both versions for the + the next two major releases (octopus, pacific). + + :param json_spec: A valid dict with ServiceSpec + + :meta private: + """ + if not isinstance(json_spec, dict): + raise SpecValidationError( + f'Service Spec is not an (JSON or YAML) object. got "{str(json_spec)}"') + + json_spec = cls.normalize_json(json_spec) + + c = json_spec.copy() + + # kludge to make `from_json` compatible to `Orchestrator.describe_service` + # Open question: Remove `service_id` form to_json? + if c.get('service_name', ''): + service_type_id = c['service_name'].split('.', 1) + + if not c.get('service_type', ''): + c['service_type'] = service_type_id[0] + if not c.get('service_id', '') and len(service_type_id) > 1: + c['service_id'] = service_type_id[1] + del c['service_name'] + + service_type = c.get('service_type', '') + _cls = cls._cls(service_type) + + if 'status' in c: + del c['status'] # kludge to make us compatible to `ServiceDescription.to_json()` + + return _cls._from_json_impl(c) # type: ignore + + @staticmethod + def normalize_json(json_spec: dict) -> dict: + networks = json_spec.get('networks') + if networks is None: + return json_spec + if isinstance(networks, list): + return json_spec + if not isinstance(networks, str): + raise SpecValidationError(f'Networks ({networks}) must be a string or list of strings') + json_spec['networks'] = [networks] + return json_spec + + @classmethod + def _from_json_impl(cls: Type[ServiceSpecT], json_spec: dict) -> ServiceSpecT: + args = {} # type: Dict[str, Any] + for k, v in json_spec.items(): + if k == 'placement': + v = PlacementSpec.from_json(v) + if k == 'custom_configs': + v = [CustomConfig.from_json(c) for c in v] + if k == 'spec': + args.update(v) + continue + args.update({k: v}) + _cls = cls(**args) + if _service_spec_from_json_validate: + _cls.validate() + return _cls + + def service_name(self) -> str: + n = self.service_type + if self.service_id: + n += '.' + self.service_id + return n + + def get_port_start(self) -> List[int]: + # If defined, we will allocate and number ports starting at this + # point. + return [] + + def get_virtual_ip(self) -> Optional[str]: + return None + + def to_json(self): + # type: () -> OrderedDict[str, Any] + ret: OrderedDict[str, Any] = OrderedDict() + ret['service_type'] = self.service_type + if self.service_id: + ret['service_id'] = self.service_id + ret['service_name'] = self.service_name() + if self.placement.to_json(): + ret['placement'] = self.placement.to_json() + if self.unmanaged: + ret['unmanaged'] = self.unmanaged + if self.networks: + ret['networks'] = self.networks + if self.extra_container_args: + ret['extra_container_args'] = ArgumentSpec.map_json( + self.extra_container_args + ) + if self.extra_entrypoint_args: + ret['extra_entrypoint_args'] = ArgumentSpec.map_json( + self.extra_entrypoint_args + ) + if self.custom_configs: + ret['custom_configs'] = [c.to_json() for c in self.custom_configs] + + c = {} + for key, val in sorted(self.__dict__.items(), key=lambda tpl: tpl[0]): + if key in ret: + continue + if hasattr(val, 'to_json'): + val = val.to_json() + if val: + c[key] = val + if c: + ret['spec'] = c + return ret + + def validate(self) -> None: + if not self.service_type: + raise SpecValidationError('Cannot add Service: type required') + + if self.service_type != 'osd': + if self.service_type in self.REQUIRES_SERVICE_ID and not self.service_id: + raise SpecValidationError('Cannot add Service: id required') + if self.service_type not in self.REQUIRES_SERVICE_ID and self.service_id: + raise SpecValidationError( + f'Service of type \'{self.service_type}\' should not contain a service id') + + if self.service_id: + if not re.match('^[a-zA-Z0-9_.-]+$', str(self.service_id)): + raise SpecValidationError('Service id contains invalid characters, ' + 'only [a-zA-Z0-9_.-] allowed') + + if self.placement is not None: + self.placement.validate() + if self.config: + for k, v in self.config.items(): + if k in self.MANAGED_CONFIG_OPTIONS: + raise SpecValidationError( + f'Cannot set config option {k} in spec: it is managed by cephadm' + ) + for network in self.networks or []: + try: + ip_network(network) + except ValueError as e: + raise SpecValidationError( + f'Cannot parse network {network}: {e}' + ) + + def __repr__(self) -> str: + y = yaml.dump(cast(dict, self), default_flow_style=False) + return f"{self.__class__.__name__}.from_json(yaml.safe_load('''{y}'''))" + + def __eq__(self, other: Any) -> bool: + return (self.__class__ == other.__class__ + and + self.__dict__ == other.__dict__) + + def one_line_str(self) -> str: + return '<{} for service_name={}>'.format(self.__class__.__name__, self.service_name()) + + @staticmethod + def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceSpec') -> Any: + return dumper.represent_dict(cast(Mapping, data.to_json().items())) + + +yaml.add_representer(ServiceSpec, ServiceSpec.yaml_representer) + + +class NFSServiceSpec(ServiceSpec): + def __init__(self, + service_type: str = 'nfs', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + port: Optional[int] = None, + virtual_ip: Optional[str] = None, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + enable_haproxy_protocol: bool = False, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'nfs' + super(NFSServiceSpec, self).__init__( + 'nfs', service_id=service_id, + placement=placement, unmanaged=unmanaged, preview_only=preview_only, + config=config, networks=networks, extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, custom_configs=custom_configs) + + self.port = port + self.virtual_ip = virtual_ip + self.enable_haproxy_protocol = enable_haproxy_protocol + + def get_port_start(self) -> List[int]: + if self.port: + return [self.port] + return [] + + def rados_config_name(self): + # type: () -> str + return 'conf-' + self.service_name() + + +yaml.add_representer(NFSServiceSpec, ServiceSpec.yaml_representer) + + +class RGWSpec(ServiceSpec): + """ + Settings to configure a (multisite) Ceph RGW + + .. code-block:: yaml + + service_type: rgw + service_id: myrealm.myzone + spec: + rgw_realm: myrealm + rgw_zonegroup: myzonegroup + rgw_zone: myzone + ssl: true + rgw_frontend_port: 1234 + rgw_frontend_type: beast + rgw_frontend_ssl_certificate: ... + + See also: :ref:`orchestrator-cli-service-spec` + """ + + MANAGED_CONFIG_OPTIONS = ServiceSpec.MANAGED_CONFIG_OPTIONS + [ + 'rgw_zone', + 'rgw_realm', + 'rgw_zonegroup', + 'rgw_frontends', + ] + + def __init__(self, + service_type: str = 'rgw', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + rgw_realm: Optional[str] = None, + rgw_zonegroup: Optional[str] = None, + rgw_zone: Optional[str] = None, + rgw_frontend_port: Optional[int] = None, + rgw_frontend_ssl_certificate: Optional[List[str]] = None, + rgw_frontend_type: Optional[str] = None, + rgw_frontend_extra_args: Optional[List[str]] = None, + unmanaged: bool = False, + ssl: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + subcluster: Optional[str] = None, # legacy, only for from_json on upgrade + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + rgw_realm_token: Optional[str] = None, + update_endpoints: Optional[bool] = False, + zone_endpoints: Optional[str] = None # commad separated endpoints list + ): + assert service_type == 'rgw', service_type + + # for backward compatibility with octopus spec files, + if not service_id and (rgw_realm and rgw_zone): + service_id = rgw_realm + '.' + rgw_zone + + super(RGWSpec, self).__init__( + 'rgw', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, networks=networks, + extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + #: The RGW realm associated with this service. Needs to be manually created + #: if the spec is being applied directly to cephdam. In case of rgw module + #: the realm is created automatically. + self.rgw_realm: Optional[str] = rgw_realm + #: The RGW zonegroup associated with this service. Needs to be manually created + #: if the spec is being applied directly to cephdam. In case of rgw module + #: the zonegroup is created automatically. + self.rgw_zonegroup: Optional[str] = rgw_zonegroup + #: The RGW zone associated with this service. Needs to be manually created + #: if the spec is being applied directly to cephdam. In case of rgw module + #: the zone is created automatically. + self.rgw_zone: Optional[str] = rgw_zone + #: Port of the RGW daemons + self.rgw_frontend_port: Optional[int] = rgw_frontend_port + #: List of SSL certificates + self.rgw_frontend_ssl_certificate: Optional[List[str]] = rgw_frontend_ssl_certificate + #: civetweb or beast (default: beast). See :ref:`rgw_frontends` + self.rgw_frontend_type: Optional[str] = rgw_frontend_type + #: List of extra arguments for rgw_frontend in the form opt=value. See :ref:`rgw_frontends` + self.rgw_frontend_extra_args: Optional[List[str]] = rgw_frontend_extra_args + #: enable SSL + self.ssl = ssl + self.rgw_realm_token = rgw_realm_token + self.update_endpoints = update_endpoints + self.zone_endpoints = zone_endpoints + + def get_port_start(self) -> List[int]: + return [self.get_port()] + + def get_port(self) -> int: + if self.rgw_frontend_port: + return self.rgw_frontend_port + if self.ssl: + return 443 + else: + return 80 + + def validate(self) -> None: + super(RGWSpec, self).validate() + + if self.rgw_realm and not self.rgw_zone: + raise SpecValidationError( + 'Cannot add RGW: Realm specified but no zone specified') + if self.rgw_zone and not self.rgw_realm: + raise SpecValidationError('Cannot add RGW: Zone specified but no realm specified') + + if self.rgw_frontend_type is not None: + if self.rgw_frontend_type not in ['beast', 'civetweb']: + raise SpecValidationError( + 'Invalid rgw_frontend_type value. Valid values are: beast, civetweb.\n' + 'Additional rgw type parameters can be passed using rgw_frontend_extra_args.' + ) + + +yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer) + + +class NvmeofServiceSpec(ServiceSpec): + def __init__(self, + service_type: str = 'nvmeof', + service_id: Optional[str] = None, + name: Optional[str] = None, + group: Optional[str] = None, + port: Optional[int] = None, + pool: Optional[str] = None, + enable_auth: bool = False, + server_key: Optional[str] = None, + server_cert: Optional[str] = None, + client_key: Optional[str] = None, + client_cert: Optional[str] = None, + spdk_path: Optional[str] = None, + tgt_path: Optional[str] = None, + timeout: Optional[int] = 60, + conn_retries: Optional[int] = 10, + transports: Optional[str] = 'tcp', + transport_tcp_options: Optional[Dict[str, int]] = + {"in_capsule_data_size": 8192, "max_io_qpairs_per_ctrlr": 7}, + tgt_cmd_extra_args: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'nvmeof' + super(NvmeofServiceSpec, self).__init__('nvmeof', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, + config=config, networks=networks, + extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + #: RADOS pool where ceph-nvmeof config data is stored. + self.pool = pool + #: ``port`` port of the nvmeof gateway + self.port = port or 5500 + #: ``name`` name of the nvmeof gateway + self.name = name + #: ``group`` name of the nvmeof gateway + self.group = group + #: ``enable_auth`` enables user authentication on nvmeof gateway + self.enable_auth = enable_auth + #: ``server_key`` gateway server key + self.server_key = server_key or './server.key' + #: ``server_cert`` gateway server certificate + self.server_cert = server_cert or './server.crt' + #: ``client_key`` client key + self.client_key = client_key or './client.key' + #: ``client_cert`` client certificate + self.client_cert = client_cert or './client.crt' + #: ``spdk_path`` path to SPDK + self.spdk_path = spdk_path or '/usr/local/bin/nvmf_tgt' + #: ``tgt_path`` nvmeof target path + self.tgt_path = tgt_path or '/usr/local/bin/nvmf_tgt' + #: ``timeout`` ceph connectivity timeout + self.timeout = timeout + #: ``conn_retries`` ceph connection retries number + self.conn_retries = conn_retries + #: ``transports`` tcp + self.transports = transports + #: List of extra arguments for transports in the form opt=value + self.transport_tcp_options: Optional[Dict[str, int]] = transport_tcp_options + #: ``tgt_cmd_extra_args`` extra arguments for the nvmf_tgt process + self.tgt_cmd_extra_args = tgt_cmd_extra_args + + def get_port_start(self) -> List[int]: + return [5500, 4420, 8009] + + def validate(self) -> None: + # TODO: what other parameters should be validated as part of this function? + super(NvmeofServiceSpec, self).validate() + + if not self.pool: + raise SpecValidationError('Cannot add NVMEOF: No Pool specified') + + if self.enable_auth: + if not any([self.server_key, self.server_cert, self.client_key, self.client_cert]): + raise SpecValidationError( + 'enable_auth is true but client/server certificates are missing') + + if self.transports not in ['tcp']: + raise SpecValidationError('Invalid transport. Valid values are tcp') + + +yaml.add_representer(NvmeofServiceSpec, ServiceSpec.yaml_representer) + + +class IscsiServiceSpec(ServiceSpec): + def __init__(self, + service_type: str = 'iscsi', + service_id: Optional[str] = None, + pool: Optional[str] = None, + trusted_ip_list: Optional[str] = None, + api_port: Optional[int] = 5000, + api_user: Optional[str] = 'admin', + api_password: Optional[str] = 'admin', + api_secure: Optional[bool] = None, + ssl_cert: Optional[str] = None, + ssl_key: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'iscsi' + super(IscsiServiceSpec, self).__init__('iscsi', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, + config=config, networks=networks, + extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + #: RADOS pool where ceph-iscsi config data is stored. + self.pool = pool + #: list of trusted IP addresses + self.trusted_ip_list = trusted_ip_list + #: ``api_port`` as defined in the ``iscsi-gateway.cfg`` + self.api_port = api_port + #: ``api_user`` as defined in the ``iscsi-gateway.cfg`` + self.api_user = api_user + #: ``api_password`` as defined in the ``iscsi-gateway.cfg`` + self.api_password = api_password + #: ``api_secure`` as defined in the ``iscsi-gateway.cfg`` + self.api_secure = api_secure + #: SSL certificate + self.ssl_cert = ssl_cert + #: SSL private key + self.ssl_key = ssl_key + + if not self.api_secure and self.ssl_cert and self.ssl_key: + self.api_secure = True + + def get_port_start(self) -> List[int]: + return [self.api_port or 5000] + + def validate(self) -> None: + super(IscsiServiceSpec, self).validate() + + if not self.pool: + raise SpecValidationError( + 'Cannot add ISCSI: No Pool specified') + + # Do not need to check for api_user and api_password as they + # now default to 'admin' when setting up the gateway url. Older + # iSCSI specs from before this change should be fine as they will + # have been required to have an api_user and api_password set and + # will be unaffected by the new default value. + + +yaml.add_representer(IscsiServiceSpec, ServiceSpec.yaml_representer) + + +class IngressSpec(ServiceSpec): + def __init__(self, + service_type: str = 'ingress', + service_id: Optional[str] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + placement: Optional[PlacementSpec] = None, + backend_service: Optional[str] = None, + frontend_port: Optional[int] = None, + ssl_cert: Optional[str] = None, + ssl_key: Optional[str] = None, + ssl_dh_param: Optional[str] = None, + ssl_ciphers: Optional[List[str]] = None, + ssl_options: Optional[List[str]] = None, + monitor_port: Optional[int] = None, + monitor_user: Optional[str] = None, + monitor_password: Optional[str] = None, + enable_stats: Optional[bool] = None, + keepalived_password: Optional[str] = None, + virtual_ip: Optional[str] = None, + virtual_ips_list: Optional[List[str]] = None, + virtual_interface_networks: Optional[List[str]] = [], + use_keepalived_multicast: Optional[bool] = False, + vrrp_interface_network: Optional[str] = None, + first_virtual_router_id: Optional[int] = 50, + unmanaged: bool = False, + ssl: bool = False, + keepalive_only: bool = False, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + enable_haproxy_protocol: bool = False, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'ingress' + + super(IngressSpec, self).__init__( + 'ingress', service_id=service_id, + placement=placement, config=config, + networks=networks, + extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs + ) + self.backend_service = backend_service + self.frontend_port = frontend_port + self.ssl_cert = ssl_cert + self.ssl_key = ssl_key + self.ssl_dh_param = ssl_dh_param + self.ssl_ciphers = ssl_ciphers + self.ssl_options = ssl_options + self.monitor_port = monitor_port + self.monitor_user = monitor_user + self.monitor_password = monitor_password + self.keepalived_password = keepalived_password + self.virtual_ip = virtual_ip + self.virtual_ips_list = virtual_ips_list + self.virtual_interface_networks = virtual_interface_networks or [] + self.use_keepalived_multicast = use_keepalived_multicast + self.vrrp_interface_network = vrrp_interface_network + self.first_virtual_router_id = first_virtual_router_id + self.unmanaged = unmanaged + self.ssl = ssl + self.keepalive_only = keepalive_only + self.enable_haproxy_protocol = enable_haproxy_protocol + + def get_port_start(self) -> List[int]: + ports = [] + if self.frontend_port is not None: + ports.append(cast(int, self.frontend_port)) + if self.monitor_port is not None: + ports.append(cast(int, self.monitor_port)) + return ports + + def get_virtual_ip(self) -> Optional[str]: + return self.virtual_ip + + def validate(self) -> None: + super(IngressSpec, self).validate() + + if not self.backend_service: + raise SpecValidationError( + 'Cannot add ingress: No backend_service specified') + if not self.keepalive_only and not self.frontend_port: + raise SpecValidationError( + 'Cannot add ingress: No frontend_port specified') + if not self.monitor_port: + raise SpecValidationError( + 'Cannot add ingress: No monitor_port specified') + if not self.virtual_ip and not self.virtual_ips_list: + raise SpecValidationError( + 'Cannot add ingress: No virtual_ip provided') + if self.virtual_ip is not None and self.virtual_ips_list is not None: + raise SpecValidationError( + 'Cannot add ingress: Single and multiple virtual IPs specified') + + +yaml.add_representer(IngressSpec, ServiceSpec.yaml_representer) + + +class CustomContainerSpec(ServiceSpec): + def __init__(self, + service_type: str = 'container', + service_id: Optional[str] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + image: Optional[str] = None, + entrypoint: Optional[str] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + volume_mounts: Optional[Dict[str, str]] = {}, + # args are for the container runtime, not entrypoint + args: Optional[GeneralArgList] = [], + envs: Optional[List[str]] = [], + privileged: Optional[bool] = False, + bind_mounts: Optional[List[List[str]]] = None, + ports: Optional[List[int]] = [], + dirs: Optional[List[str]] = [], + files: Optional[Dict[str, Any]] = {}, + ): + assert service_type == 'container' + assert service_id is not None + assert image is not None + + super(CustomContainerSpec, self).__init__( + service_type, service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, + networks=networks, extra_entrypoint_args=extra_entrypoint_args) + + self.image = image + self.entrypoint = entrypoint + self.uid = uid + self.gid = gid + self.volume_mounts = volume_mounts + self.args = args + self.envs = envs + self.privileged = privileged + self.bind_mounts = bind_mounts + self.ports = ports + self.dirs = dirs + self.files = files + + def config_json(self) -> Dict[str, Any]: + """ + Helper function to get the value of the `--config-json` cephadm + command line option. It will contain all specification properties + that haven't a `None` value. Such properties will get default + values in cephadm. + :return: Returns a dictionary containing all specification + properties. + """ + config_json = {} + for prop in ['image', 'entrypoint', 'uid', 'gid', 'args', + 'envs', 'volume_mounts', 'privileged', + 'bind_mounts', 'ports', 'dirs', 'files']: + value = getattr(self, prop) + if value is not None: + config_json[prop] = value + return config_json + + +yaml.add_representer(CustomContainerSpec, ServiceSpec.yaml_representer) + + +class MonitoringSpec(ServiceSpec): + def __init__(self, + service_type: str, + service_id: Optional[str] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + port: Optional[int] = None, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type in ['grafana', 'node-exporter', 'prometheus', 'alertmanager', + 'loki', 'promtail'] + + super(MonitoringSpec, self).__init__( + service_type, service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, + networks=networks, extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + self.service_type = service_type + self.port = port + + def get_port_start(self) -> List[int]: + return [self.get_port()] + + def get_port(self) -> int: + if self.port: + return self.port + else: + return {'prometheus': 9095, + 'node-exporter': 9100, + 'alertmanager': 9093, + 'grafana': 3000, + 'loki': 3100, + 'promtail': 9080}[self.service_type] + + +yaml.add_representer(MonitoringSpec, ServiceSpec.yaml_representer) + + +class AlertManagerSpec(MonitoringSpec): + def __init__(self, + service_type: str = 'alertmanager', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + user_data: Optional[Dict[str, Any]] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + port: Optional[int] = None, + secure: bool = False, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'alertmanager' + super(AlertManagerSpec, self).__init__( + 'alertmanager', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, networks=networks, port=port, + extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + # Custom configuration. + # + # Example: + # service_type: alertmanager + # service_id: xyz + # user_data: + # default_webhook_urls: + # - "https://foo" + # - "https://bar" + # + # Documentation: + # default_webhook_urls - A list of additional URL's that are + # added to the default receivers' + # <webhook_configs> configuration. + self.user_data = user_data or {} + self.secure = secure + + def get_port_start(self) -> List[int]: + return [self.get_port(), 9094] + + def validate(self) -> None: + super(AlertManagerSpec, self).validate() + + if self.port == 9094: + raise SpecValidationError( + 'Port 9094 is reserved for AlertManager cluster listen address') + + +yaml.add_representer(AlertManagerSpec, ServiceSpec.yaml_representer) + + +class GrafanaSpec(MonitoringSpec): + def __init__(self, + service_type: str = 'grafana', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + port: Optional[int] = None, + protocol: Optional[str] = 'https', + initial_admin_password: Optional[str] = None, + anonymous_access: Optional[bool] = True, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'grafana' + super(GrafanaSpec, self).__init__( + 'grafana', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, networks=networks, port=port, + extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + self.initial_admin_password = initial_admin_password + self.anonymous_access = anonymous_access + self.protocol = protocol + + def validate(self) -> None: + super(GrafanaSpec, self).validate() + if self.protocol not in ['http', 'https']: + err_msg = f"Invalid protocol '{self.protocol}'. Valid values are: 'http', 'https'." + raise SpecValidationError(err_msg) + + if not self.anonymous_access and not self.initial_admin_password: + err_msg = ('Either initial_admin_password must be set or anonymous_access ' + 'must be set to true. Otherwise the grafana dashboard will ' + 'be inaccessible.') + raise SpecValidationError(err_msg) + + +yaml.add_representer(GrafanaSpec, ServiceSpec.yaml_representer) + + +class PrometheusSpec(MonitoringSpec): + def __init__(self, + service_type: str = 'prometheus', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + port: Optional[int] = None, + retention_time: Optional[str] = None, + retention_size: Optional[str] = None, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'prometheus' + super(PrometheusSpec, self).__init__( + 'prometheus', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, networks=networks, port=port, + extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + self.retention_time = retention_time.strip() if retention_time else None + self.retention_size = retention_size.strip() if retention_size else None + + def validate(self) -> None: + super(PrometheusSpec, self).validate() + + if self.retention_time: + valid_units = ['y', 'w', 'd', 'h', 'm', 's'] + m = re.search(rf"^(\d+)({'|'.join(valid_units)})$", self.retention_time) + if not m: + units = ', '.join(valid_units) + raise SpecValidationError(f"Invalid retention time. Valid units are: {units}") + if self.retention_size: + valid_units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB'] + m = re.search(rf"^(\d+)({'|'.join(valid_units)})$", self.retention_size) + if not m: + units = ', '.join(valid_units) + raise SpecValidationError(f"Invalid retention size. Valid units are: {units}") + + +yaml.add_representer(PrometheusSpec, ServiceSpec.yaml_representer) + + +class SNMPGatewaySpec(ServiceSpec): + class SNMPVersion(str, enum.Enum): + V2c = 'V2c' + V3 = 'V3' + + def to_json(self) -> str: + return self.value + + class SNMPAuthType(str, enum.Enum): + MD5 = 'MD5' + SHA = 'SHA' + + def to_json(self) -> str: + return self.value + + class SNMPPrivacyType(str, enum.Enum): + DES = 'DES' + AES = 'AES' + + def to_json(self) -> str: + return self.value + + valid_destination_types = [ + 'Name:Port', + 'IPv4:Port' + ] + + def __init__(self, + service_type: str = 'snmp-gateway', + snmp_version: Optional[SNMPVersion] = None, + snmp_destination: str = '', + credentials: Dict[str, str] = {}, + engine_id: Optional[str] = None, + auth_protocol: Optional[SNMPAuthType] = None, + privacy_protocol: Optional[SNMPPrivacyType] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + port: Optional[int] = None, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'snmp-gateway' + + super(SNMPGatewaySpec, self).__init__( + service_type, + placement=placement, + unmanaged=unmanaged, + preview_only=preview_only, + extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + self.service_type = service_type + self.snmp_version = snmp_version + self.snmp_destination = snmp_destination + self.port = port + self.credentials = credentials + self.engine_id = engine_id + self.auth_protocol = auth_protocol + self.privacy_protocol = privacy_protocol + + @classmethod + def _from_json_impl(cls, json_spec: dict) -> 'SNMPGatewaySpec': + + cpy = json_spec.copy() + types = [ + ('snmp_version', SNMPGatewaySpec.SNMPVersion), + ('auth_protocol', SNMPGatewaySpec.SNMPAuthType), + ('privacy_protocol', SNMPGatewaySpec.SNMPPrivacyType), + ] + for d in cpy, cpy.get('spec', {}): + for key, enum_cls in types: + try: + if key in d: + d[key] = enum_cls(d[key]) + except ValueError: + raise SpecValidationError(f'{key} unsupported. Must be one of ' + f'{", ".join(enum_cls)}') + return super(SNMPGatewaySpec, cls)._from_json_impl(cpy) + + @property + def ports(self) -> List[int]: + return [self.port or 9464] + + def get_port_start(self) -> List[int]: + return self.ports + + def validate(self) -> None: + super(SNMPGatewaySpec, self).validate() + + if not self.credentials: + raise SpecValidationError( + 'Missing authentication information (credentials). ' + 'SNMP V2c and V3 require credential information' + ) + elif not self.snmp_version: + raise SpecValidationError( + 'Missing SNMP version (snmp_version)' + ) + + creds_requirement = { + 'V2c': ['snmp_community'], + 'V3': ['snmp_v3_auth_username', 'snmp_v3_auth_password'] + } + if self.privacy_protocol: + creds_requirement['V3'].append('snmp_v3_priv_password') + + missing = [parm for parm in creds_requirement[self.snmp_version] + if parm not in self.credentials] + # check that credentials are correct for the version + if missing: + raise SpecValidationError( + f'SNMP {self.snmp_version} credentials are incomplete. Missing {", ".join(missing)}' + ) + + if self.engine_id: + if 10 <= len(self.engine_id) <= 64 and \ + is_hex(self.engine_id) and \ + len(self.engine_id) % 2 == 0: + pass + else: + raise SpecValidationError( + 'engine_id must be a string containing 10-64 hex characters. ' + 'Its length must be divisible by 2' + ) + + else: + if self.snmp_version == 'V3': + raise SpecValidationError( + 'Must provide an engine_id for SNMP V3 notifications' + ) + + if not self.snmp_destination: + raise SpecValidationError( + 'SNMP destination (snmp_destination) must be provided' + ) + else: + valid, description = valid_addr(self.snmp_destination) + if not valid: + raise SpecValidationError( + f'SNMP destination (snmp_destination) is invalid: {description}' + ) + if description not in self.valid_destination_types: + raise SpecValidationError( + f'SNMP destination (snmp_destination) type ({description}) is invalid. ' + f'Must be either: {", ".join(sorted(self.valid_destination_types))}' + ) + + +yaml.add_representer(SNMPGatewaySpec, ServiceSpec.yaml_representer) + + +class MDSSpec(ServiceSpec): + def __init__(self, + service_type: str = 'mds', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + config: Optional[Dict[str, str]] = None, + unmanaged: bool = False, + preview_only: bool = False, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'mds' + super(MDSSpec, self).__init__('mds', service_id=service_id, + placement=placement, + config=config, + unmanaged=unmanaged, + preview_only=preview_only, + extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs) + + def validate(self) -> None: + super(MDSSpec, self).validate() + + if str(self.service_id)[0].isdigit(): + raise SpecValidationError('MDS service id cannot start with a numeric digit') + + +yaml.add_representer(MDSSpec, ServiceSpec.yaml_representer) + + +class MONSpec(ServiceSpec): + def __init__(self, + service_type: str, + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + count: Optional[int] = None, + config: Optional[Dict[str, str]] = None, + unmanaged: bool = False, + preview_only: bool = False, + networks: Optional[List[str]] = None, + extra_container_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + crush_locations: Optional[Dict[str, List[str]]] = None, + ): + assert service_type == 'mon' + super(MONSpec, self).__init__('mon', service_id=service_id, + placement=placement, + count=count, + config=config, + unmanaged=unmanaged, + preview_only=preview_only, + networks=networks, + extra_container_args=extra_container_args, + custom_configs=custom_configs) + + self.crush_locations = crush_locations + self.validate() + + def validate(self) -> None: + if self.crush_locations: + for host, crush_locs in self.crush_locations.items(): + try: + assert_valid_host(host) + except SpecValidationError as e: + err_str = f'Invalid hostname found in spec crush locations: {e}' + raise SpecValidationError(err_str) + for cloc in crush_locs: + if '=' not in cloc or len(cloc.split('=')) != 2: + err_str = ('Crush locations must be of form <bucket>=<location>. ' + f'Found crush location: {cloc}') + raise SpecValidationError(err_str) + + +yaml.add_representer(MONSpec, ServiceSpec.yaml_representer) + + +class TracingSpec(ServiceSpec): + SERVICE_TYPES = ['elasticsearch', 'jaeger-collector', 'jaeger-query', 'jaeger-agent'] + + def __init__(self, + service_type: str, + es_nodes: Optional[str] = None, + without_query: bool = False, + service_id: Optional[str] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False + ): + assert service_type in TracingSpec.SERVICE_TYPES + ['jaeger-tracing'] + + super(TracingSpec, self).__init__( + service_type, service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, + networks=networks) + self.without_query = without_query + self.es_nodes = es_nodes + + def get_port_start(self) -> List[int]: + return [self.get_port()] + + def get_port(self) -> int: + return {'elasticsearch': 9200, + 'jaeger-agent': 6799, + 'jaeger-collector': 14250, + 'jaeger-query': 16686}[self.service_type] + + def get_tracing_specs(self) -> List[ServiceSpec]: + assert self.service_type == 'jaeger-tracing' + specs: List[ServiceSpec] = [] + daemons: Dict[str, Optional[PlacementSpec]] = { + daemon: None for daemon in TracingSpec.SERVICE_TYPES} + + if self.es_nodes: + del daemons['elasticsearch'] + if self.without_query: + del daemons['jaeger-query'] + if self.placement: + daemons.update({'jaeger-collector': self.placement}) + + for daemon, daemon_placement in daemons.items(): + specs.append(TracingSpec(service_type=daemon, + es_nodes=self.es_nodes, + placement=daemon_placement, + unmanaged=self.unmanaged, + config=self.config, + networks=self.networks, + preview_only=self.preview_only + )) + return specs + + +yaml.add_representer(TracingSpec, ServiceSpec.yaml_representer) + + +class TunedProfileSpec(): + def __init__(self, + profile_name: str, + placement: Optional[PlacementSpec] = None, + settings: Optional[Dict[str, str]] = None, + ): + self.profile_name = profile_name + self.placement = placement or PlacementSpec(host_pattern='*') + self.settings = settings or {} + self._last_updated: str = '' + + @classmethod + def from_json(cls, spec: Dict[str, Any]) -> 'TunedProfileSpec': + data = {} + if 'profile_name' not in spec: + raise SpecValidationError('Tuned profile spec must include "profile_name" field') + data['profile_name'] = spec['profile_name'] + if not isinstance(data['profile_name'], str): + raise SpecValidationError('"profile_name" field must be a string') + if 'placement' in spec: + data['placement'] = PlacementSpec.from_json(spec['placement']) + if 'settings' in spec: + data['settings'] = spec['settings'] + return cls(**data) + + def to_json(self) -> Dict[str, Any]: + res: Dict[str, Any] = {} + res['profile_name'] = self.profile_name + res['placement'] = self.placement.to_json() + res['settings'] = self.settings + return res + + def __eq__(self, other: Any) -> bool: + if isinstance(other, TunedProfileSpec): + if ( + self.placement == other.placement + and self.profile_name == other.profile_name + and self.settings == other.settings + ): + return True + return False + return NotImplemented + + def __repr__(self) -> str: + return f'TunedProfile({self.profile_name})' + + def copy(self) -> 'TunedProfileSpec': + # for making deep copies so you can edit the settings in one without affecting the other + # mostly for testing purposes + return TunedProfileSpec(self.profile_name, self.placement, self.settings.copy()) + + +class CephExporterSpec(ServiceSpec): + def __init__(self, + service_type: str = 'ceph-exporter', + sock_dir: Optional[str] = None, + addrs: str = '', + port: Optional[int] = None, + prio_limit: Optional[int] = 5, + stats_period: Optional[int] = 5, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + extra_container_args: Optional[GeneralArgList] = None, + ): + assert service_type == 'ceph-exporter' + + super(CephExporterSpec, self).__init__( + service_type, + placement=placement, + unmanaged=unmanaged, + preview_only=preview_only, + extra_container_args=extra_container_args) + + self.service_type = service_type + self.sock_dir = sock_dir + self.addrs = addrs + self.port = port + self.prio_limit = prio_limit + self.stats_period = stats_period + + def validate(self) -> None: + super(CephExporterSpec, self).validate() + + if not isinstance(self.prio_limit, int): + raise SpecValidationError( + f'prio_limit must be an integer. Got {type(self.prio_limit)}') + if not isinstance(self.stats_period, int): + raise SpecValidationError( + f'stats_period must be an integer. Got {type(self.stats_period)}') + + +yaml.add_representer(CephExporterSpec, ServiceSpec.yaml_representer) diff --git a/src/python-common/ceph/deployment/translate.py b/src/python-common/ceph/deployment/translate.py new file mode 100644 index 000000000..86243b8ae --- /dev/null +++ b/src/python-common/ceph/deployment/translate.py @@ -0,0 +1,198 @@ +import logging + +try: + from typing import Optional, List, Dict +except ImportError: + pass + +from ceph.deployment.drive_selection.selector import DriveSelection + +logger = logging.getLogger(__name__) + + +# TODO refactor this to a DriveSelection method +class to_ceph_volume(object): + + _supported_device_classes = [ + "hdd", "ssd", "nvme" + ] + + def __init__(self, + selection, # type: DriveSelection + osd_id_claims=None, # type: Optional[List[str]] + preview=False # type: bool + ): + + self.selection = selection + self.spec = selection.spec + self.preview = preview + self.osd_id_claims = osd_id_claims + + def prepare_devices(self): + + # type: () -> Dict[str, List[str]] + + lvcount: Dict[str, List[str]] = dict() + + """ + Default entry for the global crush_device_class definition; + if there's no global definition at spec level, we do not want + to apply anything to the provided devices, hence we need to run + a ceph-volume command without that option, otherwise we init an + entry for the globally defined crush_device_class. + """ + if self.spec.crush_device_class: + lvcount[self.spec.crush_device_class] = [] + + # entry where the drives that don't require a crush_device_class + # option are collected + lvcount["no_crush"] = [] + + """ + for each device, check if it's just a path or it has a crush_device + class definition, and append an entry to the right crush_device_ + class group + """ + for device in self.selection.data_devices(): + # iterate on List[Device], containing both path and + # crush_device_class + path = device.path + crush_device_class = device.crush_device_class + + if path is None: + raise ValueError("Device path can't be empty") + + """ + if crush_device_class is specified for the current Device path + we should either init the array for this group or append the + drive path to the existing entry + """ + if crush_device_class: + if crush_device_class in lvcount.keys(): + lvcount[crush_device_class].append(path) + else: + lvcount[crush_device_class] = [path] + continue + + """ + if no crush_device_class is specified for the current path + but a global definition is present in the spec, so we group + the drives together + """ + if crush_device_class is None and self.spec.crush_device_class: + lvcount[self.spec.crush_device_class].append(path) + continue + else: + # default use case + lvcount["no_crush"].append(path) + continue + + return lvcount + + def run(self): + # type: () -> List[str] + """ Generate ceph-volume commands based on the DriveGroup filters """ + + db_devices = [x.path for x in self.selection.db_devices()] + wal_devices = [x.path for x in self.selection.wal_devices()] + + if not self.selection.data_devices(): + return [] + + cmds: List[str] = [] + + devices = self.prepare_devices() + # get the total number of devices provided by the Dict[str, List[str]] + devices_count = len(sum(list(devices.values()), [])) + + if devices and db_devices: + if (devices_count != len(db_devices)) and (self.spec.method == 'raw'): + raise ValueError('Number of data devices must match number of ' + 'db devices for raw mode osds') + + if devices and wal_devices: + if (devices_count != len(wal_devices)) and (self.spec.method == 'raw'): + raise ValueError('Number of data devices must match number of ' + 'wal devices for raw mode osds') + + for d in devices.keys(): + data_devices: Optional[List[str]] = devices.get(d) + if not data_devices: + continue + + if self.spec.method == 'raw': + assert self.spec.objectstore == 'bluestore' + # ceph-volume raw prepare only support 1:1 ratio of data to db/wal devices + # for raw prepare each data device needs its own prepare command + dev_counter = 0 + # reversing the lists as we're assigning db_devices sequentially + db_devices.reverse() + wal_devices.reverse() + + while dev_counter < len(data_devices): + cmd = "raw prepare --bluestore" + cmd += " --data {}".format(data_devices[dev_counter]) + if db_devices: + cmd += " --block.db {}".format(db_devices.pop()) + if wal_devices: + cmd += " --block.wal {}".format(wal_devices.pop()) + if d in self._supported_device_classes: + cmd += " --crush-device-class {}".format(d) + + cmds.append(cmd) + dev_counter += 1 + + elif self.spec.objectstore == 'bluestore': + # for lvm batch we can just do all devices in one command + + cmd = "lvm batch --no-auto {}".format(" ".join(data_devices)) + + if db_devices: + cmd += " --db-devices {}".format(" ".join(db_devices)) + + if wal_devices: + cmd += " --wal-devices {}".format(" ".join(wal_devices)) + + if self.spec.block_wal_size: + cmd += " --block-wal-size {}".format(self.spec.block_wal_size) + + if self.spec.block_db_size: + cmd += " --block-db-size {}".format(self.spec.block_db_size) + + if d in self._supported_device_classes: + cmd += " --crush-device-class {}".format(d) + cmds.append(cmd) + + for i in range(len(cmds)): + if self.spec.encrypted: + cmds[i] += " --dmcrypt" + + if self.spec.osds_per_device: + cmds[i] += " --osds-per-device {}".format(self.spec.osds_per_device) + + if self.spec.data_allocate_fraction: + cmds[i] += " --data-allocate-fraction {}".format(self.spec.data_allocate_fraction) + + if self.osd_id_claims: + cmds[i] += " --osd-ids {}".format(" ".join(self.osd_id_claims)) + + if self.spec.method != 'raw': + cmds[i] += " --yes" + cmds[i] += " --no-systemd" + + # set the --crush-device-class option when: + # - crush_device_class is specified at spec level (global for all the osds) # noqa E501 + # - crush_device_class is allowed + # - there's no override at osd level + if ( + self.spec.crush_device_class and + self.spec.crush_device_class in self._supported_device_classes and # noqa E501 + "crush-device-class" not in cmds[i] + ): + cmds[i] += " --crush-device-class {}".format(self.spec.crush_device_class) # noqa E501 + + if self.preview: + cmds[i] += " --report" + cmds[i] += " --format json" + + return cmds diff --git a/src/python-common/ceph/deployment/utils.py b/src/python-common/ceph/deployment/utils.py new file mode 100644 index 000000000..6aad15b75 --- /dev/null +++ b/src/python-common/ceph/deployment/utils.py @@ -0,0 +1,102 @@ +import ipaddress +import socket +from typing import Tuple, Optional +from urllib.parse import urlparse + + +def unwrap_ipv6(address): + # type: (str) -> str + if address.startswith('[') and address.endswith(']'): + return address[1:-1] + return address + + +def wrap_ipv6(address): + # type: (str) -> str + + # We cannot assume it's already wrapped or even an IPv6 address if + # it's already wrapped it'll not pass (like if it's a hostname) and trigger + # the ValueError + try: + if ipaddress.ip_address(address).version == 6: + return f"[{address}]" + except ValueError: + pass + + return address + + +def is_ipv6(address): + # type: (str) -> bool + address = unwrap_ipv6(address) + try: + return ipaddress.ip_address(address).version == 6 + except ValueError: + return False + + +def valid_addr(addr: str) -> Tuple[bool, str]: + """check that an address string is valid + Valid in this case means that a name is resolvable, or the + IP address string is a correctly formed IPv4 or IPv6 address, + with or without a port + + Args: + addr (str): address + + Returns: + Tuple[bool, str]: Validity of the address, either + True, address type (IPv4[:Port], IPv6[:Port], Name[:Port]) + False, <error description> + """ + + def _dns_lookup(addr: str, port: Optional[int]) -> Tuple[bool, str]: + try: + socket.getaddrinfo(addr, None) + except socket.gaierror: + # not resolvable + return False, 'DNS lookup failed' + return True, 'Name:Port' if port else 'Name' + + def _ip_lookup(addr: str, port: Optional[int]) -> Tuple[bool, str]: + unwrapped = unwrap_ipv6(addr) + try: + ip_addr = ipaddress.ip_address(unwrapped) + except ValueError: + return False, 'Invalid IP v4 or v6 address format' + return True, f'IPv{ip_addr.version}:Port' if port else f'IPv{ip_addr.version}' + + dots = addr.count('.') + colons = addr.count(':') + addr_as_url = f'http://{addr}' + + try: + res = urlparse(addr_as_url) + except ValueError as e: + if str(e) == 'Invalid IPv6 URL': + return False, 'Address has incorrect/incomplete use of enclosing brackets' + return False, f'Unknown urlparse error {str(e)} for {addr_as_url}' + + addr = res.netloc + port = None + try: + port = res.port + if port: + addr = addr[:-len(f':{port}')] + except ValueError: + if colons == 1: + return False, 'Port must be numeric' + elif ']:' in addr: + return False, 'Port must be numeric' + + if addr.startswith('[') and dots: + return False, "IPv4 address wrapped in brackets is invalid" + + # catch partial address like 10.8 which would be valid IPaddress schemes + # but are classed as invalid here since they're not usable + if dots and addr[0].isdigit() and dots != 3: + return False, 'Invalid partial IPv4 address' + + if addr[0].isalpha() and '.' in addr: + return _dns_lookup(addr, port) + return _ip_lookup(addr, port) diff --git a/src/python-common/ceph/py.typed b/src/python-common/ceph/py.typed new file mode 100644 index 000000000..444b02d77 --- /dev/null +++ b/src/python-common/ceph/py.typed @@ -0,0 +1 @@ +# Marker file for PEP 561. This package uses inline types.
\ No newline at end of file diff --git a/src/python-common/ceph/rgw/__init__.py b/src/python-common/ceph/rgw/__init__.py new file mode 100644 index 000000000..3988bf129 --- /dev/null +++ b/src/python-common/ceph/rgw/__init__.py @@ -0,0 +1,3 @@ +import logging + +log = logging.getLogger(__name__) diff --git a/src/python-common/ceph/rgw/diff.py b/src/python-common/ceph/rgw/diff.py new file mode 100644 index 000000000..cd91aa97f --- /dev/null +++ b/src/python-common/ceph/rgw/diff.py @@ -0,0 +1,93 @@ +class ZoneEPs: + def __init__(self): + self.endpoints = set() + + def add(self, ep): + if not ep: + return + + self.endpoints.add(ep) + + def diff(self, zep): + return list(self.endpoints.difference(zep.endpoints)) + + def get_all(self): + for ep in self.endpoints: + yield ep + + +class RealmEPs: + def __init__(self): + self.zones = {} + + def add(self, zone, ep=None): + if not zone: + return + + z = self.zones.get(zone) + if not z: + z = ZoneEPs() + self.zones[zone] = z + + z.add(ep) + + def diff(self, rep): + result = {} + for z, zep in rep.zones.items(): + myzep = self.zones.get(z) + if not myzep: + continue + + d = myzep.diff(zep) + if len(d) > 0: + result[z] = myzep.diff(zep) + + return result + + def get_all(self): + for z, zep in self.zones.items(): + eps = [] + for ep in zep.get_all(): + eps.append(ep) + yield z, eps + + +class RealmsEPs: + def __init__(self): + self.realms = {} + + def add(self, realm, zone=None, ep=None): + if not realm: + return + + r = self.realms.get(realm) + if not r: + r = RealmEPs() + self.realms[realm] = r + + r.add(zone, ep) + + def diff(self, rep): + result = {} + + for r, rep in rep.realms.items(): + myrealm = self.realms.get(r) + if not myrealm: + continue + + d = myrealm.diff(rep) + if d: + result[r] = d + + return result + + def get_all(self): + result = {} + for r, rep in self.realms.items(): + zs = {} + for z, eps in rep.get_all(): + zs[z] = eps + + result[r] = zs + + return result diff --git a/src/python-common/ceph/rgw/rgwam_core.py b/src/python-common/ceph/rgw/rgwam_core.py new file mode 100644 index 000000000..7041ea154 --- /dev/null +++ b/src/python-common/ceph/rgw/rgwam_core.py @@ -0,0 +1,937 @@ +# -*- mode:python -*- +# vim: ts=4 sw=4 smarttab expandtab +# +# Processed in Makefile to add python #! line and version variable +# +# + +import random +import string +import json +import socket +import base64 +import logging +import errno + +from .types import RGWAMException, RGWAMCmdRunException, RGWPeriod, RGWUser, RealmToken +from .diff import RealmsEPs + +DEFAULT_PORT = 8000 + +log = logging.getLogger(__name__) + + +def bool_str(x): + return 'true' if x else 'false' + + +def rand_alphanum_lower(k): + return ''.join(random.choices(string.ascii_lowercase + string.digits, k=k)) + + +def gen_name(prefix, suffix_len): + return prefix + rand_alphanum_lower(suffix_len) + + +def set_or_gen(val, gen, prefix): + if val: + return val + if gen: + return gen_name(prefix, 8) + + return None + + +def get_endpoints(endpoints, period=None): + if endpoints: + return endpoints + + hostname = socket.getfqdn() + + port = DEFAULT_PORT + + while True: + ep = 'http://%s:%d' % (hostname, port) + if not period or not period.endpoint_exists(ep): + return ep + port += 1 + + +class EnvArgs: + def __init__(self, mgr): + self.mgr = mgr + + +class EntityKey: + def __init__(self, name=None, id=None): + self.name = name + self.id = id + + def safe_vals(ek): + if not ek: + return None, None + return ek.name, ek.id + + +class EntityName(EntityKey): + def __init__(self, name=None): + super().__init__(name=name) + + +class EntityID(EntityKey): + def __init__(self, id=None): + super().__init__(id=id) + + +class ZoneEnv: + def __init__(self, env: EnvArgs, realm: EntityKey = None, zg: EntityKey = None, + zone: EntityKey = None): + self.env = env + self.realm = realm + self.zg = zg + self.zone = zone + + def set(self, env: EnvArgs = None, realm: EntityKey = None, zg: EntityKey = None, + zone: EntityKey = None): + if env: + self.env = env + if realm: + self.realm = realm + if zg: + self.zg = zg + if zone: + self.zone = zone + + return self + + def _init_entity(self, ek: EntityKey, gen, prefix): + name, id = EntityKey.safe_vals(ek) + name = set_or_gen(name, gen, prefix) + + return EntityKey(name, id) + + def init_realm(self, realm: EntityKey = None, gen=False): + self.realm = self._init_entity(realm, gen, 'realm-') + return self + + def init_zg(self, zg: EntityKey = None, gen=False): + self.zg = self._init_entity(zg, gen, 'zg-') + return self + + def init_zone(self, zone: EntityKey = None, gen=False): + self.zone = self._init_entity(zone, gen, 'zone-') + return self + + +def opt_arg(params, cmd, arg): + if arg: + params += [cmd, arg] + + +def opt_arg_bool(params, flag, arg): + if arg: + params += [flag] + + +class RGWCmdBase: + def __init__(self, prog, zone_env: ZoneEnv): + self.env = zone_env.env + self.mgr = self.env.mgr + self.prog = prog + self.cmd_suffix = [] + if zone_env.realm: + opt_arg(self.cmd_suffix, '--rgw-realm', zone_env.realm.name) + opt_arg(self.cmd_suffix, '--realm-id', zone_env.realm.id) + if zone_env.zg: + opt_arg(self.cmd_suffix, '--rgw-zonegroup', zone_env.zg.name) + opt_arg(self.cmd_suffix, '--zonegroup-id', zone_env.zg.id) + if zone_env.zone: + opt_arg(self.cmd_suffix, '--rgw-zone', zone_env.zone.name) + opt_arg(self.cmd_suffix, '--zone-id', zone_env.zone.id) + + def run(self, cmd): + args = cmd + self.cmd_suffix + cmd, returncode, stdout, stderr = self.mgr.tool_exec(self.prog, args) + + log.debug('cmd=%s' % str(cmd)) + log.debug('stdout=%s' % stdout) + + if returncode != 0: + cmd_str = ' '.join(cmd) + log.error('ERROR: command exited with error status (%d): %s\nstdout=%s\nstderr=%s' % + (returncode, cmd_str, stdout, stderr)) + raise RGWAMCmdRunException(cmd_str, -returncode, stdout, stderr) + + return (stdout, stderr) + + +class RGWAdminCmd(RGWCmdBase): + def __init__(self, zone_env: ZoneEnv): + super().__init__('radosgw-admin', zone_env) + + +class RGWAdminJSONCmd(RGWAdminCmd): + def __init__(self, zone_env: ZoneEnv): + super().__init__(zone_env) + + def run(self, cmd): + stdout, _ = RGWAdminCmd.run(self, cmd) + + return json.loads(stdout) + + +class RGWCmd(RGWCmdBase): + def __init__(self, zone_env: ZoneEnv): + super().__init__('radosgw', zone_env) + + +class RealmOp: + def __init__(self, env: EnvArgs): + self.env = env + + def list(self): + try: + ze = ZoneEnv(self.env) + params = ['realm', 'list'] + output = RGWAdminJSONCmd(ze).run(params) + return output.get('realms') or [] + except RGWAMException as e: + logging.info(f'Exception while listing realms {e.message}') + # in case the realm list is empty an exception is raised + return [] + + def get(self, realm: EntityKey = None): + ze = ZoneEnv(self.env, realm=realm) + params = ['realm', 'get'] + return RGWAdminJSONCmd(ze).run(params) + + def create(self, realm: EntityKey = None): + ze = ZoneEnv(self.env).init_realm(realm=realm, gen=True) + params = ['realm', 'create'] + return RGWAdminJSONCmd(ze).run(params) + + def pull(self, realm, url, access_key, secret): + params = ['realm', + 'pull', + '--url', url, + '--access-key', access_key, + '--secret', secret] + ze = ZoneEnv(self.env, realm=realm) + return RGWAdminJSONCmd(ze).run(params) + + +class ZonegroupOp: + def __init__(self, env: EnvArgs): + self.env = env + + def list(self): + try: + ze = ZoneEnv(self.env) + params = ['zonegroup', 'list'] + output = RGWAdminJSONCmd(ze).run(params) + return output.get('zonegroups') or [] + except RGWAMException as e: + logging.info(f'Exception while listing zonegroups {e.message}') + return [] + + def get(self, zonegroup: EntityKey = None): + ze = ZoneEnv(self.env) + params = ['zonegroup', 'get'] + opt_arg(params, '--rgw-zonegroup', zonegroup) + return RGWAdminJSONCmd(ze).run(params) + + def create(self, realm: EntityKey, zg: EntityKey = None, endpoints=None, is_master=True): + ze = ZoneEnv(self.env, realm=realm).init_zg(zg, gen=True) + + params = ['zonegroup', + 'create'] + + opt_arg_bool(params, '--master', is_master) + opt_arg(params, '--endpoints', endpoints) + + stdout, _ = RGWAdminCmd(ze).run(params) + + return json.loads(stdout) + + def modify(self, realm: EntityKey, zg: EntityKey, endpoints=None): + ze = ZoneEnv(self.env, realm=realm, zg=zg) + params = ['zonegroup', 'modify'] + opt_arg(params, '--endpoints', endpoints) + return RGWAdminJSONCmd(ze).run(params) + + +class ZoneOp: + def __init__(self, env: EnvArgs): + self.env = env + + def list(self): + try: + ze = ZoneEnv(self.env) + params = ['zone', 'list'] + output = RGWAdminJSONCmd(ze).run(params) + return output.get('zones') or [] + except RGWAMException as e: + logging.info(f'Exception while listing zones {e.message}') + return [] + + def get(self, zone: EntityKey): + ze = ZoneEnv(self.env, zone=zone) + + params = ['zone', + 'get'] + + return RGWAdminJSONCmd(ze).run(params) + + def create(self, realm: EntityKey, zonegroup: EntityKey, zone: EntityKey = None, + endpoints=None, is_master=True, + access_key=None, secret=None): + + ze = ZoneEnv(self.env, realm=realm, zg=zonegroup).init_zone(zone, gen=True) + + params = ['zone', + 'create'] + + opt_arg_bool(params, '--master', is_master) + opt_arg(params, '--access-key', access_key) + opt_arg(params, '--secret', secret) + opt_arg(params, '--endpoints', endpoints) + + return RGWAdminJSONCmd(ze).run(params) + + def modify(self, zone: EntityKey, zg: EntityKey, is_master=None, + access_key=None, secret=None, endpoints=None): + ze = ZoneEnv(self.env, zone=zone, zg=zg) + + params = ['zone', + 'modify'] + + opt_arg_bool(params, '--master', is_master) + opt_arg(params, '--access-key', access_key) + opt_arg(params, '--secret', secret) + opt_arg(params, '--endpoints', endpoints) + + return RGWAdminJSONCmd(ze).run(params) + + +class PeriodOp: + def __init__(self, env): + self.env = env + + def update(self, realm: EntityKey, zonegroup: EntityKey, zone: EntityKey, commit=True): + master_zone_info = self.get_master_zone(realm, zonegroup) + master_zone = EntityName(master_zone_info['name']) if master_zone_info else zone + master_zonegroup_info = self.get_master_zonegroup(realm) + master_zonegroup = EntityName(master_zonegroup_info['name']) \ + if master_zonegroup_info else zonegroup + ze = ZoneEnv(self.env, realm=realm, zg=master_zonegroup, zone=master_zone) + params = ['period', 'update'] + opt_arg_bool(params, '--commit', commit) + return RGWAdminJSONCmd(ze).run(params) + + def get_master_zone(self, realm, zonegroup=None): + try: + ze = ZoneEnv(self.env, realm=realm, zg=zonegroup) + params = ['zone', 'get'] + return RGWAdminJSONCmd(ze).run(params) + except RGWAMCmdRunException: + return None + + def get_master_zone_ep(self, realm, zonegroup=None): + try: + ze = ZoneEnv(self.env, realm=realm, zg=zonegroup) + params = ['period', 'get'] + output = RGWAdminJSONCmd(ze).run(params) + for zg in output['period_map']['zonegroups']: + if not bool(zg['is_master']): + continue + for zone in zg['zones']: + if zone['id'] == zg['master_zone']: + return zone['endpoints'] + return None + except RGWAMCmdRunException: + return None + + def get_master_zonegroup(self, realm): + try: + ze = ZoneEnv(self.env, realm=realm) + params = ['zonegroup', 'get'] + return RGWAdminJSONCmd(ze).run(params) + except RGWAMCmdRunException: + return None + + def get(self, realm=None): + ze = ZoneEnv(self.env, realm=realm) + params = ['period', 'get'] + return RGWAdminJSONCmd(ze).run(params) + + +class UserOp: + def __init__(self, env): + self.env = env + + def create(self, zone: EntityKey, zg: EntityKey, uid=None, uid_prefix=None, display_name=None, + email=None, is_system=False): + ze = ZoneEnv(self.env, zone=zone, zg=zg) + + u = uid or gen_name(uid_prefix or 'user-', 6) + + dn = display_name or u + + params = ['user', + 'create', + '--uid', u, + '--display-name', dn] + + opt_arg(params, '--email', email) + opt_arg_bool(params, '--system', is_system) + + return RGWAdminJSONCmd(ze).run(params) + + def info(self, zone: EntityKey, zg: EntityKey, uid=None, access_key=None): + ze = ZoneEnv(self.env, zone=zone, zg=zg) + + params = ['user', + 'info'] + + opt_arg(params, '--uid', uid) + opt_arg(params, '--access-key', access_key) + + return RGWAdminJSONCmd(ze).run(params) + + def rm(self, zone: EntityKey, zg: EntityKey, uid=None, access_key=None): + ze = ZoneEnv(self.env, zone=zone, zg=zg) + + params = ['user', + 'rm'] + + opt_arg(params, '--uid', uid) + opt_arg(params, '--access-key', access_key) + + return RGWAdminCmd(ze).run(params) + + def rm_key(self, zone: EntityKey, zg: EntityKey, access_key=None): + ze = ZoneEnv(self.env, zone=zone, zg=zg) + + params = ['key', + 'remove'] + + opt_arg(params, '--access-key', access_key) + + return RGWAdminCmd(ze).run(params) + + +class RGWAM: + def __init__(self, env): + self.env = env + + def realm_op(self): + return RealmOp(self.env) + + def period_op(self): + return PeriodOp(self.env) + + def zonegroup_op(self): + return ZonegroupOp(self.env) + + def zone_op(self): + return ZoneOp(self.env) + + def user_op(self): + return UserOp(self.env) + + def get_realm(self, realm_name): + try: + realm_info = self.realm_op().get(EntityName(realm_name)) + realm = EntityKey(realm_info['name'], realm_info['id']) + return realm + except RGWAMException: + raise None + + def create_realm(self, realm_name): + try: + realm_info = self.realm_op().create(EntityName(realm_name)) + realm = EntityKey(realm_info['name'], realm_info['id']) + logging.info(f'Created realm name={realm.name} id={realm.id}') + return realm + except RGWAMException as e: + raise RGWAMException('failed to create realm', e) + + def create_zonegroup(self, realm, zonegroup_name, zonegroup_is_master, endpoints=None): + try: + zg_info = self.zonegroup_op().create(realm, + EntityName(zonegroup_name), + endpoints, + is_master=zonegroup_is_master) + zonegroup = EntityKey(zg_info['name'], zg_info['id']) + logging.info(f'Created zonegroup name={zonegroup.name} id={zonegroup.id}') + return zonegroup + except RGWAMException as e: + raise RGWAMException('failed to create zonegroup', e) + + def create_zone(self, realm, zg, zone_name, zone_is_master, access_key=None, + secret=None, endpoints=None): + try: + zone_info = self.zone_op().create(realm, zg, + EntityName(zone_name), + endpoints, + is_master=zone_is_master, + access_key=access_key, + secret=secret) + + zone = EntityKey(zone_info['name'], zone_info['id']) + logging.info(f'Created zone name={zone.name} id={zone.id}') + return zone + except RGWAMException as e: + raise RGWAMException('failed to create zone', e) + + def create_system_user(self, realm, zonegroup, zone): + try: + sys_user_info = self.user_op().create(zone, + zonegroup, + uid=f'sysuser-{realm.name}', + uid_prefix='user-sys', + is_system=True) + sys_user = RGWUser(sys_user_info) + logging.info(f'Created system user: {sys_user.uid} on' + '{realm.name}/{zonegroup.name}/{zone.name}') + return sys_user + except RGWAMException as e: + raise RGWAMException('failed to create system user', e) + + def create_normal_user(self, zg, zone, uid=None): + try: + user_info = self.user_op().create(zone, zg, uid=uid, is_system=False) + user = RGWUser(user_info) + logging.info('Created regular user {user.uid} on' + '{realm.name}/{zonegroup.name}/{zone.name}') + return user + except RGWAMException as e: + raise RGWAMException('failed to create user', e) + + def update_period(self, realm, zg, zone=None): + try: + period_info = self.period_op().update(realm, zg, zone, commit=True) + period = RGWPeriod(period_info) + logging.info('Period: ' + period.id) + except RGWAMCmdRunException as e: + raise RGWAMException('failed to update period', e) + + def realm_bootstrap(self, rgw_spec, start_radosgw=True): + + realm_name = rgw_spec.rgw_realm + zonegroup_name = rgw_spec.rgw_zonegroup + zone_name = rgw_spec.rgw_zone + + # Some sanity checks + if realm_name in self.realm_op().list(): + raise RGWAMException(f'Realm {realm_name} already exists') + if zonegroup_name in self.zonegroup_op().list(): + raise RGWAMException(f'Zonegroup {zonegroup_name} already exists') + if zone_name in self.zone_op().list(): + raise RGWAMException(f'Zone {zone_name} already exists') + + # Create RGW entities and update the period + realm = self.create_realm(realm_name) + zonegroup = self.create_zonegroup(realm, zonegroup_name, zonegroup_is_master=True) + zone = self.create_zone(realm, zonegroup, zone_name, zone_is_master=True) + self.update_period(realm, zonegroup) + + # Create system user, normal user and update the master zone + sys_user = self.create_system_user(realm, zonegroup, zone) + rgw_acces_key = sys_user.get_key(0) + access_key = rgw_acces_key.access_key if rgw_acces_key else '' + secret = rgw_acces_key.secret_key if rgw_acces_key else '' + self.zone_op().modify(zone, zonegroup, None, + access_key, secret, endpoints=rgw_spec.zone_endpoints) + self.update_period(realm, zonegroup) + + if start_radosgw and rgw_spec.zone_endpoints is None: + # Instruct the orchestrator to start RGW daemons, asynchronically, this will + # call back the rgw module to update the master zone with the corresponding endpoints + realm_token = RealmToken(realm_name, + realm.id, + None, # no endpoint + access_key, secret) + realm_token_b = realm_token.to_json().encode('utf-8') + realm_token_s = base64.b64encode(realm_token_b).decode('utf-8') + rgw_spec.rgw_realm_token = realm_token_s + rgw_spec.update_endpoints = True + self.env.mgr.apply_rgw(rgw_spec) + + def realm_new_zone_creds(self, realm_name, endpoints, sys_uid): + try: + period_info = self.period_op().get(EntityName(realm_name)) + except RGWAMException as e: + raise RGWAMException('failed to fetch period info', e) + + period = RGWPeriod(period_info) + + master_zg = EntityID(period.master_zonegroup) + master_zone = EntityID(period.master_zone) + + try: + zone_info = self.zone_op().get(zone=master_zone) + except RGWAMException as e: + raise RGWAMException('failed to access master zone', e) + + zone_id = zone_info['id'] + + logging.info('Period: ' + period.id) + logging.info('Master zone: ' + period.master_zone) + + if period.master_zone != zone_id: + return (-errno.EINVAL, '', 'Command needs to run on master zone') + + ep = '' + if not endpoints: + eps = period.get_zone_endpoints(period.master_zonegroup, period.master_zone) + else: + eps = endpoints.split(',') + + if len(eps) > 0: + ep = eps[0] + + try: + sys_user_info = self.user_op().create(master_zone, master_zg, uid=sys_uid, + uid_prefix='user-sys', is_system=True) + except RGWAMException as e: + raise RGWAMException('failed to create system user', e) + + sys_user = RGWUser(sys_user_info) + + logging.info('Created system user: %s' % sys_user.uid) + + sys_access_key = '' + sys_secret = '' + + if len(sys_user.keys) > 0: + sys_access_key = sys_user.keys[0].access_key + sys_secret = sys_user.keys[0].secret_key + + realm_token = RealmToken(realm_name, period.realm_id, ep, sys_access_key, sys_secret) + + logging.info(realm_token.to_json()) + + realm_token_b = realm_token.to_json().encode('utf-8') + return (0, 'Realm Token: %s' % base64.b64encode(realm_token_b).decode('utf-8'), '') + + def realm_rm_zone_creds(self, realm_token_b64): + if not realm_token_b64: + raise RGWAMException('missing realm token') + + realm_token = RealmToken.from_base64_str(realm_token_b64) + try: + period_info = self.period_op().get(EntityID(realm_token.realm_id)) + except RGWAMException as e: + raise RGWAMException('failed to fetch period info', e) + + period = RGWPeriod(period_info) + master_zg = EntityID(period.master_zonegroup) + master_zone = EntityID(period.master_zone) + logging.info('Period: ' + period.id) + logging.info('Master zone: ' + period.master_zone) + try: + zone_info = self.zone_op().get(zone=master_zone) + except RGWAMException as e: + raise RGWAMException('failed to access master zone', e) + + if period.master_zone != zone_info['id']: + return (-errno.EINVAL, '', 'Command needs to run on master zone') + + access_key = realm_token.access_key + try: + user_info = self.user_op().info(master_zone, master_zg, access_key=access_key) + except RGWAMException as e: + raise RGWAMException('failed to get the system user information', e) + + user = RGWUser(user_info) + + only_key = True + + for k in user.keys: + if k.access_key != access_key: + only_key = False + break + + success_message = '' + + if only_key: + # the only key this user has is the one defined in the token + # can remove the user completely + + try: + self.user_op().rm(master_zone, master_zg, uid=user.uid) + except RGWAMException as e: + raise RGWAMException('failed removing user ' + user, user.uid, e) + + success_message = 'Removed uid ' + user.uid + else: + try: + self.user_op().rm_key(master_zone, master_zg, access_key=access_key) + except RGWAMException as e: + raise RGWAMException('failed removing access key ' + + access_key + '(uid = ' + user.uid + ')', e) + + success_message = 'Removed access key ' + access_key + '(uid = ' + user.uid + ')' + + return (0, success_message, '') + + def zone_modify(self, realm_name, zonegroup_name, zone_name, endpoints, realm_token_b64): + + if not realm_token_b64: + raise RGWAMException('missing realm access config') + if zone_name is None: + raise RGWAMException('Zone name is a mandatory parameter') + + realm_token = RealmToken.from_base64_str(realm_token_b64) + access_key = realm_token.access_key + secret = realm_token.secret + realm_name = realm_token.realm_name + realm_id = realm_token.realm_id + logging.info(f'Using realm {realm_name} {realm_id}') + + realm = EntityID(realm_id) + period_info = self.period_op().get(realm) + period = RGWPeriod(period_info) + logging.info('Period: ' + period.id) + zonegroup = period.find_zonegroup_by_name(zonegroup_name) + if not zonegroup: + raise RGWAMException(f'zonegroup {zonegroup_name} not found') + + zg = EntityName(zonegroup.name) + zone = EntityName(zone_name) + master_zone_info = self.period_op().get_master_zone(realm, zg) + success_message = f'Modified zone {realm_name} {zonegroup_name} {zone_name}' + logging.info(success_message) + try: + self.zone_op().modify(zone, zg, access_key=access_key, + secret=secret, endpoints=','.join(endpoints)) + # we only update the zonegroup endpoints if the zone being + # modified is a master zone + if zone_name == master_zone_info['name']: + self.zonegroup_op().modify(realm, zg, endpoints=','.join(endpoints)) + except RGWAMException as e: + raise RGWAMException('failed to modify zone', e) + + # done, let's update the period + try: + period_info = self.period_op().update(realm, zg, zone, True) + except RGWAMException as e: + raise RGWAMException('failed to update period', e) + + period = RGWPeriod(period_info) + logging.debug(period.to_json()) + + return (0, success_message, '') + + def get_realms_info(self): + realms_info = [] + for realm_name in self.realm_op().list(): + realm = self.get_realm(realm_name) + master_zone_inf = self.period_op().get_master_zone(realm) + zone_ep = self.period_op().get_master_zone_ep(realm) + if master_zone_inf and 'system_key' in master_zone_inf: + access_key = master_zone_inf['system_key']['access_key'] + secret = master_zone_inf['system_key']['secret_key'] + else: + access_key = '' + secret = '' + realms_info.append({"realm_name": realm_name, + "realm_id": realm.id, + "master_zone_id": master_zone_inf['id'] if master_zone_inf else '', + "endpoint": zone_ep[0] if zone_ep else None, + "access_key": access_key, + "secret": secret}) + return realms_info + + def zone_create(self, rgw_spec, start_radosgw): + + if not rgw_spec.rgw_realm_token: + raise RGWAMException('missing realm token') + if rgw_spec.rgw_zone is None: + raise RGWAMException('Zone name is a mandatory parameter') + if rgw_spec.rgw_zone in self.zone_op().list(): + raise RGWAMException(f'Zone {rgw_spec.rgw_zone} already exists') + + realm_token = RealmToken.from_base64_str(rgw_spec.rgw_realm_token) + if realm_token.endpoint is None: + raise RGWAMException('Provided realm token has no endpoint') + + access_key = realm_token.access_key + secret = realm_token.secret + try: + realm_info = self.realm_op().pull(EntityName(realm_token.realm_name), + realm_token.endpoint, access_key, secret) + except RGWAMException as e: + raise RGWAMException('failed to pull realm', e) + + logging.info(f"Pulled realm {realm_info['name']} ({realm_info['id']})") + realm_name = realm_info['name'] + realm_id = realm_info['id'] + + realm = EntityID(realm_id) + period_info = self.period_op().get(realm) + period = RGWPeriod(period_info) + logging.info('Period: ' + period.id) + + zonegroup = period.get_master_zonegroup() + if not zonegroup: + raise RGWAMException('Cannot find master zonegroup of realm {realm_name}') + + zone = self.create_zone(realm, zonegroup, rgw_spec.rgw_zone, + False, # secondary zone + access_key, secret, endpoints=rgw_spec.zone_endpoints) + self.update_period(realm, zonegroup, zone) + + period = RGWPeriod(period_info) + logging.debug(period.to_json()) + + if start_radosgw and rgw_spec.zone_endpoints is None: + secondary_realm_token = RealmToken(realm_name, + realm_id, + None, # no endpoint + realm_token.access_key, + realm_token.secret) + realm_token_b = secondary_realm_token.to_json().encode('utf-8') + realm_token_s = base64.b64encode(realm_token_b).decode('utf-8') + rgw_spec.update_endpoints = True + rgw_spec.rgw_token = realm_token_s + rgw_spec.rgw_zonegroup = zonegroup.name # master zonegroup is used + self.env.mgr.apply_rgw(rgw_spec) + + def _get_daemon_eps(self, realm_name=None, zonegroup_name=None, zone_name=None): + # get running daemons info + service_name = None + if realm_name and zone_name: + service_name = 'rgw.%s.%s' % (realm_name, zone_name) + + daemon_type = 'rgw' + daemon_id = None + hostname = None + refresh = True + + daemons = self.env.mgr.list_daemons(service_name, + daemon_type, + daemon_id=daemon_id, + host=hostname, + refresh=refresh) + + rep = RealmsEPs() + + for s in daemons: + for p in s.ports: + svc_id = s.service_id() + sp = svc_id.split('.') + if len(sp) < 2: + log.error('ERROR: service id cannot be parsed: (svc_id=%s)' % svc_id) + continue + + svc_realm = sp[0] + svc_zone = sp[1] + + if realm_name and svc_realm != realm_name: + log.debug('skipping realm %s' % svc_realm) + continue + + if zone_name and svc_zone != zone_name: + log.debug('skipping zone %s' % svc_zone) + continue + + ep = 'http://%s:%d' % (s.hostname, p) # ssl? + + rep.add(svc_realm, svc_zone, ep) + + return rep + + def _get_rgw_eps(self, realm_name=None, zonegroup_name=None, zone_name=None): + rep = RealmsEPs() + + try: + realms = self.realm_op().list() + except RGWAMException as e: + raise RGWAMException('failed to list realms', e) + + zones_map = {} + for realm in realms: + if realm_name and realm != realm_name: + log.debug('skipping realm %s' % realm) + continue + + period_info = self.period_op().get(EntityName(realm)) + + period = RGWPeriod(period_info) + + zones_map[realm] = {} + + for zg in period.iter_zonegroups(): + if zonegroup_name and zg.name != zonegroup_name: + log.debug('skipping zonegroup %s' % zg.name) + continue + + for zone in zg.iter_zones(): + if zone_name and zone.name != zone_name: + log.debug('skipping zone %s' % zone.name) + continue + + zones_map[realm][zone.name] = zg.name + + if len(zone.endpoints) == 0: + rep.add(realm, zone.name, None) + continue + + for ep in zone.endpoints: + rep.add(realm, zone.name, ep) + + return (rep, zones_map) + + def realm_reconcile(self, realm_name=None, zonegroup_name=None, zone_name=None, update=False): + + daemon_rep = self._get_daemon_eps(realm_name, zonegroup_name, zone_name) + + rgw_rep, zones_map = self._get_rgw_eps(realm_name, zonegroup_name, zone_name) + + diff = daemon_rep.diff(rgw_rep) + + diffj = json.dumps(diff) + + if not update: + return (0, diffj, '') + + for realm, realm_diff in diff.items(): + for zone, endpoints in realm_diff.items(): + + zg = zones_map[realm][zone] + + try: + self.zone_op().modify(EntityName(zone), EntityName(zg), + endpoints=','.join(diff[realm][zone])) + except RGWAMException as e: + raise RGWAMException('failed to modify zone', e) + + try: + self.period_op().update(EntityName(realm), EntityName(zg), EntityName(zone), True) + except RGWAMException as e: + raise RGWAMException('failed to update period', e) + + return (0, 'Updated: ' + diffj, '') + + def run_radosgw(self, port=None, log_file=None, debug_ms=None, debug_rgw=None): + + fe_cfg = 'beast' + if port: + fe_cfg += ' port=%s' % port + + params = ['--rgw-frontends', fe_cfg] + + if log_file: + params += ['--log-file', log_file] + + if debug_ms: + params += ['--debug-ms', debug_ms] + + if debug_rgw: + params += ['--debug-rgw', debug_rgw] + + (retcode, stdout, stderr) = RGWCmd(self.env).run(params) + + return (retcode, stdout, stderr) diff --git a/src/python-common/ceph/rgw/types.py b/src/python-common/ceph/rgw/types.py new file mode 100644 index 000000000..3f65f9da0 --- /dev/null +++ b/src/python-common/ceph/rgw/types.py @@ -0,0 +1,186 @@ +import json +import base64 +import binascii +import errno + +from abc import abstractmethod + + +class RGWAMException(Exception): + def __init__(self, message, orig=None): + if orig: + self.message = message + ': ' + orig.message + self.retcode = orig.retcode + self.stdout = orig.stdout + self.stderr = orig.stdout + else: + self.message = message + self.retcode = -errno.EINVAL + self.stdout = '' + self.stderr = message + + +class RGWAMCmdRunException(RGWAMException): + def __init__(self, cmd, retcode, stdout, stderr): + super().__init__('Command error (%d): %s\nstdout:%s\nstderr:%s' % + (retcode, cmd, stdout, stderr)) + self.retcode = retcode + self.stdout = stdout + self.stderr = stderr + + +class RGWAMEnvMgr: + @abstractmethod + def tool_exec(self, prog, args): + pass + + @abstractmethod + def apply_rgw(self, spec): + pass + + @abstractmethod + def list_daemons(self, service_name, daemon_type=None, daemon_id=None, hostname=None, + refresh=True): + pass + + +class JSONObj: + def to_json(self): + return json.dumps(self, default=lambda o: o.__dict__, indent=4) + + +class RealmToken(JSONObj): + def __init__(self, realm_name, realm_id, endpoint, access_key, secret): + self.realm_name = realm_name + self.realm_id = realm_id + self.endpoint = endpoint + self.access_key = access_key + self.secret = secret + + @classmethod + def from_base64_str(cls, realm_token_b64): + try: + realm_token_b = base64.b64decode(realm_token_b64) + realm_token_s = realm_token_b.decode('utf-8') + realm_token = json.loads(realm_token_s) + return cls(**realm_token) + except binascii.Error: + return None + + +class RGWZone(JSONObj): + def __init__(self, zone_dict): + self.id = zone_dict['id'] + self.name = zone_dict['name'] + self.endpoints = zone_dict['endpoints'] + + +class RGWZoneGroup(JSONObj): + def __init__(self, zg_dict): + self.id = zg_dict['id'] + self.name = zg_dict['name'] + self.api_name = zg_dict['api_name'] + self.is_master = zg_dict['is_master'] + self.endpoints = zg_dict['endpoints'] + + self.zones_by_id = {} + self.zones_by_name = {} + self.all_endpoints = [] + + for zone in zg_dict['zones']: + z = RGWZone(zone) + self.zones_by_id[zone['id']] = z + self.zones_by_name[zone['name']] = z + self.all_endpoints += z.endpoints + + def endpoint_exists(self, endpoint): + for ep in self.all_endpoints: + if ep == endpoint: + return True + return False + + def get_zone_endpoints(self, zone_id): + z = self.zones_by_id.get(zone_id) + if not z: + return None + + return z.endpoints + + def iter_zones(self): + for zone in self.zones_by_id.values(): + yield zone + + +class RGWPeriod(JSONObj): + def __init__(self, period_dict): + self.id = period_dict['id'] + self.epoch = period_dict['epoch'] + self.master_zone = period_dict['master_zone'] + self.master_zonegroup = period_dict['master_zonegroup'] + self.realm_name = period_dict['realm_name'] + self.realm_id = period_dict['realm_id'] + pm = period_dict['period_map'] + self.zonegroups_by_id = {} + self.zonegroups_by_name = {} + + for zg in pm['zonegroups']: + self.zonegroups_by_id[zg['id']] = RGWZoneGroup(zg) + self.zonegroups_by_name[zg['name']] = RGWZoneGroup(zg) + + def endpoint_exists(self, endpoint): + for _, zg in self.zonegroups_by_id.items(): + if zg.endpoint_exists(endpoint): + return True + return False + + def find_zonegroup_by_name(self, zonegroup): + if not zonegroup: + return self.find_zonegroup_by_id(self.master_zonegroup) + return self.zonegroups_by_name.get(zonegroup) + + def get_master_zonegroup(self): + return self.find_zonegroup_by_id(self.master_zonegroup) + + def find_zonegroup_by_id(self, zonegroup): + return self.zonegroups_by_id.get(zonegroup) + + def get_zone_endpoints(self, zonegroup_id, zone_id): + zg = self.zonegroups_by_id.get(zonegroup_id) + if not zg: + return None + + return zg.get_zone_endpoints(zone_id) + + def iter_zonegroups(self): + for zg in self.zonegroups_by_id.values(): + yield zg + + +class RGWAccessKey(JSONObj): + def __init__(self, d): + self.uid = d['user'] + self.access_key = d['access_key'] + self.secret_key = d['secret_key'] + + +class RGWUser(JSONObj): + def __init__(self, d): + self.uid = d['user_id'] + self.display_name = d['display_name'] + self.email = d['email'] + + self.keys = [] + + for k in d['keys']: + self.keys.append(RGWAccessKey(k)) + + is_system = d.get('system') or 'false' + self.system = (is_system == 'true') + + def add_key(self, access_key, secret): + self.keys.append(RGWAccessKey({'user': self.uid, + 'access_key': access_key, + 'secret_key': secret})) + + def get_key(self, index): + return self.keys[index] if index < len(self.keys) else None diff --git a/src/python-common/ceph/tests/__init__.py b/src/python-common/ceph/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/python-common/ceph/tests/__init__.py diff --git a/src/python-common/ceph/tests/c-v-inventory.json b/src/python-common/ceph/tests/c-v-inventory.json new file mode 100644 index 000000000..c24345525 --- /dev/null +++ b/src/python-common/ceph/tests/c-v-inventory.json @@ -0,0 +1,155 @@ +[ + { + "available": false, + "created": "2022-02-11T10:58:23.177450Z", + "rejected_reasons": [ + "locked" + ], + "sys_api": { + "scheduler_mode": "", + "rotational": "0", + "vendor": "", + "human_readable_size": "50.00 GB", + "sectors": 0, + "sas_device_handle": "", + "partitions": {}, + "rev": "", + "sas_address": "", + "locked": 1, + "sectorsize": "512", + "removable": "0", + "path": "/dev/dm-0", + "support_discard": "", + "model": "", + "ro": "0", + "nr_requests": "128", + "size": 53687091200 + }, + "lvs": [], + "path": "/dev/dm-0" + }, + { + "available": false, + "rejected_reasons": [ + "locked" + ], + "sys_api": { + "scheduler_mode": "", + "rotational": "0", + "vendor": "", + "human_readable_size": "31.47 GB", + "sectors": 0, + "sas_device_handle": "", + "partitions": {}, + "rev": "", + "sas_address": "", + "locked": 1, + "sectorsize": "512", + "removable": "0", + "path": "/dev/dm-1", + "support_discard": "", + "model": "", + "ro": "0", + "nr_requests": "128", + "size": 33789313024 + }, + "lvs": [], + "path": "/dev/dm-1" + }, + { + "available": false, + "created": "2022-02-11T10:58:23.177450Z", + "rejected_reasons": [ + "locked" + ], + "sys_api": { + "scheduler_mode": "", + "rotational": "0", + "vendor": "", + "human_readable_size": "394.27 GB", + "sectors": 0, + "sas_device_handle": "", + "partitions": {}, + "rev": "", + "sas_address": "", + "locked": 1, + "sectorsize": "512", + "removable": "0", + "path": "/dev/dm-2", + "support_discard": "", + "model": "", + "ro": "0", + "nr_requests": "128", + "size": 423347879936 + }, + "lvs": [], + "path": "/dev/dm-2" + }, + { + "available": false, + "rejected_reasons": [ + "locked" + ], + "sys_api": { + "scheduler_mode": "cfq", + "rotational": "0", + "vendor": "ATA", + "human_readable_size": "476.94 GB", + "sectors": 0, + "sas_device_handle": "", + "partitions": { + "sda2": { + "start": "411648", + "holders": [], + "sectorsize": 512, + "sectors": "2097152", + "size": "1024.00 MB" + }, + "sda3": { + "start": "2508800", + "holders": [ + "dm-1", + "dm-2", + "dm-0" + ], + "sectorsize": 512, + "sectors": "997705728", + "size": "475.74 GB" + }, + "sda1": { + "start": "2048", + "holders": [], + "sectorsize": 512, + "sectors": "409600", + "size": "200.00 MB" + } + }, + "rev": "0000", + "sas_address": "", + "locked": 1, + "sectorsize": "512", + "removable": "0", + "path": "/dev/sda", + "support_discard": "", + "model": "SanDisk SD8SN8U5", + "ro": "0", + "nr_requests": "128", + "size": 512110190592 + }, + "lvs": [ + { + "comment": "not used by ceph", + "name": "swap" + }, + { + "comment": "not used by ceph", + "name": "home" + }, + { + "comment": "not used by ceph", + "name": "root" + } + ], + "path": "/dev/sda" + } +] diff --git a/src/python-common/ceph/tests/factories.py b/src/python-common/ceph/tests/factories.py new file mode 100644 index 000000000..6938fd084 --- /dev/null +++ b/src/python-common/ceph/tests/factories.py @@ -0,0 +1,101 @@ +from ceph.deployment.inventory import Device + + +class InventoryFactory(object): + def __init__(self): + self.taken_paths = [] + + def _make_path(self, ident='b'): + return "/dev/{}{}".format(self.prefix, ident) + + def _find_new_path(self): + cnt = 0 + if len(self.taken_paths) >= 25: + raise Exception( + "Double-character disks are not implemetend. Maximum amount" + "of disks reached.") + + while self.path in self.taken_paths: + ident = chr(ord('b') + cnt) + self.path = "/dev/{}{}".format(self.prefix, ident) + cnt += 1 + + def assemble(self): + if self.empty: + return {} + self._find_new_path() + inventory_sample = { + 'available': self.available, + 'lvs': [], + 'path': self.path, + 'rejected_reasons': self.rejected_reason, + 'sys_api': { + 'human_readable_size': self.human_readable_size, + 'locked': 1, + 'model': self.model, + 'nr_requests': '256', + 'partitions': + { # partitions are not as relevant for now, todo for later + 'sda1': { + 'sectors': '41940992', + 'sectorsize': 512, + 'size': self.human_readable_size, + 'start': '2048' + } + }, + 'path': self.path, + 'removable': '0', + 'rev': '', + 'ro': '0', + 'rotational': str(self.rotational), + 'sas_address': '', + 'sas_device_handle': '', + 'scheduler_mode': 'mq-deadline', + 'sectors': 0, + 'sectorsize': '512', + 'size': self.size, + 'support_discard': '', + 'vendor': self.vendor + } + } + + if self.available: + self.taken_paths.append(self.path) + return inventory_sample + return {} + + def _init(self, **kwargs): + self.prefix = 'sd' + self.path = kwargs.get('path', self._make_path()) + self.human_readable_size = kwargs.get('human_readable_size', + '50.00 GB') + self.vendor = kwargs.get('vendor', 'samsung') + self.model = kwargs.get('model', '42-RGB') + self.available = kwargs.get('available', True) + self.rejected_reason = kwargs.get('rejected_reason', ['']) + self.rotational = kwargs.get('rotational', '1') + if not self.available: + self.rejected_reason = ['locked'] + self.empty = kwargs.get('empty', False) + self.size = kwargs.get('size', 5368709121) + + def produce(self, pieces=1, **kwargs): + if kwargs.get('path') and pieces > 1: + raise Exception("/path/ and /pieces/ are mutually exclusive") + # Move to custom init to track _taken_paths. + # class is invoked once in each context. + # if disks with different properties are being created + # we'd have to re-init the class and loose track of the + # taken_paths + self._init(**kwargs) + return [self.assemble() for x in range(0, pieces)] + + +class DeviceFactory(object): + def __init__(self, device_setup): + self.device_setup = device_setup + self.pieces = device_setup.get('pieces', 1) + self.device_conf = device_setup.get('device_config', {}) + + def produce(self): + return [Device(**self.device_conf) for x in range(0, self.pieces)] diff --git a/src/python-common/ceph/tests/test_datetime.py b/src/python-common/ceph/tests/test_datetime.py new file mode 100644 index 000000000..d03a82930 --- /dev/null +++ b/src/python-common/ceph/tests/test_datetime.py @@ -0,0 +1,61 @@ +import datetime + +import pytest + +from ceph.utils import datetime_now, datetime_to_str, str_to_datetime + + +def test_datetime_to_str_1(): + dt = datetime.datetime.now() + assert type(datetime_to_str(dt)) is str + + +def test_datetime_to_str_2(): + # note: tz isn't specified in the string, so explicitly store this as UTC + dt = datetime.datetime.strptime( + '2019-04-24T17:06:53.039991', + '%Y-%m-%dT%H:%M:%S.%f' + ).replace(tzinfo=datetime.timezone.utc) + assert datetime_to_str(dt) == '2019-04-24T17:06:53.039991Z' + + +def test_datetime_to_str_3(): + dt = datetime.datetime.strptime('2020-11-02T04:40:12.748172-0800', + '%Y-%m-%dT%H:%M:%S.%f%z') + assert datetime_to_str(dt) == '2020-11-02T12:40:12.748172Z' + + +def test_str_to_datetime_1(): + dt = str_to_datetime('2020-03-03T09:21:43.636153304Z') + assert type(dt) is datetime.datetime + assert dt.tzinfo is not None + + +def test_str_to_datetime_2(): + dt = str_to_datetime('2020-03-03T15:52:30.136257504-0600') + assert type(dt) is datetime.datetime + assert dt.tzinfo is not None + + +def test_str_to_datetime_3(): + dt = str_to_datetime('2020-03-03T15:52:30.136257504') + assert type(dt) is datetime.datetime + assert dt.tzinfo is not None + + +def test_str_to_datetime_invalid_format_1(): + with pytest.raises(ValueError): + str_to_datetime('2020-03-03 15:52:30.136257504') + + +def test_str_to_datetime_invalid_format_2(): + with pytest.raises(ValueError): + str_to_datetime('2020-03-03') + + +def test_datetime_now_1(): + dt = str_to_datetime('2020-03-03T09:21:43.636153304Z') + dt_now = datetime_now() + assert type(dt_now) is datetime.datetime + assert dt_now.tzinfo is not None + assert dt < dt_now diff --git a/src/python-common/ceph/tests/test_disk_selector.py b/src/python-common/ceph/tests/test_disk_selector.py new file mode 100644 index 000000000..b08236130 --- /dev/null +++ b/src/python-common/ceph/tests/test_disk_selector.py @@ -0,0 +1,560 @@ +# flake8: noqa +import pytest + +from ceph.deployment.drive_selection.matchers import _MatchInvalid +from ceph.deployment.inventory import Devices, Device + +from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, \ + DriveGroupValidationError + +from ceph.deployment import drive_selection +from ceph.deployment.service_spec import PlacementSpec +from ceph.tests.factories import InventoryFactory +from ceph.tests.utils import _mk_inventory, _mk_device + + +class TestMatcher(object): + """ Test Matcher base class + """ + + def test_get_disk_key_3(self): + """ + virtual is False + key is found + retrun value of key is expected + """ + disk_map = Device(path='/dev/vdb', sys_api={'foo': 'bar'}) + ret = drive_selection.Matcher('foo', 'bar')._get_disk_key(disk_map) + assert ret is disk_map.sys_api.get('foo') + + def test_get_disk_key_4(self): + """ + virtual is False + key is not found + expect raise Exception + """ + disk_map = Device(path='/dev/vdb') + with pytest.raises(Exception): + drive_selection.Matcher('bar', 'foo')._get_disk_key(disk_map) + pytest.fail("No disk_key found for foo or None") + + +class TestSubstringMatcher(object): + def test_compare(self): + disk_dict = Device(path='/dev/vdb', sys_api=dict(model='samsung')) + matcher = drive_selection.SubstringMatcher('model', 'samsung') + ret = matcher.compare(disk_dict) + assert ret is True + + def test_compare_false(self): + disk_dict = Device(path='/dev/vdb', sys_api=dict(model='nothing_matching')) + matcher = drive_selection.SubstringMatcher('model', 'samsung') + ret = matcher.compare(disk_dict) + assert ret is False + + +class TestEqualityMatcher(object): + def test_compare(self): + disk_dict = Device(path='/dev/vdb', sys_api=dict(rotates='1')) + matcher = drive_selection.EqualityMatcher('rotates', '1') + ret = matcher.compare(disk_dict) + assert ret is True + + def test_compare_false(self): + disk_dict = Device(path='/dev/vdb', sys_api=dict(rotates='1')) + matcher = drive_selection.EqualityMatcher('rotates', '0') + ret = matcher.compare(disk_dict) + assert ret is False + + +class TestAllMatcher(object): + def test_compare(self): + disk_dict = Device(path='/dev/vdb') + matcher = drive_selection.AllMatcher('all', 'True') + ret = matcher.compare(disk_dict) + assert ret is True + + def test_compare_value_not_true(self): + disk_dict = Device(path='/dev/vdb') + matcher = drive_selection.AllMatcher('all', 'False') + ret = matcher.compare(disk_dict) + assert ret is True + + +class TestSizeMatcher(object): + def test_parse_filter_exact(self): + """ Testing exact notation with 20G """ + matcher = drive_selection.SizeMatcher('size', '20G') + assert isinstance(matcher.exact, tuple) + assert matcher.exact[0] == '20' + assert matcher.exact[1] == 'GB' + + def test_parse_filter_exact_GB_G(self): + """ Testing exact notation with 20G """ + matcher = drive_selection.SizeMatcher('size', '20GB') + assert isinstance(matcher.exact, tuple) + assert matcher.exact[0] == '20' + assert matcher.exact[1] == 'GB' + + def test_parse_filter_high_low(self): + """ Testing high-low notation with 20G:50G """ + + matcher = drive_selection.SizeMatcher('size', '20G:50G') + assert isinstance(matcher.exact, tuple) + assert matcher.low[0] == '20' + assert matcher.high[0] == '50' + assert matcher.low[1] == 'GB' + assert matcher.high[1] == 'GB' + + def test_parse_filter_max_high(self): + """ Testing high notation with :50G """ + + matcher = drive_selection.SizeMatcher('size', ':50G') + assert isinstance(matcher.exact, tuple) + assert matcher.high[0] == '50' + assert matcher.high[1] == 'GB' + + def test_parse_filter_min_low(self): + """ Testing low notation with 20G: """ + + matcher = drive_selection.SizeMatcher('size', '50G:') + assert isinstance(matcher.exact, tuple) + assert matcher.low[0] == '50' + assert matcher.low[1] == 'GB' + + def test_to_byte_KB(self): + """ I doubt anyone ever thought we'd need to understand KB """ + + ret = drive_selection.SizeMatcher('size', '4K').to_byte(('4', 'KB')) + assert ret == 4 * 1e+3 + + def test_to_byte_GB(self): + """ Pretty nonesense test..""" + + ret = drive_selection.SizeMatcher('size', '10G').to_byte(('10', 'GB')) + assert ret == 10 * 1e+9 + + def test_to_byte_MB(self): + """ Pretty nonesense test..""" + + ret = drive_selection.SizeMatcher('size', '10M').to_byte(('10', 'MB')) + assert ret == 10 * 1e+6 + + def test_to_byte_TB(self): + """ Pretty nonesense test..""" + + ret = drive_selection.SizeMatcher('size', '10T').to_byte(('10', 'TB')) + assert ret == 10 * 1e+12 + + def test_to_byte_PB(self): + """ Expect to raise """ + + with pytest.raises(_MatchInvalid): + drive_selection.SizeMatcher('size', '10P').to_byte(('10', 'PB')) + assert 'Unit \'P\' is not supported' + + def test_compare_exact(self): + + matcher = drive_selection.SizeMatcher('size', '20GB') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size='20.00 GB')) + ret = matcher.compare(disk_dict) + assert ret is True + + def test_compare_exact_decimal(self): + + matcher = drive_selection.SizeMatcher('size', '20.12GB') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size='20.12 GB')) + ret = matcher.compare(disk_dict) + assert ret is True + + @pytest.mark.parametrize("test_input,expected", [ + ("1.00 GB", False), + ("20.00 GB", True), + ("50.00 GB", True), + ("100.00 GB", True), + ("101.00 GB", False), + ("1101.00 GB", False), + ]) + def test_compare_high_low(self, test_input, expected): + + matcher = drive_selection.SizeMatcher('size', '20GB:100GB') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size=test_input)) + ret = matcher.compare(disk_dict) + assert ret is expected + + @pytest.mark.parametrize("test_input,expected", [ + ("1.00 GB", True), + ("20.00 GB", True), + ("50.00 GB", True), + ("100.00 GB", False), + ("101.00 GB", False), + ("1101.00 GB", False), + ]) + def test_compare_high(self, test_input, expected): + + matcher = drive_selection.SizeMatcher('size', ':50GB') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size=test_input)) + ret = matcher.compare(disk_dict) + assert ret is expected + + @pytest.mark.parametrize("test_input,expected", [ + ("1.00 GB", False), + ("20.00 GB", False), + ("50.00 GB", True), + ("100.00 GB", True), + ("101.00 GB", True), + ("1101.00 GB", True), + ]) + def test_compare_low(self, test_input, expected): + + matcher = drive_selection.SizeMatcher('size', '50GB:') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size=test_input)) + ret = matcher.compare(disk_dict) + assert ret is expected + + @pytest.mark.parametrize("test_input,expected", [ + ("1.00 GB", False), + ("20.00 GB", False), + ("50.00 GB", False), + ("100.00 GB", False), + ("101.00 GB", False), + ("1101.00 GB", True), + ("9.10 TB", True), + ]) + def test_compare_at_least_1TB(self, test_input, expected): + + matcher = drive_selection.SizeMatcher('size', '1TB:') + disk_dict = Device(path='/dev/sdz', sys_api=dict(size=test_input)) + ret = matcher.compare(disk_dict) + assert ret is expected + + def test_compare_raise(self): + + matcher = drive_selection.SizeMatcher('size', 'None') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size='20.00 GB')) + with pytest.raises(Exception): + matcher.compare(disk_dict) + pytest.fail("Couldn't parse size") + + @pytest.mark.parametrize("test_input,expected", [ + ("10G", ('10', 'GB')), + ("20GB", ('20', 'GB')), + ("10g", ('10', 'GB')), + ("20gb", ('20', 'GB')), + ]) + def test_get_k_v(self, test_input, expected): + assert drive_selection.SizeMatcher('size', '10G')._get_k_v(test_input) == expected + + @pytest.mark.parametrize("test_input,expected", [ + ("10G", ('GB')), + ("10g", ('GB')), + ("20GB", ('GB')), + ("20gb", ('GB')), + ("20TB", ('TB')), + ("20tb", ('TB')), + ("20T", ('TB')), + ("20t", ('TB')), + ("20MB", ('MB')), + ("20mb", ('MB')), + ("20M", ('MB')), + ("20m", ('MB')), + ]) + def test_parse_suffix(self, test_input, expected): + assert drive_selection.SizeMatcher('size', '10G')._parse_suffix(test_input) == expected + + @pytest.mark.parametrize("test_input,expected", [ + ("G", 'GB'), + ("GB", 'GB'), + ("TB", 'TB'), + ("T", 'TB'), + ("MB", 'MB'), + ("M", 'MB'), + ]) + def test_normalize_suffix(self, test_input, expected): + + assert drive_selection.SizeMatcher('10G', 'size')._normalize_suffix(test_input) == expected + + def test_normalize_suffix_raises(self): + + with pytest.raises(_MatchInvalid): + drive_selection.SizeMatcher('10P', 'size')._normalize_suffix("P") + pytest.fail("Unit 'P' not supported") + + +class TestDriveGroup(object): + @pytest.fixture(scope='class') + def test_fix(self, empty=None): + def make_sample_data(empty=empty, + data_limit=0, + wal_limit=0, + db_limit=0, + osds_per_device='', + disk_format='bluestore'): + raw_sample_bluestore = { + 'service_type': 'osd', + 'service_id': 'foo', + 'placement': {'host_pattern': 'data*'}, + 'data_devices': { + 'size': '30G:50G', + 'model': '42-RGB', + 'vendor': 'samsung', + 'limit': data_limit + }, + 'wal_devices': { + 'model': 'fast', + 'limit': wal_limit + }, + 'db_devices': { + 'size': ':20G', + 'limit': db_limit + }, + 'db_slots': 5, + 'wal_slots': 5, + 'block_wal_size': '5G', + 'block_db_size': '10G', + 'objectstore': disk_format, + 'osds_per_device': osds_per_device, + 'encrypted': True, + } + raw_sample_filestore = { + 'service_type': 'osd', + 'service_id': 'foo', + 'placement': {'host_pattern': 'data*'}, + 'objectstore': 'filestore', + 'data_devices': { + 'size': '30G:50G', + 'model': 'foo', + 'vendor': '1x', + 'limit': data_limit + }, + 'journal_devices': { + 'size': ':20G' + }, + 'journal_size': '5G', + 'osds_per_device': osds_per_device, + 'encrypted': True, + } + if disk_format == 'filestore': + raw_sample = raw_sample_filestore + else: + raw_sample = raw_sample_bluestore + + if empty: + raw_sample = { + 'service_type': 'osd', + 'service_id': 'foo', + 'placement': {'host_pattern': 'data*'}, + 'data_devices': { + 'all': True + }, + } + + dgo = DriveGroupSpec.from_json(raw_sample) + return dgo + + return make_sample_data + + def test_encryption_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.encrypted is True + + def test_encryption_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.encrypted is False + + def test_db_slots_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.db_slots == 5 + + def test_db_slots_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.db_slots is None + + def test_wal_slots_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.wal_slots == 5 + + def test_wal_slots_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.wal_slots is None + + def test_block_wal_size_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.block_wal_size == '5G' + + def test_block_wal_size_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.block_wal_size is None + + def test_block_db_size_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.block_db_size == '10G' + + def test_block_db_size_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.block_db_size is None + + def test_data_devices_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.data_devices == DeviceSelection( + model='42-RGB', + size='30G:50G', + vendor='samsung', + limit=0, + ) + + def test_data_devices_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.db_devices is None + + def test_db_devices_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.db_devices == DeviceSelection( + size=':20G', + limit=0, + ) + + def test_db_devices_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.db_devices is None + + def test_wal_device_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.wal_devices == DeviceSelection( + model='fast', + limit=0, + ) + + def test_wal_device_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.wal_devices is None + + def test_bluestore_format_prop(self, test_fix): + test_fix = test_fix(disk_format='bluestore') + assert test_fix.objectstore == 'bluestore' + + def test_default_format_prop(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.objectstore == 'bluestore' + + def test_osds_per_device(self, test_fix): + test_fix = test_fix(osds_per_device='3') + assert test_fix.osds_per_device == '3' + + def test_osds_per_device_default(self, test_fix): + test_fix = test_fix() + assert test_fix.osds_per_device == '' + + def test_journal_size_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.journal_size is None + + @pytest.fixture + def inventory(self, available=True): + def make_sample_data(available=available, + data_devices=10, + wal_devices=0, + db_devices=2, + human_readable_size_data='50.00 GB', + human_readable_size_wal='20.00 GB', + size=5368709121, + human_readable_size_db='20.00 GB'): + factory = InventoryFactory() + inventory_sample = [] + data_disks = factory.produce( + pieces=data_devices, + available=available, + size=size, + human_readable_size=human_readable_size_data) + wal_disks = factory.produce( + pieces=wal_devices, + human_readable_size=human_readable_size_wal, + rotational='0', + model='ssd_type_model', + size=size, + available=available) + db_disks = factory.produce( + pieces=db_devices, + human_readable_size=human_readable_size_db, + rotational='0', + size=size, + model='ssd_type_model', + available=available) + inventory_sample.extend(data_disks) + inventory_sample.extend(wal_disks) + inventory_sample.extend(db_disks) + + return Devices(devices=inventory_sample) + + return make_sample_data + + +class TestDriveSelection(object): + + testdata = [ + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True)), + _mk_inventory(_mk_device() * 5), + ['/dev/sda', '/dev/sdb', '/dev/sdc', '/dev/sdd', '/dev/sde'], [] + ), + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True, limit=3), + db_devices=DeviceSelection(all=True) + ), + _mk_inventory(_mk_device() * 5), + ['/dev/sda', '/dev/sdb', '/dev/sdc'], ['/dev/sdd', '/dev/sde'] + ), + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False) + ), + _mk_inventory(_mk_device(rotational=False) + _mk_device(rotational=True)), + ['/dev/sdb'], ['/dev/sda'] + ), + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False) + ), + _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)), + ['/dev/sda', '/dev/sdb'], ['/dev/sdc'] + ), + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False) + ), + _mk_inventory(_mk_device(rotational=True)*2), + ['/dev/sda', '/dev/sdb'], [] + ), + ] + + @pytest.mark.parametrize("spec,inventory,expected_data,expected_db", testdata) + def test_disk_selection(self, spec, inventory, expected_data, expected_db): + sel = drive_selection.DriveSelection(spec, inventory) + assert [d.path for d in sel.data_devices()] == expected_data + assert [d.path for d in sel.db_devices()] == expected_db + + def test_disk_selection_raise(self): + spec = DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(size='wrong'), + ) + inventory = _mk_inventory(_mk_device(rotational=True)*2) + m = 'Failed to validate OSD spec "foobar.data_devices": No filters applied' + with pytest.raises(DriveGroupValidationError, match=m): + drive_selection.DriveSelection(spec, inventory)
\ No newline at end of file diff --git a/src/python-common/ceph/tests/test_drive_group.py b/src/python-common/ceph/tests/test_drive_group.py new file mode 100644 index 000000000..77e9b4083 --- /dev/null +++ b/src/python-common/ceph/tests/test_drive_group.py @@ -0,0 +1,592 @@ +# flake8: noqa +import re + +import pytest +import yaml + +from ceph.deployment import drive_selection, translate +from ceph.deployment.hostspec import HostSpec, SpecValidationError +from ceph.deployment.inventory import Device +from ceph.deployment.service_spec import PlacementSpec +from ceph.tests.utils import _mk_inventory, _mk_device +from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, \ + DriveGroupValidationError + +@pytest.mark.parametrize("test_input", +[ + ( # new style json + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +crush_device_class: ssd +data_devices: + paths: + - /dev/sda +""" + ), + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +data_devices: + paths: + - path: /dev/sda + crush_device_class: ssd""" + ), + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +spec: + osds_per_device: 2 +data_devices: + paths: + - path: /dev/sda + crush_device_class: hdd""" + ), +]) +def test_DriveGroup(test_input): + + dg = DriveGroupSpec.from_json(yaml.safe_load(test_input)) + assert dg.service_id == 'testing_drivegroup' + assert all([isinstance(x, Device) for x in dg.data_devices.paths]) + if isinstance(dg.data_devices.paths[0].path, str): + assert dg.data_devices.paths[0].path == '/dev/sda' + + + +@pytest.mark.parametrize("match,test_input", +[ + ( + re.escape('Service Spec is not an (JSON or YAML) object. got "None"'), + '' + ), + ( + 'Failed to validate OSD spec "<unnamed>": `placement` required', + """data_devices: + all: True +""" + ), + ( + 'Failed to validate OSD spec "mydg.data_devices": device selection cannot be empty', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +data_devices: + limit: 1 +""" + ), + ( + 'Failed to validate OSD spec "mydg": filter_logic must be either <AND> or <OR>', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +data_devices: + all: True +filter_logic: XOR +""" + ), + ( + 'Failed to validate OSD spec "mydg": `data_devices` element is required.', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +spec: + db_devices: + model: model +""" + ), + ( + 'Failed to validate OSD spec "mydg.db_devices": Filtering for `unknown_key` is not supported', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +spec: + db_devices: + unknown_key: 1 +""" + ), + ( + 'Failed to validate OSD spec "mydg": Feature `unknown_key` is not supported', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +spec: + db_devices: + all: true + unknown_key: 1 +""" + ), +]) +def test_DriveGroup_fail(match, test_input): + with pytest.raises(SpecValidationError, match=match): + osd_spec = DriveGroupSpec.from_json(yaml.safe_load(test_input)) + osd_spec.validate() + + +def test_drivegroup_pattern(): + dg = DriveGroupSpec( + PlacementSpec(host_pattern='node[1-3]'), + service_id='foobar', + data_devices=DeviceSelection(all=True)) + assert dg.placement.filter_matching_hostspecs([HostSpec('node{}'.format(i)) for i in range(10)]) == ['node1', 'node2', 'node3'] + + +def test_drive_selection(): + devs = DeviceSelection(paths=['/dev/sda']) + spec = DriveGroupSpec( + PlacementSpec('node_name'), + service_id='foobar', + data_devices=devs) + assert all([isinstance(x, Device) for x in spec.data_devices.paths]) + assert spec.data_devices.paths[0].path == '/dev/sda' + + with pytest.raises(DriveGroupValidationError, match='exclusive'): + ds = DeviceSelection(paths=['/dev/sda'], rotational=False) + ds.validate('') + + +def test_ceph_volume_command_0(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True) + ) + spec.validate() + inventory = _mk_inventory(_mk_device()*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_1(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False) + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb ' + '--db-devices /dev/sdc /dev/sdd --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_2(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(size='200GB:350GB', rotational=True), + db_devices=DeviceSelection(size='200GB:350GB', rotational=False), + wal_devices=DeviceSelection(size='10G') + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True, size="300.00 GB")*2 + + _mk_device(rotational=False, size="300.00 GB")*2 + + _mk_device(size="10.0 GB", rotational=False)*2 + ) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb ' + '--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf ' + '--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_3(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(size='200GB:350GB', rotational=True), + db_devices=DeviceSelection(size='200GB:350GB', rotational=False), + wal_devices=DeviceSelection(size='10G'), + encrypted=True + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True, size="300.00 GB")*2 + + _mk_device(rotational=False, size="300.00 GB")*2 + + _mk_device(size="10.0 GB", rotational=False)*2 + ) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb ' + '--db-devices /dev/sdc /dev/sdd ' + '--wal-devices /dev/sde /dev/sdf --dmcrypt ' + '--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_4(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(size='200GB:350GB', rotational=True), + db_devices=DeviceSelection(size='200GB:350GB', rotational=False), + wal_devices=DeviceSelection(size='10G'), + block_db_size='500M', + block_wal_size='500M', + osds_per_device=3, + encrypted=True + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True, size="300.00 GB")*2 + + _mk_device(rotational=False, size="300.00 GB")*2 + + _mk_device(size="10.0 GB", rotational=False)*2 + ) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb ' + '--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf ' + '--block-wal-size 500M --block-db-size 500M --dmcrypt ' + '--osds-per-device 3 --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_5(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + objectstore='filestore' + ) + with pytest.raises(DriveGroupValidationError): + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True)*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_6(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=False), + journal_devices=DeviceSelection(rotational=True), + journal_size='500M', + objectstore='filestore' + ) + with pytest.raises(DriveGroupValidationError): + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sdc /dev/sdd ' + '--journal-size 500M --journal-devices /dev/sda /dev/sdb ' + '--filestore --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_7(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True), + osd_id_claims={'host1': ['0', '1']} + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True)*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, ['0', '1']).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --osd-ids 0 1 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_8(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True, model='INTEL SSDS'), + db_devices=DeviceSelection(model='INTEL SSDP'), + filter_logic='OR', + osd_id_claims={} + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True, size='1.82 TB', model='ST2000DM001-1ER1') + # data + _mk_device(rotational=False, size="223.0 GB", model='INTEL SSDSC2KG24') + # data + _mk_device(rotational=False, size="349.0 GB", model='INTEL SSDPED1K375GA') # wal/db + ) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_9(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True), + data_allocate_fraction=0.8 + ) + spec.validate() + inventory = _mk_inventory(_mk_device()*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --data-allocate-fraction 0.8 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +@pytest.mark.parametrize("test_input_base", +[ + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +crush_device_class: ssd +data_devices: + paths: + - /dev/sda +""" + ), + ]) +def test_ceph_volume_command_10(test_input_base): + spec = DriveGroupSpec.from_json(yaml.safe_load(test_input_base)) + spec.validate() + drive = drive_selection.DriveSelection(spec, spec.data_devices.paths) + cmds = translate.to_ceph_volume(drive, []).run() + + assert all(cmd == 'lvm batch --no-auto /dev/sda --crush-device-class ssd --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +@pytest.mark.parametrize("test_input1", +[ + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +crush_device_class: ssd +data_devices: + paths: + - path: /dev/sda + crush_device_class: hdd + - path: /dev/sdb + crush_device_class: hdd +""" + ), + ]) +def test_ceph_volume_command_11(test_input1): + spec = DriveGroupSpec.from_json(yaml.safe_load(test_input1)) + spec.validate() + drive = drive_selection.DriveSelection(spec, spec.data_devices.paths) + cmds = translate.to_ceph_volume(drive, []).run() + + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --crush-device-class hdd --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +@pytest.mark.parametrize("test_input2", +[ + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +crush_device_class: ssd +data_devices: + paths: + - path: /dev/sda + crush_device_class: hdd + - path: /dev/sdb +""" + ), + ]) +def test_ceph_volume_command_12(test_input2): + + spec = DriveGroupSpec.from_json(yaml.safe_load(test_input2)) + spec.validate() + drive = drive_selection.DriveSelection(spec, spec.data_devices.paths) + cmds = translate.to_ceph_volume(drive, []).run() + + assert (cmds[0] == 'lvm batch --no-auto /dev/sdb --crush-device-class ssd --yes --no-systemd') # noqa E501 + assert (cmds[1] == 'lvm batch --no-auto /dev/sda --crush-device-class hdd --yes --no-systemd') # noqa E501 + + +@pytest.mark.parametrize("test_input3", +[ + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +data_devices: + paths: + - path: /dev/sda + crush_device_class: hdd + - path: /dev/sdb +""" + ), + ]) +def test_ceph_volume_command_13(test_input3): + + spec = DriveGroupSpec.from_json(yaml.safe_load(test_input3)) + spec.validate() + drive = drive_selection.DriveSelection(spec, spec.data_devices.paths) + cmds = translate.to_ceph_volume(drive, []).run() + + assert (cmds[0] == 'lvm batch --no-auto /dev/sdb --yes --no-systemd') # noqa E501 + assert (cmds[1] == 'lvm batch --no-auto /dev/sda --crush-device-class hdd --yes --no-systemd') # noqa E501 + + +@pytest.mark.parametrize("test_input4", +[ + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +data_devices: + paths: + - crush_device_class: hdd +""" + ), + ]) +def test_ceph_volume_command_14(test_input4): + + with pytest.raises(DriveGroupValidationError, match='Device path'): + spec = DriveGroupSpec.from_json(yaml.safe_load(test_input4)) + spec.validate() + + +def test_raw_ceph_volume_command_0(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False), + method='raw', + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True) + # data + _mk_device(rotational=True) + # data + _mk_device(rotational=False) + # db + _mk_device(rotational=False) # db + ) + exp_cmds = ['raw prepare --bluestore --data /dev/sda --block.db /dev/sdc', 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sdd'] + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd in exp_cmds for cmd in cmds), f'Expected {exp_cmds} to match {cmds}' + +def test_raw_ceph_volume_command_1(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False), + method='raw', + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True) + # data + _mk_device(rotational=True) + # data + _mk_device(rotational=False) # db + ) + sel = drive_selection.DriveSelection(spec, inventory) + with pytest.raises(ValueError): + cmds = translate.to_ceph_volume(sel, []).run() + +@pytest.mark.parametrize("test_input5", +[ + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +method: raw +data_devices: + paths: + - path: /dev/sda + crush_device_class: hdd + - path: /dev/sdb + crush_device_class: hdd + - path: /dev/sdc + crush_device_class: hdd +db_devices: + paths: + - /dev/sdd + - /dev/sde + - /dev/sdf + +""" + ), + ]) +def test_raw_ceph_volume_command_2(test_input5): + + spec = DriveGroupSpec.from_json(yaml.safe_load(test_input5)) + spec.validate() + drive = drive_selection.DriveSelection(spec, spec.data_devices.paths) + cmds = translate.to_ceph_volume(drive, []).run() + + assert cmds[0] == 'raw prepare --bluestore --data /dev/sda --block.db /dev/sdd --crush-device-class hdd' + assert cmds[1] == 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sde --crush-device-class hdd' + assert cmds[2] == 'raw prepare --bluestore --data /dev/sdc --block.db /dev/sdf --crush-device-class hdd' + + +@pytest.mark.parametrize("test_input6", +[ + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +method: raw +data_devices: + paths: + - path: /dev/sda + crush_device_class: hdd + - path: /dev/sdb + crush_device_class: hdd + - path: /dev/sdc + crush_device_class: ssd +db_devices: + paths: + - /dev/sdd + - /dev/sde + - /dev/sdf + +""" + ), + ]) +def test_raw_ceph_volume_command_3(test_input6): + + spec = DriveGroupSpec.from_json(yaml.safe_load(test_input6)) + spec.validate() + drive = drive_selection.DriveSelection(spec, spec.data_devices.paths) + cmds = translate.to_ceph_volume(drive, []).run() + + assert cmds[0] == 'raw prepare --bluestore --data /dev/sda --block.db /dev/sdd --crush-device-class hdd' + assert cmds[1] == 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sde --crush-device-class hdd' + assert cmds[2] == 'raw prepare --bluestore --data /dev/sdc --block.db /dev/sdf --crush-device-class ssd' + + +@pytest.mark.parametrize("test_input7", +[ + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +method: raw +data_devices: + paths: + - path: /dev/sda + crush_device_class: hdd + - path: /dev/sdb + crush_device_class: nvme + - path: /dev/sdc + crush_device_class: ssd +db_devices: + paths: + - /dev/sdd + - /dev/sde + - /dev/sdf +wal_devices: + paths: + - /dev/sdg + - /dev/sdh + - /dev/sdi + +""" + ), + ]) +def test_raw_ceph_volume_command_4(test_input7): + + spec = DriveGroupSpec.from_json(yaml.safe_load(test_input7)) + spec.validate() + drive = drive_selection.DriveSelection(spec, spec.data_devices.paths) + cmds = translate.to_ceph_volume(drive, []).run() + + assert cmds[0] == 'raw prepare --bluestore --data /dev/sda --block.db /dev/sdd --block.wal /dev/sdg --crush-device-class hdd' + assert cmds[1] == 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sdf --block.wal /dev/sdi --crush-device-class nvme' + assert cmds[2] == 'raw prepare --bluestore --data /dev/sdc --block.db /dev/sde --block.wal /dev/sdh --crush-device-class ssd' diff --git a/src/python-common/ceph/tests/test_hostspec.py b/src/python-common/ceph/tests/test_hostspec.py new file mode 100644 index 000000000..b6817579e --- /dev/null +++ b/src/python-common/ceph/tests/test_hostspec.py @@ -0,0 +1,40 @@ +# flake8: noqa +import json +import yaml + +import pytest + +from ceph.deployment.hostspec import HostSpec, SpecValidationError + + +@pytest.mark.parametrize( + "test_input,expected", + [ + ({"hostname": "foo"}, HostSpec('foo')), + ({"hostname": "foo", "labels": "l1"}, HostSpec('foo', labels=['l1'])), + ({"hostname": "foo", "labels": ["l1", "l2"]}, HostSpec('foo', labels=['l1', 'l2'])), + ({"hostname": "foo", "location": {"rack": "foo"}}, HostSpec('foo', location={'rack': 'foo'})), + ] +) +def test_parse_host_specs(test_input, expected): + hs = HostSpec.from_json(test_input) + assert hs == expected + + +@pytest.mark.parametrize( + "bad_input", + [ + ({"hostname": "foo", "labels": 124}), + ({"hostname": "foo", "labels": {"a", "b"}}), + ({"hostname": "foo", "labels": {"a", "b"}}), + ({"hostname": "foo", "labels": ["a", 2]}), + ({"hostname": "foo", "location": "rack=bar"}), + ({"hostname": "foo", "location": ["a"]}), + ({"hostname": "foo", "location": {"rack", 1}}), + ({"hostname": "foo", "location": {1: "rack"}}), + ] +) +def test_parse_host_specs(bad_input): + with pytest.raises(SpecValidationError): + hs = HostSpec.from_json(bad_input) + diff --git a/src/python-common/ceph/tests/test_inventory.py b/src/python-common/ceph/tests/test_inventory.py new file mode 100644 index 000000000..2d916fad2 --- /dev/null +++ b/src/python-common/ceph/tests/test_inventory.py @@ -0,0 +1,71 @@ +import datetime +import json +import os +import pytest + +from ceph.deployment.inventory import Devices, Device +from ceph.utils import datetime_now + + +@pytest.mark.parametrize("filename", + [ + os.path.dirname(__file__) + '/c-v-inventory.json', + os.path.dirname(__file__) + '/../../../pybind/mgr/test_orchestrator/du' + 'mmy_data.json', + ]) +def test_from_json(filename): + with open(filename) as f: + data = json.load(f) + if 'inventory' in data: + data = data['inventory'] + ds = Devices.from_json(data) + assert len(ds.devices) == len(data) + assert Devices.from_json(ds.to_json()) == ds + + +class TestDevicesEquality(): + created_time1 = datetime_now() + created_time2 = created_time1 + datetime.timedelta(seconds=30) + + @pytest.mark.parametrize( + "old_devices, new_devices, expected_equal", + [ + ( # identical list should be equal + Devices([Device('/dev/sdb', available=True, created=created_time1), + Device('/dev/sdc', available=True, created=created_time1)]), + Devices([Device('/dev/sdb', available=True, created=created_time1), + Device('/dev/sdc', available=True, created=created_time1)]), + True, + ), + ( # differing only in created time should still be equal + Devices([Device('/dev/sdb', available=True, created=created_time1), + Device('/dev/sdc', available=True, created=created_time1)]), + Devices([Device('/dev/sdb', available=True, created=created_time2), + Device('/dev/sdc', available=True, created=created_time2)]), + True, + ), + ( # differing in some other field should make them not equal + Devices([Device('/dev/sdb', available=True, created=created_time1), + Device('/dev/sdc', available=True, created=created_time1)]), + Devices([Device('/dev/sdb', available=False, created=created_time1), + Device('/dev/sdc', available=True, created=created_time1)]), + False, + ), + ( # different amount of devices should not pass equality + Devices([Device('/dev/sdb', available=True, created=created_time1), + Device('/dev/sdc', available=True, created=created_time1)]), + Devices([Device('/dev/sdb', available=True, created=created_time1), + Device('/dev/sdc', available=True, created=created_time1), + Device('/dev/sdd', available=True, created=created_time1)]), + False, + ), + ( # differing order should not affect equality + Devices([Device('/dev/sdb', available=True, created=created_time1), + Device('/dev/sdc', available=True, created=created_time1)]), + Devices([Device('/dev/sdc', available=True, created=created_time1), + Device('/dev/sdb', available=True, created=created_time1)]), + True, + ), + ]) + def test_equality(self, old_devices, new_devices, expected_equal): + assert (old_devices == new_devices) == expected_equal diff --git a/src/python-common/ceph/tests/test_service_spec.py b/src/python-common/ceph/tests/test_service_spec.py new file mode 100644 index 000000000..502057f5c --- /dev/null +++ b/src/python-common/ceph/tests/test_service_spec.py @@ -0,0 +1,1270 @@ +# flake8: noqa +import json +import re + +import yaml + +import pytest + +from ceph.deployment.service_spec import ( + AlertManagerSpec, + ArgumentSpec, + CustomContainerSpec, + GrafanaSpec, + HostPlacementSpec, + IscsiServiceSpec, + NFSServiceSpec, + PlacementSpec, + PrometheusSpec, + RGWSpec, + ServiceSpec, +) +from ceph.deployment.drive_group import DriveGroupSpec +from ceph.deployment.hostspec import SpecValidationError + + +@pytest.mark.parametrize("test_input,expected, require_network", + [("myhost", ('myhost', '', ''), False), + ("myhost=sname", ('myhost', '', 'sname'), False), + ("myhost:10.1.1.10", ('myhost', '10.1.1.10', ''), True), + ("myhost:10.1.1.10=sname", ('myhost', '10.1.1.10', 'sname'), True), + ("myhost:10.1.1.0/32", ('myhost', '10.1.1.0/32', ''), True), + ("myhost:10.1.1.0/32=sname", ('myhost', '10.1.1.0/32', 'sname'), True), + ("myhost:[v1:10.1.1.10:6789]", ('myhost', '[v1:10.1.1.10:6789]', ''), True), + ("myhost:[v1:10.1.1.10:6789]=sname", ('myhost', '[v1:10.1.1.10:6789]', 'sname'), True), + ("myhost:[v1:10.1.1.10:6789,v2:10.1.1.11:3000]", ('myhost', '[v1:10.1.1.10:6789,v2:10.1.1.11:3000]', ''), True), + ("myhost:[v1:10.1.1.10:6789,v2:10.1.1.11:3000]=sname", ('myhost', '[v1:10.1.1.10:6789,v2:10.1.1.11:3000]', 'sname'), True), + ]) +def test_parse_host_placement_specs(test_input, expected, require_network): + ret = HostPlacementSpec.parse(test_input, require_network=require_network) + assert ret == expected + assert str(ret) == test_input + + ps = PlacementSpec.from_string(test_input) + assert ps.pretty_str() == test_input + assert ps == PlacementSpec.from_string(ps.pretty_str()) + + # Testing the old verbose way of generating json. Don't remove: + assert ret == HostPlacementSpec.from_json({ + 'hostname': ret.hostname, + 'network': ret.network, + 'name': ret.name + }) + + assert ret == HostPlacementSpec.from_json(ret.to_json()) + + +@pytest.mark.parametrize( + "spec, raise_exception, msg", + [ + (GrafanaSpec(protocol=''), True, '^Invalid protocol'), + (GrafanaSpec(protocol='invalid'), True, '^Invalid protocol'), + (GrafanaSpec(protocol='-http'), True, '^Invalid protocol'), + (GrafanaSpec(protocol='-https'), True, '^Invalid protocol'), + (GrafanaSpec(protocol='http'), False, ''), + (GrafanaSpec(protocol='https'), False, ''), + (GrafanaSpec(anonymous_access=False), True, '^Either initial'), # we require inital_admin_password if anonymous_access is False + (GrafanaSpec(anonymous_access=False, initial_admin_password='test'), False, ''), + ]) +def test_apply_grafana(spec: GrafanaSpec, raise_exception: bool, msg: str): + if raise_exception: + with pytest.raises(SpecValidationError, match=msg): + spec.validate() + else: + spec.validate() + +@pytest.mark.parametrize( + "spec, raise_exception, msg", + [ + # Valid retention_time values (valid units: 'y', 'w', 'd', 'h', 'm', 's') + (PrometheusSpec(retention_time='1y'), False, ''), + (PrometheusSpec(retention_time=' 10w '), False, ''), + (PrometheusSpec(retention_time=' 1348d'), False, ''), + (PrometheusSpec(retention_time='2000h '), False, ''), + (PrometheusSpec(retention_time='173847m'), False, ''), + (PrometheusSpec(retention_time='200s'), False, ''), + (PrometheusSpec(retention_time=' '), False, ''), # default value will be used + # Invalid retention_time values + (PrometheusSpec(retention_time='100k'), True, '^Invalid retention time'), # invalid unit + (PrometheusSpec(retention_time='10'), True, '^Invalid retention time'), # no unit + (PrometheusSpec(retention_time='100.00y'), True, '^Invalid retention time'), # invalid value and valid unit + (PrometheusSpec(retention_time='100.00k'), True, '^Invalid retention time'), # invalid value and invalid unit + (PrometheusSpec(retention_time='---'), True, '^Invalid retention time'), # invalid value + + # Valid retention_size values (valid units: 'B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB') + (PrometheusSpec(retention_size='123456789B'), False, ''), + (PrometheusSpec(retention_size=' 200KB'), False, ''), + (PrometheusSpec(retention_size='99999MB '), False, ''), + (PrometheusSpec(retention_size=' 10GB '), False, ''), + (PrometheusSpec(retention_size='100TB'), False, ''), + (PrometheusSpec(retention_size='500PB'), False, ''), + (PrometheusSpec(retention_size='200EB'), False, ''), + (PrometheusSpec(retention_size=' '), False, ''), # default value will be used + + # Invalid retention_size values + (PrometheusSpec(retention_size='100b'), True, '^Invalid retention size'), # invalid unit (case sensitive) + (PrometheusSpec(retention_size='333kb'), True, '^Invalid retention size'), # invalid unit (case sensitive) + (PrometheusSpec(retention_size='2000'), True, '^Invalid retention size'), # no unit + (PrometheusSpec(retention_size='200.00PB'), True, '^Invalid retention size'), # invalid value and valid unit + (PrometheusSpec(retention_size='400.B'), True, '^Invalid retention size'), # invalid value and invalid unit + (PrometheusSpec(retention_size='10.000s'), True, '^Invalid retention size'), # invalid value and invalid unit + (PrometheusSpec(retention_size='...'), True, '^Invalid retention size'), # invalid value + + # valid retention_size and valid retention_time + (PrometheusSpec(retention_time='1y', retention_size='100GB'), False, ''), + # invalid retention_time and valid retention_size + (PrometheusSpec(retention_time='1j', retention_size='100GB'), True, '^Invalid retention time'), + # valid retention_time and invalid retention_size + (PrometheusSpec(retention_time='1y', retention_size='100gb'), True, '^Invalid retention size'), + # valid retention_time and invalid retention_size + (PrometheusSpec(retention_time='1y', retention_size='100gb'), True, '^Invalid retention size'), + # invalid retention_time and invalid retention_size + (PrometheusSpec(retention_time='1i', retention_size='100gb'), True, '^Invalid retention time'), + ]) +def test_apply_prometheus(spec: PrometheusSpec, raise_exception: bool, msg: str): + if raise_exception: + with pytest.raises(SpecValidationError, match=msg): + spec.validate() + else: + spec.validate() + +@pytest.mark.parametrize( + "test_input,expected", + [ + ('', "PlacementSpec()"), + ("count:2", "PlacementSpec(count=2)"), + ("3", "PlacementSpec(count=3)"), + ("host1 host2", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"), + ("host1;host2", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"), + ("host1,host2", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"), + ("host1 host2=b", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='b')])"), + ("host1=a host2=b", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name='a'), HostPlacementSpec(hostname='host2', network='', name='b')])"), + ("host1:1.2.3.4=a host2:1.2.3.5=b", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='1.2.3.4', name='a'), HostPlacementSpec(hostname='host2', network='1.2.3.5', name='b')])"), + ("myhost:[v1:10.1.1.10:6789]", "PlacementSpec(hosts=[HostPlacementSpec(hostname='myhost', network='[v1:10.1.1.10:6789]', name='')])"), + ('2 host1 host2', "PlacementSpec(count=2, hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"), + ('label:foo', "PlacementSpec(label='foo')"), + ('3 label:foo', "PlacementSpec(count=3, label='foo')"), + ('*', "PlacementSpec(host_pattern='*')"), + ('3 data[1-3]', "PlacementSpec(count=3, host_pattern='data[1-3]')"), + ('3 data?', "PlacementSpec(count=3, host_pattern='data?')"), + ('3 data*', "PlacementSpec(count=3, host_pattern='data*')"), + ("count-per-host:4 label:foo", "PlacementSpec(count_per_host=4, label='foo')"), + ]) +def test_parse_placement_specs(test_input, expected): + ret = PlacementSpec.from_string(test_input) + assert str(ret) == expected + assert PlacementSpec.from_string(ret.pretty_str()) == ret, f'"{ret.pretty_str()}" != "{test_input}"' + +@pytest.mark.parametrize( + "test_input", + [ + ("host=a host*"), + ("host=a label:wrong"), + ("host? host*"), + ('host=a count-per-host:0'), + ('host=a count-per-host:-10'), + ('count:2 count-per-host:1'), + ('host1=a host2=b count-per-host:2'), + ('host1:10/8 count-per-host:2'), + ('count-per-host:2'), + ] +) +def test_parse_placement_specs_raises(test_input): + with pytest.raises(SpecValidationError): + PlacementSpec.from_string(test_input) + +@pytest.mark.parametrize("test_input", + # wrong subnet + [("myhost:1.1.1.1/24"), + # wrong ip format + ("myhost:1"), + ]) +def test_parse_host_placement_specs_raises_wrong_format(test_input): + with pytest.raises(ValueError): + HostPlacementSpec.parse(test_input) + + +@pytest.mark.parametrize( + "p,hosts,size", + [ + ( + PlacementSpec(count=3), + ['host1', 'host2', 'host3', 'host4', 'host5'], + 3 + ), + ( + PlacementSpec(host_pattern='*'), + ['host1', 'host2', 'host3', 'host4', 'host5'], + 5 + ), + ( + PlacementSpec(count_per_host=2, host_pattern='*'), + ['host1', 'host2', 'host3', 'host4', 'host5'], + 10 + ), + ( + PlacementSpec(host_pattern='foo*'), + ['foo1', 'foo2', 'bar1', 'bar2'], + 2 + ), + ( + PlacementSpec(count_per_host=2, host_pattern='foo*'), + ['foo1', 'foo2', 'bar1', 'bar2'], + 4 + ), + ]) +def test_placement_target_size(p, hosts, size): + assert p.get_target_count( + [HostPlacementSpec(n, '', '') for n in hosts] + ) == size + + +def _get_dict_spec(s_type, s_id): + dict_spec = { + "service_id": s_id, + "service_type": s_type, + "placement": + dict(hosts=["host1:1.1.1.1"]) + } + if s_type == 'nfs': + pass + elif s_type == 'iscsi': + dict_spec['pool'] = 'pool' + dict_spec['api_user'] = 'api_user' + dict_spec['api_password'] = 'api_password' + elif s_type == 'osd': + dict_spec['spec'] = { + 'data_devices': { + 'all': True + } + } + elif s_type == 'rgw': + dict_spec['rgw_realm'] = 'realm' + dict_spec['rgw_zone'] = 'zone' + + return dict_spec + + +@pytest.mark.parametrize( + "s_type,o_spec,s_id", + [ + ("mgr", ServiceSpec, 'test'), + ("mon", ServiceSpec, 'test'), + ("mds", ServiceSpec, 'test'), + ("rgw", RGWSpec, 'realm.zone'), + ("nfs", NFSServiceSpec, 'test'), + ("iscsi", IscsiServiceSpec, 'test'), + ("osd", DriveGroupSpec, 'test'), + ]) +def test_servicespec_map_test(s_type, o_spec, s_id): + spec = ServiceSpec.from_json(_get_dict_spec(s_type, s_id)) + assert isinstance(spec, o_spec) + assert isinstance(spec.placement, PlacementSpec) + assert isinstance(spec.placement.hosts[0], HostPlacementSpec) + assert spec.placement.hosts[0].hostname == 'host1' + assert spec.placement.hosts[0].network == '1.1.1.1' + assert spec.placement.hosts[0].name == '' + assert spec.validate() is None + ServiceSpec.from_json(spec.to_json()) + + +@pytest.mark.parametrize( + "realm, zone, frontend_type, raise_exception, msg", + [ + ('realm', 'zone1', 'beast', False, ''), + ('realm', 'zone2', 'civetweb', False, ''), + ('realm', None, 'beast', True, 'Cannot add RGW: Realm specified but no zone specified'), + (None, 'zone1', 'beast', True, 'Cannot add RGW: Zone specified but no realm specified'), + ('realm', 'zone', 'invalid-beast', True, '^Invalid rgw_frontend_type value'), + ('realm', 'zone', 'invalid-civetweb', True, '^Invalid rgw_frontend_type value'), + ]) +def test_rgw_servicespec_parse(realm, zone, frontend_type, raise_exception, msg): + spec = RGWSpec(service_id='foo', + rgw_realm=realm, + rgw_zone=zone, + rgw_frontend_type=frontend_type) + if raise_exception: + with pytest.raises(SpecValidationError, match=msg): + spec.validate() + else: + spec.validate() + +def test_osd_unmanaged(): + osd_spec = {"placement": {"host_pattern": "*"}, + "service_id": "all-available-devices", + "service_name": "osd.all-available-devices", + "service_type": "osd", + "spec": {"data_devices": {"all": True}, "filter_logic": "AND", "objectstore": "bluestore"}, + "unmanaged": True} + + dg_spec = ServiceSpec.from_json(osd_spec) + assert dg_spec.unmanaged == True + + +@pytest.mark.parametrize("y", +"""service_type: crash +service_name: crash +placement: + host_pattern: '*' +--- +service_type: crash +service_name: crash +placement: + host_pattern: '*' +unmanaged: true +--- +service_type: rgw +service_id: default-rgw-realm.eu-central-1.1 +service_name: rgw.default-rgw-realm.eu-central-1.1 +placement: + hosts: + - ceph-001 +networks: +- 10.0.0.0/8 +- 192.168.0.0/16 +spec: + rgw_frontend_type: civetweb + rgw_realm: default-rgw-realm + rgw_zone: eu-central-1 +--- +service_type: osd +service_id: osd_spec_default +service_name: osd.osd_spec_default +placement: + host_pattern: '*' +spec: + data_devices: + model: MC-55-44-XZ + db_devices: + model: SSD-123-foo + filter_logic: AND + objectstore: bluestore + wal_devices: + model: NVME-QQQQ-987 +--- +service_type: alertmanager +service_name: alertmanager +spec: + port: 1234 + user_data: + default_webhook_urls: + - foo +--- +service_type: grafana +service_name: grafana +spec: + anonymous_access: true + port: 1234 + protocol: https +--- +service_type: grafana +service_name: grafana +spec: + anonymous_access: true + initial_admin_password: secure + port: 1234 + protocol: https +--- +service_type: ingress +service_id: rgw.foo +service_name: ingress.rgw.foo +placement: + hosts: + - host1 + - host2 + - host3 +spec: + backend_service: rgw.foo + first_virtual_router_id: 50 + frontend_port: 8080 + monitor_port: 8081 + virtual_ip: 192.168.20.1/24 +--- +service_type: nfs +service_id: mynfs +service_name: nfs.mynfs +spec: + port: 1234 +--- +service_type: iscsi +service_id: iscsi +service_name: iscsi.iscsi +networks: +- ::0/8 +spec: + api_password: admin + api_port: 5000 + api_user: admin + pool: pool + trusted_ip_list: + - ::1 + - ::2 +--- +service_type: container +service_id: hello-world +service_name: container.hello-world +spec: + args: + - --foo + bind_mounts: + - - type=bind + - source=lib/modules + - destination=/lib/modules + - ro=true + dirs: + - foo + - bar + entrypoint: /usr/bin/bash + envs: + - FOO=0815 + files: + bar.conf: + - foo + - bar + foo.conf: 'foo + + bar' + gid: 2000 + image: docker.io/library/hello-world:latest + ports: + - 8080 + - 8443 + uid: 1000 + volume_mounts: + foo: /foo +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_community: public + snmp_destination: 192.168.1.42:162 + snmp_version: V2c +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + auth_protocol: MD5 + credentials: + snmp_v3_auth_password: mypassword + snmp_v3_auth_username: myuser + engine_id: 8000C53F00000000 + port: 9464 + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_password: mypassword + snmp_v3_auth_username: myuser + snmp_v3_priv_password: mysecret + engine_id: 8000C53F00000000 + privacy_protocol: AES + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""".split('---\n')) +def test_yaml(y): + data = yaml.safe_load(y) + object = ServiceSpec.from_json(data) + + assert yaml.dump(object) == y + assert yaml.dump(ServiceSpec.from_json(object.to_json())) == y + + +def test_alertmanager_spec_1(): + spec = AlertManagerSpec() + assert spec.service_type == 'alertmanager' + assert isinstance(spec.user_data, dict) + assert len(spec.user_data.keys()) == 0 + assert spec.get_port_start() == [9093, 9094] + + +def test_alertmanager_spec_2(): + spec = AlertManagerSpec(user_data={'default_webhook_urls': ['foo']}) + assert isinstance(spec.user_data, dict) + assert 'default_webhook_urls' in spec.user_data.keys() + + + +def test_repr(): + val = """ServiceSpec.from_json(yaml.safe_load('''service_type: crash +service_name: crash +placement: + count: 42 +'''))""" + obj = eval(val) + assert obj.service_type == 'crash' + assert val == repr(obj) + +@pytest.mark.parametrize("spec1, spec2, eq", + [ + ( + ServiceSpec( + service_type='mon' + ), + ServiceSpec( + service_type='mon' + ), + True + ), + ( + ServiceSpec( + service_type='mon' + ), + ServiceSpec( + service_type='mon', + service_id='foo' + ), + True + ), + # Add service_type='mgr' + ( + ServiceSpec( + service_type='osd' + ), + ServiceSpec( + service_type='osd', + ), + True + ), + ( + ServiceSpec( + service_type='osd' + ), + DriveGroupSpec(), + True + ), + ( + ServiceSpec( + service_type='osd' + ), + ServiceSpec( + service_type='osd', + service_id='foo', + ), + False + ), + ( + ServiceSpec( + service_type='rgw', + service_id='foo', + ), + RGWSpec(service_id='foo'), + True + ), + ]) +def test_spec_hash_eq(spec1: ServiceSpec, + spec2: ServiceSpec, + eq: bool): + + assert (spec1 == spec2) is eq + +@pytest.mark.parametrize( + "s_type,s_id,s_name", + [ + ('mgr', 's_id', 'mgr'), + ('mon', 's_id', 'mon'), + ('mds', 's_id', 'mds.s_id'), + ('rgw', 's_id', 'rgw.s_id'), + ('nfs', 's_id', 'nfs.s_id'), + ('iscsi', 's_id', 'iscsi.s_id'), + ('osd', 's_id', 'osd.s_id'), + ]) +def test_service_name(s_type, s_id, s_name): + spec = ServiceSpec.from_json(_get_dict_spec(s_type, s_id)) + spec.validate() + assert spec.service_name() == s_name + +@pytest.mark.parametrize( + 's_type,s_id', + [ + ('mds', 's:id'), # MDS service_id cannot contain an invalid char ':' + ('mds', '1abc'), # MDS service_id cannot start with a numeric digit + ('mds', ''), # MDS service_id cannot be empty + ('rgw', '*s_id'), + ('nfs', 's/id'), + ('iscsi', 's@id'), + ('osd', 's;id'), + ]) + +def test_service_id_raises_invalid_char(s_type, s_id): + with pytest.raises(SpecValidationError): + spec = ServiceSpec.from_json(_get_dict_spec(s_type, s_id)) + spec.validate() + +def test_custom_container_spec(): + spec = CustomContainerSpec(service_id='hello-world', + image='docker.io/library/hello-world:latest', + entrypoint='/usr/bin/bash', + uid=1000, + gid=2000, + volume_mounts={'foo': '/foo'}, + args=['--foo'], + envs=['FOO=0815'], + bind_mounts=[ + [ + 'type=bind', + 'source=lib/modules', + 'destination=/lib/modules', + 'ro=true' + ] + ], + ports=[8080, 8443], + dirs=['foo', 'bar'], + files={ + 'foo.conf': 'foo\nbar', + 'bar.conf': ['foo', 'bar'] + }) + assert spec.service_type == 'container' + assert spec.entrypoint == '/usr/bin/bash' + assert spec.uid == 1000 + assert spec.gid == 2000 + assert spec.volume_mounts == {'foo': '/foo'} + assert spec.args == ['--foo'] + assert spec.envs == ['FOO=0815'] + assert spec.bind_mounts == [ + [ + 'type=bind', + 'source=lib/modules', + 'destination=/lib/modules', + 'ro=true' + ] + ] + assert spec.ports == [8080, 8443] + assert spec.dirs == ['foo', 'bar'] + assert spec.files == { + 'foo.conf': 'foo\nbar', + 'bar.conf': ['foo', 'bar'] + } + + +def test_custom_container_spec_config_json(): + spec = CustomContainerSpec(service_id='foo', image='foo', dirs=None) + config_json = spec.config_json() + for key in ['entrypoint', 'uid', 'gid', 'bind_mounts', 'dirs']: + assert key not in config_json + + +def test_ingress_spec(): + yaml_str = """service_type: ingress +service_id: rgw.foo +placement: + hosts: + - host1 + - host2 + - host3 +spec: + virtual_ip: 192.168.20.1/24 + backend_service: rgw.foo + frontend_port: 8080 + monitor_port: 8081 +""" + yaml_file = yaml.safe_load(yaml_str) + spec = ServiceSpec.from_json(yaml_file) + assert spec.service_type == "ingress" + assert spec.service_id == "rgw.foo" + assert spec.virtual_ip == "192.168.20.1/24" + assert spec.frontend_port == 8080 + assert spec.monitor_port == 8081 + + +@pytest.mark.parametrize("y, error_match", [ + (""" +service_type: rgw +service_id: foo +placement: + count_per_host: "twelve" +""", "count-per-host must be a numeric value",), + (""" +service_type: rgw +service_id: foo +placement: + count_per_host: "2" +""", "count-per-host must be an integer value",), + (""" +service_type: rgw +service_id: foo +placement: + count_per_host: 7.36 +""", "count-per-host must be an integer value",), + (""" +service_type: rgw +service_id: foo +placement: + count: "fifteen" +""", "num/count must be a numeric value",), + (""" +service_type: rgw +service_id: foo +placement: + count: "4" +""", "num/count must be an integer value",), + (""" +service_type: rgw +service_id: foo +placement: + count: 7.36 +""", "num/count must be an integer value",), + (""" +service_type: rgw +service_id: foo +placement: + count: 0 +""", "num/count must be >= 1",), + (""" +service_type: rgw +service_id: foo +placement: + count_per_host: 0 +""", "count-per-host must be >= 1",), + (""" +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_password: mypassword + snmp_v3_auth_username: myuser + snmp_v3_priv_password: mysecret + port: 9464 + engine_id: 8000c53f0000000000 + privacy_protocol: WEIRD + snmp_destination: 192.168.122.1:162 + auth_protocol: BIZARRE + snmp_version: V3 +""", "auth_protocol unsupported. Must be one of MD5, SHA"), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_community: public + snmp_destination: 192.168.1.42:162 + snmp_version: V4 +""", 'snmp_version unsupported. Must be one of V2c, V3'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_community: public + port: 9464 + snmp_destination: 192.168.1.42:162 +""", re.escape('Missing SNMP version (snmp_version)')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + port: 9464 + auth_protocol: wah + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""", 'auth_protocol unsupported. Must be one of MD5, SHA'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: weewah + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""", 'privacy_protocol unsupported. Must be one of DES, AES'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""", 'Must provide an engine_id for SNMP V3 notifications'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_community: public + port: 9464 + snmp_destination: 192.168.1.42 + snmp_version: V2c +""", re.escape('SNMP destination (snmp_destination) type (IPv4) is invalid. Must be either: IPv4:Port, Name:Port')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: bogus + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + port: 9464 + auth_protocol: SHA + engine_id: 8000C53F0000000000 + snmp_version: V3 +""", re.escape('SNMP destination (snmp_destination) must be provided')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53F0000000000 + snmp_destination: my.imaginary.snmp-host + snmp_version: V3 +""", re.escape('SNMP destination (snmp_destination) is invalid: DNS lookup failed')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53F0000000000 + snmp_destination: 10.79.32.10:fred + snmp_version: V3 +""", re.escape('SNMP destination (snmp_destination) is invalid: Port must be numeric')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53 + snmp_destination: 10.79.32.10:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53DOH! + snmp_destination: 10.79.32.10:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53FCA7344403DC611EC9B985254002537A6C53FCA7344403DC6112537A60 + snmp_destination: 10.79.32.10:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53F00000 + snmp_destination: 10.79.32.10:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + ]) +def test_service_spec_validation_error(y, error_match): + data = yaml.safe_load(y) + with pytest.raises(SpecValidationError) as err: + specObj = ServiceSpec.from_json(data) + assert err.match(error_match) + + +@pytest.mark.parametrize("y, ec_args, ee_args, ec_final_args, ee_final_args", [ + pytest.param(""" +service_type: container +service_id: hello-world +service_name: container.hello-world +spec: + args: + - --foo + bind_mounts: + - - type=bind + - source=lib/modules + - destination=/lib/modules + - ro=true + dirs: + - foo + - bar + entrypoint: /usr/bin/bash + envs: + - FOO=0815 + files: + bar.conf: + - foo + - bar + foo.conf: 'foo + + bar' + gid: 2000 + image: docker.io/library/hello-world:latest + ports: + - 8080 + - 8443 + uid: 1000 + volume_mounts: + foo: /foo +""", + None, + None, + None, + None, + id="no_extra_args"), + pytest.param(""" +service_type: container +service_id: hello-world +service_name: container.hello-world +spec: + args: + - --foo + extra_entrypoint_args: + - "--lasers=blue" + - "--enable-confetti" + bind_mounts: + - - type=bind + - source=lib/modules + - destination=/lib/modules + - ro=true + dirs: + - foo + - bar + entrypoint: /usr/bin/bash + envs: + - FOO=0815 + files: + bar.conf: + - foo + - bar + foo.conf: 'foo + + bar' + gid: 2000 + image: docker.io/library/hello-world:latest + ports: + - 8080 + - 8443 + uid: 1000 + volume_mounts: + foo: /foo +""", + None, + ["--lasers=blue", "--enable-confetti"], + None, + ["--lasers=blue", "--enable-confetti"], + id="only_extra_entrypoint_args_spec"), + pytest.param(""" +service_type: container +service_id: hello-world +service_name: container.hello-world +spec: + args: + - --foo + bind_mounts: + - - type=bind + - source=lib/modules + - destination=/lib/modules + - ro=true + dirs: + - foo + - bar + entrypoint: /usr/bin/bash + envs: + - FOO=0815 + files: + bar.conf: + - foo + - bar + foo.conf: 'foo + + bar' + gid: 2000 + image: docker.io/library/hello-world:latest + ports: + - 8080 + - 8443 + uid: 1000 + volume_mounts: + foo: /foo +extra_entrypoint_args: +- "--lasers blue" +- "--enable-confetti" +""", + None, + ["--lasers blue", "--enable-confetti"], + None, + ["--lasers", "blue", "--enable-confetti"], + id="only_extra_entrypoint_args_toplevel"), + pytest.param(""" +service_type: nfs +service_id: mynfs +service_name: nfs.mynfs +spec: + port: 1234 + extra_entrypoint_args: + - "--lasers=blue" + - "--title=Custom NFS Options" + extra_container_args: + - "--cap-add=CAP_NET_BIND_SERVICE" + - "--oom-score-adj=12" +""", + ["--cap-add=CAP_NET_BIND_SERVICE", "--oom-score-adj=12"], + ["--lasers=blue", "--title=Custom NFS Options"], + ["--cap-add=CAP_NET_BIND_SERVICE", "--oom-score-adj=12"], + ["--lasers=blue", "--title=Custom", "NFS", "Options"], + id="both_kinds_nfs"), + pytest.param(""" +service_type: container +service_id: hello-world +service_name: container.hello-world +spec: + args: + - --foo + bind_mounts: + - - type=bind + - source=lib/modules + - destination=/lib/modules + - ro=true + dirs: + - foo + - bar + entrypoint: /usr/bin/bash + envs: + - FOO=0815 + files: + bar.conf: + - foo + - bar + foo.conf: 'foo + + bar' + gid: 2000 + image: docker.io/library/hello-world:latest + ports: + - 8080 + - 8443 + uid: 1000 + volume_mounts: + foo: /foo +extra_entrypoint_args: +- argument: "--lasers=blue" + split: true +- argument: "--enable-confetti" +""", + None, + [ + {"argument": "--lasers=blue", "split": True}, + {"argument": "--enable-confetti", "split": False}, + ], + None, + [ + "--lasers=blue", + "--enable-confetti", + ], + id="only_extra_entrypoint_args_obj_toplevel"), + pytest.param(""" +service_type: container +service_id: hello-world +service_name: container.hello-world +spec: + args: + - --foo + bind_mounts: + - - type=bind + - source=lib/modules + - destination=/lib/modules + - ro=true + dirs: + - foo + - bar + entrypoint: /usr/bin/bash + envs: + - FOO=0815 + files: + bar.conf: + - foo + - bar + foo.conf: 'foo + + bar' + gid: 2000 + image: docker.io/library/hello-world:latest + ports: + - 8080 + - 8443 + uid: 1000 + volume_mounts: + foo: /foo + extra_entrypoint_args: + - argument: "--lasers=blue" + split: true + - argument: "--enable-confetti" +""", + None, + [ + {"argument": "--lasers=blue", "split": True}, + {"argument": "--enable-confetti", "split": False}, + ], + None, + [ + "--lasers=blue", + "--enable-confetti", + ], + id="only_extra_entrypoint_args_obj_indented"), + pytest.param(""" +service_type: nfs +service_id: mynfs +service_name: nfs.mynfs +spec: + port: 1234 +extra_entrypoint_args: +- argument: "--lasers=blue" +- argument: "--title=Custom NFS Options" +extra_container_args: +- argument: "--cap-add=CAP_NET_BIND_SERVICE" +- argument: "--oom-score-adj=12" +""", + [ + {"argument": "--cap-add=CAP_NET_BIND_SERVICE", "split": False}, + {"argument": "--oom-score-adj=12", "split": False}, + ], + [ + {"argument": "--lasers=blue", "split": False}, + {"argument": "--title=Custom NFS Options", "split": False}, + ], + [ + "--cap-add=CAP_NET_BIND_SERVICE", + "--oom-score-adj=12", + ], + [ + "--lasers=blue", + "--title=Custom NFS Options", + ], + id="both_kinds_obj_nfs"), +]) +def test_extra_args_handling(y, ec_args, ee_args, ec_final_args, ee_final_args): + data = yaml.safe_load(y) + spec_obj = ServiceSpec.from_json(data) + + assert ArgumentSpec.map_json(spec_obj.extra_container_args) == ec_args + assert ArgumentSpec.map_json(spec_obj.extra_entrypoint_args) == ee_args + if ec_final_args is None: + assert spec_obj.extra_container_args is None + else: + ec_res = [] + for args in spec_obj.extra_container_args: + ec_res.extend(args.to_args()) + assert ec_res == ec_final_args + if ee_final_args is None: + assert spec_obj.extra_entrypoint_args is None + else: + ee_res = [] + for args in spec_obj.extra_entrypoint_args: + ee_res.extend(args.to_args()) + assert ee_res == ee_final_args diff --git a/src/python-common/ceph/tests/test_utils.py b/src/python-common/ceph/tests/test_utils.py new file mode 100644 index 000000000..8a94ac400 --- /dev/null +++ b/src/python-common/ceph/tests/test_utils.py @@ -0,0 +1,75 @@ +import pytest + +from ceph.deployment.utils import is_ipv6, unwrap_ipv6, wrap_ipv6, valid_addr +from typing import NamedTuple + + +def test_is_ipv6(): + for good in ("[::1]", "::1", + "fff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"): + assert is_ipv6(good) + for bad in ("127.0.0.1", + "ffff:ffff:ffff:ffff:ffff:ffff:ffff:fffg", + "1:2:3:4:5:6:7:8:9", "fd00::1::1", "[fg::1]"): + assert not is_ipv6(bad) + + +def test_unwrap_ipv6(): + def unwrap_test(address, expected): + assert unwrap_ipv6(address) == expected + + tests = [ + ('::1', '::1'), ('[::1]', '::1'), + ('[fde4:8dba:82e1:0:5054:ff:fe6a:357]', 'fde4:8dba:82e1:0:5054:ff:fe6a:357'), + ('can actually be any string', 'can actually be any string'), + ('[but needs to be stripped] ', '[but needs to be stripped] ')] + for address, expected in tests: + unwrap_test(address, expected) + + +def test_wrap_ipv6(): + def wrap_test(address, expected): + assert wrap_ipv6(address) == expected + + tests = [ + ('::1', '[::1]'), ('[::1]', '[::1]'), + ('fde4:8dba:82e1:0:5054:ff:fe6a:357', '[fde4:8dba:82e1:0:5054:ff:fe6a:357]'), + ('myhost.example.com', 'myhost.example.com'), ('192.168.0.1', '192.168.0.1'), + ('', ''), ('fd00::1::1', 'fd00::1::1')] + for address, expected in tests: + wrap_test(address, expected) + + +class Address(NamedTuple): + addr: str + status: bool + description: str + + +@pytest.mark.parametrize('addr_object', [ + Address('www.ibm.com', True, 'Name'), + Address('www.google.com:162', True, 'Name:Port'), + Address('my.big.domain.name.for.big.people', False, 'DNS lookup failed'), + Address('192.168.122.1', True, 'IPv4'), + Address('[192.168.122.1]', False, 'IPv4 address wrapped in brackets is invalid'), + Address('10.40003.200', False, 'Invalid partial IPv4 address'), + Address('10.7.5', False, 'Invalid partial IPv4 address'), + Address('10.7', False, 'Invalid partial IPv4 address'), + Address('192.168.122.5:7090', True, 'IPv4:Port'), + Address('fe80::7561:c8fb:d3d7:1fa4', True, 'IPv6'), + Address('[fe80::7561:c8fb:d3d7:1fa4]:9464', True, 'IPv6:Port'), + Address('[fe80::7561:c8fb:d3d7:1fa4]', True, 'IPv6'), + Address('[fe80::7561:c8fb:d3d7:1fa4', False, + 'Address has incorrect/incomplete use of enclosing brackets'), + Address('fe80::7561:c8fb:d3d7:1fa4]', False, + 'Address has incorrect/incomplete use of enclosing brackets'), + Address('fred.knockinson.org', False, 'DNS lookup failed'), + Address('tumbleweed.pickles.gov.au', False, 'DNS lookup failed'), + Address('192.168.122.5:00PS', False, 'Port must be numeric'), + Address('[fe80::7561:c8fb:d3d7:1fa4]:DOH', False, 'Port must be numeric') +]) +def test_valid_addr(addr_object: Address): + + valid, description = valid_addr(addr_object.addr) + assert valid == addr_object.status + assert description == addr_object.description diff --git a/src/python-common/ceph/tests/utils.py b/src/python-common/ceph/tests/utils.py new file mode 100644 index 000000000..04b8a4e38 --- /dev/null +++ b/src/python-common/ceph/tests/utils.py @@ -0,0 +1,46 @@ +from ceph.deployment.inventory import Devices, Device + +try: + from typing import Any, List +except ImportError: + pass # for type checking + + +def _mk_device(rotational=True, + locked=False, + size="394.27 GB", + vendor='Vendor', + model='Model'): + return [Device( + path='??', + sys_api={ + "rotational": '1' if rotational else '0', + "vendor": vendor, + "human_readable_size": size, + "partitions": {}, + "locked": int(locked), + "sectorsize": "512", + "removable": "0", + "path": "??", + "support_discard": "", + "model": model, + "ro": "0", + "nr_requests": "128", + "size": 423347879936 # ignore coversion from human_readable_size + }, + available=not locked, + rejected_reasons=['locked'] if locked else [], + lvs=[], + device_id="Model-Vendor-foobar" + )] + + +def _mk_inventory(devices): + # type: (Any) -> List[Device] + devs = [] + for dev_, name in zip(devices, map(chr, range(ord('a'), ord('z')))): + dev = Device.from_json(dev_.to_json()) + dev.path = '/dev/sd' + name + dev.sys_api = dict(dev_.sys_api, path='/dev/sd' + name) + devs.append(dev) + return Devices(devices=devs).devices diff --git a/src/python-common/ceph/utils.py b/src/python-common/ceph/utils.py new file mode 100644 index 000000000..643be0658 --- /dev/null +++ b/src/python-common/ceph/utils.py @@ -0,0 +1,123 @@ +import datetime +import re +import string + +from typing import Optional + + +def datetime_now() -> datetime.datetime: + """ + Return the current local date and time. + :return: Returns an aware datetime object of the current date + and time. + """ + return datetime.datetime.now(tz=datetime.timezone.utc) + + +def datetime_to_str(dt: datetime.datetime) -> str: + """ + Convert a datetime object into a ISO 8601 string, e.g. + '2019-04-24T17:06:53.039991Z'. + :param dt: The datetime object to process. + :return: Return a string representing the date in + ISO 8601 (timezone=UTC). + """ + return dt.astimezone(tz=datetime.timezone.utc).strftime( + '%Y-%m-%dT%H:%M:%S.%fZ') + + +def str_to_datetime(string: str) -> datetime.datetime: + """ + Convert an ISO 8601 string into a datetime object. + The following formats are supported: + + - 2020-03-03T09:21:43.636153304Z + - 2020-03-03T15:52:30.136257504-0600 + - 2020-03-03T15:52:30.136257504 + + :param string: The string to parse. + :return: Returns an aware datetime object of the given date + and time string. + :raises: :exc:`~exceptions.ValueError` for an unknown + datetime string. + """ + fmts = [ + '%Y-%m-%dT%H:%M:%S.%f', + '%Y-%m-%dT%H:%M:%S.%f%z' + ] + + # In *all* cases, the 9 digit second precision is too much for + # Python's strptime. Shorten it to 6 digits. + p = re.compile(r'(\.[\d]{6})[\d]*') + string = p.sub(r'\1', string) + + # Replace trailing Z with -0000, since (on Python 3.6.8) it + # won't parse. + if string and string[-1] == 'Z': + string = string[:-1] + '-0000' + + for fmt in fmts: + try: + dt = datetime.datetime.strptime(string, fmt) + # Make sure the datetime object is aware (timezone is set). + # If not, then assume the time is in UTC. + if dt.tzinfo is None: + dt = dt.replace(tzinfo=datetime.timezone.utc) + return dt + except ValueError: + pass + + raise ValueError("Time data {} does not match one of the formats {}".format( + string, str(fmts))) + + +def parse_timedelta(delta: str) -> Optional[datetime.timedelta]: + """ + Returns a timedelta object represents a duration, the difference + between two dates or times. + + >>> parse_timedelta('foo') + + >>> parse_timedelta('2d') == datetime.timedelta(days=2) + True + + >>> parse_timedelta("4w") == datetime.timedelta(days=28) + True + + >>> parse_timedelta("5s") == datetime.timedelta(seconds=5) + True + + >>> parse_timedelta("-5s") == datetime.timedelta(days=-1, seconds=86395) + True + + :param delta: The string to process, e.g. '2h', '10d', '30s'. + :return: The `datetime.timedelta` object or `None` in case of + a parsing error. + """ + parts = re.match(r'(?P<seconds>-?\d+)s|' + r'(?P<minutes>-?\d+)m|' + r'(?P<hours>-?\d+)h|' + r'(?P<days>-?\d+)d|' + r'(?P<weeks>-?\d+)w$', + delta, + re.IGNORECASE) + if not parts: + return None + parts = parts.groupdict() + args = {name: int(param) for name, param in parts.items() if param} + return datetime.timedelta(**args) + + +def is_hex(s: str, strict: bool = True) -> bool: + """Simple check that a string contains only hex chars""" + try: + int(s, 16) + except ValueError: + return False + + # s is multiple chars, but we should catch a '+/-' prefix too. + if strict: + if s[0] not in string.hexdigits: + return False + + return True diff --git a/src/python-common/requirements-lint.txt b/src/python-common/requirements-lint.txt new file mode 100644 index 000000000..2a7142182 --- /dev/null +++ b/src/python-common/requirements-lint.txt @@ -0,0 +1,2 @@ +flake8==3.7.8 +rstcheck==3.3.1 diff --git a/src/python-common/requirements.txt b/src/python-common/requirements.txt new file mode 100644 index 000000000..88b47310d --- /dev/null +++ b/src/python-common/requirements.txt @@ -0,0 +1,8 @@ +pytest >=2.1.3,<5; python_version < '3.5' +mock; python_version < '3.3' +mypy; python_version >= '3' +pytest-mypy; python_version >= '3' +pytest >= 2.1.3; python_version >= '3' +pyyaml +typing-extensions; python_version < '3.8' +types-PyYAML diff --git a/src/python-common/setup.py b/src/python-common/setup.py new file mode 100644 index 000000000..43a46eb10 --- /dev/null +++ b/src/python-common/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup, find_packages + + +with open("README.rst", "r") as fh: + long_description = fh.read() + + +setup( + name='ceph', + version='1.0.0', + packages=find_packages(), + author='', + author_email='dev@ceph.io', + description='Ceph common library', + long_description=long_description, + license='LGPLv2+', + keywords='ceph', + url="https://github.com/ceph/ceph", + zip_safe = False, + install_requires=( + 'pyyaml', + ), + classifiers = [ + 'Intended Audience :: Developer', + 'Operating System :: POSIX :: Linux', + 'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + ] +) diff --git a/src/python-common/tox.ini b/src/python-common/tox.ini new file mode 100644 index 000000000..2737a87e8 --- /dev/null +++ b/src/python-common/tox.ini @@ -0,0 +1,35 @@ +[tox] +envlist = py3, mypy, lint +skip_missing_interpreters = true + +[testenv:py3] +deps= + -rrequirements.txt + -c{toxinidir}/../mypy-constrains.txt +commands= + pytest --doctest-modules ceph/deployment/service_spec.py ceph/utils.py + pytest {posargs} + mypy --config-file=../mypy.ini -p ceph + +[testenv:mypy] +deps= + -rrequirements.txt + -c{toxinidir}/../mypy-constrains.txt +commands= + mypy --config-file=../mypy.ini -p ceph + +[tool:pytest] +norecursedirs = .* _* virtualenv + +[flake8] +max-line-length = 100 +exclude = + __pycache__ + +[testenv:lint] +deps = + -rrequirements-lint.txt +commands = + flake8 {posargs:ceph} + rstcheck --report info --debug README.rst + |