summaryrefslogtreecommitdiffstats
path: root/test/networkd-test.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rwxr-xr-xtest/networkd-test.py1094
1 files changed, 1094 insertions, 0 deletions
diff --git a/test/networkd-test.py b/test/networkd-test.py
new file mode 100755
index 0000000..8496ec8
--- /dev/null
+++ b/test/networkd-test.py
@@ -0,0 +1,1094 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: LGPL-2.1-or-later
+#
+# networkd integration test
+# This uses temporary configuration in /run and temporary veth devices, and
+# does not write anything on disk or change any system configuration;
+# but it assumes (and checks at the beginning) that networkd is not currently
+# running.
+#
+# This can be run on a normal installation, in QEMU, nspawn (with
+# --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
+# or LXC system containers. You need at least the "ip" tool from the iproute
+# package; it is recommended to install dnsmasq too to get full test coverage.
+#
+# ATTENTION: This uses the *installed* networkd, not the one from the built
+# source tree.
+#
+# © 2015 Canonical Ltd.
+# Author: Martin Pitt <martin.pitt@ubuntu.com>
+
+import errno
+import os
+import shutil
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+import unittest
+
+HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
+IS_CONTAINER = subprocess.call(['systemd-detect-virt', '--quiet', '--container']) == 0
+
+NETWORK_UNITDIR = '/run/systemd/network'
+
+NETWORKD_WAIT_ONLINE = shutil.which('systemd-networkd-wait-online',
+ path='/usr/lib/systemd:/lib/systemd')
+
+RESOLV_CONF = '/run/systemd/resolve/resolv.conf'
+
+tmpmounts = []
+running_units = []
+stopped_units = []
+
+
+def setUpModule():
+ global tmpmounts
+
+ """Initialize the environment, and perform sanity checks on it."""
+ if NETWORKD_WAIT_ONLINE is None:
+ raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')
+
+ # Do not run any tests if the system is using networkd already and it's not virtualized
+ if (subprocess.call(['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0 and
+ subprocess.call(['systemd-detect-virt', '--quiet']) != 0):
+ raise unittest.SkipTest('not virtualized and networkd is already active')
+
+ # Ensure we don't mess with an existing networkd config
+ for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']:
+ if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0:
+ subprocess.call(['systemctl', 'stop', u])
+ running_units.append(u)
+ else:
+ stopped_units.append(u)
+
+ # create static systemd-network user for networkd-test-router.service (it
+ # needs to do some stuff as root and can't start as user; but networkd
+ # still insists on the user)
+ if subprocess.call(['getent', 'passwd', 'systemd-network']) != 0:
+ subprocess.call(['useradd', '--system', '--no-create-home', 'systemd-network'])
+
+ for d in ['/etc/systemd/network', '/run/systemd/network',
+ '/run/systemd/netif', '/run/systemd/resolve']:
+ if os.path.isdir(d):
+ subprocess.check_call(["mount", "-t", "tmpfs", "none", d])
+ tmpmounts.append(d)
+ if os.path.isdir('/run/systemd/resolve'):
+ os.chmod('/run/systemd/resolve', 0o755)
+ shutil.chown('/run/systemd/resolve', 'systemd-resolve', 'systemd-resolve')
+ if os.path.isdir('/run/systemd/netif'):
+ os.chmod('/run/systemd/netif', 0o755)
+ shutil.chown('/run/systemd/netif', 'systemd-network', 'systemd-network')
+
+ # Avoid "Failed to open /dev/tty" errors in containers.
+ os.environ['SYSTEMD_LOG_TARGET'] = 'journal'
+
+ # Ensure the unit directory exists so tests can dump files into it.
+ os.makedirs(NETWORK_UNITDIR, exist_ok=True)
+
+
+def tearDownModule():
+ global tmpmounts
+ for d in tmpmounts:
+ subprocess.check_call(["umount", d])
+ for u in stopped_units:
+ subprocess.call(["systemctl", "stop", u])
+ for u in running_units:
+ subprocess.call(["systemctl", "restart", u])
+
+
+class NetworkdTestingUtilities:
+ """Provide a set of utility functions to facilitate networkd tests.
+
+ This class must be inherited along with unittest.TestCase to define
+ some required methods.
+ """
+
+ def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
+ """Add a veth interface pair, and queue them to be removed."""
+ subprocess.check_call(['ip', 'link', 'add', 'name', veth] +
+ list(veth_options) +
+ ['type', 'veth', 'peer', 'name', peer] +
+ list(peer_options))
+ self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])
+
+ def write_config(self, path, contents):
+ """"Write a configuration file, and queue it to be removed."""
+
+ with open(path, 'w') as f:
+ f.write(contents)
+
+ self.addCleanup(os.remove, path)
+
+ def write_network(self, unit_name, contents):
+ """Write a network unit file, and queue it to be removed."""
+ self.write_config(os.path.join(NETWORK_UNITDIR, unit_name), contents)
+
+ def write_network_dropin(self, unit_name, dropin_name, contents):
+ """Write a network unit drop-in, and queue it to be removed."""
+ dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
+ dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))
+
+ os.makedirs(dropin_dir, exist_ok=True)
+ self.addCleanup(os.rmdir, dropin_dir)
+ with open(dropin_path, 'w') as dropin:
+ dropin.write(contents)
+ self.addCleanup(os.remove, dropin_path)
+
+ def read_attr(self, link, attribute):
+ """Read a link attributed from the sysfs."""
+ # Note we we don't want to check if interface `link' is managed, we
+ # want to evaluate link variable and pass the value of the link to
+ # assert_link_states e.g. eth0=managed.
+ self.assert_link_states(**{link:'managed'})
+ with open(os.path.join('/sys/class/net', link, attribute)) as f:
+ return f.readline().strip()
+
+ def assert_link_states(self, **kwargs):
+ """Match networkctl link states to the given ones.
+
+ Each keyword argument should be the name of a network interface
+ with its expected value of the "SETUP" column in output from
+ networkctl. The interfaces have five seconds to come online
+ before the check is performed. Every specified interface must
+ be present in the output, and any other interfaces found in the
+ output are ignored.
+
+ A special interface state "managed" is supported, which matches
+ any value in the "SETUP" column other than "unmanaged".
+ """
+ if not kwargs:
+ return
+ interfaces = set(kwargs)
+
+ # Wait for the requested interfaces, but don't fail for them.
+ subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
+ ['--interface={}'.format(iface) for iface in kwargs])
+
+ # Validate each link state found in the networkctl output.
+ out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
+ for line in out.decode('utf-8').split('\n'):
+ fields = line.split()
+ if len(fields) >= 5 and fields[1] in kwargs:
+ iface = fields[1]
+ expected = kwargs[iface]
+ actual = fields[-1]
+ if (actual != expected and
+ not (expected == 'managed' and actual != 'unmanaged')):
+ self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
+ interfaces.remove(iface)
+
+ # Ensure that all requested interfaces have been covered.
+ if interfaces:
+ self.fail("Missing links in status output: {}".format(interfaces))
+
+
+class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
+ """Provide common methods for testing networkd against servers."""
+
+ def setUp(self):
+ self.write_network('port1.netdev', '''\
+[NetDev]
+Name=port1
+Kind=dummy
+MACAddress=12:34:56:78:9a:bc''')
+ self.write_network('port2.netdev', '''\
+[NetDev]
+Name=port2
+Kind=dummy
+MACAddress=12:34:56:78:9a:bd''')
+ self.write_network('mybridge.netdev', '''\
+[NetDev]
+Name=mybridge
+Kind=bridge''')
+ self.write_network('port1.network', '''\
+[Match]
+Name=port1
+[Network]
+Bridge=mybridge''')
+ self.write_network('port2.network', '''\
+[Match]
+Name=port2
+[Network]
+Bridge=mybridge''')
+ self.write_network('mybridge.network', '''\
+[Match]
+Name=mybridge
+[Network]
+DNS=192.168.250.1
+Address=192.168.250.33/24
+Gateway=192.168.250.1''')
+ subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
+ subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+
+ def tearDown(self):
+ subprocess.check_call(['systemctl', 'stop', 'systemd-networkd.socket'])
+ subprocess.check_call(['systemctl', 'stop', 'systemd-networkd.service'])
+ subprocess.check_call(['ip', 'link', 'del', 'mybridge'])
+ subprocess.check_call(['ip', 'link', 'del', 'port1'])
+ subprocess.check_call(['ip', 'link', 'del', 'port2'])
+
+ def test_bridge_init(self):
+ self.assert_link_states(
+ port1='managed',
+ port2='managed',
+ mybridge='managed')
+
+ def test_bridge_port_priority(self):
+ self.assertEqual(self.read_attr('port1', 'brport/priority'), '32')
+ self.write_network_dropin('port1.network', 'priority', '''\
+[Bridge]
+Priority=28
+''')
+ subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
+ self.assertEqual(self.read_attr('port1', 'brport/priority'), '28')
+
+ def test_bridge_port_priority_set_zero(self):
+ """It should be possible to set the bridge port priority to 0"""
+ self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
+ self.write_network_dropin('port2.network', 'priority', '''\
+[Bridge]
+Priority=0
+''')
+ subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
+ self.assertEqual(self.read_attr('port2', 'brport/priority'), '0')
+
+ def test_bridge_port_property(self):
+ """Test the "[Bridge]" section keys"""
+ self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
+ self.write_network_dropin('port2.network', 'property', '''\
+[Bridge]
+UnicastFlood=true
+HairPin=true
+UseBPDU=true
+FastLeave=true
+AllowPortToBeRoot=true
+Cost=555
+Priority=23
+''')
+ subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
+
+ self.assertEqual(self.read_attr('port2', 'brport/priority'), '23')
+ self.assertEqual(self.read_attr('port2', 'brport/hairpin_mode'), '1')
+ self.assertEqual(self.read_attr('port2', 'brport/path_cost'), '555')
+ self.assertEqual(self.read_attr('port2', 'brport/multicast_fast_leave'), '1')
+ self.assertEqual(self.read_attr('port2', 'brport/unicast_flood'), '1')
+ self.assertEqual(self.read_attr('port2', 'brport/bpdu_guard'), '1')
+ self.assertEqual(self.read_attr('port2', 'brport/root_block'), '1')
+
+class ClientTestBase(NetworkdTestingUtilities):
+ """Provide common methods for testing networkd against servers."""
+
+ @classmethod
+ def setUpClass(klass):
+ klass.orig_log_level = subprocess.check_output(
+ ['systemctl', 'show', '--value', '--property', 'LogLevel'],
+ universal_newlines=True).strip()
+ subprocess.check_call(['systemd-analyze', 'log-level', 'debug'])
+
+ @classmethod
+ def tearDownClass(klass):
+ subprocess.check_call(['systemd-analyze', 'log-level', klass.orig_log_level])
+
+ def setUp(self):
+ self.iface = 'test_eth42'
+ self.if_router = 'router_eth42'
+ self.workdir_obj = tempfile.TemporaryDirectory()
+ self.workdir = self.workdir_obj.name
+ self.config = 'test_eth42.network'
+
+ # get current journal cursor
+ subprocess.check_output(['journalctl', '--sync'])
+ out = subprocess.check_output(['journalctl', '-b', '--quiet',
+ '--no-pager', '-n0', '--show-cursor'],
+ universal_newlines=True)
+ self.assertTrue(out.startswith('-- cursor:'))
+ self.journal_cursor = out.split()[-1]
+
+ subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
+
+ def tearDown(self):
+ self.shutdown_iface()
+ subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket'])
+ subprocess.call(['systemctl', 'stop', 'systemd-networkd.service'])
+ subprocess.call(['ip', 'link', 'del', 'dummy0'],
+ stderr=subprocess.DEVNULL)
+
+ def show_journal(self, unit):
+ '''Show journal of given unit since start of the test'''
+
+ print('---- {} ----'.format(unit))
+ subprocess.check_output(['journalctl', '--sync'])
+ sys.stdout.flush()
+ subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
+ '--cursor', self.journal_cursor, '-u', unit])
+
+ def create_iface(self, ipv6=False):
+ '''Create test interface with DHCP server behind it'''
+
+ raise NotImplementedError('must be implemented by a subclass')
+
+ def shutdown_iface(self):
+ '''Remove test interface and stop DHCP server'''
+
+ raise NotImplementedError('must be implemented by a subclass')
+
+ def print_server_log(self):
+ '''Print DHCP server log for debugging failures'''
+
+ raise NotImplementedError('must be implemented by a subclass')
+
+ def start_unit(self, unit):
+ try:
+ subprocess.check_call(['systemctl', 'start', unit])
+ except subprocess.CalledProcessError:
+ self.show_journal(unit)
+ raise
+
+ def do_test(self, coldplug=True, ipv6=False, extra_opts='',
+ online_timeout=10, dhcp_mode='yes'):
+ self.start_unit('systemd-resolved')
+ self.write_network(self.config, '''\
+[Match]
+Name={}
+[Network]
+DHCP={}
+{}'''.format(self.iface, dhcp_mode, extra_opts))
+
+ if coldplug:
+ # create interface first, then start networkd
+ self.create_iface(ipv6=ipv6)
+ self.start_unit('systemd-networkd')
+ elif coldplug is not None:
+ # start networkd first, then create interface
+ self.start_unit('systemd-networkd')
+ self.create_iface(ipv6=ipv6)
+ else:
+ # "None" means test sets up interface by itself
+ self.start_unit('systemd-networkd')
+
+ try:
+ subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
+ self.iface, '--timeout=%i' % online_timeout])
+
+ if ipv6:
+ # check iface state and IP 6 address; FIXME: we need to wait a bit
+ # longer, as the iface is "configured" already with IPv4 *or*
+ # IPv6, but we want to wait for both
+ for _ in range(10):
+ out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
+ if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out and b'tentative' not in out:
+ break
+ time.sleep(1)
+ else:
+ self.fail('timed out waiting for IPv6 configuration')
+
+ self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
+ self.assertRegex(out, b'inet6 fe80::.* scope link')
+ else:
+ # should have link-local address on IPv6 only
+ out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
+ self.assertRegex(out, br'inet6 fe80::.* scope link')
+ self.assertNotIn(b'scope global', out)
+
+ # should have IPv4 address
+ out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
+ self.assertIn(b'state UP', out)
+ self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')
+
+ # check networkctl state
+ out = subprocess.check_output(['networkctl'])
+ self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
+ self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())
+
+ out = subprocess.check_output(['networkctl', '-n', '0', 'status', self.iface])
+ self.assertRegex(out, br'Type:\s+ether')
+ self.assertRegex(out, br'State:\s+routable.*configured')
+ self.assertRegex(out, br'Address:\s+192.168.5.\d+')
+ if ipv6:
+ self.assertRegex(out, br'2600::')
+ else:
+ self.assertNotIn(br'2600::', out)
+ self.assertRegex(out, br'fe80::')
+ self.assertRegex(out, br'Gateway:\s+192.168.5.1')
+ self.assertRegex(out, br'DNS:\s+192.168.5.1')
+ except (AssertionError, subprocess.CalledProcessError):
+ # show networkd status, journal, and DHCP server log on failure
+ with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
+ print('\n---- {} ----\n{}'.format(self.config, f.read()))
+ print('---- interface status ----')
+ sys.stdout.flush()
+ subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
+ print('---- networkctl status {} ----'.format(self.iface))
+ sys.stdout.flush()
+ rc = subprocess.call(['networkctl', '-n', '0', 'status', self.iface])
+ if rc != 0:
+ print("'networkctl status' exited with an unexpected code {}".format(rc))
+ self.show_journal('systemd-networkd.service')
+ self.print_server_log()
+ raise
+
+ for timeout in range(50):
+ with open(RESOLV_CONF) as f:
+ contents = f.read()
+ if 'nameserver 192.168.5.1\n' in contents:
+ break
+ time.sleep(0.1)
+ else:
+ self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)
+
+ if coldplug is False:
+ # check post-down.d hook
+ self.shutdown_iface()
+
+ def test_coldplug_dhcp_yes_ip4(self):
+ # we have a 12s timeout on RA, so we need to wait longer
+ self.do_test(coldplug=True, ipv6=False, online_timeout=15)
+
+ def test_coldplug_dhcp_yes_ip4_no_ra(self):
+ # with disabling RA explicitly things should be fast
+ self.do_test(coldplug=True, ipv6=False,
+ extra_opts='IPv6AcceptRA=False')
+
+ def test_coldplug_dhcp_ip4_only(self):
+ # we have a 12s timeout on RA, so we need to wait longer
+ self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
+ online_timeout=15)
+
+ def test_coldplug_dhcp_ip4_only_no_ra(self):
+ # with disabling RA explicitly things should be fast
+ self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
+ extra_opts='IPv6AcceptRA=False')
+
+ def test_coldplug_dhcp_ip6(self):
+ self.do_test(coldplug=True, ipv6=True)
+
+ def test_hotplug_dhcp_ip4(self):
+ # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
+ self.do_test(coldplug=False, ipv6=False, online_timeout=15)
+
+ def test_hotplug_dhcp_ip6(self):
+ self.do_test(coldplug=False, ipv6=True)
+
+ def test_route_only_dns(self):
+ self.write_network('myvpn.netdev', '''\
+[NetDev]
+Name=dummy0
+Kind=dummy
+MACAddress=12:34:56:78:9a:bc''')
+ self.write_network('myvpn.network', '''\
+[Match]
+Name=dummy0
+[Network]
+Address=192.168.42.100/24
+DNS=192.168.42.1
+Domains= ~company''')
+
+ try:
+ self.do_test(coldplug=True, ipv6=False,
+ extra_opts='IPv6AcceptRouterAdvertisements=False')
+ except subprocess.CalledProcessError as e:
+ # networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848
+ if IS_CONTAINER and e.cmd == ['systemctl', 'start', 'systemd-networkd']:
+ raise unittest.SkipTest('https://github.com/systemd/systemd/issues/11848')
+ else:
+ raise
+
+ with open(RESOLV_CONF) as f:
+ contents = f.read()
+ # ~company is not a search domain, only a routing domain
+ self.assertNotRegex(contents, 'search.*company')
+ # our global server should appear
+ self.assertIn('nameserver 192.168.5.1\n', contents)
+ # should not have domain-restricted server as global server
+ self.assertNotIn('nameserver 192.168.42.1\n', contents)
+
+ def test_route_only_dns_all_domains(self):
+ self.write_network('myvpn.netdev', '''[NetDev]
+Name=dummy0
+Kind=dummy
+MACAddress=12:34:56:78:9a:bc''')
+ self.write_network('myvpn.network', '''[Match]
+Name=dummy0
+[Network]
+Address=192.168.42.100/24
+DNS=192.168.42.1
+Domains= ~company ~.''')
+
+ try:
+ self.do_test(coldplug=True, ipv6=False,
+ extra_opts='IPv6AcceptRouterAdvertisements=False')
+ except subprocess.CalledProcessError as e:
+ # networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848
+ if IS_CONTAINER and e.cmd == ['systemctl', 'start', 'systemd-networkd']:
+ raise unittest.SkipTest('https://github.com/systemd/systemd/issues/11848')
+ else:
+ raise
+
+ with open(RESOLV_CONF) as f:
+ contents = f.read()
+
+ # ~company is not a search domain, only a routing domain
+ self.assertNotRegex(contents, 'search.*company')
+
+ # our global server should appear
+ self.assertIn('nameserver 192.168.5.1\n', contents)
+ # should have company server as global server due to ~.
+ self.assertIn('nameserver 192.168.42.1\n', contents)
+
+
+@unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
+class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
+ '''Test networkd client against dnsmasq'''
+
+ def setUp(self):
+ super().setUp()
+ self.dnsmasq = None
+ self.iface_mac = 'de:ad:be:ef:47:11'
+
+ def create_iface(self, ipv6=False, dnsmasq_opts=None):
+ '''Create test interface with DHCP server behind it'''
+
+ # add veth pair
+ subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
+ 'address', self.iface_mac,
+ 'type', 'veth', 'peer', 'name', self.if_router])
+
+ # give our router an IP
+ subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
+ subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
+ if ipv6:
+ subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
+ subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])
+
+ # add DHCP server
+ self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
+ lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
+ if ipv6:
+ extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
+ else:
+ extra_opts = []
+ if dnsmasq_opts:
+ extra_opts += dnsmasq_opts
+ self.dnsmasq = subprocess.Popen(
+ ['dnsmasq', '--keep-in-foreground', '--log-queries',
+ '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
+ '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
+ '--interface=' + self.if_router, '--except-interface=lo',
+ '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)
+
+ def shutdown_iface(self):
+ '''Remove test interface and stop DHCP server'''
+
+ if self.if_router:
+ subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
+ self.if_router = None
+ if self.dnsmasq:
+ self.dnsmasq.kill()
+ self.dnsmasq.wait()
+ self.dnsmasq = None
+
+ def print_server_log(self):
+ '''Print DHCP server log for debugging failures'''
+
+ with open(self.dnsmasq_log) as f:
+ sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))
+
+ def test_resolved_domain_restricted_dns(self):
+ '''resolved: domain-restricted DNS servers'''
+
+ # FIXME: resolvectl query fails with enabled DNSSEC against our dnsmasq
+ conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
+ os.makedirs(os.path.dirname(conf), exist_ok=True)
+ with open(conf, 'w') as f:
+ f.write('[Resolve]\nDNSSEC=no\n')
+ self.addCleanup(os.remove, conf)
+
+ # create interface for generic connections; this will map all DNS names
+ # to 192.168.42.1
+ self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
+ self.write_network('general.network', '''\
+[Match]
+Name={}
+[Network]
+DHCP=ipv4
+IPv6AcceptRA=False'''.format(self.iface))
+
+ # create second device/dnsmasq for a .company/.lab VPN interface
+ # static IPs for simplicity
+ self.add_veth_pair('testvpnclient', 'testvpnrouter')
+ subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
+ subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
+ subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])
+
+ vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
+ vpn_dnsmasq = subprocess.Popen(
+ ['dnsmasq', '--keep-in-foreground', '--log-queries',
+ '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
+ '--dhcp-leasefile=/dev/null', '--bind-interfaces',
+ '--interface=testvpnrouter', '--except-interface=lo',
+ '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
+ self.addCleanup(vpn_dnsmasq.wait)
+ self.addCleanup(vpn_dnsmasq.kill)
+
+ self.write_network('vpn.network', '''\
+[Match]
+Name=testvpnclient
+[Network]
+IPv6AcceptRA=False
+Address=10.241.3.2/24
+DNS=10.241.3.1
+Domains= ~company ~lab''')
+
+ self.start_unit('systemd-networkd')
+ subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
+ '--interface=testvpnclient', '--timeout=20'])
+
+ # ensure we start fresh with every test
+ subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])
+
+ # test vpnclient specific domains; these should *not* be answered by
+ # the general DNS
+ out = subprocess.check_output(['resolvectl', 'query', 'math.lab'])
+ self.assertIn(b'math.lab: 10.241.3.3', out)
+ out = subprocess.check_output(['resolvectl', 'query', 'kettle.cantina.company'])
+ self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)
+
+ # test general domains
+ out = subprocess.check_output(['resolvectl', 'query', 'megasearch.net'])
+ self.assertIn(b'megasearch.net: 192.168.42.1', out)
+
+ with open(self.dnsmasq_log) as f:
+ general_log = f.read()
+ with open(vpn_dnsmasq_log) as f:
+ vpn_log = f.read()
+
+ # VPN domains should only be sent to VPN DNS
+ self.assertRegex(vpn_log, 'query.*math.lab')
+ self.assertRegex(vpn_log, 'query.*cantina.company')
+ self.assertNotIn('.lab', general_log)
+ self.assertNotIn('.company', general_log)
+
+ # general domains should not be sent to the VPN DNS
+ self.assertRegex(general_log, 'query.*megasearch.net')
+ self.assertNotIn('megasearch.net', vpn_log)
+
+ def test_resolved_etc_hosts(self):
+ '''resolved queries to /etc/hosts'''
+
+ # FIXME: -t MX query fails with enabled DNSSEC (even when using
+ # the known negative trust anchor .internal instead of .example.com)
+ conf = '/run/systemd/resolved.conf.d/test-disable-dnssec.conf'
+ os.makedirs(os.path.dirname(conf), exist_ok=True)
+ with open(conf, 'w') as f:
+ f.write('[Resolve]\nDNSSEC=no\nLLMNR=no\nMulticastDNS=no\n')
+ self.addCleanup(os.remove, conf)
+
+ # create /etc/hosts bind mount which resolves my.example.com for IPv4
+ hosts = os.path.join(self.workdir, 'hosts')
+ with open(hosts, 'w') as f:
+ f.write('172.16.99.99 my.example.com\n')
+ subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
+ self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
+ subprocess.check_call(['systemctl', 'stop', 'systemd-resolved.service'])
+
+ # note: different IPv4 address here, so that it's easy to tell apart
+ # what resolved the query
+ self.create_iface(dnsmasq_opts=['--host-record=my.example.com,172.16.99.1,2600::99:99',
+ '--host-record=other.example.com,172.16.0.42,2600::42',
+ '--mx-host=example.com,mail.example.com'],
+ ipv6=True)
+ self.do_test(coldplug=None, ipv6=True)
+
+ try:
+ # family specific queries
+ out = subprocess.check_output(['resolvectl', 'query', '-4', 'my.example.com'])
+ self.assertIn(b'my.example.com: 172.16.99.99', out)
+ # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
+ # it's considered a sufficient source
+ self.assertNotEqual(subprocess.call(['resolvectl', 'query', '-6', 'my.example.com']), 0)
+ # "any family" query; IPv4 should come from /etc/hosts
+ out = subprocess.check_output(['resolvectl', 'query', 'my.example.com'])
+ self.assertIn(b'my.example.com: 172.16.99.99', out)
+ # IP → name lookup; again, takes the /etc/hosts one
+ out = subprocess.check_output(['resolvectl', 'query', '172.16.99.99'])
+ self.assertIn(b'172.16.99.99: my.example.com', out)
+
+ # non-address RRs should fall back to DNS
+ out = subprocess.check_output(['resolvectl', 'query', '--type=MX', 'example.com'])
+ self.assertIn(b'example.com IN MX 1 mail.example.com', out)
+
+ # other domains query DNS
+ out = subprocess.check_output(['resolvectl', 'query', 'other.example.com'])
+ self.assertIn(b'172.16.0.42', out)
+ out = subprocess.check_output(['resolvectl', 'query', '172.16.0.42'])
+ self.assertIn(b'172.16.0.42: other.example.com', out)
+ except (AssertionError, subprocess.CalledProcessError):
+ self.show_journal('systemd-resolved.service')
+ self.print_server_log()
+ raise
+
+ def test_transient_hostname(self):
+ '''networkd sets transient hostname from DHCP'''
+
+ orig_hostname = socket.gethostname()
+ self.addCleanup(socket.sethostname, orig_hostname)
+ # temporarily move /etc/hostname away; restart hostnamed to pick it up
+ if os.path.exists('/etc/hostname'):
+ subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
+ self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
+ subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
+ self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])
+
+ self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
+ self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
+
+ try:
+ # should have received the fixed IP above
+ out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
+ self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
+ # should have set transient hostname in hostnamed; this is
+ # sometimes a bit lagging (issue #4753), so retry a few times
+ for retry in range(1, 6):
+ out = subprocess.check_output(['hostnamectl'])
+ if b'testgreen' in out:
+ break
+ time.sleep(5)
+ sys.stdout.write('[retry %i] ' % retry)
+ sys.stdout.flush()
+ else:
+ self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
+ # and also applied to the system
+ self.assertEqual(socket.gethostname(), 'testgreen')
+ except AssertionError:
+ self.show_journal('systemd-networkd.service')
+ self.show_journal('systemd-hostnamed.service')
+ self.print_server_log()
+ raise
+
+ def test_transient_hostname_with_static(self):
+ '''transient hostname is not applied if static hostname exists'''
+
+ orig_hostname = socket.gethostname()
+ self.addCleanup(socket.sethostname, orig_hostname)
+
+ if not os.path.exists('/etc/hostname'):
+ self.write_config('/etc/hostname', "foobarqux")
+ else:
+ self.write_config('/run/hostname.tmp', "foobarqux")
+ subprocess.check_call(['mount', '--bind', '/run/hostname.tmp', '/etc/hostname'])
+ self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
+
+ socket.sethostname("foobarqux");
+
+ subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
+ self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])
+
+ self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
+ self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
+
+ try:
+ # should have received the fixed IP above
+ out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
+ self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
+ # static hostname wins over transient one, thus *not* applied
+ self.assertEqual(socket.gethostname(), "foobarqux")
+ except AssertionError:
+ self.show_journal('systemd-networkd.service')
+ self.show_journal('systemd-hostnamed.service')
+ self.print_server_log()
+ raise
+
+
+class NetworkdClientTest(ClientTestBase, unittest.TestCase):
+ '''Test networkd client against networkd server'''
+
+ def setUp(self):
+ super().setUp()
+ self.dnsmasq = None
+
+ def create_iface(self, ipv6=False, dhcpserver_opts=None):
+ '''Create test interface with DHCP server behind it'''
+
+ # run "router-side" networkd in own mount namespace to shield it from
+ # "client-side" configuration and networkd
+ (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
+ self.addCleanup(os.remove, script)
+ with os.fdopen(fd, 'w+') as f:
+ f.write('''\
+#!/bin/sh
+set -eu
+mkdir -p /run/systemd/network
+mkdir -p /run/systemd/netif
+mount -t tmpfs none /run/systemd/network
+mount -t tmpfs none /run/systemd/netif
+[ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
+# create router/client veth pair
+cat << EOF > /run/systemd/network/test.netdev
+[NetDev]
+Name=%(ifr)s
+Kind=veth
+
+[Peer]
+Name=%(ifc)s
+EOF
+
+cat << EOF > /run/systemd/network/test.network
+[Match]
+Name=%(ifr)s
+
+[Network]
+Address=192.168.5.1/24
+%(addr6)s
+DHCPServer=yes
+
+[DHCPServer]
+PoolOffset=10
+PoolSize=50
+DNS=192.168.5.1
+%(dhopts)s
+EOF
+
+# run networkd as in systemd-networkd.service
+exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p}')
+''' % {'ifr': self.if_router, 'ifc': self.iface, 'addr6': ipv6 and 'Address=2600::1/64' or '',
+ 'dhopts': dhcpserver_opts or ''})
+
+ os.fchmod(fd, 0o755)
+
+ subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
+ '-p', 'InaccessibleDirectories=-/etc/systemd/network',
+ '-p', 'InaccessibleDirectories=-/run/systemd/network',
+ '-p', 'InaccessibleDirectories=-/run/systemd/netif',
+ '--service-type=notify', script])
+
+ # wait until devices got created
+ for _ in range(50):
+ out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
+ if b'state UP' in out and b'scope global' in out:
+ break
+ time.sleep(0.1)
+
+ def shutdown_iface(self):
+ '''Remove test interface and stop DHCP server'''
+
+ if self.if_router:
+ subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
+ # ensure failed transient unit does not stay around
+ subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
+ subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
+ self.if_router = None
+
+ def print_server_log(self):
+ '''Print DHCP server log for debugging failures'''
+
+ self.show_journal('networkd-test-router.service')
+
+ @unittest.skip('networkd does not have DHCPv6 server support')
+ def test_hotplug_dhcp_ip6(self):
+ pass
+
+ @unittest.skip('networkd does not have DHCPv6 server support')
+ def test_coldplug_dhcp_ip6(self):
+ pass
+
+ def test_search_domains(self):
+
+ # we don't use this interface for this test
+ self.if_router = None
+
+ self.write_network('test.netdev', '''\
+[NetDev]
+Name=dummy0
+Kind=dummy
+MACAddress=12:34:56:78:9a:bc''')
+ self.write_network('test.network', '''\
+[Match]
+Name=dummy0
+[Network]
+Address=192.168.42.100/24
+DNS=192.168.42.1
+Domains= one two three four five six seven eight nine ten''')
+
+ self.start_unit('systemd-networkd')
+
+ for timeout in range(50):
+ with open(RESOLV_CONF) as f:
+ contents = f.read()
+ if ' one' in contents:
+ break
+ time.sleep(0.1)
+ self.assertRegex(contents, 'search .*one two three four five six seven eight nine ten')
+
+ def test_dropin(self):
+ # we don't use this interface for this test
+ self.if_router = None
+
+ self.write_network('test.netdev', '''\
+[NetDev]
+Name=dummy0
+Kind=dummy
+MACAddress=12:34:56:78:9a:bc''')
+ self.write_network('test.network', '''\
+[Match]
+Name=dummy0
+[Network]
+Address=192.168.42.100/24
+DNS=192.168.42.1''')
+ self.write_network_dropin('test.network', 'dns', '''\
+[Network]
+DNS=127.0.0.1''')
+
+ self.start_unit('systemd-resolved')
+ self.start_unit('systemd-networkd')
+
+ for timeout in range(50):
+ with open(RESOLV_CONF) as f:
+ contents = f.read()
+ if ' 127.0.0.1' in contents and '192.168.42.1' in contents:
+ break
+ time.sleep(0.1)
+ self.assertIn('nameserver 192.168.42.1\n', contents)
+ self.assertIn('nameserver 127.0.0.1\n', contents)
+
+ def test_dhcp_timezone(self):
+ '''networkd sets time zone from DHCP'''
+
+ def get_tz():
+ out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
+ '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
+ assert out.startswith(b's "')
+ out = out.strip()
+ assert out.endswith(b'"')
+ return out[3:-1].decode()
+
+ orig_timezone = get_tz()
+ self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])
+
+ self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
+ self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')
+
+ # should have applied the received timezone
+ try:
+ self.assertEqual(get_tz(), 'Pacific/Honolulu')
+ except AssertionError:
+ self.show_journal('systemd-networkd.service')
+ self.show_journal('systemd-hostnamed.service')
+ raise
+
+
+class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
+ """Test [Match] sections in .network files.
+
+ Be aware that matching the test host's interfaces will wipe their
+ configuration, so as a precaution, all network files should have a
+ restrictive [Match] section to only ever interfere with the
+ temporary veth interfaces created here.
+ """
+
+ def tearDown(self):
+ """Stop networkd."""
+ subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket'])
+ subprocess.call(['systemctl', 'stop', 'systemd-networkd.service'])
+
+ def test_basic_matching(self):
+ """Verify the Name= line works throughout this class."""
+ self.add_veth_pair('test_if1', 'fake_if2')
+ self.write_network('test.network', "[Match]\nName=test_*\n[Network]")
+ subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+ self.assert_link_states(test_if1='managed', fake_if2='unmanaged')
+
+ def test_inverted_matching(self):
+ """Verify that a '!'-prefixed value inverts the match."""
+ # Use a MAC address as the interfaces' common matching attribute
+ # to avoid depending on udev, to support testing in containers.
+ mac = '00:01:02:03:98:99'
+ self.add_veth_pair('test_veth', 'test_peer',
+ ['addr', mac], ['addr', mac])
+ self.write_network('no-veth.network', """\
+[Match]
+MACAddress={}
+Name=!nonexistent *peer*
+[Network]""".format(mac))
+ subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+ self.assert_link_states(test_veth='managed', test_peer='unmanaged')
+
+
+class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
+ """Test if networkd manages the correct interfaces."""
+
+ def setUp(self):
+ """Write .network files to match the named veth devices."""
+ # Define the veth+peer pairs to be created.
+ # Their pairing doesn't actually matter, only their names do.
+ self.veths = {
+ 'm1def': 'm0unm',
+ 'm1man': 'm1unm',
+ }
+
+ # Define the contents of .network files to be read in order.
+ self.configs = (
+ "[Match]\nName=m1def\n",
+ "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
+ "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
+ )
+
+ # Write out the .network files to be cleaned up automatically.
+ for i, config in enumerate(self.configs):
+ self.write_network("%02d-test.network" % i, config)
+
+ def tearDown(self):
+ """Stop networkd."""
+ subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket'])
+ subprocess.call(['systemctl', 'stop', 'systemd-networkd.service'])
+
+ def create_iface(self):
+ """Create temporary veth pairs for interface matching."""
+ for veth, peer in self.veths.items():
+ self.add_veth_pair(veth, peer)
+
+ def test_unmanaged_setting(self):
+ """Verify link states with Unmanaged= settings, hot-plug."""
+ subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+ self.create_iface()
+ self.assert_link_states(m1def='managed',
+ m1man='managed',
+ m1unm='unmanaged',
+ m0unm='unmanaged')
+
+ def test_unmanaged_setting_coldplug(self):
+ """Verify link states with Unmanaged= settings, cold-plug."""
+ self.create_iface()
+ subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+ self.assert_link_states(m1def='managed',
+ m1man='managed',
+ m1unm='unmanaged',
+ m0unm='unmanaged')
+
+ def test_catchall_config(self):
+ """Verify link states with a catch-all config, hot-plug."""
+ # Don't actually catch ALL interfaces. It messes up the host.
+ self.write_network('all.network', "[Match]\nName=m[01]???\n")
+ subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+ self.create_iface()
+ self.assert_link_states(m1def='managed',
+ m1man='managed',
+ m1unm='unmanaged',
+ m0unm='managed')
+
+ def test_catchall_config_coldplug(self):
+ """Verify link states with a catch-all config, cold-plug."""
+ # Don't actually catch ALL interfaces. It messes up the host.
+ self.write_network('all.network', "[Match]\nName=m[01]???\n")
+ self.create_iface()
+ subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
+ self.assert_link_states(m1def='managed',
+ m1man='managed',
+ m1unm='unmanaged',
+ m0unm='managed')
+
+
+if __name__ == '__main__':
+ unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
+ verbosity=2))