# # testlib.py quality assurance test script # Copyright (C) 2008-2011 Canonical Ltd. # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Library General Public # License as published by the Free Software Foundation; either # version 2 of the License. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Library General Public License for more details. # # You should have received a copy of the GNU Library General Public # License along with this program. If not, see # . # '''Common classes and functions for package tests.''' import string, random, crypt, subprocess, pwd, grp, signal, time, unittest, tempfile, shutil, os, os.path, re, glob import sys, socket, gzip from stat import * import warnings warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning) try: import apt_pkg apt_pkg.InitSystem(); except: # On non-Debian system, fall back to simple comparison without debianisms class apt_pkg(object): def VersionCompare(one, two): list_one = one.split('.') list_two = two.split('.') while len(list_one)>0 and len(list_two)>0: if list_one[0] > list_two[0]: return 1 if list_one[0] < list_two[0]: return -1 list_one.pop(0) list_two.pop(0) return 0 bogus_nxdomain = "208.69.32.132" # http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html # This is needed so that the subprocesses that produce endless output # actually quit when the reader goes away. import signal def subprocess_setup(): # Python installs a SIGPIPE handler by default. This is usually not what # non-Python subprocesses expect. signal.signal(signal.SIGPIPE, signal.SIG_DFL) class TimedOutException(Exception): def __init__(self, value = "Timed Out"): self.value = value def __str__(self): return repr(self.value) def _restore_backup(path): pathbackup = path + '.autotest' if os.path.exists(pathbackup): shutil.move(pathbackup, path) def _save_backup(path): pathbackup = path + '.autotest' if os.path.exists(path) and not os.path.exists(pathbackup): shutil.copy2(path, pathbackup) # copy2 does not copy ownership, so do it here. # Reference: http://docs.python.org/library/shutil.html a = os.stat(path) os.chown(pathbackup, a[4], a[5]) def config_copydir(path): if os.path.exists(path) and not os.path.isdir(path): raise OSError("'%s' is not a directory" % (path)) _restore_backup(path) pathbackup = path + '.autotest' if os.path.exists(path): shutil.copytree(path, pathbackup, symlinks=True) def config_replace(path,contents,append=False): '''Replace (or append) to a config file''' _restore_backup(path) if os.path.exists(path): _save_backup(path) if append: with open(path) as fh: contents = fh.read() + contents with open(path, 'w') as fh: fh.write(contents) def config_comment(path, field): _save_backup(path) contents = "" with open(path) as fh: for line in fh: if re.search(r"^\s*%s\s*=" % (field), line): line = "#" + line contents += line with open(path + '.new', 'w') as new_fh: new_fh.write(contents) os.rename(path + '.new', path) def config_set(path, field, value, spaces=True): _save_backup(path) contents = "" if spaces==True: setting = '%s = %s\n' % (field, value) else: setting = '%s=%s\n' % (field, value) found = False with open(path) as fh: for line in fh: if re.search(r"^\s*%s\s*=" % (field), line): found = True line = setting contents += line if not found: contents += setting with open(path + '.new', 'w') as new_config: new_config.write(contents) os.rename(path + '.new', path) def config_patch(path, patch, depth=1): '''Patch a config file''' _restore_backup(path) _save_backup(path) handle, name = mkstemp_fill(patch) rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.PIPE) os.unlink(name) if rc != 0: raise Exception("Patch failed") def config_restore(path): '''Rename a replaced config file back to its initial state''' _restore_backup(path) def timeout(secs, f, *args): def handler(signum, frame): raise TimedOutException() old = signal.signal(signal.SIGALRM, handler) result = None signal.alarm(secs) try: result = f(*args) finally: signal.alarm(0) signal.signal(signal.SIGALRM, old) return result def require_nonroot(): if os.geteuid() == 0: print("This series of tests should be run as a regular user with sudo access, not as root.", file=sys.stderr) sys.exit(1) def require_root(): if os.geteuid() != 0: print("This series of tests should be run with root privileges (e.g. via sudo).", file=sys.stderr) sys.exit(1) def require_sudo(): if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) == None: print("This series of tests must be run under sudo.", file=sys.stderr) sys.exit(1) if os.environ['SUDO_USER'] == 'root': print('Please run this test using sudo from a regular user. (You ran sudo from root.)', file=sys.stderr) sys.exit(1) def random_string(length,lower=False): '''Return a random string, consisting of ASCII letters, with given length.''' s = '' selection = string.ascii_letters if lower: selection = string.ascii_lowercase maxind = len(selection)-1 for l in range(length): s += selection[random.randint(0, maxind)] return s def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None): '''As tempfile.mkstemp does, return a (file, name) pair, but with prefilled contents.''' handle, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir) os.close(handle) handle = open(name,"w+") handle.write(contents) handle.flush() handle.seek(0) return handle, name def create_fill(path, contents, mode=0o644): '''Safely create a page''' # make the temp file in the same dir as the destination file so we # don't get invalid cross-device link errors when we rename handle, name = mkstemp_fill(contents, dir=os.path.dirname(path)) handle.close() os.rename(name, path) os.chmod(path, mode) def login_exists(login): '''Checks whether the given login exists on the system.''' try: pwd.getpwnam(login) return True except KeyError: return False def group_exists(group): '''Checks whether the given login exists on the system.''' try: grp.getgrnam(group) return True except KeyError: return False def recursive_rm(dirPath, contents_only=False): '''recursively remove directory''' names = os.listdir(dirPath) for name in names: path = os.path.join(dirPath, name) if os.path.islink(path) or not os.path.isdir(path): os.unlink(path) else: recursive_rm(path) if contents_only == False: os.rmdir(dirPath) def check_pidfile(exe, pidfile): '''Checks if pid in pidfile is running''' if not os.path.exists(pidfile): return False # get the pid try: with open(pidfile, 'r') as fd: pid = fd.readline().rstrip('\n') except: return False return check_pid(exe, pid) def check_pid(exe, pid): '''Checks if pid is running''' cmdline = "/proc/%s/cmdline" % (str(pid)) if not os.path.exists(cmdline): return False # get the command line try: with open(cmdline, 'r') as fd: tmp = fd.readline().split('\0') except: return False # this allows us to match absolute paths or just the executable name if re.match('^' + exe + '$', tmp[0]) or \ re.match('.*/' + exe + '$', tmp[0]) or \ re.match('^' + exe + ': ', tmp[0]) or \ re.match(r'^\(' + exe + r'\)', tmp[0]): return True return False def check_port(port, proto, ver=4): '''Check if something is listening on the specified port. WARNING: for some reason this does not work with a bind mounted /proc ''' assert (port >= 1) assert (port <= 65535) assert (proto.lower() == "tcp" or proto.lower() == "udp") assert (ver == 4 or ver == 6) fn = "/proc/net/%s" % (proto) if ver == 6: fn += str(ver) rc, report = cmd(['cat', fn]) assert (rc == 0) hport = "%0.4x" % port if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()): return True return False def get_arch(): '''Get the current architecture''' rc, report = cmd(['uname', '-m']) assert (rc == 0) return report.strip() def get_memory(): '''Gets total ram and swap''' meminfo = "/proc/meminfo" memtotal = 0 swaptotal = 0 if not os.path.exists(meminfo): return (False, False) try: fd = open(meminfo, 'r') for line in fd.readlines(): splitline = line.split() if splitline[0] == 'MemTotal:': memtotal = int(splitline[1]) elif splitline[0] == 'SwapTotal:': swaptotal = int(splitline[1]) fd.close() except: return (False, False) return (memtotal,swaptotal) def is_running_in_vm(): '''Check if running under a VM''' # add other virtualization environments here for search in ['QEMU Virtual CPU']: rc, report = cmd_pipe(['dmesg'], ['grep', search]) if rc == 0: return True return False def ubuntu_release(): '''Get the Ubuntu release''' f = "/etc/lsb-release" try: size = os.stat(f)[ST_SIZE] except: return "UNKNOWN" if size > 1024*1024: raise IOError('Could not open "%s" (too big)' % f) with open("/etc/lsb-release", 'r') as fh: lines = fh.readlines() pat = re.compile(r'DISTRIB_CODENAME') for line in lines: if pat.search(line): return line.split('=')[1].rstrip('\n').rstrip('\r') return "UNKNOWN" def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None): '''Try to execute given command (array) and return its stdout, or return a textual error if it failed.''' try: sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup, universal_newlines=True) except OSError as e: return [127, str(e)] out, outerr = sp.communicate(input) # Handle redirection of stdout if out == None: out = '' # Handle redirection of stderr if outerr == None: outerr = '' return [sp.returncode,out+outerr] def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None): '''Try to pipe command1 into command2.''' try: sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True) sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True) except OSError as e: return [127, str(e)] out = sp2.communicate(input)[0] return [sp2.returncode,out] def cwd_has_enough_space(cdir, total_bytes): '''Determine if the partition of the current working directory has 'bytes' free.''' rc, df_output = cmd(['df']) result = 'Got exit code %d, expected %d\n' % (rc, 0) if rc != 0: return False kb = total_bytes / 1024 mounts = dict() for line in df_output.splitlines(): if '/' not in line: continue tmp = line.split() mounts[tmp[5]] = int(tmp[3]) cdir = os.getcwd() while cdir != '/': if not mounts.has_key(cdir): cdir = os.path.dirname(cdir) continue if kb < mounts[cdir]: return True else: return False if kb < mounts['/']: return True return False def get_md5(filename): '''Gets the md5sum of the file specified''' (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename]) expected = 0 assert (expected == rc) return report.split(' ')[0] def dpkg_compare_installed_version(pkg, check, version): '''Gets the version for the installed package, and compares it to the specified version. ''' (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg]) assert (rc == 0) assert ("Status: install ok installed" in report) installed_version = "" for line in report.splitlines(): if line.startswith("Version: "): installed_version = line.split()[1] assert (installed_version != "") (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version]) assert (rc == 0 or rc == 1) if rc == 0: return True return False def prepare_source(source, builder, cached_src, build_src, patch_system): '''Download and unpack source package, installing necessary build depends, adjusting the permissions for the 'builder' user, and returning the directory of the unpacked source. Patch system can be one of: - cdbs - dpatch - quilt - quiltv3 - None (not the string) This is normally used like this: def setUp(self): ... self.topdir = os.getcwd() self.cached_src = os.path.join(os.getcwd(), "source") self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp') self.builder = testlib.TestUser() testlib.cmd(['chgrp', self.builder.login, self.tmpdir]) os.chmod(self.tmpdir, 0o775) def tearDown(self): ... self.builder = None self.topdir = os.getcwd() if os.path.exists(self.tmpdir): testlib.recursive_rm(self.tmpdir) def test_suite_build(self): ... build_dir = testlib.prepare_source('foo', \ self.builder, \ self.cached_src, \ os.path.join(self.tmpdir, \ os.path.basename(self.cached_src)), "quilt") os.chdir(build_dir) # Example for typical build, adjust as necessary print("") print(" make clean") rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean']) print(" configure") rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug']) print(" make (will take a while)") rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make']) print(" make check (will take a while)",) rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check']) expected = 0 result = 'Got exit code %d, expected %d\n' % (rc, expected) self.assertEqual(expected, rc, result + report) def test_suite_cleanup(self): ... if os.path.exists(self.cached_src): testlib.recursive_rm(self.cached_src) It is up to the caller to clean up cached_src and build_src (as in the above example, often the build_src is in a tmpdir that is cleaned in tearDown() and the cached_src is cleaned in a one time clean-up operation (eg 'test_suite_cleanup()) which must be run after the build suite test (obviously). ''' # Make sure we have a clean slate assert (os.path.exists(os.path.dirname(build_src))) assert (not os.path.exists(build_src)) cdir = os.getcwd() if os.path.exists(cached_src): shutil.copytree(cached_src, build_src) os.chdir(build_src) else: # Only install the build dependencies on the initial setup rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source]) assert (rc == 0) os.makedirs(build_src) os.chdir(build_src) # These are always needed pkgs = ['build-essential', 'dpkg-dev', 'fakeroot'] rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs) assert (rc == 0) rc, report = cmd(['apt-get','source',source]) assert (rc == 0) shutil.copytree(build_src, cached_src) unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0]) # Now apply the patches. Do it here so that we don't mess up our cached # sources. os.chdir(unpacked_dir) assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None]) if patch_system != None and patch_system != "quiltv3": if patch_system == "quilt": os.environ.setdefault('QUILT_PATCHES','debian/patches') rc, report = cmd(['quilt', 'push', '-a']) assert (rc == 0) elif patch_system == "cdbs": rc, report = cmd(['./debian/rules', 'apply-patches']) assert (rc == 0) elif patch_system == "dpatch": rc, report = cmd(['dpatch', 'apply-all']) assert (rc == 0) cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src]) os.chdir(cdir) return unpacked_dir def _aa_status(): '''Get aa-status output''' exe = "/usr/sbin/aa-status" assert (os.path.exists(exe)) if os.geteuid() == 0: return cmd([exe]) return cmd(['sudo', exe]) def is_apparmor_loaded(path): '''Check if profile is loaded''' rc, report = _aa_status() if rc != 0: return False for line in report.splitlines(): if line.endswith(path): return True return False def is_apparmor_confined(path): '''Check if application is confined''' rc, report = _aa_status() if rc != 0: return False for line in report.splitlines(): if re.search(r'%s \(' % path, line): return True return False def check_apparmor(path, first_ubuntu_release, is_running=True): '''Check if path is loaded and confined for everything higher than the first Ubuntu release specified. Usage: rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True) if rc < 0: return self._skipped(report) expected = 0 result = 'Got exit code %d, expected %d\n' % (rc, expected) self.assertEqual(expected, rc, result + report) ''' global manager rc = -1 if manager.lsb_release["Release"] < first_ubuntu_release: return (rc, "Skipped apparmor check") if not os.path.exists('/sbin/apparmor_parser'): return (rc, "Skipped (couldn't find apparmor_parser)") rc = 0 msg = "" if not is_apparmor_loaded(path): rc = 1 msg = "Profile not loaded for '%s'" % path # this check only makes sense it the 'path' is currently executing if is_running and rc == 0 and not is_apparmor_confined(path): rc = 1 msg = "'%s' is not running in enforce mode" % path return (rc, msg) def get_gcc_version(gcc, full=True): gcc_version = 'none' if not gcc.startswith('/'): gcc = '/usr/bin/%s' % (gcc) if os.path.exists(gcc): gcc_version = 'unknown' lines = cmd([gcc,'-v'])[1].strip().splitlines() version_lines = [x for x in lines if x.startswith('gcc version')] if len(version_lines) == 1: gcc_version = " ".join(version_lines[0].split()[2:]) if not full: return gcc_version.split()[0] return gcc_version def is_kdeinit_running(): '''Test if kdeinit is running''' # applications that use kdeinit will spawn it if it isn't running in the # test. This is a problem because it does not exit. This is a helper to # check for it. rc, report = cmd(['ps', 'x']) if 'kdeinit4 Running' not in report: print("kdeinit not running (you may start/stop any KDE application then run this script again)", file=sys.stderr) return False return True def get_pkgconfig_flags(libs=[]): '''Find pkg-config flags for libraries''' assert (len(libs) > 0) rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs) expected = 0 if rc != expected: print('Got exit code %d, expected %d\n' % (rc, expected), file=sys.stderr) assert(rc == expected) return pkg_config.split() class TestDaemon: '''Helper class to manage daemons consistently''' def __init__(self, init): '''Setup daemon attributes''' self.initscript = init def start(self): '''Start daemon''' rc, report = cmd([self.initscript, 'start']) expected = 0 result = 'Got exit code %d, expected %d\n' % (rc, expected) time.sleep(2) if expected != rc: return (False, result + report) if "fail" in report: return (False, "Found 'fail' in report\n" + report) return (True, "") def stop(self): '''Stop daemon''' rc, report = cmd([self.initscript, 'stop']) expected = 0 result = 'Got exit code %d, expected %d\n' % (rc, expected) if expected != rc: return (False, result + report) if "fail" in report: return (False, "Found 'fail' in report\n" + report) return (True, "") def reload(self): '''Reload daemon''' rc, report = cmd([self.initscript, 'force-reload']) expected = 0 result = 'Got exit code %d, expected %d\n' % (rc, expected) if expected != rc: return (False, result + report) if "fail" in report: return (False, "Found 'fail' in report\n" + report) return (True, "") def restart(self): '''Restart daemon''' (res, str) = self.stop() if not res: return (res, str) (res, str) = self.start() if not res: return (res, str) return (True, "") def status(self): '''Check daemon status''' rc, report = cmd([self.initscript, 'status']) expected = 0 result = 'Got exit code %d, expected %d\n' % (rc, expected) if expected != rc: return (False, result + report) if "fail" in report: return (False, "Found 'fail' in report\n" + report) return (True, "") class TestlibManager(object): '''Singleton class used to set up per-test-run information''' def __init__(self): # Set glibc aborts to dump to stderr instead of the tty so test output # is more sane. os.environ.setdefault('LIBC_FATAL_STDERR_','1') # check verbosity self.verbosity = False if (len(sys.argv) > 1 and '-v' in sys.argv[1:]): self.verbosity = True # Load LSB release file self.lsb_release = dict() if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'): raise OSError("Please install 'lsb-release'") for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE,universal_newlines=True).communicate()[0].splitlines(): field, value = line.split(':',1) value=value.strip() field=field.strip() # Convert numerics try: value = float(value) except: pass self.lsb_release.setdefault(field,value) # FIXME: hack OEM releases into known-Ubuntu versions if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)": if self.lsb_release['Release'] == 1.0: self.lsb_release['Distributor ID'] = "Ubuntu" self.lsb_release['Release'] = 8.04 else: raise OSError("Unknown version of HP MIE") # FIXME: hack to assume a most-recent release if we're not # running under Ubuntu. if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]: self.lsb_release['Release'] = 10000 # Adjust Linaro release to pretend to be Ubuntu if self.lsb_release['Distributor ID'] in ["Linaro"]: self.lsb_release['Distributor ID'] = "Ubuntu" self.lsb_release['Release'] -= 0.01 # Load arch if not os.path.exists('/usr/bin/dpkg'): machine = cmd(['uname','-m'])[1].strip() if machine.endswith('86'): self.dpkg_arch = 'i386' elif machine.endswith('_64'): self.dpkg_arch = 'amd64' elif machine.startswith('arm'): self.dpkg_arch = 'armel' else: raise ValueError("Unknown machine type '%s'" % (machine)) else: self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip() # Find kernel version self.kernel_is_ubuntu = False self.kernel_version_signature = None self.kernel_version = cmd(["uname","-r"])[1].strip() versig = '/proc/version_signature' if os.path.exists(versig): self.kernel_is_ubuntu = True self.kernel_version_signature = open(versig).read().strip() self.kernel_version_ubuntu = self.kernel_version elif os.path.exists('/usr/bin/dpkg'): # this can easily be inaccurate but is only an issue for Dapper rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)]) if rc == 0: self.kernel_version_signature = out.strip().split('\n').pop().split()[2] self.kernel_version_ubuntu = self.kernel_version_signature if self.kernel_version_signature == None: # Attempt to fall back to something for non-Debian-based self.kernel_version_signature = self.kernel_version self.kernel_version_ubuntu = self.kernel_version # Build ubuntu version without hardware suffix try: self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)]) except: pass # Find gcc version self.gcc_version = get_gcc_version('gcc') # Find libc self.path_libc = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0] # Report self if self.verbosity: kernel = self.kernel_version_ubuntu if kernel != self.kernel_version_signature: kernel += " (%s)" % (self.kernel_version_signature) print("Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( sys.argv[0], self.lsb_release['Distributor ID'], self.lsb_release['Release'], kernel, self.dpkg_arch, os.geteuid(), os.getuid(), os.environ.get('SUDO_USER', '')), file=sys.stdout) sys.stdout.flush() # Additional heuristics #if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']: # sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000)) # sys.stdout.flush() # time.sleep(0.5) # sys.stdout.write("destroyed\n") # time.sleep(0.5) def hello(self, msg): print("Hello from %s" % (msg), file=sys.stderr) # The central instance manager = TestlibManager() class TestlibCase(unittest.TestCase): def __init__(self, *args): '''This is called for each TestCase test instance, which isn't much better than SetUp.''' unittest.TestCase.__init__(self, *args) # Attach to and duplicate dicts from manager singleton self.manager = manager #self.manager.hello(repr(self) + repr(*args)) self.my_verbosity = self.manager.verbosity self.lsb_release = self.manager.lsb_release self.dpkg_arch = self.manager.dpkg_arch self.kernel_version = self.manager.kernel_version self.kernel_version_signature = self.manager.kernel_version_signature self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu self.gcc_version = self.manager.gcc_version self.path_libc = self.manager.path_libc def version_compare(self, one, two): return apt_pkg.VersionCompare(one,two) def assertFileType(self, filename, filetype): '''Checks the file type of the file specified''' (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename]) out = out.strip() expected = 0 # Absolutely no idea why this happens on Hardy if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0: rc = 0 result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report) self.assertEqual(expected, rc, result) filetype = '^%s$' % (filetype) result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype) self.assertNotEquals(None, re.search(filetype, out), result) def yank_commonname_from_cert(self, certfile): '''Extract the commonName from a given PEM''' rc, out = cmd(['openssl','asn1parse','-in',certfile]) if rc == 0: ready = False for line in out.splitlines(): if ready: return line.split(':')[-1] if ':commonName' in line: ready = True return socket.getfqdn() def announce(self, text): if self.my_verbosity: print("(%s) " % (text), file=sys.stderr, end='') sys.stdout.flush() def make_clean(self): rc, output = self.shell_cmd(['make','clean']) self.assertEqual(rc, 0, output) def get_makefile_compiler(self): # Find potential compiler name compiler = 'gcc' if os.path.exists('Makefile'): for line in open('Makefile'): if line.startswith('CC') and '=' in line: items = [x.strip() for x in line.split('=')] if items[0] == 'CC': compiler = items[1] break return compiler def make_target(self, target, expected=0): '''Compile a target and report output''' compiler = self.get_makefile_compiler() rc, output = self.shell_cmd(['make',target]) self.assertEqual(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output) self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output) return output # call as return testlib.skipped() def _skipped(self, reason=""): '''Provide a visible way to indicate that a test was skipped''' if reason != "": reason = ': %s' % (reason) self.announce("skipped%s" % (reason)) return False def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT): argstr = "'" + "', '".join(args).strip() + "'" rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr) report = 'Command: ' + argstr + '\nOutput:\n' + out return rc, report, out def shell_cmd(self, args, stdin=None): return cmd(args,stdin=stdin) def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""): '''Test a shell command matches a specific exit code''' rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) result = 'Got exit code %d, expected %d\n' % (rc, expected) self.assertEqual(expected, rc, msg + result + report) def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""): '''Test a shell command doesn't match a specific exit code''' rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) result = 'Got (unwanted) exit code %d\n' % rc self.assertNotEquals(unwanted, rc, msg + result + report) def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False): '''Test a shell command contains a specific output''' rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text) if not invert: self.assertTrue(text in out, msg + result + report) else: self.assertFalse(text in out, msg + result + report) def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None): '''Test a shell command matches a specific output''' rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr) result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args)) if not invert: self.assertEqual(text, out, msg + result + report) else: self.assertNotEquals(text, out, msg + result + report) if expected != None: result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args)) self.assertEqual(rc, expected, msg + result + report) def _word_find(self, report, content, invert=False): '''Check for a specific string''' if invert: warning = 'Found "%s"\n' % content self.assertTrue(content not in report, warning + report) else: warning = 'Could not find "%s"\n' % content self.assertTrue(content in report, warning + report) def _test_sysctl_value(self, path, expected, msg=None, exists=True): sysctl = '/proc/sys/%s' % (path) self.assertEqual(exists, os.path.exists(sysctl), sysctl) value = None if exists: with open(sysctl) as sysctl_fd: value = int(sysctl_fd.read()) report = "%s is not %d: %d" % (sysctl, expected, value) if msg: report += " (%s)" % (msg) self.assertEqual(value, expected, report) return value def set_sysctl_value(self, path, desired): sysctl = '/proc/sys/%s' % (path) self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl)) with open(sysctl, 'w') as sysctl_fh: sysctl_fh.write(str(desired)) self._test_sysctl_value(path, desired) def kernel_at_least(self, introduced): return self.version_compare(self.kernel_version_ubuntu, introduced) >= 0 def kernel_claims_cve_fixed(self, cve): changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version) if os.path.exists(changelog): for line in gzip.open(changelog): if cve in line and not "revert" in line and not "Revert" in line: return True return False class TestGroup: '''Create a temporary test group and remove it again in the dtor.''' def __init__(self, group=None, lower=False): '''Create a new group''' self.group = None if group: if group_exists(group): raise ValueError('group name already exists') else: while(True): group = random_string(7,lower=lower) if not group_exists(group): break assert subprocess.call(['groupadd',group]) == 0 self.group = group g = grp.getgrnam(self.group) self.gid = g[2] def __del__(self): '''Remove the created group.''' if self.group: rc, report = cmd(['groupdel', self.group]) assert rc == 0 class TestUser: '''Create a temporary test user and remove it again in the dtor.''' def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None): '''Create a new user account with a random password. By default, the login name is random, too, but can be explicitly specified with 'login'. By default, a home directory is created, this can be suppressed with 'home=False'.''' self.login = None if os.geteuid() != 0: raise ValueError("You must be root to run this test") if login: if login_exists(login): raise ValueError('login name already exists') else: while(True): login = 't' + random_string(7,lower=lower) if not login_exists(login): break self.salt = random_string(2) self.password = random_string(8,lower=lower) self.crypted = crypt.crypt(self.password, self.salt) creation = ['useradd', '-p', self.crypted] if home: creation += ['-m'] if group: creation += ['-G',group] if uidmin: creation += ['-K','UID_MIN=%d'%uidmin] if shell: creation += ['-s',shell] creation += [login] assert subprocess.call(creation) == 0 # Set GECOS assert subprocess.call(['usermod','-c','Buddy %s' % (login),login]) == 0 self.login = login p = pwd.getpwnam(self.login) self.uid = p[2] self.gid = p[3] self.gecos = p[4] self.home = p[5] self.shell = p[6] def __del__(self): '''Remove the created user account.''' if self.login: # sanity check the login name so we don't accidentally wipe too much if len(self.login)>3 and not '/' in self.login: subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login]) rc, report = cmd(['userdel', '-f', self.login]) assert rc == 0 def add_to_group(self, group): '''Add user to the specified group name''' rc, report = cmd(['usermod', '-G', group, self.login]) if rc != 0: print(report) assert rc == 0 # Timeout handler using alarm() from John P. Speno's Pythonic Avocado class TimeoutFunctionException(Exception): """Exception to raise on a timeout""" pass class TimeoutFunction: def __init__(self, function, timeout): self.timeout = timeout self.function = function def handle_timeout(self, signum, frame): raise TimeoutFunctionException() def __call__(self, *args, **kwargs): old = signal.signal(signal.SIGALRM, self.handle_timeout) signal.alarm(self.timeout) try: result = self.function(*args, **kwargs) finally: signal.signal(signal.SIGALRM, old) signal.alarm(0) return result def main(): print("hi") unittest.main()