summaryrefslogtreecommitdiffstats
path: root/debian/tests/boot-and-services
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 13:00:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 13:00:48 +0000
commitf542925b701989ba6eed7b08b5226d4021b9b85f (patch)
tree57e14731f21a6d663326d30b7b88736e9d51c420 /debian/tests/boot-and-services
parentAdding upstream version 247.3. (diff)
downloadsystemd-8cfa7e7e684437c19d5798bdcb7baba512dd8f9e.tar.xz
systemd-8cfa7e7e684437c19d5798bdcb7baba512dd8f9e.zip
Adding debian version 247.3-7+deb11u4.debian/247.3-7+deb11u4
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'debian/tests/boot-and-services')
-rwxr-xr-xdebian/tests/boot-and-services558
1 files changed, 558 insertions, 0 deletions
diff --git a/debian/tests/boot-and-services b/debian/tests/boot-and-services
new file mode 100755
index 0000000..5ab9135
--- /dev/null
+++ b/debian/tests/boot-and-services
@@ -0,0 +1,558 @@
+#!/usr/bin/python3
+# autopkgtest check: Boot with systemd and check critical desktop services
+# (C) 2014 Canonical Ltd.
+# Author: Martin Pitt <martin.pitt@ubuntu.com>
+
+import sys
+import os
+import unittest
+import subprocess
+import tempfile
+import shutil
+import time
+import re
+from glob import glob
+
+is_container = subprocess.call(['systemd-detect-virt', '--container']) == 0
+
+
+def wait_unit_stop(unit, timeout=10):
+ '''Wait until given unit is not running any more
+
+ Raise RuntimeError on timeout.
+ '''
+ for i in range(timeout):
+ if subprocess.call(['systemctl', 'is-active', '--quiet', unit]) != 0:
+ return
+ time.sleep(1)
+
+ raise RuntimeError('Timed out waiting for %s to stop' % unit)
+
+
+class ServicesTest(unittest.TestCase):
+ '''Check that expected services are running'''
+
+ def test_0_init(self):
+ '''Verify that init is systemd'''
+
+ self.assertIn('systemd', os.readlink('/proc/1/exe'))
+
+ def test_no_failed(self):
+ '''No failed units'''
+
+ out = subprocess.check_output(['systemctl', '--state=failed', '--no-legend'],
+ universal_newlines=True)
+ failed = out.splitlines()
+ # ignore /etc/modules failure as stuff that we put there by default
+ # often fails
+ failed = [f for f in failed if 'systemd-modules-load' not in f]
+ # apparmor fails if not enabled in the kernel
+ if not os.path.exists('/sys/kernel/security/apparmor'):
+ failed = [f for f in failed if 'apparmor.service' not in f]
+ # ignore thermald as it doesn't start in most virtual envs
+ failed = [f for f in failed if 'thermald' not in f]
+ # console-setup.service fails on devices without keyboard (LP: #1516591)
+ failed = [f for f in failed if 'console-setup' not in f]
+ # cpi.service fails on s390x
+ failed = [f for f in failed if 'cpi.service' not in f]
+ # https://bugs.debian.org/926138
+ if is_container:
+ failed = [f for f in failed if 'e2scrub_reap.service' not in f]
+ if failed:
+ for f in failed:
+ f = f.split()[0]
+ print('-------- journal for failed service %s -----------' % f)
+ sys.stdout.flush()
+ subprocess.call(['journalctl', '-b', '-u', f])
+ self.assertEqual(failed, [])
+
+ @unittest.skipUnless(shutil.which('gdm3') is not None, 'gdm3 not found')
+ def test_gdm3(self):
+ subprocess.check_call(['pgrep', '-af', '/gdm[-3]'])
+ self.active_unit('gdm')
+
+ def test_dbus(self):
+ out = subprocess.check_output(
+ ['dbus-send', '--print-reply', '--system',
+ '--dest=org.freedesktop.DBus', '/', 'org.freedesktop.DBus.GetId'])
+ self.assertIn(b'string "', out)
+ self.active_unit('dbus')
+
+ def test_network_manager(self):
+ # 0.9.10 changed the command name
+ _help = subprocess.check_output(['nmcli', '--help'],
+ stderr=subprocess.STDOUT)
+ if b' g[eneral]' in _help:
+ out = subprocess.check_output(['nmcli', 'general'])
+ else:
+ out = subprocess.check_output(['nmcli', 'nm'])
+ self.assertIn(b'enabled', out)
+ self.active_unit('NetworkManager')
+
+ def test_cron(self):
+ out = subprocess.check_output(['ps', 'u', '-C', 'cron'])
+ self.assertIn(b'root', out)
+ self.active_unit('cron')
+
+ def test_logind(self):
+ out = subprocess.check_output(['loginctl'])
+ self.assertNotEqual(b'', out)
+ self.active_unit('systemd-logind')
+
+ @unittest.skipIf('TEST_UPSTREAM' in os.environ,
+ 'Forwarding to rsyslog is a Debian patch')
+ def test_rsyslog(self):
+ out = subprocess.check_output(['ps', 'u', '-C', 'rsyslogd'])
+ self.assertIn(b'bin/rsyslogd', out)
+ self.active_unit('rsyslog')
+ with open('/var/log/syslog') as f:
+ log = f.read()
+ if not is_container:
+ # has kernel messages
+ self.assertRegex(log, 'kernel:.*')
+ # has init messages
+ self.assertRegex(log, 'systemd.*Reached target Graphical Interface')
+ # has other services
+ self.assertRegex(log, 'NetworkManager.*:')
+
+ @unittest.skipIf(is_container, 'udev does not work in containers')
+ def test_udev(self):
+ out = subprocess.check_output(['udevadm', 'info', '--export-db'])
+ self.assertIn(b'\nP: /devices/', out)
+ self.active_unit('systemd-udevd')
+
+ def test_tmp_mount(self):
+ # check if we want to mount /tmp in fstab
+ want_tmp_mount = False
+ try:
+ with open('/etc/fstab') as f:
+ for l in f:
+ try:
+ if not l.startswith('#') and l.split()[1] in ('/tmp', '/tmp/'):
+ want_tmp_mount = True
+ break
+ except IndexError:
+ pass
+ except FileNotFoundError:
+ pass
+
+ # ensure that we actually do/don't have a /tmp mount
+ (status, status_out) = subprocess.getstatusoutput('systemctl status tmp.mount')
+ findmnt = subprocess.call(['findmnt', '-n', '/tmp'], stdout=subprocess.PIPE)
+ if want_tmp_mount:
+ self.assertEqual(status, 0, status_out)
+ self.assertEqual(findmnt, 0)
+ else:
+ # 4 is correct (since upstream commit ca473d57), accept 3 for systemd <= 230
+ self.assertIn(status, [3, 4], status_out)
+ self.assertNotEqual(findmnt, 0)
+
+ @unittest.skipIf('TEST_UPSTREAM' in os.environ,
+ 'Debian specific configuration, N/A for upstream')
+ def test_tmp_cleanup(self):
+ # systemd-tmpfiles-clean.timer only runs 15 mins after boot, shortcut
+ # it
+ self.assertEqual(subprocess.call(
+ ['systemctl', 'status', 'systemd-tmpfiles-clean.timer'],
+ stdout=subprocess.PIPE), 0)
+ subprocess.check_call(['systemctl', 'start', 'systemd-tmpfiles-clean'])
+ if not is_container:
+ # all files in /tmp/ should get cleaned up on boot
+ self.assertFalse(os.path.exists('/tmp/oldfile.test'))
+ self.assertFalse(os.path.exists('/tmp/newfile.test'))
+ # files in /var/tmp/ older than 30d should get cleaned up
+ # XXX FIXME: /var/tmp/ cleanup was disabled in #675422
+ # if not is_container:
+ # self.assertFalse(os.path.exists('/var/tmp/oldfile.test'))
+ self.assertTrue(os.path.exists('/var/tmp/newfile.test'))
+
+ # next run should leave the recent ones
+ os.close(os.open('/tmp/newfile.test',
+ os.O_CREAT | os.O_EXCL | os.O_WRONLY))
+ subprocess.check_call(['systemctl', 'start', 'systemd-tmpfiles-clean'])
+ wait_unit_stop('systemd-tmpfiles-clean')
+ self.assertTrue(os.path.exists('/tmp/newfile.test'))
+
+ # Helper methods
+
+ def active_unit(self, unit):
+ '''Check that given unit is active'''
+
+ out = subprocess.check_output(['systemctl', 'status', unit])
+ self.assertIn(b'active (running)', out)
+
+
+class JournalTest(unittest.TestCase):
+ '''Check journal functionality'''
+
+ def test_no_options(self):
+ out = subprocess.check_output(['journalctl'])
+ if not is_container:
+ # has kernel messages
+ self.assertRegex(out, b'kernel:.*')
+ # has init messages
+ self.assertRegex(out, b'systemd.*Reached target Graphical Interface')
+ # has other services
+ self.assertRegex(out, b'NetworkManager.*:.*starting')
+
+ def test_log_for_service(self):
+ out = subprocess.check_output(
+ ['journalctl', '_SYSTEMD_UNIT=NetworkManager.service'])
+ self.assertRegex(out, b'NetworkManager.*:.*starting')
+ self.assertNotIn(b'kernel:', out)
+ self.assertNotIn(b'systemd:', out)
+
+
+@unittest.skipIf(is_container, 'nspawn does not work in most containers')
+class NspawnTest(unittest.TestCase):
+ '''Check nspawn'''
+
+ @classmethod
+ def setUpClass(kls):
+ '''Build a bootable busybox mini-container'''
+
+ kls.td_c_busybox = tempfile.TemporaryDirectory(prefix='c_busybox.')
+ kls.c_busybox = kls.td_c_busybox.name
+ for d in ['etc/init.d', 'bin', 'sbin']:
+ os.makedirs(os.path.join(kls.c_busybox, d))
+ shutil.copy('/bin/busybox', os.path.join(kls.c_busybox, 'bin'))
+ shutil.copy('/etc/os-release', os.path.join(kls.c_busybox, 'etc'))
+ os.symlink('busybox', os.path.join(kls.c_busybox, 'bin', 'sh'))
+ os.symlink('../bin/busybox', os.path.join(kls.c_busybox, 'sbin/init'))
+ with open(os.path.join(kls.c_busybox, 'etc/init.d/rcS'), 'w') as f:
+ f.write('''#!/bin/sh
+echo fake container started
+ps aux
+poweroff\n''')
+ os.fchmod(f.fileno(), 0o755)
+ subprocess.check_call(['systemd-machine-id-setup', '--root',
+ kls.c_busybox], stderr=subprocess.PIPE)
+
+ def setUp(self):
+ self.workdir = tempfile.TemporaryDirectory()
+
+ def test_boot(self):
+ cont = os.path.join(self.workdir.name, 'c1')
+ shutil.copytree(self.c_busybox, cont, symlinks=True)
+ os.sync()
+ nspawn = subprocess.Popen(['systemd-nspawn', '-D', cont, '-b'],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ out = nspawn.communicate(timeout=60)[0]
+ self.assertIn(b'Spawning container c1', out)
+ self.assertIn(b'fake container started', out)
+ self.assertRegex(out, b'\n\s+1\s+0\s+init[\r\n]')
+ self.assertRegex(out, b'\n\s+2+\s+0\s.*rcS[\r\n]')
+ self.assertRegex(out, b'Container c1.*shut down')
+ self.assertEqual(nspawn.returncode, 0)
+
+ def test_service(self):
+ self.assertTrue(os.path.isdir('/var/lib/machines'))
+ cont = '/var/lib/machines/c1'
+ shutil.copytree(self.c_busybox, cont, symlinks=True)
+ self.addCleanup(shutil.rmtree, cont)
+ os.sync()
+ subprocess.check_call(['systemctl', 'start', 'systemd-nspawn@c1'])
+ wait_unit_stop('systemd-nspawn@c1')
+
+ subprocess.call(['journalctl', '--sync'])
+ systemctl = subprocess.Popen(
+ ['systemctl', 'status', '-overbose', '-l', 'systemd-nspawn@c1'],
+ stdout=subprocess.PIPE)
+ out = systemctl.communicate()[0].decode('UTF-8', 'replace')
+ self.assertEqual(systemctl.returncode, 3, out)
+ self.assertNotIn('failed', out)
+
+
+@unittest.skipUnless(os.path.exists('/sys/kernel/security/apparmor'),
+ 'AppArmor not enabled')
+class AppArmorTest(unittest.TestCase):
+ def test_profile(self):
+ '''AppArmor confined unit'''
+
+ # create AppArmor profile
+ aa_profile = tempfile.NamedTemporaryFile(prefix='aa_violator.')
+ aa_profile.write(b'''#include <tunables/global>
+
+profile "violator-test" {
+ #include <abstractions/base>
+
+ /{usr/,}bin/** rix,
+ /etc/machine-id r,
+}
+''')
+ aa_profile.flush()
+ subprocess.check_call(['apparmor_parser', '-r', '-v', aa_profile.name])
+
+ # create confined unit
+ with open('/run/systemd/system/violator.service', 'w') as f:
+ f.write('''[Unit]
+Description=AppArmor test
+
+[Service]
+ExecStart=/bin/sh -euc 'echo CP1; cat /etc/machine-id; echo CP2; if cat /etc/passwd; then exit 1; fi; echo CP3'
+AppArmorProfile=violator-test
+''')
+ self.addCleanup(os.unlink, '/run/systemd/system/violator.service')
+
+ # launch
+ subprocess.check_call(['systemctl', 'daemon-reload'])
+ subprocess.check_call(['systemctl', 'start', 'violator.service'])
+ wait_unit_stop('violator.service')
+
+ # check status
+ st = subprocess.Popen(['systemctl', 'status', '-l',
+ 'violator.service'], stdout=subprocess.PIPE,
+ universal_newlines=True)
+ out = st.communicate()[0]
+ # unit should be stopped
+ self.assertEqual(st.returncode, 3)
+
+ self.assertIn('inactive', out)
+ self.assertIn('CP1', out)
+ self.assertIn('CP2', out)
+ self.assertIn('CP3', out)
+ with open('/etc/machine-id') as f:
+ self.assertIn(f.read().strip(), out)
+ self.assertNotIn('root:x', out, 'unit can read /etc/passwd')
+
+
+@unittest.skipIf(os.path.exists('/sys/fs/cgroup/cgroup.controllers'),
+ 'test needs to be reworked on unified cgroup hierarchy')
+class CgroupsTest(unittest.TestCase):
+ '''Check cgroup setup'''
+
+ @classmethod
+ def setUpClass(kls):
+ kls.controllers = []
+ for controller in glob('/sys/fs/cgroup/*'):
+ if not os.path.islink(controller):
+ kls.controllers.append(controller)
+
+ def setUp(self):
+ self.service = 'testsrv.service'
+ self.service_file = '/run/systemd/system/' + self.service
+
+ def tearDown(self):
+ subprocess.call(['systemctl', 'stop', self.service],
+ stderr=subprocess.PIPE)
+ try:
+ os.unlink(self.service_file)
+ except OSError:
+ pass
+ subprocess.check_call(['systemctl', 'daemon-reload'])
+
+ def create_service(self, extra_service=''):
+ '''Create test service unit'''
+
+ with open(self.service_file, 'w') as f:
+ f.write('''[Unit]
+Description=test service
+[Service]
+ExecStart=/bin/sleep 500
+%s
+''' % extra_service)
+ subprocess.check_call(['systemctl', 'daemon-reload'])
+
+ def assertNoControllers(self):
+ '''Assert that no cgroup controllers exist for test service'''
+
+ cs = glob('/sys/fs/cgroup/*/system.slice/%s' % self.service)
+ self.assertEqual(cs, [])
+
+ def assertController(self, name):
+ '''Assert that cgroup controller exists for test service'''
+
+ c = '/sys/fs/cgroup/%s/system.slice/%s' % (name, self.service)
+ self.assertTrue(os.path.isdir(c))
+
+ def assertNoController(self, name):
+ '''Assert that cgroup controller does not exist for test service'''
+
+ c = '/sys/fs/cgroup/%s/system.slice/%s' % (name, self.service)
+ self.assertFalse(os.path.isdir(c))
+
+ def test_simple(self):
+ '''simple service'''
+
+ self.create_service()
+ self.assertNoControllers()
+ subprocess.check_call(['systemctl', 'start', self.service])
+ self.assertController('systemd')
+ subprocess.check_call(['systemctl', 'stop', self.service])
+ self.assertNoControllers()
+
+ def test_cpushares(self):
+ '''service with CPUShares'''
+
+ self.create_service('CPUShares=1000')
+ self.assertNoControllers()
+ subprocess.check_call(['systemctl', 'start', self.service])
+ self.assertController('systemd')
+ self.assertController('cpu,cpuacct')
+ subprocess.check_call(['systemctl', 'stop', self.service])
+ self.assertNoControllers()
+
+
+class SeccompTest(unittest.TestCase):
+ '''Check seccomp syscall filtering'''
+
+ def test_failing(self):
+ with open('/run/systemd/system/scfail.service', 'w') as f:
+ f.write('''[Unit]
+Description=seccomp test
+[Service]
+ExecStart=/bin/cat /etc/machine-id
+SystemCallFilter=access
+''')
+ self.addCleanup(os.unlink, '/run/systemd/system/scfail.service')
+
+ # launch
+ subprocess.check_call(['systemctl', 'daemon-reload'])
+ subprocess.check_call(['systemctl', 'start', 'scfail.service'])
+ wait_unit_stop('scfail.service')
+
+ # check status
+ st = subprocess.Popen(['systemctl', 'status', '-l',
+ 'scfail.service'], stdout=subprocess.PIPE)
+ out = st.communicate()[0]
+ # unit should be stopped
+ self.assertEqual(st.returncode, 3)
+
+ subprocess.check_call(['systemctl', 'reset-failed', 'scfail.service'])
+
+ self.assertIn(b'failed', out)
+ self.assertRegex(out, b'code=(killed|dumped), signal=SYS')
+ with open('/etc/machine-id') as f:
+ self.assertNotIn(f.read().strip().encode('ASCII'), out)
+
+
+@unittest.skipIf(is_container, 'systemd-coredump does not work in containers')
+class CoredumpTest(unittest.TestCase):
+ '''Check systemd-coredump'''
+
+ def test_bash_crash(self):
+ subprocess.call("ulimit -c unlimited; bash -c 'kill -SEGV $$'", shell=True,
+ cwd='/tmp', stderr=subprocess.DEVNULL)
+
+ # with systemd-coredump installed we should get the core dumps in
+ # systemd's dir
+ for timeout in range(50):
+ cores = glob('/var/lib/systemd/coredump/core.bash.*')
+ if cores:
+ break
+ time.sleep(1)
+ self.assertNotEqual(cores, [])
+ self.assertEqual(glob('/tmp/core*'), [])
+
+ # we should also get a message and stack trace in journal
+ for timeout in range(10):
+ subprocess.call(['journalctl', '--sync'])
+ journal = subprocess.check_output(['journalctl', '-t', 'systemd-coredump'])
+ if re.search(b'Process.*bash.*dumped core', journal) and \
+ re.search(b'#[0-9] .*bash', journal):
+ break
+ time.sleep(1)
+ self.assertRegex(journal, b'Process.*bash.*dumped core')
+ self.assertIn(b'Stack trace', journal)
+ self.assertRegex(journal, b'#[0-9] .*bash')
+
+
+class CLITest(unittest.TestCase):
+ def setUp(self):
+ self.programs = []
+ for line in subprocess.check_output(['dpkg', '-L', 'systemd', 'systemd-container', 'systemd-coredump', 'udev'],
+ universal_newlines=True).splitlines():
+ if '/bin/' in line:
+ self.programs.append(line.strip())
+
+ def test_help(self):
+ '--help works and succeeds'''
+
+ for program in self.programs:
+ p = subprocess.Popen([program, '--help'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
+ (out, err) = p.communicate()
+ try:
+ self.assertEqual(err, '')
+ self.assertEqual(p.returncode, 0)
+ self.assertIn(os.path.basename(program), out)
+ self.assertTrue('--help' in out or 'Usage' in out, out)
+ except AssertionError:
+ print('Failed program: %s' % program)
+ raise
+
+ def test_version(self):
+ '--version works and succeeds'''
+
+ version = subprocess.check_output(['pkg-config', '--modversion', 'systemd'],
+ universal_newlines=True).strip()
+
+ for program in self.programs:
+ # known to not respond to --version
+ if os.path.basename(program) in ['kernel-install', 'systemd-ask-password', 'systemd-stdio-bridge']:
+ continue
+ p = subprocess.Popen([program, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
+ (out, err) = p.communicate()
+ try:
+ self.assertEqual(err, '')
+ self.assertEqual(p.returncode, 0)
+ self.assertIn(version, out)
+ except AssertionError:
+ print('Failed program: %s' % program)
+ raise
+
+ def test_invalid_option(self):
+ '''Calling with invalid option fails'''
+
+ for program in self.programs:
+ p = subprocess.Popen([program, '--invalid-option'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
+ (out, err) = p.communicate()
+ try:
+ # kernel-install is an unique snowflake
+ if not program.endswith('/kernel-install'):
+ self.assertIn('--invalid-option', err)
+ self.assertNotEqual(p.returncode, 0)
+ except AssertionError:
+ print('Failed program: %s' % program)
+ raise
+
+
+def pre_boot_setup():
+ '''Test setup before rebooting testbed'''
+
+ subprocess.check_call(['systemctl', 'set-default', 'graphical.target'],
+ stderr=subprocess.STDOUT)
+
+ # create a few temporary files to ensure that they get cleaned up on boot
+ os.close(os.open('/tmp/newfile.test',
+ os.O_CREAT | os.O_EXCL | os.O_WRONLY))
+ os.close(os.open('/var/tmp/newfile.test',
+ os.O_CREAT | os.O_EXCL | os.O_WRONLY))
+ # we can't use utime() here, as systemd looks for ctime
+ if not is_container:
+ cur_time = time.clock_gettime(time.CLOCK_REALTIME)
+ time.clock_settime(time.CLOCK_REALTIME, cur_time - 2 * 30 * 86400)
+ try:
+ os.close(os.open('/tmp/oldfile.test',
+ os.O_CREAT | os.O_EXCL | os.O_WRONLY))
+ os.close(os.open('/var/tmp/oldfile.test',
+ os.O_CREAT | os.O_EXCL | os.O_WRONLY))
+ finally:
+ time.clock_settime(time.CLOCK_REALTIME, cur_time)
+
+ # allow X to start even on headless machines
+ os.makedirs('/etc/X11/xorg.conf.d/', exist_ok=True)
+ with open('/etc/X11/xorg.conf.d/dummy.conf', 'w') as f:
+ f.write('''Section "Device"
+ Identifier "test"
+ Driver "dummy"
+EndSection''')
+
+
+if __name__ == '__main__':
+ if not os.getenv('AUTOPKGTEST_REBOOT_MARK'):
+ pre_boot_setup()
+ print('Rebooting...')
+ subprocess.check_call(['/tmp/autopkgtest-reboot', 'boot1'])
+
+ unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
+ verbosity=2))