summaryrefslogtreecommitdiffstats
path: root/testing/mozbase/mozfile/mozfile/mozfile.py
diff options
context:
space:
mode:
Diffstat (limited to 'testing/mozbase/mozfile/mozfile/mozfile.py')
-rw-r--r--testing/mozbase/mozfile/mozfile/mozfile.py674
1 files changed, 674 insertions, 0 deletions
diff --git a/testing/mozbase/mozfile/mozfile/mozfile.py b/testing/mozbase/mozfile/mozfile/mozfile.py
new file mode 100644
index 0000000000..1f4ffa74af
--- /dev/null
+++ b/testing/mozbase/mozfile/mozfile/mozfile.py
@@ -0,0 +1,674 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+# We don't import all modules at the top for performance reasons. See Bug 1008943
+
+import errno
+import os
+import re
+import stat
+import sys
+import time
+import warnings
+from contextlib import contextmanager
+from textwrap import dedent
+
+from six.moves import urllib
+
+__all__ = [
+ "extract_tarball",
+ "extract_zip",
+ "extract",
+ "is_url",
+ "load",
+ "copy_contents",
+ "match",
+ "move",
+ "remove",
+ "rmtree",
+ "tree",
+ "which",
+ "NamedTemporaryFile",
+ "TemporaryDirectory",
+]
+
+# utilities for extracting archives
+
+
+def extract_tarball(src, dest, ignore=None):
+ """extract a .tar file"""
+
+ import tarfile
+
+ def _is_within_directory(directory, target):
+ real_directory = os.path.realpath(directory)
+ real_target = os.path.realpath(target)
+ prefix = os.path.commonprefix([real_directory, real_target])
+ return prefix == real_directory
+
+ with tarfile.open(src) as bundle:
+ namelist = []
+
+ for m in bundle:
+ # Mitigation for CVE-2007-4559, Python's tarfile library will allow
+ # writing files outside of the intended destination.
+ member_path = os.path.join(dest, m.name)
+ if not _is_within_directory(dest, member_path):
+ raise RuntimeError(
+ dedent(
+ f"""
+ Tar bundle '{src}' may be maliciously crafted to escape the destination!
+ The following path was detected:
+
+ {m.name}
+ """
+ )
+ )
+ if m.issym():
+ link_path = os.path.join(os.path.dirname(member_path), m.linkname)
+ if not _is_within_directory(dest, link_path):
+ raise RuntimeError(
+ dedent(
+ f"""
+ Tar bundle '{src}' may be maliciously crafted to escape the destination!
+ The following path was detected:
+
+ {m.name}
+ """
+ )
+ )
+
+ if m.mode & (stat.S_ISUID | stat.S_ISGID):
+ raise RuntimeError(
+ dedent(
+ f"""
+ Tar bundle '{src}' may be maliciously crafted to setuid/setgid!
+ The following path was detected:
+
+ {m.name}
+ """
+ )
+ )
+
+ if ignore and any(match(m.name, i) for i in ignore):
+ continue
+ bundle.extract(m, path=dest)
+ namelist.append(m.name)
+
+ return namelist
+
+
+def extract_zip(src, dest, ignore=None):
+ """extract a zip file"""
+
+ import zipfile
+
+ if isinstance(src, zipfile.ZipFile):
+ bundle = src
+ else:
+ try:
+ bundle = zipfile.ZipFile(src)
+ except Exception:
+ print("src: %s" % src)
+ raise
+
+ namelist = bundle.namelist()
+
+ for name in namelist:
+ if ignore and any(match(name, i) for i in ignore):
+ continue
+
+ bundle.extract(name, dest)
+ filename = os.path.realpath(os.path.join(dest, name))
+ mode = bundle.getinfo(name).external_attr >> 16 & 0x1FF
+ # Only update permissions if attributes are set. Otherwise fallback to the defaults.
+ if mode:
+ os.chmod(filename, mode)
+ bundle.close()
+ return namelist
+
+
+def extract(src, dest=None, ignore=None):
+ """
+ Takes in a tar or zip file and extracts it to dest
+
+ If dest is not specified, extracts to os.path.dirname(src)
+
+ Returns the list of top level files that were extracted
+ """
+
+ import tarfile
+ import zipfile
+
+ assert os.path.exists(src), "'%s' does not exist" % src
+
+ if dest is None:
+ dest = os.path.dirname(src)
+ elif not os.path.isdir(dest):
+ os.makedirs(dest)
+ assert not os.path.isfile(dest), "dest cannot be a file"
+
+ if tarfile.is_tarfile(src):
+ namelist = extract_tarball(src, dest, ignore=ignore)
+ elif zipfile.is_zipfile(src):
+ namelist = extract_zip(src, dest, ignore=ignore)
+ else:
+ raise Exception("mozfile.extract: no archive format found for '%s'" % src)
+
+ # namelist returns paths with forward slashes even in windows
+ top_level_files = [
+ os.path.join(dest, name.rstrip("/"))
+ for name in namelist
+ if len(name.rstrip("/").split("/")) == 1
+ ]
+
+ # namelist doesn't include folders, append these to the list
+ for name in namelist:
+ index = name.find("/")
+ if index != -1:
+ root = os.path.join(dest, name[:index])
+ if root not in top_level_files:
+ top_level_files.append(root)
+
+ return top_level_files
+
+
+# utilities for removal of files and directories
+
+
+def rmtree(dir):
+ """Deprecated wrapper method to remove a directory tree.
+
+ Ensure to update your code to use mozfile.remove() directly
+
+ :param dir: directory to be removed
+ """
+
+ warnings.warn(
+ "mozfile.rmtree() is deprecated in favor of mozfile.remove()",
+ PendingDeprecationWarning,
+ stacklevel=2,
+ )
+ return remove(dir)
+
+
+def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5):
+ """
+ It's possible to see spurious errors on Windows due to various things
+ keeping a handle to the directory open (explorer, virus scanners, etc)
+ So we try a few times if it fails with a known error.
+ retry_delay is multiplied by the number of failed attempts to increase
+ the likelihood of success in subsequent attempts.
+ """
+ retry_count = 0
+ while True:
+ try:
+ func(*args)
+ except OSError as e:
+ # Error codes are defined in:
+ # http://docs.python.org/2/library/errno.html#module-errno
+ if e.errno not in (errno.EACCES, errno.ENOTEMPTY):
+ raise
+
+ if retry_count == retry_max:
+ raise
+
+ retry_count += 1
+
+ print(
+ '%s() failed for "%s". Reason: %s (%s). Retrying...'
+ % (func.__name__, args, e.strerror, e.errno)
+ )
+ time.sleep(retry_count * retry_delay)
+ else:
+ # If no exception has been thrown it should be done
+ break
+
+
+def remove(path):
+ """Removes the specified file, link, or directory tree.
+
+ This is a replacement for shutil.rmtree that works better under
+ windows. It does the following things:
+
+ - check path access for the current user before trying to remove
+ - retry operations on some known errors due to various things keeping
+ a handle on file paths - like explorer, virus scanners, etc. The
+ known errors are errno.EACCES and errno.ENOTEMPTY, and it will
+ retry up to 5 five times with a delay of (failed_attempts * 0.5) seconds
+ between each attempt.
+
+ Note that no error will be raised if the given path does not exists.
+
+ :param path: path to be removed
+ """
+
+ import shutil
+
+ def _call_with_windows_retry(*args, **kwargs):
+ try:
+ _call_windows_retry(*args, **kwargs)
+ except OSError as e:
+ # The file or directory to be removed doesn't exist anymore
+ if e.errno != errno.ENOENT:
+ raise
+
+ def _update_permissions(path):
+ """Sets specified pemissions depending on filetype"""
+ if os.path.islink(path):
+ # Path is a symlink which we don't have to modify
+ # because it should already have all the needed permissions
+ return
+
+ stats = os.stat(path)
+
+ if os.path.isfile(path):
+ mode = stats.st_mode | stat.S_IWUSR
+ elif os.path.isdir(path):
+ mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR
+ else:
+ # Not supported type
+ return
+
+ _call_with_windows_retry(os.chmod, (path, mode))
+
+ if not os.path.lexists(path):
+ return
+
+ """
+ On Windows, adds '\\\\?\\' to paths which match ^[A-Za-z]:\\.* to access
+ files or directories that exceed MAX_PATH(260) limitation or that ends
+ with a period.
+ """
+ if (
+ sys.platform in ("win32", "cygwin")
+ and len(path) >= 3
+ and path[1] == ":"
+ and path[2] == "\\"
+ ):
+ path = "\\\\?\\%s" % path
+
+ if os.path.isfile(path) or os.path.islink(path):
+ # Verify the file or link is read/write for the current user
+ _update_permissions(path)
+ _call_with_windows_retry(os.remove, (path,))
+
+ elif os.path.isdir(path):
+ # Verify the directory is read/write/execute for the current user
+ _update_permissions(path)
+
+ # We're ensuring that every nested item has writable permission.
+ for root, dirs, files in os.walk(path):
+ for entry in dirs + files:
+ _update_permissions(os.path.join(root, entry))
+ _call_with_windows_retry(shutil.rmtree, (path,))
+
+
+def copy_contents(srcdir, dstdir):
+ """
+ Copy the contents of the srcdir into the dstdir, preserving
+ subdirectories.
+
+ If an existing file of the same name exists in dstdir, it will be overwritten.
+ """
+ import shutil
+
+ # dirs_exist_ok was introduced in Python 3.8
+ # On earlier versions, or Windows, use the verbose mechanism.
+ # We use it on Windows because _call_with_windows_retry doesn't allow
+ # named arguments to be passed.
+ if (sys.version_info.major < 3 or sys.version_info.minor < 8) or (os.name == "nt"):
+ names = os.listdir(srcdir)
+ if not os.path.isdir(dstdir):
+ os.makedirs(dstdir)
+ errors = []
+ for name in names:
+ srcname = os.path.join(srcdir, name)
+ dstname = os.path.join(dstdir, name)
+ try:
+ if os.path.islink(srcname):
+ linkto = os.readlink(srcname)
+ os.symlink(linkto, dstname)
+ elif os.path.isdir(srcname):
+ copy_contents(srcname, dstname)
+ else:
+ _call_windows_retry(shutil.copy2, (srcname, dstname))
+ except OSError as why:
+ errors.append((srcname, dstname, str(why)))
+ except Exception as err:
+ errors.extend(err)
+ try:
+ _call_windows_retry(shutil.copystat, (srcdir, dstdir))
+ except OSError as why:
+ if why.winerror is None:
+ errors.extend((srcdir, dstdir, str(why)))
+ if errors:
+ raise Exception(errors)
+ else:
+ shutil.copytree(srcdir, dstdir, dirs_exist_ok=True)
+
+
+def move(src, dst):
+ """
+ Move a file or directory path.
+
+ This is a replacement for shutil.move that works better under windows,
+ retrying operations on some known errors due to various things keeping
+ a handle on file paths.
+ """
+ import shutil
+
+ _call_windows_retry(shutil.move, (src, dst))
+
+
+def depth(directory):
+ """returns the integer depth of a directory or path relative to '/'"""
+
+ directory = os.path.abspath(directory)
+ level = 0
+ while True:
+ directory, remainder = os.path.split(directory)
+ level += 1
+ if not remainder:
+ break
+ return level
+
+
+def tree(directory, sort_key=lambda x: x.lower()):
+ """Display tree directory structure for `directory`."""
+ vertical_line = "│"
+ item_marker = "├"
+ last_child = "└"
+
+ retval = []
+ indent = []
+ last = {}
+ top = depth(directory)
+
+ for dirpath, dirnames, filenames in os.walk(directory, topdown=True):
+
+ abspath = os.path.abspath(dirpath)
+ basename = os.path.basename(abspath)
+ parent = os.path.dirname(abspath)
+ level = depth(abspath) - top
+
+ # sort articles of interest
+ for resource in (dirnames, filenames):
+ resource[:] = sorted(resource, key=sort_key)
+
+ if level > len(indent):
+ indent.append(vertical_line)
+ indent = indent[:level]
+
+ if dirnames:
+ files_end = item_marker
+ last[abspath] = dirnames[-1]
+ else:
+ files_end = last_child
+
+ if last.get(parent) == os.path.basename(abspath):
+ # last directory of parent
+ dirpath_mark = last_child
+ indent[-1] = " "
+ elif not indent:
+ dirpath_mark = ""
+ else:
+ dirpath_mark = item_marker
+
+ # append the directory and piece of tree structure
+ # if the top-level entry directory, print as passed
+ retval.append(
+ "%s%s%s"
+ % ("".join(indent[:-1]), dirpath_mark, basename if retval else directory)
+ )
+ # add the files
+ if filenames:
+ last_file = filenames[-1]
+ retval.extend(
+ [
+ (
+ "%s%s%s"
+ % (
+ "".join(indent),
+ files_end if filename == last_file else item_marker,
+ filename,
+ )
+ )
+ for index, filename in enumerate(filenames)
+ ]
+ )
+
+ return "\n".join(retval)
+
+
+def which(cmd, mode=os.F_OK | os.X_OK, path=None, exts=None, extra_search_dirs=()):
+ """A wrapper around `shutil.which` to make the behavior on Windows
+ consistent with other platforms.
+
+ On non-Windows platforms, this is a direct call to `shutil.which`. On
+ Windows, this:
+
+ * Ensures that `cmd` without an extension will be found. Previously it was
+ only found if it had an extension in `PATHEXT`.
+ * Ensures the absolute path to the binary is returned. Previously if the
+ binary was found in `cwd`, a relative path was returned.
+ * Checks the Windows registry if shutil.which doesn't come up with anything.
+
+ The arguments are the same as the ones in `shutil.which`. In addition there
+ is an `exts` argument that only has an effect on Windows. This is used to
+ set a custom value for PATHEXT and is formatted as a list of file
+ extensions.
+
+ extra_search_dirs is a convenience argument. If provided, the strings in
+ the sequence will be appended to the END of the given `path`.
+ """
+ from shutil import which as shutil_which
+
+ if isinstance(path, (list, tuple)):
+ path = os.pathsep.join(path)
+
+ if not path:
+ path = os.environ.get("PATH", os.defpath)
+
+ if extra_search_dirs:
+ path = os.pathsep.join([path] + list(extra_search_dirs))
+
+ if sys.platform != "win32":
+ return shutil_which(cmd, mode=mode, path=path)
+
+ oldexts = os.environ.get("PATHEXT", "")
+ if not exts:
+ exts = oldexts.split(os.pathsep)
+
+ # This ensures that `cmd` without any extensions will be found.
+ # See: https://bugs.python.org/issue31405
+ if "." not in exts:
+ exts.append(".")
+
+ os.environ["PATHEXT"] = os.pathsep.join(exts)
+ try:
+ path = shutil_which(cmd, mode=mode, path=path)
+ if path:
+ return os.path.abspath(path.rstrip("."))
+ finally:
+ if oldexts:
+ os.environ["PATHEXT"] = oldexts
+ else:
+ del os.environ["PATHEXT"]
+
+ # If we've gotten this far, we need to check for registered executables
+ # before giving up.
+ try:
+ import winreg
+ except ImportError:
+ import _winreg as winreg
+ if not cmd.lower().endswith(".exe"):
+ cmd += ".exe"
+ try:
+ ret = winreg.QueryValue(
+ winreg.HKEY_LOCAL_MACHINE,
+ r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\%s" % cmd,
+ )
+ return os.path.abspath(ret) if ret else None
+ except winreg.error:
+ return None
+
+
+# utilities for temporary resources
+
+
+class NamedTemporaryFile(object):
+ """
+ Like tempfile.NamedTemporaryFile except it works on Windows
+ in the case where you open the created file a second time.
+
+ This behaves very similarly to tempfile.NamedTemporaryFile but may
+ not behave exactly the same. For example, this function does not
+ prevent fd inheritance by children.
+
+ Example usage:
+
+ with NamedTemporaryFile() as fh:
+ fh.write(b'foobar')
+
+ print('Filename: %s' % fh.name)
+
+ see https://bugzilla.mozilla.org/show_bug.cgi?id=821362
+ """
+
+ def __init__(
+ self, mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True
+ ):
+
+ import tempfile
+
+ fd, path = tempfile.mkstemp(suffix, prefix, dir, "t" in mode)
+ os.close(fd)
+
+ self.file = open(path, mode)
+ self._path = path
+ self._delete = delete
+ self._unlinked = False
+
+ def __getattr__(self, k):
+ return getattr(self.__dict__["file"], k)
+
+ def __iter__(self):
+ return self.__dict__["file"]
+
+ def __enter__(self):
+ self.file.__enter__()
+ return self
+
+ def __exit__(self, exc, value, tb):
+ self.file.__exit__(exc, value, tb)
+ if self.__dict__["_delete"]:
+ os.unlink(self.__dict__["_path"])
+ self._unlinked = True
+
+ def __del__(self):
+ if self.__dict__["_unlinked"]:
+ return
+ self.file.__exit__(None, None, None)
+ if self.__dict__["_delete"]:
+ os.unlink(self.__dict__["_path"])
+
+
+@contextmanager
+def TemporaryDirectory():
+ """
+ create a temporary directory using tempfile.mkdtemp, and then clean it up.
+
+ Example usage:
+ with TemporaryDirectory() as tmp:
+ open(os.path.join(tmp, "a_temp_file"), "w").write("data")
+
+ """
+
+ import shutil
+ import tempfile
+
+ tempdir = tempfile.mkdtemp()
+ try:
+ yield tempdir
+ finally:
+ shutil.rmtree(tempdir)
+
+
+# utilities dealing with URLs
+
+
+def is_url(thing):
+ """
+ Return True if thing looks like a URL.
+ """
+
+ parsed = urllib.parse.urlparse(thing)
+ if "scheme" in parsed:
+ return len(parsed.scheme) >= 2
+ else:
+ return len(parsed[0]) >= 2
+
+
+def load(resource):
+ """
+ open a file or URL for reading. If the passed resource string is not a URL,
+ or begins with 'file://', return a ``file``. Otherwise, return the
+ result of urllib.urlopen()
+ """
+
+ # handle file URLs separately due to python stdlib limitations
+ if resource.startswith("file://"):
+ resource = resource[len("file://") :]
+
+ if not is_url(resource):
+ # if no scheme is given, it is a file path
+ return open(resource)
+
+ return urllib.request.urlopen(resource)
+
+
+# We can't depend on mozpack.path here, so copy the 'match' function over.
+
+re_cache = {}
+# Python versions < 3.7 return r'\/' for re.escape('/').
+if re.escape("/") == "/":
+ MATCH_STAR_STAR_RE = re.compile(r"(^|/)\\\*\\\*/")
+ MATCH_STAR_STAR_END_RE = re.compile(r"(^|/)\\\*\\\*$")
+else:
+ MATCH_STAR_STAR_RE = re.compile(r"(^|\\\/)\\\*\\\*\\\/")
+ MATCH_STAR_STAR_END_RE = re.compile(r"(^|\\\/)\\\*\\\*$")
+
+
+def match(path, pattern):
+ """
+ Return whether the given path matches the given pattern.
+ An asterisk can be used to match any string, including the null string, in
+ one part of the path:
+
+ ``foo`` matches ``*``, ``f*`` or ``fo*o``
+
+ However, an asterisk matching a subdirectory may not match the null string:
+
+ ``foo/bar`` does *not* match ``foo/*/bar``
+
+ If the pattern matches one of the ancestor directories of the path, the
+ patch is considered matching:
+
+ ``foo/bar`` matches ``foo``
+
+ Two adjacent asterisks can be used to match files and zero or more
+ directories and subdirectories.
+
+ ``foo/bar`` matches ``foo/**/bar``, or ``**/bar``
+ """
+ if not pattern:
+ return True
+ if pattern not in re_cache:
+ p = re.escape(pattern)
+ p = MATCH_STAR_STAR_RE.sub(r"\1(?:.+/)?", p)
+ p = MATCH_STAR_STAR_END_RE.sub(r"(?:\1.+)?", p)
+ p = p.replace(r"\*", "[^/]*") + "(?:/.*)?$"
+ re_cache[pattern] = re.compile(p)
+ return re_cache[pattern].match(path) is not None