summaryrefslogtreecommitdiffstats
path: root/staslib/avahi.py
diff options
context:
space:
mode:
Diffstat (limited to 'staslib/avahi.py')
-rw-r--r--staslib/avahi.py456
1 files changed, 456 insertions, 0 deletions
diff --git a/staslib/avahi.py b/staslib/avahi.py
new file mode 100644
index 0000000..c8a3a0b
--- /dev/null
+++ b/staslib/avahi.py
@@ -0,0 +1,456 @@
+# Copyright (c) 2021, Dell Inc. or its subsidiaries. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+# See the LICENSE file for details.
+#
+# This file is part of NVMe STorage Appliance Services (nvme-stas).
+#
+# Authors: Martin Belanger <Martin.Belanger@dell.com>
+#
+''' Module that provides a way to retrieve discovered
+ services from the Avahi daemon over D-Bus.
+'''
+import socket
+import typing
+import logging
+import functools
+import dasbus.error
+import dasbus.connection
+import dasbus.client.proxy
+import dasbus.client.observer
+from gi.repository import GLib
+from staslib import defs, conf, gutil
+
+
+def _txt2dict(txt: list):
+ '''@param txt: A list of list of integers. The integers are the ASCII value
+ of printable text characters.
+ '''
+ the_dict = dict()
+ for list_of_chars in txt:
+ try:
+ string = functools.reduce(lambda accumulator, c: accumulator + chr(c), list_of_chars, '')
+ key, val = string.split("=")
+ the_dict[key.lower()] = val
+ except Exception: # pylint: disable=broad-except
+ pass
+
+ return the_dict
+
+
+def _proto2trans(protocol):
+ '''Return the matching transport for the given protocol.'''
+ if protocol is None:
+ return None
+
+ protocol = protocol.strip().lower()
+ if protocol == 'tcp':
+ return 'tcp'
+
+ if protocol in ('roce', 'iwarp', 'rdma'):
+ return 'rdma'
+
+ return None
+
+
+# ******************************************************************************
+class Avahi: # pylint: disable=too-many-instance-attributes
+ '''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi
+ daemon and register to be notified when services of a certain
+ type (stype) are discovered or lost.
+ '''
+
+ DBUS_NAME = 'org.freedesktop.Avahi'
+ DBUS_INTERFACE_SERVICE_BROWSER = DBUS_NAME + '.ServiceBrowser'
+ DBUS_INTERFACE_SERVICE_RESOLVER = DBUS_NAME + '.ServiceResolver'
+ LOOKUP_USE_MULTICAST = 2
+
+ IF_UNSPEC = -1
+ PROTO_INET = 0
+ PROTO_INET6 = 1
+ PROTO_UNSPEC = -1
+
+ LOOKUP_RESULT_LOCAL = 8 # This record/service resides on and was announced by the local host
+ LOOKUP_RESULT_CACHED = 1 # This response originates from the cache
+ LOOKUP_RESULT_STATIC = 32 # The returned data has been defined statically by some configuration option
+ LOOKUP_RESULT_OUR_OWN = 16 # This service belongs to the same local client as the browser object
+ LOOKUP_RESULT_WIDE_AREA = 2 # This response originates from wide area DNS
+ LOOKUP_RESULT_MULTICAST = 4 # This response originates from multicast DNS
+
+ result_flags = {
+ LOOKUP_RESULT_LOCAL: 'local',
+ LOOKUP_RESULT_CACHED: 'cache',
+ LOOKUP_RESULT_STATIC: 'static',
+ LOOKUP_RESULT_OUR_OWN: 'own',
+ LOOKUP_RESULT_WIDE_AREA: 'wan',
+ LOOKUP_RESULT_MULTICAST: 'mcast',
+ }
+
+ protos = {PROTO_INET: 'IPv4', PROTO_INET6: 'IPv6', PROTO_UNSPEC: 'uspecified'}
+
+ @classmethod
+ def result_flags_as_string(cls, flags):
+ '''Convert flags to human-readable string'''
+ return '+'.join((value for flag, value in Avahi.result_flags.items() if (flags & flag) != 0))
+
+ @classmethod
+ def protocol_as_string(cls, proto):
+ '''Convert protocol codes to human-readable strings'''
+ return Avahi.protos.get(proto, 'unknown')
+
+ # ==========================================================================
+ def __init__(self, sysbus, change_cb):
+ self._change_cb = change_cb
+ self._services = dict()
+ self._sysbus = sysbus
+ self._stypes = set()
+ self._service_browsers = dict()
+
+ # Avahi is an on-demand service. If, for some reason, the avahi-daemon
+ # were to stop, we need to try to contact it for it to restart. For
+ # example, when installing the avahi-daemon package on a running system,
+ # the daemon doesn't get started right away. It needs another process to
+ # access it over D-Bus to wake it up. The following timer is used to
+ # periodically query the avahi-daemon until we successfully establish
+ # first contact.
+ self._kick_avahi_tmr = gutil.GTimer(60, self._on_kick_avahi)
+
+ # Subscribe for Avahi signals (i.e. events). This must be done before
+ # any Browser or Resolver is created to avoid race conditions and
+ # missed events.
+ self._subscriptions = [
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME,
+ Avahi.DBUS_INTERFACE_SERVICE_BROWSER,
+ 'ItemNew',
+ None,
+ None,
+ 0,
+ self._service_discovered,
+ ),
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME,
+ Avahi.DBUS_INTERFACE_SERVICE_BROWSER,
+ 'ItemRemove',
+ None,
+ None,
+ 0,
+ self._service_removed,
+ ),
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_BROWSER, 'Failure', None, None, 0, self._failure_handler
+ ),
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Found', None, None, 0, self._service_identified
+ ),
+ self._sysbus.connection.signal_subscribe(
+ Avahi.DBUS_NAME, Avahi.DBUS_INTERFACE_SERVICE_RESOLVER, 'Failure', None, None, 0, self._failure_handler
+ ),
+ ]
+
+ self._avahi = self._sysbus.get_proxy(Avahi.DBUS_NAME, '/')
+
+ self._avahi_watcher = dasbus.client.observer.DBusObserver(self._sysbus, Avahi.DBUS_NAME)
+ self._avahi_watcher.service_available.connect(self._avahi_available)
+ self._avahi_watcher.service_unavailable.connect(self._avahi_unavailable)
+ self._avahi_watcher.connect_once_available()
+
+ def kill(self):
+ '''@brief Clean up object'''
+ logging.debug('Avahi.kill()')
+
+ self._kick_avahi_tmr.kill()
+ self._kick_avahi_tmr = None
+
+ for subscription in self._subscriptions:
+ self._sysbus.connection.signal_unsubscribe(subscription)
+ self._subscriptions = list()
+
+ self._disconnect()
+
+ self._avahi_watcher.service_available.disconnect()
+ self._avahi_watcher.service_unavailable.disconnect()
+ self._avahi_watcher.disconnect()
+ self._avahi_watcher = None
+
+ dasbus.client.proxy.disconnect_proxy(self._avahi)
+ self._avahi = None
+
+ self._change_cb = None
+ self._sysbus = None
+
+ def info(self) -> dict:
+ '''@brief return debug info about this object'''
+ services = dict()
+ for service, obj in self._services.items():
+ interface, protocol, name, stype, domain = service
+ key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})'
+ services[key] = obj.get('data', {})
+
+ info = {
+ 'avahi wake up timer': str(self._kick_avahi_tmr),
+ 'service types': list(self._stypes),
+ 'services': services,
+ }
+
+ return info
+
+ def get_controllers(self) -> list:
+ '''@brief Get the discovery controllers as a list of dict()
+ as follows:
+ [
+ {
+ 'transport': tcp,
+ 'traddr': str(),
+ 'trsvcid': str(),
+ 'host-iface': str(),
+ 'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery',
+ },
+ {
+ 'transport': tcp,
+ 'traddr': str(),
+ 'trsvcid': str(),
+ 'host-iface': str(),
+ 'subsysnqn': 'nqn.2014-08.org.nvmexpress.discovery',
+ },
+ [...]
+ ]
+ '''
+ return [service['data'] for service in self._services.values() if len(service['data'])]
+
+ def config_stypes(self, stypes: list):
+ '''@brief Configure the service types that we want to discover.
+ @param stypes: A list of services types, e.g. ['_nvme-disc._tcp']
+ '''
+ self._stypes = set(stypes)
+ success = self._configure_browsers()
+ if not success:
+ self._kick_avahi_tmr.start()
+
+ def kick_start(self):
+ '''@brief We use this to kick start the Avahi
+ daemon (i.e. socket activation).
+ '''
+ self._kick_avahi_tmr.clear()
+
+ def _disconnect(self):
+ logging.debug('Avahi._disconnect()')
+ for service in self._services.values():
+ resolver = service.pop('resolver', None)
+ if resolver is not None:
+ try:
+ resolver.Free()
+ dasbus.client.proxy.disconnect_proxy(resolver)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex)
+
+ self._services = dict()
+
+ for browser in self._service_browsers.values():
+ try:
+ browser.Free()
+ dasbus.client.proxy.disconnect_proxy(browser)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._disconnect() - Failed to Free() browser. %s', ex)
+
+ self._service_browsers = dict()
+
+ def _on_kick_avahi(self):
+ try:
+ # try to contact avahi-daemon. This is just a wake
+ # up call in case the avahi-daemon was sleeping.
+ self._avahi.GetVersionString()
+ except dasbus.error.DBusError:
+ return GLib.SOURCE_CONTINUE
+
+ return GLib.SOURCE_REMOVE
+
+ def _avahi_available(self, _avahi_watcher):
+ '''@brief Hook up DBus signal handlers for signals from stafd.'''
+ logging.info('avahi-daemon service available, zeroconf supported.')
+ success = self._configure_browsers()
+ if not success:
+ self._kick_avahi_tmr.start()
+
+ def _avahi_unavailable(self, _avahi_watcher):
+ self._disconnect()
+ logging.warning('avahi-daemon not available, zeroconf not supported.')
+ self._kick_avahi_tmr.start()
+
+ def _configure_browsers(self):
+ stypes_cur = set(self._service_browsers.keys())
+ stypes_to_add = self._stypes - stypes_cur
+ stypes_to_rm = stypes_cur - self._stypes
+
+ logging.debug('Avahi._configure_browsers() - stypes_to_rm = %s', list(stypes_to_rm))
+ logging.debug('Avahi._configure_browsers() - stypes_to_add = %s', list(stypes_to_add))
+
+ for stype_to_rm in stypes_to_rm:
+ browser = self._service_browsers.pop(stype_to_rm, None)
+ if browser is not None:
+ try:
+ browser.Free()
+ dasbus.client.proxy.disconnect_proxy(browser)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex)
+
+ # Find the cached services corresponding to stype_to_rm and remove them
+ services_to_rm = [service for service in self._services if service[3] == stype_to_rm]
+ for service in services_to_rm:
+ resolver = self._services.pop(service, {}).pop('resolver', None)
+ if resolver is not None:
+ try:
+ resolver.Free()
+ dasbus.client.proxy.disconnect_proxy(resolver)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex)
+
+ for stype in stypes_to_add:
+ try:
+ obj_path = self._avahi.ServiceBrowserNew(
+ Avahi.IF_UNSPEC, Avahi.PROTO_UNSPEC, stype, 'local', Avahi.LOOKUP_USE_MULTICAST
+ )
+ self._service_browsers[stype] = self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path)
+ except dasbus.error.DBusError as ex:
+ logging.debug('Avahi._configure_browsers() - Failed to contact avahi-daemon. %s', ex)
+ logging.warning('avahi-daemon not available, operating w/o mDNS discovery.')
+ return False
+
+ return True
+
+ def _service_discovered(
+ self,
+ _connection,
+ _sender_name: str,
+ _object_path: str,
+ _interface_name: str,
+ _signal_name: str,
+ args: typing.Tuple[int, int, str, str, str, int],
+ *_user_data,
+ ):
+ (interface, protocol, name, stype, domain, flags) = args
+ logging.debug(
+ 'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
+ interface,
+ socket.if_indextoname(interface),
+ Avahi.protocol_as_string(protocol),
+ stype,
+ domain,
+ flags,
+ '(' + Avahi.result_flags_as_string(flags) + '),',
+ name,
+ )
+
+ service = (interface, protocol, name, stype, domain)
+ if service not in self._services:
+ try:
+ obj_path = self._avahi.ServiceResolverNew(
+ interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST
+ )
+ self._services[service] = {
+ 'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path),
+ 'data': {},
+ }
+ except dasbus.error.DBusError as ex:
+ logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex)
+
+ def _service_removed(
+ self,
+ _connection,
+ _sender_name: str,
+ _object_path: str,
+ _interface_name: str,
+ _signal_name: str,
+ args: typing.Tuple[int, int, str, str, str, int],
+ *_user_data,
+ ):
+ (interface, protocol, name, stype, domain, flags) = args
+ logging.debug(
+ 'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s',
+ interface,
+ socket.if_indextoname(interface),
+ Avahi.protocol_as_string(protocol),
+ stype,
+ domain,
+ flags,
+ '(' + Avahi.result_flags_as_string(flags) + '),',
+ name,
+ )
+
+ service = (interface, protocol, name, stype, domain)
+ resolver = self._services.pop(service, {}).pop('resolver', None)
+ if resolver is not None:
+ try:
+ resolver.Free()
+ dasbus.client.proxy.disconnect_proxy(resolver)
+ except (AttributeError, dasbus.error.DBusError) as ex:
+ logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex)
+
+ self._change_cb()
+
+ def _service_identified( # pylint: disable=too-many-locals
+ self,
+ _connection,
+ _sender_name: str,
+ _object_path: str,
+ _interface_name: str,
+ _signal_name: str,
+ args: typing.Tuple[int, int, str, str, str, str, int, str, int, list, int],
+ *_user_data,
+ ):
+ (interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args
+ txt = _txt2dict(txt)
+ logging.debug(
+ 'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s',
+ interface,
+ socket.if_indextoname(interface),
+ Avahi.protocol_as_string(protocol),
+ stype,
+ domain,
+ flags,
+ '(' + Avahi.result_flags_as_string(flags) + '),',
+ name,
+ host,
+ Avahi.protocol_as_string(aprotocol),
+ address,
+ port,
+ txt,
+ )
+
+ service = (interface, protocol, name, stype, domain)
+ if service in self._services:
+ transport = _proto2trans(txt.get('p'))
+ if transport is not None:
+ self._services[service]['data'] = {
+ 'transport': transport,
+ 'traddr': address.strip(),
+ 'trsvcid': str(port).strip(),
+ # host-iface permitted for tcp alone and not rdma
+ 'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '',
+ 'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
+ if conf.NvmeOptions().discovery_supp
+ else defs.WELL_KNOWN_DISC_NQN,
+ }
+
+ self._change_cb()
+ else:
+ logging.error(
+ 'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s',
+ address,
+ socket.if_indextoname(interface).strip(),
+ txt,
+ )
+
+ def _failure_handler( # pylint: disable=no-self-use
+ self,
+ _connection,
+ _sender_name: str,
+ _object_path: str,
+ interface_name: str,
+ _signal_name: str,
+ args: typing.Tuple[str],
+ *_user_data,
+ ):
+ (error,) = args
+ if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error:
+ # ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal.
+ logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error)