From de139943d8272773b5f19ed824d687b0232b9ba3 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 11 Mar 2023 09:03:03 +0100 Subject: Adding upstream version 0.19.1. Signed-off-by: Daniel Baumann --- qa/shell.py | 83 +++++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 27 deletions(-) (limited to 'qa/shell.py') diff --git a/qa/shell.py b/qa/shell.py index 44716c0..3ef874d 100644 --- a/qa/shell.py +++ b/qa/shell.py @@ -2,24 +2,31 @@ # on gitlint internals for our integration testing framework. import subprocess -from qa.utils import USE_SH_LIB, DEFAULT_ENCODING + +from qa.utils import TERMINAL_ENCODING, USE_SH_LIB if USE_SH_LIB: - from sh import git, echo, gitlint # pylint: disable=unused-import,no-name-in-module,import-error + from sh import ( + echo, + git, + gitlint, + ) - gitlint = gitlint.bake(_unify_ttys=True, _tty_in=True) # pylint: disable=invalid-name + gitlint = gitlint.bake(_unify_ttys=True, _tty_in=True) # import exceptions separately, this makes it a little easier to mock them out in the unit tests - from sh import CommandNotFound, ErrorReturnCode, RunningCommand # pylint: disable=import-error + from sh import ( + CommandNotFound, + ErrorReturnCode, + RunningCommand, + ) else: class CommandNotFound(Exception): """Exception indicating a command was not found during execution""" - pass - class RunningCommand: - pass + ... class ShResult(RunningCommand): """Result wrapper class. We use this to more easily migrate from using https://amoffat.github.io/sh/ to using @@ -29,18 +36,35 @@ else: self.full_cmd = full_cmd # TODO(jorisroovers): The 'sh' library by default will merge stdout and stderr. We mimic this behavior # for now until we fully remove the 'sh' library. - self.stdout = stdout + stderr.decode(DEFAULT_ENCODING) - self.stderr = stderr + self._stdout = stdout + stderr + self._stderr = stderr self.exit_code = exitcode def __str__(self): + return self.stdout.decode(TERMINAL_ENCODING) + + def __unicode__(self): return self.stdout + @property + def stdout(self): + return self._stdout + + @property + def stderr(self): + return self._stderr + + def __getattr__(self, p): + # https://github.com/amoffat/sh/blob/e0ed8e244e9d973ef4e0749b2b3c2695e7b5255b/sh.py#L952= + _unicode_methods = set(dir(str())) # noqa + if p in _unicode_methods: + return getattr(str(self), p) + + raise AttributeError + class ErrorReturnCode(ShResult, Exception): """ShResult subclass for unexpected results (acts as an exception).""" - pass - def git(*command_parts, **kwargs): return run_command("git", *command_parts, **kwargs) @@ -51,31 +75,36 @@ else: return run_command("gitlint", *command_parts, **kwargs) def run_command(command, *args, **kwargs): - args = [command] + list(args) - result = _exec(*args, **kwargs) - # If we reach this point and the result has an exit_code that is larger than 0, this means that we didn't - # get an exception (which is the default sh behavior for non-zero exit codes) and so the user is expecting - # a non-zero exit code -> just return the entire result - if hasattr(result, "exit_code") and result.exit_code > 0: - return result - return str(result) + args = [command, *list(args)] + return _exec(*args, **kwargs) def _exec(*args, **kwargs): - pipe = subprocess.PIPE - popen_kwargs = {"stdout": pipe, "stderr": pipe, "shell": kwargs.get("_tty_out", False)} - if "_cwd" in kwargs: - popen_kwargs["cwd"] = kwargs["_cwd"] - if "_env" in kwargs: - popen_kwargs["env"] = kwargs["_env"] + popen_kwargs = { + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + "stdin": subprocess.PIPE, + "shell": kwargs.get("_tty_out", False), + "cwd": kwargs.get("_cwd", None), + "env": kwargs.get("_env", None), + } + + stdin_input = None + if len(args) > 1 and isinstance(args[1], ShResult): + stdin_input = args[1].stdout + # pop args[1] from the array and use it as stdin + args = list(args) + args.pop(1) + popen_kwargs["stdin"] = subprocess.PIPE try: with subprocess.Popen(args, **popen_kwargs) as p: - result = p.communicate() + result = p.communicate(stdin_input) + except FileNotFoundError as exc: raise CommandNotFound from exc exit_code = p.returncode - stdout = result[0].decode(DEFAULT_ENCODING) + stdout = result[0] stderr = result[1] # 'sh' does not decode the stderr bytes to unicode full_cmd = "" if args is None else " ".join(args) -- cgit v1.2.3