summaryrefslogtreecommitdiffstats
path: root/testing/mozbase/mozfile
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
commit26a029d407be480d791972afb5975cf62c9360a6 (patch)
treef435a8308119effd964b339f76abb83a57c29483 /testing/mozbase/mozfile
parentInitial commit. (diff)
downloadfirefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'testing/mozbase/mozfile')
-rw-r--r--testing/mozbase/mozfile/mozfile/__init__.py6
-rw-r--r--testing/mozbase/mozfile/mozfile/mozfile.py691
-rw-r--r--testing/mozbase/mozfile/setup.cfg2
-rw-r--r--testing/mozbase/mozfile/setup.py34
-rw-r--r--testing/mozbase/mozfile/tests/files/missing_file_attributes.zipbin0 -> 442 bytes
-rwxr-xr-xtesting/mozbase/mozfile/tests/files/which/baz0
-rwxr-xr-xtesting/mozbase/mozfile/tests/files/which/baz.exe0
-rwxr-xr-xtesting/mozbase/mozfile/tests/files/which/registered/quux.exe0
-rwxr-xr-xtesting/mozbase/mozfile/tests/files/which/unix/baz.exe0
-rw-r--r--testing/mozbase/mozfile/tests/files/which/unix/file0
-rwxr-xr-xtesting/mozbase/mozfile/tests/files/which/unix/foo0
-rwxr-xr-xtesting/mozbase/mozfile/tests/files/which/win/bar0
-rwxr-xr-xtesting/mozbase/mozfile/tests/files/which/win/baz.exe0
-rwxr-xr-xtesting/mozbase/mozfile/tests/files/which/win/foo0
-rwxr-xr-xtesting/mozbase/mozfile/tests/files/which/win/foo.exe0
-rw-r--r--testing/mozbase/mozfile/tests/manifest.toml18
-rw-r--r--testing/mozbase/mozfile/tests/stubs.py56
-rw-r--r--testing/mozbase/mozfile/tests/test_copycontents.py126
-rw-r--r--testing/mozbase/mozfile/tests/test_extract.py152
-rwxr-xr-xtesting/mozbase/mozfile/tests/test_load.py63
-rw-r--r--testing/mozbase/mozfile/tests/test_move_remove.py253
-rw-r--r--testing/mozbase/mozfile/tests/test_tempdir.py44
-rw-r--r--testing/mozbase/mozfile/tests/test_tempfile.py105
-rw-r--r--testing/mozbase/mozfile/tests/test_tree.py30
-rwxr-xr-xtesting/mozbase/mozfile/tests/test_url.py23
-rw-r--r--testing/mozbase/mozfile/tests/test_which.py63
26 files changed, 1666 insertions, 0 deletions
diff --git a/testing/mozbase/mozfile/mozfile/__init__.py b/testing/mozbase/mozfile/mozfile/__init__.py
new file mode 100644
index 0000000000..5d45755ac7
--- /dev/null
+++ b/testing/mozbase/mozfile/mozfile/__init__.py
@@ -0,0 +1,6 @@
+# flake8: noqa
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from .mozfile import *
diff --git a/testing/mozbase/mozfile/mozfile/mozfile.py b/testing/mozbase/mozfile/mozfile/mozfile.py
new file mode 100644
index 0000000000..892f8ee20f
--- /dev/null
+++ b/testing/mozbase/mozfile/mozfile/mozfile.py
@@ -0,0 +1,691 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+# We don't import all modules at the top for performance reasons. See Bug 1008943
+
+import errno
+import os
+import re
+import stat
+import sys
+import time
+import warnings
+from contextlib import contextmanager
+from textwrap import dedent
+
+from six.moves import urllib
+
+__all__ = [
+ "extract_tarball",
+ "extract_zip",
+ "extract",
+ "is_url",
+ "load",
+ "load_source",
+ "copy_contents",
+ "match",
+ "move",
+ "remove",
+ "rmtree",
+ "tree",
+ "which",
+ "NamedTemporaryFile",
+ "TemporaryDirectory",
+]
+
+# utilities for extracting archives
+
+
+def extract_tarball(src, dest, ignore=None):
+ """extract a .tar file"""
+
+ import tarfile
+
+ def _is_within_directory(directory, target):
+ real_directory = os.path.realpath(directory)
+ real_target = os.path.realpath(target)
+ prefix = os.path.commonprefix([real_directory, real_target])
+ return prefix == real_directory
+
+ with tarfile.open(src) as bundle:
+ namelist = []
+
+ for m in bundle:
+ # Mitigation for CVE-2007-4559, Python's tarfile library will allow
+ # writing files outside of the intended destination.
+ member_path = os.path.join(dest, m.name)
+ if not _is_within_directory(dest, member_path):
+ raise RuntimeError(
+ dedent(
+ f"""
+ Tar bundle '{src}' may be maliciously crafted to escape the destination!
+ The following path was detected:
+
+ {m.name}
+ """
+ )
+ )
+ if m.issym():
+ link_path = os.path.join(os.path.dirname(member_path), m.linkname)
+ if not _is_within_directory(dest, link_path):
+ raise RuntimeError(
+ dedent(
+ f"""
+ Tar bundle '{src}' may be maliciously crafted to escape the destination!
+ The following path was detected:
+
+ {m.name}
+ """
+ )
+ )
+
+ if m.mode & (stat.S_ISUID | stat.S_ISGID):
+ raise RuntimeError(
+ dedent(
+ f"""
+ Tar bundle '{src}' may be maliciously crafted to setuid/setgid!
+ The following path was detected:
+
+ {m.name}
+ """
+ )
+ )
+
+ if ignore and any(match(m.name, i) for i in ignore):
+ continue
+ bundle.extract(m, path=dest)
+ namelist.append(m.name)
+
+ return namelist
+
+
+def extract_zip(src, dest, ignore=None):
+ """extract a zip file"""
+
+ import zipfile
+
+ if isinstance(src, zipfile.ZipFile):
+ bundle = src
+ else:
+ try:
+ bundle = zipfile.ZipFile(src)
+ except Exception:
+ print("src: %s" % src)
+ raise
+
+ namelist = bundle.namelist()
+
+ for name in namelist:
+ if ignore and any(match(name, i) for i in ignore):
+ continue
+
+ bundle.extract(name, dest)
+ filename = os.path.realpath(os.path.join(dest, name))
+ mode = bundle.getinfo(name).external_attr >> 16 & 0x1FF
+ # Only update permissions if attributes are set. Otherwise fallback to the defaults.
+ if mode:
+ os.chmod(filename, mode)
+ bundle.close()
+ return namelist
+
+
+def extract(src, dest=None, ignore=None):
+ """
+ Takes in a tar or zip file and extracts it to dest
+
+ If dest is not specified, extracts to os.path.dirname(src)
+
+ Returns the list of top level files that were extracted
+ """
+
+ import tarfile
+ import zipfile
+
+ assert os.path.exists(src), "'%s' does not exist" % src
+
+ if dest is None:
+ dest = os.path.dirname(src)
+ elif not os.path.isdir(dest):
+ os.makedirs(dest)
+ assert not os.path.isfile(dest), "dest cannot be a file"
+
+ if tarfile.is_tarfile(src):
+ namelist = extract_tarball(src, dest, ignore=ignore)
+ elif zipfile.is_zipfile(src):
+ namelist = extract_zip(src, dest, ignore=ignore)
+ else:
+ raise Exception("mozfile.extract: no archive format found for '%s'" % src)
+
+ # namelist returns paths with forward slashes even in windows
+ top_level_files = [
+ os.path.join(dest, name.rstrip("/"))
+ for name in namelist
+ if len(name.rstrip("/").split("/")) == 1
+ ]
+
+ # namelist doesn't include folders, append these to the list
+ for name in namelist:
+ index = name.find("/")
+ if index != -1:
+ root = os.path.join(dest, name[:index])
+ if root not in top_level_files:
+ top_level_files.append(root)
+
+ return top_level_files
+
+
+# utilities for removal of files and directories
+
+
+def rmtree(dir):
+ """Deprecated wrapper method to remove a directory tree.
+
+ Ensure to update your code to use mozfile.remove() directly
+
+ :param dir: directory to be removed
+ """
+
+ warnings.warn(
+ "mozfile.rmtree() is deprecated in favor of mozfile.remove()",
+ PendingDeprecationWarning,
+ stacklevel=2,
+ )
+ return remove(dir)
+
+
+def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5):
+ """
+ It's possible to see spurious errors on Windows due to various things
+ keeping a handle to the directory open (explorer, virus scanners, etc)
+ So we try a few times if it fails with a known error.
+ retry_delay is multiplied by the number of failed attempts to increase
+ the likelihood of success in subsequent attempts.
+ """
+ retry_count = 0
+ while True:
+ try:
+ func(*args)
+ except OSError as e:
+ # Error codes are defined in:
+ # http://docs.python.org/2/library/errno.html#module-errno
+ if e.errno not in (errno.EACCES, errno.ENOTEMPTY):
+ raise
+
+ if retry_count == retry_max:
+ raise
+
+ retry_count += 1
+
+ print(
+ '%s() failed for "%s". Reason: %s (%s). Retrying...'
+ % (func.__name__, args, e.strerror, e.errno)
+ )
+ time.sleep(retry_count * retry_delay)
+ else:
+ # If no exception has been thrown it should be done
+ break
+
+
+def remove(path):
+ """Removes the specified file, link, or directory tree.
+
+ This is a replacement for shutil.rmtree that works better under
+ windows. It does the following things:
+
+ - check path access for the current user before trying to remove
+ - retry operations on some known errors due to various things keeping
+ a handle on file paths - like explorer, virus scanners, etc. The
+ known errors are errno.EACCES and errno.ENOTEMPTY, and it will
+ retry up to 5 five times with a delay of (failed_attempts * 0.5) seconds
+ between each attempt.
+
+ Note that no error will be raised if the given path does not exists.
+
+ :param path: path to be removed
+ """
+
+ import shutil
+
+ def _call_with_windows_retry(*args, **kwargs):
+ try:
+ _call_windows_retry(*args, **kwargs)
+ except OSError as e:
+ # The file or directory to be removed doesn't exist anymore
+ if e.errno != errno.ENOENT:
+ raise
+
+ def _update_permissions(path):
+ """Sets specified pemissions depending on filetype"""
+ if os.path.islink(path):
+ # Path is a symlink which we don't have to modify
+ # because it should already have all the needed permissions
+ return
+
+ stats = os.stat(path)
+
+ if os.path.isfile(path):
+ mode = stats.st_mode | stat.S_IWUSR
+ elif os.path.isdir(path):
+ mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR
+ else:
+ # Not supported type
+ return
+
+ _call_with_windows_retry(os.chmod, (path, mode))
+
+ if not os.path.lexists(path):
+ return
+
+ """
+ On Windows, adds '\\\\?\\' to paths which match ^[A-Za-z]:\\.* to access
+ files or directories that exceed MAX_PATH(260) limitation or that ends
+ with a period.
+ """
+ if (
+ sys.platform in ("win32", "cygwin")
+ and len(path) >= 3
+ and path[1] == ":"
+ and path[2] == "\\"
+ ):
+ path = "\\\\?\\%s" % path
+
+ if os.path.isfile(path) or os.path.islink(path):
+ # Verify the file or link is read/write for the current user
+ _update_permissions(path)
+ _call_with_windows_retry(os.remove, (path,))
+
+ elif os.path.isdir(path):
+ # Verify the directory is read/write/execute for the current user
+ _update_permissions(path)
+
+ # We're ensuring that every nested item has writable permission.
+ for root, dirs, files in os.walk(path):
+ for entry in dirs + files:
+ _update_permissions(os.path.join(root, entry))
+ _call_with_windows_retry(shutil.rmtree, (path,))
+
+
+def copy_contents(srcdir, dstdir, ignore_dangling_symlinks=False):
+ """
+ Copy the contents of the srcdir into the dstdir, preserving
+ subdirectories.
+
+ If an existing file of the same name exists in dstdir, it will be overwritten.
+ """
+ import shutil
+
+ # dirs_exist_ok was introduced in Python 3.8
+ # On earlier versions, or Windows, use the verbose mechanism.
+ # We use it on Windows because _call_with_windows_retry doesn't allow
+ # named arguments to be passed.
+ if (sys.version_info.major < 3 or sys.version_info.minor < 8) or (os.name == "nt"):
+ names = os.listdir(srcdir)
+ if not os.path.isdir(dstdir):
+ os.makedirs(dstdir)
+ errors = []
+ for name in names:
+ srcname = os.path.join(srcdir, name)
+ dstname = os.path.join(dstdir, name)
+ try:
+ if os.path.islink(srcname):
+ linkto = os.readlink(srcname)
+ os.symlink(linkto, dstname)
+ elif os.path.isdir(srcname):
+ copy_contents(srcname, dstname)
+ else:
+ _call_windows_retry(shutil.copy2, (srcname, dstname))
+ except OSError as why:
+ errors.append((srcname, dstname, str(why)))
+ except Exception as err:
+ errors.extend(err)
+ try:
+ _call_windows_retry(shutil.copystat, (srcdir, dstdir))
+ except OSError as why:
+ if why.winerror is None:
+ errors.extend((srcdir, dstdir, str(why)))
+ if errors:
+ raise Exception(errors)
+ else:
+ shutil.copytree(
+ srcdir,
+ dstdir,
+ dirs_exist_ok=True,
+ ignore_dangling_symlinks=ignore_dangling_symlinks,
+ )
+
+
+def move(src, dst):
+ """
+ Move a file or directory path.
+
+ This is a replacement for shutil.move that works better under windows,
+ retrying operations on some known errors due to various things keeping
+ a handle on file paths.
+ """
+ import shutil
+
+ _call_windows_retry(shutil.move, (src, dst))
+
+
+def depth(directory):
+ """returns the integer depth of a directory or path relative to '/'"""
+
+ directory = os.path.abspath(directory)
+ level = 0
+ while True:
+ directory, remainder = os.path.split(directory)
+ level += 1
+ if not remainder:
+ break
+ return level
+
+
+def tree(directory, sort_key=lambda x: x.lower()):
+ """Display tree directory structure for `directory`."""
+ vertical_line = "│"
+ item_marker = "├"
+ last_child = "└"
+
+ retval = []
+ indent = []
+ last = {}
+ top = depth(directory)
+
+ for dirpath, dirnames, filenames in os.walk(directory, topdown=True):
+ abspath = os.path.abspath(dirpath)
+ basename = os.path.basename(abspath)
+ parent = os.path.dirname(abspath)
+ level = depth(abspath) - top
+
+ # sort articles of interest
+ for resource in (dirnames, filenames):
+ resource[:] = sorted(resource, key=sort_key)
+
+ if level > len(indent):
+ indent.append(vertical_line)
+ indent = indent[:level]
+
+ if dirnames:
+ files_end = item_marker
+ last[abspath] = dirnames[-1]
+ else:
+ files_end = last_child
+
+ if last.get(parent) == os.path.basename(abspath):
+ # last directory of parent
+ dirpath_mark = last_child
+ indent[-1] = " "
+ elif not indent:
+ dirpath_mark = ""
+ else:
+ dirpath_mark = item_marker
+
+ # append the directory and piece of tree structure
+ # if the top-level entry directory, print as passed
+ retval.append(
+ "%s%s%s"
+ % ("".join(indent[:-1]), dirpath_mark, basename if retval else directory)
+ )
+ # add the files
+ if filenames:
+ last_file = filenames[-1]
+ retval.extend(
+ [
+ (
+ "%s%s%s"
+ % (
+ "".join(indent),
+ files_end if filename == last_file else item_marker,
+ filename,
+ )
+ )
+ for index, filename in enumerate(filenames)
+ ]
+ )
+
+ return "\n".join(retval)
+
+
+def which(cmd, mode=os.F_OK | os.X_OK, path=None, exts=None, extra_search_dirs=()):
+ """A wrapper around `shutil.which` to make the behavior on Windows
+ consistent with other platforms.
+
+ On non-Windows platforms, this is a direct call to `shutil.which`. On
+ Windows, this:
+
+ * Ensures that `cmd` without an extension will be found. Previously it was
+ only found if it had an extension in `PATHEXT`.
+ * Ensures the absolute path to the binary is returned. Previously if the
+ binary was found in `cwd`, a relative path was returned.
+ * Checks the Windows registry if shutil.which doesn't come up with anything.
+
+ The arguments are the same as the ones in `shutil.which`. In addition there
+ is an `exts` argument that only has an effect on Windows. This is used to
+ set a custom value for PATHEXT and is formatted as a list of file
+ extensions.
+
+ extra_search_dirs is a convenience argument. If provided, the strings in
+ the sequence will be appended to the END of the given `path`.
+ """
+ from shutil import which as shutil_which
+
+ if isinstance(path, (list, tuple)):
+ path = os.pathsep.join(path)
+
+ if not path:
+ path = os.environ.get("PATH", os.defpath)
+
+ if extra_search_dirs:
+ path = os.pathsep.join([path] + list(extra_search_dirs))
+
+ if sys.platform != "win32":
+ return shutil_which(cmd, mode=mode, path=path)
+
+ oldexts = os.environ.get("PATHEXT", "")
+ if not exts:
+ exts = oldexts.split(os.pathsep)
+
+ # This ensures that `cmd` without any extensions will be found.
+ # See: https://bugs.python.org/issue31405
+ if "." not in exts:
+ exts.append(".")
+
+ os.environ["PATHEXT"] = os.pathsep.join(exts)
+ try:
+ path = shutil_which(cmd, mode=mode, path=path)
+ if path:
+ return os.path.abspath(path.rstrip("."))
+ finally:
+ if oldexts:
+ os.environ["PATHEXT"] = oldexts
+ else:
+ del os.environ["PATHEXT"]
+
+ # If we've gotten this far, we need to check for registered executables
+ # before giving up.
+ try:
+ import winreg
+ except ImportError:
+ import _winreg as winreg
+ if not cmd.lower().endswith(".exe"):
+ cmd += ".exe"
+ try:
+ ret = winreg.QueryValue(
+ winreg.HKEY_LOCAL_MACHINE,
+ r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\%s" % cmd,
+ )
+ return os.path.abspath(ret) if ret else None
+ except winreg.error:
+ return None
+
+
+# utilities for temporary resources
+
+
+class NamedTemporaryFile(object):
+ """
+ Like tempfile.NamedTemporaryFile except it works on Windows
+ in the case where you open the created file a second time.
+
+ This behaves very similarly to tempfile.NamedTemporaryFile but may
+ not behave exactly the same. For example, this function does not
+ prevent fd inheritance by children.
+
+ Example usage:
+
+ with NamedTemporaryFile() as fh:
+ fh.write(b'foobar')
+
+ print('Filename: %s' % fh.name)
+
+ see https://bugzilla.mozilla.org/show_bug.cgi?id=821362
+ """
+
+ def __init__(
+ self, mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True
+ ):
+ import tempfile
+
+ fd, path = tempfile.mkstemp(suffix, prefix, dir, "t" in mode)
+ os.close(fd)
+
+ self.file = open(path, mode)
+ self._path = path
+ self._delete = delete
+ self._unlinked = False
+
+ def __getattr__(self, k):
+ return getattr(self.__dict__["file"], k)
+
+ def __iter__(self):
+ return self.__dict__["file"]
+
+ def __enter__(self):
+ self.file.__enter__()
+ return self
+
+ def __exit__(self, exc, value, tb):
+ self.file.__exit__(exc, value, tb)
+ if self.__dict__["_delete"]:
+ os.unlink(self.__dict__["_path"])
+ self._unlinked = True
+
+ def __del__(self):
+ if self.__dict__["_unlinked"]:
+ return
+ self.file.__exit__(None, None, None)
+ if self.__dict__["_delete"]:
+ os.unlink(self.__dict__["_path"])
+
+
+@contextmanager
+def TemporaryDirectory():
+ """
+ create a temporary directory using tempfile.mkdtemp, and then clean it up.
+
+ Example usage:
+ with TemporaryDirectory() as tmp:
+ open(os.path.join(tmp, "a_temp_file"), "w").write("data")
+
+ """
+
+ import shutil
+ import tempfile
+
+ tempdir = tempfile.mkdtemp()
+ try:
+ yield tempdir
+ finally:
+ shutil.rmtree(tempdir)
+
+
+# utilities dealing with URLs
+
+
+def is_url(thing):
+ """
+ Return True if thing looks like a URL.
+ """
+
+ parsed = urllib.parse.urlparse(thing)
+ if "scheme" in parsed:
+ return len(parsed.scheme) >= 2
+ else:
+ return len(parsed[0]) >= 2
+
+
+def load(resource):
+ """
+ open a file or URL for reading. If the passed resource string is not a URL,
+ or begins with 'file://', return a ``file``. Otherwise, return the
+ result of urllib.urlopen()
+ """
+
+ # handle file URLs separately due to python stdlib limitations
+ if resource.startswith("file://"):
+ resource = resource[len("file://") :]
+
+ if not is_url(resource):
+ # if no scheme is given, it is a file path
+ return open(resource)
+
+ return urllib.request.urlopen(resource)
+
+
+# see https://docs.python.org/3/whatsnew/3.12.html#imp
+def load_source(modname, filename):
+ import importlib.machinery
+ import importlib.util
+
+ loader = importlib.machinery.SourceFileLoader(modname, filename)
+ spec = importlib.util.spec_from_file_location(modname, filename, loader=loader)
+ module = importlib.util.module_from_spec(spec)
+ sys.modules[module.__name__] = module
+ loader.exec_module(module)
+ return module
+
+
+# We can't depend on mozpack.path here, so copy the 'match' function over.
+
+re_cache = {}
+# Python versions < 3.7 return r'\/' for re.escape('/').
+if re.escape("/") == "/":
+ MATCH_STAR_STAR_RE = re.compile(r"(^|/)\\\*\\\*/")
+ MATCH_STAR_STAR_END_RE = re.compile(r"(^|/)\\\*\\\*$")
+else:
+ MATCH_STAR_STAR_RE = re.compile(r"(^|\\\/)\\\*\\\*\\\/")
+ MATCH_STAR_STAR_END_RE = re.compile(r"(^|\\\/)\\\*\\\*$")
+
+
+def match(path, pattern):
+ """
+ Return whether the given path matches the given pattern.
+ An asterisk can be used to match any string, including the null string, in
+ one part of the path:
+
+ ``foo`` matches ``*``, ``f*`` or ``fo*o``
+
+ However, an asterisk matching a subdirectory may not match the null string:
+
+ ``foo/bar`` does *not* match ``foo/*/bar``
+
+ If the pattern matches one of the ancestor directories of the path, the
+ patch is considered matching:
+
+ ``foo/bar`` matches ``foo``
+
+ Two adjacent asterisks can be used to match files and zero or more
+ directories and subdirectories.
+
+ ``foo/bar`` matches ``foo/**/bar``, or ``**/bar``
+ """
+ if not pattern:
+ return True
+ if pattern not in re_cache:
+ p = re.escape(pattern)
+ p = MATCH_STAR_STAR_RE.sub(r"\1(?:.+/)?", p)
+ p = MATCH_STAR_STAR_END_RE.sub(r"(?:\1.+)?", p)
+ p = p.replace(r"\*", "[^/]*") + "(?:/.*)?$"
+ re_cache[pattern] = re.compile(p)
+ return re_cache[pattern].match(path) is not None
diff --git a/testing/mozbase/mozfile/setup.cfg b/testing/mozbase/mozfile/setup.cfg
new file mode 100644
index 0000000000..2a9acf13da
--- /dev/null
+++ b/testing/mozbase/mozfile/setup.cfg
@@ -0,0 +1,2 @@
+[bdist_wheel]
+universal = 1
diff --git a/testing/mozbase/mozfile/setup.py b/testing/mozbase/mozfile/setup.py
new file mode 100644
index 0000000000..172df3e68e
--- /dev/null
+++ b/testing/mozbase/mozfile/setup.py
@@ -0,0 +1,34 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from setuptools import setup
+
+PACKAGE_NAME = "mozfile"
+PACKAGE_VERSION = "3.0.0"
+
+setup(
+ name=PACKAGE_NAME,
+ version=PACKAGE_VERSION,
+ description="Library of file utilities for use in Mozilla testing",
+ long_description="see https://firefox-source-docs.mozilla.org/mozbase/index.html",
+ classifiers=[
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.6",
+ "Programming Language :: Python :: 3.7",
+ "Programming Language :: Python :: 3.8",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10",
+ "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)",
+ ],
+ keywords="mozilla",
+ author="Mozilla Automation and Tools team",
+ author_email="tools@lists.mozilla.org",
+ url="https://wiki.mozilla.org/Auto-tools/Projects/Mozbase",
+ license="MPL",
+ packages=["mozfile"],
+ include_package_data=True,
+ zip_safe=False,
+ install_requires=["six >= 1.13.0"],
+ tests_require=["wptserve"],
+)
diff --git a/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip b/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip
new file mode 100644
index 0000000000..2b5409e89c
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip
Binary files differ
diff --git a/testing/mozbase/mozfile/tests/files/which/baz b/testing/mozbase/mozfile/tests/files/which/baz
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/baz
diff --git a/testing/mozbase/mozfile/tests/files/which/baz.exe b/testing/mozbase/mozfile/tests/files/which/baz.exe
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/baz.exe
diff --git a/testing/mozbase/mozfile/tests/files/which/registered/quux.exe b/testing/mozbase/mozfile/tests/files/which/registered/quux.exe
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/registered/quux.exe
diff --git a/testing/mozbase/mozfile/tests/files/which/unix/baz.exe b/testing/mozbase/mozfile/tests/files/which/unix/baz.exe
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/unix/baz.exe
diff --git a/testing/mozbase/mozfile/tests/files/which/unix/file b/testing/mozbase/mozfile/tests/files/which/unix/file
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/unix/file
diff --git a/testing/mozbase/mozfile/tests/files/which/unix/foo b/testing/mozbase/mozfile/tests/files/which/unix/foo
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/unix/foo
diff --git a/testing/mozbase/mozfile/tests/files/which/win/bar b/testing/mozbase/mozfile/tests/files/which/win/bar
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/win/bar
diff --git a/testing/mozbase/mozfile/tests/files/which/win/baz.exe b/testing/mozbase/mozfile/tests/files/which/win/baz.exe
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/win/baz.exe
diff --git a/testing/mozbase/mozfile/tests/files/which/win/foo b/testing/mozbase/mozfile/tests/files/which/win/foo
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/win/foo
diff --git a/testing/mozbase/mozfile/tests/files/which/win/foo.exe b/testing/mozbase/mozfile/tests/files/which/win/foo.exe
new file mode 100755
index 0000000000..e69de29bb2
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/which/win/foo.exe
diff --git a/testing/mozbase/mozfile/tests/manifest.toml b/testing/mozbase/mozfile/tests/manifest.toml
new file mode 100644
index 0000000000..643b9c4c6e
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/manifest.toml
@@ -0,0 +1,18 @@
+[DEFAULT]
+subsuite = "mozbase"
+
+["test_extract.py"]
+
+["test_load.py"]
+
+["test_move_remove.py"]
+
+["test_tempdir.py"]
+
+["test_tempfile.py"]
+
+["test_tree.py"]
+
+["test_url.py"]
+
+["test_which.py"]
diff --git a/testing/mozbase/mozfile/tests/stubs.py b/testing/mozbase/mozfile/tests/stubs.py
new file mode 100644
index 0000000000..3c1bd47207
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/stubs.py
@@ -0,0 +1,56 @@
+import os
+import shutil
+import tempfile
+
+# stub file paths
+files = [
+ ("foo.txt",),
+ (
+ "foo",
+ "bar.txt",
+ ),
+ (
+ "foo",
+ "bar",
+ "fleem.txt",
+ ),
+ (
+ "foobar",
+ "fleem.txt",
+ ),
+ ("bar.txt",),
+ (
+ "nested_tree",
+ "bar",
+ "fleem.txt",
+ ),
+ ("readonly.txt",),
+]
+
+
+def create_empty_stub():
+ tempdir = tempfile.mkdtemp()
+ return tempdir
+
+
+def create_stub(tempdir=None):
+ """create a stub directory"""
+
+ tempdir = tempdir or tempfile.mkdtemp()
+ try:
+ for path in files:
+ fullpath = os.path.join(tempdir, *path)
+ dirname = os.path.dirname(fullpath)
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+ contents = path[-1]
+ f = open(fullpath, "w")
+ f.write(contents)
+ f.close()
+ return tempdir
+ except Exception:
+ try:
+ shutil.rmtree(tempdir)
+ except Exception:
+ pass
+ raise
diff --git a/testing/mozbase/mozfile/tests/test_copycontents.py b/testing/mozbase/mozfile/tests/test_copycontents.py
new file mode 100644
index 0000000000..b829d7b3a4
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_copycontents.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+
+import os
+import shutil
+import unittest
+
+import mozfile
+import mozunit
+import stubs
+
+
+class MozfileCopyContentsTestCase(unittest.TestCase):
+ """Test our ability to copy the contents of directories"""
+
+ def _directory_is_subset(self, set_, subset_):
+ """
+ Confirm that all the contents of 'subset_' are contained in 'set_'
+ """
+ names = os.listdir(subset_)
+ for name in names:
+ full_set_path = os.path.join(set_, name)
+ full_subset_path = os.path.join(subset_, name)
+ if os.path.isdir(full_subset_path):
+ self.assertTrue(os.path.isdir(full_set_path))
+ self._directory_is_subset(full_set_path, full_subset_path)
+ elif os.path.islink(full_subset_path):
+ self.assertTrue(os.path.islink(full_set_path))
+ else:
+ self.assertTrue(os.stat(full_set_path))
+
+ def _directories_are_equal(self, dir1, dir2):
+ """
+ Confirm that the contents of 'dir1' are the same as 'dir2'
+ """
+ names1 = os.listdir(dir1)
+ names2 = os.listdir(dir2)
+ self.assertTrue(len(names1) == len(names2))
+ for name in names1:
+ self.assertTrue(name in names2)
+ dir1_path = os.path.join(dir1, name)
+ dir2_path = os.path.join(dir2, name)
+ if os.path.isdir(dir1_path):
+ self.assertTrue(os.path.isdir(dir2_path))
+ self._directories_are_equal(dir1_path, dir2_path)
+ elif os.path.islink(dir1_path):
+ self.assertTrue(os.path.islink(dir2_path))
+ else:
+ self.assertTrue(os.stat(dir2_path))
+
+ def test_copy_empty_directory(self):
+ tempdir = stubs.create_empty_stub()
+ dstdir = stubs.create_empty_stub()
+ self.assertTrue(os.path.isdir(tempdir))
+
+ mozfile.copy_contents(tempdir, dstdir)
+ self._directories_are_equal(dstdir, tempdir)
+
+ if os.path.isdir(tempdir):
+ shutil.rmtree(tempdir)
+ if os.path.isdir(dstdir):
+ shutil.rmtree(dstdir)
+
+ def test_copy_full_directory(self):
+ tempdir = stubs.create_stub()
+ dstdir = stubs.create_empty_stub()
+ self.assertTrue(os.path.isdir(tempdir))
+
+ mozfile.copy_contents(tempdir, dstdir)
+ self._directories_are_equal(dstdir, tempdir)
+
+ if os.path.isdir(tempdir):
+ shutil.rmtree(tempdir)
+ if os.path.isdir(dstdir):
+ shutil.rmtree(dstdir)
+
+ def test_copy_full_directory_with_existing_file(self):
+ tempdir = stubs.create_stub()
+ dstdir = stubs.create_empty_stub()
+
+ filename = "i_dont_exist_in_tempdir"
+ f = open(os.path.join(dstdir, filename), "w")
+ f.write("Hello World")
+ f.close()
+
+ self.assertTrue(os.path.isdir(tempdir))
+
+ mozfile.copy_contents(tempdir, dstdir)
+ self._directory_is_subset(dstdir, tempdir)
+ self.assertTrue(os.path.exists(os.path.join(dstdir, filename)))
+
+ if os.path.isdir(tempdir):
+ shutil.rmtree(tempdir)
+ if os.path.isdir(dstdir):
+ shutil.rmtree(dstdir)
+
+ def test_copy_full_directory_with_overlapping_file(self):
+ tempdir = stubs.create_stub()
+ dstdir = stubs.create_empty_stub()
+
+ filename = "i_do_exist_in_tempdir"
+ for d in [tempdir, dstdir]:
+ f = open(os.path.join(d, filename), "w")
+ f.write("Hello " + d)
+ f.close()
+
+ self.assertTrue(os.path.isdir(tempdir))
+ self.assertTrue(os.path.exists(os.path.join(tempdir, filename)))
+ self.assertTrue(os.path.exists(os.path.join(dstdir, filename)))
+
+ line = open(os.path.join(dstdir, filename), "r").readlines()[0]
+ self.assertTrue(line == "Hello " + dstdir)
+
+ mozfile.copy_contents(tempdir, dstdir)
+
+ line = open(os.path.join(dstdir, filename), "r").readlines()[0]
+ self.assertTrue(line == "Hello " + tempdir)
+ self._directories_are_equal(tempdir, dstdir)
+
+ if os.path.isdir(tempdir):
+ shutil.rmtree(tempdir)
+ if os.path.isdir(dstdir):
+ shutil.rmtree(dstdir)
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozfile/tests/test_extract.py b/testing/mozbase/mozfile/tests/test_extract.py
new file mode 100644
index 0000000000..c2675d77f7
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_extract.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python
+
+import os
+import tarfile
+import tempfile
+import zipfile
+
+import mozfile
+import mozunit
+import pytest
+import stubs
+
+
+@pytest.fixture
+def ensure_directory_contents():
+ """ensure the directory contents match"""
+
+ def inner(directory):
+ for f in stubs.files:
+ path = os.path.join(directory, *f)
+ exists = os.path.exists(path)
+ if not exists:
+ print("%s does not exist" % (os.path.join(f)))
+ assert exists
+ if exists:
+ contents = open(path).read().strip()
+ assert contents == f[-1]
+
+ return inner
+
+
+@pytest.fixture(scope="module")
+def tarpath(tmpdir_factory):
+ """create a stub tarball for testing"""
+ tmpdir = tmpdir_factory.mktemp("test_extract")
+
+ tempdir = tmpdir.join("stubs").strpath
+ stubs.create_stub(tempdir)
+ filename = tmpdir.join("bundle.tar").strpath
+ archive = tarfile.TarFile(filename, mode="w")
+ for path in stubs.files:
+ archive.add(os.path.join(tempdir, *path), arcname=os.path.join(*path))
+ archive.close()
+
+ assert os.path.exists(filename)
+ return filename
+
+
+@pytest.fixture(scope="module")
+def zippath(tmpdir_factory):
+ """create a stub zipfile for testing"""
+ tmpdir = tmpdir_factory.mktemp("test_extract")
+
+ tempdir = tmpdir.join("stubs").strpath
+ stubs.create_stub(tempdir)
+ filename = tmpdir.join("bundle.zip").strpath
+ archive = zipfile.ZipFile(filename, mode="w")
+ for path in stubs.files:
+ archive.write(os.path.join(tempdir, *path), arcname=os.path.join(*path))
+ archive.close()
+
+ assert os.path.exists(filename)
+ return filename
+
+
+@pytest.fixture(scope="module", params=["tar", "zip"])
+def bundlepath(request, tarpath, zippath):
+ if request.param == "tar":
+ return tarpath
+ else:
+ return zippath
+
+
+def test_extract(tmpdir, bundlepath, ensure_directory_contents):
+ """test extracting a zipfile"""
+ dest = tmpdir.mkdir("dest").strpath
+ mozfile.extract(bundlepath, dest)
+ ensure_directory_contents(dest)
+
+
+def test_extract_zipfile_missing_file_attributes(tmpdir):
+ """if files do not have attributes set the default permissions have to be inherited."""
+ _zipfile = os.path.join(
+ os.path.dirname(__file__), "files", "missing_file_attributes.zip"
+ )
+ assert os.path.exists(_zipfile)
+ dest = tmpdir.mkdir("dest").strpath
+
+ # Get the default file permissions for the user
+ fname = os.path.join(dest, "foo")
+ with open(fname, "w"):
+ pass
+ default_stmode = os.stat(fname).st_mode
+
+ files = mozfile.extract_zip(_zipfile, dest)
+ for filename in files:
+ assert os.stat(os.path.join(dest, filename)).st_mode == default_stmode
+
+
+def test_extract_non_archive(tarpath, zippath):
+ """test the generalized extract function"""
+ # test extracting some non-archive; this should fail
+ fd, filename = tempfile.mkstemp()
+ os.write(fd, b"This is not a zipfile or tarball")
+ os.close(fd)
+ exception = None
+
+ try:
+ dest = tempfile.mkdtemp()
+ mozfile.extract(filename, dest)
+ except Exception as exc:
+ exception = exc
+ finally:
+ os.remove(filename)
+ os.rmdir(dest)
+
+ assert isinstance(exception, Exception)
+
+
+def test_extract_ignore(tmpdir, bundlepath):
+ dest = tmpdir.mkdir("dest").strpath
+ ignore = ("foo", "**/fleem.txt", "read*.txt")
+ mozfile.extract(bundlepath, dest, ignore=ignore)
+
+ assert sorted(os.listdir(dest)) == ["bar.txt", "foo.txt"]
+
+
+def test_tarball_escape(tmpdir):
+ """Ensures that extracting a tarball can't write outside of the intended
+ destination directory.
+ """
+ workdir = tmpdir.mkdir("workdir")
+ os.chdir(workdir)
+
+ # Generate a "malicious" bundle.
+ with open("bad.txt", "w") as fh:
+ fh.write("pwned!")
+
+ def change_name(tarinfo):
+ tarinfo.name = "../" + tarinfo.name
+ return tarinfo
+
+ with tarfile.open("evil.tar", "w:xz") as tar:
+ tar.add("bad.txt", filter=change_name)
+
+ with pytest.raises(RuntimeError):
+ mozfile.extract_tarball("evil.tar", workdir)
+ assert not os.path.exists(tmpdir.join("bad.txt"))
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozfile/tests/test_load.py b/testing/mozbase/mozfile/tests/test_load.py
new file mode 100755
index 0000000000..7a3896e33b
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_load.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+"""
+tests for mozfile.load
+"""
+
+import mozunit
+import pytest
+from mozfile import load
+from wptserve.handlers import handler
+from wptserve.server import WebTestHttpd
+
+
+@pytest.fixture(name="httpd_url")
+def fixture_httpd_url():
+ """Yield a started WebTestHttpd server."""
+
+ @handler
+ def example(request, response):
+ """Example request handler."""
+ body = b"example"
+ return (
+ 200,
+ [("Content-type", "text/plain"), ("Content-length", len(body))],
+ body,
+ )
+
+ httpd = WebTestHttpd(host="127.0.0.1", routes=[("GET", "*", example)])
+
+ httpd.start()
+ yield httpd.get_url()
+ httpd.stop()
+
+
+def test_http(httpd_url):
+ """Test with WebTestHttpd and a http:// URL."""
+ content = load(httpd_url).read()
+ assert content == b"example"
+
+
+@pytest.fixture(name="temporary_file")
+def fixture_temporary_file(tmpdir):
+ """Yield a path to a temporary file."""
+ foobar = tmpdir.join("foobar.txt")
+ foobar.write("hello world")
+
+ yield str(foobar)
+
+ foobar.remove()
+
+
+def test_file_path(temporary_file):
+ """Test loading from a file path."""
+ assert load(temporary_file).read() == "hello world"
+
+
+def test_file_url(temporary_file):
+ """Test loading from a file URL."""
+ assert load("file://%s" % temporary_file).read() == "hello world"
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozfile/tests/test_move_remove.py b/testing/mozbase/mozfile/tests/test_move_remove.py
new file mode 100644
index 0000000000..0679c6c3fa
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_move_remove.py
@@ -0,0 +1,253 @@
+#!/usr/bin/env python
+
+import errno
+import os
+import shutil
+import stat
+import threading
+import time
+import unittest
+from contextlib import contextmanager
+
+import mozfile
+import mozinfo
+import mozunit
+import stubs
+
+
+def mark_readonly(path):
+ """Removes all write permissions from given file/directory.
+
+ :param path: path of directory/file of which modes must be changed
+ """
+ mode = os.stat(path)[stat.ST_MODE]
+ os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
+
+
+class FileOpenCloseThread(threading.Thread):
+ """Helper thread for asynchronous file handling"""
+
+ def __init__(self, path, delay, delete=False):
+ threading.Thread.__init__(self)
+ self.file_opened = threading.Event()
+ self.delay = delay
+ self.path = path
+ self.delete = delete
+
+ def run(self):
+ with open(self.path):
+ self.file_opened.set()
+ time.sleep(self.delay)
+ if self.delete:
+ try:
+ os.remove(self.path)
+ except Exception:
+ pass
+
+
+@contextmanager
+def wait_file_opened_in_thread(*args, **kwargs):
+ thread = FileOpenCloseThread(*args, **kwargs)
+ thread.start()
+ thread.file_opened.wait()
+ try:
+ yield thread
+ finally:
+ thread.join()
+
+
+class MozfileRemoveTestCase(unittest.TestCase):
+ """Test our ability to remove directories and files"""
+
+ def setUp(self):
+ # Generate a stub
+ self.tempdir = stubs.create_stub()
+
+ def tearDown(self):
+ if os.path.isdir(self.tempdir):
+ shutil.rmtree(self.tempdir)
+
+ def test_remove_directory(self):
+ """Test the removal of a directory"""
+ self.assertTrue(os.path.isdir(self.tempdir))
+ mozfile.remove(self.tempdir)
+ self.assertFalse(os.path.exists(self.tempdir))
+
+ def test_remove_directory_with_open_file(self):
+ """Test removing a directory with an open file"""
+ # Open a file in the generated stub
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+ f = open(filepath, "w")
+ f.write("foo-bar")
+
+ # keep file open and then try removing the dir-tree
+ if mozinfo.isWin:
+ # On the Windows family WindowsError should be raised.
+ self.assertRaises(OSError, mozfile.remove, self.tempdir)
+ self.assertTrue(os.path.exists(self.tempdir))
+ else:
+ # Folder should be deleted on all other platforms
+ mozfile.remove(self.tempdir)
+ self.assertFalse(os.path.exists(self.tempdir))
+
+ def test_remove_closed_file(self):
+ """Test removing a closed file"""
+ # Open a file in the generated stub
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+ with open(filepath, "w") as f:
+ f.write("foo-bar")
+
+ # Folder should be deleted on all platforms
+ mozfile.remove(self.tempdir)
+ self.assertFalse(os.path.exists(self.tempdir))
+
+ def test_removing_open_file_with_retry(self):
+ """Test removing a file in use with retry"""
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+
+ with wait_file_opened_in_thread(filepath, 0.2):
+ # on windows first attempt will fail,
+ # and it will be retried until the thread leave the handle
+ mozfile.remove(filepath)
+
+ # Check deletion was successful
+ self.assertFalse(os.path.exists(filepath))
+
+ def test_removing_already_deleted_file_with_retry(self):
+ """Test removing a meanwhile removed file with retry"""
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+
+ with wait_file_opened_in_thread(filepath, 0.2, True):
+ # on windows first attempt will fail, and before
+ # the retry the opened file will be deleted in the thread
+ mozfile.remove(filepath)
+
+ # Check deletion was successful
+ self.assertFalse(os.path.exists(filepath))
+
+ def test_remove_readonly_tree(self):
+ """Test removing a read-only directory"""
+
+ dirpath = os.path.join(self.tempdir, "nested_tree")
+ mark_readonly(dirpath)
+
+ # However, mozfile should change write permissions and remove dir.
+ mozfile.remove(dirpath)
+
+ self.assertFalse(os.path.exists(dirpath))
+
+ def test_remove_readonly_file(self):
+ """Test removing read-only files"""
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+ mark_readonly(filepath)
+
+ # However, mozfile should change write permission and then remove file.
+ mozfile.remove(filepath)
+
+ self.assertFalse(os.path.exists(filepath))
+
+ @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
+ def test_remove_symlink(self):
+ """Test removing a symlink"""
+ file_path = os.path.join(self.tempdir, *stubs.files[1])
+ symlink_path = os.path.join(self.tempdir, "symlink")
+
+ os.symlink(file_path, symlink_path)
+ self.assertTrue(os.path.islink(symlink_path))
+
+ # The linked folder and files should not be deleted
+ mozfile.remove(symlink_path)
+ self.assertFalse(os.path.exists(symlink_path))
+ self.assertTrue(os.path.exists(file_path))
+
+ @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
+ def test_remove_symlink_in_subfolder(self):
+ """Test removing a folder with an contained symlink"""
+ file_path = os.path.join(self.tempdir, *stubs.files[0])
+ dir_path = os.path.dirname(os.path.join(self.tempdir, *stubs.files[1]))
+ symlink_path = os.path.join(dir_path, "symlink")
+
+ os.symlink(file_path, symlink_path)
+ self.assertTrue(os.path.islink(symlink_path))
+
+ # The folder with the contained symlink will be deleted but not the
+ # original linked file
+ mozfile.remove(dir_path)
+ self.assertFalse(os.path.exists(dir_path))
+ self.assertFalse(os.path.exists(symlink_path))
+ self.assertTrue(os.path.exists(file_path))
+
+ @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
+ def test_remove_broken_symlink(self):
+ """Test removing a folder with an contained symlink"""
+ file_path = os.path.join(self.tempdir, "readonly.txt")
+ working_link = os.path.join(self.tempdir, "link_to_readonly.txt")
+ broken_link = os.path.join(self.tempdir, "broken_link")
+ os.symlink(file_path, working_link)
+ os.symlink(os.path.join(self.tempdir, "broken.txt"), broken_link)
+
+ self.assertTrue(os.path.exists(file_path))
+ self.assertTrue(os.path.islink(working_link))
+ self.assertTrue(os.path.islink(broken_link))
+
+ mozfile.remove(working_link)
+ self.assertFalse(os.path.lexists(working_link))
+ self.assertTrue(os.path.exists(file_path))
+
+ mozfile.remove(broken_link)
+ self.assertFalse(os.path.lexists(broken_link))
+
+ @unittest.skipIf(
+ mozinfo.isWin or not os.geteuid(),
+ "Symlinks are not supported on Windows and cannot run test as root",
+ )
+ def test_remove_symlink_for_system_path(self):
+ """Test removing a symlink which points to a system folder"""
+ symlink_path = os.path.join(self.tempdir, "symlink")
+
+ os.symlink(os.path.dirname(self.tempdir), symlink_path)
+ self.assertTrue(os.path.islink(symlink_path))
+
+ # The folder with the contained symlink will be deleted but not the
+ # original linked file
+ mozfile.remove(symlink_path)
+ self.assertFalse(os.path.exists(symlink_path))
+
+ def test_remove_path_that_does_not_exists(self):
+ not_existing_path = os.path.join(self.tempdir, "I_do_not_not_exists")
+ try:
+ mozfile.remove(not_existing_path)
+ except OSError as exc:
+ if exc.errno == errno.ENOENT:
+ self.fail("removing non existing path must not raise error")
+ raise
+
+
+class MozFileMoveTestCase(unittest.TestCase):
+ def setUp(self):
+ # Generate a stub
+ self.tempdir = stubs.create_stub()
+ self.addCleanup(mozfile.rmtree, self.tempdir)
+
+ def test_move_file(self):
+ file_path = os.path.join(self.tempdir, *stubs.files[1])
+ moved_path = file_path + ".moved"
+ self.assertTrue(os.path.isfile(file_path))
+ self.assertFalse(os.path.exists(moved_path))
+ mozfile.move(file_path, moved_path)
+ self.assertFalse(os.path.exists(file_path))
+ self.assertTrue(os.path.isfile(moved_path))
+
+ def test_move_file_with_retry(self):
+ file_path = os.path.join(self.tempdir, *stubs.files[1])
+ moved_path = file_path + ".moved"
+
+ with wait_file_opened_in_thread(file_path, 0.2):
+ # first move attempt should fail on windows and be retried
+ mozfile.move(file_path, moved_path)
+ self.assertFalse(os.path.exists(file_path))
+ self.assertTrue(os.path.isfile(moved_path))
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozfile/tests/test_tempdir.py b/testing/mozbase/mozfile/tests/test_tempdir.py
new file mode 100644
index 0000000000..ba16b478b6
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_tempdir.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""
+tests for mozfile.TemporaryDirectory
+"""
+
+import os
+import unittest
+
+import mozunit
+from mozfile import TemporaryDirectory
+
+
+class TestTemporaryDirectory(unittest.TestCase):
+ def test_removed(self):
+ """ensure that a TemporaryDirectory gets removed"""
+ path = None
+ with TemporaryDirectory() as tmp:
+ path = tmp
+ self.assertTrue(os.path.isdir(tmp))
+ tmpfile = os.path.join(tmp, "a_temp_file")
+ open(tmpfile, "w").write("data")
+ self.assertTrue(os.path.isfile(tmpfile))
+ self.assertFalse(os.path.isdir(path))
+ self.assertFalse(os.path.exists(path))
+
+ def test_exception(self):
+ """ensure that TemporaryDirectory handles exceptions"""
+ path = None
+ with self.assertRaises(Exception):
+ with TemporaryDirectory() as tmp:
+ path = tmp
+ self.assertTrue(os.path.isdir(tmp))
+ raise Exception("oops")
+ self.assertFalse(os.path.isdir(path))
+ self.assertFalse(os.path.exists(path))
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozfile/tests/test_tempfile.py b/testing/mozbase/mozfile/tests/test_tempfile.py
new file mode 100644
index 0000000000..3e250d6a76
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_tempfile.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""
+tests for mozfile.NamedTemporaryFile
+"""
+import os
+import unittest
+
+import mozfile
+import mozunit
+import six
+
+
+class TestNamedTemporaryFile(unittest.TestCase):
+ """test our fix for NamedTemporaryFile"""
+
+ def test_named_temporary_file(self):
+ """Ensure the fix for re-opening a NamedTemporaryFile works
+
+ Refer to https://bugzilla.mozilla.org/show_bug.cgi?id=818777
+ and https://bugzilla.mozilla.org/show_bug.cgi?id=821362
+ """
+
+ test_string = b"A simple test"
+ with mozfile.NamedTemporaryFile() as temp:
+ # Test we can write to file
+ temp.write(test_string)
+ # Forced flush, so that we can read later
+ temp.flush()
+
+ # Test we can open the file again on all platforms
+ self.assertEqual(open(temp.name, "rb").read(), test_string)
+
+ def test_iteration(self):
+ """ensure the line iterator works"""
+
+ # make a file and write to it
+ tf = mozfile.NamedTemporaryFile()
+ notes = [b"doe", b"rae", b"mi"]
+ for note in notes:
+ tf.write(b"%s\n" % note)
+ tf.flush()
+
+ # now read from it
+ tf.seek(0)
+ lines = [line.rstrip(b"\n") for line in tf.readlines()]
+ self.assertEqual(lines, notes)
+
+ # now read from it iteratively
+ lines = []
+ for line in tf:
+ lines.append(line.strip())
+ self.assertEqual(lines, []) # because we did not seek(0)
+ tf.seek(0)
+ lines = []
+ for line in tf:
+ lines.append(line.strip())
+ self.assertEqual(lines, notes)
+
+ def test_delete(self):
+ """ensure ``delete=True/False`` works as expected"""
+
+ # make a deleteable file; ensure it gets cleaned up
+ path = None
+ with mozfile.NamedTemporaryFile(delete=True) as tf:
+ path = tf.name
+ self.assertTrue(isinstance(path, six.string_types))
+ self.assertFalse(os.path.exists(path))
+
+ # it is also deleted when __del__ is called
+ # here we will do so explicitly
+ tf = mozfile.NamedTemporaryFile(delete=True)
+ path = tf.name
+ self.assertTrue(os.path.exists(path))
+ del tf
+ self.assertFalse(os.path.exists(path))
+
+ # Now the same thing but we won't delete the file
+ path = None
+ try:
+ with mozfile.NamedTemporaryFile(delete=False) as tf:
+ path = tf.name
+ self.assertTrue(os.path.exists(path))
+ finally:
+ if path and os.path.exists(path):
+ os.remove(path)
+
+ path = None
+ try:
+ tf = mozfile.NamedTemporaryFile(delete=False)
+ path = tf.name
+ self.assertTrue(os.path.exists(path))
+ del tf
+ self.assertTrue(os.path.exists(path))
+ finally:
+ if path and os.path.exists(path):
+ os.remove(path)
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozfile/tests/test_tree.py b/testing/mozbase/mozfile/tests/test_tree.py
new file mode 100644
index 0000000000..556c1b9139
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_tree.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# coding=UTF-8
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import mozunit
+from mozfile import tree
+
+
+class TestTree(unittest.TestCase):
+ """Test the tree function."""
+
+ def test_unicode_paths(self):
+ """Test creating tree structure from a Unicode path."""
+ try:
+ tmpdir = tempfile.mkdtemp(suffix="tmp🍪")
+ os.mkdir(os.path.join(tmpdir, "dir🍪"))
+ with open(os.path.join(tmpdir, "file🍪"), "w") as f:
+ f.write("foo")
+
+ self.assertEqual("{}\n├file🍪\n└dir🍪".format(tmpdir), tree(tmpdir))
+ finally:
+ shutil.rmtree(tmpdir)
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozfile/tests/test_url.py b/testing/mozbase/mozfile/tests/test_url.py
new file mode 100755
index 0000000000..a19f5f16a8
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_url.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+
+"""
+tests for is_url
+"""
+import unittest
+
+import mozunit
+from mozfile import is_url
+
+
+class TestIsUrl(unittest.TestCase):
+ """test the is_url function"""
+
+ def test_is_url(self):
+ self.assertTrue(is_url("http://mozilla.org"))
+ self.assertFalse(is_url("/usr/bin/mozilla.org"))
+ self.assertTrue(is_url("file:///usr/bin/mozilla.org"))
+ self.assertFalse(is_url("c:\foo\bar"))
+
+
+if __name__ == "__main__":
+ mozunit.main()
diff --git a/testing/mozbase/mozfile/tests/test_which.py b/testing/mozbase/mozfile/tests/test_which.py
new file mode 100644
index 0000000000..b02f13ccdf
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_which.py
@@ -0,0 +1,63 @@
+# Any copyright is dedicated to the Public Domain.
+# https://creativecommons.org/publicdomain/zero/1.0/
+
+import os
+import sys
+
+import mozunit
+import six
+from mozfile import which
+
+here = os.path.abspath(os.path.dirname(__file__))
+
+
+def test_which(monkeypatch):
+ cwd = os.path.join(here, "files", "which")
+ monkeypatch.chdir(cwd)
+
+ if sys.platform == "win32":
+ if six.PY3:
+ import winreg
+ else:
+ import _winreg as winreg
+ bindir = os.path.join(cwd, "win")
+ monkeypatch.setenv("PATH", bindir)
+ monkeypatch.setattr(winreg, "QueryValue", (lambda k, sk: None))
+
+ assert which("foo.exe").lower() == os.path.join(bindir, "foo.exe").lower()
+ assert which("foo").lower() == os.path.join(bindir, "foo.exe").lower()
+ assert (
+ which("foo", exts=[".FOO", ".BAR"]).lower()
+ == os.path.join(bindir, "foo").lower()
+ )
+ assert os.environ.get("PATHEXT") != [".FOO", ".BAR"]
+ assert which("foo.txt") is None
+
+ assert which("bar").lower() == os.path.join(bindir, "bar").lower()
+ assert which("baz").lower() == os.path.join(cwd, "baz.exe").lower()
+
+ registered_dir = os.path.join(cwd, "registered")
+ quux = os.path.join(registered_dir, "quux.exe").lower()
+
+ def mock_registry(key, subkey):
+ assert subkey == (
+ r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\quux.exe"
+ )
+ return quux
+
+ monkeypatch.setattr(winreg, "QueryValue", mock_registry)
+ assert which("quux").lower() == quux
+ assert which("quux.exe").lower() == quux
+
+ else:
+ bindir = os.path.join(cwd, "unix")
+ monkeypatch.setenv("PATH", bindir)
+ assert which("foo") == os.path.join(bindir, "foo")
+ assert which("baz") is None
+ assert which("baz", exts=[".EXE"]) is None
+ assert "PATHEXT" not in os.environ
+ assert which("file") is None
+
+
+if __name__ == "__main__":
+ mozunit.main()