summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/tools/wpt/utils.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /testing/web-platform/tests/tools/wpt/utils.py
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'testing/web-platform/tests/tools/wpt/utils.py')
-rw-r--r--testing/web-platform/tests/tools/wpt/utils.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/testing/web-platform/tests/tools/wpt/utils.py b/testing/web-platform/tests/tools/wpt/utils.py
new file mode 100644
index 0000000000..b015b95e1a
--- /dev/null
+++ b/testing/web-platform/tests/tools/wpt/utils.py
@@ -0,0 +1,168 @@
+# mypy: allow-untyped-defs
+
+import errno
+import logging
+import os
+import sys
+import shutil
+import stat
+import subprocess
+import tarfile
+import time
+import zipfile
+from io import BytesIO
+from socket import error as SocketError # NOQA: N812
+from urllib.request import urlopen
+
+logger = logging.getLogger(__name__)
+
+
+def call(*args):
+ """Log terminal command, invoke it as a subprocess.
+
+ Returns a bytestring of the subprocess output if no error.
+ """
+ logger.debug(" ".join(args))
+ try:
+ return subprocess.check_output(args).decode('utf8')
+ except subprocess.CalledProcessError as e:
+ logger.critical("%s exited with return code %i" %
+ (e.cmd, e.returncode))
+ logger.critical(e.output)
+ raise
+
+
+def seekable(fileobj):
+ """Attempt to use file.seek on given file, with fallbacks."""
+ try:
+ fileobj.seek(fileobj.tell())
+ except Exception:
+ return BytesIO(fileobj.read())
+ else:
+ return fileobj
+
+
+def untar(fileobj, dest="."):
+ """Extract tar archive."""
+ logger.debug("untar")
+ fileobj = seekable(fileobj)
+ with tarfile.open(fileobj=fileobj) as tar_data:
+ tar_data.extractall(path=dest)
+
+
+def unzip(fileobj, dest=None, limit=None):
+ """Extract zip archive."""
+ logger.debug("unzip")
+ fileobj = seekable(fileobj)
+ with zipfile.ZipFile(fileobj) as zip_data:
+ for info in zip_data.infolist():
+ if limit is not None and info.filename not in limit:
+ continue
+ # external_attr has a size of 4 bytes and the info it contains depends on the system where the ZIP file was created.
+ # - If the Zipfile was created on an UNIX environment, then the 2 highest bytes represent UNIX permissions and file
+ # type bits (sys/stat.h st_mode entry on struct stat) and the lowest byte represents DOS FAT compatibility attributes
+ # (used mainly to store the directory bit).
+ # - If the ZipFile was created on a WIN/DOS environment then the lowest byte represents DOS FAT file attributes
+ # (those attributes are: directory bit, hidden bit, read-only bit, system-file bit, etc).
+ # More info at https://unix.stackexchange.com/a/14727 and https://forensicswiki.xyz/page/ZIP
+ # So, we can ignore the DOS FAT attributes because python ZipFile.extract() already takes care of creating the directories
+ # as needed (both on win and *nix) and the other DOS FAT attributes (hidden/read-only/system-file/etc) are not interesting
+ # here (not even on Windows, since we don't care about setting those extra attributes for our use case).
+ # So we do this:
+ # 1. When uncompressing on a Windows system we just call to extract().
+ # 2. When uncompressing on an Unix-like system we only take care of the attributes if the zip file was created on an
+ # Unix-like system, otherwise we don't have any info about the file permissions other than the DOS FAT attributes,
+ # which are useless here, so just call to extract() without setting any specific file permission in that case.
+ if info.create_system == 0 or sys.platform == 'win32':
+ zip_data.extract(info, path=dest)
+ else:
+ stat_st_mode = info.external_attr >> 16
+ info_dst_path = os.path.join(dest, info.filename)
+ if stat.S_ISLNK(stat_st_mode):
+ # Symlinks are stored in the ZIP file as text files that contain inside the target filename of the symlink.
+ # Recreate the symlink instead of calling extract() when an entry with the attribute stat.S_IFLNK is detected.
+ link_src_path = zip_data.read(info)
+ link_dst_dir = os.path.dirname(info_dst_path)
+ if not os.path.isdir(link_dst_dir):
+ os.makedirs(link_dst_dir)
+
+ # Remove existing link if exists.
+ if os.path.islink(info_dst_path):
+ os.unlink(info_dst_path)
+ os.symlink(link_src_path, info_dst_path)
+ else:
+ zip_data.extract(info, path=dest)
+ # Preserve bits 0-8 only: rwxrwxrwx (no sticky/setuid/setgid bits).
+ perm = stat_st_mode & 0x1FF
+ os.chmod(info_dst_path, perm)
+
+
+def get(url):
+ """Issue GET request to a given URL and return the response."""
+ import requests
+
+ logger.debug("GET %s" % url)
+ resp = requests.get(url, stream=True)
+ resp.raise_for_status()
+ return resp
+
+
+def get_download_to_descriptor(fd, url, max_retries=5):
+ """Download an URL in chunks and saves it to a file descriptor (truncating it)
+ It doesn't close the descriptor, but flushes it on success.
+ It retries the download in case of ECONNRESET up to max_retries.
+ This function is meant to download big files directly to the disk without
+ caching the whole file in memory.
+ """
+ if max_retries < 1:
+ max_retries = 1
+ wait = 2
+ for current_retry in range(1, max_retries+1):
+ try:
+ logger.info("Downloading %s Try %d/%d" % (url, current_retry, max_retries))
+ resp = urlopen(url)
+ # We may come here in a retry, ensure to truncate fd before start writing.
+ fd.seek(0)
+ fd.truncate(0)
+ while True:
+ chunk = resp.read(16*1024)
+ if not chunk:
+ break # Download finished
+ fd.write(chunk)
+ fd.flush()
+ # Success
+ return
+ except SocketError as e:
+ if current_retry < max_retries and e.errno == errno.ECONNRESET:
+ # Retry
+ logger.error("Connection reset by peer. Retrying after %ds..." % wait)
+ time.sleep(wait)
+ wait *= 2
+ else:
+ # Maximum retries or unknown error
+ raise
+
+def rmtree(path):
+ # This works around two issues:
+ # 1. Cannot delete read-only files owned by us (e.g. files extracted from tarballs)
+ # 2. On Windows, we sometimes just need to retry in case the file handler
+ # hasn't been fully released (a common issue).
+ def handle_remove_readonly(func, path, exc):
+ excvalue = exc[1]
+ if func in (os.rmdir, os.remove, os.unlink) and excvalue.errno == errno.EACCES:
+ os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 0777
+ func(path)
+ else:
+ raise
+
+ return shutil.rmtree(path, onerror=handle_remove_readonly)
+
+
+def sha256sum(file_path):
+ """Computes the SHA256 hash sum of a file"""
+ from hashlib import sha256
+ hash = sha256()
+ with open(file_path, 'rb') as f:
+ for chunk in iter(lambda: f.read(4096), b''):
+ hash.update(chunk)
+ return hash.hexdigest()