From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/python-common/.gitignore | 3 + src/python-common/CMakeLists.txt | 7 + src/python-common/README.rst | 22 + src/python-common/ceph/__init__.py | 0 src/python-common/ceph/deployment/__init__.py | 0 src/python-common/ceph/deployment/drive_group.py | 348 +++++ .../ceph/deployment/drive_selection/__init__.py | 2 + .../ceph/deployment/drive_selection/example.yaml | 21 + .../ceph/deployment/drive_selection/filter.py | 34 + .../ceph/deployment/drive_selection/matchers.py | 412 ++++++ .../ceph/deployment/drive_selection/selector.py | 179 +++ src/python-common/ceph/deployment/hostspec.py | 135 ++ src/python-common/ceph/deployment/inventory.py | 132 ++ src/python-common/ceph/deployment/service_spec.py | 1384 ++++++++++++++++++++ src/python-common/ceph/deployment/translate.py | 119 ++ src/python-common/ceph/deployment/utils.py | 102 ++ src/python-common/ceph/py.typed | 1 + src/python-common/ceph/tests/__init__.py | 0 src/python-common/ceph/tests/c-v-inventory.json | 155 +++ src/python-common/ceph/tests/factories.py | 101 ++ src/python-common/ceph/tests/test_datetime.py | 61 + src/python-common/ceph/tests/test_disk_selector.py | 560 ++++++++ src/python-common/ceph/tests/test_drive_group.py | 336 +++++ src/python-common/ceph/tests/test_hostspec.py | 40 + src/python-common/ceph/tests/test_inventory.py | 21 + src/python-common/ceph/tests/test_service_spec.py | 866 ++++++++++++ src/python-common/ceph/tests/test_utils.py | 75 ++ src/python-common/ceph/tests/utils.py | 46 + src/python-common/ceph/utils.py | 123 ++ src/python-common/requirements-lint.txt | 2 + src/python-common/requirements.txt | 7 + src/python-common/setup.py | 32 + src/python-common/tox.ini | 27 + 33 files changed, 5353 insertions(+) create mode 100644 src/python-common/.gitignore create mode 100644 src/python-common/CMakeLists.txt create mode 100644 src/python-common/README.rst create mode 100644 src/python-common/ceph/__init__.py create mode 100644 src/python-common/ceph/deployment/__init__.py create mode 100644 src/python-common/ceph/deployment/drive_group.py create mode 100644 src/python-common/ceph/deployment/drive_selection/__init__.py create mode 100644 src/python-common/ceph/deployment/drive_selection/example.yaml create mode 100644 src/python-common/ceph/deployment/drive_selection/filter.py create mode 100644 src/python-common/ceph/deployment/drive_selection/matchers.py create mode 100644 src/python-common/ceph/deployment/drive_selection/selector.py create mode 100644 src/python-common/ceph/deployment/hostspec.py create mode 100644 src/python-common/ceph/deployment/inventory.py create mode 100644 src/python-common/ceph/deployment/service_spec.py create mode 100644 src/python-common/ceph/deployment/translate.py create mode 100644 src/python-common/ceph/deployment/utils.py create mode 100644 src/python-common/ceph/py.typed create mode 100644 src/python-common/ceph/tests/__init__.py create mode 100644 src/python-common/ceph/tests/c-v-inventory.json create mode 100644 src/python-common/ceph/tests/factories.py create mode 100644 src/python-common/ceph/tests/test_datetime.py create mode 100644 src/python-common/ceph/tests/test_disk_selector.py create mode 100644 src/python-common/ceph/tests/test_drive_group.py create mode 100644 src/python-common/ceph/tests/test_hostspec.py create mode 100644 
src/python-common/ceph/tests/test_inventory.py create mode 100644 src/python-common/ceph/tests/test_service_spec.py create mode 100644 src/python-common/ceph/tests/test_utils.py create mode 100644 src/python-common/ceph/tests/utils.py create mode 100644 src/python-common/ceph/utils.py create mode 100644 src/python-common/requirements-lint.txt create mode 100644 src/python-common/requirements.txt create mode 100644 src/python-common/setup.py create mode 100644 src/python-common/tox.ini diff --git a/src/python-common/.gitignore b/src/python-common/.gitignore new file mode 100644 index 000000000..c2de8195a --- /dev/null +++ b/src/python-common/.gitignore @@ -0,0 +1,3 @@ +ceph.egg-info +build +setup.cfg diff --git a/src/python-common/CMakeLists.txt b/src/python-common/CMakeLists.txt new file mode 100644 index 000000000..e89bbe2fe --- /dev/null +++ b/src/python-common/CMakeLists.txt @@ -0,0 +1,7 @@ +include(Distutils) +distutils_install_module(ceph) + +if(WITH_TESTS) + include(AddCephTest) + add_tox_test(python-common TOX_ENVS py3 lint) +endif() diff --git a/src/python-common/README.rst b/src/python-common/README.rst new file mode 100644 index 000000000..3900ec4f9 --- /dev/null +++ b/src/python-common/README.rst @@ -0,0 +1,22 @@ +ceph-python-common +================== + +This library is meant to be used to keep common data structures and +functions usable throughout the Ceph project. + +Like for example: + +- All different Cython bindings. +- MGR modules. +- ``ceph`` command line interface and other Ceph tools. +- Also external tools. + +Usage +===== + +From within the Ceph git, just import it: + +.. code:: python + + from ceph.deployment_utils import DriveGroupSpec + from ceph.exceptions import OSError diff --git a/src/python-common/ceph/__init__.py b/src/python-common/ceph/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/python-common/ceph/deployment/__init__.py b/src/python-common/ceph/deployment/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/python-common/ceph/deployment/drive_group.py b/src/python-common/ceph/deployment/drive_group.py new file mode 100644 index 000000000..dc36a60d9 --- /dev/null +++ b/src/python-common/ceph/deployment/drive_group.py @@ -0,0 +1,348 @@ +import enum +import yaml + +from ceph.deployment.inventory import Device +from ceph.deployment.service_spec import ServiceSpec, PlacementSpec +from ceph.deployment.hostspec import SpecValidationError + +try: + from typing import Optional, List, Dict, Any, Union +except ImportError: + pass + + +class OSDMethod(str, enum.Enum): + raw = 'raw' + lvm = 'lvm' + + +class DeviceSelection(object): + """ + Used within :class:`ceph.deployment.drive_group.DriveGroupSpec` to specify the devices + used by the Drive Group. + + Any attributes (even none) can be included in the device + specification structure. + """ + + _supported_filters = [ + "paths", "size", "vendor", "model", "rotational", "limit", "all" + ] + + def __init__(self, + paths=None, # type: Optional[List[str]] + model=None, # type: Optional[str] + size=None, # type: Optional[str] + rotational=None, # type: Optional[bool] + limit=None, # type: Optional[int] + vendor=None, # type: Optional[str] + all=False, # type: bool + ): + """ + ephemeral drive group device specification + """ + #: List of Device objects for devices paths. + self.paths = [] if paths is None else [Device(path) for path in paths] # type: List[Device] + + #: A wildcard string.
e.g: "SDD*" or "SanDisk SD8SN8U5" + self.model = model + + #: Match on the VENDOR property of the drive + self.vendor = vendor + + #: Size specification of format LOW:HIGH. + #: Can also take the the form :HIGH, LOW: + #: or an exact value (as ceph-volume inventory reports) + self.size: Optional[str] = size + + #: is the drive rotating or not + self.rotational = rotational + + #: Limit the number of devices added to this Drive Group. Devices + #: are used from top to bottom in the output of ``ceph-volume inventory`` + self.limit = limit + + #: Matches all devices. Can only be used for data devices + self.all = all + + def validate(self, name: str) -> None: + props = [self.model, self.vendor, self.size, self.rotational] # type: List[Any] + if self.paths and any(p is not None for p in props): + raise DriveGroupValidationError( + name, + 'device selection: `paths` and other parameters are mutually exclusive') + is_empty = not any(p is not None and p != [] for p in [self.paths] + props) + if not self.all and is_empty: + raise DriveGroupValidationError(name, 'device selection cannot be empty') + + if self.all and not is_empty: + raise DriveGroupValidationError( + name, + 'device selection: `all` and other parameters are mutually exclusive. {}'.format( + repr(self))) + + @classmethod + def from_json(cls, device_spec): + # type: (dict) -> Optional[DeviceSelection] + if not device_spec: + return None + for applied_filter in list(device_spec.keys()): + if applied_filter not in cls._supported_filters: + raise KeyError(applied_filter) + + return cls(**device_spec) + + def to_json(self): + # type: () -> Dict[str, Any] + ret: Dict[str, Any] = {} + if self.paths: + ret['paths'] = [p.path for p in self.paths] + if self.model: + ret['model'] = self.model + if self.vendor: + ret['vendor'] = self.vendor + if self.size: + ret['size'] = self.size + if self.rotational is not None: + ret['rotational'] = self.rotational + if self.limit: + ret['limit'] = self.limit + if self.all: + ret['all'] = self.all + + return ret + + def __repr__(self) -> str: + keys = [ + key for key in self._supported_filters + ['limit'] if getattr(self, key) is not None + ] + if 'paths' in keys and self.paths == []: + keys.remove('paths') + return "DeviceSelection({})".format( + ', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys) + ) + + def __eq__(self, other: Any) -> bool: + return repr(self) == repr(other) + + +class DriveGroupValidationError(SpecValidationError): + """ + Defining an exception here is a bit problematic, cause you cannot properly catch it, + if it was raised in a different mgr module. + """ + + def __init__(self, name: Optional[str], msg: str): + name = name or "" + super(DriveGroupValidationError, self).__init__( + f'Failed to validate OSD spec "{name}": {msg}') + + +class DriveGroupSpec(ServiceSpec): + """ + Describe a drive group in the same form that ceph-volume + understands. 
+ """ + + _supported_features = [ + "encrypted", "block_wal_size", "osds_per_device", + "db_slots", "wal_slots", "block_db_size", "placement", "service_id", "service_type", + "data_devices", "db_devices", "wal_devices", "journal_devices", + "data_directories", "osds_per_device", "objectstore", "osd_id_claims", + "journal_size", "unmanaged", "filter_logic", "preview_only", "extra_container_args", + "data_allocate_fraction", "method", "crush_device_class", + ] + + def __init__(self, + placement=None, # type: Optional[PlacementSpec] + service_id=None, # type: Optional[str] + data_devices=None, # type: Optional[DeviceSelection] + db_devices=None, # type: Optional[DeviceSelection] + wal_devices=None, # type: Optional[DeviceSelection] + journal_devices=None, # type: Optional[DeviceSelection] + data_directories=None, # type: Optional[List[str]] + osds_per_device=None, # type: Optional[int] + objectstore='bluestore', # type: str + encrypted=False, # type: bool + db_slots=None, # type: Optional[int] + wal_slots=None, # type: Optional[int] + osd_id_claims=None, # type: Optional[Dict[str, List[str]]] + block_db_size=None, # type: Union[int, str, None] + block_wal_size=None, # type: Union[int, str, None] + journal_size=None, # type: Union[int, str, None] + service_type=None, # type: Optional[str] + unmanaged=False, # type: bool + filter_logic='AND', # type: str + preview_only=False, # type: bool + extra_container_args=None, # type: Optional[List[str]] + data_allocate_fraction=None, # type: Optional[float] + method=None, # type: Optional[OSDMethod] + crush_device_class=None, # type: Optional[str] + ): + assert service_type is None or service_type == 'osd' + super(DriveGroupSpec, self).__init__('osd', service_id=service_id, + placement=placement, + unmanaged=unmanaged, + preview_only=preview_only, + extra_container_args=extra_container_args) + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.data_devices = data_devices + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.db_devices = db_devices + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.wal_devices = wal_devices + + #: A :class:`ceph.deployment.drive_group.DeviceSelection` + self.journal_devices = journal_devices + + #: Set (or override) the "bluestore_block_wal_size" value, in bytes + self.block_wal_size: Union[int, str, None] = block_wal_size + + #: Set (or override) the "bluestore_block_db_size" value, in bytes + self.block_db_size: Union[int, str, None] = block_db_size + + #: set journal_size in bytes + self.journal_size: Union[int, str, None] = journal_size + + #: Number of osd daemons per "DATA" device. + #: To fully utilize nvme devices multiple osds are required. + self.osds_per_device = osds_per_device + + #: A list of strings, containing paths which should back OSDs + self.data_directories = data_directories + + #: ``filestore`` or ``bluestore`` + self.objectstore = objectstore + + #: ``true`` or ``false`` + self.encrypted = encrypted + + #: How many OSDs per DB device + self.db_slots = db_slots + + #: How many OSDs per WAL device + self.wal_slots = wal_slots + + #: Optional: mapping of host -> List of osd_ids that should be replaced + #: See :ref:`orchestrator-osd-replace` + self.osd_id_claims = osd_id_claims or dict() + + #: The logic gate we use to match disks with filters. 
+ #: defaults to 'AND' + self.filter_logic = filter_logic.upper() + + #: If this should be treated as a 'preview' spec + self.preview_only = preview_only + + #: Allocate a fraction of the data device (0,1.0] + self.data_allocate_fraction = data_allocate_fraction + + self.method = method + + #: Crush device class to assign to OSDs + self.crush_device_class = crush_device_class + + @classmethod + def _from_json_impl(cls, json_drive_group): + # type: (dict) -> DriveGroupSpec + """ + Initialize 'Drive group' structure + + :param json_drive_group: A valid json string with a Drive Group + specification + """ + args: Dict[str, Any] = json_drive_group.copy() + # legacy json (pre Octopus) + if 'host_pattern' in args and 'placement' not in args: + args['placement'] = {'host_pattern': args['host_pattern']} + del args['host_pattern'] + + s_id = args.get('service_id', '') + + # spec: was not mandatory in octopus + if 'spec' in args: + args['spec'].update(cls._drive_group_spec_from_json(s_id, args['spec'])) + else: + args.update(cls._drive_group_spec_from_json(s_id, args)) + + return super(DriveGroupSpec, cls)._from_json_impl(args) + + @classmethod + def _drive_group_spec_from_json(cls, name: str, json_drive_group: dict) -> dict: + for applied_filter in list(json_drive_group.keys()): + if applied_filter not in cls._supported_features: + raise DriveGroupValidationError( + name, + "Feature `{}` is not supported".format(applied_filter)) + + try: + def to_selection(key: str, vals: dict) -> Optional[DeviceSelection]: + try: + return DeviceSelection.from_json(vals) + except KeyError as e: + raise DriveGroupValidationError( + f'{name}.{key}', + f"Filtering for `{e.args[0]}` is not supported") + + args = {k: (to_selection(k, v) if k.endswith('_devices') else v) for k, v in + json_drive_group.items()} + if not args: + raise DriveGroupValidationError(name, "Didn't find drive selections") + return args + except (KeyError, TypeError) as e: + raise DriveGroupValidationError(name, str(e)) + + def validate(self): + # type: () -> None + super(DriveGroupSpec, self).validate() + + if self.placement.is_empty(): + raise DriveGroupValidationError(self.service_id, '`placement` required') + + if self.data_devices is None: + raise DriveGroupValidationError(self.service_id, "`data_devices` element is required.") + + specs_names = "data_devices db_devices wal_devices journal_devices".split() + specs = dict(zip(specs_names, [getattr(self, k) for k in specs_names])) + for k, s in [ks for ks in specs.items() if ks[1] is not None]: + assert s is not None + s.validate(f'{self.service_id}.{k}') + for s in filter(None, [self.db_devices, self.wal_devices, self.journal_devices]): + if s.all: + raise DriveGroupValidationError( + self.service_id, + "`all` is only allowed for data_devices") + + if self.objectstore not in ('bluestore'): + raise DriveGroupValidationError(self.service_id, + f"{self.objectstore} is not supported. 
Must be " + f"one of ('bluestore')") + + if self.block_wal_size is not None and type(self.block_wal_size) not in [int, str]: + raise DriveGroupValidationError( + self.service_id, + 'block_wal_size must be of type int or string') + if self.block_db_size is not None and type(self.block_db_size) not in [int, str]: + raise DriveGroupValidationError( + self.service_id, + 'block_db_size must be of type int or string') + if self.journal_size is not None and type(self.journal_size) not in [int, str]: + raise DriveGroupValidationError( + self.service_id, + 'journal_size must be of type int or string') + + if self.filter_logic not in ['AND', 'OR']: + raise DriveGroupValidationError( + self.service_id, + 'filter_logic must be either <AND> or <OR>') + + if self.method not in [None, 'lvm', 'raw']: + raise DriveGroupValidationError(self.service_id, 'method must be one of None, lvm, raw') + if self.method == 'raw' and self.objectstore == 'filestore': + raise DriveGroupValidationError(self.service_id, 'method raw only supports bluestore') + + +yaml.add_representer(DriveGroupSpec, DriveGroupSpec.yaml_representer) diff --git a/src/python-common/ceph/deployment/drive_selection/__init__.py b/src/python-common/ceph/deployment/drive_selection/__init__.py new file mode 100644 index 000000000..994e2f2da --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/__init__.py @@ -0,0 +1,2 @@ +from .selector import DriveSelection # NOQA +from .matchers import Matcher, SubstringMatcher, EqualityMatcher, AllMatcher, SizeMatcher # NOQA diff --git a/src/python-common/ceph/deployment/drive_selection/example.yaml b/src/python-common/ceph/deployment/drive_selection/example.yaml new file mode 100644 index 000000000..2851e7dbb --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/example.yaml @@ -0,0 +1,21 @@ +# default: +# target: 'data*' +# data_devices: +# size: 20G +# db_devices: +# size: 10G +# rotational: 1 +# allflash: +# target: 'fast_nodes*' +# data_devices: +# size: 100G +# db_devices: +# size: 50G +# rotational: 0 + +# This is the default configuration and +# will create an OSD on all available drives +default: + target: 'fnmatch_target' + data_devices: + all: true diff --git a/src/python-common/ceph/deployment/drive_selection/filter.py b/src/python-common/ceph/deployment/drive_selection/filter.py new file mode 100644 index 000000000..36cacfaa5 --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/filter.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- + +import logging + +from ceph.deployment.drive_group import DeviceSelection + +try: + from typing import Generator +except ImportError: + pass + +from .matchers import Matcher, SubstringMatcher, AllMatcher, SizeMatcher, EqualityMatcher + +logger = logging.getLogger(__name__) + + +class FilterGenerator(object): + def __init__(self, device_filter): + # type: (DeviceSelection) -> None + self.device_filter = device_filter + + def __iter__(self): + # type: () -> Generator[Matcher, None, None] + if self.device_filter.size: + yield SizeMatcher('size', self.device_filter.size) + if self.device_filter.model: + yield SubstringMatcher('model', self.device_filter.model) + if self.device_filter.vendor: + yield SubstringMatcher('vendor', self.device_filter.vendor) + if self.device_filter.rotational is not None: + val = '1' if self.device_filter.rotational else '0' + yield EqualityMatcher('rotational', val) + if self.device_filter.all: + yield AllMatcher('all', str(self.device_filter.all)) diff --git
a/src/python-common/ceph/deployment/drive_selection/matchers.py b/src/python-common/ceph/deployment/drive_selection/matchers.py new file mode 100644 index 000000000..f423c2f43 --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/matchers.py @@ -0,0 +1,412 @@ +# -*- coding: utf-8 -*- + +from typing import Tuple, Optional, Any, Union, Iterator + +from ceph.deployment.inventory import Device + +import re +import logging + +logger = logging.getLogger(__name__) + + +class _MatchInvalid(Exception): + pass + + +# pylint: disable=too-few-public-methods +class Matcher(object): + """ The base class to all Matchers + + It holds utility methods such as _get_disk_key + and handles the initialization. + + """ + + def __init__(self, key, value): + # type: (str, Any) -> None + """ Initialization of Base class + + :param str key: Attribute like 'model, size or vendor' + :param str value: Value of attribute like 'X123, 5G or samsung' + """ + self.key = key + self.value = value + self.fallback_key = '' # type: Optional[str] + + def _get_disk_key(self, device): + # type: (Device) -> Any + """ Helper method to safely extract values form the disk dict + + There is a 'key' and a _optional_ 'fallback' key that can be used. + The reason for this is that the output of ceph-volume is not always + consistent (due to a bug currently, but you never know). + There is also a safety measure for a disk_key not existing on + virtual environments. ceph-volume apparently sources its information + from udev which seems to not populate certain fields on VMs. + + :raises: A generic Exception when no disk_key could be found. + :return: A disk value + :rtype: str + """ + # using the . notation, but some keys are nested, and hidden behind + # a different hierarchy, which makes it harder to access programatically + # hence, make it a dict. 
+ disk = device.to_json() + + def findkeys(node: Union[list, dict], key_val: str) -> Iterator[str]: + """ Find keys in non-flat dict recursively """ + if isinstance(node, list): + for i in node: + for key in findkeys(i, key_val): + yield key + elif isinstance(node, dict): + if key_val in node: + yield node[key_val] + for j in node.values(): + for key in findkeys(j, key_val): + yield key + + disk_value = list(findkeys(disk, self.key)) + if not disk_value and self.fallback_key: + disk_value = list(findkeys(disk, self.fallback_key)) + + if disk_value: + return disk_value[0] + else: + raise _MatchInvalid("No value found for {} or {}".format( + self.key, self.fallback_key)) + + def compare(self, disk): + # type: (Device) -> bool + """ Implements a valid comparison method for a SubMatcher + This will get overwritten by the individual classes + + :param dict disk: A disk representation + """ + raise NotImplementedError + + +# pylint: disable=too-few-public-methods +class SubstringMatcher(Matcher): + """ Substring matcher subclass + """ + + def __init__(self, key, value, fallback_key=None): + # type: (str, str, Optional[str]) -> None + Matcher.__init__(self, key, value) + self.fallback_key = fallback_key + + def compare(self, disk): + # type: (Device) -> bool + """ Overwritten method to match substrings + + This matcher does substring matching + :param dict disk: A disk representation (see base for examples) + :return: True/False if the match succeeded + :rtype: bool + """ + if not disk: + return False + disk_value = self._get_disk_key(disk) + if str(self.value) in disk_value: + return True + return False + + +# pylint: disable=too-few-public-methods +class AllMatcher(Matcher): + """ All matcher subclass + """ + + def __init__(self, key, value, fallback_key=None): + # type: (str, Any, Optional[str]) -> None + + Matcher.__init__(self, key, value) + self.fallback_key = fallback_key + + def compare(self, disk): + # type: (Device) -> bool + + """ Overwritten method to match all + + A rather dumb matcher that just accepts all disks + (regardless of the value) + :param dict disk: A disk representation (see base for examples) + :return: always True + :rtype: bool + """ + if not disk: + return False + return True + + +# pylint: disable=too-few-public-methods +class EqualityMatcher(Matcher): + """ Equality matcher subclass + """ + + def __init__(self, key, value): + # type: (str, Any) -> None + + Matcher.__init__(self, key, value) + + def compare(self, disk): + # type: (Device) -> bool + + """ Overwritten method to match equality + + This matcher does value comparison + :param dict disk: A disk representation + :return: True/False if the match succeeded + :rtype: bool + """ + if not disk: + return False + disk_value = self._get_disk_key(disk) + ret = disk_value == self.value + if not ret: + logger.debug('{} != {}'.format(disk_value, self.value)) + return ret + + +class SizeMatcher(Matcher): + """ Size matcher subclass + """ + + SUFFIXES = ( + ["KB", "MB", "GB", "TB"], + ["K", "M", "G", "T"], + [1e+3, 1e+6, 1e+9, 1e+12] + ) + + supported_suffixes = SUFFIXES[0] + SUFFIXES[1] + + # pylint: disable=too-many-instance-attributes + def __init__(self, key, value): + # type: (str, str) -> None + + # The 'key' value is overwritten here because + # the user_defined attribute does not neccessarily + # correspond to the desired attribute + # requested from the inventory output + Matcher.__init__(self, key, value) + self.key = "human_readable_size" + self.fallback_key = "size" + self._high = None # type: Optional[str] + 
self._high_suffix = None # type: Optional[str] + self._low = None # type: Optional[str] + self._low_suffix = None # type: Optional[str] + self._exact = None # type: Optional[str] + self._exact_suffix = None # type: Optional[str] + self._parse_filter() + + @property + def low(self): + # type: () -> Tuple[Optional[str], Optional[str]] + """ Getter for 'low' matchers + """ + return self._low, self._low_suffix + + @low.setter + def low(self, low): + # type: (Tuple[str, str]) -> None + """ Setter for 'low' matchers + """ + self._low, self._low_suffix = low + + @property + def high(self): + # type: () -> Tuple[Optional[str], Optional[str]] + """ Getter for 'high' matchers + """ + return self._high, self._high_suffix + + @high.setter + def high(self, high): + # type: (Tuple[str, str]) -> None + """ Setter for 'high' matchers + """ + self._high, self._high_suffix = high + + @property + def exact(self): + # type: () -> Tuple[Optional[str], Optional[str]] + """ Getter for 'exact' matchers + """ + return self._exact, self._exact_suffix + + @exact.setter + def exact(self, exact): + # type: (Tuple[str, str]) -> None + """ Setter for 'exact' matchers + """ + self._exact, self._exact_suffix = exact + + @classmethod + def _normalize_suffix(cls, suffix): + # type: (str) -> str + """ Normalize any supported suffix + + Since the Drive Groups are user facing, we simply + can't make sure that all users type in the requested + form. That's why we have to internally agree on one format. + It also checks if any of the supported suffixes was used + and raises an Exception otherwise. + + :param str suffix: A suffix ('G') or ('M') + :return: A normalized output + :rtype: str + """ + suffix = suffix.upper() + if suffix not in cls.supported_suffixes: + raise _MatchInvalid("Unit '{}' not supported".format(suffix)) + return dict(zip( + cls.SUFFIXES[1], + cls.SUFFIXES[0], + )).get(suffix, suffix) + + @classmethod + def _parse_suffix(cls, obj): + # type: (str) -> str + """ Wrapper method to find and normalize a prefix + + :param str obj: A size filtering string ('10G') + :return: A normalized unit ('GB') + :rtype: str + """ + return cls._normalize_suffix(re.findall(r"[a-zA-Z]+", obj)[0]) + + @classmethod + def _get_k_v(cls, data): + # type: (str) -> Tuple[str, str] + """ Helper method to extract data from a string + + It uses regex to extract all digits and calls _parse_suffix + which also uses a regex to extract all letters and normalizes + the resulting suffix. + + :param str data: A size filtering string ('10G') + :return: A Tuple with normalized output (10, 'GB') + :rtype: tuple + """ + return re.findall(r"\d+\.?\d*", data)[0], cls._parse_suffix(data) + + def _parse_filter(self) -> None: + """ Identifies which type of 'size' filter is applied + + There are four different filtering modes: + + 1) 10G:50G (high-low) + At least 10G but at max 50G of size + + 2) :60G + At max 60G of size + + 3) 50G: + At least 50G of size + + 4) 20G + Exactly 20G in size + + This method uses regex to identify and extract this information + and raises if none could be found. 
+ """ + low_high = re.match(r"\d+[A-Z]{1,2}:\d+[A-Z]{1,2}", self.value) + if low_high: + low, high = low_high.group().split(":") + self.low = self._get_k_v(low) + self.high = self._get_k_v(high) + + low = re.match(r"\d+[A-Z]{1,2}:$", self.value) + if low: + self.low = self._get_k_v(low.group()) + + high = re.match(r"^:\d+[A-Z]{1,2}", self.value) + if high: + self.high = self._get_k_v(high.group()) + + exact = re.match(r"^\d+\.?\d*[A-Z]{1,2}$", self.value) + if exact: + self.exact = self._get_k_v(exact.group()) + + if not self.low and not self.high and not self.exact: + raise _MatchInvalid("Couldn't parse {}".format(self.value)) + + @staticmethod + # pylint: disable=inconsistent-return-statements + def to_byte(tpl): + # type: (Tuple[Optional[str], Optional[str]]) -> float + + """ Convert any supported unit to bytes + + :param tuple tpl: A tuple with ('10', 'GB') + :return: The converted byte value + :rtype: float + """ + val_str, suffix = tpl + value = float(val_str) if val_str is not None else 0.0 + return dict(zip( + SizeMatcher.SUFFIXES[0], + SizeMatcher.SUFFIXES[2], + )).get(str(suffix), 0.00) * value + + @staticmethod + def str_to_byte(input): + # type: (str) -> float + return SizeMatcher.to_byte(SizeMatcher._get_k_v(input)) + + # pylint: disable=inconsistent-return-statements, too-many-return-statements + def compare(self, disk): + # type: (Device) -> bool + """ Convert MB/GB/TB down to bytes and compare + + 1) Extracts information from the to-be-inspected disk. + 2) Depending on the mode, apply checks and return + + # This doesn't seem very solid and _may_ + be re-factored + + + """ + if not disk: + return False + disk_value = self._get_disk_key(disk) + # This doesn't neccessarily have to be a float. + # The current output from ceph-volume gives a float.. + # This may change in the future.. + # todo: harden this paragraph + if not disk_value: + logger.warning("Could not retrieve value for disk") + return False + + disk_size = re.findall(r"\d+\.\d+", disk_value)[0] + disk_suffix = self._parse_suffix(disk_value) + disk_size_in_byte = self.to_byte((disk_size, disk_suffix)) + + if all(self.high) and all(self.low): + if disk_size_in_byte <= self.to_byte( + self.high) and disk_size_in_byte >= self.to_byte(self.low): + return True + # is a else: return False neccessary here? 
+ # (and in all other branches) + logger.debug("Disk didn't match for 'high/low' filter") + + elif all(self.low) and not all(self.high): + if disk_size_in_byte >= self.to_byte(self.low): + return True + logger.debug("Disk didn't match for 'low' filter") + + elif all(self.high) and not all(self.low): + if disk_size_in_byte <= self.to_byte(self.high): + return True + logger.debug("Disk didn't match for 'high' filter") + + elif all(self.exact): + if disk_size_in_byte == self.to_byte(self.exact): + return True + logger.debug("Disk didn't match for 'exact' filter") + else: + logger.debug("Neither high, low, nor exact was given") + raise _MatchInvalid("No filters applied") + return False diff --git a/src/python-common/ceph/deployment/drive_selection/selector.py b/src/python-common/ceph/deployment/drive_selection/selector.py new file mode 100644 index 000000000..07b0549f3 --- /dev/null +++ b/src/python-common/ceph/deployment/drive_selection/selector.py @@ -0,0 +1,179 @@ +import logging + +from typing import List, Optional, Dict, Callable + +from ..inventory import Device +from ..drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError + +from .filter import FilterGenerator +from .matchers import _MatchInvalid + +logger = logging.getLogger(__name__) + + +def to_dg_exception(f: Callable) -> Callable[['DriveSelection', str, + Optional['DeviceSelection']], + List['Device']]: + def wrapper(self: 'DriveSelection', name: str, ds: Optional['DeviceSelection']) -> List[Device]: + try: + return f(self, ds) + except _MatchInvalid as e: + raise DriveGroupValidationError(f'{self.spec.service_id}.{name}', e.args[0]) + return wrapper + + +class DriveSelection(object): + def __init__(self, + spec, # type: DriveGroupSpec + disks, # type: List[Device] + existing_daemons=None, # type: Optional[int] + ): + self.disks = disks.copy() + self.spec = spec + self.existing_daemons = existing_daemons or 0 + + self._data = self.assign_devices('data_devices', self.spec.data_devices) + self._wal = self.assign_devices('wal_devices', self.spec.wal_devices) + self._db = self.assign_devices('db_devices', self.spec.db_devices) + self._journal = self.assign_devices('journal_devices', self.spec.journal_devices) + + def data_devices(self): + # type: () -> List[Device] + return self._data + + def wal_devices(self): + # type: () -> List[Device] + return self._wal + + def db_devices(self): + # type: () -> List[Device] + return self._db + + def journal_devices(self): + # type: () -> List[Device] + return self._journal + + def _limit_reached(self, device_filter, len_devices, + disk_path): + # type: (DeviceSelection, int, str) -> bool + """ Check for the property and apply logic + + If a limit is set in 'device_attrs' we have to stop adding + disks at some point. 
+ + If limit is set (>0) and len(devices) >= limit + + :param int len_devices: Length of the already populated device set/list + :param str disk_path: The disk identifier (for logging purposes) + :return: True/False if the device should be added to the list of devices + :rtype: bool + """ + limit = device_filter.limit or 0 + + if limit > 0 and (len_devices + self.existing_daemons >= limit): + logger.info("Refuse to add {} due to limit policy of <{}>".format( + disk_path, limit)) + return True + return False + + @staticmethod + def _has_mandatory_idents(disk): + # type: (Device) -> bool + """ Check for mandatory identification fields + """ + if disk.path: + logger.debug("Found matching disk: {}".format(disk.path)) + return True + else: + raise Exception( + "Disk {} doesn't have a 'path' identifier".format(disk)) + + @to_dg_exception + def assign_devices(self, device_filter): + # type: (Optional[DeviceSelection]) -> List[Device] + """ Assign drives based on used filters + + Do not add disks when: + + 1) Filter didn't match + 2) Disk doesn't have a mandatory identification item (path) + 3) The set :limit was reached + + After the disk was added we make sure not to re-assign this disk + for another defined type[wal/db/journal devices] + + return a sorted(by path) list of devices + """ + + if not device_filter: + logger.debug('device_filter is None') + return [] + + if not self.spec.data_devices: + logger.debug('data_devices is None') + return [] + + if device_filter.paths: + logger.debug('device filter is using explicit paths') + return device_filter.paths + + devices = list() # type: List[Device] + for disk in self.disks: + logger.debug("Processing disk {}".format(disk.path)) + + if not disk.available and not disk.ceph_device: + logger.debug( + ("Ignoring disk {}. " + "Disk is unavailable due to {}".format(disk.path, disk.rejected_reasons)) + ) + continue + + if not self._has_mandatory_idents(disk): + logger.debug( + "Ignoring disk {}. Missing mandatory idents".format( + disk.path)) + continue + + # break on this condition. + if self._limit_reached(device_filter, len(devices), disk.path): + logger.debug("Ignoring disk {}. Limit reached".format( + disk.path)) + break + + if disk in devices: + continue + + if self.spec.filter_logic == 'AND': + if not all(m.compare(disk) for m in FilterGenerator(device_filter)): + logger.debug( + "Ignoring disk {}. Not all filter did match the disk".format( + disk.path)) + continue + + if self.spec.filter_logic == 'OR': + if not any(m.compare(disk) for m in FilterGenerator(device_filter)): + logger.debug( + "Ignoring disk {}. No filter matched the disk".format( + disk.path)) + continue + + logger.debug('Adding disk {}'.format(disk.path)) + devices.append(disk) + + # This disk is already taken and must not be re-assigned. 
+ for taken_device in devices: + if taken_device in self.disks: + self.disks.remove(taken_device) + + return sorted([x for x in devices], key=lambda dev: dev.path) + + def __repr__(self) -> str: + selection: Dict[str, List[str]] = { + 'data devices': [d.path for d in self._data], + 'wal_devices': [d.path for d in self._wal], + 'db devices': [d.path for d in self._db], + 'journal devices': [d.path for d in self._journal] + } + return "DeviceSelection({})".format( + ', '.join('{}={}'.format(key, selection[key]) for key in selection.keys()) + ) diff --git a/src/python-common/ceph/deployment/hostspec.py b/src/python-common/ceph/deployment/hostspec.py new file mode 100644 index 000000000..1bf686f97 --- /dev/null +++ b/src/python-common/ceph/deployment/hostspec.py @@ -0,0 +1,135 @@ +from collections import OrderedDict +import errno +import re +from typing import Optional, List, Any, Dict + + +def assert_valid_host(name: str) -> None: + p = re.compile('^[a-zA-Z0-9-]+$') + try: + assert len(name) <= 250, 'name is too long (max 250 chars)' + for part in name.split('.'): + assert len(part) > 0, '.-delimited name component must not be empty' + assert len(part) <= 63, '.-delimited name component must not be more than 63 chars' + assert p.match(part), 'name component must include only a-z, 0-9, and -' + except AssertionError as e: + raise SpecValidationError(str(e)) + + +class SpecValidationError(Exception): + """ + Defining an exception here is a bit problematic, cause you cannot properly catch it, + if it was raised in a different mgr module. + """ + def __init__(self, + msg: str, + errno: int = -errno.EINVAL): + super(SpecValidationError, self).__init__(msg) + self.errno = errno + + +class HostSpec(object): + """ + Information about hosts. Like e.g. ``kubectl get nodes`` + """ + def __init__(self, + hostname: str, + addr: Optional[str] = None, + labels: Optional[List[str]] = None, + status: Optional[str] = None, + location: Optional[Dict[str, str]] = None, + ): + self.service_type = 'host' + + #: the bare hostname on the host. Not the FQDN. 
+ self.hostname = hostname # type: str + + #: DNS name or IP address to reach it + self.addr = addr or hostname # type: str + + #: label(s), if any + self.labels = labels or [] # type: List[str] + + #: human readable status + self.status = status or '' # type: str + + self.location = location + + def validate(self) -> None: + assert_valid_host(self.hostname) + + def to_json(self) -> Dict[str, Any]: + r: Dict[str, Any] = { + 'hostname': self.hostname, + 'addr': self.addr, + 'labels': list(OrderedDict.fromkeys((self.labels))), + 'status': self.status, + } + if self.location: + r['location'] = self.location + return r + + @classmethod + def from_json(cls, host_spec: dict) -> 'HostSpec': + host_spec = cls.normalize_json(host_spec) + _cls = cls( + host_spec['hostname'], + host_spec['addr'] if 'addr' in host_spec else None, + list(OrderedDict.fromkeys( + host_spec['labels'])) if 'labels' in host_spec else None, + host_spec['status'] if 'status' in host_spec else None, + host_spec.get('location'), + ) + return _cls + + @staticmethod + def normalize_json(host_spec: dict) -> dict: + labels = host_spec.get('labels') + if labels is not None: + if isinstance(labels, str): + host_spec['labels'] = [labels] + elif ( + not isinstance(labels, list) + or any(not isinstance(v, str) for v in labels) + ): + raise SpecValidationError( + f'Labels ({labels}) must be a string or list of strings' + ) + + loc = host_spec.get('location') + if loc is not None: + if ( + not isinstance(loc, dict) + or any(not isinstance(k, str) for k in loc.keys()) + or any(not isinstance(v, str) for v in loc.values()) + ): + raise SpecValidationError( + f'Location ({loc}) must be a dictionary of strings to strings' + ) + + return host_spec + + def __repr__(self) -> str: + args = [self.hostname] # type: List[Any] + if self.addr is not None: + args.append(self.addr) + if self.labels: + args.append(self.labels) + if self.status: + args.append(self.status) + if self.location: + args.append(self.location) + + return "HostSpec({})".format(', '.join(map(repr, args))) + + def __str__(self) -> str: + if self.hostname != self.addr: + return f'{self.hostname} ({self.addr})' + return self.hostname + + def __eq__(self, other: Any) -> bool: + # Let's omit `status` for the moment, as it is still the very same host. 
+ return self.hostname == other.hostname and \ + self.addr == other.addr and \ + sorted(self.labels) == sorted(other.labels) and \ + self.location == other.location diff --git a/src/python-common/ceph/deployment/inventory.py b/src/python-common/ceph/deployment/inventory.py new file mode 100644 index 000000000..570c1dbb3 --- /dev/null +++ b/src/python-common/ceph/deployment/inventory.py @@ -0,0 +1,132 @@ +try: + from typing import List, Optional, Dict, Any, Union +except ImportError: + pass # for type checking + +from ceph.utils import datetime_now, datetime_to_str, str_to_datetime +import datetime +import json + + +class Devices(object): + """ + A container for Device instances with reporting + """ + + def __init__(self, devices): + # type: (List[Device]) -> None + self.devices = devices # type: List[Device] + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, Devices): + return NotImplemented + if len(self.devices) != len(other.devices): + return False + for d1, d2 in zip(other.devices, self.devices): + if d1 != d2: + return False + return True + + def to_json(self): + # type: () -> List[dict] + return [d.to_json() for d in self.devices] + + @classmethod + def from_json(cls, input): + # type: (List[Dict[str, Any]]) -> Devices + return cls([Device.from_json(i) for i in input]) + + def copy(self): + # type: () -> Devices + return Devices(devices=list(self.devices)) + + +class Device(object): + report_fields = [ + 'ceph_device', + 'rejected_reasons', + 'available', + 'path', + 'sys_api', + 'created', + 'lvs', + 'human_readable_type', + 'device_id', + 'lsm_data', + ] + + def __init__(self, + path, # type: str + sys_api=None, # type: Optional[Dict[str, Any]] + available=None, # type: Optional[bool] + rejected_reasons=None, # type: Optional[List[str]] + lvs=None, # type: Optional[List[str]] + device_id=None, # type: Optional[str] + lsm_data=None, # type: Optional[Dict[str, Dict[str, str]]] + created=None, # type: Optional[datetime.datetime] + ceph_device=None # type: Optional[bool] + ): + self.path = path + self.sys_api = sys_api if sys_api is not None else {} # type: Dict[str, Any] + self.available = available + self.rejected_reasons = rejected_reasons if rejected_reasons is not None else [] + self.lvs = lvs + self.device_id = device_id + self.lsm_data = lsm_data if lsm_data is not None else {} # type: Dict[str, Dict[str, str]] + self.created = created if created is not None else datetime_now() + self.ceph_device = ceph_device + + def __eq__(self, other): + # type: (Any) -> bool + if not isinstance(other, Device): + return NotImplemented + diff = [k for k in self.report_fields if k != 'created' and (getattr(self, k) + != getattr(other, k))] + return not diff + + def to_json(self): + # type: () -> dict + return { + k: (getattr(self, k) if k != 'created' + or not isinstance(getattr(self, k), datetime.datetime) + else datetime_to_str(getattr(self, k))) + for k in self.report_fields + } + + @classmethod + def from_json(cls, input): + # type: (Dict[str, Any]) -> Device + if not isinstance(input, dict): + raise ValueError('Device: Expected dict. 
Got `{}...`'.format(json.dumps(input)[:10])) + ret = cls( + **{ + key: (input.get(key, None) if key != 'created' + or not input.get(key, None) + else str_to_datetime(input.get(key, None))) + for key in Device.report_fields + if key != 'human_readable_type' + } + ) + if ret.rejected_reasons: + ret.rejected_reasons = sorted(ret.rejected_reasons) + return ret + + @property + def human_readable_type(self): + # type: () -> str + if self.sys_api is None or 'rotational' not in self.sys_api: + return "unknown" + return 'hdd' if self.sys_api["rotational"] == "1" else 'ssd' + + def __repr__(self) -> str: + device_desc: Dict[str, Union[str, List[str]]] = { + 'path': self.path if self.path is not None else 'unknown', + 'lvs': self.lvs if self.lvs else 'None', + 'available': str(self.available), + 'ceph_device': str(self.ceph_device) + } + if not self.available and self.rejected_reasons: + device_desc['rejection reasons'] = self.rejected_reasons + return "Device({})".format( + ', '.join('{}={}'.format(key, device_desc[key]) for key in device_desc.keys()) + ) diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py new file mode 100644 index 000000000..17130ea9a --- /dev/null +++ b/src/python-common/ceph/deployment/service_spec.py @@ -0,0 +1,1384 @@ +import fnmatch +import re +import enum +from collections import OrderedDict +from contextlib import contextmanager +from functools import wraps +from ipaddress import ip_network, ip_address +from typing import Optional, Dict, Any, List, Union, Callable, Iterable, Type, TypeVar, cast, \ + NamedTuple, Mapping, Iterator + +import yaml + +from ceph.deployment.hostspec import HostSpec, SpecValidationError, assert_valid_host +from ceph.deployment.utils import unwrap_ipv6, valid_addr +from ceph.utils import is_hex + +ServiceSpecT = TypeVar('ServiceSpecT', bound='ServiceSpec') +FuncT = TypeVar('FuncT', bound=Callable) + + +def handle_type_error(method: FuncT) -> FuncT: + @wraps(method) + def inner(cls: Any, *args: Any, **kwargs: Any) -> Any: + try: + return method(cls, *args, **kwargs) + except (TypeError, AttributeError) as e: + error_msg = '{}: {}'.format(cls.__name__, e) + raise SpecValidationError(error_msg) + return cast(FuncT, inner) + + +class HostPlacementSpec(NamedTuple): + hostname: str + network: str + name: str + + def __str__(self) -> str: + res = '' + res += self.hostname + if self.network: + res += ':' + self.network + if self.name: + res += '=' + self.name + return res + + @classmethod + @handle_type_error + def from_json(cls, data: Union[dict, str]) -> 'HostPlacementSpec': + if isinstance(data, str): + return cls.parse(data) + return cls(**data) + + def to_json(self) -> str: + return str(self) + + @classmethod + def parse(cls, host, require_network=True): + # type: (str, bool) -> HostPlacementSpec + """ + Split host into host, network, and (optional) daemon name parts. The network + part can be an IP, CIDR, or ceph addrvec like '[v2:1.2.3.4:3300,v1:1.2.3.4:6789]'. 
+ e.g., + "myhost" + "myhost=name" + "myhost:1.2.3.4" + "myhost:1.2.3.4=name" + "myhost:1.2.3.0/24" + "myhost:1.2.3.0/24=name" + "myhost:[v2:1.2.3.4:3000]=name" + "myhost:[v2:1.2.3.4:3000,v1:1.2.3.4:6789]=name" + """ + # Matches from start to : or = or until end of string + host_re = r'^(.*?)(:|=|$)' + # Matches from : to = or until end of string + ip_re = r':(.*?)(=|$)' + # Matches from = to end of string + name_re = r'=(.*?)$' + + # assign defaults + host_spec = cls('', '', '') + + match_host = re.search(host_re, host) + if match_host: + host_spec = host_spec._replace(hostname=match_host.group(1)) + + name_match = re.search(name_re, host) + if name_match: + host_spec = host_spec._replace(name=name_match.group(1)) + + ip_match = re.search(ip_re, host) + if ip_match: + host_spec = host_spec._replace(network=ip_match.group(1)) + + if not require_network: + return host_spec + + networks = list() # type: List[str] + network = host_spec.network + # in case we have [v2:1.2.3.4:3000,v1:1.2.3.4:6478] + if ',' in network: + networks = [x for x in network.split(',')] + else: + if network != '': + networks.append(network) + + for network in networks: + # only if we have versioned network configs + if network.startswith('v') or network.startswith('[v'): + # if this is ipv6 we can't just simply split on ':' so do + # a split once and rsplit once to leave us with just ipv6 addr + network = network.split(':', 1)[1] + network = network.rsplit(':', 1)[0] + try: + # if subnets are defined, also verify the validity + if '/' in network: + ip_network(network) + else: + ip_address(unwrap_ipv6(network)) + except ValueError as e: + # logging? + raise e + host_spec.validate() + return host_spec + + def validate(self) -> None: + assert_valid_host(self.hostname) + + +class PlacementSpec(object): + """ + For APIs that need to specify a host subset + """ + + def __init__(self, + label=None, # type: Optional[str] + hosts=None, # type: Union[List[str],List[HostPlacementSpec], None] + count=None, # type: Optional[int] + count_per_host=None, # type: Optional[int] + host_pattern=None, # type: Optional[str] + ): + # type: (...) -> None + self.label = label + self.hosts = [] # type: List[HostPlacementSpec] + + if hosts: + self.set_hosts(hosts) + + self.count = count # type: Optional[int] + self.count_per_host = count_per_host # type: Optional[int] + + #: fnmatch patterns to select hosts. Can also be a single host. + self.host_pattern = host_pattern # type: Optional[str] + + self.validate() + + def is_empty(self) -> bool: + return ( + self.label is None + and not self.hosts + and not self.host_pattern + and self.count is None + and self.count_per_host is None + ) + + def __eq__(self, other: Any) -> bool: + if isinstance(other, PlacementSpec): + return self.label == other.label \ + and self.hosts == other.hosts \ + and self.count == other.count \ + and self.host_pattern == other.host_pattern \ + and self.count_per_host == other.count_per_host + return NotImplemented + + def set_hosts(self, hosts: Union[List[str], List[HostPlacementSpec]]) -> None: + # To backpopulate the .hosts attribute when using labels or count + # in the orchestrator backend. 
+ if all([isinstance(host, HostPlacementSpec) for host in hosts]): + self.hosts = hosts # type: ignore + else: + self.hosts = [HostPlacementSpec.parse(x, require_network=False) # type: ignore + for x in hosts if x] + + # deprecated + def filter_matching_hosts(self, _get_hosts_func: Callable) -> List[str]: + return self.filter_matching_hostspecs(_get_hosts_func(as_hostspec=True)) + + def filter_matching_hostspecs(self, hostspecs: Iterable[HostSpec]) -> List[str]: + if self.hosts: + all_hosts = [hs.hostname for hs in hostspecs] + return [h.hostname for h in self.hosts if h.hostname in all_hosts] + if self.label: + return [hs.hostname for hs in hostspecs if self.label in hs.labels] + all_hosts = [hs.hostname for hs in hostspecs] + if self.host_pattern: + return fnmatch.filter(all_hosts, self.host_pattern) + return all_hosts + + def get_target_count(self, hostspecs: Iterable[HostSpec]) -> int: + if self.count: + return self.count + return len(self.filter_matching_hostspecs(hostspecs)) * (self.count_per_host or 1) + + def pretty_str(self) -> str: + """ + >>> #doctest: +SKIP + ... ps = PlacementSpec(...) # For all placement specs: + ... PlacementSpec.from_string(ps.pretty_str()) == ps + """ + kv = [] + if self.hosts: + kv.append(';'.join([str(h) for h in self.hosts])) + if self.count: + kv.append('count:%d' % self.count) + if self.count_per_host: + kv.append('count-per-host:%d' % self.count_per_host) + if self.label: + kv.append('label:%s' % self.label) + if self.host_pattern: + kv.append(self.host_pattern) + return ';'.join(kv) + + def __repr__(self) -> str: + kv = [] + if self.count: + kv.append('count=%d' % self.count) + if self.count_per_host: + kv.append('count_per_host=%d' % self.count_per_host) + if self.label: + kv.append('label=%s' % repr(self.label)) + if self.hosts: + kv.append('hosts={!r}'.format(self.hosts)) + if self.host_pattern: + kv.append('host_pattern={!r}'.format(self.host_pattern)) + return "PlacementSpec(%s)" % ', '.join(kv) + + @classmethod + @handle_type_error + def from_json(cls, data: dict) -> 'PlacementSpec': + c = data.copy() + hosts = c.get('hosts', []) + if hosts: + c['hosts'] = [] + for host in hosts: + c['hosts'].append(HostPlacementSpec.from_json(host)) + _cls = cls(**c) + _cls.validate() + return _cls + + def to_json(self) -> dict: + r: Dict[str, Any] = {} + if self.label: + r['label'] = self.label + if self.hosts: + r['hosts'] = [host.to_json() for host in self.hosts] + if self.count: + r['count'] = self.count + if self.count_per_host: + r['count_per_host'] = self.count_per_host + if self.host_pattern: + r['host_pattern'] = self.host_pattern + return r + + def validate(self) -> None: + if self.hosts and self.label: + # TODO: a less generic Exception + raise SpecValidationError('Host and label are mutually exclusive') + if self.count is not None: + try: + intval = int(self.count) + except (ValueError, TypeError): + raise SpecValidationError("num/count must be a numeric value") + if self.count != intval: + raise SpecValidationError("num/count must be an integer value") + if self.count < 1: + raise SpecValidationError("num/count must be >= 1") + if self.count_per_host is not None: + try: + intval = int(self.count_per_host) + except (ValueError, TypeError): + raise SpecValidationError("count-per-host must be a numeric value") + if self.count_per_host != intval: + raise SpecValidationError("count-per-host must be an integer value") + if self.count_per_host < 1: + raise SpecValidationError("count-per-host must be >= 1") + if self.count_per_host is not None and not ( 
+ self.label + or self.hosts + or self.host_pattern + ): + raise SpecValidationError( + "count-per-host must be combined with label or hosts or host_pattern" + ) + if self.count is not None and self.count_per_host is not None: + raise SpecValidationError("cannot combine count and count-per-host") + if ( + self.count_per_host is not None + and self.hosts + and any([hs.network or hs.name for hs in self.hosts]) + ): + raise SpecValidationError( + "count-per-host cannot be combined explicit placement with names or networks" + ) + if self.host_pattern: + if not isinstance(self.host_pattern, str): + raise SpecValidationError('host_pattern must be of type string') + if self.hosts: + raise SpecValidationError('cannot combine host patterns and hosts') + + for h in self.hosts: + h.validate() + + @classmethod + def from_string(cls, arg): + # type: (Optional[str]) -> PlacementSpec + """ + A single integer is parsed as a count: + + >>> PlacementSpec.from_string('3') + PlacementSpec(count=3) + + A list of names is parsed as host specifications: + + >>> PlacementSpec.from_string('host1 host2') + PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacemen\ +tSpec(hostname='host2', network='', name='')]) + + You can also prefix the hosts with a count as follows: + + >>> PlacementSpec.from_string('2 host1 host2') + PlacementSpec(count=2, hosts=[HostPlacementSpec(hostname='host1', network='', name=''), Hos\ +tPlacementSpec(hostname='host2', network='', name='')]) + + You can specify labels using `label: