summaryrefslogtreecommitdiffstats
path: root/qa/shell.py
diff options
context:
space:
mode:
Diffstat (limited to 'qa/shell.py')
-rw-r--r--qa/shell.py110
1 files changed, 69 insertions, 41 deletions
diff --git a/qa/shell.py b/qa/shell.py
index 8ba6dc1..3ef874d 100644
--- a/qa/shell.py
+++ b/qa/shell.py
@@ -1,43 +1,69 @@
-
-# This code is mostly duplicated from the `gitlint.shell` module. We conciously duplicate this code as to not depend
+# This code is mostly duplicated from the `gitlint.shell` module. We consciously duplicate this code as to not depend
# on gitlint internals for our integration testing framework.
import subprocess
-import sys
-from qa.utils import ustr, USE_SH_LIB
+
+from qa.utils import TERMINAL_ENCODING, USE_SH_LIB
if USE_SH_LIB:
- from sh import git, echo, gitlint # pylint: disable=unused-import,no-name-in-module,import-error
+ from sh import (
+ echo,
+ git,
+ gitlint,
+ )
+
+ gitlint = gitlint.bake(_unify_ttys=True, _tty_in=True)
# import exceptions separately, this makes it a little easier to mock them out in the unit tests
- from sh import CommandNotFound, ErrorReturnCode, RunningCommand # pylint: disable=import-error
+ from sh import (
+ CommandNotFound,
+ ErrorReturnCode,
+ RunningCommand,
+ )
else:
class CommandNotFound(Exception):
- """ Exception indicating a command was not found during execution """
- pass
+ """Exception indicating a command was not found during execution"""
- class RunningCommand(object):
- pass
+ class RunningCommand:
+ ...
class ShResult(RunningCommand):
- """ Result wrapper class. We use this to more easily migrate from using https://amoffat.github.io/sh/ to using
- the builtin subprocess module. """
+ """Result wrapper class. We use this to more easily migrate from using https://amoffat.github.io/sh/ to using
+ the builtin subprocess module."""
- def __init__(self, full_cmd, stdout, stderr='', exitcode=0):
+ def __init__(self, full_cmd, stdout, stderr="", exitcode=0):
self.full_cmd = full_cmd
# TODO(jorisroovers): The 'sh' library by default will merge stdout and stderr. We mimic this behavior
# for now until we fully remove the 'sh' library.
- self.stdout = stdout + ustr(stderr)
- self.stderr = stderr
+ self._stdout = stdout + stderr
+ self._stderr = stderr
self.exit_code = exitcode
def __str__(self):
+ return self.stdout.decode(TERMINAL_ENCODING)
+
+ def __unicode__(self):
return self.stdout
+ @property
+ def stdout(self):
+ return self._stdout
+
+ @property
+ def stderr(self):
+ return self._stderr
+
+ def __getattr__(self, p):
+ # https://github.com/amoffat/sh/blob/e0ed8e244e9d973ef4e0749b2b3c2695e7b5255b/sh.py#L952=
+ _unicode_methods = set(dir(str())) # noqa
+ if p in _unicode_methods:
+ return getattr(str(self), p)
+
+ raise AttributeError
+
class ErrorReturnCode(ShResult, Exception):
- """ ShResult subclass for unexpected results (acts as an exception). """
- pass
+ """ShResult subclass for unexpected results (acts as an exception)."""
def git(*command_parts, **kwargs):
return run_command("git", *command_parts, **kwargs)
@@ -49,39 +75,41 @@ else:
return run_command("gitlint", *command_parts, **kwargs)
def run_command(command, *args, **kwargs):
- args = [command] + list(args)
- result = _exec(*args, **kwargs)
- # If we reach this point and the result has an exit_code that is larger than 0, this means that we didn't
- # get an exception (which is the default sh behavior for non-zero exit codes) and so the user is expecting
- # a non-zero exit code -> just return the entire result
- if hasattr(result, 'exit_code') and result.exit_code > 0:
- return result
- return ustr(result)
+ args = [command, *list(args)]
+ return _exec(*args, **kwargs)
def _exec(*args, **kwargs):
- if sys.version_info[0] == 2:
- no_command_error = OSError # noqa pylint: disable=undefined-variable,invalid-name
- else:
- no_command_error = FileNotFoundError # noqa pylint: disable=undefined-variable
-
- pipe = subprocess.PIPE
- popen_kwargs = {'stdout': pipe, 'stderr': pipe, 'shell': kwargs.get('_tty_out', False)}
- if '_cwd' in kwargs:
- popen_kwargs['cwd'] = kwargs['_cwd']
+ popen_kwargs = {
+ "stdout": subprocess.PIPE,
+ "stderr": subprocess.PIPE,
+ "stdin": subprocess.PIPE,
+ "shell": kwargs.get("_tty_out", False),
+ "cwd": kwargs.get("_cwd", None),
+ "env": kwargs.get("_env", None),
+ }
+
+ stdin_input = None
+ if len(args) > 1 and isinstance(args[1], ShResult):
+ stdin_input = args[1].stdout
+ # pop args[1] from the array and use it as stdin
+ args = list(args)
+ args.pop(1)
+ popen_kwargs["stdin"] = subprocess.PIPE
try:
- p = subprocess.Popen(args, **popen_kwargs)
- result = p.communicate()
- except no_command_error:
- raise CommandNotFound
+ with subprocess.Popen(args, **popen_kwargs) as p:
+ result = p.communicate(stdin_input)
+
+ except FileNotFoundError as exc:
+ raise CommandNotFound from exc
exit_code = p.returncode
- stdout = ustr(result[0])
+ stdout = result[0]
stderr = result[1] # 'sh' does not decode the stderr bytes to unicode
- full_cmd = '' if args is None else ' '.join(args)
+ full_cmd = "" if args is None else " ".join(args)
# If not _ok_code is specified, then only a 0 exit code is allowed
- ok_exit_codes = kwargs.get('_ok_code', [0])
+ ok_exit_codes = kwargs.get("_ok_code", [0])
if exit_code in ok_exit_codes:
return ShResult(full_cmd, stdout, stderr, exit_code)