diff options
Diffstat (limited to '')
-rw-r--r-- | staslib/avahi.py | 278 | ||||
-rw-r--r-- | staslib/ctrl.py | 6 | ||||
-rw-r--r-- | staslib/gutil.py | 98 |
3 files changed, 290 insertions, 92 deletions
diff --git a/staslib/avahi.py b/staslib/avahi.py index 26543d9..f29f89d 100644 --- a/staslib/avahi.py +++ b/staslib/avahi.py @@ -18,7 +18,7 @@ import dasbus.connection import dasbus.client.proxy import dasbus.client.observer from gi.repository import GLib -from staslib import defs, conf, gutil +from staslib import defs, conf, gutil, iputil def _txt2dict(txt: list): @@ -54,6 +54,141 @@ def _proto2trans(protocol): return None +def mk_service_key(interface, protocol, name, stype, domain): + '''Return a tuple used as a service key (unique identifier)''' + return (interface, protocol, name, stype, domain) + + +def fmt_service_str(interface, protocol, name, stype, domain, flags): # pylint: disable=too-many-arguments + '''Return service identifier as a string''' + return ( + f'interface={interface}:{(socket.if_indextoname(interface) + ","):<9} ' + f'protocol={Avahi.protocol_as_string(protocol)}, ' + f'stype={stype}, ' + f'domain={domain}, ' + f'flags={flags}:{(Avahi.result_flags_as_string(flags) + ","):<12} ' + f'name={name}' + ) + + +# ****************************************************************************** +class Service: # pylint: disable=too-many-instance-attributes + '''Object used to keep track of the services discovered from the avahi-daemon''' + + interface_name = property(lambda self: self._interface_name) + interface = property(lambda self: self._interface_id) + ip_family = property(lambda self: self._ip_family) + reachable = property(lambda self: self._reachable) + protocol = property(lambda self: self._protocol_id) + key_str = property(lambda self: self._key_str) + domain = property(lambda self: self._domain) + stype = property(lambda self: self._stype) + data = property(lambda self: self._data) + name = property(lambda self: self._name) + key = property(lambda self: self._key) + ip = property(lambda self: self._ip) + + def __init__(self, args, identified_cback): + self._identified_cback = identified_cback + self._interface_id = args[0] + self._protocol_id = args[1] + self._name = args[2] + self._stype = args[3] + self._domain = args[4] + self._flags = args[5] + self._ip_family = 4 if self._protocol_id == Avahi.PROTO_INET else 6 + + self._interface_name = socket.if_indextoname(self._interface_id).strip() + self._protocol_name = Avahi.protocol_as_string(self._protocol_id) + self._flags_str = '(' + Avahi.result_flags_as_string(self._flags) + '),' + + self._key = mk_service_key(self._interface_id, self._protocol_id, self._name, self._stype, self._domain) + self._key_str = f'({self._interface_name}, {self._protocol_name}, {self._name}.{self._domain}, {self._stype})' + + self._id = fmt_service_str( + self._interface_id, self._protocol_id, self._name, self._stype, self._domain, self._flags + ) + + self._ip = None + self._resolver = None + self._data = {} + self._reachable = False + self._connect_checker = None + + def info(self): + '''Return debug info''' + info = self._data + info['reachable'] = str(self._reachable) + return info + + def __str__(self): + return self._id + + def set_identity(self, transport, address, port, txt): # pylint: disable=too-many-arguments + '''Complete identification and check connectivity (if needed) + Return True if identification is complete. Return False if + we need to check connectivity. + ''' + traddr = address.strip() + trsvcid = str(port).strip() + # host-iface permitted for tcp alone and not rdma + host_iface = self._interface_name if transport == 'tcp' else '' + self._data = { + 'transport': transport, + 'traddr': traddr, + 'trsvcid': trsvcid, + # host-iface permitted for tcp alone and not rdma + 'host-iface': host_iface, + 'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip() + if conf.NvmeOptions().discovery_supp + else defs.WELL_KNOWN_DISC_NQN, + } + + self._ip = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True) + + if transport != 'tcp': + self._reachable = True + self._identified_cback() + return + + self._reachable = False + connect_checker = gutil.TcpChecker(traddr, trsvcid, host_iface, self._tcp_connect_check_cback) + + try: + connect_checker.connect() + except RuntimeError as err: + logging.error('Unable to verify connectivity: %s', err) + connect_checker.close() + connect_checker = None + + self._connect_checker = connect_checker + + def _tcp_connect_check_cback(self, connected): + if self._connect_checker is not None: + self._connect_checker.close() + self._connect_checker = None + self._reachable = connected + self._identified_cback() + + def set_resolver(self, resolver): + '''Set the resolver object''' + self._resolver = resolver + + def close(self): + '''Close this object and release all resources''' + if self._connect_checker is not None: + self._connect_checker.close() + self._connect_checker = None + + if self._resolver is not None: + try: + self._resolver.Free() + dasbus.client.proxy.disconnect_proxy(self._resolver) + except (AttributeError, dasbus.error.DBusError) as ex: + logging.debug('Service.close() - Failed to Free() resolver. %s', ex) + self._resolver = None + + # ****************************************************************************** class Avahi: # pylint: disable=too-many-instance-attributes '''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi @@ -182,16 +317,10 @@ class Avahi: # pylint: disable=too-many-instance-attributes def info(self) -> dict: '''@brief return debug info about this object''' - services = dict() - for service, obj in self._services.items(): - interface, protocol, name, stype, domain = service - key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})' - services[key] = obj.get('data', {}) - info = { 'avahi wake up timer': str(self._kick_avahi_tmr), 'service types': list(self._stypes), - 'services': services, + 'services': {service.key_str: service.info() for service in self._services.values()}, } return info @@ -217,7 +346,7 @@ class Avahi: # pylint: disable=too-many-instance-attributes [...] ] ''' - return [service['data'] for service in self._services.values() if len(service['data'])] + return [service.data for service in self._services.values() if service.reachable] def config_stypes(self, stypes: list): '''@brief Configure the service types that we want to discover. @@ -234,18 +363,17 @@ class Avahi: # pylint: disable=too-many-instance-attributes ''' self._kick_avahi_tmr.clear() + def _remove_service(self, service_to_rm: typing.Tuple[int, int, str, str, str]): + service = self._services.pop(service_to_rm) + if service is not None: + service.close() + def _disconnect(self): logging.debug('Avahi._disconnect()') for service in self._services.values(): - resolver = service.pop('resolver', None) - if resolver is not None: - try: - resolver.Free() - dasbus.client.proxy.disconnect_proxy(resolver) - except (AttributeError, dasbus.error.DBusError) as ex: - logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex) + service.close() - self._services = dict() + self._services.clear() for browser in self._service_browsers.values(): try: @@ -296,15 +424,9 @@ class Avahi: # pylint: disable=too-many-instance-attributes logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex) # Find the cached services corresponding to stype_to_rm and remove them - services_to_rm = [service for service in self._services if service[3] == stype_to_rm] - for service in services_to_rm: - resolver = self._services.pop(service, {}).pop('resolver', None) - if resolver is not None: - try: - resolver.Free() - dasbus.client.proxy.disconnect_proxy(resolver) - except (AttributeError, dasbus.error.DBusError) as ex: - logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex) + services_to_rm = [service.key for service in self._services.values() if service.stype == stype_to_rm] + for service_to_rm in services_to_rm: + self._remove_service(service_to_rm) for stype in stypes_to_add: try: @@ -329,31 +451,25 @@ class Avahi: # pylint: disable=too-many-instance-attributes args: typing.Tuple[int, int, str, str, str, int], *_user_data, ): - (interface, protocol, name, stype, domain, flags) = args - logging.debug( - 'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s', - interface, - socket.if_indextoname(interface), - Avahi.protocol_as_string(protocol), - stype, - domain, - flags, - '(' + Avahi.result_flags_as_string(flags) + '),', - name, - ) + service = Service(args, self._change_cb) + logging.debug('Avahi._service_discovered() - %s', service) - service = (interface, protocol, name, stype, domain) - if service not in self._services: + if service.key not in self._services: try: obj_path = self._avahi.ServiceResolverNew( - interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST + service.interface, + service.protocol, + service.name, + service.stype, + service.domain, + Avahi.PROTO_UNSPEC, + Avahi.LOOKUP_USE_MULTICAST, ) - self._services[service] = { - 'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path), - 'data': {}, - } + service.set_resolver(self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path)) except dasbus.error.DBusError as ex: - logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex) + logging.warning('Failed to create resolver - %s: %s', service, ex) + + self._services[service.key] = service def _service_removed( self, @@ -367,27 +483,14 @@ class Avahi: # pylint: disable=too-many-instance-attributes ): (interface, protocol, name, stype, domain, flags) = args logging.debug( - 'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s', - interface, - socket.if_indextoname(interface), - Avahi.protocol_as_string(protocol), - stype, - domain, - flags, - '(' + Avahi.result_flags_as_string(flags) + '),', - name, + 'Avahi._service_removed() - %s', + fmt_service_str(interface, protocol, name, stype, domain, flags), ) - service = (interface, protocol, name, stype, domain) - resolver = self._services.pop(service, {}).pop('resolver', None) - if resolver is not None: - try: - resolver.Free() - dasbus.client.proxy.disconnect_proxy(resolver) - except (AttributeError, dasbus.error.DBusError) as ex: - logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex) - - self._change_cb() + service_key = mk_service_key(interface, protocol, name, stype, domain) + self._remove_service(service_key) + if self._change_cb is not None: + self._change_cb() def _service_identified( # pylint: disable=too-many-locals self, @@ -402,38 +505,21 @@ class Avahi: # pylint: disable=too-many-instance-attributes (interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args txt = _txt2dict(txt) logging.debug( - 'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s', - interface, - socket.if_indextoname(interface), - Avahi.protocol_as_string(protocol), - stype, - domain, - flags, - '(' + Avahi.result_flags_as_string(flags) + '),', - name, + 'Avahi._service_identified() - %s, host=%s, aprotocol=%s, port=%s, address=%s, txt=%s', + fmt_service_str(interface, protocol, name, stype, domain, flags), host, Avahi.protocol_as_string(aprotocol), - address, port, + address, txt, ) - service = (interface, protocol, name, stype, domain) - if service in self._services: + service_key = mk_service_key(interface, protocol, name, stype, domain) + service = self._services.get(service_key, None) + if service is not None: transport = _proto2trans(txt.get('p')) if transport is not None: - self._services[service]['data'] = { - 'transport': transport, - 'traddr': address.strip(), - 'trsvcid': str(port).strip(), - # host-iface permitted for tcp alone and not rdma - 'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '', - 'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip() - if conf.NvmeOptions().discovery_supp - else defs.WELL_KNOWN_DISC_NQN, - } - - self._change_cb() + service.set_identity(transport, address, port, txt) else: logging.error( 'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s', @@ -442,6 +528,8 @@ class Avahi: # pylint: disable=too-many-instance-attributes txt, ) + self._check_for_duplicate_ips() + def _failure_handler( # pylint: disable=no-self-use self, _connection, @@ -456,3 +544,15 @@ class Avahi: # pylint: disable=too-many-instance-attributes if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error: # ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal. logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error) + + def _check_for_duplicate_ips(self): + '''This is to identify misconfigured networks where the + same IP addresses are discovered on two or more interfaces.''' + ips = {} + for service in self._services.values(): + if service.ip is not None: + ips.setdefault(service.ip.compressed, []).append(service.interface_name) + + for ip, ifaces in ips.items(): + if len(ifaces) > 1: + logging.error('IP address %s was found on multiple interfaces: %s', ip, ','.join(ifaces)) diff --git a/staslib/ctrl.py b/staslib/ctrl.py index ad221e0..9f415b3 100644 --- a/staslib/ctrl.py +++ b/staslib/ctrl.py @@ -169,9 +169,9 @@ class Controller(stas.ControllerABC): # pylint: disable=too-many-instance-attri self._udev.unregister_for_device_events(self._on_udev_notification) self._kill_ops() # Kill all pending operations self._ctrl = None - - # Defer removal of this object to the next main loop's idle period. - GLib.idle_add(self._serv.remove_controller, self, True) + self._device = None + self._connect_attempts = 0 + self._retry_connect_tmr.start() def _get_cfg(self): '''Get configuration parameters. These may either come from the [Global] diff --git a/staslib/gutil.py b/staslib/gutil.py index 836674a..1730ac0 100644 --- a/staslib/gutil.py +++ b/staslib/gutil.py @@ -11,6 +11,7 @@ access to GLib/Gio/Gobject resources. ''' import logging +import socket from gi.repository import Gio, GLib, GObject from staslib import conf, iputil, trid @@ -416,3 +417,100 @@ class Deferred: if self.is_scheduled(): self._source.destroy() self._source = None + + +# ****************************************************************************** +class TcpChecker: # pylint: disable=too-many-instance-attributes + '''@brief Verify that a TCP connection can be established with an enpoint''' + + def __init__(self, traddr, trsvcid, host_iface, user_cback, *user_data): + self._user_cback = user_cback + self._host_iface = host_iface + self._user_data = user_data + self._trsvcid = trsvcid + self._traddr = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True) + self._cancellable = None + self._gio_sock = None + self._native_sock = None + + def connect(self): + '''Attempt to connect''' + self.close() + + # Gio has limited setsockopt() capabilities. To set SO_BINDTODEVICE + # we need to use a generic socket.socket() and then convert to a + # Gio.Socket() object to perform async connect operation within + # the GLib context. + family = socket.AF_INET if self._traddr.version == 4 else socket.AF_INET6 + self._native_sock = socket.socket(family, socket.SOCK_STREAM | socket.SOCK_NONBLOCK, socket.IPPROTO_TCP) + if isinstance(self._host_iface, str): + self._native_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, self._host_iface.encode('utf-8')) + + # Convert socket.socket() to a Gio.Socket() object + try: + self._gio_sock = Gio.Socket.new_from_fd(self._native_sock.fileno()) # returns None on error + except GLib.Error as err: + logging.error('Cannot create socket: %s', err.message) # pylint: disable=no-member + self._gio_sock = None + + if self._gio_sock is None: + self._native_sock.close() + raise RuntimeError(f'Unable to connect to {self._traddr}, {self._trsvcid}, {self._host_iface}') + + g_addr = Gio.InetSocketAddress.new_from_string(self._traddr.compressed, int(self._trsvcid)) + + self._cancellable = Gio.Cancellable() + + g_sockconn = self._gio_sock.connection_factory_create_connection() + g_sockconn.connect_async(g_addr, self._cancellable, self._connect_async_cback) + + def close(self): + '''Terminate/Cancel current connection attempt and free resources''' + if self._cancellable is not None: + self._cancellable.cancel() + self._cancellable = None + + if self._gio_sock is not None: + try: + self._gio_sock.close() + except GLib.Error as err: + logging.debug('TcpChecker.close() gio_sock.close - %s', err.message) # pylint: disable=no-member + + self._gio_sock = None + + if self._native_sock is not None: + try: + # This is expected to fail because the socket + # is already closed by self._gio_sock.close() above. + # This code is just for completeness. + self._native_sock.close() + except OSError: + pass + + self._native_sock = None + + def _connect_async_cback(self, source_object, result): + ''' + @param source_object: The Gio.SocketConnection object used to + invoke the connect_async() API. + ''' + try: + connected = source_object.connect_finish(result) + except GLib.Error as err: + connected = False + # We don't need to report "cancellation" errors. + if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED): + logging.debug('TcpChecker._connect_async_cback() - %s', err.message) # pylint: disable=no-member + else: + logging.info( + 'Unable to verify TCP connectivity - (%-10s %-14s %s): %s', + self._host_iface + ',', + self._traddr.compressed + ',', + self._trsvcid, + err.message, # pylint: disable=no-member + ) + + self.close() + + if self._user_cback is not None: + self._user_cback(connected, *self._user_data) |