summaryrefslogtreecommitdiffstats
path: root/test/sysv-generator-test.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 20:49:52 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 20:49:52 +0000
commit55944e5e40b1be2afc4855d8d2baf4b73d1876b5 (patch)
tree33f869f55a1b149e9b7c2b7e201867ca5dd52992 /test/sysv-generator-test.py
parentInitial commit. (diff)
downloadsystemd-55944e5e40b1be2afc4855d8d2baf4b73d1876b5.tar.xz
systemd-55944e5e40b1be2afc4855d8d2baf4b73d1876b5.zip
Adding upstream version 255.4.upstream/255.4
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/sysv-generator-test.py')
-rwxr-xr-xtest/sysv-generator-test.py413
1 files changed, 413 insertions, 0 deletions
diff --git a/test/sysv-generator-test.py b/test/sysv-generator-test.py
new file mode 100755
index 0000000..24fafba
--- /dev/null
+++ b/test/sysv-generator-test.py
@@ -0,0 +1,413 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: LGPL-2.1-or-later
+#
+# systemd-sysv-generator integration test
+#
+# © 2015 Canonical Ltd.
+# Author: Martin Pitt <martin.pitt@ubuntu.com>
+
+import collections
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+import unittest
+
+from configparser import RawConfigParser
+from glob import glob
+
+sysv_generator = './systemd-sysv-generator'
+
+class MultiDict(collections.OrderedDict):
+ def __setitem__(self, key, value):
+ if isinstance(value, list) and key in self:
+ self[key].extend(value)
+ else:
+ super(MultiDict, self).__setitem__(key, value)
+
+class SysvGeneratorTest(unittest.TestCase):
+ def setUp(self):
+ self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
+ self.init_d_dir = os.path.join(self.workdir, 'init.d')
+ os.mkdir(self.init_d_dir)
+ self.rcnd_dir = self.workdir
+ self.unit_dir = os.path.join(self.workdir, 'systemd')
+ os.mkdir(self.unit_dir)
+ self.out_dir = os.path.join(self.workdir, 'output')
+ os.mkdir(self.out_dir)
+
+ def tearDown(self):
+ shutil.rmtree(self.workdir)
+
+ #
+ # Helper methods
+ #
+
+ def run_generator(self, expect_error=False):
+ '''Run sysv-generator.
+
+ Fail if stderr contains any "Fail", unless expect_error is True.
+ Return (stderr, filename -> ConfigParser) pair with output to stderr and
+ parsed generated units.
+ '''
+ env = os.environ.copy()
+ # We might debug log about errors that aren't actually fatal so let's bump the log level to info to
+ # prevent those logs from interfering with the test.
+ env['SYSTEMD_LOG_LEVEL'] = 'info'
+ env['SYSTEMD_LOG_TARGET'] = 'console'
+ env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
+ env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
+ env['SYSTEMD_UNIT_PATH'] = self.unit_dir
+ gen = subprocess.Popen(
+ [sysv_generator, 'ignored', 'ignored', self.out_dir],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True, env=env)
+ (out, err) = gen.communicate()
+ if not expect_error:
+ self.assertFalse('Fail' in err, err)
+ self.assertEqual(gen.returncode, 0, err)
+
+ results = {}
+ for service in glob(self.out_dir + '/*.service'):
+ if os.path.islink(service):
+ continue
+ try:
+ # for python3 we need here strict=False to parse multiple
+ # lines with the same key
+ cp = RawConfigParser(dict_type=MultiDict, strict=False)
+ except TypeError:
+ # RawConfigParser in python2 does not have the strict option
+ # but it allows multiple lines with the same key by default
+ cp = RawConfigParser(dict_type=MultiDict)
+ cp.optionxform = lambda o: o # don't lower-case option names
+ with open(service) as f:
+ cp.read_file(f)
+ results[os.path.basename(service)] = cp
+
+ return (err, results)
+
+ def add_sysv(self, fname, keys, enable=False, prio=1):
+ '''Create a SysV init script with the given keys in the LSB header
+
+ There are sensible default values for all fields.
+ If enable is True, links will be created in the rcN.d dirs. In that
+ case, the priority can be given with "prio" (default to 1).
+
+ Return path of generated script.
+ '''
+ name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
+ keys.setdefault('Provides', name_without_sh)
+ keys.setdefault('Required-Start', '$local_fs')
+ keys.setdefault('Required-Stop', keys['Required-Start'])
+ keys.setdefault('Default-Start', '2 3 4 5')
+ keys.setdefault('Default-Stop', '0 1 6')
+ keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh))
+ keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh))
+ script = os.path.join(self.init_d_dir, fname)
+ with open(script, 'w') as f:
+ f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
+ for k, v in keys.items():
+ if v is not None:
+ f.write('#{:>20} {}\n'.format(k + ':', v))
+ f.write('### END INIT INFO\ncode --goes here\n')
+ os.chmod(script, 0o755)
+
+ if enable:
+ def make_link(prefix, runlevel):
+ d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel))
+ if not os.path.isdir(d):
+ os.mkdir(d)
+ os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
+
+ for rl in keys['Default-Start'].split():
+ make_link('S%02i' % prio, rl)
+ for rl in keys['Default-Stop'].split():
+ make_link('K%02i' % (99 - prio), rl)
+
+ return script
+
+ def assert_enabled(self, unit, targets):
+ '''assert that a unit is enabled in precisely the given targets'''
+
+ all_targets = ['multi-user', 'graphical']
+
+ # should be enabled
+ for target in all_targets:
+ link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit)
+ if target in targets:
+ unit_file = os.readlink(link)
+ # os.path.exists() will fail on a dangling symlink
+ self.assertTrue(os.path.exists(link))
+ self.assertEqual(os.path.basename(unit_file), unit)
+ else:
+ self.assertFalse(os.path.exists(link),
+ '{} unexpectedly exists'.format(link))
+
+ #
+ # test cases
+ #
+
+ def test_nothing(self):
+ '''no input files'''
+
+ results = self.run_generator()[1]
+ self.assertEqual(results, {})
+ self.assertEqual(os.listdir(self.out_dir), [])
+
+ def test_simple_disabled(self):
+ '''simple service without dependencies, disabled'''
+
+ self.add_sysv('foo', {}, enable=False)
+ err, results = self.run_generator()
+ self.assertEqual(len(results), 1)
+
+ # no enablement links or other stuff
+ self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
+
+ s = results['foo.service']
+ self.assertEqual(s.sections(), ['Unit', 'Service'])
+ self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
+ # $local_fs does not need translation, don't expect any dependency
+ # fields here
+ self.assertEqual(set(s.options('Unit')),
+ set(['Documentation', 'SourcePath', 'Description']))
+
+ self.assertEqual(s.get('Service', 'Type'), 'forking')
+ init_script = os.path.join(self.init_d_dir, 'foo')
+ self.assertEqual(s.get('Service', 'ExecStart'),
+ '{} start'.format(init_script))
+ self.assertEqual(s.get('Service', 'ExecStop'),
+ '{} stop'.format(init_script))
+
+ self.assertNotIn('Overwriting', err)
+
+ def test_simple_enabled_all(self):
+ '''simple service without dependencies, enabled in all runlevels'''
+
+ self.add_sysv('foo', {}, enable=True)
+ err, results = self.run_generator()
+ self.assertEqual(list(results), ['foo.service'])
+ self.assert_enabled('foo.service', ['multi-user', 'graphical'])
+ self.assertNotIn('Overwriting', err)
+
+ def test_simple_escaped(self):
+ '''simple service without dependencies, that requires escaping the name'''
+
+ self.add_sysv('foo+', {})
+ self.add_sysv('foo-admin', {})
+ err, results = self.run_generator()
+ self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
+ self.assertNotIn('Overwriting', err)
+
+ def test_simple_enabled_some(self):
+ '''simple service without dependencies, enabled in some runlevels'''
+
+ self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
+ err, results = self.run_generator()
+ self.assertEqual(list(results), ['foo.service'])
+ self.assert_enabled('foo.service', ['multi-user'])
+
+ def test_lsb_macro_dep_single(self):
+ '''single LSB macro dependency: $network'''
+
+ self.add_sysv('foo', {'Required-Start': '$network'})
+ s = self.run_generator()[1]['foo.service']
+ self.assertEqual(set(s.options('Unit')),
+ set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
+ self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
+ self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
+
+ def test_lsb_macro_dep_multi(self):
+ '''multiple LSB macro dependencies'''
+
+ self.add_sysv('foo', {'Required-Start': '$named $portmap'})
+ s = self.run_generator()[1]['foo.service']
+ self.assertEqual(set(s.options('Unit')),
+ set(['Documentation', 'SourcePath', 'Description', 'After']))
+ self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])
+
+ def test_lsb_deps(self):
+ '''LSB header dependencies to other services'''
+
+ # also give symlink priorities here; they should be ignored
+ self.add_sysv('foo', {'Required-Start': 'must1 must2',
+ 'Should-Start': 'may1 ne_may2'},
+ enable=True, prio=40)
+ self.add_sysv('must1', {}, enable=True, prio=10)
+ self.add_sysv('must2', {}, enable=True, prio=15)
+ self.add_sysv('may1', {}, enable=True, prio=20)
+ # do not create ne_may2
+ err, results = self.run_generator()
+ self.assertEqual(sorted(results),
+ ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
+
+ # foo should depend on all of them
+ self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
+ ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
+
+ # other services should not depend on each other
+ self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
+ self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
+ self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
+
+ def test_symlink_prio_deps(self):
+ '''script without LSB headers use rcN.d priority'''
+
+ # create two init.d scripts without LSB header and enable them with
+ # startup priorities
+ for prio, name in [(10, 'provider'), (15, 'consumer')]:
+ with open(os.path.join(self.init_d_dir, name), 'w') as f:
+ f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
+ os.fchmod(f.fileno(), 0o755)
+
+ d = os.path.join(self.rcnd_dir, 'rc2.d')
+ if not os.path.isdir(d):
+ os.mkdir(d)
+ os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name)))
+
+ err, results = self.run_generator()
+ self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
+ self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
+ self.assertEqual(results['consumer.service'].get('Unit', 'After'),
+ 'provider.service')
+
+ def test_multiple_provides(self):
+ '''multiple Provides: names'''
+
+ self.add_sysv('foo', {'Provides': 'foo bar baz'})
+ err, results = self.run_generator()
+ self.assertEqual(list(results), ['foo.service'])
+ self.assertEqual(set(results['foo.service'].options('Unit')),
+ set(['Documentation', 'SourcePath', 'Description']))
+ # should create symlinks for the alternative names
+ for f in ['bar.service', 'baz.service']:
+ self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
+ 'foo.service')
+ self.assertNotIn('Overwriting', err)
+
+ def test_provides_escaped(self):
+ '''a script that Provides: a name that requires escaping'''
+
+ self.add_sysv('foo', {'Provides': 'foo foo+'})
+ err, results = self.run_generator()
+ self.assertEqual(list(results), ['foo.service'])
+ self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
+ 'foo.service')
+ self.assertNotIn('Overwriting', err)
+
+ def test_same_provides_in_multiple_scripts(self):
+ '''multiple init.d scripts provide the same name'''
+
+ self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
+ self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
+ err, results = self.run_generator()
+ self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
+ # should create symlink for the alternative name for either unit
+ self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
+ ['foo.service', 'bar.service'])
+
+ def test_provide_other_script(self):
+ '''init.d scripts provides the name of another init.d script'''
+
+ self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
+ self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
+ err, results = self.run_generator()
+ self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
+ # we do expect an overwrite here, bar.service should overwrite the
+ # alias link from foo.service
+ self.assertIn('Overwriting', err)
+
+ def test_nonexecutable_script(self):
+ '''ignores non-executable init.d script'''
+
+ os.chmod(self.add_sysv('foo', {}), 0o644)
+ err, results = self.run_generator()
+ self.assertEqual(results, {})
+
+ def test_sh_suffix(self):
+ '''init.d script with .sh suffix'''
+
+ self.add_sysv('foo.sh', {}, enable=True)
+ err, results = self.run_generator()
+ s = results['foo.service']
+
+ self.assertEqual(s.sections(), ['Unit', 'Service'])
+ # should not have a .sh
+ self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
+
+ # calls correct script with .sh
+ init_script = os.path.join(self.init_d_dir, 'foo.sh')
+ self.assertEqual(s.get('Service', 'ExecStart'),
+ '{} start'.format(init_script))
+ self.assertEqual(s.get('Service', 'ExecStop'),
+ '{} stop'.format(init_script))
+
+ self.assert_enabled('foo.service', ['multi-user', 'graphical'])
+
+ def test_sh_suffix_with_provides(self):
+ '''init.d script with .sh suffix and Provides:'''
+
+ self.add_sysv('foo.sh', {'Provides': 'foo bar'})
+ err, results = self.run_generator()
+ # ensure we don't try to create a symlink to itself
+ self.assertNotIn('itself', err)
+ self.assertEqual(list(results), ['foo.service'])
+ self.assertEqual(results['foo.service'].get('Unit', 'Description'),
+ 'LSB: test foo service')
+
+ # should create symlink for the alternative name
+ self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
+ 'foo.service')
+
+ def test_hidden_files(self):
+ '''init.d script with hidden file suffix'''
+
+ script = self.add_sysv('foo', {}, enable=True)
+ # backup files (not enabled in rcN.d/)
+ shutil.copy(script, script + '.dpkg-new')
+ shutil.copy(script, script + '.dpkg-dist')
+ shutil.copy(script, script + '.swp')
+ shutil.copy(script, script + '.rpmsave')
+
+ err, results = self.run_generator()
+ self.assertEqual(list(results), ['foo.service'])
+
+ self.assert_enabled('foo.service', ['multi-user', 'graphical'])
+
+ def test_backup_file(self):
+ '''init.d script with backup file'''
+
+ script = self.add_sysv('foo', {}, enable=True)
+ # backup files (not enabled in rcN.d/)
+ shutil.copy(script, script + '.bak')
+ shutil.copy(script, script + '.old')
+ shutil.copy(script, script + '.tmp')
+ shutil.copy(script, script + '.new')
+
+ err, results = self.run_generator()
+ print(err)
+ self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service'])
+
+ # ensure we don't try to create a symlink to itself
+ self.assertNotIn('itself', err)
+
+ self.assert_enabled('foo.service', ['multi-user', 'graphical'])
+ self.assert_enabled('foo.bak.service', [])
+ self.assert_enabled('foo.old.service', [])
+
+ def test_existing_native_unit(self):
+ '''existing native unit'''
+
+ with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
+ f.write('[Unit]\n')
+
+ self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
+ err, results = self.run_generator()
+ self.assertEqual(list(results), [])
+ # no enablement or alias links, as native unit is disabled
+ self.assertEqual(os.listdir(self.out_dir), [])
+
+
+if __name__ == '__main__':
+ unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))