path: root/staslib/
diff options
Diffstat (limited to '')
1 files changed, 554 insertions, 0 deletions
diff --git a/staslib/ b/staslib/
new file mode 100644
index 0000000..95afb94
--- /dev/null
+++ b/staslib/
@@ -0,0 +1,554 @@
+# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+# Authors: Martin Belanger <>
+'''Library for staf/stac. You will find here common code for stafd and stacd
+including the Abstract Base Classes (ABC) for Controllers and Services'''
+import os
+import sys
+import abc
+import signal
+import pickle
+import logging
+import dasbus.connection
+from gi.repository import Gio, GLib
+from systemd.daemon import notify as sd_notify
+from staslib import conf, defs, gutil, log, trid
+ # Python 3.9 or later
+ # This is the preferred way, but may not be available before Python 3.9
+ from importlib.resources import files
+except ImportError:
+ try:
+ # Pre Python 3.9 backport of importlib.resources (if installed)
+ from importlib_resources import files
+ except ImportError:
+ # Less efficient, but avalable on older versions of Python
+ import pkg_resources
+ def load_idl(idl_fname):
+ '''@brief Load D-Bus Interface Description Language File'''
+ try:
+ return pkg_resources.resource_string('staslib', idl_fname).decode()
+ except (FileNotFoundError, AttributeError):
+ pass
+ return ''
+ else:
+ def load_idl(idl_fname):
+ '''@brief Load D-Bus Interface Description Language File'''
+ try:
+ return files('staslib').joinpath(idl_fname).read_text() # pylint: disable=unspecified-encoding
+ except FileNotFoundError:
+ pass
+ return ''
+ def load_idl(idl_fname):
+ '''@brief Load D-Bus Interface Description Language File'''
+ try:
+ return files('staslib').joinpath(idl_fname).read_text() # pylint: disable=unspecified-encoding
+ except FileNotFoundError:
+ pass
+ return ''
+# ******************************************************************************
+def check_if_allowed_to_continue():
+ '''@brief Let's perform some basic checks before going too far. There are
+ a few pre-requisites that need to be met before this program
+ is allowed to proceed:
+ 1) The program needs to have root privileges
+ 2) The nvme-tcp kernel module must be loaded
+ @return This function will only return if all conditions listed above
+ are met. Otherwise the program exits.
+ '''
+ # 1) Check root privileges
+ if os.geteuid() != 0:
+ sys.exit(f'Permission denied. You need root privileges to run {defs.PROG_NAME}.')
+ # 2) Check that nvme-tcp kernel module is running
+ if not os.path.exists('/dev/nvme-fabrics'):
+ # There's no point going any further if the kernel module hasn't been loaded
+ sys.exit('Fatal error: missing nvme-tcp kernel module')
+# ******************************************************************************
+def tid_from_dlpe(dlpe, host_traddr, host_iface):
+ '''@brief Take a Discovery Log Page Entry and return a Controller ID as a dict.'''
+ cid = {
+ 'transport': dlpe['trtype'],
+ 'traddr': dlpe['traddr'],
+ 'trsvcid': dlpe['trsvcid'],
+ 'host-traddr': host_traddr,
+ 'host-iface': host_iface,
+ 'subsysnqn': dlpe['subnqn'],
+ }
+ return trid.TID(cid)
+# ******************************************************************************
+def _excluded(excluded_ctrl_list, controller: dict):
+ '''@brief Check if @controller is excluded.'''
+ for excluded_ctrl in excluded_ctrl_list:
+ test_results = [val == controller.get(key, None) for key, val in excluded_ctrl.items()]
+ if all(test_results):
+ return True
+ return False
+# ******************************************************************************
+def remove_excluded(controllers: list):
+ '''@brief Remove excluded controllers from the list of controllers.
+ @param controllers: List of TIDs
+ '''
+ excluded_ctrl_list = conf.SvcConf().get_excluded()
+ if excluded_ctrl_list:
+ logging.debug('remove_excluded() - excluded_ctrl_list = %s', excluded_ctrl_list)
+ controllers = [
+ controller for controller in controllers if not _excluded(excluded_ctrl_list, controller.as_dict())
+ ]
+ return controllers
+# ******************************************************************************
+class ControllerABC(abc.ABC):
+ '''@brief Base class used to manage the connection to a controller.'''
+ def __init__(self, tid: trid.TID, service, discovery_ctrl: bool = False):
+ self._tid = tid
+ self._serv = service # Refers to the parent service (either Staf or Stac)
+ self.set_level_from_tron(self._serv.tron)
+ self._cancellable = Gio.Cancellable()
+ self._connect_attempts = 0
+ self._retry_connect_tmr = gutil.GTimer(self.CONNECT_RETRY_PERIOD_SEC, self._on_try_to_connect)
+ self._discovery_ctrl = discovery_ctrl
+ self._try_to_connect_deferred = gutil.Deferred(self._try_to_connect)
+ self._try_to_connect_deferred.schedule()
+ def _release_resources(self):
+ # Remove pending deferred from main loop
+ if self._try_to_connect_deferred:
+ self._try_to_connect_deferred.cancel()
+ if self._retry_connect_tmr is not None:
+ self._retry_connect_tmr.kill()
+ if self._alive():
+ self._cancellable.cancel()
+ self._tid = None
+ self._serv = None
+ self._cancellable = None
+ self._retry_connect_tmr = None
+ self._try_to_connect_deferred = None
+ @property
+ def id(self) -> str:
+ '''@brief Return the Transport ID as a printable string'''
+ return str(self.tid)
+ @property
+ def tid(self):
+ '''@brief Return the Transport ID object'''
+ return self._tid
+ def controller_id_dict(self) -> dict:
+ '''@brief return the controller ID as a dict.'''
+ return {k: str(v) for k, v in self.tid.as_dict().items()}
+ def details(self) -> dict:
+ '''@brief return detailed debug info about this controller'''
+ return
+ def info(self) -> dict:
+ '''@brief Get the controller info for this object'''
+ info = self.controller_id_dict()
+ info['connect attempts'] = str(self._connect_attempts)
+ info['retry connect timer'] = str(self._retry_connect_tmr)
+ return info
+ def cancel(self):
+ '''@brief Used to cancel pending operations.'''
+ if self._alive():
+ logging.debug('ControllerABC.cancel() - %s',
+ self._cancellable.cancel()
+ def kill(self):
+ '''@brief Used to release all resources associated with this object.'''
+ logging.debug('ControllerABC.kill() - %s',
+ self._release_resources()
+ def _alive(self):
+ '''There may be race condition where a queued event gets processed
+ after the object is no longer configured (i.e. alive). This method
+ can be used by callback functions to make sure the object is still
+ alive before processing further.
+ '''
+ return self._cancellable and not self._cancellable.is_cancelled()
+ def _on_try_to_connect(self):
+ if self._alive():
+ self._try_to_connect_deferred.schedule()
+ def _should_try_to_reconnect(self): # pylint: disable=no-self-use
+ return True
+ def _try_to_connect(self):
+ if not self._alive():
+ # This is a deferred function call. Make sure
+ # the source of the deferred is still good.
+ source = GLib.main_current_source()
+ if source and source.is_destroyed():
+ self._connect_attempts += 1
+ self._do_connect()
+ @abc.abstractmethod
+ def set_level_from_tron(self, tron):
+ '''Set log level based on TRON'''
+ @abc.abstractmethod
+ def _do_connect(self):
+ '''Perform connection'''
+ @abc.abstractmethod
+ def _on_aen(self, aen: int):
+ '''Event handler when an AEN is received'''
+ @abc.abstractmethod
+ def _on_nvme_event(self, nvme_event):
+ '''Event handler when an nvme_event is received'''
+ @abc.abstractmethod
+ def _on_ctrl_removed(self, udev_obj):
+ '''Called when the associated nvme device (/dev/nvmeX) is removed
+ from the system by the kernel.
+ '''
+ @abc.abstractmethod
+ def _find_existing_connection(self):
+ '''Check if there is an existing connection that matches this Controller's TID'''
+ @abc.abstractmethod
+ def all_ops_completed(self) -> bool:
+ '''@brief Returns True if all operations have completed. False otherwise.'''
+ @abc.abstractmethod
+ def connected(self):
+ '''@brief Return whether a connection is established'''
+ @abc.abstractmethod
+ def disconnect(self, disconnected_cb, keep_connection):
+ '''@brief Issue an asynchronous disconnect command to a Controller.
+ Once the async command has completed, the callback 'disconnected_cb'
+ will be invoked. If a controller is already disconnected, then the
+ callback will be added to the main loop's next idle slot to be executed
+ '''
+ @abc.abstractmethod
+ def reload_hdlr(self):
+ '''@brief This is called when a "reload" signal is received.'''
+# ******************************************************************************
+class ServiceABC(abc.ABC): # pylint: disable=too-many-instance-attributes
+ '''@brief Base class used to manage a STorage Appliance Service'''
+ def __init__(self, args, default_conf, reload_hdlr):
+ service_conf = conf.SvcConf(default_conf=default_conf)
+ service_conf.set_conf_file(args.conf_file) # reload configuration
+ self._tron = args.tron or service_conf.tron
+ log.set_level_from_tron(self._tron)
+ self._lkc_file = os.path.join(
+ os.environ.get('RUNTIME_DIRECTORY', os.path.join('/run', defs.PROG_NAME)), 'last-known-config.pickle'
+ )
+ self._loop = GLib.MainLoop()
+ self._cancellable = Gio.Cancellable()
+ self._resolver = gutil.NameResolver()
+ self._controllers = self._load_last_known_config()
+ self._dbus_iface = None
+ self._cfg_soak_tmr = gutil.GTimer(self.CONF_STABILITY_SOAK_TIME_SEC, self._on_config_ctrls)
+ self._sysbus = dasbus.connection.SystemMessageBus()
+ GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, self._stop_hdlr) # CTRL-C
+ GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, self._stop_hdlr) # systemctl stop stafd
+ GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, reload_hdlr) # systemctl reload stafd
+ nvme_options = conf.NvmeOptions()
+ if not nvme_options.host_iface_supp or not nvme_options.discovery_supp:
+ logging.warning(
+ 'Kernel does not appear to support all the options needed to run this program. Consider updating to a later kernel version.'
+ )
+ # We don't want to apply configuration changes right away.
+ # Often, multiple changes will occur in a short amount of time (sub-second).
+ # We want to wait until there are no more changes before applying them
+ # to the system. The following timer acts as a "soak period". Changes
+ # will be applied by calling self._on_config_ctrls() at the end of
+ # the soak period.
+ self._cfg_soak_tmr.start()
+ def _release_resources(self):
+ logging.debug('ServiceABC._release_resources()')
+ if self._alive():
+ self._cancellable.cancel()
+ if self._cfg_soak_tmr is not None:
+ self._cfg_soak_tmr.kill()
+ self._controllers.clear()
+ if self._sysbus:
+ self._sysbus.disconnect()
+ self._cfg_soak_tmr = None
+ self._cancellable = None
+ self._resolver = None
+ self._lkc_file = None
+ self._sysbus = None
+ def _config_dbus(self, iface_obj, bus_name: str, obj_name: str):
+ self._dbus_iface = iface_obj
+ self._sysbus.publish_object(obj_name, iface_obj)
+ self._sysbus.register_service(bus_name)
+ @property
+ def tron(self):
+ '''@brief Get Trace ON property'''
+ return self._tron
+ @tron.setter
+ def tron(self, value):
+ '''@brief Set Trace ON property'''
+ self._tron = value
+ log.set_level_from_tron(self._tron)
+ for controller in self._controllers.values():
+ controller.set_level_from_tron(self._tron)
+ def run(self):
+ '''@brief Start the main loop execution'''
+ try:
+ except Exception as ex: # pylint: disable=broad-except
+ logging.critical('exception: %s', ex)
+ self._loop = None
+ def info(self) -> dict:
+ '''@brief Get the status info for this object (used for debug)'''
+ nvme_options = conf.NvmeOptions()
+ info = conf.SysConf().as_dict()
+ info['last known config file'] = self._lkc_file
+ info['config soak timer'] = str(self._cfg_soak_tmr)
+ info['kernel support.TP8013'] = str(nvme_options.discovery_supp)
+ info['kernel support.host_iface'] = str(nvme_options.host_iface_supp)
+ return info
+ def get_controllers(self) -> dict:
+ '''@brief return the list of controller objects'''
+ return self._controllers.values()
+ def get_controller(
+ self, transport: str, traddr: str, trsvcid: str, host_traddr: str, host_iface: str, subsysnqn: str
+ ): # pylint: disable=too-many-arguments
+ '''@brief get the specified controller object from the list of controllers'''
+ cid = {
+ 'transport': transport,
+ 'traddr': traddr,
+ 'trsvcid': trsvcid,
+ 'host-traddr': host_traddr,
+ 'host-iface': host_iface,
+ 'subsysnqn': subsysnqn,
+ }
+ return self._controllers.get(trid.TID(cid))
+ def _remove_ctrl_from_dict(self, controller, shutdown=False):
+ tid_to_pop = controller.tid
+ if not tid_to_pop:
+ # Being paranoid. This should not happen, but let's say the
+ # controller object has been purged, but it is somehow still
+ # listed in self._controllers.
+ for tid, _controller in self._controllers.items():
+ if _controller is controller:
+ tid_to_pop = tid
+ break
+ if tid_to_pop:
+ logging.debug('ServiceABC._remove_ctrl_from_dict()- %s | %s', tid_to_pop, controller.device)
+ popped = self._controllers.pop(tid_to_pop, None)
+ if not shutdown and popped is not None and self._cfg_soak_tmr:
+ self._cfg_soak_tmr.start()
+ else:
+ logging.debug('ServiceABC._remove_ctrl_from_dict()- already removed')
+ def remove_controller(self, controller, success): # pylint: disable=unused-argument
+ '''@brief remove the specified controller object from the list of controllers
+ @param controller: the controller object
+ @param success: whether the disconnect was successful'''
+ logging.debug('ServiceABC.remove_controller()')
+ if isinstance(controller, ControllerABC):
+ self._remove_ctrl_from_dict(controller)
+ controller.kill()
+ def _alive(self):
+ '''It's a good idea to check that this object hasn't been
+ cancelled (i.e. is still alive) when entering a callback function.
+ Callback functrions can be invoked after, for example, a process has
+ been signalled to stop or restart, in which case it makes no sense to
+ proceed with the callback.
+ '''
+ return self._cancellable and not self._cancellable.is_cancelled()
+ def _cancel(self):
+ logging.debug('ServiceABC._cancel()')
+ if self._alive():
+ self._cancellable.cancel()
+ for controller in self._controllers.values():
+ controller.cancel()
+ def _stop_hdlr(self):
+ logging.debug('ServiceABC._stop_hdlr()')
+ sd_notify('STOPPING=1')
+ self._cancel() # Cancel pending operations
+ self._dump_last_known_config(self._controllers)
+ if len(self._controllers) == 0:
+ GLib.idle_add(self._exit)
+ else:
+ self._disconnect_all()
+ def _on_final_disconnect(self, controller, success):
+ '''Callback invoked after a controller is disconnected.
+ @param controller: the controller object
+ @param success: whether the disconnect operation was successful
+ '''
+ logging.debug(
+ 'ServiceABC._on_final_disconnect() - %s | %s: disconnect %s',
+ controller.device,
+ 'succeeded' if success else 'failed',
+ )
+ self._remove_ctrl_from_dict(controller, True)
+ controller.kill()
+ # When all controllers have disconnected, we can finish the clean up
+ if len(self._controllers) == 0:
+ # Defer exit to the next main loop's idle period.
+ GLib.idle_add(self._exit)
+ def _exit(self):
+ logging.debug('ServiceABC._exit()')
+ self._release_resources()
+ self._loop.quit()
+ def _on_config_ctrls(self, *_user_data):
+ if self._alive():
+ self._config_ctrls()
+ def _config_ctrls(self):
+ '''@brief Start controllers configuration.'''
+ # The configuration file may contain controllers and/or excluded
+ # controllers with traddr specified as hostname instead of IP address.
+ # Because of this, we need to remove those excluded elements before
+ # running name resolution. And we will need to remove excluded
+ # elements after name resolution is complete (i.e. in the calback
+ # function _config_ctrls_finish)
+ logging.debug('ServiceABC._config_ctrls()')
+ configured_controllers = [trid.TID(cid) for cid in conf.SvcConf().get_controllers()]
+ configured_controllers = remove_excluded(configured_controllers)
+ self._resolver.resolve_ctrl_async(self._cancellable, configured_controllers, self._config_ctrls_finish)
+ def _read_lkc(self):
+ '''@brief Read Last Known Config from file'''
+ try:
+ with open(self._lkc_file, 'rb') as file:
+ return pickle.load(file)
+ except (FileNotFoundError, AttributeError, EOFError):
+ return None
+ def _write_lkc(self, config):
+ '''@brief Write Last Known Config to file, and if config is empty
+ make sure the file is emptied.'''
+ try:
+ # Note that if config is empty we still
+ # want to open/close the file to empty it.
+ with open(self._lkc_file, 'wb') as file:
+ if config:
+ pickle.dump(config, file)
+ except FileNotFoundError as ex:
+ logging.error('Unable to save last known config: %s', ex)
+ @abc.abstractmethod
+ def _disconnect_all(self):
+ '''Tell all controller objects to disconnect'''
+ @abc.abstractmethod
+ def _keep_connections_on_exit(self):
+ '''@brief Determine whether connections should remain when the
+ process exits.
+ NOTE) This is the base class method used to define the interface.
+ It must be overloaded by a child class.
+ '''
+ @abc.abstractmethod
+ def _config_ctrls_finish(self, configured_ctrl_list):
+ '''@brief Finish controllers configuration after hostnames (if any)
+ have been resolved.
+ Configuring controllers must be done asynchronously in 2 steps.
+ In the first step, host names get resolved to find their IP addresses.
+ Name resolution can take a while, especially when an external name
+ resolution server is used. Once that step completed, the callback
+ method _config_ctrls_finish() (i.e. this method), gets invoked to
+ complete the controller configuration.
+ NOTE) This is the base class method used to define the interface.
+ It must be overloaded by a child class.
+ '''
+ @abc.abstractmethod
+ def _load_last_known_config(self):
+ '''Load last known config from file (if any)'''
+ @abc.abstractmethod
+ def _dump_last_known_config(self, controllers):
+ '''Save last known config to file'''