diff options
Diffstat (limited to '')
-rw-r--r-- | staslib/gutil.py | 418 |
1 files changed, 418 insertions, 0 deletions
diff --git a/staslib/gutil.py b/staslib/gutil.py new file mode 100644 index 0000000..c40b80e --- /dev/null +++ b/staslib/gutil.py @@ -0,0 +1,418 @@ +# Copyright (c) 2022, Dell Inc. or its subsidiaries. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# See the LICENSE file for details. +# +# This file is part of NVMe STorage Appliance Services (nvme-stas). +# +# Authors: Martin Belanger <Martin.Belanger@dell.com> +# +'''This module provides utility functions/classes to provide easier to use +access to GLib/Gio/Gobject resources. +''' + +import logging +from gi.repository import Gio, GLib, GObject +from staslib import conf, iputil, trid + + +# ****************************************************************************** +class GTimer: + '''@brief Convenience class to wrap GLib timers''' + + def __init__( + self, interval_sec: float = 0, user_cback=lambda: GLib.SOURCE_REMOVE, *user_data, priority=GLib.PRIORITY_DEFAULT + ): # pylint: disable=keyword-arg-before-vararg + self._source = None + self._interval_sec = float(interval_sec) + self._user_cback = user_cback + self._user_data = user_data + self._priority = priority if priority is not None else GLib.PRIORITY_DEFAULT + + def _release_resources(self): + self.stop() + self._user_cback = None + self._user_data = None + + def kill(self): + '''@brief Used to release all resources associated with a timer.''' + self._release_resources() + + def __str__(self): + if self._source is not None: + return f'{self._interval_sec}s [{self.time_remaining()}s]' + + return f'{self._interval_sec}s [off]' + + def _callback(self, *_): + retval = self._user_cback(*self._user_data) + if retval == GLib.SOURCE_REMOVE: + self._source = None + return retval + + def stop(self): + '''@brief Stop timer''' + if self._source is not None: + self._source.destroy() + self._source = None + + def start(self, new_interval_sec: float = -1.0): + '''@brief Start (or restart) timer''' + if new_interval_sec >= 0: + self._interval_sec = float(new_interval_sec) + + if self._source is not None: + self._source.set_ready_time( + self._source.get_time() + (self._interval_sec * 1000000) + ) # ready time is in micro-seconds (monotonic time) + else: + if self._interval_sec.is_integer(): + self._source = GLib.timeout_source_new_seconds(int(self._interval_sec)) # seconds resolution + else: + self._source = GLib.timeout_source_new(self._interval_sec * 1000.0) # mili-seconds resolution + + self._source.set_priority(self._priority) + self._source.set_callback(self._callback) + self._source.attach() + + def clear(self): + '''@brief Make timer expire now. The callback function + will be invoked immediately by the main loop. + ''' + if self._source is not None: + self._source.set_ready_time(0) # Expire now! + + def set_callback(self, user_cback, *user_data): + '''@brief set the callback function to invoke when timer expires''' + self._user_cback = user_cback + self._user_data = user_data + + def set_timeout(self, new_interval_sec: float): + '''@brief set the timer's duration''' + if new_interval_sec >= 0: + self._interval_sec = float(new_interval_sec) + + def get_timeout(self): + '''@brief get the timer's duration''' + return self._interval_sec + + def time_remaining(self) -> float: + '''@brief Get how much time remains on a timer before it fires.''' + if self._source is not None: + delta_us = self._source.get_ready_time() - self._source.get_time() # monotonic time in micro-seconds + if delta_us > 0: + return delta_us / 1000000.0 + + return 0 + + +# ****************************************************************************** +class NameResolver: # pylint: disable=too-few-public-methods + '''@brief DNS resolver to convert host names to IP addresses.''' + + def __init__(self): + self._resolver = Gio.Resolver.get_default() + + def resolve_ctrl_async(self, cancellable, controllers_in: list, callback): + '''@brief The traddr fields may specify a hostname instead of an IP + address. We need to resolve all the host names to addresses. + Resolving hostnames may take a while as a DNS server may need + to be contacted. For that reason, we're using async APIs with + callbacks to resolve all the hostnames. + + The callback @callback will be called once all hostnames have + been resolved. + + @param controllers: List of trid.TID + ''' + pending_resolution_count = 0 + controllers_out = [] + service_conf = conf.SvcConf() + + def addr_resolved(resolver, result, controller): + try: + addresses = resolver.lookup_by_name_finish(result) # List of Gio.InetAddress objects + + except GLib.GError as err: + # We don't need to report "cancellation" errors. + if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED): + # pylint: disable=no-member + logging.debug('NameResolver.resolve_ctrl_async() - %s %s', err.message, controller) + else: + logging.error('%s', err.message) # pylint: disable=no-member + + # if err.matches(Gio.resolver_error_quark(), Gio.ResolverError.TEMPORARY_FAILURE): + # elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.NOT_FOUND): + # elif err.matches(Gio.resolver_error_quark(), Gio.ResolverError.INTERNAL): + + else: + traddr = None + + # If multiple addresses are returned (which is often the case), + # prefer IPv4 addresses over IPv6. + if 4 in service_conf.ip_family: + for address in addresses: + # There may be multiple IPv4 addresses. Pick 1st one. + if address.get_family() == Gio.SocketFamily.IPV4: + traddr = address.to_string() + break + + if traddr is None and 6 in service_conf.ip_family: + for address in addresses: + # There may be multiple IPv6 addresses. Pick 1st one. + if address.get_family() == Gio.SocketFamily.IPV6: + traddr = address.to_string() + break + + if traddr is not None: + logging.debug( + 'NameResolver.resolve_ctrl_async() - resolved \'%s\' -> %s', controller.traddr, traddr + ) + cid = controller.as_dict() + cid['traddr'] = traddr + nonlocal controllers_out + controllers_out.append(trid.TID(cid)) + + # Invoke callback after all hostnames have been resolved + nonlocal pending_resolution_count + pending_resolution_count -= 1 + if pending_resolution_count == 0: + callback(controllers_out) + + for controller in controllers_in: + if controller.transport in ('tcp', 'rdma'): + hostname_or_addr = controller.traddr + if not hostname_or_addr: + logging.error('Invalid traddr: %s', controller) + else: + # Try to convert to an ipaddress object. If this + # succeeds, then we don't need to call the resolver. + ip = iputil.get_ipaddress_obj(hostname_or_addr) + if ip is None: + logging.debug('NameResolver.resolve_ctrl_async() - resolving \'%s\'', hostname_or_addr) + pending_resolution_count += 1 + self._resolver.lookup_by_name_async(hostname_or_addr, cancellable, addr_resolved, controller) + elif ip.version in service_conf.ip_family: + controllers_out.append(controller) + else: + logging.warning( + 'Excluding configured IP address %s based on "ip-family" setting', hostname_or_addr + ) + else: + controllers_out.append(controller) + + if pending_resolution_count == 0: # No names are pending asynchronous resolution + callback(controllers_out) + + +# ****************************************************************************** +class _TaskRunner(GObject.Object): + '''@brief This class allows running methods asynchronously in a thread.''' + + def __init__(self, user_function, *user_args): + '''@param user_function: function to run inside a thread + @param user_args: arguments passed to @user_function + ''' + super().__init__() + self._user_function = user_function + self._user_args = user_args + + def communicate(self, cancellable, cb_function, *cb_args): + '''@param cancellable: A Gio.Cancellable object that can be used to + cancel an in-flight async command. + @param cb_function: User callback function to call when the async + command has completed. The callback function + will be passed these arguments: + + (runner, result, *cb_args) + + Where: + runner: This _TaskRunner object instance + result: A GObject.Object instance that contains the result + cb_args: The cb_args arguments passed to communicate() + + @param cb_args: User arguments to pass to @cb_function + ''' + + def in_thread_exec(task, self, task_data, cancellable): # pylint: disable=unused-argument + if task.return_error_if_cancelled(): + return # Bail out if task has been cancelled + + try: + value = GObject.Object() + value.result = self._user_function(*self._user_args) + task.return_value(value) + except Exception as ex: # pylint: disable=broad-except + task.return_error(GLib.Error(message=str(ex), domain=type(ex).__name__)) + + task = Gio.Task.new(self, cancellable, cb_function, *cb_args) + task.set_return_on_cancel(False) + task.run_in_thread(in_thread_exec) + return task + + def communicate_finish(self, result): # pylint: disable=no-self-use + '''@brief Use this function in your callback (see @cb_function) to + extract data from the result object. + + @return On success (True, data, None), + On failure (False, None, err: GLib.Error) + ''' + try: + success, value = result.propagate_value() + return success, value.result, None + except GLib.Error as err: + return False, None, err + + +# ****************************************************************************** +class AsyncTask: # pylint: disable=too-many-instance-attributes + '''Object used to manage an asynchronous GLib operation. The operation + can be cancelled or retried. + ''' + + def __init__(self, on_success_callback, on_failure_callback, operation, *op_args): + '''@param on_success_callback: Callback method invoked when @operation completes successfully + @param on_failure_callback: Callback method invoked when @operation fails + @param operation: Operation (i.e. a function) to execute asynchronously + @param op_args: Arguments passed to operation + ''' + self._cancellable = Gio.Cancellable() + self._operation = operation + self._op_args = op_args + self._success_cb = on_success_callback + self._fail_cb = on_failure_callback + self._retry_tmr = None + self._errmsg = None + self._task = None + self._fail_cnt = 0 + + def _release_resources(self): + if self._alive(): + self._cancellable.cancel() + + if self._retry_tmr is not None: + self._retry_tmr.kill() + + self._operation = None + self._op_args = None + self._success_cb = None + self._fail_cb = None + self._retry_tmr = None + self._errmsg = None + self._task = None + self._fail_cnt = None + self._cancellable = None + + def __str__(self): + return str(self.as_dict()) + + def as_dict(self): + '''Return object members as a dictionary''' + info = { + 'fail count': self._fail_cnt, + 'completed': self._task.get_completed(), + 'alive': self._alive(), + } + + if self._retry_tmr: + info['retry timer'] = str(self._retry_tmr) + + if self._errmsg: + info['error'] = self._errmsg + + return info + + def _alive(self): + return self._cancellable and not self._cancellable.is_cancelled() + + def completed(self): + '''@brief Returns True if the task has completed, False otherwise.''' + return self._task is not None and self._task.get_completed() + + def cancel(self): + '''@brief cancel async operation''' + if self._alive(): + self._cancellable.cancel() + + def kill(self): + '''@brief kill and clean up this object''' + self._release_resources() + + def run_async(self, *args): + '''@brief + Method used to initiate an asynchronous operation with the + Controller. When the operation completes (or fails) the + callback method @_on_operation_complete() will be invoked. + ''' + runner = _TaskRunner(self._operation, *self._op_args) + self._task = runner.communicate(self._cancellable, self._on_operation_complete, *args) + + def retry(self, interval_sec, *args): + '''@brief Tell this object that the async operation is to be retried + in @interval_sec seconds. + + ''' + if self._retry_tmr is None: + self._retry_tmr = GTimer() + self._retry_tmr.set_callback(self._on_retry_timeout, *args) + self._retry_tmr.start(interval_sec) + + def _on_retry_timeout(self, *args): + '''@brief + When an operation fails, the application has the option to + retry at a later time by calling the retry() method. The + retry() method starts a timer at the end of which the operation + will be executed again. This is the method that is called when + the timer expires. + ''' + if self._alive(): + self.run_async(*args) + return GLib.SOURCE_REMOVE + + def _on_operation_complete(self, runner, result, *args): + '''@brief + This callback method is invoked when the operation with the + Controller has completed (be it successful or not). + ''' + # The operation might have been cancelled. + # Only proceed if it hasn't been cancelled. + if self._operation is None or not self._alive(): + return + + success, data, err = runner.communicate_finish(result) + + if success: + self._errmsg = None + self._fail_cnt = 0 + self._success_cb(self, data, *args) + else: + self._errmsg = str(err) + self._fail_cnt += 1 + self._fail_cb(self, err, self._fail_cnt, *args) + + +# ****************************************************************************** +class Deferred: + '''Implement a deferred function call. A deferred is a function that gets + added to the main loop to be executed during the next idle slot.''' + + def __init__(self, func, *user_data): + self._source = None + self._func = func + self._user_data = user_data + + def schedule(self): + '''Schedule the function to be called by the main loop. If the + function is already scheduled, then do nothing''' + if not self.is_scheduled(): + srce_id = GLib.idle_add(self._func, *self._user_data) + self._source = GLib.main_context_default().find_source_by_id(srce_id) + + def is_scheduled(self): + '''Check if deferred is currently schedules to run''' + return self._source and not self._source.is_destroyed() + + def cancel(self): + '''Remove deferred from main loop''' + if self.is_scheduled(): + self._source.destroy() + self._source = None |