diff options
Diffstat (limited to '')
-rw-r--r-- | src/pybind/mgr/deepsea/module.py | 527 |
1 files changed, 527 insertions, 0 deletions
diff --git a/src/pybind/mgr/deepsea/module.py b/src/pybind/mgr/deepsea/module.py new file mode 100644 index 00000000..734a457d --- /dev/null +++ b/src/pybind/mgr/deepsea/module.py @@ -0,0 +1,527 @@ +# vim: ts=8 et sw=4 sts=4 +""" +ceph-mgr DeepSea orchestrator module +""" + +# We want orchestrator methods in this to be 1:1 mappings to DeepSea runners, +# we don't want to aggregate multiple salt invocations here, because that means +# this module would need to know too much about how DeepSea works internally. +# Better to expose new runners from DeepSea to match what the orchestrator needs. + +import json +import errno +import requests + +from threading import Event, Thread, Lock + +from mgr_module import MgrModule +import orchestrator + + +class RequestException(Exception): + def __init__(self, message, status_code=None): + super(RequestException, self).__init__(message) + self.status_code = status_code + + +class DeepSeaReadCompletion(orchestrator.ReadCompletion): + def __init__(self, process_result_callback): + super(DeepSeaReadCompletion, self).__init__() + self._complete = False + self._cb = process_result_callback + + def _process_result(self, data): + self._result = self._cb(data) + self._complete = True + + @property + def result(self): + return self._result + + @property + def is_complete(self): + return self._complete + + +class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator): + MODULE_OPTIONS = [ + { + 'name': 'salt_api_url', + 'default': '' + }, + { + 'name': 'salt_api_eauth', + 'default': 'sharedsecret' + }, + { + 'name': 'salt_api_username', + 'default': '' + }, + { + 'name': 'salt_api_password', + 'default': '' + } + ] + + + COMMANDS = [ + { + "cmd": "deepsea config-set name=key,type=CephString " + "name=value,type=CephString", + "desc": "Set a configuration value", + "perm": "rw" + }, + { + "cmd": "deepsea config-show", + "desc": "Show current configuration", + "perm": "r" + } + ] + + + @property + def config_keys(self): + return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS) + + + def get_module_option(self, key, default=None): + """ + Overrides the default MgrModule get_module_option() method to pull in defaults + specific to this module + """ + return super(DeepSeaOrchestrator, self).get_module_option(key, default=self.config_keys[key]) + + + def _config_valid(self): + for key in self.config_keys.keys(): + if not self.get_module_option(key, self.config_keys[key]): + return False + return True + + + def __init__(self, *args, **kwargs): + super(DeepSeaOrchestrator, self).__init__(*args, **kwargs) + self._event = Event() + self._token = None + self._event_reader = None + self._reading_events = False + self._last_failure_msg = None + self._all_completions = dict() + self._completion_lock = Lock() + self.inventory_cache = orchestrator.OutdatableDict() + self.service_cache = orchestrator.OutdatableDict() + + def available(self): + if not self._config_valid(): + return False, "Configuration invalid; try `ceph deepsea config-set [...]`" + + if not self._reading_events and self._last_failure_msg: + return False, self._last_failure_msg + + return True, "" + + def get_inventory(self, node_filter=None, refresh=False): + """ + Note that this will raise an exception (e.g. if the salt-api is down, + or the username/password is incorret). Same for other methods. + Callers should expect this and react appropriately. The orchestrator + cli, for example, just prints the traceback in the console, so the + user at least sees the error. + """ + self.inventory_cache.remove_outdated() + if not self.inventory_cache.any_outdated() and not refresh: + if node_filter is None: + return orchestrator.TrivialReadCompletion( + orchestrator.InventoryNode.from_nested_items(self.inventory_cache.items())) + elif node_filter.labels is None: + try: + return orchestrator.TrivialReadCompletion( + orchestrator.InventoryNode.from_nested_items( + self.inventory_cache.items_filtered(node_filter.nodes))) + except KeyError: + # items_filtered() will raise KeyError if passed a node name that doesn't exist + return orchestrator.TrivialReadCompletion([]) + + def process_result(event_data): + result = [] + if event_data['success']: + for node_name, node_devs in event_data["return"].items(): + if node_filter is None: + # The cache will only be populated when this function is invoked + # without a node filter, i.e. if you run it once for the whole + # cluster, you can then call it for individual nodes and return + # cached data. However, if you only *ever* call it for individual + # nodes, the cache will never be populated, and you'll always have + # the full round trip to DeepSea. + self.inventory_cache[node_name] = orchestrator.OutdatableData(node_devs) + devs = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(node_devs) + result.append(orchestrator.InventoryNode(node_name, devs)) + else: + self.log.error(event_data['return']) + return result + + with self._completion_lock: + c = DeepSeaReadCompletion(process_result) + + nodes = [] + roles = [] + if node_filter: + nodes = node_filter.nodes + roles = node_filter.labels + + resp = self._do_request_with_login("POST", data = { + "client": "runner_async", + "fun": "mgr_orch.get_inventory", + "nodes": nodes, + "roles": roles + }) + + # ['return'][0]['tag'] in the resonse JSON is what we need to match + # on when looking for the result event (e.g.: "salt/run/20181018074024331230") + self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c + + return c + + def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False): + + # Note: describe_service() does *not* support OSDs. This is because + # DeepSea doesn't really record what OSDs are deployed where; Ceph is + # considered the canonical source of this information, so having this + # function query OSD information from DeepSea doesn't make a lot of + # sense (DeepSea would have to call back into Ceph). + + assert service_type in ("mon", "mgr", "mds", "rgw", "nfs", "iscsi", None), service_type + " unsupported" + + def _deepsea_to_ceph(service): + if service == "ganesha": + return "nfs" + elif service == "igw": + return "iscsi" + else: + return service + + # presently unused + def _ceph_to_deepsea(service): + if service == "nfs": + return "ganesha" + elif service == "iscsi": + return "igw" + else: + return service + + self.service_cache.remove_outdated() + if not self.service_cache.any_outdated() and not refresh: + # Let's hope the services are complete. + try: + node_filter = [node_name] if node_name else None + services_by_node = [d[1].data for d in self.service_cache.items_filtered(node_filter)] + services = [orchestrator.ServiceDescription.from_json(s) for services in services_by_node for s in services] + services = [s for s in services if + (True if service_type is None else s.service_type == service_type) and + (True if service_id is None else s.service_instance == service_id)] + return orchestrator.TrivialReadCompletion(services) + except KeyError: + # items_filtered() will raise KeyError if passed a node name that doesn't exist + return orchestrator.TrivialReadCompletion([]) + + def process_result(event_data): + result = [] + if event_data['success']: + for service_node, service_info in event_data["return"].items(): + node_service_cache = [] + for this_service_type, service_dict in service_info.items(): + if isinstance(service_dict, str): + # map old form where deepsea only returned service IDs + # to new form where it retuns a dict + service_dict = { 'service_instance': service_dict } + desc = orchestrator.ServiceDescription(nodename=service_node, + service_instance=service_dict['service_instance'], + service_type=_deepsea_to_ceph(this_service_type), + # the following may or may not be present + container_id=service_dict.get('container_id', None), + service=service_dict.get('service', None), + version=service_dict.get('version', None), + rados_config_location=service_dict.get('rados_config_location', None), + service_url = service_dict.get('service_url', None), + status=service_dict.get('status', None), + status_desc=service_dict.get('status_desc', None) + ) + # Always add every service to the cache... + node_service_cache.append(desc.to_json()) + # ...but only return the ones the caller asked for + if ((service_type is None or desc.service_type == service_type) and + (service_id is None or desc.service_instance == service_id) and + (node_name is None or desc.nodename == node_name)): + result.append(desc) + + self.service_cache[service_node] = orchestrator.OutdatableData(node_service_cache) + else: + self.log.error(event_data['return']) + return result + + with self._completion_lock: + c = DeepSeaReadCompletion(process_result) + + # Always request all services, so we always have all services cached. + resp = self._do_request_with_login("POST", data = { + "client": "runner_async", + "fun": "mgr_orch.describe_service" + }) + self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c + + return c + + def wait(self, completions): + incomplete = False + + with self._completion_lock: + for c in completions: + if c.is_complete: + continue + if not c.is_complete: + # TODO: the job is in the bus, it should reach us eventually + # unless something has gone wrong (e.g. salt-api died, etc.), + # in which case it's possible the job finished but we never + # noticed the salt/run/$id/ret event. Need to add the job ID + # (or possibly the full event tag) to the completion object. + # That way, if we want to double check on a job that hasn't + # been completed yet, we can make a synchronous request to + # salt-api to invoke jobs.lookup_jid, and if it's complete we + # should be able to pass its return value to _process_result() + # Question: do we do this automatically after some timeout? + # Or do we add a function so the admin can check and "unstick" + # a stuck completion? + incomplete = True + + return not incomplete + + + def handle_command(self, inbuf, cmd): + if cmd['prefix'] == 'deepsea config-show': + return 0, json.dumps(dict([(key, self.get_module_option(key)) for key in self.config_keys.keys()])), '' + + elif cmd['prefix'] == 'deepsea config-set': + if cmd['key'] not in self.config_keys.keys(): + return (-errno.EINVAL, '', + "Unknown configuration option '{0}'".format(cmd['key'])) + + self.set_module_option(cmd['key'], cmd['value']) + self._event.set() + return 0, "Configuration option '{0}' updated".format(cmd['key']), '' + + return (-errno.EINVAL, '', + "Command not found '{0}'".format(cmd['prefix'])) + + + def serve(self): + self.log.info('DeepSea module starting up') + self.run = True + while self.run: + if not self._config_valid(): + # This will spin until the config is valid, spitting a warning + # that the config is invalid every 60 seconds. The one oddity + # is that while setting the various parameters, this log warning + # will print once for each parameter set until the config is valid. + self.log.warn("Configuration invalid; try `ceph deepsea config-set [...]`") + self._event.wait(60) + self._event.clear() + continue + + if self._event_reader and not self._reading_events: + self._event_reader = None + + if not self._event_reader: + self._last_failure_msg = None + try: + # This spawns a separate thread to read the salt event bus + # stream. We can't do it in the serve thead, because reading + # from the response blocks, which would prevent the serve + # thread from handling anything else. + # + # TODO: figure out how to restart the _event_reader thread if + # config changes, e.g.: a new username or password is set. + # This will be difficult, because _read_sse() just blocks waiting + # for response lines. The closest I got was setting a read timeout + # on the request, but in the general case (where not much is + # happening most of the time), this will result in continual + # timeouts and reconnects. We really need an asynchronous read + # to support this. + self._event_response = self._do_request_with_login("GET", "events", stream=True) + self._event_reader = Thread(target=self._read_sse) + self._reading_events = True + self._event_reader.start() + except Exception as ex: + self._set_last_failure_msg("Failure setting up event reader: " + str(ex)) + # gives an (arbitrary) 60 second retry if we can't attach to + # the salt-api event bus for some reason (e.g.: invalid username, + # or password, which will be logged as "Request failed with status + # code 401"). Note that this 60 second retry will also happen if + # salt-api dies. + self._event.wait(60) + self._event.clear() + continue + + # Wait indefinitely for something interesting to happen (e.g. + # config-set, or shutdown), or the event reader to fail, which + # will happen if the salt-api server dies or restarts). + self._event.wait() + self._event.clear() + + + def shutdown(self): + self.log.info('DeepSea module shutting down') + self.run = False + self._event.set() + + + def _set_last_failure_msg(self, msg): + self._last_failure_msg = msg + self.log.warn(msg) + + + # Reader/parser of SSE events, see: + # - https://docs.saltstack.com/en/latest/ref/netapi/all/salt.netapi.rest_cherrypy.html#events) + # - https://www.w3.org/TR/2009/WD-eventsource-20090421/ + # Note: this is pretty braindead and doesn't implement the full eventsource + # spec, but it *does* implement enough for us to listen to events from salt + # and potentially do something with them. + def _read_sse(self): + event = {} + try: + # Just starting the event reader; if we've made it here, we know we're + # talking to salt-api (_do_request would have raised an exception if the + # response wasn't ok), so check if there's any completions inflight that + # need to be dealt with. This handles the case where some command was + # invoked, then salt-api died somehow, and we reconneced, but missed the + # completion at the time it actually happened. + for tag in list(self._all_completions): + self.log.info("Found event {} inflight".format(tag)) + try: + resp = self._do_request_with_login("POST", data = { + "client": "runner", + "fun": "jobs.lookup_jid", + "jid": tag.split('/')[2] + }) + # jobs.lookup_jid returns a dict keyed by hostname. + return_dict = resp.json()['return'][0] + if return_dict: + # If the job is complete, there'll be one item in the dict. + self.log.info("Event {} complete".format(tag)) + # The key is the salt master hostname, but we don't care + # about that, so just grab the data. + data = next(iter(return_dict.items()))[1] + self._all_completions[tag]._process_result(data) + # TODO: decide whether it's bad to delete the completion + # here -- would we ever need to resurrect it? + del self._all_completions[tag] + else: + # if the job is not complete, there'll be nothing in the dict + self.log.info("Event {} still pending".format(tag)) + except Exception as ex: + # Logging a warning if the request failed, so we can continue + # checking any other completions, then get onto reading events + self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex))) + + for line in self._event_response.iter_lines(): + with self._completion_lock: + if line: + line = line.decode('utf-8') + colon = line.find(':') + if colon > 0: + k = line[:colon] + v = line[colon+2:] + if k == "retry": + # TODO: find out if we need to obey this reconnection time + self.log.warn("Server requested retry {}, ignored".format(v)) + else: + event[k] = v + else: + # Empty line, terminates an event. Note that event['tag'] + # is a salt-api extension to SSE to avoid having to decode + # json data if you don't care about it. To get to the + # interesting stuff, you want event['data'], which is json. + # If you want to have some fun, try + # `ceph daemon mgr.$(hostname) config set debug_mgr 20` + # then `salt '*' test.ping` on the master + self.log.debug("Got event '{}'".format(str(event))) + + # If we're actually interested in this event (i.e. it's + # in our completion dict), fire off that completion's + # _process_result() callback and remove it from our list. + if event['tag'] in self._all_completions: + self.log.info("Event {} complete".format(event['tag'])) + self._all_completions[event['tag']]._process_result(json.loads(event['data'])['data']) + # TODO: decide whether it's bad to delete the completion + # here -- would we ever need to resurrect it? + del self._all_completions[event['tag']] + + event = {} + self._set_last_failure_msg("SSE read terminated") + except Exception as ex: + self.log.exception(ex) + self._set_last_failure_msg("SSE read failed: {}".format(str(ex))) + + self._reading_events = False + self._event.set() + + + # _do_request(), _login() and _do_request_with_login() are an extremely + # minimalist form of the following, with notably terse error handling: + # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/rest_client.py?at=master&fileviewer=file-view-default + # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/deepsea.py?at=master&fileviewer=file-view-default + # rationale: + # - I needed slightly different behaviour than in openATTIC (I want the + # caller to read the response, to allow streaming the salt-api event bus) + # - I didn't want to pull in 400+ lines more code into this presently + # experimental module, to save everyone having to review it ;-) + + def _do_request(self, method, path="", data=None, stream=False): + """ + returns the response, which the caller then has to read + """ + url = "{0}/{1}".format(self.get_module_option('salt_api_url'), path) + try: + if method.lower() == 'get': + resp = requests.get(url, headers = { "X-Auth-Token": self._token }, + data=data, stream=stream) + elif method.lower() == 'post': + resp = requests.post(url, headers = { "X-Auth-Token": self._token }, + data=data) + + else: + raise RequestException("Method '{}' not supported".format(method.upper())) + if resp.ok: + return resp + else: + msg = "Request failed with status code {}".format(resp.status_code) + raise RequestException(msg, resp.status_code) + except requests.exceptions.ConnectionError as ex: + self.log.exception(str(ex)) + raise RequestException(str(ex)) + except requests.exceptions.InvalidURL as ex: + self.log.exception(str(ex)) + raise RequestException(str(ex)) + + + def _login(self): + resp = self._do_request('POST', 'login', data = { + "eauth": self.get_module_option('salt_api_eauth'), + "password": self.get_module_option('salt_api_password'), + "username": self.get_module_option('salt_api_username') + }) + self._token = resp.json()['return'][0]['token'] + self.log.info("Salt API login successful") + + + def _do_request_with_login(self, method, path="", data=None, stream=False): + retries = 2 + while True: + try: + if not self._token: + self._login() + return self._do_request(method, path, data, stream) + except RequestException as ex: + retries -= 1 + if ex.status_code not in [401, 403] or retries == 0: + raise ex + self._token = None |