From 88837172f69eabc408ae3945d82e0270b8e07440 Mon Sep 17 00:00:00 2001 From: Benjamin Drung Date: Sat, 10 Jun 2023 10:55:33 +0200 Subject: Adding upstream version 2.2.1. Signed-off-by: Benjamin Drung --- staslib/.gitignore | 1 + staslib/__init__.py | 11 + staslib/avahi.py | 456 ++++++++++++++++++++++++++ staslib/conf.py | 703 +++++++++++++++++++++++++++++++++++++++++ staslib/ctrl.py | 850 +++++++++++++++++++++++++++++++++++++++++++++++++ staslib/defs.py | 51 +++ staslib/gutil.py | 418 ++++++++++++++++++++++++ staslib/iputil.py | 169 ++++++++++ staslib/log.py | 53 ++++ staslib/meson.build | 60 ++++ staslib/service.py | 878 +++++++++++++++++++++++++++++++++++++++++++++++++++ staslib/singleton.py | 23 ++ staslib/stacd.idl | 27 ++ staslib/stafd.idl | 49 +++ staslib/stas.py | 554 ++++++++++++++++++++++++++++++++ staslib/timeparse.py | 139 ++++++++ staslib/trid.py | 137 ++++++++ staslib/udev.py | 334 ++++++++++++++++++++ staslib/version.py | 64 ++++ 19 files changed, 4977 insertions(+) create mode 100644 staslib/.gitignore create mode 100644 staslib/__init__.py create mode 100644 staslib/avahi.py create mode 100644 staslib/conf.py create mode 100644 staslib/ctrl.py create mode 100644 staslib/defs.py create mode 100644 staslib/gutil.py create mode 100644 staslib/iputil.py create mode 100644 staslib/log.py create mode 100644 staslib/meson.build create mode 100644 staslib/service.py create mode 100644 staslib/singleton.py create mode 100644 staslib/stacd.idl create mode 100644 staslib/stafd.idl create mode 100644 staslib/stas.py create mode 100644 staslib/timeparse.py create mode 100644 staslib/trid.py create mode 100644 staslib/udev.py create mode 100644 staslib/version.py (limited to 'staslib') diff --git a/staslib/.gitignore b/staslib/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/staslib/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/staslib/__init__.py b/staslib/__init__.py new file mode 100644 index 0000000..27673d1 --- /dev/null +++ b/staslib/__init__.py @@ -0,0 +1,11 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''STorage Appliance Services''' + +__version__ = '@VERSION@' diff --git a/staslib/avahi.py b/staslib/avahi.py new file mode 100644 index 0000000..c8a3a0b --- /dev/null +++ b/staslib/avahi.py @@ -0,0 +1,456 @@ +# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +''' Module that provides a way to retrieve discovered + services from the Avahi daemon over D-Bus. +''' +import socket +import typing +import logging +import functools +import dasbus.error +import dasbus.connection +import dasbus.client.proxy +import dasbus.client.observer +from gi.repository import GLib +from staslib import defs, conf, gutil + + +def _txt2dict(txt: list): + '''@param txt: A list of list of integers. The integers are the ASCII value + of printable text characters. + ''' + the_dict = dict() + for list_of_chars in txt: + try: + string = functools.reduce(lambda accumulator, c: accumulator + chr(c), list_of_chars, '') + key, val = string.split("=") + the_dict[key.lower()] = val + except Exception: # pylint: disable=broad-except + pass + + return the_dict + + +def _proto2trans(protocol): + '''Return the matching transport for the given protocol.''' + if protocol is None: + return None + + protocol = protocol.strip().lower() + if protocol == 'tcp': + return 'tcp' + + if protocol in ('roce', 'iwarp', 'rdma'): + return 'rdma' + + return None + + +# ****************************************************************************** +class Avahi: # pylint: disable=too-many-instance-attributes + '''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi + daemon and register to be notified when services of a certain + type (stype) are discovered or lost. + ''' + + DBUS_NAME = 'org.freedesktop.Avahi' + DBUS_INTERFACE_SERVICE_BROWSER = DBUS_NAME + '.ServiceBrowser' + DBUS_INTERFACE_SERVICE_RESOLVER = DBUS_NAME + '.ServiceResolver' + LOOKUP_USE_MULTICAST = 2 + + IF_UNSPEC = -1 + PROTO_INET = 0 + PROTO_INET6 = 1 + PROTO_UNSPEC = -1 + + LOOKUP_RESULT_LOCAL = 8 # This record/service resides on and was announced by the local host + LOOKUP_RESULT_CACHED = 1 # This response originates from the cache + LOOKUP_RESULT_STATIC = 32 # The returned data has been defined statically by some configuration option + LOOKUP_RESULT_OUR_OWN = 16 # This service belongs to the same local client as the browser object + LOOKUP_RESULT_WIDE_AREA = 2 # This response originates from wide area DNS + LOOKUP_RESULT_MULTICAST = 4 # This response originates from multicast DNS + + result_flags = { + LOOKUP_RESULT_LOCAL: 'local', + LOOKUP_RESULT_CACHED: 'cache', + LOOKUP_RESULT_STATIC: 'static', + LOOKUP_RESULT_OUR_OWN: 'own', + LOOKUP_RESULT_WIDE_AREA: 'wan', + LOOKUP_RESULT_MULTICAST: 'mcast', + } + + protos = {PROTO_INET: 'IPv4', PROTO_INET6: 'IPv6', PROTO_UNSPEC: 'uspecified'} + + @classmethod + def result_flags_as_string(cls, flags): + '''Convert flags to human-readable string''' + return '+'.join((value for flag, value in Avahi.result_flags.items() if (flags & flag) != 0)) + + @classmethod + def protocol_as_string(cls, proto): + '''Convert protocol codes to human-readable strings''' + return Avahi.protos.get(proto, 'unknown') + + # ========================================================================== + def __init__(self, sysbus, change_cb): + self._change_cb = change_cb + self._services = dict() + self._sysbus = sysbus + self._stypes = set() + self._service_browsers = dict() + + # Avahi is an on-demand service. If, for some reason, the avahi-daemon + # were to stop, we need to try to contact it for it to restart. For + # example, when installing the avahi-daemon package on a running system, + # the daemon doesn't get started right away. It needs another process to + # access it over D-Bus to wake it up. The following timer is used to + # periodically query the avahi-daemon until we successfully establish + # first contact. + self._kick_avahi_tmr = gutil.GTimer(60, self._on_kick_avahi) + + # Subscribe for Avahi signals (i.e. events). This must be done before + # any Browser or Resolver is created to avoid race conditions and + # missed events. + self._subscriptions = [ + self._sysbus.connection.signal_subscribe( + Avahi.DBUS_NAME, + Avahi.DBUS_INTERFACE_SERVICE_BROWSER, + 'ItemNew', + None, + None, + 0, + self._service_discovered, + ), + self._sysbus.connection.signal_subscribe( + Avahi.DBUS_NAME, + Avahi.DBUS_INTERFACE_SERVICE_BROWSER, + 'ItemRemove', + None, + None, + 0, + self._service_removed, + ), + self._sysbus.connection.signal_subscribe( + Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_BROWSER, 'Failure', None, None, 0, self._failure_handler + ), + self._sysbus.connection.signal_subscribe( + Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Found', None, None, 0, self._service_identified + ), + self._sysbus.connection.signal_subscribe( + Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Failure', None, None, 0, self._failure_handler + ), + ] + + self._avahi = self._sysbus.get_proxy(Avahi.DBUS_NAME, '/') + + self._avahi_watcher = dasbus.client.observer.DBusObserver(self._sysbus, Avahi.DBUS_NAME) + self._avahi_watcher.service_available.connect(self._avahi_available) + self._avahi_watcher.service_unavailable.connect(self._avahi_unavailable) + self._avahi_watcher.connect_once_available() + + def kill(self): + '''@brief Clean up object''' + logging.debug('Avahi.kill()') + + self._kick_avahi_tmr.kill() + self._kick_avahi_tmr = None + + for subscription in self._subscriptions: + self._sysbus.connection.signal_unsubscribe(subscription) + self._subscriptions = list() + + self._disconnect() + + self._avahi_watcher.service_available.disconnect() + self._avahi_watcher.service_unavailable.disconnect() + self._avahi_watcher.disconnect() + self._avahi_watcher = None + + dasbus.client.proxy.disconnect_proxy(self._avahi) + self._avahi = None + + self._change_cb = None + self._sysbus = None + + def info(self) -> dict: + '''@brief return debug info about this object''' + services = dict() + for service, obj in self._services.items(): + interface, protocol, name, stype, domain = service + key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})' + services[key] = obj.get('data', {}) + + info = { + 'avahi wake up timer': str(self._kick_avahi_tmr), + 'service types': list(self._stypes), + 'services': services, + } + + return info + + def get_controllers(self) -> list: + '''@brief Get the discovery controllers as a list of dict() + as follows: + [ + { + 'transport': tcp, + 'traddr': str(), + 'trsvcid': str(), + 'host-iface': str(), + 'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery', + }, + { + 'transport': tcp, + 'traddr': str(), + 'trsvcid': str(), + 'host-iface': str(), + 'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery', + }, + [...] + ] + ''' + return [service['data'] for service in self._services.values() if len(service['data'])] + + def config_stypes(self, stypes: list): + '''@brief Configure the service types that we want to discover. + @param stypes: A list of services types, e.g. ['_nvme-disc._tcp'] + ''' + self._stypes = set(stypes) + success = self._configure_browsers() + if not success: + self._kick_avahi_tmr.start() + + def kick_start(self): + '''@brief We use this to kick start the Avahi + daemon (i.e. socket activation). + ''' + self._kick_avahi_tmr.clear() + + def _disconnect(self): + logging.debug('Avahi._disconnect()') + for service in self._services.values(): + resolver = service.pop('resolver', None) + if resolver is not None: + try: + resolver.Free() + dasbus.client.proxy.disconnect_proxy(resolver) + except (AttributeError, dasbus.error.DBusError) as ex: + logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex) + + self._services = dict() + + for browser in self._service_browsers.values(): + try: + browser.Free() + dasbus.client.proxy.disconnect_proxy(browser) + except (AttributeError, dasbus.error.DBusError) as ex: + logging.debug('Avahi._disconnect() - Failed to Free() browser. %s', ex) + + self._service_browsers = dict() + + def _on_kick_avahi(self): + try: + # try to contact avahi-daemon. This is just a wake + # up call in case the avahi-daemon was sleeping. + self._avahi.GetVersionString() + except dasbus.error.DBusError: + return GLib.SOURCE_CONTINUE + + return GLib.SOURCE_REMOVE + + def _avahi_available(self, _avahi_watcher): + '''@brief Hook up DBus signal handlers for signals from stafd.''' + logging.info('avahi-daemon service available, zeroconf supported.') + success = self._configure_browsers() + if not success: + self._kick_avahi_tmr.start() + + def _avahi_unavailable(self, _avahi_watcher): + self._disconnect() + logging.warning('avahi-daemon not available, zeroconf not supported.') + self._kick_avahi_tmr.start() + + def _configure_browsers(self): + stypes_cur = set(self._service_browsers.keys()) + stypes_to_add = self._stypes - stypes_cur + stypes_to_rm = stypes_cur - self._stypes + + logging.debug('Avahi._configure_browsers() - stypes_to_rm = %s', list(stypes_to_rm)) + logging.debug('Avahi._configure_browsers() - stypes_to_add = %s', list(stypes_to_add)) + + for stype_to_rm in stypes_to_rm: + browser = self._service_browsers.pop(stype_to_rm, None) + if browser is not None: + try: + browser.Free() + dasbus.client.proxy.disconnect_proxy(browser) + except (AttributeError, dasbus.error.DBusError) as ex: + logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex) + + # Find the cached services corresponding to stype_to_rm and remove them + services_to_rm = [service for service in self._services if service[3] == stype_to_rm] + for service in services_to_rm: + resolver = self._services.pop(service, {}).pop('resolver', None) + if resolver is not None: + try: + resolver.Free() + dasbus.client.proxy.disconnect_proxy(resolver) + except (AttributeError, dasbus.error.DBusError) as ex: + logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex) + + for stype in stypes_to_add: + try: + obj_path = self._avahi.ServiceBrowserNew( + Avahi.IF_UNSPEC, Avahi.PROTO_UNSPEC, stype, 'local', Avahi.LOOKUP_USE_MULTICAST + ) + self._service_browsers[stype] = self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path) + except dasbus.error.DBusError as ex: + logging.debug('Avahi._configure_browsers() - Failed to contact avahi-daemon. %s', ex) + logging.warning('avahi-daemon not available, operating w/o mDNS discovery.') + return False + + return True + + def _service_discovered( + self, + _connection, + _sender_name: str, + _object_path: str, + _interface_name: str, + _signal_name: str, + args: typing.Tuple[int, int, str, str, str, int], + *_user_data, + ): + (interface, protocol, name, stype, domain, flags) = args + logging.debug( + 'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s', + interface, + socket.if_indextoname(interface), + Avahi.protocol_as_string(protocol), + stype, + domain, + flags, + '(' + Avahi.result_flags_as_string(flags) + '),', + name, + ) + + service = (interface, protocol, name, stype, domain) + if service not in self._services: + try: + obj_path = self._avahi.ServiceResolverNew( + interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST + ) + self._services[service] = { + 'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path), + 'data': {}, + } + except dasbus.error.DBusError as ex: + logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex) + + def _service_removed( + self, + _connection, + _sender_name: str, + _object_path: str, + _interface_name: str, + _signal_name: str, + args: typing.Tuple[int, int, str, str, str, int], + *_user_data, + ): + (interface, protocol, name, stype, domain, flags) = args + logging.debug( + 'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s', + interface, + socket.if_indextoname(interface), + Avahi.protocol_as_string(protocol), + stype, + domain, + flags, + '(' + Avahi.result_flags_as_string(flags) + '),', + name, + ) + + service = (interface, protocol, name, stype, domain) + resolver = self._services.pop(service, {}).pop('resolver', None) + if resolver is not None: + try: + resolver.Free() + dasbus.client.proxy.disconnect_proxy(resolver) + except (AttributeError, dasbus.error.DBusError) as ex: + logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex) + + self._change_cb() + + def _service_identified( # pylint: disable=too-many-locals + self, + _connection, + _sender_name: str, + _object_path: str, + _interface_name: str, + _signal_name: str, + args: typing.Tuple[int, int, str, str, str, str, int, str, int, list, int], + *_user_data, + ): + (interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args + txt = _txt2dict(txt) + logging.debug( + 'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s', + interface, + socket.if_indextoname(interface), + Avahi.protocol_as_string(protocol), + stype, + domain, + flags, + '(' + Avahi.result_flags_as_string(flags) + '),', + name, + host, + Avahi.protocol_as_string(aprotocol), + address, + port, + txt, + ) + + service = (interface, protocol, name, stype, domain) + if service in self._services: + transport = _proto2trans(txt.get('p')) + if transport is not None: + self._services[service]['data'] = { + 'transport': transport, + 'traddr': address.strip(), + 'trsvcid': str(port).strip(), + # host-iface permitted for tcp alone and not rdma + 'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '', + 'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip() + if conf.NvmeOptions().discovery_supp + else defs.WELL_KNOWN_DISC_NQN, + } + + self._change_cb() + else: + logging.error( + 'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s', + address, + socket.if_indextoname(interface).strip(), + txt, + ) + + def _failure_handler( # pylint: disable=no-self-use + self, + _connection, + _sender_name: str, + _object_path: str, + interface_name: str, + _signal_name: str, + args: typing.Tuple[str], + *_user_data, + ): + (error,) = args + if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error: + # ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal. + logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error) diff --git a/staslib/conf.py b/staslib/conf.py new file mode 100644 index 0000000..a54da98 --- /dev/null +++ b/staslib/conf.py @@ -0,0 +1,703 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''nvme-stas configuration module''' + +import re +import os +import sys +import logging +import functools +import configparser +from staslib import defs, singleton, timeparse + +__TOKEN_RE = re.compile(r'\s*;\s*') +__OPTION_RE = re.compile(r'\s*=\s*') + + +class InvalidOption(Exception): + '''Exception raised when an invalid option value is detected''' + + +def _parse_controller(controller): + '''@brief Parse a "controller" entry. Controller entries are strings + composed of several configuration parameters delimited by + semi-colons. Each configuration parameter is specified as a + "key=value" pair. + @return A dictionary of key-value pairs. + ''' + options = dict() + tokens = __TOKEN_RE.split(controller) + for token in tokens: + if token: + try: + option, val = __OPTION_RE.split(token) + options[option.strip()] = val.strip() + except ValueError: + pass + + return options + + +def _parse_single_val(text): + if isinstance(text, str): + return text + if not isinstance(text, list) or len(text) == 0: + return None + + return text[-1] + + +def _parse_list(text): + return text if isinstance(text, list) else [text] + + +def _to_int(text): + try: + return int(_parse_single_val(text)) + except (ValueError, TypeError): + raise InvalidOption # pylint: disable=raise-missing-from + + +def _to_bool(text, positive='true'): + return _parse_single_val(text).lower() == positive + + +def _to_ncc(text): + value = _to_int(text) + if value == 1: # 1 is invalid. A minimum of 2 is required (with the exception of 0, which is valid). + value = 2 + return value + + +def _to_ip_family(text): + return tuple((4 if text == 'ipv4' else 6 for text in _parse_single_val(text).split('+'))) + + +# ****************************************************************************** +class OrderedMultisetDict(dict): + '''This class is used to change the behavior of configparser.ConfigParser + and allow multiple configuration parameters with the same key. The + result is a list of values. + ''' + + def __setitem__(self, key, value): + if key in self and isinstance(value, list): + self[key].extend(value) + else: + super().__setitem__(key, value) + + def __getitem__(self, key): + value = super().__getitem__(key) + + if isinstance(value, str): + return value.split('\n') + + return value + + +class SvcConf(metaclass=singleton.Singleton): # pylint: disable=too-many-public-methods + '''Read and cache configuration file.''' + + OPTION_CHECKER = { + 'Global': { + 'tron': { + 'convert': _to_bool, + 'default': False, + 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'), + }, + 'kato': { + 'convert': _to_int, + }, + 'pleo': { + 'convert': functools.partial(_to_bool, positive='enabled'), + 'default': True, + 'txt-chk': lambda text: _parse_single_val(text).lower() in ('disabled', 'enabled'), + }, + 'ip-family': { + 'convert': _to_ip_family, + 'default': (4, 6), + 'txt-chk': lambda text: _parse_single_val(text) in ('ipv4', 'ipv6', 'ipv4+ipv6', 'ipv6+ipv4'), + }, + 'queue-size': { + 'convert': _to_int, + 'rng-chk': lambda value: None if value in range(16, 1025) else range(16, 1025), + }, + 'hdr-digest': { + 'convert': _to_bool, + 'default': False, + 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'), + }, + 'data-digest': { + 'convert': _to_bool, + 'default': False, + 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'), + }, + 'ignore-iface': { + 'convert': _to_bool, + 'default': False, + 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'), + }, + 'nr-io-queues': { + 'convert': _to_int, + }, + 'ctrl-loss-tmo': { + 'convert': _to_int, + }, + 'disable-sqflow': { + 'convert': _to_bool, + 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'), + }, + 'nr-poll-queues': { + 'convert': _to_int, + }, + 'nr-write-queues': { + 'convert': _to_int, + }, + 'reconnect-delay': { + 'convert': _to_int, + }, + ### BEGIN: LEGACY SECTION TO BE REMOVED ### + 'persistent-connections': { + 'convert': _to_bool, + 'default': False, + 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'), + }, + ### END: LEGACY SECTION TO BE REMOVED ### + }, + 'Service Discovery': { + 'zeroconf': { + 'convert': functools.partial(_to_bool, positive='enabled'), + 'default': True, + 'txt-chk': lambda text: _parse_single_val(text).lower() in ('disabled', 'enabled'), + }, + }, + 'Discovery controller connection management': { + 'persistent-connections': { + 'convert': _to_bool, + 'default': True, + 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'), + }, + 'zeroconf-connections-persistence': { + 'convert': lambda text: timeparse.timeparse(_parse_single_val(text)), + 'default': timeparse.timeparse('72hours'), + }, + }, + 'I/O controller connection management': { + 'disconnect-scope': { + 'convert': _parse_single_val, + 'default': 'only-stas-connections', + 'txt-chk': lambda text: _parse_single_val(text) + in ('only-stas-connections', 'all-connections-matching-disconnect-trtypes', 'no-disconnect'), + }, + 'disconnect-trtypes': { + # Use set() to eliminate potential duplicates + 'convert': lambda text: set(_parse_single_val(text).split('+')), + 'default': [ + 'tcp', + ], + 'lst-chk': ('tcp', 'rdma', 'fc'), + }, + 'connect-attempts-on-ncc': { + 'convert': _to_ncc, + 'default': 0, + }, + }, + 'Controllers': { + 'controller': { + 'convert': _parse_list, + 'default': [], + }, + 'exclude': { + 'convert': _parse_list, + 'default': [], + }, + ### BEGIN: LEGACY SECTION TO BE REMOVED ### + 'blacklist': { + 'convert': _parse_list, + 'default': [], + }, + ### END: LEGACY SECTION TO BE REMOVED ### + }, + } + + def __init__(self, default_conf=None, conf_file='/dev/null'): + self._config = None + self._defaults = default_conf if default_conf else {} + + if self._defaults is not None and len(self._defaults) != 0: + self._valid_conf = {} + for section, option in self._defaults: + self._valid_conf.setdefault(section, set()).add(option) + else: + self._valid_conf = None + + self._conf_file = conf_file + self.reload() + + def reload(self): + '''@brief Reload the configuration file.''' + self._config = self._read_conf_file() + + @property + def conf_file(self): + '''Return the configuration file name''' + return self._conf_file + + def set_conf_file(self, fname): + '''Set the configuration file name and reload config''' + self._conf_file = fname + self.reload() + + def get_option(self, section, option, ignore_default=False): # pylint: disable=too-many-locals + '''Retrieve @option from @section, convert raw text to + appropriate object type, and validate.''' + try: + checker = self.OPTION_CHECKER[section][option] + except KeyError: + logging.error('Requesting invalid section=%s and/or option=%s', section, option) + raise + + default = checker.get('default', None) + + try: + text = self._config.get(section=section, option=option) + except (configparser.NoSectionError, configparser.NoOptionError, KeyError): + return None if ignore_default else self._defaults.get((section, option), default) + + return self._check(text, section, option, default) + + tron = property(functools.partial(get_option, section='Global', option='tron')) + kato = property(functools.partial(get_option, section='Global', option='kato')) + ip_family = property(functools.partial(get_option, section='Global', option='ip-family')) + queue_size = property(functools.partial(get_option, section='Global', option='queue-size')) + hdr_digest = property(functools.partial(get_option, section='Global', option='hdr-digest')) + data_digest = property(functools.partial(get_option, section='Global', option='data-digest')) + ignore_iface = property(functools.partial(get_option, section='Global', option='ignore-iface')) + pleo_enabled = property(functools.partial(get_option, section='Global', option='pleo')) + nr_io_queues = property(functools.partial(get_option, section='Global', option='nr-io-queues')) + ctrl_loss_tmo = property(functools.partial(get_option, section='Global', option='ctrl-loss-tmo')) + disable_sqflow = property(functools.partial(get_option, section='Global', option='disable-sqflow')) + nr_poll_queues = property(functools.partial(get_option, section='Global', option='nr-poll-queues')) + nr_write_queues = property(functools.partial(get_option, section='Global', option='nr-write-queues')) + reconnect_delay = property(functools.partial(get_option, section='Global', option='reconnect-delay')) + + zeroconf_enabled = property(functools.partial(get_option, section='Service Discovery', option='zeroconf')) + + zeroconf_persistence_sec = property( + functools.partial( + get_option, section='Discovery controller connection management', option='zeroconf-connections-persistence' + ) + ) + + disconnect_scope = property( + functools.partial(get_option, section='I/O controller connection management', option='disconnect-scope') + ) + disconnect_trtypes = property( + functools.partial(get_option, section='I/O controller connection management', option='disconnect-trtypes') + ) + connect_attempts_on_ncc = property( + functools.partial(get_option, section='I/O controller connection management', option='connect-attempts-on-ncc') + ) + + @property + def stypes(self): + '''@brief Get the DNS-SD/mDNS service types.''' + return ['_nvme-disc._tcp', '_nvme-disc._udp'] if self.zeroconf_enabled else list() + + @property + def persistent_connections(self): + '''@brief return the "persistent-connections" config parameter''' + section = 'Discovery controller connection management' + option = 'persistent-connections' + + value = self.get_option(section, option, ignore_default=True) + legacy = self.get_option('Global', 'persistent-connections', ignore_default=True) + + if value is None and legacy is None: + return self._defaults.get((section, option), True) + + return value or legacy + + def get_controllers(self): + '''@brief Return the list of controllers in the config file. + Each controller is in the form of a dictionary as follows. + Note that some of the keys are optional. + { + 'transport': [TRANSPORT], + 'traddr': [TRADDR], + 'trsvcid': [TRSVCID], + 'host-traddr': [TRADDR], + 'host-iface': [IFACE], + 'subsysnqn': [NQN], + 'dhchap-ctrl-secret': [KEY], + 'hdr-digest': [BOOL] + 'data-digest': [BOOL] + 'nr-io-queues': [NUMBER] + 'nr-write-queues': [NUMBER] + 'nr-poll-queues': [NUMBER] + 'queue-size': [SIZE] + 'kato': [KATO] + 'reconnect-delay': [SECONDS] + 'ctrl-loss-tmo': [SECONDS] + 'disable-sqflow': [BOOL] + } + ''' + controller_list = self.get_option('Controllers', 'controller') + cids = [_parse_controller(controller) for controller in controller_list] + for cid in cids: + try: + # replace 'nqn' key by 'subsysnqn', if present. + cid['subsysnqn'] = cid.pop('nqn') + except KeyError: + pass + + # Verify values of the options used to overload the matching [Global] options + for option in cid: + if option in self.OPTION_CHECKER['Global']: + value = self._check(cid[option], 'Global', option, None) + if value is not None: + cid[option] = value + + return cids + + def get_excluded(self): + '''@brief Return the list of excluded controllers in the config file. + Each excluded controller is in the form of a dictionary + as follows. All the keys are optional. + { + 'transport': [TRANSPORT], + 'traddr': [TRADDR], + 'trsvcid': [TRSVCID], + 'host-iface': [IFACE], + 'subsysnqn': [NQN], + } + ''' + controller_list = self.get_option('Controllers', 'exclude') + + # 2022-09-20: Look for "blacklist". This is for backwards compatibility + # with releases 1.0 to 1.1.6. This is to be phased out (i.e. remove by 2024) + controller_list += self.get_option('Controllers', 'blacklist') + + excluded = [_parse_controller(controller) for controller in controller_list] + for controller in excluded: + controller.pop('host-traddr', None) # remove host-traddr + try: + # replace 'nqn' key by 'subsysnqn', if present. + controller['subsysnqn'] = controller.pop('nqn') + except KeyError: + pass + return excluded + + def _check(self, text, section, option, default): + checker = self.OPTION_CHECKER[section][option] + text_checker = checker.get('txt-chk', None) + if text_checker is not None and not text_checker(text): + logging.warning( + 'File:%s [%s]: %s - Text check found invalid value "%s". Default will be used', + self.conf_file, + section, + option, + text, + ) + return self._defaults.get((section, option), default) + + converter = checker.get('convert', None) + try: + value = converter(text) + except InvalidOption: + logging.warning( + 'File:%s [%s]: %s - Data converter found invalid value "%s". Default will be used', + self.conf_file, + section, + option, + text, + ) + return self._defaults.get((section, option), default) + + value_in_range = checker.get('rng-chk', None) + if value_in_range is not None: + expected_range = value_in_range(value) + if expected_range is not None: + logging.warning( + 'File:%s [%s]: %s - "%s" is not within range %s..%s. Default will be used', + self.conf_file, + section, + option, + value, + min(expected_range), + max(expected_range), + ) + return self._defaults.get((section, option), default) + + list_checker = checker.get('lst-chk', None) + if list_checker: + values = set() + for item in value: + if item not in list_checker: + logging.warning( + 'File:%s [%s]: %s - List checker found invalid item "%s" will be ignored.', + self.conf_file, + section, + option, + item, + ) + else: + values.add(item) + + if len(values) == 0: + return self._defaults.get((section, option), default) + + value = list(values) + + return value + + def _read_conf_file(self): + '''@brief Read the configuration file if the file exists.''' + config = configparser.ConfigParser( + default_section=None, + allow_no_value=True, + delimiters=('='), + interpolation=None, + strict=False, + dict_type=OrderedMultisetDict, + ) + if self._conf_file and os.path.isfile(self._conf_file): + config.read(self._conf_file) + + # Parse Configuration and validate. + if self._valid_conf is not None: + invalid_sections = set() + for section in config.sections(): + if section not in self._valid_conf: + invalid_sections.add(section) + else: + invalid_options = set() + for option in config.options(section): + if option not in self._valid_conf.get(section, []): + invalid_options.add(option) + + if len(invalid_options) != 0: + logging.error( + 'File:%s [%s] contains invalid options: %s', + self.conf_file, + section, + invalid_options, + ) + + if len(invalid_sections) != 0: + logging.error( + 'File:%s contains invalid sections: %s', + self.conf_file, + invalid_sections, + ) + + return config + + +# ****************************************************************************** +class SysConf(metaclass=singleton.Singleton): + '''Read and cache the host configuration file.''' + + def __init__(self, conf_file=defs.SYS_CONF_FILE): + self._config = None + self._conf_file = conf_file + self.reload() + + def reload(self): + '''@brief Reload the configuration file.''' + self._config = self._read_conf_file() + + @property + def conf_file(self): + '''Return the configuration file name''' + return self._conf_file + + def set_conf_file(self, fname): + '''Set the configuration file name and reload config''' + self._conf_file = fname + self.reload() + + def as_dict(self): + '''Return configuration as a dictionary''' + return { + 'hostnqn': self.hostnqn, + 'hostid': self.hostid, + 'hostkey': self.hostkey, + 'symname': self.hostsymname, + } + + @property + def hostnqn(self): + '''@brief return the host NQN + @return: Host NQN + @raise: Host NQN is mandatory. The program will terminate if a + Host NQN cannot be determined. + ''' + try: + value = self.__get_value('Host', 'nqn', defs.NVME_HOSTNQN) + except FileNotFoundError as ex: + sys.exit(f'Error reading mandatory Host NQN (see stasadm --help): {ex}') + + if value is not None and not value.startswith('nqn.'): + sys.exit(f'Error Host NQN "{value}" should start with "nqn."') + + return value + + @property + def hostid(self): + '''@brief return the host ID + @return: Host ID + @raise: Host ID is mandatory. The program will terminate if a + Host ID cannot be determined. + ''' + try: + value = self.__get_value('Host', 'id', defs.NVME_HOSTID) + except FileNotFoundError as ex: + sys.exit(f'Error reading mandatory Host ID (see stasadm --help): {ex}') + + return value + + @property + def hostkey(self): + '''@brief return the host key + @return: Host key + @raise: Host key is optional, but mandatory if authorization will be performed. + ''' + try: + value = self.__get_value('Host', 'key', defs.NVME_HOSTKEY) + except FileNotFoundError as ex: + logging.info('Host key undefined: %s', ex) + value = None + + return value + + @property + def hostsymname(self): + '''@brief return the host symbolic name (or None) + @return: symbolic name or None + ''' + try: + value = self.__get_value('Host', 'symname') + except FileNotFoundError as ex: + logging.warning('Error reading host symbolic name (will remain undefined): %s', ex) + value = None + + return value + + def _read_conf_file(self): + '''@brief Read the configuration file if the file exists.''' + config = configparser.ConfigParser( + default_section=None, allow_no_value=True, delimiters=('='), interpolation=None, strict=False + ) + if os.path.isfile(self._conf_file): + config.read(self._conf_file) + return config + + def __get_value(self, section, option, default_file=None): + '''@brief A configuration file consists of sections, each led by a + [section] header, followed by key/value entries separated + by a equal sign (=). This method retrieves the value + associated with the key @option from the section @section. + If the value starts with the string "file://", then the value + will be retrieved from that file. + + @param section: Configuration section + @param option: The key to look for + @param default_file: A file that contains the default value + + @return: On success, the value associated with the key. On failure, + this method will return None is a default_file is not + specified, or will raise an exception if a file is not + found. + + @raise: This method will raise the FileNotFoundError exception if + the value retrieved is a file that does not exist. + ''' + try: + value = self._config.get(section=section, option=option) + if not value.startswith('file://'): + return value + file = value[7:] + except (configparser.NoSectionError, configparser.NoOptionError, KeyError): + if default_file is None: + return None + file = default_file + + try: + with open(file) as f: # pylint: disable=unspecified-encoding + return f.readline().split()[0] + except IndexError: + return None + + +# ****************************************************************************** +class NvmeOptions(metaclass=singleton.Singleton): + '''Object used to read and cache contents of file /dev/nvme-fabrics. + Note that this file was not readable prior to Linux 5.16. + ''' + + def __init__(self): + # Supported options can be determined by looking at the kernel version + # or by reading '/dev/nvme-fabrics'. The ability to read the options + # from '/dev/nvme-fabrics' was only introduced in kernel 5.17, but may + # have been backported to older kernels. In any case, if the kernel + # version meets the minimum version for that option, then we don't + # even need to read '/dev/nvme-fabrics'. + self._supported_options = { + 'discovery': defs.KERNEL_VERSION >= defs.KERNEL_TP8013_MIN_VERSION, + 'host_iface': defs.KERNEL_VERSION >= defs.KERNEL_IFACE_MIN_VERSION, + 'dhchap_secret': defs.KERNEL_VERSION >= defs.KERNEL_HOSTKEY_MIN_VERSION, + 'dhchap_ctrl_secret': defs.KERNEL_VERSION >= defs.KERNEL_CTRLKEY_MIN_VERSION, + } + + # If some of the options are False, we need to check wether they can be + # read from '/dev/nvme-fabrics'. This method allows us to determine that + # an older kernel actually supports a specific option because it was + # backported to that kernel. + if not all(self._supported_options.values()): # At least one option is False. + try: + with open('/dev/nvme-fabrics') as f: # pylint: disable=unspecified-encoding + options = [option.split('=')[0].strip() for option in f.readline().rstrip('\n').split(',')] + except PermissionError: # Must be root to read this file + raise + except (OSError, FileNotFoundError): + logging.warning('Cannot determine which NVMe options the kernel supports') + else: + for option, supported in self._supported_options.items(): + if not supported: + self._supported_options[option] = option in options + + def __str__(self): + return f'supported options: {self._supported_options}' + + def get(self): + '''get the supported options as a dict''' + return self._supported_options + + @property + def discovery_supp(self): + '''This option adds support for TP8013''' + return self._supported_options['discovery'] + + @property + def host_iface_supp(self): + '''This option allows forcing connections to go over + a specific interface regardless of the routing tables. + ''' + return self._supported_options['host_iface'] + + @property + def dhchap_hostkey_supp(self): + '''This option allows specifying the host DHCHAP key used for authentication.''' + return self._supported_options['dhchap_secret'] + + @property + def dhchap_ctrlkey_supp(self): + '''This option allows specifying the controller DHCHAP key used for authentication.''' + return self._supported_options['dhchap_ctrl_secret'] diff --git a/staslib/ctrl.py b/staslib/ctrl.py new file mode 100644 index 0000000..97a1c7b --- /dev/null +++ b/staslib/ctrl.py @@ -0,0 +1,850 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''This module defines the base Controller object from which the +Dc (Discovery Controller) and Ioc (I/O Controller) objects are derived.''' + +import time +import inspect +import logging +from gi.repository import GLib +from libnvme import nvme +from staslib import conf, defs, gutil, trid, udev, stas + + +DLP_CHANGED = ( + (nvme.NVME_LOG_LID_DISCOVER << 16) | (nvme.NVME_AER_NOTICE_DISC_CHANGED << 8) | nvme.NVME_AER_NOTICE +) # 0x70f002 + + +def get_eflags(dlpe): + '''@brief Return eflags field of dlpe''' + return int(dlpe.get('eflags', 0)) if dlpe else 0 + + +def get_ncc(eflags: int): + '''@brief Return True if Not Connected to CDC bit is asserted, False otherwise''' + return eflags & nvme.NVMF_DISC_EFLAGS_NCC != 0 + + +def dlp_supp_opts_as_string(dlp_supp_opts: int): + '''@brief Return the list of options supported by the Get + discovery log page command. + ''' + data = { + nvme.NVMF_LOG_DISC_LID_EXTDLPES: "EXTDLPES", + nvme.NVMF_LOG_DISC_LID_PLEOS: "PLEOS", + nvme.NVMF_LOG_DISC_LID_ALLSUBES: "ALLSUBES", + } + return [txt for msk, txt in data.items() if dlp_supp_opts & msk] + + +# ****************************************************************************** +class Controller(stas.ControllerABC): # pylint: disable=too-many-instance-attributes + '''@brief Base class used to manage the connection to a controller.''' + + def __init__(self, tid: trid.TID, service, discovery_ctrl: bool = False): + sysconf = conf.SysConf() + self._nvme_options = conf.NvmeOptions() + self._root = nvme.root() + self._host = nvme.host( + self._root, hostnqn=sysconf.hostnqn, hostid=sysconf.hostid, hostsymname=sysconf.hostsymname + ) + self._host.dhchap_key = sysconf.hostkey if self._nvme_options.dhchap_hostkey_supp else None + self._udev = udev.UDEV + self._device = None # Refers to the nvme device (e.g. /dev/nvme[n]) + self._ctrl = None # libnvme's nvme.ctrl object + self._connect_op = None + + super().__init__(tid, service, discovery_ctrl) + + def _release_resources(self): + logging.debug('Controller._release_resources() - %s | %s', self.id, self.device) + + if self._udev: + self._udev.unregister_for_device_events(self._on_udev_notification) + + self._kill_ops() + + super()._release_resources() + + self._ctrl = None + self._udev = None + self._host = None + self._root = None + self._nvme_options = None + + @property + def device(self) -> str: + '''@brief return the Linux nvme device id (e.g. nvme3) or empty + string if no device is associated with this controller''' + if not self._device and self._ctrl and self._ctrl.name: + self._device = self._ctrl.name + + return self._device or 'nvme?' + + def all_ops_completed(self) -> bool: + '''@brief Returns True if all operations have completed. False otherwise.''' + return self._connect_op is None or self._connect_op.completed() + + def connected(self): + '''@brief Return whether a connection is established''' + return self._ctrl and self._ctrl.connected() + + def controller_id_dict(self) -> dict: + '''@brief return the controller ID as a dict.''' + cid = super().controller_id_dict() + cid['device'] = self.device + return cid + + def details(self) -> dict: + '''@brief return detailed debug info about this controller''' + details = super().details() + details.update( + self._udev.get_attributes(self.device, ('hostid', 'hostnqn', 'model', 'serial', 'dctype', 'cntrltype')) + ) + details['connected'] = str(self.connected()) + return details + + def info(self) -> dict: + '''@brief Get the controller info for this object''' + info = super().info() + if self._connect_op: + info['connect operation'] = str(self._connect_op.as_dict()) + return info + + def cancel(self): + '''@brief Used to cancel pending operations.''' + super().cancel() + if self._connect_op: + self._connect_op.cancel() + + def _kill_ops(self): + if self._connect_op: + self._connect_op.kill() + self._connect_op = None + + def set_level_from_tron(self, tron): + '''Set log level based on TRON''' + if self._root: + self._root.log_level("debug" if tron else "err") + + def _on_udev_notification(self, udev_obj): + if self._alive(): + if udev_obj.action == 'change': + nvme_aen = udev_obj.get('NVME_AEN') + nvme_event = udev_obj.get('NVME_EVENT') + if isinstance(nvme_aen, str): + logging.info('%s | %s - Received AEN: %s', self.id, udev_obj.sys_name, nvme_aen) + self._on_aen(int(nvme_aen, 16)) + if isinstance(nvme_event, str): + self._on_nvme_event(nvme_event) + elif udev_obj.action == 'remove': + logging.info('%s | %s - Received "remove" event', self.id, udev_obj.sys_name) + self._on_ctrl_removed(udev_obj) + else: + logging.debug( + 'Controller._on_udev_notification() - %s | %s: Received "%s" event', + self.id, + udev_obj.sys_name, + udev_obj.action, + ) + else: + logging.debug( + 'Controller._on_udev_notification() - %s | %s: Received event on dead object. udev_obj %s: %s', + self.id, + self.device, + udev_obj.action, + udev_obj.sys_name, + ) + + def _on_ctrl_removed(self, udev_obj): # pylint: disable=unused-argument + if self._udev: + self._udev.unregister_for_device_events(self._on_udev_notification) + self._kill_ops() # Kill all pending operations + self._ctrl = None + + # Defer removal of this object to the next main loop's idle period. + GLib.idle_add(self._serv.remove_controller, self, True) + + def _get_cfg(self): + '''Get configuration parameters. These may either come from the [Global] + section or from a "controller" entry in the configuration file. A + definition found in a "controller" entry overrides the same definition + found in the [Global] section. + ''' + cfg = {} + service_conf = conf.SvcConf() + for option, keyword in ( + ('kato', 'keep_alive_tmo'), + ('queue-size', 'queue_size'), + ('hdr-digest', 'hdr_digest'), + ('data-digest', 'data_digest'), + ('nr-io-queues', 'nr_io_queues'), + ('ctrl-loss-tmo', 'ctrl_loss_tmo'), + ('disable-sqflow', 'disable_sqflow'), + ('nr-poll-queues', 'nr_poll_queues'), + ('nr-write-queues', 'nr_write_queues'), + ('reconnect-delay', 'reconnect_delay'), + ): + # Check if the value is defined as a "controller" entry (i.e. override) + ovrd_val = self.tid.cfg.get(option, None) + if ovrd_val is not None: + cfg[keyword] = ovrd_val + else: + # Check if the value is found in the [Global] section. + glob_val = service_conf.get_option('Global', option) + if glob_val is not None: + cfg[keyword] = glob_val + + return cfg + + def _do_connect(self): + service_conf = conf.SvcConf() + host_iface = ( + self.tid.host_iface + if (self.tid.host_iface and not service_conf.ignore_iface and self._nvme_options.host_iface_supp) + else None + ) + self._ctrl = nvme.ctrl( + self._root, + subsysnqn=self.tid.subsysnqn, + transport=self.tid.transport, + traddr=self.tid.traddr, + trsvcid=self.tid.trsvcid if self.tid.trsvcid else None, + host_traddr=self.tid.host_traddr if self.tid.host_traddr else None, + host_iface=host_iface, + ) + self._ctrl.discovery_ctrl_set(self._discovery_ctrl) + + # Set the DHCHAP key on the controller + # NOTE that this will eventually have to + # change once we have support for AVE (TP8019) + ctrl_dhchap_key = self.tid.cfg.get('dhchap-ctrl-secret') + if ctrl_dhchap_key and self._nvme_options.dhchap_ctrlkey_supp: + has_dhchap_key = hasattr(self._ctrl, 'dhchap_key') + if not has_dhchap_key: + logging.warning( + '%s | %s - libnvme-%s does not allow setting the controller DHCHAP key. Please upgrade libnvme.', + self.id, + self.device, + defs.LIBNVME_VERSION, + ) + else: + self._ctrl.dhchap_key = ctrl_dhchap_key + + # Audit existing nvme devices. If we find a match, then + # we'll just borrow that device instead of creating a new one. + udev_obj = self._find_existing_connection() + if udev_obj is not None: + # A device already exists. + self._device = udev_obj.sys_name + logging.debug( + 'Controller._do_connect() - %s Found existing control device: %s', self.id, udev_obj.sys_name + ) + self._connect_op = gutil.AsyncTask( + self._on_connect_success, self._on_connect_fail, self._ctrl.init, self._host, int(udev_obj.sys_number) + ) + else: + cfg = self._get_cfg() + logging.debug( + 'Controller._do_connect() - %s Connecting to nvme control with cfg=%s', self.id, cfg + ) + self._connect_op = gutil.AsyncTask( + self._on_connect_success, self._on_connect_fail, self._ctrl.connect, self._host, cfg + ) + + self._connect_op.run_async() + + # -------------------------------------------------------------------------- + def _on_connect_success(self, op_obj: gutil.AsyncTask, data): + '''@brief Function called when we successfully connect to the + Controller. + ''' + op_obj.kill() + self._connect_op = None + + if self._alive(): + self._device = self._ctrl.name + logging.info('%s | %s - Connection established!', self.id, self.device) + self._connect_attempts = 0 + self._udev.register_for_device_events(self._device, self._on_udev_notification) + else: + logging.debug( + 'Controller._on_connect_success() - %s | %s: Received event on dead object. data=%s', + self.id, + self.device, + data, + ) + + def _on_connect_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt): # pylint: disable=unused-argument + '''@brief Function called when we fail to connect to the Controller.''' + op_obj.kill() + self._connect_op = None + if self._alive(): + if self._connect_attempts == 1: + # Do a fast re-try on the first failure. + self._retry_connect_tmr.set_timeout(self.FAST_CONNECT_RETRY_PERIOD_SEC) + elif self._connect_attempts == 2: + # If the fast connect re-try fails, then we can print a message to + # indicate the failure, and start a slow re-try period. + self._retry_connect_tmr.set_timeout(self.CONNECT_RETRY_PERIOD_SEC) + logging.error('%s Failed to connect to controller. %s %s', self.id, err.domain, err.message) + + if self._should_try_to_reconnect(): + logging.debug( + 'Controller._on_connect_fail() - %s %s. Retry in %s sec.', + self.id, + err, + self._retry_connect_tmr.get_timeout(), + ) + self._retry_connect_tmr.start() + else: + logging.debug( + 'Controller._on_connect_fail() - %s Received event on dead object. %s %s', + self.id, + err.domain, + err.message, + ) + + def disconnect(self, disconnected_cb, keep_connection): + '''@brief Issue an asynchronous disconnect command to a Controller. + Once the async command has completed, the callback 'disconnected_cb' + will be invoked. If a controller is already disconnected, then the + callback will be added to the main loop's next idle slot to be executed + ASAP. + + @param disconnected_cb: Callback to be called when disconnect has + completed. the callback must have this signature: + def cback(controller: Controller, success: bool) + @param keep_connection: Whether the underlying connection should remain + in the kernel. + ''' + logging.debug( + 'Controller.disconnect() - %s | %s: keep_connection=%s', self.id, self.device, keep_connection + ) + if self._ctrl and self._ctrl.connected() and not keep_connection: + logging.info('%s | %s - Disconnect initiated', self.id, self.device) + op = gutil.AsyncTask(self._on_disconn_success, self._on_disconn_fail, self._ctrl.disconnect) + op.run_async(disconnected_cb) + else: + # Defer callback to the next main loop's idle period. The callback + # cannot be called directly as the current Controller object is in the + # process of being disconnected and the callback will in fact delete + # the object. This would invariably lead to unpredictable outcome. + GLib.idle_add(disconnected_cb, self, True) + + def _on_disconn_success(self, op_obj: gutil.AsyncTask, data, disconnected_cb): # pylint: disable=unused-argument + logging.debug('Controller._on_disconn_success() - %s | %s', self.id, self.device) + op_obj.kill() + # Defer callback to the next main loop's idle period. The callback + # cannot be called directly as the current Controller object is in the + # process of being disconnected and the callback will in fact delete + # the object. This would invariably lead to unpredictable outcome. + GLib.idle_add(disconnected_cb, self, True) + + def _on_disconn_fail( + self, op_obj: gutil.AsyncTask, err, fail_cnt, disconnected_cb + ): # pylint: disable=unused-argument + logging.debug('Controller._on_disconn_fail() - %s | %s: %s', self.id, self.device, err) + op_obj.kill() + # Defer callback to the next main loop's idle period. The callback + # cannot be called directly as the current Controller object is in the + # process of being disconnected and the callback will in fact delete + # the object. This would invariably lead to unpredictable outcome. + GLib.idle_add(disconnected_cb, self, False) + + +# ****************************************************************************** +class Dc(Controller): + '''@brief This object establishes a connection to one Discover Controller (DC). + It retrieves the discovery log pages and caches them. + It also monitors udev events associated with that DC and updates + the cached discovery log pages accordingly. + ''' + + GET_LOG_PAGE_RETRY_RERIOD_SEC = 20 + REGISTRATION_RETRY_RERIOD_SEC = 5 + GET_SUPPORTED_RETRY_RERIOD_SEC = 5 + + def __init__(self, staf, tid: trid.TID, log_pages=None, origin=None): + super().__init__(tid, staf, discovery_ctrl=True) + self._register_op = None + self._get_supported_op = None + self._get_log_op = None + self._origin = origin + self._log_pages = log_pages if log_pages else list() # Log pages cache + + # For Avahi-discovered DCs that later become unresponsive, monitor how + # long the controller remains unresponsive and if it does not return for + # a configurable soak period (_ctrl_unresponsive_tmr), remove that + # controller. Only Avahi-discovered controllers need this timeout-based + # cleanup. + self._ctrl_unresponsive_time = None # The time at which connectivity was lost + self._ctrl_unresponsive_tmr = gutil.GTimer(0, self._serv.controller_unresponsive, self.tid) + + def _release_resources(self): + logging.debug('Dc._release_resources() - %s | %s', self.id, self.device) + super()._release_resources() + + if self._ctrl_unresponsive_tmr is not None: + self._ctrl_unresponsive_tmr.kill() + + self._log_pages = list() + self._ctrl_unresponsive_tmr = None + + def _kill_ops(self): + super()._kill_ops() + if self._get_log_op: + self._get_log_op.kill() + self._get_log_op = None + if self._register_op: + self._register_op.kill() + self._register_op = None + if self._get_supported_op: + self._get_supported_op.kill() + self._get_supported_op = None + + def all_ops_completed(self) -> bool: + '''@brief Returns True if all operations have completed. False otherwise.''' + return ( + super().all_ops_completed() + and (self._get_log_op is None or self._get_log_op.completed()) + and (self._register_op is None or self._register_op.completed()) + and (self._get_supported_op is None or self._get_supported_op.completed()) + ) + + @property + def origin(self): + '''@brief Return how this controller came into existance. Was it + "discovered" through mDNS service discovery (TP8009), was it manually + "configured" in stafd.conf, or was it a "referral". + ''' + return self._origin + + @origin.setter + def origin(self, value): + '''@brief Set the origin of this controller.''' + if value in ('discovered', 'configured', 'referral'): + self._origin = value + self._handle_lost_controller() + else: + logging.error('%s | %s - Trying to set invalid origin to %s', self.id, self.device, value) + + def reload_hdlr(self): + '''@brief This is called when a "reload" signal is received.''' + logging.debug('Dc.reload_hdlr() - %s | %s', self.id, self.device) + + self._handle_lost_controller() + self._resync_with_controller() + + def info(self) -> dict: + '''@brief Get the controller info for this object''' + timeout = conf.SvcConf().zeroconf_persistence_sec + unresponsive_time = ( + time.asctime(self._ctrl_unresponsive_time) if self._ctrl_unresponsive_time is not None else '---' + ) + info = super().info() + info['origin'] = self.origin + if self.origin == 'discovered': + # The code that handles "unresponsive" DCs only applies to + # discovered DCs. So, let's only print that info when it's relevant. + info['unresponsive timer'] = str(self._ctrl_unresponsive_tmr) + info['unresponsive timeout'] = f'{timeout} sec' if timeout >= 0 else 'forever' + info['unresponsive time'] = unresponsive_time + if self._get_log_op: + info['get log page operation'] = str(self._get_log_op.as_dict()) + if self._register_op: + info['register operation'] = str(self._register_op.as_dict()) + if self._get_supported_op: + info['get supported log page operation'] = str(self._get_supported_op.as_dict()) + return info + + def cancel(self): + '''@brief Used to cancel pending operations.''' + super().cancel() + if self._get_log_op: + self._get_log_op.cancel() + if self._register_op: + self._register_op.cancel() + if self._get_supported_op: + self._get_supported_op.cancel() + + def log_pages(self) -> list: + '''@brief Get the cached log pages for this object''' + return self._log_pages + + def referrals(self) -> list: + '''@brief Return the list of referrals''' + return [page for page in self._log_pages if page['subtype'] == 'referral'] + + def _is_ddc(self): + return self._ctrl and self._ctrl.dctype != 'cdc' + + def _on_aen(self, aen: int): + if aen == DLP_CHANGED and self._get_log_op: + self._get_log_op.run_async() + + def _handle_lost_controller(self): + if self.origin == 'discovered': # Only apply to mDNS-discovered DCs + if not self._serv.is_avahi_reported(self.tid) and not self.connected(): + timeout = conf.SvcConf().zeroconf_persistence_sec + if timeout >= 0: + if self._ctrl_unresponsive_time is None: + self._ctrl_unresponsive_time = time.localtime() + self._ctrl_unresponsive_tmr.start(timeout) + logging.info( + '%s | %s - Controller is not responding. Will be removed by %s unless restored', + self.id, + self.device, + time.ctime(time.mktime(self._ctrl_unresponsive_time) + timeout), + ) + + return + + logging.info( + '%s | %s - Controller not responding. Retrying...', + self.id, + self.device, + ) + + self._ctrl_unresponsive_time = None + self._ctrl_unresponsive_tmr.stop() + self._ctrl_unresponsive_tmr.set_timeout(0) + + def is_unresponsive(self): + '''@brief For "discovered" DC, return True if DC is unresponsive, + False otherwise. + ''' + return ( + self.origin == 'discovered' + and not self._serv.is_avahi_reported(self.tid) + and not self.connected() + and self._ctrl_unresponsive_time is not None + and self._ctrl_unresponsive_tmr.time_remaining() <= 0 + ) + + def _resync_with_controller(self): + '''Communicate with DC to resync the states''' + if self._register_op: + self._register_op.run_async() + elif self._get_supported_op: + self._get_supported_op.run_async() + elif self._get_log_op: + self._get_log_op.run_async() + + def _on_nvme_event(self, nvme_event: str): + if nvme_event in ('connected', 'rediscover'): + # This event indicates that the kernel + # driver re-connected to the DC. + logging.debug( + 'Dc._on_nvme_event() - %s | %s: Received "%s" event', + self.id, + self.device, + nvme_event, + ) + self._resync_with_controller() + + def _find_existing_connection(self): + return self._udev.find_nvme_dc_device(self.tid) + + def _post_registration_actions(self): + # Need to check that supported_log_pages() is available (introduced in libnvme 1.2) + has_supported_log_pages = hasattr(self._ctrl, 'supported_log_pages') + if not has_supported_log_pages: + logging.warning( + '%s | %s - libnvme-%s does not support "Get supported log pages". Please upgrade libnvme.', + self.id, + self.device, + defs.LIBNVME_VERSION, + ) + + if conf.SvcConf().pleo_enabled and self._is_ddc() and has_supported_log_pages: + self._get_supported_op = gutil.AsyncTask( + self._on_get_supported_success, self._on_get_supported_fail, self._ctrl.supported_log_pages + ) + self._get_supported_op.run_async() + else: + self._get_log_op = gutil.AsyncTask(self._on_get_log_success, self._on_get_log_fail, self._ctrl.discover) + self._get_log_op.run_async() + + # -------------------------------------------------------------------------- + def _on_connect_success(self, op_obj: gutil.AsyncTask, data): + '''@brief Function called when we successfully connect to the + Discovery Controller. + ''' + super()._on_connect_success(op_obj, data) + + if self._alive(): + self._ctrl_unresponsive_time = None + self._ctrl_unresponsive_tmr.stop() + self._ctrl_unresponsive_tmr.set_timeout(0) + + if self._ctrl.is_registration_supported(): + self._register_op = gutil.AsyncTask( + self._on_registration_success, + self._on_registration_fail, + self._ctrl.registration_ctlr, + nvme.NVMF_DIM_TAS_REGISTER, + ) + self._register_op.run_async() + else: + self._post_registration_actions() + + def _on_connect_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt): + '''@brief Function called when we fail to connect to the Controller.''' + super()._on_connect_fail(op_obj, err, fail_cnt) + + if self._alive(): + self._handle_lost_controller() + + # -------------------------------------------------------------------------- + def _on_registration_success(self, op_obj: gutil.AsyncTask, data): # pylint: disable=unused-argument + '''@brief Function called when we successfully register with the + Discovery Controller. See self._register_op object + for details. + + NOTE: The name _on_registration_success() may be misleading. "success" + refers to the fact that a successful exchange was made with the DC. + It doesn't mean that the registration itself succeeded. + ''' + if self._alive(): + if data is not None: + logging.warning('%s | %s - Registration error. %s.', self.id, self.device, data) + else: + logging.debug('Dc._on_registration_success() - %s | %s', self.id, self.device) + + self._post_registration_actions() + else: + logging.debug( + 'Dc._on_registration_success() - %s | %s: Received event on dead object.', self.id, self.device + ) + + def _on_registration_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt): + '''@brief Function called when we fail to register with the + Discovery Controller. See self._register_op object + for details. + ''' + if self._alive(): + logging.debug( + 'Dc._on_registration_fail() - %s | %s: %s. Retry in %s sec', + self.id, + self.device, + err, + Dc.REGISTRATION_RETRY_RERIOD_SEC, + ) + if fail_cnt == 1: # Throttle the logs. Only print the first time the command fails + logging.error('%s | %s - Failed to register with Discovery Controller. %s', self.id, self.device, err) + op_obj.retry(Dc.REGISTRATION_RETRY_RERIOD_SEC) + else: + logging.debug( + 'Dc._on_registration_fail() - %s | %s: Received event on dead object. %s', + self.id, + self.device, + err, + ) + op_obj.kill() + + # -------------------------------------------------------------------------- + def _on_get_supported_success(self, op_obj: gutil.AsyncTask, data): # pylint: disable=unused-argument + '''@brief Function called when we successfully retrieved the supported + log pages from the Discovery Controller. See self._get_supported_op object + for details. + + NOTE: The name _on_get_supported_success() may be misleading. "success" + refers to the fact that a successful exchange was made with the DC. + It doesn't mean that the Get Supported Log Page itself succeeded. + ''' + if self._alive(): + try: + dlp_supp_opts = data[nvme.NVME_LOG_LID_DISCOVER] >> 16 + except (TypeError, IndexError): + dlp_supp_opts = 0 + + logging.debug( + 'Dc._on_get_supported_success() - %s | %s: supported options = 0x%04X = %s', + self.id, + self.device, + dlp_supp_opts, + dlp_supp_opts_as_string(dlp_supp_opts), + ) + + if 'lsp' in inspect.signature(self._ctrl.discover).parameters: + lsp = nvme.NVMF_LOG_DISC_LSP_PLEO if dlp_supp_opts & nvme.NVMF_LOG_DISC_LID_PLEOS else 0 + self._get_log_op = gutil.AsyncTask( + self._on_get_log_success, self._on_get_log_fail, self._ctrl.discover, lsp + ) + else: + logging.warning( + '%s | %s - libnvme-%s does not support setting PLEO bit. Please upgrade.', + self.id, + self.device, + defs.LIBNVME_VERSION, + ) + self._get_log_op = gutil.AsyncTask(self._on_get_log_success, self._on_get_log_fail, self._ctrl.discover) + self._get_log_op.run_async() + else: + logging.debug( + 'Dc._on_get_supported_success() - %s | %s: Received event on dead object.', self.id, self.device + ) + + def _on_get_supported_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt): + '''@brief Function called when we fail to retrieve the supported log + page from the Discovery Controller. See self._get_supported_op object + for details. + ''' + if self._alive(): + logging.debug( + 'Dc._on_get_supported_fail() - %s | %s: %s. Retry in %s sec', + self.id, + self.device, + err, + Dc.GET_SUPPORTED_RETRY_RERIOD_SEC, + ) + if fail_cnt == 1: # Throttle the logs. Only print the first time the command fails + logging.error( + '%s | %s - Failed to Get supported log pages from Discovery Controller. %s', + self.id, + self.device, + err, + ) + op_obj.retry(Dc.GET_SUPPORTED_RETRY_RERIOD_SEC) + else: + logging.debug( + 'Dc._on_get_supported_fail() - %s | %s: Received event on dead object. %s', + self.id, + self.device, + err, + ) + op_obj.kill() + + # -------------------------------------------------------------------------- + def _on_get_log_success(self, op_obj: gutil.AsyncTask, data): # pylint: disable=unused-argument + '''@brief Function called when we successfully retrieve the log pages + from the Discovery Controller. See self._get_log_op object + for details. + ''' + if self._alive(): + # Note that for historical reasons too long to explain, the CDC may + # return invalid addresses ('0.0.0.0', '::', or ''). Those need to + # be filtered out. + referrals_before = self.referrals() + self._log_pages = ( + [ + {k.strip(): str(v).strip() for k, v in dictionary.items()} + for dictionary in data + if dictionary.get('traddr', '').strip() not in ('0.0.0.0', '::', '') + ] + if data + else list() + ) + logging.info( + '%s | %s - Received discovery log pages (num records=%s).', self.id, self.device, len(self._log_pages) + ) + referrals_after = self.referrals() + self._serv.log_pages_changed(self, self.device) + if referrals_after != referrals_before: + logging.debug( + 'Dc._on_get_log_success() - %s | %s: Referrals before = %s', + self.id, + self.device, + referrals_before, + ) + logging.debug( + 'Dc._on_get_log_success() - %s | %s: Referrals after = %s', + self.id, + self.device, + referrals_after, + ) + self._serv.referrals_changed() + else: + logging.debug( + 'Dc._on_get_log_success() - %s | %s: Received event on dead object.', self.id, self.device + ) + + def _on_get_log_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt): + '''@brief Function called when we fail to retrieve the log pages + from the Discovery Controller. See self._get_log_op object + for details. + ''' + if self._alive(): + logging.debug( + 'Dc._on_get_log_fail() - %s | %s: %s. Retry in %s sec', + self.id, + self.device, + err, + Dc.GET_LOG_PAGE_RETRY_RERIOD_SEC, + ) + if fail_cnt == 1: # Throttle the logs. Only print the first time the command fails + logging.error('%s | %s - Failed to retrieve log pages. %s', self.id, self.device, err) + op_obj.retry(Dc.GET_LOG_PAGE_RETRY_RERIOD_SEC) + else: + logging.debug( + 'Dc._on_get_log_fail() - %s | %s: Received event on dead object. %s', + self.id, + self.device, + err, + ) + op_obj.kill() + + +# ****************************************************************************** +class Ioc(Controller): + '''@brief This object establishes a connection to one I/O Controller.''' + + def __init__(self, stac, tid: trid.TID): + self._dlpe = None + super().__init__(tid, stac) + + def _find_existing_connection(self): + return self._udev.find_nvme_ioc_device(self.tid) + + def _on_aen(self, aen: int): + pass + + def _on_nvme_event(self, nvme_event): + pass + + def reload_hdlr(self): + '''@brief This is called when a "reload" signal is received.''' + if not self.connected() and self._retry_connect_tmr.time_remaining() == 0: + self._try_to_connect_deferred.schedule() + + @property + def eflags(self): + '''@brief Return the eflag field of the DLPE''' + return get_eflags(self._dlpe) + + @property + def ncc(self): + '''@brief Return Not Connected to CDC status''' + return get_ncc(self.eflags) + + def details(self) -> dict: + '''@brief return detailed debug info about this controller''' + details = super().details() + details['dlpe'] = str(self._dlpe) + details['dlpe.eflags.ncc'] = str(self.ncc) + return details + + def update_dlpe(self, dlpe): + '''@brief This method is called when a new DLPE associated + with this controller is received.''' + new_ncc = get_ncc(get_eflags(dlpe)) + old_ncc = self.ncc + self._dlpe = dlpe + + if old_ncc and not new_ncc: # NCC bit cleared? + if not self.connected(): + self._connect_attempts = 0 + self._try_to_connect_deferred.schedule() + + def _should_try_to_reconnect(self): + '''@brief This is used to determine when it's time to stop trying toi connect''' + max_connect_attempts = conf.SvcConf().connect_attempts_on_ncc if self.ncc else 0 + return max_connect_attempts == 0 or self._connect_attempts < max_connect_attempts diff --git a/staslib/defs.py b/staslib/defs.py new file mode 100644 index 0000000..5a50371 --- /dev/null +++ b/staslib/defs.py @@ -0,0 +1,51 @@ +# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger + +''' @brief This file gets automagically configured by meson at build time. +''' +import os +import sys +import shutil +import platform +from staslib.version import KernelVersion + +try: + import libnvme + + LIBNVME_VERSION = libnvme.__version__ +except (AttributeError, ModuleNotFoundError): + LIBNVME_VERSION = '?.?' + +VERSION = '@VERSION@' +LICENSE = '@LICENSE@' + +STACD_DBUS_NAME = '@STACD_DBUS_NAME@' +STACD_DBUS_PATH = '@STACD_DBUS_PATH@' + +STAFD_DBUS_NAME = '@STAFD_DBUS_NAME@' +STAFD_DBUS_PATH = '@STAFD_DBUS_PATH@' + +KERNEL_VERSION = KernelVersion(platform.release()) +KERNEL_IFACE_MIN_VERSION = KernelVersion('5.14') +KERNEL_TP8013_MIN_VERSION = KernelVersion('5.16') +KERNEL_HOSTKEY_MIN_VERSION = KernelVersion('5.20') +KERNEL_CTRLKEY_MIN_VERSION = KernelVersion('5.20') + +WELL_KNOWN_DISC_NQN = 'nqn.2014-08.org.nvmexpress.discovery' + +PROG_NAME = os.path.basename(sys.argv[0]) + +NVME_HOSTID = '/etc/nvme/hostid' +NVME_HOSTNQN = '/etc/nvme/hostnqn' +NVME_HOSTKEY = '/etc/nvme/hostkey' + +SYS_CONF_FILE = '/etc/stas/sys.conf' +STAFD_CONF_FILE = '/etc/stas/stafd.conf' +STACD_CONF_FILE = '/etc/stas/stacd.conf' + +SYSTEMCTL = shutil.which('systemctl') diff --git a/staslib/gutil.py b/staslib/gutil.py new file mode 100644 index 0000000..c40b80e --- /dev/null +++ b/staslib/gutil.py @@ -0,0 +1,418 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''This module provides utility functions/classes to provide easier to use +access to GLib/Gio/Gobject resources. +''' + +import logging +from gi.repository import Gio, GLib, GObject +from staslib import conf, iputil, trid + + +# ****************************************************************************** +class GTimer: + '''@brief Convenience class to wrap GLib timers''' + + def __init__( + self, interval_sec: float = 0, user_cback=lambda: GLib.SOURCE_REMOVE, *user_data, priority=GLib.PRIORITY_DEFAULT + ): # pylint: disable=keyword-arg-before-vararg + self._source = None + self._interval_sec = float(interval_sec) + self._user_cback = user_cback + self._user_data = user_data + self._priority = priority if priority is not None else GLib.PRIORITY_DEFAULT + + def _release_resources(self): + self.stop() + self._user_cback = None + self._user_data = None + + def kill(self): + '''@brief Used to release all resources associated with a timer.''' + self._release_resources() + + def __str__(self): + if self._source is not None: + return f'{self._interval_sec}s [{self.time_remaining()}s]' + + return f'{self._interval_sec}s [off]' + + def _callback(self, *_): + retval = self._user_cback(*self._user_data) + if retval == GLib.SOURCE_REMOVE: + self._source = None + return retval + + def stop(self): + '''@brief Stop timer''' + if self._source is not None: + self._source.destroy() + self._source = None + + def start(self, new_interval_sec: float = -1.0): + '''@brief Start (or restart) timer''' + if new_interval_sec >= 0: + self._interval_sec = float(new_interval_sec) + + if self._source is not None: + self._source.set_ready_time( + self._source.get_time() + (self._interval_sec * 1000000) + ) # ready time is in micro-seconds (monotonic time) + else: + if self._interval_sec.is_integer(): + self._source = GLib.timeout_source_new_seconds(int(self._interval_sec)) # seconds resolution + else: + self._source = GLib.timeout_source_new(self._interval_sec * 1000.0) # mili-seconds resolution + + self._source.set_priority(self._priority) + self._source.set_callback(self._callback) + self._source.attach() + + def clear(self): + '''@brief Make timer expire now. The callback function + will be invoked immediately by the main loop. + ''' + if self._source is not None: + self._source.set_ready_time(0) # Expire now! + + def set_callback(self, user_cback, *user_data): + '''@brief set the callback function to invoke when timer expires''' + self._user_cback = user_cback + self._user_data = user_data + + def set_timeout(self, new_interval_sec: float): + '''@brief set the timer's duration''' + if new_interval_sec >= 0: + self._interval_sec = float(new_interval_sec) + + def get_timeout(self): + '''@brief get the timer's duration''' + return self._interval_sec + + def time_remaining(self) -> float: + '''@brief Get how much time remains on a timer before it fires.''' + if self._source is not None: + delta_us = self._source.get_ready_time() - self._source.get_time() # monotonic time in micro-seconds + if delta_us > 0: + return delta_us / 1000000.0 + + return 0 + + +# ****************************************************************************** +class NameResolver: # pylint: disable=too-few-public-methods + '''@brief DNS resolver to convert host names to IP addresses.''' + + def __init__(self): + self._resolver = Gio.Resolver.get_default() + + def resolve_ctrl_async(self, cancellable, controllers_in: list, callback): + '''@brief The traddr fields may specify a hostname instead of an IP + address. We need to resolve all the host names to addresses. + Resolving hostnames may take a while as a DNS server may need + to be contacted. For that reason, we're using async APIs with + callbacks to resolve all the hostnames. + + The callback @callback will be called once all hostnames have + been resolved. + + @param controllers: List of trid.TID + ''' + pending_resolution_count = 0 + controllers_out = [] + service_conf = conf.SvcConf() + + def addr_resolved(resolver, result, controller): + try: + addresses = resolver.lookup_by_name_finish(result) # List of Gio.InetAddress objects + + except GLib.GError as err: + # We don't need to report "cancellation" errors. + if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED): + # pylint: disable=no-member + logging.debug('NameResolver.resolve_ctrl_async() - %s %s', err.message, controller) + else: + logging.error('%s', err.message) # pylint: disable=no-member + + # if err.matches(Gio.resolver_error_quark(), Gio.ResolverError.TEMPORARY_FAILURE): + # elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.NOT_FOUND): + # elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.INTERNAL): + + else: + traddr = None + + # If multiple addresses are returned (which is often the case), + # prefer IPv4 addresses over IPv6. + if 4 in service_conf.ip_family: + for address in addresses: + # There may be multiple IPv4 addresses. Pick 1st one. + if address.get_family() == Gio.SocketFamily.IPV4: + traddr = address.to_string() + break + + if traddr is None and 6 in service_conf.ip_family: + for address in addresses: + # There may be multiple IPv6 addresses. Pick 1st one. + if address.get_family() == Gio.SocketFamily.IPV6: + traddr = address.to_string() + break + + if traddr is not None: + logging.debug( + 'NameResolver.resolve_ctrl_async() - resolved \'%s\' -> %s', controller.traddr, traddr + ) + cid = controller.as_dict() + cid['traddr'] = traddr + nonlocal controllers_out + controllers_out.append(trid.TID(cid)) + + # Invoke callback after all hostnames have been resolved + nonlocal pending_resolution_count + pending_resolution_count -= 1 + if pending_resolution_count == 0: + callback(controllers_out) + + for controller in controllers_in: + if controller.transport in ('tcp', 'rdma'): + hostname_or_addr = controller.traddr + if not hostname_or_addr: + logging.error('Invalid traddr: %s', controller) + else: + # Try to convert to an ipaddress object. If this + # succeeds, then we don't need to call the resolver. + ip = iputil.get_ipaddress_obj(hostname_or_addr) + if ip is None: + logging.debug('NameResolver.resolve_ctrl_async() - resolving \'%s\'', hostname_or_addr) + pending_resolution_count += 1 + self._resolver.lookup_by_name_async(hostname_or_addr, cancellable, addr_resolved, controller) + elif ip.version in service_conf.ip_family: + controllers_out.append(controller) + else: + logging.warning( + 'Excluding configured IP address %s based on "ip-family" setting', hostname_or_addr + ) + else: + controllers_out.append(controller) + + if pending_resolution_count == 0: # No names are pending asynchronous resolution + callback(controllers_out) + + +# ****************************************************************************** +class _TaskRunner(GObject.Object): + '''@brief This class allows running methods asynchronously in a thread.''' + + def __init__(self, user_function, *user_args): + '''@param user_function: function to run inside a thread + @param user_args: arguments passed to @user_function + ''' + super().__init__() + self._user_function = user_function + self._user_args = user_args + + def communicate(self, cancellable, cb_function, *cb_args): + '''@param cancellable: A Gio.Cancellable object that can be used to + cancel an in-flight async command. + @param cb_function: User callback function to call when the async + command has completed. The callback function + will be passed these arguments: + + (runner, result, *cb_args) + + Where: + runner: This _TaskRunner object instance + result: A GObject.Object instance that contains the result + cb_args: The cb_args arguments passed to communicate() + + @param cb_args: User arguments to pass to @cb_function + ''' + + def in_thread_exec(task, self, task_data, cancellable): # pylint: disable=unused-argument + if task.return_error_if_cancelled(): + return # Bail out if task has been cancelled + + try: + value = GObject.Object() + value.result = self._user_function(*self._user_args) + task.return_value(value) + except Exception as ex: # pylint: disable=broad-except + task.return_error(GLib.Error(message=str(ex), domain=type(ex).__name__)) + + task = Gio.Task.new(self, cancellable, cb_function, *cb_args) + task.set_return_on_cancel(False) + task.run_in_thread(in_thread_exec) + return task + + def communicate_finish(self, result): # pylint: disable=no-self-use + '''@brief Use this function in your callback (see @cb_function) to + extract data from the result object. + + @return On success (True, data, None), + On failure (False, None, err: GLib.Error) + ''' + try: + success, value = result.propagate_value() + return success, value.result, None + except GLib.Error as err: + return False, None, err + + +# ****************************************************************************** +class AsyncTask: # pylint: disable=too-many-instance-attributes + '''Object used to manage an asynchronous GLib operation. The operation + can be cancelled or retried. + ''' + + def __init__(self, on_success_callback, on_failure_callback, operation, *op_args): + '''@param on_success_callback: Callback method invoked when @operation completes successfully + @param on_failure_callback: Callback method invoked when @operation fails + @param operation: Operation (i.e. a function) to execute asynchronously + @param op_args: Arguments passed to operation + ''' + self._cancellable = Gio.Cancellable() + self._operation = operation + self._op_args = op_args + self._success_cb = on_success_callback + self._fail_cb = on_failure_callback + self._retry_tmr = None + self._errmsg = None + self._task = None + self._fail_cnt = 0 + + def _release_resources(self): + if self._alive(): + self._cancellable.cancel() + + if self._retry_tmr is not None: + self._retry_tmr.kill() + + self._operation = None + self._op_args = None + self._success_cb = None + self._fail_cb = None + self._retry_tmr = None + self._errmsg = None + self._task = None + self._fail_cnt = None + self._cancellable = None + + def __str__(self): + return str(self.as_dict()) + + def as_dict(self): + '''Return object members as a dictionary''' + info = { + 'fail count': self._fail_cnt, + 'completed': self._task.get_completed(), + 'alive': self._alive(), + } + + if self._retry_tmr: + info['retry timer'] = str(self._retry_tmr) + + if self._errmsg: + info['error'] = self._errmsg + + return info + + def _alive(self): + return self._cancellable and not self._cancellable.is_cancelled() + + def completed(self): + '''@brief Returns True if the task has completed, False otherwise.''' + return self._task is not None and self._task.get_completed() + + def cancel(self): + '''@brief cancel async operation''' + if self._alive(): + self._cancellable.cancel() + + def kill(self): + '''@brief kill and clean up this object''' + self._release_resources() + + def run_async(self, *args): + '''@brief + Method used to initiate an asynchronous operation with the + Controller. When the operation completes (or fails) the + callback method @_on_operation_complete() will be invoked. + ''' + runner = _TaskRunner(self._operation, *self._op_args) + self._task = runner.communicate(self._cancellable, self._on_operation_complete, *args) + + def retry(self, interval_sec, *args): + '''@brief Tell this object that the async operation is to be retried + in @interval_sec seconds. + + ''' + if self._retry_tmr is None: + self._retry_tmr = GTimer() + self._retry_tmr.set_callback(self._on_retry_timeout, *args) + self._retry_tmr.start(interval_sec) + + def _on_retry_timeout(self, *args): + '''@brief + When an operation fails, the application has the option to + retry at a later time by calling the retry() method. The + retry() method starts a timer at the end of which the operation + will be executed again. This is the method that is called when + the timer expires. + ''' + if self._alive(): + self.run_async(*args) + return GLib.SOURCE_REMOVE + + def _on_operation_complete(self, runner, result, *args): + '''@brief + This callback method is invoked when the operation with the + Controller has completed (be it successful or not). + ''' + # The operation might have been cancelled. + # Only proceed if it hasn't been cancelled. + if self._operation is None or not self._alive(): + return + + success, data, err = runner.communicate_finish(result) + + if success: + self._errmsg = None + self._fail_cnt = 0 + self._success_cb(self, data, *args) + else: + self._errmsg = str(err) + self._fail_cnt += 1 + self._fail_cb(self, err, self._fail_cnt, *args) + + +# ****************************************************************************** +class Deferred: + '''Implement a deferred function call. A deferred is a function that gets + added to the main loop to be executed during the next idle slot.''' + + def __init__(self, func, *user_data): + self._source = None + self._func = func + self._user_data = user_data + + def schedule(self): + '''Schedule the function to be called by the main loop. If the + function is already scheduled, then do nothing''' + if not self.is_scheduled(): + srce_id = GLib.idle_add(self._func, *self._user_data) + self._source = GLib.main_context_default().find_source_by_id(srce_id) + + def is_scheduled(self): + '''Check if deferred is currently schedules to run''' + return self._source and not self._source.is_destroyed() + + def cancel(self): + '''Remove deferred from main loop''' + if self.is_scheduled(): + self._source.destroy() + self._source = None diff --git a/staslib/iputil.py b/staslib/iputil.py new file mode 100644 index 0000000..9199a49 --- /dev/null +++ b/staslib/iputil.py @@ -0,0 +1,169 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger + +'''A collection of IP address and network interface utilities''' + +import socket +import logging +import ipaddress +from staslib import conf + +RTM_NEWADDR = 20 +RTM_GETADDR = 22 +NLM_F_REQUEST = 0x01 +NLM_F_ROOT = 0x100 +NLMSG_DONE = 3 +IFLA_ADDRESS = 1 +NLMSGHDR_SZ = 16 +IFADDRMSG_SZ = 8 +RTATTR_SZ = 4 + +# Netlink request (Get address command) +GETADDRCMD = ( + # BEGIN: struct nlmsghdr + b'\0' * 4 # nlmsg_len (placeholder - actual length calculated below) + + (RTM_GETADDR).to_bytes(2, byteorder='little', signed=False) # nlmsg_type + + (NLM_F_REQUEST | NLM_F_ROOT).to_bytes(2, byteorder='little', signed=False) # nlmsg_flags + + b'\0' * 2 # nlmsg_seq + + b'\0' * 2 # nlmsg_pid + # END: struct nlmsghdr + + b'\0' * 8 # struct ifaddrmsg +) +GETADDRCMD = len(GETADDRCMD).to_bytes(4, byteorder='little') + GETADDRCMD[4:] # nlmsg_len + + +# ****************************************************************************** +def get_ipaddress_obj(ipaddr): + '''@brief Return a IPv4Address or IPv6Address depending on whether @ipaddr + is a valid IPv4 or IPv6 address. Return None otherwise.''' + try: + ip = ipaddress.ip_address(ipaddr) + except ValueError: + return None + + return ip + + +# ****************************************************************************** +def _data_matches_ip(data_family, data, ip): + if data_family == socket.AF_INET: + try: + other_ip = ipaddress.IPv4Address(data) + except ValueError: + return False + if ip.version == 6: + ip = ip.ipv4_mapped + elif data_family == socket.AF_INET6: + try: + other_ip = ipaddress.IPv6Address(data) + except ValueError: + return False + if ip.version == 4: + other_ip = other_ip.ipv4_mapped + else: + return False + + return other_ip == ip + + +# ****************************************************************************** +def iface_of(src_addr): + '''@brief Find the interface that has src_addr as one of its assigned IP addresses. + @param src_addr: The IP address to match + @type src_addr: Instance of ipaddress.IPv4Address or ipaddress.IPv6Address + ''' + with socket.socket(socket.AF_NETLINK, socket.SOCK_RAW) as sock: + sock.sendall(GETADDRCMD) + nlmsg = sock.recv(8192) + nlmsg_idx = 0 + while True: + if nlmsg_idx >= len(nlmsg): + nlmsg += sock.recv(8192) + + nlmsg_type = int.from_bytes(nlmsg[nlmsg_idx + 4 : nlmsg_idx + 6], byteorder='little', signed=False) + if nlmsg_type == NLMSG_DONE: + break + + if nlmsg_type != RTM_NEWADDR: + break + + nlmsg_len = int.from_bytes(nlmsg[nlmsg_idx : nlmsg_idx + 4], byteorder='little', signed=False) + if nlmsg_len % 4: # Is msg length not a multiple of 4? + break + + ifaddrmsg_indx = nlmsg_idx + NLMSGHDR_SZ + ifa_family = nlmsg[ifaddrmsg_indx] + ifa_index = int.from_bytes(nlmsg[ifaddrmsg_indx + 4 : ifaddrmsg_indx + 8], byteorder='little', signed=False) + + rtattr_indx = ifaddrmsg_indx + IFADDRMSG_SZ + while rtattr_indx < (nlmsg_idx + nlmsg_len): + rta_len = int.from_bytes(nlmsg[rtattr_indx : rtattr_indx + 2], byteorder='little', signed=False) + rta_type = int.from_bytes(nlmsg[rtattr_indx + 2 : rtattr_indx + 4], byteorder='little', signed=False) + if rta_type == IFLA_ADDRESS: + data = nlmsg[rtattr_indx + RTATTR_SZ : rtattr_indx + rta_len] + if _data_matches_ip(ifa_family, data, src_addr): + return socket.if_indextoname(ifa_index) + + rta_len = (rta_len + 3) & ~3 # Round up to multiple of 4 + rtattr_indx += rta_len # Move to next rtattr + + nlmsg_idx += nlmsg_len # Move to next Netlink message + + return '' + + +# ****************************************************************************** +def get_interface(src_addr): + '''Get interface for given source address + @param src_addr: The source address + @type src_addr: str + ''' + if not src_addr: + return '' + + src_addr = src_addr.split('%')[0] # remove scope-id (if any) + src_addr = get_ipaddress_obj(src_addr) + return '' if src_addr is None else iface_of(src_addr) + + +# ****************************************************************************** +def remove_invalid_addresses(controllers: list): + '''@brief Remove controllers with invalid addresses from the list of controllers. + @param controllers: List of TIDs + ''' + service_conf = conf.SvcConf() + valid_controllers = list() + for controller in controllers: + if controller.transport in ('tcp', 'rdma'): + # Let's make sure that traddr is + # syntactically a valid IPv4 or IPv6 address. + ip = get_ipaddress_obj(controller.traddr) + if ip is None: + logging.warning('%s IP address is not valid', controller) + continue + + # Let's make sure the address family is enabled. + if ip.version not in service_conf.ip_family: + logging.debug( + '%s ignored because IPv%s is disabled in %s', + controller, + ip.version, + service_conf.conf_file, + ) + continue + + valid_controllers.append(controller) + + elif controller.transport in ('fc', 'loop'): + # At some point, need to validate FC addresses as well... + valid_controllers.append(controller) + + else: + logging.warning('Invalid transport %s', controller.transport) + + return valid_controllers diff --git a/staslib/log.py b/staslib/log.py new file mode 100644 index 0000000..9622e98 --- /dev/null +++ b/staslib/log.py @@ -0,0 +1,53 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''nvme-stas logging module''' + +import sys +import logging +from staslib import defs + + +def init(syslog: bool): + '''Init log module + @param syslog: True to send messages to the syslog, + False to send messages to stdout. + ''' + log = logging.getLogger() + log.propagate = False + + if syslog: + try: + # Try journal logger first + import systemd.journal # pylint: disable=import-outside-toplevel + + handler = systemd.journal.JournalHandler(SYSLOG_IDENTIFIER=defs.PROG_NAME) + except ModuleNotFoundError: + # Go back to standard syslog handler + from logging.handlers import SysLogHandler # pylint: disable=import-outside-toplevel + + handler = SysLogHandler(address="/dev/log") + handler.setFormatter(logging.Formatter(f'{defs.PROG_NAME}: %(message)s')) + else: + # Log to stdout + handler = logging.StreamHandler(stream=sys.stdout) + + log.addHandler(handler) + log.setLevel(logging.INFO if syslog else logging.DEBUG) + + +def level() -> str: + '''@brief return current log level''' + logger = logging.getLogger() + return str(logging.getLevelName(logger.getEffectiveLevel())) + + +def set_level_from_tron(tron): + '''Set log level based on TRON''' + logger = logging.getLogger() + logger.setLevel(logging.DEBUG if tron else logging.INFO) diff --git a/staslib/meson.build b/staslib/meson.build new file mode 100644 index 0000000..eb006f0 --- /dev/null +++ b/staslib/meson.build @@ -0,0 +1,60 @@ +# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# + +files_to_configure = [ 'defs.py', '__init__.py', 'stafd.idl', 'stacd.idl' ] +configured_files = [] +foreach file : files_to_configure + configured_files += configure_file( + input: file, + output: file, + configuration: conf + ) +endforeach + +files_to_copy = [ + 'avahi.py', + 'conf.py', + 'ctrl.py', + 'gutil.py', + 'iputil.py', + 'log.py', + 'service.py', + 'singleton.py', + 'stas.py', + 'timeparse.py', + 'trid.py', + 'udev.py', + 'version.py' +] +copied_files = [] +foreach file : files_to_copy + copied_files += configure_file( + input: file, + output: file, + copy: true, + ) +endforeach + +files_to_install = copied_files + configured_files +python3.install_sources( + files_to_install, + pure: true, + subdir: 'staslib', +) + +#=============================================================================== +# Make a list of modules to lint +skip = ['stafd.idl', 'stacd.idl'] +foreach file: files_to_install + fname = fs.name('@0@'.format(file)) + if fname not in skip + modules_to_lint += file + endif +endforeach + diff --git a/staslib/service.py b/staslib/service.py new file mode 100644 index 0000000..ce4769d --- /dev/null +++ b/staslib/service.py @@ -0,0 +1,878 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''This module defines the base Service object from +which the Staf and the Stac objects are derived.''' + +import json +import logging +import pathlib +import subprocess +from itertools import filterfalse +import dasbus.error +import dasbus.client.observer +import dasbus.client.proxy + +from gi.repository import GLib +from systemd.daemon import notify as sd_notify +from staslib import avahi, conf, ctrl, defs, gutil, iputil, stas, timeparse, trid, udev + + +# ****************************************************************************** +class CtrlTerminator: + '''The Controller Terminator is used to gracefully disconnect from + controllers. All communications with controllers is handled by the kernel. + Once we make a request to the kernel to perform an operation (e.g. connect), + we have to wait for it to complete before requesting another operation. This + is particularly important when we want to disconnect from a controller while + there are pending operations, especially a pending connect. + + The "connect" operation is especially unpredictable because all connect + requests are made through the blocking interface "/dev/nvme-fabrics". This + means that once a "connect" operation has been submitted, and depending on + how many connect requests are made concurrently, it can take several seconds + for a connect to be processed by the kernel. + + While connect or other operations are being performed, it is possible + that a disconnect may be requested (e.g. someone or something changes the + configuration to remove a controller). Because it is not possible to + terminate a pending operation request, we have to wait for it to complete + before we can issue a disconnect. Failure to do that will result in + operations being performed by the kernel in reverse order. For example, + a disconnect may be executed before a pending connect has had a chance to + complete. And this will result in controllers that are supposed to be + disconnected to be connected without nvme-stas knowing about it. + + The Controller Terminator is used when we need to disconnect from a + controller. It will make sure that there are no pending operations before + issuing a disconnect. + ''' + + DISPOSAL_AUDIT_PERIOD_SEC = 30 + + def __init__(self): + self._udev = udev.UDEV + self._controllers = list() # The list of controllers to dispose of. + self._audit_tmr = gutil.GTimer(self.DISPOSAL_AUDIT_PERIOD_SEC, self._on_disposal_check) + + def dispose(self, controller: ctrl.Controller, on_controller_removed_cb, keep_connection: bool): + '''Invoked by a service (stafd or stacd) to dispose of a controller''' + if controller.all_ops_completed(): + logging.debug( + 'CtrlTerminator.dispose() - %s | %s: Invoke disconnect()', controller.tid, controller.device + ) + controller.disconnect(on_controller_removed_cb, keep_connection) + else: + logging.debug( + 'CtrlTerminator.dispose() - %s | %s: Add controller to garbage disposal', + controller.tid, + controller.device, + ) + self._controllers.append((controller, keep_connection, on_controller_removed_cb, controller.tid)) + + self._udev.register_for_action_events('add', self._on_kernel_events) + self._udev.register_for_action_events('remove', self._on_kernel_events) + + if self._audit_tmr.time_remaining() == 0: + self._audit_tmr.start() + + def pending_disposal(self, tid): + '''Check whether @tid is pending disposal''' + for controller in self._controllers: + if controller.tid == tid: + return True + return False + + def info(self): + '''@brief Get info about this object (used for debug)''' + info = { + 'terminator.audit timer': str(self._audit_tmr), + } + for controller, _, _, tid in self._controllers: + info[f'terminator.controller.{tid}'] = str(controller.info()) + return info + + def kill(self): + '''Stop Controller Terminator and release resources.''' + self._audit_tmr.stop() + self._audit_tmr = None + + if self._udev: + self._udev.unregister_for_action_events('add', self._on_kernel_events) + self._udev.unregister_for_action_events('remove', self._on_kernel_events) + self._udev = None + + for controller, keep_connection, on_controller_removed_cb, _ in self._controllers: + controller.disconnect(on_controller_removed_cb, keep_connection) + + self._controllers.clear() + + def _on_kernel_events(self, udev_obj): + logging.debug('CtrlTerminator._on_kernel_events() - %s event received', udev_obj.action) + self._disposal_check() + + def _on_disposal_check(self, *_user_data): + logging.debug('CtrlTerminator._on_disposal_check()- Periodic audit') + return GLib.SOURCE_REMOVE if self._disposal_check() else GLib.SOURCE_CONTINUE + + @staticmethod + def _keep_or_terminate(args): + '''Return False if controller is to be kept. True if controller + was terminated and can be removed from the list.''' + controller, keep_connection, on_controller_removed_cb, tid = args + if controller.all_ops_completed(): + logging.debug( + 'CtrlTerminator._keep_or_terminate()- %s | %s: Disconnecting controller', + tid, + controller.device, + ) + controller.disconnect(on_controller_removed_cb, keep_connection) + return True + + return False + + def _disposal_check(self): + # Iterate over the list, terminating (disconnecting) those controllers + # that have no pending operations, and remove those controllers from the + # list (only keep controllers that still have operations pending). + self._controllers[:] = filterfalse(self._keep_or_terminate, self._controllers) + disposal_complete = len(self._controllers) == 0 + + if disposal_complete: + logging.debug('CtrlTerminator._disposal_check() - Disposal complete') + self._audit_tmr.stop() + self._udev.unregister_for_action_events('add', self._on_kernel_events) + self._udev.unregister_for_action_events('remove', self._on_kernel_events) + else: + self._audit_tmr.start() # Restart timer + + return disposal_complete + + +# ****************************************************************************** +class Service(stas.ServiceABC): + '''@brief Base class used to manage a STorage Appliance Service''' + + def __init__(self, args, default_conf, reload_hdlr): + self._udev = udev.UDEV + self._terminator = CtrlTerminator() + + super().__init__(args, default_conf, reload_hdlr) + + def _release_resources(self): + logging.debug('Service._release_resources()') + super()._release_resources() + + if self._terminator: + self._terminator.kill() + + self._udev = None + self._terminator = None + + def _disconnect_all(self): + '''Tell all controller objects to disconnect''' + keep_connections = self._keep_connections_on_exit() + controllers = self._controllers.values() + logging.debug( + 'Service._stop_hdlr() - Controller count = %s, keep_connections = %s', + len(controllers), + keep_connections, + ) + for controller in controllers: + self._terminator.dispose(controller, self._on_final_disconnect, keep_connections) + + def info(self) -> dict: + '''@brief Get the status info for this object (used for debug)''' + info = super().info() + if self._terminator: + info.update(self._terminator.info()) + return info + + @stas.ServiceABC.tron.setter + def tron(self, value): + '''@brief Set Trace ON property''' + super(__class__, self.__class__).tron.__set__(self, value) + + +# ****************************************************************************** +class Stac(Service): + '''STorage Appliance Connector (STAC)''' + + CONF_STABILITY_LONG_SOAK_TIME_SEC = 10 # pylint: disable=invalid-name + ADD_EVENT_SOAK_TIME_SEC = 1 + + def __init__(self, args, dbus): + default_conf = { + ('Global', 'tron'): False, + ('Global', 'hdr-digest'): False, + ('Global', 'data-digest'): False, + ('Global', 'kato'): None, # None to let the driver decide the default + ('Global', 'nr-io-queues'): None, # None to let the driver decide the default + ('Global', 'nr-write-queues'): None, # None to let the driver decide the default + ('Global', 'nr-poll-queues'): None, # None to let the driver decide the default + ('Global', 'queue-size'): None, # None to let the driver decide the default + ('Global', 'reconnect-delay'): None, # None to let the driver decide the default + ('Global', 'ctrl-loss-tmo'): None, # None to let the driver decide the default + ('Global', 'disable-sqflow'): None, # None to let the driver decide the default + ('Global', 'ignore-iface'): False, + ('Global', 'ip-family'): (4, 6), + ('Controllers', 'controller'): list(), + ('Controllers', 'exclude'): list(), + ('I/O controller connection management', 'disconnect-scope'): 'only-stas-connections', + ('I/O controller connection management', 'disconnect-trtypes'): ['tcp'], + ('I/O controller connection management', 'connect-attempts-on-ncc'): 0, + } + + super().__init__(args, default_conf, self._reload_hdlr) + + self._add_event_soak_tmr = gutil.GTimer(self.ADD_EVENT_SOAK_TIME_SEC, self._on_add_event_soaked) + + self._config_connections_audit() + + # Create the D-Bus instance. + self._config_dbus(dbus, defs.STACD_DBUS_NAME, defs.STACD_DBUS_PATH) + + # Connect to STAF D-Bus interface + self._staf = None + self._staf_watcher = dasbus.client.observer.DBusObserver(self._sysbus, defs.STAFD_DBUS_NAME) + self._staf_watcher.service_available.connect(self._connect_to_staf) + self._staf_watcher.service_unavailable.connect(self._disconnect_from_staf) + self._staf_watcher.connect_once_available() + + def _release_resources(self): + logging.debug('Stac._release_resources()') + + if self._add_event_soak_tmr: + self._add_event_soak_tmr.kill() + + if self._udev: + self._udev.unregister_for_action_events('add', self._on_add_event) + + self._destroy_staf_comlink(self._staf_watcher) + if self._staf_watcher is not None: + self._staf_watcher.disconnect() + + super()._release_resources() + + self._staf = None + self._staf_watcher = None + self._add_event_soak_tmr = None + + def _dump_last_known_config(self, controllers): + config = list(controllers.keys()) + logging.debug('Stac._dump_last_known_config() - IOC count = %s', len(config)) + self._write_lkc(config) + + def _load_last_known_config(self): + config = self._read_lkc() or list() + logging.debug('Stac._load_last_known_config() - IOC count = %s', len(config)) + + controllers = {} + for tid in config: + # Only create Ioc objects if there is already a connection in the kernel + # First, regenerate the TID (in case of soft. upgrade and TID object + # has changed internally) + tid = trid.TID(tid.as_dict()) + if udev.UDEV.find_nvme_ioc_device(tid) is not None: + controllers[tid] = ctrl.Ioc(self, tid) + + return controllers + + def _audit_all_connections(self, tids): + '''A host should only connect to I/O controllers that have been zoned + for that host or a manual "controller" entry exists in stacd.conf. + A host should disconnect from an I/O controller when that I/O controller + is removed from the zone or a "controller" entry is manually removed + from stacd.conf. stacd will audit connections if "disconnect-scope= + all-connections-matching-disconnect-trtypes". stacd will delete any + connection that is not supposed to exist. + ''' + logging.debug('Stac._audit_all_connections() - tids = %s', tids) + num_controllers = len(self._controllers) + for tid in tids: + if tid not in self._controllers and not self._terminator.pending_disposal(tid): + self._controllers[tid] = ctrl.Ioc(self, tid) + + if num_controllers != len(self._controllers): + self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC) + + def _on_add_event(self, udev_obj): + '''@brief This function is called when a "add" event is received from + the kernel for an NVMe device. This is used to trigger an audit and make + sure that the connection to an I/O controller is allowed. + + WARNING: There is a race condition with the "add" event from the kernel. + The kernel sends the "add" event a bit early and the sysfs attributes + associated with the nvme object are not always fully initialized. + To workaround this problem we use a soaking timer to give time for the + sysfs attributes to stabilize. + ''' + logging.debug('Stac._on_add_event(() - Received "add" event: %s', udev_obj.sys_name) + self._add_event_soak_tmr.start() + + def _on_add_event_soaked(self): + '''@brief After the add event has been soaking for ADD_EVENT_SOAK_TIME_SEC + seconds, we can audit the connections. + ''' + if self._alive(): + svc_conf = conf.SvcConf() + if svc_conf.disconnect_scope == 'all-connections-matching-disconnect-trtypes': + self._audit_all_connections(self._udev.get_nvme_ioc_tids(svc_conf.disconnect_trtypes)) + return GLib.SOURCE_REMOVE + + def _config_connections_audit(self): + '''This function checks the "disconnect_scope" parameter to determine + whether audits should be performed. Audits are enabled when + "disconnect_scope == all-connections-matching-disconnect-trtypes". + ''' + svc_conf = conf.SvcConf() + if svc_conf.disconnect_scope == 'all-connections-matching-disconnect-trtypes': + if not self._udev.is_action_cback_registered('add', self._on_add_event): + self._udev.register_for_action_events('add', self._on_add_event) + self._audit_all_connections(self._udev.get_nvme_ioc_tids(svc_conf.disconnect_trtypes)) + else: + self._udev.unregister_for_action_events('add', self._on_add_event) + + def _keep_connections_on_exit(self): + '''@brief Determine whether connections should remain when the + process exits. + ''' + return True + + def _reload_hdlr(self): + '''@brief Reload configuration file. This is triggered by the SIGHUP + signal, which can be sent with "systemctl reload stacd". + ''' + if not self._alive(): + return GLib.SOURCE_REMOVE + + sd_notify('RELOADING=1') + service_cnf = conf.SvcConf() + service_cnf.reload() + self.tron = service_cnf.tron + self._config_connections_audit() + self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC) + + for controller in self._controllers.values(): + controller.reload_hdlr() + + sd_notify('READY=1') + return GLib.SOURCE_CONTINUE + + def _get_log_pages_from_stafd(self): + if self._staf: + try: + return json.loads(self._staf.get_all_log_pages(True)) + except dasbus.error.DBusError: + pass + + return list() + + def _config_ctrls_finish(self, configured_ctrl_list: list): # pylint: disable=too-many-locals + '''@param configured_ctrl_list: list of TIDs''' + # This is a callback function, which may be called after the service + # has been signalled to stop. So let's make sure the service is still + # alive and well before continuing. + if not self._alive(): + logging.debug('Stac._config_ctrls_finish() - Exiting because service is no longer alive') + return + + # Eliminate invalid entries from stacd.conf "controller list". + configured_ctrl_list = [ + tid for tid in configured_ctrl_list if '' not in (tid.transport, tid.traddr, tid.trsvcid, tid.subsysnqn) + ] + + logging.debug('Stac._config_ctrls_finish() - configured_ctrl_list = %s', configured_ctrl_list) + + discovered_ctrls = dict() + for staf_data in self._get_log_pages_from_stafd(): + host_traddr = staf_data['discovery-controller']['host-traddr'] + host_iface = staf_data['discovery-controller']['host-iface'] + for dlpe in staf_data['log-pages']: + if dlpe.get('subtype') == 'nvme': # eliminate discovery controllers + tid = stas.tid_from_dlpe(dlpe, host_traddr, host_iface) + discovered_ctrls[tid] = dlpe + + discovered_ctrl_list = list(discovered_ctrls.keys()) + logging.debug('Stac._config_ctrls_finish() - discovered_ctrl_list = %s', discovered_ctrl_list) + + controllers = stas.remove_excluded(configured_ctrl_list + discovered_ctrl_list) + controllers = iputil.remove_invalid_addresses(controllers) + + new_controller_tids = set(controllers) + cur_controller_tids = set(self._controllers.keys()) + controllers_to_add = new_controller_tids - cur_controller_tids + controllers_to_del = cur_controller_tids - new_controller_tids + + logging.debug('Stac._config_ctrls_finish() - controllers_to_add = %s', list(controllers_to_add)) + logging.debug('Stac._config_ctrls_finish() - controllers_to_del = %s', list(controllers_to_del)) + + svc_conf = conf.SvcConf() + no_disconnect = svc_conf.disconnect_scope == 'no-disconnect' + match_trtypes = svc_conf.disconnect_scope == 'all-connections-matching-disconnect-trtypes' + logging.debug( + 'Stac._config_ctrls_finish() - no_disconnect=%s, match_trtypes=%s, svc_conf.disconnect_trtypes=%s', + no_disconnect, + match_trtypes, + svc_conf.disconnect_trtypes, + ) + for tid in controllers_to_del: + controller = self._controllers.pop(tid, None) + if controller is not None: + keep_connection = no_disconnect or (match_trtypes and tid.transport not in svc_conf.disconnect_trtypes) + self._terminator.dispose(controller, self.remove_controller, keep_connection) + + for tid in controllers_to_add: + self._controllers[tid] = ctrl.Ioc(self, tid) + + for tid, controller in self._controllers.items(): + if tid in discovered_ctrls: + dlpe = discovered_ctrls[tid] + controller.update_dlpe(dlpe) + + self._dump_last_known_config(self._controllers) + + def _connect_to_staf(self, _): + '''@brief Hook up DBus signal handlers for signals from stafd.''' + if not self._alive(): + return + + try: + self._staf = self._sysbus.get_proxy(defs.STAFD_DBUS_NAME, defs.STAFD_DBUS_PATH) + self._staf.log_pages_changed.connect(self._log_pages_changed) + self._staf.dc_removed.connect(self._dc_removed) + self._cfg_soak_tmr.start() + + # Make sure timer is set back to its normal value. + self._cfg_soak_tmr.set_timeout(self.CONF_STABILITY_SOAK_TIME_SEC) + logging.debug('Stac._connect_to_staf() - Connected to staf') + except dasbus.error.DBusError: + logging.error('Failed to connect to staf') + + def _destroy_staf_comlink(self, watcher): # pylint: disable=unused-argument + if self._staf: + self._staf.log_pages_changed.disconnect(self._log_pages_changed) + self._staf.dc_removed.disconnect(self._dc_removed) + dasbus.client.proxy.disconnect_proxy(self._staf) + self._staf = None + + def _disconnect_from_staf(self, watcher): + self._destroy_staf_comlink(watcher) + + # When we lose connectivity with stafd, the most logical explanation + # is that stafd restarted. In that case, it may take some time for stafd + # to re-populate its log pages cache. So let's give stafd plenty of time + # to update its log pages cache and send log pages change notifications + # before triggering a stacd re-config. We do this by momentarily + # increasing the config soak timer to a longer period. + if self._cfg_soak_tmr: + self._cfg_soak_tmr.set_timeout(self.CONF_STABILITY_LONG_SOAK_TIME_SEC) + + logging.debug('Stac._disconnect_from_staf() - Disconnected from staf') + + def _log_pages_changed( # pylint: disable=too-many-arguments + self, transport, traddr, trsvcid, host_traddr, host_iface, subsysnqn, device + ): + if not self._alive(): + return + + logging.debug( + 'Stac._log_pages_changed() - transport=%s, traddr=%s, trsvcid=%s, host_traddr=%s, host_iface=%s, subsysnqn=%s, device=%s', + transport, + traddr, + trsvcid, + host_traddr, + host_iface, + subsysnqn, + device, + ) + if self._cfg_soak_tmr: + self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC) + + def _dc_removed(self): + if not self._alive(): + return + + logging.debug('Stac._dc_removed()') + if self._cfg_soak_tmr: + self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC) + + +# ****************************************************************************** +# Only keep legacy FC rule (not even sure this is still in use today, but just to be safe). +UDEV_RULE_OVERRIDE = r''' +ACTION=="change", SUBSYSTEM=="fc", ENV{FC_EVENT}=="nvmediscovery", \ + ENV{NVMEFC_HOST_TRADDR}=="*", ENV{NVMEFC_TRADDR}=="*", \ + RUN+="%s --no-block start nvmf-connect@--transport=fc\t--traddr=$env{NVMEFC_TRADDR}\t--trsvcid=none\t--host-traddr=$env{NVMEFC_HOST_TRADDR}.service" +''' + + +def _udev_rule_ctrl(suppress): + '''@brief We override the standard udev rule installed by nvme-cli, i.e. + '/usr/lib/udev/rules.d/70-nvmf-autoconnect.rules', with a copy into + /run/udev/rules.d. The goal is to suppress the udev rule that controls TCP + connections to I/O controllers. This is to avoid race conditions between + stacd and udevd. This is configurable. See "udev-rule" in stacd.conf + for details. + + @param enable: When True, override nvme-cli's udev rule and prevent TCP I/O + Controller connections by nvme-cli. When False, allow nvme-cli's udev rule + to make TCP I/O connections. + @type enable: bool + ''' + udev_rule_file = pathlib.Path('/run/udev/rules.d', '70-nvmf-autoconnect.rules') + if suppress: + if not udev_rule_file.exists(): + pathlib.Path('/run/udev/rules.d').mkdir(parents=True, exist_ok=True) + text = UDEV_RULE_OVERRIDE % (defs.SYSTEMCTL) + udev_rule_file.write_text(text) # pylint: disable=unspecified-encoding + else: + try: + udev_rule_file.unlink() + except FileNotFoundError: + pass + + +def _is_dlp_changed_aen(udev_obj): + '''Check whether we received a Change of Discovery Log Page AEN''' + nvme_aen = udev_obj.get('NVME_AEN') + if not isinstance(nvme_aen, str): + return False + + aen = int(nvme_aen, 16) + if aen != ctrl.DLP_CHANGED: + return False + + logging.info( + '%s - Received AEN: Change of Discovery Log Page (%s)', + udev_obj.sys_name, + nvme_aen, + ) + return True + + +def _event_matches(udev_obj, nvme_events): + '''Check whether we received an NVMe Event matching + one of the events listed in @nvme_events''' + nvme_event = udev_obj.get('NVME_EVENT') + if nvme_event not in nvme_events: + return False + + logging.info('%s - Received "%s" event', udev_obj.sys_name, nvme_event) + return True + + +# ****************************************************************************** +class Staf(Service): + '''STorage Appliance Finder (STAF)''' + + def __init__(self, args, dbus): + default_conf = { + ('Global', 'tron'): False, + ('Global', 'hdr-digest'): False, + ('Global', 'data-digest'): False, + ('Global', 'kato'): 30, + ('Global', 'queue-size'): None, # None to let the driver decide the default + ('Global', 'reconnect-delay'): None, # None to let the driver decide the default + ('Global', 'ctrl-loss-tmo'): None, # None to let the driver decide the default + ('Global', 'disable-sqflow'): None, # None to let the driver decide the default + ('Global', 'persistent-connections'): False, # Deprecated + ('Discovery controller connection management', 'persistent-connections'): True, + ('Discovery controller connection management', 'zeroconf-connections-persistence'): timeparse.timeparse( + '72hours' + ), + ('Global', 'ignore-iface'): False, + ('Global', 'ip-family'): (4, 6), + ('Global', 'pleo'): True, + ('Service Discovery', 'zeroconf'): True, + ('Controllers', 'controller'): list(), + ('Controllers', 'exclude'): list(), + } + + super().__init__(args, default_conf, self._reload_hdlr) + + self._avahi = avahi.Avahi(self._sysbus, self._avahi_change) + self._avahi.config_stypes(conf.SvcConf().stypes) + + # Create the D-Bus instance. + self._config_dbus(dbus, defs.STAFD_DBUS_NAME, defs.STAFD_DBUS_PATH) + + self._udev.register_for_action_events('change', self._nvme_cli_interop) + _udev_rule_ctrl(True) + + def info(self) -> dict: + '''@brief Get the status info for this object (used for debug)''' + info = super().info() + info['avahi'] = self._avahi.info() + return info + + def _release_resources(self): + logging.debug('Staf._release_resources()') + if self._udev: + self._udev.unregister_for_action_events('change', self._nvme_cli_interop) + + super()._release_resources() + + _udev_rule_ctrl(False) + if self._avahi: + self._avahi.kill() + self._avahi = None + + def _dump_last_known_config(self, controllers): + config = {tid: {'log_pages': dc.log_pages(), 'origin': dc.origin} for tid, dc in controllers.items()} + logging.debug('Staf._dump_last_known_config() - DC count = %s', len(config)) + self._write_lkc(config) + + def _load_last_known_config(self): + config = self._read_lkc() or dict() + logging.debug('Staf._load_last_known_config() - DC count = %s', len(config)) + + controllers = {} + for tid, data in config.items(): + if isinstance(data, dict): + log_pages = data.get('log_pages') + origin = data.get('origin') + else: + log_pages = data + origin = None + + # Regenerate the TID (in case of soft. upgrade and TID object + # has changed internally) + tid = trid.TID(tid.as_dict()) + controllers[tid] = ctrl.Dc(self, tid, log_pages, origin) + + return controllers + + def _keep_connections_on_exit(self): + '''@brief Determine whether connections should remain when the + process exits. + ''' + return conf.SvcConf().persistent_connections + + def _reload_hdlr(self): + '''@brief Reload configuration file. This is triggered by the SIGHUP + signal, which can be sent with "systemctl reload stafd". + ''' + if not self._alive(): + return GLib.SOURCE_REMOVE + + sd_notify('RELOADING=1') + service_cnf = conf.SvcConf() + service_cnf.reload() + self.tron = service_cnf.tron + self._avahi.kick_start() # Make sure Avahi is running + self._avahi.config_stypes(service_cnf.stypes) + self._cfg_soak_tmr.start() + + for controller in self._controllers.values(): + controller.reload_hdlr() + + sd_notify('READY=1') + return GLib.SOURCE_CONTINUE + + def is_avahi_reported(self, tid): + '''@brief Return whether @tid is being reported by the Avahi daemon. + @return: True if the Avahi daemon is reporting it, False otherwise. + ''' + for cid in self._avahi.get_controllers(): + if trid.TID(cid) == tid: + return True + return False + + def log_pages_changed(self, controller, device): + '''@brief Function invoked when a controller's cached log pages + have changed. This will emit a D-Bus signal to inform + other applications that the cached log pages have changed. + ''' + self._dbus_iface.log_pages_changed.emit( + controller.tid.transport, + controller.tid.traddr, + controller.tid.trsvcid, + controller.tid.host_traddr, + controller.tid.host_iface, + controller.tid.subsysnqn, + device, + ) + + def dc_removed(self): + '''@brief Function invoked when a controller's cached log pages + have changed. This will emit a D-Bus signal to inform + other applications that the cached log pages have changed. + ''' + self._dbus_iface.dc_removed.emit() + + def _referrals(self) -> list: + return [ + stas.tid_from_dlpe(dlpe, controller.tid.host_traddr, controller.tid.host_iface) + for controller in self.get_controllers() + for dlpe in controller.referrals() + ] + + def _config_ctrls_finish(self, configured_ctrl_list: list): + '''@brief Finish discovery controllers configuration after + hostnames (if any) have been resolved. All the logic associated + with discovery controller creation/deletion is found here. To + avoid calling this algorith repetitively for each and every events, + it is called after a soaking period controlled by self._cfg_soak_tmr. + + @param configured_ctrl_list: List of TIDs configured in stafd.conf with + all hostnames resolved to their corresponding IP addresses. + ''' + # This is a callback function, which may be called after the service + # has been signalled to stop. So let's make sure the service is still + # alive and well before continuing. + if not self._alive(): + logging.debug('Staf._config_ctrls_finish() - Exiting because service is no longer alive') + return + + # Eliminate invalid entries from stafd.conf "controller list". + controllers = list() + for tid in configured_ctrl_list: + if '' in (tid.transport, tid.traddr, tid.trsvcid): + continue + if not tid.subsysnqn: + cid = tid.as_dict() + cid['subsysnqn'] = defs.WELL_KNOWN_DISC_NQN + controllers.append(trid.TID(cid)) + else: + controllers.append(tid) + configured_ctrl_list = controllers + + # Get the Avahi-discovered list and the referrals. + discovered_ctrl_list = [trid.TID(cid) for cid in self._avahi.get_controllers()] + referral_ctrl_list = self._referrals() + logging.debug('Staf._config_ctrls_finish() - configured_ctrl_list = %s', configured_ctrl_list) + logging.debug('Staf._config_ctrls_finish() - discovered_ctrl_list = %s', discovered_ctrl_list) + logging.debug('Staf._config_ctrls_finish() - referral_ctrl_list = %s', referral_ctrl_list) + + all_ctrls = configured_ctrl_list + discovered_ctrl_list + referral_ctrl_list + controllers = stas.remove_excluded(all_ctrls) + controllers = iputil.remove_invalid_addresses(controllers) + + new_controller_tids = set(controllers) + cur_controller_tids = set(self._controllers.keys()) + controllers_to_add = new_controller_tids - cur_controller_tids + controllers_to_del = cur_controller_tids - new_controller_tids + + # Make a list list of excluded and invalid controllers + must_remove_list = set(all_ctrls) - new_controller_tids + + # Find "discovered" controllers that have not responded + # in a while and add them to controllers that must be removed. + must_remove_list.update({tid for tid, controller in self._controllers.items() if controller.is_unresponsive()}) + + # Do not remove Avahi-discovered DCs from controllers_to_del unless + # marked as "must-be-removed" (must_remove_list). This is to account for + # the case where mDNS discovery is momentarily disabled (e.g. Avahi + # daemon restarts). We don't want to delete connections because of + # temporary mDNS impairments. Removal of Avahi-discovered DCs will be + # handled differently and only if the connection cannot be established + # for a long period of time. + logging.debug('Staf._config_ctrls_finish() - must_remove_list = %s', list(must_remove_list)) + controllers_to_del = { + tid + for tid in controllers_to_del + if tid in must_remove_list or self._controllers[tid].origin != 'discovered' + } + + logging.debug('Staf._config_ctrls_finish() - controllers_to_add = %s', list(controllers_to_add)) + logging.debug('Staf._config_ctrls_finish() - controllers_to_del = %s', list(controllers_to_del)) + + # Delete controllers + for tid in controllers_to_del: + controller = self._controllers.pop(tid, None) + if controller is not None: + self._terminator.dispose(controller, self.remove_controller, keep_connection=False) + + if len(controllers_to_del) > 0: + self.dc_removed() # Let other apps (e.g. stacd) know that discovery controllers were removed. + + # Add controllers + for tid in controllers_to_add: + self._controllers[tid] = ctrl.Dc(self, tid) + + # Update "origin" on all DC objects + for tid, controller in self._controllers.items(): + origin = ( + 'configured' + if tid in configured_ctrl_list + else 'referral' + if tid in referral_ctrl_list + else 'discovered' + if tid in discovered_ctrl_list + else None + ) + if origin is not None: + controller.origin = origin + + self._dump_last_known_config(self._controllers) + + def _avahi_change(self): + if self._alive() and self._cfg_soak_tmr is not None: + self._cfg_soak_tmr.start() + + def controller_unresponsive(self, tid): + '''@brief Function invoked when a controller becomes unresponsive and + needs to be removed. + ''' + if self._alive() and self._cfg_soak_tmr is not None: + logging.debug('Staf.controller_unresponsive() - tid = %s', tid) + self._cfg_soak_tmr.start() + + def referrals_changed(self): + '''@brief Function invoked when a controller's cached referrals + have changed. + ''' + if self._alive() and self._cfg_soak_tmr is not None: + logging.debug('Staf.referrals_changed()') + self._cfg_soak_tmr.start() + + def _nvme_cli_interop(self, udev_obj): + '''Interoperability with nvme-cli: + stafd will invoke nvme-cli's connect-all the same way nvme-cli's udev + rules would do normally. This is for the case where a user has an hybrid + configuration where some controllers are configured through nvme-stas + and others through nvme-cli. This is not an optimal configuration. It + would be better if everything was configured through nvme-stas, however + support for hybrid configuration was requested by users (actually only + one user requested this).''' + + # Looking for 'change' events only + if udev_obj.action != 'change': + return + + # Looking for events from Discovery Controllers only + if not udev.Udev.is_dc_device(udev_obj): + return + + # Is the controller already being monitored by stafd? + for controller in self.get_controllers(): + if controller.device == udev_obj.sys_name: + return + + # Did we receive a Change of DLP AEN or an NVME Event indicating 'connect' or 'rediscover'? + if not _is_dlp_changed_aen(udev_obj) and not _event_matches(udev_obj, ('connected', 'rediscover')): + return + + # We need to invoke "nvme connect-all" using nvme-cli's nvmf-connect@.service + # NOTE: Eventually, we'll be able to drop --host-traddr and --host-iface from + # the parameters passed to nvmf-connect@.service. A fix was added to connect-all + # to infer these two values from the device used to connect to the DC. + # Ref: https://github.com/linux-nvme/nvme-cli/pull/1812 + cnf = [ + ('--device', udev_obj.sys_name), + ('--host-traddr', udev_obj.properties.get('NVME_HOST_TRADDR', None)), + ('--host-iface', udev_obj.properties.get('NVME_HOST_IFACE', None)), + ] + # Use systemd's escaped syntax (i.e. '=' is replaced by '\x3d', '\t' by '\x09', etc. + options = r'\x09'.join( + [fr'{option}\x3d{value}' for option, value in cnf if value not in (None, 'none', 'None', '')] + ) + logging.info('Invoking: systemctl start nvmf-connect@%s.service', options) + cmd = [defs.SYSTEMCTL, '--quiet', '--no-block', 'start', fr'nvmf-connect@{options}.service'] + subprocess.run(cmd, check=False) diff --git a/staslib/singleton.py b/staslib/singleton.py new file mode 100644 index 0000000..2171186 --- /dev/null +++ b/staslib/singleton.py @@ -0,0 +1,23 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''Implementation of a singleton pattern''' + + +class Singleton(type): + '''metaclass implementation of a singleton pattern''' + + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + # This variable declaration is required to force a + # strong reference on the instance. + instance = super(Singleton, cls).__call__(*args, **kwargs) + cls._instances[cls] = instance + return cls._instances[cls] diff --git a/staslib/stacd.idl b/staslib/stacd.idl new file mode 100644 index 0000000..efefbbe --- /dev/null +++ b/staslib/stacd.idl @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/staslib/stafd.idl b/staslib/stafd.idl new file mode 100644 index 0000000..8c98ffe --- /dev/null +++ b/staslib/stafd.idl @@ -0,0 +1,49 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/staslib/stas.py b/staslib/stas.py new file mode 100644 index 0000000..95afb94 --- /dev/null +++ b/staslib/stas.py @@ -0,0 +1,554 @@ +# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''Library for staf/stac. You will find here common code for stafd and stacd +including the Abstract Base Classes (ABC) for Controllers and Services''' + +import os +import sys +import abc +import signal +import pickle +import logging +import dasbus.connection +from gi.repository import Gio, GLib +from systemd.daemon import notify as sd_notify +from staslib import conf, defs, gutil, log, trid + +try: + # Python 3.9 or later + # This is the preferred way, but may not be available before Python 3.9 + from importlib.resources import files +except ImportError: + try: + # Pre Python 3.9 backport of importlib.resources (if installed) + from importlib_resources import files + except ImportError: + # Less efficient, but avalable on older versions of Python + import pkg_resources + + def load_idl(idl_fname): + '''@brief Load D-Bus Interface Description Language File''' + try: + return pkg_resources.resource_string('staslib', idl_fname).decode() + except (FileNotFoundError, AttributeError): + pass + + return '' + + else: + + def load_idl(idl_fname): + '''@brief Load D-Bus Interface Description Language File''' + try: + return files('staslib').joinpath(idl_fname).read_text() # pylint: disable=unspecified-encoding + except FileNotFoundError: + pass + + return '' + +else: + + def load_idl(idl_fname): + '''@brief Load D-Bus Interface Description Language File''' + try: + return files('staslib').joinpath(idl_fname).read_text() # pylint: disable=unspecified-encoding + except FileNotFoundError: + pass + + return '' + + +# ****************************************************************************** +def check_if_allowed_to_continue(): + '''@brief Let's perform some basic checks before going too far. There are + a few pre-requisites that need to be met before this program + is allowed to proceed: + + 1) The program needs to have root privileges + 2) The nvme-tcp kernel module must be loaded + + @return This function will only return if all conditions listed above + are met. Otherwise the program exits. + ''' + # 1) Check root privileges + if os.geteuid() != 0: + sys.exit(f'Permission denied. You need root privileges to run {defs.PROG_NAME}.') + + # 2) Check that nvme-tcp kernel module is running + if not os.path.exists('/dev/nvme-fabrics'): + # There's no point going any further if the kernel module hasn't been loaded + sys.exit('Fatal error: missing nvme-tcp kernel module') + + +# ****************************************************************************** +def tid_from_dlpe(dlpe, host_traddr, host_iface): + '''@brief Take a Discovery Log Page Entry and return a Controller ID as a dict.''' + cid = { + 'transport': dlpe['trtype'], + 'traddr': dlpe['traddr'], + 'trsvcid': dlpe['trsvcid'], + 'host-traddr': host_traddr, + 'host-iface': host_iface, + 'subsysnqn': dlpe['subnqn'], + } + return trid.TID(cid) + + +# ****************************************************************************** +def _excluded(excluded_ctrl_list, controller: dict): + '''@brief Check if @controller is excluded.''' + for excluded_ctrl in excluded_ctrl_list: + test_results = [val == controller.get(key, None) for key, val in excluded_ctrl.items()] + if all(test_results): + return True + return False + + +# ****************************************************************************** +def remove_excluded(controllers: list): + '''@brief Remove excluded controllers from the list of controllers. + @param controllers: List of TIDs + ''' + excluded_ctrl_list = conf.SvcConf().get_excluded() + if excluded_ctrl_list: + logging.debug('remove_excluded() - excluded_ctrl_list = %s', excluded_ctrl_list) + controllers = [ + controller for controller in controllers if not _excluded(excluded_ctrl_list, controller.as_dict()) + ] + return controllers + + +# ****************************************************************************** +class ControllerABC(abc.ABC): + '''@brief Base class used to manage the connection to a controller.''' + + CONNECT_RETRY_PERIOD_SEC = 60 + FAST_CONNECT_RETRY_PERIOD_SEC = 3 + + def __init__(self, tid: trid.TID, service, discovery_ctrl: bool = False): + self._tid = tid + self._serv = service # Refers to the parent service (either Staf or Stac) + self.set_level_from_tron(self._serv.tron) + self._cancellable = Gio.Cancellable() + self._connect_attempts = 0 + self._retry_connect_tmr = gutil.GTimer(self.CONNECT_RETRY_PERIOD_SEC, self._on_try_to_connect) + self._discovery_ctrl = discovery_ctrl + self._try_to_connect_deferred = gutil.Deferred(self._try_to_connect) + self._try_to_connect_deferred.schedule() + + def _release_resources(self): + # Remove pending deferred from main loop + if self._try_to_connect_deferred: + self._try_to_connect_deferred.cancel() + + if self._retry_connect_tmr is not None: + self._retry_connect_tmr.kill() + + if self._alive(): + self._cancellable.cancel() + + self._tid = None + self._serv = None + self._cancellable = None + self._retry_connect_tmr = None + self._try_to_connect_deferred = None + + @property + def id(self) -> str: + '''@brief Return the Transport ID as a printable string''' + return str(self.tid) + + @property + def tid(self): + '''@brief Return the Transport ID object''' + return self._tid + + def controller_id_dict(self) -> dict: + '''@brief return the controller ID as a dict.''' + return {k: str(v) for k, v in self.tid.as_dict().items()} + + def details(self) -> dict: + '''@brief return detailed debug info about this controller''' + return self.info() + + def info(self) -> dict: + '''@brief Get the controller info for this object''' + info = self.controller_id_dict() + info['connect attempts'] = str(self._connect_attempts) + info['retry connect timer'] = str(self._retry_connect_tmr) + return info + + def cancel(self): + '''@brief Used to cancel pending operations.''' + if self._alive(): + logging.debug('ControllerABC.cancel() - %s', self.id) + self._cancellable.cancel() + + def kill(self): + '''@brief Used to release all resources associated with this object.''' + logging.debug('ControllerABC.kill() - %s', self.id) + self._release_resources() + + def _alive(self): + '''There may be race condition where a queued event gets processed + after the object is no longer configured (i.e. alive). This method + can be used by callback functions to make sure the object is still + alive before processing further. + ''' + return self._cancellable and not self._cancellable.is_cancelled() + + def _on_try_to_connect(self): + if self._alive(): + self._try_to_connect_deferred.schedule() + return GLib.SOURCE_REMOVE + + def _should_try_to_reconnect(self): # pylint: disable=no-self-use + return True + + def _try_to_connect(self): + if not self._alive(): + return GLib.SOURCE_REMOVE + + # This is a deferred function call. Make sure + # the source of the deferred is still good. + source = GLib.main_current_source() + if source and source.is_destroyed(): + return GLib.SOURCE_REMOVE + + self._connect_attempts += 1 + + self._do_connect() + + return GLib.SOURCE_REMOVE + + @abc.abstractmethod + def set_level_from_tron(self, tron): + '''Set log level based on TRON''' + + @abc.abstractmethod + def _do_connect(self): + '''Perform connection''' + + @abc.abstractmethod + def _on_aen(self, aen: int): + '''Event handler when an AEN is received''' + + @abc.abstractmethod + def _on_nvme_event(self, nvme_event): + '''Event handler when an nvme_event is received''' + + @abc.abstractmethod + def _on_ctrl_removed(self, udev_obj): + '''Called when the associated nvme device (/dev/nvmeX) is removed + from the system by the kernel. + ''' + + @abc.abstractmethod + def _find_existing_connection(self): + '''Check if there is an existing connection that matches this Controller's TID''' + + @abc.abstractmethod + def all_ops_completed(self) -> bool: + '''@brief Returns True if all operations have completed. False otherwise.''' + + @abc.abstractmethod + def connected(self): + '''@brief Return whether a connection is established''' + + @abc.abstractmethod + def disconnect(self, disconnected_cb, keep_connection): + '''@brief Issue an asynchronous disconnect command to a Controller. + Once the async command has completed, the callback 'disconnected_cb' + will be invoked. If a controller is already disconnected, then the + callback will be added to the main loop's next idle slot to be executed + ASAP. + ''' + + @abc.abstractmethod + def reload_hdlr(self): + '''@brief This is called when a "reload" signal is received.''' + + +# ****************************************************************************** +class ServiceABC(abc.ABC): # pylint: disable=too-many-instance-attributes + '''@brief Base class used to manage a STorage Appliance Service''' + + CONF_STABILITY_SOAK_TIME_SEC = 1.5 + + def __init__(self, args, default_conf, reload_hdlr): + service_conf = conf.SvcConf(default_conf=default_conf) + service_conf.set_conf_file(args.conf_file) # reload configuration + self._tron = args.tron or service_conf.tron + log.set_level_from_tron(self._tron) + + self._lkc_file = os.path.join( + os.environ.get('RUNTIME_DIRECTORY', os.path.join('/run', defs.PROG_NAME)), 'last-known-config.pickle' + ) + self._loop = GLib.MainLoop() + self._cancellable = Gio.Cancellable() + self._resolver = gutil.NameResolver() + self._controllers = self._load_last_known_config() + self._dbus_iface = None + self._cfg_soak_tmr = gutil.GTimer(self.CONF_STABILITY_SOAK_TIME_SEC, self._on_config_ctrls) + self._sysbus = dasbus.connection.SystemMessageBus() + + GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, self._stop_hdlr) # CTRL-C + GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, self._stop_hdlr) # systemctl stop stafd + GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, reload_hdlr) # systemctl reload stafd + + nvme_options = conf.NvmeOptions() + if not nvme_options.host_iface_supp or not nvme_options.discovery_supp: + logging.warning( + 'Kernel does not appear to support all the options needed to run this program. Consider updating to a later kernel version.' + ) + + # We don't want to apply configuration changes right away. + # Often, multiple changes will occur in a short amount of time (sub-second). + # We want to wait until there are no more changes before applying them + # to the system. The following timer acts as a "soak period". Changes + # will be applied by calling self._on_config_ctrls() at the end of + # the soak period. + self._cfg_soak_tmr.start() + + def _release_resources(self): + logging.debug('ServiceABC._release_resources()') + + if self._alive(): + self._cancellable.cancel() + + if self._cfg_soak_tmr is not None: + self._cfg_soak_tmr.kill() + + self._controllers.clear() + + if self._sysbus: + self._sysbus.disconnect() + + self._cfg_soak_tmr = None + self._cancellable = None + self._resolver = None + self._lkc_file = None + self._sysbus = None + + def _config_dbus(self, iface_obj, bus_name: str, obj_name: str): + self._dbus_iface = iface_obj + self._sysbus.publish_object(obj_name, iface_obj) + self._sysbus.register_service(bus_name) + + @property + def tron(self): + '''@brief Get Trace ON property''' + return self._tron + + @tron.setter + def tron(self, value): + '''@brief Set Trace ON property''' + self._tron = value + log.set_level_from_tron(self._tron) + for controller in self._controllers.values(): + controller.set_level_from_tron(self._tron) + + def run(self): + '''@brief Start the main loop execution''' + try: + self._loop.run() + except Exception as ex: # pylint: disable=broad-except + logging.critical('exception: %s', ex) + + self._loop = None + + def info(self) -> dict: + '''@brief Get the status info for this object (used for debug)''' + nvme_options = conf.NvmeOptions() + info = conf.SysConf().as_dict() + info['last known config file'] = self._lkc_file + info['config soak timer'] = str(self._cfg_soak_tmr) + info['kernel support.TP8013'] = str(nvme_options.discovery_supp) + info['kernel support.host_iface'] = str(nvme_options.host_iface_supp) + return info + + def get_controllers(self) -> dict: + '''@brief return the list of controller objects''' + return self._controllers.values() + + def get_controller( + self, transport: str, traddr: str, trsvcid: str, host_traddr: str, host_iface: str, subsysnqn: str + ): # pylint: disable=too-many-arguments + '''@brief get the specified controller object from the list of controllers''' + cid = { + 'transport': transport, + 'traddr': traddr, + 'trsvcid': trsvcid, + 'host-traddr': host_traddr, + 'host-iface': host_iface, + 'subsysnqn': subsysnqn, + } + return self._controllers.get(trid.TID(cid)) + + def _remove_ctrl_from_dict(self, controller, shutdown=False): + tid_to_pop = controller.tid + if not tid_to_pop: + # Being paranoid. This should not happen, but let's say the + # controller object has been purged, but it is somehow still + # listed in self._controllers. + for tid, _controller in self._controllers.items(): + if _controller is controller: + tid_to_pop = tid + break + + if tid_to_pop: + logging.debug('ServiceABC._remove_ctrl_from_dict()- %s | %s', tid_to_pop, controller.device) + popped = self._controllers.pop(tid_to_pop, None) + if not shutdown and popped is not None and self._cfg_soak_tmr: + self._cfg_soak_tmr.start() + else: + logging.debug('ServiceABC._remove_ctrl_from_dict()- already removed') + + def remove_controller(self, controller, success): # pylint: disable=unused-argument + '''@brief remove the specified controller object from the list of controllers + @param controller: the controller object + @param success: whether the disconnect was successful''' + logging.debug('ServiceABC.remove_controller()') + if isinstance(controller, ControllerABC): + self._remove_ctrl_from_dict(controller) + controller.kill() + + def _alive(self): + '''It's a good idea to check that this object hasn't been + cancelled (i.e. is still alive) when entering a callback function. + Callback functrions can be invoked after, for example, a process has + been signalled to stop or restart, in which case it makes no sense to + proceed with the callback. + ''' + return self._cancellable and not self._cancellable.is_cancelled() + + def _cancel(self): + logging.debug('ServiceABC._cancel()') + if self._alive(): + self._cancellable.cancel() + + for controller in self._controllers.values(): + controller.cancel() + + def _stop_hdlr(self): + logging.debug('ServiceABC._stop_hdlr()') + sd_notify('STOPPING=1') + + self._cancel() # Cancel pending operations + + self._dump_last_known_config(self._controllers) + + if len(self._controllers) == 0: + GLib.idle_add(self._exit) + else: + self._disconnect_all() + + return GLib.SOURCE_REMOVE + + def _on_final_disconnect(self, controller, success): + '''Callback invoked after a controller is disconnected. + THIS IS USED DURING PROCESS SHUTDOWN TO WAIT FOR ALL CONTROLLERS TO BE + DISCONNECTED BEFORE EXITING THE PROGRAM. ONLY CALL ON SHUTDOWN! + @param controller: the controller object + @param success: whether the disconnect operation was successful + ''' + logging.debug( + 'ServiceABC._on_final_disconnect() - %s | %s: disconnect %s', + controller.id, + controller.device, + 'succeeded' if success else 'failed', + ) + + self._remove_ctrl_from_dict(controller, True) + controller.kill() + + # When all controllers have disconnected, we can finish the clean up + if len(self._controllers) == 0: + # Defer exit to the next main loop's idle period. + GLib.idle_add(self._exit) + + def _exit(self): + logging.debug('ServiceABC._exit()') + self._release_resources() + self._loop.quit() + + def _on_config_ctrls(self, *_user_data): + if self._alive(): + self._config_ctrls() + return GLib.SOURCE_REMOVE + + def _config_ctrls(self): + '''@brief Start controllers configuration.''' + # The configuration file may contain controllers and/or excluded + # controllers with traddr specified as hostname instead of IP address. + # Because of this, we need to remove those excluded elements before + # running name resolution. And we will need to remove excluded + # elements after name resolution is complete (i.e. in the calback + # function _config_ctrls_finish) + logging.debug('ServiceABC._config_ctrls()') + configured_controllers = [trid.TID(cid) for cid in conf.SvcConf().get_controllers()] + configured_controllers = remove_excluded(configured_controllers) + self._resolver.resolve_ctrl_async(self._cancellable, configured_controllers, self._config_ctrls_finish) + + def _read_lkc(self): + '''@brief Read Last Known Config from file''' + try: + with open(self._lkc_file, 'rb') as file: + return pickle.load(file) + except (FileNotFoundError, AttributeError, EOFError): + return None + + def _write_lkc(self, config): + '''@brief Write Last Known Config to file, and if config is empty + make sure the file is emptied.''' + try: + # Note that if config is empty we still + # want to open/close the file to empty it. + with open(self._lkc_file, 'wb') as file: + if config: + pickle.dump(config, file) + except FileNotFoundError as ex: + logging.error('Unable to save last known config: %s', ex) + + @abc.abstractmethod + def _disconnect_all(self): + '''Tell all controller objects to disconnect''' + + @abc.abstractmethod + def _keep_connections_on_exit(self): + '''@brief Determine whether connections should remain when the + process exits. + + NOTE) This is the base class method used to define the interface. + It must be overloaded by a child class. + ''' + + @abc.abstractmethod + def _config_ctrls_finish(self, configured_ctrl_list): + '''@brief Finish controllers configuration after hostnames (if any) + have been resolved. + + Configuring controllers must be done asynchronously in 2 steps. + In the first step, host names get resolved to find their IP addresses. + Name resolution can take a while, especially when an external name + resolution server is used. Once that step completed, the callback + method _config_ctrls_finish() (i.e. this method), gets invoked to + complete the controller configuration. + + NOTE) This is the base class method used to define the interface. + It must be overloaded by a child class. + ''' + + @abc.abstractmethod + def _load_last_known_config(self): + '''Load last known config from file (if any)''' + + @abc.abstractmethod + def _dump_last_known_config(self, controllers): + '''Save last known config to file''' diff --git a/staslib/timeparse.py b/staslib/timeparse.py new file mode 100644 index 0000000..5295fc4 --- /dev/null +++ b/staslib/timeparse.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +''' +This module was borrowed and modified from: https://github.com/wroberts/pytimeparse + +timeparse.py +(c) Will Roberts 1 February, 2014 + +Implements a single function, `timeparse`, which can parse various +kinds of time expressions. +''' + +# MIT LICENSE +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation files +# (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import re + +SIGN = r'(?P[+|-])?' +DAYS = r'(?P[\d.]+)\s*(?:d|dys?|days?)' +HOURS = r'(?P[\d.]+)\s*(?:h|hrs?|hours?)' +MINS = r'(?P[\d.]+)\s*(?:m|(mins?)|(minutes?))' +SECS = r'(?P[\d.]+)\s*(?:s|secs?|seconds?)' +SEPARATORS = r'[,/]' +SECCLOCK = r':(?P\d{2}(?:\.\d+)?)' +MINCLOCK = r'(?P\d{1,2}):(?P\d{2}(?:\.\d+)?)' +HOURCLOCK = r'(?P\d+):(?P\d{2}):(?P\d{2}(?:\.\d+)?)' +DAYCLOCK = r'(?P\d+):(?P\d{2}):(?P\d{2}):(?P\d{2}(?:\.\d+)?)' + + +def _opt(string): + return f'(?:{string})?' + + +def _optsep(string): + return fr'(?:{string}\s*(?:{SEPARATORS}\s*)?)?' + + +TIMEFORMATS = [ + fr'{_optsep(DAYS)}\s*{_optsep(HOURS)}\s*{_optsep(MINS)}\s*{_opt(SECS)}', + f'{MINCLOCK}', + fr'{_optsep(DAYS)}\s*{HOURCLOCK}', + f'{DAYCLOCK}', + f'{SECCLOCK}', +] + +COMPILED_SIGN = re.compile(r'\s*' + SIGN + r'\s*(?P.*)$') +COMPILED_TIMEFORMATS = [re.compile(r'\s*' + timefmt + r'\s*$', re.I) for timefmt in TIMEFORMATS] + +MULTIPLIERS = { + 'days': 60 * 60 * 24, + 'hours': 60 * 60, + 'mins': 60, + 'secs': 1, +} + + +def timeparse(sval): + ''' + Parse a time expression, returning it as a number of seconds. If + possible, the return value will be an `int`; if this is not + possible, the return will be a `float`. Returns `None` if a time + expression cannot be parsed from the given string. + + Arguments: + - `sval`: the string value to parse + + >>> timeparse('1:24') + 84 + >>> timeparse(':22') + 22 + >>> timeparse('1 minute, 24 secs') + 84 + >>> timeparse('1m24s') + 84 + >>> timeparse('1.2 minutes') + 72 + >>> timeparse('1.2 seconds') + 1.2 + + Time expressions can be signed. + + >>> timeparse('- 1 minute') + -60 + >>> timeparse('+ 1 minute') + 60 + ''' + try: + return float(sval) + except TypeError: + pass + except ValueError: + match = COMPILED_SIGN.match(sval) + sign = -1 if match.groupdict()['sign'] == '-' else 1 + sval = match.groupdict()['unsigned'] + for timefmt in COMPILED_TIMEFORMATS: + match = timefmt.match(sval) + if match and match.group(0).strip(): + mdict = match.groupdict() + # if all of the fields are integer numbers + if all(v.isdigit() for v in list(mdict.values()) if v): + return sign * sum((MULTIPLIERS[k] * int(v, 10) for (k, v) in list(mdict.items()) if v is not None)) + + # if SECS is an integer number + if 'secs' not in mdict or mdict['secs'] is None or mdict['secs'].isdigit(): + # we will return an integer + return sign * int( + sum( + ( + MULTIPLIERS[k] * float(v) + for (k, v) in list(mdict.items()) + if k != 'secs' and v is not None + ) + ) + ) + (int(mdict['secs'], 10) if mdict['secs'] else 0) + + # SECS is a float, we will return a float + return sign * sum((MULTIPLIERS[k] * float(v) for (k, v) in list(mdict.items()) if v is not None)) + + return None diff --git a/staslib/trid.py b/staslib/trid.py new file mode 100644 index 0000000..ea40b7d --- /dev/null +++ b/staslib/trid.py @@ -0,0 +1,137 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''This module defines the Transport Identifier Object, which is used +throughout nvme-stas to uniquely identify a Controller''' + +import hashlib +from staslib import conf + + +class TID: # pylint: disable=too-many-instance-attributes + '''Transport Identifier''' + + RDMA_IP_PORT = '4420' + DISC_IP_PORT = '8009' + + def __init__(self, cid: dict): + '''@param cid: Controller Identifier. A dictionary with the following + contents. + { + # Transport parameters + 'transport': str, # [mandatory] + 'traddr': str, # [mandatory] + 'subsysnqn': str, # [mandatory] + 'trsvcid': str, # [optional] + 'host-traddr': str, # [optional] + 'host-iface': str, # [optional] + + # Connection parameters + 'dhchap-ctrl-secret': str, # [optional] + 'hdr-digest': str, # [optional] + 'data-digest': str, # [optional] + 'nr-io-queues': str, # [optional] + 'nr-write-queues': str, # [optional] + 'nr-poll-queues': str, # [optional] + 'queue-size': str, # [optional] + 'kato': str, # [optional] + 'reconnect-delay': str, # [optional] + 'ctrl-loss-tmo': str, # [optional] + 'disable-sqflow': str, # [optional] + } + ''' + self._cfg = { + k: v + for k, v in cid.items() + if k not in ('transport', 'traddr', 'subsysnqn', 'trsvcid', 'host-traddr', 'host-iface') + } + self._transport = cid.get('transport', '') + self._traddr = cid.get('traddr', '') + self._trsvcid = '' + if self._transport in ('tcp', 'rdma'): + trsvcid = cid.get('trsvcid', None) + self._trsvcid = ( + trsvcid if trsvcid else (TID.RDMA_IP_PORT if self._transport == 'rdma' else TID.DISC_IP_PORT) + ) + self._host_traddr = cid.get('host-traddr', '') + self._host_iface = '' if conf.SvcConf().ignore_iface else cid.get('host-iface', '') + self._subsysnqn = cid.get('subsysnqn', '') + self._shortkey = (self._transport, self._traddr, self._trsvcid, self._subsysnqn, self._host_traddr) + self._key = (self._transport, self._traddr, self._trsvcid, self._subsysnqn, self._host_traddr, self._host_iface) + self._hash = int.from_bytes( + hashlib.md5(''.join(self._key).encode('utf-8')).digest(), 'big' + ) # We need a consistent hash between restarts + self._id = f'({self._transport}, {self._traddr}, {self._trsvcid}{", " + self._subsysnqn if self._subsysnqn else ""}{", " + self._host_iface if self._host_iface else ""}{", " + self._host_traddr if self._host_traddr else ""})' # pylint: disable=line-too-long + + @property + def transport(self): # pylint: disable=missing-function-docstring + return self._transport + + @property + def traddr(self): # pylint: disable=missing-function-docstring + return self._traddr + + @property + def trsvcid(self): # pylint: disable=missing-function-docstring + return self._trsvcid + + @property + def host_traddr(self): # pylint: disable=missing-function-docstring + return self._host_traddr + + @property + def host_iface(self): # pylint: disable=missing-function-docstring + return self._host_iface + + @property + def subsysnqn(self): # pylint: disable=missing-function-docstring + return self._subsysnqn + + @property + def cfg(self): # pylint: disable=missing-function-docstring + return self._cfg + + def as_dict(self): + '''Return object members as a dictionary''' + data = { + 'transport': self.transport, + 'traddr': self.traddr, + 'subsysnqn': self.subsysnqn, + 'trsvcid': self.trsvcid, + 'host-traddr': self.host_traddr, + 'host-iface': self.host_iface, + } + data.update(self._cfg) + return data + + def __str__(self): + return self._id + + def __repr__(self): + return self._id + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + + if self._host_iface and other._host_iface: + return self._key == other._key + + return self._shortkey == other._shortkey + + def __ne__(self, other): + if not isinstance(other, self.__class__): + return True + + if self._host_iface and other._host_iface: + return self._key != other._key + + return self._shortkey != other._shortkey + + def __hash__(self): + return self._hash diff --git a/staslib/udev.py b/staslib/udev.py new file mode 100644 index 0000000..12ef61b --- /dev/null +++ b/staslib/udev.py @@ -0,0 +1,334 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +'''This module provides functions to access nvme devices using the pyudev module''' + +import os +import time +import logging +import pyudev +from gi.repository import GLib +from staslib import defs, iputil, trid + + +# ****************************************************************************** +class Udev: + '''@brief Udev event monitor. Provide a way to register for udev events. + WARNING: THE singleton.Singleton PATTERN CANNOT BE USED WITH THIS CLASS. + IT INTERFERES WITH THE pyudev INTERNALS, WHICH CAUSES OBJECT CLEAN UP TO FAIL. + ''' + + def __init__(self): + self._log_event_soak_time = 0 + self._log_event_count = 0 + self._device_event_registry = dict() + self._action_event_registry = dict() + self._context = pyudev.Context() + self._monitor = pyudev.Monitor.from_netlink(self._context) + self._monitor.filter_by(subsystem='nvme') + self._event_source = GLib.io_add_watch( + self._monitor.fileno(), + GLib.PRIORITY_HIGH, + GLib.IO_IN, + self._process_udev_event, + ) + self._monitor.start() + + def release_resources(self): + '''Release all resources used by this object''' + if self._event_source is not None: + GLib.source_remove(self._event_source) + + if self._monitor is not None: + self._monitor.remove_filter() + + self._event_source = None + self._monitor = None + self._context = None + self._device_event_registry = None + self._action_event_registry = None + + def get_nvme_device(self, sys_name): + '''@brief Get the udev device object associated with an nvme device. + @param sys_name: The device system name (e.g. 'nvme1') + @return A pyudev.device._device.Device object + ''' + device_node = os.path.join('/dev', sys_name) + try: + return pyudev.Devices.from_device_file(self._context, device_node) + except pyudev.DeviceNotFoundByFileError as ex: + logging.error("Udev.get_nvme_device() - Error: %s", ex) + return None + + def is_action_cback_registered(self, action: str, user_cback): + '''Returns True if @user_cback is registered for @action. False otherwise. + @param action: one of 'add', 'remove', 'change'. + @param user_cback: A callback function with this signature: cback(udev_obj) + ''' + return user_cback in self._action_event_registry.get(action, set()) + + def register_for_action_events(self, action: str, user_cback): + '''@brief Register a callback function to be called when udev events + for a specific action are received. + @param action: one of 'add', 'remove', 'change'. + ''' + self._action_event_registry.setdefault(action, set()).add(user_cback) + + def unregister_for_action_events(self, action: str, user_cback): + '''@brief The opposite of register_for_action_events()''' + try: + self._action_event_registry.get(action, set()).remove(user_cback) + except KeyError: # Raise if user_cback already removed + pass + + def register_for_device_events(self, sys_name: str, user_cback): + '''@brief Register a callback function to be called when udev events + are received for a specific nvme device. + @param sys_name: The device system name (e.g. 'nvme1') + ''' + if sys_name: + self._device_event_registry[sys_name] = user_cback + + def unregister_for_device_events(self, user_cback): + '''@brief The opposite of register_for_device_events()''' + entries = list(self._device_event_registry.items()) + for sys_name, _user_cback in entries: + if user_cback == _user_cback: + self._device_event_registry.pop(sys_name, None) + break + + def get_attributes(self, sys_name: str, attr_ids) -> dict: + '''@brief Get all the attributes associated with device @sys_name''' + attrs = {attr_id: '' for attr_id in attr_ids} + if sys_name and sys_name != 'nvme?': + udev = self.get_nvme_device(sys_name) + if udev is not None: + for attr_id in attr_ids: + try: + value = udev.attributes.asstring(attr_id).strip() + attrs[attr_id] = '' if value == '(efault)' else value + except Exception: # pylint: disable=broad-except + pass + + return attrs + + @staticmethod + def is_dc_device(device): + '''@brief check whether device refers to a Discovery Controller''' + subsysnqn = device.attributes.get('subsysnqn') + if subsysnqn is not None and subsysnqn.decode() == defs.WELL_KNOWN_DISC_NQN: + return True + + # Note: Prior to 5.18 linux didn't expose the cntrltype through + # the sysfs. So, this may return None on older kernels. + cntrltype = device.attributes.get('cntrltype') + if cntrltype is not None and cntrltype.decode() == 'discovery': + return True + + # Imply Discovery controller based on the absence of children. + # Discovery Controllers have no children devices + if len(list(device.children)) == 0: + return True + + return False + + @staticmethod + def is_ioc_device(device): + '''@brief check whether device refers to an I/O Controller''' + # Note: Prior to 5.18 linux didn't expose the cntrltype through + # the sysfs. So, this may return None on older kernels. + cntrltype = device.attributes.get('cntrltype') + if cntrltype is not None and cntrltype.decode() == 'io': + return True + + # Imply I/O controller based on the presence of children. + # I/O Controllers have children devices + if len(list(device.children)) != 0: + return True + + return False + + def find_nvme_dc_device(self, tid): + '''@brief Find the nvme device associated with the specified + Discovery Controller. + @return The device if a match is found, None otherwise. + ''' + for device in self._context.list_devices( + subsystem='nvme', NVME_TRADDR=tid.traddr, NVME_TRSVCID=tid.trsvcid, NVME_TRTYPE=tid.transport + ): + if not self.is_dc_device(device): + continue + + if self.get_tid(device) != tid: + continue + + return device + + return None + + def find_nvme_ioc_device(self, tid): + '''@brief Find the nvme device associated with the specified + I/O Controller. + @return The device if a match is found, None otherwise. + ''' + for device in self._context.list_devices( + subsystem='nvme', NVME_TRADDR=tid.traddr, NVME_TRSVCID=tid.trsvcid, NVME_TRTYPE=tid.transport + ): + if not self.is_ioc_device(device): + continue + + if self.get_tid(device) != tid: + continue + + return device + + return None + + def get_nvme_ioc_tids(self, transports): + '''@brief Find all the I/O controller nvme devices in the system. + @return A list of pyudev.device._device.Device objects + ''' + tids = [] + for device in self._context.list_devices(subsystem='nvme'): + if device.properties.get('NVME_TRTYPE', '') not in transports: + continue + + if not self.is_ioc_device(device): + continue + + tids.append(self.get_tid(device)) + + return tids + + def _process_udev_event(self, event_source, condition): # pylint: disable=unused-argument + if condition == GLib.IO_IN: + event_count = 0 + while True: + try: + device = self._monitor.poll(timeout=0) + except EnvironmentError as ex: + device = None + # This event seems to happen in bursts. So, let's suppress + # logging for 2 seconds to avoid filling the syslog. + self._log_event_count += 1 + now = time.time() + if now > self._log_event_soak_time: + logging.debug('Udev._process_udev_event() - %s [%s]', ex, self._log_event_count) + self._log_event_soak_time = now + 2 + self._log_event_count = 0 + + if device is None: + break + + event_count += 1 + self._device_event(device, event_count) + + return GLib.SOURCE_CONTINUE + + @staticmethod + def __cback_names(action_cbacks, device_cback): + names = [] + for cback in action_cbacks: + names.append(cback.__name__ + '()') + if device_cback: + names.append(device_cback.__name__ + '()') + return names + + def _device_event(self, device, event_count): + action_cbacks = self._action_event_registry.get(device.action, set()) + device_cback = self._device_event_registry.get(device.sys_name, None) + + logging.debug( + 'Udev._device_event() - %-8s %-6s %-8s %s', + f'{device.sys_name}:', + device.action, + f'{event_count:2}:{device.sequence_number}', + self.__cback_names(action_cbacks, device_cback), + ) + + for action_cback in action_cbacks: + GLib.idle_add(action_cback, device) + + if device_cback is not None: + GLib.idle_add(device_cback, device) + + @staticmethod + def _get_property(device, prop, default=''): + prop = device.properties.get(prop, default) + return '' if prop.lower() == 'none' else prop + + @staticmethod + def _get_attribute(device, attr_id, default=''): + try: + attr = device.attributes.asstring(attr_id).strip() + except Exception: # pylint: disable=broad-except + attr = default + + return '' if attr.lower() == 'none' else attr + + @staticmethod + def get_key_from_attr(device, attr, key, delim=','): + '''Get attribute specified by attr, which is composed of key=value pairs. + Then return the value associated with key. + @param device: The Device object + @param attr: The device's attribute to get + @param key: The key to look for in the attribute + @param delim: Delimiter used between key=value pairs. + @example: + "address" attribute contains "trtype=tcp,traddr=10.10.1.100,trsvcid=4420,host_traddr=10.10.1.50" + ''' + attr_str = Udev._get_attribute(device, attr) + if not attr_str: + return '' + + if key[-1] != '=': + key += '=' + start = attr_str.find(key) + if start < 0: + return '' + start += len(key) + + end = attr_str.find(delim, start) + if end < 0: + return attr_str[start:] + + return attr_str[start:end] + + @staticmethod + def _get_host_iface(device): + host_iface = Udev._get_property(device, 'NVME_HOST_IFACE') + if not host_iface: + # We'll try to find the interface from the source address on + # the connection. Only available if kernel exposes the source + # address (src_addr) in the "address" attribute. + src_addr = Udev.get_key_from_attr(device, 'address', 'src_addr=') + host_iface = iputil.get_interface(src_addr) + return host_iface + + @staticmethod + def get_tid(device): + '''@brief return the Transport ID associated with a udev device''' + cid = { + 'transport': Udev._get_property(device, 'NVME_TRTYPE'), + 'traddr': Udev._get_property(device, 'NVME_TRADDR'), + 'trsvcid': Udev._get_property(device, 'NVME_TRSVCID'), + 'host-traddr': Udev._get_property(device, 'NVME_HOST_TRADDR'), + 'host-iface': Udev._get_host_iface(device), + 'subsysnqn': Udev._get_attribute(device, 'subsysnqn'), + } + return trid.TID(cid) + + +UDEV = Udev() # Singleton + + +def shutdown(): + '''Destroy the UDEV singleton''' + global UDEV # pylint: disable=global-statement,global-variable-not-assigned + UDEV.release_resources() + del UDEV diff --git a/staslib/version.py b/staslib/version.py new file mode 100644 index 0000000..999d916 --- /dev/null +++ b/staslib/version.py @@ -0,0 +1,64 @@ +# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger +# +''' distutils (and hence LooseVersion) is being deprecated. None of the + suggested replacements (e.g. from pkg_resources import parse_version) quite + work with Linux kernel versions the way LooseVersion does. + + It was suggested to simply lift the LooseVersion code and vendor it in, + which is what this module is about. +''' + +import re + + +class KernelVersion: + '''Code loosely lifted from distutils's LooseVersion''' + + component_re = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE) + + def __init__(self, string: str): + self.string = string + self.version = self.__parse(string) + + def __str__(self): + return self.string + + def __repr__(self): + return f'KernelVersion ("{self}")' + + def __eq__(self, other): + return self.version == self.__version(other) + + def __lt__(self, other): + return self.version < self.__version(other) + + def __le__(self, other): + return self.version <= self.__version(other) + + def __gt__(self, other): + return self.version > self.__version(other) + + def __ge__(self, other): + return self.version >= self.__version(other) + + @staticmethod + def __version(obj): + return obj.version if isinstance(obj, KernelVersion) else KernelVersion.__parse(obj) + + @staticmethod + def __parse(string): + components = [] + for item in KernelVersion.component_re.split(string): + if item and item != '.': + try: + components.append(int(item)) + except ValueError: + pass + + return components -- cgit v1.2.3