diff options
Diffstat (limited to 'src/tools/cephfs/top')
-rw-r--r-- | src/tools/cephfs/top/CMakeLists.txt | 11 | ||||
-rwxr-xr-x | src/tools/cephfs/top/cephfs-top | 1227 | ||||
-rw-r--r-- | src/tools/cephfs/top/setup.py | 25 | ||||
-rw-r--r-- | src/tools/cephfs/top/tox.ini | 7 |
4 files changed, 1270 insertions, 0 deletions
diff --git a/src/tools/cephfs/top/CMakeLists.txt b/src/tools/cephfs/top/CMakeLists.txt new file mode 100644 index 000000000..8f9df0187 --- /dev/null +++ b/src/tools/cephfs/top/CMakeLists.txt @@ -0,0 +1,11 @@ +include(Distutils) +distutils_install_module(cephfs-top) + +if(WITH_TESTS) + include(AddCephTest) + add_tox_test(cephfs-top) +endif() + +set(MINIMUM_COMPATIBLE_VERSION 3.6.0) +find_package(Python3 ${MINIMUM_COMPATIBLE_VERSION} REQUIRED + COMPONENTS Interpreter) diff --git a/src/tools/cephfs/top/cephfs-top b/src/tools/cephfs/top/cephfs-top new file mode 100755 index 000000000..b39e815fa --- /dev/null +++ b/src/tools/cephfs/top/cephfs-top @@ -0,0 +1,1227 @@ +#!/usr/bin/python3 + +import argparse +import sys +import curses +import errno +import json +import signal +import time +import math +import threading + +from collections import OrderedDict +from datetime import datetime +from enum import Enum, unique +from curses import ascii + +import rados + + +class FSTopException(Exception): + def __init__(self, msg=''): + self.error_msg = msg + + def get_error_msg(self): + return self.error_msg + + +@unique +class MetricType(Enum): + METRIC_TYPE_NONE = 0 + METRIC_TYPE_PERCENTAGE = 1 + METRIC_TYPE_LATENCY = 2 + METRIC_TYPE_SIZE = 3 + METRIC_TYPE_STDEV = 4 + + +FS_TOP_PROG_STR = 'cephfs-top' +FS_TOP_ALL_FS_APP = 'ALL_FS_APP' +FS_TOP_FS_SELECTED_APP = 'SELECTED_FS_APP' + +# version match b/w fstop and stats emitted by mgr/stats +FS_TOP_SUPPORTED_VER = 2 + +ITEMS_PAD_LEN = 3 +ITEMS_PAD = " " * ITEMS_PAD_LEN +DEFAULT_REFRESH_INTERVAL = 1 + +# metadata provided by mgr/stats +FS_TOP_MAIN_WINDOW_COL_CLIENT_ID = "client_id" +FS_TOP_MAIN_WINDOW_COL_MNT_ROOT = "mount_root" +FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR = "mount_point@host/addr" + +MAIN_WINDOW_TOP_LINE_ITEMS_START = [ITEMS_PAD, + FS_TOP_MAIN_WINDOW_COL_CLIENT_ID, + FS_TOP_MAIN_WINDOW_COL_MNT_ROOT] +MAIN_WINDOW_TOP_LINE_ITEMS_END = [FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR] + +MAIN_WINDOW_TOP_LINE_METRICS_LEGACY = 
["READ_LATENCY", + "WRITE_LATENCY", + "METADATA_LATENCY" + ] + +# adjust this map according to stats version and maintain order +# as emitted by mgr/stast +MAIN_WINDOW_TOP_LINE_METRICS = OrderedDict([ + ("CAP_HIT", MetricType.METRIC_TYPE_PERCENTAGE), + ("READ_LATENCY", MetricType.METRIC_TYPE_LATENCY), + ("WRITE_LATENCY", MetricType.METRIC_TYPE_LATENCY), + ("METADATA_LATENCY", MetricType.METRIC_TYPE_LATENCY), + ("DENTRY_LEASE", MetricType.METRIC_TYPE_PERCENTAGE), + ("OPENED_FILES", MetricType.METRIC_TYPE_NONE), + ("PINNED_ICAPS", MetricType.METRIC_TYPE_NONE), + ("OPENED_INODES", MetricType.METRIC_TYPE_NONE), + ("READ_IO_SIZES", MetricType.METRIC_TYPE_SIZE), + ("WRITE_IO_SIZES", MetricType.METRIC_TYPE_SIZE), + ("AVG_READ_LATENCY", MetricType.METRIC_TYPE_LATENCY), + ("STDEV_READ_LATENCY", MetricType.METRIC_TYPE_STDEV), + ("AVG_WRITE_LATENCY", MetricType.METRIC_TYPE_LATENCY), + ("STDEV_WRITE_LATENCY", MetricType.METRIC_TYPE_STDEV), + ("AVG_METADATA_LATENCY", MetricType.METRIC_TYPE_LATENCY), + ("STDEV_METADATA_LATENCY", MetricType.METRIC_TYPE_STDEV), +]) +MGR_STATS_COUNTERS = list(MAIN_WINDOW_TOP_LINE_METRICS.keys()) + +FS_TOP_VERSION_HEADER_FMT = '{prog_name} - {now}' +FS_TOP_CLIENT_HEADER_FMT = 'Total Client(s): {num_clients} - '\ + '{num_mounts} FUSE, {num_kclients} kclient, {num_libs} libcephfs' +FS_TOP_NAME_TOPL_FMT = 'Filesystem: {fs_name} - {client_count} client(s)' + +CLIENT_METADATA_KEY = "client_metadata" +CLIENT_METADATA_MOUNT_POINT_KEY = "mount_point" +CLIENT_METADATA_MOUNT_ROOT_KEY = "root" +CLIENT_METADATA_IP_KEY = "IP" +CLIENT_METADATA_HOSTNAME_KEY = "hostname" +CLIENT_METADATA_VALID_METRICS_KEY = "valid_metrics" + +GLOBAL_METRICS_KEY = "global_metrics" +GLOBAL_COUNTERS_KEY = "global_counters" + +fs_list = [] +# store the current states of cephfs-top +# last_fs : last filesystem visited +# last_field : last field selected for sorting +# limit : last limit value +current_states = {"last_fs": "", "last_field": 'chit', "limit": None} +metrics_dict = {} + + 
def calc_perc(c):
    """Return ``c[0]`` as a percentage of ``c[0] + c[1]``, rounded to 2 places.

    Returns 0.0 whenever the two parts sum to zero, so an idle counter can
    never trigger a division by zero.
    """
    total = c[0] + c[1]
    if total == 0:
        return 0.0
    return round((c[0] / total) * 100, 2)


def calc_lat(c):
    """Fold a two-part latency counter into one number, rounded to 2 places.

    ``c[0]`` is multiplied by 1000 and ``c[1]`` divided by 1e6; presumably the
    pair is (seconds, nanoseconds) and the result is milliseconds -- confirm
    against the mgr/stats output format.
    """
    return round(c[0] * 1000 + c[1] / 1000000, 2)


def calc_stdev(c):
    """Return ``sqrt(c[0] / (c[1] - 1)) / 1e6``, rounded to 2 places.

    Yields 0.0 when ``c[1]`` is 1 or less, where the divisor would be zero
    or negative.  Presumably ``c[0]`` is a sum of squared deviations and
    ``c[1]`` a sample count -- confirm against mgr/stats.
    """
    if c[1] <= 1:
        return round(0.0, 2)
    return round(math.sqrt(c[0] / (c[1] - 1)) / 1000000, 2)


def calc_size(c):
    """Return ``c[1]`` converted from bytes to MB (1024 * 1024), 2 places."""
    return round(c[1] / (1024 * 1024), 2)


def calc_avg_size(c):
    """Return ``c[1] / c[0]`` in MB, rounded to 2 places; 0.0 if ``c[0]`` is 0."""
    if c[0] == 0:
        return 0.0
    return round(c[1] / (c[0] * 1024 * 1024), 2)


def calc_speed(size, duration):
    """Return ``size / duration`` in MB/s, rounded to 2 places; 0.0 if no time elapsed."""
    if duration == 0:
        return 0.0
    return round(size / (duration * 1024 * 1024), 2)


def wrap(s, sl):
    """Return *s* unchanged if shorter than *sl*, else cut to *sl* chars ending in '+'.

    A non-positive *sl* leaves no room for any text, so the result is the
    empty string rather than a string longer than the requested width.
    """
    if sl <= 0:
        return ''
    if len(s) < sl:
        return s
    return f'{s[0:sl-1]}+'


class FSTopBase(object):
    """Sampling state and the JSON dump path.

    This class calls ``get_fs_names()``, ``perf_stats_query()``, ``items()``,
    ``avg_items()`` and ``speed_items()`` on ``self`` without defining them;
    a subclass has to provide them.
    """

    def __init__(self):
        self.last_time = time.time()   # when the previous sampling pass ran
        self.last_read_size = {}       # client_id -> last READ_IO_SIZES m[1]
        self.last_write_size = {}      # client_id -> last WRITE_IO_SIZES m[1]
        self.dump_json = {}            # fs name -> {client -> {column: value}}

    @staticmethod
    def has_metric(metadata, metrics_key):
        """Return True if *metrics_key* is present in *metadata*."""
        return metrics_key in metadata

    @staticmethod
    def has_metrics(metadata, metrics_keys):
        """Return True only if every key in *metrics_keys* is in *metadata*."""
        return all(FSTopBase.has_metric(metadata, key) for key in metrics_keys)

    def __build_clients(self, fs):
        """Fill ``self.dump_json[fs]`` with one entry per client of *fs*.

        Reads ``self.stats_json`` (set by the caller) and updates
        ``self.last_time`` and the per-client last-size maps.
        """
        fs_meta = self.dump_json.setdefault(fs, {})
        fs_key = self.stats_json[GLOBAL_METRICS_KEY].get(fs, {})

        # Sample the elapsed time once per pass.  Re-sampling it for every
        # client left each client after the first with a near-zero duration,
        # which made its computed I/O speed meaningless.
        cur_time = time.time()
        duration = cur_time - self.last_time
        self.last_time = cur_time

        for client_id in fs_key:
            client_meta = self.stats_json[CLIENT_METADATA_KEY].get(fs, {}).get(client_id, {})
            for item in MAIN_WINDOW_TOP_LINE_ITEMS_START[1:]:
                if item == FS_TOP_MAIN_WINDOW_COL_CLIENT_ID:
                    client_id_meta = fs_meta.setdefault(client_id.split('.')[1], {})
                elif item == FS_TOP_MAIN_WINDOW_COL_MNT_ROOT:
                    # client_meta may be empty; fall back to "N/A" instead of
                    # raising KeyError.
                    client_id_meta[item] = client_meta.get(
                        CLIENT_METADATA_MOUNT_ROOT_KEY, "N/A")

            counters = [m.upper() for m in self.stats_json[GLOBAL_COUNTERS_KEY]]
            metrics = fs_key.get(client_id, {})
            valid_metrics = client_meta.get(CLIENT_METADATA_VALID_METRICS_KEY, [])
            for cidx, item in enumerate(counters):
                if item in MAIN_WINDOW_TOP_LINE_METRICS_LEGACY:
                    continue
                m = metrics[cidx]
                key = MGR_STATS_COUNTERS[cidx]
                typ = MAIN_WINDOW_TOP_LINE_METRICS[key]
                key_name = self.items(item)
                if item.lower() not in valid_metrics:
                    client_id_meta[key_name] = "N/A"
                    continue

                if typ == MetricType.METRIC_TYPE_PERCENTAGE:
                    client_id_meta[key_name] = calc_perc(m)
                elif typ == MetricType.METRIC_TYPE_LATENCY:
                    client_id_meta[key_name] = calc_lat(m)
                elif typ == MetricType.METRIC_TYPE_STDEV:
                    client_id_meta[key_name] = calc_stdev(m)
                elif typ == MetricType.METRIC_TYPE_SIZE:
                    client_id_meta[key_name] = calc_size(m)
                    # average io sizes
                    client_id_meta[self.avg_items(item)] = calc_avg_size(m)
                    # io speeds: delta of m[1] since the previous pass
                    size = 0
                    last_sizes = None
                    if key == "READ_IO_SIZES":
                        last_sizes = self.last_read_size
                    elif key == "WRITE_IO_SIZES":
                        last_sizes = self.last_write_size
                    if last_sizes is not None and m[1] > 0:
                        size = m[1] - last_sizes.get(client_id, 0)
                        last_sizes[client_id] = m[1]
                    client_id_meta[self.speed_items(item)] = calc_speed(abs(size), duration)
                else:
                    # display 0th element from metric tuple
                    client_id_meta[key_name] = f'{m[0]}'

            for item in MAIN_WINDOW_TOP_LINE_ITEMS_END:
                if item != FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR:
                    continue
                if FSTopBase.has_metrics(client_meta,
                                         [CLIENT_METADATA_MOUNT_POINT_KEY,
                                          CLIENT_METADATA_HOSTNAME_KEY,
                                          CLIENT_METADATA_IP_KEY]):
                    mount_point = f'{client_meta[CLIENT_METADATA_MOUNT_POINT_KEY]}'\
                        f'@{client_meta[CLIENT_METADATA_HOSTNAME_KEY]}/'\
                        f'{client_meta[CLIENT_METADATA_IP_KEY]}'
                    client_id_meta[item] = mount_point
                else:
                    client_id_meta[item] = "N/A"

    def dump_metrics_to_stdout(self, fs_name=None):
        """Write the metrics of one filesystem, or of all, to stdout as JSON.

        With *fs_name* set, only that filesystem is dumped and a message is
        printed instead if it is not in the list from ``get_fs_names()``.
        """
        available = self.get_fs_names()
        if not available:
            sys.stdout.write("No filesystem available\n")
            return
        self.stats_json = self.perf_stats_query()
        if fs_name:  # --dumpfs
            if fs_name not in available:
                sys.stdout.write(f"Filesystem {fs_name} not available\n")
                return
            self.__build_clients(fs_name)
        else:  # --dump
            for fs in available:
                self.__build_clients(fs)
        sys.stdout.write(json.dumps(self.dump_json))
        sys.stdout.write("\n")


class FSTop(FSTopBase):
    """Curses front end: cluster connection, signal handling and display state."""

    def __init__(self, args):
        """Copy the settings read from *args* (id, cluster, conffile, delay)."""
        super().__init__()
        self.rados = None
        self.stdscr = None  # curses instance
        self.active_screen = ""
        self.client_name = args.id
        self.cluster_name = args.cluster
        self.conffile = args.conffile
        self.refresh_interval_secs = args.delay
        self.PAD_HEIGHT = 10000  # height of the fstop_pad
        self.PAD_WIDTH = 300  # width of the fstop_pad
        self.exit_ev = threading.Event()

    def handle_signal(self, signum, _):
        """Signal handler: set the exit event."""
        self.exit_ev.set()

    def init(self):
        """Connect to the cluster, verify 'stats' support, install signal handlers.

        Raises FSTopException if the connection fails.
        """
        try:
            if self.conffile:
                r_rados = rados.Rados(rados_id=self.client_name,
                                      clustername=self.cluster_name,
                                      conffile=self.conffile)
            else:
                r_rados = rados.Rados(rados_id=self.client_name,
                                      clustername=self.cluster_name)
                # NOTE(review): nesting reconstructed from collapsed source;
                # assumed to apply only when no conffile was given -- confirm.
                r_rados.conf_read_file()
            r_rados.connect()
            self.rados = r_rados
        except rados.Error as e:
            if e.errno == errno.ENOENT:
                raise FSTopException(
                    f'cluster {self.cluster_name} does not exist') from e
            raise FSTopException(f'error connecting to cluster: {e}') from e
        self.verify_perf_stats_support()
        signal.signal(signal.SIGTERM, self.handle_signal)
        signal.signal(signal.SIGINT, self.handle_signal)

    def fini(self):
        """Shut the cluster handle down if one is open."""
        if self.rados:
            self.rados.shutdown()
            self.rados = None

    def selftest(self):
        """Check that the perf stats version and counters are ones we can handle.

        Raises FSTopException on a version mismatch or on any counter that
        is not listed in MGR_STATS_COUNTERS.
        """
        stats_json = self.perf_stats_query()
        if stats_json['version'] != FS_TOP_SUPPORTED_VER:
            raise FSTopException('perf stats version mismatch!')
        missing = [m for m in stats_json[GLOBAL_COUNTERS_KEY]
                   if m.upper() not in MGR_STATS_COUNTERS]
        if missing:
            # the two fragments used to run together as "from'ceph fs ..."
            raise FSTopException('Cannot handle unknown metrics from '
                                 f'\'ceph fs perf stats\': {missing}')
# FSTop methods; the enclosing class statement is outside this span.
def get_fs_names(self):
    """Refresh the module-level ``fs_list`` from 'fs ls' and return it.

    The list object is updated in place.  Raises FSTopException if the
    monitor command raises or reports a non-zero return code.
    """
    global fs_list
    mon_cmd = {'prefix': 'fs ls', 'format': 'json'}
    try:
        ret, buf, out = self.rados.mon_command(json.dumps(mon_cmd), b'')
    except Exception as e:
        raise FSTopException(f'Error in fs ls: {e}')
    if ret != 0:
        # same handling as the other mon/mgr commands in this file; without
        # it a failed command surfaced as a JSON decode error on 'buf'
        raise FSTopException(f'Error in fs ls: {out}')
    fs_map = json.loads(buf.decode('utf-8'))
    fs_list[:] = [filesystem['name'] for filesystem in fs_map]
    return fs_list


def setup_curses(self, win):
    """Store *win* as the screen, create the pad and start the all-filesystems view."""
    self.stdscr = win
    self.stdscr.keypad(True)
    curses.use_default_colors()
    curses.start_color()
    try:
        curses.curs_set(0)
    except curses.error:
        # If the terminal does not support the requested cursor
        # visibility, curs_set() raises an exception; ignore it.
        pass
    self.fstop_pad = curses.newpad(self.PAD_HEIGHT, self.PAD_WIDTH)
    self.run_all_display()


def display_fs_menu(self, stdscr, selected_row_idx):
    """Draw the centred filesystem menu, highlighting row *selected_row_idx*."""
    stdscr.clear()
    h, w = stdscr.getmaxyx()
    title = ['Filesystems', 'Press "q" to go back to the previous screen']
    pos_x1 = w // 2 - len(title[0]) // 2
    pos_x2 = w // 2 - len(title[1]) // 2
    stdscr.addstr(1, pos_x1, title[0], curses.A_STANDOUT | curses.A_BOLD)
    stdscr.addstr(3, pos_x2, title[1], curses.A_DIM)
    first_row = h // 2 - len(fs_list) // 2
    for index, name in enumerate(fs_list):
        x = w // 2 - len(name) // 2
        y = first_row + index
        selected = index == selected_row_idx
        if selected:
            stdscr.attron(curses.color_pair(1))
        stdscr.addstr(y, x, name)
        if selected:
            stdscr.attroff(curses.color_pair(1))
    stdscr.refresh()


def display_sort_menu(self, stdscr, selected_row_idx, field_menu):
    """Draw the left-aligned sort-field menu, highlighting row *selected_row_idx*."""
    stdscr.clear()
    title = ['Fields', 'Press "q" to go back to the previous screen']
    stdscr.addstr(1, 0, title[0], curses.A_STANDOUT | curses.A_BOLD)
    stdscr.addstr(3, 0, title[1], curses.A_DIM)
    for index, name in enumerate(field_menu):
        y = 5 + index  # entries start two rows below the help line
        selected = index == selected_row_idx
        if selected:
            stdscr.attron(curses.color_pair(1))
        stdscr.addstr(y, 0, name)
        if selected:
            stdscr.attroff(curses.color_pair(1))
    stdscr.refresh()
name) + stdscr.refresh() + + def display_menu(self, stdscr): + stdscr.clear() + h, w = stdscr.getmaxyx() + title = ['No filesystem available', + 'Press "q" to go back to home (All Filesystem Info) screen'] + pos_x1 = w // 2 - len(title[0]) // 2 + pos_x2 = w // 2 - len(title[1]) // 2 + stdscr.addstr(1, pos_x1, title[0], curses.A_STANDOUT | curses.A_BOLD) + stdscr.addstr(3, pos_x2, title[1], curses.A_DIM) + stdscr.refresh() + + def set_key(self, stdscr): + curses.curs_set(0) + curses.init_pair(1, curses.COLOR_MAGENTA, curses.COLOR_WHITE) + curr_row = 0 + key = 0 + endmenu = False + while not endmenu: + global fs_list + fs_list = self.get_fs_names() + + if key == curses.KEY_UP and curr_row > 0: + curr_row -= 1 + elif key == curses.KEY_DOWN and curr_row < len(fs_list) - 1: + curr_row += 1 + elif (key in [curses.KEY_ENTER, 10, 13]) and fs_list: + self.stdscr.erase() + current_states['last_fs'] = fs_list[curr_row] + self.run_display() + endmenu = True + elif key == ord('q'): + self.stdscr.erase() + if fs_list and self.active_screen == FS_TOP_FS_SELECTED_APP: + self.run_display() + else: + self.run_all_display() + endmenu = True + + try: + if not fs_list: + self.display_menu(stdscr) + else: + self.display_fs_menu(stdscr, curr_row) + except curses.error: + pass + curses.halfdelay(self.refresh_interval_secs) + key = stdscr.getch() + + def choose_field(self, stdscr): + curses.curs_set(0) + curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_WHITE) + field_menu = ["chit= CAP_HIT", "dlease= DENTRY_LEASE", "ofiles= OPENED_FILES", + "oicaps= PINNED_ICAPS", "oinodes= OPENED_INODES", + "rtio= READ_IO_SIZES", "raio= READ_AVG_IO_SIZES", + "rsp= READ_IO_SPEED", "wtio= WRITE_IO_SIZES", + "waio= WRITE_AVG_IO_SIZES", "wsp= WRITE_IO_SPEED", + "rlatavg= AVG_READ_LATENCY", "rlatsd= STDEV_READ_LATENCY", + "wlatavg= AVG_WRITE_LATENCY", "wlatsd= STDEV_WRITE_LATENCY", + "mlatavg= AVG_METADATA_LATENCY", "mlatsd= STDEV_METADATA_LATENCY", + "Default"] + curr_row1 = 0 + key = 0 + endwhile = False 
+ while not endwhile: + global current_states, fs_list + fs_list = self.get_fs_names() + + if key == curses.KEY_UP and curr_row1 > 0: + curr_row1 -= 1 + elif key == curses.KEY_DOWN and curr_row1 < len(field_menu) - 1: + curr_row1 += 1 + elif (key in [curses.KEY_ENTER, 10, 13]) and fs_list: + self.stdscr.erase() + if curr_row1 != len(field_menu) - 1: + current_states["last_field"] = (field_menu[curr_row1].split('='))[0] + else: + current_states["last_field"] = 'chit' + self.header.erase() # erase the previous text + if self.active_screen == FS_TOP_ALL_FS_APP: + self.run_all_display() + else: + self.run_display() + endwhile = True + elif key == ord('q'): + self.stdscr.erase() + if fs_list and self.active_screen == FS_TOP_FS_SELECTED_APP: + self.run_display() + else: + self.run_all_display() + endwhile = True + + try: + if not fs_list: + self.display_menu(stdscr) + else: + self.display_sort_menu(stdscr, curr_row1, field_menu) + except curses.error: + pass + curses.halfdelay(self.refresh_interval_secs) + key = stdscr.getch() + + def set_limit(self, stdscr): + key = '' + endwhile = False + while not endwhile: + stdscr.clear() + h, w = stdscr.getmaxyx() + title = 'Enter the limit you want to set (number) and press ENTER,'\ + ' press "d" for default, "q" to go back to previous screen ' + pos_x1 = w // 2 - len(title) // 2 + try: + stdscr.addstr(1, pos_x1, title, curses.A_STANDOUT | curses.A_BOLD) + except curses.error: + pass + curses.halfdelay(self.refresh_interval_secs) + inp = stdscr.getch() + if inp in [ord('d'), ord('q')] or ascii.isdigit(inp): + key = key + chr(inp) + if key == 'd': + current_states["limit"] = None + elif key == 'q': + endwhile = True + elif (key).isnumeric(): + i = 1 + length = 4 + while i <= length: + pos = w // 2 - len(key) // 2 + try: + stdscr.move(3, 0) + stdscr.clrtoeol() + stdscr.addstr(3, pos, key, curses.A_BOLD) + except curses.error: + pass + if key[i - 1] == '\n': + break + inp = stdscr.getch() + if inp == ord('q'): + if 
# FSTop methods; the enclosing class statement is outside this span.
def set_option_all_fs(self, opt):
    """Handle a command key on the 'All Filesystem Info' screen.

    'm', 's' and 'l' open the filesystem, sort and limit menus when at least
    one filesystem is listed; 'r' resets sort and limit; 'q' exits.
    Returns False when the caller should keep running its display loop,
    True otherwise.
    """
    if opt in (ord('m'), ord('s'), ord('l')):
        if not fs_list:
            return False
        if opt == ord('m'):
            curses.wrapper(self.set_key)
        elif opt == ord('s'):
            curses.wrapper(self.choose_field)
        else:
            curses.wrapper(self.set_limit)
    elif opt == ord('r'):
        if fs_list:
            current_states['last_field'] = 'chit'
            current_states["limit"] = None
        return False  # We are already in run_all_display()
    elif opt == ord('q'):
        # quit() is injected by the site module for interactive use and is
        # not guaranteed to exist; sys.exit() raises the same SystemExit.
        sys.exit()
    return True


def set_option_sel_fs(self, opt, selected_fs):
    """Handle a command key on the 'Selected Filesystem Info' screen.

    Same keys as set_option_all_fs(), except that the menus and the reset
    need *selected_fs* to still be listed, and 'q' returns to the
    all-filesystems screen instead of exiting.
    """
    if opt in (ord('m'), ord('s'), ord('l')):
        if selected_fs not in fs_list:
            return False
        if opt == ord('m'):
            curses.wrapper(self.set_key)
        elif opt == ord('s'):
            curses.wrapper(self.choose_field)
        else:
            curses.wrapper(self.set_limit)
    elif opt == ord('r'):
        if selected_fs in fs_list:
            current_states['last_field'] = 'chit'
            current_states["limit"] = None
        return False  # we are already in run_display()
    elif opt == ord('q'):
        self.run_all_display()
    return True
# FSTop methods; the enclosing class statement is outside this span.
def verify_perf_stats_support(self):
    """Raise FSTopException unless the mgr 'stats' module is enabled.

    Also raises FSTopException if the 'mgr module ls' command raises or
    returns a non-zero code.
    """
    mon_cmd = {'prefix': 'mgr module ls', 'format': 'json'}
    try:
        ret, buf, out = self.rados.mon_command(json.dumps(mon_cmd), b'')
    except Exception as e:
        raise FSTopException(f'error checking \'stats\' module: {e}')
    if ret != 0:
        raise FSTopException(f'error checking \'stats\' module: {out}')
    if 'stats' not in json.loads(buf.decode('utf-8'))['enabled_modules']:
        # the two fragments used to run together as "Use'ceph mgr ..."
        raise FSTopException('\'stats\' module not enabled. Use '
                             '\'ceph mgr module enable stats\' to enable')


def perf_stats_query(self):
    """Run 'fs perf stats' against the mgr and return the decoded JSON.

    Raises FSTopException if the command raises or returns a non-zero code.
    """
    mgr_cmd = {'prefix': 'fs perf stats', 'format': 'json'}
    try:
        ret, buf, out = self.rados.mgr_command(json.dumps(mgr_cmd), b'')
    except Exception as e:
        raise FSTopException(f'error in \'perf stats\' query: {e}')
    if ret != 0:
        raise FSTopException(f'error in \'perf stats\' query: {out}')
    return json.loads(buf.decode('utf-8'))


def items(self, item):
    """Return the short column name for counter *item*, or '' if unknown."""
    short_names = {
        "CAP_HIT": "chit",
        "READ_LATENCY": "rlat",
        "WRITE_LATENCY": "wlat",
        "METADATA_LATENCY": "mlat",
        "DENTRY_LEASE": "dlease",
        "OPENED_FILES": "ofiles",
        "PINNED_ICAPS": "oicaps",
        "OPENED_INODES": "oinodes",
        "READ_IO_SIZES": "rtio",
        "WRITE_IO_SIZES": "wtio",
        "AVG_READ_LATENCY": "rlatavg",
        "STDEV_READ_LATENCY": "rlatsd",
        "AVG_WRITE_LATENCY": "wlatavg",
        "STDEV_WRITE_LATENCY": "wlatsd",
        "AVG_METADATA_LATENCY": "mlatavg",
        "STDEV_METADATA_LATENCY": "mlatsd",
    }
    # return empty string for none type
    return short_names.get(item, '')
'AVG_WRITE_LATENCY': + return 'wlatavg' + if item == 'STDEV_WRITE_LATENCY': + return 'wlatsd' + if item == 'AVG_METADATA_LATENCY': + return 'mlatavg' + if item == 'STDEV_METADATA_LATENCY': + return 'mlatsd' + else: + # return empty string for none type + return '' + + def mtype(self, typ): + if typ == MetricType.METRIC_TYPE_PERCENTAGE: + return "(%)" + elif typ == MetricType.METRIC_TYPE_LATENCY: + return "(ms)" + elif typ == MetricType.METRIC_TYPE_SIZE: + return "(MB)" + elif typ == MetricType.METRIC_TYPE_STDEV: + return "(ms)" + else: + # return empty string for none type + return '' + + def avg_items(self, item): + if item == "READ_IO_SIZES": + return "raio" + if item == "WRITE_IO_SIZES": + return "waio" + else: + # return empty string for none type + return '' + + def speed_items(self, item): + if item == "READ_IO_SIZES": + return "rsp" + if item == "WRITE_IO_SIZES": + return "wsp" + else: + # return empty string for none type + return '' + + def speed_mtype(self, typ): + if typ == MetricType.METRIC_TYPE_SIZE: + return "(MB/s)" + else: + # return empty string for none type + return '' + + def create_table_header(self): # formerly named as top_line + heading = [] + for item in MAIN_WINDOW_TOP_LINE_ITEMS_START: + heading.append(item) + + for item, typ in MAIN_WINDOW_TOP_LINE_METRICS.items(): + if item in MAIN_WINDOW_TOP_LINE_METRICS_LEGACY: + continue + it = f'{self.items(item)}{self.mtype(typ)}' + heading.append(it) + + if item == "READ_IO_SIZES" or item == "WRITE_IO_SIZES": + # average io sizes + it = f'{self.avg_items(item)}{self.mtype(typ)}' + heading.append(it) + + # io speeds + it = f'{self.speed_items(item)}{self.speed_mtype(typ)}' + heading.append(it) + + for item in MAIN_WINDOW_TOP_LINE_ITEMS_END: + heading.append(item) + title = ITEMS_PAD.join(heading) + self.fsstats.addstr(self.tablehead_y, 0, title, curses.A_STANDOUT | curses.A_BOLD) + + def create_client(self, fs_name, client_id, metrics, counters, + client_meta, y_coord): + 
metrics_dict.setdefault(fs_name, {}) + metrics_dict[fs_name].setdefault(client_id, {}) + cur_time = time.time() + duration = cur_time - self.last_time + self.last_time = cur_time + xp = 0 # xp is incremented after each addstr to position the next incoming metrics. + for item in MAIN_WINDOW_TOP_LINE_ITEMS_START: # note: the first item is ITEMS_PAD + hlen = len(item) + ITEMS_PAD_LEN + if item == FS_TOP_MAIN_WINDOW_COL_CLIENT_ID: + self.fsstats.addstr(y_coord, xp, + wrap(client_id.split('.')[1], hlen), curses.A_DIM) + elif item == FS_TOP_MAIN_WINDOW_COL_MNT_ROOT: + if FSTop.has_metric(client_meta, + CLIENT_METADATA_MOUNT_ROOT_KEY): + hlen = len(item) + ITEMS_PAD_LEN + self.fsstats.addstr( + y_coord, xp, + wrap(client_meta[CLIENT_METADATA_MOUNT_ROOT_KEY], hlen), curses.A_DIM) + else: + self.fsstats.addstr(y_coord, xp, "N/A", curses.A_DIM) + xp += hlen + + cidx = 0 + for item in counters: + if item in MAIN_WINDOW_TOP_LINE_METRICS_LEGACY: + cidx += 1 + continue + m = metrics[cidx] + key = MGR_STATS_COUNTERS[cidx] + typ = MAIN_WINDOW_TOP_LINE_METRICS[key] + if item.lower() in client_meta.get( + CLIENT_METADATA_VALID_METRICS_KEY, []): + if typ == MetricType.METRIC_TYPE_PERCENTAGE: + perc = calc_perc(m) + metrics_dict[fs_name][client_id][self.items(item)] = perc + self.fsstats.addstr(y_coord, xp, + f'{perc}', curses.A_DIM) + xp += len(f'{self.items(item)}{self.mtype(typ)}') + ITEMS_PAD_LEN + elif typ == MetricType.METRIC_TYPE_LATENCY: + lat = calc_lat(m) + metrics_dict[fs_name][client_id][self.items(item)] = lat + self.fsstats.addstr(y_coord, xp, + f'{lat}', curses.A_DIM) + xp += len(f'{self.items(item)}{self.mtype(typ)}') + ITEMS_PAD_LEN + elif typ == MetricType.METRIC_TYPE_STDEV: + stdev = calc_stdev(m) + metrics_dict[fs_name][client_id][self.items(item)] = stdev + self.fsstats.addstr(y_coord, xp, + f'{stdev}', curses.A_DIM) + xp += len(f'{self.items(item)}{self.mtype(typ)}') + ITEMS_PAD_LEN + elif typ == MetricType.METRIC_TYPE_SIZE: + size = calc_size(m) + 
metrics_dict[fs_name][client_id][self.items(item)] = size + self.fsstats.addstr(y_coord, xp, + f'{size}', curses.A_DIM) + xp += len(f'{self.items(item)}{self.mtype(typ)}') + ITEMS_PAD_LEN + + # average io sizes + avg_size = calc_avg_size(m) + metrics_dict[fs_name][client_id][self.avg_items(key)] = avg_size + self.fsstats.addstr(y_coord, xp, + f'{avg_size}', curses.A_DIM) + xp += len(f'{self.avg_items(item)}{self.mtype(typ)}') + ITEMS_PAD_LEN + + # io speeds + size = 0 + if key == "READ_IO_SIZES": + if m[1] > 0: + last_size = self.last_read_size.get(client_id, 0) + size = m[1] - last_size + self.last_read_size[client_id] = m[1] + if key == "WRITE_IO_SIZES": + if m[1] > 0: + last_size = self.last_write_size.get(client_id, 0) + size = m[1] - last_size + self.last_write_size[client_id] = m[1] + speed = calc_speed(abs(size), duration) + metrics_dict[fs_name][client_id][self.speed_items(key)] = speed + self.fsstats.addstr(y_coord, xp, + f'{speed}', curses.A_DIM) + xp += len(f'{self.speed_items(item)}{self.speed_mtype(typ)}') + ITEMS_PAD_LEN + else: + # display 0th element from metric tuple + metrics_dict[fs_name][client_id][self.items(item)] = m[0] + self.fsstats.addstr(y_coord, xp, f'{m[0]}', curses.A_DIM) + xp += len(f'{self.items(item)}{self.mtype(typ)}') + ITEMS_PAD_LEN + else: + self.fsstats.addstr(y_coord, xp, "N/A", curses.A_DIM) + xp += len(self.items(item)) + ITEMS_PAD_LEN + cidx += 1 + + for item in MAIN_WINDOW_TOP_LINE_ITEMS_END: + wrapLen = self.PAD_WIDTH - xp + if item == FS_TOP_MAIN_WINDOW_COL_MNTPT_HOST_ADDR: + if FSTop.has_metrics(client_meta, + [CLIENT_METADATA_MOUNT_POINT_KEY, + CLIENT_METADATA_HOSTNAME_KEY, + CLIENT_METADATA_IP_KEY]): + mount_point = f'{client_meta[CLIENT_METADATA_MOUNT_POINT_KEY]}@'\ + f'{client_meta[CLIENT_METADATA_HOSTNAME_KEY]}/'\ + f'{client_meta[CLIENT_METADATA_IP_KEY]}' + self.fsstats.addstr( + y_coord, xp, + wrap(mount_point, wrapLen), curses.A_DIM) + else: + self.fsstats.addstr(y_coord, xp, "N/A", curses.A_DIM) + xp += 
len(self.items(item)) + ITEMS_PAD_LEN + + def create_clients(self, stats_json, fs_name): + global metrics_dict, current_states + counters = [m.upper() for m in stats_json[GLOBAL_COUNTERS_KEY]] + self.tablehead_y += 2 + res = stats_json[GLOBAL_METRICS_KEY].get(fs_name, {}) + client_cnt = len(res) + self.fsstats.addstr(self.tablehead_y, 0, FS_TOP_NAME_TOPL_FMT.format( + fs_name=fs_name, client_count=client_cnt), curses.A_BOLD) + self.tablehead_y += 2 + metrics_dict_client = metrics_dict.get(fs_name, {}) + if len(metrics_dict) > len(fs_list): + stale_fs = set(metrics_dict) - set(fs_list) + for key in stale_fs: + del metrics_dict[key] + if len(metrics_dict_client) > client_cnt: + stale_clients = set(metrics_dict_client) - set(res) + for key in stale_clients: + del metrics_dict_client[key] + if client_cnt: + if len(metrics_dict_client) != client_cnt: + sort_list = sorted(list(res.keys())) + else: + sort_arg = current_states['last_field'] + sort_list = sorted(list(res.keys()), + key=lambda x: metrics_dict[fs_name].get(x, {}).get(sort_arg, 0), + reverse=True) + if current_states['limit'] is not None and int(current_states['limit']) < client_cnt: + sort_list = sort_list[0:int(current_states['limit'])] + for client_id in sort_list: + self.create_client( + fs_name, client_id, res.get(client_id, {}), counters, + stats_json[CLIENT_METADATA_KEY].get(fs_name, {}).get(client_id, {}), + self.tablehead_y) + self.tablehead_y += 1 + + def create_header(self, stats_json, help, screen_title="", color_id=0): + num_clients, num_mounts, num_kclients, num_libs = 0, 0, 0, 0 + if not stats_json['version'] == FS_TOP_SUPPORTED_VER: + self.header.addstr(0, 0, 'perf stats version mismatch!', curses.A_BOLD) + return False + global fs_list + for fs_name in fs_list: + client_metadata = stats_json[CLIENT_METADATA_KEY].get(fs_name, {}) + client_cnt = len(client_metadata) + if client_cnt: + num_clients = num_clients + client_cnt + num_mounts = num_mounts + len( + [client for client, metadata in 
client_metadata.items() if + CLIENT_METADATA_MOUNT_POINT_KEY in metadata + and metadata[CLIENT_METADATA_MOUNT_POINT_KEY] != 'N/A']) + num_kclients = num_kclients + len( + [client for client, metadata in client_metadata.items() if + "kernel_version" in metadata]) + num_libs = num_clients - (num_mounts + num_kclients) + now = datetime.now().ctime() + self.header.addstr(0, 0, FS_TOP_VERSION_HEADER_FMT.format(prog_name=FS_TOP_PROG_STR, + now=now), curses.A_BOLD) + self.header.addstr(2, 0, screen_title, curses.color_pair(color_id) | curses.A_BOLD) + self.header.addstr(3, 0, FS_TOP_CLIENT_HEADER_FMT.format(num_clients=num_clients, + num_mounts=num_mounts, + num_kclients=num_kclients, + num_libs=num_libs), curses.A_DIM) + self.header.addstr(4, 0, f"Filters: Sort - {current_states['last_field']}, " + f"Limit - {current_states['limit']}", curses.A_DIM) + self.header.addstr(5, 0, help, curses.A_DIM) + return True + + def run_display(self): + # clear the pads to have a smooth refresh + self.header.erase() + self.fsstats.erase() + + self.active_screen = FS_TOP_FS_SELECTED_APP + screen_title = "Selected Filesystem Info" + help_commands = "m - select a filesystem | s - sort menu | l - limit number of clients"\ + " | r - reset to default | q - home (All Filesystem Info) screen" + curses.init_pair(3, curses.COLOR_MAGENTA, -1) + + top, left = 0, 0 # where to place pad + vscrollOffset, hscrollOffset = 0, 0 # scroll offsets + + # calculate the initial viewport height and width + windowsize = self.stdscr.getmaxyx() + self.viewportHeight, self.viewportWidth = windowsize[0] - 1, windowsize[1] - 1 + + # create header subpad + self.header_height = 7 + self.header = self.fstop_pad.subwin(self.header_height, self.viewportWidth, 0, 0) + + # create fsstats subpad + fsstats_begin_y = self.header_height + fsstats_height = self.PAD_HEIGHT - self.header_height + self.fsstats = self.fstop_pad.subwin(fsstats_height, self.PAD_WIDTH, fsstats_begin_y, 0) + + curses.halfdelay(1) + cmd = 
self.stdscr.getch() + global fs_list, current_states + while not self.exit_ev.is_set(): + fs_list = self.get_fs_names() + fs = current_states["last_fs"] + if cmd in [ord('m'), ord('s'), ord('l'), ord('r'), ord('q')]: + if self.set_option_sel_fs(cmd, fs): + self.exit_ev.set() + + stats_json = self.perf_stats_query() + vscrollEnd = 0 + if fs not in fs_list: + help = f"Error: The selected filesystem '{fs}' is not available now. " \ + "[Press 'q' to go back to home (All Filesystem Info) screen]" + # reset the sort/limit settings if fs_list is empty, otherwise continue the + # settings for the other filesystems. + if not fs_list: + current_states["last_field"] = 'chit' + current_states["limit"] = None + self.header.erase() # erase previous text + self.fsstats.erase() + self.create_header(stats_json, help, screen_title, 3) + else: + self.tablehead_y = 0 + help = "COMMANDS: " + help_commands + self.fsstats.erase() # erase previous text + + client_metadata = stats_json[GLOBAL_METRICS_KEY].get(fs, {}) + if current_states['limit'] is not None and \ + int(current_states['limit']) < len(client_metadata): + num_client = int(current_states['limit']) + else: + num_client = len(client_metadata) + vscrollEnd += num_client + if self.create_header(stats_json, help, screen_title, 3): + self.create_table_header() + self.create_clients(stats_json, fs) + + # scroll and refresh + if cmd == curses.KEY_DOWN: + if (vscrollEnd - vscrollOffset) > 1: + vscrollOffset += 1 + else: + vscrollOffset = vscrollEnd + elif cmd == curses.KEY_UP: + if vscrollOffset > 0: + vscrollOffset -= 1 + elif cmd == curses.KEY_NPAGE: + if (vscrollEnd - vscrollOffset) / 20 > 1: + vscrollOffset += 20 + else: + vscrollOffset = vscrollEnd + elif cmd == curses.KEY_PPAGE: + if vscrollOffset / 20 >= 1: + vscrollOffset -= 20 + else: + vscrollOffset = 0 + elif cmd == curses.KEY_RIGHT: + if hscrollOffset < self.PAD_WIDTH - self.viewportWidth - 1: + hscrollOffset += 1 + elif cmd == curses.KEY_LEFT: + if hscrollOffset > 0: + 
hscrollOffset -= 1 + elif cmd == curses.KEY_HOME: + hscrollOffset = 0 + elif cmd == curses.KEY_END: + hscrollOffset = self.PAD_WIDTH - self.viewportWidth - 1 + elif cmd == curses.KEY_RESIZE: + # terminal resize event. Update the viewport dimensions + windowsize = self.stdscr.getmaxyx() + self.viewportHeight, self.viewportWidth = windowsize[0] - 1, windowsize[1] - 1 + + if cmd: + try: + # refresh the viewport for the header portion + if cmd not in [curses.KEY_DOWN, + curses.KEY_UP, + curses.KEY_NPAGE, + curses.KEY_PPAGE, + curses.KEY_RIGHT, + curses.KEY_LEFT]: + self.fstop_pad.refresh(0, 0, + top, left, + top + self.header_height, left + self.viewportWidth) + # refresh the viewport for the current table header portion in the fsstats pad + if cmd not in [curses.KEY_DOWN, + curses.KEY_UP, + curses.KEY_NPAGE, + curses.KEY_PPAGE]: + self.fstop_pad.refresh(fsstats_begin_y, hscrollOffset, + top + fsstats_begin_y, left, + 7, left + self.viewportWidth) + # refresh the viewport for the current client records portion in the fsstats pad + self.fstop_pad.refresh(fsstats_begin_y + 1 + vscrollOffset, hscrollOffset, + top + fsstats_begin_y + 2, left, + top + self.viewportHeight, left + self.viewportWidth) + except curses.error: + # This happens when the user switches to a terminal of different zoom size. + # just retry it. 
+ pass + # End scroll and refresh + + curses.halfdelay(self.refresh_interval_secs * 10) + cmd = self.stdscr.getch() + + def run_all_display(self): + # clear text from the previous screen + if self.active_screen == FS_TOP_FS_SELECTED_APP: + self.header.erase() + + self.active_screen = FS_TOP_ALL_FS_APP + screen_title = "All Filesystem Info" + curses.init_pair(2, curses.COLOR_CYAN, -1) + + top, left = 0, 0 # where to place pad + vscrollOffset, hscrollOffset = 0, 0 # scroll offsets + + # calculate the initial viewport height and width + windowsize = self.stdscr.getmaxyx() + self.viewportHeight, self.viewportWidth = windowsize[0] - 1, windowsize[1] - 1 + + # create header subpad + self.header_height = 7 + self.header = self.fstop_pad.subwin(self.header_height, self.viewportWidth, 0, 0) + + # create fsstats subpad + fsstats_begin_y = self.header_height + fsstats_height = self.PAD_HEIGHT - self.header_height + self.fsstats = self.fstop_pad.subwin(fsstats_height, self.PAD_WIDTH, fsstats_begin_y, 0) + + curses.halfdelay(1) + cmd = self.stdscr.getch() + while not self.exit_ev.is_set(): + if cmd in [ord('m'), ord('s'), ord('l'), ord('r'), ord('q')]: + if self.set_option_all_fs(cmd): + self.exit_ev.set() + + # header display + global fs_list, current_states + fs_list = self.get_fs_names() + current_states["last_fs"] = fs_list + stats_json = self.perf_stats_query() + vscrollEnd = 0 + if not fs_list: + help = "INFO: No filesystem is available [Press 'q' to quit]" + # reset the sort/limit settings + current_states["last_field"] = 'chit' + current_states["limit"] = None + self.header.erase() # erase previous text + self.fsstats.erase() + self.create_header(stats_json, help, screen_title, 2) + else: + self.tablehead_y = 0 + num_client = 0 + help = "COMMANDS: m - select a filesystem | s - sort menu |"\ + " l - limit number of clients | r - reset to default | q - quit" + self.fsstats.erase() # erase previous text + for index, fs in enumerate(fs_list): + # Get the vscrollEnd in 
advance + client_metadata = stats_json[GLOBAL_METRICS_KEY].get(fs, {}) + if current_states['limit'] is not None and \ + int(current_states['limit']) < len(client_metadata): + num_client = int(current_states['limit']) + else: + num_client = len(client_metadata) + vscrollEnd += num_client + if self.create_header(stats_json, help, screen_title, 2): + if not index: # do it only for the first fs + self.create_table_header() + self.create_clients(stats_json, fs) + + # scroll and refresh + if cmd == curses.KEY_DOWN: + if (vscrollEnd - vscrollOffset) > 1: + vscrollOffset += 1 + else: + vscrollOffset = vscrollEnd + elif cmd == curses.KEY_UP: + if vscrollOffset > 0: + vscrollOffset -= 1 + elif cmd == curses.KEY_NPAGE: + if (vscrollEnd - vscrollOffset) / 20 > 1: + vscrollOffset += 20 + else: + vscrollOffset = vscrollEnd + elif cmd == curses.KEY_PPAGE: + if vscrollOffset / 20 >= 1: + vscrollOffset -= 20 + else: + vscrollOffset = 0 + elif cmd == curses.KEY_RIGHT: + if hscrollOffset < self.PAD_WIDTH - self.viewportWidth - 1: + hscrollOffset += 1 + elif cmd == curses.KEY_LEFT: + if hscrollOffset > 0: + hscrollOffset -= 1 + elif cmd == curses.KEY_HOME: + hscrollOffset = 0 + elif cmd == curses.KEY_END: + hscrollOffset = self.PAD_WIDTH - self.viewportWidth - 1 + elif cmd == curses.KEY_RESIZE: + # terminal resize event. 
Update the viewport dimensions + windowsize = self.stdscr.getmaxyx() + self.viewportHeight, self.viewportWidth = windowsize[0] - 1, windowsize[1] - 1 + if cmd: + try: + # refresh the viewport for the header portion + if cmd not in [curses.KEY_DOWN, + curses.KEY_UP, + curses.KEY_NPAGE, + curses.KEY_PPAGE, + curses.KEY_RIGHT, + curses.KEY_LEFT]: + self.fstop_pad.refresh(0, 0, + top, left, + top + self.header_height, left + self.viewportWidth) + # refresh the viewport for the current table header portion in the fsstats pad + if cmd not in [curses.KEY_DOWN, + curses.KEY_UP, + curses.KEY_NPAGE, + curses.KEY_PPAGE]: + self.fstop_pad.refresh(fsstats_begin_y, hscrollOffset, + top + fsstats_begin_y, left, + 7, left + self.viewportWidth) + # refresh the viewport for the current client records portion in the fsstats pad + self.fstop_pad.refresh(fsstats_begin_y + 1 + vscrollOffset, hscrollOffset, + top + fsstats_begin_y + 2, left, + top + self.viewportHeight, left + self.viewportWidth) + except curses.error: + # This happens when the user switches to a terminal of different zoom size. + # just retry it. 
+ pass + # End scroll and refresh + + curses.halfdelay(self.refresh_interval_secs * 10) + cmd = self.stdscr.getch() +# End class FSTop + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Ceph Filesystem top utility') + parser.add_argument('--cluster', nargs='?', const='ceph', default='ceph', + help='Ceph cluster to connect (default: ceph)') + parser.add_argument('--id', nargs='?', const='fstop', default='fstop', + help='Ceph user to use to connection (default: fstop)') + parser.add_argument('--conffile', nargs='?', default=None, + help='Path to cluster configuration file') + parser.add_argument('--selftest', dest='selftest', action='store_true', + help='Run in selftest mode') + parser.add_argument('-d', '--delay', metavar='DELAY', dest='delay', choices=range(1, 26), + default=DEFAULT_REFRESH_INTERVAL, + type=int, + help='Refresh interval in seconds ' + f'(default: {DEFAULT_REFRESH_INTERVAL}, range: 1 - 25)') + parser.add_argument('--dump', dest='dump', action='store_true', + help='Dump the metrics to stdout') + parser.add_argument('--dumpfs', action='append', + help='Dump the metrics of the given fs to stdout') + + args = parser.parse_args() + err = False + ft = FSTop(args) + try: + ft.init() + if args.selftest: + ft.selftest() + sys.stdout.write("selftest ok\n") + elif args.dump: + ft.dump_metrics_to_stdout() + elif args.dumpfs: + ft.dump_metrics_to_stdout(args.dumpfs[0]) + else: + curses.wrapper(ft.setup_curses) + except FSTopException as fst: + err = True + sys.stderr.write(f'{fst.get_error_msg()}\n') + except Exception as e: + err = True + sys.stderr.write(f'exception: {e}\n') + finally: + ft.fini() + sys.exit(0 if not err else -1) diff --git a/src/tools/cephfs/top/setup.py b/src/tools/cephfs/top/setup.py new file mode 100644 index 000000000..92fbd964c --- /dev/null +++ b/src/tools/cephfs/top/setup.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- + +from setuptools import setup + +__version__ = '0.0.1' + +setup( + name='cephfs-top', + 
version=__version__, + description='top(1) like utility for Ceph Filesystem', + keywords='cephfs, top', + scripts=['cephfs-top'], + install_requires=[ + 'rados', + ], + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Environment :: Console', + 'Intended Audience :: System Administrators', + 'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)', + 'Operating System :: POSIX :: Linux', + 'Programming Language :: Python :: 3' + ], + license='LGPLv2+', +) diff --git a/src/tools/cephfs/top/tox.ini b/src/tools/cephfs/top/tox.ini new file mode 100644 index 000000000..b125c0bc8 --- /dev/null +++ b/src/tools/cephfs/top/tox.ini @@ -0,0 +1,7 @@ +[tox] +envlist = py3 +skipsdist = true + +[testenv:py3] +deps = flake8 +commands = flake8 --ignore=W503 --max-line-length=100 cephfs-top |