#!/usr/bin/env python3 # systemd integration test: Handling of storage devices # (C) 2015 Canonical Ltd. # Author: Martin Pitt import os import sys import unittest import subprocess import time import random from glob import glob @unittest.skipIf(os.path.isdir('/sys/module/scsi_debug'), 'The scsi_debug module is already loaded') class FakeDriveTestBase(unittest.TestCase): @classmethod def setUpClass(klass): # create a fake SCSI hard drive subprocess.check_call(['modprobe', 'scsi_debug', 'dev_size_mb=32']) # wait until drive got created sys_dirs = [] while not sys_dirs: sys_dirs = glob('/sys/bus/pseudo/drivers/scsi_debug/adapter*/host*/target*/*:*/block') time.sleep(0.1) assert len(sys_dirs) == 1 devs = os.listdir(sys_dirs[0]) assert len(devs) == 1 klass.device = '/dev/' + devs[0] @classmethod def tearDownClass(klass): # create a fake SCSI hard drive subprocess.check_call(['rmmod', 'scsi_debug']) def tearDown(self): # clear drive with open(self.device, 'wb') as f: block = b'0' * 1048576 try: while True: f.write(block) except OSError: pass subprocess.check_call(['udevadm', 'settle']) subprocess.check_call(['systemctl', 'daemon-reload']) class CryptsetupTest(FakeDriveTestBase): def setUp(self): self.plaintext_name = 'testcrypt1' self.plaintext_dev = '/dev/mapper/' + self.plaintext_name if os.path.exists(self.plaintext_dev): self.fail('%s exists already' % self.plaintext_dev) super().setUp() if os.path.exists('/etc/crypttab'): os.rename('/etc/crypttab', '/etc/crypttab.systemdtest') self.password = 'pwd%i' % random.randint(1000, 10000) self.password_agent = None def tearDown(self): if self.password_agent: os.kill(self.password_agent, 9) os.waitpid(self.password_agent, 0) self.password_agent = None subprocess.call(['umount', self.plaintext_dev], stderr=subprocess.DEVNULL) subprocess.call(['systemctl', 'start', '--no-ask-password', 'systemd-cryptsetup@%s.service' % self.plaintext_name], stderr=subprocess.STDOUT) subprocess.call(['systemctl', 'stop', 'systemd-cryptsetup@%s.service' % self.plaintext_name], stderr=subprocess.STDOUT) if os.path.exists('/etc/crypttab'): os.unlink('/etc/crypttab') if os.path.exists('/etc/crypttab.systemdtest'): os.rename('/etc/crypttab.systemdtest', '/etc/crypttab') if os.path.exists(self.plaintext_dev): subprocess.call(['dmsetup', 'remove', self.plaintext_dev], stderr=subprocess.STDOUT) super().tearDown() def format_luks(self): '''Format test device with LUKS''' p = subprocess.Popen(['cryptsetup', '--batch-mode', 'luksFormat', self.device, '-'], stdin=subprocess.PIPE) p.communicate(self.password.encode()) self.assertEqual(p.returncode, 0) os.sync() subprocess.check_call(['udevadm', 'settle']) def start_password_agent(self): '''Run password agent to answer passphrase request for crypt device''' pid = os.fork() if pid > 0: self.password_agent = pid return # wait for incoming request found = False while not found: for ask in glob('/run/systemd/ask-password/ask.*'): with open(ask) as f: contents = f.read() if 'disk scsi_debug' in contents and self.plaintext_name in contents: found = True break if not found: time.sleep(0.5) # parse Socket= for line in contents.splitlines(): if line.startswith('Socket='): socket = line.split('=', 1)[1] break # send reply p = subprocess.Popen(['/lib/systemd/systemd-reply-password', '1', socket], stdin=subprocess.PIPE) p.communicate(self.password.encode()) assert p.returncode == 0 os._exit(0) def apply(self, target): '''Tell systemd to generate and run the cryptsetup units''' subprocess.check_call(['systemctl', 'daemon-reload']) self.start_password_agent() subprocess.check_call(['systemctl', '--no-ask-password', 'restart', target]) for timeout in range(50): if os.path.exists(self.plaintext_dev): break time.sleep(0.1) else: self.fail('timed out for %s to appear' % self.plaintext_dev) def test_luks_by_devname(self): '''LUKS device by plain device name, empty''' self.format_luks() with open('/etc/crypttab', 'w') as f: f.write('%s %s none luks\n' % (self.plaintext_name, self.device)) self.apply('cryptsetup.target') # should not be mounted with open('/proc/mounts') as f: self.assertNotIn(self.plaintext_name, f.read()) # device should not have anything on it p = subprocess.Popen(['blkid', self.plaintext_dev], stdout=subprocess.PIPE) out = p.communicate()[0] self.assertEqual(out, b'') self.assertNotEqual(p.returncode, 0) def test_luks_by_uuid(self): '''LUKS device by UUID, empty''' self.format_luks() uuid = subprocess.check_output(['blkid', '-ovalue', '-sUUID', self.device], universal_newlines=True).strip() with open('/etc/crypttab', 'w') as f: f.write('%s UUID=%s none luks\n' % (self.plaintext_name, uuid)) self.apply('cryptsetup.target') # should not be mounted with open('/proc/mounts') as f: self.assertNotIn(self.plaintext_name, f.read()) # device should not have anything on it p = subprocess.Popen(['blkid', self.plaintext_dev], stdout=subprocess.PIPE) out = p.communicate()[0] self.assertEqual(out, b'') self.assertNotEqual(p.returncode, 0) def test_luks_swap(self): '''LUKS device with "swap" option''' self.format_luks() with open('/etc/crypttab', 'w') as f: f.write('%s %s none luks,swap\n' % (self.plaintext_name, self.device)) self.apply('cryptsetup.target') # should not be mounted with open('/proc/mounts') as f: self.assertNotIn(self.plaintext_name, f.read()) # device should be formatted with swap out = subprocess.check_output(['blkid', '-ovalue', '-sTYPE', self.plaintext_dev]) self.assertEqual(out, b'swap\n') def test_luks_tmp(self): '''LUKS device with "tmp" option''' self.format_luks() with open('/etc/crypttab', 'w') as f: f.write('%s %s none luks,tmp\n' % (self.plaintext_name, self.device)) self.apply('cryptsetup.target') # should not be mounted with open('/proc/mounts') as f: self.assertNotIn(self.plaintext_name, f.read()) # device should be formatted with ext2 out = subprocess.check_output(['blkid', '-ovalue', '-sTYPE', self.plaintext_dev]) self.assertEqual(out, b'ext2\n') def test_luks_fstab(self): '''LUKS device in /etc/fstab''' self.format_luks() with open('/etc/crypttab', 'w') as f: f.write('%s %s none luks,tmp\n' % (self.plaintext_name, self.device)) mountpoint = '/run/crypt1.systemdtest' os.mkdir(mountpoint) self.addCleanup(os.rmdir, mountpoint) os.rename('/etc/fstab', '/etc/fstab.systemdtest') self.addCleanup(os.rename, '/etc/fstab.systemdtest', '/etc/fstab') with open('/etc/fstab', 'a') as f: with open('/etc/fstab.systemdtest') as forig: f.write(forig.read()) f.write('%s %s ext2 defaults 0 0\n' % (self.plaintext_dev, mountpoint)) # this should now be a requirement of local-fs.target self.apply('local-fs.target') # should be mounted found = False with open('/proc/mounts') as f: for line in f: fields = line.split() if fields[0] == self.plaintext_dev: self.assertEqual(fields[1], mountpoint) self.assertEqual(fields[2], 'ext2') found = True break if not found: self.fail('%s is not mounted' % self.plaintext_dev) if __name__ == '__main__': unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))