From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/boost/tools/build/test/TestCmd.py | 609 ++++++++++++++++++++++++++++++++++ 1 file changed, 609 insertions(+) create mode 100644 src/boost/tools/build/test/TestCmd.py (limited to 'src/boost/tools/build/test/TestCmd.py') diff --git a/src/boost/tools/build/test/TestCmd.py b/src/boost/tools/build/test/TestCmd.py new file mode 100644 index 000000000..3f9c2a3ce --- /dev/null +++ b/src/boost/tools/build/test/TestCmd.py @@ -0,0 +1,609 @@ +""" +TestCmd.py: a testing framework for commands and scripts. + +The TestCmd module provides a framework for portable automated testing of +executable commands and scripts (in any language, not just Python), especially +commands and scripts that require file system interaction. + +In addition to running tests and evaluating conditions, the TestCmd module +manages and cleans up one or more temporary workspace directories, and provides +methods for creating files and directories in those workspace directories from +in-line data, here-documents), allowing tests to be completely self-contained. + +A TestCmd environment object is created via the usual invocation: + + test = TestCmd() + +The TestCmd module provides pass_test(), fail_test(), and no_result() unbound +methods that report test results for use with the Aegis change management +system. These methods terminate the test immediately, reporting PASSED, FAILED +or NO RESULT respectively and exiting with status 0 (success), 1 or 2 +respectively. This allows for a distinction between an actual failed test and a +test that could not be properly evaluated because of an external condition (such +as a full file system or incorrect permissions). + +""" + +# Copyright 2000 Steven Knight +# This module is free software, and you may redistribute it and/or modify +# it under the same terms as Python itself, so long as this copyright message +# and disclaimer are retained in their original form. +# +# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, +# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF +# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. +# +# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, +# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, +# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. + +# Copyright 2002-2003 Vladimir Prus. +# Copyright 2002-2003 Dave Abrahams. +# Copyright 2006 Rene Rivera. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +from __future__ import print_function + +__author__ = "Steven Knight " +__revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software" +__version__ = "0.02" + +from types import * + +import os +import os.path +import re +import shutil +import stat +import subprocess +import sys +import tempfile +import traceback + + +tempfile.template = 'testcmd.' + +_Cleanup = [] + +def _clean(): + global _Cleanup + list = _Cleanup[:] + _Cleanup = [] + list.reverse() + for test in list: + test.cleanup() + +sys.exitfunc = _clean + + +def caller(tblist, skip): + string = "" + arr = [] + for file, line, name, text in tblist: + if file[-10:] == "TestCmd.py": + break + arr = [(file, line, name, text)] + arr + atfrom = "at" + for file, line, name, text in arr[skip:]: + if name == "?": + name = "" + else: + name = " (" + name + ")" + string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name)) + atfrom = "\tfrom" + return string + + +def fail_test(self=None, condition=True, function=None, skip=0): + """Cause the test to fail. + + By default, the fail_test() method reports that the test FAILED and exits + with a status of 1. If a condition argument is supplied, the test fails + only if the condition is true. + + """ + if not condition: + return + if not function is None: + function() + of = "" + desc = "" + sep = " " + if not self is None: + if self.program: + of = " of " + " ".join(self.program) + sep = "\n\t" + if self.description: + desc = " [" + self.description + "]" + sep = "\n\t" + + at = caller(traceback.extract_stack(), skip) + + sys.stderr.write("FAILED test" + of + desc + sep + at + """ +in directory: """ + os.getcwd() ) + sys.exit(1) + + +def no_result(self=None, condition=True, function=None, skip=0): + """Causes a test to exit with no valid result. + + By default, the no_result() method reports NO RESULT for the test and + exits with a status of 2. If a condition argument is supplied, the test + fails only if the condition is true. + + """ + if not condition: + return + if not function is None: + function() + of = "" + desc = "" + sep = " " + if not self is None: + if self.program: + of = " of " + self.program + sep = "\n\t" + if self.description: + desc = " [" + self.description + "]" + sep = "\n\t" + + at = caller(traceback.extract_stack(), skip) + sys.stderr.write("NO RESULT for test" + of + desc + sep + at) + sys.exit(2) + + +def pass_test(self=None, condition=True, function=None): + """Causes a test to pass. + + By default, the pass_test() method reports PASSED for the test and exits + with a status of 0. If a condition argument is supplied, the test passes + only if the condition is true. + + """ + if not condition: + return + if not function is None: + function() + sys.stderr.write("PASSED\n") + sys.exit(0) + +class MatchError(object): + def __init__(self, message): + self.message = message + def __nonzero__(self): + return False + def __bool__(self): + return False + +def match_exact(lines=None, matches=None): + """ + Returns whether the given lists or strings containing lines separated + using newline characters contain exactly the same data. + + """ + if not type(lines) is list: + lines = lines.split("\n") + if not type(matches) is list: + matches = matches.split("\n") + if len(lines) != len(matches): + return + for i in range(len(lines)): + if lines[i] != matches[i]: + return MatchError("Mismatch at line %d\n- %s\n+ %s\n" % + (i+1, matches[i], lines[i])) + if len(lines) < len(matches): + return MatchError("Missing lines at line %d\n- %s" % + (len(lines), "\n- ".join(matches[len(lines):]))) + if len(lines) > len(matches): + return MatchError("Extra lines at line %d\n+ %s" % + (len(matches), "\n+ ".join(lines[len(matches):]))) + return 1 + + +def match_re(lines=None, res=None): + """ + Given lists or strings contain lines separated using newline characters. + This function matches those lines one by one, interpreting the lines in the + res parameter as regular expressions. + + """ + if not type(lines) is list: + lines = lines.split("\n") + if not type(res) is list: + res = res.split("\n") + for i in range(min(len(lines), len(res))): + if not re.compile("^" + res[i] + "$").search(lines[i]): + return MatchError("Mismatch at line %d\n- %s\n+ %s\n" % + (i+1, res[i], lines[i])) + if len(lines) < len(res): + return MatchError("Missing lines at line %d\n- %s" % + (len(lines), "\n- ".join(res[len(lines):]))) + if len(lines) > len(res): + return MatchError("Extra lines at line %d\n+ %s" % + (len(res), "\n+ ".join(lines[len(res):]))) + return 1 + + +class TestCmd: + def __init__(self, description=None, program=None, workdir=None, + subdir=None, verbose=False, match=None, inpath=None): + + self._cwd = os.getcwd() + self.description_set(description) + self.program_set(program, inpath) + self.verbose_set(verbose) + if match is None: + self.match_func = match_re + else: + self.match_func = match + self._dirlist = [] + self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} + env = os.environ.get('PRESERVE') + if env: + self._preserve['pass_test'] = env + self._preserve['fail_test'] = env + self._preserve['no_result'] = env + else: + env = os.environ.get('PRESERVE_PASS') + if env is not None: + self._preserve['pass_test'] = env + env = os.environ.get('PRESERVE_FAIL') + if env is not None: + self._preserve['fail_test'] = env + env = os.environ.get('PRESERVE_PASS') + if env is not None: + self._preserve['PRESERVE_NO_RESULT'] = env + self._stdout = [] + self._stderr = [] + self.status = None + self.condition = 'no_result' + self.workdir_set(workdir) + self.subdir(subdir) + + def __del__(self): + self.cleanup() + + def __repr__(self): + return "%x" % id(self) + + def cleanup(self, condition=None): + """ + Removes any temporary working directories for the specified TestCmd + environment. If the environment variable PRESERVE was set when the + TestCmd environment was created, temporary working directories are not + removed. If any of the environment variables PRESERVE_PASS, + PRESERVE_FAIL or PRESERVE_NO_RESULT were set when the TestCmd + environment was created, then temporary working directories are not + removed if the test passed, failed or had no result, respectively. + Temporary working directories are also preserved for conditions + specified via the preserve method. + + Typically, this method is not called directly, but is used when the + script exits to clean up temporary working directories as appropriate + for the exit status. + + """ + if not self._dirlist: + return + if condition is None: + condition = self.condition + if self._preserve[condition]: + for dir in self._dirlist: + print("Preserved directory %s" % dir) + else: + list = self._dirlist[:] + list.reverse() + for dir in list: + self.writable(dir, 1) + shutil.rmtree(dir, ignore_errors=1) + + self._dirlist = [] + self.workdir = None + os.chdir(self._cwd) + try: + global _Cleanup + _Cleanup.remove(self) + except (AttributeError, ValueError): + pass + + def description_set(self, description): + """Set the description of the functionality being tested.""" + self.description = description + + def fail_test(self, condition=True, function=None, skip=0): + """Cause the test to fail.""" + if not condition: + return + self.condition = 'fail_test' + fail_test(self = self, + condition = condition, + function = function, + skip = skip) + + def match(self, lines, matches): + """Compare actual and expected file contents.""" + return self.match_func(lines, matches) + + def match_exact(self, lines, matches): + """Compare actual and expected file content exactly.""" + return match_exact(lines, matches) + + def match_re(self, lines, res): + """Compare file content with a regular expression.""" + return match_re(lines, res) + + def no_result(self, condition=True, function=None, skip=0): + """Report that the test could not be run.""" + if not condition: + return + self.condition = 'no_result' + no_result(self = self, + condition = condition, + function = function, + skip = skip) + + def pass_test(self, condition=True, function=None): + """Cause the test to pass.""" + if not condition: + return + self.condition = 'pass_test' + pass_test(self, condition, function) + + def preserve(self, *conditions): + """ + Arrange for the temporary working directories for the specified + TestCmd environment to be preserved for one or more conditions. If no + conditions are specified, arranges for the temporary working + directories to be preserved for all conditions. + + """ + if conditions is (): + conditions = ('pass_test', 'fail_test', 'no_result') + for cond in conditions: + self._preserve[cond] = 1 + + def program_set(self, program, inpath): + """Set the executable program or script to be tested.""" + if not inpath and program and not os.path.isabs(program[0]): + program[0] = os.path.join(self._cwd, program[0]) + self.program = program + + def read(self, file, mode='rb'): + """ + Reads and returns the contents of the specified file name. The file + name may be a list, in which case the elements are concatenated with + the os.path.join() method. The file is assumed to be under the + temporary working directory unless it is an absolute path name. The I/O + mode for the file may be specified and must begin with an 'r'. The + default is 'rb' (binary read). + + """ + if type(file) is list: + file = os.path.join(*file) + if not os.path.isabs(file): + file = os.path.join(self.workdir, file) + if mode[0] != 'r': + raise ValueError("mode must begin with 'r'") + return open(file, mode).read() + + def run(self, program=None, arguments=None, chdir=None, stdin=None, + universal_newlines=True): + """ + Runs a test of the program or script for the test environment. + Standard output and error output are saved for future retrieval via the + stdout() and stderr() methods. + + 'universal_newlines' parameter controls how the child process + input/output streams are opened as defined for the same named Python + subprocess.POpen constructor parameter. + + """ + if chdir: + if not os.path.isabs(chdir): + chdir = os.path.join(self.workpath(chdir)) + if self.verbose: + sys.stderr.write("chdir(" + chdir + ")\n") + else: + chdir = self.workdir + + cmd = [] + if program and program[0]: + if program[0] != self.program[0] and not os.path.isabs(program[0]): + program[0] = os.path.join(self._cwd, program[0]) + cmd += program + else: + cmd += self.program + if arguments: + cmd += arguments.split(" ") + if self.verbose: + sys.stderr.write(" ".join(cmd) + "\n") + p = subprocess.Popen(cmd, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=chdir, + universal_newlines=universal_newlines) + + if stdin: + if type(stdin) is list: + stdin = "".join(stdin) + out, err = p.communicate(stdin) + if not type(out) is str: + out = out.decode() + if not type(err) is str: + err = err.decode() + self._stdout.append(out) + self._stderr.append(err) + self.status = p.returncode + + if self.verbose: + sys.stdout.write(self._stdout[-1]) + sys.stderr.write(self._stderr[-1]) + + def stderr(self, run=None): + """ + Returns the error output from the specified run number. If there is + no specified run number, then returns the error output of the last run. + If the run number is less than zero, then returns the error output from + that many runs back from the current run. + + """ + if not run: + run = len(self._stderr) + elif run < 0: + run = len(self._stderr) + run + run -= 1 + if run < 0: + return '' + return self._stderr[run] + + def stdout(self, run=None): + """ + Returns the standard output from the specified run number. If there + is no specified run number, then returns the standard output of the + last run. If the run number is less than zero, then returns the + standard output from that many runs back from the current run. + + """ + if not run: + run = len(self._stdout) + elif run < 0: + run = len(self._stdout) + run + run -= 1 + if run < 0: + return '' + return self._stdout[run] + + def subdir(self, *subdirs): + """ + Create new subdirectories under the temporary working directory, one + for each argument. An argument may be a list, in which case the list + elements are concatenated using the os.path.join() method. + Subdirectories multiple levels deep must be created using a separate + argument for each level: + + test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory']) + + Returns the number of subdirectories actually created. + + """ + count = 0 + for sub in subdirs: + if sub is None: + continue + if type(sub) is list: + sub = os.path.join(*tuple(sub)) + new = os.path.join(self.workdir, sub) + try: + os.mkdir(new) + except: + pass + else: + count += 1 + return count + + def unlink(self, file): + """ + Unlinks the specified file name. The file name may be a list, in + which case the elements are concatenated using the os.path.join() + method. The file is assumed to be under the temporary working directory + unless it is an absolute path name. + + """ + if type(file) is list: + file = os.path.join(*tuple(file)) + if not os.path.isabs(file): + file = os.path.join(self.workdir, file) + os.unlink(file) + + def verbose_set(self, verbose): + """Set the verbose level.""" + self.verbose = verbose + + def workdir_set(self, path): + """ + Creates a temporary working directory with the specified path name. + If the path is a null string (''), a unique directory name is created. + + """ + if os.path.isabs(path): + self.workdir = path + else: + if path != None: + if path == '': + path = tempfile.mktemp() + if path != None: + os.mkdir(path) + self._dirlist.append(path) + global _Cleanup + try: + _Cleanup.index(self) + except ValueError: + _Cleanup.append(self) + # We would like to set self.workdir like this: + # self.workdir = path + # But symlinks in the path will report things differently from + # os.getcwd(), so chdir there and back to fetch the canonical + # path. + cwd = os.getcwd() + os.chdir(path) + self.workdir = os.getcwd() + os.chdir(cwd) + else: + self.workdir = None + + def workpath(self, *args): + """ + Returns the absolute path name to a subdirectory or file within the + current temporary working directory. Concatenates the temporary working + directory name with the specified arguments using os.path.join(). + + """ + return os.path.join(self.workdir, *tuple(args)) + + def writable(self, top, write): + """ + Make the specified directory tree writable (write == 1) or not + (write == None). + + """ + def _walk_chmod(arg, dirname, names): + st = os.stat(dirname) + os.chmod(dirname, arg(st[stat.ST_MODE])) + for name in names: + fullname = os.path.join(dirname, name) + st = os.stat(fullname) + os.chmod(fullname, arg(st[stat.ST_MODE])) + + _mode_writable = lambda mode: stat.S_IMODE(mode|0o200) + _mode_non_writable = lambda mode: stat.S_IMODE(mode&~0o200) + + if write: + f = _mode_writable + else: + f = _mode_non_writable + try: + for root, _, files in os.walk(top): + _walk_chmod(f, root, files) + except: + pass # Ignore any problems changing modes. + + def write(self, file, content, mode='wb'): + """ + Writes the specified content text (second argument) to the specified + file name (first argument). The file name may be a list, in which case + the elements are concatenated using the os.path.join() method. The file + is created under the temporary working directory. Any subdirectories in + the path must already exist. The I/O mode for the file may be specified + and must begin with a 'w'. The default is 'wb' (binary write). + + """ + if type(file) is list: + file = os.path.join(*tuple(file)) + if not os.path.isabs(file): + file = os.path.join(self.workdir, file) + if mode[0] != 'w': + raise ValueError("mode must begin with 'w'") + open(file, mode).write(content) -- cgit v1.2.3