Diffstat (limited to '')
 -rw-r--r--  .github/workflows/docker-publish.yml |   4
 -rw-r--r--  NEWS.md                              |   3
 -rw-r--r--  meson.build                          |   2
 -rw-r--r--  staslib/avahi.py                     | 278
 -rw-r--r--  staslib/ctrl.py                      |   6
 -rw-r--r--  staslib/gutil.py                     |  98
 6 files changed, 296 insertions, 95 deletions
diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml
index a702e27..a94e7b3 100644
--- a/.github/workflows/docker-publish.yml
+++ b/.github/workflows/docker-publish.yml
@@ -48,14 +48,14 @@ jobs:
# https://github.com/docker/metadata-action
- name: Extract Docker metadata
id: meta
- uses: docker/metadata-action@2c0bd771b40637d97bf205cbccdd294a32112176
+ uses: docker/metadata-action@818d4b7b91585d195f67373fd9cb0332e31a7175
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
# Build and push Docker image with Buildx (don't push on PR)
# https://github.com/docker/build-push-action
- name: Build and push Docker image
- uses: docker/build-push-action@44ea916f6c540f9302d50c2b1e5a8dc071f15cdf
+ uses: docker/build-push-action@2eb1c1961a95fc15694676618e422e8ba1d63825
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
diff --git a/NEWS.md b/NEWS.md
index 5c3a699..4583cea 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -5,11 +5,14 @@
New features:
- Support for nBFT (NVMe-oF Boot Table).
+- The Avahi driver will now verify reachability of services discovered through mDNS to make sure all discovered IP addresses can be connected to. This avoids invoking the NVMe kernel driver with invalid IP addresses and getting error messages in the syslog.
+- The Avahi driver will now print an error message if the same IP address is found on multiple interfaces. This indicates a misconfiguration of the network.
Bug fixes:
* For TCP transport: use `sysfs` controller `src_addr` attribute when matching to a configured "candidate" controller. This is to determine when an existing controller (located under the `sysfs`) can be reused instead of creating a new one. This avoids creating unnecessary duplicate connections.
* Udev event handling: use `systemctl restart` instead of `systemctl start`. There is a small chance that a `start` operation has not completed when a new `start` is required. Issuing a `start` while a `start` is being performed has no effect. However, a `restart` will be handled properly.
+* `stafd`: Do not delete and recreate DC objects on kernel events indicating that an nvme device associated to a discovery controller was removed by the kernel. This was done to kick start the reconnect process, but was also causing the DLPE (Discovery Log Page Entries) cache to be lost. This could potentially result in `stacd` disconnecting from I/O controllers. Instead, keep the existing DC object which contains a valid DLPE cache and simply restart the "retry to connect" timer. This way the DLPE cache is maintained throughout the reconnect to DC process.
## Changes with release 2.2.3
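
The reachability check described in the first NEWS entry amounts to attempting a TCP connection on the interface the service was discovered on, before the address is ever handed to the NVMe kernel driver. As a rough stand-alone sketch (hypothetical helper, not the code added by this commit — the real implementation is the asynchronous, Gio-based TcpChecker added to staslib/gutil.py below):

    import socket

    def is_reachable(traddr, trsvcid, host_iface=None, timeout=2.0):
        '''Return True if a TCP connection to (traddr, trsvcid) succeeds.
        Illustration only: it blocks for up to `timeout` seconds, whereas the
        commit uses an async Gio socket so the GLib main loop never blocks.'''
        family = socket.AF_INET6 if ':' in traddr else socket.AF_INET
        sock = socket.socket(family, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        try:
            if host_iface:
                # Bind the probe to the interface the mDNS answer came from
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, host_iface.encode())
            sock.settimeout(timeout)
            sock.connect((traddr, int(trsvcid)))
            return True
        except OSError:
            return False
        finally:
            sock.close()
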
diff --git a/meson.build b/meson.build
index df52199..1e6f864 100644
--- a/meson.build
+++ b/meson.build
@@ -9,7 +9,7 @@
project(
'nvme-stas',
meson_version: '>= 0.53.0',
- version: '2.3-rc1',
+ version: '2.3-rc2',
license: 'Apache-2.0',
default_options: [
'buildtype=release',
diff --git a/staslib/avahi.py b/staslib/avahi.py
index 26543d9..f29f89d 100644
--- a/staslib/avahi.py
+++ b/staslib/avahi.py
@@ -18,7 +18,7 @@ import dasbus.connection
import dasbus.client.proxy
import dasbus.client.observer
from gi.repository import GLib
-from staslib import defs, conf, gutil
+from staslib import defs, conf, gutil, iputil
def _txt2dict(txt: list):
@@ -54,6 +54,141 @@ def _proto2trans(protocol):
return None
+def mk_service_key(interface, protocol, name, stype, domain):
+ '''Return a tuple used as a service key (unique identifier)'''
+ return (interface, protocol, name, stype, domain)
+
+
+def fmt_service_str(interface, protocol, name, stype, domain, flags): # pylint: disable=too-many-arguments
+ '''Return service identifier as a string'''
+ return (
+ f'interface={interface}:{(socket.if_indextoname(interface) + ","):<9} '
+ f'protocol={Avahi.protocol_as_string(protocol)}, '
+ f'stype={stype}, '
+ f'domain={domain}, '
+ f'flags={flags}:{(Avahi.result_flags_as_string(flags) + ","):<12} '
+ f'name={name}'
+ )
+
+
+# ******************************************************************************
+class Service: # pylint: disable=too-many-instance-attributes
+ '''Object used to keep track of the services discovered from the avahi-daemon'''
+
+ interface_name = property(lambda self: self._interface_name)
+ interface = property(lambda self: self._interface_id)
+ ip_family = property(lambda self: self._ip_family)
+ reachable = property(lambda self: self._reachable)
+ protocol = property(lambda self: self._protocol_id)
+ key_str = property(lambda self: self._key_str)
+ domain = property(lambda self: self._domain)
+ stype = property(lambda self: self._stype)
+ data = property(lambda self: self._data)
+ name = property(lambda self: self._name)
+ key = property(lambda self: self._key)
+ ip = property(lambda self: self._ip)
+
+ def __init__(self, args, identified_cback):
+ self._identified_cback = identified_cback
+ self._interface_id = args[0]
+ self._protocol_id = args[1]
+ self._name = args[2]
+ self._stype = args[3]
+ self._domain = args[4]
+ self._flags = args[5]
+ self._ip_family = 4 if self._protocol_id == Avahi.PROTO_INET else 6
+
+ self._interface_name = socket.if_indextoname(self._interface_id).strip()
+ self._protocol_name = Avahi.protocol_as_string(self._protocol_id)
+ self._flags_str = '(' + Avahi.result_flags_as_string(self._flags) + '),'
+
+ self._key = mk_service_key(self._interface_id, self._protocol_id, self._name, self._stype, self._domain)
+ self._key_str = f'({self._interface_name}, {self._protocol_name}, {self._name}.{self._domain}, {self._stype})'
+
+ self._id = fmt_service_str(
+ self._interface_id, self._protocol_id, self._name, self._stype, self._domain, self._flags
+ )
+
+ self._ip = None
+ self._resolver = None
+ self._data = {}
+ self._reachable = False
+ self._connect_checker = None
+
+ def info(self):
+ '''Return debug info'''
+ info = self._data
+ info['reachable'] = str(self._reachable)
+ return info
+
+ def __str__(self):
+ return self._id
+
+ def set_identity(self, transport, address, port, txt): # pylint: disable=too-many-arguments
+ '''Complete identification and check connectivity (if needed)
+ Return True if identification is complete. Return False if
+ we need to check connectivity.
+ '''
+ traddr = address.strip()
+ trsvcid = str(port).strip()
+ # host-iface permitted for tcp alone and not rdma
+ host_iface = self._interface_name if transport == 'tcp' else ''
+ self._data = {
+ 'transport': transport,
+ 'traddr': traddr,
+ 'trsvcid': trsvcid,
+ # host-iface permitted for tcp alone and not rdma
+ 'host-iface': host_iface,
+ 'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
+ if conf.NvmeOptions().discovery_supp
+ else defs.WELL_KNOWN_DISC_NQN,
+ }
+
+ self._ip = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)
+
+ if transport != 'tcp':
+ self._reachable = True
+ self._identified_cback()
+ return
+
+ self._reachable = False
+ connect_checker = gutil.TcpChecker(traddr, trsvcid, host_iface, self._tcp_connect_check_cback)
+
+ try:
+ connect_checker.connect()
+ except RuntimeError as err:
+ logging.error('Unable to verify connectivity: %s', err)
+ connect_checker.close()
+ connect_checker = None
+
+ self._connect_checker = connect_checker
+
+ def _tcp_connect_check_cback(self, connected):
+ if self._connect_checker is not None:
+ self._connect_checker.close()
+ self._connect_checker = None
+ self._reachable = connected
+ self._identified_cback()
+
+ def set_resolver(self, resolver):
+ '''Set the resolver object'''
+ self._resolver = resolver
+
+ def close(self):
+ '''Close this object and release all resources'''
+ if self._connect_checker is not None:
+ self._connect_checker.close()
+ self._connect_checker = None
+
+ if self._resolver is not None:
+ try:
+ self._resolver.Free()
+ dasbus.client.proxy.disconnect_proxy(self._resolver)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Service.close() - Failed to Free() resolver. %s', ex)
+ self._resolver = None
+
+
# ******************************************************************************
class Avahi: # pylint: disable=too-many-instance-attributes
'''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi
@@ -182,16 +317,10 @@ class Avahi: # pylint: disable=too-many-instance-attributes
def info(self) -> dict:
'''@brief return debug info about this object'''
- services = dict()
- for service, obj in self._services.items():
- interface, protocol, name, stype, domain = service
- key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})'
- services[key] = obj.get('data', {})
-
info = {
'avahi wake up timer': str(self._kick_avahi_tmr),
'service types': list(self._stypes),
- 'services': services,
+ 'services': {service.key_str: service.info() for service in self._services.values()},
}
return info
@@ -217,7 +346,7 @@ class Avahi: # pylint: disable=too-many-instance-attributes
[...]
]
'''
- return [service['data'] for service in self._services.values() if len(service['data'])]
+ return [service.data for service in self._services.values() if service.reachable]
def config_stypes(self, stypes: list):
'''@brief Configure the service types that we want to discover.
@@ -234,18 +363,17 @@ class Avahi: # pylint: disable=too-many-instance-attributes
'''
self._kick_avahi_tmr.clear()
+ def _remove_service(self, service_to_rm: typing.Tuple[int, int, str, str, str]):
+ service = self._services.pop(service_to_rm)
+ if service is not None:
+ service.close()
+
def _disconnect(self):
logging.debug('Avahi._disconnect()')
for service in self._services.values():
- resolver = service.pop('resolver', None)
- if resolver is not None:
- try:
- resolver.Free()
- dasbus.client.proxy.disconnect_proxy(resolver)
- except (AttributeError, dasbus.error.DBusError) as ex:
- logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex)
+ service.close()
- self._services = dict()
+ self._services.clear()
for browser in self._service_browsers.values():
try:
@@ -296,15 +424,9 @@ class Avahi: # pylint: disable=too-many-instance-attributes
logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex)
# Find the cached services corresponding to stype_to_rm and remove them
- services_to_rm = [service for service in self._services if service[3] == stype_to_rm]
- for service in services_to_rm:
- resolver = self._services.pop(service, {}).pop('resolver', None)
- if resolver is not None:
- try:
- resolver.Free()
- dasbus.client.proxy.disconnect_proxy(resolver)
- except (AttributeError, dasbus.error.DBusError) as ex:
- logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex)
+ services_to_rm = [service.key for service in self._services.values() if service.stype == stype_to_rm]
+ for service_to_rm in services_to_rm:
+ self._remove_service(service_to_rm)
for stype in stypes_to_add:
try:
@@ -329,31 +451,25 @@ class Avahi: # pylint: disable=too-many-instance-attributes
args: typing.Tuple[int, int, str, str, str, int],
*_user_data,
):
- (interface, protocol, name, stype, domain, flags) = args
- logging.debug(
- 'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
- interface,
- socket.if_indextoname(interface),
- Avahi.protocol_as_string(protocol),
- stype,
- domain,
- flags,
- '(' + Avahi.result_flags_as_string(flags) + '),',
- name,
- )
+ service = Service(args, self._change_cb)
+ logging.debug('Avahi._service_discovered() - %s', service)
- service = (interface, protocol, name, stype, domain)
- if service not in self._services:
+ if service.key not in self._services:
try:
obj_path = self._avahi.ServiceResolverNew(
- interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST
+ service.interface,
+ service.protocol,
+ service.name,
+ service.stype,
+ service.domain,
+ Avahi.PROTO_UNSPEC,
+ Avahi.LOOKUP_USE_MULTICAST,
)
- self._services[service] = {
- 'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path),
- 'data': {},
- }
+ service.set_resolver(self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path))
except dasbus.error.DBusError as ex:
- logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex)
+ logging.warning('Failed to create resolver - %s: %s', service, ex)
+
+ self._services[service.key] = service
def _service_removed(
self,
@@ -367,27 +483,14 @@ class Avahi: # pylint: disable=too-many-instance-attributes
):
(interface, protocol, name, stype, domain, flags) = args
logging.debug(
- 'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
- interface,
- socket.if_indextoname(interface),
- Avahi.protocol_as_string(protocol),
- stype,
- domain,
- flags,
- '(' + Avahi.result_flags_as_string(flags) + '),',
- name,
+ 'Avahi._service_removed() - %s',
+ fmt_service_str(interface, protocol, name, stype, domain, flags),
)
- service = (interface, protocol, name, stype, domain)
- resolver = self._services.pop(service, {}).pop('resolver', None)
- if resolver is not None:
- try:
- resolver.Free()
- dasbus.client.proxy.disconnect_proxy(resolver)
- except (AttributeError, dasbus.error.DBusError) as ex:
- logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex)
-
- self._change_cb()
+ service_key = mk_service_key(interface, protocol, name, stype, domain)
+ self._remove_service(service_key)
+ if self._change_cb is not None:
+ self._change_cb()
def _service_identified( # pylint: disable=too-many-locals
self,
@@ -402,38 +505,21 @@ class Avahi: # pylint: disable=too-many-instance-attributes
(interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args
txt = _txt2dict(txt)
logging.debug(
- 'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s',
- interface,
- socket.if_indextoname(interface),
- Avahi.protocol_as_string(protocol),
- stype,
- domain,
- flags,
- '(' + Avahi.result_flags_as_string(flags) + '),',
- name,
+ 'Avahi._service_identified() - %s, host=%s, aprotocol=%s, port=%s, address=%s, txt=%s',
+ fmt_service_str(interface, protocol, name, stype, domain, flags),
host,
Avahi.protocol_as_string(aprotocol),
- address,
port,
+ address,
txt,
)
- service = (interface, protocol, name, stype, domain)
- if service in self._services:
+ service_key = mk_service_key(interface, protocol, name, stype, domain)
+ service = self._services.get(service_key, None)
+ if service is not None:
transport = _proto2trans(txt.get('p'))
if transport is not None:
- self._services[service]['data'] = {
- 'transport': transport,
- 'traddr': address.strip(),
- 'trsvcid': str(port).strip(),
- # host-iface permitted for tcp alone and not rdma
- 'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '',
- 'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
- if conf.NvmeOptions().discovery_supp
- else defs.WELL_KNOWN_DISC_NQN,
- }
-
- self._change_cb()
+ service.set_identity(transport, address, port, txt)
else:
logging.error(
'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s',
@@ -442,6 +528,8 @@ class Avahi: # pylint: disable=too-many-instance-attributes
txt,
)
+ self._check_for_duplicate_ips()
+
def _failure_handler( # pylint: disable=no-self-use
self,
_connection,
@@ -456,3 +544,15 @@ class Avahi: # pylint: disable=too-many-instance-attributes
if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error:
# ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal.
logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error)
+
+ def _check_for_duplicate_ips(self):
+ '''This is to identify misconfigured networks where the
+ same IP addresses are discovered on two or more interfaces.'''
+ ips = {}
+ for service in self._services.values():
+ if service.ip is not None:
+ ips.setdefault(service.ip.compressed, []).append(service.interface_name)
+
+ for ip, ifaces in ips.items():
+ if len(ifaces) > 1:
+ logging.error('IP address %s was found on multiple interfaces: %s', ip, ','.join(ifaces))
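
The duplicate-IP check added at the end of avahi.py groups services by their compressed IP string and flags any address seen on more than one interface. A rough stand-alone equivalent using only the standard library (made-up input data, and the IPv4-mapped normalization is an assumption about what iputil.get_ipaddress_obj(ipv4_mapped_convert=True) does):

    import ipaddress
    import logging

    # (interface name, discovered address) pairs as they might come from mDNS
    discovered = [
        ('eth0', '192.168.1.10'),
        ('eth1', '::ffff:192.168.1.10'),   # same address in IPv4-mapped IPv6 form
        ('eth0', 'fe80::1'),
    ]

    ips = {}
    for iface, addr in discovered:
        ip = ipaddress.ip_address(addr)
        # Normalize IPv4-mapped IPv6 so both spellings of the same address collide
        if ip.version == 6 and ip.ipv4_mapped is not None:
            ip = ip.ipv4_mapped
        ips.setdefault(ip.compressed, []).append(iface)

    for ip, ifaces in ips.items():
        if len(ifaces) > 1:
            logging.error('IP address %s was found on multiple interfaces: %s', ip, ','.join(ifaces))
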
diff --git a/staslib/ctrl.py b/staslib/ctrl.py
index ad221e0..9f415b3 100644
--- a/staslib/ctrl.py
+++ b/staslib/ctrl.py
@@ -169,9 +169,9 @@ class Controller(stas.ControllerABC): # pylint: disable=too-many-instance-attri
self._udev.unregister_for_device_events(self._on_udev_notification)
self._kill_ops() # Kill all pending operations
self._ctrl = None
-
- # Defer removal of this object to the next main loop's idle period.
- GLib.idle_add(self._serv.remove_controller, self, True)
+ self._device = None
+ self._connect_attempts = 0
+ self._retry_connect_tmr.start()
def _get_cfg(self):
'''Get configuration parameters. These may either come from the [Global]
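
The ctrl.py hunk replaces object teardown (GLib.idle_add of remove_controller) with a restart of the "retry to connect" timer, so the DC object and its DLPE cache survive the reconnect. The project's own timer class lives in gutil.py; purely as an illustration of the one-shot GLib timer pattern being restarted here (not the project's GTimer), a minimal version could be:

    from gi.repository import GLib

    class RetryTimer:
        '''Minimal one-shot GLib timer (illustration only).'''

        def __init__(self, interval_sec, callback):
            self._interval_sec = interval_sec
            self._callback = callback
            self._source_id = None

        def start(self):
            self.stop()  # restarting resets the countdown
            self._source_id = GLib.timeout_add_seconds(self._interval_sec, self._fire)

        def stop(self):
            if self._source_id is not None:
                GLib.source_remove(self._source_id)
                self._source_id = None

        def _fire(self):
            self._source_id = None
            self._callback()
            return GLib.SOURCE_REMOVE  # one-shot: do not reschedule
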
diff --git a/staslib/gutil.py b/staslib/gutil.py
index 836674a..1730ac0 100644
--- a/staslib/gutil.py
+++ b/staslib/gutil.py
@@ -11,6 +11,7 @@ access to GLib/Gio/Gobject resources.
'''
import logging
+import socket
from gi.repository import Gio, GLib, GObject
from staslib import conf, iputil, trid
@@ -416,3 +417,100 @@ class Deferred:
if self.is_scheduled():
self._source.destroy()
self._source = None
+
+
+# ******************************************************************************
+class TcpChecker: # pylint: disable=too-many-instance-attributes
+ '''@brief Verify that a TCP connection can be established with an endpoint'''
+
+ def __init__(self, traddr, trsvcid, host_iface, user_cback, *user_data):
+ self._user_cback = user_cback
+ self._host_iface = host_iface
+ self._user_data = user_data
+ self._trsvcid = trsvcid
+ self._traddr = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)
+ self._cancellable = None
+ self._gio_sock = None
+ self._native_sock = None
+
+ def connect(self):
+ '''Attempt to connect'''
+ self.close()
+
+ # Gio has limited setsockopt() capabilities. To set SO_BINDTODEVICE
+ # we need to use a generic socket.socket() and then convert to a
+ # Gio.Socket() object to perform async connect operation within
+ # the GLib context.
+ family = socket.AF_INET if self._traddr.version == 4 else socket.AF_INET6
+ self._native_sock = socket.socket(family, socket.SOCK_STREAM | socket.SOCK_NONBLOCK, socket.IPPROTO_TCP)
+ if isinstance(self._host_iface, str):
+ self._native_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, self._host_iface.encode('utf-8'))
+
+ # Convert socket.socket() to a Gio.Socket() object
+ try:
+ self._gio_sock = Gio.Socket.new_from_fd(self._native_sock.fileno()) # returns None on error
+ except GLib.Error as err:
+ logging.error('Cannot create socket: %s', err.message) # pylint: disable=no-member
+ self._gio_sock = None
+
+ if self._gio_sock is None:
+ self._native_sock.close()
+ raise RuntimeError(f'Unable to connect to {self._traddr}, {self._trsvcid}, {self._host_iface}')
+
+ g_addr = Gio.InetSocketAddress.new_from_string(self._traddr.compressed, int(self._trsvcid))
+
+ self._cancellable = Gio.Cancellable()
+
+ g_sockconn = self._gio_sock.connection_factory_create_connection()
+ g_sockconn.connect_async(g_addr, self._cancellable, self._connect_async_cback)
+
+ def close(self):
+ '''Terminate/Cancel current connection attempt and free resources'''
+ if self._cancellable is not None:
+ self._cancellable.cancel()
+ self._cancellable = None
+
+ if self._gio_sock is not None:
+ try:
+ self._gio_sock.close()
+ except GLib.Error as err:
+ logging.debug('TcpChecker.close() gio_sock.close - %s', err.message) # pylint: disable=no-member
+
+ self._gio_sock = None
+
+ if self._native_sock is not None:
+ try:
+ # This is expected to fail because the socket
+ # is already closed by self._gio_sock.close() above.
+ # This code is just for completeness.
+ self._native_sock.close()
+ except OSError:
+ pass
+
+ self._native_sock = None
+
+ def _connect_async_cback(self, source_object, result):
+ '''
+ @param source_object: The Gio.SocketConnection object used to
+ invoke the connect_async() API.
+ '''
+ try:
+ connected = source_object.connect_finish(result)
+ except GLib.Error as err:
+ connected = False
+ # We don't need to report "cancellation" errors.
+ if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED):
+ logging.debug('TcpChecker._connect_async_cback() - %s', err.message) # pylint: disable=no-member
+ else:
+ logging.info(
+ 'Unable to verify TCP connectivity - (%-10s %-14s %s): %s',
+ self._host_iface + ',',
+ self._traddr.compressed + ',',
+ self._trsvcid,
+ err.message, # pylint: disable=no-member
+ )
+
+ self.close()
+
+ if self._user_cback is not None:
+ self._user_cback(connected, *self._user_data)
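
Putting the pieces together: a TcpChecker is driven by the GLib main loop — connect() returns immediately and the user callback later reports whether the endpoint was reachable. A small usage sketch based on the constructor and callback signatures added above (address, port, and interface are made up; 8009 is the conventional NVMe-oF discovery port):

    from gi.repository import GLib
    from staslib import gutil

    loop = GLib.MainLoop()

    def on_checked(connected):
        print('reachable' if connected else 'not reachable')
        loop.quit()

    checker = gutil.TcpChecker('192.168.1.10', '8009', 'eth0', on_checked)
    try:
        checker.connect()   # asynchronous; result arrives via on_checked()
        loop.run()
    finally:
        checker.close()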