diff options
Diffstat (limited to 'testing/mozbase/mozfile/mozfile/mozfile.py')
-rw-r--r-- | testing/mozbase/mozfile/mozfile/mozfile.py | 674 |
1 files changed, 674 insertions, 0 deletions
diff --git a/testing/mozbase/mozfile/mozfile/mozfile.py b/testing/mozbase/mozfile/mozfile/mozfile.py new file mode 100644 index 0000000000..1f4ffa74af --- /dev/null +++ b/testing/mozbase/mozfile/mozfile/mozfile.py @@ -0,0 +1,674 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +# We don't import all modules at the top for performance reasons. See Bug 1008943 + +import errno +import os +import re +import stat +import sys +import time +import warnings +from contextlib import contextmanager +from textwrap import dedent + +from six.moves import urllib + +__all__ = [ + "extract_tarball", + "extract_zip", + "extract", + "is_url", + "load", + "copy_contents", + "match", + "move", + "remove", + "rmtree", + "tree", + "which", + "NamedTemporaryFile", + "TemporaryDirectory", +] + +# utilities for extracting archives + + +def extract_tarball(src, dest, ignore=None): + """extract a .tar file""" + + import tarfile + + def _is_within_directory(directory, target): + real_directory = os.path.realpath(directory) + real_target = os.path.realpath(target) + prefix = os.path.commonprefix([real_directory, real_target]) + return prefix == real_directory + + with tarfile.open(src) as bundle: + namelist = [] + + for m in bundle: + # Mitigation for CVE-2007-4559, Python's tarfile library will allow + # writing files outside of the intended destination. + member_path = os.path.join(dest, m.name) + if not _is_within_directory(dest, member_path): + raise RuntimeError( + dedent( + f""" + Tar bundle '{src}' may be maliciously crafted to escape the destination! + The following path was detected: + + {m.name} + """ + ) + ) + if m.issym(): + link_path = os.path.join(os.path.dirname(member_path), m.linkname) + if not _is_within_directory(dest, link_path): + raise RuntimeError( + dedent( + f""" + Tar bundle '{src}' may be maliciously crafted to escape the destination! + The following path was detected: + + {m.name} + """ + ) + ) + + if m.mode & (stat.S_ISUID | stat.S_ISGID): + raise RuntimeError( + dedent( + f""" + Tar bundle '{src}' may be maliciously crafted to setuid/setgid! + The following path was detected: + + {m.name} + """ + ) + ) + + if ignore and any(match(m.name, i) for i in ignore): + continue + bundle.extract(m, path=dest) + namelist.append(m.name) + + return namelist + + +def extract_zip(src, dest, ignore=None): + """extract a zip file""" + + import zipfile + + if isinstance(src, zipfile.ZipFile): + bundle = src + else: + try: + bundle = zipfile.ZipFile(src) + except Exception: + print("src: %s" % src) + raise + + namelist = bundle.namelist() + + for name in namelist: + if ignore and any(match(name, i) for i in ignore): + continue + + bundle.extract(name, dest) + filename = os.path.realpath(os.path.join(dest, name)) + mode = bundle.getinfo(name).external_attr >> 16 & 0x1FF + # Only update permissions if attributes are set. Otherwise fallback to the defaults. + if mode: + os.chmod(filename, mode) + bundle.close() + return namelist + + +def extract(src, dest=None, ignore=None): + """ + Takes in a tar or zip file and extracts it to dest + + If dest is not specified, extracts to os.path.dirname(src) + + Returns the list of top level files that were extracted + """ + + import tarfile + import zipfile + + assert os.path.exists(src), "'%s' does not exist" % src + + if dest is None: + dest = os.path.dirname(src) + elif not os.path.isdir(dest): + os.makedirs(dest) + assert not os.path.isfile(dest), "dest cannot be a file" + + if tarfile.is_tarfile(src): + namelist = extract_tarball(src, dest, ignore=ignore) + elif zipfile.is_zipfile(src): + namelist = extract_zip(src, dest, ignore=ignore) + else: + raise Exception("mozfile.extract: no archive format found for '%s'" % src) + + # namelist returns paths with forward slashes even in windows + top_level_files = [ + os.path.join(dest, name.rstrip("/")) + for name in namelist + if len(name.rstrip("/").split("/")) == 1 + ] + + # namelist doesn't include folders, append these to the list + for name in namelist: + index = name.find("/") + if index != -1: + root = os.path.join(dest, name[:index]) + if root not in top_level_files: + top_level_files.append(root) + + return top_level_files + + +# utilities for removal of files and directories + + +def rmtree(dir): + """Deprecated wrapper method to remove a directory tree. + + Ensure to update your code to use mozfile.remove() directly + + :param dir: directory to be removed + """ + + warnings.warn( + "mozfile.rmtree() is deprecated in favor of mozfile.remove()", + PendingDeprecationWarning, + stacklevel=2, + ) + return remove(dir) + + +def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5): + """ + It's possible to see spurious errors on Windows due to various things + keeping a handle to the directory open (explorer, virus scanners, etc) + So we try a few times if it fails with a known error. + retry_delay is multiplied by the number of failed attempts to increase + the likelihood of success in subsequent attempts. + """ + retry_count = 0 + while True: + try: + func(*args) + except OSError as e: + # Error codes are defined in: + # http://docs.python.org/2/library/errno.html#module-errno + if e.errno not in (errno.EACCES, errno.ENOTEMPTY): + raise + + if retry_count == retry_max: + raise + + retry_count += 1 + + print( + '%s() failed for "%s". Reason: %s (%s). Retrying...' + % (func.__name__, args, e.strerror, e.errno) + ) + time.sleep(retry_count * retry_delay) + else: + # If no exception has been thrown it should be done + break + + +def remove(path): + """Removes the specified file, link, or directory tree. + + This is a replacement for shutil.rmtree that works better under + windows. It does the following things: + + - check path access for the current user before trying to remove + - retry operations on some known errors due to various things keeping + a handle on file paths - like explorer, virus scanners, etc. The + known errors are errno.EACCES and errno.ENOTEMPTY, and it will + retry up to 5 five times with a delay of (failed_attempts * 0.5) seconds + between each attempt. + + Note that no error will be raised if the given path does not exists. + + :param path: path to be removed + """ + + import shutil + + def _call_with_windows_retry(*args, **kwargs): + try: + _call_windows_retry(*args, **kwargs) + except OSError as e: + # The file or directory to be removed doesn't exist anymore + if e.errno != errno.ENOENT: + raise + + def _update_permissions(path): + """Sets specified pemissions depending on filetype""" + if os.path.islink(path): + # Path is a symlink which we don't have to modify + # because it should already have all the needed permissions + return + + stats = os.stat(path) + + if os.path.isfile(path): + mode = stats.st_mode | stat.S_IWUSR + elif os.path.isdir(path): + mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR + else: + # Not supported type + return + + _call_with_windows_retry(os.chmod, (path, mode)) + + if not os.path.lexists(path): + return + + """ + On Windows, adds '\\\\?\\' to paths which match ^[A-Za-z]:\\.* to access + files or directories that exceed MAX_PATH(260) limitation or that ends + with a period. + """ + if ( + sys.platform in ("win32", "cygwin") + and len(path) >= 3 + and path[1] == ":" + and path[2] == "\\" + ): + path = "\\\\?\\%s" % path + + if os.path.isfile(path) or os.path.islink(path): + # Verify the file or link is read/write for the current user + _update_permissions(path) + _call_with_windows_retry(os.remove, (path,)) + + elif os.path.isdir(path): + # Verify the directory is read/write/execute for the current user + _update_permissions(path) + + # We're ensuring that every nested item has writable permission. + for root, dirs, files in os.walk(path): + for entry in dirs + files: + _update_permissions(os.path.join(root, entry)) + _call_with_windows_retry(shutil.rmtree, (path,)) + + +def copy_contents(srcdir, dstdir): + """ + Copy the contents of the srcdir into the dstdir, preserving + subdirectories. + + If an existing file of the same name exists in dstdir, it will be overwritten. + """ + import shutil + + # dirs_exist_ok was introduced in Python 3.8 + # On earlier versions, or Windows, use the verbose mechanism. + # We use it on Windows because _call_with_windows_retry doesn't allow + # named arguments to be passed. + if (sys.version_info.major < 3 or sys.version_info.minor < 8) or (os.name == "nt"): + names = os.listdir(srcdir) + if not os.path.isdir(dstdir): + os.makedirs(dstdir) + errors = [] + for name in names: + srcname = os.path.join(srcdir, name) + dstname = os.path.join(dstdir, name) + try: + if os.path.islink(srcname): + linkto = os.readlink(srcname) + os.symlink(linkto, dstname) + elif os.path.isdir(srcname): + copy_contents(srcname, dstname) + else: + _call_windows_retry(shutil.copy2, (srcname, dstname)) + except OSError as why: + errors.append((srcname, dstname, str(why))) + except Exception as err: + errors.extend(err) + try: + _call_windows_retry(shutil.copystat, (srcdir, dstdir)) + except OSError as why: + if why.winerror is None: + errors.extend((srcdir, dstdir, str(why))) + if errors: + raise Exception(errors) + else: + shutil.copytree(srcdir, dstdir, dirs_exist_ok=True) + + +def move(src, dst): + """ + Move a file or directory path. + + This is a replacement for shutil.move that works better under windows, + retrying operations on some known errors due to various things keeping + a handle on file paths. + """ + import shutil + + _call_windows_retry(shutil.move, (src, dst)) + + +def depth(directory): + """returns the integer depth of a directory or path relative to '/'""" + + directory = os.path.abspath(directory) + level = 0 + while True: + directory, remainder = os.path.split(directory) + level += 1 + if not remainder: + break + return level + + +def tree(directory, sort_key=lambda x: x.lower()): + """Display tree directory structure for `directory`.""" + vertical_line = "│" + item_marker = "├" + last_child = "└" + + retval = [] + indent = [] + last = {} + top = depth(directory) + + for dirpath, dirnames, filenames in os.walk(directory, topdown=True): + + abspath = os.path.abspath(dirpath) + basename = os.path.basename(abspath) + parent = os.path.dirname(abspath) + level = depth(abspath) - top + + # sort articles of interest + for resource in (dirnames, filenames): + resource[:] = sorted(resource, key=sort_key) + + if level > len(indent): + indent.append(vertical_line) + indent = indent[:level] + + if dirnames: + files_end = item_marker + last[abspath] = dirnames[-1] + else: + files_end = last_child + + if last.get(parent) == os.path.basename(abspath): + # last directory of parent + dirpath_mark = last_child + indent[-1] = " " + elif not indent: + dirpath_mark = "" + else: + dirpath_mark = item_marker + + # append the directory and piece of tree structure + # if the top-level entry directory, print as passed + retval.append( + "%s%s%s" + % ("".join(indent[:-1]), dirpath_mark, basename if retval else directory) + ) + # add the files + if filenames: + last_file = filenames[-1] + retval.extend( + [ + ( + "%s%s%s" + % ( + "".join(indent), + files_end if filename == last_file else item_marker, + filename, + ) + ) + for index, filename in enumerate(filenames) + ] + ) + + return "\n".join(retval) + + +def which(cmd, mode=os.F_OK | os.X_OK, path=None, exts=None, extra_search_dirs=()): + """A wrapper around `shutil.which` to make the behavior on Windows + consistent with other platforms. + + On non-Windows platforms, this is a direct call to `shutil.which`. On + Windows, this: + + * Ensures that `cmd` without an extension will be found. Previously it was + only found if it had an extension in `PATHEXT`. + * Ensures the absolute path to the binary is returned. Previously if the + binary was found in `cwd`, a relative path was returned. + * Checks the Windows registry if shutil.which doesn't come up with anything. + + The arguments are the same as the ones in `shutil.which`. In addition there + is an `exts` argument that only has an effect on Windows. This is used to + set a custom value for PATHEXT and is formatted as a list of file + extensions. + + extra_search_dirs is a convenience argument. If provided, the strings in + the sequence will be appended to the END of the given `path`. + """ + from shutil import which as shutil_which + + if isinstance(path, (list, tuple)): + path = os.pathsep.join(path) + + if not path: + path = os.environ.get("PATH", os.defpath) + + if extra_search_dirs: + path = os.pathsep.join([path] + list(extra_search_dirs)) + + if sys.platform != "win32": + return shutil_which(cmd, mode=mode, path=path) + + oldexts = os.environ.get("PATHEXT", "") + if not exts: + exts = oldexts.split(os.pathsep) + + # This ensures that `cmd` without any extensions will be found. + # See: https://bugs.python.org/issue31405 + if "." not in exts: + exts.append(".") + + os.environ["PATHEXT"] = os.pathsep.join(exts) + try: + path = shutil_which(cmd, mode=mode, path=path) + if path: + return os.path.abspath(path.rstrip(".")) + finally: + if oldexts: + os.environ["PATHEXT"] = oldexts + else: + del os.environ["PATHEXT"] + + # If we've gotten this far, we need to check for registered executables + # before giving up. + try: + import winreg + except ImportError: + import _winreg as winreg + if not cmd.lower().endswith(".exe"): + cmd += ".exe" + try: + ret = winreg.QueryValue( + winreg.HKEY_LOCAL_MACHINE, + r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\%s" % cmd, + ) + return os.path.abspath(ret) if ret else None + except winreg.error: + return None + + +# utilities for temporary resources + + +class NamedTemporaryFile(object): + """ + Like tempfile.NamedTemporaryFile except it works on Windows + in the case where you open the created file a second time. + + This behaves very similarly to tempfile.NamedTemporaryFile but may + not behave exactly the same. For example, this function does not + prevent fd inheritance by children. + + Example usage: + + with NamedTemporaryFile() as fh: + fh.write(b'foobar') + + print('Filename: %s' % fh.name) + + see https://bugzilla.mozilla.org/show_bug.cgi?id=821362 + """ + + def __init__( + self, mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True + ): + + import tempfile + + fd, path = tempfile.mkstemp(suffix, prefix, dir, "t" in mode) + os.close(fd) + + self.file = open(path, mode) + self._path = path + self._delete = delete + self._unlinked = False + + def __getattr__(self, k): + return getattr(self.__dict__["file"], k) + + def __iter__(self): + return self.__dict__["file"] + + def __enter__(self): + self.file.__enter__() + return self + + def __exit__(self, exc, value, tb): + self.file.__exit__(exc, value, tb) + if self.__dict__["_delete"]: + os.unlink(self.__dict__["_path"]) + self._unlinked = True + + def __del__(self): + if self.__dict__["_unlinked"]: + return + self.file.__exit__(None, None, None) + if self.__dict__["_delete"]: + os.unlink(self.__dict__["_path"]) + + +@contextmanager +def TemporaryDirectory(): + """ + create a temporary directory using tempfile.mkdtemp, and then clean it up. + + Example usage: + with TemporaryDirectory() as tmp: + open(os.path.join(tmp, "a_temp_file"), "w").write("data") + + """ + + import shutil + import tempfile + + tempdir = tempfile.mkdtemp() + try: + yield tempdir + finally: + shutil.rmtree(tempdir) + + +# utilities dealing with URLs + + +def is_url(thing): + """ + Return True if thing looks like a URL. + """ + + parsed = urllib.parse.urlparse(thing) + if "scheme" in parsed: + return len(parsed.scheme) >= 2 + else: + return len(parsed[0]) >= 2 + + +def load(resource): + """ + open a file or URL for reading. If the passed resource string is not a URL, + or begins with 'file://', return a ``file``. Otherwise, return the + result of urllib.urlopen() + """ + + # handle file URLs separately due to python stdlib limitations + if resource.startswith("file://"): + resource = resource[len("file://") :] + + if not is_url(resource): + # if no scheme is given, it is a file path + return open(resource) + + return urllib.request.urlopen(resource) + + +# We can't depend on mozpack.path here, so copy the 'match' function over. + +re_cache = {} +# Python versions < 3.7 return r'\/' for re.escape('/'). +if re.escape("/") == "/": + MATCH_STAR_STAR_RE = re.compile(r"(^|/)\\\*\\\*/") + MATCH_STAR_STAR_END_RE = re.compile(r"(^|/)\\\*\\\*$") +else: + MATCH_STAR_STAR_RE = re.compile(r"(^|\\\/)\\\*\\\*\\\/") + MATCH_STAR_STAR_END_RE = re.compile(r"(^|\\\/)\\\*\\\*$") + + +def match(path, pattern): + """ + Return whether the given path matches the given pattern. + An asterisk can be used to match any string, including the null string, in + one part of the path: + + ``foo`` matches ``*``, ``f*`` or ``fo*o`` + + However, an asterisk matching a subdirectory may not match the null string: + + ``foo/bar`` does *not* match ``foo/*/bar`` + + If the pattern matches one of the ancestor directories of the path, the + patch is considered matching: + + ``foo/bar`` matches ``foo`` + + Two adjacent asterisks can be used to match files and zero or more + directories and subdirectories. + + ``foo/bar`` matches ``foo/**/bar``, or ``**/bar`` + """ + if not pattern: + return True + if pattern not in re_cache: + p = re.escape(pattern) + p = MATCH_STAR_STAR_RE.sub(r"\1(?:.+/)?", p) + p = MATCH_STAR_STAR_END_RE.sub(r"(?:\1.+)?", p) + p = p.replace(r"\*", "[^/]*") + "(?:/.*)?$" + re_cache[pattern] = re.compile(p) + return re_cache[pattern].match(path) is not None |