diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/stats | |
parent | Initial commit. (diff) | |
download | ceph-upstream/16.2.11+ds.tar.xz ceph-upstream/16.2.11+ds.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/pybind/mgr/stats/__init__.py | 1 | ||||
-rw-r--r-- | src/pybind/mgr/stats/fs/__init__.py | 0 | ||||
-rw-r--r-- | src/pybind/mgr/stats/fs/perf_stats.py | 567 | ||||
-rw-r--r-- | src/pybind/mgr/stats/module.py | 41 |
4 files changed, 609 insertions, 0 deletions
diff --git a/src/pybind/mgr/stats/__init__.py b/src/pybind/mgr/stats/__init__.py new file mode 100644 index 000000000..8f210ac92 --- /dev/null +++ b/src/pybind/mgr/stats/__init__.py @@ -0,0 +1 @@ +from .module import Module diff --git a/src/pybind/mgr/stats/fs/__init__.py b/src/pybind/mgr/stats/fs/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/pybind/mgr/stats/fs/__init__.py diff --git a/src/pybind/mgr/stats/fs/perf_stats.py b/src/pybind/mgr/stats/fs/perf_stats.py new file mode 100644 index 000000000..596b3bc08 --- /dev/null +++ b/src/pybind/mgr/stats/fs/perf_stats.py @@ -0,0 +1,567 @@ +import re +import json +import time +import uuid +import errno +import traceback +import logging +from collections import OrderedDict +from typing import List, Dict, Set + +from mgr_module import CommandResult + +from datetime import datetime, timedelta +from threading import Lock, Condition, Thread, Timer +from ipaddress import ip_address + +PERF_STATS_VERSION = 2 + +QUERY_IDS = "query_ids" +GLOBAL_QUERY_ID = "global_query_id" +QUERY_LAST_REQUEST = "last_time_stamp" +QUERY_RAW_COUNTERS = "query_raw_counters" +QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global" + +MDS_RANK_ALL = (-1,) +CLIENT_ID_ALL = "\d*" +CLIENT_IP_ALL = ".*" + +fs_list = [] # type: List[str] + +MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = '^(.*)$' +MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = '^(client.{0}\s+{1}):.*' +MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0, + 'read_latency': 1, + 'write_latency': 2, + 'metadata_latency': 3, + 'dentry_lease': 4, + 'opened_files': 5, + 'pinned_icaps': 6, + 'opened_inodes': 7, + 'read_io_sizes': 8, + 'write_io_sizes': 9, + 'avg_read_latency': 10, + 'stdev_read_latency': 11, + 'avg_write_latency': 12, + 'stdev_write_latency': 13, + 'avg_metadata_latency': 14, + 'stdev_metadata_latency': 15}) +MDS_PERF_QUERY_COUNTERS = [] # type: List[str] +MDS_GLOBAL_PERF_QUERY_COUNTERS = list(MDS_PERF_QUERY_COUNTERS_MAP.keys()) + +QUERY_EXPIRE_INTERVAL = timedelta(minutes=1) +REREGISTER_TIMER_INTERVAL = 1 + +CLIENT_METADATA_KEY = "client_metadata" +CLIENT_METADATA_SUBKEYS = ["hostname", "root"] +CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"] + +NON_EXISTENT_KEY_STR = "N/A" + +logger = logging.getLogger(__name__) + +class FilterSpec(object): + """ + query filters encapsulated and used as key for query map + """ + def __init__(self, mds_ranks, client_id, client_ip): + self.mds_ranks = mds_ranks + self.client_id = client_id + self.client_ip = client_ip + + def __hash__(self): + return hash((self.mds_ranks, self.client_id, self.client_ip)) + + def __eq__(self, other): + return (self.mds_ranks, self.client_id, self.client_ip) == (other.mds_ranks, other.client_id, self.client_ip) + + def __ne__(self, other): + return not(self == other) + +def extract_mds_ranks_from_spec(mds_rank_spec): + if not mds_rank_spec: + return MDS_RANK_ALL + match = re.match(r'^\d+(,\d+)*$', mds_rank_spec) + if not match: + raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec)) + return tuple(int(mds_rank) for mds_rank in match.group(0).split(',')) + +def extract_client_id_from_spec(client_id_spec): + if not client_id_spec: + return CLIENT_ID_ALL + # the client id is the spec itself since it'll be a part + # of client filter regex. + if not client_id_spec.isdigit(): + raise ValueError('invalid client_id filter spec: {}'.format(client_id_spec)) + return client_id_spec + +def extract_client_ip_from_spec(client_ip_spec): + if not client_ip_spec: + return CLIENT_IP_ALL + + client_ip = client_ip_spec + if client_ip.startswith('v1:'): + client_ip = client_ip.replace('v1:', '') + elif client_ip.startswith('v2:'): + client_ip = client_ip.replace('v2:', '') + + try: + ip_address(client_ip) + return client_ip_spec + except ValueError: + raise ValueError('invalid client_ip filter spec: {}'.format(client_ip_spec)) + +def extract_mds_ranks_from_report(mds_ranks_str): + if not mds_ranks_str: + return [] + return [int(x) for x in mds_ranks_str.split(',')] + +def extract_client_id_and_ip(client): + match = re.match(r'^(client\.\d+)\s(.*)', client) + if match: + return match.group(1), match.group(2) + return None, None + +class FSPerfStats(object): + lock = Lock() + q_cv = Condition(lock) + r_cv = Condition(lock) + + user_queries = {} # type: Dict[str, Dict] + + meta_lock = Lock() + rqtimer = None + client_metadata = { + 'metadata' : {}, + 'to_purge' : set(), + 'in_progress' : {}, + } # type: Dict + + def __init__(self, module): + self.module = module + self.log = module.log + self.prev_rank0_gid = None + # report processor thread + self.report_processor = Thread(target=self.run) + self.report_processor.start() + + def set_client_metadata(self, fs_name, client_id, key, meta): + result = (self.client_metadata['metadata'].setdefault( + fs_name, {})).setdefault(client_id, {}) + if not key in result or not result[key] == meta: + result[key] = meta + + def notify_cmd(self, cmdtag): + self.log.debug("cmdtag={0}".format(cmdtag)) + with self.meta_lock: + try: + result = self.client_metadata['in_progress'].pop(cmdtag) + except KeyError: + self.log.warn(f"cmdtag {cmdtag} not found in client metadata") + return + fs_name = result[0] + client_meta = result[2].wait() + if client_meta[0] != 0: + self.log.warn("failed to fetch client metadata from gid {0}, err={1}".format( + result[1], client_meta[2])) + return + self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1]))) + for metadata in json.loads(client_meta[1]): + client_id = "client.{0}".format(metadata['id']) + result = (self.client_metadata['metadata'].setdefault(fs_name, {})).setdefault(client_id, {}) + for subkey in CLIENT_METADATA_SUBKEYS: + self.set_client_metadata(fs_name, client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey]) + for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL: + self.set_client_metadata(fs_name, client_id, subkey, + metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR)) + metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16) + supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)] + self.set_client_metadata(fs_name, client_id, "valid_metrics", supported_metrics) + kver = metadata[CLIENT_METADATA_KEY].get("kernel_version", None) + if kver: + self.set_client_metadata(fs_name, client_id, "kernel_version", kver) + # when all async requests are done, purge clients metadata if any. + if not self.client_metadata['in_progress']: + global fs_list + for fs_name in fs_list: + for client in self.client_metadata['to_purge']: + try: + if client in self.client_metadata['metadata'][fs_name]: + self.log.info("purge client metadata for {0}".format(client)) + self.client_metadata['metadata'][fs_name].pop(client) + except: + pass + if fs_name in self.client_metadata['metadata'] and not bool(self.client_metadata['metadata'][fs_name]): + self.client_metadata['metadata'].pop(fs_name) + self.client_metadata['to_purge'].clear() + self.log.debug("client_metadata={0}, to_purge={1}".format( + self.client_metadata['metadata'], self.client_metadata['to_purge'])) + + def notify_fsmap(self): + #Reregister the user queries when there is a new rank0 mds + with self.lock: + gid_state = FSPerfStats.get_rank0_mds_gid_state(self.module.get('fs_map')) + if not gid_state: + return + for value in gid_state: + rank0_gid, state = value + if (rank0_gid and rank0_gid != self.prev_rank0_gid and state == 'up:active'): + #the new rank0 MDS is up:active + ua_last_updated = time.monotonic() + if (self.rqtimer and self.rqtimer.is_alive()): + self.rqtimer.cancel() + self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL, + self.re_register_queries, + args=(rank0_gid, ua_last_updated,)) + self.rqtimer.start() + + def re_register_queries(self, rank0_gid, ua_last_updated): + #reregister queries if the metrics are the latest. Otherwise reschedule the timer and + #wait for the empty metrics + with self.lock: + if self.mx_last_updated >= ua_last_updated: + self.log.debug("reregistering queries...") + self.module.reregister_mds_perf_queries() + self.prev_rank0_gid = rank0_gid + else: + #reschedule the timer + self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL, + self.re_register_queries, args=(rank0_gid, ua_last_updated,)) + self.rqtimer.start() + + @staticmethod + def get_rank0_mds_gid_state(fsmap): + gid_state = [] + for fs in fsmap['filesystems']: + mds_map = fs['mdsmap'] + if mds_map is not None: + for mds_id, mds_status in mds_map['info'].items(): + if mds_status['rank'] == 0: + gid_state.append([mds_status['gid'], mds_status['state']]) + if gid_state: + return gid_state + logger.warn("No rank0 mds in the fsmap") + + def update_client_meta(self): + new_updates = {} + pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()] + global fs_list + fs_list.clear() + with self.meta_lock: + fsmap = self.module.get('fs_map') + for fs in fsmap['filesystems']: + mds_map = fs['mdsmap'] + if mds_map is not None: + fsname = mds_map['fs_name'] + for mds_id, mds_status in mds_map['info'].items(): + if mds_status['rank'] == 0: + fs_list.append(fsname) + rank0_gid = mds_status['gid'] + tag = str(uuid.uuid4()) + result = CommandResult(tag) + new_updates[tag] = (fsname, rank0_gid, result) + self.client_metadata['in_progress'].update(new_updates) + + self.log.debug(f"updating client metadata from {new_updates}") + + cmd_dict = {'prefix': 'client ls'} + for tag,val in new_updates.items(): + self.module.send_command(val[2], "mds", str(val[1]), json.dumps(cmd_dict), tag) + + def run(self): + try: + self.log.info("FSPerfStats::report_processor starting...") + while True: + with self.lock: + self.scrub_expired_queries() + self.process_mds_reports() + self.r_cv.notify() + + stats_period = int(self.module.get_ceph_option("mgr_stats_period")) + self.q_cv.wait(stats_period) + self.log.debug("FSPerfStats::tick") + except Exception as e: + self.log.fatal("fatal error: {}".format(traceback.format_exc())) + + def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients): + # this is pretty straight forward -- find what MDSs are missing from + # what is tracked vs what we received in incoming report and purge + # the whole bunch. + tracked_ranks = raw_perf_counters.keys() + available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics] + for rank in set(tracked_ranks) - set(available_ranks): + culled = raw_perf_counters.pop(rank) + self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format( + len(culled[1]), rank, "yes" if culled[0] else "no")) + missing_clients.update(list(culled[1].keys())) + + def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients): + # this is a bit more involed -- for each rank figure out what clients + # are missing in incoming report and purge them from our tracked map. + # but, if this is invoked after cull_mds_entries(), the rank set + # is same, so we can loop based on that assumption. + ranks = raw_perf_counters.keys() + for rank in ranks: + tracked_clients = raw_perf_counters[rank][1].keys() + available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics] + for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]): + raw_perf_counters[rank][1].pop(client) + self.log.info("culled {0} from rank {1}".format(client, rank)) + missing_clients.add(client) + + def cull_missing_entries(self, raw_perf_counters, incoming_metrics): + missing_clients = set() # type: Set[str] + self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients) + self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients) + + self.log.debug("missing_clients={0}".format(missing_clients)) + with self.meta_lock: + if self.client_metadata['in_progress']: + self.client_metadata['to_purge'].update(missing_clients) + self.log.info("deferring client metadata purge (now {0} client(s))".format( + len(self.client_metadata['to_purge']))) + else: + global fs_list + for fs_name in fs_list: + for client in missing_clients: + try: + self.log.info("purge client metadata for {0}".format(client)) + if client in self.client_metadata['metadata'][fs_name]: + self.client_metadata['metadata'][fs_name].pop(client) + except KeyError: + pass + self.log.debug("client_metadata={0}".format(self.client_metadata['metadata'])) + + def cull_global_metrics(self, raw_perf_counters, incoming_metrics): + tracked_clients = raw_perf_counters.keys() + available_clients = [counter['k'][0][0] for counter in incoming_metrics] + for client in set(tracked_clients) - set(available_clients): + raw_perf_counters.pop(client) + + def get_raw_perf_counters(self, query): + raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {}) + + for query_id in query[QUERY_IDS]: + result = self.module.get_mds_perf_counters(query_id) + self.log.debug("raw_perf_counters={}".format(raw_perf_counters)) + self.log.debug("get_raw_perf_counters={}".format(result)) + + # extract passed in delayed ranks. metrics for delayed ranks are tagged + # as stale. + delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0]) + + # what's received from MDS + incoming_metrics = result['metrics'][1] + + # metrics updated (monotonic) time + self.mx_last_updated = result['metrics'][2][0] + + # cull missing MDSs and clients + self.cull_missing_entries(raw_perf_counters, incoming_metrics) + + # iterate over metrics list and update our copy (note that we have + # already culled the differences). + global fs_list + for fs_name in fs_list: + for counter in incoming_metrics: + mds_rank = int(counter['k'][0][0]) + client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0]) + if self.client_metadata['metadata'].get(fs_name): + if (client_id is not None or not client_ip) and\ + self.client_metadata["metadata"][fs_name].get(client_id): # client_id _could_ be 0 + with self.meta_lock: + self.set_client_metadata(fs_name, client_id, "IP", client_ip) + else: + self.log.warn(f"client metadata for client_id={client_id} might be unavailable") + else: + self.log.warn(f"client metadata for filesystem={fs_name} might be unavailable") + + raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}]) + raw_counters[0] = True if mds_rank in delayed_ranks else False + raw_client_counters = raw_counters[1].setdefault(client_id, []) + + del raw_client_counters[:] + raw_client_counters.extend(counter['c']) + # send an asynchronous client metadata refresh + self.update_client_meta() + + def get_raw_perf_counters_global(self, query): + raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {}) + result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID]) + + self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters)) + self.log.debug("get_raw_perf_counters_global={}".format(result)) + + global_metrics = result['metrics'][1] + self.cull_global_metrics(raw_perf_counters, global_metrics) + for counter in global_metrics: + client_id, _ = extract_client_id_and_ip(counter['k'][0][0]) + raw_client_counters = raw_perf_counters.setdefault(client_id, []) + del raw_client_counters[:] + raw_client_counters.extend(counter['c']) + + def process_mds_reports(self): + for query in self.user_queries.values(): + self.get_raw_perf_counters(query) + self.get_raw_perf_counters_global(query) + + def scrub_expired_queries(self): + expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL + for filter_spec in list(self.user_queries.keys()): + user_query = self.user_queries[filter_spec] + self.log.debug("scrubbing query={}".format(user_query)) + if user_query[QUERY_LAST_REQUEST] < expire_time: + expired_query_ids = user_query[QUERY_IDS].copy() + expired_query_ids.append(user_query[GLOBAL_QUERY_ID]) + self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids)) + self.unregister_mds_perf_queries(filter_spec, expired_query_ids) + del self.user_queries[filter_spec] + + def prepare_mds_perf_query(self, rank, client_id, client_ip): + mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS + if not rank == -1: + mds_rank_regex = '^({})$'.format(rank) + client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip) + return { + 'key_descriptor' : [ + {'type' : 'mds_rank', 'regex' : mds_rank_regex}, + {'type' : 'client_id', 'regex' : client_regex}, + ], + 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS, + } + + def prepare_global_perf_query(self, client_id, client_ip): + client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip) + return { + 'key_descriptor' : [ + {'type' : 'client_id', 'regex' : client_regex}, + ], + 'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS, + } + + def unregister_mds_perf_queries(self, filter_spec, query_ids): + self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format( + filter_spec, query_ids)) + for query_id in query_ids: + self.module.remove_mds_perf_query(query_id) + + def register_mds_perf_query(self, filter_spec): + mds_ranks = filter_spec.mds_ranks + client_id = filter_spec.client_id + client_ip = filter_spec.client_ip + + query_ids = [] + try: + # register per-mds perf query + for rank in mds_ranks: + query = self.prepare_mds_perf_query(rank, client_id, client_ip) + self.log.info("register_mds_perf_query: {}".format(query)) + + query_id = self.module.add_mds_perf_query(query) + if query_id is None: # query id can be 0 + raise RuntimeError("failed to add MDS perf query: {}".format(query)) + query_ids.append(query_id) + except Exception: + for query_id in query_ids: + self.module.remove_mds_perf_query(query_id) + raise + return query_ids + + def register_global_perf_query(self, filter_spec): + client_id = filter_spec.client_id + client_ip = filter_spec.client_ip + + # register a global perf query for metrics + query = self.prepare_global_perf_query(client_id, client_ip) + self.log.info("register_global_perf_query: {}".format(query)) + + query_id = self.module.add_mds_perf_query(query) + if query_id is None: # query id can be 0 + raise RuntimeError("failed to add global perf query: {}".format(query)) + return query_id + + def register_query(self, filter_spec): + user_query = self.user_queries.get(filter_spec, None) + if not user_query: + user_query = { + QUERY_IDS : self.register_mds_perf_query(filter_spec), + GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec), + QUERY_LAST_REQUEST : datetime.now(), + } + self.user_queries[filter_spec] = user_query + + self.q_cv.notify() + self.r_cv.wait(5) + else: + user_query[QUERY_LAST_REQUEST] = datetime.now() + return user_query + + def generate_report(self, user_query): + result = {} # type: Dict + global fs_list + # start with counter info -- metrics that are global and per mds + result["version"] = PERF_STATS_VERSION + result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS + result["counters"] = MDS_PERF_QUERY_COUNTERS + + # fill in client metadata + raw_perfs_global = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {}) + raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {}) + with self.meta_lock: + raw_counters_clients = [] + for val in raw_perfs.values(): + raw_counters_clients.extend(list(val[1])) + result_meta = result.setdefault("client_metadata", {}) + for fs_name in fs_list: + meta = self.client_metadata["metadata"] + if fs_name in meta and len(meta[fs_name]): + for client_id in raw_perfs_global.keys(): + if client_id in meta[fs_name] and client_id in raw_counters_clients: + client_meta = (result_meta.setdefault(fs_name, {})).setdefault(client_id, {}) + client_meta.update(meta[fs_name][client_id]) + + # start populating global perf metrics w/ client metadata + metrics = result.setdefault("global_metrics", {}) + for fs_name in fs_list: + if fs_name in meta and len(meta[fs_name]): + for client_id, counters in raw_perfs_global.items(): + if client_id in meta[fs_name] and client_id in raw_counters_clients: + global_client_metrics = (metrics.setdefault(fs_name, {})).setdefault(client_id, []) + del global_client_metrics[:] + global_client_metrics.extend(counters) + + # and, now per-mds metrics keyed by mds rank along with delayed ranks + metrics = result.setdefault("metrics", {}) + + metrics["delayed_ranks"] = [rank for rank, counters in raw_perfs.items() if counters[0]] + for rank, counters in raw_perfs.items(): + mds_key = "mds.{}".format(rank) + mds_metrics = metrics.setdefault(mds_key, {}) + mds_metrics.update(counters[1]) + return result + + def extract_query_filters(self, cmd): + mds_rank_spec = cmd.get('mds_rank', None) + client_id_spec = cmd.get('client_id', None) + client_ip_spec = cmd.get('client_ip', None) + + self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format( + mds_rank_spec, client_id_spec, client_ip_spec)) + + mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec) + client_id = extract_client_id_from_spec(client_id_spec) + client_ip = extract_client_ip_from_spec(client_ip_spec) + + return FilterSpec(mds_ranks, client_id, client_ip) + + def get_perf_data(self, cmd): + try: + filter_spec = self.extract_query_filters(cmd) + except ValueError as e: + return -errno.EINVAL, "", str(e) + + counters = {} + with self.lock: + user_query = self.register_query(filter_spec) + result = self.generate_report(user_query) + return 0, json.dumps(result), "" diff --git a/src/pybind/mgr/stats/module.py b/src/pybind/mgr/stats/module.py new file mode 100644 index 000000000..fcc1bce97 --- /dev/null +++ b/src/pybind/mgr/stats/module.py @@ -0,0 +1,41 @@ +""" +performance stats for ceph filesystem (for now...) +""" + +import json +from typing import List, Dict + +from mgr_module import MgrModule, Option, NotifyType + +from .fs.perf_stats import FSPerfStats + +class Module(MgrModule): + COMMANDS = [ + { + "cmd": "fs perf stats " + "name=mds_rank,type=CephString,req=false " + "name=client_id,type=CephString,req=false " + "name=client_ip,type=CephString,req=false ", + "desc": "retrieve ceph fs performance stats", + "perm": "r" + }, + ] + MODULE_OPTIONS: List[Option] = [] + NOTIFY_TYPES = [NotifyType.command, NotifyType.fs_map] + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + self.fs_perf_stats = FSPerfStats(self) + + def notify(self, notify_type: NotifyType, notify_id): + if notify_type == NotifyType.command: + self.fs_perf_stats.notify_cmd(notify_id) + elif notify_type == NotifyType.fs_map: + self.fs_perf_stats.notify_fsmap() + + def handle_command(self, inbuf, cmd): + prefix = cmd['prefix'] + # only supported command is `fs perf stats` right now + if prefix.startswith('fs perf stats'): + return self.fs_perf_stats.get_perf_data(cmd) + raise NotImplementedError(cmd['prefix']) |