summaryrefslogtreecommitdiffstats
path: root/qa/shell.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-03-11 08:03:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-03-11 08:03:03 +0000
commitde139943d8272773b5f19ed824d687b0232b9ba3 (patch)
tree47e73755bffd41bdde2d59d76cc595f5a1fa75d4 /qa/shell.py
parentAdding upstream version 0.19.0~dev. (diff)
downloadgitlint-upstream.tar.xz
gitlint-upstream.zip
Adding upstream version 0.19.1.upstream/0.19.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--qa/shell.py83
1 files changed, 56 insertions, 27 deletions
diff --git a/qa/shell.py b/qa/shell.py
index 44716c0..3ef874d 100644
--- a/qa/shell.py
+++ b/qa/shell.py
@@ -2,24 +2,31 @@
# on gitlint internals for our integration testing framework.
import subprocess
-from qa.utils import USE_SH_LIB, DEFAULT_ENCODING
+
+from qa.utils import TERMINAL_ENCODING, USE_SH_LIB
if USE_SH_LIB:
- from sh import git, echo, gitlint # pylint: disable=unused-import,no-name-in-module,import-error
+ from sh import (
+ echo,
+ git,
+ gitlint,
+ )
- gitlint = gitlint.bake(_unify_ttys=True, _tty_in=True) # pylint: disable=invalid-name
+ gitlint = gitlint.bake(_unify_ttys=True, _tty_in=True)
# import exceptions separately, this makes it a little easier to mock them out in the unit tests
- from sh import CommandNotFound, ErrorReturnCode, RunningCommand # pylint: disable=import-error
+ from sh import (
+ CommandNotFound,
+ ErrorReturnCode,
+ RunningCommand,
+ )
else:
class CommandNotFound(Exception):
"""Exception indicating a command was not found during execution"""
- pass
-
class RunningCommand:
- pass
+ ...
class ShResult(RunningCommand):
"""Result wrapper class. We use this to more easily migrate from using https://amoffat.github.io/sh/ to using
@@ -29,18 +36,35 @@ else:
self.full_cmd = full_cmd
# TODO(jorisroovers): The 'sh' library by default will merge stdout and stderr. We mimic this behavior
# for now until we fully remove the 'sh' library.
- self.stdout = stdout + stderr.decode(DEFAULT_ENCODING)
- self.stderr = stderr
+ self._stdout = stdout + stderr
+ self._stderr = stderr
self.exit_code = exitcode
def __str__(self):
+ return self.stdout.decode(TERMINAL_ENCODING)
+
+ def __unicode__(self):
return self.stdout
+ @property
+ def stdout(self):
+ return self._stdout
+
+ @property
+ def stderr(self):
+ return self._stderr
+
+ def __getattr__(self, p):
+ # https://github.com/amoffat/sh/blob/e0ed8e244e9d973ef4e0749b2b3c2695e7b5255b/sh.py#L952=
+ _unicode_methods = set(dir(str())) # noqa
+ if p in _unicode_methods:
+ return getattr(str(self), p)
+
+ raise AttributeError
+
class ErrorReturnCode(ShResult, Exception):
"""ShResult subclass for unexpected results (acts as an exception)."""
- pass
-
def git(*command_parts, **kwargs):
return run_command("git", *command_parts, **kwargs)
@@ -51,31 +75,36 @@ else:
return run_command("gitlint", *command_parts, **kwargs)
def run_command(command, *args, **kwargs):
- args = [command] + list(args)
- result = _exec(*args, **kwargs)
- # If we reach this point and the result has an exit_code that is larger than 0, this means that we didn't
- # get an exception (which is the default sh behavior for non-zero exit codes) and so the user is expecting
- # a non-zero exit code -> just return the entire result
- if hasattr(result, "exit_code") and result.exit_code > 0:
- return result
- return str(result)
+ args = [command, *list(args)]
+ return _exec(*args, **kwargs)
def _exec(*args, **kwargs):
- pipe = subprocess.PIPE
- popen_kwargs = {"stdout": pipe, "stderr": pipe, "shell": kwargs.get("_tty_out", False)}
- if "_cwd" in kwargs:
- popen_kwargs["cwd"] = kwargs["_cwd"]
- if "_env" in kwargs:
- popen_kwargs["env"] = kwargs["_env"]
+ popen_kwargs = {
+ "stdout": subprocess.PIPE,
+ "stderr": subprocess.PIPE,
+ "stdin": subprocess.PIPE,
+ "shell": kwargs.get("_tty_out", False),
+ "cwd": kwargs.get("_cwd", None),
+ "env": kwargs.get("_env", None),
+ }
+
+ stdin_input = None
+ if len(args) > 1 and isinstance(args[1], ShResult):
+ stdin_input = args[1].stdout
+ # pop args[1] from the array and use it as stdin
+ args = list(args)
+ args.pop(1)
+ popen_kwargs["stdin"] = subprocess.PIPE
try:
with subprocess.Popen(args, **popen_kwargs) as p:
- result = p.communicate()
+ result = p.communicate(stdin_input)
+
except FileNotFoundError as exc:
raise CommandNotFound from exc
exit_code = p.returncode
- stdout = result[0].decode(DEFAULT_ENCODING)
+ stdout = result[0]
stderr = result[1] # 'sh' does not decode the stderr bytes to unicode
full_cmd = "" if args is None else " ".join(args)