diff options
Diffstat (limited to '')
-rw-r--r-- | debian/tests/control | 3 | ||||
-rw-r--r-- | debian/tests/postfix | 19 | ||||
-rw-r--r-- | debian/tests/test-postfix.py | 553 | ||||
-rw-r--r-- | debian/tests/testlib.py | 1151 |
4 files changed, 1726 insertions, 0 deletions
diff --git a/debian/tests/control b/debian/tests/control new file mode 100644 index 0000000..0560b7e --- /dev/null +++ b/debian/tests/control @@ -0,0 +1,3 @@ +Tests: postfix +Depends: procmail, sasl2-bin, python3-pexpect, lsb-release, python3, libsasl2-modules +Restrictions: needs-root diff --git a/debian/tests/postfix b/debian/tests/postfix new file mode 100644 index 0000000..c5be87b --- /dev/null +++ b/debian/tests/postfix @@ -0,0 +1,19 @@ +#!/bin/bash +#---------------- +# Testing postfix +#---------------- +set -e + +# reconfigure postfix +debconf-set-selections <<< "postfix postfix/mailname string localhost" 2>&1 +debconf-set-selections <<< "postfix postfix/main_mailer_type string 'Internet Site'" 2>&1 + +# install and modify +hostname localhost +apt-get install -y postfix 2>&1 +hostname --fqdn > /etc/mailname +service postfix restart 2>&1 +python3 `dirname $0`/test-postfix.py 2>&1 + +# Check set-permissions +postfix set-permissions diff --git a/debian/tests/test-postfix.py b/debian/tests/test-postfix.py new file mode 100644 index 0000000..b3cbdc2 --- /dev/null +++ b/debian/tests/test-postfix.py @@ -0,0 +1,553 @@ +#!/usr/bin/python +# +# test-postfix.py quality assurance test script for postfix +# Copyright (C) 2008-2012 Canonical Ltd. +# Author: Kees Cook <kees@ubuntu.com> +# Author: Marc Deslauriers <marc.deslauriers@canonical.com> +# Author: Jamie Strandboge <jamie@canonical.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# QRT-Packages: postfix sasl2-bin procmail python-pexpect +# QRT-Privilege: root +# QRT-Conflicts: exim4 + +''' + Note: When installing postfix, select "Internet Site". This script will + not work if "Local Only" was selected. + + How to run against a clean schroot named 'hardy': + schroot -c hardy -u root -- sh -c 'apt-get -y install procmail python-unit postfix sasl2-bin python-pexpect lsb-release && ./test-postfix.py -v' + + Tests: + 00: setup + 10: basic plain auth setup + 11: above, but with CVE reproducers + 20: sasl non-PLAIN setup + 21: 20, but with CVE reproducers + 99: restore configs +''' + +import unittest, subprocess, re, pexpect, smtplib, socket, os, time, tempfile +import testlib + +import sys +''' Test for postfix check output''' +result = subprocess.run(['postconf', 'maillog_file = /dev/stdout'], capture_output=True, text=True) +result = subprocess.run(['postfix', 'check'], capture_output=True, text=True) +if result.returncode != 0: + print('postfix check failed with error code: {0}.'.format(result.returncode)) + sys.exit(1) +if result.stdout: + print('postfix check warning/error: {0}'.format(result.stdout[26:])) + subprocess.run(['postconf', 'maillog_file ='], capture_output=True, text=True) + sys.exit(1) +subprocess.run(['postconf', 'maillog_file ='], capture_output=True, text=True) + +class PostfixTest(testlib.TestlibCase): + '''Test Postfix MTA.''' + + def _setUp(self): + '''Create server configs.''' + + # Move listener to localhost:2525 + conf_file = '/etc/postfix/master.cf' + contents = '' + with open(conf_file) as fh: + for cfline in fh: + if cfline.startswith('smtp') and 'smtpd' in cfline and 'inet' in cfline: + contents += '127.0.0.1:2525 inet n - - - - smtpd\n' + else: + contents += "%s\n" % cfline + testlib.config_replace(conf_file, contents, append=False) + + conf_file = '/etc/postfix/main.cf' + # Use mbox only + testlib.config_comment(conf_file,'home_mailbox') + testlib.config_set(conf_file,'mailbox_command','procmail -a "$EXTENSION"') + + # Turn on sasl + self._setup_sasl("PLAIN") + reply = self._check_auth("PLAIN") + + + def setUp(self): + '''Set up prior to each test_* function''' + # list of files that we update + self.conf_files = [ '/etc/postfix/master.cf', '/etc/postfix/main.cf', '/etc/default/saslauthd', '/etc/postfix/sasl/smtpd.conf', '/etc/sasldb2'] + + self.user = testlib.TestUser(lower=True) + self.s = None + # Silently allow for this connection to fail, to handle the + # initial setup of the postfix server. + try: + self.s = smtplib.SMTP('localhost', port=2525) + except: + pass + + def _tearDown(self): + '''Restore server configs''' + for f in self.conf_files: + testlib.config_restore(f) + + # put saslauthd back + for f in ['/var/spool/postfix/var/run/saslauthd', '/var/run/saslauthd']: + if os.path.isfile(f) or os.path.islink(f): + os.unlink(f) + elif os.path.exists(f): + testlib.recursive_rm(f) + subprocess.call(['mkdir','-p','/var/run/saslauthd']) + subprocess.call(['/usr/sbin/service', 'saslauthd', 'stop'], stdout=subprocess.PIPE) + subprocess.call(['/usr/sbin/service', 'saslauthd', 'start'], stdout=subprocess.PIPE) + subprocess.call(['cp', '/etc/sasldb2', '/var/spool/postfix/etc/sasldb2']) + + def tearDown(self): + '''Clean up after each test_* function''' + + try: + self.s.quit() + except: + pass + self.user = None + + def _restart_server(self): + '''Restart server''' + subprocess.call(['/usr/sbin/service', 'postfix', 'stop'], stdout=subprocess.PIPE) + assert subprocess.call(['/usr/sbin/service', 'postfix', 'start'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0 + # Postfix exits its init script before the master listener has started + time.sleep(2) + + def _setup_sasl(self, mech, other_mech="", force_sasldb=False): + '''Setup sasl for mech''' + conf_file = '/etc/postfix/main.cf' + for field in ['smtpd_sasl_type','smtpd_sasl_local_domain','smtpd_tls_auth_only']: + testlib.config_comment(conf_file,field) + testlib.config_set(conf_file,'smtpd_sasl_path','smtpd') + testlib.config_set(conf_file,'smtpd_sasl_auth_enable','yes') + #testlib.config_set(conf_file,'broken_sasl_auth_clients','yes') + testlib.config_set(conf_file,'smtpd_sasl_authenticated_header','yes') + testlib.config_set(conf_file,'smtpd_tls_loglevel','2') + + # setup smtpd.conf and the sasl users + contents = '' + + self.assertTrue(mech in ['LOGIN', 'PLAIN', 'CRAM-MD5', 'DIGEST-MD5'], "Invalid mech: %s" % mech) + + if not force_sasldb and (mech == "PLAIN" or mech == "LOGIN"): + conf_file = '/etc/default/saslauthd' + testlib.config_set(conf_file, 'START', 'yes', spaces=False) + + contents = ''' +pwcheck_method: saslauthd +allowanonymouslogin: 0 +allowplaintext: 1 +mech_list: %s %s +''' % (mech, other_mech) + + # attach SASL to postfix chroot + subprocess.call(['mkdir','-p','/var/spool/postfix/var/run/saslauthd']) + subprocess.call(['rm','-rf','/var/run/saslauthd']) + subprocess.call(['ln','-s','/var/run/saslauthd', '/var/spool/postfix/var/run/saslauthd']) + subprocess.call(['/usr/sbin/service', 'saslauthd', 'stop'], stdout=subprocess.PIPE) + assert subprocess.call(['/usr/sbin/service', 'saslauthd', 'start'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0 + + # Force crackful perms so chroot'd postfix can talk to saslauthd + subprocess.call(['chmod','o+x','/var/run/saslauthd']) + else: + plaintext = "1" + if mech == "LOGIN" or mech == "PLAIN": + plaintext = "0" + contents = ''' +pwcheck_method: auxprop +allowanonymouslogin: 0 +allowplaintext: %s +mech_list: %s %s +''' % (plaintext, mech, other_mech) + + # Add user to sasldb2 + testlib.config_replace("/etc/sasldb2", '', append=False) + + rc, report = testlib.cmd(['postconf', '-h', 'myhostname']) + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + self.assertEqual(expected, rc, result + report) + + child = pexpect.spawn('saslpasswd2 -c -u %s %s' % (report.strip(), self.user.login)) + time.sleep(0.2) + child.expect(r'(?i)password', timeout=5) + time.sleep(0.2) + child.sendline(self.user.password) + time.sleep(0.2) + child.expect(r'.*(for verification)', timeout=5) + time.sleep(0.2) + child.sendline(self.user.password) + time.sleep(0.2) + rc = child.expect('\n', timeout=5) + time.sleep(0.2) + self.assertEqual(rc, expected, "passwd returned %d" %(rc)) + + child.kill(0) + + os.chmod("/etc/sasldb2", 0o640) + rc, report = testlib.cmd(['chgrp', 'postfix', '/etc/sasldb2']) + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + self.assertEqual(expected, rc, result + report) + + # Force crackful perms so chroot'd postfix can talk to saslauthd + subprocess.call(['mv', '-f', '/etc/sasldb2', '/var/spool/postfix/etc']) + subprocess.call(['ln', '-s', '/var/spool/postfix/etc/sasldb2', '/etc/sasldb2']) + + conf_file = '/etc/postfix/sasl/smtpd.conf' + testlib.config_replace(conf_file, contents, append=False) + + # Restart server + self._restart_server() + + def _is_listening(self): + '''Is the server listening''' + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(5) + s.connect(('localhost',2525)) + greeting = s.recv(1024).decode('utf-8') + # 220 gorgon.outflux.net ESMTP Postfix (Ubuntu) + self.assertTrue(greeting.startswith('220 '),greeting) + self.assertTrue('ESMTP' in greeting,greeting) + self.assertTrue('Postfix' in greeting,greeting) + self.assertFalse('MTA' in greeting,greeting) + s.close() + + def test_00_listening(self): + '''Postfix is listening''' + # Get the main instance running + self._setUp() + + self._is_listening() + + def _vrfy(self, address, valid = True): + self.s.putcmd("vrfy",address) + code, msg = self.s.getreply() + reply = '%d %s' % (code, msg) + if valid: + self.assertEqual(code, 252, reply) + self.assertTrue(address.encode('utf-8') in msg, reply) + else: + self.assertEqual(code, 550, reply) + self.assertTrue(b'Recipient address rejected' in msg, reply) + self.assertTrue(('<%s>' % (address)).encode('utf-8') in msg, reply) + + def test_10_commands(self): + '''Basic SMTP commands''' + + #s = smtplib.SMTP('localhost', port=2525) + # EHLO + code, msg = self.s.ehlo() + reply = '%d %s' % (code, msg) + self.assertEqual(code, 250, reply) + self.assertEqual(self.s.does_esmtp, 1, reply) + self.assertTrue(b'8BITMIME' in self.s.ehlo_resp, reply) + # No help available + self.s.putcmd("help") + code, msg = self.s.getreply() + reply = '%d %s' % (code, msg) + self.assertEqual(code, 500, reply) + self.assertTrue(b'Error' in msg, reply) + # VRFY addresses + self._vrfy('address@example.com', valid=True) + self._vrfy('does-not-exist', valid=False) + self._vrfy(self.user.login, valid=True) + + def _test_deliver_mail(self, user_sent_to, auth_user=None, auth_pass=None, use_tls=False): + '''Perform mail delivery''' + + if auth_user and auth_pass: + self.s.login(auth_user, auth_pass) + if use_tls: + self.s.starttls() + failed = self.s.sendmail('root',[user_sent_to.login,'does-not-exist'],'''From: Rooty <root> +To: "%s" <%s> +Subject: This is test 1 + +Hello, nice to meet you. +''' % (user_sent_to.gecos, user_sent_to.login)) + #for addr in failed.keys(): + # print '%s %d %s' % (addr, failed[addr][0], failed[addr][1]) + self.assertEqual(len(failed),1,failed) + self.assertTrue('does-not-exist' in failed,failed) + self.assertEqual(failed['does-not-exist'][0],550,failed) + + # Frighteningly, postfix seems to accept email before confirming + # a successful write to disk for the recipient! + time.sleep(2) + + def _test_mail_in_spool(self, user_directed_to, target_spool_user=None, spool_file=None, auth_user=None, use_tls=False): + '''Check that mail arrived in the spool''' + + # Handle the case of forwarded emails + if target_spool_user == None: + target_spool_user = user_directed_to + # Read delivered email + if spool_file == None: + spool_file = '/var/mail/%s' % (target_spool_user.login) + time.sleep(1) + with open(spool_file) as fh: + contents = fh.read() + # Server-side added headers... + self.assertTrue('\nReceived: ' in contents, contents) + if use_tls and self.lsb_release['Release'] > 6.06: + expected = ' (Postfix) with ESMTPS id ' + else: + expected = ' (Postfix) with ESMTP id ' + if auth_user: + if self.lsb_release['Release'] < 8.04: + self._skipped("Received header portion") + else: + expected = ' (Postfix) with ESMTPA id ' + self.assertTrue('(Authenticated sender: %s)' % (auth_user)) + self.assertTrue(expected in contents, 'Looking for "%s" in email:\n%s' % (expected, contents)) + self.assertTrue('\nMessage-Id: ' in contents, contents) + self.assertTrue('\nDate: ' in contents, contents) + # client-side headers/body... + self.assertTrue('\nSubject: This is test 1' in contents, contents) + self.assertTrue('\nFrom: Rooty' in contents, contents) + self.assertTrue('\nTo: "Buddy %s" <%s@' % (user_directed_to.login, user_directed_to.login) in contents, contents) + self.assertTrue('\nHello, nice to meet you.' in contents, contents) + + def _test_roundtrip_mail(self, user_sent_to, user_to_check=None, spool_file=None, auth_user=None, auth_pass=None, use_tls=False): + '''Send and check email delivery''' + self._test_deliver_mail(user_sent_to, auth_user, auth_pass, use_tls=use_tls) + self._test_mail_in_spool(user_sent_to, user_to_check, spool_file, auth_user=auth_user, use_tls=use_tls) + + def test_10_sending_mail_direct(self): + '''Mail delivered normally''' + self._test_roundtrip_mail(self.user) + + def test_10_sending_mail_direct_with_tls(self): + '''Mail delivered normally with TLS''' + self._test_roundtrip_mail(self.user, use_tls=True) + + def test_10_sending_mail_direct_auth(self): + '''Mail authentication''' + # Verify rejected bad password and user + self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap') + self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap') + self.s.login(self.user.login, self.user.password) + + def test_10_sending_mail_direct_auth_full(self): + '''Mail delivered with authentication''' + # Perform end-to-end authentication test + self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password) + + def _write_forward(self, user, contents): + forward_filename = '/home/%s/.forward' % (user.login) + with open(forward_filename,'w') as fh: + fh.write(contents) + os.chown(forward_filename, user.uid, user.gid) + + def test_10_sending_mail_forward_normal(self): + '''Mail delivered via .forward''' + + forward_user = testlib.TestUser(lower=True) + self._write_forward(forward_user, self.user.login+'\n') + self._test_roundtrip_mail(forward_user, self.user) + + def test_10_sending_mail_forward_xternal(self): + '''Mail processed by commands in .forward''' + + # Create user-writable redirected mbox destination + mbox, mbox_name = testlib.mkstemp_fill('',prefix='test-postfix.mbox-') + mbox.close() + os.chown(mbox_name, self.user.uid, self.user.gid) + + # Create a script to run in the .forward + redir, redir_name = testlib.mkstemp_fill('''#!/bin/bash +/bin/cat > "%s" +''' % (mbox_name),prefix='test-postfix.redir-') + redir.close() + os.chmod(redir_name,0o755) + + self._write_forward(self.user,'|%s\n' % (redir_name)) + + # SKIP TESTING, FAILS IN TESTBED + #self._test_roundtrip_mail(self.user, spool_file=mbox_name) + + os.unlink(redir_name) + os.unlink(mbox_name) + + def test_11_security_CVE_2008_2936(self): + '''CVE-2008-2936 fixed''' + + # First, create our "target" file + secret = '/root/secret.txt' + with open(secret,'w') as fh: + fh.write('Secret information\n') + os.chmod(secret, 0o700) + + # Now, create a symlink to the target (we're going to use /var/tmp + # since we're assuming it, /root, /var/mail are on the same filesystem. + # For most chroot testing, /tmp is mounted from the real machine. + if os.path.islink('/var/tmp/secret.link'): + os.unlink('/var/tmp/secret.link') + self.assertEqual(subprocess.call(['su','-c','ln -s /root/secret.txt /var/tmp/secret.link',self.user.login]),0,"Symlink creation") + + # Now, the hardlink, which in ubuntu's case needs to be done by root. + os.link('/var/tmp/secret.link','/var/mail/%s' % (self.user.login)) + + # Email delivered to this user will be written to the root-owned + # file now if the CVE is unfixed. + failed = self.s.sendmail('root',[self.user.login],'''From: Evil <root> +To: "%s" <%s> +Subject: This is an overwrite test + +Hello, nice to pwn you. +''' % (self.user.gecos, self.user.login)) + self.assertEqual(len(failed),0,failed) + + # Pause for delivery + time.sleep(2) + + with open(secret) as fh: + contents = fh.read() + # Clean up before possible failures + os.unlink('/var/mail/%s' % (self.user.login)) + os.unlink('/var/tmp/secret.link') + os.unlink(secret) + # Check results + self.assertTrue('Secret information' in contents, contents) + self.assertFalse('nice to pwn you' in contents, contents) + + def _check_auth(self, mech): + '''Check AUTH: side effect-- self.s is set''' + try: + self.s.quit() + except: + pass + self.s = smtplib.SMTP('localhost', port=2525) + + self._is_listening() + + # has mech + code, msg = self.s.ehlo() + reply = '%d %s' % (code, msg) + self.assertEqual(code, 250, reply) + self.assertEqual(self.s.does_esmtp, 1, reply) + self.assertTrue(mech.encode('utf-8') in self.s.ehlo_resp, reply) + return reply + + def test_20_sasldb_cram_md5(self): + '''Test sasldb CRAM-MD5''' + # Quit the setUp() connection, restart the server and reconnect + self.s.quit() + self._setup_sasl("CRAM-MD5") + + reply = self._check_auth("CRAM-MD5") + self.assertTrue('PLAIN' not in reply, reply) + + # Verify rejected bad password and user + self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap') + self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap') + + # Perform end-to-end authentication test + self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password) + + def test_20_sasldb_digest_md5(self): + '''Test sasldb DIGEST-MD5 is supported''' + # Quit the setUp() connection, restart the server and reconnect + self.s.quit() + self._setup_sasl("DIGEST-MD5") + + reply = self._check_auth("DIGEST-MD5") + self.assertTrue('PLAIN' not in reply, reply) + + # TODO: Perform end-to-end authentication test (need alternative to smtplib) + #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap') + #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap') + #self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password) + + def test_20_sasldb_login(self): + '''Test sasldb LOGIN is supported''' + # Quit the setUp() connection, restart the server and reconnect + self.s.quit() + self._setup_sasl("LOGIN", force_sasldb=True) + + reply = self._check_auth("LOGIN") + self.assertTrue('PLAIN' not in reply, reply) + + # TODO: Perform end-to-end authentication test (need alternative to smtplib) + #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap') + #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap') + #self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password) + + def test_20_sasldb_plain(self): + '''Test sasldb PLAIN''' + # Quit the setUp() connection, restart the server and reconnect + self.s.quit() + self._setup_sasl("PLAIN", force_sasldb=True) + + reply = self._check_auth("PLAIN") + + # Verify rejected bad password and user + self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap') + self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap') + # TODO: Perform end-to-end authentication test (need alternative to smtplib) + self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password) + + def test_21_security_CVE_2011_1720(self): + '''CVE-2011-1720 fixed''' + # http://www.postfix.org/CVE-2011-1720.html + + # setup sasl and connect + self.s.quit() + self._setup_sasl("CRAM-MD5", "DIGEST-MD5") + + # verify sasl support + rc, report = testlib.cmd(['postconf', 'smtpd_sasl_auth_enable']) + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + self.assertEqual(expected, rc, result + report) + self.assertTrue('yes' in report, "Could not find 'yes' in report:\n%s" % report) + + if self.lsb_release['Release'] > 6.06: + rc, report = testlib.cmd(['postconf', 'smtpd_sasl_type']) + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + self.assertEqual(expected, rc, result + report) + self.assertTrue('cyrus' in report, "Could not find 'cyrus' in report:\n%s" % report) + + # ehlo + reply = self._check_auth("CRAM-MD5") + self.assertTrue('DIGEST-MD5' in reply, reply) + + code, msg = self.s.docmd("AUTH", "CRAM-MD5") + reply = '%d %s' % (code, msg) + self.assertEqual(code, 334, reply) + + code, msg = self.s.docmd("*") + reply = '%d %s' % (code, msg) + self.assertEqual(code, 501, reply) + + error = False + try: + code, msg = self.s.docmd("AUTH", "DIGEST-MD5") + except: + error = True + self.assertFalse(error, "server disconnected") + reply = '%d %s' % (code, msg) + self.assertEqual(code, 334, reply) + + def test_99_restore(self): + '''Restore configuration''' + self._tearDown() + +if __name__ == '__main__': + unittest.main() diff --git a/debian/tests/testlib.py b/debian/tests/testlib.py new file mode 100644 index 0000000..3c4026d --- /dev/null +++ b/debian/tests/testlib.py @@ -0,0 +1,1151 @@ +# +# testlib.py quality assurance test script +# Copyright (C) 2008-2011 Canonical Ltd. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Library General Public +# License as published by the Free Software Foundation; either +# version 2 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Library General Public License for more details. +# +# You should have received a copy of the GNU Library General Public +# License along with this program. If not, see +# <http://www.gnu.org/licenses/>. +# + +'''Common classes and functions for package tests.''' + +import string, random, crypt, subprocess, pwd, grp, signal, time, unittest, tempfile, shutil, os, os.path, re, glob +import sys, socket, gzip +from stat import * + +import warnings +warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning) +try: + import apt_pkg + apt_pkg.InitSystem(); +except: + # On non-Debian system, fall back to simple comparison without debianisms + class apt_pkg(object): + def VersionCompare(one, two): + list_one = one.split('.') + list_two = two.split('.') + while len(list_one)>0 and len(list_two)>0: + if list_one[0] > list_two[0]: + return 1 + if list_one[0] < list_two[0]: + return -1 + list_one.pop(0) + list_two.pop(0) + return 0 + +bogus_nxdomain = "208.69.32.132" + +# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html +# This is needed so that the subprocesses that produce endless output +# actually quit when the reader goes away. +import signal +def subprocess_setup(): + # Python installs a SIGPIPE handler by default. This is usually not what + # non-Python subprocesses expect. + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + +class TimedOutException(Exception): + def __init__(self, value = "Timed Out"): + self.value = value + def __str__(self): + return repr(self.value) + +def _restore_backup(path): + pathbackup = path + '.autotest' + if os.path.exists(pathbackup): + shutil.move(pathbackup, path) + +def _save_backup(path): + pathbackup = path + '.autotest' + if os.path.exists(path) and not os.path.exists(pathbackup): + shutil.copy2(path, pathbackup) + # copy2 does not copy ownership, so do it here. + # Reference: http://docs.python.org/library/shutil.html + a = os.stat(path) + os.chown(pathbackup, a[4], a[5]) + +def config_copydir(path): + if os.path.exists(path) and not os.path.isdir(path): + raise OSError("'%s' is not a directory" % (path)) + _restore_backup(path) + + pathbackup = path + '.autotest' + if os.path.exists(path): + shutil.copytree(path, pathbackup, symlinks=True) + +def config_replace(path,contents,append=False): + '''Replace (or append) to a config file''' + _restore_backup(path) + if os.path.exists(path): + _save_backup(path) + if append: + with open(path) as fh: + contents = fh.read() + contents + with open(path, 'w') as fh: + fh.write(contents) + + +def config_comment(path, field): + _save_backup(path) + contents = "" + with open(path) as fh: + for line in fh: + if re.search("^\s*%s\s*=" % (field), line): + line = "#" + line + contents += line + + with open(path + '.new', 'w') as new_fh: + new_fh.write(contents) + os.rename(path + '.new', path) + + +def config_set(path, field, value, spaces=True): + _save_backup(path) + contents = "" + if spaces==True: + setting = '%s = %s\n' % (field, value) + else: + setting = '%s=%s\n' % (field, value) + found = False + with open(path) as fh: + for line in fh: + if re.search("^\s*%s\s*=" % (field), line): + found = True + line = setting + contents += line + if not found: + contents += setting + + with open(path + '.new', 'w') as new_config: + new_config.write(contents) + os.rename(path + '.new', path) + + +def config_patch(path, patch, depth=1): + '''Patch a config file''' + _restore_backup(path) + _save_backup(path) + + handle, name = mkstemp_fill(patch) + rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.PIPE) + os.unlink(name) + if rc != 0: + raise Exception("Patch failed") + +def config_restore(path): + '''Rename a replaced config file back to its initial state''' + _restore_backup(path) + +def timeout(secs, f, *args): + def handler(signum, frame): + raise TimedOutException() + + old = signal.signal(signal.SIGALRM, handler) + result = None + signal.alarm(secs) + try: + result = f(*args) + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, old) + + return result + +def require_nonroot(): + if os.geteuid() == 0: + print("This series of tests should be run as a regular user with sudo access, not as root.", file=sys.stderr) + sys.exit(1) + + +def require_root(): + if os.geteuid() != 0: + print("This series of tests should be run with root privileges (e.g. via sudo).", file=sys.stderr) + sys.exit(1) + + +def require_sudo(): + if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) == None: + print("This series of tests must be run under sudo.", file=sys.stderr) + sys.exit(1) + if os.environ['SUDO_USER'] == 'root': + print('Please run this test using sudo from a regular user. (You ran sudo from root.)', file=sys.stderr) + sys.exit(1) + +def random_string(length,lower=False): + '''Return a random string, consisting of ASCII letters, with given + length.''' + + s = '' + selection = string.ascii_letters + if lower: + selection = string.ascii_lowercase + maxind = len(selection)-1 + for l in range(length): + s += selection[random.randint(0, maxind)] + return s + +def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None): + '''As tempfile.mkstemp does, return a (file, name) pair, but with + prefilled contents.''' + + handle, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir) + os.close(handle) + handle = open(name,"w+") + handle.write(contents) + handle.flush() + handle.seek(0) + + return handle, name + +def create_fill(path, contents, mode=0o644): + '''Safely create a page''' + # make the temp file in the same dir as the destination file so we + # don't get invalid cross-device link errors when we rename + handle, name = mkstemp_fill(contents, dir=os.path.dirname(path)) + handle.close() + os.rename(name, path) + os.chmod(path, mode) + +def login_exists(login): + '''Checks whether the given login exists on the system.''' + + try: + pwd.getpwnam(login) + return True + except KeyError: + return False + +def group_exists(group): + '''Checks whether the given login exists on the system.''' + + try: + grp.getgrnam(group) + return True + except KeyError: + return False + +def recursive_rm(dirPath, contents_only=False): + '''recursively remove directory''' + names = os.listdir(dirPath) + for name in names: + path = os.path.join(dirPath, name) + if os.path.islink(path) or not os.path.isdir(path): + os.unlink(path) + else: + recursive_rm(path) + if contents_only == False: + os.rmdir(dirPath) + +def check_pidfile(exe, pidfile): + '''Checks if pid in pidfile is running''' + if not os.path.exists(pidfile): + return False + + # get the pid + try: + with open(pidfile, 'r') as fd: + pid = fd.readline().rstrip('\n') + except: + return False + + return check_pid(exe, pid) + + +def check_pid(exe, pid): + '''Checks if pid is running''' + cmdline = "/proc/%s/cmdline" % (str(pid)) + if not os.path.exists(cmdline): + return False + + # get the command line + try: + with open(cmdline, 'r') as fd: + tmp = fd.readline().split('\0') + except: + return False + + # this allows us to match absolute paths or just the executable name + if re.match('^' + exe + '$', tmp[0]) or \ + re.match('.*/' + exe + '$', tmp[0]) or \ + re.match('^' + exe + ': ', tmp[0]) or \ + re.match('^\(' + exe + '\)', tmp[0]): + return True + + return False + +def check_port(port, proto, ver=4): + '''Check if something is listening on the specified port. + WARNING: for some reason this does not work with a bind mounted /proc + ''' + assert (port >= 1) + assert (port <= 65535) + assert (proto.lower() == "tcp" or proto.lower() == "udp") + assert (ver == 4 or ver == 6) + + fn = "/proc/net/%s" % (proto) + if ver == 6: + fn += str(ver) + + rc, report = cmd(['cat', fn]) + assert (rc == 0) + + hport = "%0.4x" % port + + if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()): + return True + return False + +def get_arch(): + '''Get the current architecture''' + rc, report = cmd(['uname', '-m']) + assert (rc == 0) + return report.strip() + +def get_memory(): + '''Gets total ram and swap''' + meminfo = "/proc/meminfo" + memtotal = 0 + swaptotal = 0 + if not os.path.exists(meminfo): + return (False, False) + + try: + fd = open(meminfo, 'r') + for line in fd.readlines(): + splitline = line.split() + if splitline[0] == 'MemTotal:': + memtotal = int(splitline[1]) + elif splitline[0] == 'SwapTotal:': + swaptotal = int(splitline[1]) + fd.close() + except: + return (False, False) + + return (memtotal,swaptotal) + +def is_running_in_vm(): + '''Check if running under a VM''' + # add other virtualization environments here + for search in ['QEMU Virtual CPU']: + rc, report = cmd_pipe(['dmesg'], ['grep', search]) + if rc == 0: + return True + return False + +def ubuntu_release(): + '''Get the Ubuntu release''' + f = "/etc/lsb-release" + try: + size = os.stat(f)[ST_SIZE] + except: + return "UNKNOWN" + + if size > 1024*1024: + raise IOError('Could not open "%s" (too big)' % f) + + with open("/etc/lsb-release", 'r') as fh: + lines = fh.readlines() + + pat = re.compile(r'DISTRIB_CODENAME') + for line in lines: + if pat.search(line): + return line.split('=')[1].rstrip('\n').rstrip('\r') + + return "UNKNOWN" + +def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None): + '''Try to execute given command (array) and return its stdout, or return + a textual error if it failed.''' + + try: + sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup, universal_newlines=True) + except OSError as e: + return [127, str(e)] + + out, outerr = sp.communicate(input) + # Handle redirection of stdout + if out == None: + out = '' + # Handle redirection of stderr + if outerr == None: + outerr = '' + return [sp.returncode,out+outerr] + +def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None): + '''Try to pipe command1 into command2.''' + try: + sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True) + sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True) + except OSError as e: + return [127, str(e)] + + out = sp2.communicate(input)[0] + return [sp2.returncode,out] + +def cwd_has_enough_space(cdir, total_bytes): + '''Determine if the partition of the current working directory has 'bytes' + free.''' + rc, df_output = cmd(['df']) + result = 'Got exit code %d, expected %d\n' % (rc, 0) + if rc != 0: + return False + + kb = total_bytes / 1024 + + mounts = dict() + for line in df_output.splitlines(): + if '/' not in line: + continue + tmp = line.split() + mounts[tmp[5]] = int(tmp[3]) + + cdir = os.getcwd() + while cdir != '/': + if not mounts.has_key(cdir): + cdir = os.path.dirname(cdir) + continue + if kb < mounts[cdir]: + return True + else: + return False + + if kb < mounts['/']: + return True + + return False + +def get_md5(filename): + '''Gets the md5sum of the file specified''' + + (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename]) + expected = 0 + assert (expected == rc) + + return report.split(' ')[0] + +def dpkg_compare_installed_version(pkg, check, version): + '''Gets the version for the installed package, and compares it to the + specified version. + ''' + (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg]) + assert (rc == 0) + assert ("Status: install ok installed" in report) + installed_version = "" + for line in report.splitlines(): + if line.startswith("Version: "): + installed_version = line.split()[1] + + assert (installed_version != "") + + (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version]) + assert (rc == 0 or rc == 1) + if rc == 0: + return True + return False + +def prepare_source(source, builder, cached_src, build_src, patch_system): + '''Download and unpack source package, installing necessary build depends, + adjusting the permissions for the 'builder' user, and returning the + directory of the unpacked source. Patch system can be one of: + - cdbs + - dpatch + - quilt + - quiltv3 + - None (not the string) + + This is normally used like this: + + def setUp(self): + ... + self.topdir = os.getcwd() + self.cached_src = os.path.join(os.getcwd(), "source") + self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp') + self.builder = testlib.TestUser() + testlib.cmd(['chgrp', self.builder.login, self.tmpdir]) + os.chmod(self.tmpdir, 0o775) + + def tearDown(self): + ... + self.builder = None + self.topdir = os.getcwd() + if os.path.exists(self.tmpdir): + testlib.recursive_rm(self.tmpdir) + + def test_suite_build(self): + ... + build_dir = testlib.prepare_source('foo', \ + self.builder, \ + self.cached_src, \ + os.path.join(self.tmpdir, \ + os.path.basename(self.cached_src)), + "quilt") + os.chdir(build_dir) + + # Example for typical build, adjust as necessary + print("") + print(" make clean") + rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean']) + + print(" configure") + rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug']) + + print(" make (will take a while)") + rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make']) + + print(" make check (will take a while)",) + rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check']) + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + self.assertEqual(expected, rc, result + report) + + def test_suite_cleanup(self): + ... + if os.path.exists(self.cached_src): + testlib.recursive_rm(self.cached_src) + + It is up to the caller to clean up cached_src and build_src (as in the + above example, often the build_src is in a tmpdir that is cleaned in + tearDown() and the cached_src is cleaned in a one time clean-up + operation (eg 'test_suite_cleanup()) which must be run after the build + suite test (obviously). + ''' + + # Make sure we have a clean slate + assert (os.path.exists(os.path.dirname(build_src))) + assert (not os.path.exists(build_src)) + + cdir = os.getcwd() + if os.path.exists(cached_src): + shutil.copytree(cached_src, build_src) + os.chdir(build_src) + else: + # Only install the build dependencies on the initial setup + rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source]) + assert (rc == 0) + + os.makedirs(build_src) + os.chdir(build_src) + + # These are always needed + pkgs = ['build-essential', 'dpkg-dev', 'fakeroot'] + rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs) + assert (rc == 0) + + rc, report = cmd(['apt-get','source',source]) + assert (rc == 0) + shutil.copytree(build_src, cached_src) + + unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0]) + + # Now apply the patches. Do it here so that we don't mess up our cached + # sources. + os.chdir(unpacked_dir) + assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None]) + if patch_system != None and patch_system != "quiltv3": + if patch_system == "quilt": + os.environ.setdefault('QUILT_PATCHES','debian/patches') + rc, report = cmd(['quilt', 'push', '-a']) + assert (rc == 0) + elif patch_system == "cdbs": + rc, report = cmd(['./debian/rules', 'apply-patches']) + assert (rc == 0) + elif patch_system == "dpatch": + rc, report = cmd(['dpatch', 'apply-all']) + assert (rc == 0) + + cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src]) + os.chdir(cdir) + + return unpacked_dir + +def _aa_status(): + '''Get aa-status output''' + exe = "/usr/sbin/aa-status" + assert (os.path.exists(exe)) + if os.geteuid() == 0: + return cmd([exe]) + return cmd(['sudo', exe]) + +def is_apparmor_loaded(path): + '''Check if profile is loaded''' + rc, report = _aa_status() + if rc != 0: + return False + + for line in report.splitlines(): + if line.endswith(path): + return True + return False + +def is_apparmor_confined(path): + '''Check if application is confined''' + rc, report = _aa_status() + if rc != 0: + return False + + for line in report.splitlines(): + if re.search('%s \(' % path, line): + return True + return False + +def check_apparmor(path, first_ubuntu_release, is_running=True): + '''Check if path is loaded and confined for everything higher than the + first Ubuntu release specified. + + Usage: + rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True) + if rc < 0: + return self._skipped(report) + + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + self.assertEqual(expected, rc, result + report) + ''' + global manager + rc = -1 + + if manager.lsb_release["Release"] < first_ubuntu_release: + return (rc, "Skipped apparmor check") + + if not os.path.exists('/sbin/apparmor_parser'): + return (rc, "Skipped (couldn't find apparmor_parser)") + + rc = 0 + msg = "" + if not is_apparmor_loaded(path): + rc = 1 + msg = "Profile not loaded for '%s'" % path + + # this check only makes sense it the 'path' is currently executing + if is_running and rc == 0 and not is_apparmor_confined(path): + rc = 1 + msg = "'%s' is not running in enforce mode" % path + + return (rc, msg) + +def get_gcc_version(gcc, full=True): + gcc_version = 'none' + if not gcc.startswith('/'): + gcc = '/usr/bin/%s' % (gcc) + if os.path.exists(gcc): + gcc_version = 'unknown' + lines = cmd([gcc,'-v'])[1].strip().splitlines() + version_lines = [x for x in lines if x.startswith('gcc version')] + if len(version_lines) == 1: + gcc_version = " ".join(version_lines[0].split()[2:]) + if not full: + return gcc_version.split()[0] + return gcc_version + +def is_kdeinit_running(): + '''Test if kdeinit is running''' + # applications that use kdeinit will spawn it if it isn't running in the + # test. This is a problem because it does not exit. This is a helper to + # check for it. + rc, report = cmd(['ps', 'x']) + if 'kdeinit4 Running' not in report: + print("kdeinit not running (you may start/stop any KDE application then run this script again)", file=sys.stderr) + return False + return True + +def get_pkgconfig_flags(libs=[]): + '''Find pkg-config flags for libraries''' + assert (len(libs) > 0) + rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs) + expected = 0 + if rc != expected: + print('Got exit code %d, expected %d\n' % (rc, expected), file=sys.stderr) + assert(rc == expected) + return pkg_config.split() + +class TestDaemon: + '''Helper class to manage daemons consistently''' + def __init__(self, init): + '''Setup daemon attributes''' + self.initscript = init + + def start(self): + '''Start daemon''' + rc, report = cmd([self.initscript, 'start']) + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + time.sleep(2) + if expected != rc: + return (False, result + report) + + if "fail" in report: + return (False, "Found 'fail' in report\n" + report) + + return (True, "") + + def stop(self): + '''Stop daemon''' + rc, report = cmd([self.initscript, 'stop']) + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + if expected != rc: + return (False, result + report) + + if "fail" in report: + return (False, "Found 'fail' in report\n" + report) + + return (True, "") + + def reload(self): + '''Reload daemon''' + rc, report = cmd([self.initscript, 'force-reload']) + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + if expected != rc: + return (False, result + report) + + if "fail" in report: + return (False, "Found 'fail' in report\n" + report) + + return (True, "") + + def restart(self): + '''Restart daemon''' + (res, str) = self.stop() + if not res: + return (res, str) + + (res, str) = self.start() + if not res: + return (res, str) + + return (True, "") + + def status(self): + '''Check daemon status''' + rc, report = cmd([self.initscript, 'status']) + expected = 0 + result = 'Got exit code %d, expected %d\n' % (rc, expected) + if expected != rc: + return (False, result + report) + + if "fail" in report: + return (False, "Found 'fail' in report\n" + report) + + return (True, "") + +class TestlibManager(object): + '''Singleton class used to set up per-test-run information''' + def __init__(self): + # Set glibc aborts to dump to stderr instead of the tty so test output + # is more sane. + os.environ.setdefault('LIBC_FATAL_STDERR_','1') + + # check verbosity + self.verbosity = False + if (len(sys.argv) > 1 and '-v' in sys.argv[1:]): + self.verbosity = True + + # Load LSB release file + self.lsb_release = dict() + if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'): + raise OSError("Please install 'lsb-release'") + for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE,universal_newlines=True).communicate()[0].splitlines(): + field, value = line.split(':',1) + value=value.strip() + field=field.strip() + # Convert numerics + try: + value = float(value) + except: + pass + self.lsb_release.setdefault(field,value) + + # FIXME: hack OEM releases into known-Ubuntu versions + if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)": + if self.lsb_release['Release'] == 1.0: + self.lsb_release['Distributor ID'] = "Ubuntu" + self.lsb_release['Release'] = 8.04 + else: + raise OSError("Unknown version of HP MIE") + + # FIXME: hack to assume a most-recent release if we're not + # running under Ubuntu. + if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]: + self.lsb_release['Release'] = 10000 + # Adjust Linaro release to pretend to be Ubuntu + if self.lsb_release['Distributor ID'] in ["Linaro"]: + self.lsb_release['Distributor ID'] = "Ubuntu" + self.lsb_release['Release'] -= 0.01 + + # Load arch + if not os.path.exists('/usr/bin/dpkg'): + machine = cmd(['uname','-m'])[1].strip() + if machine.endswith('86'): + self.dpkg_arch = 'i386' + elif machine.endswith('_64'): + self.dpkg_arch = 'amd64' + elif machine.startswith('arm'): + self.dpkg_arch = 'armel' + else: + raise ValueError("Unknown machine type '%s'" % (machine)) + else: + self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip() + + # Find kernel version + self.kernel_is_ubuntu = False + self.kernel_version_signature = None + self.kernel_version = cmd(["uname","-r"])[1].strip() + versig = '/proc/version_signature' + if os.path.exists(versig): + self.kernel_is_ubuntu = True + self.kernel_version_signature = open(versig).read().strip() + self.kernel_version_ubuntu = self.kernel_version + elif os.path.exists('/usr/bin/dpkg'): + # this can easily be inaccurate but is only an issue for Dapper + rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)]) + if rc == 0: + self.kernel_version_signature = out.strip().split('\n').pop().split()[2] + self.kernel_version_ubuntu = self.kernel_version_signature + if self.kernel_version_signature == None: + # Attempt to fall back to something for non-Debian-based + self.kernel_version_signature = self.kernel_version + self.kernel_version_ubuntu = self.kernel_version + # Build ubuntu version without hardware suffix + try: + self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)]) + except: + pass + + # Find gcc version + self.gcc_version = get_gcc_version('gcc') + + # Find libc + self.path_libc = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0] + + # Report self + if self.verbosity: + kernel = self.kernel_version_ubuntu + if kernel != self.kernel_version_signature: + kernel += " (%s)" % (self.kernel_version_signature) + print("Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( + sys.argv[0], + self.lsb_release['Distributor ID'], + self.lsb_release['Release'], + kernel, + self.dpkg_arch, + os.geteuid(), os.getuid(), + os.environ.get('SUDO_USER', '')), file=sys.stdout) + sys.stdout.flush() + + # Additional heuristics + #if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']: + # sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000)) + # sys.stdout.flush() + # time.sleep(0.5) + # sys.stdout.write("destroyed\n") + # time.sleep(0.5) + + def hello(self, msg): + print("Hello from %s" % (msg), file=sys.stderr) +# The central instance +manager = TestlibManager() + +class TestlibCase(unittest.TestCase): + def __init__(self, *args): + '''This is called for each TestCase test instance, which isn't much better + than SetUp.''' + + unittest.TestCase.__init__(self, *args) + + # Attach to and duplicate dicts from manager singleton + self.manager = manager + #self.manager.hello(repr(self) + repr(*args)) + self.my_verbosity = self.manager.verbosity + self.lsb_release = self.manager.lsb_release + self.dpkg_arch = self.manager.dpkg_arch + self.kernel_version = self.manager.kernel_version + self.kernel_version_signature = self.manager.kernel_version_signature + self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu + self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu + self.gcc_version = self.manager.gcc_version + self.path_libc = self.manager.path_libc + + def version_compare(self, one, two): + return apt_pkg.VersionCompare(one,two) + + def assertFileType(self, filename, filetype): + '''Checks the file type of the file specified''' + + (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename]) + out = out.strip() + expected = 0 + # Absolutely no idea why this happens on Hardy + if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0: + rc = 0 + result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report) + self.assertEqual(expected, rc, result) + + filetype = '^%s$' % (filetype) + result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype) + self.assertNotEquals(None, re.search(filetype, out), result) + + def yank_commonname_from_cert(self, certfile): + '''Extract the commonName from a given PEM''' + rc, out = cmd(['openssl','asn1parse','-in',certfile]) + if rc == 0: + ready = False + for line in out.splitlines(): + if ready: + return line.split(':')[-1] + if ':commonName' in line: + ready = True + return socket.getfqdn() + + def announce(self, text): + if self.my_verbosity: + print("(%s) " % (text), file=sys.stderr, end='') + sys.stdout.flush() + + def make_clean(self): + rc, output = self.shell_cmd(['make','clean']) + self.assertEqual(rc, 0, output) + + def get_makefile_compiler(self): + # Find potential compiler name + compiler = 'gcc' + if os.path.exists('Makefile'): + for line in open('Makefile'): + if line.startswith('CC') and '=' in line: + items = [x.strip() for x in line.split('=')] + if items[0] == 'CC': + compiler = items[1] + break + return compiler + + def make_target(self, target, expected=0): + '''Compile a target and report output''' + + compiler = self.get_makefile_compiler() + rc, output = self.shell_cmd(['make',target]) + self.assertEqual(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output) + self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output) + return output + + # call as return testlib.skipped() + def _skipped(self, reason=""): + '''Provide a visible way to indicate that a test was skipped''' + if reason != "": + reason = ': %s' % (reason) + self.announce("skipped%s" % (reason)) + return False + + def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT): + argstr = "'" + "', '".join(args).strip() + "'" + rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr) + report = 'Command: ' + argstr + '\nOutput:\n' + out + return rc, report, out + + def shell_cmd(self, args, stdin=None): + return cmd(args,stdin=stdin) + + def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""): + '''Test a shell command matches a specific exit code''' + rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) + result = 'Got exit code %d, expected %d\n' % (rc, expected) + self.assertEqual(expected, rc, msg + result + report) + + def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""): + '''Test a shell command doesn't match a specific exit code''' + rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) + result = 'Got (unwanted) exit code %d\n' % rc + self.assertNotEquals(unwanted, rc, msg + result + report) + + def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False): + '''Test a shell command contains a specific output''' + rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) + result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text) + if not invert: + self.assertTrue(text in out, msg + result + report) + else: + self.assertFalse(text in out, msg + result + report) + + def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None): + '''Test a shell command matches a specific output''' + rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) + result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args)) + if not invert: + self.assertEqual(text, out, msg + result + report) + else: + self.assertNotEquals(text, out, msg + result + report) + if expected != None: + result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args)) + self.assertEqual(rc, expected, msg + result + report) + + def _word_find(self, report, content, invert=False): + '''Check for a specific string''' + if invert: + warning = 'Found "%s"\n' % content + self.assertTrue(content not in report, warning + report) + else: + warning = 'Could not find "%s"\n' % content + self.assertTrue(content in report, warning + report) + + def _test_sysctl_value(self, path, expected, msg=None, exists=True): + sysctl = '/proc/sys/%s' % (path) + self.assertEqual(exists, os.path.exists(sysctl), sysctl) + value = None + if exists: + with open(sysctl) as sysctl_fd: + value = int(sysctl_fd.read()) + report = "%s is not %d: %d" % (sysctl, expected, value) + if msg: + report += " (%s)" % (msg) + self.assertEqual(value, expected, report) + return value + + def set_sysctl_value(self, path, desired): + sysctl = '/proc/sys/%s' % (path) + self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl)) + with open(sysctl, 'w') as sysctl_fh: + sysctl_fh.write(str(desired)) + self._test_sysctl_value(path, desired) + + def kernel_at_least(self, introduced): + return self.version_compare(self.kernel_version_ubuntu, + introduced) >= 0 + + def kernel_claims_cve_fixed(self, cve): + changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version) + if os.path.exists(changelog): + for line in gzip.open(changelog): + if cve in line and not "revert" in line and not "Revert" in line: + return True + return False + +class TestGroup: + '''Create a temporary test group and remove it again in the dtor.''' + + def __init__(self, group=None, lower=False): + '''Create a new group''' + + self.group = None + if group: + if group_exists(group): + raise ValueError('group name already exists') + else: + while(True): + group = random_string(7,lower=lower) + if not group_exists(group): + break + + assert subprocess.call(['groupadd',group]) == 0 + self.group = group + g = grp.getgrnam(self.group) + self.gid = g[2] + + def __del__(self): + '''Remove the created group.''' + + if self.group: + rc, report = cmd(['groupdel', self.group]) + assert rc == 0 + +class TestUser: + '''Create a temporary test user and remove it again in the dtor.''' + + def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None): + '''Create a new user account with a random password. + + By default, the login name is random, too, but can be explicitly + specified with 'login'. By default, a home directory is created, this + can be suppressed with 'home=False'.''' + + self.login = None + + if os.geteuid() != 0: + raise ValueError("You must be root to run this test") + + if login: + if login_exists(login): + raise ValueError('login name already exists') + else: + while(True): + login = 't' + random_string(7,lower=lower) + if not login_exists(login): + break + + self.salt = random_string(2) + self.password = random_string(8,lower=lower) + self.crypted = crypt.crypt(self.password, self.salt) + + creation = ['useradd', '-p', self.crypted] + if home: + creation += ['-m'] + if group: + creation += ['-G',group] + if uidmin: + creation += ['-K','UID_MIN=%d'%uidmin] + if shell: + creation += ['-s',shell] + creation += [login] + assert subprocess.call(creation) == 0 + # Set GECOS + assert subprocess.call(['usermod','-c','Buddy %s' % (login),login]) == 0 + + self.login = login + p = pwd.getpwnam(self.login) + self.uid = p[2] + self.gid = p[3] + self.gecos = p[4] + self.home = p[5] + self.shell = p[6] + + def __del__(self): + '''Remove the created user account.''' + + if self.login: + # sanity check the login name so we don't accidentally wipe too much + if len(self.login)>3 and not '/' in self.login: + subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login]) + rc, report = cmd(['userdel', '-f', self.login]) + assert rc == 0 + + def add_to_group(self, group): + '''Add user to the specified group name''' + rc, report = cmd(['usermod', '-G', group, self.login]) + if rc != 0: + print(report) + assert rc == 0 + +# Timeout handler using alarm() from John P. Speno's Pythonic Avocado +class TimeoutFunctionException(Exception): + """Exception to raise on a timeout""" + pass +class TimeoutFunction: + def __init__(self, function, timeout): + self.timeout = timeout + self.function = function + + def handle_timeout(self, signum, frame): + raise TimeoutFunctionException() + + def __call__(self, *args, **kwargs): + old = signal.signal(signal.SIGALRM, self.handle_timeout) + signal.alarm(self.timeout) + try: + result = self.function(*args, **kwargs) + finally: + signal.signal(signal.SIGALRM, old) + signal.alarm(0) + return result + + +def main(): + print("hi") + unittest.main() |