+# vim: ts=8 et sw=4 sts=4
+ceph-mgr DeepSea orchestrator module
+# We want orchestrator methods in this to be 1:1 mappings to DeepSea runners,
+# we don't want to aggregate multiple salt invocations here, because that means
+# this module would need to know too much about how DeepSea works internally.
+# Better to expose new runners from DeepSea to match what the orchestrator needs.
+import json
+import errno
+import requests
+from threading import Event, Thread, Lock
+from mgr_module import MgrModule
+import orchestrator
+class RequestException(Exception):
+ def __init__(self, message, status_code=None):
+ super(RequestException, self).__init__(message)
+ self.status_code = status_code
+class DeepSeaReadCompletion(orchestrator.ReadCompletion):
+ def __init__(self, process_result_callback):
+ super(DeepSeaReadCompletion, self).__init__()
+ self._complete = False
+ self._cb = process_result_callback
+ def _process_result(self, data):
+ self._result = self._cb(data)
+ self._complete = True
+ @property
+ def result(self):
+ return self._result
+ @property
+ def is_complete(self):
+ return self._complete
+class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
+ {
+ 'name': 'salt_api_url',
+ 'default': ''
+ },
+ {
+ 'name': 'salt_api_eauth',
+ 'default': 'sharedsecret'
+ },
+ {
+ 'name': 'salt_api_username',
+ 'default': ''
+ },
+ {
+ 'name': 'salt_api_password',
+ 'default': ''
+ }
+ ]
+ {
+ "cmd": "deepsea config-set name=key,type=CephString "
+ "name=value,type=CephString",
+ "desc": "Set a configuration value",
+ "perm": "rw"
+ },
+ {
+ "cmd": "deepsea config-show",
+ "desc": "Show current configuration",
+ "perm": "r"
+ }
+ ]
+ @property
+ def config_keys(self):
+ return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
+ def get_module_option(self, key, default=None):
+ """
+ Overrides the default MgrModule get_module_option() method to pull in defaults
+ specific to this module
+ """
+ return super(DeepSeaOrchestrator, self).get_module_option(key, default=self.config_keys[key])
+ def _config_valid(self):
+ for key in self.config_keys.keys():
+ if not self.get_module_option(key, self.config_keys[key]):
+ return False
+ return True
+ def __init__(self, *args, **kwargs):
+ super(DeepSeaOrchestrator, self).__init__(*args, **kwargs)
+ self._event = Event()
+ self._token = None
+ self._event_reader = None
+ self._reading_events = False
+ self._last_failure_msg = None
+ self._all_completions = dict()
+ self._completion_lock = Lock()
+ self.inventory_cache = orchestrator.OutdatableDict()
+ self.service_cache = orchestrator.OutdatableDict()
+ def available(self):
+ if not self._config_valid():
+ return False, "Configuration invalid; try `ceph deepsea config-set [...]`"
+ if not self._reading_events and self._last_failure_msg:
+ return False, self._last_failure_msg
+ return True, ""
+ def get_inventory(self, node_filter=None, refresh=False):
+ """
+ Note that this will raise an exception (e.g. if the salt-api is down,
+ or the username/password is incorret). Same for other methods.
+ Callers should expect this and react appropriately. The orchestrator
+ cli, for example, just prints the traceback in the console, so the
+ user at least sees the error.
+ """
+ self.inventory_cache.remove_outdated()
+ if not self.inventory_cache.any_outdated() and not refresh:
+ if node_filter is None:
+ return orchestrator.TrivialReadCompletion(
+ orchestrator.InventoryNode.from_nested_items(self.inventory_cache.items()))
+ elif node_filter.labels is None:
+ try:
+ return orchestrator.TrivialReadCompletion(
+ orchestrator.InventoryNode.from_nested_items(
+ self.inventory_cache.items_filtered(node_filter.nodes)))
+ except KeyError:
+ # items_filtered() will raise KeyError if passed a node name that doesn't exist
+ return orchestrator.TrivialReadCompletion([])
+ def process_result(event_data):
+ result = []
+ if event_data['success']:
+ for node_name, node_devs in event_data["return"].items():
+ if node_filter is None:
+ # The cache will only be populated when this function is invoked
+ # without a node filter, i.e. if you run it once for the whole
+ # cluster, you can then call it for individual nodes and return
+ # cached data. However, if you only *ever* call it for individual
+ # nodes, the cache will never be populated, and you'll always have
+ # the full round trip to DeepSea.
+ self.inventory_cache[node_name] = orchestrator.OutdatableData(node_devs)
+ devs = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(node_devs)
+ result.append(orchestrator.InventoryNode(node_name, devs))
+ else:
+ self.log.error(event_data['return'])
+ return result
+ with self._completion_lock:
+ c = DeepSeaReadCompletion(process_result)
+ nodes = []
+ roles = []
+ if node_filter:
+ nodes = node_filter.nodes
+ roles = node_filter.labels
+ resp = self._do_request_with_login("POST", data = {
+ "client": "runner_async",
+ "fun": "mgr_orch.get_inventory",
+ "nodes": nodes,
+ "roles": roles
+ })
+ # ['return'][0]['tag'] in the resonse JSON is what we need to match
+ # on when looking for the result event (e.g.: "salt/run/20181018074024331230")
+ self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c
+ return c
+ def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
+ # Note: describe_service() does *not* support OSDs. This is because
+ # DeepSea doesn't really record what OSDs are deployed where; Ceph is
+ # considered the canonical source of this information, so having this
+ # function query OSD information from DeepSea doesn't make a lot of
+ # sense (DeepSea would have to call back into Ceph).
+ assert service_type in ("mon", "mgr", "mds", "rgw", "nfs", "iscsi", None), service_type + " unsupported"
+ def _deepsea_to_ceph(service):
+ if service == "ganesha":
+ return "nfs"
+ elif service == "igw":
+ return "iscsi"
+ else:
+ return service
+ # presently unused
+ def _ceph_to_deepsea(service):
+ if service == "nfs":
+ return "ganesha"
+ elif service == "iscsi":
+ return "igw"
+ else:
+ return service
+ self.service_cache.remove_outdated()
+ if not self.service_cache.any_outdated() and not refresh:
+ # Let's hope the services are complete.
+ try:
+ node_filter = [node_name] if node_name else None
+ services_by_node = [d[1].data for d in self.service_cache.items_filtered(node_filter)]
+ services = [orchestrator.ServiceDescription.from_json(s) for services in services_by_node for s in services]
+ services = [s for s in services if
+ (True if service_type is None else s.service_type == service_type) and
+ (True if service_id is None else s.service_instance == service_id)]
+ return orchestrator.TrivialReadCompletion(services)
+ except KeyError:
+ # items_filtered() will raise KeyError if passed a node name that doesn't exist
+ return orchestrator.TrivialReadCompletion([])
+ def process_result(event_data):
+ result = []
+ if event_data['success']:
+ for service_node, service_info in event_data["return"].items():
+ node_service_cache = []
+ for this_service_type, service_dict in service_info.items():
+ if isinstance(service_dict, str):
+ # map old form where deepsea only returned service IDs
+ # to new form where it retuns a dict
+ service_dict = { 'service_instance': service_dict }
+ desc = orchestrator.ServiceDescription(nodename=service_node,
+ service_instance=service_dict['service_instance'],
+ service_type=_deepsea_to_ceph(this_service_type),
+ # the following may or may not be present
+ container_id=service_dict.get('container_id', None),
+ service=service_dict.get('service', None),
+ version=service_dict.get('version', None),
+ rados_config_location=service_dict.get('rados_config_location', None),
+ service_url = service_dict.get('service_url', None),
+ status=service_dict.get('status', None),
+ status_desc=service_dict.get('status_desc', None)
+ )
+ # Always add every service to the cache...
+ node_service_cache.append(desc.to_json())
+ # ...but only return the ones the caller asked for
+ if ((service_type is None or desc.service_type == service_type) and
+ (service_id is None or desc.service_instance == service_id) and
+ (node_name is None or desc.nodename == node_name)):
+ result.append(desc)
+ self.service_cache[service_node] = orchestrator.OutdatableData(node_service_cache)
+ else:
+ self.log.error(event_data['return'])
+ return result
+ with self._completion_lock:
+ c = DeepSeaReadCompletion(process_result)
+ # Always request all services, so we always have all services cached.
+ resp = self._do_request_with_login("POST", data = {
+ "client": "runner_async",
+ "fun": "mgr_orch.describe_service"
+ })
+ self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c
+ return c
+ def wait(self, completions):
+ incomplete = False
+ with self._completion_lock:
+ for c in completions:
+ if c.is_complete:
+ continue
+ if not c.is_complete:
+ # TODO: the job is in the bus, it should reach us eventually
+ # unless something has gone wrong (e.g. salt-api died, etc.),
+ # in which case it's possible the job finished but we never
+ # noticed the salt/run/$id/ret event. Need to add the job ID
+ # (or possibly the full event tag) to the completion object.
+ # That way, if we want to double check on a job that hasn't
+ # been completed yet, we can make a synchronous request to
+ # salt-api to invoke jobs.lookup_jid, and if it's complete we
+ # should be able to pass its return value to _process_result()
+ # Question: do we do this automatically after some timeout?
+ # Or do we add a function so the admin can check and "unstick"
+ # a stuck completion?
+ incomplete = True
+ return not incomplete
+ def handle_command(self, inbuf, cmd):
+ if cmd['prefix'] == 'deepsea config-show':
+ return 0, json.dumps(dict([(key, self.get_module_option(key)) for key in self.config_keys.keys()])), ''
+ elif cmd['prefix'] == 'deepsea config-set':
+ if cmd['key'] not in self.config_keys.keys():
+ return (-errno.EINVAL, '',
+ "Unknown configuration option '{0}'".format(cmd['key']))
+ self.set_module_option(cmd['key'], cmd['value'])
+ self._event.set()
+ return 0, "Configuration option '{0}' updated".format(cmd['key']), ''
+ return (-errno.EINVAL, '',
+ "Command not found '{0}'".format(cmd['prefix']))
+ def serve(self):
+'DeepSea module starting up')
+ = True
+ while
+ if not self._config_valid():
+ # This will spin until the config is valid, spitting a warning
+ # that the config is invalid every 60 seconds. The one oddity
+ # is that while setting the various parameters, this log warning
+ # will print once for each parameter set until the config is valid.
+ self.log.warn("Configuration invalid; try `ceph deepsea config-set [...]`")
+ self._event.wait(60)
+ self._event.clear()
+ continue
+ if self._event_reader and not self._reading_events:
+ self._event_reader = None
+ if not self._event_reader:
+ self._last_failure_msg = None
+ try:
+ # This spawns a separate thread to read the salt event bus
+ # stream. We can't do it in the serve thead, because reading
+ # from the response blocks, which would prevent the serve
+ # thread from handling anything else.
+ #
+ # TODO: figure out how to restart the _event_reader thread if
+ # config changes, e.g.: a new username or password is set.
+ # This will be difficult, because _read_sse() just blocks waiting
+ # for response lines. The closest I got was setting a read timeout
+ # on the request, but in the general case (where not much is
+ # happening most of the time), this will result in continual
+ # timeouts and reconnects. We really need an asynchronous read
+ # to support this.
+ self._event_response = self._do_request_with_login("GET", "events", stream=True)
+ self._event_reader = Thread(target=self._read_sse)
+ self._reading_events = True
+ self._event_reader.start()
+ except Exception as ex:
+ self._set_last_failure_msg("Failure setting up event reader: " + str(ex))
+ # gives an (arbitrary) 60 second retry if we can't attach to
+ # the salt-api event bus for some reason (e.g.: invalid username,
+ # or password, which will be logged as "Request failed with status
+ # code 401"). Note that this 60 second retry will also happen if
+ # salt-api dies.
+ self._event.wait(60)
+ self._event.clear()
+ continue
+ # Wait indefinitely for something interesting to happen (e.g.
+ # config-set, or shutdown), or the event reader to fail, which
+ # will happen if the salt-api server dies or restarts).
+ self._event.wait()
+ self._event.clear()
+ def shutdown(self):
+'DeepSea module shutting down')
+ = False
+ self._event.set()
+ def _set_last_failure_msg(self, msg):
+ self._last_failure_msg = msg
+ self.log.warn(msg)
+ # Reader/parser of SSE events, see:
+ # -
+ # -
+ # Note: this is pretty braindead and doesn't implement the full eventsource
+ # spec, but it *does* implement enough for us to listen to events from salt
+ # and potentially do something with them.
+ def _read_sse(self):
+ event = {}
+ try:
+ # Just starting the event reader; if we've made it here, we know we're
+ # talking to salt-api (_do_request would have raised an exception if the
+ # response wasn't ok), so check if there's any completions inflight that
+ # need to be dealt with. This handles the case where some command was
+ # invoked, then salt-api died somehow, and we reconneced, but missed the
+ # completion at the time it actually happened.
+ for tag in list(self._all_completions):
+"Found event {} inflight".format(tag))
+ try:
+ resp = self._do_request_with_login("POST", data = {
+ "client": "runner",
+ "fun": "jobs.lookup_jid",
+ "jid": tag.split('/')[2]
+ })
+ # jobs.lookup_jid returns a dict keyed by hostname.
+ return_dict = resp.json()['return'][0]
+ if return_dict:
+ # If the job is complete, there'll be one item in the dict.
+"Event {} complete".format(tag))
+ # The key is the salt master hostname, but we don't care
+ # about that, so just grab the data.
+ data = next(iter(return_dict.items()))[1]
+ self._all_completions[tag]._process_result(data)
+ # TODO: decide whether it's bad to delete the completion
+ # here -- would we ever need to resurrect it?
+ del self._all_completions[tag]
+ else:
+ # if the job is not complete, there'll be nothing in the dict
+"Event {} still pending".format(tag))
+ except Exception as ex:
+ # Logging a warning if the request failed, so we can continue
+ # checking any other completions, then get onto reading events
+ self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex)))
+ for line in self._event_response.iter_lines():
+ with self._completion_lock:
+ if line:
+ line = line.decode('utf-8')
+ colon = line.find(':')
+ if colon > 0:
+ k = line[:colon]
+ v = line[colon+2:]
+ if k == "retry":
+ # TODO: find out if we need to obey this reconnection time
+ self.log.warn("Server requested retry {}, ignored".format(v))
+ else:
+ event[k] = v
+ else:
+ # Empty line, terminates an event. Note that event['tag']
+ # is a salt-api extension to SSE to avoid having to decode
+ # json data if you don't care about it. To get to the
+ # interesting stuff, you want event['data'], which is json.
+ # If you want to have some fun, try
+ # `ceph daemon mgr.$(hostname) config set debug_mgr 20`
+ # then `salt '*'` on the master
+ self.log.debug("Got event '{}'".format(str(event)))
+ # If we're actually interested in this event (i.e. it's
+ # in our completion dict), fire off that completion's
+ # _process_result() callback and remove it from our list.
+ if event['tag'] in self._all_completions:
+"Event {} complete".format(event['tag']))
+ self._all_completions[event['tag']]._process_result(json.loads(event['data'])['data'])
+ # TODO: decide whether it's bad to delete the completion
+ # here -- would we ever need to resurrect it?
+ del self._all_completions[event['tag']]
+ event = {}
+ self._set_last_failure_msg("SSE read terminated")
+ except Exception as ex:
+ self.log.exception(ex)
+ self._set_last_failure_msg("SSE read failed: {}".format(str(ex)))
+ self._reading_events = False
+ self._event.set()
+ # _do_request(), _login() and _do_request_with_login() are an extremely
+ # minimalist form of the following, with notably terse error handling:
+ #
+ #
+ # rationale:
+ # - I needed slightly different behaviour than in openATTIC (I want the
+ # caller to read the response, to allow streaming the salt-api event bus)
+ # - I didn't want to pull in 400+ lines more code into this presently
+ # experimental module, to save everyone having to review it ;-)
+ def _do_request(self, method, path="", data=None, stream=False):
+ """
+ returns the response, which the caller then has to read
+ """
+ url = "{0}/{1}".format(self.get_module_option('salt_api_url'), path)
+ try:
+ if method.lower() == 'get':
+ resp = requests.get(url, headers = { "X-Auth-Token": self._token },
+ data=data, stream=stream)
+ elif method.lower() == 'post':
+ resp =, headers = { "X-Auth-Token": self._token },
+ data=data)
+ else:
+ raise RequestException("Method '{}' not supported".format(method.upper()))
+ if resp.ok:
+ return resp
+ else:
+ msg = "Request failed with status code {}".format(resp.status_code)
+ raise RequestException(msg, resp.status_code)
+ except requests.exceptions.ConnectionError as ex:
+ self.log.exception(str(ex))
+ raise RequestException(str(ex))
+ except requests.exceptions.InvalidURL as ex:
+ self.log.exception(str(ex))
+ raise RequestException(str(ex))
+ def _login(self):
+ resp = self._do_request('POST', 'login', data = {
+ "eauth": self.get_module_option('salt_api_eauth'),
+ "password": self.get_module_option('salt_api_password'),
+ "username": self.get_module_option('salt_api_username')
+ })
+ self._token = resp.json()['return'][0]['token']
+"Salt API login successful")
+ def _do_request_with_login(self, method, path="", data=None, stream=False):
+ retries = 2
+ while True:
+ try:
+ if not self._token:
+ self._login()
+ return self._do_request(method, path, data, stream)
+ except RequestException as ex:
+ retries -= 1
+ if ex.status_code not in [401, 403] or retries == 0:
+ raise ex
+ self._token = None