diff options
Diffstat (limited to 'debian/tests/storage')
-rwxr-xr-x | debian/tests/storage | 248 |
1 files changed, 248 insertions, 0 deletions
diff --git a/debian/tests/storage b/debian/tests/storage new file mode 100755 index 0000000..a8403e0 --- /dev/null +++ b/debian/tests/storage @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 +# systemd integration test: Handling of storage devices +# (C) 2015 Canonical Ltd. +# Author: Martin Pitt <martin.pitt@ubuntu.com> + +import os +import sys +import unittest +import subprocess +import time +import random +from glob import glob + + +@unittest.skipIf(os.path.isdir('/sys/module/scsi_debug'), + 'The scsi_debug module is already loaded') +class FakeDriveTestBase(unittest.TestCase): + @classmethod + def setUpClass(klass): + # create a fake SCSI hard drive + subprocess.check_call(['modprobe', 'scsi_debug', 'dev_size_mb=32']) + # wait until drive got created + sys_dirs = [] + while not sys_dirs: + sys_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block') + time.sleep(0.1) + assert len(sys_dirs) == 1 + devs = os.listdir(sys_dirs[0]) + assert len(devs) == 1 + klass.device = '/dev/' + devs[0] + + @classmethod + def tearDownClass(klass): + # create a fake SCSI hard drive + subprocess.check_call(['rmmod', 'scsi_debug']) + + def tearDown(self): + # clear drive + with open(self.device, 'wb') as f: + block = b'0' * 1048576 + try: + while True: + f.write(block) + except OSError: + pass + subprocess.check_call(['udevadm', 'settle']) + subprocess.check_call(['systemctl', 'daemon-reload']) + + +class CryptsetupTest(FakeDriveTestBase): + def setUp(self): + self.plaintext_name = 'testcrypt1' + self.plaintext_dev = '/dev/mapper/' + self.plaintext_name + if os.path.exists(self.plaintext_dev): + self.fail('%s exists already' % self.plaintext_dev) + + super().setUp() + + if os.path.exists('/etc/crypttab'): + os.rename('/etc/crypttab', '/etc/crypttab.systemdtest') + self.password = 'pwd%i' % random.randint(1000, 10000) + self.password_agent = None + + def tearDown(self): + if self.password_agent: + os.kill(self.password_agent, 9) + os.waitpid(self.password_agent, 0) + self.password_agent = None + subprocess.call(['umount', self.plaintext_dev], stderr=subprocess.DEVNULL) + subprocess.call(['systemctl', 'start', '--no-ask-password', 'systemd-cryptsetup@%s.service' % self.plaintext_name], + stderr=subprocess.STDOUT) + subprocess.call(['systemctl', 'stop', 'systemd-cryptsetup@%s.service' % self.plaintext_name], + stderr=subprocess.STDOUT) + if os.path.exists('/etc/crypttab'): + os.unlink('/etc/crypttab') + if os.path.exists('/etc/crypttab.systemdtest'): + os.rename('/etc/crypttab.systemdtest', '/etc/crypttab') + if os.path.exists(self.plaintext_dev): + subprocess.call(['dmsetup', 'remove', self.plaintext_dev], + stderr=subprocess.STDOUT) + + super().tearDown() + + def format_luks(self): + '''Format test device with LUKS''' + + p = subprocess.Popen(['cryptsetup', '--batch-mode', 'luksFormat', self.device, '-'], + stdin=subprocess.PIPE) + p.communicate(self.password.encode()) + self.assertEqual(p.returncode, 0) + os.sync() + subprocess.check_call(['udevadm', 'settle']) + + def start_password_agent(self): + '''Run password agent to answer passphrase request for crypt device''' + + pid = os.fork() + if pid > 0: + self.password_agent = pid + return + + # wait for incoming request + found = False + while not found: + for ask in glob('/run/systemd/ask-password/ask.*'): + with open(ask) as f: + contents = f.read() + if 'disk scsi_debug' in contents and self.plaintext_name in contents: + found = True + break + if not found: + time.sleep(0.5) + + # parse Socket= + for line in contents.splitlines(): + if line.startswith('Socket='): + socket = line.split('=', 1)[1] + break + + # send reply + p = subprocess.Popen(['/lib/systemd/systemd-reply-password', '1', socket], + stdin=subprocess.PIPE) + p.communicate(self.password.encode()) + assert p.returncode == 0 + + os._exit(0) + + def apply(self, target): + '''Tell systemd to generate and run the cryptsetup units''' + + subprocess.check_call(['systemctl', 'daemon-reload']) + + self.start_password_agent() + subprocess.check_call(['systemctl', '--no-ask-password', 'restart', target]) + for timeout in range(50): + if os.path.exists(self.plaintext_dev): + break + time.sleep(0.1) + else: + self.fail('timed out for %s to appear' % self.plaintext_dev) + + def test_luks_by_devname(self): + '''LUKS device by plain device name, empty''' + + self.format_luks() + with open('/etc/crypttab', 'w') as f: + f.write('%s %s none luks\n' % (self.plaintext_name, self.device)) + self.apply('cryptsetup.target') + + # should not be mounted + with open('/proc/mounts') as f: + self.assertNotIn(self.plaintext_name, f.read()) + + # device should not have anything on it + p = subprocess.Popen(['blkid', self.plaintext_dev], stdout=subprocess.PIPE) + out = p.communicate()[0] + self.assertEqual(out, b'') + self.assertNotEqual(p.returncode, 0) + + def test_luks_by_uuid(self): + '''LUKS device by UUID, empty''' + + self.format_luks() + uuid = subprocess.check_output(['blkid', '-ovalue', '-sUUID', self.device], + universal_newlines=True).strip() + with open('/etc/crypttab', 'w') as f: + f.write('%s UUID=%s none luks\n' % (self.plaintext_name, uuid)) + self.apply('cryptsetup.target') + + # should not be mounted + with open('/proc/mounts') as f: + self.assertNotIn(self.plaintext_name, f.read()) + + # device should not have anything on it + p = subprocess.Popen(['blkid', self.plaintext_dev], stdout=subprocess.PIPE) + out = p.communicate()[0] + self.assertEqual(out, b'') + self.assertNotEqual(p.returncode, 0) + + def test_luks_swap(self): + '''LUKS device with "swap" option''' + + self.format_luks() + with open('/etc/crypttab', 'w') as f: + f.write('%s %s none luks,swap\n' % (self.plaintext_name, self.device)) + self.apply('cryptsetup.target') + + # should not be mounted + with open('/proc/mounts') as f: + self.assertNotIn(self.plaintext_name, f.read()) + + # device should be formatted with swap + out = subprocess.check_output(['blkid', '-ovalue', '-sTYPE', self.plaintext_dev]) + self.assertEqual(out, b'swap\n') + + def test_luks_tmp(self): + '''LUKS device with "tmp" option''' + + self.format_luks() + with open('/etc/crypttab', 'w') as f: + f.write('%s %s none luks,tmp\n' % (self.plaintext_name, self.device)) + self.apply('cryptsetup.target') + + # should not be mounted + with open('/proc/mounts') as f: + self.assertNotIn(self.plaintext_name, f.read()) + + # device should be formatted with ext2 + out = subprocess.check_output(['blkid', '-ovalue', '-sTYPE', self.plaintext_dev]) + self.assertEqual(out, b'ext2\n') + + def test_luks_fstab(self): + '''LUKS device in /etc/fstab''' + + self.format_luks() + with open('/etc/crypttab', 'w') as f: + f.write('%s %s none luks,tmp\n' % (self.plaintext_name, self.device)) + + mountpoint = '/run/crypt1.systemdtest' + os.mkdir(mountpoint) + self.addCleanup(os.rmdir, mountpoint) + os.rename('/etc/fstab', '/etc/fstab.systemdtest') + self.addCleanup(os.rename, '/etc/fstab.systemdtest', '/etc/fstab') + with open('/etc/fstab', 'a') as f: + with open('/etc/fstab.systemdtest') as forig: + f.write(forig.read()) + f.write('%s %s ext2 defaults 0 0\n' % (self.plaintext_dev, mountpoint)) + + # this should now be a requirement of local-fs.target + self.apply('local-fs.target') + + # should be mounted + found = False + with open('/proc/mounts') as f: + for line in f: + fields = line.split() + if fields[0] == self.plaintext_dev: + self.assertEqual(fields[1], mountpoint) + self.assertEqual(fields[2], 'ext2') + found = True + break + if not found: + self.fail('%s is not mounted' % self.plaintext_dev) + + +if __name__ == '__main__': + unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, + verbosity=2)) |