summaryrefslogtreecommitdiffstats
path: root/staslib
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--staslib/.gitignore1
-rw-r--r--staslib/__init__.py11
-rw-r--r--staslib/avahi.py456
-rw-r--r--staslib/conf.py703
-rw-r--r--staslib/ctrl.py850
-rw-r--r--staslib/defs.py51
-rw-r--r--staslib/gutil.py418
-rw-r--r--staslib/iputil.py169
-rw-r--r--staslib/log.py53
-rw-r--r--staslib/meson.build60
-rw-r--r--staslib/service.py878
-rw-r--r--staslib/singleton.py23
-rw-r--r--staslib/stacd.idl27
-rw-r--r--staslib/stafd.idl49
-rw-r--r--staslib/stas.py554
-rw-r--r--staslib/timeparse.py139
-rw-r--r--staslib/trid.py137
-rw-r--r--staslib/udev.py334
-rw-r--r--staslib/version.py64
19 files changed, 4977 insertions, 0 deletions
diff --git a/staslib/.gitignore b/staslib/.gitignore
new file mode 100644
index 0000000..bee8a64
--- /dev/null
+++ b/staslib/.gitignore
@@ -0,0 +1 @@
+__pycache__
diff --git a/staslib/__init__.py b/staslib/__init__.py
new file mode 100644
index 0000000..27673d1
--- /dev/null
+++ b/staslib/__init__.py
@@ -0,0 +1,11 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''STorage Appliance Services'''
+
+__version__ = '@VERSION@'
diff --git a/staslib/avahi.py b/staslib/avahi.py
new file mode 100644
index 0000000..c8a3a0b
--- /dev/null
+++ b/staslib/avahi.py
@@ -0,0 +1,456 @@
+# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+''' Module that provides a way to retrieve discovered
+ services from the Avahi daemon over D-Bus.
+'''
+import socket
+import typing
+import logging
+import functools
+import dasbus.error
+import dasbus.connection
+import dasbus.client.proxy
+import dasbus.client.observer
+from gi.repository import GLib
+from staslib import defs, conf, gutil
+
+
+def _txt2dict(txt: list):
+ '''@param txt: A list of list of integers. The integers are the ASCII value
+ of printable text characters.
+ '''
+ the_dict = dict()
+ for list_of_chars in txt:
+ try:
+ string = functools.reduce(lambda accumulator, c: accumulator + chr(c), list_of_chars, '')
+ key, val = string.split("=")
+ the_dict[key.lower()] = val
+ except Exception: # pylint: disable=broad-except
+ pass
+
+ return the_dict
+
+
+def _proto2trans(protocol):
+ '''Return the matching transport for the given protocol.'''
+ if protocol is None:
+ return None
+
+ protocol = protocol.strip().lower()
+ if protocol == 'tcp':
+ return 'tcp'
+
+ if protocol in ('roce', 'iwarp', 'rdma'):
+ return 'rdma'
+
+ return None
+
+
+# ******************************************************************************
+class Avahi: # pylint: disable=too-many-instance-attributes
+ '''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi
+ daemon and register to be notified when services of a certain
+ type (stype) are discovered or lost.
+ '''
+
+ DBUS_NAME = 'org.freedesktop.Avahi'
+ DBUS_INTERFACE_SERVICE_BROWSER = DBUS_NAME + '.ServiceBrowser'
+ DBUS_INTERFACE_SERVICE_RESOLVER = DBUS_NAME + '.ServiceResolver'
+ LOOKUP_USE_MULTICAST = 2
+
+ IF_UNSPEC = -1
+ PROTO_INET = 0
+ PROTO_INET6 = 1
+ PROTO_UNSPEC = -1
+
+ LOOKUP_RESULT_LOCAL = 8 # This record/service resides on and was announced by the local host
+ LOOKUP_RESULT_CACHED = 1 # This response originates from the cache
+ LOOKUP_RESULT_STATIC = 32 # The returned data has been defined statically by some configuration option
+ LOOKUP_RESULT_OUR_OWN = 16 # This service belongs to the same local client as the browser object
+ LOOKUP_RESULT_WIDE_AREA = 2 # This response originates from wide area DNS
+ LOOKUP_RESULT_MULTICAST = 4 # This response originates from multicast DNS
+
+ result_flags = {
+ LOOKUP_RESULT_LOCAL: 'local',
+ LOOKUP_RESULT_CACHED: 'cache',
+ LOOKUP_RESULT_STATIC: 'static',
+ LOOKUP_RESULT_OUR_OWN: 'own',
+ LOOKUP_RESULT_WIDE_AREA: 'wan',
+ LOOKUP_RESULT_MULTICAST: 'mcast',
+ }
+
+ protos = {PROTO_INET: 'IPv4', PROTO_INET6: 'IPv6', PROTO_UNSPEC: 'uspecified'}
+
+ @classmethod
+ def result_flags_as_string(cls, flags):
+ '''Convert flags to human-readable string'''
+ return '+'.join((value for flag, value in Avahi.result_flags.items() if (flags & flag) != 0))
+
+ @classmethod
+ def protocol_as_string(cls, proto):
+ '''Convert protocol codes to human-readable strings'''
+ return Avahi.protos.get(proto, 'unknown')
+
+ # ==========================================================================
+ def __init__(self, sysbus, change_cb):
+ self._change_cb = change_cb
+ self._services = dict()
+ self._sysbus = sysbus
+ self._stypes = set()
+ self._service_browsers = dict()
+
+ # Avahi is an on-demand service. If, for some reason, the avahi-daemon
+ # were to stop, we need to try to contact it for it to restart. For
+ # example, when installing the avahi-daemon package on a running system,
+ # the daemon doesn't get started right away. It needs another process to
+ # access it over D-Bus to wake it up. The following timer is used to
+ # periodically query the avahi-daemon until we successfully establish
+ # first contact.
+ self._kick_avahi_tmr = gutil.GTimer(60, self._on_kick_avahi)
+
+ # Subscribe for Avahi signals (i.e. events). This must be done before
+ # any Browser or Resolver is created to avoid race conditions and
+ # missed events.
+ self._subscriptions = [
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME,
+ Avahi.DBUS_INTERFACE_SERVICE_BROWSER,
+ 'ItemNew',
+ None,
+ None,
+ 0,
+ self._service_discovered,
+ ),
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME,
+ Avahi.DBUS_INTERFACE_SERVICE_BROWSER,
+ 'ItemRemove',
+ None,
+ None,
+ 0,
+ self._service_removed,
+ ),
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_BROWSER, 'Failure', None, None, 0, self._failure_handler
+ ),
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Found', None, None, 0, self._service_identified
+ ),
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Failure', None, None, 0, self._failure_handler
+ ),
+ ]
+
+ self._avahi = self._sysbus.get_proxy(Avahi.DBUS_NAME, '/')
+
+ self._avahi_watcher = dasbus.client.observer.DBusObserver(self._sysbus, Avahi.DBUS_NAME)
+ self._avahi_watcher.service_available.connect(self._avahi_available)
+ self._avahi_watcher.service_unavailable.connect(self._avahi_unavailable)
+ self._avahi_watcher.connect_once_available()
+
+ def kill(self):
+ '''@brief Clean up object'''
+ logging.debug('Avahi.kill()')
+
+ self._kick_avahi_tmr.kill()
+ self._kick_avahi_tmr = None
+
+ for subscription in self._subscriptions:
+ self._sysbus.connection.signal_unsubscribe(subscription)
+ self._subscriptions = list()
+
+ self._disconnect()
+
+ self._avahi_watcher.service_available.disconnect()
+ self._avahi_watcher.service_unavailable.disconnect()
+ self._avahi_watcher.disconnect()
+ self._avahi_watcher = None
+
+ dasbus.client.proxy.disconnect_proxy(self._avahi)
+ self._avahi = None
+
+ self._change_cb = None
+ self._sysbus = None
+
+ def info(self) -> dict:
+ '''@brief return debug info about this object'''
+ services = dict()
+ for service, obj in self._services.items():
+ interface, protocol, name, stype, domain = service
+ key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})'
+ services[key] = obj.get('data', {})
+
+ info = {
+ 'avahi wake up timer': str(self._kick_avahi_tmr),
+ 'service types': list(self._stypes),
+ 'services': services,
+ }
+
+ return info
+
+ def get_controllers(self) -> list:
+ '''@brief Get the discovery controllers as a list of dict()
+ as follows:
+ [
+ {
+ 'transport': tcp,
+ 'traddr': str(),
+ 'trsvcid': str(),
+ 'host-iface': str(),
+ 'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery',
+ },
+ {
+ 'transport': tcp,
+ 'traddr': str(),
+ 'trsvcid': str(),
+ 'host-iface': str(),
+ 'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery',
+ },
+ [...]
+ ]
+ '''
+ return [service['data'] for service in self._services.values() if len(service['data'])]
+
+ def config_stypes(self, stypes: list):
+ '''@brief Configure the service types that we want to discover.
+ @param stypes: A list of services types, e.g. ['_nvme-disc._tcp']
+ '''
+ self._stypes = set(stypes)
+ success = self._configure_browsers()
+ if not success:
+ self._kick_avahi_tmr.start()
+
+ def kick_start(self):
+ '''@brief We use this to kick start the Avahi
+ daemon (i.e. socket activation).
+ '''
+ self._kick_avahi_tmr.clear()
+
+ def _disconnect(self):
+ logging.debug('Avahi._disconnect()')
+ for service in self._services.values():
+ resolver = service.pop('resolver', None)
+ if resolver is not None:
+ try:
+ resolver.Free()
+ dasbus.client.proxy.disconnect_proxy(resolver)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex)
+
+ self._services = dict()
+
+ for browser in self._service_browsers.values():
+ try:
+ browser.Free()
+ dasbus.client.proxy.disconnect_proxy(browser)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._disconnect() - Failed to Free() browser. %s', ex)
+
+ self._service_browsers = dict()
+
+ def _on_kick_avahi(self):
+ try:
+ # try to contact avahi-daemon. This is just a wake
+ # up call in case the avahi-daemon was sleeping.
+ self._avahi.GetVersionString()
+ except dasbus.error.DBusError:
+ return GLib.SOURCE_CONTINUE
+
+ return GLib.SOURCE_REMOVE
+
+ def _avahi_available(self, _avahi_watcher):
+ '''@brief Hook up DBus signal handlers for signals from stafd.'''
+ logging.info('avahi-daemon service available, zeroconf supported.')
+ success = self._configure_browsers()
+ if not success:
+ self._kick_avahi_tmr.start()
+
+ def _avahi_unavailable(self, _avahi_watcher):
+ self._disconnect()
+ logging.warning('avahi-daemon not available, zeroconf not supported.')
+ self._kick_avahi_tmr.start()
+
+ def _configure_browsers(self):
+ stypes_cur = set(self._service_browsers.keys())
+ stypes_to_add = self._stypes - stypes_cur
+ stypes_to_rm = stypes_cur - self._stypes
+
+ logging.debug('Avahi._configure_browsers() - stypes_to_rm = %s', list(stypes_to_rm))
+ logging.debug('Avahi._configure_browsers() - stypes_to_add = %s', list(stypes_to_add))
+
+ for stype_to_rm in stypes_to_rm:
+ browser = self._service_browsers.pop(stype_to_rm, None)
+ if browser is not None:
+ try:
+ browser.Free()
+ dasbus.client.proxy.disconnect_proxy(browser)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex)
+
+ # Find the cached services corresponding to stype_to_rm and remove them
+ services_to_rm = [service for service in self._services if service[3] == stype_to_rm]
+ for service in services_to_rm:
+ resolver = self._services.pop(service, {}).pop('resolver', None)
+ if resolver is not None:
+ try:
+ resolver.Free()
+ dasbus.client.proxy.disconnect_proxy(resolver)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex)
+
+ for stype in stypes_to_add:
+ try:
+ obj_path = self._avahi.ServiceBrowserNew(
+ Avahi.IF_UNSPEC, Avahi.PROTO_UNSPEC, stype, 'local', Avahi.LOOKUP_USE_MULTICAST
+ )
+ self._service_browsers[stype] = self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path)
+ except dasbus.error.DBusError as ex:
+ logging.debug('Avahi._configure_browsers() - Failed to contact avahi-daemon. %s', ex)
+ logging.warning('avahi-daemon not available, operating w/o mDNS discovery.')
+ return False
+
+ return True
+
+ def _service_discovered(
+ self,
+ _connection,
+ _sender_name: str,
+ _object_path: str,
+ _interface_name: str,
+ _signal_name: str,
+ args: typing.Tuple[int, int, str, str, str, int],
+ *_user_data,
+ ):
+ (interface, protocol, name, stype, domain, flags) = args
+ logging.debug(
+ 'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
+ interface,
+ socket.if_indextoname(interface),
+ Avahi.protocol_as_string(protocol),
+ stype,
+ domain,
+ flags,
+ '(' + Avahi.result_flags_as_string(flags) + '),',
+ name,
+ )
+
+ service = (interface, protocol, name, stype, domain)
+ if service not in self._services:
+ try:
+ obj_path = self._avahi.ServiceResolverNew(
+ interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST
+ )
+ self._services[service] = {
+ 'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path),
+ 'data': {},
+ }
+ except dasbus.error.DBusError as ex:
+ logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex)
+
+ def _service_removed(
+ self,
+ _connection,
+ _sender_name: str,
+ _object_path: str,
+ _interface_name: str,
+ _signal_name: str,
+ args: typing.Tuple[int, int, str, str, str, int],
+ *_user_data,
+ ):
+ (interface, protocol, name, stype, domain, flags) = args
+ logging.debug(
+ 'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
+ interface,
+ socket.if_indextoname(interface),
+ Avahi.protocol_as_string(protocol),
+ stype,
+ domain,
+ flags,
+ '(' + Avahi.result_flags_as_string(flags) + '),',
+ name,
+ )
+
+ service = (interface, protocol, name, stype, domain)
+ resolver = self._services.pop(service, {}).pop('resolver', None)
+ if resolver is not None:
+ try:
+ resolver.Free()
+ dasbus.client.proxy.disconnect_proxy(resolver)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex)
+
+ self._change_cb()
+
+ def _service_identified( # pylint: disable=too-many-locals
+ self,
+ _connection,
+ _sender_name: str,
+ _object_path: str,
+ _interface_name: str,
+ _signal_name: str,
+ args: typing.Tuple[int, int, str, str, str, str, int, str, int, list, int],
+ *_user_data,
+ ):
+ (interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args
+ txt = _txt2dict(txt)
+ logging.debug(
+ 'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s',
+ interface,
+ socket.if_indextoname(interface),
+ Avahi.protocol_as_string(protocol),
+ stype,
+ domain,
+ flags,
+ '(' + Avahi.result_flags_as_string(flags) + '),',
+ name,
+ host,
+ Avahi.protocol_as_string(aprotocol),
+ address,
+ port,
+ txt,
+ )
+
+ service = (interface, protocol, name, stype, domain)
+ if service in self._services:
+ transport = _proto2trans(txt.get('p'))
+ if transport is not None:
+ self._services[service]['data'] = {
+ 'transport': transport,
+ 'traddr': address.strip(),
+ 'trsvcid': str(port).strip(),
+ # host-iface permitted for tcp alone and not rdma
+ 'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '',
+ 'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
+ if conf.NvmeOptions().discovery_supp
+ else defs.WELL_KNOWN_DISC_NQN,
+ }
+
+ self._change_cb()
+ else:
+ logging.error(
+ 'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s',
+ address,
+ socket.if_indextoname(interface).strip(),
+ txt,
+ )
+
+ def _failure_handler( # pylint: disable=no-self-use
+ self,
+ _connection,
+ _sender_name: str,
+ _object_path: str,
+ interface_name: str,
+ _signal_name: str,
+ args: typing.Tuple[str],
+ *_user_data,
+ ):
+ (error,) = args
+ if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error:
+ # ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal.
+ logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error)
diff --git a/staslib/conf.py b/staslib/conf.py
new file mode 100644
index 0000000..a54da98
--- /dev/null
+++ b/staslib/conf.py
@@ -0,0 +1,703 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''nvme-stas configuration module'''
+
+import re
+import os
+import sys
+import logging
+import functools
+import configparser
+from staslib import defs, singleton, timeparse
+
+__TOKEN_RE = re.compile(r'\s*;\s*')
+__OPTION_RE = re.compile(r'\s*=\s*')
+
+
+class InvalidOption(Exception):
+ '''Exception raised when an invalid option value is detected'''
+
+
+def _parse_controller(controller):
+ '''@brief Parse a "controller" entry. Controller entries are strings
+ composed of several configuration parameters delimited by
+ semi-colons. Each configuration parameter is specified as a
+ "key=value" pair.
+ @return A dictionary of key-value pairs.
+ '''
+ options = dict()
+ tokens = __TOKEN_RE.split(controller)
+ for token in tokens:
+ if token:
+ try:
+ option, val = __OPTION_RE.split(token)
+ options[option.strip()] = val.strip()
+ except ValueError:
+ pass
+
+ return options
+
+
+def _parse_single_val(text):
+ if isinstance(text, str):
+ return text
+ if not isinstance(text, list) or len(text) == 0:
+ return None
+
+ return text[-1]
+
+
+def _parse_list(text):
+ return text if isinstance(text, list) else [text]
+
+
+def _to_int(text):
+ try:
+ return int(_parse_single_val(text))
+ except (ValueError, TypeError):
+ raise InvalidOption # pylint: disable=raise-missing-from
+
+
+def _to_bool(text, positive='true'):
+ return _parse_single_val(text).lower() == positive
+
+
+def _to_ncc(text):
+ value = _to_int(text)
+ if value == 1: # 1 is invalid. A minimum of 2 is required (with the exception of 0, which is valid).
+ value = 2
+ return value
+
+
+def _to_ip_family(text):
+ return tuple((4 if text == 'ipv4' else 6 for text in _parse_single_val(text).split('+')))
+
+
+# ******************************************************************************
+class OrderedMultisetDict(dict):
+ '''This class is used to change the behavior of configparser.ConfigParser
+ and allow multiple configuration parameters with the same key. The
+ result is a list of values.
+ '''
+
+ def __setitem__(self, key, value):
+ if key in self and isinstance(value, list):
+ self[key].extend(value)
+ else:
+ super().__setitem__(key, value)
+
+ def __getitem__(self, key):
+ value = super().__getitem__(key)
+
+ if isinstance(value, str):
+ return value.split('\n')
+
+ return value
+
+
+class SvcConf(metaclass=singleton.Singleton): # pylint: disable=too-many-public-methods
+ '''Read and cache configuration file.'''
+
+ OPTION_CHECKER = {
+ 'Global': {
+ 'tron': {
+ 'convert': _to_bool,
+ 'default': False,
+ 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'),
+ },
+ 'kato': {
+ 'convert': _to_int,
+ },
+ 'pleo': {
+ 'convert': functools.partial(_to_bool, positive='enabled'),
+ 'default': True,
+ 'txt-chk': lambda text: _parse_single_val(text).lower() in ('disabled', 'enabled'),
+ },
+ 'ip-family': {
+ 'convert': _to_ip_family,
+ 'default': (4, 6),
+ 'txt-chk': lambda text: _parse_single_val(text) in ('ipv4', 'ipv6', 'ipv4+ipv6', 'ipv6+ipv4'),
+ },
+ 'queue-size': {
+ 'convert': _to_int,
+ 'rng-chk': lambda value: None if value in range(16, 1025) else range(16, 1025),
+ },
+ 'hdr-digest': {
+ 'convert': _to_bool,
+ 'default': False,
+ 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'),
+ },
+ 'data-digest': {
+ 'convert': _to_bool,
+ 'default': False,
+ 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'),
+ },
+ 'ignore-iface': {
+ 'convert': _to_bool,
+ 'default': False,
+ 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'),
+ },
+ 'nr-io-queues': {
+ 'convert': _to_int,
+ },
+ 'ctrl-loss-tmo': {
+ 'convert': _to_int,
+ },
+ 'disable-sqflow': {
+ 'convert': _to_bool,
+ 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'),
+ },
+ 'nr-poll-queues': {
+ 'convert': _to_int,
+ },
+ 'nr-write-queues': {
+ 'convert': _to_int,
+ },
+ 'reconnect-delay': {
+ 'convert': _to_int,
+ },
+ ### BEGIN: LEGACY SECTION TO BE REMOVED ###
+ 'persistent-connections': {
+ 'convert': _to_bool,
+ 'default': False,
+ 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'),
+ },
+ ### END: LEGACY SECTION TO BE REMOVED ###
+ },
+ 'Service Discovery': {
+ 'zeroconf': {
+ 'convert': functools.partial(_to_bool, positive='enabled'),
+ 'default': True,
+ 'txt-chk': lambda text: _parse_single_val(text).lower() in ('disabled', 'enabled'),
+ },
+ },
+ 'Discovery controller connection management': {
+ 'persistent-connections': {
+ 'convert': _to_bool,
+ 'default': True,
+ 'txt-chk': lambda text: _parse_single_val(text).lower() in ('false', 'true'),
+ },
+ 'zeroconf-connections-persistence': {
+ 'convert': lambda text: timeparse.timeparse(_parse_single_val(text)),
+ 'default': timeparse.timeparse('72hours'),
+ },
+ },
+ 'I/O controller connection management': {
+ 'disconnect-scope': {
+ 'convert': _parse_single_val,
+ 'default': 'only-stas-connections',
+ 'txt-chk': lambda text: _parse_single_val(text)
+ in ('only-stas-connections', 'all-connections-matching-disconnect-trtypes', 'no-disconnect'),
+ },
+ 'disconnect-trtypes': {
+ # Use set() to eliminate potential duplicates
+ 'convert': lambda text: set(_parse_single_val(text).split('+')),
+ 'default': [
+ 'tcp',
+ ],
+ 'lst-chk': ('tcp', 'rdma', 'fc'),
+ },
+ 'connect-attempts-on-ncc': {
+ 'convert': _to_ncc,
+ 'default': 0,
+ },
+ },
+ 'Controllers': {
+ 'controller': {
+ 'convert': _parse_list,
+ 'default': [],
+ },
+ 'exclude': {
+ 'convert': _parse_list,
+ 'default': [],
+ },
+ ### BEGIN: LEGACY SECTION TO BE REMOVED ###
+ 'blacklist': {
+ 'convert': _parse_list,
+ 'default': [],
+ },
+ ### END: LEGACY SECTION TO BE REMOVED ###
+ },
+ }
+
+ def __init__(self, default_conf=None, conf_file='/dev/null'):
+ self._config = None
+ self._defaults = default_conf if default_conf else {}
+
+ if self._defaults is not None and len(self._defaults) != 0:
+ self._valid_conf = {}
+ for section, option in self._defaults:
+ self._valid_conf.setdefault(section, set()).add(option)
+ else:
+ self._valid_conf = None
+
+ self._conf_file = conf_file
+ self.reload()
+
+ def reload(self):
+ '''@brief Reload the configuration file.'''
+ self._config = self._read_conf_file()
+
+ @property
+ def conf_file(self):
+ '''Return the configuration file name'''
+ return self._conf_file
+
+ def set_conf_file(self, fname):
+ '''Set the configuration file name and reload config'''
+ self._conf_file = fname
+ self.reload()
+
+ def get_option(self, section, option, ignore_default=False): # pylint: disable=too-many-locals
+ '''Retrieve @option from @section, convert raw text to
+ appropriate object type, and validate.'''
+ try:
+ checker = self.OPTION_CHECKER[section][option]
+ except KeyError:
+ logging.error('Requesting invalid section=%s and/or option=%s', section, option)
+ raise
+
+ default = checker.get('default', None)
+
+ try:
+ text = self._config.get(section=section, option=option)
+ except (configparser.NoSectionError, configparser.NoOptionError, KeyError):
+ return None if ignore_default else self._defaults.get((section, option), default)
+
+ return self._check(text, section, option, default)
+
+ tron = property(functools.partial(get_option, section='Global', option='tron'))
+ kato = property(functools.partial(get_option, section='Global', option='kato'))
+ ip_family = property(functools.partial(get_option, section='Global', option='ip-family'))
+ queue_size = property(functools.partial(get_option, section='Global', option='queue-size'))
+ hdr_digest = property(functools.partial(get_option, section='Global', option='hdr-digest'))
+ data_digest = property(functools.partial(get_option, section='Global', option='data-digest'))
+ ignore_iface = property(functools.partial(get_option, section='Global', option='ignore-iface'))
+ pleo_enabled = property(functools.partial(get_option, section='Global', option='pleo'))
+ nr_io_queues = property(functools.partial(get_option, section='Global', option='nr-io-queues'))
+ ctrl_loss_tmo = property(functools.partial(get_option, section='Global', option='ctrl-loss-tmo'))
+ disable_sqflow = property(functools.partial(get_option, section='Global', option='disable-sqflow'))
+ nr_poll_queues = property(functools.partial(get_option, section='Global', option='nr-poll-queues'))
+ nr_write_queues = property(functools.partial(get_option, section='Global', option='nr-write-queues'))
+ reconnect_delay = property(functools.partial(get_option, section='Global', option='reconnect-delay'))
+
+ zeroconf_enabled = property(functools.partial(get_option, section='Service Discovery', option='zeroconf'))
+
+ zeroconf_persistence_sec = property(
+ functools.partial(
+ get_option, section='Discovery controller connection management', option='zeroconf-connections-persistence'
+ )
+ )
+
+ disconnect_scope = property(
+ functools.partial(get_option, section='I/O controller connection management', option='disconnect-scope')
+ )
+ disconnect_trtypes = property(
+ functools.partial(get_option, section='I/O controller connection management', option='disconnect-trtypes')
+ )
+ connect_attempts_on_ncc = property(
+ functools.partial(get_option, section='I/O controller connection management', option='connect-attempts-on-ncc')
+ )
+
+ @property
+ def stypes(self):
+ '''@brief Get the DNS-SD/mDNS service types.'''
+ return ['_nvme-disc._tcp', '_nvme-disc._udp'] if self.zeroconf_enabled else list()
+
+ @property
+ def persistent_connections(self):
+ '''@brief return the "persistent-connections" config parameter'''
+ section = 'Discovery controller connection management'
+ option = 'persistent-connections'
+
+ value = self.get_option(section, option, ignore_default=True)
+ legacy = self.get_option('Global', 'persistent-connections', ignore_default=True)
+
+ if value is None and legacy is None:
+ return self._defaults.get((section, option), True)
+
+ return value or legacy
+
+ def get_controllers(self):
+ '''@brief Return the list of controllers in the config file.
+ Each controller is in the form of a dictionary as follows.
+ Note that some of the keys are optional.
+ {
+ 'transport': [TRANSPORT],
+ 'traddr': [TRADDR],
+ 'trsvcid': [TRSVCID],
+ 'host-traddr': [TRADDR],
+ 'host-iface': [IFACE],
+ 'subsysnqn': [NQN],
+ 'dhchap-ctrl-secret': [KEY],
+ 'hdr-digest': [BOOL]
+ 'data-digest': [BOOL]
+ 'nr-io-queues': [NUMBER]
+ 'nr-write-queues': [NUMBER]
+ 'nr-poll-queues': [NUMBER]
+ 'queue-size': [SIZE]
+ 'kato': [KATO]
+ 'reconnect-delay': [SECONDS]
+ 'ctrl-loss-tmo': [SECONDS]
+ 'disable-sqflow': [BOOL]
+ }
+ '''
+ controller_list = self.get_option('Controllers', 'controller')
+ cids = [_parse_controller(controller) for controller in controller_list]
+ for cid in cids:
+ try:
+ # replace 'nqn' key by 'subsysnqn', if present.
+ cid['subsysnqn'] = cid.pop('nqn')
+ except KeyError:
+ pass
+
+ # Verify values of the options used to overload the matching [Global] options
+ for option in cid:
+ if option in self.OPTION_CHECKER['Global']:
+ value = self._check(cid[option], 'Global', option, None)
+ if value is not None:
+ cid[option] = value
+
+ return cids
+
+ def get_excluded(self):
+ '''@brief Return the list of excluded controllers in the config file.
+ Each excluded controller is in the form of a dictionary
+ as follows. All the keys are optional.
+ {
+ 'transport': [TRANSPORT],
+ 'traddr': [TRADDR],
+ 'trsvcid': [TRSVCID],
+ 'host-iface': [IFACE],
+ 'subsysnqn': [NQN],
+ }
+ '''
+ controller_list = self.get_option('Controllers', 'exclude')
+
+ # 2022-09-20: Look for "blacklist". This is for backwards compatibility
+ # with releases 1.0 to 1.1.6. This is to be phased out (i.e. remove by 2024)
+ controller_list += self.get_option('Controllers', 'blacklist')
+
+ excluded = [_parse_controller(controller) for controller in controller_list]
+ for controller in excluded:
+ controller.pop('host-traddr', None) # remove host-traddr
+ try:
+ # replace 'nqn' key by 'subsysnqn', if present.
+ controller['subsysnqn'] = controller.pop('nqn')
+ except KeyError:
+ pass
+ return excluded
+
+ def _check(self, text, section, option, default):
+ checker = self.OPTION_CHECKER[section][option]
+ text_checker = checker.get('txt-chk', None)
+ if text_checker is not None and not text_checker(text):
+ logging.warning(
+ 'File:%s [%s]: %s - Text check found invalid value "%s". Default will be used',
+ self.conf_file,
+ section,
+ option,
+ text,
+ )
+ return self._defaults.get((section, option), default)
+
+ converter = checker.get('convert', None)
+ try:
+ value = converter(text)
+ except InvalidOption:
+ logging.warning(
+ 'File:%s [%s]: %s - Data converter found invalid value "%s". Default will be used',
+ self.conf_file,
+ section,
+ option,
+ text,
+ )
+ return self._defaults.get((section, option), default)
+
+ value_in_range = checker.get('rng-chk', None)
+ if value_in_range is not None:
+ expected_range = value_in_range(value)
+ if expected_range is not None:
+ logging.warning(
+ 'File:%s [%s]: %s - "%s" is not within range %s..%s. Default will be used',
+ self.conf_file,
+ section,
+ option,
+ value,
+ min(expected_range),
+ max(expected_range),
+ )
+ return self._defaults.get((section, option), default)
+
+ list_checker = checker.get('lst-chk', None)
+ if list_checker:
+ values = set()
+ for item in value:
+ if item not in list_checker:
+ logging.warning(
+ 'File:%s [%s]: %s - List checker found invalid item "%s" will be ignored.',
+ self.conf_file,
+ section,
+ option,
+ item,
+ )
+ else:
+ values.add(item)
+
+ if len(values) == 0:
+ return self._defaults.get((section, option), default)
+
+ value = list(values)
+
+ return value
+
+ def _read_conf_file(self):
+ '''@brief Read the configuration file if the file exists.'''
+ config = configparser.ConfigParser(
+ default_section=None,
+ allow_no_value=True,
+ delimiters=('='),
+ interpolation=None,
+ strict=False,
+ dict_type=OrderedMultisetDict,
+ )
+ if self._conf_file and os.path.isfile(self._conf_file):
+ config.read(self._conf_file)
+
+ # Parse Configuration and validate.
+ if self._valid_conf is not None:
+ invalid_sections = set()
+ for section in config.sections():
+ if section not in self._valid_conf:
+ invalid_sections.add(section)
+ else:
+ invalid_options = set()
+ for option in config.options(section):
+ if option not in self._valid_conf.get(section, []):
+ invalid_options.add(option)
+
+ if len(invalid_options) != 0:
+ logging.error(
+ 'File:%s [%s] contains invalid options: %s',
+ self.conf_file,
+ section,
+ invalid_options,
+ )
+
+ if len(invalid_sections) != 0:
+ logging.error(
+ 'File:%s contains invalid sections: %s',
+ self.conf_file,
+ invalid_sections,
+ )
+
+ return config
+
+
+# ******************************************************************************
+class SysConf(metaclass=singleton.Singleton):
+ '''Read and cache the host configuration file.'''
+
+ def __init__(self, conf_file=defs.SYS_CONF_FILE):
+ self._config = None
+ self._conf_file = conf_file
+ self.reload()
+
+ def reload(self):
+ '''@brief Reload the configuration file.'''
+ self._config = self._read_conf_file()
+
+ @property
+ def conf_file(self):
+ '''Return the configuration file name'''
+ return self._conf_file
+
+ def set_conf_file(self, fname):
+ '''Set the configuration file name and reload config'''
+ self._conf_file = fname
+ self.reload()
+
+ def as_dict(self):
+ '''Return configuration as a dictionary'''
+ return {
+ 'hostnqn': self.hostnqn,
+ 'hostid': self.hostid,
+ 'hostkey': self.hostkey,
+ 'symname': self.hostsymname,
+ }
+
+ @property
+ def hostnqn(self):
+ '''@brief return the host NQN
+ @return: Host NQN
+ @raise: Host NQN is mandatory. The program will terminate if a
+ Host NQN cannot be determined.
+ '''
+ try:
+ value = self.__get_value('Host', 'nqn', defs.NVME_HOSTNQN)
+ except FileNotFoundError as ex:
+ sys.exit(f'Error reading mandatory Host NQN (see stasadm --help): {ex}')
+
+ if value is not None and not value.startswith('nqn.'):
+ sys.exit(f'Error Host NQN "{value}" should start with "nqn."')
+
+ return value
+
+ @property
+ def hostid(self):
+ '''@brief return the host ID
+ @return: Host ID
+ @raise: Host ID is mandatory. The program will terminate if a
+ Host ID cannot be determined.
+ '''
+ try:
+ value = self.__get_value('Host', 'id', defs.NVME_HOSTID)
+ except FileNotFoundError as ex:
+ sys.exit(f'Error reading mandatory Host ID (see stasadm --help): {ex}')
+
+ return value
+
+ @property
+ def hostkey(self):
+ '''@brief return the host key
+ @return: Host key
+ @raise: Host key is optional, but mandatory if authorization will be performed.
+ '''
+ try:
+ value = self.__get_value('Host', 'key', defs.NVME_HOSTKEY)
+ except FileNotFoundError as ex:
+ logging.info('Host key undefined: %s', ex)
+ value = None
+
+ return value
+
+ @property
+ def hostsymname(self):
+ '''@brief return the host symbolic name (or None)
+ @return: symbolic name or None
+ '''
+ try:
+ value = self.__get_value('Host', 'symname')
+ except FileNotFoundError as ex:
+ logging.warning('Error reading host symbolic name (will remain undefined): %s', ex)
+ value = None
+
+ return value
+
+ def _read_conf_file(self):
+ '''@brief Read the configuration file if the file exists.'''
+ config = configparser.ConfigParser(
+ default_section=None, allow_no_value=True, delimiters=('='), interpolation=None, strict=False
+ )
+ if os.path.isfile(self._conf_file):
+ config.read(self._conf_file)
+ return config
+
+ def __get_value(self, section, option, default_file=None):
+ '''@brief A configuration file consists of sections, each led by a
+ [section] header, followed by key/value entries separated
+ by a equal sign (=). This method retrieves the value
+ associated with the key @option from the section @section.
+ If the value starts with the string "file://", then the value
+ will be retrieved from that file.
+
+ @param section: Configuration section
+ @param option: The key to look for
+ @param default_file: A file that contains the default value
+
+ @return: On success, the value associated with the key. On failure,
+ this method will return None is a default_file is not
+ specified, or will raise an exception if a file is not
+ found.
+
+ @raise: This method will raise the FileNotFoundError exception if
+ the value retrieved is a file that does not exist.
+ '''
+ try:
+ value = self._config.get(section=section, option=option)
+ if not value.startswith('file://'):
+ return value
+ file = value[7:]
+ except (configparser.NoSectionError, configparser.NoOptionError, KeyError):
+ if default_file is None:
+ return None
+ file = default_file
+
+ try:
+ with open(file) as f: # pylint: disable=unspecified-encoding
+ return f.readline().split()[0]
+ except IndexError:
+ return None
+
+
+# ******************************************************************************
+class NvmeOptions(metaclass=singleton.Singleton):
+ '''Object used to read and cache contents of file /dev/nvme-fabrics.
+ Note that this file was not readable prior to Linux 5.16.
+ '''
+
+ def __init__(self):
+ # Supported options can be determined by looking at the kernel version
+ # or by reading '/dev/nvme-fabrics'. The ability to read the options
+ # from '/dev/nvme-fabrics' was only introduced in kernel 5.17, but may
+ # have been backported to older kernels. In any case, if the kernel
+ # version meets the minimum version for that option, then we don't
+ # even need to read '/dev/nvme-fabrics'.
+ self._supported_options = {
+ 'discovery': defs.KERNEL_VERSION >= defs.KERNEL_TP8013_MIN_VERSION,
+ 'host_iface': defs.KERNEL_VERSION >= defs.KERNEL_IFACE_MIN_VERSION,
+ 'dhchap_secret': defs.KERNEL_VERSION >= defs.KERNEL_HOSTKEY_MIN_VERSION,
+ 'dhchap_ctrl_secret': defs.KERNEL_VERSION >= defs.KERNEL_CTRLKEY_MIN_VERSION,
+ }
+
+ # If some of the options are False, we need to check wether they can be
+ # read from '/dev/nvme-fabrics'. This method allows us to determine that
+ # an older kernel actually supports a specific option because it was
+ # backported to that kernel.
+ if not all(self._supported_options.values()): # At least one option is False.
+ try:
+ with open('/dev/nvme-fabrics') as f: # pylint: disable=unspecified-encoding
+ options = [option.split('=')[0].strip() for option in f.readline().rstrip('\n').split(',')]
+ except PermissionError: # Must be root to read this file
+ raise
+ except (OSError, FileNotFoundError):
+ logging.warning('Cannot determine which NVMe options the kernel supports')
+ else:
+ for option, supported in self._supported_options.items():
+ if not supported:
+ self._supported_options[option] = option in options
+
+ def __str__(self):
+ return f'supported options: {self._supported_options}'
+
+ def get(self):
+ '''get the supported options as a dict'''
+ return self._supported_options
+
+ @property
+ def discovery_supp(self):
+ '''This option adds support for TP8013'''
+ return self._supported_options['discovery']
+
+ @property
+ def host_iface_supp(self):
+ '''This option allows forcing connections to go over
+ a specific interface regardless of the routing tables.
+ '''
+ return self._supported_options['host_iface']
+
+ @property
+ def dhchap_hostkey_supp(self):
+ '''This option allows specifying the host DHCHAP key used for authentication.'''
+ return self._supported_options['dhchap_secret']
+
+ @property
+ def dhchap_ctrlkey_supp(self):
+ '''This option allows specifying the controller DHCHAP key used for authentication.'''
+ return self._supported_options['dhchap_ctrl_secret']
diff --git a/staslib/ctrl.py b/staslib/ctrl.py
new file mode 100644
index 0000000..97a1c7b
--- /dev/null
+++ b/staslib/ctrl.py
@@ -0,0 +1,850 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''This module defines the base Controller object from which the
+Dc (Discovery Controller) and Ioc (I/O Controller) objects are derived.'''
+
+import time
+import inspect
+import logging
+from gi.repository import GLib
+from libnvme import nvme
+from staslib import conf, defs, gutil, trid, udev, stas
+
+
+DLP_CHANGED = (
+ (nvme.NVME_LOG_LID_DISCOVER << 16) | (nvme.NVME_AER_NOTICE_DISC_CHANGED << 8) | nvme.NVME_AER_NOTICE
+) # 0x70f002
+
+
+def get_eflags(dlpe):
+ '''@brief Return eflags field of dlpe'''
+ return int(dlpe.get('eflags', 0)) if dlpe else 0
+
+
+def get_ncc(eflags: int):
+ '''@brief Return True if Not Connected to CDC bit is asserted, False otherwise'''
+ return eflags & nvme.NVMF_DISC_EFLAGS_NCC != 0
+
+
+def dlp_supp_opts_as_string(dlp_supp_opts: int):
+ '''@brief Return the list of options supported by the Get
+ discovery log page command.
+ '''
+ data = {
+ nvme.NVMF_LOG_DISC_LID_EXTDLPES: "EXTDLPES",
+ nvme.NVMF_LOG_DISC_LID_PLEOS: "PLEOS",
+ nvme.NVMF_LOG_DISC_LID_ALLSUBES: "ALLSUBES",
+ }
+ return [txt for msk, txt in data.items() if dlp_supp_opts & msk]
+
+
+# ******************************************************************************
+class Controller(stas.ControllerABC): # pylint: disable=too-many-instance-attributes
+ '''@brief Base class used to manage the connection to a controller.'''
+
+ def __init__(self, tid: trid.TID, service, discovery_ctrl: bool = False):
+ sysconf = conf.SysConf()
+ self._nvme_options = conf.NvmeOptions()
+ self._root = nvme.root()
+ self._host = nvme.host(
+ self._root, hostnqn=sysconf.hostnqn, hostid=sysconf.hostid, hostsymname=sysconf.hostsymname
+ )
+ self._host.dhchap_key = sysconf.hostkey if self._nvme_options.dhchap_hostkey_supp else None
+ self._udev = udev.UDEV
+ self._device = None # Refers to the nvme device (e.g. /dev/nvme[n])
+ self._ctrl = None # libnvme's nvme.ctrl object
+ self._connect_op = None
+
+ super().__init__(tid, service, discovery_ctrl)
+
+ def _release_resources(self):
+ logging.debug('Controller._release_resources() - %s | %s', self.id, self.device)
+
+ if self._udev:
+ self._udev.unregister_for_device_events(self._on_udev_notification)
+
+ self._kill_ops()
+
+ super()._release_resources()
+
+ self._ctrl = None
+ self._udev = None
+ self._host = None
+ self._root = None
+ self._nvme_options = None
+
+ @property
+ def device(self) -> str:
+ '''@brief return the Linux nvme device id (e.g. nvme3) or empty
+ string if no device is associated with this controller'''
+ if not self._device and self._ctrl and self._ctrl.name:
+ self._device = self._ctrl.name
+
+ return self._device or 'nvme?'
+
+ def all_ops_completed(self) -> bool:
+ '''@brief Returns True if all operations have completed. False otherwise.'''
+ return self._connect_op is None or self._connect_op.completed()
+
+ def connected(self):
+ '''@brief Return whether a connection is established'''
+ return self._ctrl and self._ctrl.connected()
+
+ def controller_id_dict(self) -> dict:
+ '''@brief return the controller ID as a dict.'''
+ cid = super().controller_id_dict()
+ cid['device'] = self.device
+ return cid
+
+ def details(self) -> dict:
+ '''@brief return detailed debug info about this controller'''
+ details = super().details()
+ details.update(
+ self._udev.get_attributes(self.device, ('hostid', 'hostnqn', 'model', 'serial', 'dctype', 'cntrltype'))
+ )
+ details['connected'] = str(self.connected())
+ return details
+
+ def info(self) -> dict:
+ '''@brief Get the controller info for this object'''
+ info = super().info()
+ if self._connect_op:
+ info['connect operation'] = str(self._connect_op.as_dict())
+ return info
+
+ def cancel(self):
+ '''@brief Used to cancel pending operations.'''
+ super().cancel()
+ if self._connect_op:
+ self._connect_op.cancel()
+
+ def _kill_ops(self):
+ if self._connect_op:
+ self._connect_op.kill()
+ self._connect_op = None
+
+ def set_level_from_tron(self, tron):
+ '''Set log level based on TRON'''
+ if self._root:
+ self._root.log_level("debug" if tron else "err")
+
+ def _on_udev_notification(self, udev_obj):
+ if self._alive():
+ if udev_obj.action == 'change':
+ nvme_aen = udev_obj.get('NVME_AEN')
+ nvme_event = udev_obj.get('NVME_EVENT')
+ if isinstance(nvme_aen, str):
+ logging.info('%s | %s - Received AEN: %s', self.id, udev_obj.sys_name, nvme_aen)
+ self._on_aen(int(nvme_aen, 16))
+ if isinstance(nvme_event, str):
+ self._on_nvme_event(nvme_event)
+ elif udev_obj.action == 'remove':
+ logging.info('%s | %s - Received "remove" event', self.id, udev_obj.sys_name)
+ self._on_ctrl_removed(udev_obj)
+ else:
+ logging.debug(
+ 'Controller._on_udev_notification() - %s | %s: Received "%s" event',
+ self.id,
+ udev_obj.sys_name,
+ udev_obj.action,
+ )
+ else:
+ logging.debug(
+ 'Controller._on_udev_notification() - %s | %s: Received event on dead object. udev_obj %s: %s',
+ self.id,
+ self.device,
+ udev_obj.action,
+ udev_obj.sys_name,
+ )
+
+ def _on_ctrl_removed(self, udev_obj): # pylint: disable=unused-argument
+ if self._udev:
+ self._udev.unregister_for_device_events(self._on_udev_notification)
+ self._kill_ops() # Kill all pending operations
+ self._ctrl = None
+
+ # Defer removal of this object to the next main loop's idle period.
+ GLib.idle_add(self._serv.remove_controller, self, True)
+
+ def _get_cfg(self):
+ '''Get configuration parameters. These may either come from the [Global]
+ section or from a "controller" entry in the configuration file. A
+ definition found in a "controller" entry overrides the same definition
+ found in the [Global] section.
+ '''
+ cfg = {}
+ service_conf = conf.SvcConf()
+ for option, keyword in (
+ ('kato', 'keep_alive_tmo'),
+ ('queue-size', 'queue_size'),
+ ('hdr-digest', 'hdr_digest'),
+ ('data-digest', 'data_digest'),
+ ('nr-io-queues', 'nr_io_queues'),
+ ('ctrl-loss-tmo', 'ctrl_loss_tmo'),
+ ('disable-sqflow', 'disable_sqflow'),
+ ('nr-poll-queues', 'nr_poll_queues'),
+ ('nr-write-queues', 'nr_write_queues'),
+ ('reconnect-delay', 'reconnect_delay'),
+ ):
+ # Check if the value is defined as a "controller" entry (i.e. override)
+ ovrd_val = self.tid.cfg.get(option, None)
+ if ovrd_val is not None:
+ cfg[keyword] = ovrd_val
+ else:
+ # Check if the value is found in the [Global] section.
+ glob_val = service_conf.get_option('Global', option)
+ if glob_val is not None:
+ cfg[keyword] = glob_val
+
+ return cfg
+
+ def _do_connect(self):
+ service_conf = conf.SvcConf()
+ host_iface = (
+ self.tid.host_iface
+ if (self.tid.host_iface and not service_conf.ignore_iface and self._nvme_options.host_iface_supp)
+ else None
+ )
+ self._ctrl = nvme.ctrl(
+ self._root,
+ subsysnqn=self.tid.subsysnqn,
+ transport=self.tid.transport,
+ traddr=self.tid.traddr,
+ trsvcid=self.tid.trsvcid if self.tid.trsvcid else None,
+ host_traddr=self.tid.host_traddr if self.tid.host_traddr else None,
+ host_iface=host_iface,
+ )
+ self._ctrl.discovery_ctrl_set(self._discovery_ctrl)
+
+ # Set the DHCHAP key on the controller
+ # NOTE that this will eventually have to
+ # change once we have support for AVE (TP8019)
+ ctrl_dhchap_key = self.tid.cfg.get('dhchap-ctrl-secret')
+ if ctrl_dhchap_key and self._nvme_options.dhchap_ctrlkey_supp:
+ has_dhchap_key = hasattr(self._ctrl, 'dhchap_key')
+ if not has_dhchap_key:
+ logging.warning(
+ '%s | %s - libnvme-%s does not allow setting the controller DHCHAP key. Please upgrade libnvme.',
+ self.id,
+ self.device,
+ defs.LIBNVME_VERSION,
+ )
+ else:
+ self._ctrl.dhchap_key = ctrl_dhchap_key
+
+ # Audit existing nvme devices. If we find a match, then
+ # we'll just borrow that device instead of creating a new one.
+ udev_obj = self._find_existing_connection()
+ if udev_obj is not None:
+ # A device already exists.
+ self._device = udev_obj.sys_name
+ logging.debug(
+ 'Controller._do_connect() - %s Found existing control device: %s', self.id, udev_obj.sys_name
+ )
+ self._connect_op = gutil.AsyncTask(
+ self._on_connect_success, self._on_connect_fail, self._ctrl.init, self._host, int(udev_obj.sys_number)
+ )
+ else:
+ cfg = self._get_cfg()
+ logging.debug(
+ 'Controller._do_connect() - %s Connecting to nvme control with cfg=%s', self.id, cfg
+ )
+ self._connect_op = gutil.AsyncTask(
+ self._on_connect_success, self._on_connect_fail, self._ctrl.connect, self._host, cfg
+ )
+
+ self._connect_op.run_async()
+
+ # --------------------------------------------------------------------------
+ def _on_connect_success(self, op_obj: gutil.AsyncTask, data):
+ '''@brief Function called when we successfully connect to the
+ Controller.
+ '''
+ op_obj.kill()
+ self._connect_op = None
+
+ if self._alive():
+ self._device = self._ctrl.name
+ logging.info('%s | %s - Connection established!', self.id, self.device)
+ self._connect_attempts = 0
+ self._udev.register_for_device_events(self._device, self._on_udev_notification)
+ else:
+ logging.debug(
+ 'Controller._on_connect_success() - %s | %s: Received event on dead object. data=%s',
+ self.id,
+ self.device,
+ data,
+ )
+
+ def _on_connect_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt): # pylint: disable=unused-argument
+ '''@brief Function called when we fail to connect to the Controller.'''
+ op_obj.kill()
+ self._connect_op = None
+ if self._alive():
+ if self._connect_attempts == 1:
+ # Do a fast re-try on the first failure.
+ self._retry_connect_tmr.set_timeout(self.FAST_CONNECT_RETRY_PERIOD_SEC)
+ elif self._connect_attempts == 2:
+ # If the fast connect re-try fails, then we can print a message to
+ # indicate the failure, and start a slow re-try period.
+ self._retry_connect_tmr.set_timeout(self.CONNECT_RETRY_PERIOD_SEC)
+ logging.error('%s Failed to connect to controller. %s %s', self.id, err.domain, err.message)
+
+ if self._should_try_to_reconnect():
+ logging.debug(
+ 'Controller._on_connect_fail() - %s %s. Retry in %s sec.',
+ self.id,
+ err,
+ self._retry_connect_tmr.get_timeout(),
+ )
+ self._retry_connect_tmr.start()
+ else:
+ logging.debug(
+ 'Controller._on_connect_fail() - %s Received event on dead object. %s %s',
+ self.id,
+ err.domain,
+ err.message,
+ )
+
+ def disconnect(self, disconnected_cb, keep_connection):
+ '''@brief Issue an asynchronous disconnect command to a Controller.
+ Once the async command has completed, the callback 'disconnected_cb'
+ will be invoked. If a controller is already disconnected, then the
+ callback will be added to the main loop's next idle slot to be executed
+ ASAP.
+
+ @param disconnected_cb: Callback to be called when disconnect has
+ completed. the callback must have this signature:
+ def cback(controller: Controller, success: bool)
+ @param keep_connection: Whether the underlying connection should remain
+ in the kernel.
+ '''
+ logging.debug(
+ 'Controller.disconnect() - %s | %s: keep_connection=%s', self.id, self.device, keep_connection
+ )
+ if self._ctrl and self._ctrl.connected() and not keep_connection:
+ logging.info('%s | %s - Disconnect initiated', self.id, self.device)
+ op = gutil.AsyncTask(self._on_disconn_success, self._on_disconn_fail, self._ctrl.disconnect)
+ op.run_async(disconnected_cb)
+ else:
+ # Defer callback to the next main loop's idle period. The callback
+ # cannot be called directly as the current Controller object is in the
+ # process of being disconnected and the callback will in fact delete
+ # the object. This would invariably lead to unpredictable outcome.
+ GLib.idle_add(disconnected_cb, self, True)
+
+ def _on_disconn_success(self, op_obj: gutil.AsyncTask, data, disconnected_cb): # pylint: disable=unused-argument
+ logging.debug('Controller._on_disconn_success() - %s | %s', self.id, self.device)
+ op_obj.kill()
+ # Defer callback to the next main loop's idle period. The callback
+ # cannot be called directly as the current Controller object is in the
+ # process of being disconnected and the callback will in fact delete
+ # the object. This would invariably lead to unpredictable outcome.
+ GLib.idle_add(disconnected_cb, self, True)
+
+ def _on_disconn_fail(
+ self, op_obj: gutil.AsyncTask, err, fail_cnt, disconnected_cb
+ ): # pylint: disable=unused-argument
+ logging.debug('Controller._on_disconn_fail() - %s | %s: %s', self.id, self.device, err)
+ op_obj.kill()
+ # Defer callback to the next main loop's idle period. The callback
+ # cannot be called directly as the current Controller object is in the
+ # process of being disconnected and the callback will in fact delete
+ # the object. This would invariably lead to unpredictable outcome.
+ GLib.idle_add(disconnected_cb, self, False)
+
+
+# ******************************************************************************
+class Dc(Controller):
+ '''@brief This object establishes a connection to one Discover Controller (DC).
+ It retrieves the discovery log pages and caches them.
+ It also monitors udev events associated with that DC and updates
+ the cached discovery log pages accordingly.
+ '''
+
+ GET_LOG_PAGE_RETRY_RERIOD_SEC = 20
+ REGISTRATION_RETRY_RERIOD_SEC = 5
+ GET_SUPPORTED_RETRY_RERIOD_SEC = 5
+
+ def __init__(self, staf, tid: trid.TID, log_pages=None, origin=None):
+ super().__init__(tid, staf, discovery_ctrl=True)
+ self._register_op = None
+ self._get_supported_op = None
+ self._get_log_op = None
+ self._origin = origin
+ self._log_pages = log_pages if log_pages else list() # Log pages cache
+
+ # For Avahi-discovered DCs that later become unresponsive, monitor how
+ # long the controller remains unresponsive and if it does not return for
+ # a configurable soak period (_ctrl_unresponsive_tmr), remove that
+ # controller. Only Avahi-discovered controllers need this timeout-based
+ # cleanup.
+ self._ctrl_unresponsive_time = None # The time at which connectivity was lost
+ self._ctrl_unresponsive_tmr = gutil.GTimer(0, self._serv.controller_unresponsive, self.tid)
+
+ def _release_resources(self):
+ logging.debug('Dc._release_resources() - %s | %s', self.id, self.device)
+ super()._release_resources()
+
+ if self._ctrl_unresponsive_tmr is not None:
+ self._ctrl_unresponsive_tmr.kill()
+
+ self._log_pages = list()
+ self._ctrl_unresponsive_tmr = None
+
+ def _kill_ops(self):
+ super()._kill_ops()
+ if self._get_log_op:
+ self._get_log_op.kill()
+ self._get_log_op = None
+ if self._register_op:
+ self._register_op.kill()
+ self._register_op = None
+ if self._get_supported_op:
+ self._get_supported_op.kill()
+ self._get_supported_op = None
+
+ def all_ops_completed(self) -> bool:
+ '''@brief Returns True if all operations have completed. False otherwise.'''
+ return (
+ super().all_ops_completed()
+ and (self._get_log_op is None or self._get_log_op.completed())
+ and (self._register_op is None or self._register_op.completed())
+ and (self._get_supported_op is None or self._get_supported_op.completed())
+ )
+
+ @property
+ def origin(self):
+ '''@brief Return how this controller came into existance. Was it
+ "discovered" through mDNS service discovery (TP8009), was it manually
+ "configured" in stafd.conf, or was it a "referral".
+ '''
+ return self._origin
+
+ @origin.setter
+ def origin(self, value):
+ '''@brief Set the origin of this controller.'''
+ if value in ('discovered', 'configured', 'referral'):
+ self._origin = value
+ self._handle_lost_controller()
+ else:
+ logging.error('%s | %s - Trying to set invalid origin to %s', self.id, self.device, value)
+
+ def reload_hdlr(self):
+ '''@brief This is called when a "reload" signal is received.'''
+ logging.debug('Dc.reload_hdlr() - %s | %s', self.id, self.device)
+
+ self._handle_lost_controller()
+ self._resync_with_controller()
+
+ def info(self) -> dict:
+ '''@brief Get the controller info for this object'''
+ timeout = conf.SvcConf().zeroconf_persistence_sec
+ unresponsive_time = (
+ time.asctime(self._ctrl_unresponsive_time) if self._ctrl_unresponsive_time is not None else '---'
+ )
+ info = super().info()
+ info['origin'] = self.origin
+ if self.origin == 'discovered':
+ # The code that handles "unresponsive" DCs only applies to
+ # discovered DCs. So, let's only print that info when it's relevant.
+ info['unresponsive timer'] = str(self._ctrl_unresponsive_tmr)
+ info['unresponsive timeout'] = f'{timeout} sec' if timeout >= 0 else 'forever'
+ info['unresponsive time'] = unresponsive_time
+ if self._get_log_op:
+ info['get log page operation'] = str(self._get_log_op.as_dict())
+ if self._register_op:
+ info['register operation'] = str(self._register_op.as_dict())
+ if self._get_supported_op:
+ info['get supported log page operation'] = str(self._get_supported_op.as_dict())
+ return info
+
+ def cancel(self):
+ '''@brief Used to cancel pending operations.'''
+ super().cancel()
+ if self._get_log_op:
+ self._get_log_op.cancel()
+ if self._register_op:
+ self._register_op.cancel()
+ if self._get_supported_op:
+ self._get_supported_op.cancel()
+
+ def log_pages(self) -> list:
+ '''@brief Get the cached log pages for this object'''
+ return self._log_pages
+
+ def referrals(self) -> list:
+ '''@brief Return the list of referrals'''
+ return [page for page in self._log_pages if page['subtype'] == 'referral']
+
+ def _is_ddc(self):
+ return self._ctrl and self._ctrl.dctype != 'cdc'
+
+ def _on_aen(self, aen: int):
+ if aen == DLP_CHANGED and self._get_log_op:
+ self._get_log_op.run_async()
+
+ def _handle_lost_controller(self):
+ if self.origin == 'discovered': # Only apply to mDNS-discovered DCs
+ if not self._serv.is_avahi_reported(self.tid) and not self.connected():
+ timeout = conf.SvcConf().zeroconf_persistence_sec
+ if timeout >= 0:
+ if self._ctrl_unresponsive_time is None:
+ self._ctrl_unresponsive_time = time.localtime()
+ self._ctrl_unresponsive_tmr.start(timeout)
+ logging.info(
+ '%s | %s - Controller is not responding. Will be removed by %s unless restored',
+ self.id,
+ self.device,
+ time.ctime(time.mktime(self._ctrl_unresponsive_time) + timeout),
+ )
+
+ return
+
+ logging.info(
+ '%s | %s - Controller not responding. Retrying...',
+ self.id,
+ self.device,
+ )
+
+ self._ctrl_unresponsive_time = None
+ self._ctrl_unresponsive_tmr.stop()
+ self._ctrl_unresponsive_tmr.set_timeout(0)
+
+ def is_unresponsive(self):
+ '''@brief For "discovered" DC, return True if DC is unresponsive,
+ False otherwise.
+ '''
+ return (
+ self.origin == 'discovered'
+ and not self._serv.is_avahi_reported(self.tid)
+ and not self.connected()
+ and self._ctrl_unresponsive_time is not None
+ and self._ctrl_unresponsive_tmr.time_remaining() <= 0
+ )
+
+ def _resync_with_controller(self):
+ '''Communicate with DC to resync the states'''
+ if self._register_op:
+ self._register_op.run_async()
+ elif self._get_supported_op:
+ self._get_supported_op.run_async()
+ elif self._get_log_op:
+ self._get_log_op.run_async()
+
+ def _on_nvme_event(self, nvme_event: str):
+ if nvme_event in ('connected', 'rediscover'):
+ # This event indicates that the kernel
+ # driver re-connected to the DC.
+ logging.debug(
+ 'Dc._on_nvme_event() - %s | %s: Received "%s" event',
+ self.id,
+ self.device,
+ nvme_event,
+ )
+ self._resync_with_controller()
+
+ def _find_existing_connection(self):
+ return self._udev.find_nvme_dc_device(self.tid)
+
+ def _post_registration_actions(self):
+ # Need to check that supported_log_pages() is available (introduced in libnvme 1.2)
+ has_supported_log_pages = hasattr(self._ctrl, 'supported_log_pages')
+ if not has_supported_log_pages:
+ logging.warning(
+ '%s | %s - libnvme-%s does not support "Get supported log pages". Please upgrade libnvme.',
+ self.id,
+ self.device,
+ defs.LIBNVME_VERSION,
+ )
+
+ if conf.SvcConf().pleo_enabled and self._is_ddc() and has_supported_log_pages:
+ self._get_supported_op = gutil.AsyncTask(
+ self._on_get_supported_success, self._on_get_supported_fail, self._ctrl.supported_log_pages
+ )
+ self._get_supported_op.run_async()
+ else:
+ self._get_log_op = gutil.AsyncTask(self._on_get_log_success, self._on_get_log_fail, self._ctrl.discover)
+ self._get_log_op.run_async()
+
+ # --------------------------------------------------------------------------
+ def _on_connect_success(self, op_obj: gutil.AsyncTask, data):
+ '''@brief Function called when we successfully connect to the
+ Discovery Controller.
+ '''
+ super()._on_connect_success(op_obj, data)
+
+ if self._alive():
+ self._ctrl_unresponsive_time = None
+ self._ctrl_unresponsive_tmr.stop()
+ self._ctrl_unresponsive_tmr.set_timeout(0)
+
+ if self._ctrl.is_registration_supported():
+ self._register_op = gutil.AsyncTask(
+ self._on_registration_success,
+ self._on_registration_fail,
+ self._ctrl.registration_ctlr,
+ nvme.NVMF_DIM_TAS_REGISTER,
+ )
+ self._register_op.run_async()
+ else:
+ self._post_registration_actions()
+
+ def _on_connect_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt):
+ '''@brief Function called when we fail to connect to the Controller.'''
+ super()._on_connect_fail(op_obj, err, fail_cnt)
+
+ if self._alive():
+ self._handle_lost_controller()
+
+ # --------------------------------------------------------------------------
+ def _on_registration_success(self, op_obj: gutil.AsyncTask, data): # pylint: disable=unused-argument
+ '''@brief Function called when we successfully register with the
+ Discovery Controller. See self._register_op object
+ for details.
+
+ NOTE: The name _on_registration_success() may be misleading. "success"
+ refers to the fact that a successful exchange was made with the DC.
+ It doesn't mean that the registration itself succeeded.
+ '''
+ if self._alive():
+ if data is not None:
+ logging.warning('%s | %s - Registration error. %s.', self.id, self.device, data)
+ else:
+ logging.debug('Dc._on_registration_success() - %s | %s', self.id, self.device)
+
+ self._post_registration_actions()
+ else:
+ logging.debug(
+ 'Dc._on_registration_success() - %s | %s: Received event on dead object.', self.id, self.device
+ )
+
+ def _on_registration_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt):
+ '''@brief Function called when we fail to register with the
+ Discovery Controller. See self._register_op object
+ for details.
+ '''
+ if self._alive():
+ logging.debug(
+ 'Dc._on_registration_fail() - %s | %s: %s. Retry in %s sec',
+ self.id,
+ self.device,
+ err,
+ Dc.REGISTRATION_RETRY_RERIOD_SEC,
+ )
+ if fail_cnt == 1: # Throttle the logs. Only print the first time the command fails
+ logging.error('%s | %s - Failed to register with Discovery Controller. %s', self.id, self.device, err)
+ op_obj.retry(Dc.REGISTRATION_RETRY_RERIOD_SEC)
+ else:
+ logging.debug(
+ 'Dc._on_registration_fail() - %s | %s: Received event on dead object. %s',
+ self.id,
+ self.device,
+ err,
+ )
+ op_obj.kill()
+
+ # --------------------------------------------------------------------------
+ def _on_get_supported_success(self, op_obj: gutil.AsyncTask, data): # pylint: disable=unused-argument
+ '''@brief Function called when we successfully retrieved the supported
+ log pages from the Discovery Controller. See self._get_supported_op object
+ for details.
+
+ NOTE: The name _on_get_supported_success() may be misleading. "success"
+ refers to the fact that a successful exchange was made with the DC.
+ It doesn't mean that the Get Supported Log Page itself succeeded.
+ '''
+ if self._alive():
+ try:
+ dlp_supp_opts = data[nvme.NVME_LOG_LID_DISCOVER] >> 16
+ except (TypeError, IndexError):
+ dlp_supp_opts = 0
+
+ logging.debug(
+ 'Dc._on_get_supported_success() - %s | %s: supported options = 0x%04X = %s',
+ self.id,
+ self.device,
+ dlp_supp_opts,
+ dlp_supp_opts_as_string(dlp_supp_opts),
+ )
+
+ if 'lsp' in inspect.signature(self._ctrl.discover).parameters:
+ lsp = nvme.NVMF_LOG_DISC_LSP_PLEO if dlp_supp_opts & nvme.NVMF_LOG_DISC_LID_PLEOS else 0
+ self._get_log_op = gutil.AsyncTask(
+ self._on_get_log_success, self._on_get_log_fail, self._ctrl.discover, lsp
+ )
+ else:
+ logging.warning(
+ '%s | %s - libnvme-%s does not support setting PLEO bit. Please upgrade.',
+ self.id,
+ self.device,
+ defs.LIBNVME_VERSION,
+ )
+ self._get_log_op = gutil.AsyncTask(self._on_get_log_success, self._on_get_log_fail, self._ctrl.discover)
+ self._get_log_op.run_async()
+ else:
+ logging.debug(
+ 'Dc._on_get_supported_success() - %s | %s: Received event on dead object.', self.id, self.device
+ )
+
+ def _on_get_supported_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt):
+ '''@brief Function called when we fail to retrieve the supported log
+ page from the Discovery Controller. See self._get_supported_op object
+ for details.
+ '''
+ if self._alive():
+ logging.debug(
+ 'Dc._on_get_supported_fail() - %s | %s: %s. Retry in %s sec',
+ self.id,
+ self.device,
+ err,
+ Dc.GET_SUPPORTED_RETRY_RERIOD_SEC,
+ )
+ if fail_cnt == 1: # Throttle the logs. Only print the first time the command fails
+ logging.error(
+ '%s | %s - Failed to Get supported log pages from Discovery Controller. %s',
+ self.id,
+ self.device,
+ err,
+ )
+ op_obj.retry(Dc.GET_SUPPORTED_RETRY_RERIOD_SEC)
+ else:
+ logging.debug(
+ 'Dc._on_get_supported_fail() - %s | %s: Received event on dead object. %s',
+ self.id,
+ self.device,
+ err,
+ )
+ op_obj.kill()
+
+ # --------------------------------------------------------------------------
+ def _on_get_log_success(self, op_obj: gutil.AsyncTask, data): # pylint: disable=unused-argument
+ '''@brief Function called when we successfully retrieve the log pages
+ from the Discovery Controller. See self._get_log_op object
+ for details.
+ '''
+ if self._alive():
+ # Note that for historical reasons too long to explain, the CDC may
+ # return invalid addresses ('0.0.0.0', '::', or ''). Those need to
+ # be filtered out.
+ referrals_before = self.referrals()
+ self._log_pages = (
+ [
+ {k.strip(): str(v).strip() for k, v in dictionary.items()}
+ for dictionary in data
+ if dictionary.get('traddr', '').strip() not in ('0.0.0.0', '::', '')
+ ]
+ if data
+ else list()
+ )
+ logging.info(
+ '%s | %s - Received discovery log pages (num records=%s).', self.id, self.device, len(self._log_pages)
+ )
+ referrals_after = self.referrals()
+ self._serv.log_pages_changed(self, self.device)
+ if referrals_after != referrals_before:
+ logging.debug(
+ 'Dc._on_get_log_success() - %s | %s: Referrals before = %s',
+ self.id,
+ self.device,
+ referrals_before,
+ )
+ logging.debug(
+ 'Dc._on_get_log_success() - %s | %s: Referrals after = %s',
+ self.id,
+ self.device,
+ referrals_after,
+ )
+ self._serv.referrals_changed()
+ else:
+ logging.debug(
+ 'Dc._on_get_log_success() - %s | %s: Received event on dead object.', self.id, self.device
+ )
+
+ def _on_get_log_fail(self, op_obj: gutil.AsyncTask, err, fail_cnt):
+ '''@brief Function called when we fail to retrieve the log pages
+ from the Discovery Controller. See self._get_log_op object
+ for details.
+ '''
+ if self._alive():
+ logging.debug(
+ 'Dc._on_get_log_fail() - %s | %s: %s. Retry in %s sec',
+ self.id,
+ self.device,
+ err,
+ Dc.GET_LOG_PAGE_RETRY_RERIOD_SEC,
+ )
+ if fail_cnt == 1: # Throttle the logs. Only print the first time the command fails
+ logging.error('%s | %s - Failed to retrieve log pages. %s', self.id, self.device, err)
+ op_obj.retry(Dc.GET_LOG_PAGE_RETRY_RERIOD_SEC)
+ else:
+ logging.debug(
+ 'Dc._on_get_log_fail() - %s | %s: Received event on dead object. %s',
+ self.id,
+ self.device,
+ err,
+ )
+ op_obj.kill()
+
+
+# ******************************************************************************
+class Ioc(Controller):
+ '''@brief This object establishes a connection to one I/O Controller.'''
+
+ def __init__(self, stac, tid: trid.TID):
+ self._dlpe = None
+ super().__init__(tid, stac)
+
+ def _find_existing_connection(self):
+ return self._udev.find_nvme_ioc_device(self.tid)
+
+ def _on_aen(self, aen: int):
+ pass
+
+ def _on_nvme_event(self, nvme_event):
+ pass
+
+ def reload_hdlr(self):
+ '''@brief This is called when a "reload" signal is received.'''
+ if not self.connected() and self._retry_connect_tmr.time_remaining() == 0:
+ self._try_to_connect_deferred.schedule()
+
+ @property
+ def eflags(self):
+ '''@brief Return the eflag field of the DLPE'''
+ return get_eflags(self._dlpe)
+
+ @property
+ def ncc(self):
+ '''@brief Return Not Connected to CDC status'''
+ return get_ncc(self.eflags)
+
+ def details(self) -> dict:
+ '''@brief return detailed debug info about this controller'''
+ details = super().details()
+ details['dlpe'] = str(self._dlpe)
+ details['dlpe.eflags.ncc'] = str(self.ncc)
+ return details
+
+ def update_dlpe(self, dlpe):
+ '''@brief This method is called when a new DLPE associated
+ with this controller is received.'''
+ new_ncc = get_ncc(get_eflags(dlpe))
+ old_ncc = self.ncc
+ self._dlpe = dlpe
+
+ if old_ncc and not new_ncc: # NCC bit cleared?
+ if not self.connected():
+ self._connect_attempts = 0
+ self._try_to_connect_deferred.schedule()
+
+ def _should_try_to_reconnect(self):
+ '''@brief This is used to determine when it's time to stop trying toi connect'''
+ max_connect_attempts = conf.SvcConf().connect_attempts_on_ncc if self.ncc else 0
+ return max_connect_attempts == 0 or self._connect_attempts < max_connect_attempts
diff --git a/staslib/defs.py b/staslib/defs.py
new file mode 100644
index 0000000..5a50371
--- /dev/null
+++ b/staslib/defs.py
@@ -0,0 +1,51 @@
+# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+
+''' @brief This file gets automagically configured by meson at build time.
+'''
+import os
+import sys
+import shutil
+import platform
+from staslib.version import KernelVersion
+
+try:
+ import libnvme
+
+ LIBNVME_VERSION = libnvme.__version__
+except (AttributeError, ModuleNotFoundError):
+ LIBNVME_VERSION = '?.?'
+
+VERSION = '@VERSION@'
+LICENSE = '@LICENSE@'
+
+STACD_DBUS_NAME = '@STACD_DBUS_NAME@'
+STACD_DBUS_PATH = '@STACD_DBUS_PATH@'
+
+STAFD_DBUS_NAME = '@STAFD_DBUS_NAME@'
+STAFD_DBUS_PATH = '@STAFD_DBUS_PATH@'
+
+KERNEL_VERSION = KernelVersion(platform.release())
+KERNEL_IFACE_MIN_VERSION = KernelVersion('5.14')
+KERNEL_TP8013_MIN_VERSION = KernelVersion('5.16')
+KERNEL_HOSTKEY_MIN_VERSION = KernelVersion('5.20')
+KERNEL_CTRLKEY_MIN_VERSION = KernelVersion('5.20')
+
+WELL_KNOWN_DISC_NQN = 'nqn.2014-08.org.nvmexpress.discovery'
+
+PROG_NAME = os.path.basename(sys.argv[0])
+
+NVME_HOSTID = '/etc/nvme/hostid'
+NVME_HOSTNQN = '/etc/nvme/hostnqn'
+NVME_HOSTKEY = '/etc/nvme/hostkey'
+
+SYS_CONF_FILE = '/etc/stas/sys.conf'
+STAFD_CONF_FILE = '/etc/stas/stafd.conf'
+STACD_CONF_FILE = '/etc/stas/stacd.conf'
+
+SYSTEMCTL = shutil.which('systemctl')
diff --git a/staslib/gutil.py b/staslib/gutil.py
new file mode 100644
index 0000000..c40b80e
--- /dev/null
+++ b/staslib/gutil.py
@@ -0,0 +1,418 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''This module provides utility functions/classes to provide easier to use
+access to GLib/Gio/Gobject resources.
+'''
+
+import logging
+from gi.repository import Gio, GLib, GObject
+from staslib import conf, iputil, trid
+
+
+# ******************************************************************************
+class GTimer:
+ '''@brief Convenience class to wrap GLib timers'''
+
+ def __init__(
+ self, interval_sec: float = 0, user_cback=lambda: GLib.SOURCE_REMOVE, *user_data, priority=GLib.PRIORITY_DEFAULT
+ ): # pylint: disable=keyword-arg-before-vararg
+ self._source = None
+ self._interval_sec = float(interval_sec)
+ self._user_cback = user_cback
+ self._user_data = user_data
+ self._priority = priority if priority is not None else GLib.PRIORITY_DEFAULT
+
+ def _release_resources(self):
+ self.stop()
+ self._user_cback = None
+ self._user_data = None
+
+ def kill(self):
+ '''@brief Used to release all resources associated with a timer.'''
+ self._release_resources()
+
+ def __str__(self):
+ if self._source is not None:
+ return f'{self._interval_sec}s [{self.time_remaining()}s]'
+
+ return f'{self._interval_sec}s [off]'
+
+ def _callback(self, *_):
+ retval = self._user_cback(*self._user_data)
+ if retval == GLib.SOURCE_REMOVE:
+ self._source = None
+ return retval
+
+ def stop(self):
+ '''@brief Stop timer'''
+ if self._source is not None:
+ self._source.destroy()
+ self._source = None
+
+ def start(self, new_interval_sec: float = -1.0):
+ '''@brief Start (or restart) timer'''
+ if new_interval_sec >= 0:
+ self._interval_sec = float(new_interval_sec)
+
+ if self._source is not None:
+ self._source.set_ready_time(
+ self._source.get_time() + (self._interval_sec * 1000000)
+ ) # ready time is in micro-seconds (monotonic time)
+ else:
+ if self._interval_sec.is_integer():
+ self._source = GLib.timeout_source_new_seconds(int(self._interval_sec)) # seconds resolution
+ else:
+ self._source = GLib.timeout_source_new(self._interval_sec * 1000.0) # mili-seconds resolution
+
+ self._source.set_priority(self._priority)
+ self._source.set_callback(self._callback)
+ self._source.attach()
+
+ def clear(self):
+ '''@brief Make timer expire now. The callback function
+ will be invoked immediately by the main loop.
+ '''
+ if self._source is not None:
+ self._source.set_ready_time(0) # Expire now!
+
+ def set_callback(self, user_cback, *user_data):
+ '''@brief set the callback function to invoke when timer expires'''
+ self._user_cback = user_cback
+ self._user_data = user_data
+
+ def set_timeout(self, new_interval_sec: float):
+ '''@brief set the timer's duration'''
+ if new_interval_sec >= 0:
+ self._interval_sec = float(new_interval_sec)
+
+ def get_timeout(self):
+ '''@brief get the timer's duration'''
+ return self._interval_sec
+
+ def time_remaining(self) -> float:
+ '''@brief Get how much time remains on a timer before it fires.'''
+ if self._source is not None:
+ delta_us = self._source.get_ready_time() - self._source.get_time() # monotonic time in micro-seconds
+ if delta_us > 0:
+ return delta_us / 1000000.0
+
+ return 0
+
+
+# ******************************************************************************
+class NameResolver: # pylint: disable=too-few-public-methods
+ '''@brief DNS resolver to convert host names to IP addresses.'''
+
+ def __init__(self):
+ self._resolver = Gio.Resolver.get_default()
+
+ def resolve_ctrl_async(self, cancellable, controllers_in: list, callback):
+ '''@brief The traddr fields may specify a hostname instead of an IP
+ address. We need to resolve all the host names to addresses.
+ Resolving hostnames may take a while as a DNS server may need
+ to be contacted. For that reason, we're using async APIs with
+ callbacks to resolve all the hostnames.
+
+ The callback @callback will be called once all hostnames have
+ been resolved.
+
+ @param controllers: List of trid.TID
+ '''
+ pending_resolution_count = 0
+ controllers_out = []
+ service_conf = conf.SvcConf()
+
+ def addr_resolved(resolver, result, controller):
+ try:
+ addresses = resolver.lookup_by_name_finish(result) # List of Gio.InetAddress objects
+
+ except GLib.GError as err:
+ # We don't need to report "cancellation" errors.
+ if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED):
+ # pylint: disable=no-member
+ logging.debug('NameResolver.resolve_ctrl_async() - %s %s', err.message, controller)
+ else:
+ logging.error('%s', err.message) # pylint: disable=no-member
+
+ # if err.matches(Gio.resolver_error_quark(), Gio.ResolverError.TEMPORARY_FAILURE):
+ # elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.NOT_FOUND):
+ # elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.INTERNAL):
+
+ else:
+ traddr = None
+
+ # If multiple addresses are returned (which is often the case),
+ # prefer IPv4 addresses over IPv6.
+ if 4 in service_conf.ip_family:
+ for address in addresses:
+ # There may be multiple IPv4 addresses. Pick 1st one.
+ if address.get_family() == Gio.SocketFamily.IPV4:
+ traddr = address.to_string()
+ break
+
+ if traddr is None and 6 in service_conf.ip_family:
+ for address in addresses:
+ # There may be multiple IPv6 addresses. Pick 1st one.
+ if address.get_family() == Gio.SocketFamily.IPV6:
+ traddr = address.to_string()
+ break
+
+ if traddr is not None:
+ logging.debug(
+ 'NameResolver.resolve_ctrl_async() - resolved \'%s\' -> %s', controller.traddr, traddr
+ )
+ cid = controller.as_dict()
+ cid['traddr'] = traddr
+ nonlocal controllers_out
+ controllers_out.append(trid.TID(cid))
+
+ # Invoke callback after all hostnames have been resolved
+ nonlocal pending_resolution_count
+ pending_resolution_count -= 1
+ if pending_resolution_count == 0:
+ callback(controllers_out)
+
+ for controller in controllers_in:
+ if controller.transport in ('tcp', 'rdma'):
+ hostname_or_addr = controller.traddr
+ if not hostname_or_addr:
+ logging.error('Invalid traddr: %s', controller)
+ else:
+ # Try to convert to an ipaddress object. If this
+ # succeeds, then we don't need to call the resolver.
+ ip = iputil.get_ipaddress_obj(hostname_or_addr)
+ if ip is None:
+ logging.debug('NameResolver.resolve_ctrl_async() - resolving \'%s\'', hostname_or_addr)
+ pending_resolution_count += 1
+ self._resolver.lookup_by_name_async(hostname_or_addr, cancellable, addr_resolved, controller)
+ elif ip.version in service_conf.ip_family:
+ controllers_out.append(controller)
+ else:
+ logging.warning(
+ 'Excluding configured IP address %s based on "ip-family" setting', hostname_or_addr
+ )
+ else:
+ controllers_out.append(controller)
+
+ if pending_resolution_count == 0: # No names are pending asynchronous resolution
+ callback(controllers_out)
+
+
+# ******************************************************************************
+class _TaskRunner(GObject.Object):
+ '''@brief This class allows running methods asynchronously in a thread.'''
+
+ def __init__(self, user_function, *user_args):
+ '''@param user_function: function to run inside a thread
+ @param user_args: arguments passed to @user_function
+ '''
+ super().__init__()
+ self._user_function = user_function
+ self._user_args = user_args
+
+ def communicate(self, cancellable, cb_function, *cb_args):
+ '''@param cancellable: A Gio.Cancellable object that can be used to
+ cancel an in-flight async command.
+ @param cb_function: User callback function to call when the async
+ command has completed. The callback function
+ will be passed these arguments:
+
+ (runner, result, *cb_args)
+
+ Where:
+ runner: This _TaskRunner object instance
+ result: A GObject.Object instance that contains the result
+ cb_args: The cb_args arguments passed to communicate()
+
+ @param cb_args: User arguments to pass to @cb_function
+ '''
+
+ def in_thread_exec(task, self, task_data, cancellable): # pylint: disable=unused-argument
+ if task.return_error_if_cancelled():
+ return # Bail out if task has been cancelled
+
+ try:
+ value = GObject.Object()
+ value.result = self._user_function(*self._user_args)
+ task.return_value(value)
+ except Exception as ex: # pylint: disable=broad-except
+ task.return_error(GLib.Error(message=str(ex), domain=type(ex).__name__))
+
+ task = Gio.Task.new(self, cancellable, cb_function, *cb_args)
+ task.set_return_on_cancel(False)
+ task.run_in_thread(in_thread_exec)
+ return task
+
+ def communicate_finish(self, result): # pylint: disable=no-self-use
+ '''@brief Use this function in your callback (see @cb_function) to
+ extract data from the result object.
+
+ @return On success (True, data, None),
+ On failure (False, None, err: GLib.Error)
+ '''
+ try:
+ success, value = result.propagate_value()
+ return success, value.result, None
+ except GLib.Error as err:
+ return False, None, err
+
+
+# ******************************************************************************
+class AsyncTask: # pylint: disable=too-many-instance-attributes
+ '''Object used to manage an asynchronous GLib operation. The operation
+ can be cancelled or retried.
+ '''
+
+ def __init__(self, on_success_callback, on_failure_callback, operation, *op_args):
+ '''@param on_success_callback: Callback method invoked when @operation completes successfully
+ @param on_failure_callback: Callback method invoked when @operation fails
+ @param operation: Operation (i.e. a function) to execute asynchronously
+ @param op_args: Arguments passed to operation
+ '''
+ self._cancellable = Gio.Cancellable()
+ self._operation = operation
+ self._op_args = op_args
+ self._success_cb = on_success_callback
+ self._fail_cb = on_failure_callback
+ self._retry_tmr = None
+ self._errmsg = None
+ self._task = None
+ self._fail_cnt = 0
+
+ def _release_resources(self):
+ if self._alive():
+ self._cancellable.cancel()
+
+ if self._retry_tmr is not None:
+ self._retry_tmr.kill()
+
+ self._operation = None
+ self._op_args = None
+ self._success_cb = None
+ self._fail_cb = None
+ self._retry_tmr = None
+ self._errmsg = None
+ self._task = None
+ self._fail_cnt = None
+ self._cancellable = None
+
+ def __str__(self):
+ return str(self.as_dict())
+
+ def as_dict(self):
+ '''Return object members as a dictionary'''
+ info = {
+ 'fail count': self._fail_cnt,
+ 'completed': self._task.get_completed(),
+ 'alive': self._alive(),
+ }
+
+ if self._retry_tmr:
+ info['retry timer'] = str(self._retry_tmr)
+
+ if self._errmsg:
+ info['error'] = self._errmsg
+
+ return info
+
+ def _alive(self):
+ return self._cancellable and not self._cancellable.is_cancelled()
+
+ def completed(self):
+ '''@brief Returns True if the task has completed, False otherwise.'''
+ return self._task is not None and self._task.get_completed()
+
+ def cancel(self):
+ '''@brief cancel async operation'''
+ if self._alive():
+ self._cancellable.cancel()
+
+ def kill(self):
+ '''@brief kill and clean up this object'''
+ self._release_resources()
+
+ def run_async(self, *args):
+ '''@brief
+ Method used to initiate an asynchronous operation with the
+ Controller. When the operation completes (or fails) the
+ callback method @_on_operation_complete() will be invoked.
+ '''
+ runner = _TaskRunner(self._operation, *self._op_args)
+ self._task = runner.communicate(self._cancellable, self._on_operation_complete, *args)
+
+ def retry(self, interval_sec, *args):
+ '''@brief Tell this object that the async operation is to be retried
+ in @interval_sec seconds.
+
+ '''
+ if self._retry_tmr is None:
+ self._retry_tmr = GTimer()
+ self._retry_tmr.set_callback(self._on_retry_timeout, *args)
+ self._retry_tmr.start(interval_sec)
+
+ def _on_retry_timeout(self, *args):
+ '''@brief
+ When an operation fails, the application has the option to
+ retry at a later time by calling the retry() method. The
+ retry() method starts a timer at the end of which the operation
+ will be executed again. This is the method that is called when
+ the timer expires.
+ '''
+ if self._alive():
+ self.run_async(*args)
+ return GLib.SOURCE_REMOVE
+
+ def _on_operation_complete(self, runner, result, *args):
+ '''@brief
+ This callback method is invoked when the operation with the
+ Controller has completed (be it successful or not).
+ '''
+ # The operation might have been cancelled.
+ # Only proceed if it hasn't been cancelled.
+ if self._operation is None or not self._alive():
+ return
+
+ success, data, err = runner.communicate_finish(result)
+
+ if success:
+ self._errmsg = None
+ self._fail_cnt = 0
+ self._success_cb(self, data, *args)
+ else:
+ self._errmsg = str(err)
+ self._fail_cnt += 1
+ self._fail_cb(self, err, self._fail_cnt, *args)
+
+
+# ******************************************************************************
+class Deferred:
+ '''Implement a deferred function call. A deferred is a function that gets
+ added to the main loop to be executed during the next idle slot.'''
+
+ def __init__(self, func, *user_data):
+ self._source = None
+ self._func = func
+ self._user_data = user_data
+
+ def schedule(self):
+ '''Schedule the function to be called by the main loop. If the
+ function is already scheduled, then do nothing'''
+ if not self.is_scheduled():
+ srce_id = GLib.idle_add(self._func, *self._user_data)
+ self._source = GLib.main_context_default().find_source_by_id(srce_id)
+
+ def is_scheduled(self):
+ '''Check if deferred is currently schedules to run'''
+ return self._source and not self._source.is_destroyed()
+
+ def cancel(self):
+ '''Remove deferred from main loop'''
+ if self.is_scheduled():
+ self._source.destroy()
+ self._source = None
diff --git a/staslib/iputil.py b/staslib/iputil.py
new file mode 100644
index 0000000..9199a49
--- /dev/null
+++ b/staslib/iputil.py
@@ -0,0 +1,169 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+
+'''A collection of IP address and network interface utilities'''
+
+import socket
+import logging
+import ipaddress
+from staslib import conf
+
+RTM_NEWADDR = 20
+RTM_GETADDR = 22
+NLM_F_REQUEST = 0x01
+NLM_F_ROOT = 0x100
+NLMSG_DONE = 3
+IFLA_ADDRESS = 1
+NLMSGHDR_SZ = 16
+IFADDRMSG_SZ = 8
+RTATTR_SZ = 4
+
+# Netlink request (Get address command)
+GETADDRCMD = (
+ # BEGIN: struct nlmsghdr
+ b'\0' * 4 # nlmsg_len (placeholder - actual length calculated below)
+ + (RTM_GETADDR).to_bytes(2, byteorder='little', signed=False) # nlmsg_type
+ + (NLM_F_REQUEST | NLM_F_ROOT).to_bytes(2, byteorder='little', signed=False) # nlmsg_flags
+ + b'\0' * 2 # nlmsg_seq
+ + b'\0' * 2 # nlmsg_pid
+ # END: struct nlmsghdr
+ + b'\0' * 8 # struct ifaddrmsg
+)
+GETADDRCMD = len(GETADDRCMD).to_bytes(4, byteorder='little') + GETADDRCMD[4:] # nlmsg_len
+
+
+# ******************************************************************************
+def get_ipaddress_obj(ipaddr):
+ '''@brief Return a IPv4Address or IPv6Address depending on whether @ipaddr
+ is a valid IPv4 or IPv6 address. Return None otherwise.'''
+ try:
+ ip = ipaddress.ip_address(ipaddr)
+ except ValueError:
+ return None
+
+ return ip
+
+
+# ******************************************************************************
+def _data_matches_ip(data_family, data, ip):
+ if data_family == socket.AF_INET:
+ try:
+ other_ip = ipaddress.IPv4Address(data)
+ except ValueError:
+ return False
+ if ip.version == 6:
+ ip = ip.ipv4_mapped
+ elif data_family == socket.AF_INET6:
+ try:
+ other_ip = ipaddress.IPv6Address(data)
+ except ValueError:
+ return False
+ if ip.version == 4:
+ other_ip = other_ip.ipv4_mapped
+ else:
+ return False
+
+ return other_ip == ip
+
+
+# ******************************************************************************
+def iface_of(src_addr):
+ '''@brief Find the interface that has src_addr as one of its assigned IP addresses.
+ @param src_addr: The IP address to match
+ @type src_addr: Instance of ipaddress.IPv4Address or ipaddress.IPv6Address
+ '''
+ with socket.socket(socket.AF_NETLINK, socket.SOCK_RAW) as sock:
+ sock.sendall(GETADDRCMD)
+ nlmsg = sock.recv(8192)
+ nlmsg_idx = 0
+ while True:
+ if nlmsg_idx >= len(nlmsg):
+ nlmsg += sock.recv(8192)
+
+ nlmsg_type = int.from_bytes(nlmsg[nlmsg_idx + 4 : nlmsg_idx + 6], byteorder='little', signed=False)
+ if nlmsg_type == NLMSG_DONE:
+ break
+
+ if nlmsg_type != RTM_NEWADDR:
+ break
+
+ nlmsg_len = int.from_bytes(nlmsg[nlmsg_idx : nlmsg_idx + 4], byteorder='little', signed=False)
+ if nlmsg_len % 4: # Is msg length not a multiple of 4?
+ break
+
+ ifaddrmsg_indx = nlmsg_idx + NLMSGHDR_SZ
+ ifa_family = nlmsg[ifaddrmsg_indx]
+ ifa_index = int.from_bytes(nlmsg[ifaddrmsg_indx + 4 : ifaddrmsg_indx + 8], byteorder='little', signed=False)
+
+ rtattr_indx = ifaddrmsg_indx + IFADDRMSG_SZ
+ while rtattr_indx < (nlmsg_idx + nlmsg_len):
+ rta_len = int.from_bytes(nlmsg[rtattr_indx : rtattr_indx + 2], byteorder='little', signed=False)
+ rta_type = int.from_bytes(nlmsg[rtattr_indx + 2 : rtattr_indx + 4], byteorder='little', signed=False)
+ if rta_type == IFLA_ADDRESS:
+ data = nlmsg[rtattr_indx + RTATTR_SZ : rtattr_indx + rta_len]
+ if _data_matches_ip(ifa_family, data, src_addr):
+ return socket.if_indextoname(ifa_index)
+
+ rta_len = (rta_len + 3) & ~3 # Round up to multiple of 4
+ rtattr_indx += rta_len # Move to next rtattr
+
+ nlmsg_idx += nlmsg_len # Move to next Netlink message
+
+ return ''
+
+
+# ******************************************************************************
+def get_interface(src_addr):
+ '''Get interface for given source address
+ @param src_addr: The source address
+ @type src_addr: str
+ '''
+ if not src_addr:
+ return ''
+
+ src_addr = src_addr.split('%')[0] # remove scope-id (if any)
+ src_addr = get_ipaddress_obj(src_addr)
+ return '' if src_addr is None else iface_of(src_addr)
+
+
+# ******************************************************************************
+def remove_invalid_addresses(controllers: list):
+ '''@brief Remove controllers with invalid addresses from the list of controllers.
+ @param controllers: List of TIDs
+ '''
+ service_conf = conf.SvcConf()
+ valid_controllers = list()
+ for controller in controllers:
+ if controller.transport in ('tcp', 'rdma'):
+ # Let's make sure that traddr is
+ # syntactically a valid IPv4 or IPv6 address.
+ ip = get_ipaddress_obj(controller.traddr)
+ if ip is None:
+ logging.warning('%s IP address is not valid', controller)
+ continue
+
+ # Let's make sure the address family is enabled.
+ if ip.version not in service_conf.ip_family:
+ logging.debug(
+ '%s ignored because IPv%s is disabled in %s',
+ controller,
+ ip.version,
+ service_conf.conf_file,
+ )
+ continue
+
+ valid_controllers.append(controller)
+
+ elif controller.transport in ('fc', 'loop'):
+ # At some point, need to validate FC addresses as well...
+ valid_controllers.append(controller)
+
+ else:
+ logging.warning('Invalid transport %s', controller.transport)
+
+ return valid_controllers
diff --git a/staslib/log.py b/staslib/log.py
new file mode 100644
index 0000000..9622e98
--- /dev/null
+++ b/staslib/log.py
@@ -0,0 +1,53 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''nvme-stas logging module'''
+
+import sys
+import logging
+from staslib import defs
+
+
+def init(syslog: bool):
+ '''Init log module
+ @param syslog: True to send messages to the syslog,
+ False to send messages to stdout.
+ '''
+ log = logging.getLogger()
+ log.propagate = False
+
+ if syslog:
+ try:
+ # Try journal logger first
+ import systemd.journal # pylint: disable=import-outside-toplevel
+
+ handler = systemd.journal.JournalHandler(SYSLOG_IDENTIFIER=defs.PROG_NAME)
+ except ModuleNotFoundError:
+ # Go back to standard syslog handler
+ from logging.handlers import SysLogHandler # pylint: disable=import-outside-toplevel
+
+ handler = SysLogHandler(address="/dev/log")
+ handler.setFormatter(logging.Formatter(f'{defs.PROG_NAME}: %(message)s'))
+ else:
+ # Log to stdout
+ handler = logging.StreamHandler(stream=sys.stdout)
+
+ log.addHandler(handler)
+ log.setLevel(logging.INFO if syslog else logging.DEBUG)
+
+
+def level() -> str:
+ '''@brief return current log level'''
+ logger = logging.getLogger()
+ return str(logging.getLevelName(logger.getEffectiveLevel()))
+
+
+def set_level_from_tron(tron):
+ '''Set log level based on TRON'''
+ logger = logging.getLogger()
+ logger.setLevel(logging.DEBUG if tron else logging.INFO)
diff --git a/staslib/meson.build b/staslib/meson.build
new file mode 100644
index 0000000..eb006f0
--- /dev/null
+++ b/staslib/meson.build
@@ -0,0 +1,60 @@
+# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+
+files_to_configure = [ 'defs.py', '__init__.py', 'stafd.idl', 'stacd.idl' ]
+configured_files = []
+foreach file : files_to_configure
+ configured_files += configure_file(
+ input: file,
+ output: file,
+ configuration: conf
+ )
+endforeach
+
+files_to_copy = [
+ 'avahi.py',
+ 'conf.py',
+ 'ctrl.py',
+ 'gutil.py',
+ 'iputil.py',
+ 'log.py',
+ 'service.py',
+ 'singleton.py',
+ 'stas.py',
+ 'timeparse.py',
+ 'trid.py',
+ 'udev.py',
+ 'version.py'
+]
+copied_files = []
+foreach file : files_to_copy
+ copied_files += configure_file(
+ input: file,
+ output: file,
+ copy: true,
+ )
+endforeach
+
+files_to_install = copied_files + configured_files
+python3.install_sources(
+ files_to_install,
+ pure: true,
+ subdir: 'staslib',
+)
+
+#===============================================================================
+# Make a list of modules to lint
+skip = ['stafd.idl', 'stacd.idl']
+foreach file: files_to_install
+ fname = fs.name('@0@'.format(file))
+ if fname not in skip
+ modules_to_lint += file
+ endif
+endforeach
+
diff --git a/staslib/service.py b/staslib/service.py
new file mode 100644
index 0000000..ce4769d
--- /dev/null
+++ b/staslib/service.py
@@ -0,0 +1,878 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''This module defines the base Service object from
+which the Staf and the Stac objects are derived.'''
+
+import json
+import logging
+import pathlib
+import subprocess
+from itertools import filterfalse
+import dasbus.error
+import dasbus.client.observer
+import dasbus.client.proxy
+
+from gi.repository import GLib
+from systemd.daemon import notify as sd_notify
+from staslib import avahi, conf, ctrl, defs, gutil, iputil, stas, timeparse, trid, udev
+
+
+# ******************************************************************************
+class CtrlTerminator:
+ '''The Controller Terminator is used to gracefully disconnect from
+ controllers. All communications with controllers is handled by the kernel.
+ Once we make a request to the kernel to perform an operation (e.g. connect),
+ we have to wait for it to complete before requesting another operation. This
+ is particularly important when we want to disconnect from a controller while
+ there are pending operations, especially a pending connect.
+
+ The "connect" operation is especially unpredictable because all connect
+ requests are made through the blocking interface "/dev/nvme-fabrics". This
+ means that once a "connect" operation has been submitted, and depending on
+ how many connect requests are made concurrently, it can take several seconds
+ for a connect to be processed by the kernel.
+
+ While connect or other operations are being performed, it is possible
+ that a disconnect may be requested (e.g. someone or something changes the
+ configuration to remove a controller). Because it is not possible to
+ terminate a pending operation request, we have to wait for it to complete
+ before we can issue a disconnect. Failure to do that will result in
+ operations being performed by the kernel in reverse order. For example,
+ a disconnect may be executed before a pending connect has had a chance to
+ complete. And this will result in controllers that are supposed to be
+ disconnected to be connected without nvme-stas knowing about it.
+
+ The Controller Terminator is used when we need to disconnect from a
+ controller. It will make sure that there are no pending operations before
+ issuing a disconnect.
+ '''
+
+ DISPOSAL_AUDIT_PERIOD_SEC = 30
+
+ def __init__(self):
+ self._udev = udev.UDEV
+ self._controllers = list() # The list of controllers to dispose of.
+ self._audit_tmr = gutil.GTimer(self.DISPOSAL_AUDIT_PERIOD_SEC, self._on_disposal_check)
+
+ def dispose(self, controller: ctrl.Controller, on_controller_removed_cb, keep_connection: bool):
+ '''Invoked by a service (stafd or stacd) to dispose of a controller'''
+ if controller.all_ops_completed():
+ logging.debug(
+ 'CtrlTerminator.dispose() - %s | %s: Invoke disconnect()', controller.tid, controller.device
+ )
+ controller.disconnect(on_controller_removed_cb, keep_connection)
+ else:
+ logging.debug(
+ 'CtrlTerminator.dispose() - %s | %s: Add controller to garbage disposal',
+ controller.tid,
+ controller.device,
+ )
+ self._controllers.append((controller, keep_connection, on_controller_removed_cb, controller.tid))
+
+ self._udev.register_for_action_events('add', self._on_kernel_events)
+ self._udev.register_for_action_events('remove', self._on_kernel_events)
+
+ if self._audit_tmr.time_remaining() == 0:
+ self._audit_tmr.start()
+
+ def pending_disposal(self, tid):
+ '''Check whether @tid is pending disposal'''
+ for controller in self._controllers:
+ if controller.tid == tid:
+ return True
+ return False
+
+ def info(self):
+ '''@brief Get info about this object (used for debug)'''
+ info = {
+ 'terminator.audit timer': str(self._audit_tmr),
+ }
+ for controller, _, _, tid in self._controllers:
+ info[f'terminator.controller.{tid}'] = str(controller.info())
+ return info
+
+ def kill(self):
+ '''Stop Controller Terminator and release resources.'''
+ self._audit_tmr.stop()
+ self._audit_tmr = None
+
+ if self._udev:
+ self._udev.unregister_for_action_events('add', self._on_kernel_events)
+ self._udev.unregister_for_action_events('remove', self._on_kernel_events)
+ self._udev = None
+
+ for controller, keep_connection, on_controller_removed_cb, _ in self._controllers:
+ controller.disconnect(on_controller_removed_cb, keep_connection)
+
+ self._controllers.clear()
+
+ def _on_kernel_events(self, udev_obj):
+ logging.debug('CtrlTerminator._on_kernel_events() - %s event received', udev_obj.action)
+ self._disposal_check()
+
+ def _on_disposal_check(self, *_user_data):
+ logging.debug('CtrlTerminator._on_disposal_check()- Periodic audit')
+ return GLib.SOURCE_REMOVE if self._disposal_check() else GLib.SOURCE_CONTINUE
+
+ @staticmethod
+ def _keep_or_terminate(args):
+ '''Return False if controller is to be kept. True if controller
+ was terminated and can be removed from the list.'''
+ controller, keep_connection, on_controller_removed_cb, tid = args
+ if controller.all_ops_completed():
+ logging.debug(
+ 'CtrlTerminator._keep_or_terminate()- %s | %s: Disconnecting controller',
+ tid,
+ controller.device,
+ )
+ controller.disconnect(on_controller_removed_cb, keep_connection)
+ return True
+
+ return False
+
+ def _disposal_check(self):
+ # Iterate over the list, terminating (disconnecting) those controllers
+ # that have no pending operations, and remove those controllers from the
+ # list (only keep controllers that still have operations pending).
+ self._controllers[:] = filterfalse(self._keep_or_terminate, self._controllers)
+ disposal_complete = len(self._controllers) == 0
+
+ if disposal_complete:
+ logging.debug('CtrlTerminator._disposal_check() - Disposal complete')
+ self._audit_tmr.stop()
+ self._udev.unregister_for_action_events('add', self._on_kernel_events)
+ self._udev.unregister_for_action_events('remove', self._on_kernel_events)
+ else:
+ self._audit_tmr.start() # Restart timer
+
+ return disposal_complete
+
+
+# ******************************************************************************
+class Service(stas.ServiceABC):
+ '''@brief Base class used to manage a STorage Appliance Service'''
+
+ def __init__(self, args, default_conf, reload_hdlr):
+ self._udev = udev.UDEV
+ self._terminator = CtrlTerminator()
+
+ super().__init__(args, default_conf, reload_hdlr)
+
+ def _release_resources(self):
+ logging.debug('Service._release_resources()')
+ super()._release_resources()
+
+ if self._terminator:
+ self._terminator.kill()
+
+ self._udev = None
+ self._terminator = None
+
+ def _disconnect_all(self):
+ '''Tell all controller objects to disconnect'''
+ keep_connections = self._keep_connections_on_exit()
+ controllers = self._controllers.values()
+ logging.debug(
+ 'Service._stop_hdlr() - Controller count = %s, keep_connections = %s',
+ len(controllers),
+ keep_connections,
+ )
+ for controller in controllers:
+ self._terminator.dispose(controller, self._on_final_disconnect, keep_connections)
+
+ def info(self) -> dict:
+ '''@brief Get the status info for this object (used for debug)'''
+ info = super().info()
+ if self._terminator:
+ info.update(self._terminator.info())
+ return info
+
+ @stas.ServiceABC.tron.setter
+ def tron(self, value):
+ '''@brief Set Trace ON property'''
+ super(__class__, self.__class__).tron.__set__(self, value)
+
+
+# ******************************************************************************
+class Stac(Service):
+ '''STorage Appliance Connector (STAC)'''
+
+ CONF_STABILITY_LONG_SOAK_TIME_SEC = 10 # pylint: disable=invalid-name
+ ADD_EVENT_SOAK_TIME_SEC = 1
+
+ def __init__(self, args, dbus):
+ default_conf = {
+ ('Global', 'tron'): False,
+ ('Global', 'hdr-digest'): False,
+ ('Global', 'data-digest'): False,
+ ('Global', 'kato'): None, # None to let the driver decide the default
+ ('Global', 'nr-io-queues'): None, # None to let the driver decide the default
+ ('Global', 'nr-write-queues'): None, # None to let the driver decide the default
+ ('Global', 'nr-poll-queues'): None, # None to let the driver decide the default
+ ('Global', 'queue-size'): None, # None to let the driver decide the default
+ ('Global', 'reconnect-delay'): None, # None to let the driver decide the default
+ ('Global', 'ctrl-loss-tmo'): None, # None to let the driver decide the default
+ ('Global', 'disable-sqflow'): None, # None to let the driver decide the default
+ ('Global', 'ignore-iface'): False,
+ ('Global', 'ip-family'): (4, 6),
+ ('Controllers', 'controller'): list(),
+ ('Controllers', 'exclude'): list(),
+ ('I/O controller connection management', 'disconnect-scope'): 'only-stas-connections',
+ ('I/O controller connection management', 'disconnect-trtypes'): ['tcp'],
+ ('I/O controller connection management', 'connect-attempts-on-ncc'): 0,
+ }
+
+ super().__init__(args, default_conf, self._reload_hdlr)
+
+ self._add_event_soak_tmr = gutil.GTimer(self.ADD_EVENT_SOAK_TIME_SEC, self._on_add_event_soaked)
+
+ self._config_connections_audit()
+
+ # Create the D-Bus instance.
+ self._config_dbus(dbus, defs.STACD_DBUS_NAME, defs.STACD_DBUS_PATH)
+
+ # Connect to STAF D-Bus interface
+ self._staf = None
+ self._staf_watcher = dasbus.client.observer.DBusObserver(self._sysbus, defs.STAFD_DBUS_NAME)
+ self._staf_watcher.service_available.connect(self._connect_to_staf)
+ self._staf_watcher.service_unavailable.connect(self._disconnect_from_staf)
+ self._staf_watcher.connect_once_available()
+
+ def _release_resources(self):
+ logging.debug('Stac._release_resources()')
+
+ if self._add_event_soak_tmr:
+ self._add_event_soak_tmr.kill()
+
+ if self._udev:
+ self._udev.unregister_for_action_events('add', self._on_add_event)
+
+ self._destroy_staf_comlink(self._staf_watcher)
+ if self._staf_watcher is not None:
+ self._staf_watcher.disconnect()
+
+ super()._release_resources()
+
+ self._staf = None
+ self._staf_watcher = None
+ self._add_event_soak_tmr = None
+
+ def _dump_last_known_config(self, controllers):
+ config = list(controllers.keys())
+ logging.debug('Stac._dump_last_known_config() - IOC count = %s', len(config))
+ self._write_lkc(config)
+
+ def _load_last_known_config(self):
+ config = self._read_lkc() or list()
+ logging.debug('Stac._load_last_known_config() - IOC count = %s', len(config))
+
+ controllers = {}
+ for tid in config:
+ # Only create Ioc objects if there is already a connection in the kernel
+ # First, regenerate the TID (in case of soft. upgrade and TID object
+ # has changed internally)
+ tid = trid.TID(tid.as_dict())
+ if udev.UDEV.find_nvme_ioc_device(tid) is not None:
+ controllers[tid] = ctrl.Ioc(self, tid)
+
+ return controllers
+
+ def _audit_all_connections(self, tids):
+ '''A host should only connect to I/O controllers that have been zoned
+ for that host or a manual "controller" entry exists in stacd.conf.
+ A host should disconnect from an I/O controller when that I/O controller
+ is removed from the zone or a "controller" entry is manually removed
+ from stacd.conf. stacd will audit connections if "disconnect-scope=
+ all-connections-matching-disconnect-trtypes". stacd will delete any
+ connection that is not supposed to exist.
+ '''
+ logging.debug('Stac._audit_all_connections() - tids = %s', tids)
+ num_controllers = len(self._controllers)
+ for tid in tids:
+ if tid not in self._controllers and not self._terminator.pending_disposal(tid):
+ self._controllers[tid] = ctrl.Ioc(self, tid)
+
+ if num_controllers != len(self._controllers):
+ self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC)
+
+ def _on_add_event(self, udev_obj):
+ '''@brief This function is called when a "add" event is received from
+ the kernel for an NVMe device. This is used to trigger an audit and make
+ sure that the connection to an I/O controller is allowed.
+
+ WARNING: There is a race condition with the "add" event from the kernel.
+ The kernel sends the "add" event a bit early and the sysfs attributes
+ associated with the nvme object are not always fully initialized.
+ To workaround this problem we use a soaking timer to give time for the
+ sysfs attributes to stabilize.
+ '''
+ logging.debug('Stac._on_add_event(() - Received "add" event: %s', udev_obj.sys_name)
+ self._add_event_soak_tmr.start()
+
+ def _on_add_event_soaked(self):
+ '''@brief After the add event has been soaking for ADD_EVENT_SOAK_TIME_SEC
+ seconds, we can audit the connections.
+ '''
+ if self._alive():
+ svc_conf = conf.SvcConf()
+ if svc_conf.disconnect_scope == 'all-connections-matching-disconnect-trtypes':
+ self._audit_all_connections(self._udev.get_nvme_ioc_tids(svc_conf.disconnect_trtypes))
+ return GLib.SOURCE_REMOVE
+
+ def _config_connections_audit(self):
+ '''This function checks the "disconnect_scope" parameter to determine
+ whether audits should be performed. Audits are enabled when
+ "disconnect_scope == all-connections-matching-disconnect-trtypes".
+ '''
+ svc_conf = conf.SvcConf()
+ if svc_conf.disconnect_scope == 'all-connections-matching-disconnect-trtypes':
+ if not self._udev.is_action_cback_registered('add', self._on_add_event):
+ self._udev.register_for_action_events('add', self._on_add_event)
+ self._audit_all_connections(self._udev.get_nvme_ioc_tids(svc_conf.disconnect_trtypes))
+ else:
+ self._udev.unregister_for_action_events('add', self._on_add_event)
+
+ def _keep_connections_on_exit(self):
+ '''@brief Determine whether connections should remain when the
+ process exits.
+ '''
+ return True
+
+ def _reload_hdlr(self):
+ '''@brief Reload configuration file. This is triggered by the SIGHUP
+ signal, which can be sent with "systemctl reload stacd".
+ '''
+ if not self._alive():
+ return GLib.SOURCE_REMOVE
+
+ sd_notify('RELOADING=1')
+ service_cnf = conf.SvcConf()
+ service_cnf.reload()
+ self.tron = service_cnf.tron
+ self._config_connections_audit()
+ self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC)
+
+ for controller in self._controllers.values():
+ controller.reload_hdlr()
+
+ sd_notify('READY=1')
+ return GLib.SOURCE_CONTINUE
+
+ def _get_log_pages_from_stafd(self):
+ if self._staf:
+ try:
+ return json.loads(self._staf.get_all_log_pages(True))
+ except dasbus.error.DBusError:
+ pass
+
+ return list()
+
+ def _config_ctrls_finish(self, configured_ctrl_list: list): # pylint: disable=too-many-locals
+ '''@param configured_ctrl_list: list of TIDs'''
+ # This is a callback function, which may be called after the service
+ # has been signalled to stop. So let's make sure the service is still
+ # alive and well before continuing.
+ if not self._alive():
+ logging.debug('Stac._config_ctrls_finish() - Exiting because service is no longer alive')
+ return
+
+ # Eliminate invalid entries from stacd.conf "controller list".
+ configured_ctrl_list = [
+ tid for tid in configured_ctrl_list if '' not in (tid.transport, tid.traddr, tid.trsvcid, tid.subsysnqn)
+ ]
+
+ logging.debug('Stac._config_ctrls_finish() - configured_ctrl_list = %s', configured_ctrl_list)
+
+ discovered_ctrls = dict()
+ for staf_data in self._get_log_pages_from_stafd():
+ host_traddr = staf_data['discovery-controller']['host-traddr']
+ host_iface = staf_data['discovery-controller']['host-iface']
+ for dlpe in staf_data['log-pages']:
+ if dlpe.get('subtype') == 'nvme': # eliminate discovery controllers
+ tid = stas.tid_from_dlpe(dlpe, host_traddr, host_iface)
+ discovered_ctrls[tid] = dlpe
+
+ discovered_ctrl_list = list(discovered_ctrls.keys())
+ logging.debug('Stac._config_ctrls_finish() - discovered_ctrl_list = %s', discovered_ctrl_list)
+
+ controllers = stas.remove_excluded(configured_ctrl_list + discovered_ctrl_list)
+ controllers = iputil.remove_invalid_addresses(controllers)
+
+ new_controller_tids = set(controllers)
+ cur_controller_tids = set(self._controllers.keys())
+ controllers_to_add = new_controller_tids - cur_controller_tids
+ controllers_to_del = cur_controller_tids - new_controller_tids
+
+ logging.debug('Stac._config_ctrls_finish() - controllers_to_add = %s', list(controllers_to_add))
+ logging.debug('Stac._config_ctrls_finish() - controllers_to_del = %s', list(controllers_to_del))
+
+ svc_conf = conf.SvcConf()
+ no_disconnect = svc_conf.disconnect_scope == 'no-disconnect'
+ match_trtypes = svc_conf.disconnect_scope == 'all-connections-matching-disconnect-trtypes'
+ logging.debug(
+ 'Stac._config_ctrls_finish() - no_disconnect=%s, match_trtypes=%s, svc_conf.disconnect_trtypes=%s',
+ no_disconnect,
+ match_trtypes,
+ svc_conf.disconnect_trtypes,
+ )
+ for tid in controllers_to_del:
+ controller = self._controllers.pop(tid, None)
+ if controller is not None:
+ keep_connection = no_disconnect or (match_trtypes and tid.transport not in svc_conf.disconnect_trtypes)
+ self._terminator.dispose(controller, self.remove_controller, keep_connection)
+
+ for tid in controllers_to_add:
+ self._controllers[tid] = ctrl.Ioc(self, tid)
+
+ for tid, controller in self._controllers.items():
+ if tid in discovered_ctrls:
+ dlpe = discovered_ctrls[tid]
+ controller.update_dlpe(dlpe)
+
+ self._dump_last_known_config(self._controllers)
+
+ def _connect_to_staf(self, _):
+ '''@brief Hook up DBus signal handlers for signals from stafd.'''
+ if not self._alive():
+ return
+
+ try:
+ self._staf = self._sysbus.get_proxy(defs.STAFD_DBUS_NAME, defs.STAFD_DBUS_PATH)
+ self._staf.log_pages_changed.connect(self._log_pages_changed)
+ self._staf.dc_removed.connect(self._dc_removed)
+ self._cfg_soak_tmr.start()
+
+ # Make sure timer is set back to its normal value.
+ self._cfg_soak_tmr.set_timeout(self.CONF_STABILITY_SOAK_TIME_SEC)
+ logging.debug('Stac._connect_to_staf() - Connected to staf')
+ except dasbus.error.DBusError:
+ logging.error('Failed to connect to staf')
+
+ def _destroy_staf_comlink(self, watcher): # pylint: disable=unused-argument
+ if self._staf:
+ self._staf.log_pages_changed.disconnect(self._log_pages_changed)
+ self._staf.dc_removed.disconnect(self._dc_removed)
+ dasbus.client.proxy.disconnect_proxy(self._staf)
+ self._staf = None
+
+ def _disconnect_from_staf(self, watcher):
+ self._destroy_staf_comlink(watcher)
+
+ # When we lose connectivity with stafd, the most logical explanation
+ # is that stafd restarted. In that case, it may take some time for stafd
+ # to re-populate its log pages cache. So let's give stafd plenty of time
+ # to update its log pages cache and send log pages change notifications
+ # before triggering a stacd re-config. We do this by momentarily
+ # increasing the config soak timer to a longer period.
+ if self._cfg_soak_tmr:
+ self._cfg_soak_tmr.set_timeout(self.CONF_STABILITY_LONG_SOAK_TIME_SEC)
+
+ logging.debug('Stac._disconnect_from_staf() - Disconnected from staf')
+
+ def _log_pages_changed( # pylint: disable=too-many-arguments
+ self, transport, traddr, trsvcid, host_traddr, host_iface, subsysnqn, device
+ ):
+ if not self._alive():
+ return
+
+ logging.debug(
+ 'Stac._log_pages_changed() - transport=%s, traddr=%s, trsvcid=%s, host_traddr=%s, host_iface=%s, subsysnqn=%s, device=%s',
+ transport,
+ traddr,
+ trsvcid,
+ host_traddr,
+ host_iface,
+ subsysnqn,
+ device,
+ )
+ if self._cfg_soak_tmr:
+ self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC)
+
+ def _dc_removed(self):
+ if not self._alive():
+ return
+
+ logging.debug('Stac._dc_removed()')
+ if self._cfg_soak_tmr:
+ self._cfg_soak_tmr.start(self.CONF_STABILITY_SOAK_TIME_SEC)
+
+
+# ******************************************************************************
+# Only keep legacy FC rule (not even sure this is still in use today, but just to be safe).
+UDEV_RULE_OVERRIDE = r'''
+ACTION=="change", SUBSYSTEM=="fc", ENV{FC_EVENT}=="nvmediscovery", \
+ ENV{NVMEFC_HOST_TRADDR}=="*", ENV{NVMEFC_TRADDR}=="*", \
+ RUN+="%s --no-block start nvmf-connect@--transport=fc\t--traddr=$env{NVMEFC_TRADDR}\t--trsvcid=none\t--host-traddr=$env{NVMEFC_HOST_TRADDR}.service"
+'''
+
+
+def _udev_rule_ctrl(suppress):
+ '''@brief We override the standard udev rule installed by nvme-cli, i.e.
+ '/usr/lib/udev/rules.d/70-nvmf-autoconnect.rules', with a copy into
+ /run/udev/rules.d. The goal is to suppress the udev rule that controls TCP
+ connections to I/O controllers. This is to avoid race conditions between
+ stacd and udevd. This is configurable. See "udev-rule" in stacd.conf
+ for details.
+
+ @param enable: When True, override nvme-cli's udev rule and prevent TCP I/O
+ Controller connections by nvme-cli. When False, allow nvme-cli's udev rule
+ to make TCP I/O connections.
+ @type enable: bool
+ '''
+ udev_rule_file = pathlib.Path('/run/udev/rules.d', '70-nvmf-autoconnect.rules')
+ if suppress:
+ if not udev_rule_file.exists():
+ pathlib.Path('/run/udev/rules.d').mkdir(parents=True, exist_ok=True)
+ text = UDEV_RULE_OVERRIDE % (defs.SYSTEMCTL)
+ udev_rule_file.write_text(text) # pylint: disable=unspecified-encoding
+ else:
+ try:
+ udev_rule_file.unlink()
+ except FileNotFoundError:
+ pass
+
+
+def _is_dlp_changed_aen(udev_obj):
+ '''Check whether we received a Change of Discovery Log Page AEN'''
+ nvme_aen = udev_obj.get('NVME_AEN')
+ if not isinstance(nvme_aen, str):
+ return False
+
+ aen = int(nvme_aen, 16)
+ if aen != ctrl.DLP_CHANGED:
+ return False
+
+ logging.info(
+ '%s - Received AEN: Change of Discovery Log Page (%s)',
+ udev_obj.sys_name,
+ nvme_aen,
+ )
+ return True
+
+
+def _event_matches(udev_obj, nvme_events):
+ '''Check whether we received an NVMe Event matching
+ one of the events listed in @nvme_events'''
+ nvme_event = udev_obj.get('NVME_EVENT')
+ if nvme_event not in nvme_events:
+ return False
+
+ logging.info('%s - Received "%s" event', udev_obj.sys_name, nvme_event)
+ return True
+
+
+# ******************************************************************************
+class Staf(Service):
+ '''STorage Appliance Finder (STAF)'''
+
+ def __init__(self, args, dbus):
+ default_conf = {
+ ('Global', 'tron'): False,
+ ('Global', 'hdr-digest'): False,
+ ('Global', 'data-digest'): False,
+ ('Global', 'kato'): 30,
+ ('Global', 'queue-size'): None, # None to let the driver decide the default
+ ('Global', 'reconnect-delay'): None, # None to let the driver decide the default
+ ('Global', 'ctrl-loss-tmo'): None, # None to let the driver decide the default
+ ('Global', 'disable-sqflow'): None, # None to let the driver decide the default
+ ('Global', 'persistent-connections'): False, # Deprecated
+ ('Discovery controller connection management', 'persistent-connections'): True,
+ ('Discovery controller connection management', 'zeroconf-connections-persistence'): timeparse.timeparse(
+ '72hours'
+ ),
+ ('Global', 'ignore-iface'): False,
+ ('Global', 'ip-family'): (4, 6),
+ ('Global', 'pleo'): True,
+ ('Service Discovery', 'zeroconf'): True,
+ ('Controllers', 'controller'): list(),
+ ('Controllers', 'exclude'): list(),
+ }
+
+ super().__init__(args, default_conf, self._reload_hdlr)
+
+ self._avahi = avahi.Avahi(self._sysbus, self._avahi_change)
+ self._avahi.config_stypes(conf.SvcConf().stypes)
+
+ # Create the D-Bus instance.
+ self._config_dbus(dbus, defs.STAFD_DBUS_NAME, defs.STAFD_DBUS_PATH)
+
+ self._udev.register_for_action_events('change', self._nvme_cli_interop)
+ _udev_rule_ctrl(True)
+
+ def info(self) -> dict:
+ '''@brief Get the status info for this object (used for debug)'''
+ info = super().info()
+ info['avahi'] = self._avahi.info()
+ return info
+
+ def _release_resources(self):
+ logging.debug('Staf._release_resources()')
+ if self._udev:
+ self._udev.unregister_for_action_events('change', self._nvme_cli_interop)
+
+ super()._release_resources()
+
+ _udev_rule_ctrl(False)
+ if self._avahi:
+ self._avahi.kill()
+ self._avahi = None
+
+ def _dump_last_known_config(self, controllers):
+ config = {tid: {'log_pages': dc.log_pages(), 'origin': dc.origin} for tid, dc in controllers.items()}
+ logging.debug('Staf._dump_last_known_config() - DC count = %s', len(config))
+ self._write_lkc(config)
+
+ def _load_last_known_config(self):
+ config = self._read_lkc() or dict()
+ logging.debug('Staf._load_last_known_config() - DC count = %s', len(config))
+
+ controllers = {}
+ for tid, data in config.items():
+ if isinstance(data, dict):
+ log_pages = data.get('log_pages')
+ origin = data.get('origin')
+ else:
+ log_pages = data
+ origin = None
+
+ # Regenerate the TID (in case of soft. upgrade and TID object
+ # has changed internally)
+ tid = trid.TID(tid.as_dict())
+ controllers[tid] = ctrl.Dc(self, tid, log_pages, origin)
+
+ return controllers
+
+ def _keep_connections_on_exit(self):
+ '''@brief Determine whether connections should remain when the
+ process exits.
+ '''
+ return conf.SvcConf().persistent_connections
+
+ def _reload_hdlr(self):
+ '''@brief Reload configuration file. This is triggered by the SIGHUP
+ signal, which can be sent with "systemctl reload stafd".
+ '''
+ if not self._alive():
+ return GLib.SOURCE_REMOVE
+
+ sd_notify('RELOADING=1')
+ service_cnf = conf.SvcConf()
+ service_cnf.reload()
+ self.tron = service_cnf.tron
+ self._avahi.kick_start() # Make sure Avahi is running
+ self._avahi.config_stypes(service_cnf.stypes)
+ self._cfg_soak_tmr.start()
+
+ for controller in self._controllers.values():
+ controller.reload_hdlr()
+
+ sd_notify('READY=1')
+ return GLib.SOURCE_CONTINUE
+
+ def is_avahi_reported(self, tid):
+ '''@brief Return whether @tid is being reported by the Avahi daemon.
+ @return: True if the Avahi daemon is reporting it, False otherwise.
+ '''
+ for cid in self._avahi.get_controllers():
+ if trid.TID(cid) == tid:
+ return True
+ return False
+
+ def log_pages_changed(self, controller, device):
+ '''@brief Function invoked when a controller's cached log pages
+ have changed. This will emit a D-Bus signal to inform
+ other applications that the cached log pages have changed.
+ '''
+ self._dbus_iface.log_pages_changed.emit(
+ controller.tid.transport,
+ controller.tid.traddr,
+ controller.tid.trsvcid,
+ controller.tid.host_traddr,
+ controller.tid.host_iface,
+ controller.tid.subsysnqn,
+ device,
+ )
+
+ def dc_removed(self):
+ '''@brief Function invoked when a controller's cached log pages
+ have changed. This will emit a D-Bus signal to inform
+ other applications that the cached log pages have changed.
+ '''
+ self._dbus_iface.dc_removed.emit()
+
+ def _referrals(self) -> list:
+ return [
+ stas.tid_from_dlpe(dlpe, controller.tid.host_traddr, controller.tid.host_iface)
+ for controller in self.get_controllers()
+ for dlpe in controller.referrals()
+ ]
+
+ def _config_ctrls_finish(self, configured_ctrl_list: list):
+ '''@brief Finish discovery controllers configuration after
+ hostnames (if any) have been resolved. All the logic associated
+ with discovery controller creation/deletion is found here. To
+ avoid calling this algorith repetitively for each and every events,
+ it is called after a soaking period controlled by self._cfg_soak_tmr.
+
+ @param configured_ctrl_list: List of TIDs configured in stafd.conf with
+ all hostnames resolved to their corresponding IP addresses.
+ '''
+ # This is a callback function, which may be called after the service
+ # has been signalled to stop. So let's make sure the service is still
+ # alive and well before continuing.
+ if not self._alive():
+ logging.debug('Staf._config_ctrls_finish() - Exiting because service is no longer alive')
+ return
+
+ # Eliminate invalid entries from stafd.conf "controller list".
+ controllers = list()
+ for tid in configured_ctrl_list:
+ if '' in (tid.transport, tid.traddr, tid.trsvcid):
+ continue
+ if not tid.subsysnqn:
+ cid = tid.as_dict()
+ cid['subsysnqn'] = defs.WELL_KNOWN_DISC_NQN
+ controllers.append(trid.TID(cid))
+ else:
+ controllers.append(tid)
+ configured_ctrl_list = controllers
+
+ # Get the Avahi-discovered list and the referrals.
+ discovered_ctrl_list = [trid.TID(cid) for cid in self._avahi.get_controllers()]
+ referral_ctrl_list = self._referrals()
+ logging.debug('Staf._config_ctrls_finish() - configured_ctrl_list = %s', configured_ctrl_list)
+ logging.debug('Staf._config_ctrls_finish() - discovered_ctrl_list = %s', discovered_ctrl_list)
+ logging.debug('Staf._config_ctrls_finish() - referral_ctrl_list = %s', referral_ctrl_list)
+
+ all_ctrls = configured_ctrl_list + discovered_ctrl_list + referral_ctrl_list
+ controllers = stas.remove_excluded(all_ctrls)
+ controllers = iputil.remove_invalid_addresses(controllers)
+
+ new_controller_tids = set(controllers)
+ cur_controller_tids = set(self._controllers.keys())
+ controllers_to_add = new_controller_tids - cur_controller_tids
+ controllers_to_del = cur_controller_tids - new_controller_tids
+
+ # Make a list list of excluded and invalid controllers
+ must_remove_list = set(all_ctrls) - new_controller_tids
+
+ # Find "discovered" controllers that have not responded
+ # in a while and add them to controllers that must be removed.
+ must_remove_list.update({tid for tid, controller in self._controllers.items() if controller.is_unresponsive()})
+
+ # Do not remove Avahi-discovered DCs from controllers_to_del unless
+ # marked as "must-be-removed" (must_remove_list). This is to account for
+ # the case where mDNS discovery is momentarily disabled (e.g. Avahi
+ # daemon restarts). We don't want to delete connections because of
+ # temporary mDNS impairments. Removal of Avahi-discovered DCs will be
+ # handled differently and only if the connection cannot be established
+ # for a long period of time.
+ logging.debug('Staf._config_ctrls_finish() - must_remove_list = %s', list(must_remove_list))
+ controllers_to_del = {
+ tid
+ for tid in controllers_to_del
+ if tid in must_remove_list or self._controllers[tid].origin != 'discovered'
+ }
+
+ logging.debug('Staf._config_ctrls_finish() - controllers_to_add = %s', list(controllers_to_add))
+ logging.debug('Staf._config_ctrls_finish() - controllers_to_del = %s', list(controllers_to_del))
+
+ # Delete controllers
+ for tid in controllers_to_del:
+ controller = self._controllers.pop(tid, None)
+ if controller is not None:
+ self._terminator.dispose(controller, self.remove_controller, keep_connection=False)
+
+ if len(controllers_to_del) > 0:
+ self.dc_removed() # Let other apps (e.g. stacd) know that discovery controllers were removed.
+
+ # Add controllers
+ for tid in controllers_to_add:
+ self._controllers[tid] = ctrl.Dc(self, tid)
+
+ # Update "origin" on all DC objects
+ for tid, controller in self._controllers.items():
+ origin = (
+ 'configured'
+ if tid in configured_ctrl_list
+ else 'referral'
+ if tid in referral_ctrl_list
+ else 'discovered'
+ if tid in discovered_ctrl_list
+ else None
+ )
+ if origin is not None:
+ controller.origin = origin
+
+ self._dump_last_known_config(self._controllers)
+
+ def _avahi_change(self):
+ if self._alive() and self._cfg_soak_tmr is not None:
+ self._cfg_soak_tmr.start()
+
+ def controller_unresponsive(self, tid):
+ '''@brief Function invoked when a controller becomes unresponsive and
+ needs to be removed.
+ '''
+ if self._alive() and self._cfg_soak_tmr is not None:
+ logging.debug('Staf.controller_unresponsive() - tid = %s', tid)
+ self._cfg_soak_tmr.start()
+
+ def referrals_changed(self):
+ '''@brief Function invoked when a controller's cached referrals
+ have changed.
+ '''
+ if self._alive() and self._cfg_soak_tmr is not None:
+ logging.debug('Staf.referrals_changed()')
+ self._cfg_soak_tmr.start()
+
+ def _nvme_cli_interop(self, udev_obj):
+ '''Interoperability with nvme-cli:
+ stafd will invoke nvme-cli's connect-all the same way nvme-cli's udev
+ rules would do normally. This is for the case where a user has an hybrid
+ configuration where some controllers are configured through nvme-stas
+ and others through nvme-cli. This is not an optimal configuration. It
+ would be better if everything was configured through nvme-stas, however
+ support for hybrid configuration was requested by users (actually only
+ one user requested this).'''
+
+ # Looking for 'change' events only
+ if udev_obj.action != 'change':
+ return
+
+ # Looking for events from Discovery Controllers only
+ if not udev.Udev.is_dc_device(udev_obj):
+ return
+
+ # Is the controller already being monitored by stafd?
+ for controller in self.get_controllers():
+ if controller.device == udev_obj.sys_name:
+ return
+
+ # Did we receive a Change of DLP AEN or an NVME Event indicating 'connect' or 'rediscover'?
+ if not _is_dlp_changed_aen(udev_obj) and not _event_matches(udev_obj, ('connected', 'rediscover')):
+ return
+
+ # We need to invoke "nvme connect-all" using nvme-cli's nvmf-connect@.service
+ # NOTE: Eventually, we'll be able to drop --host-traddr and --host-iface from
+ # the parameters passed to nvmf-connect@.service. A fix was added to connect-all
+ # to infer these two values from the device used to connect to the DC.
+ # Ref: https://github.com/linux-nvme/nvme-cli/pull/1812
+ cnf = [
+ ('--device', udev_obj.sys_name),
+ ('--host-traddr', udev_obj.properties.get('NVME_HOST_TRADDR', None)),
+ ('--host-iface', udev_obj.properties.get('NVME_HOST_IFACE', None)),
+ ]
+ # Use systemd's escaped syntax (i.e. '=' is replaced by '\x3d', '\t' by '\x09', etc.
+ options = r'\x09'.join(
+ [fr'{option}\x3d{value}' for option, value in cnf if value not in (None, 'none', 'None', '')]
+ )
+ logging.info('Invoking: systemctl start nvmf-connect@%s.service', options)
+ cmd = [defs.SYSTEMCTL, '--quiet', '--no-block', 'start', fr'nvmf-connect@{options}.service']
+ subprocess.run(cmd, check=False)
diff --git a/staslib/singleton.py b/staslib/singleton.py
new file mode 100644
index 0000000..2171186
--- /dev/null
+++ b/staslib/singleton.py
@@ -0,0 +1,23 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''Implementation of a singleton pattern'''
+
+
+class Singleton(type):
+ '''metaclass implementation of a singleton pattern'''
+
+ _instances = {}
+
+ def __call__(cls, *args, **kwargs):
+ if cls not in cls._instances:
+ # This variable declaration is required to force a
+ # strong reference on the instance.
+ instance = super(Singleton, cls).__call__(*args, **kwargs)
+ cls._instances[cls] = instance
+ return cls._instances[cls]
diff --git a/staslib/stacd.idl b/staslib/stacd.idl
new file mode 100644
index 0000000..efefbbe
--- /dev/null
+++ b/staslib/stacd.idl
@@ -0,0 +1,27 @@
+<node>
+ <interface name="@STACD_DBUS_NAME@.debug">
+ <property name="tron" type="b" access="readwrite"/>
+ <property name="log_level" type="s" access="read"/>
+ <method name="process_info">
+ <arg direction="out" type="s" name="info_json"/>
+ </method>
+ <method name="controller_info">
+ <arg direction="in" type="s" name="transport"/>
+ <arg direction="in" type="s" name="traddr"/>
+ <arg direction="in" type="s" name="trsvcid"/>
+ <arg direction="in" type="s" name="host_traddr"/>
+ <arg direction="in" type="s" name="host_iface"/>
+ <arg direction="in" type="s" name="subsysnqn"/>
+ <arg direction="out" type="s" name="info_json"/>
+ </method>
+ </interface>
+
+ <interface name="@STACD_DBUS_NAME@">
+ <method name="list_controllers">
+ <arg direction="in" type="b" name="detailed"/>
+ <arg direction="out" type="aa{ss}" name="controller_list"/>
+ </method>
+ </interface>
+</node>
+
+
diff --git a/staslib/stafd.idl b/staslib/stafd.idl
new file mode 100644
index 0000000..8c98ffe
--- /dev/null
+++ b/staslib/stafd.idl
@@ -0,0 +1,49 @@
+<node>
+ <interface name="@STAFD_DBUS_NAME@.debug">
+ <property name="tron" type="b" access="readwrite"/>
+ <property name="log_level" type="s" access="read"/>
+ <method name="process_info">
+ <arg direction="out" type="s" name="info_json"/>
+ </method>
+ <method name="controller_info">
+ <arg direction="in" type="s" name="transport"/>
+ <arg direction="in" type="s" name="traddr"/>
+ <arg direction="in" type="s" name="trsvcid"/>
+ <arg direction="in" type="s" name="host_traddr"/>
+ <arg direction="in" type="s" name="host_iface"/>
+ <arg direction="in" type="s" name="subsysnqn"/>
+ <arg direction="out" type="s" name="info_json"/>
+ </method>
+ </interface>
+
+ <interface name="@STAFD_DBUS_NAME@">
+ <method name="list_controllers">
+ <arg direction="in" type="b" name="detailed"/>
+ <arg direction="out" type="aa{ss}" name="controller_list"/>
+ </method>
+ <method name="get_log_pages">
+ <arg direction="in" type="s" name="transport"/>
+ <arg direction="in" type="s" name="traddr"/>
+ <arg direction="in" type="s" name="trsvcid"/>
+ <arg direction="in" type="s" name="host_traddr"/>
+ <arg direction="in" type="s" name="host_iface"/>
+ <arg direction="in" type="s" name="subsysnqn"/>
+ <arg direction="out" type="aa{ss}" name="log_pages"/>
+ </method>
+ <method name="get_all_log_pages">
+ <arg direction="in" type="b" name="detailed"/>
+ <arg direction="out" type="s" name="log_pages_json"/>
+ </method>
+ <signal name="log_pages_changed">
+ <arg direction="out" type="s" name="transport"/>
+ <arg direction="out" type="s" name="traddr"/>
+ <arg direction="out" type="s" name="trsvcid"/>
+ <arg direction="out" type="s" name="host_traddr"/>
+ <arg direction="out" type="s" name="host_iface"/>
+ <arg direction="out" type="s" name="subsysnqn"/>
+ <arg direction="out" type="s" name="device"/>
+ </signal>
+ <signal name="dc_removed"></signal>
+ </interface>
+</node>
+
diff --git a/staslib/stas.py b/staslib/stas.py
new file mode 100644
index 0000000..95afb94
--- /dev/null
+++ b/staslib/stas.py
@@ -0,0 +1,554 @@
+# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''Library for staf/stac. You will find here common code for stafd and stacd
+including the Abstract Base Classes (ABC) for Controllers and Services'''
+
+import os
+import sys
+import abc
+import signal
+import pickle
+import logging
+import dasbus.connection
+from gi.repository import Gio, GLib
+from systemd.daemon import notify as sd_notify
+from staslib import conf, defs, gutil, log, trid
+
+try:
+ # Python 3.9 or later
+ # This is the preferred way, but may not be available before Python 3.9
+ from importlib.resources import files
+except ImportError:
+ try:
+ # Pre Python 3.9 backport of importlib.resources (if installed)
+ from importlib_resources import files
+ except ImportError:
+ # Less efficient, but avalable on older versions of Python
+ import pkg_resources
+
+ def load_idl(idl_fname):
+ '''@brief Load D-Bus Interface Description Language File'''
+ try:
+ return pkg_resources.resource_string('staslib', idl_fname).decode()
+ except (FileNotFoundError, AttributeError):
+ pass
+
+ return ''
+
+ else:
+
+ def load_idl(idl_fname):
+ '''@brief Load D-Bus Interface Description Language File'''
+ try:
+ return files('staslib').joinpath(idl_fname).read_text() # pylint: disable=unspecified-encoding
+ except FileNotFoundError:
+ pass
+
+ return ''
+
+else:
+
+ def load_idl(idl_fname):
+ '''@brief Load D-Bus Interface Description Language File'''
+ try:
+ return files('staslib').joinpath(idl_fname).read_text() # pylint: disable=unspecified-encoding
+ except FileNotFoundError:
+ pass
+
+ return ''
+
+
+# ******************************************************************************
+def check_if_allowed_to_continue():
+ '''@brief Let's perform some basic checks before going too far. There are
+ a few pre-requisites that need to be met before this program
+ is allowed to proceed:
+
+ 1) The program needs to have root privileges
+ 2) The nvme-tcp kernel module must be loaded
+
+ @return This function will only return if all conditions listed above
+ are met. Otherwise the program exits.
+ '''
+ # 1) Check root privileges
+ if os.geteuid() != 0:
+ sys.exit(f'Permission denied. You need root privileges to run {defs.PROG_NAME}.')
+
+ # 2) Check that nvme-tcp kernel module is running
+ if not os.path.exists('/dev/nvme-fabrics'):
+ # There's no point going any further if the kernel module hasn't been loaded
+ sys.exit('Fatal error: missing nvme-tcp kernel module')
+
+
+# ******************************************************************************
+def tid_from_dlpe(dlpe, host_traddr, host_iface):
+ '''@brief Take a Discovery Log Page Entry and return a Controller ID as a dict.'''
+ cid = {
+ 'transport': dlpe['trtype'],
+ 'traddr': dlpe['traddr'],
+ 'trsvcid': dlpe['trsvcid'],
+ 'host-traddr': host_traddr,
+ 'host-iface': host_iface,
+ 'subsysnqn': dlpe['subnqn'],
+ }
+ return trid.TID(cid)
+
+
+# ******************************************************************************
+def _excluded(excluded_ctrl_list, controller: dict):
+ '''@brief Check if @controller is excluded.'''
+ for excluded_ctrl in excluded_ctrl_list:
+ test_results = [val == controller.get(key, None) for key, val in excluded_ctrl.items()]
+ if all(test_results):
+ return True
+ return False
+
+
+# ******************************************************************************
+def remove_excluded(controllers: list):
+ '''@brief Remove excluded controllers from the list of controllers.
+ @param controllers: List of TIDs
+ '''
+ excluded_ctrl_list = conf.SvcConf().get_excluded()
+ if excluded_ctrl_list:
+ logging.debug('remove_excluded() - excluded_ctrl_list = %s', excluded_ctrl_list)
+ controllers = [
+ controller for controller in controllers if not _excluded(excluded_ctrl_list, controller.as_dict())
+ ]
+ return controllers
+
+
+# ******************************************************************************
+class ControllerABC(abc.ABC):
+ '''@brief Base class used to manage the connection to a controller.'''
+
+ CONNECT_RETRY_PERIOD_SEC = 60
+ FAST_CONNECT_RETRY_PERIOD_SEC = 3
+
+ def __init__(self, tid: trid.TID, service, discovery_ctrl: bool = False):
+ self._tid = tid
+ self._serv = service # Refers to the parent service (either Staf or Stac)
+ self.set_level_from_tron(self._serv.tron)
+ self._cancellable = Gio.Cancellable()
+ self._connect_attempts = 0
+ self._retry_connect_tmr = gutil.GTimer(self.CONNECT_RETRY_PERIOD_SEC, self._on_try_to_connect)
+ self._discovery_ctrl = discovery_ctrl
+ self._try_to_connect_deferred = gutil.Deferred(self._try_to_connect)
+ self._try_to_connect_deferred.schedule()
+
+ def _release_resources(self):
+ # Remove pending deferred from main loop
+ if self._try_to_connect_deferred:
+ self._try_to_connect_deferred.cancel()
+
+ if self._retry_connect_tmr is not None:
+ self._retry_connect_tmr.kill()
+
+ if self._alive():
+ self._cancellable.cancel()
+
+ self._tid = None
+ self._serv = None
+ self._cancellable = None
+ self._retry_connect_tmr = None
+ self._try_to_connect_deferred = None
+
+ @property
+ def id(self) -> str:
+ '''@brief Return the Transport ID as a printable string'''
+ return str(self.tid)
+
+ @property
+ def tid(self):
+ '''@brief Return the Transport ID object'''
+ return self._tid
+
+ def controller_id_dict(self) -> dict:
+ '''@brief return the controller ID as a dict.'''
+ return {k: str(v) for k, v in self.tid.as_dict().items()}
+
+ def details(self) -> dict:
+ '''@brief return detailed debug info about this controller'''
+ return self.info()
+
+ def info(self) -> dict:
+ '''@brief Get the controller info for this object'''
+ info = self.controller_id_dict()
+ info['connect attempts'] = str(self._connect_attempts)
+ info['retry connect timer'] = str(self._retry_connect_tmr)
+ return info
+
+ def cancel(self):
+ '''@brief Used to cancel pending operations.'''
+ if self._alive():
+ logging.debug('ControllerABC.cancel() - %s', self.id)
+ self._cancellable.cancel()
+
+ def kill(self):
+ '''@brief Used to release all resources associated with this object.'''
+ logging.debug('ControllerABC.kill() - %s', self.id)
+ self._release_resources()
+
+ def _alive(self):
+ '''There may be race condition where a queued event gets processed
+ after the object is no longer configured (i.e. alive). This method
+ can be used by callback functions to make sure the object is still
+ alive before processing further.
+ '''
+ return self._cancellable and not self._cancellable.is_cancelled()
+
+ def _on_try_to_connect(self):
+ if self._alive():
+ self._try_to_connect_deferred.schedule()
+ return GLib.SOURCE_REMOVE
+
+ def _should_try_to_reconnect(self): # pylint: disable=no-self-use
+ return True
+
+ def _try_to_connect(self):
+ if not self._alive():
+ return GLib.SOURCE_REMOVE
+
+ # This is a deferred function call. Make sure
+ # the source of the deferred is still good.
+ source = GLib.main_current_source()
+ if source and source.is_destroyed():
+ return GLib.SOURCE_REMOVE
+
+ self._connect_attempts += 1
+
+ self._do_connect()
+
+ return GLib.SOURCE_REMOVE
+
+ @abc.abstractmethod
+ def set_level_from_tron(self, tron):
+ '''Set log level based on TRON'''
+
+ @abc.abstractmethod
+ def _do_connect(self):
+ '''Perform connection'''
+
+ @abc.abstractmethod
+ def _on_aen(self, aen: int):
+ '''Event handler when an AEN is received'''
+
+ @abc.abstractmethod
+ def _on_nvme_event(self, nvme_event):
+ '''Event handler when an nvme_event is received'''
+
+ @abc.abstractmethod
+ def _on_ctrl_removed(self, udev_obj):
+ '''Called when the associated nvme device (/dev/nvmeX) is removed
+ from the system by the kernel.
+ '''
+
+ @abc.abstractmethod
+ def _find_existing_connection(self):
+ '''Check if there is an existing connection that matches this Controller's TID'''
+
+ @abc.abstractmethod
+ def all_ops_completed(self) -> bool:
+ '''@brief Returns True if all operations have completed. False otherwise.'''
+
+ @abc.abstractmethod
+ def connected(self):
+ '''@brief Return whether a connection is established'''
+
+ @abc.abstractmethod
+ def disconnect(self, disconnected_cb, keep_connection):
+ '''@brief Issue an asynchronous disconnect command to a Controller.
+ Once the async command has completed, the callback 'disconnected_cb'
+ will be invoked. If a controller is already disconnected, then the
+ callback will be added to the main loop's next idle slot to be executed
+ ASAP.
+ '''
+
+ @abc.abstractmethod
+ def reload_hdlr(self):
+ '''@brief This is called when a "reload" signal is received.'''
+
+
+# ******************************************************************************
+class ServiceABC(abc.ABC): # pylint: disable=too-many-instance-attributes
+ '''@brief Base class used to manage a STorage Appliance Service'''
+
+ CONF_STABILITY_SOAK_TIME_SEC = 1.5
+
+ def __init__(self, args, default_conf, reload_hdlr):
+ service_conf = conf.SvcConf(default_conf=default_conf)
+ service_conf.set_conf_file(args.conf_file) # reload configuration
+ self._tron = args.tron or service_conf.tron
+ log.set_level_from_tron(self._tron)
+
+ self._lkc_file = os.path.join(
+ os.environ.get('RUNTIME_DIRECTORY', os.path.join('/run', defs.PROG_NAME)), 'last-known-config.pickle'
+ )
+ self._loop = GLib.MainLoop()
+ self._cancellable = Gio.Cancellable()
+ self._resolver = gutil.NameResolver()
+ self._controllers = self._load_last_known_config()
+ self._dbus_iface = None
+ self._cfg_soak_tmr = gutil.GTimer(self.CONF_STABILITY_SOAK_TIME_SEC, self._on_config_ctrls)
+ self._sysbus = dasbus.connection.SystemMessageBus()
+
+ GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, self._stop_hdlr) # CTRL-C
+ GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, self._stop_hdlr) # systemctl stop stafd
+ GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, reload_hdlr) # systemctl reload stafd
+
+ nvme_options = conf.NvmeOptions()
+ if not nvme_options.host_iface_supp or not nvme_options.discovery_supp:
+ logging.warning(
+ 'Kernel does not appear to support all the options needed to run this program. Consider updating to a later kernel version.'
+ )
+
+ # We don't want to apply configuration changes right away.
+ # Often, multiple changes will occur in a short amount of time (sub-second).
+ # We want to wait until there are no more changes before applying them
+ # to the system. The following timer acts as a "soak period". Changes
+ # will be applied by calling self._on_config_ctrls() at the end of
+ # the soak period.
+ self._cfg_soak_tmr.start()
+
+ def _release_resources(self):
+ logging.debug('ServiceABC._release_resources()')
+
+ if self._alive():
+ self._cancellable.cancel()
+
+ if self._cfg_soak_tmr is not None:
+ self._cfg_soak_tmr.kill()
+
+ self._controllers.clear()
+
+ if self._sysbus:
+ self._sysbus.disconnect()
+
+ self._cfg_soak_tmr = None
+ self._cancellable = None
+ self._resolver = None
+ self._lkc_file = None
+ self._sysbus = None
+
+ def _config_dbus(self, iface_obj, bus_name: str, obj_name: str):
+ self._dbus_iface = iface_obj
+ self._sysbus.publish_object(obj_name, iface_obj)
+ self._sysbus.register_service(bus_name)
+
+ @property
+ def tron(self):
+ '''@brief Get Trace ON property'''
+ return self._tron
+
+ @tron.setter
+ def tron(self, value):
+ '''@brief Set Trace ON property'''
+ self._tron = value
+ log.set_level_from_tron(self._tron)
+ for controller in self._controllers.values():
+ controller.set_level_from_tron(self._tron)
+
+ def run(self):
+ '''@brief Start the main loop execution'''
+ try:
+ self._loop.run()
+ except Exception as ex: # pylint: disable=broad-except
+ logging.critical('exception: %s', ex)
+
+ self._loop = None
+
+ def info(self) -> dict:
+ '''@brief Get the status info for this object (used for debug)'''
+ nvme_options = conf.NvmeOptions()
+ info = conf.SysConf().as_dict()
+ info['last known config file'] = self._lkc_file
+ info['config soak timer'] = str(self._cfg_soak_tmr)
+ info['kernel support.TP8013'] = str(nvme_options.discovery_supp)
+ info['kernel support.host_iface'] = str(nvme_options.host_iface_supp)
+ return info
+
+ def get_controllers(self) -> dict:
+ '''@brief return the list of controller objects'''
+ return self._controllers.values()
+
+ def get_controller(
+ self, transport: str, traddr: str, trsvcid: str, host_traddr: str, host_iface: str, subsysnqn: str
+ ): # pylint: disable=too-many-arguments
+ '''@brief get the specified controller object from the list of controllers'''
+ cid = {
+ 'transport': transport,
+ 'traddr': traddr,
+ 'trsvcid': trsvcid,
+ 'host-traddr': host_traddr,
+ 'host-iface': host_iface,
+ 'subsysnqn': subsysnqn,
+ }
+ return self._controllers.get(trid.TID(cid))
+
+ def _remove_ctrl_from_dict(self, controller, shutdown=False):
+ tid_to_pop = controller.tid
+ if not tid_to_pop:
+ # Being paranoid. This should not happen, but let's say the
+ # controller object has been purged, but it is somehow still
+ # listed in self._controllers.
+ for tid, _controller in self._controllers.items():
+ if _controller is controller:
+ tid_to_pop = tid
+ break
+
+ if tid_to_pop:
+ logging.debug('ServiceABC._remove_ctrl_from_dict()- %s | %s', tid_to_pop, controller.device)
+ popped = self._controllers.pop(tid_to_pop, None)
+ if not shutdown and popped is not None and self._cfg_soak_tmr:
+ self._cfg_soak_tmr.start()
+ else:
+ logging.debug('ServiceABC._remove_ctrl_from_dict()- already removed')
+
+ def remove_controller(self, controller, success): # pylint: disable=unused-argument
+ '''@brief remove the specified controller object from the list of controllers
+ @param controller: the controller object
+ @param success: whether the disconnect was successful'''
+ logging.debug('ServiceABC.remove_controller()')
+ if isinstance(controller, ControllerABC):
+ self._remove_ctrl_from_dict(controller)
+ controller.kill()
+
+ def _alive(self):
+ '''It's a good idea to check that this object hasn't been
+ cancelled (i.e. is still alive) when entering a callback function.
+ Callback functrions can be invoked after, for example, a process has
+ been signalled to stop or restart, in which case it makes no sense to
+ proceed with the callback.
+ '''
+ return self._cancellable and not self._cancellable.is_cancelled()
+
+ def _cancel(self):
+ logging.debug('ServiceABC._cancel()')
+ if self._alive():
+ self._cancellable.cancel()
+
+ for controller in self._controllers.values():
+ controller.cancel()
+
+ def _stop_hdlr(self):
+ logging.debug('ServiceABC._stop_hdlr()')
+ sd_notify('STOPPING=1')
+
+ self._cancel() # Cancel pending operations
+
+ self._dump_last_known_config(self._controllers)
+
+ if len(self._controllers) == 0:
+ GLib.idle_add(self._exit)
+ else:
+ self._disconnect_all()
+
+ return GLib.SOURCE_REMOVE
+
+ def _on_final_disconnect(self, controller, success):
+ '''Callback invoked after a controller is disconnected.
+ THIS IS USED DURING PROCESS SHUTDOWN TO WAIT FOR ALL CONTROLLERS TO BE
+ DISCONNECTED BEFORE EXITING THE PROGRAM. ONLY CALL ON SHUTDOWN!
+ @param controller: the controller object
+ @param success: whether the disconnect operation was successful
+ '''
+ logging.debug(
+ 'ServiceABC._on_final_disconnect() - %s | %s: disconnect %s',
+ controller.id,
+ controller.device,
+ 'succeeded' if success else 'failed',
+ )
+
+ self._remove_ctrl_from_dict(controller, True)
+ controller.kill()
+
+ # When all controllers have disconnected, we can finish the clean up
+ if len(self._controllers) == 0:
+ # Defer exit to the next main loop's idle period.
+ GLib.idle_add(self._exit)
+
+ def _exit(self):
+ logging.debug('ServiceABC._exit()')
+ self._release_resources()
+ self._loop.quit()
+
+ def _on_config_ctrls(self, *_user_data):
+ if self._alive():
+ self._config_ctrls()
+ return GLib.SOURCE_REMOVE
+
+ def _config_ctrls(self):
+ '''@brief Start controllers configuration.'''
+ # The configuration file may contain controllers and/or excluded
+ # controllers with traddr specified as hostname instead of IP address.
+ # Because of this, we need to remove those excluded elements before
+ # running name resolution. And we will need to remove excluded
+ # elements after name resolution is complete (i.e. in the calback
+ # function _config_ctrls_finish)
+ logging.debug('ServiceABC._config_ctrls()')
+ configured_controllers = [trid.TID(cid) for cid in conf.SvcConf().get_controllers()]
+ configured_controllers = remove_excluded(configured_controllers)
+ self._resolver.resolve_ctrl_async(self._cancellable, configured_controllers, self._config_ctrls_finish)
+
+ def _read_lkc(self):
+ '''@brief Read Last Known Config from file'''
+ try:
+ with open(self._lkc_file, 'rb') as file:
+ return pickle.load(file)
+ except (FileNotFoundError, AttributeError, EOFError):
+ return None
+
+ def _write_lkc(self, config):
+ '''@brief Write Last Known Config to file, and if config is empty
+ make sure the file is emptied.'''
+ try:
+ # Note that if config is empty we still
+ # want to open/close the file to empty it.
+ with open(self._lkc_file, 'wb') as file:
+ if config:
+ pickle.dump(config, file)
+ except FileNotFoundError as ex:
+ logging.error('Unable to save last known config: %s', ex)
+
+ @abc.abstractmethod
+ def _disconnect_all(self):
+ '''Tell all controller objects to disconnect'''
+
+ @abc.abstractmethod
+ def _keep_connections_on_exit(self):
+ '''@brief Determine whether connections should remain when the
+ process exits.
+
+ NOTE) This is the base class method used to define the interface.
+ It must be overloaded by a child class.
+ '''
+
+ @abc.abstractmethod
+ def _config_ctrls_finish(self, configured_ctrl_list):
+ '''@brief Finish controllers configuration after hostnames (if any)
+ have been resolved.
+
+ Configuring controllers must be done asynchronously in 2 steps.
+ In the first step, host names get resolved to find their IP addresses.
+ Name resolution can take a while, especially when an external name
+ resolution server is used. Once that step completed, the callback
+ method _config_ctrls_finish() (i.e. this method), gets invoked to
+ complete the controller configuration.
+
+ NOTE) This is the base class method used to define the interface.
+ It must be overloaded by a child class.
+ '''
+
+ @abc.abstractmethod
+ def _load_last_known_config(self):
+ '''Load last known config from file (if any)'''
+
+ @abc.abstractmethod
+ def _dump_last_known_config(self, controllers):
+ '''Save last known config to file'''
diff --git a/staslib/timeparse.py b/staslib/timeparse.py
new file mode 100644
index 0000000..5295fc4
--- /dev/null
+++ b/staslib/timeparse.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+'''
+This module was borrowed and modified from: https://github.com/wroberts/pytimeparse
+
+timeparse.py
+(c) Will Roberts <wildwilhelm@gmail.com> 1 February, 2014
+
+Implements a single function, `timeparse`, which can parse various
+kinds of time expressions.
+'''
+
+# MIT LICENSE
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import re
+
+SIGN = r'(?P<sign>[+|-])?'
+DAYS = r'(?P<days>[\d.]+)\s*(?:d|dys?|days?)'
+HOURS = r'(?P<hours>[\d.]+)\s*(?:h|hrs?|hours?)'
+MINS = r'(?P<mins>[\d.]+)\s*(?:m|(mins?)|(minutes?))'
+SECS = r'(?P<secs>[\d.]+)\s*(?:s|secs?|seconds?)'
+SEPARATORS = r'[,/]'
+SECCLOCK = r':(?P<secs>\d{2}(?:\.\d+)?)'
+MINCLOCK = r'(?P<mins>\d{1,2}):(?P<secs>\d{2}(?:\.\d+)?)'
+HOURCLOCK = r'(?P<hours>\d+):(?P<mins>\d{2}):(?P<secs>\d{2}(?:\.\d+)?)'
+DAYCLOCK = r'(?P<days>\d+):(?P<hours>\d{2}):(?P<mins>\d{2}):(?P<secs>\d{2}(?:\.\d+)?)'
+
+
+def _opt(string):
+ return f'(?:{string})?'
+
+
+def _optsep(string):
+ return fr'(?:{string}\s*(?:{SEPARATORS}\s*)?)?'
+
+
+TIMEFORMATS = [
+ fr'{_optsep(DAYS)}\s*{_optsep(HOURS)}\s*{_optsep(MINS)}\s*{_opt(SECS)}',
+ f'{MINCLOCK}',
+ fr'{_optsep(DAYS)}\s*{HOURCLOCK}',
+ f'{DAYCLOCK}',
+ f'{SECCLOCK}',
+]
+
+COMPILED_SIGN = re.compile(r'\s*' + SIGN + r'\s*(?P<unsigned>.*)$')
+COMPILED_TIMEFORMATS = [re.compile(r'\s*' + timefmt + r'\s*$', re.I) for timefmt in TIMEFORMATS]
+
+MULTIPLIERS = {
+ 'days': 60 * 60 * 24,
+ 'hours': 60 * 60,
+ 'mins': 60,
+ 'secs': 1,
+}
+
+
+def timeparse(sval):
+ '''
+ Parse a time expression, returning it as a number of seconds. If
+ possible, the return value will be an `int`; if this is not
+ possible, the return will be a `float`. Returns `None` if a time
+ expression cannot be parsed from the given string.
+
+ Arguments:
+ - `sval`: the string value to parse
+
+ >>> timeparse('1:24')
+ 84
+ >>> timeparse(':22')
+ 22
+ >>> timeparse('1 minute, 24 secs')
+ 84
+ >>> timeparse('1m24s')
+ 84
+ >>> timeparse('1.2 minutes')
+ 72
+ >>> timeparse('1.2 seconds')
+ 1.2
+
+ Time expressions can be signed.
+
+ >>> timeparse('- 1 minute')
+ -60
+ >>> timeparse('+ 1 minute')
+ 60
+ '''
+ try:
+ return float(sval)
+ except TypeError:
+ pass
+ except ValueError:
+ match = COMPILED_SIGN.match(sval)
+ sign = -1 if match.groupdict()['sign'] == '-' else 1
+ sval = match.groupdict()['unsigned']
+ for timefmt in COMPILED_TIMEFORMATS:
+ match = timefmt.match(sval)
+ if match and match.group(0).strip():
+ mdict = match.groupdict()
+ # if all of the fields are integer numbers
+ if all(v.isdigit() for v in list(mdict.values()) if v):
+ return sign * sum((MULTIPLIERS[k] * int(v, 10) for (k, v) in list(mdict.items()) if v is not None))
+
+ # if SECS is an integer number
+ if 'secs' not in mdict or mdict['secs'] is None or mdict['secs'].isdigit():
+ # we will return an integer
+ return sign * int(
+ sum(
+ (
+ MULTIPLIERS[k] * float(v)
+ for (k, v) in list(mdict.items())
+ if k != 'secs' and v is not None
+ )
+ )
+ ) + (int(mdict['secs'], 10) if mdict['secs'] else 0)
+
+ # SECS is a float, we will return a float
+ return sign * sum((MULTIPLIERS[k] * float(v) for (k, v) in list(mdict.items()) if v is not None))
+
+ return None
diff --git a/staslib/trid.py b/staslib/trid.py
new file mode 100644
index 0000000..ea40b7d
--- /dev/null
+++ b/staslib/trid.py
@@ -0,0 +1,137 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''This module defines the Transport Identifier Object, which is used
+throughout nvme-stas to uniquely identify a Controller'''
+
+import hashlib
+from staslib import conf
+
+
+class TID: # pylint: disable=too-many-instance-attributes
+ '''Transport Identifier'''
+
+ RDMA_IP_PORT = '4420'
+ DISC_IP_PORT = '8009'
+
+ def __init__(self, cid: dict):
+ '''@param cid: Controller Identifier. A dictionary with the following
+ contents.
+ {
+ # Transport parameters
+ 'transport': str, # [mandatory]
+ 'traddr': str, # [mandatory]
+ 'subsysnqn': str, # [mandatory]
+ 'trsvcid': str, # [optional]
+ 'host-traddr': str, # [optional]
+ 'host-iface': str, # [optional]
+
+ # Connection parameters
+ 'dhchap-ctrl-secret': str, # [optional]
+ 'hdr-digest': str, # [optional]
+ 'data-digest': str, # [optional]
+ 'nr-io-queues': str, # [optional]
+ 'nr-write-queues': str, # [optional]
+ 'nr-poll-queues': str, # [optional]
+ 'queue-size': str, # [optional]
+ 'kato': str, # [optional]
+ 'reconnect-delay': str, # [optional]
+ 'ctrl-loss-tmo': str, # [optional]
+ 'disable-sqflow': str, # [optional]
+ }
+ '''
+ self._cfg = {
+ k: v
+ for k, v in cid.items()
+ if k not in ('transport', 'traddr', 'subsysnqn', 'trsvcid', 'host-traddr', 'host-iface')
+ }
+ self._transport = cid.get('transport', '')
+ self._traddr = cid.get('traddr', '')
+ self._trsvcid = ''
+ if self._transport in ('tcp', 'rdma'):
+ trsvcid = cid.get('trsvcid', None)
+ self._trsvcid = (
+ trsvcid if trsvcid else (TID.RDMA_IP_PORT if self._transport == 'rdma' else TID.DISC_IP_PORT)
+ )
+ self._host_traddr = cid.get('host-traddr', '')
+ self._host_iface = '' if conf.SvcConf().ignore_iface else cid.get('host-iface', '')
+ self._subsysnqn = cid.get('subsysnqn', '')
+ self._shortkey = (self._transport, self._traddr, self._trsvcid, self._subsysnqn, self._host_traddr)
+ self._key = (self._transport, self._traddr, self._trsvcid, self._subsysnqn, self._host_traddr, self._host_iface)
+ self._hash = int.from_bytes(
+ hashlib.md5(''.join(self._key).encode('utf-8')).digest(), 'big'
+ ) # We need a consistent hash between restarts
+ self._id = f'({self._transport}, {self._traddr}, {self._trsvcid}{", " + self._subsysnqn if self._subsysnqn else ""}{", " + self._host_iface if self._host_iface else ""}{", " + self._host_traddr if self._host_traddr else ""})' # pylint: disable=line-too-long
+
+ @property
+ def transport(self): # pylint: disable=missing-function-docstring
+ return self._transport
+
+ @property
+ def traddr(self): # pylint: disable=missing-function-docstring
+ return self._traddr
+
+ @property
+ def trsvcid(self): # pylint: disable=missing-function-docstring
+ return self._trsvcid
+
+ @property
+ def host_traddr(self): # pylint: disable=missing-function-docstring
+ return self._host_traddr
+
+ @property
+ def host_iface(self): # pylint: disable=missing-function-docstring
+ return self._host_iface
+
+ @property
+ def subsysnqn(self): # pylint: disable=missing-function-docstring
+ return self._subsysnqn
+
+ @property
+ def cfg(self): # pylint: disable=missing-function-docstring
+ return self._cfg
+
+ def as_dict(self):
+ '''Return object members as a dictionary'''
+ data = {
+ 'transport': self.transport,
+ 'traddr': self.traddr,
+ 'subsysnqn': self.subsysnqn,
+ 'trsvcid': self.trsvcid,
+ 'host-traddr': self.host_traddr,
+ 'host-iface': self.host_iface,
+ }
+ data.update(self._cfg)
+ return data
+
+ def __str__(self):
+ return self._id
+
+ def __repr__(self):
+ return self._id
+
+ def __eq__(self, other):
+ if not isinstance(other, self.__class__):
+ return False
+
+ if self._host_iface and other._host_iface:
+ return self._key == other._key
+
+ return self._shortkey == other._shortkey
+
+ def __ne__(self, other):
+ if not isinstance(other, self.__class__):
+ return True
+
+ if self._host_iface and other._host_iface:
+ return self._key != other._key
+
+ return self._shortkey != other._shortkey
+
+ def __hash__(self):
+ return self._hash
diff --git a/staslib/udev.py b/staslib/udev.py
new file mode 100644
index 0000000..12ef61b
--- /dev/null
+++ b/staslib/udev.py
@@ -0,0 +1,334 @@
+# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+'''This module provides functions to access nvme devices using the pyudev module'''
+
+import os
+import time
+import logging
+import pyudev
+from gi.repository import GLib
+from staslib import defs, iputil, trid
+
+
+# ******************************************************************************
+class Udev:
+ '''@brief Udev event monitor. Provide a way to register for udev events.
+ WARNING: THE singleton.Singleton PATTERN CANNOT BE USED WITH THIS CLASS.
+ IT INTERFERES WITH THE pyudev INTERNALS, WHICH CAUSES OBJECT CLEAN UP TO FAIL.
+ '''
+
+ def __init__(self):
+ self._log_event_soak_time = 0
+ self._log_event_count = 0
+ self._device_event_registry = dict()
+ self._action_event_registry = dict()
+ self._context = pyudev.Context()
+ self._monitor = pyudev.Monitor.from_netlink(self._context)
+ self._monitor.filter_by(subsystem='nvme')
+ self._event_source = GLib.io_add_watch(
+ self._monitor.fileno(),
+ GLib.PRIORITY_HIGH,
+ GLib.IO_IN,
+ self._process_udev_event,
+ )
+ self._monitor.start()
+
+ def release_resources(self):
+ '''Release all resources used by this object'''
+ if self._event_source is not None:
+ GLib.source_remove(self._event_source)
+
+ if self._monitor is not None:
+ self._monitor.remove_filter()
+
+ self._event_source = None
+ self._monitor = None
+ self._context = None
+ self._device_event_registry = None
+ self._action_event_registry = None
+
+ def get_nvme_device(self, sys_name):
+ '''@brief Get the udev device object associated with an nvme device.
+ @param sys_name: The device system name (e.g. 'nvme1')
+ @return A pyudev.device._device.Device object
+ '''
+ device_node = os.path.join('/dev', sys_name)
+ try:
+ return pyudev.Devices.from_device_file(self._context, device_node)
+ except pyudev.DeviceNotFoundByFileError as ex:
+ logging.error("Udev.get_nvme_device() - Error: %s", ex)
+ return None
+
+ def is_action_cback_registered(self, action: str, user_cback):
+ '''Returns True if @user_cback is registered for @action. False otherwise.
+ @param action: one of 'add', 'remove', 'change'.
+ @param user_cback: A callback function with this signature: cback(udev_obj)
+ '''
+ return user_cback in self._action_event_registry.get(action, set())
+
+ def register_for_action_events(self, action: str, user_cback):
+ '''@brief Register a callback function to be called when udev events
+ for a specific action are received.
+ @param action: one of 'add', 'remove', 'change'.
+ '''
+ self._action_event_registry.setdefault(action, set()).add(user_cback)
+
+ def unregister_for_action_events(self, action: str, user_cback):
+ '''@brief The opposite of register_for_action_events()'''
+ try:
+ self._action_event_registry.get(action, set()).remove(user_cback)
+ except KeyError: # Raise if user_cback already removed
+ pass
+
+ def register_for_device_events(self, sys_name: str, user_cback):
+ '''@brief Register a callback function to be called when udev events
+ are received for a specific nvme device.
+ @param sys_name: The device system name (e.g. 'nvme1')
+ '''
+ if sys_name:
+ self._device_event_registry[sys_name] = user_cback
+
+ def unregister_for_device_events(self, user_cback):
+ '''@brief The opposite of register_for_device_events()'''
+ entries = list(self._device_event_registry.items())
+ for sys_name, _user_cback in entries:
+ if user_cback == _user_cback:
+ self._device_event_registry.pop(sys_name, None)
+ break
+
+ def get_attributes(self, sys_name: str, attr_ids) -> dict:
+ '''@brief Get all the attributes associated with device @sys_name'''
+ attrs = {attr_id: '' for attr_id in attr_ids}
+ if sys_name and sys_name != 'nvme?':
+ udev = self.get_nvme_device(sys_name)
+ if udev is not None:
+ for attr_id in attr_ids:
+ try:
+ value = udev.attributes.asstring(attr_id).strip()
+ attrs[attr_id] = '' if value == '(efault)' else value
+ except Exception: # pylint: disable=broad-except
+ pass
+
+ return attrs
+
+ @staticmethod
+ def is_dc_device(device):
+ '''@brief check whether device refers to a Discovery Controller'''
+ subsysnqn = device.attributes.get('subsysnqn')
+ if subsysnqn is not None and subsysnqn.decode() == defs.WELL_KNOWN_DISC_NQN:
+ return True
+
+ # Note: Prior to 5.18 linux didn't expose the cntrltype through
+ # the sysfs. So, this may return None on older kernels.
+ cntrltype = device.attributes.get('cntrltype')
+ if cntrltype is not None and cntrltype.decode() == 'discovery':
+ return True
+
+ # Imply Discovery controller based on the absence of children.
+ # Discovery Controllers have no children devices
+ if len(list(device.children)) == 0:
+ return True
+
+ return False
+
+ @staticmethod
+ def is_ioc_device(device):
+ '''@brief check whether device refers to an I/O Controller'''
+ # Note: Prior to 5.18 linux didn't expose the cntrltype through
+ # the sysfs. So, this may return None on older kernels.
+ cntrltype = device.attributes.get('cntrltype')
+ if cntrltype is not None and cntrltype.decode() == 'io':
+ return True
+
+ # Imply I/O controller based on the presence of children.
+ # I/O Controllers have children devices
+ if len(list(device.children)) != 0:
+ return True
+
+ return False
+
+ def find_nvme_dc_device(self, tid):
+ '''@brief Find the nvme device associated with the specified
+ Discovery Controller.
+ @return The device if a match is found, None otherwise.
+ '''
+ for device in self._context.list_devices(
+ subsystem='nvme', NVME_TRADDR=tid.traddr, NVME_TRSVCID=tid.trsvcid, NVME_TRTYPE=tid.transport
+ ):
+ if not self.is_dc_device(device):
+ continue
+
+ if self.get_tid(device) != tid:
+ continue
+
+ return device
+
+ return None
+
+ def find_nvme_ioc_device(self, tid):
+ '''@brief Find the nvme device associated with the specified
+ I/O Controller.
+ @return The device if a match is found, None otherwise.
+ '''
+ for device in self._context.list_devices(
+ subsystem='nvme', NVME_TRADDR=tid.traddr, NVME_TRSVCID=tid.trsvcid, NVME_TRTYPE=tid.transport
+ ):
+ if not self.is_ioc_device(device):
+ continue
+
+ if self.get_tid(device) != tid:
+ continue
+
+ return device
+
+ return None
+
+ def get_nvme_ioc_tids(self, transports):
+ '''@brief Find all the I/O controller nvme devices in the system.
+ @return A list of pyudev.device._device.Device objects
+ '''
+ tids = []
+ for device in self._context.list_devices(subsystem='nvme'):
+ if device.properties.get('NVME_TRTYPE', '') not in transports:
+ continue
+
+ if not self.is_ioc_device(device):
+ continue
+
+ tids.append(self.get_tid(device))
+
+ return tids
+
+ def _process_udev_event(self, event_source, condition): # pylint: disable=unused-argument
+ if condition == GLib.IO_IN:
+ event_count = 0
+ while True:
+ try:
+ device = self._monitor.poll(timeout=0)
+ except EnvironmentError as ex:
+ device = None
+ # This event seems to happen in bursts. So, let's suppress
+ # logging for 2 seconds to avoid filling the syslog.
+ self._log_event_count += 1
+ now = time.time()
+ if now > self._log_event_soak_time:
+ logging.debug('Udev._process_udev_event() - %s [%s]', ex, self._log_event_count)
+ self._log_event_soak_time = now + 2
+ self._log_event_count = 0
+
+ if device is None:
+ break
+
+ event_count += 1
+ self._device_event(device, event_count)
+
+ return GLib.SOURCE_CONTINUE
+
+ @staticmethod
+ def __cback_names(action_cbacks, device_cback):
+ names = []
+ for cback in action_cbacks:
+ names.append(cback.__name__ + '()')
+ if device_cback:
+ names.append(device_cback.__name__ + '()')
+ return names
+
+ def _device_event(self, device, event_count):
+ action_cbacks = self._action_event_registry.get(device.action, set())
+ device_cback = self._device_event_registry.get(device.sys_name, None)
+
+ logging.debug(
+ 'Udev._device_event() - %-8s %-6s %-8s %s',
+ f'{device.sys_name}:',
+ device.action,
+ f'{event_count:2}:{device.sequence_number}',
+ self.__cback_names(action_cbacks, device_cback),
+ )
+
+ for action_cback in action_cbacks:
+ GLib.idle_add(action_cback, device)
+
+ if device_cback is not None:
+ GLib.idle_add(device_cback, device)
+
+ @staticmethod
+ def _get_property(device, prop, default=''):
+ prop = device.properties.get(prop, default)
+ return '' if prop.lower() == 'none' else prop
+
+ @staticmethod
+ def _get_attribute(device, attr_id, default=''):
+ try:
+ attr = device.attributes.asstring(attr_id).strip()
+ except Exception: # pylint: disable=broad-except
+ attr = default
+
+ return '' if attr.lower() == 'none' else attr
+
+ @staticmethod
+ def get_key_from_attr(device, attr, key, delim=','):
+ '''Get attribute specified by attr, which is composed of key=value pairs.
+ Then return the value associated with key.
+ @param device: The Device object
+ @param attr: The device's attribute to get
+ @param key: The key to look for in the attribute
+ @param delim: Delimiter used between key=value pairs.
+ @example:
+ "address" attribute contains "trtype=tcp,traddr=10.10.1.100,trsvcid=4420,host_traddr=10.10.1.50"
+ '''
+ attr_str = Udev._get_attribute(device, attr)
+ if not attr_str:
+ return ''
+
+ if key[-1] != '=':
+ key += '='
+ start = attr_str.find(key)
+ if start < 0:
+ return ''
+ start += len(key)
+
+ end = attr_str.find(delim, start)
+ if end < 0:
+ return attr_str[start:]
+
+ return attr_str[start:end]
+
+ @staticmethod
+ def _get_host_iface(device):
+ host_iface = Udev._get_property(device, 'NVME_HOST_IFACE')
+ if not host_iface:
+ # We'll try to find the interface from the source address on
+ # the connection. Only available if kernel exposes the source
+ # address (src_addr) in the "address" attribute.
+ src_addr = Udev.get_key_from_attr(device, 'address', 'src_addr=')
+ host_iface = iputil.get_interface(src_addr)
+ return host_iface
+
+ @staticmethod
+ def get_tid(device):
+ '''@brief return the Transport ID associated with a udev device'''
+ cid = {
+ 'transport': Udev._get_property(device, 'NVME_TRTYPE'),
+ 'traddr': Udev._get_property(device, 'NVME_TRADDR'),
+ 'trsvcid': Udev._get_property(device, 'NVME_TRSVCID'),
+ 'host-traddr': Udev._get_property(device, 'NVME_HOST_TRADDR'),
+ 'host-iface': Udev._get_host_iface(device),
+ 'subsysnqn': Udev._get_attribute(device, 'subsysnqn'),
+ }
+ return trid.TID(cid)
+
+
+UDEV = Udev() # Singleton
+
+
+def shutdown():
+ '''Destroy the UDEV singleton'''
+ global UDEV # pylint: disable=global-statement,global-variable-not-assigned
+ UDEV.release_resources()
+ del UDEV
diff --git a/staslib/version.py b/staslib/version.py
new file mode 100644
index 0000000..999d916
--- /dev/null
+++ b/staslib/version.py
@@ -0,0 +1,64 @@
+# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+''' distutils (and hence LooseVersion) is being deprecated. None of the
+ suggested replacements (e.g. from pkg_resources import parse_version) quite
+ work with Linux kernel versions the way LooseVersion does.
+
+ It was suggested to simply lift the LooseVersion code and vendor it in,
+ which is what this module is about.
+'''
+
+import re
+
+
+class KernelVersion:
+ '''Code loosely lifted from distutils's LooseVersion'''
+
+ component_re = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE)
+
+ def __init__(self, string: str):
+ self.string = string
+ self.version = self.__parse(string)
+
+ def __str__(self):
+ return self.string
+
+ def __repr__(self):
+ return f'KernelVersion ("{self}")'
+
+ def __eq__(self, other):
+ return self.version == self.__version(other)
+
+ def __lt__(self, other):
+ return self.version < self.__version(other)
+
+ def __le__(self, other):
+ return self.version <= self.__version(other)
+
+ def __gt__(self, other):
+ return self.version > self.__version(other)
+
+ def __ge__(self, other):
+ return self.version >= self.__version(other)
+
+ @staticmethod
+ def __version(obj):
+ return obj.version if isinstance(obj, KernelVersion) else KernelVersion.__parse(obj)
+
+ @staticmethod
+ def __parse(string):
+ components = []
+ for item in KernelVersion.component_re.split(string):
+ if item and item != '.':
+ try:
+ components.append(int(item))
+ except ValueError:
+ pass
+
+ return components