diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/python-common | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/python-common')
33 files changed, 5353 insertions, 0 deletions
diff --git a/src/python-common/.gitignore b/src/python-common/.gitignore new file mode 100644 index 000000000..c2de8195a --- /dev/null +++ b/src/python-common/.gitignore @@ -0,0 +1,3 @@ +ceph.egg-info +build +setup.cfg diff --git a/src/python-common/CMakeLists.txt b/src/python-common/CMakeLists.txt new file mode 100644 index 000000000..e89bbe2fe --- /dev/null +++ b/src/python-common/CMakeLists.txt @@ -0,0 +1,7 @@ +include(Distutils) +distutils_install_module(ceph) + +if(WITH_TESTS) + include(AddCephTest) + add_tox_test(python-common TOX_ENVS py3 lint) +endif() diff --git a/src/python-common/README.rst b/src/python-common/README.rst new file mode 100644 index 000000000..3900ec4f9 --- /dev/null +++ b/src/python-common/README.rst @@ -0,0 +1,22 @@ +ceph-python-common +================== + +This library is meant to be used to keep common data structures and +functions usable throughout the Ceph project. + +Like for example: + +- All different Cython bindings. +- MGR modules. +- ``ceph`` command line interface and other Ceph tools. +- Also external tools. + +Usage +===== + +From within the Ceph git, just import it: + +.. code:: python + + from ceph.deployment_utils import DriveGroupSpec + from ceph.exceptions import OSError diff --git a/src/python-common/ceph/__init__.py b/src/python-common/ceph/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/python-common/ceph/__init__.py diff --git a/src/python-common/ceph/deployment/__init__.py b/src/python-common/ceph/deployment/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/python-common/ceph/deployment/__init__.py diff --git a/src/python-common/ceph/deployment/drive_group.py b/src/python-common/ceph/deployment/drive_group.py new file mode 100644 index 000000000..dc36a60d9 --- /dev/null +++ b/src/python-common/ceph/deployment/drive_group.py @@ -0,0 +1,348 @@ +import enum +import yaml + +from ceph.deployment.inventory import Device +from ceph.deployment.service_spec import ServiceSpec, PlacementSpec +from ceph.deployment.hostspec import SpecValidationError + +try: + from typing import Optional, List, Dict, Any, Union +except ImportError: + pass + + +class OSDMethod(str, enum.Enum): + raw = 'raw' + lvm = 'lvm' + + +class DeviceSelection(object): + """ + Used within :class:`ceph.deployment.drive_group.DriveGroupSpec` to specify the devices + used by the Drive Group. + + Any attributes (even none) can be included in the device + specification structure. + """ + + _supported_filters = [ + "paths", "size", "vendor", "model", "rotational", "limit", "all" + ] + + def __init__(self, + paths=None, # type: Optional[List[str]] + model=None, # type: Optional[str] + size=None, # type: Optional[str] + rotational=None, # type: Optional[bool] + limit=None, # type: Optional[int] + vendor=None, # type: Optional[str] + all=False, # type: bool + ): + """ + ephemeral drive group device specification + """ + #: List of Device objects for devices paths. + self.paths = [] if paths is None else [Device(path) for path in paths] # type: List[Device] + + #: A wildcard string. e.g: "SDD*" or "SanDisk SD8SN8U5" + self.model = model + + #: Match on the VENDOR property of the drive + self.vendor = vendor + + #: Size specification of format LOW:HIGH. + #: Can also take the the form :HIGH, LOW: + #: or an exact value (as ceph-volume inventory reports) + self.size: Optional[str] = size + + #: is the drive rotating or not + self.rotational = rotational + + #: Limit the number of devices added to this Drive Group. Devices + #: are used from top to bottom in the output of ``ceph-volume inventory`` + self.limit = limit + + #: Matches all devices. Can only be used for data devices + self.all = all + + def validate(self, name: str) -> None: + props = [self.model, self.vendor, self.size, self.rotational] # type: List[Any] + if self.paths and any(p is not None for p in props): + raise DriveGroupValidationError( + name, + 'device selection: `paths` and other parameters are mutually exclusive') + is_empty = not any(p is not None and p != [] for p in [self.paths] + props) + if not self.all and is_empty: + raise DriveGroupValidationError(name, 'device selection cannot be empty') + + if self.all and not is_empty: + raise DriveGroupValidationError( + name, + 'device selection: `all` and other parameters are mutually exclusive. {}'.format( + repr(self))) + + @classmethod + def from_json(cls, device_spec): + # type: (dict) -> Optional[DeviceSelection] + if not device_spec: + return None + for applied_filter in list(device_spec.keys()): + if applied_filter not in cls._supported_filters: + raise KeyError(applied_filter) + + return cls(**device_spec) + + def to_json(self): + # type: () -> Dict[str, Any] + ret: Dict[str, Any] = {} + if self.paths: + ret['paths'] = [p.path for p in self.paths] + if self.model: + ret['model'] = self.model + if self.vendor: + ret['vendor'] = self.vendor + if self.size: + ret['size'] = self.size + if self.rotational is not None: + ret['rotational'] = self.rotational + if self.limit: + ret['limit'] = self.limit + if self.all: + ret['all'] = self.all + + return ret + + def __repr__(self) -> str: + keys = [ + key for key in self._supported_filters + ['limit'] if getattr(self, key) is not None + ] + if 'paths' in keys and self.paths == []: + keys.remove('paths') + return "DeviceSelection({})".format( + ', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys) + ) + + def __eq__(self, other: Any) -> bool: + return repr(self) == repr(other) + + +class DriveGroupValidationError(SpecValidationError): + """ + Defining an exception here is a bit problematic, cause you cannot properly catch it, + if it was raised in a different mgr module. + """ + + def __init__(self, name: Optional[str], msg: str): + name = name or "<unnamed>" + super(DriveGroupValidationError, self).__init__( + f'Failed to validate OSD spec "{name}": {msg}') + + +class DriveGroupSpec(ServiceSpec): + """ + Describe a drive group in the same form that ceph-volume + understands. + """ + + _supported_features = [ + "encrypted", "block_wal_size", "osds_per_device", + "db_slots", "wal_slots", "block_db_size", "placement", "service_id", "service_type", + "data_devices", "db_devices", "wal_devices", "journal_devices", + "data_directories", "osds_per_device", "objectstore", "osd_id_claims", + "journal_size", "unmanaged", "filter_logic", "preview_only", "extra_container_args", + "data_allocate_fraction", "method", "crush_device_class", + ] + + def __init__(self, + placement=None, # type: Optional[PlacementSpec] + service_id=None, # type: Optional[str] + data_devices=None, # type: Optional[DeviceSelection] + db_devices=None, # type: Optional[DeviceSelection] + wal_devices=None, # type: Optional[DeviceSelection] + journal_devices=None, # type: Optional[DeviceSelection] + data_directories=None, # type: Optional[List[str]] + osds_per_device=None, # type: Optional[int] + objectstore='bluestore', # type: str + encrypted=False, # type: bool + db_slots=None, # type: Optional[int] + wal_slots=None, # type: Optional[int] + osd_id_claims=None, # type: Optional[Dict[str, List[str]]] + block_db_size=None, # type: Union[int, str, None] + block_wal_size=None, # type: Union[int, str, None] + journal_size=None, # type: Union[int, str, None] + service_type=None, # type: Optional[str] + unmanaged=False, # type: bool + filter_logic='AND', # type: str + preview_only=False, # type: bool + extra_container_args=None, # type: Optional[List[str]] + data_allocate_fraction=None, # type: Optional[float] + method=None, # type: Optional[OSDMethod] + crush_device_class=None, # type: Optional[str] + ): + assert service_type is None or service_type == 'osd' + super(DriveGroupSpec, self).__init__('osd', service_id=service_id, + placement=placement, + unmanaged=unmanaged, + preview_only=preview_only, + extra_container_args=extra_container_args) + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.data_devices = data_devices + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.db_devices = db_devices + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.wal_devices = wal_devices + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.journal_devices = journal_devices + + #: Set (or override) the "bluestore_block_wal_size" value, in bytes + self.block_wal_size: Union[int, str, None] = block_wal_size + + #: Set (or override) the "bluestore_block_db_size" value, in bytes + self.block_db_size: Union[int, str, None] = block_db_size + + #: set journal_size in bytes + self.journal_size: Union[int, str, None] = journal_size + + #: Number of osd daemons per "DATA" device. + #: To fully utilize nvme devices multiple osds are required. + self.osds_per_device = osds_per_device + + #: A list of strings, containing paths which should back OSDs + self.data_directories = data_directories + + #: ``filestore`` or ``bluestore`` + self.objectstore = objectstore + + #: ``true`` or ``false`` + self.encrypted = encrypted + + #: How many OSDs per DB device + self.db_slots = db_slots + + #: How many OSDs per WAL device + self.wal_slots = wal_slots + + #: Optional: mapping of host -> List of osd_ids that should be replaced + #: See :ref:`orchestrator-osd-replace` + self.osd_id_claims = osd_id_claims or dict() + + #: The logic gate we use to match disks with filters. + #: defaults to 'AND' + self.filter_logic = filter_logic.upper() + + #: If this should be treated as a 'preview' spec + self.preview_only = preview_only + + #: Allocate a fraction of the data device (0,1.0] + self.data_allocate_fraction = data_allocate_fraction + + self.method = method + + #: Crush device class to assign to OSDs + self.crush_device_class = crush_device_class + + @classmethod + def _from_json_impl(cls, json_drive_group): + # type: (dict) -> DriveGroupSpec + """ + Initialize 'Drive group' structure + + :param json_drive_group: A valid json string with a Drive Group + specification + """ + args: Dict[str, Any] = json_drive_group.copy() + # legacy json (pre Octopus) + if 'host_pattern' in args and 'placement' not in args: + args['placement'] = {'host_pattern': args['host_pattern']} + del args['host_pattern'] + + s_id = args.get('service_id', '<unnamed>') + + # spec: was not mandatory in octopus + if 'spec' in args: + args['spec'].update(cls._drive_group_spec_from_json(s_id, args['spec'])) + else: + args.update(cls._drive_group_spec_from_json(s_id, args)) + + return super(DriveGroupSpec, cls)._from_json_impl(args) + + @classmethod + def _drive_group_spec_from_json(cls, name: str, json_drive_group: dict) -> dict: + for applied_filter in list(json_drive_group.keys()): + if applied_filter not in cls._supported_features: + raise DriveGroupValidationError( + name, + "Feature `{}` is not supported".format(applied_filter)) + + try: + def to_selection(key: str, vals: dict) -> Optional[DeviceSelection]: + try: + return DeviceSelection.from_json(vals) + except KeyError as e: + raise DriveGroupValidationError( + f'{name}.{key}', + f"Filtering for `{e.args[0]}` is not supported") + + args = {k: (to_selection(k, v) if k.endswith('_devices') else v) for k, v in + json_drive_group.items()} + if not args: + raise DriveGroupValidationError(name, "Didn't find drive selections") + return args + except (KeyError, TypeError) as e: + raise DriveGroupValidationError(name, str(e)) + + def validate(self): + # type: () -> None + super(DriveGroupSpec, self).validate() + + if self.placement.is_empty(): + raise DriveGroupValidationError(self.service_id, '`placement` required') + + if self.data_devices is None: + raise DriveGroupValidationError(self.service_id, "`data_devices` element is required.") + + specs_names = "data_devices db_devices wal_devices journal_devices".split() + specs = dict(zip(specs_names, [getattr(self, k) for k in specs_names])) + for k, s in [ks for ks in specs.items() if ks[1] is not None]: + assert s is not None + s.validate(f'{self.service_id}.{k}') + for s in filter(None, [self.db_devices, self.wal_devices, self.journal_devices]): + if s.all: + raise DriveGroupValidationError( + self.service_id, + "`all` is only allowed for data_devices") + + if self.objectstore not in ('bluestore'): + raise DriveGroupValidationError(self.service_id, + f"{self.objectstore} is not supported. Must be " + f"one of ('bluestore')") + + if self.block_wal_size is not None and type(self.block_wal_size) not in [int, str]: + raise DriveGroupValidationError( + self.service_id, + 'block_wal_size must be of type int or string') + if self.block_db_size is not None and type(self.block_db_size) not in [int, str]: + raise DriveGroupValidationError( + self.service_id, + 'block_db_size must be of type int or string') + if self.journal_size is not None and type(self.journal_size) not in [int, str]: + raise DriveGroupValidationError( + self.service_id, + 'journal_size must be of type int or string') + + if self.filter_logic not in ['AND', 'OR']: + raise DriveGroupValidationError( + self.service_id, + 'filter_logic must be either <AND> or <OR>') + + if self.method not in [None, 'lvm', 'raw']: + raise DriveGroupValidationError(self.service_id, 'method must be one of None, lvm, raw') + if self.method == 'raw' and self.objectstore == 'filestore': + raise DriveGroupValidationError(self.service_id, 'method raw only supports bluestore') + + +yaml.add_representer(DriveGroupSpec, DriveGroupSpec.yaml_representer) diff --git a/src/python-common/ceph/deployment/drive_selection/__init__.py b/src/python-common/ceph/deployment/drive_selection/__init__.py new file mode 100644 index 000000000..994e2f2da --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/__init__.py @@ -0,0 +1,2 @@ +from .selector import DriveSelection # NOQA +from .matchers import Matcher, SubstringMatcher, EqualityMatcher, AllMatcher, SizeMatcher # NOQA diff --git a/src/python-common/ceph/deployment/drive_selection/example.yaml b/src/python-common/ceph/deployment/drive_selection/example.yaml new file mode 100644 index 000000000..2851e7dbb --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/example.yaml @@ -0,0 +1,21 @@ +# default: +# target: 'data*' +# data_devices: +# size: 20G +# db_devices: +# size: 10G +# rotational: 1 +# allflash: +# target: 'fast_nodes*' +# data_devices: +# size: 100G +# db_devices: +# size: 50G +# rotational: 0 + +# This is the default configuration and +# will create an OSD on all available drives +default: + target: 'fnmatch_target' + data_devices: + all: true diff --git a/src/python-common/ceph/deployment/drive_selection/filter.py b/src/python-common/ceph/deployment/drive_selection/filter.py new file mode 100644 index 000000000..36cacfaa5 --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/filter.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +import logging + +from ceph.deployment.drive_group import DeviceSelection + +try: + from typing import Generator +except ImportError: + pass + +from .matchers import Matcher, SubstringMatcher, AllMatcher, SizeMatcher, EqualityMatcher + +logger = logging.getLogger(__name__) + + +class FilterGenerator(object): + def __init__(self, device_filter): + # type: (DeviceSelection) -> None + self.device_filter = device_filter + + def __iter__(self): + # type: () -> Generator[Matcher, None, None] + if self.device_filter.size: + yield SizeMatcher('size', self.device_filter.size) + if self.device_filter.model: + yield SubstringMatcher('model', self.device_filter.model) + if self.device_filter.vendor: + yield SubstringMatcher('vendor', self.device_filter.vendor) + if self.device_filter.rotational is not None: + val = '1' if self.device_filter.rotational else '0' + yield EqualityMatcher('rotational', val) + if self.device_filter.all: + yield AllMatcher('all', str(self.device_filter.all)) diff --git a/src/python-common/ceph/deployment/drive_selection/matchers.py b/src/python-common/ceph/deployment/drive_selection/matchers.py new file mode 100644 index 000000000..f423c2f43 --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/matchers.py @@ -0,0 +1,412 @@ +# -*- coding: utf-8 -*- + +from typing import Tuple, Optional, Any, Union, Iterator + +from ceph.deployment.inventory import Device + +import re +import logging + +logger = logging.getLogger(__name__) + + +class _MatchInvalid(Exception): + pass + + +# pylint: disable=too-few-public-methods +class Matcher(object): + """ The base class to all Matchers + + It holds utility methods such as _get_disk_key + and handles the initialization. + + """ + + def __init__(self, key, value): + # type: (str, Any) -> None + """ Initialization of Base class + + :param str key: Attribute like 'model, size or vendor' + :param str value: Value of attribute like 'X123, 5G or samsung' + """ + self.key = key + self.value = value + self.fallback_key = '' # type: Optional[str] + + def _get_disk_key(self, device): + # type: (Device) -> Any + """ Helper method to safely extract values form the disk dict + + There is a 'key' and a _optional_ 'fallback' key that can be used. + The reason for this is that the output of ceph-volume is not always + consistent (due to a bug currently, but you never know). + There is also a safety measure for a disk_key not existing on + virtual environments. ceph-volume apparently sources its information + from udev which seems to not populate certain fields on VMs. + + :raises: A generic Exception when no disk_key could be found. + :return: A disk value + :rtype: str + """ + # using the . notation, but some keys are nested, and hidden behind + # a different hierarchy, which makes it harder to access programatically + # hence, make it a dict. + disk = device.to_json() + + def findkeys(node: Union[list, dict], key_val: str) -> Iterator[str]: + """ Find keys in non-flat dict recursively """ + if isinstance(node, list): + for i in node: + for key in findkeys(i, key_val): + yield key + elif isinstance(node, dict): + if key_val in node: + yield node[key_val] + for j in node.values(): + for key in findkeys(j, key_val): + yield key + + disk_value = list(findkeys(disk, self.key)) + if not disk_value and self.fallback_key: + disk_value = list(findkeys(disk, self.fallback_key)) + + if disk_value: + return disk_value[0] + else: + raise _MatchInvalid("No value found for {} or {}".format( + self.key, self.fallback_key)) + + def compare(self, disk): + # type: (Device) -> bool + """ Implements a valid comparison method for a SubMatcher + This will get overwritten by the individual classes + + :param dict disk: A disk representation + """ + raise NotImplementedError + + +# pylint: disable=too-few-public-methods +class SubstringMatcher(Matcher): + """ Substring matcher subclass + """ + + def __init__(self, key, value, fallback_key=None): + # type: (str, str, Optional[str]) -> None + Matcher.__init__(self, key, value) + self.fallback_key = fallback_key + + def compare(self, disk): + # type: (Device) -> bool + """ Overwritten method to match substrings + + This matcher does substring matching + :param dict disk: A disk representation (see base for examples) + :return: True/False if the match succeeded + :rtype: bool + """ + if not disk: + return False + disk_value = self._get_disk_key(disk) + if str(self.value) in disk_value: + return True + return False + + +# pylint: disable=too-few-public-methods +class AllMatcher(Matcher): + """ All matcher subclass + """ + + def __init__(self, key, value, fallback_key=None): + # type: (str, Any, Optional[str]) -> None + + Matcher.__init__(self, key, value) + self.fallback_key = fallback_key + + def compare(self, disk): + # type: (Device) -> bool + + """ Overwritten method to match all + + A rather dumb matcher that just accepts all disks + (regardless of the value) + :param dict disk: A disk representation (see base for examples) + :return: always True + :rtype: bool + """ + if not disk: + return False + return True + + +# pylint: disable=too-few-public-methods +class EqualityMatcher(Matcher): + """ Equality matcher subclass + """ + + def __init__(self, key, value): + # type: (str, Any) -> None + + Matcher.__init__(self, key, value) + + def compare(self, disk): + # type: (Device) -> bool + + """ Overwritten method to match equality + + This matcher does value comparison + :param dict disk: A disk representation + :return: True/False if the match succeeded + :rtype: bool + """ + if not disk: + return False + disk_value = self._get_disk_key(disk) + ret = disk_value == self.value + if not ret: + logger.debug('{} != {}'.format(disk_value, self.value)) + return ret + + +class SizeMatcher(Matcher): + """ Size matcher subclass + """ + + SUFFIXES = ( + ["KB", "MB", "GB", "TB"], + ["K", "M", "G", "T"], + [1e+3, 1e+6, 1e+9, 1e+12] + ) + + supported_suffixes = SUFFIXES[0] + SUFFIXES[1] + + # pylint: disable=too-many-instance-attributes + def __init__(self, key, value): + # type: (str, str) -> None + + # The 'key' value is overwritten here because + # the user_defined attribute does not neccessarily + # correspond to the desired attribute + # requested from the inventory output + Matcher.__init__(self, key, value) + self.key = "human_readable_size" + self.fallback_key = "size" + self._high = None # type: Optional[str] + self._high_suffix = None # type: Optional[str] + self._low = None # type: Optional[str] + self._low_suffix = None # type: Optional[str] + self._exact = None # type: Optional[str] + self._exact_suffix = None # type: Optional[str] + self._parse_filter() + + @property + def low(self): + # type: () -> Tuple[Optional[str], Optional[str]] + """ Getter for 'low' matchers + """ + return self._low, self._low_suffix + + @low.setter + def low(self, low): + # type: (Tuple[str, str]) -> None + """ Setter for 'low' matchers + """ + self._low, self._low_suffix = low + + @property + def high(self): + # type: () -> Tuple[Optional[str], Optional[str]] + """ Getter for 'high' matchers + """ + return self._high, self._high_suffix + + @high.setter + def high(self, high): + # type: (Tuple[str, str]) -> None + """ Setter for 'high' matchers + """ + self._high, self._high_suffix = high + + @property + def exact(self): + # type: () -> Tuple[Optional[str], Optional[str]] + """ Getter for 'exact' matchers + """ + return self._exact, self._exact_suffix + + @exact.setter + def exact(self, exact): + # type: (Tuple[str, str]) -> None + """ Setter for 'exact' matchers + """ + self._exact, self._exact_suffix = exact + + @classmethod + def _normalize_suffix(cls, suffix): + # type: (str) -> str + """ Normalize any supported suffix + + Since the Drive Groups are user facing, we simply + can't make sure that all users type in the requested + form. That's why we have to internally agree on one format. + It also checks if any of the supported suffixes was used + and raises an Exception otherwise. + + :param str suffix: A suffix ('G') or ('M') + :return: A normalized output + :rtype: str + """ + suffix = suffix.upper() + if suffix not in cls.supported_suffixes: + raise _MatchInvalid("Unit '{}' not supported".format(suffix)) + return dict(zip( + cls.SUFFIXES[1], + cls.SUFFIXES[0], + )).get(suffix, suffix) + + @classmethod + def _parse_suffix(cls, obj): + # type: (str) -> str + """ Wrapper method to find and normalize a prefix + + :param str obj: A size filtering string ('10G') + :return: A normalized unit ('GB') + :rtype: str + """ + return cls._normalize_suffix(re.findall(r"[a-zA-Z]+", obj)[0]) + + @classmethod + def _get_k_v(cls, data): + # type: (str) -> Tuple[str, str] + """ Helper method to extract data from a string + + It uses regex to extract all digits and calls _parse_suffix + which also uses a regex to extract all letters and normalizes + the resulting suffix. + + :param str data: A size filtering string ('10G') + :return: A Tuple with normalized output (10, 'GB') + :rtype: tuple + """ + return re.findall(r"\d+\.?\d*", data)[0], cls._parse_suffix(data) + + def _parse_filter(self) -> None: + """ Identifies which type of 'size' filter is applied + + There are four different filtering modes: + + 1) 10G:50G (high-low) + At least 10G but at max 50G of size + + 2) :60G + At max 60G of size + + 3) 50G: + At least 50G of size + + 4) 20G + Exactly 20G in size + + This method uses regex to identify and extract this information + and raises if none could be found. + """ + low_high = re.match(r"\d+[A-Z]{1,2}:\d+[A-Z]{1,2}", self.value) + if low_high: + low, high = low_high.group().split(":") + self.low = self._get_k_v(low) + self.high = self._get_k_v(high) + + low = re.match(r"\d+[A-Z]{1,2}:$", self.value) + if low: + self.low = self._get_k_v(low.group()) + + high = re.match(r"^:\d+[A-Z]{1,2}", self.value) + if high: + self.high = self._get_k_v(high.group()) + + exact = re.match(r"^\d+\.?\d*[A-Z]{1,2}$", self.value) + if exact: + self.exact = self._get_k_v(exact.group()) + + if not self.low and not self.high and not self.exact: + raise _MatchInvalid("Couldn't parse {}".format(self.value)) + + @staticmethod + # pylint: disable=inconsistent-return-statements + def to_byte(tpl): + # type: (Tuple[Optional[str], Optional[str]]) -> float + + """ Convert any supported unit to bytes + + :param tuple tpl: A tuple with ('10', 'GB') + :return: The converted byte value + :rtype: float + """ + val_str, suffix = tpl + value = float(val_str) if val_str is not None else 0.0 + return dict(zip( + SizeMatcher.SUFFIXES[0], + SizeMatcher.SUFFIXES[2], + )).get(str(suffix), 0.00) * value + + @staticmethod + def str_to_byte(input): + # type: (str) -> float + return SizeMatcher.to_byte(SizeMatcher._get_k_v(input)) + + # pylint: disable=inconsistent-return-statements, too-many-return-statements + def compare(self, disk): + # type: (Device) -> bool + """ Convert MB/GB/TB down to bytes and compare + + 1) Extracts information from the to-be-inspected disk. + 2) Depending on the mode, apply checks and return + + # This doesn't seem very solid and _may_ + be re-factored + + + """ + if not disk: + return False + disk_value = self._get_disk_key(disk) + # This doesn't neccessarily have to be a float. + # The current output from ceph-volume gives a float.. + # This may change in the future.. + # todo: harden this paragraph + if not disk_value: + logger.warning("Could not retrieve value for disk") + return False + + disk_size = re.findall(r"\d+\.\d+", disk_value)[0] + disk_suffix = self._parse_suffix(disk_value) + disk_size_in_byte = self.to_byte((disk_size, disk_suffix)) + + if all(self.high) and all(self.low): + if disk_size_in_byte <= self.to_byte( + self.high) and disk_size_in_byte >= self.to_byte(self.low): + return True + # is a else: return False neccessary here? + # (and in all other branches) + logger.debug("Disk didn't match for 'high/low' filter") + + elif all(self.low) and not all(self.high): + if disk_size_in_byte >= self.to_byte(self.low): + return True + logger.debug("Disk didn't match for 'low' filter") + + elif all(self.high) and not all(self.low): + if disk_size_in_byte <= self.to_byte(self.high): + return True + logger.debug("Disk didn't match for 'high' filter") + + elif all(self.exact): + if disk_size_in_byte == self.to_byte(self.exact): + return True + logger.debug("Disk didn't match for 'exact' filter") + else: + logger.debug("Neither high, low, nor exact was given") + raise _MatchInvalid("No filters applied") + return False diff --git a/src/python-common/ceph/deployment/drive_selection/selector.py b/src/python-common/ceph/deployment/drive_selection/selector.py new file mode 100644 index 000000000..07b0549f3 --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/selector.py @@ -0,0 +1,179 @@ +import logging + +from typing import List, Optional, Dict, Callable + +from ..inventory import Device +from ..drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError + +from .filter import FilterGenerator +from .matchers import _MatchInvalid + +logger = logging.getLogger(__name__) + + +def to_dg_exception(f: Callable) -> Callable[['DriveSelection', str, + Optional['DeviceSelection']], + List['Device']]: + def wrapper(self: 'DriveSelection', name: str, ds: Optional['DeviceSelection']) -> List[Device]: + try: + return f(self, ds) + except _MatchInvalid as e: + raise DriveGroupValidationError(f'{self.spec.service_id}.{name}', e.args[0]) + return wrapper + + +class DriveSelection(object): + def __init__(self, + spec, # type: DriveGroupSpec + disks, # type: List[Device] + existing_daemons=None, # type: Optional[int] + ): + self.disks = disks.copy() + self.spec = spec + self.existing_daemons = existing_daemons or 0 + + self._data = self.assign_devices('data_devices', self.spec.data_devices) + self._wal = self.assign_devices('wal_devices', self.spec.wal_devices) + self._db = self.assign_devices('db_devices', self.spec.db_devices) + self._journal = self.assign_devices('journal_devices', self.spec.journal_devices) + + def data_devices(self): + # type: () -> List[Device] + return self._data + + def wal_devices(self): + # type: () -> List[Device] + return self._wal + + def db_devices(self): + # type: () -> List[Device] + return self._db + + def journal_devices(self): + # type: () -> List[Device] + return self._journal + + def _limit_reached(self, device_filter, len_devices, + disk_path): + # type: (DeviceSelection, int, str) -> bool + """ Check for the <limit> property and apply logic + + If a limit is set in 'device_attrs' we have to stop adding + disks at some point. + + If limit is set (>0) and len(devices) >= limit + + :param int len_devices: Length of the already populated device set/list + :param str disk_path: The disk identifier (for logging purposes) + :return: True/False if the device should be added to the list of devices + :rtype: bool + """ + limit = device_filter.limit or 0 + + if limit > 0 and (len_devices + self.existing_daemons >= limit): + logger.info("Refuse to add {} due to limit policy of <{}>".format( + disk_path, limit)) + return True + return False + + @staticmethod + def _has_mandatory_idents(disk): + # type: (Device) -> bool + """ Check for mandatory identification fields + """ + if disk.path: + logger.debug("Found matching disk: {}".format(disk.path)) + return True + else: + raise Exception( + "Disk {} doesn't have a 'path' identifier".format(disk)) + + @to_dg_exception + def assign_devices(self, device_filter): + # type: (Optional[DeviceSelection]) -> List[Device] + """ Assign drives based on used filters + + Do not add disks when: + + 1) Filter didn't match + 2) Disk doesn't have a mandatory identification item (path) + 3) The set :limit was reached + + After the disk was added we make sure not to re-assign this disk + for another defined type[wal/db/journal devices] + + return a sorted(by path) list of devices + """ + + if not device_filter: + logger.debug('device_filter is None') + return [] + + if not self.spec.data_devices: + logger.debug('data_devices is None') + return [] + + if device_filter.paths: + logger.debug('device filter is using explicit paths') + return device_filter.paths + + devices = list() # type: List[Device] + for disk in self.disks: + logger.debug("Processing disk {}".format(disk.path)) + + if not disk.available and not disk.ceph_device: + logger.debug( + ("Ignoring disk {}. " + "Disk is unavailable due to {}".format(disk.path, disk.rejected_reasons)) + ) + continue + + if not self._has_mandatory_idents(disk): + logger.debug( + "Ignoring disk {}. Missing mandatory idents".format( + disk.path)) + continue + + # break on this condition. + if self._limit_reached(device_filter, len(devices), disk.path): + logger.debug("Ignoring disk {}. Limit reached".format( + disk.path)) + break + + if disk in devices: + continue + + if self.spec.filter_logic == 'AND': + if not all(m.compare(disk) for m in FilterGenerator(device_filter)): + logger.debug( + "Ignoring disk {}. Not all filter did match the disk".format( + disk.path)) + continue + + if self.spec.filter_logic == 'OR': + if not any(m.compare(disk) for m in FilterGenerator(device_filter)): + logger.debug( + "Ignoring disk {}. No filter matched the disk".format( + disk.path)) + continue + + logger.debug('Adding disk {}'.format(disk.path)) + devices.append(disk) + + # This disk is already taken and must not be re-assigned. + for taken_device in devices: + if taken_device in self.disks: + self.disks.remove(taken_device) + + return sorted([x for x in devices], key=lambda dev: dev.path) + + def __repr__(self) -> str: + selection: Dict[str, List[str]] = { + 'data devices': [d.path for d in self._data], + 'wal_devices': [d.path for d in self._wal], + 'db devices': [d.path for d in self._db], + 'journal devices': [d.path for d in self._journal] + } + return "DeviceSelection({})".format( + ', '.join('{}={}'.format(key, selection[key]) for key in selection.keys()) + ) diff --git a/src/python-common/ceph/deployment/hostspec.py b/src/python-common/ceph/deployment/hostspec.py new file mode 100644 index 000000000..1bf686f97 --- /dev/null +++ b/src/python-common/ceph/deployment/hostspec.py @@ -0,0 +1,135 @@ +from collections import OrderedDict +import errno +import re +from typing import Optional, List, Any, Dict + + +def assert_valid_host(name: str) -> None: + p = re.compile('^[a-zA-Z0-9-]+$') + try: + assert len(name) <= 250, 'name is too long (max 250 chars)' + for part in name.split('.'): + assert len(part) > 0, '.-delimited name component must not be empty' + assert len(part) <= 63, '.-delimited name component must not be more than 63 chars' + assert p.match(part), 'name component must include only a-z, 0-9, and -' + except AssertionError as e: + raise SpecValidationError(str(e)) + + +class SpecValidationError(Exception): + """ + Defining an exception here is a bit problematic, cause you cannot properly catch it, + if it was raised in a different mgr module. + """ + def __init__(self, + msg: str, + errno: int = -errno.EINVAL): + super(SpecValidationError, self).__init__(msg) + self.errno = errno + + +class HostSpec(object): + """ + Information about hosts. Like e.g. ``kubectl get nodes`` + """ + def __init__(self, + hostname: str, + addr: Optional[str] = None, + labels: Optional[List[str]] = None, + status: Optional[str] = None, + location: Optional[Dict[str, str]] = None, + ): + self.service_type = 'host' + + #: the bare hostname on the host. Not the FQDN. + self.hostname = hostname # type: str + + #: DNS name or IP address to reach it + self.addr = addr or hostname # type: str + + #: label(s), if any + self.labels = labels or [] # type: List[str] + + #: human readable status + self.status = status or '' # type: str + + self.location = location + + def validate(self) -> None: + assert_valid_host(self.hostname) + + def to_json(self) -> Dict[str, Any]: + r: Dict[str, Any] = { + 'hostname': self.hostname, + 'addr': self.addr, + 'labels': list(OrderedDict.fromkeys((self.labels))), + 'status': self.status, + } + if self.location: + r['location'] = self.location + return r + + @classmethod + def from_json(cls, host_spec: dict) -> 'HostSpec': + host_spec = cls.normalize_json(host_spec) + _cls = cls( + host_spec['hostname'], + host_spec['addr'] if 'addr' in host_spec else None, + list(OrderedDict.fromkeys( + host_spec['labels'])) if 'labels' in host_spec else None, + host_spec['status'] if 'status' in host_spec else None, + host_spec.get('location'), + ) + return _cls + + @staticmethod + def normalize_json(host_spec: dict) -> dict: + labels = host_spec.get('labels') + if labels is not None: + if isinstance(labels, str): + host_spec['labels'] = [labels] + elif ( + not isinstance(labels, list) + or any(not isinstance(v, str) for v in labels) + ): + raise SpecValidationError( + f'Labels ({labels}) must be a string or list of strings' + ) + + loc = host_spec.get('location') + if loc is not None: + if ( + not isinstance(loc, dict) + or any(not isinstance(k, str) for k in loc.keys()) + or any(not isinstance(v, str) for v in loc.values()) + ): + raise SpecValidationError( + f'Location ({loc}) must be a dictionary of strings to strings' + ) + + return host_spec + + def __repr__(self) -> str: + args = [self.hostname] # type: List[Any] + if self.addr is not None: + args.append(self.addr) + if self.labels: + args.append(self.labels) + if self.status: + args.append(self.status) + if self.location: + args.append(self.location) + + return "HostSpec({})".format(', '.join(map(repr, args))) + + def __str__(self) -> str: + if self.hostname != self.addr: + return f'{self.hostname} ({self.addr})' + return self.hostname + + def __eq__(self, other: Any) -> bool: + # Let's omit `status` for the moment, as it is still the very same host. + return self.hostname == other.hostname and \ + self.addr == other.addr and \ + sorted(self.labels) == sorted(other.labels) and \ + self.location == other.location diff --git a/src/python-common/ceph/deployment/inventory.py b/src/python-common/ceph/deployment/inventory.py new file mode 100644 index 000000000..570c1dbb3 --- /dev/null +++ b/src/python-common/ceph/deployment/inventory.py @@ -0,0 +1,132 @@ +try: + from typing import List, Optional, Dict, Any, Union +except ImportError: + pass # for type checking + +from ceph.utils import datetime_now, datetime_to_str, str_to_datetime +import datetime +import json + + +class Devices(object): + """ + A container for Device instances with reporting + """ + + def __init__(self, devices): + # type: (List[Device]) -> None + self.devices = devices # type: List[Device] + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Devices): + return NotImplemented + if len(self.devices) != len(other.devices): + return False + for d1, d2 in zip(other.devices, self.devices): + if d1 != d2: + return False + return True + + def to_json(self): + # type: () -> List[dict] + return [d.to_json() for d in self.devices] + + @classmethod + def from_json(cls, input): + # type: (List[Dict[str, Any]]) -> Devices + return cls([Device.from_json(i) for i in input]) + + def copy(self): + # type: () -> Devices + return Devices(devices=list(self.devices)) + + +class Device(object): + report_fields = [ + 'ceph_device', + 'rejected_reasons', + 'available', + 'path', + 'sys_api', + 'created', + 'lvs', + 'human_readable_type', + 'device_id', + 'lsm_data', + ] + + def __init__(self, + path, # type: str + sys_api=None, # type: Optional[Dict[str, Any]] + available=None, # type: Optional[bool] + rejected_reasons=None, # type: Optional[List[str]] + lvs=None, # type: Optional[List[str]] + device_id=None, # type: Optional[str] + lsm_data=None, # type: Optional[Dict[str, Dict[str, str]]] + created=None, # type: Optional[datetime.datetime] + ceph_device=None # type: Optional[bool] + ): + self.path = path + self.sys_api = sys_api if sys_api is not None else {} # type: Dict[str, Any] + self.available = available + self.rejected_reasons = rejected_reasons if rejected_reasons is not None else [] + self.lvs = lvs + self.device_id = device_id + self.lsm_data = lsm_data if lsm_data is not None else {} # type: Dict[str, Dict[str, str]] + self.created = created if created is not None else datetime_now() + self.ceph_device = ceph_device + + def __eq__(self, other): + # type: (Any) -> bool + if not isinstance(other, Device): + return NotImplemented + diff = [k for k in self.report_fields if k != 'created' and (getattr(self, k) + != getattr(other, k))] + return not diff + + def to_json(self): + # type: () -> dict + return { + k: (getattr(self, k) if k != 'created' + or not isinstance(getattr(self, k), datetime.datetime) + else datetime_to_str(getattr(self, k))) + for k in self.report_fields + } + + @classmethod + def from_json(cls, input): + # type: (Dict[str, Any]) -> Device + if not isinstance(input, dict): + raise ValueError('Device: Expected dict. Got `{}...`'.format(json.dumps(input)[:10])) + ret = cls( + **{ + key: (input.get(key, None) if key != 'created' + or not input.get(key, None) + else str_to_datetime(input.get(key, None))) + for key in Device.report_fields + if key != 'human_readable_type' + } + ) + if ret.rejected_reasons: + ret.rejected_reasons = sorted(ret.rejected_reasons) + return ret + + @property + def human_readable_type(self): + # type: () -> str + if self.sys_api is None or 'rotational' not in self.sys_api: + return "unknown" + return 'hdd' if self.sys_api["rotational"] == "1" else 'ssd' + + def __repr__(self) -> str: + device_desc: Dict[str, Union[str, List[str]]] = { + 'path': self.path if self.path is not None else 'unknown', + 'lvs': self.lvs if self.lvs else 'None', + 'available': str(self.available), + 'ceph_device': str(self.ceph_device) + } + if not self.available and self.rejected_reasons: + device_desc['rejection reasons'] = self.rejected_reasons + return "Device({})".format( + ', '.join('{}={}'.format(key, device_desc[key]) for key in device_desc.keys()) + ) diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py new file mode 100644 index 000000000..17130ea9a --- /dev/null +++ b/src/python-common/ceph/deployment/service_spec.py @@ -0,0 +1,1384 @@ +import fnmatch +import re +import enum +from collections import OrderedDict +from contextlib import contextmanager +from functools import wraps +from ipaddress import ip_network, ip_address +from typing import Optional, Dict, Any, List, Union, Callable, Iterable, Type, TypeVar, cast, \ + NamedTuple, Mapping, Iterator + +import yaml + +from ceph.deployment.hostspec import HostSpec, SpecValidationError, assert_valid_host +from ceph.deployment.utils import unwrap_ipv6, valid_addr +from ceph.utils import is_hex + +ServiceSpecT = TypeVar('ServiceSpecT', bound='ServiceSpec') +FuncT = TypeVar('FuncT', bound=Callable) + + +def handle_type_error(method: FuncT) -> FuncT: + @wraps(method) + def inner(cls: Any, *args: Any, **kwargs: Any) -> Any: + try: + return method(cls, *args, **kwargs) + except (TypeError, AttributeError) as e: + error_msg = '{}: {}'.format(cls.__name__, e) + raise SpecValidationError(error_msg) + return cast(FuncT, inner) + + +class HostPlacementSpec(NamedTuple): + hostname: str + network: str + name: str + + def __str__(self) -> str: + res = '' + res += self.hostname + if self.network: + res += ':' + self.network + if self.name: + res += '=' + self.name + return res + + @classmethod + @handle_type_error + def from_json(cls, data: Union[dict, str]) -> 'HostPlacementSpec': + if isinstance(data, str): + return cls.parse(data) + return cls(**data) + + def to_json(self) -> str: + return str(self) + + @classmethod + def parse(cls, host, require_network=True): + # type: (str, bool) -> HostPlacementSpec + """ + Split host into host, network, and (optional) daemon name parts. The network + part can be an IP, CIDR, or ceph addrvec like '[v2:1.2.3.4:3300,v1:1.2.3.4:6789]'. + e.g., + "myhost" + "myhost=name" + "myhost:1.2.3.4" + "myhost:1.2.3.4=name" + "myhost:1.2.3.0/24" + "myhost:1.2.3.0/24=name" + "myhost:[v2:1.2.3.4:3000]=name" + "myhost:[v2:1.2.3.4:3000,v1:1.2.3.4:6789]=name" + """ + # Matches from start to : or = or until end of string + host_re = r'^(.*?)(:|=|$)' + # Matches from : to = or until end of string + ip_re = r':(.*?)(=|$)' + # Matches from = to end of string + name_re = r'=(.*?)$' + + # assign defaults + host_spec = cls('', '', '') + + match_host = re.search(host_re, host) + if match_host: + host_spec = host_spec._replace(hostname=match_host.group(1)) + + name_match = re.search(name_re, host) + if name_match: + host_spec = host_spec._replace(name=name_match.group(1)) + + ip_match = re.search(ip_re, host) + if ip_match: + host_spec = host_spec._replace(network=ip_match.group(1)) + + if not require_network: + return host_spec + + networks = list() # type: List[str] + network = host_spec.network + # in case we have [v2:1.2.3.4:3000,v1:1.2.3.4:6478] + if ',' in network: + networks = [x for x in network.split(',')] + else: + if network != '': + networks.append(network) + + for network in networks: + # only if we have versioned network configs + if network.startswith('v') or network.startswith('[v'): + # if this is ipv6 we can't just simply split on ':' so do + # a split once and rsplit once to leave us with just ipv6 addr + network = network.split(':', 1)[1] + network = network.rsplit(':', 1)[0] + try: + # if subnets are defined, also verify the validity + if '/' in network: + ip_network(network) + else: + ip_address(unwrap_ipv6(network)) + except ValueError as e: + # logging? + raise e + host_spec.validate() + return host_spec + + def validate(self) -> None: + assert_valid_host(self.hostname) + + +class PlacementSpec(object): + """ + For APIs that need to specify a host subset + """ + + def __init__(self, + label=None, # type: Optional[str] + hosts=None, # type: Union[List[str],List[HostPlacementSpec], None] + count=None, # type: Optional[int] + count_per_host=None, # type: Optional[int] + host_pattern=None, # type: Optional[str] + ): + # type: (...) -> None + self.label = label + self.hosts = [] # type: List[HostPlacementSpec] + + if hosts: + self.set_hosts(hosts) + + self.count = count # type: Optional[int] + self.count_per_host = count_per_host # type: Optional[int] + + #: fnmatch patterns to select hosts. Can also be a single host. + self.host_pattern = host_pattern # type: Optional[str] + + self.validate() + + def is_empty(self) -> bool: + return ( + self.label is None + and not self.hosts + and not self.host_pattern + and self.count is None + and self.count_per_host is None + ) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, PlacementSpec): + return self.label == other.label \ + and self.hosts == other.hosts \ + and self.count == other.count \ + and self.host_pattern == other.host_pattern \ + and self.count_per_host == other.count_per_host + return NotImplemented + + def set_hosts(self, hosts: Union[List[str], List[HostPlacementSpec]]) -> None: + # To backpopulate the .hosts attribute when using labels or count + # in the orchestrator backend. + if all([isinstance(host, HostPlacementSpec) for host in hosts]): + self.hosts = hosts # type: ignore + else: + self.hosts = [HostPlacementSpec.parse(x, require_network=False) # type: ignore + for x in hosts if x] + + # deprecated + def filter_matching_hosts(self, _get_hosts_func: Callable) -> List[str]: + return self.filter_matching_hostspecs(_get_hosts_func(as_hostspec=True)) + + def filter_matching_hostspecs(self, hostspecs: Iterable[HostSpec]) -> List[str]: + if self.hosts: + all_hosts = [hs.hostname for hs in hostspecs] + return [h.hostname for h in self.hosts if h.hostname in all_hosts] + if self.label: + return [hs.hostname for hs in hostspecs if self.label in hs.labels] + all_hosts = [hs.hostname for hs in hostspecs] + if self.host_pattern: + return fnmatch.filter(all_hosts, self.host_pattern) + return all_hosts + + def get_target_count(self, hostspecs: Iterable[HostSpec]) -> int: + if self.count: + return self.count + return len(self.filter_matching_hostspecs(hostspecs)) * (self.count_per_host or 1) + + def pretty_str(self) -> str: + """ + >>> #doctest: +SKIP + ... ps = PlacementSpec(...) # For all placement specs: + ... PlacementSpec.from_string(ps.pretty_str()) == ps + """ + kv = [] + if self.hosts: + kv.append(';'.join([str(h) for h in self.hosts])) + if self.count: + kv.append('count:%d' % self.count) + if self.count_per_host: + kv.append('count-per-host:%d' % self.count_per_host) + if self.label: + kv.append('label:%s' % self.label) + if self.host_pattern: + kv.append(self.host_pattern) + return ';'.join(kv) + + def __repr__(self) -> str: + kv = [] + if self.count: + kv.append('count=%d' % self.count) + if self.count_per_host: + kv.append('count_per_host=%d' % self.count_per_host) + if self.label: + kv.append('label=%s' % repr(self.label)) + if self.hosts: + kv.append('hosts={!r}'.format(self.hosts)) + if self.host_pattern: + kv.append('host_pattern={!r}'.format(self.host_pattern)) + return "PlacementSpec(%s)" % ', '.join(kv) + + @classmethod + @handle_type_error + def from_json(cls, data: dict) -> 'PlacementSpec': + c = data.copy() + hosts = c.get('hosts', []) + if hosts: + c['hosts'] = [] + for host in hosts: + c['hosts'].append(HostPlacementSpec.from_json(host)) + _cls = cls(**c) + _cls.validate() + return _cls + + def to_json(self) -> dict: + r: Dict[str, Any] = {} + if self.label: + r['label'] = self.label + if self.hosts: + r['hosts'] = [host.to_json() for host in self.hosts] + if self.count: + r['count'] = self.count + if self.count_per_host: + r['count_per_host'] = self.count_per_host + if self.host_pattern: + r['host_pattern'] = self.host_pattern + return r + + def validate(self) -> None: + if self.hosts and self.label: + # TODO: a less generic Exception + raise SpecValidationError('Host and label are mutually exclusive') + if self.count is not None: + try: + intval = int(self.count) + except (ValueError, TypeError): + raise SpecValidationError("num/count must be a numeric value") + if self.count != intval: + raise SpecValidationError("num/count must be an integer value") + if self.count < 1: + raise SpecValidationError("num/count must be >= 1") + if self.count_per_host is not None: + try: + intval = int(self.count_per_host) + except (ValueError, TypeError): + raise SpecValidationError("count-per-host must be a numeric value") + if self.count_per_host != intval: + raise SpecValidationError("count-per-host must be an integer value") + if self.count_per_host < 1: + raise SpecValidationError("count-per-host must be >= 1") + if self.count_per_host is not None and not ( + self.label + or self.hosts + or self.host_pattern + ): + raise SpecValidationError( + "count-per-host must be combined with label or hosts or host_pattern" + ) + if self.count is not None and self.count_per_host is not None: + raise SpecValidationError("cannot combine count and count-per-host") + if ( + self.count_per_host is not None + and self.hosts + and any([hs.network or hs.name for hs in self.hosts]) + ): + raise SpecValidationError( + "count-per-host cannot be combined explicit placement with names or networks" + ) + if self.host_pattern: + if not isinstance(self.host_pattern, str): + raise SpecValidationError('host_pattern must be of type string') + if self.hosts: + raise SpecValidationError('cannot combine host patterns and hosts') + + for h in self.hosts: + h.validate() + + @classmethod + def from_string(cls, arg): + # type: (Optional[str]) -> PlacementSpec + """ + A single integer is parsed as a count: + + >>> PlacementSpec.from_string('3') + PlacementSpec(count=3) + + A list of names is parsed as host specifications: + + >>> PlacementSpec.from_string('host1 host2') + PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacemen\ +tSpec(hostname='host2', network='', name='')]) + + You can also prefix the hosts with a count as follows: + + >>> PlacementSpec.from_string('2 host1 host2') + PlacementSpec(count=2, hosts=[HostPlacementSpec(hostname='host1', network='', name=''), Hos\ +tPlacementSpec(hostname='host2', network='', name='')]) + + You can specify labels using `label:<label>` + + >>> PlacementSpec.from_string('label:mon') + PlacementSpec(label='mon') + + Labels also support a count: + + >>> PlacementSpec.from_string('3 label:mon') + PlacementSpec(count=3, label='mon') + + fnmatch is also supported: + + >>> PlacementSpec.from_string('data[1-3]') + PlacementSpec(host_pattern='data[1-3]') + + >>> PlacementSpec.from_string(None) + PlacementSpec() + """ + if arg is None or not arg: + strings = [] + elif isinstance(arg, str): + if ' ' in arg: + strings = arg.split(' ') + elif ';' in arg: + strings = arg.split(';') + elif ',' in arg and '[' not in arg: + # FIXME: this isn't quite right. we want to avoid breaking + # a list of mons with addrvecs... so we're basically allowing + # , most of the time, except when addrvecs are used. maybe + # ok? + strings = arg.split(',') + else: + strings = [arg] + else: + raise SpecValidationError('invalid placement %s' % arg) + + count = None + count_per_host = None + if strings: + try: + count = int(strings[0]) + strings = strings[1:] + except ValueError: + pass + for s in strings: + if s.startswith('count:'): + try: + count = int(s[len('count:'):]) + strings.remove(s) + break + except ValueError: + pass + for s in strings: + if s.startswith('count-per-host:'): + try: + count_per_host = int(s[len('count-per-host:'):]) + strings.remove(s) + break + except ValueError: + pass + + advanced_hostspecs = [h for h in strings if + (':' in h or '=' in h or not any(c in '[]?*:=' for c in h)) and + 'label:' not in h] + for a_h in advanced_hostspecs: + strings.remove(a_h) + + labels = [x for x in strings if 'label:' in x] + if len(labels) > 1: + raise SpecValidationError('more than one label provided: {}'.format(labels)) + for l in labels: + strings.remove(l) + label = labels[0][6:] if labels else None + + host_patterns = strings + if len(host_patterns) > 1: + raise SpecValidationError( + 'more than one host pattern provided: {}'.format(host_patterns)) + + ps = PlacementSpec(count=count, + count_per_host=count_per_host, + hosts=advanced_hostspecs, + label=label, + host_pattern=host_patterns[0] if host_patterns else None) + return ps + + +_service_spec_from_json_validate = True + + +@contextmanager +def service_spec_allow_invalid_from_json() -> Iterator[None]: + """ + I know this is evil, but unfortunately `ceph orch ls` + may return invalid OSD specs for OSDs not associated to + and specs. If you have a better idea, please! + """ + global _service_spec_from_json_validate + _service_spec_from_json_validate = False + yield + _service_spec_from_json_validate = True + + +class ServiceSpec(object): + """ + Details of service creation. + + Request to the orchestrator for a cluster of daemons + such as MDS, RGW, iscsi gateway, MONs, MGRs, Prometheus + + This structure is supposed to be enough information to + start the services. + """ + KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi mds mgr mon nfs ' \ + 'node-exporter osd prometheus rbd-mirror rgw ' \ + 'container cephadm-exporter ingress cephfs-mirror snmp-gateway'.split() + REQUIRES_SERVICE_ID = 'iscsi mds nfs rgw container ingress '.split() + MANAGED_CONFIG_OPTIONS = [ + 'mds_join_fs', + ] + + @classmethod + def _cls(cls: Type[ServiceSpecT], service_type: str) -> Type[ServiceSpecT]: + from ceph.deployment.drive_group import DriveGroupSpec + + ret = { + 'rgw': RGWSpec, + 'nfs': NFSServiceSpec, + 'osd': DriveGroupSpec, + 'mds': MDSSpec, + 'iscsi': IscsiServiceSpec, + 'alertmanager': AlertManagerSpec, + 'ingress': IngressSpec, + 'container': CustomContainerSpec, + 'grafana': GrafanaSpec, + 'node-exporter': MonitoringSpec, + 'prometheus': PrometheusSpec, + 'snmp-gateway': SNMPGatewaySpec, + }.get(service_type, cls) + if ret == ServiceSpec and not service_type: + raise SpecValidationError('Spec needs a "service_type" key.') + return ret + + def __new__(cls: Type[ServiceSpecT], *args: Any, **kwargs: Any) -> ServiceSpecT: + """ + Some Python foo to make sure, we don't have an object + like `ServiceSpec('rgw')` of type `ServiceSpec`. Now we have: + + >>> type(ServiceSpec('rgw')) == type(RGWSpec('rgw')) + True + + """ + if cls != ServiceSpec: + return object.__new__(cls) + service_type = kwargs.get('service_type', args[0] if args else None) + sub_cls: Any = cls._cls(service_type) + return object.__new__(sub_cls) + + def __init__(self, + service_type: str, + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + count: Optional[int] = None, + config: Optional[Dict[str, str]] = None, + unmanaged: bool = False, + preview_only: bool = False, + networks: Optional[List[str]] = None, + extra_container_args: Optional[List[str]] = None, + ): + + #: See :ref:`orchestrator-cli-placement-spec`. + self.placement = PlacementSpec() if placement is None else placement # type: PlacementSpec + + assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES, service_type + #: The type of the service. Needs to be either a Ceph + #: service (``mon``, ``crash``, ``mds``, ``mgr``, ``osd`` or + #: ``rbd-mirror``), a gateway (``nfs`` or ``rgw``), part of the + #: monitoring stack (``alertmanager``, ``grafana``, ``node-exporter`` or + #: ``prometheus``) or (``container``) for custom containers. + self.service_type = service_type + + #: The name of the service. Required for ``iscsi``, ``mds``, ``nfs``, ``osd``, ``rgw``, + #: ``container``, ``ingress`` + self.service_id = None + + if self.service_type in self.REQUIRES_SERVICE_ID or self.service_type == 'osd': + self.service_id = service_id + + #: If set to ``true``, the orchestrator will not deploy nor remove + #: any daemon associated with this service. Placement and all other properties + #: will be ignored. This is useful, if you do not want this service to be + #: managed temporarily. For cephadm, See :ref:`cephadm-spec-unmanaged` + self.unmanaged = unmanaged + self.preview_only = preview_only + + #: A list of network identities instructing the daemons to only bind + #: on the particular networks in that list. In case the cluster is distributed + #: across multiple networks, you can add multiple networks. See + #: :ref:`cephadm-monitoring-networks-ports`, + #: :ref:`cephadm-rgw-networks` and :ref:`cephadm-mgr-networks`. + self.networks: List[str] = networks or [] + + self.config: Optional[Dict[str, str]] = None + if config: + self.config = {k.replace(' ', '_'): v for k, v in config.items()} + + self.extra_container_args: Optional[List[str]] = extra_container_args + + @classmethod + @handle_type_error + def from_json(cls: Type[ServiceSpecT], json_spec: Dict) -> ServiceSpecT: + """ + Initialize 'ServiceSpec' object data from a json structure + + There are two valid styles for service specs: + + the "old" style: + + .. code:: yaml + + service_type: nfs + service_id: foo + pool: mypool + namespace: myns + + and the "new" style: + + .. code:: yaml + + service_type: nfs + service_id: foo + config: + some_option: the_value + networks: [10.10.0.0/16] + spec: + pool: mypool + namespace: myns + + In https://tracker.ceph.com/issues/45321 we decided that we'd like to + prefer the new style as it is more readable and provides a better + understanding of what fields are special for a give service type. + + Note, we'll need to stay compatible with both versions for the + the next two major releases (octoups, pacific). + + :param json_spec: A valid dict with ServiceSpec + + :meta private: + """ + if not isinstance(json_spec, dict): + raise SpecValidationError( + f'Service Spec is not an (JSON or YAML) object. got "{str(json_spec)}"') + + json_spec = cls.normalize_json(json_spec) + + c = json_spec.copy() + + # kludge to make `from_json` compatible to `Orchestrator.describe_service` + # Open question: Remove `service_id` form to_json? + if c.get('service_name', ''): + service_type_id = c['service_name'].split('.', 1) + + if not c.get('service_type', ''): + c['service_type'] = service_type_id[0] + if not c.get('service_id', '') and len(service_type_id) > 1: + c['service_id'] = service_type_id[1] + del c['service_name'] + + service_type = c.get('service_type', '') + _cls = cls._cls(service_type) + + if 'status' in c: + del c['status'] # kludge to make us compatible to `ServiceDescription.to_json()` + + return _cls._from_json_impl(c) # type: ignore + + @staticmethod + def normalize_json(json_spec: dict) -> dict: + networks = json_spec.get('networks') + if networks is None: + return json_spec + if isinstance(networks, list): + return json_spec + if not isinstance(networks, str): + raise SpecValidationError(f'Networks ({networks}) must be a string or list of strings') + json_spec['networks'] = [networks] + return json_spec + + @classmethod + def _from_json_impl(cls: Type[ServiceSpecT], json_spec: dict) -> ServiceSpecT: + args = {} # type: Dict[str, Any] + for k, v in json_spec.items(): + if k == 'placement': + v = PlacementSpec.from_json(v) + if k == 'spec': + args.update(v) + continue + args.update({k: v}) + _cls = cls(**args) + if _service_spec_from_json_validate: + _cls.validate() + return _cls + + def service_name(self) -> str: + n = self.service_type + if self.service_id: + n += '.' + self.service_id + return n + + def get_port_start(self) -> List[int]: + # If defined, we will allocate and number ports starting at this + # point. + return [] + + def get_virtual_ip(self) -> Optional[str]: + return None + + def to_json(self): + # type: () -> OrderedDict[str, Any] + ret: OrderedDict[str, Any] = OrderedDict() + ret['service_type'] = self.service_type + if self.service_id: + ret['service_id'] = self.service_id + ret['service_name'] = self.service_name() + if self.placement.to_json(): + ret['placement'] = self.placement.to_json() + if self.unmanaged: + ret['unmanaged'] = self.unmanaged + if self.networks: + ret['networks'] = self.networks + if self.extra_container_args: + ret['extra_container_args'] = self.extra_container_args + + c = {} + for key, val in sorted(self.__dict__.items(), key=lambda tpl: tpl[0]): + if key in ret: + continue + if hasattr(val, 'to_json'): + val = val.to_json() + if val: + c[key] = val + if c: + ret['spec'] = c + return ret + + def validate(self) -> None: + if not self.service_type: + raise SpecValidationError('Cannot add Service: type required') + + if self.service_type != 'osd': + if self.service_type in self.REQUIRES_SERVICE_ID and not self.service_id: + raise SpecValidationError('Cannot add Service: id required') + if self.service_type not in self.REQUIRES_SERVICE_ID and self.service_id: + raise SpecValidationError( + f'Service of type \'{self.service_type}\' should not contain a service id') + + if self.service_id: + if not re.match('^[a-zA-Z0-9_.-]+$', str(self.service_id)): + raise SpecValidationError('Service id contains invalid characters, ' + 'only [a-zA-Z0-9_.-] allowed') + + if self.placement is not None: + self.placement.validate() + if self.config: + for k, v in self.config.items(): + if k in self.MANAGED_CONFIG_OPTIONS: + raise SpecValidationError( + f'Cannot set config option {k} in spec: it is managed by cephadm' + ) + for network in self.networks or []: + try: + ip_network(network) + except ValueError as e: + raise SpecValidationError( + f'Cannot parse network {network}: {e}' + ) + + def __repr__(self) -> str: + y = yaml.dump(cast(dict, self), default_flow_style=False) + return f"{self.__class__.__name__}.from_json(yaml.safe_load('''{y}'''))" + + def __eq__(self, other: Any) -> bool: + return (self.__class__ == other.__class__ + and + self.__dict__ == other.__dict__) + + def one_line_str(self) -> str: + return '<{} for service_name={}>'.format(self.__class__.__name__, self.service_name()) + + @staticmethod + def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceSpec') -> Any: + return dumper.represent_dict(cast(Mapping, data.to_json().items())) + + +yaml.add_representer(ServiceSpec, ServiceSpec.yaml_representer) + + +class NFSServiceSpec(ServiceSpec): + def __init__(self, + service_type: str = 'nfs', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + port: Optional[int] = None, + extra_container_args: Optional[List[str]] = None, + ): + assert service_type == 'nfs' + super(NFSServiceSpec, self).__init__( + 'nfs', service_id=service_id, + placement=placement, unmanaged=unmanaged, preview_only=preview_only, + config=config, networks=networks, extra_container_args=extra_container_args) + + self.port = port + + def get_port_start(self) -> List[int]: + if self.port: + return [self.port] + return [] + + def rados_config_name(self): + # type: () -> str + return 'conf-' + self.service_name() + + +yaml.add_representer(NFSServiceSpec, ServiceSpec.yaml_representer) + + +class RGWSpec(ServiceSpec): + """ + Settings to configure a (multisite) Ceph RGW + + .. code-block:: yaml + + service_type: rgw + service_id: myrealm.myzone + spec: + rgw_realm: myrealm + rgw_zone: myzone + ssl: true + rgw_frontend_port: 1234 + rgw_frontend_type: beast + rgw_frontend_ssl_certificate: ... + + See also: :ref:`orchestrator-cli-service-spec` + """ + + MANAGED_CONFIG_OPTIONS = ServiceSpec.MANAGED_CONFIG_OPTIONS + [ + 'rgw_zone', + 'rgw_realm', + 'rgw_frontends', + ] + + def __init__(self, + service_type: str = 'rgw', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + rgw_realm: Optional[str] = None, + rgw_zone: Optional[str] = None, + rgw_frontend_port: Optional[int] = None, + rgw_frontend_ssl_certificate: Optional[List[str]] = None, + rgw_frontend_type: Optional[str] = None, + unmanaged: bool = False, + ssl: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + subcluster: Optional[str] = None, # legacy, only for from_json on upgrade + extra_container_args: Optional[List[str]] = None, + ): + assert service_type == 'rgw', service_type + + # for backward compatibility with octopus spec files, + if not service_id and (rgw_realm and rgw_zone): + service_id = rgw_realm + '.' + rgw_zone + + super(RGWSpec, self).__init__( + 'rgw', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, networks=networks, + extra_container_args=extra_container_args) + + #: The RGW realm associated with this service. Needs to be manually created + self.rgw_realm: Optional[str] = rgw_realm + #: The RGW zone associated with this service. Needs to be manually created + self.rgw_zone: Optional[str] = rgw_zone + #: Port of the RGW daemons + self.rgw_frontend_port: Optional[int] = rgw_frontend_port + #: List of SSL certificates + self.rgw_frontend_ssl_certificate: Optional[List[str]] = rgw_frontend_ssl_certificate + #: civetweb or beast (default: beast). See :ref:`rgw_frontends` + self.rgw_frontend_type: Optional[str] = rgw_frontend_type + #: enable SSL + self.ssl = ssl + + def get_port_start(self) -> List[int]: + return [self.get_port()] + + def get_port(self) -> int: + if self.rgw_frontend_port: + return self.rgw_frontend_port + if self.ssl: + return 443 + else: + return 80 + + def validate(self) -> None: + super(RGWSpec, self).validate() + + if self.rgw_realm and not self.rgw_zone: + raise SpecValidationError( + 'Cannot add RGW: Realm specified but no zone specified') + if self.rgw_zone and not self.rgw_realm: + raise SpecValidationError( + 'Cannot add RGW: Zone specified but no realm specified') + + +yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer) + + +class IscsiServiceSpec(ServiceSpec): + def __init__(self, + service_type: str = 'iscsi', + service_id: Optional[str] = None, + pool: Optional[str] = None, + trusted_ip_list: Optional[str] = None, + api_port: Optional[int] = None, + api_user: Optional[str] = None, + api_password: Optional[str] = None, + api_secure: Optional[bool] = None, + ssl_cert: Optional[str] = None, + ssl_key: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + extra_container_args: Optional[List[str]] = None, + ): + assert service_type == 'iscsi' + super(IscsiServiceSpec, self).__init__('iscsi', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, + config=config, networks=networks, + extra_container_args=extra_container_args) + + #: RADOS pool where ceph-iscsi config data is stored. + self.pool = pool + #: list of trusted IP addresses + self.trusted_ip_list = trusted_ip_list + #: ``api_port`` as defined in the ``iscsi-gateway.cfg`` + self.api_port = api_port + #: ``api_user`` as defined in the ``iscsi-gateway.cfg`` + self.api_user = api_user + #: ``api_password`` as defined in the ``iscsi-gateway.cfg`` + self.api_password = api_password + #: ``api_secure`` as defined in the ``iscsi-gateway.cfg`` + self.api_secure = api_secure + #: SSL certificate + self.ssl_cert = ssl_cert + #: SSL private key + self.ssl_key = ssl_key + + if not self.api_secure and self.ssl_cert and self.ssl_key: + self.api_secure = True + + def validate(self) -> None: + super(IscsiServiceSpec, self).validate() + + if not self.pool: + raise SpecValidationError( + 'Cannot add ISCSI: No Pool specified') + + # Do not need to check for api_user and api_password as they + # now default to 'admin' when setting up the gateway url. Older + # iSCSI specs from before this change should be fine as they will + # have been required to have an api_user and api_password set and + # will be unaffected by the new default value. + + +yaml.add_representer(IscsiServiceSpec, ServiceSpec.yaml_representer) + + +class IngressSpec(ServiceSpec): + def __init__(self, + service_type: str = 'ingress', + service_id: Optional[str] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + placement: Optional[PlacementSpec] = None, + backend_service: Optional[str] = None, + frontend_port: Optional[int] = None, + ssl_cert: Optional[str] = None, + ssl_key: Optional[str] = None, + ssl_dh_param: Optional[str] = None, + ssl_ciphers: Optional[List[str]] = None, + ssl_options: Optional[List[str]] = None, + monitor_port: Optional[int] = None, + monitor_user: Optional[str] = None, + monitor_password: Optional[str] = None, + enable_stats: Optional[bool] = None, + keepalived_password: Optional[str] = None, + virtual_ip: Optional[str] = None, + virtual_ips_list: Optional[List[str]] = None, + virtual_interface_networks: Optional[List[str]] = [], + unmanaged: bool = False, + ssl: bool = False, + extra_container_args: Optional[List[str]] = None, + ): + assert service_type == 'ingress' + + super(IngressSpec, self).__init__( + 'ingress', service_id=service_id, + placement=placement, config=config, + networks=networks, + extra_container_args=extra_container_args + ) + self.backend_service = backend_service + self.frontend_port = frontend_port + self.ssl_cert = ssl_cert + self.ssl_key = ssl_key + self.ssl_dh_param = ssl_dh_param + self.ssl_ciphers = ssl_ciphers + self.ssl_options = ssl_options + self.monitor_port = monitor_port + self.monitor_user = monitor_user + self.monitor_password = monitor_password + self.keepalived_password = keepalived_password + self.virtual_ip = virtual_ip + self.virtual_ips_list = virtual_ips_list + self.virtual_interface_networks = virtual_interface_networks or [] + self.unmanaged = unmanaged + self.ssl = ssl + + def get_port_start(self) -> List[int]: + return [cast(int, self.frontend_port), + cast(int, self.monitor_port)] + + def get_virtual_ip(self) -> Optional[str]: + return self.virtual_ip + + def validate(self) -> None: + super(IngressSpec, self).validate() + + if not self.backend_service: + raise SpecValidationError( + 'Cannot add ingress: No backend_service specified') + if not self.frontend_port: + raise SpecValidationError( + 'Cannot add ingress: No frontend_port specified') + if not self.monitor_port: + raise SpecValidationError( + 'Cannot add ingress: No monitor_port specified') + if not self.virtual_ip and not self.virtual_ips_list: + raise SpecValidationError( + 'Cannot add ingress: No virtual_ip provided') + if self.virtual_ip is not None and self.virtual_ips_list is not None: + raise SpecValidationError( + 'Cannot add ingress: Single and multiple virtual IPs specified') + + +yaml.add_representer(IngressSpec, ServiceSpec.yaml_representer) + + +class CustomContainerSpec(ServiceSpec): + def __init__(self, + service_type: str = 'container', + service_id: Optional[str] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + image: Optional[str] = None, + entrypoint: Optional[str] = None, + uid: Optional[int] = None, + gid: Optional[int] = None, + volume_mounts: Optional[Dict[str, str]] = {}, + args: Optional[List[str]] = [], + envs: Optional[List[str]] = [], + privileged: Optional[bool] = False, + bind_mounts: Optional[List[List[str]]] = None, + ports: Optional[List[int]] = [], + dirs: Optional[List[str]] = [], + files: Optional[Dict[str, Any]] = {}, + ): + assert service_type == 'container' + assert service_id is not None + assert image is not None + + super(CustomContainerSpec, self).__init__( + service_type, service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, + networks=networks) + + self.image = image + self.entrypoint = entrypoint + self.uid = uid + self.gid = gid + self.volume_mounts = volume_mounts + self.args = args + self.envs = envs + self.privileged = privileged + self.bind_mounts = bind_mounts + self.ports = ports + self.dirs = dirs + self.files = files + + def config_json(self) -> Dict[str, Any]: + """ + Helper function to get the value of the `--config-json` cephadm + command line option. It will contain all specification properties + that haven't a `None` value. Such properties will get default + values in cephadm. + :return: Returns a dictionary containing all specification + properties. + """ + config_json = {} + for prop in ['image', 'entrypoint', 'uid', 'gid', 'args', + 'envs', 'volume_mounts', 'privileged', + 'bind_mounts', 'ports', 'dirs', 'files']: + value = getattr(self, prop) + if value is not None: + config_json[prop] = value + return config_json + + +yaml.add_representer(CustomContainerSpec, ServiceSpec.yaml_representer) + + +class MonitoringSpec(ServiceSpec): + def __init__(self, + service_type: str, + service_id: Optional[str] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + port: Optional[int] = None, + extra_container_args: Optional[List[str]] = None, + ): + assert service_type in ['grafana', 'node-exporter', 'prometheus', 'alertmanager'] + + super(MonitoringSpec, self).__init__( + service_type, service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, + networks=networks, extra_container_args=extra_container_args) + + self.service_type = service_type + self.port = port + + def get_port_start(self) -> List[int]: + return [self.get_port()] + + def get_port(self) -> int: + if self.port: + return self.port + else: + return {'prometheus': 9095, + 'node-exporter': 9100, + 'alertmanager': 9093, + 'grafana': 3000}[self.service_type] + + +yaml.add_representer(MonitoringSpec, ServiceSpec.yaml_representer) + + +class AlertManagerSpec(MonitoringSpec): + def __init__(self, + service_type: str = 'alertmanager', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + user_data: Optional[Dict[str, Any]] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + port: Optional[int] = None, + secure: bool = False, + extra_container_args: Optional[List[str]] = None, + ): + assert service_type == 'alertmanager' + super(AlertManagerSpec, self).__init__( + 'alertmanager', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, networks=networks, port=port, + extra_container_args=extra_container_args) + + # Custom configuration. + # + # Example: + # service_type: alertmanager + # service_id: xyz + # user_data: + # default_webhook_urls: + # - "https://foo" + # - "https://bar" + # + # Documentation: + # default_webhook_urls - A list of additional URL's that are + # added to the default receivers' + # <webhook_configs> configuration. + self.user_data = user_data or {} + self.secure = secure + + def get_port_start(self) -> List[int]: + return [self.get_port(), 9094] + + def validate(self) -> None: + super(AlertManagerSpec, self).validate() + + if self.port == 9094: + raise SpecValidationError( + 'Port 9094 is reserved for AlertManager cluster listen address') + + +yaml.add_representer(AlertManagerSpec, ServiceSpec.yaml_representer) + + +class GrafanaSpec(MonitoringSpec): + def __init__(self, + service_type: str = 'grafana', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + port: Optional[int] = None, + initial_admin_password: Optional[str] = None, + extra_container_args: Optional[List[str]] = None, + ): + assert service_type == 'grafana' + super(GrafanaSpec, self).__init__( + 'grafana', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, networks=networks, port=port, + extra_container_args=extra_container_args) + + self.initial_admin_password = initial_admin_password + + +yaml.add_representer(GrafanaSpec, ServiceSpec.yaml_representer) + + +class PrometheusSpec(MonitoringSpec): + def __init__(self, + service_type: str = 'prometheus', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + port: Optional[int] = None, + retention_time: Optional[str] = None, + extra_container_args: Optional[List[str]] = None, + ): + assert service_type == 'prometheus' + super(PrometheusSpec, self).__init__( + 'prometheus', service_id=service_id, + placement=placement, unmanaged=unmanaged, + preview_only=preview_only, config=config, networks=networks, port=port, + extra_container_args=extra_container_args) + + self.retention_time = retention_time + + +yaml.add_representer(PrometheusSpec, ServiceSpec.yaml_representer) + + +class SNMPGatewaySpec(ServiceSpec): + class SNMPVersion(str, enum.Enum): + V2c = 'V2c' + V3 = 'V3' + + def to_json(self) -> str: + return self.value + + class SNMPAuthType(str, enum.Enum): + MD5 = 'MD5' + SHA = 'SHA' + + def to_json(self) -> str: + return self.value + + class SNMPPrivacyType(str, enum.Enum): + DES = 'DES' + AES = 'AES' + + def to_json(self) -> str: + return self.value + + valid_destination_types = [ + 'Name:Port', + 'IPv4:Port' + ] + + def __init__(self, + service_type: str = 'snmp-gateway', + snmp_version: Optional[SNMPVersion] = None, + snmp_destination: str = '', + credentials: Dict[str, str] = {}, + engine_id: Optional[str] = None, + auth_protocol: Optional[SNMPAuthType] = None, + privacy_protocol: Optional[SNMPPrivacyType] = None, + placement: Optional[PlacementSpec] = None, + unmanaged: bool = False, + preview_only: bool = False, + port: Optional[int] = None, + extra_container_args: Optional[List[str]] = None, + ): + assert service_type == 'snmp-gateway' + + super(SNMPGatewaySpec, self).__init__( + service_type, + placement=placement, + unmanaged=unmanaged, + preview_only=preview_only, + extra_container_args=extra_container_args) + + self.service_type = service_type + self.snmp_version = snmp_version + self.snmp_destination = snmp_destination + self.port = port + self.credentials = credentials + self.engine_id = engine_id + self.auth_protocol = auth_protocol + self.privacy_protocol = privacy_protocol + + @classmethod + def _from_json_impl(cls, json_spec: dict) -> 'SNMPGatewaySpec': + + cpy = json_spec.copy() + types = [ + ('snmp_version', SNMPGatewaySpec.SNMPVersion), + ('auth_protocol', SNMPGatewaySpec.SNMPAuthType), + ('privacy_protocol', SNMPGatewaySpec.SNMPPrivacyType), + ] + for d in cpy, cpy.get('spec', {}): + for key, enum_cls in types: + try: + if key in d: + d[key] = enum_cls(d[key]) + except ValueError: + raise SpecValidationError(f'{key} unsupported. Must be one of ' + f'{", ".join(enum_cls)}') + return super(SNMPGatewaySpec, cls)._from_json_impl(cpy) + + @property + def ports(self) -> List[int]: + return [self.port or 9464] + + def get_port_start(self) -> List[int]: + return self.ports + + def validate(self) -> None: + super(SNMPGatewaySpec, self).validate() + + if not self.credentials: + raise SpecValidationError( + 'Missing authentication information (credentials). ' + 'SNMP V2c and V3 require credential information' + ) + elif not self.snmp_version: + raise SpecValidationError( + 'Missing SNMP version (snmp_version)' + ) + + creds_requirement = { + 'V2c': ['snmp_community'], + 'V3': ['snmp_v3_auth_username', 'snmp_v3_auth_password'] + } + if self.privacy_protocol: + creds_requirement['V3'].append('snmp_v3_priv_password') + + missing = [parm for parm in creds_requirement[self.snmp_version] + if parm not in self.credentials] + # check that credentials are correct for the version + if missing: + raise SpecValidationError( + f'SNMP {self.snmp_version} credentials are incomplete. Missing {", ".join(missing)}' + ) + + if self.engine_id: + if 10 <= len(self.engine_id) <= 64 and \ + is_hex(self.engine_id) and \ + len(self.engine_id) % 2 == 0: + pass + else: + raise SpecValidationError( + 'engine_id must be a string containing 10-64 hex characters. ' + 'Its length must be divisible by 2' + ) + + else: + if self.snmp_version == 'V3': + raise SpecValidationError( + 'Must provide an engine_id for SNMP V3 notifications' + ) + + if not self.snmp_destination: + raise SpecValidationError( + 'SNMP destination (snmp_destination) must be provided' + ) + else: + valid, description = valid_addr(self.snmp_destination) + if not valid: + raise SpecValidationError( + f'SNMP destination (snmp_destination) is invalid: {description}' + ) + if description not in self.valid_destination_types: + raise SpecValidationError( + f'SNMP destination (snmp_destination) type ({description}) is invalid. ' + f'Must be either: {", ".join(sorted(self.valid_destination_types))}' + ) + + +yaml.add_representer(SNMPGatewaySpec, ServiceSpec.yaml_representer) + + +class MDSSpec(ServiceSpec): + def __init__(self, + service_type: str = 'mds', + service_id: Optional[str] = None, + placement: Optional[PlacementSpec] = None, + config: Optional[Dict[str, str]] = None, + unmanaged: bool = False, + preview_only: bool = False, + extra_container_args: Optional[List[str]] = None, + ): + assert service_type == 'mds' + super(MDSSpec, self).__init__('mds', service_id=service_id, + placement=placement, + config=config, + unmanaged=unmanaged, + preview_only=preview_only, + extra_container_args=extra_container_args) + + def validate(self) -> None: + super(MDSSpec, self).validate() + + if str(self.service_id)[0].isdigit(): + raise SpecValidationError('MDS service id cannot start with a numeric digit') + + +yaml.add_representer(MDSSpec, ServiceSpec.yaml_representer) diff --git a/src/python-common/ceph/deployment/translate.py b/src/python-common/ceph/deployment/translate.py new file mode 100644 index 000000000..2d373732c --- /dev/null +++ b/src/python-common/ceph/deployment/translate.py @@ -0,0 +1,119 @@ +import logging + +try: + from typing import Optional, List +except ImportError: + pass + +from ceph.deployment.drive_selection.selector import DriveSelection + +logger = logging.getLogger(__name__) + + +# TODO refactor this to a DriveSelection method +class to_ceph_volume(object): + + def __init__(self, + selection, # type: DriveSelection + osd_id_claims=None, # type: Optional[List[str]] + preview=False # type: bool + ): + + self.selection = selection + self.spec = selection.spec + self.preview = preview + self.osd_id_claims = osd_id_claims + + def run(self): + # type: () -> List[str] + """ Generate ceph-volume commands based on the DriveGroup filters """ + data_devices = [x.path for x in self.selection.data_devices()] + db_devices = [x.path for x in self.selection.db_devices()] + wal_devices = [x.path for x in self.selection.wal_devices()] + journal_devices = [x.path for x in self.selection.journal_devices()] + + if not data_devices: + return [] + + cmds: List[str] = [] + if self.spec.method == 'raw': + assert self.spec.objectstore == 'bluestore' + # ceph-volume raw prepare only support 1:1 ratio of data to db/wal devices + if data_devices and db_devices: + if len(data_devices) != len(db_devices): + raise ValueError('Number of data devices must match number of ' + 'db devices for raw mode osds') + if data_devices and wal_devices: + if len(data_devices) != len(wal_devices): + raise ValueError('Number of data devices must match number of ' + 'wal devices for raw mode osds') + # for raw prepare each data device needs its own prepare command + dev_counter = 0 + while dev_counter < len(data_devices): + cmd = "raw prepare --bluestore" + cmd += " --data {}".format(data_devices[dev_counter]) + if db_devices: + cmd += " --block.db {}".format(db_devices[dev_counter]) + if wal_devices: + cmd += " --block.wal {}".format(wal_devices[dev_counter]) + cmds.append(cmd) + dev_counter += 1 + + elif self.spec.objectstore == 'filestore': + # for lvm batch we can just do all devices in one command + cmd = "lvm batch --no-auto" + + cmd += " {}".format(" ".join(data_devices)) + + if self.spec.journal_size: + cmd += " --journal-size {}".format(self.spec.journal_size) + + if journal_devices: + cmd += " --journal-devices {}".format( + ' '.join(journal_devices)) + + cmd += " --filestore" + cmds.append(cmd) + + elif self.spec.objectstore == 'bluestore': + # for lvm batch we can just do all devices in one command + cmd = "lvm batch --no-auto {}".format(" ".join(data_devices)) + + if db_devices: + cmd += " --db-devices {}".format(" ".join(db_devices)) + + if wal_devices: + cmd += " --wal-devices {}".format(" ".join(wal_devices)) + + if self.spec.block_wal_size: + cmd += " --block-wal-size {}".format(self.spec.block_wal_size) + + if self.spec.block_db_size: + cmd += " --block-db-size {}".format(self.spec.block_db_size) + cmds.append(cmd) + + for i in range(len(cmds)): + if self.spec.encrypted: + cmds[i] += " --dmcrypt" + + if self.spec.osds_per_device: + cmds[i] += " --osds-per-device {}".format(self.spec.osds_per_device) + + if self.spec.data_allocate_fraction: + cmds[i] += " --data-allocate-fraction {}".format(self.spec.data_allocate_fraction) + + if self.osd_id_claims: + cmds[i] += " --osd-ids {}".format(" ".join(self.osd_id_claims)) + + if self.spec.method != 'raw': + cmds[i] += " --yes" + cmds[i] += " --no-systemd" + + if self.spec.crush_device_class: + cmds[i] += " --crush-device-class {}".format(self.spec.crush_device_class) + + if self.preview: + cmds[i] += " --report" + cmds[i] += " --format json" + + return cmds diff --git a/src/python-common/ceph/deployment/utils.py b/src/python-common/ceph/deployment/utils.py new file mode 100644 index 000000000..6aad15b75 --- /dev/null +++ b/src/python-common/ceph/deployment/utils.py @@ -0,0 +1,102 @@ +import ipaddress +import socket +from typing import Tuple, Optional +from urllib.parse import urlparse + + +def unwrap_ipv6(address): + # type: (str) -> str + if address.startswith('[') and address.endswith(']'): + return address[1:-1] + return address + + +def wrap_ipv6(address): + # type: (str) -> str + + # We cannot assume it's already wrapped or even an IPv6 address if + # it's already wrapped it'll not pass (like if it's a hostname) and trigger + # the ValueError + try: + if ipaddress.ip_address(address).version == 6: + return f"[{address}]" + except ValueError: + pass + + return address + + +def is_ipv6(address): + # type: (str) -> bool + address = unwrap_ipv6(address) + try: + return ipaddress.ip_address(address).version == 6 + except ValueError: + return False + + +def valid_addr(addr: str) -> Tuple[bool, str]: + """check that an address string is valid + Valid in this case means that a name is resolvable, or the + IP address string is a correctly formed IPv4 or IPv6 address, + with or without a port + + Args: + addr (str): address + + Returns: + Tuple[bool, str]: Validity of the address, either + True, address type (IPv4[:Port], IPv6[:Port], Name[:Port]) + False, <error description> + """ + + def _dns_lookup(addr: str, port: Optional[int]) -> Tuple[bool, str]: + try: + socket.getaddrinfo(addr, None) + except socket.gaierror: + # not resolvable + return False, 'DNS lookup failed' + return True, 'Name:Port' if port else 'Name' + + def _ip_lookup(addr: str, port: Optional[int]) -> Tuple[bool, str]: + unwrapped = unwrap_ipv6(addr) + try: + ip_addr = ipaddress.ip_address(unwrapped) + except ValueError: + return False, 'Invalid IP v4 or v6 address format' + return True, f'IPv{ip_addr.version}:Port' if port else f'IPv{ip_addr.version}' + + dots = addr.count('.') + colons = addr.count(':') + addr_as_url = f'http://{addr}' + + try: + res = urlparse(addr_as_url) + except ValueError as e: + if str(e) == 'Invalid IPv6 URL': + return False, 'Address has incorrect/incomplete use of enclosing brackets' + return False, f'Unknown urlparse error {str(e)} for {addr_as_url}' + + addr = res.netloc + port = None + try: + port = res.port + if port: + addr = addr[:-len(f':{port}')] + except ValueError: + if colons == 1: + return False, 'Port must be numeric' + elif ']:' in addr: + return False, 'Port must be numeric' + + if addr.startswith('[') and dots: + return False, "IPv4 address wrapped in brackets is invalid" + + # catch partial address like 10.8 which would be valid IPaddress schemes + # but are classed as invalid here since they're not usable + if dots and addr[0].isdigit() and dots != 3: + return False, 'Invalid partial IPv4 address' + + if addr[0].isalpha() and '.' in addr: + return _dns_lookup(addr, port) + return _ip_lookup(addr, port) diff --git a/src/python-common/ceph/py.typed b/src/python-common/ceph/py.typed new file mode 100644 index 000000000..444b02d77 --- /dev/null +++ b/src/python-common/ceph/py.typed @@ -0,0 +1 @@ +# Marker file for PEP 561. This package uses inline types.
\ No newline at end of file diff --git a/src/python-common/ceph/tests/__init__.py b/src/python-common/ceph/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/python-common/ceph/tests/__init__.py diff --git a/src/python-common/ceph/tests/c-v-inventory.json b/src/python-common/ceph/tests/c-v-inventory.json new file mode 100644 index 000000000..c24345525 --- /dev/null +++ b/src/python-common/ceph/tests/c-v-inventory.json @@ -0,0 +1,155 @@ +[ + { + "available": false, + "created": "2022-02-11T10:58:23.177450Z", + "rejected_reasons": [ + "locked" + ], + "sys_api": { + "scheduler_mode": "", + "rotational": "0", + "vendor": "", + "human_readable_size": "50.00 GB", + "sectors": 0, + "sas_device_handle": "", + "partitions": {}, + "rev": "", + "sas_address": "", + "locked": 1, + "sectorsize": "512", + "removable": "0", + "path": "/dev/dm-0", + "support_discard": "", + "model": "", + "ro": "0", + "nr_requests": "128", + "size": 53687091200 + }, + "lvs": [], + "path": "/dev/dm-0" + }, + { + "available": false, + "rejected_reasons": [ + "locked" + ], + "sys_api": { + "scheduler_mode": "", + "rotational": "0", + "vendor": "", + "human_readable_size": "31.47 GB", + "sectors": 0, + "sas_device_handle": "", + "partitions": {}, + "rev": "", + "sas_address": "", + "locked": 1, + "sectorsize": "512", + "removable": "0", + "path": "/dev/dm-1", + "support_discard": "", + "model": "", + "ro": "0", + "nr_requests": "128", + "size": 33789313024 + }, + "lvs": [], + "path": "/dev/dm-1" + }, + { + "available": false, + "created": "2022-02-11T10:58:23.177450Z", + "rejected_reasons": [ + "locked" + ], + "sys_api": { + "scheduler_mode": "", + "rotational": "0", + "vendor": "", + "human_readable_size": "394.27 GB", + "sectors": 0, + "sas_device_handle": "", + "partitions": {}, + "rev": "", + "sas_address": "", + "locked": 1, + "sectorsize": "512", + "removable": "0", + "path": "/dev/dm-2", + "support_discard": "", + "model": "", + "ro": "0", + "nr_requests": "128", + "size": 423347879936 + }, + "lvs": [], + "path": "/dev/dm-2" + }, + { + "available": false, + "rejected_reasons": [ + "locked" + ], + "sys_api": { + "scheduler_mode": "cfq", + "rotational": "0", + "vendor": "ATA", + "human_readable_size": "476.94 GB", + "sectors": 0, + "sas_device_handle": "", + "partitions": { + "sda2": { + "start": "411648", + "holders": [], + "sectorsize": 512, + "sectors": "2097152", + "size": "1024.00 MB" + }, + "sda3": { + "start": "2508800", + "holders": [ + "dm-1", + "dm-2", + "dm-0" + ], + "sectorsize": 512, + "sectors": "997705728", + "size": "475.74 GB" + }, + "sda1": { + "start": "2048", + "holders": [], + "sectorsize": 512, + "sectors": "409600", + "size": "200.00 MB" + } + }, + "rev": "0000", + "sas_address": "", + "locked": 1, + "sectorsize": "512", + "removable": "0", + "path": "/dev/sda", + "support_discard": "", + "model": "SanDisk SD8SN8U5", + "ro": "0", + "nr_requests": "128", + "size": 512110190592 + }, + "lvs": [ + { + "comment": "not used by ceph", + "name": "swap" + }, + { + "comment": "not used by ceph", + "name": "home" + }, + { + "comment": "not used by ceph", + "name": "root" + } + ], + "path": "/dev/sda" + } +] diff --git a/src/python-common/ceph/tests/factories.py b/src/python-common/ceph/tests/factories.py new file mode 100644 index 000000000..6938fd084 --- /dev/null +++ b/src/python-common/ceph/tests/factories.py @@ -0,0 +1,101 @@ +from ceph.deployment.inventory import Device + + +class InventoryFactory(object): + def __init__(self): + self.taken_paths = [] + + def _make_path(self, ident='b'): + return "/dev/{}{}".format(self.prefix, ident) + + def _find_new_path(self): + cnt = 0 + if len(self.taken_paths) >= 25: + raise Exception( + "Double-character disks are not implemetend. Maximum amount" + "of disks reached.") + + while self.path in self.taken_paths: + ident = chr(ord('b') + cnt) + self.path = "/dev/{}{}".format(self.prefix, ident) + cnt += 1 + + def assemble(self): + if self.empty: + return {} + self._find_new_path() + inventory_sample = { + 'available': self.available, + 'lvs': [], + 'path': self.path, + 'rejected_reasons': self.rejected_reason, + 'sys_api': { + 'human_readable_size': self.human_readable_size, + 'locked': 1, + 'model': self.model, + 'nr_requests': '256', + 'partitions': + { # partitions are not as relevant for now, todo for later + 'sda1': { + 'sectors': '41940992', + 'sectorsize': 512, + 'size': self.human_readable_size, + 'start': '2048' + } + }, + 'path': self.path, + 'removable': '0', + 'rev': '', + 'ro': '0', + 'rotational': str(self.rotational), + 'sas_address': '', + 'sas_device_handle': '', + 'scheduler_mode': 'mq-deadline', + 'sectors': 0, + 'sectorsize': '512', + 'size': self.size, + 'support_discard': '', + 'vendor': self.vendor + } + } + + if self.available: + self.taken_paths.append(self.path) + return inventory_sample + return {} + + def _init(self, **kwargs): + self.prefix = 'sd' + self.path = kwargs.get('path', self._make_path()) + self.human_readable_size = kwargs.get('human_readable_size', + '50.00 GB') + self.vendor = kwargs.get('vendor', 'samsung') + self.model = kwargs.get('model', '42-RGB') + self.available = kwargs.get('available', True) + self.rejected_reason = kwargs.get('rejected_reason', ['']) + self.rotational = kwargs.get('rotational', '1') + if not self.available: + self.rejected_reason = ['locked'] + self.empty = kwargs.get('empty', False) + self.size = kwargs.get('size', 5368709121) + + def produce(self, pieces=1, **kwargs): + if kwargs.get('path') and pieces > 1: + raise Exception("/path/ and /pieces/ are mutually exclusive") + # Move to custom init to track _taken_paths. + # class is invoked once in each context. + # if disks with different properties are being created + # we'd have to re-init the class and loose track of the + # taken_paths + self._init(**kwargs) + return [self.assemble() for x in range(0, pieces)] + + +class DeviceFactory(object): + def __init__(self, device_setup): + self.device_setup = device_setup + self.pieces = device_setup.get('pieces', 1) + self.device_conf = device_setup.get('device_config', {}) + + def produce(self): + return [Device(**self.device_conf) for x in range(0, self.pieces)] diff --git a/src/python-common/ceph/tests/test_datetime.py b/src/python-common/ceph/tests/test_datetime.py new file mode 100644 index 000000000..d03a82930 --- /dev/null +++ b/src/python-common/ceph/tests/test_datetime.py @@ -0,0 +1,61 @@ +import datetime + +import pytest + +from ceph.utils import datetime_now, datetime_to_str, str_to_datetime + + +def test_datetime_to_str_1(): + dt = datetime.datetime.now() + assert type(datetime_to_str(dt)) is str + + +def test_datetime_to_str_2(): + # note: tz isn't specified in the string, so explicitly store this as UTC + dt = datetime.datetime.strptime( + '2019-04-24T17:06:53.039991', + '%Y-%m-%dT%H:%M:%S.%f' + ).replace(tzinfo=datetime.timezone.utc) + assert datetime_to_str(dt) == '2019-04-24T17:06:53.039991Z' + + +def test_datetime_to_str_3(): + dt = datetime.datetime.strptime('2020-11-02T04:40:12.748172-0800', + '%Y-%m-%dT%H:%M:%S.%f%z') + assert datetime_to_str(dt) == '2020-11-02T12:40:12.748172Z' + + +def test_str_to_datetime_1(): + dt = str_to_datetime('2020-03-03T09:21:43.636153304Z') + assert type(dt) is datetime.datetime + assert dt.tzinfo is not None + + +def test_str_to_datetime_2(): + dt = str_to_datetime('2020-03-03T15:52:30.136257504-0600') + assert type(dt) is datetime.datetime + assert dt.tzinfo is not None + + +def test_str_to_datetime_3(): + dt = str_to_datetime('2020-03-03T15:52:30.136257504') + assert type(dt) is datetime.datetime + assert dt.tzinfo is not None + + +def test_str_to_datetime_invalid_format_1(): + with pytest.raises(ValueError): + str_to_datetime('2020-03-03 15:52:30.136257504') + + +def test_str_to_datetime_invalid_format_2(): + with pytest.raises(ValueError): + str_to_datetime('2020-03-03') + + +def test_datetime_now_1(): + dt = str_to_datetime('2020-03-03T09:21:43.636153304Z') + dt_now = datetime_now() + assert type(dt_now) is datetime.datetime + assert dt_now.tzinfo is not None + assert dt < dt_now diff --git a/src/python-common/ceph/tests/test_disk_selector.py b/src/python-common/ceph/tests/test_disk_selector.py new file mode 100644 index 000000000..b08236130 --- /dev/null +++ b/src/python-common/ceph/tests/test_disk_selector.py @@ -0,0 +1,560 @@ +# flake8: noqa +import pytest + +from ceph.deployment.drive_selection.matchers import _MatchInvalid +from ceph.deployment.inventory import Devices, Device + +from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, \ + DriveGroupValidationError + +from ceph.deployment import drive_selection +from ceph.deployment.service_spec import PlacementSpec +from ceph.tests.factories import InventoryFactory +from ceph.tests.utils import _mk_inventory, _mk_device + + +class TestMatcher(object): + """ Test Matcher base class + """ + + def test_get_disk_key_3(self): + """ + virtual is False + key is found + retrun value of key is expected + """ + disk_map = Device(path='/dev/vdb', sys_api={'foo': 'bar'}) + ret = drive_selection.Matcher('foo', 'bar')._get_disk_key(disk_map) + assert ret is disk_map.sys_api.get('foo') + + def test_get_disk_key_4(self): + """ + virtual is False + key is not found + expect raise Exception + """ + disk_map = Device(path='/dev/vdb') + with pytest.raises(Exception): + drive_selection.Matcher('bar', 'foo')._get_disk_key(disk_map) + pytest.fail("No disk_key found for foo or None") + + +class TestSubstringMatcher(object): + def test_compare(self): + disk_dict = Device(path='/dev/vdb', sys_api=dict(model='samsung')) + matcher = drive_selection.SubstringMatcher('model', 'samsung') + ret = matcher.compare(disk_dict) + assert ret is True + + def test_compare_false(self): + disk_dict = Device(path='/dev/vdb', sys_api=dict(model='nothing_matching')) + matcher = drive_selection.SubstringMatcher('model', 'samsung') + ret = matcher.compare(disk_dict) + assert ret is False + + +class TestEqualityMatcher(object): + def test_compare(self): + disk_dict = Device(path='/dev/vdb', sys_api=dict(rotates='1')) + matcher = drive_selection.EqualityMatcher('rotates', '1') + ret = matcher.compare(disk_dict) + assert ret is True + + def test_compare_false(self): + disk_dict = Device(path='/dev/vdb', sys_api=dict(rotates='1')) + matcher = drive_selection.EqualityMatcher('rotates', '0') + ret = matcher.compare(disk_dict) + assert ret is False + + +class TestAllMatcher(object): + def test_compare(self): + disk_dict = Device(path='/dev/vdb') + matcher = drive_selection.AllMatcher('all', 'True') + ret = matcher.compare(disk_dict) + assert ret is True + + def test_compare_value_not_true(self): + disk_dict = Device(path='/dev/vdb') + matcher = drive_selection.AllMatcher('all', 'False') + ret = matcher.compare(disk_dict) + assert ret is True + + +class TestSizeMatcher(object): + def test_parse_filter_exact(self): + """ Testing exact notation with 20G """ + matcher = drive_selection.SizeMatcher('size', '20G') + assert isinstance(matcher.exact, tuple) + assert matcher.exact[0] == '20' + assert matcher.exact[1] == 'GB' + + def test_parse_filter_exact_GB_G(self): + """ Testing exact notation with 20G """ + matcher = drive_selection.SizeMatcher('size', '20GB') + assert isinstance(matcher.exact, tuple) + assert matcher.exact[0] == '20' + assert matcher.exact[1] == 'GB' + + def test_parse_filter_high_low(self): + """ Testing high-low notation with 20G:50G """ + + matcher = drive_selection.SizeMatcher('size', '20G:50G') + assert isinstance(matcher.exact, tuple) + assert matcher.low[0] == '20' + assert matcher.high[0] == '50' + assert matcher.low[1] == 'GB' + assert matcher.high[1] == 'GB' + + def test_parse_filter_max_high(self): + """ Testing high notation with :50G """ + + matcher = drive_selection.SizeMatcher('size', ':50G') + assert isinstance(matcher.exact, tuple) + assert matcher.high[0] == '50' + assert matcher.high[1] == 'GB' + + def test_parse_filter_min_low(self): + """ Testing low notation with 20G: """ + + matcher = drive_selection.SizeMatcher('size', '50G:') + assert isinstance(matcher.exact, tuple) + assert matcher.low[0] == '50' + assert matcher.low[1] == 'GB' + + def test_to_byte_KB(self): + """ I doubt anyone ever thought we'd need to understand KB """ + + ret = drive_selection.SizeMatcher('size', '4K').to_byte(('4', 'KB')) + assert ret == 4 * 1e+3 + + def test_to_byte_GB(self): + """ Pretty nonesense test..""" + + ret = drive_selection.SizeMatcher('size', '10G').to_byte(('10', 'GB')) + assert ret == 10 * 1e+9 + + def test_to_byte_MB(self): + """ Pretty nonesense test..""" + + ret = drive_selection.SizeMatcher('size', '10M').to_byte(('10', 'MB')) + assert ret == 10 * 1e+6 + + def test_to_byte_TB(self): + """ Pretty nonesense test..""" + + ret = drive_selection.SizeMatcher('size', '10T').to_byte(('10', 'TB')) + assert ret == 10 * 1e+12 + + def test_to_byte_PB(self): + """ Expect to raise """ + + with pytest.raises(_MatchInvalid): + drive_selection.SizeMatcher('size', '10P').to_byte(('10', 'PB')) + assert 'Unit \'P\' is not supported' + + def test_compare_exact(self): + + matcher = drive_selection.SizeMatcher('size', '20GB') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size='20.00 GB')) + ret = matcher.compare(disk_dict) + assert ret is True + + def test_compare_exact_decimal(self): + + matcher = drive_selection.SizeMatcher('size', '20.12GB') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size='20.12 GB')) + ret = matcher.compare(disk_dict) + assert ret is True + + @pytest.mark.parametrize("test_input,expected", [ + ("1.00 GB", False), + ("20.00 GB", True), + ("50.00 GB", True), + ("100.00 GB", True), + ("101.00 GB", False), + ("1101.00 GB", False), + ]) + def test_compare_high_low(self, test_input, expected): + + matcher = drive_selection.SizeMatcher('size', '20GB:100GB') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size=test_input)) + ret = matcher.compare(disk_dict) + assert ret is expected + + @pytest.mark.parametrize("test_input,expected", [ + ("1.00 GB", True), + ("20.00 GB", True), + ("50.00 GB", True), + ("100.00 GB", False), + ("101.00 GB", False), + ("1101.00 GB", False), + ]) + def test_compare_high(self, test_input, expected): + + matcher = drive_selection.SizeMatcher('size', ':50GB') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size=test_input)) + ret = matcher.compare(disk_dict) + assert ret is expected + + @pytest.mark.parametrize("test_input,expected", [ + ("1.00 GB", False), + ("20.00 GB", False), + ("50.00 GB", True), + ("100.00 GB", True), + ("101.00 GB", True), + ("1101.00 GB", True), + ]) + def test_compare_low(self, test_input, expected): + + matcher = drive_selection.SizeMatcher('size', '50GB:') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size=test_input)) + ret = matcher.compare(disk_dict) + assert ret is expected + + @pytest.mark.parametrize("test_input,expected", [ + ("1.00 GB", False), + ("20.00 GB", False), + ("50.00 GB", False), + ("100.00 GB", False), + ("101.00 GB", False), + ("1101.00 GB", True), + ("9.10 TB", True), + ]) + def test_compare_at_least_1TB(self, test_input, expected): + + matcher = drive_selection.SizeMatcher('size', '1TB:') + disk_dict = Device(path='/dev/sdz', sys_api=dict(size=test_input)) + ret = matcher.compare(disk_dict) + assert ret is expected + + def test_compare_raise(self): + + matcher = drive_selection.SizeMatcher('size', 'None') + disk_dict = Device(path='/dev/vdb', sys_api=dict(size='20.00 GB')) + with pytest.raises(Exception): + matcher.compare(disk_dict) + pytest.fail("Couldn't parse size") + + @pytest.mark.parametrize("test_input,expected", [ + ("10G", ('10', 'GB')), + ("20GB", ('20', 'GB')), + ("10g", ('10', 'GB')), + ("20gb", ('20', 'GB')), + ]) + def test_get_k_v(self, test_input, expected): + assert drive_selection.SizeMatcher('size', '10G')._get_k_v(test_input) == expected + + @pytest.mark.parametrize("test_input,expected", [ + ("10G", ('GB')), + ("10g", ('GB')), + ("20GB", ('GB')), + ("20gb", ('GB')), + ("20TB", ('TB')), + ("20tb", ('TB')), + ("20T", ('TB')), + ("20t", ('TB')), + ("20MB", ('MB')), + ("20mb", ('MB')), + ("20M", ('MB')), + ("20m", ('MB')), + ]) + def test_parse_suffix(self, test_input, expected): + assert drive_selection.SizeMatcher('size', '10G')._parse_suffix(test_input) == expected + + @pytest.mark.parametrize("test_input,expected", [ + ("G", 'GB'), + ("GB", 'GB'), + ("TB", 'TB'), + ("T", 'TB'), + ("MB", 'MB'), + ("M", 'MB'), + ]) + def test_normalize_suffix(self, test_input, expected): + + assert drive_selection.SizeMatcher('10G', 'size')._normalize_suffix(test_input) == expected + + def test_normalize_suffix_raises(self): + + with pytest.raises(_MatchInvalid): + drive_selection.SizeMatcher('10P', 'size')._normalize_suffix("P") + pytest.fail("Unit 'P' not supported") + + +class TestDriveGroup(object): + @pytest.fixture(scope='class') + def test_fix(self, empty=None): + def make_sample_data(empty=empty, + data_limit=0, + wal_limit=0, + db_limit=0, + osds_per_device='', + disk_format='bluestore'): + raw_sample_bluestore = { + 'service_type': 'osd', + 'service_id': 'foo', + 'placement': {'host_pattern': 'data*'}, + 'data_devices': { + 'size': '30G:50G', + 'model': '42-RGB', + 'vendor': 'samsung', + 'limit': data_limit + }, + 'wal_devices': { + 'model': 'fast', + 'limit': wal_limit + }, + 'db_devices': { + 'size': ':20G', + 'limit': db_limit + }, + 'db_slots': 5, + 'wal_slots': 5, + 'block_wal_size': '5G', + 'block_db_size': '10G', + 'objectstore': disk_format, + 'osds_per_device': osds_per_device, + 'encrypted': True, + } + raw_sample_filestore = { + 'service_type': 'osd', + 'service_id': 'foo', + 'placement': {'host_pattern': 'data*'}, + 'objectstore': 'filestore', + 'data_devices': { + 'size': '30G:50G', + 'model': 'foo', + 'vendor': '1x', + 'limit': data_limit + }, + 'journal_devices': { + 'size': ':20G' + }, + 'journal_size': '5G', + 'osds_per_device': osds_per_device, + 'encrypted': True, + } + if disk_format == 'filestore': + raw_sample = raw_sample_filestore + else: + raw_sample = raw_sample_bluestore + + if empty: + raw_sample = { + 'service_type': 'osd', + 'service_id': 'foo', + 'placement': {'host_pattern': 'data*'}, + 'data_devices': { + 'all': True + }, + } + + dgo = DriveGroupSpec.from_json(raw_sample) + return dgo + + return make_sample_data + + def test_encryption_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.encrypted is True + + def test_encryption_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.encrypted is False + + def test_db_slots_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.db_slots == 5 + + def test_db_slots_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.db_slots is None + + def test_wal_slots_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.wal_slots == 5 + + def test_wal_slots_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.wal_slots is None + + def test_block_wal_size_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.block_wal_size == '5G' + + def test_block_wal_size_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.block_wal_size is None + + def test_block_db_size_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.block_db_size == '10G' + + def test_block_db_size_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.block_db_size is None + + def test_data_devices_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.data_devices == DeviceSelection( + model='42-RGB', + size='30G:50G', + vendor='samsung', + limit=0, + ) + + def test_data_devices_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.db_devices is None + + def test_db_devices_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.db_devices == DeviceSelection( + size=':20G', + limit=0, + ) + + def test_db_devices_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.db_devices is None + + def test_wal_device_prop(self, test_fix): + test_fix = test_fix() + assert test_fix.wal_devices == DeviceSelection( + model='fast', + limit=0, + ) + + def test_wal_device_prop_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.wal_devices is None + + def test_bluestore_format_prop(self, test_fix): + test_fix = test_fix(disk_format='bluestore') + assert test_fix.objectstore == 'bluestore' + + def test_default_format_prop(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.objectstore == 'bluestore' + + def test_osds_per_device(self, test_fix): + test_fix = test_fix(osds_per_device='3') + assert test_fix.osds_per_device == '3' + + def test_osds_per_device_default(self, test_fix): + test_fix = test_fix() + assert test_fix.osds_per_device == '' + + def test_journal_size_empty(self, test_fix): + test_fix = test_fix(empty=True) + assert test_fix.journal_size is None + + @pytest.fixture + def inventory(self, available=True): + def make_sample_data(available=available, + data_devices=10, + wal_devices=0, + db_devices=2, + human_readable_size_data='50.00 GB', + human_readable_size_wal='20.00 GB', + size=5368709121, + human_readable_size_db='20.00 GB'): + factory = InventoryFactory() + inventory_sample = [] + data_disks = factory.produce( + pieces=data_devices, + available=available, + size=size, + human_readable_size=human_readable_size_data) + wal_disks = factory.produce( + pieces=wal_devices, + human_readable_size=human_readable_size_wal, + rotational='0', + model='ssd_type_model', + size=size, + available=available) + db_disks = factory.produce( + pieces=db_devices, + human_readable_size=human_readable_size_db, + rotational='0', + size=size, + model='ssd_type_model', + available=available) + inventory_sample.extend(data_disks) + inventory_sample.extend(wal_disks) + inventory_sample.extend(db_disks) + + return Devices(devices=inventory_sample) + + return make_sample_data + + +class TestDriveSelection(object): + + testdata = [ + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True)), + _mk_inventory(_mk_device() * 5), + ['/dev/sda', '/dev/sdb', '/dev/sdc', '/dev/sdd', '/dev/sde'], [] + ), + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True, limit=3), + db_devices=DeviceSelection(all=True) + ), + _mk_inventory(_mk_device() * 5), + ['/dev/sda', '/dev/sdb', '/dev/sdc'], ['/dev/sdd', '/dev/sde'] + ), + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False) + ), + _mk_inventory(_mk_device(rotational=False) + _mk_device(rotational=True)), + ['/dev/sdb'], ['/dev/sda'] + ), + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False) + ), + _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)), + ['/dev/sda', '/dev/sdb'], ['/dev/sdc'] + ), + ( + DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False) + ), + _mk_inventory(_mk_device(rotational=True)*2), + ['/dev/sda', '/dev/sdb'], [] + ), + ] + + @pytest.mark.parametrize("spec,inventory,expected_data,expected_db", testdata) + def test_disk_selection(self, spec, inventory, expected_data, expected_db): + sel = drive_selection.DriveSelection(spec, inventory) + assert [d.path for d in sel.data_devices()] == expected_data + assert [d.path for d in sel.db_devices()] == expected_db + + def test_disk_selection_raise(self): + spec = DriveGroupSpec( + placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(size='wrong'), + ) + inventory = _mk_inventory(_mk_device(rotational=True)*2) + m = 'Failed to validate OSD spec "foobar.data_devices": No filters applied' + with pytest.raises(DriveGroupValidationError, match=m): + drive_selection.DriveSelection(spec, inventory)
\ No newline at end of file diff --git a/src/python-common/ceph/tests/test_drive_group.py b/src/python-common/ceph/tests/test_drive_group.py new file mode 100644 index 000000000..2fc66b737 --- /dev/null +++ b/src/python-common/ceph/tests/test_drive_group.py @@ -0,0 +1,336 @@ +# flake8: noqa +import re + +import pytest +import yaml + +from ceph.deployment import drive_selection, translate +from ceph.deployment.hostspec import HostSpec, SpecValidationError +from ceph.deployment.inventory import Device +from ceph.deployment.service_spec import PlacementSpec +from ceph.tests.utils import _mk_inventory, _mk_device +from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, \ + DriveGroupValidationError + +@pytest.mark.parametrize("test_input", +[ + ( # new style json + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +data_devices: + paths: + - /dev/sda +""" + ), + ( + """service_type: osd +service_id: testing_drivegroup +placement: + host_pattern: hostname +data_devices: + paths: + - /dev/sda""" + ), +]) +def test_DriveGroup(test_input): + dg = DriveGroupSpec.from_json(yaml.safe_load(test_input)) + assert dg.service_id == 'testing_drivegroup' + assert all([isinstance(x, Device) for x in dg.data_devices.paths]) + assert dg.data_devices.paths[0].path == '/dev/sda' + +@pytest.mark.parametrize("match,test_input", +[ + ( + re.escape('Service Spec is not an (JSON or YAML) object. got "None"'), + '' + ), + ( + 'Failed to validate OSD spec "<unnamed>": `placement` required', + """data_devices: + all: True +""" + ), + ( + 'Failed to validate OSD spec "mydg.data_devices": device selection cannot be empty', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +data_devices: + limit: 1 +""" + ), + ( + 'Failed to validate OSD spec "mydg": filter_logic must be either <AND> or <OR>', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +data_devices: + all: True +filter_logic: XOR +""" + ), + ( + 'Failed to validate OSD spec "mydg": `data_devices` element is required.', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +spec: + db_devices: + model: model +""" + ), + ( + 'Failed to validate OSD spec "mydg.db_devices": Filtering for `unknown_key` is not supported', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +spec: + db_devices: + unknown_key: 1 +""" + ), + ( + 'Failed to validate OSD spec "mydg": Feature `unknown_key` is not supported', """ +service_type: osd +service_id: mydg +placement: + host_pattern: '*' +spec: + db_devices: + all: true + unknown_key: 1 +""" + ), +]) +def test_DriveGroup_fail(match, test_input): + with pytest.raises(SpecValidationError, match=match): + osd_spec = DriveGroupSpec.from_json(yaml.safe_load(test_input)) + osd_spec.validate() + + +def test_drivegroup_pattern(): + dg = DriveGroupSpec( + PlacementSpec(host_pattern='node[1-3]'), + service_id='foobar', + data_devices=DeviceSelection(all=True)) + assert dg.placement.filter_matching_hostspecs([HostSpec('node{}'.format(i)) for i in range(10)]) == ['node1', 'node2', 'node3'] + + +def test_drive_selection(): + devs = DeviceSelection(paths=['/dev/sda']) + spec = DriveGroupSpec( + PlacementSpec('node_name'), + service_id='foobar', + data_devices=devs) + assert all([isinstance(x, Device) for x in spec.data_devices.paths]) + assert spec.data_devices.paths[0].path == '/dev/sda' + + with pytest.raises(DriveGroupValidationError, match='exclusive'): + ds = DeviceSelection(paths=['/dev/sda'], rotational=False) + ds.validate('') + + +def test_ceph_volume_command_0(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True) + ) + spec.validate() + inventory = _mk_inventory(_mk_device()*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_1(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False) + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb ' + '--db-devices /dev/sdc /dev/sdd --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_2(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(size='200GB:350GB', rotational=True), + db_devices=DeviceSelection(size='200GB:350GB', rotational=False), + wal_devices=DeviceSelection(size='10G') + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True, size="300.00 GB")*2 + + _mk_device(rotational=False, size="300.00 GB")*2 + + _mk_device(size="10.0 GB", rotational=False)*2 + ) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb ' + '--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf ' + '--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_3(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(size='200GB:350GB', rotational=True), + db_devices=DeviceSelection(size='200GB:350GB', rotational=False), + wal_devices=DeviceSelection(size='10G'), + encrypted=True + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True, size="300.00 GB")*2 + + _mk_device(rotational=False, size="300.00 GB")*2 + + _mk_device(size="10.0 GB", rotational=False)*2 + ) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb ' + '--db-devices /dev/sdc /dev/sdd ' + '--wal-devices /dev/sde /dev/sdf --dmcrypt ' + '--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_4(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(size='200GB:350GB', rotational=True), + db_devices=DeviceSelection(size='200GB:350GB', rotational=False), + wal_devices=DeviceSelection(size='10G'), + block_db_size='500M', + block_wal_size='500M', + osds_per_device=3, + encrypted=True + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True, size="300.00 GB")*2 + + _mk_device(rotational=False, size="300.00 GB")*2 + + _mk_device(size="10.0 GB", rotational=False)*2 + ) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb ' + '--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf ' + '--block-wal-size 500M --block-db-size 500M --dmcrypt ' + '--osds-per-device 3 --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_5(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + objectstore='filestore' + ) + with pytest.raises(DriveGroupValidationError): + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True)*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_6(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=False), + journal_devices=DeviceSelection(rotational=True), + journal_size='500M', + objectstore='filestore' + ) + with pytest.raises(DriveGroupValidationError): + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == ('lvm batch --no-auto /dev/sdc /dev/sdd ' + '--journal-size 500M --journal-devices /dev/sda /dev/sdb ' + '--filestore --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_7(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True), + osd_id_claims={'host1': ['0', '1']} + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True)*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, ['0', '1']).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --osd-ids 0 1 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_8(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True, model='INTEL SSDS'), + db_devices=DeviceSelection(model='INTEL SSDP'), + filter_logic='OR', + osd_id_claims={} + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True, size='1.82 TB', model='ST2000DM001-1ER1') + # data + _mk_device(rotational=False, size="223.0 GB", model='INTEL SSDSC2KG24') + # data + _mk_device(rotational=False, size="349.0 GB", model='INTEL SSDPED1K375GA') # wal/db + ) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + + +def test_ceph_volume_command_9(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(all=True), + data_allocate_fraction=0.8 + ) + spec.validate() + inventory = _mk_inventory(_mk_device()*2) + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --data-allocate-fraction 0.8 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}' + +def test_raw_ceph_volume_command_0(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False), + method='raw', + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True) + # data + _mk_device(rotational=True) + # data + _mk_device(rotational=False) + # db + _mk_device(rotational=False) # db + ) + exp_cmds = ['raw prepare --bluestore --data /dev/sda --block.db /dev/sdc', 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sdd'] + sel = drive_selection.DriveSelection(spec, inventory) + cmds = translate.to_ceph_volume(sel, []).run() + assert all(cmd in exp_cmds for cmd in cmds), f'Expected {exp_cmds} to match {cmds}' + +def test_raw_ceph_volume_command_1(): + spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'), + service_id='foobar', + data_devices=DeviceSelection(rotational=True), + db_devices=DeviceSelection(rotational=False), + method='raw', + ) + spec.validate() + inventory = _mk_inventory(_mk_device(rotational=True) + # data + _mk_device(rotational=True) + # data + _mk_device(rotational=False) # db + ) + sel = drive_selection.DriveSelection(spec, inventory) + with pytest.raises(ValueError): + cmds = translate.to_ceph_volume(sel, []).run() diff --git a/src/python-common/ceph/tests/test_hostspec.py b/src/python-common/ceph/tests/test_hostspec.py new file mode 100644 index 000000000..b6817579e --- /dev/null +++ b/src/python-common/ceph/tests/test_hostspec.py @@ -0,0 +1,40 @@ +# flake8: noqa +import json +import yaml + +import pytest + +from ceph.deployment.hostspec import HostSpec, SpecValidationError + + +@pytest.mark.parametrize( + "test_input,expected", + [ + ({"hostname": "foo"}, HostSpec('foo')), + ({"hostname": "foo", "labels": "l1"}, HostSpec('foo', labels=['l1'])), + ({"hostname": "foo", "labels": ["l1", "l2"]}, HostSpec('foo', labels=['l1', 'l2'])), + ({"hostname": "foo", "location": {"rack": "foo"}}, HostSpec('foo', location={'rack': 'foo'})), + ] +) +def test_parse_host_specs(test_input, expected): + hs = HostSpec.from_json(test_input) + assert hs == expected + + +@pytest.mark.parametrize( + "bad_input", + [ + ({"hostname": "foo", "labels": 124}), + ({"hostname": "foo", "labels": {"a", "b"}}), + ({"hostname": "foo", "labels": {"a", "b"}}), + ({"hostname": "foo", "labels": ["a", 2]}), + ({"hostname": "foo", "location": "rack=bar"}), + ({"hostname": "foo", "location": ["a"]}), + ({"hostname": "foo", "location": {"rack", 1}}), + ({"hostname": "foo", "location": {1: "rack"}}), + ] +) +def test_parse_host_specs(bad_input): + with pytest.raises(SpecValidationError): + hs = HostSpec.from_json(bad_input) + diff --git a/src/python-common/ceph/tests/test_inventory.py b/src/python-common/ceph/tests/test_inventory.py new file mode 100644 index 000000000..69c1c306c --- /dev/null +++ b/src/python-common/ceph/tests/test_inventory.py @@ -0,0 +1,21 @@ +import json +import os +import pytest + +from ceph.deployment.inventory import Devices + + +@pytest.mark.parametrize("filename", + [ + os.path.dirname(__file__) + '/c-v-inventory.json', + os.path.dirname(__file__) + '/../../../pybind/mgr/test_orchestrator/du' + 'mmy_data.json', + ]) +def test_from_json(filename): + with open(filename) as f: + data = json.load(f) + if 'inventory' in data: + data = data['inventory'] + ds = Devices.from_json(data) + assert len(ds.devices) == len(data) + assert Devices.from_json(ds.to_json()) == ds diff --git a/src/python-common/ceph/tests/test_service_spec.py b/src/python-common/ceph/tests/test_service_spec.py new file mode 100644 index 000000000..d3fb43296 --- /dev/null +++ b/src/python-common/ceph/tests/test_service_spec.py @@ -0,0 +1,866 @@ +# flake8: noqa +import json +import re + +import yaml + +import pytest + +from ceph.deployment.service_spec import HostPlacementSpec, PlacementSpec, \ + ServiceSpec, RGWSpec, NFSServiceSpec, IscsiServiceSpec, AlertManagerSpec, \ + CustomContainerSpec +from ceph.deployment.drive_group import DriveGroupSpec +from ceph.deployment.hostspec import SpecValidationError + + +@pytest.mark.parametrize("test_input,expected, require_network", + [("myhost", ('myhost', '', ''), False), + ("myhost=sname", ('myhost', '', 'sname'), False), + ("myhost:10.1.1.10", ('myhost', '10.1.1.10', ''), True), + ("myhost:10.1.1.10=sname", ('myhost', '10.1.1.10', 'sname'), True), + ("myhost:10.1.1.0/32", ('myhost', '10.1.1.0/32', ''), True), + ("myhost:10.1.1.0/32=sname", ('myhost', '10.1.1.0/32', 'sname'), True), + ("myhost:[v1:10.1.1.10:6789]", ('myhost', '[v1:10.1.1.10:6789]', ''), True), + ("myhost:[v1:10.1.1.10:6789]=sname", ('myhost', '[v1:10.1.1.10:6789]', 'sname'), True), + ("myhost:[v1:10.1.1.10:6789,v2:10.1.1.11:3000]", ('myhost', '[v1:10.1.1.10:6789,v2:10.1.1.11:3000]', ''), True), + ("myhost:[v1:10.1.1.10:6789,v2:10.1.1.11:3000]=sname", ('myhost', '[v1:10.1.1.10:6789,v2:10.1.1.11:3000]', 'sname'), True), + ]) +def test_parse_host_placement_specs(test_input, expected, require_network): + ret = HostPlacementSpec.parse(test_input, require_network=require_network) + assert ret == expected + assert str(ret) == test_input + + ps = PlacementSpec.from_string(test_input) + assert ps.pretty_str() == test_input + assert ps == PlacementSpec.from_string(ps.pretty_str()) + + # Testing the old verbose way of generating json. Don't remove: + assert ret == HostPlacementSpec.from_json({ + 'hostname': ret.hostname, + 'network': ret.network, + 'name': ret.name + }) + + assert ret == HostPlacementSpec.from_json(ret.to_json()) + + + + +@pytest.mark.parametrize( + "test_input,expected", + [ + ('', "PlacementSpec()"), + ("count:2", "PlacementSpec(count=2)"), + ("3", "PlacementSpec(count=3)"), + ("host1 host2", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"), + ("host1;host2", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"), + ("host1,host2", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"), + ("host1 host2=b", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='b')])"), + ("host1=a host2=b", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name='a'), HostPlacementSpec(hostname='host2', network='', name='b')])"), + ("host1:1.2.3.4=a host2:1.2.3.5=b", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='1.2.3.4', name='a'), HostPlacementSpec(hostname='host2', network='1.2.3.5', name='b')])"), + ("myhost:[v1:10.1.1.10:6789]", "PlacementSpec(hosts=[HostPlacementSpec(hostname='myhost', network='[v1:10.1.1.10:6789]', name='')])"), + ('2 host1 host2', "PlacementSpec(count=2, hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"), + ('label:foo', "PlacementSpec(label='foo')"), + ('3 label:foo', "PlacementSpec(count=3, label='foo')"), + ('*', "PlacementSpec(host_pattern='*')"), + ('3 data[1-3]', "PlacementSpec(count=3, host_pattern='data[1-3]')"), + ('3 data?', "PlacementSpec(count=3, host_pattern='data?')"), + ('3 data*', "PlacementSpec(count=3, host_pattern='data*')"), + ("count-per-host:4 label:foo", "PlacementSpec(count_per_host=4, label='foo')"), + ]) +def test_parse_placement_specs(test_input, expected): + ret = PlacementSpec.from_string(test_input) + assert str(ret) == expected + assert PlacementSpec.from_string(ret.pretty_str()) == ret, f'"{ret.pretty_str()}" != "{test_input}"' + +@pytest.mark.parametrize( + "test_input", + [ + ("host=a host*"), + ("host=a label:wrong"), + ("host? host*"), + ('host=a count-per-host:0'), + ('host=a count-per-host:-10'), + ('count:2 count-per-host:1'), + ('host1=a host2=b count-per-host:2'), + ('host1:10/8 count-per-host:2'), + ('count-per-host:2'), + ] +) +def test_parse_placement_specs_raises(test_input): + with pytest.raises(SpecValidationError): + PlacementSpec.from_string(test_input) + +@pytest.mark.parametrize("test_input", + # wrong subnet + [("myhost:1.1.1.1/24"), + # wrong ip format + ("myhost:1"), + ]) +def test_parse_host_placement_specs_raises_wrong_format(test_input): + with pytest.raises(ValueError): + HostPlacementSpec.parse(test_input) + + +@pytest.mark.parametrize( + "p,hosts,size", + [ + ( + PlacementSpec(count=3), + ['host1', 'host2', 'host3', 'host4', 'host5'], + 3 + ), + ( + PlacementSpec(host_pattern='*'), + ['host1', 'host2', 'host3', 'host4', 'host5'], + 5 + ), + ( + PlacementSpec(count_per_host=2, host_pattern='*'), + ['host1', 'host2', 'host3', 'host4', 'host5'], + 10 + ), + ( + PlacementSpec(host_pattern='foo*'), + ['foo1', 'foo2', 'bar1', 'bar2'], + 2 + ), + ( + PlacementSpec(count_per_host=2, host_pattern='foo*'), + ['foo1', 'foo2', 'bar1', 'bar2'], + 4 + ), + ]) +def test_placement_target_size(p, hosts, size): + assert p.get_target_count( + [HostPlacementSpec(n, '', '') for n in hosts] + ) == size + + +def _get_dict_spec(s_type, s_id): + dict_spec = { + "service_id": s_id, + "service_type": s_type, + "placement": + dict(hosts=["host1:1.1.1.1"]) + } + if s_type == 'nfs': + pass + elif s_type == 'iscsi': + dict_spec['pool'] = 'pool' + dict_spec['api_user'] = 'api_user' + dict_spec['api_password'] = 'api_password' + elif s_type == 'osd': + dict_spec['spec'] = { + 'data_devices': { + 'all': True + } + } + elif s_type == 'rgw': + dict_spec['rgw_realm'] = 'realm' + dict_spec['rgw_zone'] = 'zone' + + return dict_spec + + +@pytest.mark.parametrize( + "s_type,o_spec,s_id", + [ + ("mgr", ServiceSpec, 'test'), + ("mon", ServiceSpec, 'test'), + ("mds", ServiceSpec, 'test'), + ("rgw", RGWSpec, 'realm.zone'), + ("nfs", NFSServiceSpec, 'test'), + ("iscsi", IscsiServiceSpec, 'test'), + ("osd", DriveGroupSpec, 'test'), + ]) +def test_servicespec_map_test(s_type, o_spec, s_id): + spec = ServiceSpec.from_json(_get_dict_spec(s_type, s_id)) + assert isinstance(spec, o_spec) + assert isinstance(spec.placement, PlacementSpec) + assert isinstance(spec.placement.hosts[0], HostPlacementSpec) + assert spec.placement.hosts[0].hostname == 'host1' + assert spec.placement.hosts[0].network == '1.1.1.1' + assert spec.placement.hosts[0].name == '' + assert spec.validate() is None + ServiceSpec.from_json(spec.to_json()) + +def test_osd_unmanaged(): + osd_spec = {"placement": {"host_pattern": "*"}, + "service_id": "all-available-devices", + "service_name": "osd.all-available-devices", + "service_type": "osd", + "spec": {"data_devices": {"all": True}, "filter_logic": "AND", "objectstore": "bluestore"}, + "unmanaged": True} + + dg_spec = ServiceSpec.from_json(osd_spec) + assert dg_spec.unmanaged == True + + +@pytest.mark.parametrize("y", +"""service_type: crash +service_name: crash +placement: + host_pattern: '*' +--- +service_type: crash +service_name: crash +placement: + host_pattern: '*' +unmanaged: true +--- +service_type: rgw +service_id: default-rgw-realm.eu-central-1.1 +service_name: rgw.default-rgw-realm.eu-central-1.1 +placement: + hosts: + - ceph-001 +networks: +- 10.0.0.0/8 +- 192.168.0.0/16 +spec: + rgw_frontend_type: civetweb + rgw_realm: default-rgw-realm + rgw_zone: eu-central-1 +--- +service_type: osd +service_id: osd_spec_default +service_name: osd.osd_spec_default +placement: + host_pattern: '*' +spec: + data_devices: + model: MC-55-44-XZ + db_devices: + model: SSD-123-foo + filter_logic: AND + objectstore: bluestore + wal_devices: + model: NVME-QQQQ-987 +--- +service_type: alertmanager +service_name: alertmanager +spec: + port: 1234 + user_data: + default_webhook_urls: + - foo +--- +service_type: grafana +service_name: grafana +spec: + port: 1234 +--- +service_type: grafana +service_name: grafana +spec: + initial_admin_password: secure + port: 1234 +--- +service_type: ingress +service_id: rgw.foo +service_name: ingress.rgw.foo +placement: + hosts: + - host1 + - host2 + - host3 +spec: + backend_service: rgw.foo + frontend_port: 8080 + monitor_port: 8081 + virtual_ip: 192.168.20.1/24 +--- +service_type: nfs +service_id: mynfs +service_name: nfs.mynfs +spec: + port: 1234 +--- +service_type: iscsi +service_id: iscsi +service_name: iscsi.iscsi +networks: +- ::0/8 +spec: + api_user: api_user + pool: pool + trusted_ip_list: + - ::1 + - ::2 +--- +service_type: container +service_id: hello-world +service_name: container.hello-world +spec: + args: + - --foo + bind_mounts: + - - type=bind + - source=lib/modules + - destination=/lib/modules + - ro=true + dirs: + - foo + - bar + entrypoint: /usr/bin/bash + envs: + - FOO=0815 + files: + bar.conf: + - foo + - bar + foo.conf: 'foo + + bar' + gid: 2000 + image: docker.io/library/hello-world:latest + ports: + - 8080 + - 8443 + uid: 1000 + volume_mounts: + foo: /foo +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_community: public + snmp_destination: 192.168.1.42:162 + snmp_version: V2c +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + auth_protocol: MD5 + credentials: + snmp_v3_auth_password: mypassword + snmp_v3_auth_username: myuser + engine_id: 8000C53F00000000 + port: 9464 + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_password: mypassword + snmp_v3_auth_username: myuser + snmp_v3_priv_password: mysecret + engine_id: 8000C53F00000000 + privacy_protocol: AES + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""".split('---\n')) +def test_yaml(y): + data = yaml.safe_load(y) + object = ServiceSpec.from_json(data) + + assert yaml.dump(object) == y + assert yaml.dump(ServiceSpec.from_json(object.to_json())) == y + + +def test_alertmanager_spec_1(): + spec = AlertManagerSpec() + assert spec.service_type == 'alertmanager' + assert isinstance(spec.user_data, dict) + assert len(spec.user_data.keys()) == 0 + assert spec.get_port_start() == [9093, 9094] + + +def test_alertmanager_spec_2(): + spec = AlertManagerSpec(user_data={'default_webhook_urls': ['foo']}) + assert isinstance(spec.user_data, dict) + assert 'default_webhook_urls' in spec.user_data.keys() + + + +def test_repr(): + val = """ServiceSpec.from_json(yaml.safe_load('''service_type: crash +service_name: crash +placement: + count: 42 +'''))""" + obj = eval(val) + assert obj.service_type == 'crash' + assert val == repr(obj) + +@pytest.mark.parametrize("spec1, spec2, eq", + [ + ( + ServiceSpec( + service_type='mon' + ), + ServiceSpec( + service_type='mon' + ), + True + ), + ( + ServiceSpec( + service_type='mon' + ), + ServiceSpec( + service_type='mon', + service_id='foo' + ), + True + ), + # Add service_type='mgr' + ( + ServiceSpec( + service_type='osd' + ), + ServiceSpec( + service_type='osd', + ), + True + ), + ( + ServiceSpec( + service_type='osd' + ), + DriveGroupSpec(), + True + ), + ( + ServiceSpec( + service_type='osd' + ), + ServiceSpec( + service_type='osd', + service_id='foo', + ), + False + ), + ( + ServiceSpec( + service_type='rgw', + service_id='foo', + ), + RGWSpec(service_id='foo'), + True + ), + ]) +def test_spec_hash_eq(spec1: ServiceSpec, + spec2: ServiceSpec, + eq: bool): + + assert (spec1 == spec2) is eq + +@pytest.mark.parametrize( + "s_type,s_id,s_name", + [ + ('mgr', 's_id', 'mgr'), + ('mon', 's_id', 'mon'), + ('mds', 's_id', 'mds.s_id'), + ('rgw', 's_id', 'rgw.s_id'), + ('nfs', 's_id', 'nfs.s_id'), + ('iscsi', 's_id', 'iscsi.s_id'), + ('osd', 's_id', 'osd.s_id'), + ]) +def test_service_name(s_type, s_id, s_name): + spec = ServiceSpec.from_json(_get_dict_spec(s_type, s_id)) + spec.validate() + assert spec.service_name() == s_name + +@pytest.mark.parametrize( + 's_type,s_id', + [ + ('mds', 's:id'), # MDS service_id cannot contain an invalid char ':' + ('mds', '1abc'), # MDS service_id cannot start with a numeric digit + ('mds', ''), # MDS service_id cannot be empty + ('rgw', '*s_id'), + ('nfs', 's/id'), + ('iscsi', 's@id'), + ('osd', 's;id'), + ]) + +def test_service_id_raises_invalid_char(s_type, s_id): + with pytest.raises(SpecValidationError): + spec = ServiceSpec.from_json(_get_dict_spec(s_type, s_id)) + spec.validate() + +def test_custom_container_spec(): + spec = CustomContainerSpec(service_id='hello-world', + image='docker.io/library/hello-world:latest', + entrypoint='/usr/bin/bash', + uid=1000, + gid=2000, + volume_mounts={'foo': '/foo'}, + args=['--foo'], + envs=['FOO=0815'], + bind_mounts=[ + [ + 'type=bind', + 'source=lib/modules', + 'destination=/lib/modules', + 'ro=true' + ] + ], + ports=[8080, 8443], + dirs=['foo', 'bar'], + files={ + 'foo.conf': 'foo\nbar', + 'bar.conf': ['foo', 'bar'] + }) + assert spec.service_type == 'container' + assert spec.entrypoint == '/usr/bin/bash' + assert spec.uid == 1000 + assert spec.gid == 2000 + assert spec.volume_mounts == {'foo': '/foo'} + assert spec.args == ['--foo'] + assert spec.envs == ['FOO=0815'] + assert spec.bind_mounts == [ + [ + 'type=bind', + 'source=lib/modules', + 'destination=/lib/modules', + 'ro=true' + ] + ] + assert spec.ports == [8080, 8443] + assert spec.dirs == ['foo', 'bar'] + assert spec.files == { + 'foo.conf': 'foo\nbar', + 'bar.conf': ['foo', 'bar'] + } + + +def test_custom_container_spec_config_json(): + spec = CustomContainerSpec(service_id='foo', image='foo', dirs=None) + config_json = spec.config_json() + for key in ['entrypoint', 'uid', 'gid', 'bind_mounts', 'dirs']: + assert key not in config_json + + +def test_ingress_spec(): + yaml_str = """service_type: ingress +service_id: rgw.foo +placement: + hosts: + - host1 + - host2 + - host3 +spec: + virtual_ip: 192.168.20.1/24 + backend_service: rgw.foo + frontend_port: 8080 + monitor_port: 8081 +""" + yaml_file = yaml.safe_load(yaml_str) + spec = ServiceSpec.from_json(yaml_file) + assert spec.service_type == "ingress" + assert spec.service_id == "rgw.foo" + assert spec.virtual_ip == "192.168.20.1/24" + assert spec.frontend_port == 8080 + assert spec.monitor_port == 8081 + + +@pytest.mark.parametrize("y, error_match", [ + (""" +service_type: rgw +service_id: foo +placement: + count_per_host: "twelve" +""", "count-per-host must be a numeric value",), + (""" +service_type: rgw +service_id: foo +placement: + count_per_host: "2" +""", "count-per-host must be an integer value",), + (""" +service_type: rgw +service_id: foo +placement: + count_per_host: 7.36 +""", "count-per-host must be an integer value",), + (""" +service_type: rgw +service_id: foo +placement: + count: "fifteen" +""", "num/count must be a numeric value",), + (""" +service_type: rgw +service_id: foo +placement: + count: "4" +""", "num/count must be an integer value",), + (""" +service_type: rgw +service_id: foo +placement: + count: 7.36 +""", "num/count must be an integer value",), + (""" +service_type: rgw +service_id: foo +placement: + count: 0 +""", "num/count must be >= 1",), + (""" +service_type: rgw +service_id: foo +placement: + count_per_host: 0 +""", "count-per-host must be >= 1",), + (""" +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_password: mypassword + snmp_v3_auth_username: myuser + snmp_v3_priv_password: mysecret + port: 9464 + engine_id: 8000c53f0000000000 + privacy_protocol: WEIRD + snmp_destination: 192.168.122.1:162 + auth_protocol: BIZARRE + snmp_version: V3 +""", "auth_protocol unsupported. Must be one of MD5, SHA"), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_community: public + snmp_destination: 192.168.1.42:162 + snmp_version: V4 +""", 'snmp_version unsupported. Must be one of V2c, V3'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_community: public + port: 9464 + snmp_destination: 192.168.1.42:162 +""", re.escape('Missing SNMP version (snmp_version)')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + port: 9464 + auth_protocol: wah + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""", 'auth_protocol unsupported. Must be one of MD5, SHA'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: weewah + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""", 'privacy_protocol unsupported. Must be one of DES, AES'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""", 'Must provide an engine_id for SNMP V3 notifications'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_community: public + port: 9464 + snmp_destination: 192.168.1.42 + snmp_version: V2c +""", re.escape('SNMP destination (snmp_destination) type (IPv4) is invalid. Must be either: IPv4:Port, Name:Port')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: bogus + snmp_destination: 192.168.1.42:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + port: 9464 + auth_protocol: SHA + engine_id: 8000C53F0000000000 + snmp_version: V3 +""", re.escape('SNMP destination (snmp_destination) must be provided')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53F0000000000 + snmp_destination: my.imaginary.snmp-host + snmp_version: V3 +""", re.escape('SNMP destination (snmp_destination) is invalid: DNS lookup failed')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53F0000000000 + snmp_destination: 10.79.32.10:fred + snmp_version: V3 +""", re.escape('SNMP destination (snmp_destination) is invalid: Port must be numeric')), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53 + snmp_destination: 10.79.32.10:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53DOH! + snmp_destination: 10.79.32.10:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53FCA7344403DC611EC9B985254002537A6C53FCA7344403DC6112537A60 + snmp_destination: 10.79.32.10:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + (""" +--- +service_type: snmp-gateway +service_name: snmp-gateway +placement: + count: 1 +spec: + credentials: + snmp_v3_auth_username: myuser + snmp_v3_auth_password: mypassword + snmp_v3_priv_password: mysecret + port: 9464 + auth_protocol: SHA + privacy_protocol: AES + engine_id: 8000C53F00000 + snmp_destination: 10.79.32.10:162 + snmp_version: V3 +""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'), + ]) +def test_service_spec_validation_error(y, error_match): + data = yaml.safe_load(y) + with pytest.raises(SpecValidationError) as err: + specObj = ServiceSpec.from_json(data) + assert err.match(error_match) diff --git a/src/python-common/ceph/tests/test_utils.py b/src/python-common/ceph/tests/test_utils.py new file mode 100644 index 000000000..8a94ac400 --- /dev/null +++ b/src/python-common/ceph/tests/test_utils.py @@ -0,0 +1,75 @@ +import pytest + +from ceph.deployment.utils import is_ipv6, unwrap_ipv6, wrap_ipv6, valid_addr +from typing import NamedTuple + + +def test_is_ipv6(): + for good in ("[::1]", "::1", + "fff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"): + assert is_ipv6(good) + for bad in ("127.0.0.1", + "ffff:ffff:ffff:ffff:ffff:ffff:ffff:fffg", + "1:2:3:4:5:6:7:8:9", "fd00::1::1", "[fg::1]"): + assert not is_ipv6(bad) + + +def test_unwrap_ipv6(): + def unwrap_test(address, expected): + assert unwrap_ipv6(address) == expected + + tests = [ + ('::1', '::1'), ('[::1]', '::1'), + ('[fde4:8dba:82e1:0:5054:ff:fe6a:357]', 'fde4:8dba:82e1:0:5054:ff:fe6a:357'), + ('can actually be any string', 'can actually be any string'), + ('[but needs to be stripped] ', '[but needs to be stripped] ')] + for address, expected in tests: + unwrap_test(address, expected) + + +def test_wrap_ipv6(): + def wrap_test(address, expected): + assert wrap_ipv6(address) == expected + + tests = [ + ('::1', '[::1]'), ('[::1]', '[::1]'), + ('fde4:8dba:82e1:0:5054:ff:fe6a:357', '[fde4:8dba:82e1:0:5054:ff:fe6a:357]'), + ('myhost.example.com', 'myhost.example.com'), ('192.168.0.1', '192.168.0.1'), + ('', ''), ('fd00::1::1', 'fd00::1::1')] + for address, expected in tests: + wrap_test(address, expected) + + +class Address(NamedTuple): + addr: str + status: bool + description: str + + +@pytest.mark.parametrize('addr_object', [ + Address('www.ibm.com', True, 'Name'), + Address('www.google.com:162', True, 'Name:Port'), + Address('my.big.domain.name.for.big.people', False, 'DNS lookup failed'), + Address('192.168.122.1', True, 'IPv4'), + Address('[192.168.122.1]', False, 'IPv4 address wrapped in brackets is invalid'), + Address('10.40003.200', False, 'Invalid partial IPv4 address'), + Address('10.7.5', False, 'Invalid partial IPv4 address'), + Address('10.7', False, 'Invalid partial IPv4 address'), + Address('192.168.122.5:7090', True, 'IPv4:Port'), + Address('fe80::7561:c8fb:d3d7:1fa4', True, 'IPv6'), + Address('[fe80::7561:c8fb:d3d7:1fa4]:9464', True, 'IPv6:Port'), + Address('[fe80::7561:c8fb:d3d7:1fa4]', True, 'IPv6'), + Address('[fe80::7561:c8fb:d3d7:1fa4', False, + 'Address has incorrect/incomplete use of enclosing brackets'), + Address('fe80::7561:c8fb:d3d7:1fa4]', False, + 'Address has incorrect/incomplete use of enclosing brackets'), + Address('fred.knockinson.org', False, 'DNS lookup failed'), + Address('tumbleweed.pickles.gov.au', False, 'DNS lookup failed'), + Address('192.168.122.5:00PS', False, 'Port must be numeric'), + Address('[fe80::7561:c8fb:d3d7:1fa4]:DOH', False, 'Port must be numeric') +]) +def test_valid_addr(addr_object: Address): + + valid, description = valid_addr(addr_object.addr) + assert valid == addr_object.status + assert description == addr_object.description diff --git a/src/python-common/ceph/tests/utils.py b/src/python-common/ceph/tests/utils.py new file mode 100644 index 000000000..04b8a4e38 --- /dev/null +++ b/src/python-common/ceph/tests/utils.py @@ -0,0 +1,46 @@ +from ceph.deployment.inventory import Devices, Device + +try: + from typing import Any, List +except ImportError: + pass # for type checking + + +def _mk_device(rotational=True, + locked=False, + size="394.27 GB", + vendor='Vendor', + model='Model'): + return [Device( + path='??', + sys_api={ + "rotational": '1' if rotational else '0', + "vendor": vendor, + "human_readable_size": size, + "partitions": {}, + "locked": int(locked), + "sectorsize": "512", + "removable": "0", + "path": "??", + "support_discard": "", + "model": model, + "ro": "0", + "nr_requests": "128", + "size": 423347879936 # ignore coversion from human_readable_size + }, + available=not locked, + rejected_reasons=['locked'] if locked else [], + lvs=[], + device_id="Model-Vendor-foobar" + )] + + +def _mk_inventory(devices): + # type: (Any) -> List[Device] + devs = [] + for dev_, name in zip(devices, map(chr, range(ord('a'), ord('z')))): + dev = Device.from_json(dev_.to_json()) + dev.path = '/dev/sd' + name + dev.sys_api = dict(dev_.sys_api, path='/dev/sd' + name) + devs.append(dev) + return Devices(devices=devs).devices diff --git a/src/python-common/ceph/utils.py b/src/python-common/ceph/utils.py new file mode 100644 index 000000000..643be0658 --- /dev/null +++ b/src/python-common/ceph/utils.py @@ -0,0 +1,123 @@ +import datetime +import re +import string + +from typing import Optional + + +def datetime_now() -> datetime.datetime: + """ + Return the current local date and time. + :return: Returns an aware datetime object of the current date + and time. + """ + return datetime.datetime.now(tz=datetime.timezone.utc) + + +def datetime_to_str(dt: datetime.datetime) -> str: + """ + Convert a datetime object into a ISO 8601 string, e.g. + '2019-04-24T17:06:53.039991Z'. + :param dt: The datetime object to process. + :return: Return a string representing the date in + ISO 8601 (timezone=UTC). + """ + return dt.astimezone(tz=datetime.timezone.utc).strftime( + '%Y-%m-%dT%H:%M:%S.%fZ') + + +def str_to_datetime(string: str) -> datetime.datetime: + """ + Convert an ISO 8601 string into a datetime object. + The following formats are supported: + + - 2020-03-03T09:21:43.636153304Z + - 2020-03-03T15:52:30.136257504-0600 + - 2020-03-03T15:52:30.136257504 + + :param string: The string to parse. + :return: Returns an aware datetime object of the given date + and time string. + :raises: :exc:`~exceptions.ValueError` for an unknown + datetime string. + """ + fmts = [ + '%Y-%m-%dT%H:%M:%S.%f', + '%Y-%m-%dT%H:%M:%S.%f%z' + ] + + # In *all* cases, the 9 digit second precision is too much for + # Python's strptime. Shorten it to 6 digits. + p = re.compile(r'(\.[\d]{6})[\d]*') + string = p.sub(r'\1', string) + + # Replace trailing Z with -0000, since (on Python 3.6.8) it + # won't parse. + if string and string[-1] == 'Z': + string = string[:-1] + '-0000' + + for fmt in fmts: + try: + dt = datetime.datetime.strptime(string, fmt) + # Make sure the datetime object is aware (timezone is set). + # If not, then assume the time is in UTC. + if dt.tzinfo is None: + dt = dt.replace(tzinfo=datetime.timezone.utc) + return dt + except ValueError: + pass + + raise ValueError("Time data {} does not match one of the formats {}".format( + string, str(fmts))) + + +def parse_timedelta(delta: str) -> Optional[datetime.timedelta]: + """ + Returns a timedelta object represents a duration, the difference + between two dates or times. + + >>> parse_timedelta('foo') + + >>> parse_timedelta('2d') == datetime.timedelta(days=2) + True + + >>> parse_timedelta("4w") == datetime.timedelta(days=28) + True + + >>> parse_timedelta("5s") == datetime.timedelta(seconds=5) + True + + >>> parse_timedelta("-5s") == datetime.timedelta(days=-1, seconds=86395) + True + + :param delta: The string to process, e.g. '2h', '10d', '30s'. + :return: The `datetime.timedelta` object or `None` in case of + a parsing error. + """ + parts = re.match(r'(?P<seconds>-?\d+)s|' + r'(?P<minutes>-?\d+)m|' + r'(?P<hours>-?\d+)h|' + r'(?P<days>-?\d+)d|' + r'(?P<weeks>-?\d+)w$', + delta, + re.IGNORECASE) + if not parts: + return None + parts = parts.groupdict() + args = {name: int(param) for name, param in parts.items() if param} + return datetime.timedelta(**args) + + +def is_hex(s: str, strict: bool = True) -> bool: + """Simple check that a string contains only hex chars""" + try: + int(s, 16) + except ValueError: + return False + + # s is multiple chars, but we should catch a '+/-' prefix too. + if strict: + if s[0] not in string.hexdigits: + return False + + return True diff --git a/src/python-common/requirements-lint.txt b/src/python-common/requirements-lint.txt new file mode 100644 index 000000000..2a7142182 --- /dev/null +++ b/src/python-common/requirements-lint.txt @@ -0,0 +1,2 @@ +flake8==3.7.8 +rstcheck==3.3.1 diff --git a/src/python-common/requirements.txt b/src/python-common/requirements.txt new file mode 100644 index 000000000..432fcd5e3 --- /dev/null +++ b/src/python-common/requirements.txt @@ -0,0 +1,7 @@ +pytest >=2.1.3,<5; python_version < '3.5' +mock; python_version < '3.3' +mypy==0.790; python_version >= '3' +pytest-mypy; python_version >= '3' +pytest >= 2.1.3; python_version >= '3' +pyyaml +typing-extensions; python_version < '3.8' diff --git a/src/python-common/setup.py b/src/python-common/setup.py new file mode 100644 index 000000000..43a46eb10 --- /dev/null +++ b/src/python-common/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup, find_packages + + +with open("README.rst", "r") as fh: + long_description = fh.read() + + +setup( + name='ceph', + version='1.0.0', + packages=find_packages(), + author='', + author_email='dev@ceph.io', + description='Ceph common library', + long_description=long_description, + license='LGPLv2+', + keywords='ceph', + url="https://github.com/ceph/ceph", + zip_safe = False, + install_requires=( + 'pyyaml', + ), + classifiers = [ + 'Intended Audience :: Developer', + 'Operating System :: POSIX :: Linux', + 'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + ] +) diff --git a/src/python-common/tox.ini b/src/python-common/tox.ini new file mode 100644 index 000000000..5869b6ae2 --- /dev/null +++ b/src/python-common/tox.ini @@ -0,0 +1,27 @@ +[tox] +envlist = py3, lint +skip_missing_interpreters = true + +[testenv:py3] +deps= + -rrequirements.txt +commands= + pytest --doctest-modules ceph/deployment/service_spec.py ceph/utils.py + pytest {posargs} + mypy --config-file=../mypy.ini -p ceph + +[tool:pytest] +norecursedirs = .* _* virtualenv + +[flake8] +max-line-length = 100 +exclude = + __pycache__ + +[testenv:lint] +deps = + -rrequirements-lint.txt +commands = + flake8 {posargs:ceph} + rstcheck --report info --debug README.rst + |