summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/stats
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/pybind/mgr/stats
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/pybind/mgr/stats/__init__.py1
-rw-r--r--src/pybind/mgr/stats/fs/__init__.py0
-rw-r--r--src/pybind/mgr/stats/fs/perf_stats.py567
-rw-r--r--src/pybind/mgr/stats/module.py41
4 files changed, 609 insertions, 0 deletions
diff --git a/src/pybind/mgr/stats/__init__.py b/src/pybind/mgr/stats/__init__.py
new file mode 100644
index 000000000..8f210ac92
--- /dev/null
+++ b/src/pybind/mgr/stats/__init__.py
@@ -0,0 +1 @@
+from .module import Module
diff --git a/src/pybind/mgr/stats/fs/__init__.py b/src/pybind/mgr/stats/fs/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/pybind/mgr/stats/fs/__init__.py
diff --git a/src/pybind/mgr/stats/fs/perf_stats.py b/src/pybind/mgr/stats/fs/perf_stats.py
new file mode 100644
index 000000000..596b3bc08
--- /dev/null
+++ b/src/pybind/mgr/stats/fs/perf_stats.py
@@ -0,0 +1,567 @@
+import re
+import json
+import time
+import uuid
+import errno
+import traceback
+import logging
+from collections import OrderedDict
+from typing import List, Dict, Set
+
+from mgr_module import CommandResult
+
+from datetime import datetime, timedelta
+from threading import Lock, Condition, Thread, Timer
+from ipaddress import ip_address
+
+PERF_STATS_VERSION = 2
+
+QUERY_IDS = "query_ids"
+GLOBAL_QUERY_ID = "global_query_id"
+QUERY_LAST_REQUEST = "last_time_stamp"
+QUERY_RAW_COUNTERS = "query_raw_counters"
+QUERY_RAW_COUNTERS_GLOBAL = "query_raw_counters_global"
+
+MDS_RANK_ALL = (-1,)
+CLIENT_ID_ALL = "\d*"
+CLIENT_IP_ALL = ".*"
+
+fs_list = [] # type: List[str]
+
+MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS = '^(.*)$'
+MDS_PERF_QUERY_REGEX_MATCH_CLIENTS = '^(client.{0}\s+{1}):.*'
+MDS_PERF_QUERY_COUNTERS_MAP = OrderedDict({'cap_hit': 0,
+ 'read_latency': 1,
+ 'write_latency': 2,
+ 'metadata_latency': 3,
+ 'dentry_lease': 4,
+ 'opened_files': 5,
+ 'pinned_icaps': 6,
+ 'opened_inodes': 7,
+ 'read_io_sizes': 8,
+ 'write_io_sizes': 9,
+ 'avg_read_latency': 10,
+ 'stdev_read_latency': 11,
+ 'avg_write_latency': 12,
+ 'stdev_write_latency': 13,
+ 'avg_metadata_latency': 14,
+ 'stdev_metadata_latency': 15})
+MDS_PERF_QUERY_COUNTERS = [] # type: List[str]
+MDS_GLOBAL_PERF_QUERY_COUNTERS = list(MDS_PERF_QUERY_COUNTERS_MAP.keys())
+
+QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
+REREGISTER_TIMER_INTERVAL = 1
+
+CLIENT_METADATA_KEY = "client_metadata"
+CLIENT_METADATA_SUBKEYS = ["hostname", "root"]
+CLIENT_METADATA_SUBKEYS_OPTIONAL = ["mount_point"]
+
+NON_EXISTENT_KEY_STR = "N/A"
+
+logger = logging.getLogger(__name__)
+
+class FilterSpec(object):
+ """
+ query filters encapsulated and used as key for query map
+ """
+ def __init__(self, mds_ranks, client_id, client_ip):
+ self.mds_ranks = mds_ranks
+ self.client_id = client_id
+ self.client_ip = client_ip
+
+ def __hash__(self):
+ return hash((self.mds_ranks, self.client_id, self.client_ip))
+
+ def __eq__(self, other):
+ return (self.mds_ranks, self.client_id, self.client_ip) == (other.mds_ranks, other.client_id, self.client_ip)
+
+ def __ne__(self, other):
+ return not(self == other)
+
+def extract_mds_ranks_from_spec(mds_rank_spec):
+ if not mds_rank_spec:
+ return MDS_RANK_ALL
+ match = re.match(r'^\d+(,\d+)*$', mds_rank_spec)
+ if not match:
+ raise ValueError("invalid mds filter spec: {}".format(mds_rank_spec))
+ return tuple(int(mds_rank) for mds_rank in match.group(0).split(','))
+
+def extract_client_id_from_spec(client_id_spec):
+ if not client_id_spec:
+ return CLIENT_ID_ALL
+ # the client id is the spec itself since it'll be a part
+ # of client filter regex.
+ if not client_id_spec.isdigit():
+ raise ValueError('invalid client_id filter spec: {}'.format(client_id_spec))
+ return client_id_spec
+
+def extract_client_ip_from_spec(client_ip_spec):
+ if not client_ip_spec:
+ return CLIENT_IP_ALL
+
+ client_ip = client_ip_spec
+ if client_ip.startswith('v1:'):
+ client_ip = client_ip.replace('v1:', '')
+ elif client_ip.startswith('v2:'):
+ client_ip = client_ip.replace('v2:', '')
+
+ try:
+ ip_address(client_ip)
+ return client_ip_spec
+ except ValueError:
+ raise ValueError('invalid client_ip filter spec: {}'.format(client_ip_spec))
+
+def extract_mds_ranks_from_report(mds_ranks_str):
+ if not mds_ranks_str:
+ return []
+ return [int(x) for x in mds_ranks_str.split(',')]
+
+def extract_client_id_and_ip(client):
+ match = re.match(r'^(client\.\d+)\s(.*)', client)
+ if match:
+ return match.group(1), match.group(2)
+ return None, None
+
+class FSPerfStats(object):
+ lock = Lock()
+ q_cv = Condition(lock)
+ r_cv = Condition(lock)
+
+ user_queries = {} # type: Dict[str, Dict]
+
+ meta_lock = Lock()
+ rqtimer = None
+ client_metadata = {
+ 'metadata' : {},
+ 'to_purge' : set(),
+ 'in_progress' : {},
+ } # type: Dict
+
+ def __init__(self, module):
+ self.module = module
+ self.log = module.log
+ self.prev_rank0_gid = None
+ # report processor thread
+ self.report_processor = Thread(target=self.run)
+ self.report_processor.start()
+
+ def set_client_metadata(self, fs_name, client_id, key, meta):
+ result = (self.client_metadata['metadata'].setdefault(
+ fs_name, {})).setdefault(client_id, {})
+ if not key in result or not result[key] == meta:
+ result[key] = meta
+
+ def notify_cmd(self, cmdtag):
+ self.log.debug("cmdtag={0}".format(cmdtag))
+ with self.meta_lock:
+ try:
+ result = self.client_metadata['in_progress'].pop(cmdtag)
+ except KeyError:
+ self.log.warn(f"cmdtag {cmdtag} not found in client metadata")
+ return
+ fs_name = result[0]
+ client_meta = result[2].wait()
+ if client_meta[0] != 0:
+ self.log.warn("failed to fetch client metadata from gid {0}, err={1}".format(
+ result[1], client_meta[2]))
+ return
+ self.log.debug("notify: client metadata={0}".format(json.loads(client_meta[1])))
+ for metadata in json.loads(client_meta[1]):
+ client_id = "client.{0}".format(metadata['id'])
+ result = (self.client_metadata['metadata'].setdefault(fs_name, {})).setdefault(client_id, {})
+ for subkey in CLIENT_METADATA_SUBKEYS:
+ self.set_client_metadata(fs_name, client_id, subkey, metadata[CLIENT_METADATA_KEY][subkey])
+ for subkey in CLIENT_METADATA_SUBKEYS_OPTIONAL:
+ self.set_client_metadata(fs_name, client_id, subkey,
+ metadata[CLIENT_METADATA_KEY].get(subkey, NON_EXISTENT_KEY_STR))
+ metric_features = int(metadata[CLIENT_METADATA_KEY]["metric_spec"]["metric_flags"]["feature_bits"], 16)
+ supported_metrics = [metric for metric, bit in MDS_PERF_QUERY_COUNTERS_MAP.items() if metric_features & (1 << bit)]
+ self.set_client_metadata(fs_name, client_id, "valid_metrics", supported_metrics)
+ kver = metadata[CLIENT_METADATA_KEY].get("kernel_version", None)
+ if kver:
+ self.set_client_metadata(fs_name, client_id, "kernel_version", kver)
+ # when all async requests are done, purge clients metadata if any.
+ if not self.client_metadata['in_progress']:
+ global fs_list
+ for fs_name in fs_list:
+ for client in self.client_metadata['to_purge']:
+ try:
+ if client in self.client_metadata['metadata'][fs_name]:
+ self.log.info("purge client metadata for {0}".format(client))
+ self.client_metadata['metadata'][fs_name].pop(client)
+ except:
+ pass
+ if fs_name in self.client_metadata['metadata'] and not bool(self.client_metadata['metadata'][fs_name]):
+ self.client_metadata['metadata'].pop(fs_name)
+ self.client_metadata['to_purge'].clear()
+ self.log.debug("client_metadata={0}, to_purge={1}".format(
+ self.client_metadata['metadata'], self.client_metadata['to_purge']))
+
+ def notify_fsmap(self):
+ #Reregister the user queries when there is a new rank0 mds
+ with self.lock:
+ gid_state = FSPerfStats.get_rank0_mds_gid_state(self.module.get('fs_map'))
+ if not gid_state:
+ return
+ for value in gid_state:
+ rank0_gid, state = value
+ if (rank0_gid and rank0_gid != self.prev_rank0_gid and state == 'up:active'):
+ #the new rank0 MDS is up:active
+ ua_last_updated = time.monotonic()
+ if (self.rqtimer and self.rqtimer.is_alive()):
+ self.rqtimer.cancel()
+ self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+ self.re_register_queries,
+ args=(rank0_gid, ua_last_updated,))
+ self.rqtimer.start()
+
+ def re_register_queries(self, rank0_gid, ua_last_updated):
+ #reregister queries if the metrics are the latest. Otherwise reschedule the timer and
+ #wait for the empty metrics
+ with self.lock:
+ if self.mx_last_updated >= ua_last_updated:
+ self.log.debug("reregistering queries...")
+ self.module.reregister_mds_perf_queries()
+ self.prev_rank0_gid = rank0_gid
+ else:
+ #reschedule the timer
+ self.rqtimer = Timer(REREGISTER_TIMER_INTERVAL,
+ self.re_register_queries, args=(rank0_gid, ua_last_updated,))
+ self.rqtimer.start()
+
+ @staticmethod
+ def get_rank0_mds_gid_state(fsmap):
+ gid_state = []
+ for fs in fsmap['filesystems']:
+ mds_map = fs['mdsmap']
+ if mds_map is not None:
+ for mds_id, mds_status in mds_map['info'].items():
+ if mds_status['rank'] == 0:
+ gid_state.append([mds_status['gid'], mds_status['state']])
+ if gid_state:
+ return gid_state
+ logger.warn("No rank0 mds in the fsmap")
+
+ def update_client_meta(self):
+ new_updates = {}
+ pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
+ global fs_list
+ fs_list.clear()
+ with self.meta_lock:
+ fsmap = self.module.get('fs_map')
+ for fs in fsmap['filesystems']:
+ mds_map = fs['mdsmap']
+ if mds_map is not None:
+ fsname = mds_map['fs_name']
+ for mds_id, mds_status in mds_map['info'].items():
+ if mds_status['rank'] == 0:
+ fs_list.append(fsname)
+ rank0_gid = mds_status['gid']
+ tag = str(uuid.uuid4())
+ result = CommandResult(tag)
+ new_updates[tag] = (fsname, rank0_gid, result)
+ self.client_metadata['in_progress'].update(new_updates)
+
+ self.log.debug(f"updating client metadata from {new_updates}")
+
+ cmd_dict = {'prefix': 'client ls'}
+ for tag,val in new_updates.items():
+ self.module.send_command(val[2], "mds", str(val[1]), json.dumps(cmd_dict), tag)
+
+ def run(self):
+ try:
+ self.log.info("FSPerfStats::report_processor starting...")
+ while True:
+ with self.lock:
+ self.scrub_expired_queries()
+ self.process_mds_reports()
+ self.r_cv.notify()
+
+ stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
+ self.q_cv.wait(stats_period)
+ self.log.debug("FSPerfStats::tick")
+ except Exception as e:
+ self.log.fatal("fatal error: {}".format(traceback.format_exc()))
+
+ def cull_mds_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
+ # this is pretty straight forward -- find what MDSs are missing from
+ # what is tracked vs what we received in incoming report and purge
+ # the whole bunch.
+ tracked_ranks = raw_perf_counters.keys()
+ available_ranks = [int(counter['k'][0][0]) for counter in incoming_metrics]
+ for rank in set(tracked_ranks) - set(available_ranks):
+ culled = raw_perf_counters.pop(rank)
+ self.log.info("culled {0} client entries from rank {1} (laggy: {2})".format(
+ len(culled[1]), rank, "yes" if culled[0] else "no"))
+ missing_clients.update(list(culled[1].keys()))
+
+ def cull_client_entries(self, raw_perf_counters, incoming_metrics, missing_clients):
+ # this is a bit more involed -- for each rank figure out what clients
+ # are missing in incoming report and purge them from our tracked map.
+ # but, if this is invoked after cull_mds_entries(), the rank set
+ # is same, so we can loop based on that assumption.
+ ranks = raw_perf_counters.keys()
+ for rank in ranks:
+ tracked_clients = raw_perf_counters[rank][1].keys()
+ available_clients = [extract_client_id_and_ip(counter['k'][1][0]) for counter in incoming_metrics]
+ for client in set(tracked_clients) - set([c[0] for c in available_clients if c[0] is not None]):
+ raw_perf_counters[rank][1].pop(client)
+ self.log.info("culled {0} from rank {1}".format(client, rank))
+ missing_clients.add(client)
+
+ def cull_missing_entries(self, raw_perf_counters, incoming_metrics):
+ missing_clients = set() # type: Set[str]
+ self.cull_mds_entries(raw_perf_counters, incoming_metrics, missing_clients)
+ self.cull_client_entries(raw_perf_counters, incoming_metrics, missing_clients)
+
+ self.log.debug("missing_clients={0}".format(missing_clients))
+ with self.meta_lock:
+ if self.client_metadata['in_progress']:
+ self.client_metadata['to_purge'].update(missing_clients)
+ self.log.info("deferring client metadata purge (now {0} client(s))".format(
+ len(self.client_metadata['to_purge'])))
+ else:
+ global fs_list
+ for fs_name in fs_list:
+ for client in missing_clients:
+ try:
+ self.log.info("purge client metadata for {0}".format(client))
+ if client in self.client_metadata['metadata'][fs_name]:
+ self.client_metadata['metadata'][fs_name].pop(client)
+ except KeyError:
+ pass
+ self.log.debug("client_metadata={0}".format(self.client_metadata['metadata']))
+
+ def cull_global_metrics(self, raw_perf_counters, incoming_metrics):
+ tracked_clients = raw_perf_counters.keys()
+ available_clients = [counter['k'][0][0] for counter in incoming_metrics]
+ for client in set(tracked_clients) - set(available_clients):
+ raw_perf_counters.pop(client)
+
+ def get_raw_perf_counters(self, query):
+ raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS, {})
+
+ for query_id in query[QUERY_IDS]:
+ result = self.module.get_mds_perf_counters(query_id)
+ self.log.debug("raw_perf_counters={}".format(raw_perf_counters))
+ self.log.debug("get_raw_perf_counters={}".format(result))
+
+ # extract passed in delayed ranks. metrics for delayed ranks are tagged
+ # as stale.
+ delayed_ranks = extract_mds_ranks_from_report(result['metrics'][0][0])
+
+ # what's received from MDS
+ incoming_metrics = result['metrics'][1]
+
+ # metrics updated (monotonic) time
+ self.mx_last_updated = result['metrics'][2][0]
+
+ # cull missing MDSs and clients
+ self.cull_missing_entries(raw_perf_counters, incoming_metrics)
+
+ # iterate over metrics list and update our copy (note that we have
+ # already culled the differences).
+ global fs_list
+ for fs_name in fs_list:
+ for counter in incoming_metrics:
+ mds_rank = int(counter['k'][0][0])
+ client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
+ if self.client_metadata['metadata'].get(fs_name):
+ if (client_id is not None or not client_ip) and\
+ self.client_metadata["metadata"][fs_name].get(client_id): # client_id _could_ be 0
+ with self.meta_lock:
+ self.set_client_metadata(fs_name, client_id, "IP", client_ip)
+ else:
+ self.log.warn(f"client metadata for client_id={client_id} might be unavailable")
+ else:
+ self.log.warn(f"client metadata for filesystem={fs_name} might be unavailable")
+
+ raw_counters = raw_perf_counters.setdefault(mds_rank, [False, {}])
+ raw_counters[0] = True if mds_rank in delayed_ranks else False
+ raw_client_counters = raw_counters[1].setdefault(client_id, [])
+
+ del raw_client_counters[:]
+ raw_client_counters.extend(counter['c'])
+ # send an asynchronous client metadata refresh
+ self.update_client_meta()
+
+ def get_raw_perf_counters_global(self, query):
+ raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
+ result = self.module.get_mds_perf_counters(query[GLOBAL_QUERY_ID])
+
+ self.log.debug("raw_perf_counters_global={}".format(raw_perf_counters))
+ self.log.debug("get_raw_perf_counters_global={}".format(result))
+
+ global_metrics = result['metrics'][1]
+ self.cull_global_metrics(raw_perf_counters, global_metrics)
+ for counter in global_metrics:
+ client_id, _ = extract_client_id_and_ip(counter['k'][0][0])
+ raw_client_counters = raw_perf_counters.setdefault(client_id, [])
+ del raw_client_counters[:]
+ raw_client_counters.extend(counter['c'])
+
+ def process_mds_reports(self):
+ for query in self.user_queries.values():
+ self.get_raw_perf_counters(query)
+ self.get_raw_perf_counters_global(query)
+
+ def scrub_expired_queries(self):
+ expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
+ for filter_spec in list(self.user_queries.keys()):
+ user_query = self.user_queries[filter_spec]
+ self.log.debug("scrubbing query={}".format(user_query))
+ if user_query[QUERY_LAST_REQUEST] < expire_time:
+ expired_query_ids = user_query[QUERY_IDS].copy()
+ expired_query_ids.append(user_query[GLOBAL_QUERY_ID])
+ self.log.debug("unregistering query={} ids={}".format(user_query, expired_query_ids))
+ self.unregister_mds_perf_queries(filter_spec, expired_query_ids)
+ del self.user_queries[filter_spec]
+
+ def prepare_mds_perf_query(self, rank, client_id, client_ip):
+ mds_rank_regex = MDS_PERF_QUERY_REGEX_MATCH_ALL_RANKS
+ if not rank == -1:
+ mds_rank_regex = '^({})$'.format(rank)
+ client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
+ return {
+ 'key_descriptor' : [
+ {'type' : 'mds_rank', 'regex' : mds_rank_regex},
+ {'type' : 'client_id', 'regex' : client_regex},
+ ],
+ 'performance_counter_descriptors' : MDS_PERF_QUERY_COUNTERS,
+ }
+
+ def prepare_global_perf_query(self, client_id, client_ip):
+ client_regex = MDS_PERF_QUERY_REGEX_MATCH_CLIENTS.format(client_id, client_ip)
+ return {
+ 'key_descriptor' : [
+ {'type' : 'client_id', 'regex' : client_regex},
+ ],
+ 'performance_counter_descriptors' : MDS_GLOBAL_PERF_QUERY_COUNTERS,
+ }
+
+ def unregister_mds_perf_queries(self, filter_spec, query_ids):
+ self.log.info("unregister_mds_perf_queries: filter_spec={0}, query_id={1}".format(
+ filter_spec, query_ids))
+ for query_id in query_ids:
+ self.module.remove_mds_perf_query(query_id)
+
+ def register_mds_perf_query(self, filter_spec):
+ mds_ranks = filter_spec.mds_ranks
+ client_id = filter_spec.client_id
+ client_ip = filter_spec.client_ip
+
+ query_ids = []
+ try:
+ # register per-mds perf query
+ for rank in mds_ranks:
+ query = self.prepare_mds_perf_query(rank, client_id, client_ip)
+ self.log.info("register_mds_perf_query: {}".format(query))
+
+ query_id = self.module.add_mds_perf_query(query)
+ if query_id is None: # query id can be 0
+ raise RuntimeError("failed to add MDS perf query: {}".format(query))
+ query_ids.append(query_id)
+ except Exception:
+ for query_id in query_ids:
+ self.module.remove_mds_perf_query(query_id)
+ raise
+ return query_ids
+
+ def register_global_perf_query(self, filter_spec):
+ client_id = filter_spec.client_id
+ client_ip = filter_spec.client_ip
+
+ # register a global perf query for metrics
+ query = self.prepare_global_perf_query(client_id, client_ip)
+ self.log.info("register_global_perf_query: {}".format(query))
+
+ query_id = self.module.add_mds_perf_query(query)
+ if query_id is None: # query id can be 0
+ raise RuntimeError("failed to add global perf query: {}".format(query))
+ return query_id
+
+ def register_query(self, filter_spec):
+ user_query = self.user_queries.get(filter_spec, None)
+ if not user_query:
+ user_query = {
+ QUERY_IDS : self.register_mds_perf_query(filter_spec),
+ GLOBAL_QUERY_ID : self.register_global_perf_query(filter_spec),
+ QUERY_LAST_REQUEST : datetime.now(),
+ }
+ self.user_queries[filter_spec] = user_query
+
+ self.q_cv.notify()
+ self.r_cv.wait(5)
+ else:
+ user_query[QUERY_LAST_REQUEST] = datetime.now()
+ return user_query
+
+ def generate_report(self, user_query):
+ result = {} # type: Dict
+ global fs_list
+ # start with counter info -- metrics that are global and per mds
+ result["version"] = PERF_STATS_VERSION
+ result["global_counters"] = MDS_GLOBAL_PERF_QUERY_COUNTERS
+ result["counters"] = MDS_PERF_QUERY_COUNTERS
+
+ # fill in client metadata
+ raw_perfs_global = user_query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})
+ raw_perfs = user_query.setdefault(QUERY_RAW_COUNTERS, {})
+ with self.meta_lock:
+ raw_counters_clients = []
+ for val in raw_perfs.values():
+ raw_counters_clients.extend(list(val[1]))
+ result_meta = result.setdefault("client_metadata", {})
+ for fs_name in fs_list:
+ meta = self.client_metadata["metadata"]
+ if fs_name in meta and len(meta[fs_name]):
+ for client_id in raw_perfs_global.keys():
+ if client_id in meta[fs_name] and client_id in raw_counters_clients:
+ client_meta = (result_meta.setdefault(fs_name, {})).setdefault(client_id, {})
+ client_meta.update(meta[fs_name][client_id])
+
+ # start populating global perf metrics w/ client metadata
+ metrics = result.setdefault("global_metrics", {})
+ for fs_name in fs_list:
+ if fs_name in meta and len(meta[fs_name]):
+ for client_id, counters in raw_perfs_global.items():
+ if client_id in meta[fs_name] and client_id in raw_counters_clients:
+ global_client_metrics = (metrics.setdefault(fs_name, {})).setdefault(client_id, [])
+ del global_client_metrics[:]
+ global_client_metrics.extend(counters)
+
+ # and, now per-mds metrics keyed by mds rank along with delayed ranks
+ metrics = result.setdefault("metrics", {})
+
+ metrics["delayed_ranks"] = [rank for rank, counters in raw_perfs.items() if counters[0]]
+ for rank, counters in raw_perfs.items():
+ mds_key = "mds.{}".format(rank)
+ mds_metrics = metrics.setdefault(mds_key, {})
+ mds_metrics.update(counters[1])
+ return result
+
+ def extract_query_filters(self, cmd):
+ mds_rank_spec = cmd.get('mds_rank', None)
+ client_id_spec = cmd.get('client_id', None)
+ client_ip_spec = cmd.get('client_ip', None)
+
+ self.log.debug("mds_rank_spec={0}, client_id_spec={1}, client_ip_spec={2}".format(
+ mds_rank_spec, client_id_spec, client_ip_spec))
+
+ mds_ranks = extract_mds_ranks_from_spec(mds_rank_spec)
+ client_id = extract_client_id_from_spec(client_id_spec)
+ client_ip = extract_client_ip_from_spec(client_ip_spec)
+
+ return FilterSpec(mds_ranks, client_id, client_ip)
+
+ def get_perf_data(self, cmd):
+ try:
+ filter_spec = self.extract_query_filters(cmd)
+ except ValueError as e:
+ return -errno.EINVAL, "", str(e)
+
+ counters = {}
+ with self.lock:
+ user_query = self.register_query(filter_spec)
+ result = self.generate_report(user_query)
+ return 0, json.dumps(result), ""
diff --git a/src/pybind/mgr/stats/module.py b/src/pybind/mgr/stats/module.py
new file mode 100644
index 000000000..fcc1bce97
--- /dev/null
+++ b/src/pybind/mgr/stats/module.py
@@ -0,0 +1,41 @@
+"""
+performance stats for ceph filesystem (for now...)
+"""
+
+import json
+from typing import List, Dict
+
+from mgr_module import MgrModule, Option, NotifyType
+
+from .fs.perf_stats import FSPerfStats
+
+class Module(MgrModule):
+ COMMANDS = [
+ {
+ "cmd": "fs perf stats "
+ "name=mds_rank,type=CephString,req=false "
+ "name=client_id,type=CephString,req=false "
+ "name=client_ip,type=CephString,req=false ",
+ "desc": "retrieve ceph fs performance stats",
+ "perm": "r"
+ },
+ ]
+ MODULE_OPTIONS: List[Option] = []
+ NOTIFY_TYPES = [NotifyType.command, NotifyType.fs_map]
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.fs_perf_stats = FSPerfStats(self)
+
+ def notify(self, notify_type: NotifyType, notify_id):
+ if notify_type == NotifyType.command:
+ self.fs_perf_stats.notify_cmd(notify_id)
+ elif notify_type == NotifyType.fs_map:
+ self.fs_perf_stats.notify_fsmap()
+
+ def handle_command(self, inbuf, cmd):
+ prefix = cmd['prefix']
+ # only supported command is `fs perf stats` right now
+ if prefix.startswith('fs perf stats'):
+ return self.fs_perf_stats.get_perf_data(cmd)
+ raise NotImplementedError(cmd['prefix'])