Adding upstream version 1:10.0.2+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
This commit is contained in:
parent
bf2768bd0f
commit
ea34ddeea6
37998 changed files with 9510514 additions and 0 deletions
107
tests/functional/qemu_test/uncompress.py
Normal file
107
tests/functional/qemu_test/uncompress.py
Normal file
|
@ -0,0 +1,107 @@
|
|||
# SPDX-License-Identifier: GPL-2.0-or-later
|
||||
#
|
||||
# Utilities for python-based QEMU tests
|
||||
#
|
||||
# Copyright 2024 Red Hat, Inc.
|
||||
#
|
||||
# Authors:
|
||||
# Thomas Huth <thuth@redhat.com>
|
||||
|
||||
import gzip
|
||||
import lzma
|
||||
import os
|
||||
import stat
|
||||
import shutil
|
||||
from urllib.parse import urlparse
|
||||
from subprocess import run, CalledProcessError, DEVNULL
|
||||
|
||||
from .asset import Asset
|
||||
|
||||
|
||||
def gzip_uncompress(gz_path, output_path):
|
||||
if os.path.exists(output_path):
|
||||
return
|
||||
with gzip.open(gz_path, 'rb') as gz_in:
|
||||
try:
|
||||
with open(output_path, 'wb') as raw_out:
|
||||
shutil.copyfileobj(gz_in, raw_out)
|
||||
except:
|
||||
os.remove(output_path)
|
||||
raise
|
||||
|
||||
def lzma_uncompress(xz_path, output_path):
|
||||
if os.path.exists(output_path):
|
||||
return
|
||||
with lzma.open(xz_path, 'rb') as lzma_in:
|
||||
try:
|
||||
with open(output_path, 'wb') as raw_out:
|
||||
shutil.copyfileobj(lzma_in, raw_out)
|
||||
except:
|
||||
os.remove(output_path)
|
||||
raise
|
||||
|
||||
|
||||
def zstd_uncompress(zstd_path, output_path):
|
||||
if os.path.exists(output_path):
|
||||
return
|
||||
|
||||
try:
|
||||
run(['zstd', "-f", "-d", zstd_path,
|
||||
"-o", output_path], capture_output=True, check=True)
|
||||
except CalledProcessError as e:
|
||||
os.remove(output_path)
|
||||
raise Exception(
|
||||
f"Unable to decompress zstd file {zstd_path} with {e}") from e
|
||||
|
||||
# zstd copies source archive permissions for the output
|
||||
# file, so must make this writable for QEMU
|
||||
os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)
|
||||
|
||||
|
||||
'''
|
||||
@params compressed: filename, Asset, or file-like object to uncompress
|
||||
@params uncompressed: filename to uncompress into
|
||||
@params format: optional compression format (gzip, lzma)
|
||||
|
||||
Uncompresses @compressed into @uncompressed
|
||||
|
||||
If @format is None, heuristics will be applied to guess the format
|
||||
from the filename or Asset URL. @format must be non-None if @uncompressed
|
||||
is a file-like object.
|
||||
|
||||
Returns the fully qualified path to the uncompessed file
|
||||
'''
|
||||
def uncompress(compressed, uncompressed, format=None):
|
||||
if format is None:
|
||||
format = guess_uncompress_format(compressed)
|
||||
|
||||
if format == "xz":
|
||||
lzma_uncompress(str(compressed), uncompressed)
|
||||
elif format == "gz":
|
||||
gzip_uncompress(str(compressed), uncompressed)
|
||||
elif format == "zstd":
|
||||
zstd_uncompress(str(compressed), uncompressed)
|
||||
else:
|
||||
raise Exception(f"Unknown compression format {format}")
|
||||
|
||||
'''
|
||||
@params compressed: filename, Asset, or file-like object to guess
|
||||
|
||||
Guess the format of @compressed, raising an exception if
|
||||
no format can be determined
|
||||
'''
|
||||
def guess_uncompress_format(compressed):
|
||||
if type(compressed) == Asset:
|
||||
compressed = urlparse(compressed.url).path
|
||||
elif type(compressed) != str:
|
||||
raise Exception(f"Unable to guess compression cformat for {compressed}")
|
||||
|
||||
(name, ext) = os.path.splitext(compressed)
|
||||
if ext == ".xz":
|
||||
return "xz"
|
||||
elif ext == ".gz":
|
||||
return "gz"
|
||||
elif ext in [".zstd", ".zst"]:
|
||||
return 'zstd'
|
||||
else:
|
||||
raise Exception(f"Unknown compression format for {compressed}")
|
Loading…
Add table
Add a link
Reference in a new issue