diff options
Diffstat (limited to '')
26 files changed, 1608 insertions, 0 deletions
diff --git a/testing/mozbase/mozfile/mozfile/__init__.py b/testing/mozbase/mozfile/mozfile/__init__.py new file mode 100644 index 0000000000..5d45755ac7 --- /dev/null +++ b/testing/mozbase/mozfile/mozfile/__init__.py @@ -0,0 +1,6 @@ +# flake8: noqa +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from .mozfile import * diff --git a/testing/mozbase/mozfile/mozfile/mozfile.py b/testing/mozbase/mozfile/mozfile/mozfile.py new file mode 100644 index 0000000000..05e5b8eaeb --- /dev/null +++ b/testing/mozbase/mozfile/mozfile/mozfile.py @@ -0,0 +1,641 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +# We don't import all modules at the top for performance reasons. See Bug 1008943 + +import errno +import os +import re +import stat +import sys +import time +import warnings +from contextlib import contextmanager +from textwrap import dedent + +from six.moves import urllib + +__all__ = [ + "extract_tarball", + "extract_zip", + "extract", + "is_url", + "load", + "copy_contents", + "match", + "move", + "remove", + "rmtree", + "tree", + "which", + "NamedTemporaryFile", + "TemporaryDirectory", +] + +# utilities for extracting archives + + +def extract_tarball(src, dest, ignore=None): + """extract a .tar file""" + + import tarfile + + with tarfile.open(src) as bundle: + namelist = [] + + for m in bundle: + # Mitigation for CVE-2007-4559, Python's tarfile library will allow + # writing files outside of the intended destination. + if ".." in m.name: + raise RuntimeError( + dedent( + f""" + Tar bundle '{src}' may be maliciously crafted to escape the destination! + The following path was detected: + + {m.name} + """ + ) + ) + if ignore and any(match(m.name, i) for i in ignore): + continue + bundle.extract(m, path=dest) + namelist.append(m.name) + + return namelist + + +def extract_zip(src, dest, ignore=None): + """extract a zip file""" + + import zipfile + + if isinstance(src, zipfile.ZipFile): + bundle = src + else: + try: + bundle = zipfile.ZipFile(src) + except Exception: + print("src: %s" % src) + raise + + namelist = bundle.namelist() + + for name in namelist: + if ignore and any(match(name, i) for i in ignore): + continue + + bundle.extract(name, dest) + filename = os.path.realpath(os.path.join(dest, name)) + mode = bundle.getinfo(name).external_attr >> 16 & 0x1FF + # Only update permissions if attributes are set. Otherwise fallback to the defaults. + if mode: + os.chmod(filename, mode) + bundle.close() + return namelist + + +def extract(src, dest=None, ignore=None): + """ + Takes in a tar or zip file and extracts it to dest + + If dest is not specified, extracts to os.path.dirname(src) + + Returns the list of top level files that were extracted + """ + + import tarfile + import zipfile + + assert os.path.exists(src), "'%s' does not exist" % src + + if dest is None: + dest = os.path.dirname(src) + elif not os.path.isdir(dest): + os.makedirs(dest) + assert not os.path.isfile(dest), "dest cannot be a file" + + if tarfile.is_tarfile(src): + namelist = extract_tarball(src, dest, ignore=ignore) + elif zipfile.is_zipfile(src): + namelist = extract_zip(src, dest, ignore=ignore) + else: + raise Exception("mozfile.extract: no archive format found for '%s'" % src) + + # namelist returns paths with forward slashes even in windows + top_level_files = [ + os.path.join(dest, name.rstrip("/")) + for name in namelist + if len(name.rstrip("/").split("/")) == 1 + ] + + # namelist doesn't include folders, append these to the list + for name in namelist: + index = name.find("/") + if index != -1: + root = os.path.join(dest, name[:index]) + if root not in top_level_files: + top_level_files.append(root) + + return top_level_files + + +# utilities for removal of files and directories + + +def rmtree(dir): + """Deprecated wrapper method to remove a directory tree. + + Ensure to update your code to use mozfile.remove() directly + + :param dir: directory to be removed + """ + + warnings.warn( + "mozfile.rmtree() is deprecated in favor of mozfile.remove()", + PendingDeprecationWarning, + stacklevel=2, + ) + return remove(dir) + + +def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5): + """ + It's possible to see spurious errors on Windows due to various things + keeping a handle to the directory open (explorer, virus scanners, etc) + So we try a few times if it fails with a known error. + retry_delay is multiplied by the number of failed attempts to increase + the likelihood of success in subsequent attempts. + """ + retry_count = 0 + while True: + try: + func(*args) + except OSError as e: + # Error codes are defined in: + # http://docs.python.org/2/library/errno.html#module-errno + if e.errno not in (errno.EACCES, errno.ENOTEMPTY): + raise + + if retry_count == retry_max: + raise + + retry_count += 1 + + print( + '%s() failed for "%s". Reason: %s (%s). Retrying...' + % (func.__name__, args, e.strerror, e.errno) + ) + time.sleep(retry_count * retry_delay) + else: + # If no exception has been thrown it should be done + break + + +def remove(path): + """Removes the specified file, link, or directory tree. + + This is a replacement for shutil.rmtree that works better under + windows. It does the following things: + + - check path access for the current user before trying to remove + - retry operations on some known errors due to various things keeping + a handle on file paths - like explorer, virus scanners, etc. The + known errors are errno.EACCES and errno.ENOTEMPTY, and it will + retry up to 5 five times with a delay of (failed_attempts * 0.5) seconds + between each attempt. + + Note that no error will be raised if the given path does not exists. + + :param path: path to be removed + """ + + import shutil + + def _call_with_windows_retry(*args, **kwargs): + try: + _call_windows_retry(*args, **kwargs) + except OSError as e: + # The file or directory to be removed doesn't exist anymore + if e.errno != errno.ENOENT: + raise + + def _update_permissions(path): + """Sets specified pemissions depending on filetype""" + if os.path.islink(path): + # Path is a symlink which we don't have to modify + # because it should already have all the needed permissions + return + + stats = os.stat(path) + + if os.path.isfile(path): + mode = stats.st_mode | stat.S_IWUSR + elif os.path.isdir(path): + mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR + else: + # Not supported type + return + + _call_with_windows_retry(os.chmod, (path, mode)) + + if not os.path.lexists(path): + return + + """ + On Windows, adds '\\\\?\\' to paths which match ^[A-Za-z]:\\.* to access + files or directories that exceed MAX_PATH(260) limitation or that ends + with a period. + """ + if ( + sys.platform in ("win32", "cygwin") + and len(path) >= 3 + and path[1] == ":" + and path[2] == "\\" + ): + path = "\\\\?\\%s" % path + + if os.path.isfile(path) or os.path.islink(path): + # Verify the file or link is read/write for the current user + _update_permissions(path) + _call_with_windows_retry(os.remove, (path,)) + + elif os.path.isdir(path): + # Verify the directory is read/write/execute for the current user + _update_permissions(path) + + # We're ensuring that every nested item has writable permission. + for root, dirs, files in os.walk(path): + for entry in dirs + files: + _update_permissions(os.path.join(root, entry)) + _call_with_windows_retry(shutil.rmtree, (path,)) + + +def copy_contents(srcdir, dstdir): + """ + Copy the contents of the srcdir into the dstdir, preserving + subdirectories. + + If an existing file of the same name exists in dstdir, it will be overwritten. + """ + import shutil + + # dirs_exist_ok was introduced in Python 3.8 + # On earlier versions, or Windows, use the verbose mechanism. + # We use it on Windows because _call_with_windows_retry doesn't allow + # named arguments to be passed. + if (sys.version_info.major < 3 or sys.version_info.minor < 8) or (os.name == "nt"): + names = os.listdir(srcdir) + if not os.path.isdir(dstdir): + os.makedirs(dstdir) + errors = [] + for name in names: + srcname = os.path.join(srcdir, name) + dstname = os.path.join(dstdir, name) + try: + if os.path.islink(srcname): + linkto = os.readlink(srcname) + os.symlink(linkto, dstname) + elif os.path.isdir(srcname): + copy_contents(srcname, dstname) + else: + _call_windows_retry(shutil.copy2, (srcname, dstname)) + except OSError as why: + errors.append((srcname, dstname, str(why))) + except Exception as err: + errors.extend(err) + try: + _call_windows_retry(shutil.copystat, (srcdir, dstdir)) + except OSError as why: + if why.winerror is None: + errors.extend((srcdir, dstdir, str(why))) + if errors: + raise Exception(errors) + else: + shutil.copytree(srcdir, dstdir, dirs_exist_ok=True) + + +def move(src, dst): + """ + Move a file or directory path. + + This is a replacement for shutil.move that works better under windows, + retrying operations on some known errors due to various things keeping + a handle on file paths. + """ + import shutil + + _call_windows_retry(shutil.move, (src, dst)) + + +def depth(directory): + """returns the integer depth of a directory or path relative to '/'""" + + directory = os.path.abspath(directory) + level = 0 + while True: + directory, remainder = os.path.split(directory) + level += 1 + if not remainder: + break + return level + + +def tree(directory, sort_key=lambda x: x.lower()): + """Display tree directory structure for `directory`.""" + vertical_line = "│" + item_marker = "├" + last_child = "└" + + retval = [] + indent = [] + last = {} + top = depth(directory) + + for dirpath, dirnames, filenames in os.walk(directory, topdown=True): + + abspath = os.path.abspath(dirpath) + basename = os.path.basename(abspath) + parent = os.path.dirname(abspath) + level = depth(abspath) - top + + # sort articles of interest + for resource in (dirnames, filenames): + resource[:] = sorted(resource, key=sort_key) + + if level > len(indent): + indent.append(vertical_line) + indent = indent[:level] + + if dirnames: + files_end = item_marker + last[abspath] = dirnames[-1] + else: + files_end = last_child + + if last.get(parent) == os.path.basename(abspath): + # last directory of parent + dirpath_mark = last_child + indent[-1] = " " + elif not indent: + dirpath_mark = "" + else: + dirpath_mark = item_marker + + # append the directory and piece of tree structure + # if the top-level entry directory, print as passed + retval.append( + "%s%s%s" + % ("".join(indent[:-1]), dirpath_mark, basename if retval else directory) + ) + # add the files + if filenames: + last_file = filenames[-1] + retval.extend( + [ + ( + "%s%s%s" + % ( + "".join(indent), + files_end if filename == last_file else item_marker, + filename, + ) + ) + for index, filename in enumerate(filenames) + ] + ) + + return "\n".join(retval) + + +def which(cmd, mode=os.F_OK | os.X_OK, path=None, exts=None, extra_search_dirs=()): + """A wrapper around `shutil.which` to make the behavior on Windows + consistent with other platforms. + + On non-Windows platforms, this is a direct call to `shutil.which`. On + Windows, this: + + * Ensures that `cmd` without an extension will be found. Previously it was + only found if it had an extension in `PATHEXT`. + * Ensures the absolute path to the binary is returned. Previously if the + binary was found in `cwd`, a relative path was returned. + * Checks the Windows registry if shutil.which doesn't come up with anything. + + The arguments are the same as the ones in `shutil.which`. In addition there + is an `exts` argument that only has an effect on Windows. This is used to + set a custom value for PATHEXT and is formatted as a list of file + extensions. + + extra_search_dirs is a convenience argument. If provided, the strings in + the sequence will be appended to the END of the given `path`. + """ + from shutil import which as shutil_which + + if isinstance(path, (list, tuple)): + path = os.pathsep.join(path) + + if not path: + path = os.environ.get("PATH", os.defpath) + + if extra_search_dirs: + path = os.pathsep.join([path] + list(extra_search_dirs)) + + if sys.platform != "win32": + return shutil_which(cmd, mode=mode, path=path) + + oldexts = os.environ.get("PATHEXT", "") + if not exts: + exts = oldexts.split(os.pathsep) + + # This ensures that `cmd` without any extensions will be found. + # See: https://bugs.python.org/issue31405 + if "." not in exts: + exts.append(".") + + os.environ["PATHEXT"] = os.pathsep.join(exts) + try: + path = shutil_which(cmd, mode=mode, path=path) + if path: + return os.path.abspath(path.rstrip(".")) + finally: + if oldexts: + os.environ["PATHEXT"] = oldexts + else: + del os.environ["PATHEXT"] + + # If we've gotten this far, we need to check for registered executables + # before giving up. + try: + import winreg + except ImportError: + import _winreg as winreg + if not cmd.lower().endswith(".exe"): + cmd += ".exe" + try: + ret = winreg.QueryValue( + winreg.HKEY_LOCAL_MACHINE, + r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\%s" % cmd, + ) + return os.path.abspath(ret) if ret else None + except winreg.error: + return None + + +# utilities for temporary resources + + +class NamedTemporaryFile(object): + """ + Like tempfile.NamedTemporaryFile except it works on Windows + in the case where you open the created file a second time. + + This behaves very similarly to tempfile.NamedTemporaryFile but may + not behave exactly the same. For example, this function does not + prevent fd inheritance by children. + + Example usage: + + with NamedTemporaryFile() as fh: + fh.write(b'foobar') + + print('Filename: %s' % fh.name) + + see https://bugzilla.mozilla.org/show_bug.cgi?id=821362 + """ + + def __init__( + self, mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True + ): + + import tempfile + + fd, path = tempfile.mkstemp(suffix, prefix, dir, "t" in mode) + os.close(fd) + + self.file = open(path, mode) + self._path = path + self._delete = delete + self._unlinked = False + + def __getattr__(self, k): + return getattr(self.__dict__["file"], k) + + def __iter__(self): + return self.__dict__["file"] + + def __enter__(self): + self.file.__enter__() + return self + + def __exit__(self, exc, value, tb): + self.file.__exit__(exc, value, tb) + if self.__dict__["_delete"]: + os.unlink(self.__dict__["_path"]) + self._unlinked = True + + def __del__(self): + if self.__dict__["_unlinked"]: + return + self.file.__exit__(None, None, None) + if self.__dict__["_delete"]: + os.unlink(self.__dict__["_path"]) + + +@contextmanager +def TemporaryDirectory(): + """ + create a temporary directory using tempfile.mkdtemp, and then clean it up. + + Example usage: + with TemporaryDirectory() as tmp: + open(os.path.join(tmp, "a_temp_file"), "w").write("data") + + """ + + import shutil + import tempfile + + tempdir = tempfile.mkdtemp() + try: + yield tempdir + finally: + shutil.rmtree(tempdir) + + +# utilities dealing with URLs + + +def is_url(thing): + """ + Return True if thing looks like a URL. + """ + + parsed = urllib.parse.urlparse(thing) + if "scheme" in parsed: + return len(parsed.scheme) >= 2 + else: + return len(parsed[0]) >= 2 + + +def load(resource): + """ + open a file or URL for reading. If the passed resource string is not a URL, + or begins with 'file://', return a ``file``. Otherwise, return the + result of urllib.urlopen() + """ + + # handle file URLs separately due to python stdlib limitations + if resource.startswith("file://"): + resource = resource[len("file://") :] + + if not is_url(resource): + # if no scheme is given, it is a file path + return open(resource) + + return urllib.request.urlopen(resource) + + +# We can't depend on mozpack.path here, so copy the 'match' function over. + +re_cache = {} +# Python versions < 3.7 return r'\/' for re.escape('/'). +if re.escape("/") == "/": + MATCH_STAR_STAR_RE = re.compile(r"(^|/)\\\*\\\*/") + MATCH_STAR_STAR_END_RE = re.compile(r"(^|/)\\\*\\\*$") +else: + MATCH_STAR_STAR_RE = re.compile(r"(^|\\\/)\\\*\\\*\\\/") + MATCH_STAR_STAR_END_RE = re.compile(r"(^|\\\/)\\\*\\\*$") + + +def match(path, pattern): + """ + Return whether the given path matches the given pattern. + An asterisk can be used to match any string, including the null string, in + one part of the path: + + ``foo`` matches ``*``, ``f*`` or ``fo*o`` + + However, an asterisk matching a subdirectory may not match the null string: + + ``foo/bar`` does *not* match ``foo/*/bar`` + + If the pattern matches one of the ancestor directories of the path, the + patch is considered matching: + + ``foo/bar`` matches ``foo`` + + Two adjacent asterisks can be used to match files and zero or more + directories and subdirectories. + + ``foo/bar`` matches ``foo/**/bar``, or ``**/bar`` + """ + if not pattern: + return True + if pattern not in re_cache: + p = re.escape(pattern) + p = MATCH_STAR_STAR_RE.sub(r"\1(?:.+/)?", p) + p = MATCH_STAR_STAR_END_RE.sub(r"(?:\1.+)?", p) + p = p.replace(r"\*", "[^/]*") + "(?:/.*)?$" + re_cache[pattern] = re.compile(p) + return re_cache[pattern].match(path) is not None diff --git a/testing/mozbase/mozfile/setup.cfg b/testing/mozbase/mozfile/setup.cfg new file mode 100644 index 0000000000..2a9acf13da --- /dev/null +++ b/testing/mozbase/mozfile/setup.cfg @@ -0,0 +1,2 @@ +[bdist_wheel] +universal = 1 diff --git a/testing/mozbase/mozfile/setup.py b/testing/mozbase/mozfile/setup.py new file mode 100644 index 0000000000..172df3e68e --- /dev/null +++ b/testing/mozbase/mozfile/setup.py @@ -0,0 +1,34 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +from setuptools import setup + +PACKAGE_NAME = "mozfile" +PACKAGE_VERSION = "3.0.0" + +setup( + name=PACKAGE_NAME, + version=PACKAGE_VERSION, + description="Library of file utilities for use in Mozilla testing", + long_description="see https://firefox-source-docs.mozilla.org/mozbase/index.html", + classifiers=[ + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)", + ], + keywords="mozilla", + author="Mozilla Automation and Tools team", + author_email="tools@lists.mozilla.org", + url="https://wiki.mozilla.org/Auto-tools/Projects/Mozbase", + license="MPL", + packages=["mozfile"], + include_package_data=True, + zip_safe=False, + install_requires=["six >= 1.13.0"], + tests_require=["wptserve"], +) diff --git a/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip b/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip Binary files differnew file mode 100644 index 0000000000..2b5409e89c --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip diff --git a/testing/mozbase/mozfile/tests/files/which/baz b/testing/mozbase/mozfile/tests/files/which/baz new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/baz diff --git a/testing/mozbase/mozfile/tests/files/which/baz.exe b/testing/mozbase/mozfile/tests/files/which/baz.exe new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/baz.exe diff --git a/testing/mozbase/mozfile/tests/files/which/registered/quux.exe b/testing/mozbase/mozfile/tests/files/which/registered/quux.exe new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/registered/quux.exe diff --git a/testing/mozbase/mozfile/tests/files/which/unix/baz.exe b/testing/mozbase/mozfile/tests/files/which/unix/baz.exe new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/unix/baz.exe diff --git a/testing/mozbase/mozfile/tests/files/which/unix/file b/testing/mozbase/mozfile/tests/files/which/unix/file new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/unix/file diff --git a/testing/mozbase/mozfile/tests/files/which/unix/foo b/testing/mozbase/mozfile/tests/files/which/unix/foo new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/unix/foo diff --git a/testing/mozbase/mozfile/tests/files/which/win/bar b/testing/mozbase/mozfile/tests/files/which/win/bar new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/win/bar diff --git a/testing/mozbase/mozfile/tests/files/which/win/baz.exe b/testing/mozbase/mozfile/tests/files/which/win/baz.exe new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/win/baz.exe diff --git a/testing/mozbase/mozfile/tests/files/which/win/foo b/testing/mozbase/mozfile/tests/files/which/win/foo new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/win/foo diff --git a/testing/mozbase/mozfile/tests/files/which/win/foo.exe b/testing/mozbase/mozfile/tests/files/which/win/foo.exe new file mode 100755 index 0000000000..e69de29bb2 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/which/win/foo.exe diff --git a/testing/mozbase/mozfile/tests/manifest.ini b/testing/mozbase/mozfile/tests/manifest.ini new file mode 100644 index 0000000000..258034bef5 --- /dev/null +++ b/testing/mozbase/mozfile/tests/manifest.ini @@ -0,0 +1,10 @@ +[DEFAULT] +subsuite = mozbase +[test_extract.py] +[test_load.py] +[test_move_remove.py] +[test_tempdir.py] +[test_tempfile.py] +[test_tree.py] +[test_url.py] +[test_which.py] diff --git a/testing/mozbase/mozfile/tests/stubs.py b/testing/mozbase/mozfile/tests/stubs.py new file mode 100644 index 0000000000..3c1bd47207 --- /dev/null +++ b/testing/mozbase/mozfile/tests/stubs.py @@ -0,0 +1,56 @@ +import os +import shutil +import tempfile + +# stub file paths +files = [ + ("foo.txt",), + ( + "foo", + "bar.txt", + ), + ( + "foo", + "bar", + "fleem.txt", + ), + ( + "foobar", + "fleem.txt", + ), + ("bar.txt",), + ( + "nested_tree", + "bar", + "fleem.txt", + ), + ("readonly.txt",), +] + + +def create_empty_stub(): + tempdir = tempfile.mkdtemp() + return tempdir + + +def create_stub(tempdir=None): + """create a stub directory""" + + tempdir = tempdir or tempfile.mkdtemp() + try: + for path in files: + fullpath = os.path.join(tempdir, *path) + dirname = os.path.dirname(fullpath) + if not os.path.exists(dirname): + os.makedirs(dirname) + contents = path[-1] + f = open(fullpath, "w") + f.write(contents) + f.close() + return tempdir + except Exception: + try: + shutil.rmtree(tempdir) + except Exception: + pass + raise diff --git a/testing/mozbase/mozfile/tests/test_copycontents.py b/testing/mozbase/mozfile/tests/test_copycontents.py new file mode 100644 index 0000000000..b829d7b3a4 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_copycontents.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python + +import os +import shutil +import unittest + +import mozfile +import mozunit +import stubs + + +class MozfileCopyContentsTestCase(unittest.TestCase): + """Test our ability to copy the contents of directories""" + + def _directory_is_subset(self, set_, subset_): + """ + Confirm that all the contents of 'subset_' are contained in 'set_' + """ + names = os.listdir(subset_) + for name in names: + full_set_path = os.path.join(set_, name) + full_subset_path = os.path.join(subset_, name) + if os.path.isdir(full_subset_path): + self.assertTrue(os.path.isdir(full_set_path)) + self._directory_is_subset(full_set_path, full_subset_path) + elif os.path.islink(full_subset_path): + self.assertTrue(os.path.islink(full_set_path)) + else: + self.assertTrue(os.stat(full_set_path)) + + def _directories_are_equal(self, dir1, dir2): + """ + Confirm that the contents of 'dir1' are the same as 'dir2' + """ + names1 = os.listdir(dir1) + names2 = os.listdir(dir2) + self.assertTrue(len(names1) == len(names2)) + for name in names1: + self.assertTrue(name in names2) + dir1_path = os.path.join(dir1, name) + dir2_path = os.path.join(dir2, name) + if os.path.isdir(dir1_path): + self.assertTrue(os.path.isdir(dir2_path)) + self._directories_are_equal(dir1_path, dir2_path) + elif os.path.islink(dir1_path): + self.assertTrue(os.path.islink(dir2_path)) + else: + self.assertTrue(os.stat(dir2_path)) + + def test_copy_empty_directory(self): + tempdir = stubs.create_empty_stub() + dstdir = stubs.create_empty_stub() + self.assertTrue(os.path.isdir(tempdir)) + + mozfile.copy_contents(tempdir, dstdir) + self._directories_are_equal(dstdir, tempdir) + + if os.path.isdir(tempdir): + shutil.rmtree(tempdir) + if os.path.isdir(dstdir): + shutil.rmtree(dstdir) + + def test_copy_full_directory(self): + tempdir = stubs.create_stub() + dstdir = stubs.create_empty_stub() + self.assertTrue(os.path.isdir(tempdir)) + + mozfile.copy_contents(tempdir, dstdir) + self._directories_are_equal(dstdir, tempdir) + + if os.path.isdir(tempdir): + shutil.rmtree(tempdir) + if os.path.isdir(dstdir): + shutil.rmtree(dstdir) + + def test_copy_full_directory_with_existing_file(self): + tempdir = stubs.create_stub() + dstdir = stubs.create_empty_stub() + + filename = "i_dont_exist_in_tempdir" + f = open(os.path.join(dstdir, filename), "w") + f.write("Hello World") + f.close() + + self.assertTrue(os.path.isdir(tempdir)) + + mozfile.copy_contents(tempdir, dstdir) + self._directory_is_subset(dstdir, tempdir) + self.assertTrue(os.path.exists(os.path.join(dstdir, filename))) + + if os.path.isdir(tempdir): + shutil.rmtree(tempdir) + if os.path.isdir(dstdir): + shutil.rmtree(dstdir) + + def test_copy_full_directory_with_overlapping_file(self): + tempdir = stubs.create_stub() + dstdir = stubs.create_empty_stub() + + filename = "i_do_exist_in_tempdir" + for d in [tempdir, dstdir]: + f = open(os.path.join(d, filename), "w") + f.write("Hello " + d) + f.close() + + self.assertTrue(os.path.isdir(tempdir)) + self.assertTrue(os.path.exists(os.path.join(tempdir, filename))) + self.assertTrue(os.path.exists(os.path.join(dstdir, filename))) + + line = open(os.path.join(dstdir, filename), "r").readlines()[0] + self.assertTrue(line == "Hello " + dstdir) + + mozfile.copy_contents(tempdir, dstdir) + + line = open(os.path.join(dstdir, filename), "r").readlines()[0] + self.assertTrue(line == "Hello " + tempdir) + self._directories_are_equal(tempdir, dstdir) + + if os.path.isdir(tempdir): + shutil.rmtree(tempdir) + if os.path.isdir(dstdir): + shutil.rmtree(dstdir) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozfile/tests/test_extract.py b/testing/mozbase/mozfile/tests/test_extract.py new file mode 100644 index 0000000000..c2675d77f7 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_extract.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python + +import os +import tarfile +import tempfile +import zipfile + +import mozfile +import mozunit +import pytest +import stubs + + +@pytest.fixture +def ensure_directory_contents(): + """ensure the directory contents match""" + + def inner(directory): + for f in stubs.files: + path = os.path.join(directory, *f) + exists = os.path.exists(path) + if not exists: + print("%s does not exist" % (os.path.join(f))) + assert exists + if exists: + contents = open(path).read().strip() + assert contents == f[-1] + + return inner + + +@pytest.fixture(scope="module") +def tarpath(tmpdir_factory): + """create a stub tarball for testing""" + tmpdir = tmpdir_factory.mktemp("test_extract") + + tempdir = tmpdir.join("stubs").strpath + stubs.create_stub(tempdir) + filename = tmpdir.join("bundle.tar").strpath + archive = tarfile.TarFile(filename, mode="w") + for path in stubs.files: + archive.add(os.path.join(tempdir, *path), arcname=os.path.join(*path)) + archive.close() + + assert os.path.exists(filename) + return filename + + +@pytest.fixture(scope="module") +def zippath(tmpdir_factory): + """create a stub zipfile for testing""" + tmpdir = tmpdir_factory.mktemp("test_extract") + + tempdir = tmpdir.join("stubs").strpath + stubs.create_stub(tempdir) + filename = tmpdir.join("bundle.zip").strpath + archive = zipfile.ZipFile(filename, mode="w") + for path in stubs.files: + archive.write(os.path.join(tempdir, *path), arcname=os.path.join(*path)) + archive.close() + + assert os.path.exists(filename) + return filename + + +@pytest.fixture(scope="module", params=["tar", "zip"]) +def bundlepath(request, tarpath, zippath): + if request.param == "tar": + return tarpath + else: + return zippath + + +def test_extract(tmpdir, bundlepath, ensure_directory_contents): + """test extracting a zipfile""" + dest = tmpdir.mkdir("dest").strpath + mozfile.extract(bundlepath, dest) + ensure_directory_contents(dest) + + +def test_extract_zipfile_missing_file_attributes(tmpdir): + """if files do not have attributes set the default permissions have to be inherited.""" + _zipfile = os.path.join( + os.path.dirname(__file__), "files", "missing_file_attributes.zip" + ) + assert os.path.exists(_zipfile) + dest = tmpdir.mkdir("dest").strpath + + # Get the default file permissions for the user + fname = os.path.join(dest, "foo") + with open(fname, "w"): + pass + default_stmode = os.stat(fname).st_mode + + files = mozfile.extract_zip(_zipfile, dest) + for filename in files: + assert os.stat(os.path.join(dest, filename)).st_mode == default_stmode + + +def test_extract_non_archive(tarpath, zippath): + """test the generalized extract function""" + # test extracting some non-archive; this should fail + fd, filename = tempfile.mkstemp() + os.write(fd, b"This is not a zipfile or tarball") + os.close(fd) + exception = None + + try: + dest = tempfile.mkdtemp() + mozfile.extract(filename, dest) + except Exception as exc: + exception = exc + finally: + os.remove(filename) + os.rmdir(dest) + + assert isinstance(exception, Exception) + + +def test_extract_ignore(tmpdir, bundlepath): + dest = tmpdir.mkdir("dest").strpath + ignore = ("foo", "**/fleem.txt", "read*.txt") + mozfile.extract(bundlepath, dest, ignore=ignore) + + assert sorted(os.listdir(dest)) == ["bar.txt", "foo.txt"] + + +def test_tarball_escape(tmpdir): + """Ensures that extracting a tarball can't write outside of the intended + destination directory. + """ + workdir = tmpdir.mkdir("workdir") + os.chdir(workdir) + + # Generate a "malicious" bundle. + with open("bad.txt", "w") as fh: + fh.write("pwned!") + + def change_name(tarinfo): + tarinfo.name = "../" + tarinfo.name + return tarinfo + + with tarfile.open("evil.tar", "w:xz") as tar: + tar.add("bad.txt", filter=change_name) + + with pytest.raises(RuntimeError): + mozfile.extract_tarball("evil.tar", workdir) + assert not os.path.exists(tmpdir.join("bad.txt")) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozfile/tests/test_load.py b/testing/mozbase/mozfile/tests/test_load.py new file mode 100755 index 0000000000..7a3896e33b --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_load.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +""" +tests for mozfile.load +""" + +import mozunit +import pytest +from mozfile import load +from wptserve.handlers import handler +from wptserve.server import WebTestHttpd + + +@pytest.fixture(name="httpd_url") +def fixture_httpd_url(): + """Yield a started WebTestHttpd server.""" + + @handler + def example(request, response): + """Example request handler.""" + body = b"example" + return ( + 200, + [("Content-type", "text/plain"), ("Content-length", len(body))], + body, + ) + + httpd = WebTestHttpd(host="127.0.0.1", routes=[("GET", "*", example)]) + + httpd.start() + yield httpd.get_url() + httpd.stop() + + +def test_http(httpd_url): + """Test with WebTestHttpd and a http:// URL.""" + content = load(httpd_url).read() + assert content == b"example" + + +@pytest.fixture(name="temporary_file") +def fixture_temporary_file(tmpdir): + """Yield a path to a temporary file.""" + foobar = tmpdir.join("foobar.txt") + foobar.write("hello world") + + yield str(foobar) + + foobar.remove() + + +def test_file_path(temporary_file): + """Test loading from a file path.""" + assert load(temporary_file).read() == "hello world" + + +def test_file_url(temporary_file): + """Test loading from a file URL.""" + assert load("file://%s" % temporary_file).read() == "hello world" + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozfile/tests/test_move_remove.py b/testing/mozbase/mozfile/tests/test_move_remove.py new file mode 100644 index 0000000000..0679c6c3fa --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_move_remove.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python + +import errno +import os +import shutil +import stat +import threading +import time +import unittest +from contextlib import contextmanager + +import mozfile +import mozinfo +import mozunit +import stubs + + +def mark_readonly(path): + """Removes all write permissions from given file/directory. + + :param path: path of directory/file of which modes must be changed + """ + mode = os.stat(path)[stat.ST_MODE] + os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) + + +class FileOpenCloseThread(threading.Thread): + """Helper thread for asynchronous file handling""" + + def __init__(self, path, delay, delete=False): + threading.Thread.__init__(self) + self.file_opened = threading.Event() + self.delay = delay + self.path = path + self.delete = delete + + def run(self): + with open(self.path): + self.file_opened.set() + time.sleep(self.delay) + if self.delete: + try: + os.remove(self.path) + except Exception: + pass + + +@contextmanager +def wait_file_opened_in_thread(*args, **kwargs): + thread = FileOpenCloseThread(*args, **kwargs) + thread.start() + thread.file_opened.wait() + try: + yield thread + finally: + thread.join() + + +class MozfileRemoveTestCase(unittest.TestCase): + """Test our ability to remove directories and files""" + + def setUp(self): + # Generate a stub + self.tempdir = stubs.create_stub() + + def tearDown(self): + if os.path.isdir(self.tempdir): + shutil.rmtree(self.tempdir) + + def test_remove_directory(self): + """Test the removal of a directory""" + self.assertTrue(os.path.isdir(self.tempdir)) + mozfile.remove(self.tempdir) + self.assertFalse(os.path.exists(self.tempdir)) + + def test_remove_directory_with_open_file(self): + """Test removing a directory with an open file""" + # Open a file in the generated stub + filepath = os.path.join(self.tempdir, *stubs.files[1]) + f = open(filepath, "w") + f.write("foo-bar") + + # keep file open and then try removing the dir-tree + if mozinfo.isWin: + # On the Windows family WindowsError should be raised. + self.assertRaises(OSError, mozfile.remove, self.tempdir) + self.assertTrue(os.path.exists(self.tempdir)) + else: + # Folder should be deleted on all other platforms + mozfile.remove(self.tempdir) + self.assertFalse(os.path.exists(self.tempdir)) + + def test_remove_closed_file(self): + """Test removing a closed file""" + # Open a file in the generated stub + filepath = os.path.join(self.tempdir, *stubs.files[1]) + with open(filepath, "w") as f: + f.write("foo-bar") + + # Folder should be deleted on all platforms + mozfile.remove(self.tempdir) + self.assertFalse(os.path.exists(self.tempdir)) + + def test_removing_open_file_with_retry(self): + """Test removing a file in use with retry""" + filepath = os.path.join(self.tempdir, *stubs.files[1]) + + with wait_file_opened_in_thread(filepath, 0.2): + # on windows first attempt will fail, + # and it will be retried until the thread leave the handle + mozfile.remove(filepath) + + # Check deletion was successful + self.assertFalse(os.path.exists(filepath)) + + def test_removing_already_deleted_file_with_retry(self): + """Test removing a meanwhile removed file with retry""" + filepath = os.path.join(self.tempdir, *stubs.files[1]) + + with wait_file_opened_in_thread(filepath, 0.2, True): + # on windows first attempt will fail, and before + # the retry the opened file will be deleted in the thread + mozfile.remove(filepath) + + # Check deletion was successful + self.assertFalse(os.path.exists(filepath)) + + def test_remove_readonly_tree(self): + """Test removing a read-only directory""" + + dirpath = os.path.join(self.tempdir, "nested_tree") + mark_readonly(dirpath) + + # However, mozfile should change write permissions and remove dir. + mozfile.remove(dirpath) + + self.assertFalse(os.path.exists(dirpath)) + + def test_remove_readonly_file(self): + """Test removing read-only files""" + filepath = os.path.join(self.tempdir, *stubs.files[1]) + mark_readonly(filepath) + + # However, mozfile should change write permission and then remove file. + mozfile.remove(filepath) + + self.assertFalse(os.path.exists(filepath)) + + @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows") + def test_remove_symlink(self): + """Test removing a symlink""" + file_path = os.path.join(self.tempdir, *stubs.files[1]) + symlink_path = os.path.join(self.tempdir, "symlink") + + os.symlink(file_path, symlink_path) + self.assertTrue(os.path.islink(symlink_path)) + + # The linked folder and files should not be deleted + mozfile.remove(symlink_path) + self.assertFalse(os.path.exists(symlink_path)) + self.assertTrue(os.path.exists(file_path)) + + @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows") + def test_remove_symlink_in_subfolder(self): + """Test removing a folder with an contained symlink""" + file_path = os.path.join(self.tempdir, *stubs.files[0]) + dir_path = os.path.dirname(os.path.join(self.tempdir, *stubs.files[1])) + symlink_path = os.path.join(dir_path, "symlink") + + os.symlink(file_path, symlink_path) + self.assertTrue(os.path.islink(symlink_path)) + + # The folder with the contained symlink will be deleted but not the + # original linked file + mozfile.remove(dir_path) + self.assertFalse(os.path.exists(dir_path)) + self.assertFalse(os.path.exists(symlink_path)) + self.assertTrue(os.path.exists(file_path)) + + @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows") + def test_remove_broken_symlink(self): + """Test removing a folder with an contained symlink""" + file_path = os.path.join(self.tempdir, "readonly.txt") + working_link = os.path.join(self.tempdir, "link_to_readonly.txt") + broken_link = os.path.join(self.tempdir, "broken_link") + os.symlink(file_path, working_link) + os.symlink(os.path.join(self.tempdir, "broken.txt"), broken_link) + + self.assertTrue(os.path.exists(file_path)) + self.assertTrue(os.path.islink(working_link)) + self.assertTrue(os.path.islink(broken_link)) + + mozfile.remove(working_link) + self.assertFalse(os.path.lexists(working_link)) + self.assertTrue(os.path.exists(file_path)) + + mozfile.remove(broken_link) + self.assertFalse(os.path.lexists(broken_link)) + + @unittest.skipIf( + mozinfo.isWin or not os.geteuid(), + "Symlinks are not supported on Windows and cannot run test as root", + ) + def test_remove_symlink_for_system_path(self): + """Test removing a symlink which points to a system folder""" + symlink_path = os.path.join(self.tempdir, "symlink") + + os.symlink(os.path.dirname(self.tempdir), symlink_path) + self.assertTrue(os.path.islink(symlink_path)) + + # The folder with the contained symlink will be deleted but not the + # original linked file + mozfile.remove(symlink_path) + self.assertFalse(os.path.exists(symlink_path)) + + def test_remove_path_that_does_not_exists(self): + not_existing_path = os.path.join(self.tempdir, "I_do_not_not_exists") + try: + mozfile.remove(not_existing_path) + except OSError as exc: + if exc.errno == errno.ENOENT: + self.fail("removing non existing path must not raise error") + raise + + +class MozFileMoveTestCase(unittest.TestCase): + def setUp(self): + # Generate a stub + self.tempdir = stubs.create_stub() + self.addCleanup(mozfile.rmtree, self.tempdir) + + def test_move_file(self): + file_path = os.path.join(self.tempdir, *stubs.files[1]) + moved_path = file_path + ".moved" + self.assertTrue(os.path.isfile(file_path)) + self.assertFalse(os.path.exists(moved_path)) + mozfile.move(file_path, moved_path) + self.assertFalse(os.path.exists(file_path)) + self.assertTrue(os.path.isfile(moved_path)) + + def test_move_file_with_retry(self): + file_path = os.path.join(self.tempdir, *stubs.files[1]) + moved_path = file_path + ".moved" + + with wait_file_opened_in_thread(file_path, 0.2): + # first move attempt should fail on windows and be retried + mozfile.move(file_path, moved_path) + self.assertFalse(os.path.exists(file_path)) + self.assertTrue(os.path.isfile(moved_path)) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozfile/tests/test_tempdir.py b/testing/mozbase/mozfile/tests/test_tempdir.py new file mode 100644 index 0000000000..ba16b478b6 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_tempdir.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +""" +tests for mozfile.TemporaryDirectory +""" + +import os +import unittest + +import mozunit +from mozfile import TemporaryDirectory + + +class TestTemporaryDirectory(unittest.TestCase): + def test_removed(self): + """ensure that a TemporaryDirectory gets removed""" + path = None + with TemporaryDirectory() as tmp: + path = tmp + self.assertTrue(os.path.isdir(tmp)) + tmpfile = os.path.join(tmp, "a_temp_file") + open(tmpfile, "w").write("data") + self.assertTrue(os.path.isfile(tmpfile)) + self.assertFalse(os.path.isdir(path)) + self.assertFalse(os.path.exists(path)) + + def test_exception(self): + """ensure that TemporaryDirectory handles exceptions""" + path = None + with self.assertRaises(Exception): + with TemporaryDirectory() as tmp: + path = tmp + self.assertTrue(os.path.isdir(tmp)) + raise Exception("oops") + self.assertFalse(os.path.isdir(path)) + self.assertFalse(os.path.exists(path)) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozfile/tests/test_tempfile.py b/testing/mozbase/mozfile/tests/test_tempfile.py new file mode 100644 index 0000000000..3e250d6a76 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_tempfile.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python + +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +""" +tests for mozfile.NamedTemporaryFile +""" +import os +import unittest + +import mozfile +import mozunit +import six + + +class TestNamedTemporaryFile(unittest.TestCase): + """test our fix for NamedTemporaryFile""" + + def test_named_temporary_file(self): + """Ensure the fix for re-opening a NamedTemporaryFile works + + Refer to https://bugzilla.mozilla.org/show_bug.cgi?id=818777 + and https://bugzilla.mozilla.org/show_bug.cgi?id=821362 + """ + + test_string = b"A simple test" + with mozfile.NamedTemporaryFile() as temp: + # Test we can write to file + temp.write(test_string) + # Forced flush, so that we can read later + temp.flush() + + # Test we can open the file again on all platforms + self.assertEqual(open(temp.name, "rb").read(), test_string) + + def test_iteration(self): + """ensure the line iterator works""" + + # make a file and write to it + tf = mozfile.NamedTemporaryFile() + notes = [b"doe", b"rae", b"mi"] + for note in notes: + tf.write(b"%s\n" % note) + tf.flush() + + # now read from it + tf.seek(0) + lines = [line.rstrip(b"\n") for line in tf.readlines()] + self.assertEqual(lines, notes) + + # now read from it iteratively + lines = [] + for line in tf: + lines.append(line.strip()) + self.assertEqual(lines, []) # because we did not seek(0) + tf.seek(0) + lines = [] + for line in tf: + lines.append(line.strip()) + self.assertEqual(lines, notes) + + def test_delete(self): + """ensure ``delete=True/False`` works as expected""" + + # make a deleteable file; ensure it gets cleaned up + path = None + with mozfile.NamedTemporaryFile(delete=True) as tf: + path = tf.name + self.assertTrue(isinstance(path, six.string_types)) + self.assertFalse(os.path.exists(path)) + + # it is also deleted when __del__ is called + # here we will do so explicitly + tf = mozfile.NamedTemporaryFile(delete=True) + path = tf.name + self.assertTrue(os.path.exists(path)) + del tf + self.assertFalse(os.path.exists(path)) + + # Now the same thing but we won't delete the file + path = None + try: + with mozfile.NamedTemporaryFile(delete=False) as tf: + path = tf.name + self.assertTrue(os.path.exists(path)) + finally: + if path and os.path.exists(path): + os.remove(path) + + path = None + try: + tf = mozfile.NamedTemporaryFile(delete=False) + path = tf.name + self.assertTrue(os.path.exists(path)) + del tf + self.assertTrue(os.path.exists(path)) + finally: + if path and os.path.exists(path): + os.remove(path) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozfile/tests/test_tree.py b/testing/mozbase/mozfile/tests/test_tree.py new file mode 100644 index 0000000000..eba7d34a6b --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_tree.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python +# coding=UTF-8 + +import os +import shutil +import tempfile +import unittest + +import mozunit +from mozfile import tree + + +class TestTree(unittest.TestCase): + """Test the tree function.""" + + def test_unicode_paths(self): + """Test creating tree structure from a Unicode path.""" + try: + tmpdir = tempfile.mkdtemp(suffix=u"tmp🍪") + os.mkdir(os.path.join(tmpdir, u"dir🍪")) + with open(os.path.join(tmpdir, u"file🍪"), "w") as f: + f.write("foo") + + self.assertEqual(u"{}\n├file🍪\n└dir🍪".format(tmpdir), tree(tmpdir)) + finally: + shutil.rmtree(tmpdir) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozfile/tests/test_url.py b/testing/mozbase/mozfile/tests/test_url.py new file mode 100755 index 0000000000..a19f5f16a8 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_url.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python + +""" +tests for is_url +""" +import unittest + +import mozunit +from mozfile import is_url + + +class TestIsUrl(unittest.TestCase): + """test the is_url function""" + + def test_is_url(self): + self.assertTrue(is_url("http://mozilla.org")) + self.assertFalse(is_url("/usr/bin/mozilla.org")) + self.assertTrue(is_url("file:///usr/bin/mozilla.org")) + self.assertFalse(is_url("c:\foo\bar")) + + +if __name__ == "__main__": + mozunit.main() diff --git a/testing/mozbase/mozfile/tests/test_which.py b/testing/mozbase/mozfile/tests/test_which.py new file mode 100644 index 0000000000..b02f13ccdf --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_which.py @@ -0,0 +1,63 @@ +# Any copyright is dedicated to the Public Domain. +# https://creativecommons.org/publicdomain/zero/1.0/ + +import os +import sys + +import mozunit +import six +from mozfile import which + +here = os.path.abspath(os.path.dirname(__file__)) + + +def test_which(monkeypatch): + cwd = os.path.join(here, "files", "which") + monkeypatch.chdir(cwd) + + if sys.platform == "win32": + if six.PY3: + import winreg + else: + import _winreg as winreg + bindir = os.path.join(cwd, "win") + monkeypatch.setenv("PATH", bindir) + monkeypatch.setattr(winreg, "QueryValue", (lambda k, sk: None)) + + assert which("foo.exe").lower() == os.path.join(bindir, "foo.exe").lower() + assert which("foo").lower() == os.path.join(bindir, "foo.exe").lower() + assert ( + which("foo", exts=[".FOO", ".BAR"]).lower() + == os.path.join(bindir, "foo").lower() + ) + assert os.environ.get("PATHEXT") != [".FOO", ".BAR"] + assert which("foo.txt") is None + + assert which("bar").lower() == os.path.join(bindir, "bar").lower() + assert which("baz").lower() == os.path.join(cwd, "baz.exe").lower() + + registered_dir = os.path.join(cwd, "registered") + quux = os.path.join(registered_dir, "quux.exe").lower() + + def mock_registry(key, subkey): + assert subkey == ( + r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\quux.exe" + ) + return quux + + monkeypatch.setattr(winreg, "QueryValue", mock_registry) + assert which("quux").lower() == quux + assert which("quux.exe").lower() == quux + + else: + bindir = os.path.join(cwd, "unix") + monkeypatch.setenv("PATH", bindir) + assert which("foo") == os.path.join(bindir, "foo") + assert which("baz") is None + assert which("baz", exts=[".EXE"]) is None + assert "PATHEXT" not in os.environ + assert which("file") is None + + +if __name__ == "__main__": + mozunit.main() |