summaryrefslogtreecommitdiffstats
path: root/src/python-common
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/python-common/.gitignore3
-rw-r--r--src/python-common/CMakeLists.txt7
-rw-r--r--src/python-common/README.rst22
-rw-r--r--src/python-common/ceph/__init__.py0
-rw-r--r--src/python-common/ceph/deployment/__init__.py0
-rw-r--r--src/python-common/ceph/deployment/drive_group.py385
-rw-r--r--src/python-common/ceph/deployment/drive_selection/__init__.py2
-rw-r--r--src/python-common/ceph/deployment/drive_selection/example.yaml21
-rw-r--r--src/python-common/ceph/deployment/drive_selection/filter.py36
-rw-r--r--src/python-common/ceph/deployment/drive_selection/matchers.py412
-rw-r--r--src/python-common/ceph/deployment/drive_selection/selector.py191
-rw-r--r--src/python-common/ceph/deployment/hostspec.py137
-rw-r--r--src/python-common/ceph/deployment/inventory.py138
-rw-r--r--src/python-common/ceph/deployment/service_spec.py2011
-rw-r--r--src/python-common/ceph/deployment/translate.py198
-rw-r--r--src/python-common/ceph/deployment/utils.py102
-rw-r--r--src/python-common/ceph/py.typed1
-rw-r--r--src/python-common/ceph/rgw/__init__.py3
-rw-r--r--src/python-common/ceph/rgw/diff.py93
-rw-r--r--src/python-common/ceph/rgw/rgwam_core.py937
-rw-r--r--src/python-common/ceph/rgw/types.py186
-rw-r--r--src/python-common/ceph/tests/__init__.py0
-rw-r--r--src/python-common/ceph/tests/c-v-inventory.json155
-rw-r--r--src/python-common/ceph/tests/factories.py101
-rw-r--r--src/python-common/ceph/tests/test_datetime.py61
-rw-r--r--src/python-common/ceph/tests/test_disk_selector.py560
-rw-r--r--src/python-common/ceph/tests/test_drive_group.py592
-rw-r--r--src/python-common/ceph/tests/test_hostspec.py40
-rw-r--r--src/python-common/ceph/tests/test_inventory.py71
-rw-r--r--src/python-common/ceph/tests/test_service_spec.py1270
-rw-r--r--src/python-common/ceph/tests/test_utils.py75
-rw-r--r--src/python-common/ceph/tests/utils.py46
-rw-r--r--src/python-common/ceph/utils.py123
-rw-r--r--src/python-common/requirements-lint.txt2
-rw-r--r--src/python-common/requirements.txt8
-rw-r--r--src/python-common/setup.py32
-rw-r--r--src/python-common/tox.ini35
37 files changed, 8056 insertions, 0 deletions
diff --git a/src/python-common/.gitignore b/src/python-common/.gitignore
new file mode 100644
index 000000000..c2de8195a
--- /dev/null
+++ b/src/python-common/.gitignore
@@ -0,0 +1,3 @@
+ceph.egg-info
+build
+setup.cfg
diff --git a/src/python-common/CMakeLists.txt b/src/python-common/CMakeLists.txt
new file mode 100644
index 000000000..e89bbe2fe
--- /dev/null
+++ b/src/python-common/CMakeLists.txt
@@ -0,0 +1,7 @@
+include(Distutils)
+distutils_install_module(ceph)
+
+if(WITH_TESTS)
+ include(AddCephTest)
+ add_tox_test(python-common TOX_ENVS py3 lint)
+endif()
diff --git a/src/python-common/README.rst b/src/python-common/README.rst
new file mode 100644
index 000000000..3900ec4f9
--- /dev/null
+++ b/src/python-common/README.rst
@@ -0,0 +1,22 @@
+ceph-python-common
+==================
+
+This library is meant to be used to keep common data structures and
+functions usable throughout the Ceph project.
+
+Like for example:
+
+- All different Cython bindings.
+- MGR modules.
+- ``ceph`` command line interface and other Ceph tools.
+- Also external tools.
+
+Usage
+=====
+
+From within the Ceph git, just import it:
+
+.. code:: python
+
+ from ceph.deployment_utils import DriveGroupSpec
+ from ceph.exceptions import OSError
diff --git a/src/python-common/ceph/__init__.py b/src/python-common/ceph/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/python-common/ceph/__init__.py
diff --git a/src/python-common/ceph/deployment/__init__.py b/src/python-common/ceph/deployment/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/python-common/ceph/deployment/__init__.py
diff --git a/src/python-common/ceph/deployment/drive_group.py b/src/python-common/ceph/deployment/drive_group.py
new file mode 100644
index 000000000..cf24fc0ef
--- /dev/null
+++ b/src/python-common/ceph/deployment/drive_group.py
@@ -0,0 +1,385 @@
+import enum
+import yaml
+
+from ceph.deployment.inventory import Device
+from ceph.deployment.service_spec import (
+ CustomConfig,
+ GeneralArgList,
+ PlacementSpec,
+ ServiceSpec,
+)
+from ceph.deployment.hostspec import SpecValidationError
+
+try:
+ from typing import Optional, List, Dict, Any, Union
+except ImportError:
+ pass
+
+
+class OSDMethod(str, enum.Enum):
+ raw = 'raw'
+ lvm = 'lvm'
+
+ def to_json(self) -> str:
+ return self.value
+
+
+class DeviceSelection(object):
+ """
+ Used within :class:`ceph.deployment.drive_group.DriveGroupSpec` to specify the devices
+ used by the Drive Group.
+
+ Any attributes (even none) can be included in the device
+ specification structure.
+ """
+
+ _supported_filters = [
+ "actuators", "paths", "size", "vendor", "model", "rotational", "limit", "all"
+ ]
+
+ def __init__(self,
+ actuators=None, # type: Optional[int]
+ paths=None, # type: Optional[List[Dict[str, str]]]
+ model=None, # type: Optional[str]
+ size=None, # type: Optional[str]
+ rotational=None, # type: Optional[bool]
+ limit=None, # type: Optional[int]
+ vendor=None, # type: Optional[str]
+ all=False, # type: bool
+ ):
+ """
+ ephemeral drive group device specification
+ """
+ self.actuators = actuators
+
+ #: List of Device objects for devices paths.
+
+ self.paths = []
+
+ if paths is not None:
+ for device in paths:
+ if isinstance(device, dict):
+ path: str = device.get("path", '')
+ self.paths.append(Device(path, crush_device_class=device.get("crush_device_class", None))) # noqa E501
+ else:
+ self.paths.append(Device(str(device)))
+
+ #: A wildcard string. e.g: "SDD*" or "SanDisk SD8SN8U5"
+ self.model = model
+
+ #: Match on the VENDOR property of the drive
+ self.vendor = vendor
+
+ #: Size specification of format LOW:HIGH.
+ #: Can also take the form :HIGH, LOW:
+ #: or an exact value (as ceph-volume inventory reports)
+ self.size: Optional[str] = size
+
+ #: is the drive rotating or not
+ self.rotational = rotational
+
+ #: Limit the number of devices added to this Drive Group. Devices
+ #: are used from top to bottom in the output of ``ceph-volume inventory``
+ self.limit = limit
+
+ #: Matches all devices. Can only be used for data devices
+ self.all = all
+
+ def validate(self, name: str) -> None:
+ props = [self.actuators, self.model, self.vendor, self.size,
+ self.rotational] # type: List[Any]
+ if self.paths and any(p is not None for p in props):
+ raise DriveGroupValidationError(
+ name,
+ 'device selection: `paths` and other parameters are mutually exclusive')
+ is_empty = not any(p is not None and p != [] for p in [self.paths] + props)
+ if not self.all and is_empty:
+ raise DriveGroupValidationError(name, 'device selection cannot be empty')
+
+ if self.all and not is_empty:
+ raise DriveGroupValidationError(
+ name,
+ 'device selection: `all` and other parameters are mutually exclusive. {}'.format(
+ repr(self)))
+
+ @classmethod
+ def from_json(cls, device_spec):
+ # type: (dict) -> Optional[DeviceSelection]
+ if not device_spec:
+ return None
+ for applied_filter in list(device_spec.keys()):
+ if applied_filter not in cls._supported_filters:
+ raise KeyError(applied_filter)
+
+ return cls(**device_spec)
+
+ def to_json(self):
+ # type: () -> Dict[str, Any]
+ ret: Dict[str, Any] = {}
+ if self.paths:
+ ret['paths'] = [p.path for p in self.paths]
+ if self.model:
+ ret['model'] = self.model
+ if self.vendor:
+ ret['vendor'] = self.vendor
+ if self.size:
+ ret['size'] = self.size
+ if self.rotational is not None:
+ ret['rotational'] = self.rotational
+ if self.limit:
+ ret['limit'] = self.limit
+ if self.all:
+ ret['all'] = self.all
+
+ return ret
+
+ def __repr__(self) -> str:
+ keys = [
+ key for key in self._supported_filters + ['limit'] if getattr(self, key) is not None
+ ]
+ if 'paths' in keys and self.paths == []:
+ keys.remove('paths')
+ return "DeviceSelection({})".format(
+ ', '.join('{}={}'.format(key, repr(getattr(self, key))) for key in keys)
+ )
+
+ def __eq__(self, other: Any) -> bool:
+ return repr(self) == repr(other)
+
+
+class DriveGroupValidationError(SpecValidationError):
+ """
+ Defining an exception here is a bit problematic, cause you cannot properly catch it,
+ if it was raised in a different mgr module.
+ """
+
+ def __init__(self, name: Optional[str], msg: str):
+ name = name or "<unnamed>"
+ super(DriveGroupValidationError, self).__init__(
+ f'Failed to validate OSD spec "{name}": {msg}')
+
+
+class DriveGroupSpec(ServiceSpec):
+ """
+ Describe a drive group in the same form that ceph-volume
+ understands.
+ """
+
+ _supported_features = [
+ "encrypted", "block_wal_size", "osds_per_device",
+ "db_slots", "wal_slots", "block_db_size", "placement", "service_id", "service_type",
+ "data_devices", "db_devices", "wal_devices", "journal_devices",
+ "data_directories", "osds_per_device", "objectstore", "osd_id_claims",
+ "journal_size", "unmanaged", "filter_logic", "preview_only", "extra_container_args",
+ "extra_entrypoint_args", "data_allocate_fraction", "method", "crush_device_class", "config",
+ ]
+
+ def __init__(self,
+ placement=None, # type: Optional[PlacementSpec]
+ service_id=None, # type: Optional[str]
+ data_devices=None, # type: Optional[DeviceSelection]
+ db_devices=None, # type: Optional[DeviceSelection]
+ wal_devices=None, # type: Optional[DeviceSelection]
+ journal_devices=None, # type: Optional[DeviceSelection]
+ data_directories=None, # type: Optional[List[str]]
+ osds_per_device=None, # type: Optional[int]
+ objectstore='bluestore', # type: str
+ encrypted=False, # type: bool
+ db_slots=None, # type: Optional[int]
+ wal_slots=None, # type: Optional[int]
+ osd_id_claims=None, # type: Optional[Dict[str, List[str]]]
+ block_db_size=None, # type: Union[int, str, None]
+ block_wal_size=None, # type: Union[int, str, None]
+ journal_size=None, # type: Union[int, str, None]
+ service_type=None, # type: Optional[str]
+ unmanaged=False, # type: bool
+ filter_logic='AND', # type: str
+ preview_only=False, # type: bool
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ data_allocate_fraction=None, # type: Optional[float]
+ method=None, # type: Optional[OSDMethod]
+ config=None, # type: Optional[Dict[str, str]]
+ custom_configs=None, # type: Optional[List[CustomConfig]]
+ crush_device_class=None, # type: Optional[str]
+ ):
+ assert service_type is None or service_type == 'osd'
+ super(DriveGroupSpec, self).__init__('osd', service_id=service_id,
+ placement=placement,
+ config=config,
+ unmanaged=unmanaged,
+ preview_only=preview_only,
+ extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ #: A :class:`ceph.deployment.drive_group.DeviceSelection`
+ self.data_devices = data_devices
+
+ #: A :class:`ceph.deployment.drive_group.DeviceSelection`
+ self.db_devices = db_devices
+
+ #: A :class:`ceph.deployment.drive_group.DeviceSelection`
+ self.wal_devices = wal_devices
+
+ #: A :class:`ceph.deployment.drive_group.DeviceSelection`
+ self.journal_devices = journal_devices
+
+ #: Set (or override) the "bluestore_block_wal_size" value, in bytes
+ self.block_wal_size: Union[int, str, None] = block_wal_size
+
+ #: Set (or override) the "bluestore_block_db_size" value, in bytes
+ self.block_db_size: Union[int, str, None] = block_db_size
+
+ #: set journal_size in bytes
+ self.journal_size: Union[int, str, None] = journal_size
+
+ #: Number of osd daemons per "DATA" device.
+ #: To fully utilize nvme devices multiple osds are required.
+ #: Can be used to split dual-actuator devices across 2 OSDs, by setting the option to 2.
+ self.osds_per_device = osds_per_device
+
+ #: A list of strings, containing paths which should back OSDs
+ self.data_directories = data_directories
+
+ #: ``filestore`` or ``bluestore``
+ self.objectstore = objectstore
+
+ #: ``true`` or ``false``
+ self.encrypted = encrypted
+
+ #: How many OSDs per DB device
+ self.db_slots = db_slots
+
+ #: How many OSDs per WAL device
+ self.wal_slots = wal_slots
+
+ #: Optional: mapping of host -> List of osd_ids that should be replaced
+ #: See :ref:`orchestrator-osd-replace`
+ self.osd_id_claims = osd_id_claims or dict()
+
+ #: The logic gate we use to match disks with filters.
+ #: defaults to 'AND'
+ self.filter_logic = filter_logic.upper()
+
+ #: If this should be treated as a 'preview' spec
+ self.preview_only = preview_only
+
+ #: Allocate a fraction of the data device (0,1.0]
+ self.data_allocate_fraction = data_allocate_fraction
+
+ self.method = method
+
+ #: Crush device class to assign to OSDs
+ self.crush_device_class = crush_device_class
+
+ @classmethod
+ def _from_json_impl(cls, json_drive_group):
+ # type: (dict) -> DriveGroupSpec
+ """
+ Initialize 'Drive group' structure
+
+ :param json_drive_group: A valid json string with a Drive Group
+ specification
+ """
+ args: Dict[str, Any] = json_drive_group.copy()
+ # legacy json (pre Octopus)
+ if 'host_pattern' in args and 'placement' not in args:
+ args['placement'] = {'host_pattern': args['host_pattern']}
+ del args['host_pattern']
+
+ s_id = args.get('service_id', '<unnamed>')
+
+ # spec: was not mandatory in octopus
+ if 'spec' in args:
+ args['spec'].update(cls._drive_group_spec_from_json(s_id, args['spec']))
+ args.update(cls._drive_group_spec_from_json(
+ s_id, {k: v for k, v in args.items() if k != 'spec'}))
+
+ return super(DriveGroupSpec, cls)._from_json_impl(args)
+
+ @classmethod
+ def _drive_group_spec_from_json(cls, name: str, json_drive_group: dict) -> dict:
+ for applied_filter in list(json_drive_group.keys()):
+ if applied_filter not in cls._supported_features:
+ raise DriveGroupValidationError(
+ name,
+ "Feature `{}` is not supported".format(applied_filter))
+
+ try:
+ def to_selection(key: str, vals: dict) -> Optional[DeviceSelection]:
+ try:
+ return DeviceSelection.from_json(vals)
+ except KeyError as e:
+ raise DriveGroupValidationError(
+ f'{name}.{key}',
+ f"Filtering for `{e.args[0]}` is not supported")
+
+ args = {k: (to_selection(k, v) if k.endswith('_devices') else v) for k, v in
+ json_drive_group.items()}
+ if not args:
+ raise DriveGroupValidationError(name, "Didn't find drive selections")
+ return args
+ except (KeyError, TypeError) as e:
+ raise DriveGroupValidationError(name, str(e))
+
+ def validate(self):
+ # type: () -> None
+ super(DriveGroupSpec, self).validate()
+
+ if self.placement.is_empty():
+ raise DriveGroupValidationError(self.service_id, '`placement` required')
+
+ if self.data_devices is None:
+ raise DriveGroupValidationError(self.service_id, "`data_devices` element is required.")
+
+ specs_names = "data_devices db_devices wal_devices journal_devices".split()
+ specs = dict(zip(specs_names, [getattr(self, k) for k in specs_names]))
+ for k, s in [ks for ks in specs.items() if ks[1] is not None]:
+ assert s is not None
+ s.validate(f'{self.service_id}.{k}')
+ for s in filter(None, [self.db_devices, self.wal_devices, self.journal_devices]):
+ if s.all:
+ raise DriveGroupValidationError(
+ self.service_id,
+ "`all` is only allowed for data_devices")
+
+ if self.objectstore not in ('bluestore'):
+ raise DriveGroupValidationError(self.service_id,
+ f"{self.objectstore} is not supported. Must be "
+ f"one of ('bluestore')")
+
+ if self.block_wal_size is not None and type(self.block_wal_size) not in [int, str]:
+ raise DriveGroupValidationError(
+ self.service_id,
+ 'block_wal_size must be of type int or string')
+ if self.block_db_size is not None and type(self.block_db_size) not in [int, str]:
+ raise DriveGroupValidationError(
+ self.service_id,
+ 'block_db_size must be of type int or string')
+ if self.journal_size is not None and type(self.journal_size) not in [int, str]:
+ raise DriveGroupValidationError(
+ self.service_id,
+ 'journal_size must be of type int or string')
+
+ if self.filter_logic not in ['AND', 'OR']:
+ raise DriveGroupValidationError(
+ self.service_id,
+ 'filter_logic must be either <AND> or <OR>')
+
+ if self.method not in [None, 'lvm', 'raw']:
+ raise DriveGroupValidationError(
+ self.service_id,
+ 'method must be one of None, lvm, raw')
+ if self.method == 'raw' and self.objectstore == 'filestore':
+ raise DriveGroupValidationError(
+ self.service_id,
+ 'method raw only supports bluestore')
+
+ if self.data_devices.paths is not None:
+ for device in list(self.data_devices.paths):
+ if not device.path:
+ raise DriveGroupValidationError(self.service_id, 'Device path cannot be empty') # noqa E501
+
+
+yaml.add_representer(DriveGroupSpec, DriveGroupSpec.yaml_representer)
diff --git a/src/python-common/ceph/deployment/drive_selection/__init__.py b/src/python-common/ceph/deployment/drive_selection/__init__.py
new file mode 100644
index 000000000..994e2f2da
--- /dev/null
+++ b/src/python-common/ceph/deployment/drive_selection/__init__.py
@@ -0,0 +1,2 @@
+from .selector import DriveSelection # NOQA
+from .matchers import Matcher, SubstringMatcher, EqualityMatcher, AllMatcher, SizeMatcher # NOQA
diff --git a/src/python-common/ceph/deployment/drive_selection/example.yaml b/src/python-common/ceph/deployment/drive_selection/example.yaml
new file mode 100644
index 000000000..2851e7dbb
--- /dev/null
+++ b/src/python-common/ceph/deployment/drive_selection/example.yaml
@@ -0,0 +1,21 @@
+# default:
+# target: 'data*'
+# data_devices:
+# size: 20G
+# db_devices:
+# size: 10G
+# rotational: 1
+# allflash:
+# target: 'fast_nodes*'
+# data_devices:
+# size: 100G
+# db_devices:
+# size: 50G
+# rotational: 0
+
+# This is the default configuration and
+# will create an OSD on all available drives
+default:
+ target: 'fnmatch_target'
+ data_devices:
+ all: true
diff --git a/src/python-common/ceph/deployment/drive_selection/filter.py b/src/python-common/ceph/deployment/drive_selection/filter.py
new file mode 100644
index 000000000..0da1b5c39
--- /dev/null
+++ b/src/python-common/ceph/deployment/drive_selection/filter.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+
+import logging
+
+from ceph.deployment.drive_group import DeviceSelection
+
+try:
+ from typing import Generator
+except ImportError:
+ pass
+
+from .matchers import Matcher, SubstringMatcher, AllMatcher, SizeMatcher, EqualityMatcher
+
+logger = logging.getLogger(__name__)
+
+
+class FilterGenerator(object):
+ def __init__(self, device_filter):
+ # type: (DeviceSelection) -> None
+ self.device_filter = device_filter
+
+ def __iter__(self):
+ # type: () -> Generator[Matcher, None, None]
+ if self.device_filter.actuators:
+ yield EqualityMatcher('actuators', self.device_filter.actuators)
+ if self.device_filter.size:
+ yield SizeMatcher('size', self.device_filter.size)
+ if self.device_filter.model:
+ yield SubstringMatcher('model', self.device_filter.model)
+ if self.device_filter.vendor:
+ yield SubstringMatcher('vendor', self.device_filter.vendor)
+ if self.device_filter.rotational is not None:
+ val = '1' if self.device_filter.rotational else '0'
+ yield EqualityMatcher('rotational', val)
+ if self.device_filter.all:
+ yield AllMatcher('all', str(self.device_filter.all))
diff --git a/src/python-common/ceph/deployment/drive_selection/matchers.py b/src/python-common/ceph/deployment/drive_selection/matchers.py
new file mode 100644
index 000000000..df502410a
--- /dev/null
+++ b/src/python-common/ceph/deployment/drive_selection/matchers.py
@@ -0,0 +1,412 @@
+# -*- coding: utf-8 -*-
+
+from typing import Tuple, Optional, Any, Union, Iterator
+
+from ceph.deployment.inventory import Device
+
+import re
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class _MatchInvalid(Exception):
+ pass
+
+
+# pylint: disable=too-few-public-methods
+class Matcher(object):
+ """ The base class to all Matchers
+
+ It holds utility methods such as _get_disk_key
+ and handles the initialization.
+
+ """
+
+ def __init__(self, key, value):
+ # type: (str, Any) -> None
+ """ Initialization of Base class
+
+ :param str key: Attribute like 'model, size or vendor'
+ :param str value: Value of attribute like 'X123, 5G or samsung'
+ """
+ self.key = key
+ self.value = value
+ self.fallback_key = '' # type: Optional[str]
+
+ def _get_disk_key(self, device):
+ # type: (Device) -> Any
+ """ Helper method to safely extract values form the disk dict
+
+ There is a 'key' and a _optional_ 'fallback' key that can be used.
+ The reason for this is that the output of ceph-volume is not always
+ consistent (due to a bug currently, but you never know).
+ There is also a safety measure for a disk_key not existing on
+ virtual environments. ceph-volume apparently sources its information
+ from udev which seems to not populate certain fields on VMs.
+
+ :raises: A generic Exception when no disk_key could be found.
+ :return: A disk value
+ :rtype: str
+ """
+ # using the . notation, but some keys are nested, and hidden behind
+ # a different hierarchy, which makes it harder to access programatically
+ # hence, make it a dict.
+ disk = device.to_json()
+
+ def findkeys(node: Union[list, dict], key_val: str) -> Iterator[str]:
+ """ Find keys in non-flat dict recursively """
+ if isinstance(node, list):
+ for i in node:
+ for key in findkeys(i, key_val):
+ yield key
+ elif isinstance(node, dict):
+ if key_val in node:
+ yield node[key_val]
+ for j in node.values():
+ for key in findkeys(j, key_val):
+ yield key
+
+ disk_value = list(findkeys(disk, self.key))
+ if not disk_value and self.fallback_key:
+ disk_value = list(findkeys(disk, self.fallback_key))
+
+ if disk_value:
+ return disk_value[0]
+ else:
+ raise _MatchInvalid("No value found for {} or {}".format(
+ self.key, self.fallback_key))
+
+ def compare(self, disk):
+ # type: (Device) -> bool
+ """ Implements a valid comparison method for a SubMatcher
+ This will get overwritten by the individual classes
+
+ :param dict disk: A disk representation
+ """
+ raise NotImplementedError
+
+
+# pylint: disable=too-few-public-methods
+class SubstringMatcher(Matcher):
+ """ Substring matcher subclass
+ """
+
+ def __init__(self, key, value, fallback_key=None):
+ # type: (str, str, Optional[str]) -> None
+ Matcher.__init__(self, key, value)
+ self.fallback_key = fallback_key
+
+ def compare(self, disk):
+ # type: (Device) -> bool
+ """ Overwritten method to match substrings
+
+ This matcher does substring matching
+ :param dict disk: A disk representation (see base for examples)
+ :return: True/False if the match succeeded
+ :rtype: bool
+ """
+ if not disk:
+ return False
+ disk_value = self._get_disk_key(disk)
+ if str(self.value) in disk_value:
+ return True
+ return False
+
+
+# pylint: disable=too-few-public-methods
+class AllMatcher(Matcher):
+ """ All matcher subclass
+ """
+
+ def __init__(self, key, value, fallback_key=None):
+ # type: (str, Any, Optional[str]) -> None
+
+ Matcher.__init__(self, key, value)
+ self.fallback_key = fallback_key
+
+ def compare(self, disk):
+ # type: (Device) -> bool
+
+ """ Overwritten method to match all
+
+ A rather dumb matcher that just accepts all disks
+ (regardless of the value)
+ :param dict disk: A disk representation (see base for examples)
+ :return: always True
+ :rtype: bool
+ """
+ if not disk:
+ return False
+ return True
+
+
+# pylint: disable=too-few-public-methods
+class EqualityMatcher(Matcher):
+ """ Equality matcher subclass
+ """
+
+ def __init__(self, key, value):
+ # type: (str, Any) -> None
+
+ Matcher.__init__(self, key, value)
+
+ def compare(self, disk):
+ # type: (Device) -> bool
+
+ """ Overwritten method to match equality
+
+ This matcher does value comparison
+ :param dict disk: A disk representation
+ :return: True/False if the match succeeded
+ :rtype: bool
+ """
+ if not disk:
+ return False
+ disk_value = self._get_disk_key(disk)
+ ret = disk_value == self.value
+ if not ret:
+ logger.debug('{} != {}'.format(disk_value, self.value))
+ return ret
+
+
+class SizeMatcher(Matcher):
+ """ Size matcher subclass
+ """
+
+ SUFFIXES = (
+ ["KB", "MB", "GB", "TB"],
+ ["K", "M", "G", "T"],
+ [1e+3, 1e+6, 1e+9, 1e+12]
+ )
+
+ supported_suffixes = SUFFIXES[0] + SUFFIXES[1]
+
+ # pylint: disable=too-many-instance-attributes
+ def __init__(self, key, value):
+ # type: (str, str) -> None
+
+ # The 'key' value is overwritten here because
+ # the user_defined attribute does not necessarily
+ # correspond to the desired attribute
+ # requested from the inventory output
+ Matcher.__init__(self, key, value)
+ self.key = "human_readable_size"
+ self.fallback_key = "size"
+ self._high = None # type: Optional[str]
+ self._high_suffix = None # type: Optional[str]
+ self._low = None # type: Optional[str]
+ self._low_suffix = None # type: Optional[str]
+ self._exact = None # type: Optional[str]
+ self._exact_suffix = None # type: Optional[str]
+ self._parse_filter()
+
+ @property
+ def low(self):
+ # type: () -> Tuple[Optional[str], Optional[str]]
+ """ Getter for 'low' matchers
+ """
+ return self._low, self._low_suffix
+
+ @low.setter
+ def low(self, low):
+ # type: (Tuple[str, str]) -> None
+ """ Setter for 'low' matchers
+ """
+ self._low, self._low_suffix = low
+
+ @property
+ def high(self):
+ # type: () -> Tuple[Optional[str], Optional[str]]
+ """ Getter for 'high' matchers
+ """
+ return self._high, self._high_suffix
+
+ @high.setter
+ def high(self, high):
+ # type: (Tuple[str, str]) -> None
+ """ Setter for 'high' matchers
+ """
+ self._high, self._high_suffix = high
+
+ @property
+ def exact(self):
+ # type: () -> Tuple[Optional[str], Optional[str]]
+ """ Getter for 'exact' matchers
+ """
+ return self._exact, self._exact_suffix
+
+ @exact.setter
+ def exact(self, exact):
+ # type: (Tuple[str, str]) -> None
+ """ Setter for 'exact' matchers
+ """
+ self._exact, self._exact_suffix = exact
+
+ @classmethod
+ def _normalize_suffix(cls, suffix):
+ # type: (str) -> str
+ """ Normalize any supported suffix
+
+ Since the Drive Groups are user facing, we simply
+ can't make sure that all users type in the requested
+ form. That's why we have to internally agree on one format.
+ It also checks if any of the supported suffixes was used
+ and raises an Exception otherwise.
+
+ :param str suffix: A suffix ('G') or ('M')
+ :return: A normalized output
+ :rtype: str
+ """
+ suffix = suffix.upper()
+ if suffix not in cls.supported_suffixes:
+ raise _MatchInvalid("Unit '{}' not supported".format(suffix))
+ return dict(zip(
+ cls.SUFFIXES[1],
+ cls.SUFFIXES[0],
+ )).get(suffix, suffix)
+
+ @classmethod
+ def _parse_suffix(cls, obj):
+ # type: (str) -> str
+ """ Wrapper method to find and normalize a prefix
+
+ :param str obj: A size filtering string ('10G')
+ :return: A normalized unit ('GB')
+ :rtype: str
+ """
+ return cls._normalize_suffix(re.findall(r"[a-zA-Z]+", obj)[0])
+
+ @classmethod
+ def _get_k_v(cls, data):
+ # type: (str) -> Tuple[str, str]
+ """ Helper method to extract data from a string
+
+ It uses regex to extract all digits and calls _parse_suffix
+ which also uses a regex to extract all letters and normalizes
+ the resulting suffix.
+
+ :param str data: A size filtering string ('10G')
+ :return: A Tuple with normalized output (10, 'GB')
+ :rtype: tuple
+ """
+ return re.findall(r"\d+\.?\d*", data)[0], cls._parse_suffix(data)
+
+ def _parse_filter(self) -> None:
+ """ Identifies which type of 'size' filter is applied
+
+ There are four different filtering modes:
+
+ 1) 10G:50G (high-low)
+ At least 10G but at max 50G of size
+
+ 2) :60G
+ At max 60G of size
+
+ 3) 50G:
+ At least 50G of size
+
+ 4) 20G
+ Exactly 20G in size
+
+ This method uses regex to identify and extract this information
+ and raises if none could be found.
+ """
+ low_high = re.match(r"\d+[A-Z]{1,2}:\d+[A-Z]{1,2}", self.value)
+ if low_high is not None:
+ lowpart, highpart = low_high.group().split(":")
+ self.low = self._get_k_v(lowpart)
+ self.high = self._get_k_v(highpart)
+
+ low = re.match(r"\d+[A-Z]{1,2}:$", self.value)
+ if low:
+ self.low = self._get_k_v(low.group())
+
+ high = re.match(r"^:\d+[A-Z]{1,2}", self.value)
+ if high:
+ self.high = self._get_k_v(high.group())
+
+ exact = re.match(r"^\d+\.?\d*[A-Z]{1,2}$", self.value)
+ if exact:
+ self.exact = self._get_k_v(exact.group())
+
+ if not self.low and not self.high and not self.exact:
+ raise _MatchInvalid("Couldn't parse {}".format(self.value))
+
+ @staticmethod
+ # pylint: disable=inconsistent-return-statements
+ def to_byte(tpl):
+ # type: (Tuple[Optional[str], Optional[str]]) -> float
+
+ """ Convert any supported unit to bytes
+
+ :param tuple tpl: A tuple with ('10', 'GB')
+ :return: The converted byte value
+ :rtype: float
+ """
+ val_str, suffix = tpl
+ value = float(val_str) if val_str is not None else 0.0
+ return dict(zip(
+ SizeMatcher.SUFFIXES[0],
+ SizeMatcher.SUFFIXES[2],
+ )).get(str(suffix), 0.00) * value
+
+ @staticmethod
+ def str_to_byte(input):
+ # type: (str) -> float
+ return SizeMatcher.to_byte(SizeMatcher._get_k_v(input))
+
+ # pylint: disable=inconsistent-return-statements, too-many-return-statements
+ def compare(self, disk):
+ # type: (Device) -> bool
+ """ Convert MB/GB/TB down to bytes and compare
+
+ 1) Extracts information from the to-be-inspected disk.
+ 2) Depending on the mode, apply checks and return
+
+ # This doesn't seem very solid and _may_
+ be re-factored
+
+
+ """
+ if not disk:
+ return False
+ disk_value = self._get_disk_key(disk)
+ # This doesn't necessarily have to be a float.
+ # The current output from ceph-volume gives a float..
+ # This may change in the future..
+ # todo: harden this paragraph
+ if not disk_value:
+ logger.warning("Could not retrieve value for disk")
+ return False
+
+ disk_size = re.findall(r"\d+\.\d+", disk_value)[0]
+ disk_suffix = self._parse_suffix(disk_value)
+ disk_size_in_byte = self.to_byte((disk_size, disk_suffix))
+
+ if all(self.high) and all(self.low):
+ if disk_size_in_byte <= self.to_byte(
+ self.high) and disk_size_in_byte >= self.to_byte(self.low):
+ return True
+ # is a else: return False necessary here?
+ # (and in all other branches)
+ logger.debug("Disk didn't match for 'high/low' filter")
+
+ elif all(self.low) and not all(self.high):
+ if disk_size_in_byte >= self.to_byte(self.low):
+ return True
+ logger.debug("Disk didn't match for 'low' filter")
+
+ elif all(self.high) and not all(self.low):
+ if disk_size_in_byte <= self.to_byte(self.high):
+ return True
+ logger.debug("Disk didn't match for 'high' filter")
+
+ elif all(self.exact):
+ if disk_size_in_byte == self.to_byte(self.exact):
+ return True
+ logger.debug("Disk didn't match for 'exact' filter")
+ else:
+ logger.debug("Neither high, low, nor exact was given")
+ raise _MatchInvalid("No filters applied")
+ return False
diff --git a/src/python-common/ceph/deployment/drive_selection/selector.py b/src/python-common/ceph/deployment/drive_selection/selector.py
new file mode 100644
index 000000000..1b3bfbb4e
--- /dev/null
+++ b/src/python-common/ceph/deployment/drive_selection/selector.py
@@ -0,0 +1,191 @@
+import logging
+
+from typing import List, Optional, Dict, Callable
+
+from ..inventory import Device
+from ..drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError
+
+from .filter import FilterGenerator
+from .matchers import _MatchInvalid
+
+logger = logging.getLogger(__name__)
+
+
+def to_dg_exception(f: Callable) -> Callable[['DriveSelection', str,
+ Optional['DeviceSelection']],
+ List['Device']]:
+ def wrapper(self: 'DriveSelection', name: str, ds: Optional['DeviceSelection']) -> List[Device]:
+ try:
+ return f(self, ds)
+ except _MatchInvalid as e:
+ raise DriveGroupValidationError(f'{self.spec.service_id}.{name}', e.args[0])
+ return wrapper
+
+
+class DriveSelection(object):
+ def __init__(self,
+ spec, # type: DriveGroupSpec
+ disks, # type: List[Device]
+ existing_daemons=None, # type: Optional[int]
+ ):
+ self.disks = disks.copy()
+ self.spec = spec
+ self.existing_daemons = existing_daemons or 0
+
+ self._data = self.assign_devices('data_devices', self.spec.data_devices)
+ self._wal = self.assign_devices('wal_devices', self.spec.wal_devices)
+ self._db = self.assign_devices('db_devices', self.spec.db_devices)
+ self._journal = self.assign_devices('journal_devices', self.spec.journal_devices)
+
+ def data_devices(self):
+ # type: () -> List[Device]
+ return self._data
+
+ def wal_devices(self):
+ # type: () -> List[Device]
+ return self._wal
+
+ def db_devices(self):
+ # type: () -> List[Device]
+ return self._db
+
+ def journal_devices(self):
+ # type: () -> List[Device]
+ return self._journal
+
+ def _limit_reached(self, device_filter, len_devices,
+ disk_path):
+ # type: (DeviceSelection, int, str) -> bool
+ """ Check for the <limit> property and apply logic
+
+ If a limit is set in 'device_attrs' we have to stop adding
+ disks at some point.
+
+ If limit is set (>0) and len(devices) >= limit
+
+ :param int len_devices: Length of the already populated device set/list
+ :param str disk_path: The disk identifier (for logging purposes)
+ :return: True/False if the device should be added to the list of devices
+ :rtype: bool
+ """
+ limit = device_filter.limit or 0
+
+ if limit > 0 and (len_devices + self.existing_daemons >= limit):
+ logger.debug("Refuse to add {} due to limit policy of <{}>".format(
+ disk_path, limit))
+ return True
+ return False
+
+ @staticmethod
+ def _has_mandatory_idents(disk):
+ # type: (Device) -> bool
+ """ Check for mandatory identification fields
+ """
+ if disk.path:
+ logger.debug("Found matching disk: {}".format(disk.path))
+ return True
+ else:
+ raise Exception(
+ "Disk {} doesn't have a 'path' identifier".format(disk))
+
+ @to_dg_exception
+ def assign_devices(self, device_filter):
+ # type: (Optional[DeviceSelection]) -> List[Device]
+ """ Assign drives based on used filters
+
+ Do not add disks when:
+
+ 1) Filter didn't match
+ 2) Disk doesn't have a mandatory identification item (path)
+ 3) The set :limit was reached
+
+ After the disk was added we make sure not to re-assign this disk
+ for another defined type[wal/db/journal devices]
+
+ return a sorted(by path) list of devices
+ """
+
+ if not device_filter:
+ logger.debug('device_filter is None')
+ return []
+
+ if not self.spec.data_devices:
+ logger.debug('data_devices is None')
+ return []
+
+ if device_filter.paths:
+ logger.debug('device filter is using explicit paths')
+ return device_filter.paths
+
+ devices = list() # type: List[Device]
+ for disk in self.disks:
+ logger.debug("Processing disk {}".format(disk.path))
+
+ if not disk.available and not disk.ceph_device:
+ logger.debug(
+ ("Ignoring disk {}. "
+ "Disk is unavailable due to {}".format(disk.path, disk.rejected_reasons))
+ )
+ continue
+
+ if not disk.available and disk.ceph_device and disk.lvs:
+ other_osdspec_affinity = ''
+ for lv in disk.lvs:
+ if 'osdspec_affinity' in lv.keys():
+ if lv['osdspec_affinity'] != self.spec.service_id:
+ other_osdspec_affinity = lv['osdspec_affinity']
+ break
+ if other_osdspec_affinity:
+ logger.debug("{} is already used in spec {}, "
+ "skipping it.".format(disk.path, other_osdspec_affinity))
+ continue
+
+ if not self._has_mandatory_idents(disk):
+ logger.debug(
+ "Ignoring disk {}. Missing mandatory idents".format(
+ disk.path))
+ continue
+
+ # break on this condition.
+ if self._limit_reached(device_filter, len(devices), disk.path):
+ logger.debug("Ignoring disk {}. Limit reached".format(
+ disk.path))
+ break
+
+ if disk in devices:
+ continue
+
+ if self.spec.filter_logic == 'AND':
+ if not all(m.compare(disk) for m in FilterGenerator(device_filter)):
+ logger.debug(
+ "Ignoring disk {}. Not all filter did match the disk".format(
+ disk.path))
+ continue
+
+ if self.spec.filter_logic == 'OR':
+ if not any(m.compare(disk) for m in FilterGenerator(device_filter)):
+ logger.debug(
+ "Ignoring disk {}. No filter matched the disk".format(
+ disk.path))
+ continue
+
+ logger.debug('Adding disk {}'.format(disk.path))
+ devices.append(disk)
+
+ # This disk is already taken and must not be re-assigned.
+ for taken_device in devices:
+ if taken_device in self.disks:
+ self.disks.remove(taken_device)
+
+ return sorted([x for x in devices], key=lambda dev: dev.path)
+
+ def __repr__(self) -> str:
+ selection: Dict[str, List[str]] = {
+ 'data devices': [d.path for d in self._data],
+ 'wal_devices': [d.path for d in self._wal],
+ 'db devices': [d.path for d in self._db],
+ 'journal devices': [d.path for d in self._journal]
+ }
+ return "DeviceSelection({})".format(
+ ', '.join('{}={}'.format(key, selection[key]) for key in selection.keys())
+ )
diff --git a/src/python-common/ceph/deployment/hostspec.py b/src/python-common/ceph/deployment/hostspec.py
new file mode 100644
index 000000000..0c448bf13
--- /dev/null
+++ b/src/python-common/ceph/deployment/hostspec.py
@@ -0,0 +1,137 @@
+from collections import OrderedDict
+import errno
+import re
+from typing import Optional, List, Any, Dict
+
+
+def assert_valid_host(name: str) -> None:
+ p = re.compile('^[a-zA-Z0-9-]+$')
+ try:
+ assert len(name) <= 250, 'name is too long (max 250 chars)'
+ for part in name.split('.'):
+ assert len(part) > 0, '.-delimited name component must not be empty'
+ assert len(part) <= 63, '.-delimited name component must not be more than 63 chars'
+ assert p.match(part), 'name component must include only a-z, 0-9, and -'
+ except AssertionError as e:
+ raise SpecValidationError(str(e) + f'. Got "{name}"')
+
+
+class SpecValidationError(Exception):
+ """
+ Defining an exception here is a bit problematic, cause you cannot properly catch it,
+ if it was raised in a different mgr module.
+ """
+ def __init__(self,
+ msg: str,
+ errno: int = -errno.EINVAL):
+ super(SpecValidationError, self).__init__(msg)
+ self.errno = errno
+
+
+class HostSpec(object):
+ """
+ Information about hosts. Like e.g. ``kubectl get nodes``
+ """
+ def __init__(self,
+ hostname: str,
+ addr: Optional[str] = None,
+ labels: Optional[List[str]] = None,
+ status: Optional[str] = None,
+ location: Optional[Dict[str, str]] = None,
+ ):
+ self.service_type = 'host'
+
+ #: the bare hostname on the host. Not the FQDN.
+ self.hostname = hostname # type: str
+
+ #: DNS name or IP address to reach it
+ self.addr = addr or hostname # type: str
+
+ #: label(s), if any
+ self.labels = labels or [] # type: List[str]
+
+ #: human readable status
+ self.status = status or '' # type: str
+
+ self.location = location
+
+ def validate(self) -> None:
+ assert_valid_host(self.hostname)
+
+ def to_json(self) -> Dict[str, Any]:
+ r: Dict[str, Any] = {
+ 'hostname': self.hostname,
+ 'addr': self.addr,
+ 'labels': list(OrderedDict.fromkeys((self.labels))),
+ 'status': self.status,
+ }
+ if self.location:
+ r['location'] = self.location
+ return r
+
+ @classmethod
+ def from_json(cls, host_spec: dict) -> 'HostSpec':
+ host_spec = cls.normalize_json(host_spec)
+ _cls = cls(
+ host_spec['hostname'],
+ host_spec['addr'] if 'addr' in host_spec else None,
+ list(OrderedDict.fromkeys(
+ host_spec['labels'])) if 'labels' in host_spec else None,
+ host_spec['status'] if 'status' in host_spec else None,
+ host_spec.get('location'),
+ )
+ return _cls
+
+ @staticmethod
+ def normalize_json(host_spec: dict) -> dict:
+ labels = host_spec.get('labels')
+ if labels is not None:
+ if isinstance(labels, str):
+ host_spec['labels'] = [labels]
+ elif (
+ not isinstance(labels, list)
+ or any(not isinstance(v, str) for v in labels)
+ ):
+ raise SpecValidationError(
+ f'Labels ({labels}) must be a string or list of strings'
+ )
+
+ loc = host_spec.get('location')
+ if loc is not None:
+ if (
+ not isinstance(loc, dict)
+ or any(not isinstance(k, str) for k in loc.keys())
+ or any(not isinstance(v, str) for v in loc.values())
+ ):
+ raise SpecValidationError(
+ f'Location ({loc}) must be a dictionary of strings to strings'
+ )
+
+ return host_spec
+
+ def __repr__(self) -> str:
+ args = [self.hostname] # type: List[Any]
+ if self.addr is not None:
+ args.append(self.addr)
+ if self.labels:
+ args.append(self.labels)
+ if self.status:
+ args.append(self.status)
+ if self.location:
+ args.append(self.location)
+
+ return "HostSpec({})".format(', '.join(map(repr, args)))
+
+ def __str__(self) -> str:
+ if self.hostname != self.addr:
+ return f'{self.hostname} ({self.addr})'
+ return self.hostname
+
+ def __eq__(self, other: Any) -> bool:
+ # Let's omit `status` for the moment, as it is still the very same host.
+ if not isinstance(other, HostSpec):
+ return NotImplemented
+ return self.hostname == other.hostname and \
+ self.addr == other.addr and \
+ sorted(self.labels) == sorted(other.labels) and \
+ self.location == other.location
diff --git a/src/python-common/ceph/deployment/inventory.py b/src/python-common/ceph/deployment/inventory.py
new file mode 100644
index 000000000..a30238821
--- /dev/null
+++ b/src/python-common/ceph/deployment/inventory.py
@@ -0,0 +1,138 @@
+try:
+ from typing import List, Optional, Dict, Any, Union
+except ImportError:
+ pass # for type checking
+
+from ceph.utils import datetime_now, datetime_to_str, str_to_datetime
+import datetime
+import json
+
+
+class Devices(object):
+ """
+ A container for Device instances with reporting
+ """
+
+ def __init__(self, devices):
+ # type: (List[Device]) -> None
+ # sort devices by path name so ordering is consistent
+ self.devices: List[Device] = sorted(devices, key=lambda d: d.path if d.path else '')
+
+ def __eq__(self, other: Any) -> bool:
+ if not isinstance(other, Devices):
+ return NotImplemented
+ if len(self.devices) != len(other.devices):
+ return False
+ for d1, d2 in zip(other.devices, self.devices):
+ if d1 != d2:
+ return False
+ return True
+
+ def to_json(self):
+ # type: () -> List[dict]
+ return [d.to_json() for d in self.devices]
+
+ @classmethod
+ def from_json(cls, input):
+ # type: (List[Dict[str, Any]]) -> Devices
+ return cls([Device.from_json(i) for i in input])
+
+ def copy(self):
+ # type: () -> Devices
+ return Devices(devices=list(self.devices))
+
+
+class Device(object):
+ report_fields = [
+ 'ceph_device',
+ 'rejected_reasons',
+ 'available',
+ 'path',
+ 'sys_api',
+ 'created',
+ 'lvs',
+ 'human_readable_type',
+ 'device_id',
+ 'lsm_data',
+ 'crush_device_class'
+ ]
+
+ def __init__(self,
+ path, # type: str
+ sys_api=None, # type: Optional[Dict[str, Any]]
+ available=None, # type: Optional[bool]
+ rejected_reasons=None, # type: Optional[List[str]]
+ lvs=None, # type: Optional[List[Dict[str, str]]]
+ device_id=None, # type: Optional[str]
+ lsm_data=None, # type: Optional[Dict[str, Dict[str, str]]]
+ created=None, # type: Optional[datetime.datetime]
+ ceph_device=None, # type: Optional[bool]
+ crush_device_class=None # type: Optional[str]
+ ):
+
+ self.path = path
+ self.sys_api = sys_api if sys_api is not None else {} # type: Dict[str, Any]
+ self.available = available
+ self.rejected_reasons = rejected_reasons if rejected_reasons is not None else []
+ self.lvs = lvs
+ self.device_id = device_id
+ self.lsm_data = lsm_data if lsm_data is not None else {} # type: Dict[str, Dict[str, str]]
+ self.created = created if created is not None else datetime_now()
+ self.ceph_device = ceph_device
+ self.crush_device_class = crush_device_class
+
+ def __eq__(self, other):
+ # type: (Any) -> bool
+ if not isinstance(other, Device):
+ return NotImplemented
+ diff = [k for k in self.report_fields if k != 'created' and (getattr(self, k)
+ != getattr(other, k))]
+ return not diff
+
+ def to_json(self):
+ # type: () -> dict
+ return {
+ k: (getattr(self, k) if k != 'created'
+ or not isinstance(getattr(self, k), datetime.datetime)
+ else datetime_to_str(getattr(self, k)))
+ for k in self.report_fields
+ }
+
+ @classmethod
+ def from_json(cls, input):
+ # type: (Dict[str, Any]) -> Device
+ if not isinstance(input, dict):
+ raise ValueError('Device: Expected dict. Got `{}...`'.format(json.dumps(input)[:10]))
+ ret = cls(
+ **{
+ key: (input.get(key, None) if key != 'created'
+ or not input.get(key, None)
+ else str_to_datetime(input.get(key, None)))
+ for key in Device.report_fields
+ if key != 'human_readable_type'
+ }
+ )
+ if ret.rejected_reasons:
+ ret.rejected_reasons = sorted(ret.rejected_reasons)
+ return ret
+
+ @property
+ def human_readable_type(self):
+ # type: () -> str
+ if self.sys_api is None or 'rotational' not in self.sys_api:
+ return "unknown"
+ return 'hdd' if self.sys_api["rotational"] == "1" else 'ssd'
+
+ def __repr__(self) -> str:
+ device_desc: Dict[str, Union[str, List[str], List[Dict[str, str]]]] = {
+ 'path': self.path if self.path is not None else 'unknown',
+ 'lvs': self.lvs if self.lvs else 'None',
+ 'available': str(self.available),
+ 'ceph_device': str(self.ceph_device),
+ 'crush_device_class': str(self.crush_device_class)
+ }
+ if not self.available and self.rejected_reasons:
+ device_desc['rejection reasons'] = self.rejected_reasons
+ return "Device({})".format(
+ ', '.join('{}={}'.format(key, device_desc[key]) for key in device_desc.keys())
+ )
diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py
new file mode 100644
index 000000000..be9f3e8ea
--- /dev/null
+++ b/src/python-common/ceph/deployment/service_spec.py
@@ -0,0 +1,2011 @@
+import fnmatch
+import os
+import re
+import enum
+from collections import OrderedDict
+from contextlib import contextmanager
+from functools import wraps
+from ipaddress import ip_network, ip_address
+from typing import Optional, Dict, Any, List, Union, Callable, Iterable, Type, TypeVar, cast, \
+ NamedTuple, Mapping, Iterator
+
+import yaml
+
+from ceph.deployment.hostspec import HostSpec, SpecValidationError, assert_valid_host
+from ceph.deployment.utils import unwrap_ipv6, valid_addr
+from ceph.utils import is_hex
+
+ServiceSpecT = TypeVar('ServiceSpecT', bound='ServiceSpec')
+FuncT = TypeVar('FuncT', bound=Callable)
+
+
+def handle_type_error(method: FuncT) -> FuncT:
+ @wraps(method)
+ def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
+ try:
+ return method(cls, *args, **kwargs)
+ except (TypeError, AttributeError) as e:
+ error_msg = '{}: {}'.format(cls.__name__, e)
+ raise SpecValidationError(error_msg)
+ return cast(FuncT, inner)
+
+
+class HostPlacementSpec(NamedTuple):
+ hostname: str
+ network: str
+ name: str
+
+ def __str__(self) -> str:
+ res = ''
+ res += self.hostname
+ if self.network:
+ res += ':' + self.network
+ if self.name:
+ res += '=' + self.name
+ return res
+
+ @classmethod
+ @handle_type_error
+ def from_json(cls, data: Union[dict, str]) -> 'HostPlacementSpec':
+ if isinstance(data, str):
+ return cls.parse(data)
+ return cls(**data)
+
+ def to_json(self) -> str:
+ return str(self)
+
+ @classmethod
+ def parse(cls, host, require_network=True):
+ # type: (str, bool) -> HostPlacementSpec
+ """
+ Split host into host, network, and (optional) daemon name parts. The network
+ part can be an IP, CIDR, or ceph addrvec like '[v2:1.2.3.4:3300,v1:1.2.3.4:6789]'.
+ e.g.,
+ "myhost"
+ "myhost=name"
+ "myhost:1.2.3.4"
+ "myhost:1.2.3.4=name"
+ "myhost:1.2.3.0/24"
+ "myhost:1.2.3.0/24=name"
+ "myhost:[v2:1.2.3.4:3000]=name"
+ "myhost:[v2:1.2.3.4:3000,v1:1.2.3.4:6789]=name"
+ """
+ # Matches from start to : or = or until end of string
+ host_re = r'^(.*?)(:|=|$)'
+ # Matches from : to = or until end of string
+ ip_re = r':(.*?)(=|$)'
+ # Matches from = to end of string
+ name_re = r'=(.*?)$'
+
+ # assign defaults
+ host_spec = cls('', '', '')
+
+ match_host = re.search(host_re, host)
+ if match_host:
+ host_spec = host_spec._replace(hostname=match_host.group(1))
+
+ name_match = re.search(name_re, host)
+ if name_match:
+ host_spec = host_spec._replace(name=name_match.group(1))
+
+ ip_match = re.search(ip_re, host)
+ if ip_match:
+ host_spec = host_spec._replace(network=ip_match.group(1))
+
+ if not require_network:
+ return host_spec
+
+ networks = list() # type: List[str]
+ network = host_spec.network
+ # in case we have [v2:1.2.3.4:3000,v1:1.2.3.4:6478]
+ if ',' in network:
+ networks = [x for x in network.split(',')]
+ else:
+ if network != '':
+ networks.append(network)
+
+ for network in networks:
+ # only if we have versioned network configs
+ if network.startswith('v') or network.startswith('[v'):
+ # if this is ipv6 we can't just simply split on ':' so do
+ # a split once and rsplit once to leave us with just ipv6 addr
+ network = network.split(':', 1)[1]
+ network = network.rsplit(':', 1)[0]
+ try:
+ # if subnets are defined, also verify the validity
+ if '/' in network:
+ ip_network(network)
+ else:
+ ip_address(unwrap_ipv6(network))
+ except ValueError as e:
+ # logging?
+ raise e
+ host_spec.validate()
+ return host_spec
+
+ def validate(self) -> None:
+ assert_valid_host(self.hostname)
+
+
+class PlacementSpec(object):
+ """
+ For APIs that need to specify a host subset
+ """
+
+ def __init__(self,
+ label=None, # type: Optional[str]
+ hosts=None, # type: Union[List[str],List[HostPlacementSpec], None]
+ count=None, # type: Optional[int]
+ count_per_host=None, # type: Optional[int]
+ host_pattern=None, # type: Optional[str]
+ ):
+ # type: (...) -> None
+ self.label = label
+ self.hosts = [] # type: List[HostPlacementSpec]
+
+ if hosts:
+ self.set_hosts(hosts)
+
+ self.count = count # type: Optional[int]
+ self.count_per_host = count_per_host # type: Optional[int]
+
+ #: fnmatch patterns to select hosts. Can also be a single host.
+ self.host_pattern = host_pattern # type: Optional[str]
+
+ self.validate()
+
+ def is_empty(self) -> bool:
+ return (
+ self.label is None
+ and not self.hosts
+ and not self.host_pattern
+ and self.count is None
+ and self.count_per_host is None
+ )
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, PlacementSpec):
+ return self.label == other.label \
+ and self.hosts == other.hosts \
+ and self.count == other.count \
+ and self.host_pattern == other.host_pattern \
+ and self.count_per_host == other.count_per_host
+ return NotImplemented
+
+ def set_hosts(self, hosts: Union[List[str], List[HostPlacementSpec]]) -> None:
+ # To backpopulate the .hosts attribute when using labels or count
+ # in the orchestrator backend.
+ if all([isinstance(host, HostPlacementSpec) for host in hosts]):
+ self.hosts = hosts # type: ignore
+ else:
+ self.hosts = [HostPlacementSpec.parse(x, require_network=False) # type: ignore
+ for x in hosts if x]
+
+ # deprecated
+ def filter_matching_hosts(self, _get_hosts_func: Callable) -> List[str]:
+ return self.filter_matching_hostspecs(_get_hosts_func(as_hostspec=True))
+
+ def filter_matching_hostspecs(self, hostspecs: Iterable[HostSpec]) -> List[str]:
+ if self.hosts:
+ all_hosts = [hs.hostname for hs in hostspecs]
+ return [h.hostname for h in self.hosts if h.hostname in all_hosts]
+ if self.label:
+ return [hs.hostname for hs in hostspecs if self.label in hs.labels]
+ all_hosts = [hs.hostname for hs in hostspecs]
+ if self.host_pattern:
+ return fnmatch.filter(all_hosts, self.host_pattern)
+ return all_hosts
+
+ def get_target_count(self, hostspecs: Iterable[HostSpec]) -> int:
+ if self.count:
+ return self.count
+ return len(self.filter_matching_hostspecs(hostspecs)) * (self.count_per_host or 1)
+
+ def pretty_str(self) -> str:
+ """
+ >>> #doctest: +SKIP
+ ... ps = PlacementSpec(...) # For all placement specs:
+ ... PlacementSpec.from_string(ps.pretty_str()) == ps
+ """
+ kv = []
+ if self.hosts:
+ kv.append(';'.join([str(h) for h in self.hosts]))
+ if self.count:
+ kv.append('count:%d' % self.count)
+ if self.count_per_host:
+ kv.append('count-per-host:%d' % self.count_per_host)
+ if self.label:
+ kv.append('label:%s' % self.label)
+ if self.host_pattern:
+ kv.append(self.host_pattern)
+ return ';'.join(kv)
+
+ def __repr__(self) -> str:
+ kv = []
+ if self.count:
+ kv.append('count=%d' % self.count)
+ if self.count_per_host:
+ kv.append('count_per_host=%d' % self.count_per_host)
+ if self.label:
+ kv.append('label=%s' % repr(self.label))
+ if self.hosts:
+ kv.append('hosts={!r}'.format(self.hosts))
+ if self.host_pattern:
+ kv.append('host_pattern={!r}'.format(self.host_pattern))
+ return "PlacementSpec(%s)" % ', '.join(kv)
+
+ @classmethod
+ @handle_type_error
+ def from_json(cls, data: dict) -> 'PlacementSpec':
+ c = data.copy()
+ hosts = c.get('hosts', [])
+ if hosts:
+ c['hosts'] = []
+ for host in hosts:
+ c['hosts'].append(HostPlacementSpec.from_json(host))
+ _cls = cls(**c)
+ _cls.validate()
+ return _cls
+
+ def to_json(self) -> dict:
+ r: Dict[str, Any] = {}
+ if self.label:
+ r['label'] = self.label
+ if self.hosts:
+ r['hosts'] = [host.to_json() for host in self.hosts]
+ if self.count:
+ r['count'] = self.count
+ if self.count_per_host:
+ r['count_per_host'] = self.count_per_host
+ if self.host_pattern:
+ r['host_pattern'] = self.host_pattern
+ return r
+
+ def validate(self) -> None:
+ if self.hosts and self.label:
+ # TODO: a less generic Exception
+ raise SpecValidationError('Host and label are mutually exclusive')
+ if self.count is not None:
+ try:
+ intval = int(self.count)
+ except (ValueError, TypeError):
+ raise SpecValidationError("num/count must be a numeric value")
+ if self.count != intval:
+ raise SpecValidationError("num/count must be an integer value")
+ if self.count < 1:
+ raise SpecValidationError("num/count must be >= 1")
+ if self.count_per_host is not None:
+ try:
+ intval = int(self.count_per_host)
+ except (ValueError, TypeError):
+ raise SpecValidationError("count-per-host must be a numeric value")
+ if self.count_per_host != intval:
+ raise SpecValidationError("count-per-host must be an integer value")
+ if self.count_per_host < 1:
+ raise SpecValidationError("count-per-host must be >= 1")
+ if self.count_per_host is not None and not (
+ self.label
+ or self.hosts
+ or self.host_pattern
+ ):
+ raise SpecValidationError(
+ "count-per-host must be combined with label or hosts or host_pattern"
+ )
+ if self.count is not None and self.count_per_host is not None:
+ raise SpecValidationError("cannot combine count and count-per-host")
+ if (
+ self.count_per_host is not None
+ and self.hosts
+ and any([hs.network or hs.name for hs in self.hosts])
+ ):
+ raise SpecValidationError(
+ "count-per-host cannot be combined explicit placement with names or networks"
+ )
+ if self.host_pattern:
+ if not isinstance(self.host_pattern, str):
+ raise SpecValidationError('host_pattern must be of type string')
+ if self.hosts:
+ raise SpecValidationError('cannot combine host patterns and hosts')
+
+ for h in self.hosts:
+ h.validate()
+
+ @classmethod
+ def from_string(cls, arg):
+ # type: (Optional[str]) -> PlacementSpec
+ """
+ A single integer is parsed as a count:
+
+ >>> PlacementSpec.from_string('3')
+ PlacementSpec(count=3)
+
+ A list of names is parsed as host specifications:
+
+ >>> PlacementSpec.from_string('host1 host2')
+ PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacemen\
+tSpec(hostname='host2', network='', name='')])
+
+ You can also prefix the hosts with a count as follows:
+
+ >>> PlacementSpec.from_string('2 host1 host2')
+ PlacementSpec(count=2, hosts=[HostPlacementSpec(hostname='host1', network='', name=''), Hos\
+tPlacementSpec(hostname='host2', network='', name='')])
+
+ You can specify labels using `label:<label>`
+
+ >>> PlacementSpec.from_string('label:mon')
+ PlacementSpec(label='mon')
+
+ Labels also support a count:
+
+ >>> PlacementSpec.from_string('3 label:mon')
+ PlacementSpec(count=3, label='mon')
+
+ fnmatch is also supported:
+
+ >>> PlacementSpec.from_string('data[1-3]')
+ PlacementSpec(host_pattern='data[1-3]')
+
+ >>> PlacementSpec.from_string(None)
+ PlacementSpec()
+ """
+ if arg is None or not arg:
+ strings = []
+ elif isinstance(arg, str):
+ if ' ' in arg:
+ strings = arg.split(' ')
+ elif ';' in arg:
+ strings = arg.split(';')
+ elif ',' in arg and '[' not in arg:
+ # FIXME: this isn't quite right. we want to avoid breaking
+ # a list of mons with addrvecs... so we're basically allowing
+ # , most of the time, except when addrvecs are used. maybe
+ # ok?
+ strings = arg.split(',')
+ else:
+ strings = [arg]
+ else:
+ raise SpecValidationError('invalid placement %s' % arg)
+
+ count = None
+ count_per_host = None
+ if strings:
+ try:
+ count = int(strings[0])
+ strings = strings[1:]
+ except ValueError:
+ pass
+ for s in strings:
+ if s.startswith('count:'):
+ try:
+ count = int(s[len('count:'):])
+ strings.remove(s)
+ break
+ except ValueError:
+ pass
+ for s in strings:
+ if s.startswith('count-per-host:'):
+ try:
+ count_per_host = int(s[len('count-per-host:'):])
+ strings.remove(s)
+ break
+ except ValueError:
+ pass
+
+ advanced_hostspecs = [h for h in strings if
+ (':' in h or '=' in h or not any(c in '[]?*:=' for c in h)) and
+ 'label:' not in h]
+ for a_h in advanced_hostspecs:
+ strings.remove(a_h)
+
+ labels = [x for x in strings if 'label:' in x]
+ if len(labels) > 1:
+ raise SpecValidationError('more than one label provided: {}'.format(labels))
+ for l in labels:
+ strings.remove(l)
+ label = labels[0][6:] if labels else None
+
+ host_patterns = strings
+ if len(host_patterns) > 1:
+ raise SpecValidationError(
+ 'more than one host pattern provided: {}'.format(host_patterns))
+
+ ps = PlacementSpec(count=count,
+ count_per_host=count_per_host,
+ hosts=advanced_hostspecs,
+ label=label,
+ host_pattern=host_patterns[0] if host_patterns else None)
+ return ps
+
+
+_service_spec_from_json_validate = True
+
+
+class CustomConfig:
+ """
+ Class to specify custom config files to be mounted in daemon's container
+ """
+
+ _fields = ['content', 'mount_path']
+
+ def __init__(self, content: str, mount_path: str) -> None:
+ self.content: str = content
+ self.mount_path: str = mount_path
+ self.validate()
+
+ def to_json(self) -> Dict[str, Any]:
+ return {
+ 'content': self.content,
+ 'mount_path': self.mount_path,
+ }
+
+ @classmethod
+ def from_json(cls, data: Dict[str, Any]) -> "CustomConfig":
+ for k in cls._fields:
+ if k not in data:
+ raise SpecValidationError(f'CustomConfig must have "{k}" field')
+ for k in data.keys():
+ if k not in cls._fields:
+ raise SpecValidationError(f'CustomConfig got unknown field "{k}"')
+ return cls(**data)
+
+ @property
+ def filename(self) -> str:
+ return os.path.basename(self.mount_path)
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, CustomConfig):
+ return (
+ self.content == other.content
+ and self.mount_path == other.mount_path
+ )
+ return NotImplemented
+
+ def __repr__(self) -> str:
+ return f'CustomConfig({self.mount_path})'
+
+ def validate(self) -> None:
+ if not isinstance(self.content, str):
+ raise SpecValidationError(
+ f'CustomConfig content must be a string. Got {type(self.content)}')
+ if not isinstance(self.mount_path, str):
+ raise SpecValidationError(
+ f'CustomConfig content must be a string. Got {type(self.mount_path)}')
+
+
+@contextmanager
+def service_spec_allow_invalid_from_json() -> Iterator[None]:
+ """
+ I know this is evil, but unfortunately `ceph orch ls`
+ may return invalid OSD specs for OSDs not associated to
+ and specs. If you have a better idea, please!
+ """
+ global _service_spec_from_json_validate
+ _service_spec_from_json_validate = False
+ yield
+ _service_spec_from_json_validate = True
+
+
+class ArgumentSpec:
+ """The ArgumentSpec type represents an argument that can be
+ passed to an underyling subsystem, like a container engine or
+ another command line tool.
+
+ The ArgumentSpec aims to be backwards compatible with the previous
+ form of argument, a single string. The string was always assumed
+ to be indentended to be split on spaces. For example:
+ `--cpus 8` becomes `["--cpus", "8"]`. This type is converted from
+ either a string or an json/yaml object. In the object form you
+ can choose if the string part should be split so an argument like
+ `--migrate-from=//192.168.5.22/My Documents` can be expressed.
+ """
+ _fields = ['argument', 'split']
+
+ class OriginalType(enum.Enum):
+ OBJECT = 0
+ STRING = 1
+
+ def __init__(
+ self,
+ argument: str,
+ split: bool = False,
+ *,
+ origin: OriginalType = OriginalType.OBJECT,
+ ) -> None:
+ self.argument = argument
+ self.split = bool(split)
+ # origin helps with round-tripping between inputs that
+ # are simple strings or objects (dicts)
+ self._origin = origin
+ self.validate()
+
+ def to_json(self) -> Union[str, Dict[str, Any]]:
+ """Return a json-safe represenation of the ArgumentSpec."""
+ if self._origin == self.OriginalType.STRING:
+ return self.argument
+ return {
+ 'argument': self.argument,
+ 'split': self.split,
+ }
+
+ def to_args(self) -> List[str]:
+ """Convert this ArgumentSpec into a list of arguments suitable for
+ adding to an argv-style command line.
+ """
+ if not self.split:
+ return [self.argument]
+ return [part for part in self.argument.split(" ") if part]
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, ArgumentSpec):
+ return (
+ self.argument == other.argument
+ and self.split == other.split
+ )
+ if isinstance(other, object):
+ # This is a workaround for silly ceph mgr object/type identity
+ # mismatches due to multiple python interpreters in use.
+ try:
+ argument = getattr(other, 'argument')
+ split = getattr(other, 'split')
+ return (self.argument == argument and self.split == split)
+ except AttributeError:
+ pass
+ return NotImplemented
+
+ def __repr__(self) -> str:
+ return f'ArgumentSpec({self.argument!r}, {self.split!r})'
+
+ def validate(self) -> None:
+ if not isinstance(self.argument, str):
+ raise SpecValidationError(
+ f'ArgumentSpec argument must be a string. Got {type(self.argument)}')
+ if not isinstance(self.split, bool):
+ raise SpecValidationError(
+ f'ArgumentSpec split must be a boolean. Got {type(self.split)}')
+
+ @classmethod
+ def from_json(cls, data: Union[str, Dict[str, Any]]) -> "ArgumentSpec":
+ """Convert a json-object (dict) to an ArgumentSpec."""
+ if isinstance(data, str):
+ return cls(data, split=True, origin=cls.OriginalType.STRING)
+ if 'argument' not in data:
+ raise SpecValidationError(f'ArgumentSpec must have an "argument" field')
+ for k in data.keys():
+ if k not in cls._fields:
+ raise SpecValidationError(f'ArgumentSpec got an unknown field {k!r}')
+ return cls(**data)
+
+ @staticmethod
+ def map_json(
+ values: Optional["ArgumentList"]
+ ) -> Optional[List[Union[str, Dict[str, Any]]]]:
+ """Given a list of ArgumentSpec objects return a json-safe
+ representation.of them."""
+ if values is None:
+ return None
+ return [v.to_json() for v in values]
+
+ @classmethod
+ def from_general_args(cls, data: "GeneralArgList") -> "ArgumentList":
+ """Convert a list of strs, dicts, or existing ArgumentSpec objects
+ to a list of only ArgumentSpec objects.
+ """
+ out: ArgumentList = []
+ for item in data:
+ if isinstance(item, (str, dict)):
+ out.append(cls.from_json(item))
+ elif isinstance(item, cls):
+ out.append(item)
+ elif hasattr(item, 'to_json'):
+ # This is a workaround for silly ceph mgr object/type identity
+ # mismatches due to multiple python interpreters in use.
+ # It should be safe because we already have to be able to
+ # round-trip between json/yaml.
+ out.append(cls.from_json(item.to_json()))
+ else:
+ raise SpecValidationError(f"Unknown type for argument: {type(item)}")
+ return out
+
+
+ArgumentList = List[ArgumentSpec]
+GeneralArgList = List[Union[str, Dict[str, Any], "ArgumentSpec"]]
+
+
+class ServiceSpec(object):
+ """
+ Details of service creation.
+
+ Request to the orchestrator for a cluster of daemons
+ such as MDS, RGW, iscsi gateway, nvmeof gateway, MONs, MGRs, Prometheus
+
+ This structure is supposed to be enough information to
+ start the services.
+ """
+ KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi nvmeof loki promtail mds mgr mon nfs ' \
+ 'node-exporter osd prometheus rbd-mirror rgw agent ceph-exporter ' \
+ 'container ingress cephfs-mirror snmp-gateway jaeger-tracing ' \
+ 'elasticsearch jaeger-agent jaeger-collector jaeger-query'.split()
+ REQUIRES_SERVICE_ID = 'iscsi nvmeof mds nfs rgw container ingress '.split()
+ MANAGED_CONFIG_OPTIONS = [
+ 'mds_join_fs',
+ ]
+
+ @classmethod
+ def _cls(cls: Type[ServiceSpecT], service_type: str) -> Type[ServiceSpecT]:
+ from ceph.deployment.drive_group import DriveGroupSpec
+
+ ret = {
+ 'mon': MONSpec,
+ 'rgw': RGWSpec,
+ 'nfs': NFSServiceSpec,
+ 'osd': DriveGroupSpec,
+ 'mds': MDSSpec,
+ 'iscsi': IscsiServiceSpec,
+ 'nvmeof': NvmeofServiceSpec,
+ 'alertmanager': AlertManagerSpec,
+ 'ingress': IngressSpec,
+ 'container': CustomContainerSpec,
+ 'grafana': GrafanaSpec,
+ 'node-exporter': MonitoringSpec,
+ 'ceph-exporter': CephExporterSpec,
+ 'prometheus': PrometheusSpec,
+ 'loki': MonitoringSpec,
+ 'promtail': MonitoringSpec,
+ 'snmp-gateway': SNMPGatewaySpec,
+ 'elasticsearch': TracingSpec,
+ 'jaeger-agent': TracingSpec,
+ 'jaeger-collector': TracingSpec,
+ 'jaeger-query': TracingSpec,
+ 'jaeger-tracing': TracingSpec,
+ }.get(service_type, cls)
+ if ret == ServiceSpec and not service_type:
+ raise SpecValidationError('Spec needs a "service_type" key.')
+ return ret
+
+ def __new__(cls: Type[ServiceSpecT], *args: Any, **kwargs: Any) -> ServiceSpecT:
+ """
+ Some Python foo to make sure, we don't have an object
+ like `ServiceSpec('rgw')` of type `ServiceSpec`. Now we have:
+
+ >>> type(ServiceSpec('rgw')) == type(RGWSpec('rgw'))
+ True
+
+ """
+ if cls != ServiceSpec:
+ return object.__new__(cls)
+ service_type = kwargs.get('service_type', args[0] if args else None)
+ sub_cls: Any = cls._cls(service_type)
+ return object.__new__(sub_cls)
+
+ def __init__(self,
+ service_type: str,
+ service_id: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ count: Optional[int] = None,
+ config: Optional[Dict[str, str]] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ networks: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+
+ #: See :ref:`orchestrator-cli-placement-spec`.
+ self.placement = PlacementSpec() if placement is None else placement # type: PlacementSpec
+
+ assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES, service_type
+ #: The type of the service. Needs to be either a Ceph
+ #: service (``mon``, ``crash``, ``mds``, ``mgr``, ``osd`` or
+ #: ``rbd-mirror``), a gateway (``nfs`` or ``rgw``), part of the
+ #: monitoring stack (``alertmanager``, ``grafana``, ``node-exporter`` or
+ #: ``prometheus``) or (``container``) for custom containers.
+ self.service_type = service_type
+
+ #: The name of the service. Required for ``iscsi``, ``nvmeof``, ``mds``, ``nfs``, ``osd``,
+ #: ``rgw``, ``container``, ``ingress``
+ self.service_id = None
+
+ if self.service_type in self.REQUIRES_SERVICE_ID or self.service_type == 'osd':
+ self.service_id = service_id
+
+ #: If set to ``true``, the orchestrator will not deploy nor remove
+ #: any daemon associated with this service. Placement and all other properties
+ #: will be ignored. This is useful, if you do not want this service to be
+ #: managed temporarily. For cephadm, See :ref:`cephadm-spec-unmanaged`
+ self.unmanaged = unmanaged
+ self.preview_only = preview_only
+
+ #: A list of network identities instructing the daemons to only bind
+ #: on the particular networks in that list. In case the cluster is distributed
+ #: across multiple networks, you can add multiple networks. See
+ #: :ref:`cephadm-monitoring-networks-ports`,
+ #: :ref:`cephadm-rgw-networks` and :ref:`cephadm-mgr-networks`.
+ self.networks: List[str] = networks or []
+
+ self.config: Optional[Dict[str, str]] = None
+ if config:
+ self.config = {k.replace(' ', '_'): v for k, v in config.items()}
+
+ self.extra_container_args: Optional[ArgumentList] = None
+ self.extra_entrypoint_args: Optional[ArgumentList] = None
+ if extra_container_args:
+ self.extra_container_args = ArgumentSpec.from_general_args(
+ extra_container_args)
+ if extra_entrypoint_args:
+ self.extra_entrypoint_args = ArgumentSpec.from_general_args(
+ extra_entrypoint_args)
+ self.custom_configs: Optional[List[CustomConfig]] = custom_configs
+
+ @classmethod
+ @handle_type_error
+ def from_json(cls: Type[ServiceSpecT], json_spec: Dict) -> ServiceSpecT:
+ """
+ Initialize 'ServiceSpec' object data from a json structure
+
+ There are two valid styles for service specs:
+
+ the "old" style:
+
+ .. code:: yaml
+
+ service_type: nfs
+ service_id: foo
+ pool: mypool
+ namespace: myns
+
+ and the "new" style:
+
+ .. code:: yaml
+
+ service_type: nfs
+ service_id: foo
+ config:
+ some_option: the_value
+ networks: [10.10.0.0/16]
+ spec:
+ pool: mypool
+ namespace: myns
+
+ In https://tracker.ceph.com/issues/45321 we decided that we'd like to
+ prefer the new style as it is more readable and provides a better
+ understanding of what fields are special for a give service type.
+
+ Note, we'll need to stay compatible with both versions for the
+ the next two major releases (octopus, pacific).
+
+ :param json_spec: A valid dict with ServiceSpec
+
+ :meta private:
+ """
+ if not isinstance(json_spec, dict):
+ raise SpecValidationError(
+ f'Service Spec is not an (JSON or YAML) object. got "{str(json_spec)}"')
+
+ json_spec = cls.normalize_json(json_spec)
+
+ c = json_spec.copy()
+
+ # kludge to make `from_json` compatible to `Orchestrator.describe_service`
+ # Open question: Remove `service_id` form to_json?
+ if c.get('service_name', ''):
+ service_type_id = c['service_name'].split('.', 1)
+
+ if not c.get('service_type', ''):
+ c['service_type'] = service_type_id[0]
+ if not c.get('service_id', '') and len(service_type_id) > 1:
+ c['service_id'] = service_type_id[1]
+ del c['service_name']
+
+ service_type = c.get('service_type', '')
+ _cls = cls._cls(service_type)
+
+ if 'status' in c:
+ del c['status'] # kludge to make us compatible to `ServiceDescription.to_json()`
+
+ return _cls._from_json_impl(c) # type: ignore
+
+ @staticmethod
+ def normalize_json(json_spec: dict) -> dict:
+ networks = json_spec.get('networks')
+ if networks is None:
+ return json_spec
+ if isinstance(networks, list):
+ return json_spec
+ if not isinstance(networks, str):
+ raise SpecValidationError(f'Networks ({networks}) must be a string or list of strings')
+ json_spec['networks'] = [networks]
+ return json_spec
+
+ @classmethod
+ def _from_json_impl(cls: Type[ServiceSpecT], json_spec: dict) -> ServiceSpecT:
+ args = {} # type: Dict[str, Any]
+ for k, v in json_spec.items():
+ if k == 'placement':
+ v = PlacementSpec.from_json(v)
+ if k == 'custom_configs':
+ v = [CustomConfig.from_json(c) for c in v]
+ if k == 'spec':
+ args.update(v)
+ continue
+ args.update({k: v})
+ _cls = cls(**args)
+ if _service_spec_from_json_validate:
+ _cls.validate()
+ return _cls
+
+ def service_name(self) -> str:
+ n = self.service_type
+ if self.service_id:
+ n += '.' + self.service_id
+ return n
+
+ def get_port_start(self) -> List[int]:
+ # If defined, we will allocate and number ports starting at this
+ # point.
+ return []
+
+ def get_virtual_ip(self) -> Optional[str]:
+ return None
+
+ def to_json(self):
+ # type: () -> OrderedDict[str, Any]
+ ret: OrderedDict[str, Any] = OrderedDict()
+ ret['service_type'] = self.service_type
+ if self.service_id:
+ ret['service_id'] = self.service_id
+ ret['service_name'] = self.service_name()
+ if self.placement.to_json():
+ ret['placement'] = self.placement.to_json()
+ if self.unmanaged:
+ ret['unmanaged'] = self.unmanaged
+ if self.networks:
+ ret['networks'] = self.networks
+ if self.extra_container_args:
+ ret['extra_container_args'] = ArgumentSpec.map_json(
+ self.extra_container_args
+ )
+ if self.extra_entrypoint_args:
+ ret['extra_entrypoint_args'] = ArgumentSpec.map_json(
+ self.extra_entrypoint_args
+ )
+ if self.custom_configs:
+ ret['custom_configs'] = [c.to_json() for c in self.custom_configs]
+
+ c = {}
+ for key, val in sorted(self.__dict__.items(), key=lambda tpl: tpl[0]):
+ if key in ret:
+ continue
+ if hasattr(val, 'to_json'):
+ val = val.to_json()
+ if val:
+ c[key] = val
+ if c:
+ ret['spec'] = c
+ return ret
+
+ def validate(self) -> None:
+ if not self.service_type:
+ raise SpecValidationError('Cannot add Service: type required')
+
+ if self.service_type != 'osd':
+ if self.service_type in self.REQUIRES_SERVICE_ID and not self.service_id:
+ raise SpecValidationError('Cannot add Service: id required')
+ if self.service_type not in self.REQUIRES_SERVICE_ID and self.service_id:
+ raise SpecValidationError(
+ f'Service of type \'{self.service_type}\' should not contain a service id')
+
+ if self.service_id:
+ if not re.match('^[a-zA-Z0-9_.-]+$', str(self.service_id)):
+ raise SpecValidationError('Service id contains invalid characters, '
+ 'only [a-zA-Z0-9_.-] allowed')
+
+ if self.placement is not None:
+ self.placement.validate()
+ if self.config:
+ for k, v in self.config.items():
+ if k in self.MANAGED_CONFIG_OPTIONS:
+ raise SpecValidationError(
+ f'Cannot set config option {k} in spec: it is managed by cephadm'
+ )
+ for network in self.networks or []:
+ try:
+ ip_network(network)
+ except ValueError as e:
+ raise SpecValidationError(
+ f'Cannot parse network {network}: {e}'
+ )
+
+ def __repr__(self) -> str:
+ y = yaml.dump(cast(dict, self), default_flow_style=False)
+ return f"{self.__class__.__name__}.from_json(yaml.safe_load('''{y}'''))"
+
+ def __eq__(self, other: Any) -> bool:
+ return (self.__class__ == other.__class__
+ and
+ self.__dict__ == other.__dict__)
+
+ def one_line_str(self) -> str:
+ return '<{} for service_name={}>'.format(self.__class__.__name__, self.service_name())
+
+ @staticmethod
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceSpec') -> Any:
+ return dumper.represent_dict(cast(Mapping, data.to_json().items()))
+
+
+yaml.add_representer(ServiceSpec, ServiceSpec.yaml_representer)
+
+
+class NFSServiceSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'nfs',
+ service_id: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ port: Optional[int] = None,
+ virtual_ip: Optional[str] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ enable_haproxy_protocol: bool = False,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'nfs'
+ super(NFSServiceSpec, self).__init__(
+ 'nfs', service_id=service_id,
+ placement=placement, unmanaged=unmanaged, preview_only=preview_only,
+ config=config, networks=networks, extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args, custom_configs=custom_configs)
+
+ self.port = port
+ self.virtual_ip = virtual_ip
+ self.enable_haproxy_protocol = enable_haproxy_protocol
+
+ def get_port_start(self) -> List[int]:
+ if self.port:
+ return [self.port]
+ return []
+
+ def rados_config_name(self):
+ # type: () -> str
+ return 'conf-' + self.service_name()
+
+
+yaml.add_representer(NFSServiceSpec, ServiceSpec.yaml_representer)
+
+
+class RGWSpec(ServiceSpec):
+ """
+ Settings to configure a (multisite) Ceph RGW
+
+ .. code-block:: yaml
+
+ service_type: rgw
+ service_id: myrealm.myzone
+ spec:
+ rgw_realm: myrealm
+ rgw_zonegroup: myzonegroup
+ rgw_zone: myzone
+ ssl: true
+ rgw_frontend_port: 1234
+ rgw_frontend_type: beast
+ rgw_frontend_ssl_certificate: ...
+
+ See also: :ref:`orchestrator-cli-service-spec`
+ """
+
+ MANAGED_CONFIG_OPTIONS = ServiceSpec.MANAGED_CONFIG_OPTIONS + [
+ 'rgw_zone',
+ 'rgw_realm',
+ 'rgw_zonegroup',
+ 'rgw_frontends',
+ ]
+
+ def __init__(self,
+ service_type: str = 'rgw',
+ service_id: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ rgw_realm: Optional[str] = None,
+ rgw_zonegroup: Optional[str] = None,
+ rgw_zone: Optional[str] = None,
+ rgw_frontend_port: Optional[int] = None,
+ rgw_frontend_ssl_certificate: Optional[List[str]] = None,
+ rgw_frontend_type: Optional[str] = None,
+ rgw_frontend_extra_args: Optional[List[str]] = None,
+ unmanaged: bool = False,
+ ssl: bool = False,
+ preview_only: bool = False,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ subcluster: Optional[str] = None, # legacy, only for from_json on upgrade
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ rgw_realm_token: Optional[str] = None,
+ update_endpoints: Optional[bool] = False,
+ zone_endpoints: Optional[str] = None # commad separated endpoints list
+ ):
+ assert service_type == 'rgw', service_type
+
+ # for backward compatibility with octopus spec files,
+ if not service_id and (rgw_realm and rgw_zone):
+ service_id = rgw_realm + '.' + rgw_zone
+
+ super(RGWSpec, self).__init__(
+ 'rgw', service_id=service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only, config=config, networks=networks,
+ extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ #: The RGW realm associated with this service. Needs to be manually created
+ #: if the spec is being applied directly to cephdam. In case of rgw module
+ #: the realm is created automatically.
+ self.rgw_realm: Optional[str] = rgw_realm
+ #: The RGW zonegroup associated with this service. Needs to be manually created
+ #: if the spec is being applied directly to cephdam. In case of rgw module
+ #: the zonegroup is created automatically.
+ self.rgw_zonegroup: Optional[str] = rgw_zonegroup
+ #: The RGW zone associated with this service. Needs to be manually created
+ #: if the spec is being applied directly to cephdam. In case of rgw module
+ #: the zone is created automatically.
+ self.rgw_zone: Optional[str] = rgw_zone
+ #: Port of the RGW daemons
+ self.rgw_frontend_port: Optional[int] = rgw_frontend_port
+ #: List of SSL certificates
+ self.rgw_frontend_ssl_certificate: Optional[List[str]] = rgw_frontend_ssl_certificate
+ #: civetweb or beast (default: beast). See :ref:`rgw_frontends`
+ self.rgw_frontend_type: Optional[str] = rgw_frontend_type
+ #: List of extra arguments for rgw_frontend in the form opt=value. See :ref:`rgw_frontends`
+ self.rgw_frontend_extra_args: Optional[List[str]] = rgw_frontend_extra_args
+ #: enable SSL
+ self.ssl = ssl
+ self.rgw_realm_token = rgw_realm_token
+ self.update_endpoints = update_endpoints
+ self.zone_endpoints = zone_endpoints
+
+ def get_port_start(self) -> List[int]:
+ return [self.get_port()]
+
+ def get_port(self) -> int:
+ if self.rgw_frontend_port:
+ return self.rgw_frontend_port
+ if self.ssl:
+ return 443
+ else:
+ return 80
+
+ def validate(self) -> None:
+ super(RGWSpec, self).validate()
+
+ if self.rgw_realm and not self.rgw_zone:
+ raise SpecValidationError(
+ 'Cannot add RGW: Realm specified but no zone specified')
+ if self.rgw_zone and not self.rgw_realm:
+ raise SpecValidationError('Cannot add RGW: Zone specified but no realm specified')
+
+ if self.rgw_frontend_type is not None:
+ if self.rgw_frontend_type not in ['beast', 'civetweb']:
+ raise SpecValidationError(
+ 'Invalid rgw_frontend_type value. Valid values are: beast, civetweb.\n'
+ 'Additional rgw type parameters can be passed using rgw_frontend_extra_args.'
+ )
+
+
+yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer)
+
+
+class NvmeofServiceSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'nvmeof',
+ service_id: Optional[str] = None,
+ name: Optional[str] = None,
+ group: Optional[str] = None,
+ port: Optional[int] = None,
+ pool: Optional[str] = None,
+ enable_auth: bool = False,
+ server_key: Optional[str] = None,
+ server_cert: Optional[str] = None,
+ client_key: Optional[str] = None,
+ client_cert: Optional[str] = None,
+ spdk_path: Optional[str] = None,
+ tgt_path: Optional[str] = None,
+ timeout: Optional[int] = 60,
+ conn_retries: Optional[int] = 10,
+ transports: Optional[str] = 'tcp',
+ transport_tcp_options: Optional[Dict[str, int]] =
+ {"in_capsule_data_size": 8192, "max_io_qpairs_per_ctrlr": 7},
+ tgt_cmd_extra_args: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'nvmeof'
+ super(NvmeofServiceSpec, self).__init__('nvmeof', service_id=service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only,
+ config=config, networks=networks,
+ extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ #: RADOS pool where ceph-nvmeof config data is stored.
+ self.pool = pool
+ #: ``port`` port of the nvmeof gateway
+ self.port = port or 5500
+ #: ``name`` name of the nvmeof gateway
+ self.name = name
+ #: ``group`` name of the nvmeof gateway
+ self.group = group
+ #: ``enable_auth`` enables user authentication on nvmeof gateway
+ self.enable_auth = enable_auth
+ #: ``server_key`` gateway server key
+ self.server_key = server_key or './server.key'
+ #: ``server_cert`` gateway server certificate
+ self.server_cert = server_cert or './server.crt'
+ #: ``client_key`` client key
+ self.client_key = client_key or './client.key'
+ #: ``client_cert`` client certificate
+ self.client_cert = client_cert or './client.crt'
+ #: ``spdk_path`` path to SPDK
+ self.spdk_path = spdk_path or '/usr/local/bin/nvmf_tgt'
+ #: ``tgt_path`` nvmeof target path
+ self.tgt_path = tgt_path or '/usr/local/bin/nvmf_tgt'
+ #: ``timeout`` ceph connectivity timeout
+ self.timeout = timeout
+ #: ``conn_retries`` ceph connection retries number
+ self.conn_retries = conn_retries
+ #: ``transports`` tcp
+ self.transports = transports
+ #: List of extra arguments for transports in the form opt=value
+ self.transport_tcp_options: Optional[Dict[str, int]] = transport_tcp_options
+ #: ``tgt_cmd_extra_args`` extra arguments for the nvmf_tgt process
+ self.tgt_cmd_extra_args = tgt_cmd_extra_args
+
+ def get_port_start(self) -> List[int]:
+ return [5500, 4420, 8009]
+
+ def validate(self) -> None:
+ # TODO: what other parameters should be validated as part of this function?
+ super(NvmeofServiceSpec, self).validate()
+
+ if not self.pool:
+ raise SpecValidationError('Cannot add NVMEOF: No Pool specified')
+
+ if self.enable_auth:
+ if not any([self.server_key, self.server_cert, self.client_key, self.client_cert]):
+ raise SpecValidationError(
+ 'enable_auth is true but client/server certificates are missing')
+
+ if self.transports not in ['tcp']:
+ raise SpecValidationError('Invalid transport. Valid values are tcp')
+
+
+yaml.add_representer(NvmeofServiceSpec, ServiceSpec.yaml_representer)
+
+
+class IscsiServiceSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'iscsi',
+ service_id: Optional[str] = None,
+ pool: Optional[str] = None,
+ trusted_ip_list: Optional[str] = None,
+ api_port: Optional[int] = 5000,
+ api_user: Optional[str] = 'admin',
+ api_password: Optional[str] = 'admin',
+ api_secure: Optional[bool] = None,
+ ssl_cert: Optional[str] = None,
+ ssl_key: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'iscsi'
+ super(IscsiServiceSpec, self).__init__('iscsi', service_id=service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only,
+ config=config, networks=networks,
+ extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ #: RADOS pool where ceph-iscsi config data is stored.
+ self.pool = pool
+ #: list of trusted IP addresses
+ self.trusted_ip_list = trusted_ip_list
+ #: ``api_port`` as defined in the ``iscsi-gateway.cfg``
+ self.api_port = api_port
+ #: ``api_user`` as defined in the ``iscsi-gateway.cfg``
+ self.api_user = api_user
+ #: ``api_password`` as defined in the ``iscsi-gateway.cfg``
+ self.api_password = api_password
+ #: ``api_secure`` as defined in the ``iscsi-gateway.cfg``
+ self.api_secure = api_secure
+ #: SSL certificate
+ self.ssl_cert = ssl_cert
+ #: SSL private key
+ self.ssl_key = ssl_key
+
+ if not self.api_secure and self.ssl_cert and self.ssl_key:
+ self.api_secure = True
+
+ def get_port_start(self) -> List[int]:
+ return [self.api_port or 5000]
+
+ def validate(self) -> None:
+ super(IscsiServiceSpec, self).validate()
+
+ if not self.pool:
+ raise SpecValidationError(
+ 'Cannot add ISCSI: No Pool specified')
+
+ # Do not need to check for api_user and api_password as they
+ # now default to 'admin' when setting up the gateway url. Older
+ # iSCSI specs from before this change should be fine as they will
+ # have been required to have an api_user and api_password set and
+ # will be unaffected by the new default value.
+
+
+yaml.add_representer(IscsiServiceSpec, ServiceSpec.yaml_representer)
+
+
+class IngressSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'ingress',
+ service_id: Optional[str] = None,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ placement: Optional[PlacementSpec] = None,
+ backend_service: Optional[str] = None,
+ frontend_port: Optional[int] = None,
+ ssl_cert: Optional[str] = None,
+ ssl_key: Optional[str] = None,
+ ssl_dh_param: Optional[str] = None,
+ ssl_ciphers: Optional[List[str]] = None,
+ ssl_options: Optional[List[str]] = None,
+ monitor_port: Optional[int] = None,
+ monitor_user: Optional[str] = None,
+ monitor_password: Optional[str] = None,
+ enable_stats: Optional[bool] = None,
+ keepalived_password: Optional[str] = None,
+ virtual_ip: Optional[str] = None,
+ virtual_ips_list: Optional[List[str]] = None,
+ virtual_interface_networks: Optional[List[str]] = [],
+ use_keepalived_multicast: Optional[bool] = False,
+ vrrp_interface_network: Optional[str] = None,
+ first_virtual_router_id: Optional[int] = 50,
+ unmanaged: bool = False,
+ ssl: bool = False,
+ keepalive_only: bool = False,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ enable_haproxy_protocol: bool = False,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'ingress'
+
+ super(IngressSpec, self).__init__(
+ 'ingress', service_id=service_id,
+ placement=placement, config=config,
+ networks=networks,
+ extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs
+ )
+ self.backend_service = backend_service
+ self.frontend_port = frontend_port
+ self.ssl_cert = ssl_cert
+ self.ssl_key = ssl_key
+ self.ssl_dh_param = ssl_dh_param
+ self.ssl_ciphers = ssl_ciphers
+ self.ssl_options = ssl_options
+ self.monitor_port = monitor_port
+ self.monitor_user = monitor_user
+ self.monitor_password = monitor_password
+ self.keepalived_password = keepalived_password
+ self.virtual_ip = virtual_ip
+ self.virtual_ips_list = virtual_ips_list
+ self.virtual_interface_networks = virtual_interface_networks or []
+ self.use_keepalived_multicast = use_keepalived_multicast
+ self.vrrp_interface_network = vrrp_interface_network
+ self.first_virtual_router_id = first_virtual_router_id
+ self.unmanaged = unmanaged
+ self.ssl = ssl
+ self.keepalive_only = keepalive_only
+ self.enable_haproxy_protocol = enable_haproxy_protocol
+
+ def get_port_start(self) -> List[int]:
+ ports = []
+ if self.frontend_port is not None:
+ ports.append(cast(int, self.frontend_port))
+ if self.monitor_port is not None:
+ ports.append(cast(int, self.monitor_port))
+ return ports
+
+ def get_virtual_ip(self) -> Optional[str]:
+ return self.virtual_ip
+
+ def validate(self) -> None:
+ super(IngressSpec, self).validate()
+
+ if not self.backend_service:
+ raise SpecValidationError(
+ 'Cannot add ingress: No backend_service specified')
+ if not self.keepalive_only and not self.frontend_port:
+ raise SpecValidationError(
+ 'Cannot add ingress: No frontend_port specified')
+ if not self.monitor_port:
+ raise SpecValidationError(
+ 'Cannot add ingress: No monitor_port specified')
+ if not self.virtual_ip and not self.virtual_ips_list:
+ raise SpecValidationError(
+ 'Cannot add ingress: No virtual_ip provided')
+ if self.virtual_ip is not None and self.virtual_ips_list is not None:
+ raise SpecValidationError(
+ 'Cannot add ingress: Single and multiple virtual IPs specified')
+
+
+yaml.add_representer(IngressSpec, ServiceSpec.yaml_representer)
+
+
+class CustomContainerSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'container',
+ service_id: Optional[str] = None,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ image: Optional[str] = None,
+ entrypoint: Optional[str] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ uid: Optional[int] = None,
+ gid: Optional[int] = None,
+ volume_mounts: Optional[Dict[str, str]] = {},
+ # args are for the container runtime, not entrypoint
+ args: Optional[GeneralArgList] = [],
+ envs: Optional[List[str]] = [],
+ privileged: Optional[bool] = False,
+ bind_mounts: Optional[List[List[str]]] = None,
+ ports: Optional[List[int]] = [],
+ dirs: Optional[List[str]] = [],
+ files: Optional[Dict[str, Any]] = {},
+ ):
+ assert service_type == 'container'
+ assert service_id is not None
+ assert image is not None
+
+ super(CustomContainerSpec, self).__init__(
+ service_type, service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only, config=config,
+ networks=networks, extra_entrypoint_args=extra_entrypoint_args)
+
+ self.image = image
+ self.entrypoint = entrypoint
+ self.uid = uid
+ self.gid = gid
+ self.volume_mounts = volume_mounts
+ self.args = args
+ self.envs = envs
+ self.privileged = privileged
+ self.bind_mounts = bind_mounts
+ self.ports = ports
+ self.dirs = dirs
+ self.files = files
+
+ def config_json(self) -> Dict[str, Any]:
+ """
+ Helper function to get the value of the `--config-json` cephadm
+ command line option. It will contain all specification properties
+ that haven't a `None` value. Such properties will get default
+ values in cephadm.
+ :return: Returns a dictionary containing all specification
+ properties.
+ """
+ config_json = {}
+ for prop in ['image', 'entrypoint', 'uid', 'gid', 'args',
+ 'envs', 'volume_mounts', 'privileged',
+ 'bind_mounts', 'ports', 'dirs', 'files']:
+ value = getattr(self, prop)
+ if value is not None:
+ config_json[prop] = value
+ return config_json
+
+
+yaml.add_representer(CustomContainerSpec, ServiceSpec.yaml_representer)
+
+
+class MonitoringSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str,
+ service_id: Optional[str] = None,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ port: Optional[int] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type in ['grafana', 'node-exporter', 'prometheus', 'alertmanager',
+ 'loki', 'promtail']
+
+ super(MonitoringSpec, self).__init__(
+ service_type, service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only, config=config,
+ networks=networks, extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ self.service_type = service_type
+ self.port = port
+
+ def get_port_start(self) -> List[int]:
+ return [self.get_port()]
+
+ def get_port(self) -> int:
+ if self.port:
+ return self.port
+ else:
+ return {'prometheus': 9095,
+ 'node-exporter': 9100,
+ 'alertmanager': 9093,
+ 'grafana': 3000,
+ 'loki': 3100,
+ 'promtail': 9080}[self.service_type]
+
+
+yaml.add_representer(MonitoringSpec, ServiceSpec.yaml_representer)
+
+
+class AlertManagerSpec(MonitoringSpec):
+ def __init__(self,
+ service_type: str = 'alertmanager',
+ service_id: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ user_data: Optional[Dict[str, Any]] = None,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ port: Optional[int] = None,
+ secure: bool = False,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'alertmanager'
+ super(AlertManagerSpec, self).__init__(
+ 'alertmanager', service_id=service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only, config=config, networks=networks, port=port,
+ extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ # Custom configuration.
+ #
+ # Example:
+ # service_type: alertmanager
+ # service_id: xyz
+ # user_data:
+ # default_webhook_urls:
+ # - "https://foo"
+ # - "https://bar"
+ #
+ # Documentation:
+ # default_webhook_urls - A list of additional URL's that are
+ # added to the default receivers'
+ # <webhook_configs> configuration.
+ self.user_data = user_data or {}
+ self.secure = secure
+
+ def get_port_start(self) -> List[int]:
+ return [self.get_port(), 9094]
+
+ def validate(self) -> None:
+ super(AlertManagerSpec, self).validate()
+
+ if self.port == 9094:
+ raise SpecValidationError(
+ 'Port 9094 is reserved for AlertManager cluster listen address')
+
+
+yaml.add_representer(AlertManagerSpec, ServiceSpec.yaml_representer)
+
+
+class GrafanaSpec(MonitoringSpec):
+ def __init__(self,
+ service_type: str = 'grafana',
+ service_id: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ port: Optional[int] = None,
+ protocol: Optional[str] = 'https',
+ initial_admin_password: Optional[str] = None,
+ anonymous_access: Optional[bool] = True,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'grafana'
+ super(GrafanaSpec, self).__init__(
+ 'grafana', service_id=service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only, config=config, networks=networks, port=port,
+ extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ self.initial_admin_password = initial_admin_password
+ self.anonymous_access = anonymous_access
+ self.protocol = protocol
+
+ def validate(self) -> None:
+ super(GrafanaSpec, self).validate()
+ if self.protocol not in ['http', 'https']:
+ err_msg = f"Invalid protocol '{self.protocol}'. Valid values are: 'http', 'https'."
+ raise SpecValidationError(err_msg)
+
+ if not self.anonymous_access and not self.initial_admin_password:
+ err_msg = ('Either initial_admin_password must be set or anonymous_access '
+ 'must be set to true. Otherwise the grafana dashboard will '
+ 'be inaccessible.')
+ raise SpecValidationError(err_msg)
+
+
+yaml.add_representer(GrafanaSpec, ServiceSpec.yaml_representer)
+
+
+class PrometheusSpec(MonitoringSpec):
+ def __init__(self,
+ service_type: str = 'prometheus',
+ service_id: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ port: Optional[int] = None,
+ retention_time: Optional[str] = None,
+ retention_size: Optional[str] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'prometheus'
+ super(PrometheusSpec, self).__init__(
+ 'prometheus', service_id=service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only, config=config, networks=networks, port=port,
+ extra_container_args=extra_container_args, extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ self.retention_time = retention_time.strip() if retention_time else None
+ self.retention_size = retention_size.strip() if retention_size else None
+
+ def validate(self) -> None:
+ super(PrometheusSpec, self).validate()
+
+ if self.retention_time:
+ valid_units = ['y', 'w', 'd', 'h', 'm', 's']
+ m = re.search(rf"^(\d+)({'|'.join(valid_units)})$", self.retention_time)
+ if not m:
+ units = ', '.join(valid_units)
+ raise SpecValidationError(f"Invalid retention time. Valid units are: {units}")
+ if self.retention_size:
+ valid_units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB']
+ m = re.search(rf"^(\d+)({'|'.join(valid_units)})$", self.retention_size)
+ if not m:
+ units = ', '.join(valid_units)
+ raise SpecValidationError(f"Invalid retention size. Valid units are: {units}")
+
+
+yaml.add_representer(PrometheusSpec, ServiceSpec.yaml_representer)
+
+
+class SNMPGatewaySpec(ServiceSpec):
+ class SNMPVersion(str, enum.Enum):
+ V2c = 'V2c'
+ V3 = 'V3'
+
+ def to_json(self) -> str:
+ return self.value
+
+ class SNMPAuthType(str, enum.Enum):
+ MD5 = 'MD5'
+ SHA = 'SHA'
+
+ def to_json(self) -> str:
+ return self.value
+
+ class SNMPPrivacyType(str, enum.Enum):
+ DES = 'DES'
+ AES = 'AES'
+
+ def to_json(self) -> str:
+ return self.value
+
+ valid_destination_types = [
+ 'Name:Port',
+ 'IPv4:Port'
+ ]
+
+ def __init__(self,
+ service_type: str = 'snmp-gateway',
+ snmp_version: Optional[SNMPVersion] = None,
+ snmp_destination: str = '',
+ credentials: Dict[str, str] = {},
+ engine_id: Optional[str] = None,
+ auth_protocol: Optional[SNMPAuthType] = None,
+ privacy_protocol: Optional[SNMPPrivacyType] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ port: Optional[int] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'snmp-gateway'
+
+ super(SNMPGatewaySpec, self).__init__(
+ service_type,
+ placement=placement,
+ unmanaged=unmanaged,
+ preview_only=preview_only,
+ extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ self.service_type = service_type
+ self.snmp_version = snmp_version
+ self.snmp_destination = snmp_destination
+ self.port = port
+ self.credentials = credentials
+ self.engine_id = engine_id
+ self.auth_protocol = auth_protocol
+ self.privacy_protocol = privacy_protocol
+
+ @classmethod
+ def _from_json_impl(cls, json_spec: dict) -> 'SNMPGatewaySpec':
+
+ cpy = json_spec.copy()
+ types = [
+ ('snmp_version', SNMPGatewaySpec.SNMPVersion),
+ ('auth_protocol', SNMPGatewaySpec.SNMPAuthType),
+ ('privacy_protocol', SNMPGatewaySpec.SNMPPrivacyType),
+ ]
+ for d in cpy, cpy.get('spec', {}):
+ for key, enum_cls in types:
+ try:
+ if key in d:
+ d[key] = enum_cls(d[key])
+ except ValueError:
+ raise SpecValidationError(f'{key} unsupported. Must be one of '
+ f'{", ".join(enum_cls)}')
+ return super(SNMPGatewaySpec, cls)._from_json_impl(cpy)
+
+ @property
+ def ports(self) -> List[int]:
+ return [self.port or 9464]
+
+ def get_port_start(self) -> List[int]:
+ return self.ports
+
+ def validate(self) -> None:
+ super(SNMPGatewaySpec, self).validate()
+
+ if not self.credentials:
+ raise SpecValidationError(
+ 'Missing authentication information (credentials). '
+ 'SNMP V2c and V3 require credential information'
+ )
+ elif not self.snmp_version:
+ raise SpecValidationError(
+ 'Missing SNMP version (snmp_version)'
+ )
+
+ creds_requirement = {
+ 'V2c': ['snmp_community'],
+ 'V3': ['snmp_v3_auth_username', 'snmp_v3_auth_password']
+ }
+ if self.privacy_protocol:
+ creds_requirement['V3'].append('snmp_v3_priv_password')
+
+ missing = [parm for parm in creds_requirement[self.snmp_version]
+ if parm not in self.credentials]
+ # check that credentials are correct for the version
+ if missing:
+ raise SpecValidationError(
+ f'SNMP {self.snmp_version} credentials are incomplete. Missing {", ".join(missing)}'
+ )
+
+ if self.engine_id:
+ if 10 <= len(self.engine_id) <= 64 and \
+ is_hex(self.engine_id) and \
+ len(self.engine_id) % 2 == 0:
+ pass
+ else:
+ raise SpecValidationError(
+ 'engine_id must be a string containing 10-64 hex characters. '
+ 'Its length must be divisible by 2'
+ )
+
+ else:
+ if self.snmp_version == 'V3':
+ raise SpecValidationError(
+ 'Must provide an engine_id for SNMP V3 notifications'
+ )
+
+ if not self.snmp_destination:
+ raise SpecValidationError(
+ 'SNMP destination (snmp_destination) must be provided'
+ )
+ else:
+ valid, description = valid_addr(self.snmp_destination)
+ if not valid:
+ raise SpecValidationError(
+ f'SNMP destination (snmp_destination) is invalid: {description}'
+ )
+ if description not in self.valid_destination_types:
+ raise SpecValidationError(
+ f'SNMP destination (snmp_destination) type ({description}) is invalid. '
+ f'Must be either: {", ".join(sorted(self.valid_destination_types))}'
+ )
+
+
+yaml.add_representer(SNMPGatewaySpec, ServiceSpec.yaml_representer)
+
+
+class MDSSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'mds',
+ service_id: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ config: Optional[Dict[str, str]] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ extra_container_args: Optional[GeneralArgList] = None,
+ extra_entrypoint_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ ):
+ assert service_type == 'mds'
+ super(MDSSpec, self).__init__('mds', service_id=service_id,
+ placement=placement,
+ config=config,
+ unmanaged=unmanaged,
+ preview_only=preview_only,
+ extra_container_args=extra_container_args,
+ extra_entrypoint_args=extra_entrypoint_args,
+ custom_configs=custom_configs)
+
+ def validate(self) -> None:
+ super(MDSSpec, self).validate()
+
+ if str(self.service_id)[0].isdigit():
+ raise SpecValidationError('MDS service id cannot start with a numeric digit')
+
+
+yaml.add_representer(MDSSpec, ServiceSpec.yaml_representer)
+
+
+class MONSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str,
+ service_id: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ count: Optional[int] = None,
+ config: Optional[Dict[str, str]] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ networks: Optional[List[str]] = None,
+ extra_container_args: Optional[GeneralArgList] = None,
+ custom_configs: Optional[List[CustomConfig]] = None,
+ crush_locations: Optional[Dict[str, List[str]]] = None,
+ ):
+ assert service_type == 'mon'
+ super(MONSpec, self).__init__('mon', service_id=service_id,
+ placement=placement,
+ count=count,
+ config=config,
+ unmanaged=unmanaged,
+ preview_only=preview_only,
+ networks=networks,
+ extra_container_args=extra_container_args,
+ custom_configs=custom_configs)
+
+ self.crush_locations = crush_locations
+ self.validate()
+
+ def validate(self) -> None:
+ if self.crush_locations:
+ for host, crush_locs in self.crush_locations.items():
+ try:
+ assert_valid_host(host)
+ except SpecValidationError as e:
+ err_str = f'Invalid hostname found in spec crush locations: {e}'
+ raise SpecValidationError(err_str)
+ for cloc in crush_locs:
+ if '=' not in cloc or len(cloc.split('=')) != 2:
+ err_str = ('Crush locations must be of form <bucket>=<location>. '
+ f'Found crush location: {cloc}')
+ raise SpecValidationError(err_str)
+
+
+yaml.add_representer(MONSpec, ServiceSpec.yaml_representer)
+
+
+class TracingSpec(ServiceSpec):
+ SERVICE_TYPES = ['elasticsearch', 'jaeger-collector', 'jaeger-query', 'jaeger-agent']
+
+ def __init__(self,
+ service_type: str,
+ es_nodes: Optional[str] = None,
+ without_query: bool = False,
+ service_id: Optional[str] = None,
+ config: Optional[Dict[str, str]] = None,
+ networks: Optional[List[str]] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False
+ ):
+ assert service_type in TracingSpec.SERVICE_TYPES + ['jaeger-tracing']
+
+ super(TracingSpec, self).__init__(
+ service_type, service_id,
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only, config=config,
+ networks=networks)
+ self.without_query = without_query
+ self.es_nodes = es_nodes
+
+ def get_port_start(self) -> List[int]:
+ return [self.get_port()]
+
+ def get_port(self) -> int:
+ return {'elasticsearch': 9200,
+ 'jaeger-agent': 6799,
+ 'jaeger-collector': 14250,
+ 'jaeger-query': 16686}[self.service_type]
+
+ def get_tracing_specs(self) -> List[ServiceSpec]:
+ assert self.service_type == 'jaeger-tracing'
+ specs: List[ServiceSpec] = []
+ daemons: Dict[str, Optional[PlacementSpec]] = {
+ daemon: None for daemon in TracingSpec.SERVICE_TYPES}
+
+ if self.es_nodes:
+ del daemons['elasticsearch']
+ if self.without_query:
+ del daemons['jaeger-query']
+ if self.placement:
+ daemons.update({'jaeger-collector': self.placement})
+
+ for daemon, daemon_placement in daemons.items():
+ specs.append(TracingSpec(service_type=daemon,
+ es_nodes=self.es_nodes,
+ placement=daemon_placement,
+ unmanaged=self.unmanaged,
+ config=self.config,
+ networks=self.networks,
+ preview_only=self.preview_only
+ ))
+ return specs
+
+
+yaml.add_representer(TracingSpec, ServiceSpec.yaml_representer)
+
+
+class TunedProfileSpec():
+ def __init__(self,
+ profile_name: str,
+ placement: Optional[PlacementSpec] = None,
+ settings: Optional[Dict[str, str]] = None,
+ ):
+ self.profile_name = profile_name
+ self.placement = placement or PlacementSpec(host_pattern='*')
+ self.settings = settings or {}
+ self._last_updated: str = ''
+
+ @classmethod
+ def from_json(cls, spec: Dict[str, Any]) -> 'TunedProfileSpec':
+ data = {}
+ if 'profile_name' not in spec:
+ raise SpecValidationError('Tuned profile spec must include "profile_name" field')
+ data['profile_name'] = spec['profile_name']
+ if not isinstance(data['profile_name'], str):
+ raise SpecValidationError('"profile_name" field must be a string')
+ if 'placement' in spec:
+ data['placement'] = PlacementSpec.from_json(spec['placement'])
+ if 'settings' in spec:
+ data['settings'] = spec['settings']
+ return cls(**data)
+
+ def to_json(self) -> Dict[str, Any]:
+ res: Dict[str, Any] = {}
+ res['profile_name'] = self.profile_name
+ res['placement'] = self.placement.to_json()
+ res['settings'] = self.settings
+ return res
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, TunedProfileSpec):
+ if (
+ self.placement == other.placement
+ and self.profile_name == other.profile_name
+ and self.settings == other.settings
+ ):
+ return True
+ return False
+ return NotImplemented
+
+ def __repr__(self) -> str:
+ return f'TunedProfile({self.profile_name})'
+
+ def copy(self) -> 'TunedProfileSpec':
+ # for making deep copies so you can edit the settings in one without affecting the other
+ # mostly for testing purposes
+ return TunedProfileSpec(self.profile_name, self.placement, self.settings.copy())
+
+
+class CephExporterSpec(ServiceSpec):
+ def __init__(self,
+ service_type: str = 'ceph-exporter',
+ sock_dir: Optional[str] = None,
+ addrs: str = '',
+ port: Optional[int] = None,
+ prio_limit: Optional[int] = 5,
+ stats_period: Optional[int] = 5,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ extra_container_args: Optional[GeneralArgList] = None,
+ ):
+ assert service_type == 'ceph-exporter'
+
+ super(CephExporterSpec, self).__init__(
+ service_type,
+ placement=placement,
+ unmanaged=unmanaged,
+ preview_only=preview_only,
+ extra_container_args=extra_container_args)
+
+ self.service_type = service_type
+ self.sock_dir = sock_dir
+ self.addrs = addrs
+ self.port = port
+ self.prio_limit = prio_limit
+ self.stats_period = stats_period
+
+ def validate(self) -> None:
+ super(CephExporterSpec, self).validate()
+
+ if not isinstance(self.prio_limit, int):
+ raise SpecValidationError(
+ f'prio_limit must be an integer. Got {type(self.prio_limit)}')
+ if not isinstance(self.stats_period, int):
+ raise SpecValidationError(
+ f'stats_period must be an integer. Got {type(self.stats_period)}')
+
+
+yaml.add_representer(CephExporterSpec, ServiceSpec.yaml_representer)
diff --git a/src/python-common/ceph/deployment/translate.py b/src/python-common/ceph/deployment/translate.py
new file mode 100644
index 000000000..86243b8ae
--- /dev/null
+++ b/src/python-common/ceph/deployment/translate.py
@@ -0,0 +1,198 @@
+import logging
+
+try:
+ from typing import Optional, List, Dict
+except ImportError:
+ pass
+
+from ceph.deployment.drive_selection.selector import DriveSelection
+
+logger = logging.getLogger(__name__)
+
+
+# TODO refactor this to a DriveSelection method
+class to_ceph_volume(object):
+
+ _supported_device_classes = [
+ "hdd", "ssd", "nvme"
+ ]
+
+ def __init__(self,
+ selection, # type: DriveSelection
+ osd_id_claims=None, # type: Optional[List[str]]
+ preview=False # type: bool
+ ):
+
+ self.selection = selection
+ self.spec = selection.spec
+ self.preview = preview
+ self.osd_id_claims = osd_id_claims
+
+ def prepare_devices(self):
+
+ # type: () -> Dict[str, List[str]]
+
+ lvcount: Dict[str, List[str]] = dict()
+
+ """
+ Default entry for the global crush_device_class definition;
+ if there's no global definition at spec level, we do not want
+ to apply anything to the provided devices, hence we need to run
+ a ceph-volume command without that option, otherwise we init an
+ entry for the globally defined crush_device_class.
+ """
+ if self.spec.crush_device_class:
+ lvcount[self.spec.crush_device_class] = []
+
+ # entry where the drives that don't require a crush_device_class
+ # option are collected
+ lvcount["no_crush"] = []
+
+ """
+ for each device, check if it's just a path or it has a crush_device
+ class definition, and append an entry to the right crush_device_
+ class group
+ """
+ for device in self.selection.data_devices():
+ # iterate on List[Device], containing both path and
+ # crush_device_class
+ path = device.path
+ crush_device_class = device.crush_device_class
+
+ if path is None:
+ raise ValueError("Device path can't be empty")
+
+ """
+ if crush_device_class is specified for the current Device path
+ we should either init the array for this group or append the
+ drive path to the existing entry
+ """
+ if crush_device_class:
+ if crush_device_class in lvcount.keys():
+ lvcount[crush_device_class].append(path)
+ else:
+ lvcount[crush_device_class] = [path]
+ continue
+
+ """
+ if no crush_device_class is specified for the current path
+ but a global definition is present in the spec, so we group
+ the drives together
+ """
+ if crush_device_class is None and self.spec.crush_device_class:
+ lvcount[self.spec.crush_device_class].append(path)
+ continue
+ else:
+ # default use case
+ lvcount["no_crush"].append(path)
+ continue
+
+ return lvcount
+
+ def run(self):
+ # type: () -> List[str]
+ """ Generate ceph-volume commands based on the DriveGroup filters """
+
+ db_devices = [x.path for x in self.selection.db_devices()]
+ wal_devices = [x.path for x in self.selection.wal_devices()]
+
+ if not self.selection.data_devices():
+ return []
+
+ cmds: List[str] = []
+
+ devices = self.prepare_devices()
+ # get the total number of devices provided by the Dict[str, List[str]]
+ devices_count = len(sum(list(devices.values()), []))
+
+ if devices and db_devices:
+ if (devices_count != len(db_devices)) and (self.spec.method == 'raw'):
+ raise ValueError('Number of data devices must match number of '
+ 'db devices for raw mode osds')
+
+ if devices and wal_devices:
+ if (devices_count != len(wal_devices)) and (self.spec.method == 'raw'):
+ raise ValueError('Number of data devices must match number of '
+ 'wal devices for raw mode osds')
+
+ for d in devices.keys():
+ data_devices: Optional[List[str]] = devices.get(d)
+ if not data_devices:
+ continue
+
+ if self.spec.method == 'raw':
+ assert self.spec.objectstore == 'bluestore'
+ # ceph-volume raw prepare only support 1:1 ratio of data to db/wal devices
+ # for raw prepare each data device needs its own prepare command
+ dev_counter = 0
+ # reversing the lists as we're assigning db_devices sequentially
+ db_devices.reverse()
+ wal_devices.reverse()
+
+ while dev_counter < len(data_devices):
+ cmd = "raw prepare --bluestore"
+ cmd += " --data {}".format(data_devices[dev_counter])
+ if db_devices:
+ cmd += " --block.db {}".format(db_devices.pop())
+ if wal_devices:
+ cmd += " --block.wal {}".format(wal_devices.pop())
+ if d in self._supported_device_classes:
+ cmd += " --crush-device-class {}".format(d)
+
+ cmds.append(cmd)
+ dev_counter += 1
+
+ elif self.spec.objectstore == 'bluestore':
+ # for lvm batch we can just do all devices in one command
+
+ cmd = "lvm batch --no-auto {}".format(" ".join(data_devices))
+
+ if db_devices:
+ cmd += " --db-devices {}".format(" ".join(db_devices))
+
+ if wal_devices:
+ cmd += " --wal-devices {}".format(" ".join(wal_devices))
+
+ if self.spec.block_wal_size:
+ cmd += " --block-wal-size {}".format(self.spec.block_wal_size)
+
+ if self.spec.block_db_size:
+ cmd += " --block-db-size {}".format(self.spec.block_db_size)
+
+ if d in self._supported_device_classes:
+ cmd += " --crush-device-class {}".format(d)
+ cmds.append(cmd)
+
+ for i in range(len(cmds)):
+ if self.spec.encrypted:
+ cmds[i] += " --dmcrypt"
+
+ if self.spec.osds_per_device:
+ cmds[i] += " --osds-per-device {}".format(self.spec.osds_per_device)
+
+ if self.spec.data_allocate_fraction:
+ cmds[i] += " --data-allocate-fraction {}".format(self.spec.data_allocate_fraction)
+
+ if self.osd_id_claims:
+ cmds[i] += " --osd-ids {}".format(" ".join(self.osd_id_claims))
+
+ if self.spec.method != 'raw':
+ cmds[i] += " --yes"
+ cmds[i] += " --no-systemd"
+
+ # set the --crush-device-class option when:
+ # - crush_device_class is specified at spec level (global for all the osds) # noqa E501
+ # - crush_device_class is allowed
+ # - there's no override at osd level
+ if (
+ self.spec.crush_device_class and
+ self.spec.crush_device_class in self._supported_device_classes and # noqa E501
+ "crush-device-class" not in cmds[i]
+ ):
+ cmds[i] += " --crush-device-class {}".format(self.spec.crush_device_class) # noqa E501
+
+ if self.preview:
+ cmds[i] += " --report"
+ cmds[i] += " --format json"
+
+ return cmds
diff --git a/src/python-common/ceph/deployment/utils.py b/src/python-common/ceph/deployment/utils.py
new file mode 100644
index 000000000..6aad15b75
--- /dev/null
+++ b/src/python-common/ceph/deployment/utils.py
@@ -0,0 +1,102 @@
+import ipaddress
+import socket
+from typing import Tuple, Optional
+from urllib.parse import urlparse
+
+
+def unwrap_ipv6(address):
+ # type: (str) -> str
+ if address.startswith('[') and address.endswith(']'):
+ return address[1:-1]
+ return address
+
+
+def wrap_ipv6(address):
+ # type: (str) -> str
+
+ # We cannot assume it's already wrapped or even an IPv6 address if
+ # it's already wrapped it'll not pass (like if it's a hostname) and trigger
+ # the ValueError
+ try:
+ if ipaddress.ip_address(address).version == 6:
+ return f"[{address}]"
+ except ValueError:
+ pass
+
+ return address
+
+
+def is_ipv6(address):
+ # type: (str) -> bool
+ address = unwrap_ipv6(address)
+ try:
+ return ipaddress.ip_address(address).version == 6
+ except ValueError:
+ return False
+
+
+def valid_addr(addr: str) -> Tuple[bool, str]:
+ """check that an address string is valid
+ Valid in this case means that a name is resolvable, or the
+ IP address string is a correctly formed IPv4 or IPv6 address,
+ with or without a port
+
+ Args:
+ addr (str): address
+
+ Returns:
+ Tuple[bool, str]: Validity of the address, either
+ True, address type (IPv4[:Port], IPv6[:Port], Name[:Port])
+ False, <error description>
+ """
+
+ def _dns_lookup(addr: str, port: Optional[int]) -> Tuple[bool, str]:
+ try:
+ socket.getaddrinfo(addr, None)
+ except socket.gaierror:
+ # not resolvable
+ return False, 'DNS lookup failed'
+ return True, 'Name:Port' if port else 'Name'
+
+ def _ip_lookup(addr: str, port: Optional[int]) -> Tuple[bool, str]:
+ unwrapped = unwrap_ipv6(addr)
+ try:
+ ip_addr = ipaddress.ip_address(unwrapped)
+ except ValueError:
+ return False, 'Invalid IP v4 or v6 address format'
+ return True, f'IPv{ip_addr.version}:Port' if port else f'IPv{ip_addr.version}'
+
+ dots = addr.count('.')
+ colons = addr.count(':')
+ addr_as_url = f'http://{addr}'
+
+ try:
+ res = urlparse(addr_as_url)
+ except ValueError as e:
+ if str(e) == 'Invalid IPv6 URL':
+ return False, 'Address has incorrect/incomplete use of enclosing brackets'
+ return False, f'Unknown urlparse error {str(e)} for {addr_as_url}'
+
+ addr = res.netloc
+ port = None
+ try:
+ port = res.port
+ if port:
+ addr = addr[:-len(f':{port}')]
+ except ValueError:
+ if colons == 1:
+ return False, 'Port must be numeric'
+ elif ']:' in addr:
+ return False, 'Port must be numeric'
+
+ if addr.startswith('[') and dots:
+ return False, "IPv4 address wrapped in brackets is invalid"
+
+ # catch partial address like 10.8 which would be valid IPaddress schemes
+ # but are classed as invalid here since they're not usable
+ if dots and addr[0].isdigit() and dots != 3:
+ return False, 'Invalid partial IPv4 address'
+
+ if addr[0].isalpha() and '.' in addr:
+ return _dns_lookup(addr, port)
+ return _ip_lookup(addr, port)
diff --git a/src/python-common/ceph/py.typed b/src/python-common/ceph/py.typed
new file mode 100644
index 000000000..444b02d77
--- /dev/null
+++ b/src/python-common/ceph/py.typed
@@ -0,0 +1 @@
+# Marker file for PEP 561. This package uses inline types. \ No newline at end of file
diff --git a/src/python-common/ceph/rgw/__init__.py b/src/python-common/ceph/rgw/__init__.py
new file mode 100644
index 000000000..3988bf129
--- /dev/null
+++ b/src/python-common/ceph/rgw/__init__.py
@@ -0,0 +1,3 @@
+import logging
+
+log = logging.getLogger(__name__)
diff --git a/src/python-common/ceph/rgw/diff.py b/src/python-common/ceph/rgw/diff.py
new file mode 100644
index 000000000..cd91aa97f
--- /dev/null
+++ b/src/python-common/ceph/rgw/diff.py
@@ -0,0 +1,93 @@
+class ZoneEPs:
+ def __init__(self):
+ self.endpoints = set()
+
+ def add(self, ep):
+ if not ep:
+ return
+
+ self.endpoints.add(ep)
+
+ def diff(self, zep):
+ return list(self.endpoints.difference(zep.endpoints))
+
+ def get_all(self):
+ for ep in self.endpoints:
+ yield ep
+
+
+class RealmEPs:
+ def __init__(self):
+ self.zones = {}
+
+ def add(self, zone, ep=None):
+ if not zone:
+ return
+
+ z = self.zones.get(zone)
+ if not z:
+ z = ZoneEPs()
+ self.zones[zone] = z
+
+ z.add(ep)
+
+ def diff(self, rep):
+ result = {}
+ for z, zep in rep.zones.items():
+ myzep = self.zones.get(z)
+ if not myzep:
+ continue
+
+ d = myzep.diff(zep)
+ if len(d) > 0:
+ result[z] = myzep.diff(zep)
+
+ return result
+
+ def get_all(self):
+ for z, zep in self.zones.items():
+ eps = []
+ for ep in zep.get_all():
+ eps.append(ep)
+ yield z, eps
+
+
+class RealmsEPs:
+ def __init__(self):
+ self.realms = {}
+
+ def add(self, realm, zone=None, ep=None):
+ if not realm:
+ return
+
+ r = self.realms.get(realm)
+ if not r:
+ r = RealmEPs()
+ self.realms[realm] = r
+
+ r.add(zone, ep)
+
+ def diff(self, rep):
+ result = {}
+
+ for r, rep in rep.realms.items():
+ myrealm = self.realms.get(r)
+ if not myrealm:
+ continue
+
+ d = myrealm.diff(rep)
+ if d:
+ result[r] = d
+
+ return result
+
+ def get_all(self):
+ result = {}
+ for r, rep in self.realms.items():
+ zs = {}
+ for z, eps in rep.get_all():
+ zs[z] = eps
+
+ result[r] = zs
+
+ return result
diff --git a/src/python-common/ceph/rgw/rgwam_core.py b/src/python-common/ceph/rgw/rgwam_core.py
new file mode 100644
index 000000000..7041ea154
--- /dev/null
+++ b/src/python-common/ceph/rgw/rgwam_core.py
@@ -0,0 +1,937 @@
+# -*- mode:python -*-
+# vim: ts=4 sw=4 smarttab expandtab
+#
+# Processed in Makefile to add python #! line and version variable
+#
+#
+
+import random
+import string
+import json
+import socket
+import base64
+import logging
+import errno
+
+from .types import RGWAMException, RGWAMCmdRunException, RGWPeriod, RGWUser, RealmToken
+from .diff import RealmsEPs
+
+DEFAULT_PORT = 8000
+
+log = logging.getLogger(__name__)
+
+
+def bool_str(x):
+ return 'true' if x else 'false'
+
+
+def rand_alphanum_lower(k):
+ return ''.join(random.choices(string.ascii_lowercase + string.digits, k=k))
+
+
+def gen_name(prefix, suffix_len):
+ return prefix + rand_alphanum_lower(suffix_len)
+
+
+def set_or_gen(val, gen, prefix):
+ if val:
+ return val
+ if gen:
+ return gen_name(prefix, 8)
+
+ return None
+
+
+def get_endpoints(endpoints, period=None):
+ if endpoints:
+ return endpoints
+
+ hostname = socket.getfqdn()
+
+ port = DEFAULT_PORT
+
+ while True:
+ ep = 'http://%s:%d' % (hostname, port)
+ if not period or not period.endpoint_exists(ep):
+ return ep
+ port += 1
+
+
+class EnvArgs:
+ def __init__(self, mgr):
+ self.mgr = mgr
+
+
+class EntityKey:
+ def __init__(self, name=None, id=None):
+ self.name = name
+ self.id = id
+
+ def safe_vals(ek):
+ if not ek:
+ return None, None
+ return ek.name, ek.id
+
+
+class EntityName(EntityKey):
+ def __init__(self, name=None):
+ super().__init__(name=name)
+
+
+class EntityID(EntityKey):
+ def __init__(self, id=None):
+ super().__init__(id=id)
+
+
+class ZoneEnv:
+ def __init__(self, env: EnvArgs, realm: EntityKey = None, zg: EntityKey = None,
+ zone: EntityKey = None):
+ self.env = env
+ self.realm = realm
+ self.zg = zg
+ self.zone = zone
+
+ def set(self, env: EnvArgs = None, realm: EntityKey = None, zg: EntityKey = None,
+ zone: EntityKey = None):
+ if env:
+ self.env = env
+ if realm:
+ self.realm = realm
+ if zg:
+ self.zg = zg
+ if zone:
+ self.zone = zone
+
+ return self
+
+ def _init_entity(self, ek: EntityKey, gen, prefix):
+ name, id = EntityKey.safe_vals(ek)
+ name = set_or_gen(name, gen, prefix)
+
+ return EntityKey(name, id)
+
+ def init_realm(self, realm: EntityKey = None, gen=False):
+ self.realm = self._init_entity(realm, gen, 'realm-')
+ return self
+
+ def init_zg(self, zg: EntityKey = None, gen=False):
+ self.zg = self._init_entity(zg, gen, 'zg-')
+ return self
+
+ def init_zone(self, zone: EntityKey = None, gen=False):
+ self.zone = self._init_entity(zone, gen, 'zone-')
+ return self
+
+
+def opt_arg(params, cmd, arg):
+ if arg:
+ params += [cmd, arg]
+
+
+def opt_arg_bool(params, flag, arg):
+ if arg:
+ params += [flag]
+
+
+class RGWCmdBase:
+ def __init__(self, prog, zone_env: ZoneEnv):
+ self.env = zone_env.env
+ self.mgr = self.env.mgr
+ self.prog = prog
+ self.cmd_suffix = []
+ if zone_env.realm:
+ opt_arg(self.cmd_suffix, '--rgw-realm', zone_env.realm.name)
+ opt_arg(self.cmd_suffix, '--realm-id', zone_env.realm.id)
+ if zone_env.zg:
+ opt_arg(self.cmd_suffix, '--rgw-zonegroup', zone_env.zg.name)
+ opt_arg(self.cmd_suffix, '--zonegroup-id', zone_env.zg.id)
+ if zone_env.zone:
+ opt_arg(self.cmd_suffix, '--rgw-zone', zone_env.zone.name)
+ opt_arg(self.cmd_suffix, '--zone-id', zone_env.zone.id)
+
+ def run(self, cmd):
+ args = cmd + self.cmd_suffix
+ cmd, returncode, stdout, stderr = self.mgr.tool_exec(self.prog, args)
+
+ log.debug('cmd=%s' % str(cmd))
+ log.debug('stdout=%s' % stdout)
+
+ if returncode != 0:
+ cmd_str = ' '.join(cmd)
+ log.error('ERROR: command exited with error status (%d): %s\nstdout=%s\nstderr=%s' %
+ (returncode, cmd_str, stdout, stderr))
+ raise RGWAMCmdRunException(cmd_str, -returncode, stdout, stderr)
+
+ return (stdout, stderr)
+
+
+class RGWAdminCmd(RGWCmdBase):
+ def __init__(self, zone_env: ZoneEnv):
+ super().__init__('radosgw-admin', zone_env)
+
+
+class RGWAdminJSONCmd(RGWAdminCmd):
+ def __init__(self, zone_env: ZoneEnv):
+ super().__init__(zone_env)
+
+ def run(self, cmd):
+ stdout, _ = RGWAdminCmd.run(self, cmd)
+
+ return json.loads(stdout)
+
+
+class RGWCmd(RGWCmdBase):
+ def __init__(self, zone_env: ZoneEnv):
+ super().__init__('radosgw', zone_env)
+
+
+class RealmOp:
+ def __init__(self, env: EnvArgs):
+ self.env = env
+
+ def list(self):
+ try:
+ ze = ZoneEnv(self.env)
+ params = ['realm', 'list']
+ output = RGWAdminJSONCmd(ze).run(params)
+ return output.get('realms') or []
+ except RGWAMException as e:
+ logging.info(f'Exception while listing realms {e.message}')
+ # in case the realm list is empty an exception is raised
+ return []
+
+ def get(self, realm: EntityKey = None):
+ ze = ZoneEnv(self.env, realm=realm)
+ params = ['realm', 'get']
+ return RGWAdminJSONCmd(ze).run(params)
+
+ def create(self, realm: EntityKey = None):
+ ze = ZoneEnv(self.env).init_realm(realm=realm, gen=True)
+ params = ['realm', 'create']
+ return RGWAdminJSONCmd(ze).run(params)
+
+ def pull(self, realm, url, access_key, secret):
+ params = ['realm',
+ 'pull',
+ '--url', url,
+ '--access-key', access_key,
+ '--secret', secret]
+ ze = ZoneEnv(self.env, realm=realm)
+ return RGWAdminJSONCmd(ze).run(params)
+
+
+class ZonegroupOp:
+ def __init__(self, env: EnvArgs):
+ self.env = env
+
+ def list(self):
+ try:
+ ze = ZoneEnv(self.env)
+ params = ['zonegroup', 'list']
+ output = RGWAdminJSONCmd(ze).run(params)
+ return output.get('zonegroups') or []
+ except RGWAMException as e:
+ logging.info(f'Exception while listing zonegroups {e.message}')
+ return []
+
+ def get(self, zonegroup: EntityKey = None):
+ ze = ZoneEnv(self.env)
+ params = ['zonegroup', 'get']
+ opt_arg(params, '--rgw-zonegroup', zonegroup)
+ return RGWAdminJSONCmd(ze).run(params)
+
+ def create(self, realm: EntityKey, zg: EntityKey = None, endpoints=None, is_master=True):
+ ze = ZoneEnv(self.env, realm=realm).init_zg(zg, gen=True)
+
+ params = ['zonegroup',
+ 'create']
+
+ opt_arg_bool(params, '--master', is_master)
+ opt_arg(params, '--endpoints', endpoints)
+
+ stdout, _ = RGWAdminCmd(ze).run(params)
+
+ return json.loads(stdout)
+
+ def modify(self, realm: EntityKey, zg: EntityKey, endpoints=None):
+ ze = ZoneEnv(self.env, realm=realm, zg=zg)
+ params = ['zonegroup', 'modify']
+ opt_arg(params, '--endpoints', endpoints)
+ return RGWAdminJSONCmd(ze).run(params)
+
+
+class ZoneOp:
+ def __init__(self, env: EnvArgs):
+ self.env = env
+
+ def list(self):
+ try:
+ ze = ZoneEnv(self.env)
+ params = ['zone', 'list']
+ output = RGWAdminJSONCmd(ze).run(params)
+ return output.get('zones') or []
+ except RGWAMException as e:
+ logging.info(f'Exception while listing zones {e.message}')
+ return []
+
+ def get(self, zone: EntityKey):
+ ze = ZoneEnv(self.env, zone=zone)
+
+ params = ['zone',
+ 'get']
+
+ return RGWAdminJSONCmd(ze).run(params)
+
+ def create(self, realm: EntityKey, zonegroup: EntityKey, zone: EntityKey = None,
+ endpoints=None, is_master=True,
+ access_key=None, secret=None):
+
+ ze = ZoneEnv(self.env, realm=realm, zg=zonegroup).init_zone(zone, gen=True)
+
+ params = ['zone',
+ 'create']
+
+ opt_arg_bool(params, '--master', is_master)
+ opt_arg(params, '--access-key', access_key)
+ opt_arg(params, '--secret', secret)
+ opt_arg(params, '--endpoints', endpoints)
+
+ return RGWAdminJSONCmd(ze).run(params)
+
+ def modify(self, zone: EntityKey, zg: EntityKey, is_master=None,
+ access_key=None, secret=None, endpoints=None):
+ ze = ZoneEnv(self.env, zone=zone, zg=zg)
+
+ params = ['zone',
+ 'modify']
+
+ opt_arg_bool(params, '--master', is_master)
+ opt_arg(params, '--access-key', access_key)
+ opt_arg(params, '--secret', secret)
+ opt_arg(params, '--endpoints', endpoints)
+
+ return RGWAdminJSONCmd(ze).run(params)
+
+
+class PeriodOp:
+ def __init__(self, env):
+ self.env = env
+
+ def update(self, realm: EntityKey, zonegroup: EntityKey, zone: EntityKey, commit=True):
+ master_zone_info = self.get_master_zone(realm, zonegroup)
+ master_zone = EntityName(master_zone_info['name']) if master_zone_info else zone
+ master_zonegroup_info = self.get_master_zonegroup(realm)
+ master_zonegroup = EntityName(master_zonegroup_info['name']) \
+ if master_zonegroup_info else zonegroup
+ ze = ZoneEnv(self.env, realm=realm, zg=master_zonegroup, zone=master_zone)
+ params = ['period', 'update']
+ opt_arg_bool(params, '--commit', commit)
+ return RGWAdminJSONCmd(ze).run(params)
+
+ def get_master_zone(self, realm, zonegroup=None):
+ try:
+ ze = ZoneEnv(self.env, realm=realm, zg=zonegroup)
+ params = ['zone', 'get']
+ return RGWAdminJSONCmd(ze).run(params)
+ except RGWAMCmdRunException:
+ return None
+
+ def get_master_zone_ep(self, realm, zonegroup=None):
+ try:
+ ze = ZoneEnv(self.env, realm=realm, zg=zonegroup)
+ params = ['period', 'get']
+ output = RGWAdminJSONCmd(ze).run(params)
+ for zg in output['period_map']['zonegroups']:
+ if not bool(zg['is_master']):
+ continue
+ for zone in zg['zones']:
+ if zone['id'] == zg['master_zone']:
+ return zone['endpoints']
+ return None
+ except RGWAMCmdRunException:
+ return None
+
+ def get_master_zonegroup(self, realm):
+ try:
+ ze = ZoneEnv(self.env, realm=realm)
+ params = ['zonegroup', 'get']
+ return RGWAdminJSONCmd(ze).run(params)
+ except RGWAMCmdRunException:
+ return None
+
+ def get(self, realm=None):
+ ze = ZoneEnv(self.env, realm=realm)
+ params = ['period', 'get']
+ return RGWAdminJSONCmd(ze).run(params)
+
+
+class UserOp:
+ def __init__(self, env):
+ self.env = env
+
+ def create(self, zone: EntityKey, zg: EntityKey, uid=None, uid_prefix=None, display_name=None,
+ email=None, is_system=False):
+ ze = ZoneEnv(self.env, zone=zone, zg=zg)
+
+ u = uid or gen_name(uid_prefix or 'user-', 6)
+
+ dn = display_name or u
+
+ params = ['user',
+ 'create',
+ '--uid', u,
+ '--display-name', dn]
+
+ opt_arg(params, '--email', email)
+ opt_arg_bool(params, '--system', is_system)
+
+ return RGWAdminJSONCmd(ze).run(params)
+
+ def info(self, zone: EntityKey, zg: EntityKey, uid=None, access_key=None):
+ ze = ZoneEnv(self.env, zone=zone, zg=zg)
+
+ params = ['user',
+ 'info']
+
+ opt_arg(params, '--uid', uid)
+ opt_arg(params, '--access-key', access_key)
+
+ return RGWAdminJSONCmd(ze).run(params)
+
+ def rm(self, zone: EntityKey, zg: EntityKey, uid=None, access_key=None):
+ ze = ZoneEnv(self.env, zone=zone, zg=zg)
+
+ params = ['user',
+ 'rm']
+
+ opt_arg(params, '--uid', uid)
+ opt_arg(params, '--access-key', access_key)
+
+ return RGWAdminCmd(ze).run(params)
+
+ def rm_key(self, zone: EntityKey, zg: EntityKey, access_key=None):
+ ze = ZoneEnv(self.env, zone=zone, zg=zg)
+
+ params = ['key',
+ 'remove']
+
+ opt_arg(params, '--access-key', access_key)
+
+ return RGWAdminCmd(ze).run(params)
+
+
+class RGWAM:
+ def __init__(self, env):
+ self.env = env
+
+ def realm_op(self):
+ return RealmOp(self.env)
+
+ def period_op(self):
+ return PeriodOp(self.env)
+
+ def zonegroup_op(self):
+ return ZonegroupOp(self.env)
+
+ def zone_op(self):
+ return ZoneOp(self.env)
+
+ def user_op(self):
+ return UserOp(self.env)
+
+ def get_realm(self, realm_name):
+ try:
+ realm_info = self.realm_op().get(EntityName(realm_name))
+ realm = EntityKey(realm_info['name'], realm_info['id'])
+ return realm
+ except RGWAMException:
+ raise None
+
+ def create_realm(self, realm_name):
+ try:
+ realm_info = self.realm_op().create(EntityName(realm_name))
+ realm = EntityKey(realm_info['name'], realm_info['id'])
+ logging.info(f'Created realm name={realm.name} id={realm.id}')
+ return realm
+ except RGWAMException as e:
+ raise RGWAMException('failed to create realm', e)
+
+ def create_zonegroup(self, realm, zonegroup_name, zonegroup_is_master, endpoints=None):
+ try:
+ zg_info = self.zonegroup_op().create(realm,
+ EntityName(zonegroup_name),
+ endpoints,
+ is_master=zonegroup_is_master)
+ zonegroup = EntityKey(zg_info['name'], zg_info['id'])
+ logging.info(f'Created zonegroup name={zonegroup.name} id={zonegroup.id}')
+ return zonegroup
+ except RGWAMException as e:
+ raise RGWAMException('failed to create zonegroup', e)
+
+ def create_zone(self, realm, zg, zone_name, zone_is_master, access_key=None,
+ secret=None, endpoints=None):
+ try:
+ zone_info = self.zone_op().create(realm, zg,
+ EntityName(zone_name),
+ endpoints,
+ is_master=zone_is_master,
+ access_key=access_key,
+ secret=secret)
+
+ zone = EntityKey(zone_info['name'], zone_info['id'])
+ logging.info(f'Created zone name={zone.name} id={zone.id}')
+ return zone
+ except RGWAMException as e:
+ raise RGWAMException('failed to create zone', e)
+
+ def create_system_user(self, realm, zonegroup, zone):
+ try:
+ sys_user_info = self.user_op().create(zone,
+ zonegroup,
+ uid=f'sysuser-{realm.name}',
+ uid_prefix='user-sys',
+ is_system=True)
+ sys_user = RGWUser(sys_user_info)
+ logging.info(f'Created system user: {sys_user.uid} on'
+ '{realm.name}/{zonegroup.name}/{zone.name}')
+ return sys_user
+ except RGWAMException as e:
+ raise RGWAMException('failed to create system user', e)
+
+ def create_normal_user(self, zg, zone, uid=None):
+ try:
+ user_info = self.user_op().create(zone, zg, uid=uid, is_system=False)
+ user = RGWUser(user_info)
+ logging.info('Created regular user {user.uid} on'
+ '{realm.name}/{zonegroup.name}/{zone.name}')
+ return user
+ except RGWAMException as e:
+ raise RGWAMException('failed to create user', e)
+
+ def update_period(self, realm, zg, zone=None):
+ try:
+ period_info = self.period_op().update(realm, zg, zone, commit=True)
+ period = RGWPeriod(period_info)
+ logging.info('Period: ' + period.id)
+ except RGWAMCmdRunException as e:
+ raise RGWAMException('failed to update period', e)
+
+ def realm_bootstrap(self, rgw_spec, start_radosgw=True):
+
+ realm_name = rgw_spec.rgw_realm
+ zonegroup_name = rgw_spec.rgw_zonegroup
+ zone_name = rgw_spec.rgw_zone
+
+ # Some sanity checks
+ if realm_name in self.realm_op().list():
+ raise RGWAMException(f'Realm {realm_name} already exists')
+ if zonegroup_name in self.zonegroup_op().list():
+ raise RGWAMException(f'Zonegroup {zonegroup_name} already exists')
+ if zone_name in self.zone_op().list():
+ raise RGWAMException(f'Zone {zone_name} already exists')
+
+ # Create RGW entities and update the period
+ realm = self.create_realm(realm_name)
+ zonegroup = self.create_zonegroup(realm, zonegroup_name, zonegroup_is_master=True)
+ zone = self.create_zone(realm, zonegroup, zone_name, zone_is_master=True)
+ self.update_period(realm, zonegroup)
+
+ # Create system user, normal user and update the master zone
+ sys_user = self.create_system_user(realm, zonegroup, zone)
+ rgw_acces_key = sys_user.get_key(0)
+ access_key = rgw_acces_key.access_key if rgw_acces_key else ''
+ secret = rgw_acces_key.secret_key if rgw_acces_key else ''
+ self.zone_op().modify(zone, zonegroup, None,
+ access_key, secret, endpoints=rgw_spec.zone_endpoints)
+ self.update_period(realm, zonegroup)
+
+ if start_radosgw and rgw_spec.zone_endpoints is None:
+ # Instruct the orchestrator to start RGW daemons, asynchronically, this will
+ # call back the rgw module to update the master zone with the corresponding endpoints
+ realm_token = RealmToken(realm_name,
+ realm.id,
+ None, # no endpoint
+ access_key, secret)
+ realm_token_b = realm_token.to_json().encode('utf-8')
+ realm_token_s = base64.b64encode(realm_token_b).decode('utf-8')
+ rgw_spec.rgw_realm_token = realm_token_s
+ rgw_spec.update_endpoints = True
+ self.env.mgr.apply_rgw(rgw_spec)
+
+ def realm_new_zone_creds(self, realm_name, endpoints, sys_uid):
+ try:
+ period_info = self.period_op().get(EntityName(realm_name))
+ except RGWAMException as e:
+ raise RGWAMException('failed to fetch period info', e)
+
+ period = RGWPeriod(period_info)
+
+ master_zg = EntityID(period.master_zonegroup)
+ master_zone = EntityID(period.master_zone)
+
+ try:
+ zone_info = self.zone_op().get(zone=master_zone)
+ except RGWAMException as e:
+ raise RGWAMException('failed to access master zone', e)
+
+ zone_id = zone_info['id']
+
+ logging.info('Period: ' + period.id)
+ logging.info('Master zone: ' + period.master_zone)
+
+ if period.master_zone != zone_id:
+ return (-errno.EINVAL, '', 'Command needs to run on master zone')
+
+ ep = ''
+ if not endpoints:
+ eps = period.get_zone_endpoints(period.master_zonegroup, period.master_zone)
+ else:
+ eps = endpoints.split(',')
+
+ if len(eps) > 0:
+ ep = eps[0]
+
+ try:
+ sys_user_info = self.user_op().create(master_zone, master_zg, uid=sys_uid,
+ uid_prefix='user-sys', is_system=True)
+ except RGWAMException as e:
+ raise RGWAMException('failed to create system user', e)
+
+ sys_user = RGWUser(sys_user_info)
+
+ logging.info('Created system user: %s' % sys_user.uid)
+
+ sys_access_key = ''
+ sys_secret = ''
+
+ if len(sys_user.keys) > 0:
+ sys_access_key = sys_user.keys[0].access_key
+ sys_secret = sys_user.keys[0].secret_key
+
+ realm_token = RealmToken(realm_name, period.realm_id, ep, sys_access_key, sys_secret)
+
+ logging.info(realm_token.to_json())
+
+ realm_token_b = realm_token.to_json().encode('utf-8')
+ return (0, 'Realm Token: %s' % base64.b64encode(realm_token_b).decode('utf-8'), '')
+
+ def realm_rm_zone_creds(self, realm_token_b64):
+ if not realm_token_b64:
+ raise RGWAMException('missing realm token')
+
+ realm_token = RealmToken.from_base64_str(realm_token_b64)
+ try:
+ period_info = self.period_op().get(EntityID(realm_token.realm_id))
+ except RGWAMException as e:
+ raise RGWAMException('failed to fetch period info', e)
+
+ period = RGWPeriod(period_info)
+ master_zg = EntityID(period.master_zonegroup)
+ master_zone = EntityID(period.master_zone)
+ logging.info('Period: ' + period.id)
+ logging.info('Master zone: ' + period.master_zone)
+ try:
+ zone_info = self.zone_op().get(zone=master_zone)
+ except RGWAMException as e:
+ raise RGWAMException('failed to access master zone', e)
+
+ if period.master_zone != zone_info['id']:
+ return (-errno.EINVAL, '', 'Command needs to run on master zone')
+
+ access_key = realm_token.access_key
+ try:
+ user_info = self.user_op().info(master_zone, master_zg, access_key=access_key)
+ except RGWAMException as e:
+ raise RGWAMException('failed to get the system user information', e)
+
+ user = RGWUser(user_info)
+
+ only_key = True
+
+ for k in user.keys:
+ if k.access_key != access_key:
+ only_key = False
+ break
+
+ success_message = ''
+
+ if only_key:
+ # the only key this user has is the one defined in the token
+ # can remove the user completely
+
+ try:
+ self.user_op().rm(master_zone, master_zg, uid=user.uid)
+ except RGWAMException as e:
+ raise RGWAMException('failed removing user ' + user, user.uid, e)
+
+ success_message = 'Removed uid ' + user.uid
+ else:
+ try:
+ self.user_op().rm_key(master_zone, master_zg, access_key=access_key)
+ except RGWAMException as e:
+ raise RGWAMException('failed removing access key ' +
+ access_key + '(uid = ' + user.uid + ')', e)
+
+ success_message = 'Removed access key ' + access_key + '(uid = ' + user.uid + ')'
+
+ return (0, success_message, '')
+
+ def zone_modify(self, realm_name, zonegroup_name, zone_name, endpoints, realm_token_b64):
+
+ if not realm_token_b64:
+ raise RGWAMException('missing realm access config')
+ if zone_name is None:
+ raise RGWAMException('Zone name is a mandatory parameter')
+
+ realm_token = RealmToken.from_base64_str(realm_token_b64)
+ access_key = realm_token.access_key
+ secret = realm_token.secret
+ realm_name = realm_token.realm_name
+ realm_id = realm_token.realm_id
+ logging.info(f'Using realm {realm_name} {realm_id}')
+
+ realm = EntityID(realm_id)
+ period_info = self.period_op().get(realm)
+ period = RGWPeriod(period_info)
+ logging.info('Period: ' + period.id)
+ zonegroup = period.find_zonegroup_by_name(zonegroup_name)
+ if not zonegroup:
+ raise RGWAMException(f'zonegroup {zonegroup_name} not found')
+
+ zg = EntityName(zonegroup.name)
+ zone = EntityName(zone_name)
+ master_zone_info = self.period_op().get_master_zone(realm, zg)
+ success_message = f'Modified zone {realm_name} {zonegroup_name} {zone_name}'
+ logging.info(success_message)
+ try:
+ self.zone_op().modify(zone, zg, access_key=access_key,
+ secret=secret, endpoints=','.join(endpoints))
+ # we only update the zonegroup endpoints if the zone being
+ # modified is a master zone
+ if zone_name == master_zone_info['name']:
+ self.zonegroup_op().modify(realm, zg, endpoints=','.join(endpoints))
+ except RGWAMException as e:
+ raise RGWAMException('failed to modify zone', e)
+
+ # done, let's update the period
+ try:
+ period_info = self.period_op().update(realm, zg, zone, True)
+ except RGWAMException as e:
+ raise RGWAMException('failed to update period', e)
+
+ period = RGWPeriod(period_info)
+ logging.debug(period.to_json())
+
+ return (0, success_message, '')
+
+ def get_realms_info(self):
+ realms_info = []
+ for realm_name in self.realm_op().list():
+ realm = self.get_realm(realm_name)
+ master_zone_inf = self.period_op().get_master_zone(realm)
+ zone_ep = self.period_op().get_master_zone_ep(realm)
+ if master_zone_inf and 'system_key' in master_zone_inf:
+ access_key = master_zone_inf['system_key']['access_key']
+ secret = master_zone_inf['system_key']['secret_key']
+ else:
+ access_key = ''
+ secret = ''
+ realms_info.append({"realm_name": realm_name,
+ "realm_id": realm.id,
+ "master_zone_id": master_zone_inf['id'] if master_zone_inf else '',
+ "endpoint": zone_ep[0] if zone_ep else None,
+ "access_key": access_key,
+ "secret": secret})
+ return realms_info
+
+ def zone_create(self, rgw_spec, start_radosgw):
+
+ if not rgw_spec.rgw_realm_token:
+ raise RGWAMException('missing realm token')
+ if rgw_spec.rgw_zone is None:
+ raise RGWAMException('Zone name is a mandatory parameter')
+ if rgw_spec.rgw_zone in self.zone_op().list():
+ raise RGWAMException(f'Zone {rgw_spec.rgw_zone} already exists')
+
+ realm_token = RealmToken.from_base64_str(rgw_spec.rgw_realm_token)
+ if realm_token.endpoint is None:
+ raise RGWAMException('Provided realm token has no endpoint')
+
+ access_key = realm_token.access_key
+ secret = realm_token.secret
+ try:
+ realm_info = self.realm_op().pull(EntityName(realm_token.realm_name),
+ realm_token.endpoint, access_key, secret)
+ except RGWAMException as e:
+ raise RGWAMException('failed to pull realm', e)
+
+ logging.info(f"Pulled realm {realm_info['name']} ({realm_info['id']})")
+ realm_name = realm_info['name']
+ realm_id = realm_info['id']
+
+ realm = EntityID(realm_id)
+ period_info = self.period_op().get(realm)
+ period = RGWPeriod(period_info)
+ logging.info('Period: ' + period.id)
+
+ zonegroup = period.get_master_zonegroup()
+ if not zonegroup:
+ raise RGWAMException('Cannot find master zonegroup of realm {realm_name}')
+
+ zone = self.create_zone(realm, zonegroup, rgw_spec.rgw_zone,
+ False, # secondary zone
+ access_key, secret, endpoints=rgw_spec.zone_endpoints)
+ self.update_period(realm, zonegroup, zone)
+
+ period = RGWPeriod(period_info)
+ logging.debug(period.to_json())
+
+ if start_radosgw and rgw_spec.zone_endpoints is None:
+ secondary_realm_token = RealmToken(realm_name,
+ realm_id,
+ None, # no endpoint
+ realm_token.access_key,
+ realm_token.secret)
+ realm_token_b = secondary_realm_token.to_json().encode('utf-8')
+ realm_token_s = base64.b64encode(realm_token_b).decode('utf-8')
+ rgw_spec.update_endpoints = True
+ rgw_spec.rgw_token = realm_token_s
+ rgw_spec.rgw_zonegroup = zonegroup.name # master zonegroup is used
+ self.env.mgr.apply_rgw(rgw_spec)
+
+ def _get_daemon_eps(self, realm_name=None, zonegroup_name=None, zone_name=None):
+ # get running daemons info
+ service_name = None
+ if realm_name and zone_name:
+ service_name = 'rgw.%s.%s' % (realm_name, zone_name)
+
+ daemon_type = 'rgw'
+ daemon_id = None
+ hostname = None
+ refresh = True
+
+ daemons = self.env.mgr.list_daemons(service_name,
+ daemon_type,
+ daemon_id=daemon_id,
+ host=hostname,
+ refresh=refresh)
+
+ rep = RealmsEPs()
+
+ for s in daemons:
+ for p in s.ports:
+ svc_id = s.service_id()
+ sp = svc_id.split('.')
+ if len(sp) < 2:
+ log.error('ERROR: service id cannot be parsed: (svc_id=%s)' % svc_id)
+ continue
+
+ svc_realm = sp[0]
+ svc_zone = sp[1]
+
+ if realm_name and svc_realm != realm_name:
+ log.debug('skipping realm %s' % svc_realm)
+ continue
+
+ if zone_name and svc_zone != zone_name:
+ log.debug('skipping zone %s' % svc_zone)
+ continue
+
+ ep = 'http://%s:%d' % (s.hostname, p) # ssl?
+
+ rep.add(svc_realm, svc_zone, ep)
+
+ return rep
+
+ def _get_rgw_eps(self, realm_name=None, zonegroup_name=None, zone_name=None):
+ rep = RealmsEPs()
+
+ try:
+ realms = self.realm_op().list()
+ except RGWAMException as e:
+ raise RGWAMException('failed to list realms', e)
+
+ zones_map = {}
+ for realm in realms:
+ if realm_name and realm != realm_name:
+ log.debug('skipping realm %s' % realm)
+ continue
+
+ period_info = self.period_op().get(EntityName(realm))
+
+ period = RGWPeriod(period_info)
+
+ zones_map[realm] = {}
+
+ for zg in period.iter_zonegroups():
+ if zonegroup_name and zg.name != zonegroup_name:
+ log.debug('skipping zonegroup %s' % zg.name)
+ continue
+
+ for zone in zg.iter_zones():
+ if zone_name and zone.name != zone_name:
+ log.debug('skipping zone %s' % zone.name)
+ continue
+
+ zones_map[realm][zone.name] = zg.name
+
+ if len(zone.endpoints) == 0:
+ rep.add(realm, zone.name, None)
+ continue
+
+ for ep in zone.endpoints:
+ rep.add(realm, zone.name, ep)
+
+ return (rep, zones_map)
+
+ def realm_reconcile(self, realm_name=None, zonegroup_name=None, zone_name=None, update=False):
+
+ daemon_rep = self._get_daemon_eps(realm_name, zonegroup_name, zone_name)
+
+ rgw_rep, zones_map = self._get_rgw_eps(realm_name, zonegroup_name, zone_name)
+
+ diff = daemon_rep.diff(rgw_rep)
+
+ diffj = json.dumps(diff)
+
+ if not update:
+ return (0, diffj, '')
+
+ for realm, realm_diff in diff.items():
+ for zone, endpoints in realm_diff.items():
+
+ zg = zones_map[realm][zone]
+
+ try:
+ self.zone_op().modify(EntityName(zone), EntityName(zg),
+ endpoints=','.join(diff[realm][zone]))
+ except RGWAMException as e:
+ raise RGWAMException('failed to modify zone', e)
+
+ try:
+ self.period_op().update(EntityName(realm), EntityName(zg), EntityName(zone), True)
+ except RGWAMException as e:
+ raise RGWAMException('failed to update period', e)
+
+ return (0, 'Updated: ' + diffj, '')
+
+ def run_radosgw(self, port=None, log_file=None, debug_ms=None, debug_rgw=None):
+
+ fe_cfg = 'beast'
+ if port:
+ fe_cfg += ' port=%s' % port
+
+ params = ['--rgw-frontends', fe_cfg]
+
+ if log_file:
+ params += ['--log-file', log_file]
+
+ if debug_ms:
+ params += ['--debug-ms', debug_ms]
+
+ if debug_rgw:
+ params += ['--debug-rgw', debug_rgw]
+
+ (retcode, stdout, stderr) = RGWCmd(self.env).run(params)
+
+ return (retcode, stdout, stderr)
diff --git a/src/python-common/ceph/rgw/types.py b/src/python-common/ceph/rgw/types.py
new file mode 100644
index 000000000..3f65f9da0
--- /dev/null
+++ b/src/python-common/ceph/rgw/types.py
@@ -0,0 +1,186 @@
+import json
+import base64
+import binascii
+import errno
+
+from abc import abstractmethod
+
+
+class RGWAMException(Exception):
+ def __init__(self, message, orig=None):
+ if orig:
+ self.message = message + ': ' + orig.message
+ self.retcode = orig.retcode
+ self.stdout = orig.stdout
+ self.stderr = orig.stdout
+ else:
+ self.message = message
+ self.retcode = -errno.EINVAL
+ self.stdout = ''
+ self.stderr = message
+
+
+class RGWAMCmdRunException(RGWAMException):
+ def __init__(self, cmd, retcode, stdout, stderr):
+ super().__init__('Command error (%d): %s\nstdout:%s\nstderr:%s' %
+ (retcode, cmd, stdout, stderr))
+ self.retcode = retcode
+ self.stdout = stdout
+ self.stderr = stderr
+
+
+class RGWAMEnvMgr:
+ @abstractmethod
+ def tool_exec(self, prog, args):
+ pass
+
+ @abstractmethod
+ def apply_rgw(self, spec):
+ pass
+
+ @abstractmethod
+ def list_daemons(self, service_name, daemon_type=None, daemon_id=None, hostname=None,
+ refresh=True):
+ pass
+
+
+class JSONObj:
+ def to_json(self):
+ return json.dumps(self, default=lambda o: o.__dict__, indent=4)
+
+
+class RealmToken(JSONObj):
+ def __init__(self, realm_name, realm_id, endpoint, access_key, secret):
+ self.realm_name = realm_name
+ self.realm_id = realm_id
+ self.endpoint = endpoint
+ self.access_key = access_key
+ self.secret = secret
+
+ @classmethod
+ def from_base64_str(cls, realm_token_b64):
+ try:
+ realm_token_b = base64.b64decode(realm_token_b64)
+ realm_token_s = realm_token_b.decode('utf-8')
+ realm_token = json.loads(realm_token_s)
+ return cls(**realm_token)
+ except binascii.Error:
+ return None
+
+
+class RGWZone(JSONObj):
+ def __init__(self, zone_dict):
+ self.id = zone_dict['id']
+ self.name = zone_dict['name']
+ self.endpoints = zone_dict['endpoints']
+
+
+class RGWZoneGroup(JSONObj):
+ def __init__(self, zg_dict):
+ self.id = zg_dict['id']
+ self.name = zg_dict['name']
+ self.api_name = zg_dict['api_name']
+ self.is_master = zg_dict['is_master']
+ self.endpoints = zg_dict['endpoints']
+
+ self.zones_by_id = {}
+ self.zones_by_name = {}
+ self.all_endpoints = []
+
+ for zone in zg_dict['zones']:
+ z = RGWZone(zone)
+ self.zones_by_id[zone['id']] = z
+ self.zones_by_name[zone['name']] = z
+ self.all_endpoints += z.endpoints
+
+ def endpoint_exists(self, endpoint):
+ for ep in self.all_endpoints:
+ if ep == endpoint:
+ return True
+ return False
+
+ def get_zone_endpoints(self, zone_id):
+ z = self.zones_by_id.get(zone_id)
+ if not z:
+ return None
+
+ return z.endpoints
+
+ def iter_zones(self):
+ for zone in self.zones_by_id.values():
+ yield zone
+
+
+class RGWPeriod(JSONObj):
+ def __init__(self, period_dict):
+ self.id = period_dict['id']
+ self.epoch = period_dict['epoch']
+ self.master_zone = period_dict['master_zone']
+ self.master_zonegroup = period_dict['master_zonegroup']
+ self.realm_name = period_dict['realm_name']
+ self.realm_id = period_dict['realm_id']
+ pm = period_dict['period_map']
+ self.zonegroups_by_id = {}
+ self.zonegroups_by_name = {}
+
+ for zg in pm['zonegroups']:
+ self.zonegroups_by_id[zg['id']] = RGWZoneGroup(zg)
+ self.zonegroups_by_name[zg['name']] = RGWZoneGroup(zg)
+
+ def endpoint_exists(self, endpoint):
+ for _, zg in self.zonegroups_by_id.items():
+ if zg.endpoint_exists(endpoint):
+ return True
+ return False
+
+ def find_zonegroup_by_name(self, zonegroup):
+ if not zonegroup:
+ return self.find_zonegroup_by_id(self.master_zonegroup)
+ return self.zonegroups_by_name.get(zonegroup)
+
+ def get_master_zonegroup(self):
+ return self.find_zonegroup_by_id(self.master_zonegroup)
+
+ def find_zonegroup_by_id(self, zonegroup):
+ return self.zonegroups_by_id.get(zonegroup)
+
+ def get_zone_endpoints(self, zonegroup_id, zone_id):
+ zg = self.zonegroups_by_id.get(zonegroup_id)
+ if not zg:
+ return None
+
+ return zg.get_zone_endpoints(zone_id)
+
+ def iter_zonegroups(self):
+ for zg in self.zonegroups_by_id.values():
+ yield zg
+
+
+class RGWAccessKey(JSONObj):
+ def __init__(self, d):
+ self.uid = d['user']
+ self.access_key = d['access_key']
+ self.secret_key = d['secret_key']
+
+
+class RGWUser(JSONObj):
+ def __init__(self, d):
+ self.uid = d['user_id']
+ self.display_name = d['display_name']
+ self.email = d['email']
+
+ self.keys = []
+
+ for k in d['keys']:
+ self.keys.append(RGWAccessKey(k))
+
+ is_system = d.get('system') or 'false'
+ self.system = (is_system == 'true')
+
+ def add_key(self, access_key, secret):
+ self.keys.append(RGWAccessKey({'user': self.uid,
+ 'access_key': access_key,
+ 'secret_key': secret}))
+
+ def get_key(self, index):
+ return self.keys[index] if index < len(self.keys) else None
diff --git a/src/python-common/ceph/tests/__init__.py b/src/python-common/ceph/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/python-common/ceph/tests/__init__.py
diff --git a/src/python-common/ceph/tests/c-v-inventory.json b/src/python-common/ceph/tests/c-v-inventory.json
new file mode 100644
index 000000000..c24345525
--- /dev/null
+++ b/src/python-common/ceph/tests/c-v-inventory.json
@@ -0,0 +1,155 @@
+[
+ {
+ "available": false,
+ "created": "2022-02-11T10:58:23.177450Z",
+ "rejected_reasons": [
+ "locked"
+ ],
+ "sys_api": {
+ "scheduler_mode": "",
+ "rotational": "0",
+ "vendor": "",
+ "human_readable_size": "50.00 GB",
+ "sectors": 0,
+ "sas_device_handle": "",
+ "partitions": {},
+ "rev": "",
+ "sas_address": "",
+ "locked": 1,
+ "sectorsize": "512",
+ "removable": "0",
+ "path": "/dev/dm-0",
+ "support_discard": "",
+ "model": "",
+ "ro": "0",
+ "nr_requests": "128",
+ "size": 53687091200
+ },
+ "lvs": [],
+ "path": "/dev/dm-0"
+ },
+ {
+ "available": false,
+ "rejected_reasons": [
+ "locked"
+ ],
+ "sys_api": {
+ "scheduler_mode": "",
+ "rotational": "0",
+ "vendor": "",
+ "human_readable_size": "31.47 GB",
+ "sectors": 0,
+ "sas_device_handle": "",
+ "partitions": {},
+ "rev": "",
+ "sas_address": "",
+ "locked": 1,
+ "sectorsize": "512",
+ "removable": "0",
+ "path": "/dev/dm-1",
+ "support_discard": "",
+ "model": "",
+ "ro": "0",
+ "nr_requests": "128",
+ "size": 33789313024
+ },
+ "lvs": [],
+ "path": "/dev/dm-1"
+ },
+ {
+ "available": false,
+ "created": "2022-02-11T10:58:23.177450Z",
+ "rejected_reasons": [
+ "locked"
+ ],
+ "sys_api": {
+ "scheduler_mode": "",
+ "rotational": "0",
+ "vendor": "",
+ "human_readable_size": "394.27 GB",
+ "sectors": 0,
+ "sas_device_handle": "",
+ "partitions": {},
+ "rev": "",
+ "sas_address": "",
+ "locked": 1,
+ "sectorsize": "512",
+ "removable": "0",
+ "path": "/dev/dm-2",
+ "support_discard": "",
+ "model": "",
+ "ro": "0",
+ "nr_requests": "128",
+ "size": 423347879936
+ },
+ "lvs": [],
+ "path": "/dev/dm-2"
+ },
+ {
+ "available": false,
+ "rejected_reasons": [
+ "locked"
+ ],
+ "sys_api": {
+ "scheduler_mode": "cfq",
+ "rotational": "0",
+ "vendor": "ATA",
+ "human_readable_size": "476.94 GB",
+ "sectors": 0,
+ "sas_device_handle": "",
+ "partitions": {
+ "sda2": {
+ "start": "411648",
+ "holders": [],
+ "sectorsize": 512,
+ "sectors": "2097152",
+ "size": "1024.00 MB"
+ },
+ "sda3": {
+ "start": "2508800",
+ "holders": [
+ "dm-1",
+ "dm-2",
+ "dm-0"
+ ],
+ "sectorsize": 512,
+ "sectors": "997705728",
+ "size": "475.74 GB"
+ },
+ "sda1": {
+ "start": "2048",
+ "holders": [],
+ "sectorsize": 512,
+ "sectors": "409600",
+ "size": "200.00 MB"
+ }
+ },
+ "rev": "0000",
+ "sas_address": "",
+ "locked": 1,
+ "sectorsize": "512",
+ "removable": "0",
+ "path": "/dev/sda",
+ "support_discard": "",
+ "model": "SanDisk SD8SN8U5",
+ "ro": "0",
+ "nr_requests": "128",
+ "size": 512110190592
+ },
+ "lvs": [
+ {
+ "comment": "not used by ceph",
+ "name": "swap"
+ },
+ {
+ "comment": "not used by ceph",
+ "name": "home"
+ },
+ {
+ "comment": "not used by ceph",
+ "name": "root"
+ }
+ ],
+ "path": "/dev/sda"
+ }
+]
diff --git a/src/python-common/ceph/tests/factories.py b/src/python-common/ceph/tests/factories.py
new file mode 100644
index 000000000..6938fd084
--- /dev/null
+++ b/src/python-common/ceph/tests/factories.py
@@ -0,0 +1,101 @@
+from ceph.deployment.inventory import Device
+
+
+class InventoryFactory(object):
+ def __init__(self):
+ self.taken_paths = []
+
+ def _make_path(self, ident='b'):
+ return "/dev/{}{}".format(self.prefix, ident)
+
+ def _find_new_path(self):
+ cnt = 0
+ if len(self.taken_paths) >= 25:
+ raise Exception(
+ "Double-character disks are not implemetend. Maximum amount"
+ "of disks reached.")
+
+ while self.path in self.taken_paths:
+ ident = chr(ord('b') + cnt)
+ self.path = "/dev/{}{}".format(self.prefix, ident)
+ cnt += 1
+
+ def assemble(self):
+ if self.empty:
+ return {}
+ self._find_new_path()
+ inventory_sample = {
+ 'available': self.available,
+ 'lvs': [],
+ 'path': self.path,
+ 'rejected_reasons': self.rejected_reason,
+ 'sys_api': {
+ 'human_readable_size': self.human_readable_size,
+ 'locked': 1,
+ 'model': self.model,
+ 'nr_requests': '256',
+ 'partitions':
+ { # partitions are not as relevant for now, todo for later
+ 'sda1': {
+ 'sectors': '41940992',
+ 'sectorsize': 512,
+ 'size': self.human_readable_size,
+ 'start': '2048'
+ }
+ },
+ 'path': self.path,
+ 'removable': '0',
+ 'rev': '',
+ 'ro': '0',
+ 'rotational': str(self.rotational),
+ 'sas_address': '',
+ 'sas_device_handle': '',
+ 'scheduler_mode': 'mq-deadline',
+ 'sectors': 0,
+ 'sectorsize': '512',
+ 'size': self.size,
+ 'support_discard': '',
+ 'vendor': self.vendor
+ }
+ }
+
+ if self.available:
+ self.taken_paths.append(self.path)
+ return inventory_sample
+ return {}
+
+ def _init(self, **kwargs):
+ self.prefix = 'sd'
+ self.path = kwargs.get('path', self._make_path())
+ self.human_readable_size = kwargs.get('human_readable_size',
+ '50.00 GB')
+ self.vendor = kwargs.get('vendor', 'samsung')
+ self.model = kwargs.get('model', '42-RGB')
+ self.available = kwargs.get('available', True)
+ self.rejected_reason = kwargs.get('rejected_reason', [''])
+ self.rotational = kwargs.get('rotational', '1')
+ if not self.available:
+ self.rejected_reason = ['locked']
+ self.empty = kwargs.get('empty', False)
+ self.size = kwargs.get('size', 5368709121)
+
+ def produce(self, pieces=1, **kwargs):
+ if kwargs.get('path') and pieces > 1:
+ raise Exception("/path/ and /pieces/ are mutually exclusive")
+ # Move to custom init to track _taken_paths.
+ # class is invoked once in each context.
+ # if disks with different properties are being created
+ # we'd have to re-init the class and loose track of the
+ # taken_paths
+ self._init(**kwargs)
+ return [self.assemble() for x in range(0, pieces)]
+
+
+class DeviceFactory(object):
+ def __init__(self, device_setup):
+ self.device_setup = device_setup
+ self.pieces = device_setup.get('pieces', 1)
+ self.device_conf = device_setup.get('device_config', {})
+
+ def produce(self):
+ return [Device(**self.device_conf) for x in range(0, self.pieces)]
diff --git a/src/python-common/ceph/tests/test_datetime.py b/src/python-common/ceph/tests/test_datetime.py
new file mode 100644
index 000000000..d03a82930
--- /dev/null
+++ b/src/python-common/ceph/tests/test_datetime.py
@@ -0,0 +1,61 @@
+import datetime
+
+import pytest
+
+from ceph.utils import datetime_now, datetime_to_str, str_to_datetime
+
+
+def test_datetime_to_str_1():
+ dt = datetime.datetime.now()
+ assert type(datetime_to_str(dt)) is str
+
+
+def test_datetime_to_str_2():
+ # note: tz isn't specified in the string, so explicitly store this as UTC
+ dt = datetime.datetime.strptime(
+ '2019-04-24T17:06:53.039991',
+ '%Y-%m-%dT%H:%M:%S.%f'
+ ).replace(tzinfo=datetime.timezone.utc)
+ assert datetime_to_str(dt) == '2019-04-24T17:06:53.039991Z'
+
+
+def test_datetime_to_str_3():
+ dt = datetime.datetime.strptime('2020-11-02T04:40:12.748172-0800',
+ '%Y-%m-%dT%H:%M:%S.%f%z')
+ assert datetime_to_str(dt) == '2020-11-02T12:40:12.748172Z'
+
+
+def test_str_to_datetime_1():
+ dt = str_to_datetime('2020-03-03T09:21:43.636153304Z')
+ assert type(dt) is datetime.datetime
+ assert dt.tzinfo is not None
+
+
+def test_str_to_datetime_2():
+ dt = str_to_datetime('2020-03-03T15:52:30.136257504-0600')
+ assert type(dt) is datetime.datetime
+ assert dt.tzinfo is not None
+
+
+def test_str_to_datetime_3():
+ dt = str_to_datetime('2020-03-03T15:52:30.136257504')
+ assert type(dt) is datetime.datetime
+ assert dt.tzinfo is not None
+
+
+def test_str_to_datetime_invalid_format_1():
+ with pytest.raises(ValueError):
+ str_to_datetime('2020-03-03 15:52:30.136257504')
+
+
+def test_str_to_datetime_invalid_format_2():
+ with pytest.raises(ValueError):
+ str_to_datetime('2020-03-03')
+
+
+def test_datetime_now_1():
+ dt = str_to_datetime('2020-03-03T09:21:43.636153304Z')
+ dt_now = datetime_now()
+ assert type(dt_now) is datetime.datetime
+ assert dt_now.tzinfo is not None
+ assert dt < dt_now
diff --git a/src/python-common/ceph/tests/test_disk_selector.py b/src/python-common/ceph/tests/test_disk_selector.py
new file mode 100644
index 000000000..b08236130
--- /dev/null
+++ b/src/python-common/ceph/tests/test_disk_selector.py
@@ -0,0 +1,560 @@
+# flake8: noqa
+import pytest
+
+from ceph.deployment.drive_selection.matchers import _MatchInvalid
+from ceph.deployment.inventory import Devices, Device
+
+from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, \
+ DriveGroupValidationError
+
+from ceph.deployment import drive_selection
+from ceph.deployment.service_spec import PlacementSpec
+from ceph.tests.factories import InventoryFactory
+from ceph.tests.utils import _mk_inventory, _mk_device
+
+
+class TestMatcher(object):
+ """ Test Matcher base class
+ """
+
+ def test_get_disk_key_3(self):
+ """
+ virtual is False
+ key is found
+ retrun value of key is expected
+ """
+ disk_map = Device(path='/dev/vdb', sys_api={'foo': 'bar'})
+ ret = drive_selection.Matcher('foo', 'bar')._get_disk_key(disk_map)
+ assert ret is disk_map.sys_api.get('foo')
+
+ def test_get_disk_key_4(self):
+ """
+ virtual is False
+ key is not found
+ expect raise Exception
+ """
+ disk_map = Device(path='/dev/vdb')
+ with pytest.raises(Exception):
+ drive_selection.Matcher('bar', 'foo')._get_disk_key(disk_map)
+ pytest.fail("No disk_key found for foo or None")
+
+
+class TestSubstringMatcher(object):
+ def test_compare(self):
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(model='samsung'))
+ matcher = drive_selection.SubstringMatcher('model', 'samsung')
+ ret = matcher.compare(disk_dict)
+ assert ret is True
+
+ def test_compare_false(self):
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(model='nothing_matching'))
+ matcher = drive_selection.SubstringMatcher('model', 'samsung')
+ ret = matcher.compare(disk_dict)
+ assert ret is False
+
+
+class TestEqualityMatcher(object):
+ def test_compare(self):
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(rotates='1'))
+ matcher = drive_selection.EqualityMatcher('rotates', '1')
+ ret = matcher.compare(disk_dict)
+ assert ret is True
+
+ def test_compare_false(self):
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(rotates='1'))
+ matcher = drive_selection.EqualityMatcher('rotates', '0')
+ ret = matcher.compare(disk_dict)
+ assert ret is False
+
+
+class TestAllMatcher(object):
+ def test_compare(self):
+ disk_dict = Device(path='/dev/vdb')
+ matcher = drive_selection.AllMatcher('all', 'True')
+ ret = matcher.compare(disk_dict)
+ assert ret is True
+
+ def test_compare_value_not_true(self):
+ disk_dict = Device(path='/dev/vdb')
+ matcher = drive_selection.AllMatcher('all', 'False')
+ ret = matcher.compare(disk_dict)
+ assert ret is True
+
+
+class TestSizeMatcher(object):
+ def test_parse_filter_exact(self):
+ """ Testing exact notation with 20G """
+ matcher = drive_selection.SizeMatcher('size', '20G')
+ assert isinstance(matcher.exact, tuple)
+ assert matcher.exact[0] == '20'
+ assert matcher.exact[1] == 'GB'
+
+ def test_parse_filter_exact_GB_G(self):
+ """ Testing exact notation with 20G """
+ matcher = drive_selection.SizeMatcher('size', '20GB')
+ assert isinstance(matcher.exact, tuple)
+ assert matcher.exact[0] == '20'
+ assert matcher.exact[1] == 'GB'
+
+ def test_parse_filter_high_low(self):
+ """ Testing high-low notation with 20G:50G """
+
+ matcher = drive_selection.SizeMatcher('size', '20G:50G')
+ assert isinstance(matcher.exact, tuple)
+ assert matcher.low[0] == '20'
+ assert matcher.high[0] == '50'
+ assert matcher.low[1] == 'GB'
+ assert matcher.high[1] == 'GB'
+
+ def test_parse_filter_max_high(self):
+ """ Testing high notation with :50G """
+
+ matcher = drive_selection.SizeMatcher('size', ':50G')
+ assert isinstance(matcher.exact, tuple)
+ assert matcher.high[0] == '50'
+ assert matcher.high[1] == 'GB'
+
+ def test_parse_filter_min_low(self):
+ """ Testing low notation with 20G: """
+
+ matcher = drive_selection.SizeMatcher('size', '50G:')
+ assert isinstance(matcher.exact, tuple)
+ assert matcher.low[0] == '50'
+ assert matcher.low[1] == 'GB'
+
+ def test_to_byte_KB(self):
+ """ I doubt anyone ever thought we'd need to understand KB """
+
+ ret = drive_selection.SizeMatcher('size', '4K').to_byte(('4', 'KB'))
+ assert ret == 4 * 1e+3
+
+ def test_to_byte_GB(self):
+ """ Pretty nonesense test.."""
+
+ ret = drive_selection.SizeMatcher('size', '10G').to_byte(('10', 'GB'))
+ assert ret == 10 * 1e+9
+
+ def test_to_byte_MB(self):
+ """ Pretty nonesense test.."""
+
+ ret = drive_selection.SizeMatcher('size', '10M').to_byte(('10', 'MB'))
+ assert ret == 10 * 1e+6
+
+ def test_to_byte_TB(self):
+ """ Pretty nonesense test.."""
+
+ ret = drive_selection.SizeMatcher('size', '10T').to_byte(('10', 'TB'))
+ assert ret == 10 * 1e+12
+
+ def test_to_byte_PB(self):
+ """ Expect to raise """
+
+ with pytest.raises(_MatchInvalid):
+ drive_selection.SizeMatcher('size', '10P').to_byte(('10', 'PB'))
+ assert 'Unit \'P\' is not supported'
+
+ def test_compare_exact(self):
+
+ matcher = drive_selection.SizeMatcher('size', '20GB')
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(size='20.00 GB'))
+ ret = matcher.compare(disk_dict)
+ assert ret is True
+
+ def test_compare_exact_decimal(self):
+
+ matcher = drive_selection.SizeMatcher('size', '20.12GB')
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(size='20.12 GB'))
+ ret = matcher.compare(disk_dict)
+ assert ret is True
+
+ @pytest.mark.parametrize("test_input,expected", [
+ ("1.00 GB", False),
+ ("20.00 GB", True),
+ ("50.00 GB", True),
+ ("100.00 GB", True),
+ ("101.00 GB", False),
+ ("1101.00 GB", False),
+ ])
+ def test_compare_high_low(self, test_input, expected):
+
+ matcher = drive_selection.SizeMatcher('size', '20GB:100GB')
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(size=test_input))
+ ret = matcher.compare(disk_dict)
+ assert ret is expected
+
+ @pytest.mark.parametrize("test_input,expected", [
+ ("1.00 GB", True),
+ ("20.00 GB", True),
+ ("50.00 GB", True),
+ ("100.00 GB", False),
+ ("101.00 GB", False),
+ ("1101.00 GB", False),
+ ])
+ def test_compare_high(self, test_input, expected):
+
+ matcher = drive_selection.SizeMatcher('size', ':50GB')
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(size=test_input))
+ ret = matcher.compare(disk_dict)
+ assert ret is expected
+
+ @pytest.mark.parametrize("test_input,expected", [
+ ("1.00 GB", False),
+ ("20.00 GB", False),
+ ("50.00 GB", True),
+ ("100.00 GB", True),
+ ("101.00 GB", True),
+ ("1101.00 GB", True),
+ ])
+ def test_compare_low(self, test_input, expected):
+
+ matcher = drive_selection.SizeMatcher('size', '50GB:')
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(size=test_input))
+ ret = matcher.compare(disk_dict)
+ assert ret is expected
+
+ @pytest.mark.parametrize("test_input,expected", [
+ ("1.00 GB", False),
+ ("20.00 GB", False),
+ ("50.00 GB", False),
+ ("100.00 GB", False),
+ ("101.00 GB", False),
+ ("1101.00 GB", True),
+ ("9.10 TB", True),
+ ])
+ def test_compare_at_least_1TB(self, test_input, expected):
+
+ matcher = drive_selection.SizeMatcher('size', '1TB:')
+ disk_dict = Device(path='/dev/sdz', sys_api=dict(size=test_input))
+ ret = matcher.compare(disk_dict)
+ assert ret is expected
+
+ def test_compare_raise(self):
+
+ matcher = drive_selection.SizeMatcher('size', 'None')
+ disk_dict = Device(path='/dev/vdb', sys_api=dict(size='20.00 GB'))
+ with pytest.raises(Exception):
+ matcher.compare(disk_dict)
+ pytest.fail("Couldn't parse size")
+
+ @pytest.mark.parametrize("test_input,expected", [
+ ("10G", ('10', 'GB')),
+ ("20GB", ('20', 'GB')),
+ ("10g", ('10', 'GB')),
+ ("20gb", ('20', 'GB')),
+ ])
+ def test_get_k_v(self, test_input, expected):
+ assert drive_selection.SizeMatcher('size', '10G')._get_k_v(test_input) == expected
+
+ @pytest.mark.parametrize("test_input,expected", [
+ ("10G", ('GB')),
+ ("10g", ('GB')),
+ ("20GB", ('GB')),
+ ("20gb", ('GB')),
+ ("20TB", ('TB')),
+ ("20tb", ('TB')),
+ ("20T", ('TB')),
+ ("20t", ('TB')),
+ ("20MB", ('MB')),
+ ("20mb", ('MB')),
+ ("20M", ('MB')),
+ ("20m", ('MB')),
+ ])
+ def test_parse_suffix(self, test_input, expected):
+ assert drive_selection.SizeMatcher('size', '10G')._parse_suffix(test_input) == expected
+
+ @pytest.mark.parametrize("test_input,expected", [
+ ("G", 'GB'),
+ ("GB", 'GB'),
+ ("TB", 'TB'),
+ ("T", 'TB'),
+ ("MB", 'MB'),
+ ("M", 'MB'),
+ ])
+ def test_normalize_suffix(self, test_input, expected):
+
+ assert drive_selection.SizeMatcher('10G', 'size')._normalize_suffix(test_input) == expected
+
+ def test_normalize_suffix_raises(self):
+
+ with pytest.raises(_MatchInvalid):
+ drive_selection.SizeMatcher('10P', 'size')._normalize_suffix("P")
+ pytest.fail("Unit 'P' not supported")
+
+
+class TestDriveGroup(object):
+ @pytest.fixture(scope='class')
+ def test_fix(self, empty=None):
+ def make_sample_data(empty=empty,
+ data_limit=0,
+ wal_limit=0,
+ db_limit=0,
+ osds_per_device='',
+ disk_format='bluestore'):
+ raw_sample_bluestore = {
+ 'service_type': 'osd',
+ 'service_id': 'foo',
+ 'placement': {'host_pattern': 'data*'},
+ 'data_devices': {
+ 'size': '30G:50G',
+ 'model': '42-RGB',
+ 'vendor': 'samsung',
+ 'limit': data_limit
+ },
+ 'wal_devices': {
+ 'model': 'fast',
+ 'limit': wal_limit
+ },
+ 'db_devices': {
+ 'size': ':20G',
+ 'limit': db_limit
+ },
+ 'db_slots': 5,
+ 'wal_slots': 5,
+ 'block_wal_size': '5G',
+ 'block_db_size': '10G',
+ 'objectstore': disk_format,
+ 'osds_per_device': osds_per_device,
+ 'encrypted': True,
+ }
+ raw_sample_filestore = {
+ 'service_type': 'osd',
+ 'service_id': 'foo',
+ 'placement': {'host_pattern': 'data*'},
+ 'objectstore': 'filestore',
+ 'data_devices': {
+ 'size': '30G:50G',
+ 'model': 'foo',
+ 'vendor': '1x',
+ 'limit': data_limit
+ },
+ 'journal_devices': {
+ 'size': ':20G'
+ },
+ 'journal_size': '5G',
+ 'osds_per_device': osds_per_device,
+ 'encrypted': True,
+ }
+ if disk_format == 'filestore':
+ raw_sample = raw_sample_filestore
+ else:
+ raw_sample = raw_sample_bluestore
+
+ if empty:
+ raw_sample = {
+ 'service_type': 'osd',
+ 'service_id': 'foo',
+ 'placement': {'host_pattern': 'data*'},
+ 'data_devices': {
+ 'all': True
+ },
+ }
+
+ dgo = DriveGroupSpec.from_json(raw_sample)
+ return dgo
+
+ return make_sample_data
+
+ def test_encryption_prop(self, test_fix):
+ test_fix = test_fix()
+ assert test_fix.encrypted is True
+
+ def test_encryption_prop_empty(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.encrypted is False
+
+ def test_db_slots_prop(self, test_fix):
+ test_fix = test_fix()
+ assert test_fix.db_slots == 5
+
+ def test_db_slots_prop_empty(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.db_slots is None
+
+ def test_wal_slots_prop(self, test_fix):
+ test_fix = test_fix()
+ assert test_fix.wal_slots == 5
+
+ def test_wal_slots_prop_empty(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.wal_slots is None
+
+ def test_block_wal_size_prop(self, test_fix):
+ test_fix = test_fix()
+ assert test_fix.block_wal_size == '5G'
+
+ def test_block_wal_size_prop_empty(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.block_wal_size is None
+
+ def test_block_db_size_prop(self, test_fix):
+ test_fix = test_fix()
+ assert test_fix.block_db_size == '10G'
+
+ def test_block_db_size_prop_empty(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.block_db_size is None
+
+ def test_data_devices_prop(self, test_fix):
+ test_fix = test_fix()
+ assert test_fix.data_devices == DeviceSelection(
+ model='42-RGB',
+ size='30G:50G',
+ vendor='samsung',
+ limit=0,
+ )
+
+ def test_data_devices_prop_empty(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.db_devices is None
+
+ def test_db_devices_prop(self, test_fix):
+ test_fix = test_fix()
+ assert test_fix.db_devices == DeviceSelection(
+ size=':20G',
+ limit=0,
+ )
+
+ def test_db_devices_prop_empty(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.db_devices is None
+
+ def test_wal_device_prop(self, test_fix):
+ test_fix = test_fix()
+ assert test_fix.wal_devices == DeviceSelection(
+ model='fast',
+ limit=0,
+ )
+
+ def test_wal_device_prop_empty(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.wal_devices is None
+
+ def test_bluestore_format_prop(self, test_fix):
+ test_fix = test_fix(disk_format='bluestore')
+ assert test_fix.objectstore == 'bluestore'
+
+ def test_default_format_prop(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.objectstore == 'bluestore'
+
+ def test_osds_per_device(self, test_fix):
+ test_fix = test_fix(osds_per_device='3')
+ assert test_fix.osds_per_device == '3'
+
+ def test_osds_per_device_default(self, test_fix):
+ test_fix = test_fix()
+ assert test_fix.osds_per_device == ''
+
+ def test_journal_size_empty(self, test_fix):
+ test_fix = test_fix(empty=True)
+ assert test_fix.journal_size is None
+
+ @pytest.fixture
+ def inventory(self, available=True):
+ def make_sample_data(available=available,
+ data_devices=10,
+ wal_devices=0,
+ db_devices=2,
+ human_readable_size_data='50.00 GB',
+ human_readable_size_wal='20.00 GB',
+ size=5368709121,
+ human_readable_size_db='20.00 GB'):
+ factory = InventoryFactory()
+ inventory_sample = []
+ data_disks = factory.produce(
+ pieces=data_devices,
+ available=available,
+ size=size,
+ human_readable_size=human_readable_size_data)
+ wal_disks = factory.produce(
+ pieces=wal_devices,
+ human_readable_size=human_readable_size_wal,
+ rotational='0',
+ model='ssd_type_model',
+ size=size,
+ available=available)
+ db_disks = factory.produce(
+ pieces=db_devices,
+ human_readable_size=human_readable_size_db,
+ rotational='0',
+ size=size,
+ model='ssd_type_model',
+ available=available)
+ inventory_sample.extend(data_disks)
+ inventory_sample.extend(wal_disks)
+ inventory_sample.extend(db_disks)
+
+ return Devices(devices=inventory_sample)
+
+ return make_sample_data
+
+
+class TestDriveSelection(object):
+
+ testdata = [
+ (
+ DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(all=True)),
+ _mk_inventory(_mk_device() * 5),
+ ['/dev/sda', '/dev/sdb', '/dev/sdc', '/dev/sdd', '/dev/sde'], []
+ ),
+ (
+ DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(all=True, limit=3),
+ db_devices=DeviceSelection(all=True)
+ ),
+ _mk_inventory(_mk_device() * 5),
+ ['/dev/sda', '/dev/sdb', '/dev/sdc'], ['/dev/sdd', '/dev/sde']
+ ),
+ (
+ DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True),
+ db_devices=DeviceSelection(rotational=False)
+ ),
+ _mk_inventory(_mk_device(rotational=False) + _mk_device(rotational=True)),
+ ['/dev/sdb'], ['/dev/sda']
+ ),
+ (
+ DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True),
+ db_devices=DeviceSelection(rotational=False)
+ ),
+ _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)),
+ ['/dev/sda', '/dev/sdb'], ['/dev/sdc']
+ ),
+ (
+ DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True),
+ db_devices=DeviceSelection(rotational=False)
+ ),
+ _mk_inventory(_mk_device(rotational=True)*2),
+ ['/dev/sda', '/dev/sdb'], []
+ ),
+ ]
+
+ @pytest.mark.parametrize("spec,inventory,expected_data,expected_db", testdata)
+ def test_disk_selection(self, spec, inventory, expected_data, expected_db):
+ sel = drive_selection.DriveSelection(spec, inventory)
+ assert [d.path for d in sel.data_devices()] == expected_data
+ assert [d.path for d in sel.db_devices()] == expected_db
+
+ def test_disk_selection_raise(self):
+ spec = DriveGroupSpec(
+ placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(size='wrong'),
+ )
+ inventory = _mk_inventory(_mk_device(rotational=True)*2)
+ m = 'Failed to validate OSD spec "foobar.data_devices": No filters applied'
+ with pytest.raises(DriveGroupValidationError, match=m):
+ drive_selection.DriveSelection(spec, inventory) \ No newline at end of file
diff --git a/src/python-common/ceph/tests/test_drive_group.py b/src/python-common/ceph/tests/test_drive_group.py
new file mode 100644
index 000000000..77e9b4083
--- /dev/null
+++ b/src/python-common/ceph/tests/test_drive_group.py
@@ -0,0 +1,592 @@
+# flake8: noqa
+import re
+
+import pytest
+import yaml
+
+from ceph.deployment import drive_selection, translate
+from ceph.deployment.hostspec import HostSpec, SpecValidationError
+from ceph.deployment.inventory import Device
+from ceph.deployment.service_spec import PlacementSpec
+from ceph.tests.utils import _mk_inventory, _mk_device
+from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, \
+ DriveGroupValidationError
+
+@pytest.mark.parametrize("test_input",
+[
+ ( # new style json
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+crush_device_class: ssd
+data_devices:
+ paths:
+ - /dev/sda
+"""
+ ),
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: ssd"""
+ ),
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+spec:
+ osds_per_device: 2
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd"""
+ ),
+])
+def test_DriveGroup(test_input):
+
+ dg = DriveGroupSpec.from_json(yaml.safe_load(test_input))
+ assert dg.service_id == 'testing_drivegroup'
+ assert all([isinstance(x, Device) for x in dg.data_devices.paths])
+ if isinstance(dg.data_devices.paths[0].path, str):
+ assert dg.data_devices.paths[0].path == '/dev/sda'
+
+
+
+@pytest.mark.parametrize("match,test_input",
+[
+ (
+ re.escape('Service Spec is not an (JSON or YAML) object. got "None"'),
+ ''
+ ),
+ (
+ 'Failed to validate OSD spec "<unnamed>": `placement` required',
+ """data_devices:
+ all: True
+"""
+ ),
+ (
+ 'Failed to validate OSD spec "mydg.data_devices": device selection cannot be empty', """
+service_type: osd
+service_id: mydg
+placement:
+ host_pattern: '*'
+data_devices:
+ limit: 1
+"""
+ ),
+ (
+ 'Failed to validate OSD spec "mydg": filter_logic must be either <AND> or <OR>', """
+service_type: osd
+service_id: mydg
+placement:
+ host_pattern: '*'
+data_devices:
+ all: True
+filter_logic: XOR
+"""
+ ),
+ (
+ 'Failed to validate OSD spec "mydg": `data_devices` element is required.', """
+service_type: osd
+service_id: mydg
+placement:
+ host_pattern: '*'
+spec:
+ db_devices:
+ model: model
+"""
+ ),
+ (
+ 'Failed to validate OSD spec "mydg.db_devices": Filtering for `unknown_key` is not supported', """
+service_type: osd
+service_id: mydg
+placement:
+ host_pattern: '*'
+spec:
+ db_devices:
+ unknown_key: 1
+"""
+ ),
+ (
+ 'Failed to validate OSD spec "mydg": Feature `unknown_key` is not supported', """
+service_type: osd
+service_id: mydg
+placement:
+ host_pattern: '*'
+spec:
+ db_devices:
+ all: true
+ unknown_key: 1
+"""
+ ),
+])
+def test_DriveGroup_fail(match, test_input):
+ with pytest.raises(SpecValidationError, match=match):
+ osd_spec = DriveGroupSpec.from_json(yaml.safe_load(test_input))
+ osd_spec.validate()
+
+
+def test_drivegroup_pattern():
+ dg = DriveGroupSpec(
+ PlacementSpec(host_pattern='node[1-3]'),
+ service_id='foobar',
+ data_devices=DeviceSelection(all=True))
+ assert dg.placement.filter_matching_hostspecs([HostSpec('node{}'.format(i)) for i in range(10)]) == ['node1', 'node2', 'node3']
+
+
+def test_drive_selection():
+ devs = DeviceSelection(paths=['/dev/sda'])
+ spec = DriveGroupSpec(
+ PlacementSpec('node_name'),
+ service_id='foobar',
+ data_devices=devs)
+ assert all([isinstance(x, Device) for x in spec.data_devices.paths])
+ assert spec.data_devices.paths[0].path == '/dev/sda'
+
+ with pytest.raises(DriveGroupValidationError, match='exclusive'):
+ ds = DeviceSelection(paths=['/dev/sda'], rotational=False)
+ ds.validate('')
+
+
+def test_ceph_volume_command_0():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(all=True)
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device()*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+def test_ceph_volume_command_1():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True),
+ db_devices=DeviceSelection(rotational=False)
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ '--db-devices /dev/sdc /dev/sdd --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+def test_ceph_volume_command_2():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
+ db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
+ wal_devices=DeviceSelection(size='10G')
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True, size="300.00 GB")*2 +
+ _mk_device(rotational=False, size="300.00 GB")*2 +
+ _mk_device(size="10.0 GB", rotational=False)*2
+ )
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ '--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
+ '--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+def test_ceph_volume_command_3():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
+ db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
+ wal_devices=DeviceSelection(size='10G'),
+ encrypted=True
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True, size="300.00 GB")*2 +
+ _mk_device(rotational=False, size="300.00 GB")*2 +
+ _mk_device(size="10.0 GB", rotational=False)*2
+ )
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ '--db-devices /dev/sdc /dev/sdd '
+ '--wal-devices /dev/sde /dev/sdf --dmcrypt '
+ '--yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+def test_ceph_volume_command_4():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(size='200GB:350GB', rotational=True),
+ db_devices=DeviceSelection(size='200GB:350GB', rotational=False),
+ wal_devices=DeviceSelection(size='10G'),
+ block_db_size='500M',
+ block_wal_size='500M',
+ osds_per_device=3,
+ encrypted=True
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True, size="300.00 GB")*2 +
+ _mk_device(rotational=False, size="300.00 GB")*2 +
+ _mk_device(size="10.0 GB", rotational=False)*2
+ )
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sda /dev/sdb '
+ '--db-devices /dev/sdc /dev/sdd --wal-devices /dev/sde /dev/sdf '
+ '--block-wal-size 500M --block-db-size 500M --dmcrypt '
+ '--osds-per-device 3 --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+def test_ceph_volume_command_5():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True),
+ objectstore='filestore'
+ )
+ with pytest.raises(DriveGroupValidationError):
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True)*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --filestore --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+def test_ceph_volume_command_6():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=False),
+ journal_devices=DeviceSelection(rotational=True),
+ journal_size='500M',
+ objectstore='filestore'
+ )
+ with pytest.raises(DriveGroupValidationError):
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True)*2 + _mk_device(rotational=False)*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == ('lvm batch --no-auto /dev/sdc /dev/sdd '
+ '--journal-size 500M --journal-devices /dev/sda /dev/sdb '
+ '--filestore --yes --no-systemd') for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+def test_ceph_volume_command_7():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(all=True),
+ osd_id_claims={'host1': ['0', '1']}
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True)*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, ['0', '1']).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --osd-ids 0 1 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+def test_ceph_volume_command_8():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True, model='INTEL SSDS'),
+ db_devices=DeviceSelection(model='INTEL SSDP'),
+ filter_logic='OR',
+ osd_id_claims={}
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True, size='1.82 TB', model='ST2000DM001-1ER1') + # data
+ _mk_device(rotational=False, size="223.0 GB", model='INTEL SSDSC2KG24') + # data
+ _mk_device(rotational=False, size="349.0 GB", model='INTEL SSDPED1K375GA') # wal/db
+ )
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --db-devices /dev/sdc --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+def test_ceph_volume_command_9():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(all=True),
+ data_allocate_fraction=0.8
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device()*2)
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --data-allocate-fraction 0.8 --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+@pytest.mark.parametrize("test_input_base",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+crush_device_class: ssd
+data_devices:
+ paths:
+ - /dev/sda
+"""
+ ),
+ ])
+def test_ceph_volume_command_10(test_input_base):
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input_base))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert all(cmd == 'lvm batch --no-auto /dev/sda --crush-device-class ssd --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+@pytest.mark.parametrize("test_input1",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+crush_device_class: ssd
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+ crush_device_class: hdd
+"""
+ ),
+ ])
+def test_ceph_volume_command_11(test_input1):
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input1))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert all(cmd == 'lvm batch --no-auto /dev/sda /dev/sdb --crush-device-class hdd --yes --no-systemd' for cmd in cmds), f'Expected {cmd} in {cmds}'
+
+
+@pytest.mark.parametrize("test_input2",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+crush_device_class: ssd
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+"""
+ ),
+ ])
+def test_ceph_volume_command_12(test_input2):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input2))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert (cmds[0] == 'lvm batch --no-auto /dev/sdb --crush-device-class ssd --yes --no-systemd') # noqa E501
+ assert (cmds[1] == 'lvm batch --no-auto /dev/sda --crush-device-class hdd --yes --no-systemd') # noqa E501
+
+
+@pytest.mark.parametrize("test_input3",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+"""
+ ),
+ ])
+def test_ceph_volume_command_13(test_input3):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input3))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert (cmds[0] == 'lvm batch --no-auto /dev/sdb --yes --no-systemd') # noqa E501
+ assert (cmds[1] == 'lvm batch --no-auto /dev/sda --crush-device-class hdd --yes --no-systemd') # noqa E501
+
+
+@pytest.mark.parametrize("test_input4",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+data_devices:
+ paths:
+ - crush_device_class: hdd
+"""
+ ),
+ ])
+def test_ceph_volume_command_14(test_input4):
+
+ with pytest.raises(DriveGroupValidationError, match='Device path'):
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input4))
+ spec.validate()
+
+
+def test_raw_ceph_volume_command_0():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True),
+ db_devices=DeviceSelection(rotational=False),
+ method='raw',
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True) + # data
+ _mk_device(rotational=True) + # data
+ _mk_device(rotational=False) + # db
+ _mk_device(rotational=False) # db
+ )
+ exp_cmds = ['raw prepare --bluestore --data /dev/sda --block.db /dev/sdc', 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sdd']
+ sel = drive_selection.DriveSelection(spec, inventory)
+ cmds = translate.to_ceph_volume(sel, []).run()
+ assert all(cmd in exp_cmds for cmd in cmds), f'Expected {exp_cmds} to match {cmds}'
+
+def test_raw_ceph_volume_command_1():
+ spec = DriveGroupSpec(placement=PlacementSpec(host_pattern='*'),
+ service_id='foobar',
+ data_devices=DeviceSelection(rotational=True),
+ db_devices=DeviceSelection(rotational=False),
+ method='raw',
+ )
+ spec.validate()
+ inventory = _mk_inventory(_mk_device(rotational=True) + # data
+ _mk_device(rotational=True) + # data
+ _mk_device(rotational=False) # db
+ )
+ sel = drive_selection.DriveSelection(spec, inventory)
+ with pytest.raises(ValueError):
+ cmds = translate.to_ceph_volume(sel, []).run()
+
+@pytest.mark.parametrize("test_input5",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+method: raw
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+ crush_device_class: hdd
+ - path: /dev/sdc
+ crush_device_class: hdd
+db_devices:
+ paths:
+ - /dev/sdd
+ - /dev/sde
+ - /dev/sdf
+
+"""
+ ),
+ ])
+def test_raw_ceph_volume_command_2(test_input5):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input5))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert cmds[0] == 'raw prepare --bluestore --data /dev/sda --block.db /dev/sdd --crush-device-class hdd'
+ assert cmds[1] == 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sde --crush-device-class hdd'
+ assert cmds[2] == 'raw prepare --bluestore --data /dev/sdc --block.db /dev/sdf --crush-device-class hdd'
+
+
+@pytest.mark.parametrize("test_input6",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+method: raw
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+ crush_device_class: hdd
+ - path: /dev/sdc
+ crush_device_class: ssd
+db_devices:
+ paths:
+ - /dev/sdd
+ - /dev/sde
+ - /dev/sdf
+
+"""
+ ),
+ ])
+def test_raw_ceph_volume_command_3(test_input6):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input6))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert cmds[0] == 'raw prepare --bluestore --data /dev/sda --block.db /dev/sdd --crush-device-class hdd'
+ assert cmds[1] == 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sde --crush-device-class hdd'
+ assert cmds[2] == 'raw prepare --bluestore --data /dev/sdc --block.db /dev/sdf --crush-device-class ssd'
+
+
+@pytest.mark.parametrize("test_input7",
+[
+ (
+ """service_type: osd
+service_id: testing_drivegroup
+placement:
+ host_pattern: hostname
+method: raw
+data_devices:
+ paths:
+ - path: /dev/sda
+ crush_device_class: hdd
+ - path: /dev/sdb
+ crush_device_class: nvme
+ - path: /dev/sdc
+ crush_device_class: ssd
+db_devices:
+ paths:
+ - /dev/sdd
+ - /dev/sde
+ - /dev/sdf
+wal_devices:
+ paths:
+ - /dev/sdg
+ - /dev/sdh
+ - /dev/sdi
+
+"""
+ ),
+ ])
+def test_raw_ceph_volume_command_4(test_input7):
+
+ spec = DriveGroupSpec.from_json(yaml.safe_load(test_input7))
+ spec.validate()
+ drive = drive_selection.DriveSelection(spec, spec.data_devices.paths)
+ cmds = translate.to_ceph_volume(drive, []).run()
+
+ assert cmds[0] == 'raw prepare --bluestore --data /dev/sda --block.db /dev/sdd --block.wal /dev/sdg --crush-device-class hdd'
+ assert cmds[1] == 'raw prepare --bluestore --data /dev/sdb --block.db /dev/sdf --block.wal /dev/sdi --crush-device-class nvme'
+ assert cmds[2] == 'raw prepare --bluestore --data /dev/sdc --block.db /dev/sde --block.wal /dev/sdh --crush-device-class ssd'
diff --git a/src/python-common/ceph/tests/test_hostspec.py b/src/python-common/ceph/tests/test_hostspec.py
new file mode 100644
index 000000000..b6817579e
--- /dev/null
+++ b/src/python-common/ceph/tests/test_hostspec.py
@@ -0,0 +1,40 @@
+# flake8: noqa
+import json
+import yaml
+
+import pytest
+
+from ceph.deployment.hostspec import HostSpec, SpecValidationError
+
+
+@pytest.mark.parametrize(
+ "test_input,expected",
+ [
+ ({"hostname": "foo"}, HostSpec('foo')),
+ ({"hostname": "foo", "labels": "l1"}, HostSpec('foo', labels=['l1'])),
+ ({"hostname": "foo", "labels": ["l1", "l2"]}, HostSpec('foo', labels=['l1', 'l2'])),
+ ({"hostname": "foo", "location": {"rack": "foo"}}, HostSpec('foo', location={'rack': 'foo'})),
+ ]
+)
+def test_parse_host_specs(test_input, expected):
+ hs = HostSpec.from_json(test_input)
+ assert hs == expected
+
+
+@pytest.mark.parametrize(
+ "bad_input",
+ [
+ ({"hostname": "foo", "labels": 124}),
+ ({"hostname": "foo", "labels": {"a", "b"}}),
+ ({"hostname": "foo", "labels": {"a", "b"}}),
+ ({"hostname": "foo", "labels": ["a", 2]}),
+ ({"hostname": "foo", "location": "rack=bar"}),
+ ({"hostname": "foo", "location": ["a"]}),
+ ({"hostname": "foo", "location": {"rack", 1}}),
+ ({"hostname": "foo", "location": {1: "rack"}}),
+ ]
+)
+def test_parse_host_specs(bad_input):
+ with pytest.raises(SpecValidationError):
+ hs = HostSpec.from_json(bad_input)
+
diff --git a/src/python-common/ceph/tests/test_inventory.py b/src/python-common/ceph/tests/test_inventory.py
new file mode 100644
index 000000000..2d916fad2
--- /dev/null
+++ b/src/python-common/ceph/tests/test_inventory.py
@@ -0,0 +1,71 @@
+import datetime
+import json
+import os
+import pytest
+
+from ceph.deployment.inventory import Devices, Device
+from ceph.utils import datetime_now
+
+
+@pytest.mark.parametrize("filename",
+ [
+ os.path.dirname(__file__) + '/c-v-inventory.json',
+ os.path.dirname(__file__) + '/../../../pybind/mgr/test_orchestrator/du'
+ 'mmy_data.json',
+ ])
+def test_from_json(filename):
+ with open(filename) as f:
+ data = json.load(f)
+ if 'inventory' in data:
+ data = data['inventory']
+ ds = Devices.from_json(data)
+ assert len(ds.devices) == len(data)
+ assert Devices.from_json(ds.to_json()) == ds
+
+
+class TestDevicesEquality():
+ created_time1 = datetime_now()
+ created_time2 = created_time1 + datetime.timedelta(seconds=30)
+
+ @pytest.mark.parametrize(
+ "old_devices, new_devices, expected_equal",
+ [
+ ( # identical list should be equal
+ Devices([Device('/dev/sdb', available=True, created=created_time1),
+ Device('/dev/sdc', available=True, created=created_time1)]),
+ Devices([Device('/dev/sdb', available=True, created=created_time1),
+ Device('/dev/sdc', available=True, created=created_time1)]),
+ True,
+ ),
+ ( # differing only in created time should still be equal
+ Devices([Device('/dev/sdb', available=True, created=created_time1),
+ Device('/dev/sdc', available=True, created=created_time1)]),
+ Devices([Device('/dev/sdb', available=True, created=created_time2),
+ Device('/dev/sdc', available=True, created=created_time2)]),
+ True,
+ ),
+ ( # differing in some other field should make them not equal
+ Devices([Device('/dev/sdb', available=True, created=created_time1),
+ Device('/dev/sdc', available=True, created=created_time1)]),
+ Devices([Device('/dev/sdb', available=False, created=created_time1),
+ Device('/dev/sdc', available=True, created=created_time1)]),
+ False,
+ ),
+ ( # different amount of devices should not pass equality
+ Devices([Device('/dev/sdb', available=True, created=created_time1),
+ Device('/dev/sdc', available=True, created=created_time1)]),
+ Devices([Device('/dev/sdb', available=True, created=created_time1),
+ Device('/dev/sdc', available=True, created=created_time1),
+ Device('/dev/sdd', available=True, created=created_time1)]),
+ False,
+ ),
+ ( # differing order should not affect equality
+ Devices([Device('/dev/sdb', available=True, created=created_time1),
+ Device('/dev/sdc', available=True, created=created_time1)]),
+ Devices([Device('/dev/sdc', available=True, created=created_time1),
+ Device('/dev/sdb', available=True, created=created_time1)]),
+ True,
+ ),
+ ])
+ def test_equality(self, old_devices, new_devices, expected_equal):
+ assert (old_devices == new_devices) == expected_equal
diff --git a/src/python-common/ceph/tests/test_service_spec.py b/src/python-common/ceph/tests/test_service_spec.py
new file mode 100644
index 000000000..502057f5c
--- /dev/null
+++ b/src/python-common/ceph/tests/test_service_spec.py
@@ -0,0 +1,1270 @@
+# flake8: noqa
+import json
+import re
+
+import yaml
+
+import pytest
+
+from ceph.deployment.service_spec import (
+ AlertManagerSpec,
+ ArgumentSpec,
+ CustomContainerSpec,
+ GrafanaSpec,
+ HostPlacementSpec,
+ IscsiServiceSpec,
+ NFSServiceSpec,
+ PlacementSpec,
+ PrometheusSpec,
+ RGWSpec,
+ ServiceSpec,
+)
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.hostspec import SpecValidationError
+
+
+@pytest.mark.parametrize("test_input,expected, require_network",
+ [("myhost", ('myhost', '', ''), False),
+ ("myhost=sname", ('myhost', '', 'sname'), False),
+ ("myhost:10.1.1.10", ('myhost', '10.1.1.10', ''), True),
+ ("myhost:10.1.1.10=sname", ('myhost', '10.1.1.10', 'sname'), True),
+ ("myhost:10.1.1.0/32", ('myhost', '10.1.1.0/32', ''), True),
+ ("myhost:10.1.1.0/32=sname", ('myhost', '10.1.1.0/32', 'sname'), True),
+ ("myhost:[v1:10.1.1.10:6789]", ('myhost', '[v1:10.1.1.10:6789]', ''), True),
+ ("myhost:[v1:10.1.1.10:6789]=sname", ('myhost', '[v1:10.1.1.10:6789]', 'sname'), True),
+ ("myhost:[v1:10.1.1.10:6789,v2:10.1.1.11:3000]", ('myhost', '[v1:10.1.1.10:6789,v2:10.1.1.11:3000]', ''), True),
+ ("myhost:[v1:10.1.1.10:6789,v2:10.1.1.11:3000]=sname", ('myhost', '[v1:10.1.1.10:6789,v2:10.1.1.11:3000]', 'sname'), True),
+ ])
+def test_parse_host_placement_specs(test_input, expected, require_network):
+ ret = HostPlacementSpec.parse(test_input, require_network=require_network)
+ assert ret == expected
+ assert str(ret) == test_input
+
+ ps = PlacementSpec.from_string(test_input)
+ assert ps.pretty_str() == test_input
+ assert ps == PlacementSpec.from_string(ps.pretty_str())
+
+ # Testing the old verbose way of generating json. Don't remove:
+ assert ret == HostPlacementSpec.from_json({
+ 'hostname': ret.hostname,
+ 'network': ret.network,
+ 'name': ret.name
+ })
+
+ assert ret == HostPlacementSpec.from_json(ret.to_json())
+
+
+@pytest.mark.parametrize(
+ "spec, raise_exception, msg",
+ [
+ (GrafanaSpec(protocol=''), True, '^Invalid protocol'),
+ (GrafanaSpec(protocol='invalid'), True, '^Invalid protocol'),
+ (GrafanaSpec(protocol='-http'), True, '^Invalid protocol'),
+ (GrafanaSpec(protocol='-https'), True, '^Invalid protocol'),
+ (GrafanaSpec(protocol='http'), False, ''),
+ (GrafanaSpec(protocol='https'), False, ''),
+ (GrafanaSpec(anonymous_access=False), True, '^Either initial'), # we require inital_admin_password if anonymous_access is False
+ (GrafanaSpec(anonymous_access=False, initial_admin_password='test'), False, ''),
+ ])
+def test_apply_grafana(spec: GrafanaSpec, raise_exception: bool, msg: str):
+ if raise_exception:
+ with pytest.raises(SpecValidationError, match=msg):
+ spec.validate()
+ else:
+ spec.validate()
+
+@pytest.mark.parametrize(
+ "spec, raise_exception, msg",
+ [
+ # Valid retention_time values (valid units: 'y', 'w', 'd', 'h', 'm', 's')
+ (PrometheusSpec(retention_time='1y'), False, ''),
+ (PrometheusSpec(retention_time=' 10w '), False, ''),
+ (PrometheusSpec(retention_time=' 1348d'), False, ''),
+ (PrometheusSpec(retention_time='2000h '), False, ''),
+ (PrometheusSpec(retention_time='173847m'), False, ''),
+ (PrometheusSpec(retention_time='200s'), False, ''),
+ (PrometheusSpec(retention_time=' '), False, ''), # default value will be used
+ # Invalid retention_time values
+ (PrometheusSpec(retention_time='100k'), True, '^Invalid retention time'), # invalid unit
+ (PrometheusSpec(retention_time='10'), True, '^Invalid retention time'), # no unit
+ (PrometheusSpec(retention_time='100.00y'), True, '^Invalid retention time'), # invalid value and valid unit
+ (PrometheusSpec(retention_time='100.00k'), True, '^Invalid retention time'), # invalid value and invalid unit
+ (PrometheusSpec(retention_time='---'), True, '^Invalid retention time'), # invalid value
+
+ # Valid retention_size values (valid units: 'B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB')
+ (PrometheusSpec(retention_size='123456789B'), False, ''),
+ (PrometheusSpec(retention_size=' 200KB'), False, ''),
+ (PrometheusSpec(retention_size='99999MB '), False, ''),
+ (PrometheusSpec(retention_size=' 10GB '), False, ''),
+ (PrometheusSpec(retention_size='100TB'), False, ''),
+ (PrometheusSpec(retention_size='500PB'), False, ''),
+ (PrometheusSpec(retention_size='200EB'), False, ''),
+ (PrometheusSpec(retention_size=' '), False, ''), # default value will be used
+
+ # Invalid retention_size values
+ (PrometheusSpec(retention_size='100b'), True, '^Invalid retention size'), # invalid unit (case sensitive)
+ (PrometheusSpec(retention_size='333kb'), True, '^Invalid retention size'), # invalid unit (case sensitive)
+ (PrometheusSpec(retention_size='2000'), True, '^Invalid retention size'), # no unit
+ (PrometheusSpec(retention_size='200.00PB'), True, '^Invalid retention size'), # invalid value and valid unit
+ (PrometheusSpec(retention_size='400.B'), True, '^Invalid retention size'), # invalid value and invalid unit
+ (PrometheusSpec(retention_size='10.000s'), True, '^Invalid retention size'), # invalid value and invalid unit
+ (PrometheusSpec(retention_size='...'), True, '^Invalid retention size'), # invalid value
+
+ # valid retention_size and valid retention_time
+ (PrometheusSpec(retention_time='1y', retention_size='100GB'), False, ''),
+ # invalid retention_time and valid retention_size
+ (PrometheusSpec(retention_time='1j', retention_size='100GB'), True, '^Invalid retention time'),
+ # valid retention_time and invalid retention_size
+ (PrometheusSpec(retention_time='1y', retention_size='100gb'), True, '^Invalid retention size'),
+ # valid retention_time and invalid retention_size
+ (PrometheusSpec(retention_time='1y', retention_size='100gb'), True, '^Invalid retention size'),
+ # invalid retention_time and invalid retention_size
+ (PrometheusSpec(retention_time='1i', retention_size='100gb'), True, '^Invalid retention time'),
+ ])
+def test_apply_prometheus(spec: PrometheusSpec, raise_exception: bool, msg: str):
+ if raise_exception:
+ with pytest.raises(SpecValidationError, match=msg):
+ spec.validate()
+ else:
+ spec.validate()
+
+@pytest.mark.parametrize(
+ "test_input,expected",
+ [
+ ('', "PlacementSpec()"),
+ ("count:2", "PlacementSpec(count=2)"),
+ ("3", "PlacementSpec(count=3)"),
+ ("host1 host2", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"),
+ ("host1;host2", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"),
+ ("host1,host2", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"),
+ ("host1 host2=b", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='b')])"),
+ ("host1=a host2=b", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='', name='a'), HostPlacementSpec(hostname='host2', network='', name='b')])"),
+ ("host1:1.2.3.4=a host2:1.2.3.5=b", "PlacementSpec(hosts=[HostPlacementSpec(hostname='host1', network='1.2.3.4', name='a'), HostPlacementSpec(hostname='host2', network='1.2.3.5', name='b')])"),
+ ("myhost:[v1:10.1.1.10:6789]", "PlacementSpec(hosts=[HostPlacementSpec(hostname='myhost', network='[v1:10.1.1.10:6789]', name='')])"),
+ ('2 host1 host2', "PlacementSpec(count=2, hosts=[HostPlacementSpec(hostname='host1', network='', name=''), HostPlacementSpec(hostname='host2', network='', name='')])"),
+ ('label:foo', "PlacementSpec(label='foo')"),
+ ('3 label:foo', "PlacementSpec(count=3, label='foo')"),
+ ('*', "PlacementSpec(host_pattern='*')"),
+ ('3 data[1-3]', "PlacementSpec(count=3, host_pattern='data[1-3]')"),
+ ('3 data?', "PlacementSpec(count=3, host_pattern='data?')"),
+ ('3 data*', "PlacementSpec(count=3, host_pattern='data*')"),
+ ("count-per-host:4 label:foo", "PlacementSpec(count_per_host=4, label='foo')"),
+ ])
+def test_parse_placement_specs(test_input, expected):
+ ret = PlacementSpec.from_string(test_input)
+ assert str(ret) == expected
+ assert PlacementSpec.from_string(ret.pretty_str()) == ret, f'"{ret.pretty_str()}" != "{test_input}"'
+
+@pytest.mark.parametrize(
+ "test_input",
+ [
+ ("host=a host*"),
+ ("host=a label:wrong"),
+ ("host? host*"),
+ ('host=a count-per-host:0'),
+ ('host=a count-per-host:-10'),
+ ('count:2 count-per-host:1'),
+ ('host1=a host2=b count-per-host:2'),
+ ('host1:10/8 count-per-host:2'),
+ ('count-per-host:2'),
+ ]
+)
+def test_parse_placement_specs_raises(test_input):
+ with pytest.raises(SpecValidationError):
+ PlacementSpec.from_string(test_input)
+
+@pytest.mark.parametrize("test_input",
+ # wrong subnet
+ [("myhost:1.1.1.1/24"),
+ # wrong ip format
+ ("myhost:1"),
+ ])
+def test_parse_host_placement_specs_raises_wrong_format(test_input):
+ with pytest.raises(ValueError):
+ HostPlacementSpec.parse(test_input)
+
+
+@pytest.mark.parametrize(
+ "p,hosts,size",
+ [
+ (
+ PlacementSpec(count=3),
+ ['host1', 'host2', 'host3', 'host4', 'host5'],
+ 3
+ ),
+ (
+ PlacementSpec(host_pattern='*'),
+ ['host1', 'host2', 'host3', 'host4', 'host5'],
+ 5
+ ),
+ (
+ PlacementSpec(count_per_host=2, host_pattern='*'),
+ ['host1', 'host2', 'host3', 'host4', 'host5'],
+ 10
+ ),
+ (
+ PlacementSpec(host_pattern='foo*'),
+ ['foo1', 'foo2', 'bar1', 'bar2'],
+ 2
+ ),
+ (
+ PlacementSpec(count_per_host=2, host_pattern='foo*'),
+ ['foo1', 'foo2', 'bar1', 'bar2'],
+ 4
+ ),
+ ])
+def test_placement_target_size(p, hosts, size):
+ assert p.get_target_count(
+ [HostPlacementSpec(n, '', '') for n in hosts]
+ ) == size
+
+
+def _get_dict_spec(s_type, s_id):
+ dict_spec = {
+ "service_id": s_id,
+ "service_type": s_type,
+ "placement":
+ dict(hosts=["host1:1.1.1.1"])
+ }
+ if s_type == 'nfs':
+ pass
+ elif s_type == 'iscsi':
+ dict_spec['pool'] = 'pool'
+ dict_spec['api_user'] = 'api_user'
+ dict_spec['api_password'] = 'api_password'
+ elif s_type == 'osd':
+ dict_spec['spec'] = {
+ 'data_devices': {
+ 'all': True
+ }
+ }
+ elif s_type == 'rgw':
+ dict_spec['rgw_realm'] = 'realm'
+ dict_spec['rgw_zone'] = 'zone'
+
+ return dict_spec
+
+
+@pytest.mark.parametrize(
+ "s_type,o_spec,s_id",
+ [
+ ("mgr", ServiceSpec, 'test'),
+ ("mon", ServiceSpec, 'test'),
+ ("mds", ServiceSpec, 'test'),
+ ("rgw", RGWSpec, 'realm.zone'),
+ ("nfs", NFSServiceSpec, 'test'),
+ ("iscsi", IscsiServiceSpec, 'test'),
+ ("osd", DriveGroupSpec, 'test'),
+ ])
+def test_servicespec_map_test(s_type, o_spec, s_id):
+ spec = ServiceSpec.from_json(_get_dict_spec(s_type, s_id))
+ assert isinstance(spec, o_spec)
+ assert isinstance(spec.placement, PlacementSpec)
+ assert isinstance(spec.placement.hosts[0], HostPlacementSpec)
+ assert spec.placement.hosts[0].hostname == 'host1'
+ assert spec.placement.hosts[0].network == '1.1.1.1'
+ assert spec.placement.hosts[0].name == ''
+ assert spec.validate() is None
+ ServiceSpec.from_json(spec.to_json())
+
+
+@pytest.mark.parametrize(
+ "realm, zone, frontend_type, raise_exception, msg",
+ [
+ ('realm', 'zone1', 'beast', False, ''),
+ ('realm', 'zone2', 'civetweb', False, ''),
+ ('realm', None, 'beast', True, 'Cannot add RGW: Realm specified but no zone specified'),
+ (None, 'zone1', 'beast', True, 'Cannot add RGW: Zone specified but no realm specified'),
+ ('realm', 'zone', 'invalid-beast', True, '^Invalid rgw_frontend_type value'),
+ ('realm', 'zone', 'invalid-civetweb', True, '^Invalid rgw_frontend_type value'),
+ ])
+def test_rgw_servicespec_parse(realm, zone, frontend_type, raise_exception, msg):
+ spec = RGWSpec(service_id='foo',
+ rgw_realm=realm,
+ rgw_zone=zone,
+ rgw_frontend_type=frontend_type)
+ if raise_exception:
+ with pytest.raises(SpecValidationError, match=msg):
+ spec.validate()
+ else:
+ spec.validate()
+
+def test_osd_unmanaged():
+ osd_spec = {"placement": {"host_pattern": "*"},
+ "service_id": "all-available-devices",
+ "service_name": "osd.all-available-devices",
+ "service_type": "osd",
+ "spec": {"data_devices": {"all": True}, "filter_logic": "AND", "objectstore": "bluestore"},
+ "unmanaged": True}
+
+ dg_spec = ServiceSpec.from_json(osd_spec)
+ assert dg_spec.unmanaged == True
+
+
+@pytest.mark.parametrize("y",
+"""service_type: crash
+service_name: crash
+placement:
+ host_pattern: '*'
+---
+service_type: crash
+service_name: crash
+placement:
+ host_pattern: '*'
+unmanaged: true
+---
+service_type: rgw
+service_id: default-rgw-realm.eu-central-1.1
+service_name: rgw.default-rgw-realm.eu-central-1.1
+placement:
+ hosts:
+ - ceph-001
+networks:
+- 10.0.0.0/8
+- 192.168.0.0/16
+spec:
+ rgw_frontend_type: civetweb
+ rgw_realm: default-rgw-realm
+ rgw_zone: eu-central-1
+---
+service_type: osd
+service_id: osd_spec_default
+service_name: osd.osd_spec_default
+placement:
+ host_pattern: '*'
+spec:
+ data_devices:
+ model: MC-55-44-XZ
+ db_devices:
+ model: SSD-123-foo
+ filter_logic: AND
+ objectstore: bluestore
+ wal_devices:
+ model: NVME-QQQQ-987
+---
+service_type: alertmanager
+service_name: alertmanager
+spec:
+ port: 1234
+ user_data:
+ default_webhook_urls:
+ - foo
+---
+service_type: grafana
+service_name: grafana
+spec:
+ anonymous_access: true
+ port: 1234
+ protocol: https
+---
+service_type: grafana
+service_name: grafana
+spec:
+ anonymous_access: true
+ initial_admin_password: secure
+ port: 1234
+ protocol: https
+---
+service_type: ingress
+service_id: rgw.foo
+service_name: ingress.rgw.foo
+placement:
+ hosts:
+ - host1
+ - host2
+ - host3
+spec:
+ backend_service: rgw.foo
+ first_virtual_router_id: 50
+ frontend_port: 8080
+ monitor_port: 8081
+ virtual_ip: 192.168.20.1/24
+---
+service_type: nfs
+service_id: mynfs
+service_name: nfs.mynfs
+spec:
+ port: 1234
+---
+service_type: iscsi
+service_id: iscsi
+service_name: iscsi.iscsi
+networks:
+- ::0/8
+spec:
+ api_password: admin
+ api_port: 5000
+ api_user: admin
+ pool: pool
+ trusted_ip_list:
+ - ::1
+ - ::2
+---
+service_type: container
+service_id: hello-world
+service_name: container.hello-world
+spec:
+ args:
+ - --foo
+ bind_mounts:
+ - - type=bind
+ - source=lib/modules
+ - destination=/lib/modules
+ - ro=true
+ dirs:
+ - foo
+ - bar
+ entrypoint: /usr/bin/bash
+ envs:
+ - FOO=0815
+ files:
+ bar.conf:
+ - foo
+ - bar
+ foo.conf: 'foo
+
+ bar'
+ gid: 2000
+ image: docker.io/library/hello-world:latest
+ ports:
+ - 8080
+ - 8443
+ uid: 1000
+ volume_mounts:
+ foo: /foo
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_community: public
+ snmp_destination: 192.168.1.42:162
+ snmp_version: V2c
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ auth_protocol: MD5
+ credentials:
+ snmp_v3_auth_password: mypassword
+ snmp_v3_auth_username: myuser
+ engine_id: 8000C53F00000000
+ port: 9464
+ snmp_destination: 192.168.1.42:162
+ snmp_version: V3
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_password: mypassword
+ snmp_v3_auth_username: myuser
+ snmp_v3_priv_password: mysecret
+ engine_id: 8000C53F00000000
+ privacy_protocol: AES
+ snmp_destination: 192.168.1.42:162
+ snmp_version: V3
+""".split('---\n'))
+def test_yaml(y):
+ data = yaml.safe_load(y)
+ object = ServiceSpec.from_json(data)
+
+ assert yaml.dump(object) == y
+ assert yaml.dump(ServiceSpec.from_json(object.to_json())) == y
+
+
+def test_alertmanager_spec_1():
+ spec = AlertManagerSpec()
+ assert spec.service_type == 'alertmanager'
+ assert isinstance(spec.user_data, dict)
+ assert len(spec.user_data.keys()) == 0
+ assert spec.get_port_start() == [9093, 9094]
+
+
+def test_alertmanager_spec_2():
+ spec = AlertManagerSpec(user_data={'default_webhook_urls': ['foo']})
+ assert isinstance(spec.user_data, dict)
+ assert 'default_webhook_urls' in spec.user_data.keys()
+
+
+
+def test_repr():
+ val = """ServiceSpec.from_json(yaml.safe_load('''service_type: crash
+service_name: crash
+placement:
+ count: 42
+'''))"""
+ obj = eval(val)
+ assert obj.service_type == 'crash'
+ assert val == repr(obj)
+
+@pytest.mark.parametrize("spec1, spec2, eq",
+ [
+ (
+ ServiceSpec(
+ service_type='mon'
+ ),
+ ServiceSpec(
+ service_type='mon'
+ ),
+ True
+ ),
+ (
+ ServiceSpec(
+ service_type='mon'
+ ),
+ ServiceSpec(
+ service_type='mon',
+ service_id='foo'
+ ),
+ True
+ ),
+ # Add service_type='mgr'
+ (
+ ServiceSpec(
+ service_type='osd'
+ ),
+ ServiceSpec(
+ service_type='osd',
+ ),
+ True
+ ),
+ (
+ ServiceSpec(
+ service_type='osd'
+ ),
+ DriveGroupSpec(),
+ True
+ ),
+ (
+ ServiceSpec(
+ service_type='osd'
+ ),
+ ServiceSpec(
+ service_type='osd',
+ service_id='foo',
+ ),
+ False
+ ),
+ (
+ ServiceSpec(
+ service_type='rgw',
+ service_id='foo',
+ ),
+ RGWSpec(service_id='foo'),
+ True
+ ),
+ ])
+def test_spec_hash_eq(spec1: ServiceSpec,
+ spec2: ServiceSpec,
+ eq: bool):
+
+ assert (spec1 == spec2) is eq
+
+@pytest.mark.parametrize(
+ "s_type,s_id,s_name",
+ [
+ ('mgr', 's_id', 'mgr'),
+ ('mon', 's_id', 'mon'),
+ ('mds', 's_id', 'mds.s_id'),
+ ('rgw', 's_id', 'rgw.s_id'),
+ ('nfs', 's_id', 'nfs.s_id'),
+ ('iscsi', 's_id', 'iscsi.s_id'),
+ ('osd', 's_id', 'osd.s_id'),
+ ])
+def test_service_name(s_type, s_id, s_name):
+ spec = ServiceSpec.from_json(_get_dict_spec(s_type, s_id))
+ spec.validate()
+ assert spec.service_name() == s_name
+
+@pytest.mark.parametrize(
+ 's_type,s_id',
+ [
+ ('mds', 's:id'), # MDS service_id cannot contain an invalid char ':'
+ ('mds', '1abc'), # MDS service_id cannot start with a numeric digit
+ ('mds', ''), # MDS service_id cannot be empty
+ ('rgw', '*s_id'),
+ ('nfs', 's/id'),
+ ('iscsi', 's@id'),
+ ('osd', 's;id'),
+ ])
+
+def test_service_id_raises_invalid_char(s_type, s_id):
+ with pytest.raises(SpecValidationError):
+ spec = ServiceSpec.from_json(_get_dict_spec(s_type, s_id))
+ spec.validate()
+
+def test_custom_container_spec():
+ spec = CustomContainerSpec(service_id='hello-world',
+ image='docker.io/library/hello-world:latest',
+ entrypoint='/usr/bin/bash',
+ uid=1000,
+ gid=2000,
+ volume_mounts={'foo': '/foo'},
+ args=['--foo'],
+ envs=['FOO=0815'],
+ bind_mounts=[
+ [
+ 'type=bind',
+ 'source=lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true'
+ ]
+ ],
+ ports=[8080, 8443],
+ dirs=['foo', 'bar'],
+ files={
+ 'foo.conf': 'foo\nbar',
+ 'bar.conf': ['foo', 'bar']
+ })
+ assert spec.service_type == 'container'
+ assert spec.entrypoint == '/usr/bin/bash'
+ assert spec.uid == 1000
+ assert spec.gid == 2000
+ assert spec.volume_mounts == {'foo': '/foo'}
+ assert spec.args == ['--foo']
+ assert spec.envs == ['FOO=0815']
+ assert spec.bind_mounts == [
+ [
+ 'type=bind',
+ 'source=lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true'
+ ]
+ ]
+ assert spec.ports == [8080, 8443]
+ assert spec.dirs == ['foo', 'bar']
+ assert spec.files == {
+ 'foo.conf': 'foo\nbar',
+ 'bar.conf': ['foo', 'bar']
+ }
+
+
+def test_custom_container_spec_config_json():
+ spec = CustomContainerSpec(service_id='foo', image='foo', dirs=None)
+ config_json = spec.config_json()
+ for key in ['entrypoint', 'uid', 'gid', 'bind_mounts', 'dirs']:
+ assert key not in config_json
+
+
+def test_ingress_spec():
+ yaml_str = """service_type: ingress
+service_id: rgw.foo
+placement:
+ hosts:
+ - host1
+ - host2
+ - host3
+spec:
+ virtual_ip: 192.168.20.1/24
+ backend_service: rgw.foo
+ frontend_port: 8080
+ monitor_port: 8081
+"""
+ yaml_file = yaml.safe_load(yaml_str)
+ spec = ServiceSpec.from_json(yaml_file)
+ assert spec.service_type == "ingress"
+ assert spec.service_id == "rgw.foo"
+ assert spec.virtual_ip == "192.168.20.1/24"
+ assert spec.frontend_port == 8080
+ assert spec.monitor_port == 8081
+
+
+@pytest.mark.parametrize("y, error_match", [
+ ("""
+service_type: rgw
+service_id: foo
+placement:
+ count_per_host: "twelve"
+""", "count-per-host must be a numeric value",),
+ ("""
+service_type: rgw
+service_id: foo
+placement:
+ count_per_host: "2"
+""", "count-per-host must be an integer value",),
+ ("""
+service_type: rgw
+service_id: foo
+placement:
+ count_per_host: 7.36
+""", "count-per-host must be an integer value",),
+ ("""
+service_type: rgw
+service_id: foo
+placement:
+ count: "fifteen"
+""", "num/count must be a numeric value",),
+ ("""
+service_type: rgw
+service_id: foo
+placement:
+ count: "4"
+""", "num/count must be an integer value",),
+ ("""
+service_type: rgw
+service_id: foo
+placement:
+ count: 7.36
+""", "num/count must be an integer value",),
+ ("""
+service_type: rgw
+service_id: foo
+placement:
+ count: 0
+""", "num/count must be >= 1",),
+ ("""
+service_type: rgw
+service_id: foo
+placement:
+ count_per_host: 0
+""", "count-per-host must be >= 1",),
+ ("""
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_password: mypassword
+ snmp_v3_auth_username: myuser
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ engine_id: 8000c53f0000000000
+ privacy_protocol: WEIRD
+ snmp_destination: 192.168.122.1:162
+ auth_protocol: BIZARRE
+ snmp_version: V3
+""", "auth_protocol unsupported. Must be one of MD5, SHA"),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_community: public
+ snmp_destination: 192.168.1.42:162
+ snmp_version: V4
+""", 'snmp_version unsupported. Must be one of V2c, V3'),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_community: public
+ port: 9464
+ snmp_destination: 192.168.1.42:162
+""", re.escape('Missing SNMP version (snmp_version)')),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ port: 9464
+ auth_protocol: wah
+ snmp_destination: 192.168.1.42:162
+ snmp_version: V3
+""", 'auth_protocol unsupported. Must be one of MD5, SHA'),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ auth_protocol: SHA
+ privacy_protocol: weewah
+ snmp_destination: 192.168.1.42:162
+ snmp_version: V3
+""", 'privacy_protocol unsupported. Must be one of DES, AES'),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ auth_protocol: SHA
+ privacy_protocol: AES
+ snmp_destination: 192.168.1.42:162
+ snmp_version: V3
+""", 'Must provide an engine_id for SNMP V3 notifications'),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_community: public
+ port: 9464
+ snmp_destination: 192.168.1.42
+ snmp_version: V2c
+""", re.escape('SNMP destination (snmp_destination) type (IPv4) is invalid. Must be either: IPv4:Port, Name:Port')),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ auth_protocol: SHA
+ privacy_protocol: AES
+ engine_id: bogus
+ snmp_destination: 192.168.1.42:162
+ snmp_version: V3
+""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ port: 9464
+ auth_protocol: SHA
+ engine_id: 8000C53F0000000000
+ snmp_version: V3
+""", re.escape('SNMP destination (snmp_destination) must be provided')),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ auth_protocol: SHA
+ privacy_protocol: AES
+ engine_id: 8000C53F0000000000
+ snmp_destination: my.imaginary.snmp-host
+ snmp_version: V3
+""", re.escape('SNMP destination (snmp_destination) is invalid: DNS lookup failed')),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ auth_protocol: SHA
+ privacy_protocol: AES
+ engine_id: 8000C53F0000000000
+ snmp_destination: 10.79.32.10:fred
+ snmp_version: V3
+""", re.escape('SNMP destination (snmp_destination) is invalid: Port must be numeric')),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ auth_protocol: SHA
+ privacy_protocol: AES
+ engine_id: 8000C53
+ snmp_destination: 10.79.32.10:162
+ snmp_version: V3
+""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ auth_protocol: SHA
+ privacy_protocol: AES
+ engine_id: 8000C53DOH!
+ snmp_destination: 10.79.32.10:162
+ snmp_version: V3
+""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ auth_protocol: SHA
+ privacy_protocol: AES
+ engine_id: 8000C53FCA7344403DC611EC9B985254002537A6C53FCA7344403DC6112537A60
+ snmp_destination: 10.79.32.10:162
+ snmp_version: V3
+""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'),
+ ("""
+---
+service_type: snmp-gateway
+service_name: snmp-gateway
+placement:
+ count: 1
+spec:
+ credentials:
+ snmp_v3_auth_username: myuser
+ snmp_v3_auth_password: mypassword
+ snmp_v3_priv_password: mysecret
+ port: 9464
+ auth_protocol: SHA
+ privacy_protocol: AES
+ engine_id: 8000C53F00000
+ snmp_destination: 10.79.32.10:162
+ snmp_version: V3
+""", 'engine_id must be a string containing 10-64 hex characters. Its length must be divisible by 2'),
+ ])
+def test_service_spec_validation_error(y, error_match):
+ data = yaml.safe_load(y)
+ with pytest.raises(SpecValidationError) as err:
+ specObj = ServiceSpec.from_json(data)
+ assert err.match(error_match)
+
+
+@pytest.mark.parametrize("y, ec_args, ee_args, ec_final_args, ee_final_args", [
+ pytest.param("""
+service_type: container
+service_id: hello-world
+service_name: container.hello-world
+spec:
+ args:
+ - --foo
+ bind_mounts:
+ - - type=bind
+ - source=lib/modules
+ - destination=/lib/modules
+ - ro=true
+ dirs:
+ - foo
+ - bar
+ entrypoint: /usr/bin/bash
+ envs:
+ - FOO=0815
+ files:
+ bar.conf:
+ - foo
+ - bar
+ foo.conf: 'foo
+
+ bar'
+ gid: 2000
+ image: docker.io/library/hello-world:latest
+ ports:
+ - 8080
+ - 8443
+ uid: 1000
+ volume_mounts:
+ foo: /foo
+""",
+ None,
+ None,
+ None,
+ None,
+ id="no_extra_args"),
+ pytest.param("""
+service_type: container
+service_id: hello-world
+service_name: container.hello-world
+spec:
+ args:
+ - --foo
+ extra_entrypoint_args:
+ - "--lasers=blue"
+ - "--enable-confetti"
+ bind_mounts:
+ - - type=bind
+ - source=lib/modules
+ - destination=/lib/modules
+ - ro=true
+ dirs:
+ - foo
+ - bar
+ entrypoint: /usr/bin/bash
+ envs:
+ - FOO=0815
+ files:
+ bar.conf:
+ - foo
+ - bar
+ foo.conf: 'foo
+
+ bar'
+ gid: 2000
+ image: docker.io/library/hello-world:latest
+ ports:
+ - 8080
+ - 8443
+ uid: 1000
+ volume_mounts:
+ foo: /foo
+""",
+ None,
+ ["--lasers=blue", "--enable-confetti"],
+ None,
+ ["--lasers=blue", "--enable-confetti"],
+ id="only_extra_entrypoint_args_spec"),
+ pytest.param("""
+service_type: container
+service_id: hello-world
+service_name: container.hello-world
+spec:
+ args:
+ - --foo
+ bind_mounts:
+ - - type=bind
+ - source=lib/modules
+ - destination=/lib/modules
+ - ro=true
+ dirs:
+ - foo
+ - bar
+ entrypoint: /usr/bin/bash
+ envs:
+ - FOO=0815
+ files:
+ bar.conf:
+ - foo
+ - bar
+ foo.conf: 'foo
+
+ bar'
+ gid: 2000
+ image: docker.io/library/hello-world:latest
+ ports:
+ - 8080
+ - 8443
+ uid: 1000
+ volume_mounts:
+ foo: /foo
+extra_entrypoint_args:
+- "--lasers blue"
+- "--enable-confetti"
+""",
+ None,
+ ["--lasers blue", "--enable-confetti"],
+ None,
+ ["--lasers", "blue", "--enable-confetti"],
+ id="only_extra_entrypoint_args_toplevel"),
+ pytest.param("""
+service_type: nfs
+service_id: mynfs
+service_name: nfs.mynfs
+spec:
+ port: 1234
+ extra_entrypoint_args:
+ - "--lasers=blue"
+ - "--title=Custom NFS Options"
+ extra_container_args:
+ - "--cap-add=CAP_NET_BIND_SERVICE"
+ - "--oom-score-adj=12"
+""",
+ ["--cap-add=CAP_NET_BIND_SERVICE", "--oom-score-adj=12"],
+ ["--lasers=blue", "--title=Custom NFS Options"],
+ ["--cap-add=CAP_NET_BIND_SERVICE", "--oom-score-adj=12"],
+ ["--lasers=blue", "--title=Custom", "NFS", "Options"],
+ id="both_kinds_nfs"),
+ pytest.param("""
+service_type: container
+service_id: hello-world
+service_name: container.hello-world
+spec:
+ args:
+ - --foo
+ bind_mounts:
+ - - type=bind
+ - source=lib/modules
+ - destination=/lib/modules
+ - ro=true
+ dirs:
+ - foo
+ - bar
+ entrypoint: /usr/bin/bash
+ envs:
+ - FOO=0815
+ files:
+ bar.conf:
+ - foo
+ - bar
+ foo.conf: 'foo
+
+ bar'
+ gid: 2000
+ image: docker.io/library/hello-world:latest
+ ports:
+ - 8080
+ - 8443
+ uid: 1000
+ volume_mounts:
+ foo: /foo
+extra_entrypoint_args:
+- argument: "--lasers=blue"
+ split: true
+- argument: "--enable-confetti"
+""",
+ None,
+ [
+ {"argument": "--lasers=blue", "split": True},
+ {"argument": "--enable-confetti", "split": False},
+ ],
+ None,
+ [
+ "--lasers=blue",
+ "--enable-confetti",
+ ],
+ id="only_extra_entrypoint_args_obj_toplevel"),
+ pytest.param("""
+service_type: container
+service_id: hello-world
+service_name: container.hello-world
+spec:
+ args:
+ - --foo
+ bind_mounts:
+ - - type=bind
+ - source=lib/modules
+ - destination=/lib/modules
+ - ro=true
+ dirs:
+ - foo
+ - bar
+ entrypoint: /usr/bin/bash
+ envs:
+ - FOO=0815
+ files:
+ bar.conf:
+ - foo
+ - bar
+ foo.conf: 'foo
+
+ bar'
+ gid: 2000
+ image: docker.io/library/hello-world:latest
+ ports:
+ - 8080
+ - 8443
+ uid: 1000
+ volume_mounts:
+ foo: /foo
+ extra_entrypoint_args:
+ - argument: "--lasers=blue"
+ split: true
+ - argument: "--enable-confetti"
+""",
+ None,
+ [
+ {"argument": "--lasers=blue", "split": True},
+ {"argument": "--enable-confetti", "split": False},
+ ],
+ None,
+ [
+ "--lasers=blue",
+ "--enable-confetti",
+ ],
+ id="only_extra_entrypoint_args_obj_indented"),
+ pytest.param("""
+service_type: nfs
+service_id: mynfs
+service_name: nfs.mynfs
+spec:
+ port: 1234
+extra_entrypoint_args:
+- argument: "--lasers=blue"
+- argument: "--title=Custom NFS Options"
+extra_container_args:
+- argument: "--cap-add=CAP_NET_BIND_SERVICE"
+- argument: "--oom-score-adj=12"
+""",
+ [
+ {"argument": "--cap-add=CAP_NET_BIND_SERVICE", "split": False},
+ {"argument": "--oom-score-adj=12", "split": False},
+ ],
+ [
+ {"argument": "--lasers=blue", "split": False},
+ {"argument": "--title=Custom NFS Options", "split": False},
+ ],
+ [
+ "--cap-add=CAP_NET_BIND_SERVICE",
+ "--oom-score-adj=12",
+ ],
+ [
+ "--lasers=blue",
+ "--title=Custom NFS Options",
+ ],
+ id="both_kinds_obj_nfs"),
+])
+def test_extra_args_handling(y, ec_args, ee_args, ec_final_args, ee_final_args):
+ data = yaml.safe_load(y)
+ spec_obj = ServiceSpec.from_json(data)
+
+ assert ArgumentSpec.map_json(spec_obj.extra_container_args) == ec_args
+ assert ArgumentSpec.map_json(spec_obj.extra_entrypoint_args) == ee_args
+ if ec_final_args is None:
+ assert spec_obj.extra_container_args is None
+ else:
+ ec_res = []
+ for args in spec_obj.extra_container_args:
+ ec_res.extend(args.to_args())
+ assert ec_res == ec_final_args
+ if ee_final_args is None:
+ assert spec_obj.extra_entrypoint_args is None
+ else:
+ ee_res = []
+ for args in spec_obj.extra_entrypoint_args:
+ ee_res.extend(args.to_args())
+ assert ee_res == ee_final_args
diff --git a/src/python-common/ceph/tests/test_utils.py b/src/python-common/ceph/tests/test_utils.py
new file mode 100644
index 000000000..8a94ac400
--- /dev/null
+++ b/src/python-common/ceph/tests/test_utils.py
@@ -0,0 +1,75 @@
+import pytest
+
+from ceph.deployment.utils import is_ipv6, unwrap_ipv6, wrap_ipv6, valid_addr
+from typing import NamedTuple
+
+
+def test_is_ipv6():
+ for good in ("[::1]", "::1",
+ "fff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"):
+ assert is_ipv6(good)
+ for bad in ("127.0.0.1",
+ "ffff:ffff:ffff:ffff:ffff:ffff:ffff:fffg",
+ "1:2:3:4:5:6:7:8:9", "fd00::1::1", "[fg::1]"):
+ assert not is_ipv6(bad)
+
+
+def test_unwrap_ipv6():
+ def unwrap_test(address, expected):
+ assert unwrap_ipv6(address) == expected
+
+ tests = [
+ ('::1', '::1'), ('[::1]', '::1'),
+ ('[fde4:8dba:82e1:0:5054:ff:fe6a:357]', 'fde4:8dba:82e1:0:5054:ff:fe6a:357'),
+ ('can actually be any string', 'can actually be any string'),
+ ('[but needs to be stripped] ', '[but needs to be stripped] ')]
+ for address, expected in tests:
+ unwrap_test(address, expected)
+
+
+def test_wrap_ipv6():
+ def wrap_test(address, expected):
+ assert wrap_ipv6(address) == expected
+
+ tests = [
+ ('::1', '[::1]'), ('[::1]', '[::1]'),
+ ('fde4:8dba:82e1:0:5054:ff:fe6a:357', '[fde4:8dba:82e1:0:5054:ff:fe6a:357]'),
+ ('myhost.example.com', 'myhost.example.com'), ('192.168.0.1', '192.168.0.1'),
+ ('', ''), ('fd00::1::1', 'fd00::1::1')]
+ for address, expected in tests:
+ wrap_test(address, expected)
+
+
+class Address(NamedTuple):
+ addr: str
+ status: bool
+ description: str
+
+
+@pytest.mark.parametrize('addr_object', [
+ Address('www.ibm.com', True, 'Name'),
+ Address('www.google.com:162', True, 'Name:Port'),
+ Address('my.big.domain.name.for.big.people', False, 'DNS lookup failed'),
+ Address('192.168.122.1', True, 'IPv4'),
+ Address('[192.168.122.1]', False, 'IPv4 address wrapped in brackets is invalid'),
+ Address('10.40003.200', False, 'Invalid partial IPv4 address'),
+ Address('10.7.5', False, 'Invalid partial IPv4 address'),
+ Address('10.7', False, 'Invalid partial IPv4 address'),
+ Address('192.168.122.5:7090', True, 'IPv4:Port'),
+ Address('fe80::7561:c8fb:d3d7:1fa4', True, 'IPv6'),
+ Address('[fe80::7561:c8fb:d3d7:1fa4]:9464', True, 'IPv6:Port'),
+ Address('[fe80::7561:c8fb:d3d7:1fa4]', True, 'IPv6'),
+ Address('[fe80::7561:c8fb:d3d7:1fa4', False,
+ 'Address has incorrect/incomplete use of enclosing brackets'),
+ Address('fe80::7561:c8fb:d3d7:1fa4]', False,
+ 'Address has incorrect/incomplete use of enclosing brackets'),
+ Address('fred.knockinson.org', False, 'DNS lookup failed'),
+ Address('tumbleweed.pickles.gov.au', False, 'DNS lookup failed'),
+ Address('192.168.122.5:00PS', False, 'Port must be numeric'),
+ Address('[fe80::7561:c8fb:d3d7:1fa4]:DOH', False, 'Port must be numeric')
+])
+def test_valid_addr(addr_object: Address):
+
+ valid, description = valid_addr(addr_object.addr)
+ assert valid == addr_object.status
+ assert description == addr_object.description
diff --git a/src/python-common/ceph/tests/utils.py b/src/python-common/ceph/tests/utils.py
new file mode 100644
index 000000000..04b8a4e38
--- /dev/null
+++ b/src/python-common/ceph/tests/utils.py
@@ -0,0 +1,46 @@
+from ceph.deployment.inventory import Devices, Device
+
+try:
+ from typing import Any, List
+except ImportError:
+ pass # for type checking
+
+
+def _mk_device(rotational=True,
+ locked=False,
+ size="394.27 GB",
+ vendor='Vendor',
+ model='Model'):
+ return [Device(
+ path='??',
+ sys_api={
+ "rotational": '1' if rotational else '0',
+ "vendor": vendor,
+ "human_readable_size": size,
+ "partitions": {},
+ "locked": int(locked),
+ "sectorsize": "512",
+ "removable": "0",
+ "path": "??",
+ "support_discard": "",
+ "model": model,
+ "ro": "0",
+ "nr_requests": "128",
+ "size": 423347879936 # ignore coversion from human_readable_size
+ },
+ available=not locked,
+ rejected_reasons=['locked'] if locked else [],
+ lvs=[],
+ device_id="Model-Vendor-foobar"
+ )]
+
+
+def _mk_inventory(devices):
+ # type: (Any) -> List[Device]
+ devs = []
+ for dev_, name in zip(devices, map(chr, range(ord('a'), ord('z')))):
+ dev = Device.from_json(dev_.to_json())
+ dev.path = '/dev/sd' + name
+ dev.sys_api = dict(dev_.sys_api, path='/dev/sd' + name)
+ devs.append(dev)
+ return Devices(devices=devs).devices
diff --git a/src/python-common/ceph/utils.py b/src/python-common/ceph/utils.py
new file mode 100644
index 000000000..643be0658
--- /dev/null
+++ b/src/python-common/ceph/utils.py
@@ -0,0 +1,123 @@
+import datetime
+import re
+import string
+
+from typing import Optional
+
+
+def datetime_now() -> datetime.datetime:
+ """
+ Return the current local date and time.
+ :return: Returns an aware datetime object of the current date
+ and time.
+ """
+ return datetime.datetime.now(tz=datetime.timezone.utc)
+
+
+def datetime_to_str(dt: datetime.datetime) -> str:
+ """
+ Convert a datetime object into a ISO 8601 string, e.g.
+ '2019-04-24T17:06:53.039991Z'.
+ :param dt: The datetime object to process.
+ :return: Return a string representing the date in
+ ISO 8601 (timezone=UTC).
+ """
+ return dt.astimezone(tz=datetime.timezone.utc).strftime(
+ '%Y-%m-%dT%H:%M:%S.%fZ')
+
+
+def str_to_datetime(string: str) -> datetime.datetime:
+ """
+ Convert an ISO 8601 string into a datetime object.
+ The following formats are supported:
+
+ - 2020-03-03T09:21:43.636153304Z
+ - 2020-03-03T15:52:30.136257504-0600
+ - 2020-03-03T15:52:30.136257504
+
+ :param string: The string to parse.
+ :return: Returns an aware datetime object of the given date
+ and time string.
+ :raises: :exc:`~exceptions.ValueError` for an unknown
+ datetime string.
+ """
+ fmts = [
+ '%Y-%m-%dT%H:%M:%S.%f',
+ '%Y-%m-%dT%H:%M:%S.%f%z'
+ ]
+
+ # In *all* cases, the 9 digit second precision is too much for
+ # Python's strptime. Shorten it to 6 digits.
+ p = re.compile(r'(\.[\d]{6})[\d]*')
+ string = p.sub(r'\1', string)
+
+ # Replace trailing Z with -0000, since (on Python 3.6.8) it
+ # won't parse.
+ if string and string[-1] == 'Z':
+ string = string[:-1] + '-0000'
+
+ for fmt in fmts:
+ try:
+ dt = datetime.datetime.strptime(string, fmt)
+ # Make sure the datetime object is aware (timezone is set).
+ # If not, then assume the time is in UTC.
+ if dt.tzinfo is None:
+ dt = dt.replace(tzinfo=datetime.timezone.utc)
+ return dt
+ except ValueError:
+ pass
+
+ raise ValueError("Time data {} does not match one of the formats {}".format(
+ string, str(fmts)))
+
+
+def parse_timedelta(delta: str) -> Optional[datetime.timedelta]:
+ """
+ Returns a timedelta object represents a duration, the difference
+ between two dates or times.
+
+ >>> parse_timedelta('foo')
+
+ >>> parse_timedelta('2d') == datetime.timedelta(days=2)
+ True
+
+ >>> parse_timedelta("4w") == datetime.timedelta(days=28)
+ True
+
+ >>> parse_timedelta("5s") == datetime.timedelta(seconds=5)
+ True
+
+ >>> parse_timedelta("-5s") == datetime.timedelta(days=-1, seconds=86395)
+ True
+
+ :param delta: The string to process, e.g. '2h', '10d', '30s'.
+ :return: The `datetime.timedelta` object or `None` in case of
+ a parsing error.
+ """
+ parts = re.match(r'(?P<seconds>-?\d+)s|'
+ r'(?P<minutes>-?\d+)m|'
+ r'(?P<hours>-?\d+)h|'
+ r'(?P<days>-?\d+)d|'
+ r'(?P<weeks>-?\d+)w$',
+ delta,
+ re.IGNORECASE)
+ if not parts:
+ return None
+ parts = parts.groupdict()
+ args = {name: int(param) for name, param in parts.items() if param}
+ return datetime.timedelta(**args)
+
+
+def is_hex(s: str, strict: bool = True) -> bool:
+ """Simple check that a string contains only hex chars"""
+ try:
+ int(s, 16)
+ except ValueError:
+ return False
+
+ # s is multiple chars, but we should catch a '+/-' prefix too.
+ if strict:
+ if s[0] not in string.hexdigits:
+ return False
+
+ return True
diff --git a/src/python-common/requirements-lint.txt b/src/python-common/requirements-lint.txt
new file mode 100644
index 000000000..2a7142182
--- /dev/null
+++ b/src/python-common/requirements-lint.txt
@@ -0,0 +1,2 @@
+flake8==3.7.8
+rstcheck==3.3.1
diff --git a/src/python-common/requirements.txt b/src/python-common/requirements.txt
new file mode 100644
index 000000000..88b47310d
--- /dev/null
+++ b/src/python-common/requirements.txt
@@ -0,0 +1,8 @@
+pytest >=2.1.3,<5; python_version < '3.5'
+mock; python_version < '3.3'
+mypy; python_version >= '3'
+pytest-mypy; python_version >= '3'
+pytest >= 2.1.3; python_version >= '3'
+pyyaml
+typing-extensions; python_version < '3.8'
+types-PyYAML
diff --git a/src/python-common/setup.py b/src/python-common/setup.py
new file mode 100644
index 000000000..43a46eb10
--- /dev/null
+++ b/src/python-common/setup.py
@@ -0,0 +1,32 @@
+from setuptools import setup, find_packages
+
+
+with open("README.rst", "r") as fh:
+ long_description = fh.read()
+
+
+setup(
+ name='ceph',
+ version='1.0.0',
+ packages=find_packages(),
+ author='',
+ author_email='dev@ceph.io',
+ description='Ceph common library',
+ long_description=long_description,
+ license='LGPLv2+',
+ keywords='ceph',
+ url="https://github.com/ceph/ceph",
+ zip_safe = False,
+ install_requires=(
+ 'pyyaml',
+ ),
+ classifiers = [
+ 'Intended Audience :: Developer',
+ 'Operating System :: POSIX :: Linux',
+ 'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3.5',
+ 'Programming Language :: Python :: 3.6',
+ ]
+)
diff --git a/src/python-common/tox.ini b/src/python-common/tox.ini
new file mode 100644
index 000000000..2737a87e8
--- /dev/null
+++ b/src/python-common/tox.ini
@@ -0,0 +1,35 @@
+[tox]
+envlist = py3, mypy, lint
+skip_missing_interpreters = true
+
+[testenv:py3]
+deps=
+ -rrequirements.txt
+ -c{toxinidir}/../mypy-constrains.txt
+commands=
+ pytest --doctest-modules ceph/deployment/service_spec.py ceph/utils.py
+ pytest {posargs}
+ mypy --config-file=../mypy.ini -p ceph
+
+[testenv:mypy]
+deps=
+ -rrequirements.txt
+ -c{toxinidir}/../mypy-constrains.txt
+commands=
+ mypy --config-file=../mypy.ini -p ceph
+
+[tool:pytest]
+norecursedirs = .* _* virtualenv
+
+[flake8]
+max-line-length = 100
+exclude =
+ __pycache__
+
+[testenv:lint]
+deps =
+ -rrequirements-lint.txt
+commands =
+ flake8 {posargs:ceph}
+ rstcheck --report info --debug README.rst
+