diff options
Diffstat (limited to '')
-rw-r--r-- | qa/shell.py | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/qa/shell.py b/qa/shell.py new file mode 100644 index 0000000..3ef874d --- /dev/null +++ b/qa/shell.py @@ -0,0 +1,118 @@ +# This code is mostly duplicated from the `gitlint.shell` module. We consciously duplicate this code as to not depend +# on gitlint internals for our integration testing framework. + +import subprocess + +from qa.utils import TERMINAL_ENCODING, USE_SH_LIB + +if USE_SH_LIB: + from sh import ( + echo, + git, + gitlint, + ) + + gitlint = gitlint.bake(_unify_ttys=True, _tty_in=True) + + # import exceptions separately, this makes it a little easier to mock them out in the unit tests + from sh import ( + CommandNotFound, + ErrorReturnCode, + RunningCommand, + ) +else: + + class CommandNotFound(Exception): + """Exception indicating a command was not found during execution""" + + class RunningCommand: + ... + + class ShResult(RunningCommand): + """Result wrapper class. We use this to more easily migrate from using https://amoffat.github.io/sh/ to using + the builtin subprocess module.""" + + def __init__(self, full_cmd, stdout, stderr="", exitcode=0): + self.full_cmd = full_cmd + # TODO(jorisroovers): The 'sh' library by default will merge stdout and stderr. We mimic this behavior + # for now until we fully remove the 'sh' library. + self._stdout = stdout + stderr + self._stderr = stderr + self.exit_code = exitcode + + def __str__(self): + return self.stdout.decode(TERMINAL_ENCODING) + + def __unicode__(self): + return self.stdout + + @property + def stdout(self): + return self._stdout + + @property + def stderr(self): + return self._stderr + + def __getattr__(self, p): + # https://github.com/amoffat/sh/blob/e0ed8e244e9d973ef4e0749b2b3c2695e7b5255b/sh.py#L952= + _unicode_methods = set(dir(str())) # noqa + if p in _unicode_methods: + return getattr(str(self), p) + + raise AttributeError + + class ErrorReturnCode(ShResult, Exception): + """ShResult subclass for unexpected results (acts as an exception).""" + + def git(*command_parts, **kwargs): + return run_command("git", *command_parts, **kwargs) + + def echo(*command_parts, **kwargs): + return run_command("echo", *command_parts, **kwargs) + + def gitlint(*command_parts, **kwargs): + return run_command("gitlint", *command_parts, **kwargs) + + def run_command(command, *args, **kwargs): + args = [command, *list(args)] + return _exec(*args, **kwargs) + + def _exec(*args, **kwargs): + popen_kwargs = { + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + "stdin": subprocess.PIPE, + "shell": kwargs.get("_tty_out", False), + "cwd": kwargs.get("_cwd", None), + "env": kwargs.get("_env", None), + } + + stdin_input = None + if len(args) > 1 and isinstance(args[1], ShResult): + stdin_input = args[1].stdout + # pop args[1] from the array and use it as stdin + args = list(args) + args.pop(1) + popen_kwargs["stdin"] = subprocess.PIPE + + try: + with subprocess.Popen(args, **popen_kwargs) as p: + result = p.communicate(stdin_input) + + except FileNotFoundError as exc: + raise CommandNotFound from exc + + exit_code = p.returncode + stdout = result[0] + stderr = result[1] # 'sh' does not decode the stderr bytes to unicode + full_cmd = "" if args is None else " ".join(args) + + # If not _ok_code is specified, then only a 0 exit code is allowed + ok_exit_codes = kwargs.get("_ok_code", [0]) + + if exit_code in ok_exit_codes: + return ShResult(full_cmd, stdout, stderr, exit_code) + + # Unexpected error code => raise ErrorReturnCode + raise ErrorReturnCode(full_cmd, stdout, stderr, p.returncode) |