summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/deepsea/module.py
blob: 734a457d8587e50e81b354c9edd8d545faee1dab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
# vim: ts=8 et sw=4 sts=4
"""
ceph-mgr DeepSea orchestrator module
"""

# We want orchestrator methods in this to be 1:1 mappings to DeepSea runners,
# we don't want to aggregate multiple salt invocations here, because that means
# this module would need to know too much about how DeepSea works internally.
# Better to expose new runners from DeepSea to match what the orchestrator needs.

import json
import errno
import requests

from threading import Event, Thread, Lock

from mgr_module import MgrModule
import orchestrator


class RequestException(Exception):
    """Raised when a salt-api HTTP request fails.

    Carries the HTTP status code of the failed response (when there was
    one) so callers can react to specific codes, e.g. re-login on 401/403.
    """

    def __init__(self, message, status_code=None):
        # Hand the human-readable message to the Exception base class;
        # status_code stays None for non-HTTP failures (connection errors,
        # invalid URLs, unsupported methods).
        super(RequestException, self).__init__(message)
        self.status_code = status_code


class DeepSeaReadCompletion(orchestrator.ReadCompletion):
    """A read completion resolved later by a salt event-bus callback.

    Instances start out pending; when the matching salt event arrives,
    _process_result() runs the stored callback on the event payload,
    records its return value as the result and marks the completion done.
    """

    def __init__(self, process_result_callback):
        super(DeepSeaReadCompletion, self).__init__()
        # Callback turning raw salt event data into the final result.
        self._callback = process_result_callback
        self._done = False

    def _process_result(self, data):
        """Run the callback on *data*, store its result and finish."""
        self._result = self._callback(data)
        self._done = True

    @property
    def result(self):
        # Only meaningful once is_complete is True; _result is set
        # lazily, so reading it earlier raises AttributeError.
        return self._result

    @property
    def is_complete(self):
        return self._done


class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
    """Orchestrator backend that drives DeepSea through the salt-api REST service.

    Asynchronous runner invocations are matched to their results by
    listening to the salt event bus (a server-sent-event stream) on a
    dedicated reader thread; see serve() and _read_sse().
    """

    # Options settable via `ceph deepsea config-set`; all four must be
    # non-empty for the module to report itself as available.
    MODULE_OPTIONS = [
        {
            'name': 'salt_api_url',
            'default': ''
        },
        {
            'name': 'salt_api_eauth',
            'default': 'sharedsecret'
        },
        {
            'name': 'salt_api_username',
            'default': ''
        },
        {
            'name': 'salt_api_password',
            'default': ''
        }
    ]


    COMMANDS = [
        {
            "cmd": "deepsea config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "deepsea config-show",
            "desc": "Show current configuration",
            "perm": "r"
        }
    ]


    @property
    def config_keys(self):
        """Map of option name -> default value, derived from MODULE_OPTIONS."""
        return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)


    def get_module_option(self, key, default=None):
        """
        Overrides the default MgrModule get_module_option() method to pull in defaults
        specific to this module.

        Note: the *default* parameter is accepted for signature compatibility
        but ignored; the module's own default from config_keys always wins.
        """
        return super(DeepSeaOrchestrator, self).get_module_option(key, default=self.config_keys[key])


    def _config_valid(self):
        """Return True only if every module option has a non-empty value."""
        for key in self.config_keys.keys():
            if not self.get_module_option(key, self.config_keys[key]):
                return False
        return True


    def __init__(self, *args, **kwargs):
        super(DeepSeaOrchestrator, self).__init__(*args, **kwargs)
        self._event = Event()                   # wakes the serve() loop (config change, shutdown, reader death)
        self._token = None                      # salt-api auth token, (re)acquired by _login()
        self._event_reader = None               # Thread running _read_sse(), or None
        self._reading_events = False            # True while the reader thread is healthy
        self._last_failure_msg = None           # surfaced via available()
        self._all_completions = dict()          # salt event tag -> pending DeepSeaReadCompletion
        self._completion_lock = Lock()          # guards _all_completions across serve/reader threads
        self.inventory_cache = orchestrator.OutdatableDict()
        self.service_cache = orchestrator.OutdatableDict()

    def available(self):
        """Report whether the backend is usable, with a reason when it isn't."""
        if not self._config_valid():
            return False, "Configuration invalid; try `ceph deepsea config-set [...]`"

        if not self._reading_events and self._last_failure_msg:
            return False, self._last_failure_msg

        return True, ""

    def get_inventory(self, node_filter=None, refresh=False):
        """
        Note that this will raise an exception (e.g. if the salt-api is down,
        or the username/password is incorrect).  Same for other methods.
        Callers should expect this and react appropriately.  The orchestrator
        cli, for example, just prints the traceback in the console, so the
        user at least sees the error.
        """
        self.inventory_cache.remove_outdated()
        if not self.inventory_cache.any_outdated() and not refresh:
            if node_filter is None:
                return orchestrator.TrivialReadCompletion(
                    orchestrator.InventoryNode.from_nested_items(self.inventory_cache.items()))
            elif node_filter.labels is None:
                try:
                    return orchestrator.TrivialReadCompletion(
                        orchestrator.InventoryNode.from_nested_items(
                            self.inventory_cache.items_filtered(node_filter.nodes)))
                except KeyError:
                    # items_filtered() will raise KeyError if passed a node name that doesn't exist
                    return orchestrator.TrivialReadCompletion([])

        def process_result(event_data):
            # Invoked by _read_sse() when the matching salt event arrives.
            result = []
            if event_data['success']:
                for node_name, node_devs in event_data["return"].items():
                    if node_filter is None:
                        # The cache will only be populated when this function is invoked
                        # without a node filter, i.e. if you run it once for the whole
                        # cluster, you can then call it for individual nodes and return
                        # cached data.  However, if you only *ever* call it for individual
                        # nodes, the cache will never be populated, and you'll always have
                        # the full round trip to DeepSea.
                        self.inventory_cache[node_name] = orchestrator.OutdatableData(node_devs)
                    devs = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(node_devs)
                    result.append(orchestrator.InventoryNode(node_name, devs))
            else:
                self.log.error(event_data['return'])
            return result

        with self._completion_lock:
            c = DeepSeaReadCompletion(process_result)

            nodes = []
            roles = []
            if node_filter:
                nodes = node_filter.nodes
                roles = node_filter.labels

            resp = self._do_request_with_login("POST", data = {
                "client": "runner_async",
                "fun": "mgr_orch.get_inventory",
                "nodes": nodes,
                "roles": roles
            })

            # ['return'][0]['tag'] in the response JSON is what we need to match
            # on when looking for the result event (e.g.: "salt/run/20181018074024331230")
            self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c

            return c

    def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
        """Return a completion yielding ServiceDescriptions known to DeepSea."""

        # Note: describe_service() does *not* support OSDs.  This is because
        # DeepSea doesn't really record what OSDs are deployed where; Ceph is
        # considered the canonical source of this information, so having this
        # function query OSD information from DeepSea doesn't make a lot of
        # sense (DeepSea would have to call back into Ceph).

        assert service_type in ("mon", "mgr", "mds", "rgw", "nfs", "iscsi", None), service_type + " unsupported"

        def _deepsea_to_ceph(service):
            # DeepSea's role names differ from Ceph's service type names.
            if service == "ganesha":
                return "nfs"
            elif service == "igw":
                return "iscsi"
            else:
                return service

        # presently unused
        def _ceph_to_deepsea(service):
            if service == "nfs":
                return "ganesha"
            elif service == "iscsi":
                return "igw"
            else:
                return service

        self.service_cache.remove_outdated()
        if not self.service_cache.any_outdated() and not refresh:
            # Let's hope the services are complete.
            try:
                node_filter = [node_name] if node_name else None
                services_by_node = [d[1].data for d in self.service_cache.items_filtered(node_filter)]
                services = [orchestrator.ServiceDescription.from_json(s) for services in services_by_node for s in services]
                services = [s for s in services if
                            (True if service_type is None else s.service_type == service_type) and
                            (True if service_id is None else s.service_instance == service_id)]
                return orchestrator.TrivialReadCompletion(services)
            except KeyError:
                # items_filtered() will raise KeyError if passed a node name that doesn't exist
                return orchestrator.TrivialReadCompletion([])

        def process_result(event_data):
            # Invoked by _read_sse() when the matching salt event arrives.
            result = []
            if event_data['success']:
                for service_node, service_info in event_data["return"].items():
                    node_service_cache = []
                    for this_service_type, service_dict in service_info.items():
                        if isinstance(service_dict, str):
                            # map old form where deepsea only returned service IDs
                            # to new form where it returns a dict
                            service_dict = { 'service_instance': service_dict }
                        desc = orchestrator.ServiceDescription(nodename=service_node,
                                                               service_instance=service_dict['service_instance'],
                                                               service_type=_deepsea_to_ceph(this_service_type),
                                                               # the following may or may not be present
                                                               container_id=service_dict.get('container_id', None),
                                                               service=service_dict.get('service', None),
                                                               version=service_dict.get('version', None),
                                                               rados_config_location=service_dict.get('rados_config_location', None),
                                                               service_url = service_dict.get('service_url', None),
                                                               status=service_dict.get('status', None),
                                                               status_desc=service_dict.get('status_desc', None)
                                                               )
                        # Always add every service to the cache...
                        node_service_cache.append(desc.to_json())
                        # ...but only return the ones the caller asked for
                        if ((service_type is None or desc.service_type == service_type) and
                            (service_id is None or desc.service_instance == service_id) and
                            (node_name is None or desc.nodename == node_name)):
                            result.append(desc)

                    self.service_cache[service_node] = orchestrator.OutdatableData(node_service_cache)
            else:
                self.log.error(event_data['return'])
            return result

        with self._completion_lock:
            c = DeepSeaReadCompletion(process_result)

            # Always request all services, so we always have all services cached.
            resp = self._do_request_with_login("POST", data = {
                "client": "runner_async",
                "fun": "mgr_orch.describe_service"
            })
            self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c

            return c

    def wait(self, completions):
        """Return True if every completion in *completions* has finished.

        Non-blocking: completions are resolved asynchronously by the event
        reader thread, this just polls their current state.
        """
        incomplete = False

        with self._completion_lock:
            for c in completions:
                if c.is_complete:
                    continue
                if not c.is_complete:
                    # TODO: the job is in the bus, it should reach us eventually
                    # unless something has gone wrong (e.g. salt-api died, etc.),
                    # in which case it's possible the job finished but we never
                    # noticed the salt/run/$id/ret event.  Need to add the job ID
                    # (or possibly the full event tag) to the completion object.
                    # That way, if we want to double check on a job that hasn't
                    # been completed yet, we can make a synchronous request to
                    # salt-api to invoke jobs.lookup_jid, and if it's complete we
                    # should be able to pass its return value to _process_result()
                    # Question: do we do this automatically after some timeout?
                    # Or do we add a function so the admin can check and "unstick"
                    # a stuck completion?
                    incomplete = True

        return not incomplete


    def handle_command(self, inbuf, cmd):
        """Dispatch the `deepsea config-show` / `deepsea config-set` CLI commands."""
        if cmd['prefix'] == 'deepsea config-show':
            return 0, json.dumps(dict([(key, self.get_module_option(key)) for key in self.config_keys.keys()])), ''

        elif cmd['prefix'] == 'deepsea config-set':
            if cmd['key'] not in self.config_keys.keys():
                return (-errno.EINVAL, '',
                        "Unknown configuration option '{0}'".format(cmd['key']))

            self.set_module_option(cmd['key'], cmd['value'])
            # Wake serve() so it notices the config change immediately.
            self._event.set()
            return 0, "Configuration option '{0}' updated".format(cmd['key']), ''

        return (-errno.EINVAL, '',
                "Command not found '{0}'".format(cmd['prefix']))


    def serve(self):
        """Main module loop: keep an event-bus reader thread alive while configured."""
        self.log.info('DeepSea module starting up')
        self.run = True
        while self.run:
            if not self._config_valid():
                # This will spin until the config is valid, spitting a warning
                # that the config is invalid every 60 seconds.  The one oddity
                # is that while setting the various parameters, this log warning
                # will print once for each parameter set until the config is valid.
                self.log.warn("Configuration invalid; try `ceph deepsea config-set [...]`")
                self._event.wait(60)
                self._event.clear()
                continue

            # Reader thread died (salt-api restarted, etc.); discard it so a
            # fresh one is spawned below.
            if self._event_reader and not self._reading_events:
                self._event_reader = None

            if not self._event_reader:
                self._last_failure_msg = None
                try:
                    # This spawns a separate thread to read the salt event bus
                    # stream.  We can't do it in the serve thread, because reading
                    # from the response blocks, which would prevent the serve
                    # thread from handling anything else.
                    #
                    # TODO: figure out how to restart the _event_reader thread if
                    # config changes, e.g.: a new username or password is set.
                    # This will be difficult, because _read_sse() just blocks waiting
                    # for response lines.  The closest I got was setting a read timeout
                    # on the request, but in the general case (where not much is
                    # happening most of the time), this will result in continual
                    # timeouts and reconnects.  We really need an asynchronous read
                    # to support this.
                    self._event_response = self._do_request_with_login("GET", "events", stream=True)
                    self._event_reader = Thread(target=self._read_sse)
                    self._reading_events = True
                    self._event_reader.start()
                except Exception as ex:
                    self._set_last_failure_msg("Failure setting up event reader: " + str(ex))
                    # gives an (arbitrary) 60 second retry if we can't attach to
                    # the salt-api event bus for some reason (e.g.: invalid username,
                    # or password, which will be logged as "Request failed with status
                    # code 401").  Note that this 60 second retry will also happen if
                    # salt-api dies.
                    self._event.wait(60)
                    self._event.clear()
                    continue

            # Wait indefinitely for something interesting to happen (e.g.
            # config-set, or shutdown), or the event reader to fail, which
            # will happen if the salt-api server dies or restarts).
            self._event.wait()
            self._event.clear()


    def shutdown(self):
        """Stop the serve() loop (called by the mgr on module shutdown)."""
        self.log.info('DeepSea module shutting down')
        self.run = False
        self._event.set()


    def _set_last_failure_msg(self, msg):
        """Record *msg* for available() to report, and log it as a warning."""
        self._last_failure_msg = msg
        self.log.warn(msg)


    # Reader/parser of SSE events, see:
    # - https://docs.saltstack.com/en/latest/ref/netapi/all/salt.netapi.rest_cherrypy.html#events)
    # - https://www.w3.org/TR/2009/WD-eventsource-20090421/
    # Note: this is pretty braindead and doesn't implement the full eventsource
    # spec, but it *does* implement enough for us to listen to events from salt
    # and potentially do something with them.
    def _read_sse(self):
        """Thread body: consume the salt-api SSE stream and resolve completions."""
        event = {}
        try:
            # Just starting the event reader; if we've made it here, we know we're
            # talking to salt-api (_do_request would have raised an exception if the
            # response wasn't ok), so check if there's any completions inflight that
            # need to be dealt with.  This handles the case where some command was
            # invoked, then salt-api died somehow, and we reconnected, but missed the
            # completion at the time it actually happened.
            for tag in list(self._all_completions):
                self.log.info("Found event {} inflight".format(tag))
                try:
                    resp = self._do_request_with_login("POST", data = {
                        "client": "runner",
                        "fun": "jobs.lookup_jid",
                        # tag has the form "salt/run/<jid>/ret"
                        "jid": tag.split('/')[2]
                    })
                    # jobs.lookup_jid returns a dict keyed by hostname.
                    return_dict = resp.json()['return'][0]
                    if return_dict:
                        # If the job is complete, there'll be one item in the dict.
                        self.log.info("Event {} complete".format(tag))
                        # The key is the salt master hostname, but we don't care
                        # about that, so just grab the data.
                        data = next(iter(return_dict.items()))[1]
                        self._all_completions[tag]._process_result(data)
                        # TODO: decide whether it's bad to delete the completion
                        # here -- would we ever need to resurrect it?
                        del self._all_completions[tag]
                    else:
                        # if the job is not complete, there'll be nothing in the dict
                        self.log.info("Event {} still pending".format(tag))
                except Exception as ex:
                    # Logging a warning if the request failed, so we can continue
                    # checking any other completions, then get onto reading events
                    self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex)))

            for line in self._event_response.iter_lines():
                with self._completion_lock:
                    if line:
                        line = line.decode('utf-8')
                        # SSE lines are "field: value"; split on the first colon
                        # (the "+2" skips the colon and the following space).
                        colon = line.find(':')
                        if colon > 0:
                            k = line[:colon]
                            v = line[colon+2:]
                            if k == "retry":
                                # TODO: find out if we need to obey this reconnection time
                                self.log.warn("Server requested retry {}, ignored".format(v))
                            else:
                                event[k] = v
                    else:
                        # Empty line, terminates an event.  Note that event['tag']
                        # is a salt-api extension to SSE to avoid having to decode
                        # json data if you don't care about it.  To get to the
                        # interesting stuff, you want event['data'], which is json.
                        # If you want to have some fun, try
                        # `ceph daemon mgr.$(hostname) config set debug_mgr 20`
                        # then `salt '*' test.ping` on the master
                        self.log.debug("Got event '{}'".format(str(event)))

                        # If we're actually interested in this event (i.e. it's
                        # in our completion dict), fire off that completion's
                        # _process_result() callback and remove it from our list.
                        # NOTE(review): assumes every event carries a 'tag' field;
                        # a tag-less event (or consecutive blank lines) would raise
                        # KeyError here and kill the reader -- confirm salt-api
                        # always sends one.
                        if event['tag'] in self._all_completions:
                            self.log.info("Event {} complete".format(event['tag']))
                            self._all_completions[event['tag']]._process_result(json.loads(event['data'])['data'])
                            # TODO: decide whether it's bad to delete the completion
                            # here -- would we ever need to resurrect it?
                            del self._all_completions[event['tag']]

                        event = {}
            self._set_last_failure_msg("SSE read terminated")
        except Exception as ex:
            self.log.exception(ex)
            self._set_last_failure_msg("SSE read failed: {}".format(str(ex)))

        # Tell serve() the reader is dead so it can spawn a replacement.
        self._reading_events = False
        self._event.set()


    # _do_request(), _login() and _do_request_with_login() are an extremely
    # minimalist form of the following, with notably terse error handling:
    # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/rest_client.py?at=master&fileviewer=file-view-default
    # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/deepsea.py?at=master&fileviewer=file-view-default
    # rationale:
    # - I needed slightly different behaviour than in openATTIC (I want the
    #   caller to read the response, to allow streaming the salt-api event bus)
    # - I didn't want to pull in 400+ lines more code into this presently
    #   experimental module, to save everyone having to review it ;-)

    def _do_request(self, method, path="", data=None, stream=False):
        """
        Issue a single GET or POST to salt_api_url/path with the current
        auth token, and return the response, which the caller then has to
        read.  Raises RequestException on non-OK status, connection error,
        invalid URL, or unsupported method.
        """
        url = "{0}/{1}".format(self.get_module_option('salt_api_url'), path)
        try:
            if method.lower() == 'get':
                resp = requests.get(url, headers = { "X-Auth-Token": self._token },
                                    data=data, stream=stream)
            elif method.lower() == 'post':
                resp = requests.post(url, headers = { "X-Auth-Token": self._token },
                                     data=data)

            else:
                raise RequestException("Method '{}' not supported".format(method.upper()))
            if resp.ok:
                return resp
            else:
                msg = "Request failed with status code {}".format(resp.status_code)
                raise RequestException(msg, resp.status_code)
        except requests.exceptions.ConnectionError as ex:
            self.log.exception(str(ex))
            raise RequestException(str(ex))
        except requests.exceptions.InvalidURL as ex:
            self.log.exception(str(ex))
            raise RequestException(str(ex))


    def _login(self):
        """Authenticate against salt-api and stash the returned token in self._token."""
        resp = self._do_request('POST', 'login', data = {
            "eauth": self.get_module_option('salt_api_eauth'),
            "password": self.get_module_option('salt_api_password'),
            "username": self.get_module_option('salt_api_username')
        })
        self._token = resp.json()['return'][0]['token']
        self.log.info("Salt API login successful")


    def _do_request_with_login(self, method, path="", data=None, stream=False):
        """Like _do_request(), but logs in first if needed and retries once
        (with a fresh login) when the token is rejected with 401/403."""
        retries = 2
        while True:
            try:
                if not self._token:
                    self._login()
                return self._do_request(method, path, data, stream)
            except RequestException as ex:
                retries -= 1
                if ex.status_code not in [401, 403] or retries == 0:
                    raise ex
                # Token expired or revoked: drop it and loop to re-login.
                self._token = None