summaryrefslogtreecommitdiffstats
path: root/src/tests/cli_common.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 03:32:49 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 03:32:49 +0000
commit8053187731ae8e3eb368d8360989cf5fd6eed9f7 (patch)
tree32bada84ff5d7460cdf3934fcbdbe770d6afe4cd /src/tests/cli_common.py
parentInitial commit. (diff)
downloadrnp-8053187731ae8e3eb368d8360989cf5fd6eed9f7.tar.xz
rnp-8053187731ae8e3eb368d8360989cf5fd6eed9f7.zip
Adding upstream version 0.17.0.upstream/0.17.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tests/cli_common.py')
-rw-r--r--src/tests/cli_common.py237
1 files changed, 237 insertions, 0 deletions
diff --git a/src/tests/cli_common.py b/src/tests/cli_common.py
new file mode 100644
index 0000000..12bf5d8
--- /dev/null
+++ b/src/tests/cli_common.py
@@ -0,0 +1,237 @@
+import sys
+import distutils.spawn
+import random
+import string
+import logging
+import os
+import re
+from subprocess import Popen, PIPE
+
+RNP_ROOT = None
+WORKDIR = ''
+CONSOLE_ENCODING = 'UTF-8'
+
+class CLIError(Exception):
+ def __init__(self, message, log = None):
+ super(CLIError, self).__init__(message)
+ self.message = message
+ self.log = log
+ logging.info(self.message)
+ logging.debug(self.log.strip())
+
+ def __str__(self):
+ return self.message + '\n' + self.log
+
+def set_workdir(dir):
+ global WORKDIR
+ WORKDIR = dir
+
+def is_windows():
+ return sys.platform.startswith('win') or sys.platform.startswith('msys')
+
+def path_for_gpg(path):
+ # GPG built for mingw/msys doesn't work with Windows paths
+ if re.match(r'^[a-z]:[\\\/].*', path.lower()):
+ path = '/' + path[0] + '/' + path[3:].replace('\\', '/')
+ return path
+
+def raise_err(msg, log = None):
+ raise CLIError(msg, log)
+
+def size_to_readable(num, suffix = 'B'):
+ for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
+ if abs(num) < 1024.0:
+ return "%3.1f%s%s" % (num, unit, suffix)
+ num /= 1024.0
+ return "%.1f%s%s" % (num, 'Yi', suffix)
+
+def list_upto(lst, count):
+ return (list(lst)*(count//len(lst)+1))[:count]
+
+def pswd_pipe(password):
+ pr, pw = os.pipe()
+ with os.fdopen(pw, 'w') as fw:
+ fw.write(password)
+ fw.write('\n')
+ fw.write(password)
+ os.set_inheritable(pr, True)
+
+ if not is_windows():
+ return pr
+ # On Windows pipe is not inheritable so dup() is needed
+ prd = os.dup(pr)
+ os.close(pr)
+ return prd
+
+def random_text(path, size):
+ # Generate random text, with 50% probability good-compressible
+ if random.randint(0, 10) < 5:
+ st = ''.join(random.choice(string.ascii_letters + string.digits + " \t\r\n-,.")
+ for _ in range(size))
+ else:
+ st = ''.join(random.choice("abcdef0123456789 \t\r\n-,.") for _ in range(size))
+ with open(path, 'w+') as f:
+ f.write(st)
+
+def file_text(path, encoding = CONSOLE_ENCODING):
+ with open(path, 'rb') as f:
+ return f.read().decode(encoding).replace('\r\r', '\r')
+
+def find_utility(name, exitifnone = True):
+ path = distutils.spawn.find_executable(name)
+ if not path and exitifnone:
+ logging.error('Cannot find utility {}. Exiting.'.format(name))
+ sys.exit(1)
+
+ return path
+
+def rnp_file_path(relpath, check = True):
+ global RNP_ROOT
+ if not RNP_ROOT:
+ pypath = os.path.dirname(__file__)
+ RNP_ROOT = os.path.realpath(os.path.join(pypath, '../..'))
+
+ fpath = os.path.realpath(os.path.join(RNP_ROOT, relpath))
+
+ if check and not os.path.isfile(fpath):
+ raise NameError('rnp: file ' + relpath + ' not found')
+
+ return fpath
+
+def run_proc_windows(proc, params, stdin=None):
+ exe = os.path.basename(proc)
+ # test special quote cases
+ params = list(map(lambda st: st.replace('"', '\\"'), params))
+ # We need to escape empty parameters/ones with spaces with quotes
+ params = tuple(map(lambda st: st if (st and not any(x in st for x in [' ','\r','\t'])) else '"%s"' % st, [exe] + params))
+ logging.debug((proc + ' ' + ' '.join(params)).strip())
+ logging.debug('Working directory: ' + os.getcwd())
+ sys.stdout.flush()
+
+ stdin_path = os.path.join(WORKDIR, 'stdin.txt')
+ stdout_path = os.path.join(WORKDIR, 'stdout.txt')
+ stderr_path = os.path.join(WORKDIR, 'stderr.txt')
+ pass_path = os.path.join(WORKDIR, 'pass.txt')
+ passfd = 0
+ passfo = None
+ try:
+ idx = params.index('--pass-fd')
+ if idx < len(params):
+ passfd = int(params[idx+1])
+ passfo = os.fdopen(passfd, 'r', closefd=False)
+ except (ValueError, OSError):
+ # Ignore if pass-fd is invalid/could not be opened
+ pass
+ # We may use pipes here (ensuring we use dup to inherit handles), but those have limited buffer
+ # so we'll need to poll process
+ if stdin:
+ with open(stdin_path, "wb+") as stdinf:
+ stdinf.write(stdin.encode() if isinstance(stdin, str) else stdin)
+ stdin_fl = os.open(stdin_path, os.O_RDONLY | os.O_BINARY)
+ stdin_no = sys.stdin.fileno()
+ stdin_cp = os.dup(stdin_no)
+ else:
+ stdin_fl = None
+ stdin_no = -1
+ stdin_cp = None
+
+ stdout_fl = os.open(stdout_path, os.O_CREAT | os.O_RDWR | os.O_BINARY)
+ stdout_no = sys.stdout.fileno()
+ stdout_cp = os.dup(stdout_no)
+ stderr_fl = os.open(stderr_path, os.O_CREAT | os.O_RDWR | os.O_BINARY)
+ stderr_no = sys.stderr.fileno()
+ stderr_cp = os.dup(stderr_no)
+ if passfo:
+ with open(pass_path, "w+") as passf:
+ passf.write(passfo.read())
+ pass_fl = os.open(pass_path, os.O_RDONLY | os.O_BINARY)
+ pass_cp = os.dup(passfd)
+
+ retcode = -1
+ try:
+ os.dup2(stdout_fl, stdout_no)
+ os.close(stdout_fl)
+ os.dup2(stderr_fl, stderr_no)
+ os.close(stderr_fl)
+ if stdin:
+ os.dup2(stdin_fl, stdin_no)
+ os.close(stdin_fl)
+ if passfo:
+ os.dup2(pass_fl, passfd)
+ os.close(pass_fl)
+ retcode = os.spawnv(os.P_WAIT, proc, params)
+ finally:
+ os.dup2(stdout_cp, stdout_no)
+ os.close(stdout_cp)
+ os.dup2(stderr_cp, stderr_no)
+ os.close(stderr_cp)
+ if stdin:
+ os.dup2(stdin_cp, stdin_no)
+ os.close(stdin_cp)
+ if passfo:
+ os.dup2(pass_cp, passfd)
+ os.close(pass_cp)
+ passfo.close()
+ out = file_text(stdout_path).replace('\r\n', '\n')
+ err = file_text(stderr_path).replace('\r\n', '\n')
+ os.unlink(stdout_path)
+ os.unlink(stderr_path)
+ if stdin:
+ os.unlink(stdin_path)
+ if passfo:
+ os.unlink(pass_path)
+ logging.debug(err.strip())
+ logging.debug(out.strip())
+ return (retcode, out, err)
+
+if sys.version_info >= (3,):
+ def decode_string_escape(s):
+ bts = bytes(s, 'utf-8')
+ result = u''
+ candidate = bytearray()
+ utf = bytearray()
+ for b in bts:
+ if b > 0x7F:
+ if len(candidate) > 0:
+ result += candidate.decode('unicode-escape')
+ candidate.clear()
+ utf.append(b)
+ else:
+ if len(utf) > 0:
+ result += utf.decode('utf-8')
+ utf.clear()
+ candidate.append(b)
+ if len(candidate) > 0:
+ result += candidate.decode('unicode-escape')
+ if len(utf) > 0:
+ result += utf.decode('utf-8')
+ return result
+ def _decode(s):
+ return s
+else: # Python 2
+ def decode_string_escape(s):
+ return s.encode(CONSOLE_ENCODING).decode('decode_string_escape')
+ def _decode(x):
+ return x.decode(CONSOLE_ENCODING)
+
+def run_proc(proc, params, stdin=None):
+ # On Windows we need to use spawnv() for handle inheritance in pswd_pipe()
+ if is_windows():
+ return run_proc_windows(proc, params, stdin)
+ paramline = u' '.join(map(_decode, params))
+ logging.debug((proc + ' ' + paramline).strip())
+ param_bytes = list(map(lambda x: x.encode(CONSOLE_ENCODING), params))
+ process = Popen([proc] + param_bytes, stdout=PIPE, stderr=PIPE,
+ stdin=PIPE if stdin else None, close_fds=False,
+ universal_newlines=True)
+ output, errout = process.communicate(stdin)
+ retcode = process.poll()
+ logging.debug(errout.strip())
+ logging.debug(output.strip())
+
+ return (retcode, output, errout)
+
+def run_proc_fast(proc, params):
+ with open(os.devnull, 'w') as devnull:
+ proc = Popen([proc] + params, stdout=devnull, stderr=devnull)
+ return proc.wait()