""" ceph-mgr orchestrator interface Please see the ceph-mgr module developer's guide for more information. """ import sys import time import fnmatch import uuid import datetime import six from mgr_module import MgrModule, PersistentStoreDict from mgr_util import format_bytes try: from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator T = TypeVar('T') G = Generic[T] except ImportError: T, G = object, object class OrchestratorError(Exception): """ General orchestrator specific error. Used for deployment, configuration or user errors. It's not intended for programming errors or orchestrator internal errors. """ class NoOrchestrator(OrchestratorError): """ No orchestrator in configured. """ def __init__(self, msg="No orchestrator configured (try `ceph orchestrator set backend`)"): super(NoOrchestrator, self).__init__(msg) class OrchestratorValidationError(OrchestratorError): """ Raised when an orchestrator doesn't support a specific feature. """ class _Completion(G): @property def result(self): # type: () -> T """ Return the result of the operation that we were waited for. Only valid after calling Orchestrator.wait() on this completion. """ raise NotImplementedError() @property def exception(self): # type: () -> Optional[Exception] """ Holds an exception object. """ try: return self.__exception except AttributeError: return None @exception.setter def exception(self, value): self.__exception = value @property def is_read(self): # type: () -> bool raise NotImplementedError() @property def is_complete(self): # type: () -> bool raise NotImplementedError() @property def is_errored(self): # type: () -> bool """ Has the completion failed. Default implementation looks for self.exception. Can be overwritten. """ return self.exception is not None @property def should_wait(self): # type: () -> bool raise NotImplementedError() def raise_if_exception(c): # type: (_Completion) -> None """ :raises OrchestratorError: Some user error or a config error. :raises Exception: Some internal error """ def copy_to_this_subinterpreter(r_obj): # This is something like `return pickle.loads(pickle.dumps(r_obj))` # Without importing anything. r_cls = r_obj.__class__ if r_cls.__module__ == '__builtin__': return r_obj my_cls = getattr(sys.modules[r_cls.__module__], r_cls.__name__) if id(my_cls) == id(r_cls): return r_obj my_obj = my_cls.__new__(my_cls) for k,v in r_obj.__dict__.items(): setattr(my_obj, k, copy_to_this_subinterpreter(v)) return my_obj if c.exception is not None: raise copy_to_this_subinterpreter(c.exception) class ReadCompletion(_Completion): """ ``Orchestrator`` implementations should inherit from this class to implement their own handles to operations in progress, and return an instance of their subclass from calls into methods. """ def __init__(self): pass @property def is_read(self): return True @property def should_wait(self): """Could the external operation be deemed as complete, or should we wait? We must wait for a read operation only if it is not complete. """ return not self.is_complete class TrivialReadCompletion(ReadCompletion): """ This is the trivial completion simply wrapping a result. """ def __init__(self, result): super(TrivialReadCompletion, self).__init__() self._result = result @property def result(self): return self._result @property def is_complete(self): return True class WriteCompletion(_Completion): """ ``Orchestrator`` implementations should inherit from this class to implement their own handles to operations in progress, and return an instance of their subclass from calls into methods. """ def __init__(self): self.progress_id = str(uuid.uuid4()) #: if a orchestrator module can provide a more detailed #: progress information, it needs to also call ``progress.update()``. self.progress = 0.5 def __str__(self): """ ``__str__()`` is used for determining the message for progress events. """ return super(WriteCompletion, self).__str__() @property def is_persistent(self): # type: () -> bool """ Has the operation updated the orchestrator's configuration persistently? Typically this would indicate that an update had been written to a manifest, but that the update had not necessarily been pushed out to the cluster. """ raise NotImplementedError() @property def is_effective(self): """ Has the operation taken effect on the cluster? For example, if we were adding a service, has it come up and appeared in Ceph's cluster maps? """ raise NotImplementedError() @property def is_complete(self): return self.is_errored or (self.is_persistent and self.is_effective) @property def is_read(self): return False @property def should_wait(self): """Could the external operation be deemed as complete, or should we wait? We must wait for a write operation only if we know it is not persistent yet. """ return not self.is_persistent class Orchestrator(object): """ Calls in this class may do long running remote operations, with time periods ranging from network latencies to package install latencies and large internet downloads. For that reason, all are asynchronous, and return ``Completion`` objects. Implementations are not required to start work on an operation until the caller waits on the relevant Completion objects. Callers making multiple updates should not wait on Completions until they're done sending operations: this enables implementations to batch up a series of updates when wait() is called on a set of Completion objects. Implementations are encouraged to keep reasonably fresh caches of the status of the system: it is better to serve a stale-but-recent result read of e.g. device inventory than it is to keep the caller waiting while you scan hosts every time. """ def is_orchestrator_module(self): """ Enable other modules to interrogate this module to discover whether it's usable as an orchestrator module. Subclasses do not need to override this. """ return True def available(self): # type: () -> Tuple[bool, str] """ Report whether we can talk to the orchestrator. This is the place to give the user a meaningful message if the orchestrator isn't running or can't be contacted. This method may be called frequently (e.g. every page load to conditionally display a warning banner), so make sure it's not too expensive. It's okay to give a slightly stale status (e.g. based on a periodic background ping of the orchestrator) if that's necessary to make this method fast. ..note:: `True` doesn't mean that the desired functionality is actually available in the orchestrator. I.e. this won't work as expected:: >>> if OrchestratorClientMixin().available()[0]: # wrong. ... OrchestratorClientMixin().get_hosts() :return: two-tuple of boolean, string """ raise NotImplementedError() def wait(self, completions): """ Given a list of Completion instances, progress any which are incomplete. Return a true if everything is done. Callers should inspect the detail of each completion to identify partial completion/progress information, and present that information to the user. For fast operations (e.g. reading from a database), implementations may choose to do blocking IO in this call. :rtype: bool """ raise NotImplementedError() def add_host(self, host): # type: (str) -> WriteCompletion """ Add a host to the orchestrator inventory. :param host: hostname """ raise NotImplementedError() def remove_host(self, host): # type: (str) -> WriteCompletion """ Remove a host from the orchestrator inventory. :param host: hostname """ raise NotImplementedError() def get_hosts(self): # type: () -> ReadCompletion[List[InventoryNode]] """ Report the hosts in the cluster. The default implementation is extra slow. :return: list of InventoryNodes """ return self.get_inventory() def get_inventory(self, node_filter=None, refresh=False): # type: (InventoryFilter, bool) -> ReadCompletion[List[InventoryNode]] """ Returns something that was created by `ceph-volume inventory`. :return: list of InventoryNode """ raise NotImplementedError() def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False): # type: (Optional[str], Optional[str], Optional[str], bool) -> ReadCompletion[List[ServiceDescription]] """ Describe a service (of any kind) that is already configured in the orchestrator. For example, when viewing an OSD in the dashboard we might like to also display information about the orchestrator's view of the service (like the kubernetes pod ID). When viewing a CephFS filesystem in the dashboard, we would use this to display the pods being currently run for MDS daemons. :return: list of ServiceDescription objects. """ raise NotImplementedError() def service_action(self, action, service_type, service_name=None, service_id=None): # type: (str, str, str, str) -> WriteCompletion """ Perform an action (start/stop/reload) on a service. Either service_name or service_id must be specified: * If using service_name, perform the action on that entire logical service (i.e. all daemons providing that named service). * If using service_id, perform the action on a single specific daemon instance. :param action: one of "start", "stop", "reload" :param service_type: e.g. "mds", "rgw", ... :param service_name: name of logical service ("cephfs", "us-east", ...) :param service_id: service daemon instance (usually a short hostname) :rtype: WriteCompletion """ assert action in ["start", "stop", "reload"] assert service_name or service_id assert not (service_name and service_id) raise NotImplementedError() def create_osds(self, drive_group, all_hosts): # type: (DriveGroupSpec, List[str]) -> WriteCompletion """ Create one or more OSDs within a single Drive Group. The principal argument here is the drive_group member of OsdSpec: other fields are advisory/extensible for any finer-grained OSD feature enablement (choice of backing store, compression/encryption, etc). :param drive_group: DriveGroupSpec :param all_hosts: TODO, this is required because the orchestrator methods are not composable Probably this parameter can be easily removed because each orchestrator can use the "get_inventory" method and the "drive_group.host_pattern" attribute to obtain the list of hosts where to apply the operation """ raise NotImplementedError() def replace_osds(self, drive_group): # type: (DriveGroupSpec) -> WriteCompletion """ Like create_osds, but the osd_id_claims must be fully populated. """ raise NotImplementedError() def remove_osds(self, osd_ids): # type: (List[str]) -> WriteCompletion """ :param osd_ids: list of OSD IDs Note that this can only remove OSDs that were successfully created (i.e. got an OSD ID). """ raise NotImplementedError() def update_mgrs(self, num, hosts): # type: (int, List[str]) -> WriteCompletion """ Update the number of cluster managers. :param num: requested number of managers. :param hosts: list of hosts (optional) """ raise NotImplementedError() def update_mons(self, num, hosts): # type: (int, List[Tuple[str,str]]) -> WriteCompletion """ Update the number of cluster monitors. :param num: requested number of monitors. :param hosts: list of hosts + network (optional) """ raise NotImplementedError() def add_stateless_service(self, service_type, spec): # type: (str, StatelessServiceSpec) -> WriteCompletion """ Installing and adding a completely new service to the cluster. This is not about starting services. """ raise NotImplementedError() def update_stateless_service(self, service_type, spec): # type: (str, StatelessServiceSpec) -> WriteCompletion """ This is about changing / redeploying existing services. Like for example changing the number of service instances. :rtype: WriteCompletion """ raise NotImplementedError() def remove_stateless_service(self, service_type, id_): # type: (str, str) -> WriteCompletion """ Uninstalls an existing service from the cluster. This is not about stopping services. """ raise NotImplementedError() def upgrade_start(self, upgrade_spec): # type: (UpgradeSpec) -> WriteCompletion raise NotImplementedError() def upgrade_status(self): # type: () -> ReadCompletion[UpgradeStatusSpec] """ If an upgrade is currently underway, report on where we are in the process, or if some error has occurred. :return: UpgradeStatusSpec instance """ raise NotImplementedError() def upgrade_available(self): # type: () -> ReadCompletion[List[str]] """ Report on what versions are available to upgrade to :return: List of strings """ raise NotImplementedError() class UpgradeSpec(object): # Request to orchestrator to initiate an upgrade to a particular # version of Ceph def __init__(self): self.target_version = None class UpgradeStatusSpec(object): # Orchestrator's report on what's going on with any ongoing upgrade def __init__(self): self.in_progress = False # Is an upgrade underway? self.services_complete = [] # Which daemon types are fully updated? self.message = "" # Freeform description class PlacementSpec(object): """ For APIs that need to specify a node subset """ def __init__(self): self.label = None class ServiceDescription(object): """ For responding to queries about the status of a particular service, stateful or stateless. This is not about health or performance monitoring of services: it's about letting the orchestrator tell Ceph whether and where a service is scheduled in the cluster. When an orchestrator tells Ceph "it's running on node123", that's not a promise that the process is literally up this second, it's a description of where the orchestrator has decided the service should run. """ def __init__(self, nodename=None, container_id=None, service=None, service_instance=None, service_type=None, version=None, rados_config_location=None, service_url=None, status=None, status_desc=None): # Node is at the same granularity as InventoryNode self.nodename = nodename # Not everyone runs in containers, but enough people do to # justify having this field here. self.container_id = container_id # Some services can be deployed in groups. For example, mds's can # have an active and standby daemons, and nfs-ganesha can run daemons # in parallel. This tag refers to a group of daemons as a whole. # # For instance, a cluster of mds' all service the same fs, and they # will all have the same service value (which may be the # Filesystem name in the FSMap). # # Single-instance services should leave this set to None self.service = service # The orchestrator will have picked some names for daemons, # typically either based on hostnames or on pod names. # This is the in mds., the ID that will appear # in the FSMap/ServiceMap. self.service_instance = service_instance # The type of service (osd, mon, mgr, etc.) self.service_type = service_type # Service version that was deployed self.version = version # Location of the service configuration when stored in rados # object. Format: "rados:///[]" self.rados_config_location = rados_config_location # If the service exposes REST-like API, this attribute should hold # the URL. self.service_url = service_url # Service status: -1 error, 0 stopped, 1 running self.status = status # Service status description when status == -1. self.status_desc = status_desc def to_json(self): out = { 'nodename': self.nodename, 'container_id': self.container_id, 'service': self.service, 'service_instance': self.service_instance, 'service_type': self.service_type, 'version': self.version, 'rados_config_location': self.rados_config_location, 'service_url': self.service_url, 'status': self.status, 'status_desc': self.status_desc, } return {k: v for (k, v) in out.items() if v is not None} @classmethod def from_json(cls, data): return cls(**data) class DeviceSelection(object): """ Used within :class:`myclass.DriveGroupSpec` to specify the devices used by the Drive Group. Any attributes (even none) can be included in the device specification structure. """ def __init__(self, paths=None, id_model=None, size=None, rotates=None, count=None): # type: (List[str], str, str, bool, int) -> None """ ephemeral drive group device specification TODO: translate from the user interface (Drive Groups) to an actual list of devices. """ if paths is None: paths = [] #: List of absolute paths to the devices. self.paths = paths # type: List[str] #: A wildcard string. e.g: "SDD*" self.id_model = id_model #: Size specification of format LOW:HIGH. #: Can also take the the form :HIGH, LOW: #: or an exact value (as ceph-volume inventory reports) self.size = size #: is the drive rotating or not self.rotates = rotates #: if this is present limit the number of drives to this number. self.count = count self.validate() def validate(self): props = [self.id_model, self.size, self.rotates, self.count] if self.paths and any(p is not None for p in props): raise DriveGroupValidationError('DeviceSelection: `paths` and other parameters are mutually exclusive') if not any(p is not None for p in [self.paths] + props): raise DriveGroupValidationError('DeviceSelection cannot be empty') @classmethod def from_json(cls, device_spec): return cls(**device_spec) class DriveGroupValidationError(Exception): """ Defining an exception here is a bit problematic, cause you cannot properly catch it, if it was raised in a different mgr module. """ def __init__(self, msg): super(DriveGroupValidationError, self).__init__('Failed to validate Drive Group: ' + msg) class DriveGroupSpec(object): """ Describe a drive group in the same form that ceph-volume understands. """ def __init__(self, host_pattern, data_devices=None, db_devices=None, wal_devices=None, journal_devices=None, data_directories=None, osds_per_device=None, objectstore='bluestore', encrypted=False, db_slots=None, wal_slots=None): # type: (str, Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[DeviceSelection], Optional[List[str]], int, str, bool, int, int) -> None # concept of applying a drive group to a (set) of hosts is tightly # linked to the drive group itself # #: An fnmatch pattern to select hosts. Can also be a single host. self.host_pattern = host_pattern #: A :class:`orchestrator.DeviceSelection` self.data_devices = data_devices #: A :class:`orchestrator.DeviceSelection` self.db_devices = db_devices #: A :class:`orchestrator.DeviceSelection` self.wal_devices = wal_devices #: A :class:`orchestrator.DeviceSelection` self.journal_devices = journal_devices #: Number of osd daemons per "DATA" device. #: To fully utilize nvme devices multiple osds are required. self.osds_per_device = osds_per_device #: A list of strings, containing paths which should back OSDs self.data_directories = data_directories #: ``filestore`` or ``bluestore`` self.objectstore = objectstore #: ``true`` or ``false`` self.encrypted = encrypted #: How many OSDs per DB device self.db_slots = db_slots #: How many OSDs per WAL device self.wal_slots = wal_slots # FIXME: needs ceph-volume support #: Optional: mapping of drive to OSD ID, used when the #: created OSDs are meant to replace previous OSDs on #: the same node. self.osd_id_claims = {} @classmethod def from_json(self, json_drive_group): """ Initialize 'Drive group' structure :param json_drive_group: A valid json string with a Drive Group specification """ args = {k: (DeviceSelection.from_json(v) if k.endswith('_devices') else v) for k, v in json_drive_group.items()} return DriveGroupSpec(**args) def hosts(self, all_hosts): return fnmatch.filter(all_hosts, self.host_pattern) def validate(self, all_hosts): if not isinstance(self.host_pattern, six.string_types): raise DriveGroupValidationError('host_pattern must be of type string') specs = [self.data_devices, self.db_devices, self.wal_devices, self.journal_devices] for s in filter(None, specs): s.validate() if self.objectstore not in ('filestore', 'bluestore'): raise DriveGroupValidationError("objectstore not in ('filestore', 'bluestore')") if not self.hosts(all_hosts): raise DriveGroupValidationError( "host_pattern '{}' does not match any hosts".format(self.host_pattern)) class StatelessServiceSpec(object): # Request to orchestrator for a group of stateless services # such as MDS, RGW, nfs gateway, iscsi gateway """ Details of stateless service creation. This is *not* supposed to contain all the configuration of the services: it's just supposed to be enough information to execute the binaries. """ def __init__(self): self.placement = PlacementSpec() # Give this set of statelss services a name: typically it would # be the name of a CephFS filesystem, RGW zone, etc. Must be unique # within one ceph cluster. self.name = "" # Count of service instances self.count = 1 # Arbitrary JSON-serializable object. # Maybe you're using e.g. kubenetes and you want to pass through # some replicaset special sauce for autoscaling? self.extended = {} class InventoryFilter(object): """ When fetching inventory, use this filter to avoid unnecessarily scanning the whole estate. Typical use: filter by node when presenting UI workflow for configuring a particular server. filter by label when not all of estate is Ceph servers, and we want to only learn about the Ceph servers. filter by label when we are interested particularly in e.g. OSD servers. """ def __init__(self, labels=None, nodes=None): # type: (List[str], List[str]) -> None self.labels = labels # Optional: get info about nodes matching labels self.nodes = nodes # Optional: get info about certain named nodes only class InventoryDevice(object): """ When fetching inventory, block devices are reported in this format. Note on device identifiers: the format of this is up to the orchestrator, but the same identifier must also work when passed into StatefulServiceSpec. The identifier should be something meaningful like a device WWID or stable device node path -- not something made up by the orchestrator. "Extended" is for reporting any special configuration that may have already been done out of band on the block device. For example, if the device has already been configured for encryption, report that here so that it can be indicated to the user. The set of extended properties may differ between orchestrators. An orchestrator is permitted to support no extended properties (only normal block devices) """ def __init__(self, blank=False, type=None, id=None, size=None, rotates=False, available=False, dev_id=None, extended=None, metadata_space_free=None): # type: (bool, str, str, int, bool, bool, str, dict, bool) -> None self.blank = blank #: 'ssd', 'hdd', 'nvme' self.type = type #: unique within a node (or globally if you like). self.id = id #: byte integer. self.size = size #: indicates if it is a spinning disk self.rotates = rotates #: can be used to create a new OSD? self.available = available #: vendor/model self.dev_id = dev_id #: arbitrary JSON-serializable object self.extended = extended if extended is not None else extended # If this drive is not empty, but is suitable for appending # additional journals, wals, or bluestore dbs, then report # how much space is available. self.metadata_space_free = metadata_space_free def to_json(self): return dict(type=self.type, blank=self.blank, id=self.id, size=self.size, rotates=self.rotates, available=self.available, dev_id=self.dev_id, extended=self.extended) @classmethod def from_ceph_volume_inventory(cls, data): # TODO: change InventoryDevice itself to mirror c-v inventory closely! dev = InventoryDevice() dev.id = data["path"] dev.type = 'hdd' if data["sys_api"]["rotational"] == "1" else 'sdd/nvme' dev.size = data["sys_api"]["size"] dev.rotates = data["sys_api"]["rotational"] == "1" dev.available = data["available"] dev.dev_id = "%s/%s" % (data["sys_api"]["vendor"], data["sys_api"]["model"]) dev.extended = data return dev @classmethod def from_ceph_volume_inventory_list(cls, datas): return [cls.from_ceph_volume_inventory(d) for d in datas] def pretty_print(self, only_header=False): """Print a human friendly line with the information of the device :param only_header: Print only the name of the device attributes Ex:: Device Path Type Size Rotates Available Model /dev/sdc hdd 50.00 GB True True ATA/QEMU """ row_format = " {0:<15} {1:>10} {2:>10} {3:>10} {4:>10} {5:<15}\n" if only_header: return row_format.format("Device Path", "Type", "Size", "Rotates", "Available", "Model") else: return row_format.format(str(self.id), self.type if self.type is not None else "", format_bytes(self.size if self.size is not None else 0, 5, colored=False), str(self.rotates), str(self.available), self.dev_id if self.dev_id is not None else "") class InventoryNode(object): """ When fetching inventory, all Devices are groups inside of an InventoryNode. """ def __init__(self, name, devices): # type: (str, List[InventoryDevice]) -> None assert isinstance(devices, list) self.name = name # unique within cluster. For example a hostname. self.devices = devices def to_json(self): return {'name': self.name, 'devices': [d.to_json() for d in self.devices]} @classmethod def from_nested_items(cls, hosts): devs = InventoryDevice.from_ceph_volume_inventory_list return [cls(item[0], devs(item[1].data)) for item in hosts] def _mk_orch_methods(cls): # Needs to be defined outside of for. # Otherwise meth is always bound to last key def shim(method_name): def inner(self, *args, **kwargs): completion = self._oremote(method_name, args, kwargs) self._update_completion_progress(completion, 0) return completion return inner for meth in Orchestrator.__dict__: if not meth.startswith('_') and meth not in ['is_orchestrator_module']: setattr(cls, meth, shim(meth)) return cls @_mk_orch_methods class OrchestratorClientMixin(Orchestrator): """ A module that inherents from `OrchestratorClientMixin` can directly call all :class:`Orchestrator` methods without manually calling remote. Every interface method from ``Orchestrator`` is converted into a stub method that internally calls :func:`OrchestratorClientMixin._oremote` >>> class MyModule(OrchestratorClientMixin): ... def func(self): ... completion = self.add_host('somehost') # calls `_oremote()` ... self._orchestrator_wait([completion]) ... self.log.debug(completion.result) """ def set_mgr(self, mgr): # type: (MgrModule) -> None """ Useable in the Dashbord that uses a global ``mgr`` """ self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties def _oremote(self, meth, args, kwargs): """ Helper for invoking `remote` on whichever orchestrator is enabled :raises RuntimeError: If the remote method failed. :raises OrchestratorError: orchestrator failed to perform :raises ImportError: no `orchestrator_cli` module or backend not found. """ try: mgr = self.__mgr except AttributeError: mgr = self try: o = mgr._select_orchestrator() except AttributeError: o = mgr.remote('orchestrator_cli', '_select_orchestrator') if o is None: raise NoOrchestrator() mgr.log.debug("_oremote {} -> {}.{}(*{}, **{})".format(mgr.module_name, o, meth, args, kwargs)) return mgr.remote(o, meth, *args, **kwargs) def _update_completion_progress(self, completion, force_progress=None): # type: (WriteCompletion, Optional[float]) -> None try: progress = force_progress if force_progress is not None else completion.progress if completion.is_complete: self.remote("progress", "complete", completion.progress_id) else: self.remote("progress", "update", completion.progress_id, str(completion), progress, ["orchestrator"]) except AttributeError: # No WriteCompletion. Ignore. pass except ImportError: # If the progress module is disabled that's fine, # they just won't see the output. pass def _orchestrator_wait(self, completions): # type: (List[_Completion]) -> None """ Wait for completions to complete (reads) or become persistent (writes). Waits for writes to be *persistent* but not *effective*. :param completions: List of Completions :raises NoOrchestrator: :raises ImportError: no `orchestrator_cli` module or backend not found. """ for c in completions: self._update_completion_progress(c) while not self.wait(completions): if any(c.should_wait for c in completions): time.sleep(5) else: break for c in completions: self._update_completion_progress(c) class OutdatableData(object): DATEFMT = '%Y-%m-%d %H:%M:%S.%f' def __init__(self, data=None, last_refresh=None): # type: (Optional[dict], Optional[datetime.datetime]) -> None self._data = data if data is not None and last_refresh is None: self.last_refresh = datetime.datetime.utcnow() else: self.last_refresh = last_refresh def json(self): if self.last_refresh is not None: timestr = self.last_refresh.strftime(self.DATEFMT) else: timestr = None return { "data": self._data, "last_refresh": timestr, } @property def data(self): return self._data # @data.setter # No setter, as it doesn't work as expected: It's not saved in store automatically @classmethod def time_from_string(cls, timestr): if timestr is None: return None # drop the 'Z' timezone indication, it's always UTC timestr = timestr.rstrip('Z') return datetime.datetime.strptime(timestr, cls.DATEFMT) @classmethod def from_json(cls, data): return cls(data['data'], cls.time_from_string(data['last_refresh'])) def outdated(self, timeout_min=None): if timeout_min is None: timeout_min = 10 if self.last_refresh is None: return True cutoff = datetime.datetime.utcnow() - datetime.timedelta( minutes=timeout_min) return self.last_refresh < cutoff def __repr__(self): return 'OutdatableData(data={}, last_refresh={})'.format(self._data, self.last_refresh) class OutdatableDictMixin(object): """ Toolbox for implementing a cache. As every orchestrator has different needs, we cannot implement any logic here. """ def __getitem__(self, item): # type: (str) -> OutdatableData return OutdatableData.from_json(super(OutdatableDictMixin, self).__getitem__(item)) def __setitem__(self, key, value): # type: (str, OutdatableData) -> None val = None if value is None else value.json() super(OutdatableDictMixin, self).__setitem__(key, val) def items(self): # type: () -> Iterator[Tuple[str, OutdatableData]] for item in super(OutdatableDictMixin, self).items(): k, v = item yield k, OutdatableData.from_json(v) def items_filtered(self, keys=None): if keys: return [(host, self[host]) for host in keys] else: return list(self.items()) def any_outdated(self, timeout=None): items = self.items() if not list(items): return True return any([i[1].outdated(timeout) for i in items]) def remove_outdated(self): outdated = [item[0] for item in self.items() if item[1].outdated()] for o in outdated: del self[o] class OutdatablePersistentDict(OutdatableDictMixin, PersistentStoreDict): pass class OutdatableDict(OutdatableDictMixin, dict): pass