diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 03:32:49 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 03:32:49 +0000 |
commit | 8053187731ae8e3eb368d8360989cf5fd6eed9f7 (patch) | |
tree | 32bada84ff5d7460cdf3934fcbdbe770d6afe4cd /src/tests/cli_common.py | |
parent | Initial commit. (diff) | |
download | rnp-8053187731ae8e3eb368d8360989cf5fd6eed9f7.tar.xz rnp-8053187731ae8e3eb368d8360989cf5fd6eed9f7.zip |
Adding upstream version 0.17.0.upstream/0.17.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tests/cli_common.py')
-rw-r--r-- | src/tests/cli_common.py | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/src/tests/cli_common.py b/src/tests/cli_common.py new file mode 100644 index 0000000..12bf5d8 --- /dev/null +++ b/src/tests/cli_common.py @@ -0,0 +1,237 @@ +import sys +import distutils.spawn +import random +import string +import logging +import os +import re +from subprocess import Popen, PIPE + +RNP_ROOT = None +WORKDIR = '' +CONSOLE_ENCODING = 'UTF-8' + +class CLIError(Exception): + def __init__(self, message, log = None): + super(CLIError, self).__init__(message) + self.message = message + self.log = log + logging.info(self.message) + logging.debug(self.log.strip()) + + def __str__(self): + return self.message + '\n' + self.log + +def set_workdir(dir): + global WORKDIR + WORKDIR = dir + +def is_windows(): + return sys.platform.startswith('win') or sys.platform.startswith('msys') + +def path_for_gpg(path): + # GPG built for mingw/msys doesn't work with Windows paths + if re.match(r'^[a-z]:[\\\/].*', path.lower()): + path = '/' + path[0] + '/' + path[3:].replace('\\', '/') + return path + +def raise_err(msg, log = None): + raise CLIError(msg, log) + +def size_to_readable(num, suffix = 'B'): + for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']: + if abs(num) < 1024.0: + return "%3.1f%s%s" % (num, unit, suffix) + num /= 1024.0 + return "%.1f%s%s" % (num, 'Yi', suffix) + +def list_upto(lst, count): + return (list(lst)*(count//len(lst)+1))[:count] + +def pswd_pipe(password): + pr, pw = os.pipe() + with os.fdopen(pw, 'w') as fw: + fw.write(password) + fw.write('\n') + fw.write(password) + os.set_inheritable(pr, True) + + if not is_windows(): + return pr + # On Windows pipe is not inheritable so dup() is needed + prd = os.dup(pr) + os.close(pr) + return prd + +def random_text(path, size): + # Generate random text, with 50% probability good-compressible + if random.randint(0, 10) < 5: + st = ''.join(random.choice(string.ascii_letters + string.digits + " \t\r\n-,.") + for _ in range(size)) + else: + st = ''.join(random.choice("abcdef0123456789 \t\r\n-,.") for _ in range(size)) + with open(path, 'w+') as f: + f.write(st) + +def file_text(path, encoding = CONSOLE_ENCODING): + with open(path, 'rb') as f: + return f.read().decode(encoding).replace('\r\r', '\r') + +def find_utility(name, exitifnone = True): + path = distutils.spawn.find_executable(name) + if not path and exitifnone: + logging.error('Cannot find utility {}. Exiting.'.format(name)) + sys.exit(1) + + return path + +def rnp_file_path(relpath, check = True): + global RNP_ROOT + if not RNP_ROOT: + pypath = os.path.dirname(__file__) + RNP_ROOT = os.path.realpath(os.path.join(pypath, '../..')) + + fpath = os.path.realpath(os.path.join(RNP_ROOT, relpath)) + + if check and not os.path.isfile(fpath): + raise NameError('rnp: file ' + relpath + ' not found') + + return fpath + +def run_proc_windows(proc, params, stdin=None): + exe = os.path.basename(proc) + # test special quote cases + params = list(map(lambda st: st.replace('"', '\\"'), params)) + # We need to escape empty parameters/ones with spaces with quotes + params = tuple(map(lambda st: st if (st and not any(x in st for x in [' ','\r','\t'])) else '"%s"' % st, [exe] + params)) + logging.debug((proc + ' ' + ' '.join(params)).strip()) + logging.debug('Working directory: ' + os.getcwd()) + sys.stdout.flush() + + stdin_path = os.path.join(WORKDIR, 'stdin.txt') + stdout_path = os.path.join(WORKDIR, 'stdout.txt') + stderr_path = os.path.join(WORKDIR, 'stderr.txt') + pass_path = os.path.join(WORKDIR, 'pass.txt') + passfd = 0 + passfo = None + try: + idx = params.index('--pass-fd') + if idx < len(params): + passfd = int(params[idx+1]) + passfo = os.fdopen(passfd, 'r', closefd=False) + except (ValueError, OSError): + # Ignore if pass-fd is invalid/could not be opened + pass + # We may use pipes here (ensuring we use dup to inherit handles), but those have limited buffer + # so we'll need to poll process + if stdin: + with open(stdin_path, "wb+") as stdinf: + stdinf.write(stdin.encode() if isinstance(stdin, str) else stdin) + stdin_fl = os.open(stdin_path, os.O_RDONLY | os.O_BINARY) + stdin_no = sys.stdin.fileno() + stdin_cp = os.dup(stdin_no) + else: + stdin_fl = None + stdin_no = -1 + stdin_cp = None + + stdout_fl = os.open(stdout_path, os.O_CREAT | os.O_RDWR | os.O_BINARY) + stdout_no = sys.stdout.fileno() + stdout_cp = os.dup(stdout_no) + stderr_fl = os.open(stderr_path, os.O_CREAT | os.O_RDWR | os.O_BINARY) + stderr_no = sys.stderr.fileno() + stderr_cp = os.dup(stderr_no) + if passfo: + with open(pass_path, "w+") as passf: + passf.write(passfo.read()) + pass_fl = os.open(pass_path, os.O_RDONLY | os.O_BINARY) + pass_cp = os.dup(passfd) + + retcode = -1 + try: + os.dup2(stdout_fl, stdout_no) + os.close(stdout_fl) + os.dup2(stderr_fl, stderr_no) + os.close(stderr_fl) + if stdin: + os.dup2(stdin_fl, stdin_no) + os.close(stdin_fl) + if passfo: + os.dup2(pass_fl, passfd) + os.close(pass_fl) + retcode = os.spawnv(os.P_WAIT, proc, params) + finally: + os.dup2(stdout_cp, stdout_no) + os.close(stdout_cp) + os.dup2(stderr_cp, stderr_no) + os.close(stderr_cp) + if stdin: + os.dup2(stdin_cp, stdin_no) + os.close(stdin_cp) + if passfo: + os.dup2(pass_cp, passfd) + os.close(pass_cp) + passfo.close() + out = file_text(stdout_path).replace('\r\n', '\n') + err = file_text(stderr_path).replace('\r\n', '\n') + os.unlink(stdout_path) + os.unlink(stderr_path) + if stdin: + os.unlink(stdin_path) + if passfo: + os.unlink(pass_path) + logging.debug(err.strip()) + logging.debug(out.strip()) + return (retcode, out, err) + +if sys.version_info >= (3,): + def decode_string_escape(s): + bts = bytes(s, 'utf-8') + result = u'' + candidate = bytearray() + utf = bytearray() + for b in bts: + if b > 0x7F: + if len(candidate) > 0: + result += candidate.decode('unicode-escape') + candidate.clear() + utf.append(b) + else: + if len(utf) > 0: + result += utf.decode('utf-8') + utf.clear() + candidate.append(b) + if len(candidate) > 0: + result += candidate.decode('unicode-escape') + if len(utf) > 0: + result += utf.decode('utf-8') + return result + def _decode(s): + return s +else: # Python 2 + def decode_string_escape(s): + return s.encode(CONSOLE_ENCODING).decode('decode_string_escape') + def _decode(x): + return x.decode(CONSOLE_ENCODING) + +def run_proc(proc, params, stdin=None): + # On Windows we need to use spawnv() for handle inheritance in pswd_pipe() + if is_windows(): + return run_proc_windows(proc, params, stdin) + paramline = u' '.join(map(_decode, params)) + logging.debug((proc + ' ' + paramline).strip()) + param_bytes = list(map(lambda x: x.encode(CONSOLE_ENCODING), params)) + process = Popen([proc] + param_bytes, stdout=PIPE, stderr=PIPE, + stdin=PIPE if stdin else None, close_fds=False, + universal_newlines=True) + output, errout = process.communicate(stdin) + retcode = process.poll() + logging.debug(errout.strip()) + logging.debug(output.strip()) + + return (retcode, output, errout) + +def run_proc_fast(proc, params): + with open(os.devnull, 'w') as devnull: + proc = Popen([proc] + params, stdout=devnull, stderr=devnull) + return proc.wait() |