diff options
Diffstat (limited to '')
-rw-r--r-- | src/pybind/mgr/k8sevents/module.py | 1460 |
1 files changed, 1460 insertions, 0 deletions
diff --git a/src/pybind/mgr/k8sevents/module.py b/src/pybind/mgr/k8sevents/module.py new file mode 100644 index 00000000..e6568269 --- /dev/null +++ b/src/pybind/mgr/k8sevents/module.py @@ -0,0 +1,1460 @@ +# Integrate with the kubernetes events API. +# This module sends events to Kubernetes, and also captures/tracks all events +# in the rook-ceph namespace so kubernetes activity like pod restarts, +# imagepulls etc can be seen from within the ceph cluster itself. +# +# To interact with the events API, the mgr service to access needs to be +# granted additional permissions +# e.g. kubectl -n rook-ceph edit clusterrole rook-ceph-mgr-cluster-rules +# +# These are the changes needed; +# - apiGroups: +# - "" +# resources: +# - events +# verbs: +# - create +# - patch +# - list +# - get +# - watch + + +import os +import re +import sys +import time +import json +import yaml +import errno +import socket +import base64 +import logging +import tempfile +import threading + +try: + # python 3 + from urllib.parse import urlparse +except ImportError: + # python 2 fallback + from urlparse import urlparse + +from datetime import tzinfo, datetime, timedelta + +from urllib3.exceptions import MaxRetryError,ProtocolError +from collections import OrderedDict + +import rados +from mgr_module import MgrModule +from mgr_util import verify_cacrt, ServerConfigException + +try: + import queue +except ImportError: + # python 2.7.5 + import Queue as queue +finally: + # python 2.7.15 or python3 + event_queue = queue.Queue() + +try: + from kubernetes import client, config, watch + from kubernetes.client.rest import ApiException +except ImportError: + kubernetes_imported = False + client = None + config = None + watch = None +else: + kubernetes_imported = True + + # The watch.Watch.stream method can provide event objects that have involved_object = None + # which causes an exception in the generator. A workaround is discussed for a similar issue + # in https://github.com/kubernetes-client/python/issues/376 which has been used here + # pylint: disable=no-member + from kubernetes.client.models.v1_event import V1Event + def local_involved_object(self, involved_object): + if involved_object is None: + involved_object = client.V1ObjectReference(api_version="1") + self._involved_object = involved_object + V1Event.involved_object = V1Event.involved_object.setter(local_involved_object) + +log = logging.getLogger(__name__) + +# use a simple local class to represent UTC +# datetime pkg modules vary between python2 and 3 and pytz is not available on older +# ceph container images, so taking a pragmatic approach! +class UTC(tzinfo): + def utcoffset(self, dt): + return timedelta(0) + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return timedelta(0) + + +def text_suffix(num): + """Define a text suffix based on a value i.e. turn host into hosts""" + return '' if num == 1 else 's' + + +def create_temp_file(fname, content, suffix=".tmp"): + """Create a temp file + + Attempt to create an temporary file containing the given content + + Returns: + str .. full path to the temporary file + + Raises: + OSError: problems creating the file + + """ + + if content is not None: + file_name = os.path.join(tempfile.gettempdir(), fname + suffix) + + try: + with open(file_name, "w") as f: + f.write(content) + except OSError as e: + raise OSError("Unable to create temporary file : {}".format(str(e))) + + return file_name + + +class HealthCheck(object): + """Transform a healthcheck msg into it's component parts""" + + def __init__(self, msg, msg_level): + + # msg looks like + # + # Health check failed: Reduced data availability: 100 pgs inactive (PG_AVAILABILITY) + # Health check cleared: OSDMAP_FLAGS (was: nodown flag(s) set) + # Health check failed: nodown flag(s) set (OSDMAP_FLAGS) + # + self.msg = None + self.name = None + self.text = None + self.valid = False + + if msg.lower().startswith('health check'): + + self.valid = True + self.msg = msg + msg_tokens = self.msg.split() + + if msg_level == 'INF': + self.text = ' '.join(msg_tokens[3:]) + self.name = msg_tokens[3] # health check name e.g. OSDMAP_FLAGS + else: # WRN or ERR + self.text = ' '.join(msg_tokens[3:-1]) + self.name = msg_tokens[-1][1:-1] + + +class LogEntry(object): + """Generic 'log' object""" + + reason_map = { + "audit": "Audit", + "cluster": "HealthCheck", + "config": "ClusterChange", + "heartbeat":"Heartbeat", + "startup": "Started" + } + + def __init__(self, source, msg, msg_type, level, tstamp=None): + + self.source = source + self.msg = msg + self.msg_type = msg_type + self.level = level + self.tstamp = tstamp + self.healthcheck = None + + if 'health check ' in self.msg.lower(): + self.healthcheck = HealthCheck(self.msg, self.level) + + + def __str__(self): + return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source, + self.msg_type, + self.msg, + self.level, + self.tstamp) + + @property + def cmd(self): + """Look at the msg string and extract the command content""" + + # msg looks like 'from=\'client.205306 \' entity=\'client.admin\' cmd=\'[{"prefix": "osd set", "key": "nodown"}]\': finished' + if self.msg_type != 'audit': + return None + else: + _m=self.msg[:-10].replace("\'","").split("cmd=") + _s='"cmd":{}'.format(_m[1]) + cmds_list = json.loads('{' + _s + '}')['cmd'] + + # TODO. Assuming only one command was issued for now + _c = cmds_list[0] + return "{} {}".format(_c['prefix'], _c.get('key', '')) + + @property + def event_type(self): + return 'Normal' if self.level == 'INF' else 'Warning' + + @property + def event_reason(self): + return self.reason_map[self.msg_type] + + @property + def event_name(self): + if self.msg_type == 'heartbeat': + return 'mgr.Heartbeat' + elif self.healthcheck: + return 'mgr.health.{}'.format(self.healthcheck.name) + elif self.msg_type == 'audit': + return 'mgr.audit.{}'.format(self.cmd).replace(' ', '_') + elif self.msg_type == 'config': + return 'mgr.ConfigurationChange' + elif self.msg_type == 'startup': + return "mgr.k8sevents-module" + else: + return None + + @property + def event_entity(self): + if self.msg_type == 'audit': + return self.msg.replace("\'","").split('entity=')[1].split(' ')[0] + else: + return None + + @property + def event_msg(self): + if self.msg_type == 'audit': + return "Client '{}' issued: ceph {}".format(self.event_entity, self.cmd) + + elif self.healthcheck: + return self.healthcheck.text + else: + return self.msg + + +class BaseThread(threading.Thread): + health = 'OK' + reported = False + daemon = True + + +def clean_event(event): + """ clean an event record """ + if not event.first_timestamp: + log.error("first_timestamp is empty") + if event.metadata.creation_timestamp: + log.error("setting first_timestamp to the creation timestamp") + event.first_timestamp = event.metadata.creation_timestamp + else: + log.error("defaulting event first timestamp to current datetime") + event.first_timestamp = datetime.datetime.now() + + if not event.last_timestamp: + log.error("setting event last timestamp to {}".format(event.first_timestamp)) + event.last_timestamp = event.first_timestamp + + if not event.count: + event.count = 1 + + return event + + +class NamespaceWatcher(BaseThread): + """Watch events in a given namespace + + Using the watch package we can listen to event traffic in the namespace to + get an idea of what kubernetes related events surround the ceph cluster. The + thing to bear in mind is that events have a TTL enforced by the kube-apiserver + so this stream will only really show activity inside this retention window. + """ + + def __init__(self, api_client_config, namespace=None): + super(NamespaceWatcher, self).__init__() + + if api_client_config: + self.api = client.CoreV1Api(api_client_config) + else: + self.api = client.CoreV1Api() + + self.namespace = namespace + + self.events = OrderedDict() + self.lock = threading.Lock() + self.active = None + self.resource_version = None + + def fetch(self): + # clear the cache on every call to fetch + self.events.clear() + try: + resp = self.api.list_namespaced_event(self.namespace) + # TODO - Perhaps test for auth problem to be more specific in the except clause? + except: + self.active = False + self.health = "Unable to access events API (list_namespaced_event call failed)" + log.warning(self.health) + else: + self.active = True + self.resource_version = resp.metadata.resource_version + + for item in resp.items: + self.events[item.metadata.name] = clean_event(item) + log.info('Added {} events'.format(len(resp.items))) + + def run(self): + self.fetch() + func = getattr(self.api, "list_namespaced_event") + + if self.active: + log.info("Namespace event watcher started") + + + while True: + + try: + w = watch.Watch() + # execute generator to continually watch resource for changes + for item in w.stream(func, namespace=self.namespace, resource_version=self.resource_version, watch=True): + obj = item['object'] + + with self.lock: + + if item['type'] in ['ADDED', 'MODIFIED']: + self.events[obj.metadata.name] = clean_event(obj) + + elif item['type'] == 'DELETED': + del self.events[obj.metadata.name] + + # TODO test the exception for auth problem (403?) + + # Attribute error is generated when urllib3 on the system is old and doesn't have a + # read_chunked method + except AttributeError as e: + self.health = ("Error: Unable to 'watch' events API in namespace '{}' - " + "urllib3 too old? ({})".format(self.namespace, e)) + self.active = False + log.warning(self.health) + break + + except ApiException as e: + # refresh the resource_version & watcher + log.warning("API exception caught in watcher ({})".format(e)) + log.warning("Restarting namespace watcher") + self.fetch() + + except ProtocolError as e: + log.warning("Namespace watcher hit protocolerror ({}) - restarting".format(e)) + self.fetch() + + except Exception: + self.health = "{} Exception at {}".format( + sys.exc_info()[0].__name__, + datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S") + ) + log.exception(self.health) + self.active = False + break + + log.warning("Namespace event watcher stopped") + + +class KubernetesEvent(object): + + def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None): + + if api_client_config: + self.api = client.CoreV1Api(api_client_config) + else: + self.api = client.CoreV1Api() + + self.namespace = namespace + + self.event_name = log_entry.event_name + self.message = log_entry.event_msg + self.event_type = log_entry.event_type + self.event_reason = log_entry.event_reason + self.unique_name = unique_name + + self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN')) + + self.api_status = 200 + self.count = 1 + self.first_timestamp = None + self.last_timestamp = None + + @property + def type(self): + """provide a type property matching a V1Event object""" + return self.event_type + + @property + def event_body(self): + if self.unique_name: + obj_meta = client.V1ObjectMeta(name="{}".format(self.event_name)) + else: + obj_meta = client.V1ObjectMeta(generate_name="{}".format(self.event_name)) + + # field_path is needed to prevent problems in the namespacewatcher when + # deleted event are received + obj_ref = client.V1ObjectReference(kind="CephCluster", + field_path='spec.containers{mgr}', + name=self.event_name, + namespace=self.namespace) + + event_source = client.V1EventSource(component="ceph-mgr", + host=self.host) + return client.V1Event( + involved_object=obj_ref, + metadata=obj_meta, + message=self.message, + count=self.count, + type=self.event_type, + reason=self.event_reason, + source=event_source, + first_timestamp=self.first_timestamp, + last_timestamp=self.last_timestamp + ) + + def write(self): + + now=datetime.now(UTC()) + + self.first_timestamp = now + self.last_timestamp = now + + try: + self.api.create_namespaced_event(self.namespace, self.event_body) + except (OSError, ProtocolError): + # unable to reach to the API server + log.error("Unable to reach API server") + self.api_status = 400 + except MaxRetryError: + # k8s config has not be defined properly + log.error("multiple attempts to connect to the API have failed") + self.api_status = 403 # Forbidden + except ApiException as e: + log.debug("event.write status:{}".format(e.status)) + self.api_status = e.status + if e.status == 409: + log.debug("attempting event update for an existing event") + # 409 means the event is there already, so read it back (v1Event object returned) + # this could happen if the event has been created, and then the k8sevent module + # disabled and reenabled - i.e. the internal event tracking no longer matches k8s + response = self.api.read_namespaced_event(self.event_name, self.namespace) + # + # response looks like + # + # {'action': None, + # 'api_version': 'v1', + # 'count': 1, + # 'event_time': None, + # 'first_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()), + # 'involved_object': {'api_version': None, + # 'field_path': None, + # 'kind': 'CephCluster', + # 'name': 'ceph-mgr.k8sevent-module', + # 'namespace': 'rook-ceph', + # 'resource_version': None, + # 'uid': None}, + # 'kind': 'Event', + # 'last_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()), + # 'message': 'Ceph log -> event tracking started', + # 'metadata': {'annotations': None, + # 'cluster_name': None, + # 'creation_timestamp': datetime.datetime(2019, 7, 18, 5, 24, 59, tzinfo=tzlocal()), + # 'deletion_grace_period_seconds': None, + # 'deletion_timestamp': None, + # 'finalizers': None, + # 'generate_name': 'ceph-mgr.k8sevent-module', + # 'generation': None, + # 'initializers': None, + # 'labels': None, + # 'name': 'ceph-mgr.k8sevent-module5z7kq', + # 'namespace': 'rook-ceph', + # 'owner_references': None, + # 'resource_version': '1195832', + # 'self_link': '/api/v1/namespaces/rook-ceph/events/ceph-mgr.k8sevent-module5z7kq', + # 'uid': '62fde5f1-a91c-11e9-9c80-6cde63a9debf'}, + # 'reason': 'Started', + # 'related': None, + # 'reporting_component': '', + # 'reporting_instance': '', + # 'series': None, + # 'source': {'component': 'ceph-mgr', 'host': 'minikube'}, + # 'type': 'Normal'} + + # conflict event already exists + # read it + # update : count and last_timestamp and msg + + self.count = response.count + 1 + self.first_timestamp = response.first_timestamp + try: + self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body) + except ApiException as e: + log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status)) + self.api_status = e.status + else: + log.debug("event {} patched".format(self.event_name)) + self.api_status = 200 + + else: + log.debug("event {} created successfully".format(self.event_name)) + self.api_status = 200 + + @property + def api_success(self): + return self.api_status == 200 + + def update(self, log_entry): + self.message = log_entry.event_msg + self.event_type = log_entry.event_type + self.last_timestamp = datetime.now(UTC()) + self.count += 1 + log.debug("performing event update for {}".format(self.event_name)) + + try: + self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body) + except ApiException as e: + log.error("event patch call failed: {}".format(e.status)) + if e.status == 404: + # tried to patch, but hit a 404. The event's TTL must have been reached, and + # pruned by the kube-apiserver + log.debug("event not found, so attempting to create it") + try: + self.api.create_namespaced_event(self.namespace, self.event_body) + except ApiException as e: + log.error("unable to create the event: {}".format(e.status)) + self.api_status = e.status + else: + log.debug("event {} created successfully".format(self.event_name)) + self.api_status = 200 + else: + log.debug("event {} updated".format(self.event_name)) + self.api_status = 200 + + +class EventProcessor(BaseThread): + """Handle a global queue used to track events we want to send/update to kubernetes""" + + can_run = True + + def __init__(self, config_watcher, event_retention_days, api_client_config, namespace): + super(EventProcessor, self).__init__() + + self.events = dict() + self.config_watcher = config_watcher + self.event_retention_days = event_retention_days + self.api_client_config = api_client_config + self.namespace = namespace + + def startup(self): + """Log an event to show we're active""" + + event = KubernetesEvent( + LogEntry( + source='self', + msg='Ceph log -> event tracking started', + msg_type='startup', + level='INF', + tstamp=None + ), + unique_name=False, + api_client_config=self.api_client_config, + namespace=self.namespace + ) + + event.write() + return event.api_success + + @property + def ok(self): + return self.startup() + + def prune_events(self): + log.debug("prune_events - looking for old events to remove from cache") + oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days) + local_events = dict(self.events) + + for event_name in sorted(local_events, + key = lambda name: local_events[name].last_timestamp): + event = local_events[event_name] + if event.last_timestamp >= oldest: + break + else: + # drop this event + log.debug("prune_events - removing old event : {}".format(event_name)) + del self.events[event_name] + + def process(self, log_object): + + log.debug("log entry being processed : {}".format(str(log_object))) + + event_out = False + unique_name = True + + if log_object.msg_type == 'audit': + # audit traffic : operator commands + if log_object.msg.endswith('finished'): + log.debug("K8sevents received command finished msg") + event_out = True + else: + # NO OP - ignoring 'dispatch' log records + return + + elif log_object.msg_type == 'cluster': + # cluster messages : health checks + if log_object.event_name: + event_out = True + + elif log_object.msg_type == 'config': + # configuration checker messages + event_out = True + unique_name = False + + elif log_object.msg_type == 'heartbeat': + # hourly health message summary from Ceph + event_out = True + unique_name = False + log_object.msg = str(self.config_watcher) + + else: + log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type)) + + if event_out: + log.debug("k8sevents sending event to kubernetes") + # we don't cache non-unique events like heartbeats or config changes + if not unique_name or log_object.event_name not in self.events.keys(): + event = KubernetesEvent(log_entry=log_object, + unique_name=unique_name, + api_client_config=self.api_client_config, + namespace=self.namespace) + event.write() + log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status)) + if event.api_success and unique_name: + self.events[log_object.event_name] = event + else: + event = self.events[log_object.event_name] + event.update(log_object) + log.debug("event update ended : {}".format(event.api_status)) + + self.prune_events() + + else: + log.debug("K8sevents ignored message : {}".format(log_object.msg)) + + def run(self): + log.info("Ceph event processing thread started, " + "event retention set to {} days".format(self.event_retention_days)) + + while True: + + try: + log_object = event_queue.get(block=False) + except queue.Empty: + pass + else: + try: + self.process(log_object) + except Exception: + self.health = "{} Exception at {}".format( + sys.exc_info()[0].__name__, + datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S") + ) + log.exception(self.health) + break + + if not self.can_run: + break + + time.sleep(0.5) + + log.warning("Ceph event processing thread stopped") + + +class ListDiff(object): + def __init__(self, before, after): + self.before = set(before) + self.after = set(after) + + @property + def removed(self): + return list(self.before - self.after) + + @property + def added(self): + return list(self.after - self.before) + + @property + def is_equal(self): + return self.before == self.after + + +class CephConfigWatcher(BaseThread): + """Detect configuration changes within the cluster and generate human readable events""" + + def __init__(self, mgr): + super(CephConfigWatcher, self).__init__() + self.mgr = mgr + self.server_map = dict() + self.osd_map = dict() + self.pool_map = dict() + self.service_map = dict() + + self.config_check_secs = mgr.config_check_secs + + @property + def raw_capacity(self): + # Note. if the osd's are not online the capacity field will be 0 + return sum([self.osd_map[osd]['capacity'] for osd in self.osd_map]) + + @property + def num_servers(self): + return len(self.server_map.keys()) + + @property + def num_osds(self): + return len(self.osd_map.keys()) + + @property + def num_pools(self): + return len(self.pool_map.keys()) + + def __str__(self): + s = '' + + s += "{} : {:>3} host{}, {} pool{}, {} OSDs. Raw Capacity {}B".format( + json.loads(self.mgr.get('health')['json'])['status'], + self.num_servers, + text_suffix(self.num_servers), + self.num_pools, + text_suffix(self.num_pools), + self.num_osds, + MgrModule.to_pretty_iec(self.raw_capacity)) + return s + + def fetch_servers(self): + """Return a server summary, and service summary""" + servers = self.mgr.list_servers() + server_map = dict() # host -> services + service_map = dict() # service -> host + for server_info in servers: + services = dict() + for svc in server_info['services']: + if svc.get('type') in services.keys(): + services[svc.get('type')].append(svc.get('id')) + else: + services[svc.get('type')] = list([svc.get('id')]) + # maintain the service xref map service -> host and version + service_map[(svc.get('type'), str(svc.get('id')))] = server_info.get('hostname', '') + server_map[server_info.get('hostname')] = services + + return server_map, service_map + + def fetch_pools(self): + interesting = ["type", "size", "min_size"] + # pools = [{'pool': 1, 'pool_name': 'replicapool', 'flags': 1, 'flags_names': 'hashpspool', + # 'type': 1, 'size': 3, 'min_size': 1, 'crush_rule': 1, 'object_hash': 2, 'pg_autoscale_mode': 'warn', + # 'pg_num': 100, 'pg_placement_num': 100, 'pg_placement_num_target': 100, 'pg_num_target': 100, 'pg_num_pending': 100, + # 'last_pg_merge_meta': {'ready_epoch': 0, 'last_epoch_started': 0, 'last_epoch_clean': 0, 'source_pgid': '0.0', + # 'source_version': "0'0", 'target_version': "0'0"}, 'auid': 0, 'snap_mode': 'selfmanaged', 'snap_seq': 0, 'snap_epoch': 0, + # 'pool_snaps': [], 'quota_max_bytes': 0, 'quota_max_objects': 0, 'tiers': [], 'tier_of': -1, 'read_tier': -1, + # 'write_tier': -1, 'cache_mode': 'none', 'target_max_bytes': 0, 'target_max_objects': 0, + # 'cache_target_dirty_ratio_micro': 400000, 'cache_target_dirty_high_ratio_micro': 600000, + # 'cache_target_full_ratio_micro': 800000, 'cache_min_flush_age': 0, 'cache_min_evict_age': 0, + # 'erasure_code_profile': '', 'hit_set_params': {'type': 'none'}, 'hit_set_period': 0, 'hit_set_count': 0, + # 'use_gmt_hitset': True, 'min_read_recency_for_promote': 0, 'min_write_recency_for_promote': 0, + # 'hit_set_grade_decay_rate': 0, 'hit_set_search_last_n': 0, 'grade_table': [], 'stripe_width': 0, + # 'expected_num_objects': 0, 'fast_read': False, 'options': {}, 'application_metadata': {'rbd': {}}, + # 'create_time': '2019-08-02 02:23:01.618519', 'last_change': '19', 'last_force_op_resend': '0', + # 'last_force_op_resend_prenautilus': '0', 'last_force_op_resend_preluminous': '0', 'removed_snaps': '[]'}] + pools = self.mgr.get('osd_map')['pools'] + pool_map = dict() + for pool in pools: + pool_map[pool.get('pool_name')] = {k:pool.get(k) for k in interesting} + return pool_map + + + def fetch_osd_map(self, service_map): + """Create an osd map""" + stats = self.mgr.get('osd_stats') + + osd_map = dict() + + devices = self.mgr.get('osd_map_crush')['devices'] + for dev in devices: + osd_id = str(dev['id']) + osd_map[osd_id] = dict( + deviceclass=dev.get('class'), + capacity=0, + hostname=service_map['osd', osd_id] + ) + + for osd_stat in stats['osd_stats']: + osd_id = str(osd_stat.get('osd')) + osd_map[osd_id]['capacity'] = osd_stat['statfs']['total'] + + return osd_map + + def push_events(self, changes): + """Add config change to the global queue to generate an event in kubernetes""" + log.debug("{} events will be generated") + for change in changes: + event_queue.put(change) + + def _generate_config_logentry(self, msg): + return LogEntry( + source="config", + msg_type="config", + msg=msg, + level='INF', + tstamp=None + ) + + def _check_hosts(self, server_map): + log.debug("K8sevents checking host membership") + changes = list() + servers = ListDiff(self.server_map.keys(), server_map.keys()) + if servers.is_equal: + # no hosts have been added or removed + pass + else: + # host changes detected, find out what + host_msg = "Host '{}' has been {} the cluster" + for new_server in servers.added: + changes.append(self._generate_config_logentry( + msg=host_msg.format(new_server, 'added to')) + ) + + for removed_server in servers.removed: + changes.append(self._generate_config_logentry( + msg=host_msg.format(removed_server, 'removed from')) + ) + + return changes + + def _check_osds(self,server_map, osd_map): + log.debug("K8sevents checking OSD configuration") + changes = list() + before_osds = list() + for svr in self.server_map: + before_osds.extend(self.server_map[svr].get('osd',[])) + + after_osds = list() + for svr in server_map: + after_osds.extend(server_map[svr].get('osd',[])) + + if set(before_osds) == set(after_osds): + # no change in osd id's + pass + else: + # osd changes detected + osd_msg = "Ceph OSD '{}' ({} @ {}B) has been {} host {}" + + osds = ListDiff(before_osds, after_osds) + for new_osd in osds.added: + changes.append(self._generate_config_logentry( + msg=osd_msg.format( + new_osd, + osd_map[new_osd]['deviceclass'], + MgrModule.to_pretty_iec(osd_map[new_osd]['capacity']), + 'added to', + osd_map[new_osd]['hostname'])) + ) + + for removed_osd in osds.removed: + changes.append(self._generate_config_logentry( + msg=osd_msg.format( + removed_osd, + osd_map[removed_osd]['deviceclass'], + MgrModule.to_pretty_iec(osd_map[removed_osd]['capacity']), + 'removed from', + osd_map[removed_osd]['hostname'])) + ) + + return changes + + def _check_pools(self, pool_map): + changes = list() + log.debug("K8sevents checking pool configurations") + if self.pool_map.keys() == pool_map.keys(): + # no pools added/removed + pass + else: + # Pool changes + pools = ListDiff(self.pool_map.keys(), pool_map.keys()) + pool_msg = "Pool '{}' has been {} the cluster" + for new_pool in pools.added: + changes.append(self._generate_config_logentry( + msg=pool_msg.format(new_pool, 'added to')) + ) + + for removed_pool in pools.removed: + changes.append(self._generate_config_logentry( + msg=pool_msg.format(removed_pool, 'removed from')) + ) + + # check pool configuration changes + for pool_name in pool_map: + if not self.pool_map.get(pool_name, dict()): + # pool didn't exist before so just skip the checks + continue + + if pool_map[pool_name] == self.pool_map[pool_name]: + # no changes - dicts match in key and value + continue + else: + # determine the change and add it to the change list + size_diff = pool_map[pool_name]['size'] - self.pool_map[pool_name]['size'] + if size_diff != 0: + if size_diff < 0: + msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name, + pool_map[pool_name]['size']) + level = 'WRN' + else: + msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name, + pool_map[pool_name]['size']) + level = 'INF' + + changes.append(LogEntry(source="config", + msg_type="config", + msg=msg, + level=level, + tstamp=None) + ) + + if pool_map[pool_name]['min_size'] != self.pool_map[pool_name]['min_size']: + changes.append(LogEntry(source="config", + msg_type="config", + msg="Minimum acceptable number of replicas in pool '{}' has changed".format(pool_name), + level='WRN', + tstamp=None) + ) + + return changes + + def get_changes(self, server_map, osd_map, pool_map): + """Detect changes in maps between current observation and the last""" + + changes = list() + + changes.extend(self._check_hosts(server_map)) + changes.extend(self._check_osds(server_map, osd_map)) + changes.extend(self._check_pools(pool_map)) + + # FUTURE + # Could generate an event if a ceph daemon has moved hosts + # (assumes the ceph metadata host information is valid though!) + + return changes + + def run(self): + log.info("Ceph configuration watcher started, interval set to {}s".format(self.config_check_secs)) + + self.server_map, self.service_map = self.fetch_servers() + self.pool_map = self.fetch_pools() + + self.osd_map = self.fetch_osd_map(self.service_map) + + while True: + + try: + start_time = time.time() + server_map, service_map = self.fetch_servers() + pool_map = self.fetch_pools() + osd_map = self.fetch_osd_map(service_map) + + changes = self.get_changes(server_map, osd_map, pool_map) + if changes: + self.push_events(changes) + + self.osd_map = osd_map + self.pool_map = pool_map + self.server_map = server_map + self.service_map = service_map + + checks_duration = int(time.time() - start_time) + + # check that the time it took to run the checks fits within the + # interval, and if not extend the interval and emit a log message + # to show that the runtime for the checks exceeded the desired + # interval + if checks_duration > self.config_check_secs: + new_interval = self.config_check_secs * 2 + log.warning("K8sevents check interval warning. " + "Current checks took {}s, interval was {}s. " + "Increasing interval to {}s".format(int(checks_duration), + self.config_check_secs, + new_interval)) + self.config_check_secs = new_interval + + time.sleep(self.config_check_secs) + + except Exception: + self.health = "{} Exception at {}".format( + sys.exc_info()[0].__name__, + datetime.strftime(datetime.now(),"%Y/%m/%d %H:%M:%S") + ) + log.exception(self.health) + break + + log.warning("Ceph configuration watcher stopped") + + +class Module(MgrModule): + COMMANDS = [ + { + "cmd": "k8sevents status", + "desc": "Show the status of the data gathering threads", + "perm": "r" + }, + { + "cmd": "k8sevents ls", + "desc": "List all current Kuberenetes events from the Ceph namespace", + "perm": "r" + }, + { + "cmd": "k8sevents ceph", + "desc": "List Ceph events tracked & sent to the kubernetes cluster", + "perm": "r" + }, + { + "cmd": "k8sevents set-access name=key,type=CephString", + "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt", + "perm": "rw" + }, + { + "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString", + "desc": "Set kubernetes config paramters. <key> must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433", + "perm": "rw" + }, + { + "cmd": "k8sevents clear-config", + "desc": "Clear external kubernetes configuration settings", + "perm": "rw" + }, + ] + MODULE_OPTIONS = [ + {'name': 'config_check_secs', + 'type': 'int', + 'default': 10, + 'min': 10, + 'desc': "interval (secs) to check for cluster configuration changes"}, + {'name': 'ceph_event_retention_days', + 'type': 'int', + 'default': 7, + 'desc': "Days to hold ceph event information within local cache"} + ] + + def __init__(self, *args, **kwargs): + self.run = True + self.kubernetes_control = 'POD_NAME' in os.environ + self.event_processor = None + self.config_watcher = None + self.ns_watcher = None + self.trackers = list() + self.error_msg = None + self._api_client_config = None + self._namespace = None + + # Declare the module options we accept + self.config_check_secs = None + self.ceph_event_retention_days = None + + self.k8s_config = dict( + cacrt = None, + token = None, + server = None, + namespace = None + ) + + super(Module, self).__init__(*args, **kwargs) + + def k8s_ready(self): + """Validate the k8s_config dict + + Returns: + - bool .... indicating whether the config is ready to use + - string .. variables that need to be defined before the module will function + + """ + missing = list() + ready = True + for k in self.k8s_config: + if not self.k8s_config[k]: + missing.append(k) + ready = False + return ready, missing + + def config_notify(self): + """Apply runtime module options, and defaults from the modules KV store""" + self.log.debug("applying runtime module option settings") + for opt in self.MODULE_OPTIONS: + setattr(self, + opt['name'], + self.get_module_option(opt['name'])) + + if not self.kubernetes_control: + # Populate the config + self.log.debug("loading config from KV store") + for k in self.k8s_config: + self.k8s_config[k] = self.get_store(k, default=None) + + def fetch_events(self, limit=None): + """Interface to expose current events to another mgr module""" + # FUTURE: Implement this to provide k8s events to the dashboard? + raise NotImplementedError + + def process_clog(self, log_message): + """Add log message to the event queue + + :param log_message: dict from the cluster log (audit/cluster channels) + """ + required_fields = ['channel', 'message', 'priority', 'stamp'] + _message_attrs = log_message.keys() + if all(_field in _message_attrs for _field in required_fields): + self.log.debug("clog entry received - adding to the queue") + if log_message.get('message').startswith('overall HEALTH'): + m_type = 'heartbeat' + else: + m_type = log_message.get('channel') + + event_queue.put( + LogEntry( + source='log', + msg_type=m_type, + msg=log_message.get('message'), + level=log_message.get('priority')[1:-1], + tstamp=log_message.get('stamp') + ) + ) + + else: + self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message)) + + def notify(self, notify_type, notify_id): + """ + Called by the ceph-mgr service to notify the Python plugin + that new state is available. + + :param notify_type: string indicating what kind of notification, + such as osd_map, mon_map, fs_map, mon_status, + health, pg_summary, command, service_map + :param notify_id: string (may be empty) that optionally specifies + which entity is being notified about. With + "command" notifications this is set to the tag + ``from send_command``. + """ + + # only interested in cluster log (clog) messages for now + if notify_type == 'clog': + self.log.debug("received a clog entry from mgr.notify") + if isinstance(notify_id, dict): + # create a log object to process + self.process_clog(notify_id) + else: + self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_type))) + + def _show_events(self, events): + + max_msg_length = max([len(events[k].message) for k in events]) + fmt = "{:<20} {:<8} {:>5} {:<" + str(max_msg_length) + "} {}\n" + s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name") + + for event_name in sorted(events, + key = lambda name: events[name].last_timestamp, + reverse=True): + + event = events[event_name] + + s += fmt.format( + datetime.strftime(event.last_timestamp,"%Y/%m/%d %H:%M:%S"), + str(event.type), + str(event.count), + str(event.message), + str(event_name) + ) + s += "Total : {:>3}\n".format(len(events)) + return s + + def show_events(self, events): + """Show events we're holding from the ceph namespace - most recent 1st""" + + if len(events): + return 0, "", self._show_events(events) + else: + return 0, "", "No events emitted yet, local cache is empty" + + def show_status(self): + s = "Kubernetes\n" + s += "- Hostname : {}\n".format(self.k8s_config['server']) + s += "- Namespace : {}\n".format(self._namespace) + s += "Tracker Health\n" + for t in self.trackers: + s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health) + s += "Tracked Events\n" + s += "- namespace : {:>3}\n".format(len(self.ns_watcher.events)) + s += "- ceph events : {:>3}\n".format(len(self.event_processor.events)) + return 0, "", s + + def _valid_server(self, server): + # must be a valid server url format + server = server.strip() + + res = urlparse(server) + port = res.netloc.split(":")[-1] + + if res.scheme != 'https': + return False, "Server URL must use https" + + elif not res.hostname: + return False, "Invalid server URL format" + + elif res.hostname: + try: + socket.gethostbyname(res.hostname) + except socket.gaierror: + return False, "Unresolvable server URL" + + if not port.isdigit(): + return False, "Server URL must end in a port number" + + return True, "" + + def _valid_cacrt(self, cacrt_data): + """use mgr_util.verify_cacrt to validate the CA file""" + + cacrt_fname = create_temp_file("ca_file", cacrt_data) + + try: + verify_cacrt(cacrt_fname) + except ServerConfigException as e: + return False, "Invalid CA certificate: {}".format(str(e)) + else: + return True, "" + + def _valid_token(self, token_data): + """basic checks on the token""" + if not token_data: + return False, "Token file is empty" + + pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$") + if not pattern.match(token_data): + return False, "Token contains invalid characters" + + return True, "" + + def _valid_namespace(self, namespace): + # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols + + if len(namespace) > 253: + return False, "Name too long" + if namespace.isdigit(): + return False, "Invalid name - must be alphanumeric" + + pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$") + if not pattern.match(namespace): + return False, "Invalid characters in the name" + + return True, "" + + def _config_set(self, key, val): + """Attempt to validate the content, then save to KV store""" + + val = val.rstrip() # remove any trailing whitespace/newline + + try: + checker = getattr(self, "_valid_" + key) + except AttributeError: + # no checker available, just let it pass + self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key)) + valid = True + else: + valid, reason = checker(val) + + if valid: + self.set_store(key, val) + self.log.info("Updated config KV Store item: " + key) + return 0, "", "Config updated for parameter '{}'".format(key) + else: + return -22, "", "Invalid value for '{}' :{}".format(key, reason) + + def clear_config_settings(self): + for k in self.k8s_config: + self.set_store(k, None) + return 0,"","{} configuration keys removed".format(len(self.k8s_config.keys())) + + def handle_command(self, inbuf, cmd): + + access_options = ['cacrt', 'token'] + config_options = ['server', 'namespace'] + + if cmd['prefix'] == 'k8sevents clear-config': + return self.clear_config_settings() + + if cmd['prefix'] == 'k8sevents set-access': + if cmd['key'] not in access_options: + return -errno.EINVAL, "", "Unknown access option. Must be one of; {}".format(','.join(access_options)) + + if inbuf: + return self._config_set(cmd['key'], inbuf) + else: + return -errno.EINVAL, "", "Command must specify -i <filename>" + + if cmd['prefix'] == 'k8sevents set-config': + + if cmd['key'] not in config_options: + return -errno.EINVAL, "", "Unknown config option. Must be one of; {}".format(','.join(config_options)) + + return self._config_set(cmd['key'], cmd['value']) + + # At this point the command is trying to interact with k8sevents, so intercept if the configuration is + # not ready + if self.error_msg: + _msg = "k8sevents unavailable: " + self.error_msg + ready, _ = self.k8s_ready() + if not self.kubernetes_control and not ready: + _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect" + return -errno.ENODATA, "", _msg + + if cmd["prefix"] == "k8sevents status": + return self.show_status() + + elif cmd["prefix"] == "k8sevents ls": + return self.show_events(self.ns_watcher.events) + + elif cmd["prefix"] == "k8sevents ceph": + return self.show_events(self.event_processor.events) + + else: + raise NotImplementedError(cmd["prefix"]) + + @staticmethod + def can_run(): + """Determine whether the pre-reqs for the module are in place""" + + if not kubernetes_imported: + return False, "kubernetes python client is not available" + return True, "" + + def load_kubernetes_config(self): + """Load configuration for remote kubernetes API using KV store values + + Attempt to create an API client configuration from settings stored in + KV store. + + Returns: + client.ApiClient: kubernetes API client object + + Raises: + OSError: unable to create the cacrt file + """ + + # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a + # temporary file containing the cert for the client to load from + try: + ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt']) + except OSError as e: + self.log.error("Unable to create file to hold cacrt: {}".format(str(e))) + raise OSError(str(e)) + else: + self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file)) + + configuration = client.Configuration() + configuration.host = self.k8s_config['server'] + configuration.ssl_ca_cert = ca_crt_file + configuration.api_key = { "authorization": "Bearer " + self.k8s_config['token'] } + api_client = client.ApiClient(configuration) + self.log.info("API client created for remote kubernetes access using cacrt and token from KV store") + + return api_client + + def serve(self): + # apply options set by CLI to this module + self.config_notify() + + if not kubernetes_imported: + self.error_msg = "Unable to start : python kubernetes package is missing" + else: + if self.kubernetes_control: + # running under rook-ceph + config.load_incluster_config() + self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'), + os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN')) + self._api_client_config = None + self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph") + else: + # running outside of rook-ceph, so we need additional settings to tell us + # how to connect to the kubernetes cluster + ready, errors = self.k8s_ready() + if not ready: + self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors)) + else: + try: + self._api_client_config = self.load_kubernetes_config() + except OSError as e: + self.error_msg = str(e) + else: + self._namespace = self.k8s_config['namespace'] + self.log.info("k8sevents configuration loaded from KV store") + + if self.error_msg: + self.log.error(self.error_msg) + return + + # All checks have passed + self.config_watcher = CephConfigWatcher(self) + + self.event_processor = EventProcessor(self.config_watcher, + self.ceph_event_retention_days, + self._api_client_config, + self._namespace) + + self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config, + namespace=self._namespace) + + if self.event_processor.ok: + log.info("Ceph Log processor thread starting") + self.event_processor.start() # start log consumer thread + log.info("Ceph config watcher thread starting") + self.config_watcher.start() + log.info("Rook-ceph namespace events watcher starting") + self.ns_watcher.start() + + self.trackers.extend([self.event_processor, self.config_watcher, self.ns_watcher]) + + while True: + # stay alive + time.sleep(1) + + trackers = self.trackers + for t in trackers: + if not t.is_alive() and not t.reported: + log.error("K8sevents tracker thread '{}' stopped: {}".format(t.__class__.__name__, t.health)) + t.reported = True + + else: + self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?" + log.warning(self.error_msg) + log.warning("k8sevents module exiting") + self.run = False + + def shutdown(self): + self.run = False + log.info("Shutting down k8sevents module") + self.event_processor.can_run = False + + if self._rados: + self._rados.shutdown() |