diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/boost/tools/build/test/BoostBuild.py | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/boost/tools/build/test/BoostBuild.py')
-rw-r--r-- | src/boost/tools/build/test/BoostBuild.py | 1348 |
1 files changed, 1348 insertions, 0 deletions
diff --git a/src/boost/tools/build/test/BoostBuild.py b/src/boost/tools/build/test/BoostBuild.py new file mode 100644 index 00000000..6e87d8a4 --- /dev/null +++ b/src/boost/tools/build/test/BoostBuild.py @@ -0,0 +1,1348 @@ +# Copyright 2002-2005 Vladimir Prus. +# Copyright 2002-2003 Dave Abrahams. +# Copyright 2006 Rene Rivera. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +from __future__ import print_function + +import TestCmd + +import copy +import fnmatch +import glob +import math +import os +import os.path +import re +import shutil +try: + from StringIO import StringIO +except: + from io import StringIO +import subprocess +import sys +import tempfile +import time +import traceback +import tree +import types + +from xml.sax.saxutils import escape + +try: + from functools import reduce +except: + pass + + +def isstr(data): + return isinstance(data, (type(''), type(u''))) + + +class TestEnvironmentError(Exception): + pass + + +annotations = [] + + +def print_annotation(name, value, xml): + """Writes some named bits of information about the current test run.""" + if xml: + print(escape(name) + " {{{") + print(escape(value)) + print("}}}") + else: + print(name + " {{{") + print(value) + print("}}}") + + +def flush_annotations(xml=0): + global annotations + for ann in annotations: + print_annotation(ann[0], ann[1], xml) + annotations = [] + + +def clear_annotations(): + global annotations + annotations = [] + + +defer_annotations = 0 + +def set_defer_annotations(n): + global defer_annotations + defer_annotations = n + + +def annotate_stack_trace(tb=None): + if tb: + trace = TestCmd.caller(traceback.extract_tb(tb), 0) + else: + trace = TestCmd.caller(traceback.extract_stack(), 1) + annotation("stacktrace", trace) + + +def annotation(name, value): + """Records an annotation about the test run.""" + annotations.append((name, value)) + if not defer_annotations: + flush_annotations() + + +def get_toolset(): + toolset = None + for arg in sys.argv[1:]: + if not arg.startswith("-"): + toolset = arg + return toolset or "gcc" + + +# Detect the host OS. +cygwin = hasattr(os, "uname") and os.uname()[0].lower().startswith("cygwin") +windows = cygwin or os.environ.get("OS", "").lower().startswith("windows") + +if cygwin: + default_os = "cygwin" +elif windows: + default_os = "windows" +elif hasattr(os, "uname"): + default_os = os.uname()[0].lower() + +def prepare_prefixes_and_suffixes(toolset, target_os=default_os): + prepare_suffix_map(toolset, target_os) + prepare_library_prefix(toolset, target_os) + + +def prepare_suffix_map(toolset, target_os=default_os): + """ + Set up suffix translation performed by the Boost Build testing framework + to accommodate different toolsets generating targets of the same type using + different filename extensions (suffixes). + + """ + global suffixes + suffixes = {} + if target_os == "cygwin": + suffixes[".lib"] = ".a" + suffixes[".obj"] = ".o" + suffixes[".implib"] = ".lib.a" + elif target_os == "windows": + if toolset == "gcc": + # MinGW + suffixes[".lib"] = ".a" + suffixes[".obj"] = ".o" + suffixes[".implib"] = ".dll.a" + else: + # Everything else Windows + suffixes[".implib"] = ".lib" + else: + suffixes[".exe"] = "" + suffixes[".dll"] = ".so" + suffixes[".lib"] = ".a" + suffixes[".obj"] = ".o" + suffixes[".implib"] = ".no_implib_files_on_this_platform" + + if target_os == "darwin": + suffixes[".dll"] = ".dylib" + + +def prepare_library_prefix(toolset, target_os=default_os): + """ + Setup whether Boost Build is expected to automatically prepend prefixes + to its built library targets. + + """ + global lib_prefix + lib_prefix = "lib" + + global dll_prefix + if target_os == "cygwin": + dll_prefix = "cyg" + elif target_os == "windows" and toolset != "gcc": + dll_prefix = None + else: + dll_prefix = "lib" + + +def re_remove(sequence, regex): + me = re.compile(regex) + result = list(filter(lambda x: me.match(x), sequence)) + if not result: + raise ValueError() + for r in result: + sequence.remove(r) + + +def glob_remove(sequence, pattern): + result = list(fnmatch.filter(sequence, pattern)) + if not result: + raise ValueError() + for r in result: + sequence.remove(r) + + +class Tester(TestCmd.TestCmd): + """Main tester class for Boost Build. + + Optional arguments: + + `arguments` - Arguments passed to the run executable. + `executable` - Name of the executable to invoke. + `match` - Function to use for compating actual and + expected file contents. + `boost_build_path` - Boost build path to be passed to the run + executable. + `translate_suffixes` - Whether to update suffixes on the the file + names passed from the test script so they + match those actually created by the current + toolset. For example, static library files + are specified by using the .lib suffix but + when the "gcc" toolset is used it actually + creates them using the .a suffix. + `pass_toolset` - Whether the test system should pass the + specified toolset to the run executable. + `use_test_config` - Whether the test system should tell the run + executable to read in the test_config.jam + configuration file. + `ignore_toolset_requirements` - Whether the test system should tell the run + executable to ignore toolset requirements. + `workdir` - Absolute directory where the test will be + run from. + `pass_d0` - If set, when tests are not explicitly run + in verbose mode, they are run as silent + (-d0 & --quiet Boost Jam options). + + Optional arguments inherited from the base class: + + `description` - Test description string displayed in case + of a failed test. + `subdir` - List of subdirectories to automatically + create under the working directory. Each + subdirectory needs to be specified + separately, parent coming before its child. + `verbose` - Flag that may be used to enable more + verbose test system output. Note that it + does not also enable more verbose build + system output like the --verbose command + line option does. + """ + def __init__(self, arguments=None, executable="b2", + match=TestCmd.match_exact, boost_build_path=None, + translate_suffixes=True, pass_toolset=True, use_test_config=True, + ignore_toolset_requirements=False, workdir="", pass_d0=False, + **keywords): + + assert arguments.__class__ is not str + self.original_workdir = os.path.dirname(__file__) + if workdir and not os.path.isabs(workdir): + raise ("Parameter workdir <%s> must point to an absolute " + "directory: " % workdir) + + self.last_build_timestamp = 0 + self.translate_suffixes = translate_suffixes + self.use_test_config = use_test_config + + self.toolset = get_toolset() + self.pass_toolset = pass_toolset + self.ignore_toolset_requirements = ignore_toolset_requirements + + prepare_prefixes_and_suffixes(pass_toolset and self.toolset or "gcc") + + use_default_bjam = "--default-bjam" in sys.argv + + if not use_default_bjam: + jam_build_dir = "" + + # Find where jam_src is located. Try for the debug version if it is + # lying around. + srcdir = os.path.join(os.path.dirname(__file__), "..", "src") + dirs = [os.path.join(srcdir, "engine", jam_build_dir + ".debug"), + os.path.join(srcdir, "engine", jam_build_dir)] + for d in dirs: + if os.path.exists(d): + jam_build_dir = d + break + else: + print("Cannot find built Boost.Jam") + sys.exit(1) + + verbosity = ["-d0", "--quiet"] + if not pass_d0: + verbosity = [] + if "--verbose" in sys.argv: + keywords["verbose"] = True + verbosity = ["-d2"] + self.verbosity = verbosity + + if boost_build_path is None: + boost_build_path = self.original_workdir + "/.." + + program_list = [] + if use_default_bjam: + program_list.append(executable) + else: + program_list.append(os.path.join(jam_build_dir, executable)) + program_list.append('-sBOOST_BUILD_PATH="' + boost_build_path + '"') + if arguments: + program_list += arguments + + TestCmd.TestCmd.__init__(self, program=program_list, match=match, + workdir=workdir, inpath=use_default_bjam, **keywords) + + os.chdir(self.workdir) + + def cleanup(self): + try: + TestCmd.TestCmd.cleanup(self) + os.chdir(self.original_workdir) + except AttributeError: + # When this is called during TestCmd.TestCmd.__del__ we can have + # both 'TestCmd' and 'os' unavailable in our scope. Do nothing in + # this case. + pass + + def set_toolset(self, toolset, target_os=default_os): + self.toolset = toolset + self.pass_toolset = True + prepare_prefixes_and_suffixes(toolset, target_os) + + + # + # Methods that change the working directory's content. + # + def set_tree(self, tree_location): + # It is not possible to remove the current directory. + d = os.getcwd() + os.chdir(os.path.dirname(self.workdir)) + shutil.rmtree(self.workdir, ignore_errors=False) + + if not os.path.isabs(tree_location): + tree_location = os.path.join(self.original_workdir, tree_location) + shutil.copytree(tree_location, self.workdir) + + os.chdir(d) + def make_writable(unused, dir, entries): + for e in entries: + name = os.path.join(dir, e) + os.chmod(name, os.stat(name).st_mode | 0o222) + for root, _, files in os.walk("."): + make_writable(None, root, files) + + def write(self, file, content, wait=True): + nfile = self.native_file_name(file) + self.__makedirs(os.path.dirname(nfile), wait) + if not type(content) == bytes: + content = content.encode() + f = open(nfile, "wb") + try: + f.write(content) + finally: + f.close() + self.__ensure_newer_than_last_build(nfile) + + def copy(self, src, dst): + try: + self.write(dst, self.read(src, binary=True)) + except: + self.fail_test(1) + + def copy_preserving_timestamp(self, src, dst): + src_name = self.native_file_name(src) + dst_name = self.native_file_name(dst) + stats = os.stat(src_name) + self.write(dst, self.__read(src, binary=True)) + os.utime(dst_name, (stats.st_atime, stats.st_mtime)) + + def touch(self, names, wait=True): + if isstr(names): + names = [names] + for name in names: + path = self.native_file_name(name) + if wait: + self.__ensure_newer_than_last_build(path) + else: + os.utime(path, None) + + def rm(self, names): + if not type(names) == list: + names = [names] + + if names == ["."]: + # If we are deleting the entire workspace, there is no need to wait + # for a clock tick. + self.last_build_timestamp = 0 + + # Avoid attempts to remove the current directory. + os.chdir(self.original_workdir) + for name in names: + n = glob.glob(self.native_file_name(name)) + if n: n = n[0] + if not n: + n = self.glob_file(name.replace("$toolset", self.toolset + "*") + ) + if n: + if os.path.isdir(n): + shutil.rmtree(n, ignore_errors=False) + else: + os.unlink(n) + + # Create working dir root again in case we removed it. + if not os.path.exists(self.workdir): + os.mkdir(self.workdir) + os.chdir(self.workdir) + + def expand_toolset(self, name): + """ + Expands $toolset placeholder in the given file to the name of the + toolset currently being tested. + + """ + self.write(name, self.read(name).replace("$toolset", self.toolset)) + + def dump_stdio(self): + annotation("STDOUT", self.stdout()) + annotation("STDERR", self.stderr()) + + def run_build_system(self, extra_args=None, subdir="", stdout=None, + stderr="", status=0, match=None, pass_toolset=None, + use_test_config=None, ignore_toolset_requirements=None, + expected_duration=None, **kw): + + assert extra_args.__class__ is not str + + if os.path.isabs(subdir): + print("You must pass a relative directory to subdir <%s>." % subdir + ) + return + + self.previous_tree, dummy = tree.build_tree(self.workdir) + self.wait_for_time_change_since_last_build() + + if match is None: + match = self.match + + if pass_toolset is None: + pass_toolset = self.pass_toolset + + if use_test_config is None: + use_test_config = self.use_test_config + + if ignore_toolset_requirements is None: + ignore_toolset_requirements = self.ignore_toolset_requirements + + try: + kw["program"] = [] + kw["program"] += self.program + if extra_args: + kw["program"] += extra_args + if not extra_args or not any(a.startswith("-j") for a in extra_args): + kw["program"] += ["-j1"] + if stdout is None and not any(a.startswith("-d") for a in kw["program"]): + kw["program"] += self.verbosity + if pass_toolset: + kw["program"].append("toolset=" + self.toolset) + if use_test_config: + kw["program"].append('--test-config="%s"' % os.path.join( + self.original_workdir, "test-config.jam")) + if ignore_toolset_requirements: + kw["program"].append("--ignore-toolset-requirements") + if "--python" in sys.argv: + # -z disables Python optimization mode. + # this enables type checking (all assert + # and if __debug__ statements). + kw["program"].extend(["--python", "-z"]) + if "--stacktrace" in sys.argv: + kw["program"].append("--stacktrace") + kw["chdir"] = subdir + self.last_program_invocation = kw["program"] + build_time_start = time.time() + TestCmd.TestCmd.run(self, **kw) + build_time_finish = time.time() + except: + self.dump_stdio() + raise + + old_last_build_timestamp = self.last_build_timestamp + self.tree, self.last_build_timestamp = tree.build_tree(self.workdir) + self.difference = tree.tree_difference(self.previous_tree, self.tree) + if self.difference.empty(): + # If nothing has been changed by this build and sufficient time has + # passed since the last build that actually changed something, + # there is no need to wait for touched or newly created files to + # start getting newer timestamps than the currently existing ones. + self.last_build_timestamp = old_last_build_timestamp + + self.difference.ignore_directories() + self.unexpected_difference = copy.deepcopy(self.difference) + + if (status and self.status) is not None and self.status != status: + expect = "" + if status != 0: + expect = " (expected %d)" % status + + annotation("failure", '"%s" returned %d%s' % (kw["program"], + self.status, expect)) + + annotation("reason", "unexpected status returned by bjam") + self.fail_test(1) + + if stdout is not None and not match(self.stdout(), stdout): + stdout_test = match(self.stdout(), stdout) + annotation("failure", "Unexpected stdout") + annotation("Expected STDOUT", stdout) + annotation("Actual STDOUT", self.stdout()) + stderr = self.stderr() + if stderr: + annotation("STDERR", stderr) + self.maybe_do_diff(self.stdout(), stdout, stdout_test) + self.fail_test(1, dump_stdio=False) + + # Intel tends to produce some messages to stderr which make tests fail. + intel_workaround = re.compile("^xi(link|lib): executing.*\n", re.M) + actual_stderr = re.sub(intel_workaround, "", self.stderr()) + + if stderr is not None and not match(actual_stderr, stderr): + stderr_test = match(actual_stderr, stderr) + annotation("failure", "Unexpected stderr") + annotation("Expected STDERR", stderr) + annotation("Actual STDERR", self.stderr()) + annotation("STDOUT", self.stdout()) + self.maybe_do_diff(actual_stderr, stderr, stderr_test) + self.fail_test(1, dump_stdio=False) + + if expected_duration is not None: + actual_duration = build_time_finish - build_time_start + if actual_duration > expected_duration: + print("Test run lasted %f seconds while it was expected to " + "finish in under %f seconds." % (actual_duration, + expected_duration)) + self.fail_test(1, dump_stdio=False) + + self.__ignore_junk() + + def glob_file(self, name): + name = self.adjust_name(name) + result = None + if hasattr(self, "difference"): + for f in (self.difference.added_files + + self.difference.modified_files + + self.difference.touched_files): + if fnmatch.fnmatch(f, name): + result = self.__native_file_name(f) + break + if not result: + result = glob.glob(self.__native_file_name(name)) + if result: + result = result[0] + return result + + def __read(self, name, binary=False): + try: + openMode = "r" + if binary: + openMode += "b" + else: + openMode += "U" + f = open(name, openMode) + result = f.read() + f.close() + return result + except: + annotation("failure", "Could not open '%s'" % name) + self.fail_test(1) + return "" + + def read(self, name, binary=False): + name = self.glob_file(name) + return self.__read(name, binary=binary) + + def read_and_strip(self, name): + if not self.glob_file(name): + return "" + f = open(self.glob_file(name), "rb") + lines = f.readlines() + f.close() + result = "\n".join(x.decode().rstrip() for x in lines) + if lines and lines[-1][-1] != "\n": + return result + "\n" + return result + + def fail_test(self, condition, dump_difference=True, dump_stdio=True, + dump_stack=True): + if not condition: + return + + if dump_difference and hasattr(self, "difference"): + f = StringIO() + self.difference.pprint(f) + annotation("changes caused by the last build command", + f.getvalue()) + + if dump_stdio: + self.dump_stdio() + + if "--preserve" in sys.argv: + print() + print("*** Copying the state of working dir into 'failed_test' ***") + print() + path = os.path.join(self.original_workdir, "failed_test") + if os.path.isdir(path): + shutil.rmtree(path, ignore_errors=False) + elif os.path.exists(path): + raise "Path " + path + " already exists and is not a directory" + shutil.copytree(self.workdir, path) + print("The failed command was:") + print(" ".join(self.last_program_invocation)) + + if dump_stack: + annotate_stack_trace() + sys.exit(1) + + # A number of methods below check expectations with actual difference + # between directory trees before and after a build. All the 'expect*' + # methods require exact names to be passed. All the 'ignore*' methods allow + # wildcards. + + # All names can be either a string or a list of strings. + def expect_addition(self, names): + for name in self.adjust_names(names): + try: + glob_remove(self.unexpected_difference.added_files, name) + except: + annotation("failure", "File %s not added as expected" % name) + self.fail_test(1) + + def ignore_addition(self, wildcard): + self.__ignore_elements(self.unexpected_difference.added_files, + wildcard) + + def expect_removal(self, names): + for name in self.adjust_names(names): + try: + glob_remove(self.unexpected_difference.removed_files, name) + except: + annotation("failure", "File %s not removed as expected" % name) + self.fail_test(1) + + def ignore_removal(self, wildcard): + self.__ignore_elements(self.unexpected_difference.removed_files, + wildcard) + + def expect_modification(self, names): + for name in self.adjust_names(names): + try: + glob_remove(self.unexpected_difference.modified_files, name) + except: + annotation("failure", "File %s not modified as expected" % + name) + self.fail_test(1) + + def ignore_modification(self, wildcard): + self.__ignore_elements(self.unexpected_difference.modified_files, + wildcard) + + def expect_touch(self, names): + d = self.unexpected_difference + for name in self.adjust_names(names): + # We need to check both touched and modified files. The reason is + # that: + # (1) Windows binaries such as obj, exe or dll files have slight + # differences even with identical inputs due to Windows PE + # format headers containing an internal timestamp. + # (2) Intel's compiler for Linux has the same behaviour. + filesets = [d.modified_files, d.touched_files] + + while filesets: + try: + glob_remove(filesets[-1], name) + break + except ValueError: + filesets.pop() + + if not filesets: + annotation("failure", "File %s not touched as expected" % name) + self.fail_test(1) + + def ignore_touch(self, wildcard): + self.__ignore_elements(self.unexpected_difference.touched_files, + wildcard) + + def ignore(self, wildcard): + self.ignore_addition(wildcard) + self.ignore_removal(wildcard) + self.ignore_modification(wildcard) + self.ignore_touch(wildcard) + + def expect_nothing(self, names): + for name in self.adjust_names(names): + if name in self.difference.added_files: + annotation("failure", + "File %s added, but no action was expected" % name) + self.fail_test(1) + if name in self.difference.removed_files: + annotation("failure", + "File %s removed, but no action was expected" % name) + self.fail_test(1) + pass + if name in self.difference.modified_files: + annotation("failure", + "File %s modified, but no action was expected" % name) + self.fail_test(1) + if name in self.difference.touched_files: + annotation("failure", + "File %s touched, but no action was expected" % name) + self.fail_test(1) + + def __ignore_junk(self): + # Not totally sure about this change, but I do not see a good + # alternative. + if windows: + self.ignore("*.ilk") # MSVC incremental linking files. + self.ignore("*.pdb") # MSVC program database files. + self.ignore("*.rsp") # Response files. + self.ignore("*.tds") # Borland debug symbols. + self.ignore("*.manifest") # MSVC DLL manifests. + self.ignore("bin/standalone/msvc/*/msvc-setup.bat") + + # Debug builds of bjam built with gcc produce this profiling data. + self.ignore("gmon.out") + self.ignore("*/gmon.out") + + # Boost Build's 'configure' functionality (unfinished at the time) + # produces this file. + self.ignore("bin/config.log") + self.ignore("bin/project-cache.jam") + + # Compiled Python files created when running Python based Boost Build. + self.ignore("*.pyc") + + # OSX/Darwin files and dirs. + self.ignore("*.dSYM/*") + + def expect_nothing_more(self): + if not self.unexpected_difference.empty(): + annotation("failure", "Unexpected changes found") + output = StringIO() + self.unexpected_difference.pprint(output) + annotation("unexpected changes", output.getvalue()) + self.fail_test(1) + + def expect_output_lines(self, lines, expected=True): + self.__expect_lines(self.stdout(), lines, expected) + + def expect_content_lines(self, filename, line, expected=True): + self.__expect_lines(self.read_and_strip(filename), line, expected) + + def expect_content(self, name, content, exact=False): + actual = self.read(name) + content = content.replace("$toolset", self.toolset + "*") + + matched = False + if exact: + matched = fnmatch.fnmatch(actual, content) + else: + def sorted_(z): + z.sort(key=lambda x: x.lower().replace("\\", "/")) + return z + actual_ = list(map(lambda x: sorted_(x.split()), actual.splitlines())) + content_ = list(map(lambda x: sorted_(x.split()), content.splitlines())) + if len(actual_) == len(content_): + matched = map( + lambda x, y: map(lambda n, p: fnmatch.fnmatch(n, p), x, y), + actual_, content_) + matched = reduce( + lambda x, y: x and reduce( + lambda a, b: a and b, + y, True), + matched, True) + + if not matched: + print("Expected:\n") + print(content) + print("Got:\n") + print(actual) + self.fail_test(1) + + def maybe_do_diff(self, actual, expected, result=None): + if os.environ.get("DO_DIFF"): + e = tempfile.mktemp("expected") + a = tempfile.mktemp("actual") + f = open(e, "w") + f.write(expected) + f.close() + f = open(a, "w") + f.write(actual) + f.close() + print("DIFFERENCE") + # Current diff should return 1 to indicate 'different input files' + # but some older diff versions may return 0 and depending on the + # exact Python/OS platform version, os.system() call may gobble up + # the external process's return code and return 0 itself. + if os.system('diff -u "%s" "%s"' % (e, a)) not in [0, 1]: + print('Unable to compute difference: diff -u "%s" "%s"' % (e, a + )) + os.unlink(e) + os.unlink(a) + elif type(result) is TestCmd.MatchError: + print(result.message) + else: + print("Set environmental variable 'DO_DIFF' to examine the " + "difference.") + + # Internal methods. + def adjust_lib_name(self, name): + global lib_prefix + global dll_prefix + result = name + + pos = name.rfind(".") + if pos != -1: + suffix = name[pos:] + if suffix == ".lib": + (head, tail) = os.path.split(name) + if lib_prefix: + tail = lib_prefix + tail + result = os.path.join(head, tail) + elif suffix == ".dll" or suffix == ".implib": + (head, tail) = os.path.split(name) + if dll_prefix: + tail = dll_prefix + tail + result = os.path.join(head, tail) + # If we want to use this name in a Jamfile, we better convert \ to /, + # as otherwise we would have to quote \. + result = result.replace("\\", "/") + return result + + def adjust_suffix(self, name): + if not self.translate_suffixes: + return name + pos = name.rfind(".") + if pos == -1: + return name + suffix = name[pos:] + return name[:pos] + suffixes.get(suffix, suffix) + + # Acceps either a string or a list of strings and returns a list of + # strings. Adjusts suffixes on all names. + def adjust_names(self, names): + if isstr(names): + names = [names] + r = map(self.adjust_lib_name, names) + r = map(self.adjust_suffix, r) + r = map(lambda x, t=self.toolset: x.replace("$toolset", t + "*"), r) + return list(r) + + def adjust_name(self, name): + return self.adjust_names(name)[0] + + def __native_file_name(self, name): + return os.path.normpath(os.path.join(self.workdir, *name.split("/"))) + + def native_file_name(self, name): + return self.__native_file_name(self.adjust_name(name)) + + def wait_for_time_change(self, path, touch): + """ + Wait for newly assigned file system modification timestamps for the + given path to become large enough for the timestamp difference to be + correctly recognized by both this Python based testing framework and + the Boost Jam executable being tested. May optionally touch the given + path to set its modification timestamp to the new value. + + """ + self.__wait_for_time_change(path, touch, last_build_time=False) + + def wait_for_time_change_since_last_build(self): + """ + Wait for newly assigned file system modification timestamps to + become large enough for the timestamp difference to be + correctly recognized by the Python based testing framework. + Does not care about Jam's timestamp resolution, since we + only need this to detect touched files. + """ + if self.last_build_timestamp: + timestamp_file = "timestamp-3df2f2317e15e4a9" + open(timestamp_file, "wb").close() + self.__wait_for_time_change_impl(timestamp_file, + self.last_build_timestamp, + self.__python_timestamp_resolution(timestamp_file, 0), 0) + os.unlink(timestamp_file) + + def __build_timestamp_resolution(self): + """ + Returns the minimum path modification timestamp resolution supported + by the used Boost Jam executable. + + """ + dir = tempfile.mkdtemp("bjam_version_info") + try: + jam_script = "timestamp_resolution.jam" + f = open(os.path.join(dir, jam_script), "w") + try: + f.write("EXIT $(JAM_TIMESTAMP_RESOLUTION) : 0 ;") + finally: + f.close() + p = subprocess.Popen([self.program[0], "-d0", "-f%s" % jam_script], + stdout=subprocess.PIPE, cwd=dir, universal_newlines=True) + out, err = p.communicate() + finally: + shutil.rmtree(dir, ignore_errors=False) + + if p.returncode != 0: + raise TestEnvironmentError("Unexpected return code (%s) when " + "detecting Boost Jam's minimum supported path modification " + "timestamp resolution version information." % p.returncode) + if err: + raise TestEnvironmentError("Unexpected error output (%s) when " + "detecting Boost Jam's minimum supported path modification " + "timestamp resolution version information." % err) + + r = re.match("([0-9]{2}):([0-9]{2}):([0-9]{2}\\.[0-9]{9})$", out) + if not r: + # Older Boost Jam versions did not report their minimum supported + # path modification timestamp resolution and did not actually + # support path modification timestamp resolutions finer than 1 + # second. + # TODO: Phase this support out to avoid such fallback code from + # possibly covering up other problems. + return 1 + if r.group(1) != "00" or r.group(2) != "00": # hours, minutes + raise TestEnvironmentError("Boost Jam with too coarse minimum " + "supported path modification timestamp resolution (%s:%s:%s)." + % (r.group(1), r.group(2), r.group(3))) + return float(r.group(3)) # seconds.nanoseconds + + def __ensure_newer_than_last_build(self, path): + """ + Updates the given path's modification timestamp after waiting for the + newly assigned file system modification timestamp to become large + enough for the timestamp difference between it and the last build + timestamp to be correctly recognized by both this Python based testing + framework and the Boost Jam executable being tested. Does nothing if + there is no 'last build' information available. + + """ + if self.last_build_timestamp: + self.__wait_for_time_change(path, touch=True, last_build_time=True) + + def __expect_lines(self, data, lines, expected): + """ + Checks whether the given data contains the given lines. + + Data may be specified as a single string containing text lines + separated by newline characters. + + Lines may be specified in any of the following forms: + * Single string containing text lines separated by newlines - the + given lines are searched for in the given data without any extra + data lines between them. + * Container of strings containing text lines separated by newlines + - the given lines are searched for in the given data with extra + data lines allowed between lines belonging to different strings. + * Container of strings containing text lines separated by newlines + and containers containing strings - the same as above with the + internal containers containing strings being interpreted as if + all their content was joined together into a single string + separated by newlines. + + A newline at the end of any multi-line lines string is interpreted as + an expected extra trailig empty line. + """ + # str.splitlines() trims at most one trailing newline while we want the + # trailing newline to indicate that there should be an extra empty line + # at the end. + def splitlines(x): + return (x + "\n").splitlines() + + if data is None: + data = [] + elif isstr(data): + data = splitlines(data) + + if isstr(lines): + lines = [splitlines(lines)] + else: + expanded = [] + for x in lines: + if isstr(x): + x = splitlines(x) + expanded.append(x) + lines = expanded + + if _contains_lines(data, lines) != bool(expected): + output = [] + if expected: + output = ["Did not find expected lines:"] + else: + output = ["Found unexpected lines:"] + first = True + for line_sequence in lines: + if line_sequence: + if first: + first = False + else: + output.append("...") + output.extend(" > " + line for line in line_sequence) + output.append("in output:") + output.extend(" > " + line for line in data) + annotation("failure", "\n".join(output)) + self.fail_test(1) + + def __ignore_elements(self, things, wildcard): + """Removes in-place 'things' elements matching the given 'wildcard'.""" + things[:] = list(filter(lambda x: not fnmatch.fnmatch(x, wildcard), things)) + + def __makedirs(self, path, wait): + """ + Creates a folder with the given path, together with any missing + parent folders. If WAIT is set, makes sure any newly created folders + have modification timestamps newer than the ones left behind by the + last build run. + + """ + try: + if wait: + stack = [] + while path and path not in stack and not os.path.isdir(path): + stack.append(path) + path = os.path.dirname(path) + while stack: + path = stack.pop() + os.mkdir(path) + self.__ensure_newer_than_last_build(path) + else: + os.makedirs(path) + except Exception: + pass + + def __python_timestamp_resolution(self, path, minimum_resolution): + """ + Returns the modification timestamp resolution for the given path + supported by the used Python interpreter/OS/filesystem combination. + Will not check for resolutions less than the given minimum value. Will + change the path's modification timestamp in the process. + + Return values: + 0 - nanosecond resolution supported + positive decimal - timestamp resolution in seconds + + """ + # Note on Python's floating point timestamp support: + # Python interpreter versions prior to Python 2.3 did not support + # floating point timestamps. Versions 2.3 through 3.3 may or may not + # support it depending on the configuration (may be toggled by calling + # os.stat_float_times(True/False) at program startup, disabled by + # default prior to Python 2.5 and enabled by default since). Python 3.3 + # deprecated this configuration and 3.4 removed support for it after + # which floating point timestamps are always supported. + ver = sys.version_info[0:2] + python_nanosecond_support = ver >= (3, 4) or (ver >= (2, 3) and + os.stat_float_times()) + + # Minimal expected floating point difference used to account for + # possible imprecise floating point number representations. We want + # this number to be small (at least smaller than 0.0001) but still + # large enough that we can be sure that increasing a floating point + # value by 2 * eta guarantees the value read back will be increased by + # at least eta. + eta = 0.00005 + + stats_orig = os.stat(path) + def test_time(diff): + """Returns whether a timestamp difference is detectable.""" + os.utime(path, (stats_orig.st_atime, stats_orig.st_mtime + diff)) + return os.stat(path).st_mtime > stats_orig.st_mtime + eta + + # Test for nanosecond timestamp resolution support. + if not minimum_resolution and python_nanosecond_support: + if test_time(2 * eta): + return 0 + + # Detect the filesystem timestamp resolution. Note that there is no + # need to make this code 'as fast as possible' as, this function gets + # called before having to sleep until the next detectable modification + # timestamp value and that, since we already know nanosecond resolution + # is not supported, will surely take longer than whatever we do here to + # detect this minimal detectable modification timestamp resolution. + step = 0.1 + if not python_nanosecond_support: + # If Python does not support nanosecond timestamp resolution we + # know the minimum possible supported timestamp resolution is 1 + # second. + minimum_resolution = max(1, minimum_resolution) + index = max(1, int(minimum_resolution / step)) + while step * index < minimum_resolution: + # Floating point number representation errors may cause our + # initially calculated start index to be too small if calculated + # directly. + index += 1 + while True: + # Do not simply add up the steps to avoid cumulative floating point + # number representation errors. + next = step * index + if next > 10: + raise TestEnvironmentError("File systems with too coarse " + "modification timestamp resolutions not supported.") + if test_time(next): + return next + index += 1 + + def __wait_for_time_change(self, path, touch, last_build_time): + """ + Wait until a newly assigned file system modification timestamp for + the given path is large enough for the timestamp difference between it + and the last build timestamp or the path's original file system + modification timestamp (depending on the last_build_time flag) to be + correctly recognized by both this Python based testing framework and + the Boost Jam executable being tested. May optionally touch the given + path to set its modification timestamp to the new value. + + """ + assert self.last_build_timestamp or not last_build_time + stats_orig = os.stat(path) + + if last_build_time: + start_time = self.last_build_timestamp + else: + start_time = stats_orig.st_mtime + + build_resolution = self.__build_timestamp_resolution() + assert build_resolution >= 0 + + # Check whether the current timestamp is already new enough. + if stats_orig.st_mtime > start_time and (not build_resolution or + stats_orig.st_mtime >= start_time + build_resolution): + return + + resolution = self.__python_timestamp_resolution(path, build_resolution) + assert resolution >= build_resolution + self.__wait_for_time_change_impl(path, start_time, resolution, build_resolution) + + if not touch: + os.utime(path, (stats_orig.st_atime, stats_orig.st_mtime)) + + def __wait_for_time_change_impl(self, path, start_time, resolution, build_resolution): + # Implementation notes: + # * Theoretically time.sleep() API might get interrupted too soon + # (never actually encountered). + # * We encountered cases where we sleep just long enough for the + # filesystem's modifiction timestamp to change to the desired value, + # but after waking up, the read timestamp is still just a tiny bit + # too small (encountered on Windows). This is most likely caused by + # imprecise floating point timestamp & sleep interval representation + # used by Python. Note though that we never encountered a case where + # more than one additional tiny sleep() call was needed to remedy + # the situation. + # * We try to wait long enough for the timestamp to change, but do not + # want to waste processing time by waiting too long. The main + # problem is that when we have a coarse resolution, the actual times + # get rounded and we do not know the exact sleep time needed for the + # difference between two such times to pass. E.g. if we have a 1 + # second resolution and the original and the current file timestamps + # are both 10 seconds then it could be that the current time is + # 10.99 seconds and that we can wait for just one hundredth of a + # second for the current file timestamp to reach its next value, and + # using a longer sleep interval than that would just be wasting + # time. + while True: + os.utime(path, None) + c = os.stat(path).st_mtime + if resolution: + if c > start_time and (not build_resolution or c >= start_time + + build_resolution): + break + if c <= start_time - resolution: + # Move close to the desired timestamp in one sleep, but not + # close enough for timestamp rounding to potentially cause + # us to wait too long. + if start_time - c > 5: + if last_build_time: + error_message = ("Last build time recorded as " + "being a future event, causing a too long " + "wait period. Something must have played " + "around with the system clock.") + else: + error_message = ("Original path modification " + "timestamp set to far into the future or " + "something must have played around with the " + "system clock, causing a too long wait " + "period.\nPath: '%s'" % path) + raise TestEnvironmentError(message) + _sleep(start_time - c) + else: + # We are close to the desired timestamp so take baby sleeps + # to avoid sleeping too long. + _sleep(max(0.01, resolution / 10)) + else: + if c > start_time: + break + _sleep(max(0.01, start_time - c)) + + +class List: + def __init__(self, s=""): + elements = [] + if isstr(s): + # Have to handle escaped spaces correctly. + elements = s.replace("\ ", "\001").split() + else: + elements = s + self.l = [e.replace("\001", " ") for e in elements] + + def __len__(self): + return len(self.l) + + def __getitem__(self, key): + return self.l[key] + + def __setitem__(self, key, value): + self.l[key] = value + + def __delitem__(self, key): + del self.l[key] + + def __str__(self): + return str(self.l) + + def __repr__(self): + return "%s.List(%r)" % (self.__module__, " ".join(self.l)) + + def __mul__(self, other): + result = List() + if not isinstance(other, List): + other = List(other) + for f in self: + for s in other: + result.l.append(f + s) + return result + + def __rmul__(self, other): + if not isinstance(other, List): + other = List(other) + return List.__mul__(other, self) + + def __add__(self, other): + result = List() + result.l = self.l[:] + other.l[:] + return result + + +def _contains_lines(data, lines): + data_line_count = len(data) + expected_line_count = reduce(lambda x, y: x + len(y), lines, 0) + index = 0 + for expected in lines: + if expected_line_count > data_line_count - index: + return False + expected_line_count -= len(expected) + index = _match_line_sequence(data, index, data_line_count - + expected_line_count, expected) + if index < 0: + return False + return True + + +def _match_line_sequence(data, start, end, lines): + if not lines: + return start + for index in range(start, end - len(lines) + 1): + data_index = index + for expected in lines: + if not fnmatch.fnmatch(data[data_index], expected): + break + data_index += 1 + else: + return data_index + return -1 + + +def _sleep(delay): + if delay > 5: + raise TestEnvironmentError("Test environment error: sleep period of " + "more than 5 seconds requested. Most likely caused by a file with " + "its modification timestamp set to sometime in the future.") + time.sleep(delay) + + +############################################################################### +# +# Initialization. +# +############################################################################### + +# Make os.stat() return file modification times as floats instead of integers +# to get the best possible file timestamp resolution available. The exact +# resolution depends on the underlying file system and the Python os.stat() +# implementation. The better the resolution we achieve, the shorter we need to +# wait for files we create to start getting new timestamps. +# +# Additional notes: +# * os.stat_float_times() function first introduced in Python 2.3. and +# suggested for deprecation in Python 3.3. +# * On Python versions 2.5+ we do not need to do this as there os.stat() +# returns floating point file modification times by default. +# * Windows CPython implementations prior to version 2.5 do not support file +# modification timestamp resolutions of less than 1 second no matter whether +# these timestamps are returned as integer or floating point values. +# * Python documentation states that this should be set in a program's +# __main__ module to avoid affecting other libraries that might not be ready +# to support floating point timestamps. Since we use no such external +# libraries, we ignore this warning to make it easier to enable this feature +# in both our single & multiple-test scripts. +if (2, 3) <= sys.version_info < (2, 5) and not os.stat_float_times(): + os.stat_float_times(True) + + +# Quickie tests. Should use doctest instead. +if __name__ == "__main__": + assert str(List("foo bar") * "/baz") == "['foo/baz', 'bar/baz']" + assert repr("foo/" * List("bar baz")) == "__main__.List('foo/bar foo/baz')" + + assert _contains_lines([], []) + assert _contains_lines([], [[]]) + assert _contains_lines([], [[], []]) + assert _contains_lines([], [[], [], []]) + assert not _contains_lines([], [[""]]) + assert not _contains_lines([], [["a"]]) + + assert _contains_lines([""], []) + assert _contains_lines(["a"], []) + assert _contains_lines(["a", "b"], []) + assert _contains_lines(["a", "b"], [[], [], []]) + + assert _contains_lines([""], [[""]]) + assert not _contains_lines([""], [["a"]]) + assert not _contains_lines(["a"], [[""]]) + assert _contains_lines(["a", "", "b", ""], [["a"]]) + assert _contains_lines(["a", "", "b", ""], [[""]]) + assert _contains_lines(["a", "", "b"], [["b"]]) + assert not _contains_lines(["a", "b"], [[""]]) + assert not _contains_lines(["a", "", "b", ""], [["c"]]) + assert _contains_lines(["a", "", "b", "x"], [["x"]]) + + data = ["1", "2", "3", "4", "5", "6", "7", "8", "9"] + assert _contains_lines(data, [["1", "2"]]) + assert not _contains_lines(data, [["2", "1"]]) + assert not _contains_lines(data, [["1", "3"]]) + assert not _contains_lines(data, [["1", "3"]]) + assert _contains_lines(data, [["1"], ["2"]]) + assert _contains_lines(data, [["1"], [], [], [], ["2"]]) + assert _contains_lines(data, [["1"], ["3"]]) + assert not _contains_lines(data, [["3"], ["1"]]) + assert _contains_lines(data, [["3"], ["7"], ["8"]]) + assert not _contains_lines(data, [["1"], ["3", "5"]]) + assert not _contains_lines(data, [["1"], [""], ["5"]]) + assert not _contains_lines(data, [["1"], ["5"], ["3"]]) + assert not _contains_lines(data, [["1"], ["5", "3"]]) + + assert not _contains_lines(data, [[" 3"]]) + assert not _contains_lines(data, [["3 "]]) + assert not _contains_lines(data, [["3", ""]]) + assert not _contains_lines(data, [["", "3"]]) + + print("tests passed") |