#!/usr/bin/python3
# autopkgtest check: Boot with systemd and check critical desktop services
# (C) 2014 Canonical Ltd.
# Author: Martin Pitt

import sys
import os
import unittest
import subprocess
import tempfile
import shutil
import time
import re
from glob import glob

# True when systemd-detect-virt reports that we run inside a container; several
# tests are skipped or relaxed in that case.
is_container = subprocess.call(['systemd-detect-virt', '--container']) == 0


def wait_unit_stop(unit, timeout=10):
    '''Wait until given unit is not running any more

    Polls "systemctl is-active" once per second, at most "timeout" times.

    Raise RuntimeError on timeout.
    '''
    for _ in range(timeout):
        if subprocess.call(['systemctl', 'is-active', '--quiet', unit]) != 0:
            return
        time.sleep(1)

    raise RuntimeError('Timed out waiting for %s to stop' % unit)


class ServicesTest(unittest.TestCase):
    '''Check that expected services are running'''

    def test_0_init(self):
        '''Verify that init is systemd'''

        self.assertIn('systemd', os.readlink('/proc/1/exe'))

    def test_no_failed(self):
        '''No failed units'''

        out = subprocess.check_output(
            ['systemctl', '--state=failed', '--no-legend', '--plain'],
            universal_newlines=True)

        # substrings of unit lines whose failure is tolerated
        ignored = [
            # ignore /etc/modules failure as stuff that we put there by default
            # often fails
            'systemd-modules-load',
            # ignore thermald as it doesn't start in most virtual envs
            'thermald',
            # console-setup.service fails on devices without keyboard
            # (LP: #1516591)
            'console-setup',
            # cpi.service fails on s390x
            'cpi.service',
            # https://bugs.debian.org/969568
            'rng-tools-debian.service',
        ]
        # apparmor fails if not enabled in the kernel
        if not os.path.exists('/sys/kernel/security/apparmor'):
            ignored.append('apparmor.service')
        # https://bugs.debian.org/926138
        if is_container:
            ignored.append('e2scrub_reap.service')

        # blank lines carry no unit name, so they are dropped as well
        failed = [line for line in out.splitlines()
                  if line.strip() and not any(i in line for i in ignored)]

        # dump the journal of every remaining failed unit to ease debugging
        for line in failed:
            unit = line.split()[0]
            print('-------- journal for failed service %s -----------' % unit)
            sys.stdout.flush()
            subprocess.call(['journalctl', '-b', '-u', unit])
        self.assertEqual(failed, [])

    @unittest.skipUnless(shutil.which('gdm3') is not None, 'gdm3 not found')
    def test_gdm3(self):
        '''gdm process exists and gdm unit is active'''

        subprocess.check_call(['pgrep', '-af', '/gdm[-3]'])
        self.active_unit('gdm')

    def test_dbus(self):
        '''System D-Bus answers GetId and dbus unit is active'''

        out = subprocess.check_output(
            ['dbus-send', '--print-reply', '--system',
             '--dest=org.freedesktop.DBus', '/', 'org.freedesktop.DBus.GetId'])
        self.assertIn(b'string "', out)
        self.active_unit('dbus')

    def test_network_manager(self):
        '''nmcli reports "enabled" and NetworkManager unit is active'''

        # 0.9.10 changed the command name
        _help = subprocess.check_output(['nmcli', '--help'],
                                        stderr=subprocess.STDOUT)
        if b' g[eneral]' in _help:
            out = subprocess.check_output(['nmcli', 'general'])
        else:
            out = subprocess.check_output(['nmcli', 'nm'])
        self.assertIn(b'enabled', out)
        self.active_unit('NetworkManager')

    def test_cron(self):
        '''cron process runs as root and cron unit is active'''

        out = subprocess.check_output(['ps', 'u', '-C', 'cron'])
        self.assertIn(b'root', out)
        self.active_unit('cron')

    def test_logind(self):
        '''loginctl produces output and systemd-logind unit is active'''

        out = subprocess.check_output(['loginctl'])
        self.assertNotEqual(b'', out)
        self.active_unit('systemd-logind')

    @unittest.skipIf('TEST_UPSTREAM' in os.environ,
                     'Forwarding to rsyslog is a Debian patch')
    def test_rsyslog(self):
        '''rsyslogd runs and /var/log/syslog contains expected messages'''

        out = subprocess.check_output(['ps', 'u', '-C', 'rsyslogd'])
        self.assertIn(b'bin/rsyslogd', out)
        self.active_unit('rsyslog')
        with open('/var/log/syslog') as f:
            log = f.read()
        if not is_container:
            # has kernel messages
            self.assertRegex(log, 'kernel:.*')
        # has init messages
        self.assertRegex(log, 'systemd.*Reached target(?: graphical.target -)? Graphical Interface')
        # has other services
        self.assertRegex(log, 'NetworkManager.*:')

    @unittest.skipIf(is_container, 'udev does not work in containers')
    def test_udev(self):
        '''udev database lists devices and systemd-udevd unit is active'''

        out = subprocess.check_output(['udevadm', 'info', '--export-db'])
        self.assertIn(b'\nP: /devices/', out)
        self.active_unit('systemd-udevd')

    def test_tmp_mount(self):
        '''tmp.mount state matches whether /etc/fstab requests a /tmp mount'''

        # check if we want to mount /tmp in fstab
        want_tmp_mount = False
        try:
            with open('/etc/fstab') as f:
                for line in f:
                    if line.startswith('#'):
                        continue
                    fields = line.split()
                    # lines without a mount point field are ignored
                    if len(fields) > 1 and fields[1] in ('/tmp', '/tmp/'):
                        want_tmp_mount = True
                        break
        except FileNotFoundError:
            pass

        # ensure that we actually do/don't have a /tmp mount
        (status, status_out) = subprocess.getstatusoutput(
            'systemctl status tmp.mount')
        findmnt = subprocess.call(['findmnt', '-n', '/tmp'],
                                  stdout=subprocess.PIPE)
        if want_tmp_mount:
            self.assertEqual(status, 0, status_out)
            self.assertEqual(findmnt, 0)
        else:
            # 4 is correct (since upstream commit ca473d57), accept 3 for
            # systemd <= 230
            self.assertIn(status, [3, 4], status_out)
            self.assertNotEqual(findmnt, 0)

    @unittest.skipIf('TEST_UPSTREAM' in os.environ,
                     'Debian specific configuration, N/A for upstream')
    def test_tmp_cleanup(self):
        '''systemd-tmpfiles-clean removes boot-time files, keeps fresh ones'''

        # systemd-tmpfiles-clean.timer only runs 15 mins after boot, shortcut
        # it
        self.assertEqual(subprocess.call(
            ['systemctl', 'status', 'systemd-tmpfiles-clean.timer'],
            stdout=subprocess.PIPE), 0)
        subprocess.check_call(['systemctl', 'start', 'systemd-tmpfiles-clean'])
        if not is_container:
            # all files in /tmp/ should get cleaned up on boot
            self.assertFalse(os.path.exists('/tmp/oldfile.test'))
        self.assertFalse(os.path.exists('/tmp/newfile.test'))
        # files in /var/tmp/ older than 30d should get cleaned up
        # XXX FIXME: /var/tmp/ cleanup was disabled in #675422
        # if not is_container:
        #     self.assertFalse(os.path.exists('/var/tmp/oldfile.test'))
        self.assertTrue(os.path.exists('/var/tmp/newfile.test'))

        # next run should leave the recent ones
        os.close(os.open('/tmp/newfile.test',
                         os.O_CREAT | os.O_EXCL | os.O_WRONLY))
        subprocess.check_call(['systemctl', 'start', 'systemd-tmpfiles-clean'])
        wait_unit_stop('systemd-tmpfiles-clean')
        self.assertTrue(os.path.exists('/tmp/newfile.test'))

    # Helper methods

    def active_unit(self, unit):
        '''Check that given unit is active'''

        out = subprocess.check_output(['systemctl', 'status', unit])
        self.assertIn(b'active (running)', out)


class JournalTest(unittest.TestCase):
    '''Check journal functionality'''

    def test_no_options(self):
        '''Plain journalctl shows kernel, init and service messages'''

        out = subprocess.check_output(['journalctl'])
        if not is_container:
            # has kernel messages
            self.assertRegex(out, b'kernel:.*')
        # has init messages
        self.assertRegex(out, b'systemd.*Reached target(?: graphical.target -)? Graphical Interface')
        # has other services
        self.assertRegex(out, b'NetworkManager.*:.*starting')

    def test_log_for_service(self):
        '''Filtering by unit shows only that unit's messages'''

        out = subprocess.check_output(
            ['journalctl', '_SYSTEMD_UNIT=NetworkManager.service'])
        self.assertRegex(out, b'NetworkManager.*:.*starting')
        self.assertNotIn(b'kernel:', out)
        self.assertNotIn(b'systemd:', out)


@unittest.skipIf(is_container, 'nspawn does not work in most containers')
class NspawnTest(unittest.TestCase):
    '''Check nspawn'''

    @classmethod
    def setUpClass(cls):
        '''Build a bootable busybox mini-container'''

        cls.td_c_busybox = tempfile.TemporaryDirectory(prefix='c_busybox.')
        cls.c_busybox = cls.td_c_busybox.name
        for d in ['etc/init.d', 'bin', 'sbin']:
            os.makedirs(os.path.join(cls.c_busybox, d))
        shutil.copy('/bin/busybox', os.path.join(cls.c_busybox, 'bin'))
        shutil.copy('/etc/os-release', os.path.join(cls.c_busybox, 'etc'))
        os.symlink('busybox', os.path.join(cls.c_busybox, 'bin', 'sh'))
        os.symlink('../bin/busybox', os.path.join(cls.c_busybox, 'sbin/init'))
        with open(os.path.join(cls.c_busybox, 'etc/init.d/rcS'), 'w') as f:
            f.write('''#!/bin/sh
echo fake container started
ps aux
poweroff\n''')
            os.fchmod(f.fileno(), 0o755)
        subprocess.check_call(['systemd-machine-id-setup', '--root',
                               cls.c_busybox], stderr=subprocess.PIPE)

    @classmethod
    def tearDownClass(cls):
        '''Remove the container template built by setUpClass'''

        cls.td_c_busybox.cleanup()

    def setUp(self):
        '''Create a per-test scratch directory, removed after the test'''

        self.workdir = tempfile.TemporaryDirectory()
        self.addCleanup(self.workdir.cleanup)

    def test_boot(self):
        '''Boot the mini-container directly with systemd-nspawn -b'''

        cont = os.path.join(self.workdir.name, 'c1')
        shutil.copytree(self.c_busybox, cont, symlinks=True)
        os.sync()
        nspawn = subprocess.Popen(['systemd-nspawn', '-D', cont, '-b'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT)
        try:
            out = nspawn.communicate(timeout=60)[0]
        except subprocess.TimeoutExpired:
            # do not leave the container running behind a failed test
            nspawn.kill()
            nspawn.communicate()
            raise
        self.assertIn(b'Spawning container c1', out)
        self.assertIn(b'fake container started', out)
        # "ps aux" output from rcS: init is PID 1, rcS follows as a PID of 2s
        self.assertRegex(out, rb'\n\s+1\s+0\s+init[\r\n]')
        self.assertRegex(out, rb'\n\s+2+\s+0\s.*rcS[\r\n]')
        self.assertRegex(out, b'Container c1.*shut down')
        self.assertEqual(nspawn.returncode, 0)

    def test_service(self):
        '''Run the mini-container through systemd-nspawn@.service'''

        self.assertTrue(os.path.isdir('/var/lib/machines'))
        cont = '/var/lib/machines/c1'
        shutil.copytree(self.c_busybox, cont, symlinks=True)
        self.addCleanup(shutil.rmtree, cont)
        os.sync()
        subprocess.check_call(['systemctl', 'start', 'systemd-nspawn@c1'])
        wait_unit_stop('systemd-nspawn@c1')

        subprocess.call(['journalctl', '--sync'])
        systemctl = subprocess.Popen(
            ['systemctl', 'status', '-overbose', '-l', 'systemd-nspawn@c1'],
            stdout=subprocess.PIPE)
        out = systemctl.communicate()[0].decode('UTF-8', 'replace')
        # the test expects status code 3 once the container has powered off
        self.assertEqual(systemctl.returncode, 3, out)
        self.assertNotIn('failed', out)


@unittest.skipUnless(os.path.exists('/sys/kernel/security/apparmor'),
                     'AppArmor not enabled')
class AppArmorTest(unittest.TestCase):
    '''Check AppArmor confinement of units'''

    def test_profile(self):
        '''AppArmor confined unit'''

        # create and load AppArmor profile; the temporary file is only needed
        # until apparmor_parser has read it
        with tempfile.NamedTemporaryFile(prefix='aa_violator.') as aa_profile:
            aa_profile.write(b'''#include <tunables/global>

profile "violator-test" {
  #include <abstractions/base>

  /{usr/,}bin/** rix,
  /etc/machine-id r,
}
''')
            aa_profile.flush()
            subprocess.check_call(['apparmor_parser', '-r', '-v',
                                   aa_profile.name])

        # create confined unit
        with open('/run/systemd/system/violator.service', 'w') as f:
            f.write('''[Unit]
Description=AppArmor test

[Service]
ExecStart=/bin/sh -euc 'echo CP1; cat /etc/machine-id; echo CP2; if cat /etc/passwd; then exit 1; fi; echo CP3'
AppArmorProfile=violator-test
''')
        self.addCleanup(os.unlink, '/run/systemd/system/violator.service')

        # launch
        subprocess.check_call(['systemctl', 'daemon-reload'])
        subprocess.check_call(['systemctl', 'start', 'violator.service'])
        wait_unit_stop('violator.service')

        # check status
        st = subprocess.Popen(['systemctl', 'status', '-l',
                               'violator.service'], stdout=subprocess.PIPE,
                              universal_newlines=True)
        out = st.communicate()[0]
        # unit should be stopped
        self.assertEqual(st.returncode, 3)

        self.assertIn('inactive', out)
        self.assertIn('CP1', out)
        self.assertIn('CP2', out)
        self.assertIn('CP3', out)
        with open('/etc/machine-id') as f:
            self.assertIn(f.read().strip(), out)
        self.assertNotIn('root:x', out, 'unit can read /etc/passwd')


@unittest.skipIf(os.path.exists('/sys/fs/cgroup/cgroup.controllers'),
                 'test needs to be reworked on unified cgroup hierarchy')
class CgroupsTest(unittest.TestCase):
    '''Check cgroup setup'''

    @classmethod
    def setUpClass(cls):
        '''Collect the non-symlink entries of /sys/fs/cgroup'''

        cls.controllers = [c for c in glob('/sys/fs/cgroup/*')
                           if not os.path.islink(c)]

    def setUp(self):
        '''Define name and path of the test service unit'''

        self.service = 'testsrv.service'
        self.service_file = '/run/systemd/system/' + self.service

    def tearDown(self):
        '''Stop the test service and remove its unit file'''

        subprocess.call(['systemctl', 'stop', self.service],
                        stderr=subprocess.PIPE)
        # best effort: the unit file may never have been created
        try:
            os.unlink(self.service_file)
        except OSError:
            pass
        subprocess.check_call(['systemctl', 'daemon-reload'])

    def create_service(self, extra_service=''):
        '''Create test service unit'''

        with open(self.service_file, 'w') as f:
            f.write('''[Unit]
Description=test service
[Service]
ExecStart=/bin/sleep 500
%s
''' % extra_service)
        subprocess.check_call(['systemctl', 'daemon-reload'])

    def assertNoControllers(self):
        '''Assert that no cgroup controllers exist for test service'''

        cs = glob('/sys/fs/cgroup/*/system.slice/%s' % self.service)
        self.assertEqual(cs, [])

    def assertController(self, name):
        '''Assert that cgroup controller exists for test service'''

        c = '/sys/fs/cgroup/%s/system.slice/%s' % (name, self.service)
        self.assertTrue(os.path.isdir(c))

    def assertNoController(self, name):
        '''Assert that cgroup controller does not exist for test service'''

        c = '/sys/fs/cgroup/%s/system.slice/%s' % (name, self.service)
        self.assertFalse(os.path.isdir(c))

    def test_simple(self):
        '''simple service'''

        self.create_service()
        self.assertNoControllers()
        subprocess.check_call(['systemctl', 'start', self.service])
        self.assertController('systemd')
        subprocess.check_call(['systemctl', 'stop', self.service])
        self.assertNoControllers()

    def test_cpushares(self):
        '''service with CPUShares'''

        self.create_service('CPUShares=1000')
        self.assertNoControllers()
        subprocess.check_call(['systemctl', 'start', self.service])
        self.assertController('systemd')
        self.assertController('cpu,cpuacct')
        subprocess.check_call(['systemctl', 'stop', self.service])
        self.assertNoControllers()


class SeccompTest(unittest.TestCase):
    '''Check seccomp syscall filtering'''

    def test_failing(self):
        '''Unit restricted by SystemCallFilter gets killed with SIGSYS'''

        with open('/run/systemd/system/scfail.service', 'w') as f:
            f.write('''[Unit]
Description=seccomp test
[Service]
ExecStart=/bin/cat /etc/machine-id
SystemCallFilter=access
''')
        self.addCleanup(os.unlink, '/run/systemd/system/scfail.service')

        # launch
        subprocess.check_call(['systemctl', 'daemon-reload'])
        subprocess.check_call(['systemctl', 'start', 'scfail.service'])
        wait_unit_stop('scfail.service')

        # check status
        st = subprocess.Popen(['systemctl', 'status', '-l',
                               'scfail.service'], stdout=subprocess.PIPE)
        out = st.communicate()[0]
        # unit should be stopped
        self.assertEqual(st.returncode, 3)

        subprocess.check_call(['systemctl', 'reset-failed', 'scfail.service'])

        self.assertIn(b'failed', out)
        self.assertRegex(out, b'code=(killed|dumped), signal=SYS')
        with open('/etc/machine-id') as f:
            self.assertNotIn(f.read().strip().encode('ASCII'), out)


@unittest.skipIf(is_container, 'systemd-coredump does not work in containers')
class CoredumpTest(unittest.TestCase):
    '''Check systemd-coredump'''

    def test_bash_crash(self):
        '''Crashing bash yields a core in systemd's dir and a journal entry'''

        subprocess.call("ulimit -c unlimited; bash -c 'kill -SEGV $$'",
                        shell=True, cwd='/tmp', stderr=subprocess.DEVNULL)

        # with systemd-coredump installed we should get the core dumps in
        # systemd's dir
        for _ in range(50):
            cores = glob('/var/lib/systemd/coredump/core.bash.*')
            if cores:
                break
            time.sleep(1)
        self.assertNotEqual(cores, [])
        self.assertEqual(glob('/tmp/core*'), [])

        # we should also get a message and stack trace in journal
        for _ in range(10):
            subprocess.call(['journalctl', '--sync'])
            journal = subprocess.check_output(
                ['journalctl', '-t', 'systemd-coredump'])
            if re.search(b'Process.*bash.*dumped core', journal) and \
               re.search(b'#[0-9] .*bash', journal):
                break
            time.sleep(1)
        self.assertRegex(journal, b'Process.*bash.*dumped core')
        self.assertIn(b'Stack trace', journal)
        self.assertRegex(journal, b'#[0-9] .*bash')


class CLITest(unittest.TestCase):
    '''Check common command line options of the shipped programs'''

    def setUp(self):
        '''Collect all files under a /bin/ path shipped by the packages'''

        self.programs = []
        for line in subprocess.check_output(
                ['dpkg', '-L', 'systemd', 'systemd-container',
                 'systemd-coredump', 'udev'],
                universal_newlines=True).splitlines():
            if '/bin/' in line:
                self.programs.append(line.strip())

    def test_help(self):
        '''--help works and succeeds'''

        for program in self.programs:
            p = subprocess.Popen([program, '--help'], stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 universal_newlines=True)
            (out, err) = p.communicate()
            try:
                self.assertEqual(err, '')
                self.assertEqual(p.returncode, 0)
                self.assertIn(os.path.basename(program), out)
                self.assertTrue('--help' in out or 'Usage' in out, out)
            except AssertionError:
                # name the offending program before propagating the failure
                print('Failed program: %s' % program)
                raise

    def test_version(self):
        '''--version works and succeeds'''

        version = subprocess.check_output(
            ['pkg-config', '--modversion', 'systemd'],
            universal_newlines=True).strip()

        for program in self.programs:
            # known to not respond to --version
            if os.path.basename(program) in ['kernel-install',
                                             'systemd-ask-password',
                                             'systemd-stdio-bridge']:
                continue
            p = subprocess.Popen([program, '--version'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 universal_newlines=True)
            (out, err) = p.communicate()
            try:
                self.assertEqual(err, '')
                self.assertEqual(p.returncode, 0)
                self.assertIn(version, out)
            except AssertionError:
                print('Failed program: %s' % program)
                raise

    def test_invalid_option(self):
        '''Calling with invalid option fails'''

        for program in self.programs:
            p = subprocess.Popen([program, '--invalid-option'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 universal_newlines=True)
            (out, err) = p.communicate()
            try:
                # kernel-install is an unique snowflake
                if not program.endswith('/kernel-install'):
                    self.assertIn('--invalid-option', err)
                self.assertNotEqual(p.returncode, 0)
            except AssertionError:
                print('Failed program: %s' % program)
                raise


def pre_boot_setup():
    '''Test setup before rebooting testbed'''

    subprocess.check_call(['systemctl', 'set-default', 'graphical.target'],
                          stderr=subprocess.STDOUT)

    # This test installs network-manager, which seems to cause
    # systemd-networkd-wait-online to be stuck as they conflict,
    # so systemctl start network-online.target ran by autopkgtest
    # gets stuck, at least in Debian Bullseye images.
    # https://salsa.debian.org/ci-team/autopkgtest/-/blob/debian/5.21/virt/autopkgtest-virt-lxc#L131
    subprocess.check_call(['systemctl', 'disable', 'systemd-networkd.service'],
                          stderr=subprocess.STDOUT)

    # create a few temporary files to ensure that they get cleaned up on boot
    os.close(os.open('/tmp/newfile.test',
                     os.O_CREAT | os.O_EXCL | os.O_WRONLY))
    os.close(os.open('/var/tmp/newfile.test',
                     os.O_CREAT | os.O_EXCL | os.O_WRONLY))
    # we can't use utime() here, as systemd looks for ctime
    if not is_container:
        # temporarily set the clock 60 days back so the files get an old ctime
        cur_time = time.clock_gettime(time.CLOCK_REALTIME)
        time.clock_settime(time.CLOCK_REALTIME, cur_time - 2 * 30 * 86400)
        try:
            os.close(os.open('/tmp/oldfile.test',
                             os.O_CREAT | os.O_EXCL | os.O_WRONLY))
            os.close(os.open('/var/tmp/oldfile.test',
                             os.O_CREAT | os.O_EXCL | os.O_WRONLY))
        finally:
            time.clock_settime(time.CLOCK_REALTIME, cur_time)

    # allow X to start even on headless machines
    os.makedirs('/etc/X11/xorg.conf.d/', exist_ok=True)
    with open('/etc/X11/xorg.conf.d/dummy.conf', 'w') as f:
        f.write('''Section "Device"
        Identifier "test"
        Driver "dummy"
EndSection''')


if __name__ == '__main__':
    # first invocation: prepare the testbed and reboot; the tests themselves
    # run once autopkgtest has set the reboot mark
    if not os.getenv('AUTOPKGTEST_REBOOT_MARK'):
        pre_boot_setup()
        print('Rebooting...')
        subprocess.check_call(['/tmp/autopkgtest-reboot', 'boot1'])

    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
                                                     verbosity=2))