diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/seastar/scripts | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/seastar/scripts')
-rwxr-xr-x | src/seastar/scripts/addr2line.py | 387 | ||||
-rwxr-xr-x | src/seastar/scripts/build.sh | 54 | ||||
-rwxr-xr-x | src/seastar/scripts/dpdk_nic_bind.py | 539 | ||||
-rwxr-xr-x | src/seastar/scripts/io-trace-parse.py | 173 | ||||
-rwxr-xr-x | src/seastar/scripts/perftune.py | 1762 | ||||
-rw-r--r-- | src/seastar/scripts/perftune.yaml | 33 | ||||
-rwxr-xr-x | src/seastar/scripts/posix_net_conf.sh | 81 | ||||
-rwxr-xr-x | src/seastar/scripts/run_with_dpdk.sh | 98 | ||||
-rwxr-xr-x | src/seastar/scripts/seastar-addr2line | 271 | ||||
-rwxr-xr-x | src/seastar/scripts/seastar-cpu-map.sh | 54 | ||||
-rwxr-xr-x | src/seastar/scripts/seastar-json2code.py | 578 | ||||
-rwxr-xr-x | src/seastar/scripts/stall-analyser.py | 370 | ||||
-rw-r--r-- | src/seastar/scripts/tap.sh | 31 |
13 files changed, 4431 insertions, 0 deletions
diff --git a/src/seastar/scripts/addr2line.py b/src/seastar/scripts/addr2line.py new file mode 100755 index 000000000..7cca90163 --- /dev/null +++ b/src/seastar/scripts/addr2line.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python3 +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Copyright (C) 2017 ScyllaDB + +import bisect +import collections +import re +import sys +import subprocess +from enum import Enum +from typing import Any + +# special binary path/module indicating that the address is from the kernel +KERNEL_MODULE = '<kernel>' + +class Addr2Line: + + # Matcher for a line that appears at the end a single decoded + # address, which we force by adding a dummy 0x0 address. The + # pattern varies between binutils addr2line and llvm-addr2line + # so we match both. + dummy_pattern = re.compile( + r"(.*0x0000000000000000: \?\? \?\?:0\n)" # addr2line pattern + r"|" + r"(.*0x0: \?\? at \?\?:0\n)" # llvm-addr2line pattern + ) + + def __init__(self, binary, concise=False, cmd_path="addr2line"): + self._binary = binary + + # Print warning if binary has no debug info according to `file`. + # Note: no message is printed for system errors as they will be + # printed also by addr2line later on. 
+ output = subprocess.check_output(["file", self._binary]) + s = output.decode("utf-8") + if s.find('ELF') >= 0 and s.find('debug_info', len(self._binary)) < 0: + print('{}'.format(s)) + + options = f"-{'C' if not concise else ''}fpia" + self._input = subprocess.Popen([cmd_path, options, "-e", self._binary], stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True) + if concise: + self._output = subprocess.Popen(["c++filt", "-p"], stdin=self._input.stdout, stdout=subprocess.PIPE, universal_newlines=True) + else: + self._output = self._input + + # If a library doesn't exist in a particular path, addr2line + # will just exit. We need to be robust against that. We + # can't just wait on self._addr2line since there is no + # guarantee on what timeout is sufficient. + self._input.stdin.write('\n') + self._input.stdin.flush() + res = self._output.stdout.readline() + self._missing = res == '' + + def _read_resolved_address(self): + res = self._output.stdout.readline() + # remove the address + res = res.split(': ', 1)[1] + line = '' + while Addr2Line.dummy_pattern.fullmatch(line) is None: + res += line + line = self._output.stdout.readline() + return res + + def __call__(self, address): + if self._missing: + return " ".join([self._binary, address, '\n']) + # We print a dummy 0x0 address after the address we are interested in + # which we can look for in _read_address + self._input.stdin.write(address + '\n0x0\n') + self._input.stdin.flush() + return self._read_resolved_address() + +class KernelResolver: + """A resolver for kernel addresses which tries to read from /proc/kallsyms.""" + + LAST_SYMBOL_MAX_SIZE = 1024 + + def __init__(self): + syms : list[tuple[int, str]] = [] + ksym_re = re.compile(r'(?P<addr>[0-9a-f]+) (?P<type>.+) (?P<name>\S+)') + warnings_left = 10 + + self.error = None + + try: + f = open('/proc/kallsyms', 'r') + except OSError as e: + self.error = f'Cannot open /proc/kallsyms: {e}' + print(self.error) + return + + try: + for line in f: + m 
= ksym_re.match(line) + if not m: + if warnings_left > 0: # don't spam too much + print(f'WARNING: /proc/kallsyms regex match failure: {line.strip()}', file=sys.stdout) + warnings_left -= 1 + else: + syms.append((int(m.group('addr'), 16), m.group('name'))) + finally: + f.close() + + if not syms: + # make empty kallsyms (?) an error so we can assum len >= 1 below + self.error = 'kallsyms was empty' + print(self.error) + return + + syms.sort() + + if syms[-1][0] == 0: + # zero values for all symbols means that kptr_restrict blocked you + # from seeing the kernel symbol addresses + print('kallsyms is restricted, set /proc/sys/kernel/kptr_restrict to 0 to decode') + self.error = 'kallsyms is restricted' + return + + # split because bisect can't take a key func before 3.10 + self.sym_addrs : tuple[int] + self.sym_names : tuple[str] + self.sym_addrs, self.sym_names = zip(*syms) # type: ignore + + + def __call__(self, addrstr): + if self.error: + return addrstr + '\n' + + sa = self.sym_addrs + sn = self.sym_names + slen = len(sa) + address = int(addrstr, 16) + idx = bisect.bisect_right(sa, address) - 1 + assert -1 <= idx < slen + if idx == -1: + return f'{addrstr} ({sa[0] - address} bytes before first symbol)\n' + if idx == slen - 1: + # We can easily detect symbol addresses which are too small: they fall before + # the first symbol in kallsyms, but for too large it is harder: we can't really + # distinguish between an address that is in the *very last* function in the symbol map + # and one which is beyond that, since kallsyms doesn't include symbol size. Instead + # we use a bit of a quick and dirty heuristic: if the symbol is *far enough* beyond + # the last symbol we assume it is not valid. Most likely, the overwhelming majority + # of cases are invalid (e.g., due to KASLR) as the final symbol in the map is usually + # something obscure. 
+ lastsym = sa[-1] + if address - lastsym > self.LAST_SYMBOL_MAX_SIZE: + return f'{addrstr} ({address - lastsym} bytes after last symbol)\n' + saddr = sa[idx] + assert saddr <= address + return f'{sn[idx]}+0x{address - saddr:x}\n' + + +class BacktraceResolver(object): + + class BacktraceParser(object): + class Type(Enum): + ADDRESS = 1 + SEPARATOR = 2 + + def __init__(self): + addr = "0x[0-9a-f]+" + path = "\S+" + token = f"(?:{path}\+)?{addr}" + full_addr_match = f"(?:(?P<path>{path})\s*\+\s*)?(?P<addr>{addr})" + ignore_addr_match = f"(?:(?P<path>{path})\s*\+\s*)?(?:{addr})" + self.oneline_re = re.compile(f"^((?:.*(?:(?:at|backtrace):?|:))?(?:\s+))?({token}(?:\s+{token})*)(?:\).*|\s*)$", flags=re.IGNORECASE) + self.address_re = re.compile(full_addr_match, flags=re.IGNORECASE) + self.syslog_re = re.compile(f"^(?:#\d+\s+)(?P<addr>{addr})(?:.*\s+)\({ignore_addr_match}\)\s*$", flags=re.IGNORECASE) + self.kernel_re = re.compile(fr'^kernel callstack: (?P<addrs>(?:{addr}\s*)+)$') + self.asan_re = re.compile(f"^(?:.*\s+)\({full_addr_match}\)(\s+\(BuildId: [0-9a-fA-F]+\))?$", flags=re.IGNORECASE) + self.asan_ignore_re = re.compile(f"^=.*$", flags=re.IGNORECASE) + self.generic_re = re.compile(f"^(?:.*\s+){full_addr_match}\s*$", flags=re.IGNORECASE) + self.separator_re = re.compile('^\W*-+\W*$') + + + def split_addresses(self, addrstring: str, default_path=None): + addresses : list[dict[str, Any]] = [] + for obj in addrstring.split(): + m = re.match(self.address_re, obj) + assert m, f'addr did not match address regex: {obj}' + #print(f" >>> '{obj}': address {m.groups()}") + addresses.append({'path': m.group(1) or default_path, 'addr': m.group(2)}) + return addresses + + def __call__(self, line): + def get_prefix(s): + if s is not None: + s = s.strip() + return s or None + + # order here is important: the kernel callstack regex + # needs to come first since it is more specific and would + # otherwise be matched by the online regex which comes next + m = 
self.kernel_re.match(line) + if m: + return { + 'type': self.Type.ADDRESS, + 'prefix': 'kernel callstack: ', + 'addresses' : self.split_addresses(m.group('addrs'), KERNEL_MODULE) + } + + m = re.match(self.oneline_re, line) + if m: + #print(f">>> '{line}': oneline {m.groups()}") + return { + 'type': self.Type.ADDRESS, + 'prefix': get_prefix(m.group(1)), + 'addresses': self.split_addresses(m.group(2)) + } + + m = re.match(self.syslog_re, line) + if m: + #print(f">>> '{line}': syslog {m.groups()}") + ret = {'type': self.Type.ADDRESS} + ret['prefix'] = None + ret['addresses'] = [{'path': m.group('path'), 'addr': m.group('addr')}] + return ret + + m = re.match(self.asan_ignore_re, line) + if m: + #print(f">>> '{line}': asan ignore") + return None + + m = re.match(self.asan_re, line) + if m: + #print(f">>> '{line}': asan {m.groups()}") + ret = {'type': self.Type.ADDRESS} + ret['prefix'] = None + ret['addresses'] = [{'path': m.group('path'), 'addr': m.group('addr')}] + return ret + + m = re.match(self.generic_re, line) + if m: + #print(f">>> '{line}': generic {m.groups()}") + ret = {'type': self.Type.ADDRESS} + ret['prefix'] = None + ret['addresses'] = [{'path': m.group('path'), 'addr': m.group('addr')}] + return ret + + match = re.match(self.separator_re, line) + if match: + return {'type': self.Type.SEPARATOR} + + #print(f">>> '{line}': None") + return None + + def __init__(self, executable, before_lines=1, context_re='', verbose=False, concise=False, cmd_path='addr2line'): + self._executable = executable + self._current_backtrace = [] + self._prefix = None + self._before_lines = before_lines + self._before_lines_queue = collections.deque(maxlen=before_lines) + self._i = 0 + self._known_backtraces = {} + if context_re is not None: + self._context_re = re.compile(context_re) + else: + self._context_re = None + self._verbose = verbose + self._concise = concise + self._cmd_path = cmd_path + self._known_modules = {} + self._get_resolver_for_module(self._executable) # fail 
fast if there is something wrong with the exe resolver + self.parser = self.BacktraceParser() + + def _get_resolver_for_module(self, module): + if not module in self._known_modules: + if module == KERNEL_MODULE: + resolver = KernelResolver() + else: + resolver = Addr2Line(module, self._concise, self._cmd_path) + self._known_modules[module] = resolver + return self._known_modules[module] + + def __enter__(self): + return self + + def __exit__(self, type, value, tb): + self._print_current_backtrace() + + def resolve_address(self, address, module=None, verbose=None): + if module is None: + module = self._executable + if verbose is None: + verbose = self._verbose + resolved_address = self._get_resolver_for_module(module)(address) + if verbose: + resolved_address = '{{{}}} {}: {}'.format(module, address, resolved_address) + return resolved_address + + def _print_resolved_address(self, module, address): + sys.stdout.write(self.resolve_address(address, module)) + + def _backtrace_context_matches(self): + if self._context_re is None: + return True + + if any(map(lambda x: self._context_re.search(x) is not None, self._before_lines_queue)): + return True + + if (not self._prefix is None) and self._context_re.search(self._prefix): + return True + + return False + + def _print_current_backtrace(self): + if len(self._current_backtrace) == 0: + return + + if not self._backtrace_context_matches(): + self._current_backtrace = [] + return + + for line in self._before_lines_queue: + sys.stdout.write(line) + + if not self._prefix is None: + print(self._prefix) + self._prefix = None + + backtrace = "".join(map(str, self._current_backtrace)) + if backtrace in self._known_backtraces: + print("[Backtrace #{}] Already seen, not resolving again.".format(self._known_backtraces[backtrace])) + print("") # To separate traces with an empty line + self._current_backtrace = [] + return + + self._known_backtraces[backtrace] = self._i + + print("[Backtrace #{}]".format(self._i)) + + for module, 
addr in self._current_backtrace: + self._print_resolved_address(module, addr) + + print("") # To separate traces with an empty line + + self._current_backtrace = [] + self._i += 1 + + def __call__(self, line): + res = self.parser(line) + + if not res: + self._print_current_backtrace() + if self._before_lines > 0: + self._before_lines_queue.append(line) + elif self._before_lines < 0: + sys.stdout.write(line) # line already has a trailing newline + else: + pass # when == 0 no non-backtrace lines are printed + elif res['type'] == self.BacktraceParser.Type.SEPARATOR: + pass + elif res['type'] == self.BacktraceParser.Type.ADDRESS: + addresses = res['addresses'] + if len(addresses) > 1: + self._print_current_backtrace() + if len(self._current_backtrace) == 0: + self._prefix = res['prefix'] + for r in addresses: + if r['path']: + self._current_backtrace.append((r['path'], r['addr'])) + else: + self._current_backtrace.append((self._executable, r['addr'])) + if len(addresses) > 1: + self._print_current_backtrace() + else: + print(f"Unknown '{line}': {res}") + raise RuntimeError("Unknown result type {res}") + diff --git a/src/seastar/scripts/build.sh b/src/seastar/scripts/build.sh new file mode 100755 index 000000000..6b3ea88a0 --- /dev/null +++ b/src/seastar/scripts/build.sh @@ -0,0 +1,54 @@ +#!/bin/bash +# This scripts expects seastar-dev container to be built e.g. like this +# $ docker build -t seastar-dev -f ./docker/dev/Dockerfile . 
+ +BUILDER_IMAGE=${BUILDER_IMAGE:=seastar-dev} + +if [ $# -eq 0 -o "$1" == "--help" -o "$1" == "-h" ]; then + echo "Usage: $(basename $0) <mode> [<compiler>] [<version>] [<c++ standard>]" + exit 0 +fi + +if [ -z "$CONTAINERIZED" ]; then + OPTIONS="" + if [ -t 1 ]; then + OPTIONS="-it" + echo "Wrapping self into $BUILDER_IMAGE container" + fi + exec docker run $OPTIONS --rm -v$(pwd):/home/src -e 'CONTAINERIZED=yes' -w /home/src $BUILDER_IMAGE /home/src/scripts/$(basename $0) "$@" +fi + +set -e +set -x +update-alternatives --auto gcc +update-alternatives --auto clang + +MODE=$1 +COMPILER=$2 +VERSION=$3 +STANDARD=$4 + +CONFIGURE="--mode=$MODE" +if [ ! -z "$COMPILER" ]; then + if [ "$COMPILER" == "gcc" ]; then + CPP_COMPILER="g++" + elif [ "$COMPILER" == "clang" ]; then + CPP_COMPILER="clang++" + else + echo "Unknown compiler (use 'gcc' or 'clang')" + exit 1 + fi + CONFIGURE="$CONFIGURE --compiler=$CPP_COMPILER" + + if [ ! -z "$VERSION" ]; then + update-alternatives --set $COMPILER /usr/bin/${COMPILER}-${VERSION} + update-alternatives --set $CPP_COMPILER /usr/bin/${CPP_COMPILER}-${VERSION} + + if [ ! -z "$STANDARD" ]; then + CONFIGURE="$CONFIGURE --c++-standard=$STANDARD" + fi + fi +fi + +./configure.py $CONFIGURE +ninja -C build/$MODE diff --git a/src/seastar/scripts/dpdk_nic_bind.py b/src/seastar/scripts/dpdk_nic_bind.py new file mode 100755 index 000000000..e5557bf7f --- /dev/null +++ b/src/seastar/scripts/dpdk_nic_bind.py @@ -0,0 +1,539 @@ +#!/usr/bin/env python2 +# +# BSD LICENSE +# +# Copyright(c) 2010-2014 Intel Corporation. All rights reserved. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. 
+# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +import sys, os, getopt, subprocess +from os.path import exists, abspath, dirname, basename + + +# The PCI device class for ETHERNET devices +ETHERNET_CLASS = "0200" + +# global dict ethernet devices present. Dictionary indexed by PCI address. +# Each device within this is itself a dictionary of device properties +devices = {} +# list of supported DPDK drivers +dpdk_drivers = [ "igb_uio", "vfio-pci", "uio_pci_generic" ] + +# command-line arg flags +b_flag = None +status_flag = False +force_flag = False +args = [] + +def usage(): + '''Print usage information for the program''' + argv0 = basename(sys.argv[0]) + print """ +Usage: +------ + + %(argv0)s [options] DEVICE1 DEVICE2 .... 
+ +where DEVICE1, DEVICE2 etc, are specified via PCI "domain:bus:slot.func" syntax +or "bus:slot.func" syntax. For devices bound to Linux kernel drivers, they may +also be referred to by Linux interface name e.g. eth0, eth1, em0, em1, etc. + +Options: + --help, --usage: + Display usage information and quit + + --status: + Print the current status of all known network interfaces. + For each device, it displays the PCI domain, bus, slot and function, + along with a text description of the device. Depending upon whether the + device is being used by a kernel driver, the igb_uio driver, or no + driver, other relevant information will be displayed: + * the Linux interface name e.g. if=eth0 + * the driver being used e.g. drv=igb_uio + * any suitable drivers not currently using that device + e.g. unused=igb_uio + NOTE: if this flag is passed along with a bind/unbind option, the status + display will always occur after the other operations have taken place. + + -b driver, --bind=driver: + Select the driver to use or \"none\" to unbind the device + + -u, --unbind: + Unbind a device (Equivalent to \"-b none\") + + --force: + By default, devices which are used by Linux - as indicated by having + routes in the routing table - cannot be modified. Using the --force + flag overrides this behavior, allowing active links to be forcibly + unbound. + WARNING: This can lead to loss of network connection and should be used + with caution. + +Examples: +--------- + +To display current device status: + %(argv0)s --status + +To bind eth1 from the current driver and move to use igb_uio + %(argv0)s --bind=igb_uio eth1 + +To unbind 0000:01:00.0 from using any driver + %(argv0)s -u 0000:01:00.0 + +To bind 0000:02:00.0 and 0000:02:00.1 to the ixgbe kernel driver + %(argv0)s -b ixgbe 02:00.0 02:00.1 + + """ % locals() # replace items from local variables + +# This is roughly compatible with check_output function in subprocess module +# which is only available in python 2.7. 
+def check_output(args, stderr=None): + '''Run a command and capture its output''' + return subprocess.Popen(args, stdout=subprocess.PIPE, + stderr=stderr).communicate()[0] + +def find_module(mod): + '''find the .ko file for kernel module named mod. + Searches the $RTE_SDK/$RTE_TARGET directory, the kernel + modules directory and finally under the parent directory of + the script ''' + # check $RTE_SDK/$RTE_TARGET directory + if 'RTE_SDK' in os.environ and 'RTE_TARGET' in os.environ: + path = "%s/%s/kmod/%s.ko" % (os.environ['RTE_SDK'],\ + os.environ['RTE_TARGET'], mod) + if exists(path): + return path + + # check using depmod + try: + depmod_out = check_output(["modinfo", "-n", mod], \ + stderr=subprocess.STDOUT).lower() + if "error" not in depmod_out: + path = depmod_out.strip() + if exists(path): + return path + except: # if modinfo can't find module, it fails, so continue + pass + + # check for a copy based off current path + tools_dir = dirname(abspath(sys.argv[0])) + if (tools_dir.endswith("tools")): + base_dir = dirname(tools_dir) + find_out = check_output(["find", base_dir, "-name", mod + ".ko"]) + if len(find_out) > 0: #something matched + path = find_out.splitlines()[0] + if exists(path): + return path + +def check_modules(): + '''Checks that igb_uio is loaded''' + global dpdk_drivers + + fd = file("/proc/modules") + loaded_mods = fd.readlines() + fd.close() + + # list of supported modules + mods = [{"Name" : driver, "Found" : False} for driver in dpdk_drivers] + + # first check if module is loaded + for line in loaded_mods: + for mod in mods: + if line.startswith(mod["Name"]): + mod["Found"] = True + # special case for vfio_pci (module is named vfio-pci, + # but its .ko is named vfio_pci) + elif line.replace("_", "-").startswith(mod["Name"]): + mod["Found"] = True + + # check if we have at least one loaded module + if True not in [mod["Found"] for mod in mods] and b_flag is not None: + if b_flag in dpdk_drivers: + print "Error - no supported modules(DPDK 
driver) are loaded" + sys.exit(1) + else: + print "Warning - no supported modules(DPDK driver) are loaded" + + # change DPDK driver list to only contain drivers that are loaded + dpdk_drivers = [mod["Name"] for mod in mods if mod["Found"]] + +def has_driver(dev_id): + '''return true if a device is assigned to a driver. False otherwise''' + return "Driver_str" in devices[dev_id] + +def get_pci_device_details(dev_id): + '''This function gets additional details for a PCI device''' + device = {} + + extra_info = check_output(["lspci", "-vmmks", dev_id]).splitlines() + + # parse lspci details + for line in extra_info: + if len(line) == 0: + continue + name, value = line.split("\t", 1) + name = name.strip(":") + "_str" + device[name] = value + # check for a unix interface name + sys_path = "/sys/bus/pci/devices/%s/net/" % dev_id + if exists(sys_path): + device["Interface"] = ",".join(os.listdir(sys_path)) + else: + device["Interface"] = "" + # check if a port is used for ssh connection + device["Ssh_if"] = False + device["Active"] = "" + + return device + +def get_nic_details(): + '''This function populates the "devices" dictionary. The keys used are + the pci addresses (domain:bus:slot.func). 
The values are themselves + dictionaries - one for each NIC.''' + global devices + global dpdk_drivers + + # clear any old data + devices = {} + # first loop through and read details for all devices + # request machine readable format, with numeric IDs + dev = {}; + dev_lines = check_output(["lspci", "-Dvmmn"]).splitlines() + for dev_line in dev_lines: + if (len(dev_line) == 0): + if dev["Class"] == ETHERNET_CLASS: + #convert device and vendor ids to numbers, then add to global + dev["Vendor"] = int(dev["Vendor"],16) + dev["Device"] = int(dev["Device"],16) + devices[dev["Slot"]] = dict(dev) # use dict to make copy of dev + else: + name, value = dev_line.split("\t", 1) + dev[name.rstrip(":")] = value + + # check what is the interface if any for an ssh connection if + # any to this host, so we can mark it later. + ssh_if = [] + route = check_output(["ip", "-o", "route"]) + # filter out all lines for 169.254 routes + route = "\n".join(filter(lambda ln: not ln.startswith("169.254"), + route.splitlines())) + rt_info = route.split() + for i in xrange(len(rt_info) - 1): + if rt_info[i] == "dev": + ssh_if.append(rt_info[i+1]) + + # based on the basic info, get extended text details + for d in devices.keys(): + # get additional info and add it to existing data + devices[d] = dict(devices[d].items() + + get_pci_device_details(d).items()) + + for _if in ssh_if: + if _if in devices[d]["Interface"].split(","): + devices[d]["Ssh_if"] = True + devices[d]["Active"] = "*Active*" + break; + + # add igb_uio to list of supporting modules if needed + if "Module_str" in devices[d]: + for driver in dpdk_drivers: + if driver not in devices[d]["Module_str"]: + devices[d]["Module_str"] = devices[d]["Module_str"] + ",%s" % driver + else: + devices[d]["Module_str"] = ",".join(dpdk_drivers) + + # make sure the driver and module strings do not have any duplicates + if has_driver(d): + modules = devices[d]["Module_str"].split(",") + if devices[d]["Driver_str"] in modules: + 
modules.remove(devices[d]["Driver_str"]) + devices[d]["Module_str"] = ",".join(modules) + +def dev_id_from_dev_name(dev_name): + '''Take a device "name" - a string passed in by user to identify a NIC + device, and determine the device id - i.e. the domain:bus:slot.func - for + it, which can then be used to index into the devices array''' + dev = None + # check if it's already a suitable index + if dev_name in devices: + return dev_name + # check if it's an index just missing the domain part + elif "0000:" + dev_name in devices: + return "0000:" + dev_name + else: + # check if it's an interface name, e.g. eth1 + for d in devices.keys(): + if dev_name in devices[d]["Interface"].split(","): + return devices[d]["Slot"] + # if nothing else matches - error + print "Unknown device: %s. " \ + "Please specify device in \"bus:slot.func\" format" % dev_name + sys.exit(1) + +def unbind_one(dev_id, force): + '''Unbind the device identified by "dev_id" from its current driver''' + dev = devices[dev_id] + if not has_driver(dev_id): + print "%s %s %s is not currently managed by any driver\n" % \ + (dev["Slot"], dev["Device_str"], dev["Interface"]) + return + + # prevent us disconnecting ourselves + if dev["Ssh_if"] and not force: + print "Routing table indicates that interface %s is active" \ + ". Skipping unbind" % (dev_id) + return + + # write to /sys to unbind + filename = "/sys/bus/pci/drivers/%s/unbind" % dev["Driver_str"] + try: + f = open(filename, "a") + except: + print "Error: unbind failed for %s - Cannot open %s" % (dev_id, filename) + sys/exit(1) + f.write(dev_id) + f.close() + +def bind_one(dev_id, driver, force): + '''Bind the device given by "dev_id" to the driver "driver". 
If the device + is already bound to a different driver, it will be unbound first''' + dev = devices[dev_id] + saved_driver = None # used to rollback any unbind in case of failure + + # prevent disconnection of our ssh session + if dev["Ssh_if"] and not force: + print "Routing table indicates that interface %s is active" \ + ". Not modifying" % (dev_id) + return + + # unbind any existing drivers we don't want + if has_driver(dev_id): + if dev["Driver_str"] == driver: + print "%s already bound to driver %s, skipping\n" % (dev_id, driver) + return + else: + saved_driver = dev["Driver_str"] + unbind_one(dev_id, force) + dev["Driver_str"] = "" # clear driver string + + # if we are binding to one of DPDK drivers, add PCI id's to that driver + if driver in dpdk_drivers: + filename = "/sys/bus/pci/drivers/%s/new_id" % driver + try: + f = open(filename, "w") + except: + print "Error: bind failed for %s - Cannot open %s" % (dev_id, filename) + return + try: + f.write("%04x %04x" % (dev["Vendor"], dev["Device"])) + f.close() + except: + print "Error: bind failed for %s - Cannot write new PCI ID to " \ + "driver %s" % (dev_id, driver) + return + + # do the bind by writing to /sys + filename = "/sys/bus/pci/drivers/%s/bind" % driver + try: + f = open(filename, "a") + except: + print "Error: bind failed for %s - Cannot open %s" % (dev_id, filename) + if saved_driver is not None: # restore any previous driver + bind_one(dev_id, saved_driver, force) + return + try: + f.write(dev_id) + f.close() + except: + # for some reason, closing dev_id after adding a new PCI ID to new_id + # results in IOError. 
however, if the device was successfully bound, + # we don't care for any errors and can safely ignore IOError + tmp = get_pci_device_details(dev_id) + if "Driver_str" in tmp and tmp["Driver_str"] == driver: + return + print "Error: bind failed for %s - Cannot bind to driver %s" % (dev_id, driver) + if saved_driver is not None: # restore any previous driver + bind_one(dev_id, saved_driver, force) + return + + +def unbind_all(dev_list, force=False): + """Unbind method, takes a list of device locations""" + dev_list = map(dev_id_from_dev_name, dev_list) + for d in dev_list: + unbind_one(d, force) + +def bind_all(dev_list, driver, force=False): + """Bind method, takes a list of device locations""" + global devices + + dev_list = map(dev_id_from_dev_name, dev_list) + + for d in dev_list: + bind_one(d, driver, force) + + # when binding devices to a generic driver (i.e. one that doesn't have a + # PCI ID table), some devices that are not bound to any other driver could + # be bound even if no one has asked them to. hence, we check the list of + # drivers again, and see if some of the previously-unbound devices were + # erroneously bound. + for d in devices.keys(): + # skip devices that were already bound or that we know should be bound + if "Driver_str" in devices[d] or d in dev_list: + continue + + # update information about this device + devices[d] = dict(devices[d].items() + + get_pci_device_details(d).items()) + + # check if updated information indicates that the device was bound + if "Driver_str" in devices[d]: + unbind_one(d, force) + +def display_devices(title, dev_list, extra_params = None): + '''Displays to the user the details of a list of devices given in "dev_list" + The "extra_params" parameter, if given, should contain a string with + %()s fields in it for replacement by the named fields in each device's + dictionary.''' + strings = [] # this holds the strings to print. 
# NOTE(review): this chunk belongs to dpdk_nic_bind.py, which was written for
# Python 2 (print statements, legacy "except E, e" syntax) and cannot even be
# parsed by Python 3.  Python 2 is EOL, so the functions visible here are
# ported to Python 3; the remainder of the file needs the same treatment.

def show_status():
    '''Function called when the script is passed the "--status" option. Displays
    to the user what devices are bound to the igb_uio driver, the kernel driver
    or to no driver'''
    global dpdk_drivers
    kernel_drv = []
    dpdk_drv = []
    no_drv = []

    # split our list of devices into the three categories above
    for d in devices.keys():
        if not has_driver(d):
            no_drv.append(devices[d])
            continue
        if devices[d]["Driver_str"] in dpdk_drivers:
            dpdk_drv.append(devices[d])
        else:
            kernel_drv.append(devices[d])

    # print each category separately, so we can clearly see what's used by DPDK
    display_devices("Network devices using DPDK-compatible driver", dpdk_drv,
                    "drv=%(Driver_str)s unused=%(Module_str)s")
    display_devices("Network devices using kernel driver", kernel_drv,
                    "if=%(Interface)s drv=%(Driver_str)s unused=%(Module_str)s %(Active)s")
    display_devices("Other network devices", no_drv,
                    "unused=%(Module_str)s")

def parse_args():
    '''Parses the command-line arguments given by the user and takes the
    appropriate action for each'''
    global b_flag
    global status_flag
    global force_flag
    global args
    if len(sys.argv) <= 1:
        usage()
        sys.exit(0)

    try:
        opts, args = getopt.getopt(sys.argv[1:], "b:u",
                                   ["help", "usage", "status", "force",
                                    "bind=", "unbind"])
    except getopt.GetoptError as error:  # py3 port: "except E, e" is a SyntaxError
        print(str(error))
        print("Run '%s --usage' for further information" % sys.argv[0])
        sys.exit(1)

    for opt, arg in opts:
        if opt == "--help" or opt == "--usage":
            usage()
            sys.exit(0)
        if opt == "--status":
            status_flag = True
        if opt == "--force":
            force_flag = True
        if opt == "-b" or opt == "-u" or opt == "--bind" or opt == "--unbind":
            if b_flag is not None:
                print("Error - Only one bind or unbind may be specified\n")
                sys.exit(1)
            if opt == "-u" or opt == "--unbind":
                b_flag = "none"
            else:
                b_flag = arg

def do_arg_actions():
    '''do the actual action requested by the user'''
    global b_flag
    global status_flag
    global force_flag
    global args

    if b_flag is None and not status_flag:
        print("Error: No action specified for devices. Please give a -b or -u option")
        print("Run '%s --usage' for further information" % sys.argv[0])
        sys.exit(1)

    if b_flag is not None and len(args) == 0:
        print("Error: No devices specified.")
        print("Run '%s --usage' for further information" % sys.argv[0])
        sys.exit(1)

    if b_flag == "none" or b_flag == "None":
        unbind_all(args, force_flag)
    elif b_flag is not None:
        bind_all(args, b_flag, force_flag)
    if status_flag:
        if b_flag is not None:
            get_nic_details()  # refresh if we have changed anything
        show_status()

def main():
    '''program main function'''
    parse_args()
    check_modules()
    get_nic_details()
    do_arg_actions()

if __name__ == "__main__":
    main()

# ---- io-trace-parse.py (next file in this patch) ----

def print_stat_line(what, st):
    """Print average, .99 quantile and maximum for the sample list *st*.

    NOTE(review): statistics.quantiles() needs at least two samples and
    fmean() needs at least one - a trace containing a single request of a
    given size raises StatisticsError here; confirm that is acceptable.
    """
    def q99(arr):
        return statistics.quantiles(arr, n=100)[-1]

    print("\t{:18}: avg:{:12.6f} .99:{:12.6f} max:{:12.6f}".format(what,
          statistics.fmean(st), q99(st), max(st)))
# Inc/Dec counter that also records the full history of its values so that
# avg/.99/max statistics can be computed over the whole run.
class counter:
    def __init__(self):
        self._v = 0      # current value
        self._stat = []  # value after every inc()/dec()

    def inc(self):
        self._v += 1
        self._stat.append(self._v)

    def dec(self):
        self._v -= 1
        self._stat.append(self._v)

    def stat(self):
        """Return the recorded history (value after each change)."""
        return self._stat


# A single in-flight IO request and its lifecycle timestamps.
class req:
    def __init__(self, rqlen):
        self.len = rqlen     # request size in bytes
        self.queue = None    # timestamp: queued
        self.submit = None   # timestamp: submitted to disk
        self.complete = None # timestamp: completed


# Timings for requests of one particular size.
class req_stat:
    def __init__(self):
        self.qtimes = []     # time in queue
        self.xtimes = []     # time in disk
        self.latencies = []  # sum of the above
        self.delays = []     # time between submits
        self.prev = None     # previous submit timestamp (for delays)
        self.in_queue = counter()
        self.in_disk = counter()

    def queue(self, rq):
        self.in_queue.inc()

    def submit(self, rq):
        # First submit has no predecessor, so no delay sample is recorded.
        if self.prev:
            self.delays.append(rq.submit - self.prev)
        self.prev = rq.submit
        self.qtimes.append(rq.submit - rq.queue)
        self.in_queue.dec()
        self.in_disk.inc()

    def complete(self, rq):
        self.xtimes.append(rq.complete - rq.submit)
        self.latencies.append(rq.complete - rq.queue)
        self.in_disk.dec()

    def show(self, rqlen):
        print("{}k requests".format(int(rqlen/1024)))
        print("\ttotal: {}".format(len(self.latencies)))
        print_stat_line('in queue usec', self.qtimes)
        print_stat_line('  `- num      ', self.in_queue.stat())
        print_stat_line('in disk usec', self.xtimes)
        print_stat_line('  `- num     ', self.in_disk.stat())
        print_stat_line('latency', self.latencies)
        print_stat_line('period', self.delays)


# Stats for a device.  Umbrella-object for the above stats.
class device_stat:
    def __init__(self):
        self.reqs = {}       # request-id -> req, only while in flight
        self.req_stats = {}  # request size -> req_stat
        self.in_queue = counter()
        self.in_disk = counter()

    def queue(self, rqid, ts, rqlen):
        rq = req(rqlen)
        self.reqs[rqid] = rq
        rq.queue = ts
        # Lazily create the per-size bucket on first sighting of this size.
        if rq.len not in self.req_stats:
            self.req_stats[rq.len] = req_stat()
        st = self.req_stats[rq.len]
        st.queue(rq)
        self.in_queue.inc()

    def submit(self, rqid, ts):
        rq = self.reqs[rqid]
        rq.submit = ts
        st = self.req_stats[rq.len]
        st.submit(rq)
        self.in_queue.dec()
        self.in_disk.inc()

    def complete(self, rqid, ts):
        rq = self.reqs[rqid]
        rq.complete = ts
        st = self.req_stats[rq.len]
        st.complete(rq)
        # The request is finished - drop it so reqs only holds in-flight IO.
        del self.reqs[rqid]
        self.in_disk.dec()

    def _show_req_stats(self):
        for rlen in self.req_stats:
            st = self.req_stats[rlen]
            st.show(rlen)

    def _show_queue_stats(self):
        print("queue")
        print_stat_line('in queue num:', self.in_queue.stat())
        print_stat_line('in disk num:', self.in_disk.stat())

    def show(self, devid):
        print("{}".format(devid).center(80, "-"))
        self._show_req_stats()
        self._show_queue_stats()


# Reads TRACE lines from a file-like object (any iterable of strings) and
# accumulates per-device IO statistics.
class parser:
    def __init__(self, f):
        self._file = f
        self._dev_stats = {}  # device id (int) -> device_stat

    def _get_dev_stats(self, devid):
        if devid not in self._dev_stats:
            self._dev_stats[devid] = device_stat()

        return self._dev_stats[devid]

    def _parse_req_event(self, ln):
        # Fixed columns of seastar's io-trace output:
        # ln[1]=timestamp, ln[7]=device id, ln[10]=request id,
        # ln[11]=event name, ln[13]=request length (queue events only).
        req_id = ln[10]
        ts = float(ln[1])
        st = self._get_dev_stats(int(ln[7]))

        if ln[11] == 'queue':
            st.queue(req_id, ts, int(ln[13]))
        elif ln[11] == 'submit':
            st.submit(req_id, ts)
        elif ln[11] == 'complete':
            st.complete(req_id, ts)

    def _parse_line(self, ln):
        # Robustness fix: the original read ln[4]/ln[9] unconditionally, so a
        # truncated TRACE line raised IndexError.  Skip lines that are too
        # short to carry an io req event (deepest common index is 11).
        if len(ln) >= 12 and ln[4] == 'io' and ln[9] == 'req':
            self._parse_req_event(ln)

    def parse(self):
        for ln in self._file:
            if ln.startswith('TRACE'):
                self._parse_line(ln.strip().split())

        return self._dev_stats
#!/usr/bin/env python3

import abc
import argparse
import enum
import functools
import glob
import itertools
import logging
import math
import multiprocessing
import os
import pathlib
import platform
import re
import shlex
import shutil
import subprocess
import sys
import urllib.request

# Third-party / legacy imports used further down the file.  Deferred so the
# generic helpers below stay importable on minimal systems.
# NOTE(review): distutils was removed in Python 3.12 - its usage down-file
# needs replacing; confirm pyudev/yaml are packaged on all target systems.
try:
    import distutils.util
    import pyudev
    import yaml
except ImportError:
    pass

# Global switch: when True, commands and file writes are only printed in a
# shell-script-compatible form instead of being executed.
dry_run_mode = False

def perftune_print(log_msg, *args, **kwargs):
    """print() wrapper that prefixes messages with '# ' in dry-run mode so
    the overall output remains a valid shell script."""
    if dry_run_mode:
        log_msg = "# " + log_msg
    print(log_msg, *args, **kwargs)

def __run_one_command(prog_args, stderr=None, check=True):
    """Run a command and return its stdout decoded as UTF-8.

    Raises subprocess.CalledProcessError on non-zero exit when 'check' is True.
    """
    proc = subprocess.Popen(prog_args, stdout = subprocess.PIPE, stderr = stderr)
    outs, errs = proc.communicate()
    outs = str(outs, 'utf-8')

    if check and proc.returncode != 0:
        raise subprocess.CalledProcessError(returncode=proc.returncode, cmd=" ".join(prog_args), output=outs, stderr=errs)

    return outs

def run_one_command(prog_args, stderr=None, check=True):
    """Run a (potentially state-changing) command, or just print it in
    dry-run mode.  Returns None by design - use run_read_only_command()
    when the output is needed."""
    if dry_run_mode:
        print(" ".join([shlex.quote(x) for x in prog_args]))
    else:
        __run_one_command(prog_args, stderr=stderr, check=check)

def run_read_only_command(prog_args, stderr=None, check=True):
    """Run a command that does not modify system state and return its stdout.
    Executed even in dry-run mode."""
    return __run_one_command(prog_args, stderr=stderr, check=check)

def run_hwloc_distrib(prog_args):
    """
    Returns a list of strings - each representing a single line of hwloc-distrib output.
    """
    return run_read_only_command(['hwloc-distrib'] + prog_args).splitlines()

def run_hwloc_calc(prog_args):
    """
    Returns a single string with the result of the execution.
    """
    return run_read_only_command(['hwloc-calc'] + prog_args).rstrip()

def run_ethtool(prog_args):
    """
    Returns a list of strings - each representing a single line of ethtool output.
    """
    return run_read_only_command(['ethtool'] + prog_args).splitlines()

def fwriteln(fname, line, log_message, log_errors=True):
    """Write 'line' into 'fname' (or print the equivalent shell command in
    dry-run mode).  Best-effort: failures are logged, never raised."""
    try:
        if dry_run_mode:
            print("echo {} > {}".format(line, fname))
            return
        else:
            with open(fname, 'w') as f:
                f.write(line)
            print(log_message)
    except Exception:
        # Fix: was a bare 'except:', which also swallowed KeyboardInterrupt
        # and SystemExit.  Keep best-effort semantics for real errors only.
        if log_errors:
            print("{}: failed to write into {}: {}".format(log_message, fname, sys.exc_info()))

def readlines(fname):
    """Return all lines of 'fname', or [] if it cannot be read (logged)."""
    try:
        with open(fname, 'r') as f:
            return f.readlines()
    except Exception:
        # Fix: bare 'except:' narrowed, same reasoning as in fwriteln().
        print("Failed to read {}: {}".format(fname, sys.exc_info()))
        return []

def fwriteln_and_log(fname, line, log_errors=True):
    msg = "Writing '{}' to {}".format(line, fname)
    fwriteln(fname, line, log_message=msg, log_errors=log_errors)

double_commas_pattern = re.compile(',,')

def set_one_mask(conf_file, mask, log_errors=True):
    """Write a hwloc CPU mask into a sysfs/procfs file, stripping the '0x'
    prefix and filling empty ',,' components with explicit zeroes."""
    if not os.path.exists(conf_file):
        raise Exception("Configure file to set mask doesn't exist: {}".format(conf_file))
    mask = re.sub('0x', '', mask)

    while double_commas_pattern.search(mask):
        mask = double_commas_pattern.sub(',0,', mask)

    msg = "Setting mask {} in {}".format(mask, conf_file)
    fwriteln(conf_file, mask, log_message=msg, log_errors=log_errors)

def distribute_irqs(irqs, cpu_mask, log_errors=True):
    """Spread the given IRQs across the CPUs of 'cpu_mask', one CPU each."""
    # If IRQs' list is empty - do nothing
    if not irqs:
        return

    for i, mask in enumerate(run_hwloc_distrib(["{}".format(len(irqs)), '--single', '--restrict', cpu_mask])):
        set_one_mask("/proc/irq/{}/smp_affinity".format(irqs[i]), mask, log_errors=log_errors)

def is_process_running(name):
    """Return True if a non-defunct process with the given comm name exists."""
    ps_lines = run_read_only_command(['ps', '--no-headers', '-C', name], check=False).splitlines()
    return any(not re.search('<defunct>', ps_line) for ps_line in ps_lines)
+ """ + config_file = '/etc/default/irqbalance' + options_key = 'OPTIONS' + systemd = False + banned_irqs_list = list(banned_irqs) + + # If there is nothing to ban - quit + if not banned_irqs_list: + return + + # return early if irqbalance is not running + if not is_process_running('irqbalance'): + perftune_print("irqbalance is not running") + return + + # If this file exists - this a "new (systemd) style" irqbalance packaging. + # This type of packaging uses IRQBALANCE_ARGS as an option key name, "old (init.d) style" + # packaging uses an OPTION key. + if os.path.exists('/lib/systemd/system/irqbalance.service') or \ + os.path.exists('/usr/lib/systemd/system/irqbalance.service'): + options_key = 'IRQBALANCE_ARGS' + systemd = True + + if not os.path.exists(config_file): + if os.path.exists('/etc/sysconfig/irqbalance'): + config_file = '/etc/sysconfig/irqbalance' + elif os.path.exists('/etc/conf.d/irqbalance'): + config_file = '/etc/conf.d/irqbalance' + options_key = 'IRQBALANCE_OPTS' + with open('/proc/1/comm', 'r') as comm: + systemd = 'systemd' in comm.read() + else: + perftune_print("Unknown system configuration - not restarting irqbalance!") + perftune_print("You have to prevent it from moving IRQs {} manually!".format(banned_irqs_list)) + return + + orig_file = "{}.scylla.orig".format(config_file) + + # Save the original file + if not dry_run_mode: + if not os.path.exists(orig_file): + print("Saving the original irqbalance configuration is in {}".format(orig_file)) + shutil.copyfile(config_file, orig_file) + else: + print("File {} already exists - not overwriting.".format(orig_file)) + + # Read the config file lines + cfile_lines = open(config_file, 'r').readlines() + + # Build the new config_file contents with the new options configuration + perftune_print("Restarting irqbalance: going to ban the following IRQ numbers: {} ...".format(", ".join(banned_irqs_list))) + + # Search for the original options line + opt_lines = list(filter(lambda line : 
re.search("^\s*{}".format(options_key), line), cfile_lines)) + if not opt_lines: + new_options = "{}=\"".format(options_key) + elif len(opt_lines) == 1: + # cut the last " + new_options = re.sub("\"\s*$", "", opt_lines[0].rstrip()) + opt_lines = opt_lines[0].strip() + else: + raise Exception("Invalid format in {}: more than one lines with {} key".format(config_file, options_key)) + + for irq in banned_irqs_list: + # prevent duplicate "ban" entries for the same IRQ + patt_str = "\-\-banirq\={}\Z|\-\-banirq\={}\s".format(irq, irq) + if not re.search(patt_str, new_options): + new_options += " --banirq={}".format(irq) + + new_options += "\"" + + if dry_run_mode: + if opt_lines: + print("sed -i 's/^{}/#{}/g' {}".format(options_key, options_key, config_file)) + print("echo {} | tee -a {}".format(new_options, config_file)) + else: + with open(config_file, 'w') as cfile: + for line in cfile_lines: + if not re.search("^\s*{}".format(options_key), line): + cfile.write(line) + + cfile.write(new_options + "\n") + + if systemd: + perftune_print("Restarting irqbalance via systemctl...") + run_one_command(['systemctl', 'try-restart', 'irqbalance']) + else: + perftune_print("Restarting irqbalance directly (init.d)...") + run_one_command(['/etc/init.d/irqbalance', 'restart']) + +def learn_irqs_from_proc_interrupts(pattern, irq2procline): + return [ irq for irq, proc_line in filter(lambda irq_proc_line_pair : re.search(pattern, irq_proc_line_pair[1]), irq2procline.items()) ] + +def learn_all_irqs_one(irq_conf_dir, irq2procline, xen_dev_name): + """ + Returns a list of IRQs of a single device. + + irq_conf_dir: a /sys/... 
def learn_all_irqs_one(irq_conf_dir, irq2procline, xen_dev_name):
    """
    Returns a list of IRQs of a single device.

    irq_conf_dir: a /sys/... directory with the IRQ information for the given device
    irq2procline: a map of IRQs to the corresponding lines in the /proc/interrupts
    xen_dev_name: a device name pattern as it appears in the /proc/interrupts on Xen systems
    """
    msi_irqs_dir_name = os.path.join(irq_conf_dir, 'msi_irqs')
    # Device uses MSI IRQs: one file per IRQ number
    if os.path.exists(msi_irqs_dir_name):
        return os.listdir(msi_irqs_dir_name)

    irq_file_name = os.path.join(irq_conf_dir, 'irq')
    # Device uses INT#x
    if os.path.exists(irq_file_name):
        # Fix: close the handle (the original leaked it via open(...).readlines())
        with open(irq_file_name, 'r') as irq_file:
            return [line.strip() for line in irq_file]

    # No irq file detected - identify the device via its modalias instead
    with open(os.path.join(irq_conf_dir, 'modalias'), 'r') as modalias_file:
        modalias = modalias_file.readline()

    # virtio case: match every virtio* sub-directory under driver/ against
    # /proc/interrupts and collect all of their IRQs
    if re.search(r"^virtio", modalias):
        driver_dir = os.path.join(irq_conf_dir, 'driver')
        virtio_dir_names = [dirname
                            for dirpath, dirnames, filenames in os.walk(driver_dir)
                            for dirname in dirnames
                            if re.search('virtio', dirname)]
        return list(itertools.chain.from_iterable(
            learn_irqs_from_proc_interrupts(dirname, irq2procline) for dirname in virtio_dir_names))

    # xen case
    if re.search(r"^xen:", modalias):
        return learn_irqs_from_proc_interrupts(xen_dev_name, irq2procline)

    return []

def get_irqs2procline_map():
    """Map IRQ number (string) -> its full line in /proc/interrupts."""
    # Fix: close /proc/interrupts (the original leaked the handle)
    with open('/proc/interrupts', 'r') as interrupts:
        return {line.split(':')[0].strip(): line for line in interrupts}


class AutodetectError(Exception):
    """Raised when the IRQ CPU mask cannot be auto-detected and the user has
    to provide 'irq_cpu_mask' explicitly."""
    pass


def auto_detect_irq_mask(cpu_mask, cores_per_irq_core):
    """
    Auto-detect the CPU mask to bind IRQs to, from the amount of CPUs and NUMA
    nodes present and a restricting 'cpu_mask':

    * up to 4 CPU threads: use 'cpu_mask'
    * up to 4 CPU cores: use a single CPU thread out of allowed
    * up to 'cores_per_irq_core' CPU cores: use a single CPU core out of allowed
    * bigger machines: use a single CPU core for each 'cores_per_irq_core' CPU
      cores and distribute them evenly among all present NUMA nodes.

    An AutodetectError exception is raised if 'cpu_mask' yields a different
    number of threads and/or cores on different NUMA nodes - in that case the
    user must provide an IRQ CPUs definition explicitly via 'irq_cpu_mask'.

    :param cpu_mask: CPU mask that defines which of the present CPUs may be tuned
    :param cores_per_irq_core: number of CPU cores out of which one IRQ core is allocated
    :return: CPU mask to bind IRQs to, a.k.a. irq_cpu_mask
    """
    cores_key = 'cores'
    PUs_key = 'PUs'

    # List of NUMA IDs that own CPUs from the given CPU mask
    numa_ids_list = run_hwloc_calc(['-I', 'numa', cpu_mask]).split(",")

    # Count HTs (PUs) and cores on each NUMA node belonging to the given CPU set
    cores_PUs_per_numa = {}  # { <numa_id> : {'cores': <number of cores>, 'PUs': <number of PUs>}}
    for n in numa_ids_list:
        num_cores = int(run_hwloc_calc(['--restrict', cpu_mask, '--number-of', 'core', f'numa:{n}']))
        num_PUs = int(run_hwloc_calc(['--restrict', cpu_mask, '--number-of', 'PU', f'numa:{n}']))
        cores_PUs_per_numa[n] = {cores_key: num_cores, PUs_key: num_PUs}

    # Every NUMA node must look like NUMA0, otherwise auto-detection is ambiguous
    num_cores0 = cores_PUs_per_numa[numa_ids_list[0]][cores_key]
    num_PUs0 = cores_PUs_per_numa[numa_ids_list[0]][PUs_key]
    for n in numa_ids_list:
        if cores_PUs_per_numa[n][cores_key] != num_cores0 or cores_PUs_per_numa[n][PUs_key] != num_PUs0:
            raise AutodetectError(f"NUMA{n} has a different configuration from NUMA0 for a given CPU mask {cpu_mask}: "
                                  f"{cores_PUs_per_numa[n][cores_key]}:{cores_PUs_per_numa[n][PUs_key]} vs "
                                  f"{num_cores0}:{num_PUs0}. Auto-detection of IRQ CPUs in not possible. "
                                  f"Please, provide irq_cpu_mask explicitly.")

    # Auto-detection of IRQ CPU set is possible - let's get to it!
    #
    # Total counts for the whole machine
    num_cores = int(run_hwloc_calc(['--restrict', cpu_mask, '--number-of', 'core', 'machine:0']))
    num_PUs = int(run_hwloc_calc(['--restrict', cpu_mask, '--number-of', 'PU', 'machine:0']))

    if num_PUs <= 4:
        return cpu_mask
    elif num_cores <= 4:
        return run_hwloc_calc(['--restrict', cpu_mask, 'PU:0'])
    elif num_cores <= cores_per_irq_core:
        return run_hwloc_calc(['--restrict', cpu_mask, 'core:0'])
    else:
        # Big machine: allocate one full IRQ core out of every
        # 'cores_per_irq_core' cores, distributing the IRQ cores round-robin
        # over the present NUMA nodes.
        num_irq_cores = math.ceil(num_cores / cores_per_irq_core)
        hwloc_args = []
        numa_cores_count = {n: 0 for n in numa_ids_list}
        added_cores = 0
        while added_cores < num_irq_cores:
            for numa in numa_ids_list:
                hwloc_args.append(f"node:{numa}.core:{numa_cores_count[numa]}")
                added_cores += 1
                numa_cores_count[numa] += 1

                if added_cores >= num_irq_cores:
                    break

        return run_hwloc_calc(['--restrict', cpu_mask] + hwloc_args)
+ # Let's distribute IRQ cores among present NUMA nodes + num_irq_cores = math.ceil(num_cores / cores_per_irq_core) + hwloc_args = [] + numa_cores_count = {n: 0 for n in numa_ids_list} + added_cores = 0 + while added_cores < num_irq_cores: + for numa in numa_ids_list: + hwloc_args.append(f"node:{numa}.core:{numa_cores_count[numa]}") + added_cores += 1 + numa_cores_count[numa] += 1 + + if added_cores >= num_irq_cores: + break + + return run_hwloc_calc(['--restrict', cpu_mask] + hwloc_args) + + +################################################################################ +class PerfTunerBase(metaclass=abc.ABCMeta): + def __init__(self, args): + self.__args = args + self.__args.cpu_mask = run_hwloc_calc(['--restrict', self.__args.cpu_mask, 'all']) + self.__mode = None + self.__compute_cpu_mask = None + + if self.args.mode: + self.mode = PerfTunerBase.SupportedModes[self.args.mode] + elif args.irq_cpu_mask: + self.irqs_cpu_mask = args.irq_cpu_mask + else: + self.irqs_cpu_mask = auto_detect_irq_mask(self.cpu_mask, self.cores_per_irq_core) + + self.__is_aws_i3_nonmetal_instance = None + +#### Public methods ########################## + class CPUMaskIsZeroException(Exception): + """Thrown if CPU mask turns out to be zero""" + pass + + class SupportedModes(enum.IntEnum): + """ + Modes are ordered from the one that cuts the biggest number of CPUs + from the compute CPUs' set to the one that takes the smallest ('mq' doesn't + cut any CPU from the compute set). + + This fact is used when we calculate the 'common quotient' mode out of a + given set of modes (e.g. default modes of different Tuners) - this would + be the smallest among the given modes. + """ + sq_split = 0 + sq = 1 + mq = 2 + + # Note: no_irq_restrictions should always have the greatest value in the enum since it's the least restricting mode. 
+ no_irq_restrictions = 9999 + + @staticmethod + def names(): + return PerfTunerBase.SupportedModes.__members__.keys() + + @staticmethod + def combine(modes): + """ + :param modes: a set of modes of the PerfTunerBase.SupportedModes type + :return: the mode that is the "common ground" for a given set of modes. + """ + + # Perform an explicit cast in order to verify that the values in the 'modes' are compatible with the + # expected PerfTunerBase.SupportedModes type. + return min([PerfTunerBase.SupportedModes(m) for m in modes]) + + @staticmethod + def cpu_mask_is_zero(cpu_mask): + """ + The cpu_mask is a comma-separated list of 32-bit hex values with possibly omitted zero components, + e.g. 0xffff,,0xffff + We want to estimate if the whole mask is all-zeros. + :param cpu_mask: hwloc-calc generated CPU mask + :return: True if mask is zero, False otherwise + """ + for cur_cpu_mask in cpu_mask.split(','): + if cur_cpu_mask and int(cur_cpu_mask, 16) != 0: + return False + + return True + + @staticmethod + def compute_cpu_mask_for_mode(mq_mode, cpu_mask): + mq_mode = PerfTunerBase.SupportedModes(mq_mode) + + if mq_mode == PerfTunerBase.SupportedModes.sq: + # all but CPU0 + compute_cpu_mask = run_hwloc_calc([cpu_mask, '~PU:0']) + elif mq_mode == PerfTunerBase.SupportedModes.sq_split: + # all but CPU0 and its HT siblings + compute_cpu_mask = run_hwloc_calc([cpu_mask, '~core:0']) + elif mq_mode == PerfTunerBase.SupportedModes.mq: + # all available cores + compute_cpu_mask = cpu_mask + elif mq_mode == PerfTunerBase.SupportedModes.no_irq_restrictions: + # all available cores + compute_cpu_mask = cpu_mask + else: + raise Exception("Unsupported mode: {}".format(mq_mode)) + + if PerfTunerBase.cpu_mask_is_zero(compute_cpu_mask): + raise PerfTunerBase.CPUMaskIsZeroException("Bad configuration mode ({}) and cpu-mask value ({}): this results in a zero-mask for compute".format(mq_mode.name, cpu_mask)) + + return compute_cpu_mask + + @staticmethod + def irqs_cpu_mask_for_mode(mq_mode, 
cpu_mask): + mq_mode = PerfTunerBase.SupportedModes(mq_mode) + irqs_cpu_mask = 0 + + if mq_mode != PerfTunerBase.SupportedModes.mq and mq_mode != PerfTunerBase.SupportedModes.no_irq_restrictions: + irqs_cpu_mask = run_hwloc_calc([cpu_mask, "~{}".format(PerfTunerBase.compute_cpu_mask_for_mode(mq_mode, cpu_mask))]) + else: # mq_mode == PerfTunerBase.SupportedModes.mq or mq_mode == PerfTunerBase.SupportedModes.no_irq_restrictions + # distribute equally between all available cores + irqs_cpu_mask = cpu_mask + + if PerfTunerBase.cpu_mask_is_zero(irqs_cpu_mask): + raise PerfTunerBase.CPUMaskIsZeroException("Bad configuration mode ({}) and cpu-mask value ({}): this results in a zero-mask for IRQs".format(mq_mode.name, cpu_mask)) + + return irqs_cpu_mask + + @property + def mode(self): + """ + Return the configuration mode + """ + return self.__mode + + @mode.setter + def mode(self, new_mode): + """ + Set the new configuration mode and recalculate the corresponding masks. + """ + # Make sure the new_mode is of PerfTunerBase.AllowedModes type + self.__mode = PerfTunerBase.SupportedModes(new_mode) + self.__compute_cpu_mask = PerfTunerBase.compute_cpu_mask_for_mode(self.__mode, self.__args.cpu_mask) + self.__irq_cpu_mask = PerfTunerBase.irqs_cpu_mask_for_mode(self.__mode, self.__args.cpu_mask) + + @property + def cpu_mask(self): + """ + Return the CPU mask we operate on (the total CPU set) + """ + + return self.__args.cpu_mask + + @property + def cores_per_irq_core(self): + """ + Return the number of cores we are going to allocate a single IRQ core out of when auto-detecting + """ + return self.__args.cores_per_irq_core + + @staticmethod + def min_cores_per_irq_core(): + """ + A minimum value of cores_per_irq_core. + We don't allocate a full IRQ core if total number of CPU cores is less or equal to 4. + """ + return 5 + + @property + def compute_cpu_mask(self): + """ + Return the CPU mask to use for seastar application binding. 
+ """ + return self.__compute_cpu_mask + + @property + def irqs_cpu_mask(self): + """ + Return the mask of CPUs used for IRQs distribution. + """ + return self.__irq_cpu_mask + + @irqs_cpu_mask.setter + def irqs_cpu_mask(self, new_irq_cpu_mask): + self.__irq_cpu_mask = new_irq_cpu_mask + + # Sanity check + if PerfTunerBase.cpu_mask_is_zero(self.__irq_cpu_mask): + raise PerfTunerBase.CPUMaskIsZeroException("Bad configuration: zero IRQ CPU mask is given") + + if run_hwloc_calc([self.__irq_cpu_mask]) == run_hwloc_calc([self.cpu_mask]): + # Special case: if IRQ CPU mask is the same as total CPU mask - set a Compute CPU mask to cpu_mask + self.__compute_cpu_mask = self.cpu_mask + else: + # Otherwise, a Compute CPU mask is a CPU mask without IRQ CPU mask bits + self.__compute_cpu_mask = run_hwloc_calc([self.cpu_mask, f"~{self.__irq_cpu_mask}"]) + + # Sanity check + if PerfTunerBase.cpu_mask_is_zero(self.__compute_cpu_mask): + raise PerfTunerBase.CPUMaskIsZeroException( + f"Bad configuration: cpu_maks:{self.cpu_mask}, irq_cpu_mask:{self.__irq_cpu_mask}: " + f"results in a zero-mask for compute") + + @property + def is_aws_i3_non_metal_instance(self): + """ + :return: True if we are running on the AWS i3.nonmetal instance, e.g. i3.4xlarge + """ + if self.__is_aws_i3_nonmetal_instance is None: + self.__check_host_type() + + return self.__is_aws_i3_nonmetal_instance + + @property + def args(self): + return self.__args + + @property + def irqs(self): + return self._get_irqs() + +#### "Protected"/Public (pure virtual) methods ########### + @abc.abstractmethod + def tune(self): + pass + + + @abc.abstractmethod + def _get_irqs(self): + """ + Return the iteratable value with all IRQs to be configured. + """ + pass + +#### Private methods ############################ + def __check_host_type(self): + """ + Check if we are running on the AWS i3 nonmetal instance. + If yes, set self.__is_aws_i3_nonmetal_instance to True, and to False otherwise. 
+ """ + try: + aws_instance_type = urllib.request.urlopen("http://169.254.169.254/latest/meta-data/instance-type", timeout=0.1).read().decode() + if re.match(r'^i3\.((?!metal)\w)+$', aws_instance_type): + self.__is_aws_i3_nonmetal_instance = True + else: + self.__is_aws_i3_nonmetal_instance = False + + return + except (urllib.error.URLError, ConnectionError, TimeoutError): + # Non-AWS case + pass + except: + logging.warning("Unexpected exception while attempting to access AWS meta server: {}".format(sys.exc_info()[0])) + + self.__is_aws_i3_nonmetal_instance = False + +################################################# +class NetPerfTuner(PerfTunerBase): + def __init__(self, args): + super().__init__(args) + + self.nics=args.nics + + self.__nic_is_bond_iface = self.__check_dev_is_bond_iface() + self.__slaves = self.__learn_slaves() + + # check that self.nics contain a HW device or a bonding interface + self.__check_nics() + + # Fetch IRQs related info + self.__get_irqs_info() + + +#### Public methods ############################ + def tune(self): + """ + Tune the networking server configuration. + """ + for nic in self.nics: + if self.nic_is_hw_iface(nic): + perftune_print("Setting a physical interface {}...".format(nic)) + self.__setup_one_hw_iface(nic) + else: + perftune_print("Setting {} bonding interface...".format(nic)) + self.__setup_bonding_iface(nic) + + # Increase the socket listen() backlog + fwriteln_and_log('/proc/sys/net/core/somaxconn', '4096') + + # Increase the maximum number of remembered connection requests, which are still + # did not receive an acknowledgment from connecting client. + fwriteln_and_log('/proc/sys/net/ipv4/tcp_max_syn_backlog', '4096') + + def nic_is_bond_iface(self, nic): + return self.__nic_is_bond_iface[nic] + + def nic_exists(self, nic): + return self.__iface_exists(nic) + + def nic_is_hw_iface(self, nic): + return self.__dev_is_hw_iface(nic) + + def slaves(self, nic): + """ + Returns an iterator for all slaves of the nic. 
+ If agrs.nic is not a bonding interface an attempt to use the returned iterator + will immediately raise a StopIteration exception - use __dev_is_bond_iface() check to avoid this. + """ + return iter(self.__slaves[nic]) + +#### Protected methods ########################## + def _get_irqs(self): + """ + Returns the iterator for all IRQs that are going to be configured (according to args.nics parameter). + For instance, for a bonding interface that's going to include IRQs of all its slaves. + """ + return itertools.chain.from_iterable(self.__nic2irqs.values()) + +#### Private methods ############################ + def __get_irqs_info(self): + self.__irqs2procline = get_irqs2procline_map() + self.__nic2irqs = self.__learn_irqs() + + @property + def __rfs_table_size(self): + return 32768 + + def __check_nics(self): + """ + Checks that self.nics are supported interfaces + """ + for nic in self.nics: + if not self.nic_exists(nic): + raise Exception("Device {} does not exist".format(nic)) + if not self.nic_is_hw_iface(nic) and not self.nic_is_bond_iface(nic): + raise Exception("Not supported virtual device {}".format(nic)) + + def __get_irqs_one(self, iface): + """ + Returns the list of IRQ numbers for the given interface. 
+ """ + return self.__nic2irqs[iface] + + def __setup_rfs(self, iface): + rps_limits = glob.glob("/sys/class/net/{}/queues/*/rps_flow_cnt".format(iface)) + one_q_limit = int(self.__rfs_table_size / len(rps_limits)) + + # If RFS feature is not present - get out + try: + run_one_command(['sysctl', 'net.core.rps_sock_flow_entries']) + except: + return + + # Enable RFS + perftune_print("Setting net.core.rps_sock_flow_entries to {}".format(self.__rfs_table_size)) + run_one_command(['sysctl', '-w', 'net.core.rps_sock_flow_entries={}'.format(self.__rfs_table_size)]) + + # Set each RPS queue limit + for rfs_limit_cnt in rps_limits: + msg = "Setting limit {} in {}".format(one_q_limit, rfs_limit_cnt) + fwriteln(rfs_limit_cnt, "{}".format(one_q_limit), log_message=msg) + + # Enable/Disable ntuple filtering HW offload on the NIC. This is going to enable/disable aRFS on NICs supporting + # aRFS since ntuple is pre-requisite for an aRFS feature. + # If no explicit configuration has been requested enable ntuple (and thereby aRFS) only in MQ mode. + # + # aRFS acts similar to (SW) RFS: it places a TCP packet on a HW queue that it supposed to be "close" to an + # application thread that sent a packet on the same TCP stream. + # + # For instance if a given TCP stream was sent from CPU3 then the next Rx packet is going to be placed in an Rx + # HW queue which IRQ affinity is set to CPU3 or otherwise to the one with affinity close enough to CPU3. + # + # Read more here: https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/performance_tuning_guide/network-acc-rfs + # + # Obviously it would achieve the best result if there is at least one Rx HW queue with an affinity set to each + # application threads that handle TCP. + # + # And, similarly, if we know in advance that there won't be any such HW queue (sq and sq_split modes) - there is + # no sense enabling aRFS. 
+ op = "Enable" + value = 'on' + + if (self.args.enable_arfs is None and self.irqs_cpu_mask == self.cpu_mask) or self.args.enable_arfs is False: + op = "Disable" + value = 'off' + + ethtool_msg = "{} ntuple filtering HW offload for {}...".format(op, iface) + + if dry_run_mode: + perftune_print(ethtool_msg) + run_one_command(['ethtool','-K', iface, 'ntuple', value], stderr=subprocess.DEVNULL) + else: + try: + print("Trying to {} ntuple filtering HW offload for {}...".format(op.lower(), iface), end='') + run_one_command(['ethtool','-K', iface, 'ntuple', value], stderr=subprocess.DEVNULL) + print("ok") + except: + print("not supported") + + def __setup_rps(self, iface, mask): + for one_rps_cpus in self.__get_rps_cpus(iface): + set_one_mask(one_rps_cpus, mask) + + self.__setup_rfs(iface) + + def __setup_xps(self, iface): + xps_cpus_list = glob.glob("/sys/class/net/{}/queues/*/xps_cpus".format(iface)) + masks = run_hwloc_distrib(["{}".format(len(xps_cpus_list))]) + + for i, mask in enumerate(masks): + set_one_mask(xps_cpus_list[i], mask) + + def __iface_exists(self, iface): + if len(iface) == 0: + return False + return os.path.exists("/sys/class/net/{}".format(iface)) + + def __dev_is_hw_iface(self, iface): + return os.path.exists("/sys/class/net/{}/device".format(iface)) + + def __check_dev_is_bond_iface(self): + bond_dict = {} + if not os.path.exists('/sys/class/net/bonding_masters'): + for nic in self.nics: + bond_dict[nic] = False + #return False for every nic + return bond_dict + for nic in self.nics: + bond_dict[nic] = any([re.search(nic, line) for line in open('/sys/class/net/bonding_masters', 'r').readlines()]) + return bond_dict + + def __learn_slaves(self): + slaves_list_per_nic = {} + for nic in self.nics: + if self.nic_is_bond_iface(nic): + slaves_list_per_nic[nic] = list(itertools.chain.from_iterable([line.split() for line in open("/sys/class/net/{}/bonding/slaves".format(nic), 'r').readlines()])) + + return slaves_list_per_nic + + def 
__intel_irq_to_queue_idx(self, irq): + """ + Return the HW queue index for a given IRQ for Intel NICs in order to sort the IRQs' list by this index. + + Intel's fast path IRQs have the following name convention: + <bla-bla>-TxRx-<queue index> + + Intel NICs also have the IRQ for Flow Director (which is not a regular fast path IRQ) whose name looks like + this: + <bla-bla>:fdir-TxRx-<index> + + We want to put the Flow Director's IRQ at the end of the sorted list of IRQs. + + :param irq: IRQ number + :return: HW queue index for Intel NICs and sys.maxsize for all other NICs + """ + intel_fp_irq_re = re.compile("\-TxRx\-(\d+)") + fdir_re = re.compile("fdir\-TxRx\-\d+") + + m = intel_fp_irq_re.search(self.__irqs2procline[irq]) + m1 = fdir_re.search(self.__irqs2procline[irq]) + if m and not m1: + return int(m.group(1)) + else: + return sys.maxsize + + def __mlx_irq_to_queue_idx(self, irq): + """ + Return the HW queue index for a given IRQ for Mellanox NICs in order to sort the IRQs' list by this index. + + Mellanox NICs have the IRQ which name looks like + this: + mlx5_comp23 + mlx5_comp<index> + or this: + mlx4-6 + mlx4-<index> + + :param irq: IRQ number + :return: HW queue index for Mellanox NICs and sys.maxsize for all other NICs + """ + mlx5_fp_irq_re = re.compile("mlx5_comp(\d+)") + mlx4_fp_irq_re = re.compile("mlx4\-(\d+)") + + m5 = mlx5_fp_irq_re.search(self.__irqs2procline[irq]) + if m5: + return int(m5.group(1)) + else: + m4 = mlx4_fp_irq_re.search(self.__irqs2procline[irq]) + if m4: + return int(m4.group(1)) + + return sys.maxsize + + def __virtio_irq_to_queue_idx(self, irq): + """ + Return the HW queue index for a given IRQ for VIRTIO in order to sort the IRQs' list by this index. 
+ + VIRTIO NICs have the IRQ's name that looks like this: + Queue K of a device virtioY, where Y is some integer is comprised of 2 IRQs + with following names: + * Tx IRQ: + virtioY-output.K + * Rx IRQ: + virtioY-input.K + + :param irq: IRQ number + :return: HW queue index for VIRTIO fast path IRQ and sys.maxsize for all other IRQs + """ + virtio_fp_re = re.compile(r"virtio\d+-(input|output)\.(\d+)$") + + virtio_fp_irq = virtio_fp_re.search(self.__irqs2procline[irq]) + if virtio_fp_irq: + return int(virtio_fp_irq.group(2)) + + return sys.maxsize + + + def __get_driver_name(self, iface): + """ + :param iface: Interface to check + :return: driver name from ethtool + """ + + driver_name = '' + ethtool_i_lines = run_ethtool(['-i', iface]) + driver_re = re.compile("driver:") + driver_lines = list(filter(lambda one_line: driver_re.search(one_line), ethtool_i_lines)) + + if driver_lines: + if len(driver_lines) > 1: + raise Exception("More than one 'driver:' entries in the 'ethtool -i {}' output. Unable to continue.".format(iface)) + + driver_name = driver_lines[0].split()[1].strip() + + return driver_name + + def __learn_irqs_one(self, iface): + """ + This is a slow method that is going to read from the system files. Never + use it outside the initialization code. Use __get_irqs_one() instead. + + Filter the fast path queues IRQs from the __get_all_irqs_one() result according to the known + patterns. + Right now we know about the following naming convention of the fast path queues vectors: + - Intel: <bla-bla>-TxRx-<bla-bla> + - Broadcom: <bla-bla>-fp-<bla-bla> + - ena: <bla-bla>-Tx-Rx-<bla-bla> + - Mellanox: for mlx4 + mlx4-<queue idx>@<bla-bla> + or for mlx5 + mlx5_comp<queue idx>@<bla-bla> + - VIRTIO: virtioN-[input|output].D + + So, we will try to filter the etries in /proc/interrupts for IRQs we've got from get_all_irqs_one() + according to the patterns above. 
+ + If as a result all IRQs are filtered out (if there are no IRQs with the names from the patterns above) then + this means that the given NIC uses a different IRQs naming pattern. In this case we won't filter any IRQ. + + Otherwise, we will use only IRQs which names fit one of the patterns above. + + For NICs with a limited number of Rx queues the IRQs that handle Rx are going to be at the beginning of the + list. + """ + # filter 'all_irqs' to only reference valid keys from 'irqs2procline' and avoid an IndexError on the 'irqs' search below + all_irqs = set(learn_all_irqs_one("/sys/class/net/{}/device".format(iface), self.__irqs2procline, iface)).intersection(self.__irqs2procline.keys()) + fp_irqs_re = re.compile("\-TxRx\-|\-fp\-|\-Tx\-Rx\-|mlx4-\d+@|mlx5_comp\d+@|virtio\d+-(input|output)") + irqs = sorted(list(filter(lambda irq : fp_irqs_re.search(self.__irqs2procline[irq]), all_irqs))) + if irqs: + irqs.sort(key=self.__get_irq_to_queue_idx_functor(iface)) + return irqs + else: + return list(all_irqs) + + def __get_irq_to_queue_idx_functor(self, iface): + """ + Get a functor returning a queue index for a given IRQ. + This functor is needed for NICs that are known to not release IRQs when the number of Rx + channels is reduced or have extra IRQs for non-RSS channels. + + Therefore, for these NICs we need a functor that would allow us to pick IRQs that belong to channels that are + going to handle TCP traffic: first X channels, where the value of X depends on the NIC's type and configuration. + + For others, e.g. ENA, or Broadcom, which are only going to allocate IRQs that belong to TCP handling channels, + we don't really need to sort them as long as we filter fast path IRQs and distribute them evenly among IRQ CPUs. + + :param iface: NIC's interface name, e.g. eth19 + :return: A functor that returns a queue index for a given IRQ if a mapping is known + or a constant big integer value if mapping is unknown. 
+ """ + # There are a few known drivers for which we know how to get a queue index from an IRQ name in /proc/interrupts + driver_name = self.__get_driver_name(iface) + + # Every functor returns a sys.maxsize for an unknown driver IRQs. + # So, choosing Intel's as a default is as good as any other. + irq_to_idx_func = self.__intel_irq_to_queue_idx + if driver_name.startswith("mlx"): + irq_to_idx_func = self.__mlx_irq_to_queue_idx + elif driver_name.startswith("virtio"): + irq_to_idx_func = self.__virtio_irq_to_queue_idx + + return irq_to_idx_func + + def __irq_lower_bound_by_queue(self, iface, irqs, queue_idx): + """ + Get the index of the first element in irqs array which queue is greater or equal to a given index. + IRQs array is supposed to be sorted by queues numbers IRQs belong to. + + There are additional assumptions: + * IRQs array items queue numbers are monotonically not decreasing, and if it increases then it increases by + one. + * Queue indexes are numbered starting from zero. + + :param irqs: IRQs array sorted by queues numbers IRQs belong to + :param queue_idx: Queue index to partition by + :return: The first index in the IRQs array that corresponds to a queue number greater or equal to a given index + which is at least queue_idx. If there is no such IRQ - returns len(irqs). + """ + irq_to_idx_func = self.__get_irq_to_queue_idx_functor(iface) + + if queue_idx < len(irqs): + for idx in range(queue_idx, len(irqs)): + if irq_to_idx_func(irqs[idx]) >= queue_idx: + return idx + + return len(irqs) + + def __learn_irqs(self): + """ + This is a slow method that is going to read from the system files. Never + use it outside the initialization code. 
+ """ + nic_irq_dict={} + for nic in self.nics: + if self.nic_is_bond_iface(nic): + for slave in filter(self.__dev_is_hw_iface, self.slaves(nic)): + nic_irq_dict[slave] = self.__learn_irqs_one(slave) + else: + nic_irq_dict[nic] = self.__learn_irqs_one(nic) + return nic_irq_dict + + def __get_rps_cpus(self, iface): + """ + Prints all rps_cpus files names for the given HW interface. + + There is a single rps_cpus file for each RPS queue and there is a single RPS + queue for each HW Rx queue. Each HW Rx queue should have an IRQ. + Therefore the number of these files is equal to the number of fast path Rx IRQs for this interface. + """ + return glob.glob("/sys/class/net/{}/queues/*/rps_cpus".format(iface)) + + def __set_rx_channels_count(self, iface, count): + """ + Try to set the number of Rx channels of a given interface to a given value. + + Rx channels of any NIC can be configured using 'ethtool -L' command using one of the following semantics: + + ethtool -L <iface> rx <count> + or + ethtool -L <iface> combined <count> + + If a specific semantics is not supported by a given NIC or if changing the number of channels is not supported + ethtool is going to return an error. + + Instead of parsing and trying to detect which one of the following semantics a given interface supports we will + simply try to use both semantics till either one of them succeeds or both fail. + + + :param iface: NIC interface name, e.g. 
eth4 + :param count: number of Rx channels we want to configure + :return: True if configuration was successful, False otherwise + """ + options = ["rx", "combined"] + for o in options: + try: + cmd = ['ethtool', '-L', iface, o, f"{count}"] + perftune_print(f"Executing: {' '.join(cmd)}") + run_one_command(cmd, stderr=subprocess.DEVNULL) + return True + except subprocess.CalledProcessError: + pass + + return False + + def __setup_one_hw_iface(self, iface): + # Set Rx channels count to a number of IRQ CPUs unless an explicit count is given + if self.args.num_rx_queues is not None: + num_rx_channels = self.args.num_rx_queues + else: + num_rx_channels = 0 + + # If a mask is wider than 32 bits it's going to be presented as a comma-separated list of 32-bit masks + # with possibly omitted zero components, e.g. 0x01,0x100,,0x12122 + for m in self.irqs_cpu_mask.split(","): + if m: + num_rx_channels += bin(int(m, 16)).count('1') + + # Let's try setting the number of Rx channels to the number of IRQ CPUs. + # + # If we were able to change the number of Rx channels the number of IRQs could have changed. + # In this case let's refresh IRQs info. + rx_channels_set = self.__set_rx_channels_count(iface, num_rx_channels) + if rx_channels_set: + self.__get_irqs_info() + + max_num_rx_queues = self.__max_rx_queue_count(iface) + all_irqs = self.__get_irqs_one(iface) + + # Bind the NIC's IRQs according to the configuration mode + # + # If this NIC has a limited number of Rx queues then we want to distribute their IRQs separately. + # For such NICs we've sorted IRQs list so that IRQs that handle Rx are all at the head of the list. 
+ if rx_channels_set or max_num_rx_queues < len(all_irqs): + num_rx_queues = self.__get_rx_queue_count(iface) + tcp_irqs_lower_bound = self.__irq_lower_bound_by_queue(iface, all_irqs, num_rx_queues) + perftune_print(f"Distributing IRQs handling Rx and Tx for first {num_rx_queues} channels:") + distribute_irqs(all_irqs[0:tcp_irqs_lower_bound], self.irqs_cpu_mask) + perftune_print("Distributing the rest of IRQs") + distribute_irqs(all_irqs[tcp_irqs_lower_bound:], self.irqs_cpu_mask) + else: + perftune_print("Distributing all IRQs") + distribute_irqs(all_irqs, self.irqs_cpu_mask) + + self.__setup_rps(iface, self.cpu_mask) + self.__setup_xps(iface) + + def __setup_bonding_iface(self, nic): + for slave in self.slaves(nic): + if self.__dev_is_hw_iface(slave): + perftune_print("Setting up {}...".format(slave)) + self.__setup_one_hw_iface(slave) + else: + perftune_print("Skipping {} (not a physical slave device?)".format(slave)) + + def __max_rx_queue_count(self, iface): + """ + :param iface: Interface to check + :return: The maximum number of RSS queues for the given interface if there is known limitation and sys.maxsize + otherwise. + + Networking drivers serving HW with the known maximum RSS queue limitation (due to lack of RSS bits): + + ixgbe: PF NICs support up to 16 RSS queues. + ixgbevf: VF NICs support up to 4 RSS queues. + i40e: PF NICs support up to 64 RSS queues. + i40evf: VF NICs support up to 16 RSS queues. + + """ + driver_to_max_rss = {'ixgbe': 16, 'ixgbevf': 4, 'i40e': 64, 'i40evf': 16} + + driver_name = self.__get_driver_name(iface) + return driver_to_max_rss.get(driver_name, sys.maxsize) + + def __get_rx_queue_count(self, iface): + """ + :return: the RSS Rx queues count for the given interface. 
+ """ + num_irqs = len(self.__get_irqs_one(iface)) + rx_queues_count = len(self.__get_rps_cpus(iface)) + + if rx_queues_count == 0: + rx_queues_count = num_irqs + + return min(self.__max_rx_queue_count(iface), rx_queues_count) + + + +class ClocksourceManager: + class PreferredClockSourceNotAvailableException(Exception): + pass + + def __init__(self, args): + self.__args = args + self._preferred = {"x86_64": "tsc", "kvm": "kvm-clock"} + self._arch = self._get_arch() + self._available_clocksources_file = "/sys/devices/system/clocksource/clocksource0/available_clocksource" + self._current_clocksource_file = "/sys/devices/system/clocksource/clocksource0/current_clocksource" + self._recommendation_if_unavailable = { "x86_64": "The tsc clocksource is not available. Consider using a hardware platform where the tsc clocksource is available, or try forcing it withe the tsc=reliable boot option", "kvm": "kvm-clock is not available" } + + def _available_clocksources(self): + return open(self._available_clocksources_file).readline().split() + + def _current_clocksource(self): + return open(self._current_clocksource_file).readline().strip() + + def _get_arch(self): + try: + virt = run_read_only_command(['systemd-detect-virt']).strip() + if virt == "kvm": + return virt + except: + pass + return platform.machine() + + def enforce_preferred_clocksource(self): + fwriteln(self._current_clocksource_file, self._preferred[self._arch], "Setting clocksource to {}".format(self._preferred[self._arch])) + + def preferred(self): + return self._preferred[self._arch] + + def setting_available(self): + return self._arch in self._preferred + + def preferred_clocksource_available(self): + return self._preferred[self._arch] in self._available_clocksources() + + def recommendation_if_unavailable(self): + return self._recommendation_if_unavailable[self._arch] + +class SystemPerfTuner(PerfTunerBase): + def __init__(self, args): + super().__init__(args) + self._clocksource_manager = 
ClocksourceManager(args) + + def tune(self): + if self.args.tune_clock: + if not self._clocksource_manager.setting_available(): + perftune_print("Clocksource setting not available or not needed for this architecture. Not tuning") + elif not self._clocksource_manager.preferred_clocksource_available(): + perftune_print(self._clocksource_manager.recommendation_if_unavailable()) + else: + self._clocksource_manager.enforce_preferred_clocksource() + +#### Protected methods ########################## + def _get_irqs(self): + return [] + + +################################################# +class DiskPerfTuner(PerfTunerBase): + class SupportedDiskTypes(enum.IntEnum): + nvme = 0 + non_nvme = 1 + + def __init__(self, args): + super().__init__(args) + + if not (self.args.dirs or self.args.devs): + raise Exception("'disks' tuning was requested but neither directories nor storage devices were given") + + self.__pyudev_ctx = pyudev.Context() + self.__dir2disks = self.__learn_directories() + self.__irqs2procline = get_irqs2procline_map() + self.__disk2irqs = self.__learn_irqs() + self.__type2diskinfo = self.__group_disks_info_by_type() + + # sets of devices that have already been tuned + self.__io_scheduler_tuned_devs = set() + self.__nomerges_tuned_devs = set() + self.__write_back_cache_tuned_devs = set() + +#### Public methods ############################# + def tune(self): + """ + Distribute IRQs according to the requested mode (args.mode): + - Distribute NVMe disks' IRQs equally among all available CPUs. + - Distribute non-NVMe disks' IRQs equally among designated CPUs or among + all available CPUs in the 'mq' mode. 
+ """ + non_nvme_disks, non_nvme_irqs = self.__disks_info_by_type(DiskPerfTuner.SupportedDiskTypes.non_nvme) + if non_nvme_disks: + perftune_print("Setting non-NVMe disks: {}...".format(", ".join(non_nvme_disks))) + distribute_irqs(non_nvme_irqs, self.irqs_cpu_mask) + self.__tune_disks(non_nvme_disks) + else: + perftune_print("No non-NVMe disks to tune") + + nvme_disks, nvme_irqs = self.__disks_info_by_type(DiskPerfTuner.SupportedDiskTypes.nvme) + if nvme_disks: + # Linux kernel is going to use IRQD_AFFINITY_MANAGED mode for NVMe IRQs + # on most systems (currently only AWS i3 non-metal are known to have a + # different configuration). SMP affinity of an IRQ in this mode may not be + # changed and an attempt to modify it is going to fail. However right now + # the only way to determine that IRQD_AFFINITY_MANAGED mode has been used + # is to attempt to modify IRQ SMP affinity (and fail) therefore we prefer + # to always do it. + # + # What we don't want however is to see annoying errors every time we + # detect that IRQD_AFFINITY_MANAGED was actually used. Therefore we will only log + # them in the "verbose" mode or when we run on an i3.nonmetal AWS instance. + perftune_print("Setting NVMe disks: {}...".format(", ".join(nvme_disks))) + distribute_irqs(nvme_irqs, self.args.cpu_mask, + log_errors=(self.is_aws_i3_non_metal_instance or self.args.verbose)) + self.__tune_disks(nvme_disks) + else: + perftune_print("No NVMe disks to tune") + +#### Protected methods ########################## + def _get_irqs(self): + return itertools.chain.from_iterable(irqs for disks, irqs in self.__type2diskinfo.values()) + +#### Private methods ############################ + @property + def __io_schedulers(self): + """ + :return: An ordered list of IO schedulers that we want to configure. Schedulers are ordered by their priority + from the highest (left most) to the lowest. 
+ """ + return ["none", "noop"] + + @property + def __nomerges(self): + return '2' + + @property + def __write_cache_config(self): + """ + :return: None - if write cache mode configuration is not requested or the corresponding write cache + configuration value string + """ + if self.args.set_write_back is None: + return None + + return "write back" if self.args.set_write_back else "write through" + + def __disks_info_by_type(self, disks_type): + """ + Returns a tuple ( [<disks>], [<irqs>] ) for the given disks type. + IRQs numbers in the second list are promised to be unique. + """ + return self.__type2diskinfo[DiskPerfTuner.SupportedDiskTypes(disks_type)] + + def __nvme_fast_path_irq_filter(self, irq): + """ + Return True for fast path NVMe IRQs. + For NVMe device only queues 1-<number of CPUs> are going to do fast path work. + + NVMe IRQs have the following name convention: + nvme<device index>q<queue index>, e.g. nvme0q7 + + :param irq: IRQ number + :return: True if this IRQ is an IRQ of a FP NVMe queue. + """ + nvme_irq_re = re.compile(r'(\s|^)nvme\d+q(\d+)(\s|$)') + + # There may be more than an single HW queue bound to the same IRQ. In this case queue names are going to be + # comma separated + split_line = self.__irqs2procline[irq].split(",") + + for line in split_line: + m = nvme_irq_re.search(line) + if m and 0 < int(m.group(2)) <= multiprocessing.cpu_count(): + return True + + return False + + def __group_disks_info_by_type(self): + """ + Return a map of tuples ( [<disks>], [<irqs>] ), where "disks" are all disks of the specific type + and "irqs" are the corresponding IRQs. + + It's promised that every element is "disks" and "irqs" is unique. 
+ + The disk types are 'nvme' and 'non-nvme' + """ + disks_info_by_type = {} + nvme_disks = set() + nvme_irqs = set() + non_nvme_disks = set() + non_nvme_irqs = set() + nvme_disk_name_pattern = re.compile('^nvme') + + for disk, irqs in self.__disk2irqs.items(): + if nvme_disk_name_pattern.search(disk): + nvme_disks.add(disk) + for irq in irqs: + nvme_irqs.add(irq) + else: + non_nvme_disks.add(disk) + for irq in irqs: + non_nvme_irqs.add(irq) + + if not (nvme_disks or non_nvme_disks): + raise Exception("'disks' tuning was requested but no disks were found") + + nvme_irqs = list(nvme_irqs) + + # There is a known issue with Xen hypervisor that exposes itself on AWS i3 instances where nvme module + # over-allocates HW queues and uses only queues 1,2,3,..., <up to number of CPUs> for data transfer. + # On these instances we will distribute only these queues. + + if self.is_aws_i3_non_metal_instance: + nvme_irqs = list(filter(self.__nvme_fast_path_irq_filter, nvme_irqs)) + + # Sort IRQs for easier verification + nvme_irqs.sort(key=lambda irq_num_str: int(irq_num_str)) + + disks_info_by_type[DiskPerfTuner.SupportedDiskTypes.nvme] = (list(nvme_disks), nvme_irqs) + disks_info_by_type[DiskPerfTuner.SupportedDiskTypes.non_nvme] = ( list(non_nvme_disks), list(non_nvme_irqs) ) + + return disks_info_by_type + + def __learn_directories(self): + return { directory : self.__learn_directory(directory) for directory in self.args.dirs } + + def __learn_directory(self, directory, recur=False): + """ + Returns a list of disks the given directory is mounted on (there will be more than one if + the mount point is on the RAID volume) + """ + if not os.path.exists(directory): + if not recur: + perftune_print("{} doesn't exist - skipping".format(directory)) + + return [] + + try: + udev_obj = pyudev.Devices.from_device_number(self.__pyudev_ctx, 'block', os.stat(directory).st_dev) + return self.__get_phys_devices(udev_obj) + except: + # handle cases like ecryptfs where the directory is 
mounted to another directory and not to some block device + filesystem = run_read_only_command(['df', '-P', directory]).splitlines()[-1].split()[0].strip() + if not re.search(r'^/dev/', filesystem): + devs = self.__learn_directory(filesystem, True) + else: + raise Exception("Logic error: failed to create a udev device while 'df -P' {} returns a {}".format(directory, filesystem)) + + # log error only for the original directory + if not recur and not devs: + perftune_print("Can't get a block device for {} - skipping".format(directory)) + + return devs + + def __get_phys_devices(self, udev_obj): + # if device is a virtual device - the underlying physical devices are going to be its slaves + if re.search(r'virtual', udev_obj.sys_path): + slaves = os.listdir(os.path.join(udev_obj.sys_path, 'slaves')) + # If the device is virtual but doesn't have slaves (e.g. as nvm-subsystem virtual devices) handle it + # as a regular device. + if slaves: + return list(itertools.chain.from_iterable([ self.__get_phys_devices(pyudev.Devices.from_device_file(self.__pyudev_ctx, "/dev/{}".format(slave))) for slave in slaves ])) + + # device node is something like /dev/sda1 - we need only the part without /dev/ + return [ re.match(r'/dev/(\S+\d*)', udev_obj.device_node).group(1) ] + + def __learn_irqs(self): + disk2irqs = {} + + for devices in list(self.__dir2disks.values()) + [ self.args.devs ]: + for device in devices: + # There could be that some of the given directories are on the same disk. + # There is no need to rediscover IRQs of the disk we've already handled. 
+ if device in disk2irqs.keys(): + continue + + udev_obj = pyudev.Devices.from_device_file(self.__pyudev_ctx, "/dev/{}".format(device)) + dev_sys_path = udev_obj.sys_path + + # If the device is a virtual NVMe device it's sys file name goes as follows: + # /sys/devices/virtual/nvme-subsystem/nvme-subsys0/nvme0n1 + # + # and then there is this symlink: + # /sys/devices/virtual/nvme-subsystem/nvme-subsys0/nvme0n1/device/nvme0 -> ../../../pci0000:85/0000:85:01.0/0000:87:00.0/nvme/nvme0 + # + # So, the "main device" is a "nvme\d+" prefix of the actual device name. + if re.search(r'virtual', udev_obj.sys_path): + m = re.match(r'(nvme\d+)\S*', device) + if m: + dev_sys_path = "{}/device/{}".format(udev_obj.sys_path, m.group(1)) + + split_sys_path = list(pathlib.PurePath(pathlib.Path(dev_sys_path).resolve()).parts) + + # first part is always /sys/devices/pciXXX ... + controller_path_parts = split_sys_path[0:4] + + # ...then there is a chain of one or more "domain:bus:device.function" followed by the storage device enumeration crap + # e.g. /sys/devices/pci0000:00/0000:00:1f.2/ata2/host1/target1:0:0/1:0:0:0/block/sda/sda3 or + # /sys/devices/pci0000:00/0000:00:02.0/0000:02:00.0/host6/target6:2:0/6:2:0:0/block/sda/sda1 + # We want only the path till the last BDF including - it contains the IRQs information. + + patt = re.compile("^[0-9ABCDEFabcdef]{4}\:[0-9ABCDEFabcdef]{2}\:[0-9ABCDEFabcdef]{2}\.[0-9ABCDEFabcdef]$") + for split_sys_path_branch in split_sys_path[4:]: + if patt.search(split_sys_path_branch): + controller_path_parts.append(split_sys_path_branch) + else: + break + + controler_path_str = functools.reduce(lambda x, y : os.path.join(x, y), controller_path_parts) + disk2irqs[device] = learn_all_irqs_one(controler_path_str, self.__irqs2procline, 'blkif') + + return disk2irqs + + def __get_feature_file(self, dev_node, path_creator): + """ + Find the closest ancestor with the given feature and return its ('feature file', 'device node') tuple. 
+ + If there isn't such an ancestor - return (None, None) tuple. + + :param dev_node Device node file name, e.g. /dev/sda1 + :param path_creator A functor that creates a feature file name given a device system file name + """ + # Sanity check + if dev_node is None or path_creator is None: + return None, None + + udev = pyudev.Devices.from_device_file(pyudev.Context(), dev_node) + feature_file = path_creator(udev.sys_path) + + if os.path.exists(feature_file): + return feature_file, dev_node + elif udev.parent is not None: + return self.__get_feature_file(udev.parent.device_node, path_creator) + else: + return None, None + + def __tune_one_feature(self, dev_node, path_creator, value, tuned_devs_set): + """ + Find the closest ancestor that has the given feature, configure it and + return True. + + If there isn't such ancestor - return False. + + :param dev_node Device node file name, e.g. /dev/sda1 + :param path_creator A functor that creates a feature file name given a device system file name + """ + feature_file, feature_node = self.__get_feature_file(dev_node, path_creator) + + if feature_file is None: + return False + + if feature_node not in tuned_devs_set: + fwriteln_and_log(feature_file, value) + tuned_devs_set.add(feature_node) + + return True + + def __tune_io_scheduler(self, dev_node, io_scheduler): + return self.__tune_one_feature(dev_node, lambda p : os.path.join(p, 'queue', 'scheduler'), io_scheduler, self.__io_scheduler_tuned_devs) + + def __tune_nomerges(self, dev_node): + return self.__tune_one_feature(dev_node, lambda p : os.path.join(p, 'queue', 'nomerges'), self.__nomerges, self.__nomerges_tuned_devs) + + # If write cache configuration is not requested - return True immediately + def __tune_write_back_cache(self, dev_node): + if self.__write_cache_config is None: + return True + + return self.__tune_one_feature(dev_node, lambda p : os.path.join(p, 'queue', 'write_cache'), self.__write_cache_config, self.__write_back_cache_tuned_devs) + + def 
__get_io_scheduler(self, dev_node): + """ + Return a supported scheduler that is also present in the required schedulers list (__io_schedulers). + + If there isn't such a supported scheduler - return None. + """ + feature_file, feature_node = self.__get_feature_file(dev_node, lambda p : os.path.join(p, 'queue', 'scheduler')) + + lines = readlines(feature_file) + if not lines: + return None + + # Supported schedulers appear in the config file as a single line as follows: + # + # sched1 [sched2] sched3 + # + # ...with one or more schedulers where currently selected scheduler is the one in brackets. + # + # Return the scheduler with the highest priority among those that are supported for the current device. + supported_schedulers = frozenset([scheduler.lstrip("[").rstrip("]").rstrip("\n") for scheduler in lines[0].split(" ")]) + return next((scheduler for scheduler in self.__io_schedulers if scheduler in supported_schedulers), None) + + def __tune_disk(self, device): + dev_node = "/dev/{}".format(device) + io_scheduler = self.__get_io_scheduler(dev_node) + + if not io_scheduler: + perftune_print("Not setting I/O Scheduler for {} - required schedulers ({}) are not supported".format(device, list(self.__io_schedulers))) + elif not self.__tune_io_scheduler(dev_node, io_scheduler): + perftune_print("Not setting I/O Scheduler for {} - feature not present".format(device)) + + if not self.__tune_nomerges(dev_node): + perftune_print("Not setting 'nomerges' for {} - feature not present".format(device)) + + if not self.__tune_write_back_cache(dev_node): + perftune_print("Not setting 'write_cache' for {} - feature not present".format(device)) + + def __tune_disks(self, disks): + for disk in disks: + self.__tune_disk(disk) + +################################################################################ +class TuneModes(enum.Enum): + disks = 0 + net = 1 + system = 2 + + @staticmethod + def names(): + return list(TuneModes.__members__.keys()) + +argp = 
argparse.ArgumentParser(description = 'Configure various system parameters in order to improve the seastar application performance.', formatter_class=argparse.RawDescriptionHelpFormatter, + epilog= +''' +This script will: + + - Ban relevant IRQs from being moved by irqbalance. + - Configure various system parameters in /proc/sys. + - Distribute the IRQs (using SMP affinity configuration) among CPUs according to the configuration mode (see below) + or an 'irq_cpu_mask' value. + +As a result some of the CPUs may be destined to only handle the IRQs and taken out of the CPU set +that should be used to run the seastar application ("compute CPU set"). + +Modes description: + + sq - set all IRQs of a given NIC to CPU0 and configure RPS + to spreads NAPIs' handling between other CPUs. + + sq_split - divide all IRQs of a given NIC between CPU0 and its HT siblings and configure RPS + to spreads NAPIs' handling between other CPUs. + + mq - distribute NIC's IRQs among all CPUs instead of binding + them all to CPU0. In this mode RPS is always enabled to + spreads NAPIs' handling between all CPUs. + + If there isn't any mode given script will use a default mode: + - If number of CPU cores is greater than 16, allocate a single IRQ CPU core for each 16 CPU cores in 'cpu_mask'. + IRQ cores are going to be allocated evenly on available NUMA nodes according to 'cpu_mask' value. + - If number of physical CPU cores per Rx HW queue is greater than 4 and less than 16 - use the 'sq-split' mode. + - Otherwise, if number of hyper-threads per Rx HW queue is greater than 4 - use the 'sq' mode. + - Otherwise use the 'mq' mode. 
+ +Default values: + + --nic NIC - default: eth0 + --cpu-mask MASK - default: all available cores mask + --tune-clock - default: false +''') +argp.add_argument('--mode', choices=PerfTunerBase.SupportedModes.names(), help='configuration mode (deprecated, use --irq-cpu-mask instead)') +argp.add_argument('--nic', action='append', help='network interface name(s), by default uses \'eth0\' (may appear more than once)', dest='nics', default=[]) +argp.add_argument('--tune-clock', action='store_true', help='Force tuning of the system clocksource') +argp.add_argument('--get-cpu-mask', action='store_true', help="print the CPU mask to be used for compute") +argp.add_argument('--get-cpu-mask-quiet', action='store_true', help="print the CPU mask to be used for compute, print the zero CPU set if that's what it turns out to be") +argp.add_argument('--get-irq-cpu-mask', action='store_true', help="print the CPU mask to be used for IRQs binding") +argp.add_argument('--verbose', action='store_true', help="be more verbose about operations and their result") +argp.add_argument('--tune', choices=TuneModes.names(), help="components to configure (may be given more than once)", action='append', default=[]) +argp.add_argument('--cpu-mask', help="mask of cores to use, by default use all available cores", metavar='MASK') +argp.add_argument('--irq-cpu-mask', help="mask of cores to use for IRQs binding", metavar='MASK') +argp.add_argument('--dir', help="directory to optimize (may appear more than once)", action='append', dest='dirs', default=[]) +argp.add_argument('--dev', help="device to optimize (may appear more than once), e.g. 
sda1", action='append', dest='devs', default=[]) +argp.add_argument('--options-file', help="configuration YAML file") +argp.add_argument('--dump-options-file', action='store_true', help="Print the configuration YAML file containing the current configuration") +argp.add_argument('--dry-run', action='store_true', help="Don't take any action, just recommend what to do.") +argp.add_argument('--write-back-cache', help="Enable/Disable \'write back\' write cache mode.", dest="set_write_back") +argp.add_argument('--arfs', help="Enable/Disable aRFS", dest="enable_arfs") +argp.add_argument('--num-rx-queues', help="Set a given number of Rx queues", type=int) +argp.add_argument('--irq-core-auto-detection-ratio', help="Use a given ratio for IRQ mask auto-detection. For " + "instance, if 8 is given and auto-detection is requested, a " + "single IRQ CPU core is going to be allocated for every 8 " + "CPU cores out of available according to a 'cpu_mask' value." + "Default is 16", + type=int, default=16, dest='cores_per_irq_core') + +def parse_cpu_mask_from_yaml(y, field_name, fname): + hex_32bit_pattern='0x[0-9a-fA-F]{1,8}' + mask_pattern = re.compile('^{}((,({})?)*,{})*$'.format(hex_32bit_pattern, hex_32bit_pattern, hex_32bit_pattern)) + + if mask_pattern.match(str(y[field_name])): + return y[field_name] + else: + raise Exception("Bad '{}' value in {}: {}".format(field_name, fname, str(y[field_name]))) + +def extend_and_unique(orig_list, iterable): + """ + Extend items to a list, and make the list items unique + """ + assert(isinstance(orig_list, list)) + assert(isinstance(iterable, list)) + orig_list.extend(iterable) + return list(set(orig_list)) + +def parse_tri_state_arg(value, arg_name): + try: + if value is not None: + return distutils.util.strtobool(value) + else: + return None + except: + sys.exit("Invalid {} value: should be boolean but given: {}".format(arg_name, value)) + +def parse_options_file(prog_args): + if not prog_args.options_file: + return + + y = 
yaml.safe_load(open(prog_args.options_file)) + if y is None: + return + + if 'mode' in y and not prog_args.mode: + if not y['mode'] in PerfTunerBase.SupportedModes.names(): + raise Exception("Bad 'mode' value in {}: {}".format(prog_args.options_file, y['mode'])) + prog_args.mode = y['mode'] + + if 'nic' in y: + # Multiple nics was supported by commit a2fc9d72c31b97840bc75ae49dbd6f4b6d394e25 + # `nic' option dumped to config file will be list after this change, but the `nic' + # option in old config file is still string, which was generated before this change. + # So here convert the string option to list. + if not isinstance(y['nic'], list): + y['nic'] = [y['nic']] + prog_args.nics = extend_and_unique(prog_args.nics, y['nic']) + + if 'tune_clock' in y and not prog_args.tune_clock: + prog_args.tune_clock= y['tune_clock'] + + if 'tune' in y: + if set(y['tune']) <= set(TuneModes.names()): + prog_args.tune = extend_and_unique(prog_args.tune, y['tune']) + else: + raise Exception("Bad 'tune' value in {}: {}".format(prog_args.options_file, y['tune'])) + + if 'cpu_mask' in y and not prog_args.cpu_mask: + prog_args.cpu_mask = parse_cpu_mask_from_yaml(y, 'cpu_mask', prog_args.options_file) + + if 'irq_cpu_mask' in y and not prog_args.irq_cpu_mask: + prog_args.irq_cpu_mask = parse_cpu_mask_from_yaml(y, 'irq_cpu_mask', prog_args.options_file) + + if 'dir' in y: + prog_args.dirs = extend_and_unique(prog_args.dirs, y['dir']) + + if 'dev' in y: + prog_args.devs = extend_and_unique(prog_args.devs, y['dev']) + + if 'write_back_cache' in y: + prog_args.set_write_back = distutils.util.strtobool("{}".format(y['write_back_cache'])) + + if 'arfs' in y: + prog_args.enable_arfs = distutils.util.strtobool("{}".format(y['arfs'])) + + if 'num_rx_queues' in y: + prog_args.num_rx_queues = int(y['num_rx_queues']) + + # prog_options['irq_core_auto_detection_ratio'] = prog_args.cores_per_irq_core + if 'irq_core_auto_detection_ratio' in y: + prog_args.cores_per_irq_core = 
int(y['irq_core_auto_detection_ratio']) + +def dump_config(prog_args): + prog_options = {} + + if prog_args.mode: + assert prog_args.cpu_mask, "cpu_mask has to always be set. Something is terribly wrong (a bug in perftune.py?)" + mode = PerfTunerBase.SupportedModes[prog_args.mode] + prog_options['irq_cpu_mask'] = PerfTunerBase.irqs_cpu_mask_for_mode(mode, prog_args.cpu_mask) + + if prog_args.nics: + prog_options['nic'] = list(set(prog_args.nics)) + + if prog_args.tune_clock: + prog_options['tune_clock'] = prog_args.tune_clock + + if prog_args.tune: + prog_options['tune'] = list(set(prog_args.tune)) + + if prog_args.cpu_mask: + prog_options['cpu_mask'] = prog_args.cpu_mask + + if prog_args.irq_cpu_mask: + prog_options['irq_cpu_mask'] = prog_args.irq_cpu_mask + + if prog_args.dirs: + prog_options['dir'] = list(set(prog_args.dirs)) + + if prog_args.devs: + prog_options['dev'] = list(set(prog_args.devs)) + + if prog_args.set_write_back is not None: + prog_options['write_back_cache'] = prog_args.set_write_back + + if prog_args.enable_arfs is not None: + prog_options['arfs'] = prog_args.enable_arfs + + if prog_args.num_rx_queues is not None: + prog_options['num_rx_queues'] = f"{prog_args.num_rx_queues}" + + prog_options['irq_core_auto_detection_ratio'] = prog_args.cores_per_irq_core + + perftune_print(yaml.dump(prog_options, default_flow_style=False)) +################################################################################ + +args = argp.parse_args() + +# Sanity check +args.set_write_back = parse_tri_state_arg(args.set_write_back, "--write-back-cache/write_back_cache") +args.enable_arfs = parse_tri_state_arg(args.enable_arfs, "--arfs/arfs") + +dry_run_mode = args.dry_run +parse_options_file(args) + +# if nothing needs to be configured - quit +if not args.tune: + sys.exit("ERROR: At least one tune mode MUST be given.") + +# The must be either 'mode' or an explicit 'irq_cpu_mask' given - not both +if args.mode and args.irq_cpu_mask: + sys.exit("ERROR: Provide 
either tune mode or IRQs CPU mask - not both.") + +# Sanity check +if args.cores_per_irq_core < PerfTunerBase.min_cores_per_irq_core(): + sys.exit(f"ERROR: irq_core_auto_detection_ratio value must be greater or equal than " + f"{PerfTunerBase.min_cores_per_irq_core()}") + +# set default values ##################### +if not args.nics: + args.nics = ['eth0'] + +if not args.cpu_mask: + args.cpu_mask = run_hwloc_calc(['all']) +########################################## + +# Sanity: irq_cpu_mask should be a subset of cpu_mask +if args.irq_cpu_mask and run_hwloc_calc([args.cpu_mask]) != run_hwloc_calc([args.cpu_mask, args.irq_cpu_mask]): + sys.exit("ERROR: IRQ CPU mask({}) must be a subset of CPU mask({})".format(args.irq_cpu_mask, args.cpu_mask)) + +if args.dump_options_file: + dump_config(args) + sys.exit(0) + +try: + tuners = [] + + if TuneModes.disks.name in args.tune: + tuners.append(DiskPerfTuner(args)) + + if TuneModes.net.name in args.tune: + tuners.append(NetPerfTuner(args)) + + if TuneModes.system.name in args.tune: + tuners.append(SystemPerfTuner(args)) + + if args.get_cpu_mask or args.get_cpu_mask_quiet: + # Print the compute mask from the first tuner - it's going to be the same in all of them + perftune_print(tuners[0].compute_cpu_mask) + elif args.get_irq_cpu_mask: + perftune_print(tuners[0].irqs_cpu_mask) + else: + # Tune the system + restart_irqbalance(itertools.chain.from_iterable([ tuner.irqs for tuner in tuners ])) + + for tuner in tuners: + tuner.tune() +except PerfTunerBase.CPUMaskIsZeroException as e: + # Print a zero CPU set if --get-cpu-mask-quiet was requested. + if args.get_cpu_mask_quiet: + perftune_print("0x0") + else: + sys.exit("ERROR: {}. Your system can't be tuned until the issue is fixed.".format(e)) +except Exception as e: + sys.exit("ERROR: {}. 
Your system can't be tuned until the issue is fixed.".format(e)) + diff --git a/src/seastar/scripts/perftune.yaml b/src/seastar/scripts/perftune.yaml new file mode 100644 index 000000000..945f23e62 --- /dev/null +++ b/src/seastar/scripts/perftune.yaml @@ -0,0 +1,33 @@ +# Mode is one of the following values: +# - 'mq' +# - 'sq' +# - 'sq_split' +#mode: 'sq_split' + +# Name of the NIC to tune, e.g. eth7. +# By default would use 'eth0'. +#nic: eth7 + +# If 'true' the script will the CPU mask to be used for compute. +#get_cpu_mask: true + +# Define what to tune: could be any combination of the values from {'net', 'disks'} set. +#tune: +# - net +# - disks + +# Mask of cores to use, by default use all available cores. +#cpu_mask: '0x00f,,,0x0,,0x00f' + +# Set of directories to optimize. +#dir: +# - /root +# - /home + +# Set of disk devices to optimize +#dev: +# - /dev/sda2 +# - /dev/md0 + +# write_back_cache: false + diff --git a/src/seastar/scripts/posix_net_conf.sh b/src/seastar/scripts/posix_net_conf.sh new file mode 100755 index 000000000..d3fa52d32 --- /dev/null +++ b/src/seastar/scripts/posix_net_conf.sh @@ -0,0 +1,81 @@ +#!/bin/bash +# ! +# ! Usage: posix_net_conf.sh [iface name, eth0 by default] [-mq|-sq] [--cpu-mask] [-h|--help] [--use-cpu-mask <mask>] +# ! +# ! Ban NIC IRQs from being moved by irqbalance. +# ! +# ! -sq - set all IRQs of a given NIC to CPU0 and configure RPS +# ! to spreads NAPIs' handling between other CPUs. +# ! +# ! -mq - distribute NIC's IRQs among all CPUs instead of binding +# ! them all to CPU0. In this mode RPS is always enabled to +# ! spreads NAPIs' handling between all CPUs. +# ! +# ! --options-file <YAML file> - YAML file with perftune.py options +# ! +# ! If there isn't any mode given script will use a default mode: +# ! - If number of physical CPU cores per Rx HW queue is greater than 4 - use the '-sq' mode. +# ! - Otherwise use the '-mq' mode. +# ! +# ! 
--use-cpu-mask <mask> - mask of cores to use, by default use all available cores +# ! +# ! --cpu-mask - Print out RPS CPU assignments. On MQ NIC, just print all cpus. +# ! +# ! -h|--help - print this help information +# ! +# ! Enable XPS, increase the default values of somaxconn and tcp_max_syn_backlog. +# ! + +usage() +{ + cat $0 | grep ^"# !" | cut -d"!" -f2- +} + +parse_args() +{ + local i + local arg + + until [ -z "$1" ] + do + arg=$1 + case "$arg" in + "-mq") + MQ_MODE="--mode mq" + ;; + "-sq") + MQ_MODE="--mode sq" + ;; + "--cpu-mask") + CPU_MASK="--get-cpu-mask" + ;; + "--use-cpu-mask") + CPU_FILTER_MASK="--cpu-mask $2" + shift + ;; + "--options-file") + OPTIONS_FILE="--options-file $2" + shift + ;; + "-h"|"--help") + usage + exit 0 + ;; + *) + IFACE=$arg + ;; + esac + shift + done +} + +IFACE="eth0" +MQ_MODE="" +CPU_FILTER_MASK="" +CPU_MASK="" +MY_DIR=`dirname $0` +OPTIONS_FILE="" + +parse_args $@ + +$MY_DIR/perftune.py --nic $IFACE $MQ_MODE $CPU_FILTER_MASK $CPU_MASK $OPTIONS_FILE --tune net diff --git a/src/seastar/scripts/run_with_dpdk.sh b/src/seastar/scripts/run_with_dpdk.sh new file mode 100755 index 000000000..e00fbc29f --- /dev/null +++ b/src/seastar/scripts/run_with_dpdk.sh @@ -0,0 +1,98 @@ +#!/bin/bash +# ! +# ! Usage: ./prepare_dpdk_env.sh <NIC to use> <number of huge pages per NUMA Node> <command to execute> [command parameters] +# ! +# ! Prepares the DPDK environment (binds a given NIC to UIO, allocates the required +# ! number of huge pages) and executes the given command in it. +# ! After the command terminates the original environment is restored apart from +# ! huge pages, that remain allocated. +# ! + +usage() +{ + cat $0 | grep ^"# !" | cut -d"!" -f2- +} + +# +# check_stat_and_exit <error message> +# +check_stat_and_exit() +{ + if [[ $? -ne 0 ]]; then + echo $@ + exit 1 + fi +} + +rollback() +{ + echo "Binding $NIC($BDF) back to $DRIVER..." 
+ $SCRIPTS_DIR/dpdk_nic_bind.py -u $BDF + $SCRIPTS_DIR/dpdk_nic_bind.py -b $DRIVER $BDF +} + +check_stat_and_rollback() +{ + if [[ $? -ne 0 ]]; then + echo $@ + rollback + exit 1 + fi +} + +# Check number of parameters +if [[ $# -lt 3 ]]; then + usage + exit 1 +fi + +NIC=$1 +shift +NUM_HUGE_PAGES_PER_NODE=$1 +shift +SCRIPTS_DIR=`dirname $0` + + +ifconfig $NIC down +check_stat_and_exit "Failed to shut down $NIC. Is $NIC present? Are your permissions sufficient?" + +DRIVER=`ethtool -i $NIC | grep driver | cut -d":" -f2- | tr -d ' '` +BDF=`ethtool -i $NIC | grep bus-info | cut -d":" -f2- | tr -d ' '` + +# command to execute +CMD=$@ + +echo "Binding $NIC($BDF) to uio_pci_generic..." +$SCRIPTS_DIR/dpdk_nic_bind.py -u $BDF +check_stat_and_exit +$SCRIPTS_DIR/dpdk_nic_bind.py -b uio_pci_generic $BDF +check_stat_and_rollback + +echo "Allocating $NUM_HUGE_PAGES_PER_NODE 2MB huge pages on each NUMA Node:" +for d in /sys/devices/system/node/node? ; do + echo $NUM_HUGE_PAGES_PER_NODE > $d/hugepages/hugepages-2048kB/nr_hugepages + check_stat_and_rollback + cur_node=`basename $d` + echo "...$cur_node done..." +done + +mkdir -p /mnt/huge +check_stat_and_rollback + +grep -s '/mnt/huge' /proc/mounts > /dev/null +if [[ $? -ne 0 ]] ; then + echo "Mounting hugetlbfs at /mnt/huge..." + mount -t hugetlbfs nodev /mnt/huge + check_stat_and_rollback +fi + +# Run scylla +echo "Running: $CMD" +$CMD +ret=$? + +# Revert the NIC binding +rollback + +exit $ret + diff --git a/src/seastar/scripts/seastar-addr2line b/src/seastar/scripts/seastar-addr2line new file mode 100755 index 000000000..a59312726 --- /dev/null +++ b/src/seastar/scripts/seastar-addr2line @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. 
+# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Copyright (C) 2017 ScyllaDB + +import argparse +import collections +import re +import sys +import subprocess +from enum import Enum + +from addr2line import BacktraceResolver + +class StdinBacktraceIterator(object): + """ + Read stdin char-by-char and stop when when user pressed Ctrl+D or the + Enter twice. Altough reading char-by-char is slow this won't be a + problem here as backtraces shouldn't be huge. + """ + def __iter__(self): + linefeeds = 0 + lines = [] + line = [] + + while True: + char = sys.stdin.read(1) + + if char == '\n': + linefeeds += 1 + + if len(line) > 0: + lines.append(''.join(line)) + line = [] + else: + line.append(char) + linefeeds = 0 + + if char == '' or linefeeds > 1: + break + + return iter(lines) + + +description='Massage and pass addresses to the real addr2line for symbol lookup.' 
+epilog=''' +There are three operational modes: + 1) If -f is specified input will be read from FILE + 2) If -f is omitted and there are ADDRESS args they will be read as input + 3) If -f is omitted and there are no ADDRESS args input will be read from stdin +''' + +cmdline_parser = argparse.ArgumentParser( + description=description, + epilog=epilog, + formatter_class=argparse.RawDescriptionHelpFormatter, +) + +cmdline_parser.add_argument( + '-e', + '--executable', + type=str, + required=True, + metavar='EXECUTABLE', + dest='executable', + help='The executable where the addresses originate from') + +cmdline_parser.add_argument( + '-f', + '--file', + type=str, + required=False, + metavar='FILE', + dest='file', + help='The file containing the addresses') + +cmdline_parser.add_argument( + '-b', + '--before', + type=int, + metavar='BEFORE', + default=1, + help='Non-backtrace lines to print before resolved backtraces for context.' + ' Set to 0 to print only resolved backtraces.' + ' Set to -1 to print all non-backtrace lines. Default is 1.') + +cmdline_parser.add_argument( + '-m', + '--match', + type=str, + metavar='MATCH', + help='Only resolve backtraces whose non-backtrace lines match the regular-expression.' + ' The amount of non-backtrace lines considered can be set with --before.' 
+ ' By default no matching is performed.') + +cmdline_parser.add_argument( + '-a', + '--addr2line', + type=str, + metavar='CMD_PATH', + default='addr2line', + help='The path or name of the addr2line command, which should behave as and ' + 'accept the same options as binutils addr2line or llvm-addr2line.') + +cmdline_parser.add_argument( + '-v', + '--verbose', + action='store_true', + default=False, + help='Make resolved backtraces verbose, prepend to each line the module' + ' it originates from, as well as the address being resolved') + +cmdline_parser.add_argument( + '-t', + '--test', + action='store_true', + default=False, + help='Self-test') + +cmdline_parser.add_argument( + 'addresses', + type=str, + metavar='ADDRESS', + nargs='*', + help='Addresses to parse') + +args = cmdline_parser.parse_args() + +if args.test: + data = [ + ('---', {'type': BacktraceResolver.BacktraceParser.Type.SEPARATOR}), + + ('0x12f34', {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': None, 'addr': '0x12f34'}]}), + ('0xa1234 0xb4567', {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': None, 'addr': '0xa1234'},{'path': None, 'addr': '0xb4567'}]}), + (' 0xa1234 /my/path+0xb4567', {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': None, 'addr': '0xa1234'},{'path': '/my/path', 'addr': '0xb4567'}]}), + ('/my/path+0x12f34', {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': '/my/path', 'addr': '0x12f34'}]}), + (' /my/path+0x12f34', {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': '/my/path', 'addr': '0x12f34'}]}), + + ('Some prefix 0x12f34', {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': None, 'addr': '0x12f34'}]}), + ('Some prefix /my/path+0x12f34', {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': 
None, 'addresses': [{'path': '/my/path', 'addr': '0x12f34'}]}), + + ('Reactor stalled on shard 1. Backtrace: 0x12f34', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': 'Reactor stalled on shard 1. Backtrace:', + 'addresses': [{'path': None, 'addr': '0x12f34'}]}), + ('Reactor stalled on shard 1. Backtrace: 0xa1234 0xb5678', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': 'Reactor stalled on shard 1. Backtrace:', + 'addresses': [{'path': None, 'addr': '0xa1234'}, {'path': None, 'addr': '0xb5678'}]}), + ('Reactor stalled on shard 1. Backtrace: /my/path+0xabcd', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': 'Reactor stalled on shard 1. Backtrace:', + 'addresses': [{'path': '/my/path', 'addr': '0xabcd'}]}), + + ('Expected partition_end(), but got end of stream, at 0xa1234 0xb5678 /my/path+0xabcd', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': 'Expected partition_end(), but got end of stream, at', + 'addresses': [{'path': None, 'addr': '0xa1234'}, {'path': None, 'addr': '0xb5678'}, {'path': '/my/path', 'addr': '0xabcd'}]}), + + ('==16118==ERROR: AddressSanitizer: heap-use-after-free on address 0x60700019c710 at pc 0x000014d24643 bp 0x7ffc51f72220 sp 0x7ffc51f72218', None), + ('READ of size 8 at 0x60700019c710 thread T0', None), + ('#0 0x14d24642 (/jenkins/workspace/scylla-enterprise/dtest-debug/scylla/.ccm/scylla-repository/1a5173bd45d01697d98ba2a7645f5d86afb2d0be/scylla/libexec/scylla+0x14d24642)', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, + 'addresses': [{'path': '/jenkins/workspace/scylla-enterprise/dtest-debug/scylla/.ccm/scylla-repository/1a5173bd45d01697d98ba2a7645f5d86afb2d0be/scylla/libexec/scylla', 'addr': '0x14d24642'}]}), + (' #1 0xd8d910f (/home/myhome/.dtest/dtest-84j9064d/test/node1/bin/scylla+0xd8d910f) (BuildId: 05a1d3d58d2b07e526decdad717e71a4590be2e0)', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, + 
'addresses': [{'path': '/home/myhome/.dtest/dtest-84j9064d/test/node1/bin/scylla', 'addr': '0xd8d910f'}]}), + + ('Apr 28 11:42:58 ip-172-31-2-154.ec2.internal scylla[10612]: Reactor stalled for 260 ms on shard 20.', None), + ('Apr 28 11:42:58 ip-172-31-2-154.ec2.internal scylla[10612]: Backtrace:', None), + ('Apr 28 11:42:58 ip-172-31-2-154.ec2.internal scylla[10612]: 0x0000000003163dc2', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': 'Apr 28 11:42:58 ip-172-31-2-154.ec2.internal scylla[10612]:', + 'addresses': [{'path': None, 'addr': '0x0000000003163dc2'}]}), + + ('seastar::internal::backtraced<std::runtime_error> (throw_with_backtrace_exception_logging Backtrace: 0x42bc95 /lib64/libc.so.6+0x281e1 0x412cfd)', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': 'seastar::internal::backtraced<std::runtime_error> (throw_with_backtrace_exception_logging Backtrace:', + 'addresses': [{'path': None, 'addr': '0x42bc95'}, {'path': '/lib64/libc.so.6', 'addr': '0x281e1'}, {'path': None, 'addr': '0x412cfd'}]}), + ('seastar::nested_exception: seastar::internal::backtraced<std::runtime_error> (inner Backtrace: 0x42bc95 /lib64/libc.so.6+0x281e1 0x412cfd) (while cleaning up after unknown_obj)', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': 'seastar::nested_exception: seastar::internal::backtraced<std::runtime_error> (inner Backtrace:', + 'addresses': [{'path': None, 'addr': '0x42bc95'}, {'path': '/lib64/libc.so.6', 'addr': '0x281e1'}, {'path': None, 'addr': '0x412cfd'}]}), + ('seastar::nested_exception: seastar::internal::backtraced<std::runtime_error> (inner Backtrace: 0x42bc95 /lib64/libc.so.6+0x281e1 0x412cfd) ' + '(while cleaning up after seastar::internal::backtraced<std::runtime_error> (outer Backtrace: 0x1234 /lib64/libc.so.6+0x5678 0xabcd))', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, + 'prefix': 'seastar::nested_exception: seastar::internal::backtraced<std::runtime_error> (inner Backtrace: 
0x42bc95 /lib64/libc.so.6+0x281e1 0x412cfd)' + ' (while cleaning up after seastar::internal::backtraced<std::runtime_error> (outer Backtrace:', + 'addresses': [{'path': None, 'addr': '0x1234'}, {'path': '/lib64/libc.so.6', 'addr': '0x5678'}, {'path': None, 'addr': '0xabcd'}]}), + + ("2022-04-15T06:19:24+00:00 gemini-1tb-10h-master-db-node-c0c7fc43-4 ! INFO | " + "scylla: sstables/consumer.hh:610: future<> data_consumer::continuous_data_consumer" + "<sstables::index_consume_entry_context<sstables::index_consumer>>::fast_forward_to" + "(size_t, size_t) [StateProcessor = sstables::index_consume_entry_context" + "<sstables::index_consumer>]: Assertion `begin >= _stream_position.position' failed.", None), + ('2022-04-15T06:19:24+00:00 gemini-1tb-10h-master-db-node-c0c7fc43-4 ! INFO | Backtrace:', None), + ('2022-04-15T06:19:24+00:00 gemini-1tb-10h-master-db-node-c0c7fc43-4 ! INFO | 0x199d312', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, + 'prefix': None, + 'addresses': [{'path': None, 'addr': '0x199d312'}]}), + ('2022-04-15T06:19:24+00:00 gemini-1tb-10h-master-db-node-c0c7fc43-4 ! INFO | (throw_with_backtrace_exception_logging Backtrace: 0x42bc95 /lib64/libc.so.6+0x281e1 0x412cfd)', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, + 'prefix': '2022-04-15T06:19:24+00:00 gemini-1tb-10h-master-db-node-c0c7fc43-4 ! 
INFO | (throw_with_backtrace_exception_logging Backtrace:', + 'addresses': [{'path': None, 'addr': '0x42bc95'}, {'path': '/lib64/libc.so.6', 'addr': '0x281e1'}, {'path': None, 'addr': '0x412cfd'}]}), + + ('[2022-04-19T23:09:28.311Z] Segmentation fault on shard 1.', None), + ('[2022-04-19T23:09:28.311Z] Backtrace:', None), + ('[2022-04-19T23:09:28.311Z] 0x461bbb8', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': None, 'addr': '0x461bbb8'}]}), + ('[2022-04-19T23:09:28.311Z] /lib64/libpthread.so.0+0x92a4', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': '/lib64/libpthread.so.0', 'addr': '0x92a4'}]}), + + ('#0 0x19c01681 (/path/to/scylla+0xdeadbeef)', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': '/path/to/scylla', 'addr': '0x19c01681'}]}), + ('#1 0x00007fd2dab4f950 abort (libc.so.6 + 0x26950)', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': 'libc.so.6', 'addr': '0x00007fd2dab4f950'}]}), + ('#2 0x00000000015c4cd3 n/a (/path/to/scylla + 0x15c4cd3)', + {'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, 'prefix': None, 'addresses': [{'path': '/path/to/scylla', 'addr': '0x00000000015c4cd3'}]}), + + ('kernel callstack: ', None), + ('kernel callstack: 0xffffffffffffff80', + { + 'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, + 'prefix': 'kernel callstack: ', + 'addresses': [{'path': '<kernel>', 'addr': '0xffffffffffffff80'}] + } + ), + ('kernel callstack: 0xffffffffffffff80 0xffffffffaf15ccca', + { + 'type': BacktraceResolver.BacktraceParser.Type.ADDRESS, + 'prefix': 'kernel callstack: ', + 'addresses': [{'path': '<kernel>', 'addr': '0xffffffffffffff80'}, {'path': '<kernel>', 'addr': '0xffffffffaf15ccca'}] + } + ) + ] + parser = BacktraceResolver.BacktraceParser() + for line, expected in data: + res = parser(line.strip() + '\n') + assert res == expected, 
f"{line}:\nExpected {expected}\nBut got {res}" + if args.verbose: + print(f'All {len(data)} tests passed successfully.') + exit(0) + +if args.addresses and args.file: + print("Cannot use both -f and ADDRESS") + cmdline_parser.print_help() + + +if args.file: + lines = open(args.file, 'r') +elif args.addresses: + lines = args.addresses +else: + if sys.stdin.isatty(): + lines = StdinBacktraceIterator() + else: + lines = sys.stdin + +with BacktraceResolver(executable=args.executable, before_lines=args.before, context_re=args.match, + verbose=args.verbose, cmd_path=args.addr2line) as resolve: + p = re.compile(r'\W+') + for line in lines: + resolve(line.strip() + '\n') diff --git a/src/seastar/scripts/seastar-cpu-map.sh b/src/seastar/scripts/seastar-cpu-map.sh new file mode 100755 index 000000000..e6cc41efa --- /dev/null +++ b/src/seastar/scripts/seastar-cpu-map.sh @@ -0,0 +1,54 @@ +#!/bin/bash +# ! +# ! Usage: ./seastar-cpu-map.sh -p <process_PID> -n <process_Name> -s (optional) <shard> +# ! +# ! List CPU affinity for a particular running process +# ! providing a map of threads -> shard, for any seastar apps. +# ! Ex.: ./seastar-cpu-map.sh -n scylla +# ! ./seastar-cpu-map.sh -n scylla -s 0 +# ! ./seastar-cpu-map.sh -p 34 +# ! ./seastar-cpu-map.sh -p 32 -s 12 + +usage() { + cat $0 | grep ^"# !" | cut -d"!" -f2- +} + +while getopts 'p:n:s:' option; do + case "$option" in + p) PID=$OPTARG + ;; + n) PID=`pgrep --newest --exact $OPTARG` + ;; + s) SHARD=$OPTARG + ;; + :) printf "missing argument for -%s\n" "$OPTARG" >&2 + usage >&2 + exit 1 + ;; + \?) 
printf "illegal option: -%s\n" "$OPTARG" >&2 + usage >&2 + exit 1 + ;; + esac +done + +if [ $# -eq 0 ]; then usage >&2; exit 1; fi + +if [ -e "/proc/$PID/task" ]; then + # get list of threads for given PID + THREADS=`ls /proc/$PID/task` + for i in $THREADS; do + # get shards from threads + # there were three options here to get the shard number: + # reactor-xx, syscall-xx and timer-xx + # syscall was preferred because reactor as a special case (reactor-0 is called scylla) + SYSCALL=`grep syscall /proc/$i/comm | cut -d"-" -f2` + if [ -n "$SYSCALL" ] && [ "$SYSCALL" = "$SHARD" ]; then + echo -e "shard: $SYSCALL, cpu:$(taskset -c -p $i | cut -d":" -f2)" + elif [ -n "$SYSCALL" ] && [ -z "$SHARD" ]; then + echo -e "shard: $SYSCALL, cpu:$(taskset -c -p $i | cut -d":" -f2)" + fi + done +else + echo "Process does not exist" +fi diff --git a/src/seastar/scripts/seastar-json2code.py b/src/seastar/scripts/seastar-json2code.py new file mode 100755 index 000000000..b19df4fa0 --- /dev/null +++ b/src/seastar/scripts/seastar-json2code.py @@ -0,0 +1,578 @@ +#!/usr/bin/env python3 + +# C++ Code generation utility from Swagger definitions. +# This utility support Both the swagger 1.2 format +# https://github.com/OAI/OpenAPI-Specification/blob/master/versions/1.2.md +# And the 2.0 format +# https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md +# +# Swagger 2.0 is not only different in its structure (apis have moved, and +# models are now under definitions) It also moved from multiple file structure +# to a single file. +# To keep the multiple file support, each group of APIs will be placed in a single file +# Each group can have a .def.json file with its definitions (What used to be models) +# Because the APIs in definitions are snippets, they are not legal json objects +# and need to be formated as such so that a json parser would work. 
+ +import json +import sys +import re +import glob +import argparse +import os +from string import Template + +parser = argparse.ArgumentParser(description="""Generate C++ class for json +handling from swagger definition""") + +parser.add_argument('--outdir', help='the output directory', default='autogen') +parser.add_argument('-o', help='Output file', default='') +parser.add_argument('-f', help='input file', default='api-java.json') +parser.add_argument('-ns', help="""namespace when set struct will be created +under the namespace""", default='') +parser.add_argument('-jsoninc', help='relative path to the jsaon include', + default='json/') +parser.add_argument('-jsonns', help='set the json namespace', default='json') +parser.add_argument('-indir', help="""when set all json file in the given +directory will be parsed, do not use with -f""", default='') +parser.add_argument('-debug', help='debug level 0 -quite,1-error,2-verbose', + default='1', type=int) +parser.add_argument('-combined', help='set the name of the combined file', + default='autogen/pathautogen.ee') +parser.add_argument('--create-cc', dest='create_cc', action='store_true', default=False, + help='Put global variables in a .cc file') +config = parser.parse_args() + + +valid_vars = {'string': 'sstring', 'int': 'int', 'double': 'double', + 'float': 'float', 'long': 'long', 'boolean': 'bool', 'char': 'char', + 'datetime': 'json::date_time'} + +current_file = '' + +spacing = " " +def getitem(d, key, name): + if key in d: + return d[key] + else: + raise Exception("'" + key + "' not found in " + name) + +def fprint(f, *args): + for arg in args: + f.write(arg) + +def fprintln(f, *args): + for arg in args: + f.write(arg) + f.write('\n') + + +def open_namespace(f, ns=config.ns): + fprintln(f, "namespace ", ns , ' {\n') + + +def close_namespace(f): + fprintln(f, '}') + + +def add_include(f, includes): + for include in includes: + fprintln(f, '#include ', include) + fprintln(f, "") + +def trace_verbose(*params): + 
if config.debug > 1: + print(''.join(params)) + + +def trace_err(*params): + if config.debug > 0: + print(current_file + ':' + ''.join(params)) + + +def valid_type(param): + if param in valid_vars: + return valid_vars[param] + trace_err("Type [", param, "] not defined") + return param + + +def type_change(param, member): + if param == "array": + if "items" not in member: + trace_err("array without item declaration in ", param) + return "" + item = member["items"] + if "type" in item: + t = item["type"] + elif "$ref" in item: + t = item["$ref"] + else: + trace_err("array items with no type or ref declaration ", param) + return "" + return "json_list< " + valid_type(t) + " >" + return "json_element< " + valid_type(param) + " >" + + + +def print_ind_comment(f, ind, *params): + fprintln(f, ind, "/**") + for s in params: + fprintln(f, ind, " * ", s) + fprintln(f, ind, " */") + +def print_comment(f, *params): + print_ind_comment(f, spacing, *params) + +def print_copyrights(f): + fprintln(f, "/*") + fprintln(f, "* Copyright (C) 2014 Cloudius Systems, Ltd.") + fprintln(f, "*") + fprintln(f, "* This work is open source software, licensed under the", + " terms of the") + fprintln(f, "* BSD license as described in the LICENSE f in the top-", + "level directory.") + fprintln(f, "*") + fprintln(f, "* This is an Auto-Generated-code ") + fprintln(f, "* Changes you do in this file will be erased on next", + " code generation") + fprintln(f, "*/\n") + + +def print_h_file_headers(f, name): + print_copyrights(f) + fprintln(f, "#ifndef __JSON_AUTO_GENERATED_" + name) + fprintln(f, "#define __JSON_AUTO_GENERATED_" + name + "\n") + + +def clean_param(param): + match = re.match(r"^\{\s*([^\}]+)\s*}", param) + if match: + return [match.group(1), False] + return [param, True] + + +def get_parameter_by_name(obj, name): + for p in obj["parameters"]: + if p["name"] == name: + return p + trace_err ("No Parameter declaration found for ", name) + + +def clear_path_ending(path): + if not path or 
path[-1] != '/': + return path + return path[0:-1] + +# check if a parameter is query required. +# It will return true if the required flag is set +# and if it is a query parameter, both swagger 1.2 'paramType' and swagger 2.0 'in' attributes +# are supported +def is_required_query_param(param): + return "required" in param and param["required"] and ("paramType" in param and param["paramType"] == "query" or "in" in param and param["in"] == "query") + +def add_path(f, path, details): + if "summary" in details: + print_comment(f, details["summary"]) + param_starts = path.find("{") + if param_starts >= 0: + path_reminder = path[param_starts:] + vals = path.split("/") + vals.reverse() + fprintln(f, spacing, 'path_description::add_path("', clear_path_ending(vals.pop()), + '",', details["method"], ',"', details["nickname"], '")') + while vals: + param, is_url = clean_param(vals.pop()) + if is_url: + fprintln(f, spacing, ' ->pushurl("', param, '")') + else: + param_type = get_parameter_by_name(details, param) + if ("allowMultiple" in param_type and + param_type["allowMultiple"] == True): + fprintln(f, spacing, ' ->pushparam("', param, '",true)') + else: + fprintln(f, spacing, ' ->pushparam("', param, '")') + else: + fprintln(f, spacing, 'path_description::add_path("', clear_path_ending(path), '",', + details["method"], ',"', details["nickname"], '")') + if "parameters" in details: + for param in details["parameters"]: + if is_required_query_param(param): + fprintln(f, spacing, ' ->pushmandatory_param("', param["name"], '")') + fprintln(f, spacing, ";") + + +def get_base_name(param): + return os.path.basename(param) + + +def is_model_valid(name, model): + if name in valid_vars: + return "" + properties = getitem(model[name], "properties", name) + for var in properties: + type = getitem(properties[var], "type", name + ":" + var) + if type == "array": + items = getitem(properties[var], "items", name + ":" + var); + try : + type = getitem(items, "type", name + ":" + var + 
":items") + except Exception as e: + try: + type = getitem(items, "$ref", name + ":" + var + ":items") + except: + raise e; + if type not in valid_vars: + if type not in model: + raise Exception("Unknown type '" + type + "' in Model '" + name + "'") + return type + valid_vars[name] = name + return "" + +def resolve_model_order(data): + res = [] + models = set() + for model_name in data: + visited = set(model_name) + missing = is_model_valid(model_name, data) + resolved = missing == '' + if not resolved: + stack = [model_name] + while not resolved: + if missing in visited: + raise Exception("Cyclic dependency found: " + missing) + missing_depends = is_model_valid(missing, data) + if missing_depends == '': + if missing not in models: + res.append(missing) + models.add(missing) + resolved = len(stack) == 0 + if not resolved: + missing = stack.pop() + else: + stack.append(missing) + missing = missing_depends + elif model_name not in models: + res.append(model_name) + models.add(model_name) + return res + +def create_enum_wrapper(model_name, name, values): + enum_name = model_name + "_" + name + res = " enum class " + enum_name + " {" + for enum_entry in values: + res = res + " " + enum_entry + ", " + res = res + "NUM_ITEMS};\n" + wrapper = name + "_wrapper" + res = res + Template(""" struct $wrapper : public json::jsonable { + $wrapper() = default; + virtual std::string to_json() const { + switch(v) { + """).substitute({'wrapper' : wrapper}) + for enum_entry in values: + res = res + " case " + enum_name + "::" + enum_entry + ": return \"\\\"" + enum_entry + "\\\"\";\n" + res = res + Template(""" default: return \"\\\"Unknown\\\"\"; + } + } + template<class T> + $wrapper (const T& _v) { + switch(_v) { + """).substitute({'wrapper' : wrapper}) + for enum_entry in values: + res = res + " case T::" + enum_entry + ": v = " + enum_name + "::" + enum_entry + "; break;\n" + res = res + Template(""" default: v = $enum_name::NUM_ITEMS; + } + } + template<class T> + operator T() 
const { + switch(v) { + """).substitute({'enum_name': enum_name}) + for enum_entry in values: + res = res + " case " + enum_name + "::" + enum_entry + ": return T::" + enum_entry + ";\n" + return res + Template(""" default: return T::$value; + } + } + typedef typename std::underlying_type<$enum_name>::type pos_type; + $wrapper& operator++() { + v = static_cast<$enum_name>(static_cast<pos_type>(v) + 1); + return *this; + } + $wrapper & operator++(int) { + return ++(*this); + } + bool operator==(const $wrapper& c) const { + return v == c.v; + } + bool operator!=(const $wrapper& c) const { + return v != c.v; + } + bool operator<=(const $wrapper& c) const { + return static_cast<pos_type>(v) <= static_cast<pos_type>(c.v); + } + static $wrapper begin() { + return $wrapper ($enum_name::$value); + } + static $wrapper end() { + return $wrapper ($enum_name::NUM_ITEMS); + } + static boost::integer_range<$wrapper> all_items() { + return boost::irange(begin(), end()); + } + $enum_name v; + }; + """).substitute({'enum_name': enum_name, 'wrapper' : wrapper, 'value':values[0]}) + +def to_operation(opr, data): + data["method"] = opr.upper() + data["nickname"] = data["operationId"] + return data + +def to_path(path, data): + data["operations"] = [to_operation(k, data[k]) for k in data] + data["path"] = path + + return data + +def create_h_file(data, hfile_name, api_name, init_method, base_api): + if config.o != '': + final_hfile_name = config.o + else: + final_hfile_name = config.outdir + "/" + hfile_name + hfile = open(final_hfile_name, "w") + + if config.create_cc: + ccfile = open(final_hfile_name.rstrip('.hh') + ".cc", "w") + add_include(ccfile, ['"{}"'.format(final_hfile_name)]) + open_namespace(ccfile, "seastar") + open_namespace(ccfile, "httpd") + open_namespace(ccfile, api_name) + else: + ccfile = hfile + print_h_file_headers(hfile, api_name) + add_include(hfile, ['<seastar/core/sstring.hh>', + '<seastar/json/json_elements.hh>', + '<seastar/http/json_path.hh>']) + + 
add_include(hfile, ['<iostream>', '<boost/range/irange.hpp>']) + open_namespace(hfile, "seastar") + open_namespace(hfile, "httpd") + open_namespace(hfile, api_name) + + if "models" in data: + models_order = resolve_model_order(data["models"]) + for model_name in models_order: + model = data["models"][model_name] + if 'description' in model: + print_ind_comment(hfile, "", model["description"]) + fprintln(hfile, "struct ", model_name, " : public json::json_base {") + member_init = '' + member_assignment = '' + member_copy = '' + for member_name in model["properties"]: + member = model["properties"][member_name] + if "description" in member: + print_comment(hfile, member["description"]) + if "enum" in member: + enum_name = model_name + "_" + member_name + fprintln(hfile, create_enum_wrapper(model_name, member_name, member["enum"])) + fprintln(hfile, " ", config.jsonns, "::json_element<", + member_name, "_wrapper> ", + member_name, ";\n") + else: + fprintln(hfile, " ", config.jsonns, "::", + type_change(member["type"], member), " ", + member_name, ";\n") + member_init += " add(&" + member_name + ',"' + member_init += member_name + '");\n' + member_assignment += " " + member_name + " = " + "e." + member_name + ";\n" + member_copy += " e." 
+ member_name + " = " + member_name + ";\n" + fprintln(hfile, "void register_params() {") + fprintln(hfile, member_init) + fprintln(hfile, '}') + + fprintln(hfile, model_name, '() {') + fprintln(hfile, ' register_params();') + fprintln(hfile, '}') + fprintln(hfile, model_name, '(const ' + model_name + ' & e) {') + fprintln(hfile, ' register_params();') + fprintln(hfile, member_assignment) + fprintln(hfile, '}') + fprintln(hfile, "template<class T>") + fprintln(hfile, model_name, "& operator=(const ", "T& e) {") + fprintln(hfile, member_assignment) + fprintln(hfile, " return *this;") + fprintln(hfile, "}") + fprintln(hfile, model_name, "& operator=(const ", model_name, "& e) {") + fprintln(hfile, member_assignment) + fprintln(hfile, " return *this;") + fprintln(hfile, "}") + fprintln(hfile, "template<class T>") + fprintln(hfile, model_name, "& update(T& e) {") + fprintln(hfile, member_copy) + fprintln(hfile, " return *this;") + fprintln(hfile, "}") + fprintln(hfile, "};\n\n") + + # print_ind_comment(hfile, "", "Initialize the path") +# fprintln(hfile, init_method + "(const std::string& description);") + fprintln(hfile, 'static const sstring name = "', base_api, '";') + for item in data["apis"]: + path = item["path"] + if "operations" in item: + for oper in item["operations"]: + if "summary" in oper: + print_comment(hfile, oper["summary"]) + + param_starts = path.find("{") + base_url = path + vals = [] + if param_starts >= 0: + vals = path[param_starts:].split("/") + vals.reverse() + base_url = path[:param_starts] + + varname = getitem(oper, "nickname", oper) + if config.create_cc: + fprintln(hfile, 'extern const path_description ', varname, ';') + maybe_static = '' + else: + maybe_static = 'static ' + fprintln(ccfile, maybe_static, 'const path_description ', varname, '("', clear_path_ending(base_url), + '",', oper["method"], ',"', oper["nickname"], '",') + fprint(ccfile, '{') + first = True + while vals: + path_param, is_url = clean_param(vals.pop()) + if path_param 
== "": + continue + if first == True: + first = False + else: + fprint(ccfile, "\n,") + if is_url: + fprint(ccfile, '{', '"/', path_param , '", path_description::url_component_type::FIXED_STRING', '}') + else: + path_param_type = get_parameter_by_name(oper, path_param) + if ("allowMultiple" in path_param_type and + path_param_type["allowMultiple"] == True): + fprint(ccfile, '{', '"', path_param , '", path_description::url_component_type::PARAM_UNTIL_END_OF_PATH', '}') + else: + fprint(ccfile, '{', '"', path_param , '", path_description::url_component_type::PARAM', '}') + fprint(ccfile, '}') + fprint(ccfile, ',{') + first = True + enum_definitions = "" + if "enum" in oper: + enum_definitions = ("namespace ns_" + oper["nickname"] + " {\n" + + create_enum_wrapper(oper["nickname"], "return_type", oper["enum"]) + + "}\n") + funcs = "" + if "parameters" in oper: + for param in oper["parameters"]: + if is_required_query_param(param): + if first == True: + first = False + else: + fprint(ccfile, "\n,") + fprint(ccfile, '"', param["name"], '"') + if "enum" in param: + enum_definitions = enum_definitions + 'namespace ns_' + oper["nickname"] + '{\n' + enm = param["name"] + enum_definitions = enum_definitions + 'enum class ' + enm + ' {' + for val in param["enum"]: + enum_definitions = enum_definitions + val + ", " + enum_definitions = enum_definitions + 'NUM_ITEMS};\n' + enum_definitions = enum_definitions + enm + ' str2' + enm + '(const sstring& str);' + + funcs = funcs + enm + ' str2' + enm + '(const sstring& str) {\n' + funcs = funcs + ' static const sstring arr[] = {"' + '","'.join(param["enum"]) + '"};\n' + funcs = funcs + ' int i;\n' + funcs = funcs + ' for (i=0; i < ' + str(len(param["enum"])) + '; i++) {\n' + funcs = funcs + ' if (arr[i] == str) {return (' + enm + ')i;}\n}\n' + funcs = funcs + ' return (' + enm + ')i;\n' + funcs = funcs + '}\n' + + enum_definitions = enum_definitions + '}\n' + + fprintln(ccfile, '});') + fprintln(hfile, enum_definitions) + 
open_namespace(ccfile, 'ns_' + oper["nickname"]) + fprintln(ccfile, funcs) + close_namespace(ccfile) + + close_namespace(hfile) + close_namespace(hfile) + close_namespace(hfile) + if config.create_cc: + close_namespace(ccfile) + close_namespace(ccfile) + close_namespace(ccfile) + + hfile.write("#endif //__JSON_AUTO_GENERATED_HEADERS\n") + hfile.close() + +def remove_leading_comma(data): + return re.sub(r'^\s*,','', data) + +def format_as_json_object(data): + return "{" + remove_leading_comma(data) + "}" + +def check_for_models(data, param): + model_name = param.replace(".json", ".def.json") + if not os.path.isfile(model_name): + return + try: + with open(model_name) as myfile: + json_data = myfile.read() + def_data = json.loads(format_as_json_object(json_data)) + data["models"] = def_data + except Exception as e: + type, value, tb = sys.exc_info() + print("Bad formatted JSON definition file '" + model_name + "' error ", value.message) + sys.exit(-1) + +def set_apis(data): + return {"apis": [to_path(p, data[p]) for p in data]} + +def parse_file(param, combined): + global current_file + trace_verbose("parsing ", param, " file") + with open(param) as myfile: + json_data = myfile.read() + try: + data = json.loads(json_data) + except Exception as e: + try: + # the passed data is not a valid json, so maybe its a swagger 2.0 + # snippet, format it as json and try again + # set_apis and check_for_models will create an object with a similiar format + # to a swagger 1.2 so the code generation would work + data = set_apis(json.loads(format_as_json_object(json_data))) + check_for_models(data, param) + except: + # The problem is with the file, + # just report the error and exit. 
+ type, value, tb = sys.exc_info() + print("Bad formatted JSON file '" + param + "' error ", value.message) + sys.exit(-1) + try: + base_file_name = get_base_name(param) + current_file = base_file_name + hfile_name = base_file_name + ".hh" + api_name = base_file_name.replace('.', '_') + base_api = base_file_name.replace('.json', '') + init_method = "void " + api_name + "_init_path" + trace_verbose("creating ", hfile_name) + if (combined): + fprintln(combined, '#include "', base_file_name, ".cc", '"') + create_h_file(data, hfile_name, api_name, init_method, base_api) + except: + type, value, tb = sys.exc_info() + print("Error while parsing JSON file '" + param + "' error ", value.message) + sys.exit(-1) + +if "indir" in config and config.indir != '': + combined = open(config.combined, "w") + for f in glob.glob(os.path.join(config.indir, "*.json")): + parse_file(f, combined) +else: + parse_file(config.f, None) diff --git a/src/seastar/scripts/stall-analyser.py b/src/seastar/scripts/stall-analyser.py new file mode 100755 index 000000000..d5a085506 --- /dev/null +++ b/src/seastar/scripts/stall-analyser.py @@ -0,0 +1,370 @@ +#!/usr/bin/env python + +import argparse +import sys +import re + +import addr2line + +parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, + description='A reactor stall backtrace graph analyser.', + epilog=""" +stall-analyser helps analyze a series of reactor-stall backtraces using a graph. +Each node in the graph includes: + `addr` - a program address +Each link in the graph includes: + `total` - the total sum of stalls, in milliseconds + of all reactor stalls that pass via this caller/callee link. + `count` - number of backtraces going through the link. + +When printed, the graph is traversed in descending `total` order +to emphasize stall backtraces that are frequent and long. 
+ +Each node in the printed output is preceded with [level#index pct%], +where `level` is the level of that node in the graph (0 are root nodes), +`index` is the index in the parent node's list of callers/callees, and +`pct` is the percantage of this link's `total` time relative to +its siblings. + +When given an executable, addresses are decoding using `addr2line` +""") +parser.add_argument('--address-threshold', default='0x100000000', + help='Skip common backtrace prefix terminated by one or more addresses greater or equal to the threshold (0=disabled)') +parser.add_argument('-e', '--executable', + help='Decode addresses to lines using given executable') +parser.add_argument('-f', '--full-function-names', action='store_const', const=True, default=False, + help="When demangling C++ function names, display all information, including the type of the function's parameters. Otherwise, they are omitted (see `c++filt(1) -p`).") +parser.add_argument('-w', '--width', type=int, default=None, + help='Smart trim of long lines to width characters (0=disabled)') +parser.add_argument('-d', '--direction', choices=['bottom-up', 'top-down'], default='bottom-up', + help='Print graph bottom-up (default, callees first) or top-down (callers first)') +parser.add_argument('-m', '--minimum', type=int, default=None, + help='Process only stalls lasting the given time, in milliseconds, or longer') +parser.add_argument('-b', '--branch-threshold', type=float, default=0.05, + help='Drop branches responsible for less than this threshold relative to the previous level, not global. (default 5%%)') +parser.add_argument('file', nargs='?', + help='File containing reactor stall backtraces. 
Read from stdin if missing.') + +args = parser.parse_args() + +resolver = addr2line.BacktraceResolver(executable=args.executable, concise=not args.full_function_names) if args.executable else None + +class Node: + def __init__(self, addr:str): + self.addr = addr + self.callers = {} + self.callees = {} + self.printed = False + + def __repr__(self): + return f"Node({self.addr})" + + class Link: + def __init__(self, node, t:int): + self.node = node + self.total = t + self.count = 1 + + def __eq__(self, other): + return self.total == other.total and self.count == other.count + + def __ne__(self, other): + return not (self == other) + + def __lt__(self, other): + return self.total < other.total or self.total == other.total and self.count < other.count + + def add(self, t:int): + self.total += t + self.count += 1 + + def link_caller(self, t:int, n): + if n.addr in self.callers: + link = self.callers[n.addr] + link.add(t) + n.callees[self.addr].add(t) + else: + self.callers[n.addr] = self.Link(n, t) + n.callees[self.addr] = self.Link(self, t) + return n + + def unlink_caller(self, addr:str): + link = self.callers.pop(addr) + link.node.callees.pop(self.addr) + + def link_callee(self, t:int, n): + if n.addr in self.callees: + link = self.callees[n.addr] + link.add(t) + n.callers[self.addr].add(t) + else: + self.callees[n.addr] = self.Link(n, t) + n.callers[self.addr] = self.Link(self, t) + return n + + def unlink_callee(self, addr:str): + link = self.callees.pop(addr) + link.node.callers.pop(self.addr) + + def sorted_links(self, links:list, descending=True): + return sorted([l for l in links if l.node.addr], reverse=descending) + + def sorted_callers(self, descending=True): + return self.sorted_links(self.callers.values(), descending) + + def sorted_callees(self, descending=True): + return self.sorted_links(self.callees.values(), descending) + +class Graph: + def __init__(self): + # Each node in the tree contains: + self.count = 0 + self.total = 0 + self.nodes = {} + 
self.tail = Node('') + self.head = Node('') + + def add(self, prev:Node, t:int, addr:str): + if addr in self.nodes: + n = self.nodes[addr] + else: + n = Node(addr) + self.nodes[addr] = n + if prev: + if prev.addr in self.head.callees: + self.head.unlink_callee(prev.addr) + prev.link_caller(t, n) + if addr in self.tail.callers: + self.tail.unlink_caller(addr) + elif not n.callees or addr in self.tail.callers: + self.tail.link_caller(t, n) + return n + + def add_head(self, t:int, n:Node): + self.head.link_callee(t, n) + + def smart_print(self, lines:str, width:int): + def _print(l:str, width:int): + if not width or len(l) <= width: + print(l) + return + i = l.rfind(" at ") + if i < 0: + print(l[:width]) + return + sfx = l[i:] + w = width - len(sfx) - 3 + if w > 0: + pfx = l[:w] + else: + pfx = "" + print(f"{pfx}...{sfx}") + for l in lines.splitlines(): + if l: + _print(l, width) + + def print_graph(self, direction:str): + top_down = (direction == 'top-down') + print(f""" +This graph is printed in {direction} order, where {'callers' if top_down else 'callees'} are printed first. +Use --direction={'bottom-up' if top_down else 'top-down'} to print {'callees' if top_down else 'callers'} first. 
+ +[level#index/out_of pct%] below denotes: + level - nesting level in the graph + index - index of node among to its siblings + out_of - number of siblings + pct - percentage of total stall time of this call relative to its siblings +""") + varargs = vars(args) + clopts = "" + for k in varargs.keys(): + val = varargs[k] + opt = re.sub('_', '-', k) + if val is None: + continue + elif not isinstance(val, bool): + clopts += f" --{opt}={val}" + elif val: + clopts += f" --{opt}" + print(f"Command line options:{clopts}\n") + + def _prefix(prefix_list:list): + prefix = '' + for p in prefix_list: + prefix += p + return prefix + + def _recursive_print_graph(n:Node, total:int=0, count:int=0, level:int=-1, idx:int=0, out_of:int=0, rel:float=1.0, prefix_list=[], skip_stats=False): + nonlocal top_down + if level >= 0: + avg = round(total / count) if count else 0 + prefix = _prefix(prefix_list) + p = '+' if idx == 1 or idx == out_of else '|' + p += '+' + l = f"[{level}#{idx}/{out_of} {round(100*rel)}%]" + cont_indent = len(l) + 1 + if skip_stats: + l = f"{' ' * (len(l)-2)} -" + stats = '' + else: + stats = f" total={total} count={count} avg={avg}" + l = f"{prefix}{p}{l} addr={n.addr}{stats}" + p = "| " + if resolver: + lines = resolver.resolve_address(n.addr).splitlines() + if len(lines) == 1: + li = lines[0] + if li.startswith("??"): + l += f": {lines[0]}" + else: + l += f":\n{prefix}{p}{' '*cont_indent}{li.strip()}" + else: + l += ":\n" + if top_down: + lines.reverse() + for li in lines: + l += f"{prefix}{p}{' '*cont_indent}{li.strip()}\n" + width = args.width or 0 + self.smart_print(l, width) + if n.printed: + print(f"{prefix}-> continued at addr={n.addr} above") + return + n.printed = True + next = n.sorted_callees() if top_down else n.sorted_callers() + if not next: + return + link = next[0] + if level >= 0 and len(next) == 1 and link.total == total and link.count == count: + _recursive_print_graph(link.node, link.total, link.count, level, idx, out_of, rel, prefix_list, 
skip_stats=True) + else: + total = sum(link.total for link in next) + next_prefix_list = prefix_list + ["| " if idx < out_of else " "] if level >= 0 else [] + i = 1 + last_idx = len(next) + omitted_idx = 0 + omitted_total = 0 + omitted_count = 0 + for link in next: + rel = link.total / total + if rel < args.branch_threshold: + if not omitted_idx: + omitted_idx = i + omitted_total += link.total + omitted_count += link.count + else: + _recursive_print_graph(link.node, link.total, link.count, level + 1, i, last_idx, rel, next_prefix_list) + i += 1 + if omitted_idx: + prefix = _prefix(next_prefix_list) + p = '++' + rel = omitted_total / total + avg = round(omitted_total / omitted_count) if count else 0 + l = f"[{level+1}#{omitted_idx}/{last_idx} {round(100*rel)}%]" + print(f"{prefix}{p}{l} {last_idx - omitted_idx + 1} more branches total={omitted_total} count={omitted_count} avg={avg}") + + r = self.head if top_down else self.tail + _recursive_print_graph(r) + +graph = Graph() + +# process each backtrace and insert it to the tree +# +# The backtraces are assumed to be in bottom-up order, i.e. +# the first address indicates the innermost frame and the last +# address is in the outermost, in calling order. +# +# This helps identifying closely related reactor stalls +# where a code path that stalls may be called from multiple +# call sites. 
+def process_graph(t: int, trace: list[str]): + n = None + for addr in trace: + n = graph.add(n, t, addr) + graph.add_head(t, n) + +address_threshold = int(args.address_threshold, 0) +tally = {} + +def print_stats(tally:dict, tmin): + data = [] + total_time = 0 + total_count = 0 + processed_count = 0 + min_time = 1000000 + max_time = 0 + median = None + p95 = None + p99 = None + p999 = None + for t in sorted(tally.keys()): + count = tally[t] + data.append((t, count)) + total_time += t * count + if t < min_time: + min_time = t + if t > max_time: + max_time = t + total_count += count + if t >= tmin: + processed_count += count + running_count = 0 + for (t, count) in data: + running_count += count + if median is None and running_count >= total_count / 2: + median = t + elif p95 is None and running_count >= (total_count * 95) / 100: + p95 = t + elif p99 is None and running_count >= (total_count * 99) / 100: + p99 = t + elif p999 is None and running_count >= (total_count * 999) / 1000: + p999 = t + print(f"Processed {total_count} stalls lasting a total of {total_time} milliseconds.") + if tmin: + print(f"Of which, {processed_count} lasted {tmin} milliseconds or longer.") + avg_time = total_time / total_count if total_count else 0 + print(f"min={min_time} avg={avg_time:.1f} median={median} p95={p95} p99={p99} p999={p999} max={max_time}") + +input = open(args.file) if args.file else sys.stdin +count = 0 +comment = re.compile('^\s*#') +pattern = re.compile('Reactor stall') +for s in input: + if comment.search(s) or not pattern.search(s): + continue + count += 1 + trace = s.split() + for i in range(0, len(trace)): + if trace[i] == 'Reactor': + i += 3 + break + t = int(trace[i]) + tally[t] = tally.pop(t, 0) + 1 + trace = trace[i + 6:] + # The address_threshold typically indicates a library call + # and the backtrace up-to and including it are usually of + # no interest as they all contain the stall backtrace geneneration code, e.g.: + # 
seastar::internal::cpu_stall_detector::generate_trace + # void seastar::backtrace<seastar::backtrace_buffer::append_backtrace_oneline()::{lambda(seastar::frame)#1}>(seastar::backt> + # (inlined by) seastar::backtrace_buffer::append_backtrace_oneline() at ./build/release/seastar/./seastar/src/core/reactor.cc:771 + # (inlined by) seastar::print_with_backtrace(seastar::backtrace_buffer&, bool) at ./build/release/seastar/./seastar/src/core/reactor.cc> + # seastar::internal::cpu_stall_detector::generate_trace() at ./build/release/seastar/./seastar/src/core/reactor.cc:1257 + # seastar::internal::cpu_stall_detector::maybe_report() at ./build/release/seastar/./seastar/src/core/reactor.cc:1103 + # (inlined by) seastar::internal::cpu_stall_detector::on_signal() at ./build/release/seastar/./seastar/src/core/reactor.cc:1117 + # (inlined by) seastar::reactor::block_notifier(int) at ./build/release/seastar/./seastar/src/core/reactor.cc:1240 + # ?? ??:0 + if address_threshold: + for i in range(0, len(trace)): + if int(trace[i], 0) >= address_threshold: + while int(trace[i], 0) >= address_threshold: + i += 1 + trace = trace[i:] + break + tmin = args.minimum or 0 + if t >= tmin: + process_graph(t, trace) + +try: + print_stats(tally, tmin) + graph.print_graph(args.direction) +except BrokenPipeError: + pass diff --git a/src/seastar/scripts/tap.sh b/src/seastar/scripts/tap.sh new file mode 100644 index 000000000..a8cc6d3ab --- /dev/null +++ b/src/seastar/scripts/tap.sh @@ -0,0 +1,31 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. 
#
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

### Set up a tap device for seastar, bridged onto the libvirt bridge.
tap=tap0
bridge=virbr0
# $(...) instead of legacy backticks; quote all expansions below.
user=$(whoami)
# Remove any stale device first; ignore the error on a fresh boot where
# the tap does not exist yet (previously this printed a spurious error).
sudo ip link del "$tap" 2>/dev/null || true
sudo ip tuntap add mode tap dev "$tap" user "$user" one_queue vnet_hdr
sudo ifconfig "$tap" up
sudo brctl addif "$bridge" "$tap"
sudo brctl stp "$bridge" off
sudo modprobe vhost-net
# "user:group" — the historical "user.user" dot separator is a GNU
# extension and is rejected by other chown implementations.
sudo chown "$user:$user" /dev/vhost-net
sudo brctl show "$bridge"
sudo ifconfig "$bridge"