diff options
Diffstat (limited to '')
-rwxr-xr-x | debian/tests/boot-and-services | 569 |
1 files changed, 569 insertions, 0 deletions
diff --git a/debian/tests/boot-and-services b/debian/tests/boot-and-services new file mode 100755 index 0000000..1208922 --- /dev/null +++ b/debian/tests/boot-and-services @@ -0,0 +1,569 @@ +#!/usr/bin/python3 +# autopkgtest check: Boot with systemd and check critical desktop services +# (C) 2014 Canonical Ltd. +# Author: Martin Pitt <martin.pitt@ubuntu.com> + +import sys +import os +import unittest +import subprocess +import tempfile +import shutil +import time +import re +from glob import glob + +is_container = subprocess.call(['systemd-detect-virt', '--container']) == 0 + + +def wait_unit_stop(unit, timeout=10): + '''Wait until given unit is not running any more + + Raise RuntimeError on timeout. + ''' + for i in range(timeout): + if subprocess.call(['systemctl', 'is-active', '--quiet', unit]) != 0: + return + time.sleep(1) + + raise RuntimeError('Timed out waiting for %s to stop' % unit) + + +class ServicesTest(unittest.TestCase): + '''Check that expected services are running''' + + def test_0_init(self): + '''Verify that init is systemd''' + + self.assertIn('systemd', os.readlink('/proc/1/exe')) + + def test_no_failed(self): + '''No failed units''' + + out = subprocess.check_output( + ['systemctl', '--state=failed', '--no-legend', '--plain'], + universal_newlines=True) + failed = out.splitlines() + # ignore /etc/modules failure as stuff that we put there by default + # often fails + failed = [f for f in failed if 'systemd-modules-load' not in f] + # apparmor fails if not enabled in the kernel + if not os.path.exists('/sys/kernel/security/apparmor'): + failed = [f for f in failed if 'apparmor.service' not in f] + # ignore thermald as it doesn't start in most virtual envs + failed = [f for f in failed if 'thermald' not in f] + # console-setup.service fails on devices without keyboard (LP: #1516591) + failed = [f for f in failed if 'console-setup' not in f] + # cpi.service fails on s390x + failed = [f for f in failed if 'cpi.service' not in f] + # https://bugs.debian.org/969568 + failed = [f for f in failed if 'rng-tools-debian.service' not in f] + # https://bugs.debian.org/926138 + if is_container: + failed = [f for f in failed if 'e2scrub_reap.service' not in f] + if failed: + for f in failed: + f = f.split()[0] + print('-------- journal for failed service %s -----------' % f) + sys.stdout.flush() + subprocess.call(['journalctl', '-b', '-u', f]) + self.assertEqual(failed, []) + + @unittest.skipUnless(shutil.which('gdm3') is not None, 'gdm3 not found') + def test_gdm3(self): + subprocess.check_call(['pgrep', '-af', '/gdm[-3]']) + self.active_unit('gdm') + + def test_dbus(self): + out = subprocess.check_output( + ['dbus-send', '--print-reply', '--system', + '--dest=org.freedesktop.DBus', '/', 'org.freedesktop.DBus.GetId']) + self.assertIn(b'string "', out) + self.active_unit('dbus') + + def test_network_manager(self): + # 0.9.10 changed the command name + _help = subprocess.check_output(['nmcli', '--help'], + stderr=subprocess.STDOUT) + if b' g[eneral]' in _help: + out = subprocess.check_output(['nmcli', 'general']) + else: + out = subprocess.check_output(['nmcli', 'nm']) + self.assertIn(b'enabled', out) + self.active_unit('NetworkManager') + + def test_cron(self): + out = subprocess.check_output(['ps', 'u', '-C', 'cron']) + self.assertIn(b'root', out) + self.active_unit('cron') + + def test_logind(self): + out = subprocess.check_output(['loginctl']) + self.assertNotEqual(b'', out) + self.active_unit('systemd-logind') + + @unittest.skipIf('TEST_UPSTREAM' in os.environ, + 'Forwarding to rsyslog is a Debian patch') + def test_rsyslog(self): + out = subprocess.check_output(['ps', 'u', '-C', 'rsyslogd']) + self.assertIn(b'bin/rsyslogd', out) + self.active_unit('rsyslog') + with open('/var/log/syslog') as f: + log = f.read() + if not is_container: + # has kernel messages + self.assertRegex(log, 'kernel:.*') + # has init messages + self.assertRegex(log, 'systemd.*Reached target(?: graphical.target -)? Graphical Interface') + # has other services + self.assertRegex(log, 'NetworkManager.*:') + + @unittest.skipIf(is_container, 'udev does not work in containers') + def test_udev(self): + out = subprocess.check_output(['udevadm', 'info', '--export-db']) + self.assertIn(b'\nP: /devices/', out) + self.active_unit('systemd-udevd') + + def test_tmp_mount(self): + # check if we want to mount /tmp in fstab + want_tmp_mount = False + try: + with open('/etc/fstab') as f: + for l in f: + try: + if not l.startswith('#') and l.split()[1] in ('/tmp', '/tmp/'): + want_tmp_mount = True + break + except IndexError: + pass + except FileNotFoundError: + pass + + # ensure that we actually do/don't have a /tmp mount + (status, status_out) = subprocess.getstatusoutput('systemctl status tmp.mount') + findmnt = subprocess.call(['findmnt', '-n', '/tmp'], stdout=subprocess.PIPE) + if want_tmp_mount: + self.assertEqual(status, 0, status_out) + self.assertEqual(findmnt, 0) + else: + # 4 is correct (since upstream commit ca473d57), accept 3 for systemd <= 230 + self.assertIn(status, [3, 4], status_out) + self.assertNotEqual(findmnt, 0) + + @unittest.skipIf('TEST_UPSTREAM' in os.environ, + 'Debian specific configuration, N/A for upstream') + def test_tmp_cleanup(self): + # systemd-tmpfiles-clean.timer only runs 15 mins after boot, shortcut + # it + self.assertEqual(subprocess.call( + ['systemctl', 'status', 'systemd-tmpfiles-clean.timer'], + stdout=subprocess.PIPE), 0) + subprocess.check_call(['systemctl', 'start', 'systemd-tmpfiles-clean']) + if not is_container: + # all files in /tmp/ should get cleaned up on boot + self.assertFalse(os.path.exists('/tmp/oldfile.test')) + self.assertFalse(os.path.exists('/tmp/newfile.test')) + # files in /var/tmp/ older than 30d should get cleaned up + # XXX FIXME: /var/tmp/ cleanup was disabled in #675422 + # if not is_container: + # self.assertFalse(os.path.exists('/var/tmp/oldfile.test')) + self.assertTrue(os.path.exists('/var/tmp/newfile.test')) + + # next run should leave the recent ones + os.close(os.open('/tmp/newfile.test', + os.O_CREAT | os.O_EXCL | os.O_WRONLY)) + subprocess.check_call(['systemctl', 'start', 'systemd-tmpfiles-clean']) + wait_unit_stop('systemd-tmpfiles-clean') + self.assertTrue(os.path.exists('/tmp/newfile.test')) + + # Helper methods + + def active_unit(self, unit): + '''Check that given unit is active''' + + out = subprocess.check_output(['systemctl', 'status', unit]) + self.assertIn(b'active (running)', out) + + +class JournalTest(unittest.TestCase): + '''Check journal functionality''' + + def test_no_options(self): + out = subprocess.check_output(['journalctl']) + if not is_container: + # has kernel messages + self.assertRegex(out, b'kernel:.*') + # has init messages + self.assertRegex(out, b'systemd.*Reached target(?: graphical.target -)? Graphical Interface') + # has other services + self.assertRegex(out, b'NetworkManager.*:.*starting') + + def test_log_for_service(self): + out = subprocess.check_output( + ['journalctl', '_SYSTEMD_UNIT=NetworkManager.service']) + self.assertRegex(out, b'NetworkManager.*:.*starting') + self.assertNotIn(b'kernel:', out) + self.assertNotIn(b'systemd:', out) + + +@unittest.skipIf(is_container, 'nspawn does not work in most containers') +class NspawnTest(unittest.TestCase): + '''Check nspawn''' + + @classmethod + def setUpClass(kls): + '''Build a bootable busybox mini-container''' + + kls.td_c_busybox = tempfile.TemporaryDirectory(prefix='c_busybox.') + kls.c_busybox = kls.td_c_busybox.name + for d in ['etc/init.d', 'bin', 'sbin']: + os.makedirs(os.path.join(kls.c_busybox, d)) + shutil.copy('/bin/busybox', os.path.join(kls.c_busybox, 'bin')) + shutil.copy('/etc/os-release', os.path.join(kls.c_busybox, 'etc')) + os.symlink('busybox', os.path.join(kls.c_busybox, 'bin', 'sh')) + os.symlink('../bin/busybox', os.path.join(kls.c_busybox, 'sbin/init')) + with open(os.path.join(kls.c_busybox, 'etc/init.d/rcS'), 'w') as f: + f.write('''#!/bin/sh +echo fake container started +ps aux +poweroff\n''') + os.fchmod(f.fileno(), 0o755) + subprocess.check_call(['systemd-machine-id-setup', '--root', + kls.c_busybox], stderr=subprocess.PIPE) + + def setUp(self): + self.workdir = tempfile.TemporaryDirectory() + + def test_boot(self): + cont = os.path.join(self.workdir.name, 'c1') + shutil.copytree(self.c_busybox, cont, symlinks=True) + os.sync() + nspawn = subprocess.Popen(['systemd-nspawn', '-D', cont, '-b'], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + out = nspawn.communicate(timeout=60)[0] + self.assertIn(b'Spawning container c1', out) + self.assertIn(b'fake container started', out) + self.assertRegex(out, b'\n\s+1\s+0\s+init[\r\n]') + self.assertRegex(out, b'\n\s+2+\s+0\s.*rcS[\r\n]') + self.assertRegex(out, b'Container c1.*shut down') + self.assertEqual(nspawn.returncode, 0) + + def test_service(self): + self.assertTrue(os.path.isdir('/var/lib/machines')) + cont = '/var/lib/machines/c1' + shutil.copytree(self.c_busybox, cont, symlinks=True) + self.addCleanup(shutil.rmtree, cont) + os.sync() + subprocess.check_call(['systemctl', 'start', 'systemd-nspawn@c1']) + wait_unit_stop('systemd-nspawn@c1') + + subprocess.call(['journalctl', '--sync']) + systemctl = subprocess.Popen( + ['systemctl', 'status', '-overbose', '-l', 'systemd-nspawn@c1'], + stdout=subprocess.PIPE) + out = systemctl.communicate()[0].decode('UTF-8', 'replace') + self.assertEqual(systemctl.returncode, 3, out) + self.assertNotIn('failed', out) + + +@unittest.skipUnless(os.path.exists('/sys/kernel/security/apparmor'), + 'AppArmor not enabled') +class AppArmorTest(unittest.TestCase): + def test_profile(self): + '''AppArmor confined unit''' + + # create AppArmor profile + aa_profile = tempfile.NamedTemporaryFile(prefix='aa_violator.') + aa_profile.write(b'''#include <tunables/global> + +profile "violator-test" { + #include <abstractions/base> + + /{usr/,}bin/** rix, + /etc/machine-id r, +} +''') + aa_profile.flush() + subprocess.check_call(['apparmor_parser', '-r', '-v', aa_profile.name]) + + # create confined unit + with open('/run/systemd/system/violator.service', 'w') as f: + f.write('''[Unit] +Description=AppArmor test + +[Service] +ExecStart=/bin/sh -euc 'echo CP1; cat /etc/machine-id; echo CP2; if cat /etc/passwd; then exit 1; fi; echo CP3' +AppArmorProfile=violator-test +''') + self.addCleanup(os.unlink, '/run/systemd/system/violator.service') + + # launch + subprocess.check_call(['systemctl', 'daemon-reload']) + subprocess.check_call(['systemctl', 'start', 'violator.service']) + wait_unit_stop('violator.service') + + # check status + st = subprocess.Popen(['systemctl', 'status', '-l', + 'violator.service'], stdout=subprocess.PIPE, + universal_newlines=True) + out = st.communicate()[0] + # unit should be stopped + self.assertEqual(st.returncode, 3) + + self.assertIn('inactive', out) + self.assertIn('CP1', out) + self.assertIn('CP2', out) + self.assertIn('CP3', out) + with open('/etc/machine-id') as f: + self.assertIn(f.read().strip(), out) + self.assertNotIn('root:x', out, 'unit can read /etc/passwd') + + +@unittest.skipIf(os.path.exists('/sys/fs/cgroup/cgroup.controllers'), + 'test needs to be reworked on unified cgroup hierarchy') +class CgroupsTest(unittest.TestCase): + '''Check cgroup setup''' + + @classmethod + def setUpClass(kls): + kls.controllers = [] + for controller in glob('/sys/fs/cgroup/*'): + if not os.path.islink(controller): + kls.controllers.append(controller) + + def setUp(self): + self.service = 'testsrv.service' + self.service_file = '/run/systemd/system/' + self.service + + def tearDown(self): + subprocess.call(['systemctl', 'stop', self.service], + stderr=subprocess.PIPE) + try: + os.unlink(self.service_file) + except OSError: + pass + subprocess.check_call(['systemctl', 'daemon-reload']) + + def create_service(self, extra_service=''): + '''Create test service unit''' + + with open(self.service_file, 'w') as f: + f.write('''[Unit] +Description=test service +[Service] +ExecStart=/bin/sleep 500 +%s +''' % extra_service) + subprocess.check_call(['systemctl', 'daemon-reload']) + + def assertNoControllers(self): + '''Assert that no cgroup controllers exist for test service''' + + cs = glob('/sys/fs/cgroup/*/system.slice/%s' % self.service) + self.assertEqual(cs, []) + + def assertController(self, name): + '''Assert that cgroup controller exists for test service''' + + c = '/sys/fs/cgroup/%s/system.slice/%s' % (name, self.service) + self.assertTrue(os.path.isdir(c)) + + def assertNoController(self, name): + '''Assert that cgroup controller does not exist for test service''' + + c = '/sys/fs/cgroup/%s/system.slice/%s' % (name, self.service) + self.assertFalse(os.path.isdir(c)) + + def test_simple(self): + '''simple service''' + + self.create_service() + self.assertNoControllers() + subprocess.check_call(['systemctl', 'start', self.service]) + self.assertController('systemd') + subprocess.check_call(['systemctl', 'stop', self.service]) + self.assertNoControllers() + + def test_cpushares(self): + '''service with CPUShares''' + + self.create_service('CPUShares=1000') + self.assertNoControllers() + subprocess.check_call(['systemctl', 'start', self.service]) + self.assertController('systemd') + self.assertController('cpu,cpuacct') + subprocess.check_call(['systemctl', 'stop', self.service]) + self.assertNoControllers() + + +class SeccompTest(unittest.TestCase): + '''Check seccomp syscall filtering''' + + def test_failing(self): + with open('/run/systemd/system/scfail.service', 'w') as f: + f.write('''[Unit] +Description=seccomp test +[Service] +ExecStart=/bin/cat /etc/machine-id +SystemCallFilter=access +''') + self.addCleanup(os.unlink, '/run/systemd/system/scfail.service') + + # launch + subprocess.check_call(['systemctl', 'daemon-reload']) + subprocess.check_call(['systemctl', 'start', 'scfail.service']) + wait_unit_stop('scfail.service') + + # check status + st = subprocess.Popen(['systemctl', 'status', '-l', + 'scfail.service'], stdout=subprocess.PIPE) + out = st.communicate()[0] + # unit should be stopped + self.assertEqual(st.returncode, 3) + + subprocess.check_call(['systemctl', 'reset-failed', 'scfail.service']) + + self.assertIn(b'failed', out) + self.assertRegex(out, b'code=(killed|dumped), signal=SYS') + with open('/etc/machine-id') as f: + self.assertNotIn(f.read().strip().encode('ASCII'), out) + + +@unittest.skipIf(is_container, 'systemd-coredump does not work in containers') +class CoredumpTest(unittest.TestCase): + '''Check systemd-coredump''' + + def test_bash_crash(self): + subprocess.call("ulimit -c unlimited; bash -c 'kill -SEGV $$'", shell=True, + cwd='/tmp', stderr=subprocess.DEVNULL) + + # with systemd-coredump installed we should get the core dumps in + # systemd's dir + for timeout in range(50): + cores = glob('/var/lib/systemd/coredump/core.bash.*') + if cores: + break + time.sleep(1) + self.assertNotEqual(cores, []) + self.assertEqual(glob('/tmp/core*'), []) + + # we should also get a message and stack trace in journal + for timeout in range(10): + subprocess.call(['journalctl', '--sync']) + journal = subprocess.check_output(['journalctl', '-t', 'systemd-coredump']) + if re.search(b'Process.*bash.*dumped core', journal) and \ + re.search(b'#[0-9] .*bash', journal): + break + time.sleep(1) + self.assertRegex(journal, b'Process.*bash.*dumped core') + self.assertIn(b'Stack trace', journal) + self.assertRegex(journal, b'#[0-9] .*bash') + + +class CLITest(unittest.TestCase): + def setUp(self): + self.programs = [] + for line in subprocess.check_output(['dpkg', '-L', 'systemd', 'systemd-container', 'systemd-coredump', 'udev'], + universal_newlines=True).splitlines(): + if '/bin/' in line: + self.programs.append(line.strip()) + + def test_help(self): + '--help works and succeeds''' + + for program in self.programs: + p = subprocess.Popen([program, '--help'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + (out, err) = p.communicate() + try: + self.assertEqual(err, '') + self.assertEqual(p.returncode, 0) + self.assertIn(os.path.basename(program), out) + self.assertTrue('--help' in out or 'Usage' in out, out) + except AssertionError: + print('Failed program: %s' % program) + raise + + def test_version(self): + '--version works and succeeds''' + + version = subprocess.check_output(['pkg-config', '--modversion', 'systemd'], + universal_newlines=True).strip() + + for program in self.programs: + # known to not respond to --version + if os.path.basename(program) in ['kernel-install', 'systemd-ask-password', 'systemd-stdio-bridge']: + continue + p = subprocess.Popen([program, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + (out, err) = p.communicate() + try: + self.assertEqual(err, '') + self.assertEqual(p.returncode, 0) + self.assertIn(version, out) + except AssertionError: + print('Failed program: %s' % program) + raise + + def test_invalid_option(self): + '''Calling with invalid option fails''' + + for program in self.programs: + p = subprocess.Popen([program, '--invalid-option'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + (out, err) = p.communicate() + try: + # kernel-install is an unique snowflake + if not program.endswith('/kernel-install'): + self.assertIn('--invalid-option', err) + self.assertNotEqual(p.returncode, 0) + except AssertionError: + print('Failed program: %s' % program) + raise + + +def pre_boot_setup(): + '''Test setup before rebooting testbed''' + + subprocess.check_call(['systemctl', 'set-default', 'graphical.target'], + stderr=subprocess.STDOUT) + + # This test installs network-manager, which seems to cause + # systemd-networkd-wait-online to be stuck as they conflict, + # so systemctl start network-online.target ran by autopkgtest + # gets stuck, at least in Debian Bullseye images. + # https://salsa.debian.org/ci-team/autopkgtest/-/blob/debian/5.21/virt/autopkgtest-virt-lxc#L131 + subprocess.check_call(['systemctl', 'disable', 'systemd-networkd.service'], + stderr=subprocess.STDOUT) + + # create a few temporary files to ensure that they get cleaned up on boot + os.close(os.open('/tmp/newfile.test', + os.O_CREAT | os.O_EXCL | os.O_WRONLY)) + os.close(os.open('/var/tmp/newfile.test', + os.O_CREAT | os.O_EXCL | os.O_WRONLY)) + # we can't use utime() here, as systemd looks for ctime + if not is_container: + cur_time = time.clock_gettime(time.CLOCK_REALTIME) + time.clock_settime(time.CLOCK_REALTIME, cur_time - 2 * 30 * 86400) + try: + os.close(os.open('/tmp/oldfile.test', + os.O_CREAT | os.O_EXCL | os.O_WRONLY)) + os.close(os.open('/var/tmp/oldfile.test', + os.O_CREAT | os.O_EXCL | os.O_WRONLY)) + finally: + time.clock_settime(time.CLOCK_REALTIME, cur_time) + + # allow X to start even on headless machines + os.makedirs('/etc/X11/xorg.conf.d/', exist_ok=True) + with open('/etc/X11/xorg.conf.d/dummy.conf', 'w') as f: + f.write('''Section "Device" + Identifier "test" + Driver "dummy" +EndSection''') + + +if __name__ == '__main__': + if not os.getenv('AUTOPKGTEST_REBOOT_MARK'): + pre_boot_setup() + print('Rebooting...') + subprocess.check_call(['/tmp/autopkgtest-reboot', 'boot1']) + + unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, + verbosity=2)) |