From b750101eb236130cf056c675997decbac904cc49 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 17:35:18 +0200 Subject: Adding upstream version 252.22. Signed-off-by: Daniel Baumann --- test/sysv-generator-test.py | 413 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 413 insertions(+) create mode 100755 test/sysv-generator-test.py (limited to 'test/sysv-generator-test.py') diff --git a/test/sysv-generator-test.py b/test/sysv-generator-test.py new file mode 100755 index 0000000..24fafba --- /dev/null +++ b/test/sysv-generator-test.py @@ -0,0 +1,413 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: LGPL-2.1-or-later +# +# systemd-sysv-generator integration test +# +# © 2015 Canonical Ltd. +# Author: Martin Pitt + +import collections +import os +import shutil +import subprocess +import sys +import tempfile +import unittest + +from configparser import RawConfigParser +from glob import glob + +sysv_generator = './systemd-sysv-generator' + +class MultiDict(collections.OrderedDict): + def __setitem__(self, key, value): + if isinstance(value, list) and key in self: + self[key].extend(value) + else: + super(MultiDict, self).__setitem__(key, value) + +class SysvGeneratorTest(unittest.TestCase): + def setUp(self): + self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.') + self.init_d_dir = os.path.join(self.workdir, 'init.d') + os.mkdir(self.init_d_dir) + self.rcnd_dir = self.workdir + self.unit_dir = os.path.join(self.workdir, 'systemd') + os.mkdir(self.unit_dir) + self.out_dir = os.path.join(self.workdir, 'output') + os.mkdir(self.out_dir) + + def tearDown(self): + shutil.rmtree(self.workdir) + + # + # Helper methods + # + + def run_generator(self, expect_error=False): + '''Run sysv-generator. + + Fail if stderr contains any "Fail", unless expect_error is True. + Return (stderr, filename -> ConfigParser) pair with output to stderr and + parsed generated units. + ''' + env = os.environ.copy() + # We might debug log about errors that aren't actually fatal so let's bump the log level to info to + # prevent those logs from interfering with the test. + env['SYSTEMD_LOG_LEVEL'] = 'info' + env['SYSTEMD_LOG_TARGET'] = 'console' + env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir + env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir + env['SYSTEMD_UNIT_PATH'] = self.unit_dir + gen = subprocess.Popen( + [sysv_generator, 'ignored', 'ignored', self.out_dir], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True, env=env) + (out, err) = gen.communicate() + if not expect_error: + self.assertFalse('Fail' in err, err) + self.assertEqual(gen.returncode, 0, err) + + results = {} + for service in glob(self.out_dir + '/*.service'): + if os.path.islink(service): + continue + try: + # for python3 we need here strict=False to parse multiple + # lines with the same key + cp = RawConfigParser(dict_type=MultiDict, strict=False) + except TypeError: + # RawConfigParser in python2 does not have the strict option + # but it allows multiple lines with the same key by default + cp = RawConfigParser(dict_type=MultiDict) + cp.optionxform = lambda o: o # don't lower-case option names + with open(service) as f: + cp.read_file(f) + results[os.path.basename(service)] = cp + + return (err, results) + + def add_sysv(self, fname, keys, enable=False, prio=1): + '''Create a SysV init script with the given keys in the LSB header + + There are sensible default values for all fields. + If enable is True, links will be created in the rcN.d dirs. In that + case, the priority can be given with "prio" (default to 1). + + Return path of generated script. + ''' + name_without_sh = fname.endswith('.sh') and fname[:-3] or fname + keys.setdefault('Provides', name_without_sh) + keys.setdefault('Required-Start', '$local_fs') + keys.setdefault('Required-Stop', keys['Required-Start']) + keys.setdefault('Default-Start', '2 3 4 5') + keys.setdefault('Default-Stop', '0 1 6') + keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh)) + keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh)) + script = os.path.join(self.init_d_dir, fname) + with open(script, 'w') as f: + f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n') + for k, v in keys.items(): + if v is not None: + f.write('#{:>20} {}\n'.format(k + ':', v)) + f.write('### END INIT INFO\ncode --goes here\n') + os.chmod(script, 0o755) + + if enable: + def make_link(prefix, runlevel): + d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel)) + if not os.path.isdir(d): + os.mkdir(d) + os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname)) + + for rl in keys['Default-Start'].split(): + make_link('S%02i' % prio, rl) + for rl in keys['Default-Stop'].split(): + make_link('K%02i' % (99 - prio), rl) + + return script + + def assert_enabled(self, unit, targets): + '''assert that a unit is enabled in precisely the given targets''' + + all_targets = ['multi-user', 'graphical'] + + # should be enabled + for target in all_targets: + link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit) + if target in targets: + unit_file = os.readlink(link) + # os.path.exists() will fail on a dangling symlink + self.assertTrue(os.path.exists(link)) + self.assertEqual(os.path.basename(unit_file), unit) + else: + self.assertFalse(os.path.exists(link), + '{} unexpectedly exists'.format(link)) + + # + # test cases + # + + def test_nothing(self): + '''no input files''' + + results = self.run_generator()[1] + self.assertEqual(results, {}) + self.assertEqual(os.listdir(self.out_dir), []) + + def test_simple_disabled(self): + '''simple service without dependencies, disabled''' + + self.add_sysv('foo', {}, enable=False) + err, results = self.run_generator() + self.assertEqual(len(results), 1) + + # no enablement links or other stuff + self.assertEqual(os.listdir(self.out_dir), ['foo.service']) + + s = results['foo.service'] + self.assertEqual(s.sections(), ['Unit', 'Service']) + self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service') + # $local_fs does not need translation, don't expect any dependency + # fields here + self.assertEqual(set(s.options('Unit')), + set(['Documentation', 'SourcePath', 'Description'])) + + self.assertEqual(s.get('Service', 'Type'), 'forking') + init_script = os.path.join(self.init_d_dir, 'foo') + self.assertEqual(s.get('Service', 'ExecStart'), + '{} start'.format(init_script)) + self.assertEqual(s.get('Service', 'ExecStop'), + '{} stop'.format(init_script)) + + self.assertNotIn('Overwriting', err) + + def test_simple_enabled_all(self): + '''simple service without dependencies, enabled in all runlevels''' + + self.add_sysv('foo', {}, enable=True) + err, results = self.run_generator() + self.assertEqual(list(results), ['foo.service']) + self.assert_enabled('foo.service', ['multi-user', 'graphical']) + self.assertNotIn('Overwriting', err) + + def test_simple_escaped(self): + '''simple service without dependencies, that requires escaping the name''' + + self.add_sysv('foo+', {}) + self.add_sysv('foo-admin', {}) + err, results = self.run_generator() + self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'}) + self.assertNotIn('Overwriting', err) + + def test_simple_enabled_some(self): + '''simple service without dependencies, enabled in some runlevels''' + + self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True) + err, results = self.run_generator() + self.assertEqual(list(results), ['foo.service']) + self.assert_enabled('foo.service', ['multi-user']) + + def test_lsb_macro_dep_single(self): + '''single LSB macro dependency: $network''' + + self.add_sysv('foo', {'Required-Start': '$network'}) + s = self.run_generator()[1]['foo.service'] + self.assertEqual(set(s.options('Unit')), + set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants'])) + self.assertEqual(s.get('Unit', 'After'), 'network-online.target') + self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target') + + def test_lsb_macro_dep_multi(self): + '''multiple LSB macro dependencies''' + + self.add_sysv('foo', {'Required-Start': '$named $portmap'}) + s = self.run_generator()[1]['foo.service'] + self.assertEqual(set(s.options('Unit')), + set(['Documentation', 'SourcePath', 'Description', 'After'])) + self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target']) + + def test_lsb_deps(self): + '''LSB header dependencies to other services''' + + # also give symlink priorities here; they should be ignored + self.add_sysv('foo', {'Required-Start': 'must1 must2', + 'Should-Start': 'may1 ne_may2'}, + enable=True, prio=40) + self.add_sysv('must1', {}, enable=True, prio=10) + self.add_sysv('must2', {}, enable=True, prio=15) + self.add_sysv('may1', {}, enable=True, prio=20) + # do not create ne_may2 + err, results = self.run_generator() + self.assertEqual(sorted(results), + ['foo.service', 'may1.service', 'must1.service', 'must2.service']) + + # foo should depend on all of them + self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()), + ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service']) + + # other services should not depend on each other + self.assertFalse(results['must1.service'].has_option('Unit', 'After')) + self.assertFalse(results['must2.service'].has_option('Unit', 'After')) + self.assertFalse(results['may1.service'].has_option('Unit', 'After')) + + def test_symlink_prio_deps(self): + '''script without LSB headers use rcN.d priority''' + + # create two init.d scripts without LSB header and enable them with + # startup priorities + for prio, name in [(10, 'provider'), (15, 'consumer')]: + with open(os.path.join(self.init_d_dir, name), 'w') as f: + f.write('#!/bin/init-d-interpreter\ncode --goes here\n') + os.fchmod(f.fileno(), 0o755) + + d = os.path.join(self.rcnd_dir, 'rc2.d') + if not os.path.isdir(d): + os.mkdir(d) + os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name))) + + err, results = self.run_generator() + self.assertEqual(sorted(results), ['consumer.service', 'provider.service']) + self.assertFalse(results['provider.service'].has_option('Unit', 'After')) + self.assertEqual(results['consumer.service'].get('Unit', 'After'), + 'provider.service') + + def test_multiple_provides(self): + '''multiple Provides: names''' + + self.add_sysv('foo', {'Provides': 'foo bar baz'}) + err, results = self.run_generator() + self.assertEqual(list(results), ['foo.service']) + self.assertEqual(set(results['foo.service'].options('Unit')), + set(['Documentation', 'SourcePath', 'Description'])) + # should create symlinks for the alternative names + for f in ['bar.service', 'baz.service']: + self.assertEqual(os.readlink(os.path.join(self.out_dir, f)), + 'foo.service') + self.assertNotIn('Overwriting', err) + + def test_provides_escaped(self): + '''a script that Provides: a name that requires escaping''' + + self.add_sysv('foo', {'Provides': 'foo foo+'}) + err, results = self.run_generator() + self.assertEqual(list(results), ['foo.service']) + self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')), + 'foo.service') + self.assertNotIn('Overwriting', err) + + def test_same_provides_in_multiple_scripts(self): + '''multiple init.d scripts provide the same name''' + + self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1) + self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2) + err, results = self.run_generator() + self.assertEqual(sorted(results), ['bar.service', 'foo.service']) + # should create symlink for the alternative name for either unit + self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')), + ['foo.service', 'bar.service']) + + def test_provide_other_script(self): + '''init.d scripts provides the name of another init.d script''' + + self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True) + self.add_sysv('bar', {'Provides': 'bar'}, enable=True) + err, results = self.run_generator() + self.assertEqual(sorted(results), ['bar.service', 'foo.service']) + # we do expect an overwrite here, bar.service should overwrite the + # alias link from foo.service + self.assertIn('Overwriting', err) + + def test_nonexecutable_script(self): + '''ignores non-executable init.d script''' + + os.chmod(self.add_sysv('foo', {}), 0o644) + err, results = self.run_generator() + self.assertEqual(results, {}) + + def test_sh_suffix(self): + '''init.d script with .sh suffix''' + + self.add_sysv('foo.sh', {}, enable=True) + err, results = self.run_generator() + s = results['foo.service'] + + self.assertEqual(s.sections(), ['Unit', 'Service']) + # should not have a .sh + self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service') + + # calls correct script with .sh + init_script = os.path.join(self.init_d_dir, 'foo.sh') + self.assertEqual(s.get('Service', 'ExecStart'), + '{} start'.format(init_script)) + self.assertEqual(s.get('Service', 'ExecStop'), + '{} stop'.format(init_script)) + + self.assert_enabled('foo.service', ['multi-user', 'graphical']) + + def test_sh_suffix_with_provides(self): + '''init.d script with .sh suffix and Provides:''' + + self.add_sysv('foo.sh', {'Provides': 'foo bar'}) + err, results = self.run_generator() + # ensure we don't try to create a symlink to itself + self.assertNotIn('itself', err) + self.assertEqual(list(results), ['foo.service']) + self.assertEqual(results['foo.service'].get('Unit', 'Description'), + 'LSB: test foo service') + + # should create symlink for the alternative name + self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')), + 'foo.service') + + def test_hidden_files(self): + '''init.d script with hidden file suffix''' + + script = self.add_sysv('foo', {}, enable=True) + # backup files (not enabled in rcN.d/) + shutil.copy(script, script + '.dpkg-new') + shutil.copy(script, script + '.dpkg-dist') + shutil.copy(script, script + '.swp') + shutil.copy(script, script + '.rpmsave') + + err, results = self.run_generator() + self.assertEqual(list(results), ['foo.service']) + + self.assert_enabled('foo.service', ['multi-user', 'graphical']) + + def test_backup_file(self): + '''init.d script with backup file''' + + script = self.add_sysv('foo', {}, enable=True) + # backup files (not enabled in rcN.d/) + shutil.copy(script, script + '.bak') + shutil.copy(script, script + '.old') + shutil.copy(script, script + '.tmp') + shutil.copy(script, script + '.new') + + err, results = self.run_generator() + print(err) + self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service']) + + # ensure we don't try to create a symlink to itself + self.assertNotIn('itself', err) + + self.assert_enabled('foo.service', ['multi-user', 'graphical']) + self.assert_enabled('foo.bak.service', []) + self.assert_enabled('foo.old.service', []) + + def test_existing_native_unit(self): + '''existing native unit''' + + with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f: + f.write('[Unit]\n') + + self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True) + err, results = self.run_generator() + self.assertEqual(list(results), []) + # no enablement or alias links, as native unit is disabled + self.assertEqual(os.listdir(self.out_dir), []) + + +if __name__ == '__main__': + unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2)) -- cgit v1.2.3