diff options
Diffstat (limited to 'pre_commit/util.py')
-rw-r--r-- | pre_commit/util.py | 272 |
1 files changed, 272 insertions, 0 deletions
diff --git a/pre_commit/util.py b/pre_commit/util.py new file mode 100644 index 0000000..2db579a --- /dev/null +++ b/pre_commit/util.py @@ -0,0 +1,272 @@ +import contextlib +import errno +import functools +import os.path +import shutil +import stat +import subprocess +import sys +import tempfile +from types import TracebackType +from typing import Any +from typing import Callable +from typing import Dict +from typing import Generator +from typing import IO +from typing import Optional +from typing import Tuple +from typing import Type +from typing import Union + +import yaml + +from pre_commit import parse_shebang + +if sys.version_info >= (3, 7): # pragma: no cover (PY37+) + from importlib.resources import open_binary + from importlib.resources import read_text +else: # pragma: no cover (<PY37) + from importlib_resources import open_binary + from importlib_resources import read_text + +EnvironT = Union[Dict[str, str], 'os._Environ'] + +Loader = getattr(yaml, 'CSafeLoader', yaml.SafeLoader) +yaml_load = functools.partial(yaml.load, Loader=Loader) +Dumper = getattr(yaml, 'CSafeDumper', yaml.SafeDumper) + + +def yaml_dump(o: Any) -> str: + # when python/mypy#1484 is solved, this can be `functools.partial` + return yaml.dump( + o, Dumper=Dumper, default_flow_style=False, indent=4, sort_keys=False, + ) + + +def force_bytes(exc: Any) -> bytes: + with contextlib.suppress(TypeError): + return bytes(exc) + with contextlib.suppress(Exception): + return str(exc).encode() + return f'<unprintable {type(exc).__name__} object>'.encode() + + +@contextlib.contextmanager +def clean_path_on_failure(path: str) -> Generator[None, None, None]: + """Cleans up the directory on an exceptional failure.""" + try: + yield + except BaseException: + if os.path.exists(path): + rmtree(path) + raise + + +@contextlib.contextmanager +def tmpdir() -> Generator[str, None, None]: + """Contextmanager to create a temporary directory. It will be cleaned up + afterwards. + """ + tempdir = tempfile.mkdtemp() + try: + yield tempdir + finally: + rmtree(tempdir) + + +def resource_bytesio(filename: str) -> IO[bytes]: + return open_binary('pre_commit.resources', filename) + + +def resource_text(filename: str) -> str: + return read_text('pre_commit.resources', filename) + + +def make_executable(filename: str) -> None: + original_mode = os.stat(filename).st_mode + new_mode = original_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH + os.chmod(filename, new_mode) + + +class CalledProcessError(RuntimeError): + def __init__( + self, + returncode: int, + cmd: Tuple[str, ...], + expected_returncode: int, + stdout: bytes, + stderr: Optional[bytes], + ) -> None: + super().__init__(returncode, cmd, expected_returncode, stdout, stderr) + self.returncode = returncode + self.cmd = cmd + self.expected_returncode = expected_returncode + self.stdout = stdout + self.stderr = stderr + + def __bytes__(self) -> bytes: + def _indent_or_none(part: Optional[bytes]) -> bytes: + if part: + return b'\n ' + part.replace(b'\n', b'\n ') + else: + return b' (none)' + + return b''.join(( + f'command: {self.cmd!r}\n'.encode(), + f'return code: {self.returncode}\n'.encode(), + f'expected return code: {self.expected_returncode}\n'.encode(), + b'stdout:', _indent_or_none(self.stdout), b'\n', + b'stderr:', _indent_or_none(self.stderr), + )) + + def __str__(self) -> str: + return self.__bytes__().decode() + + +def _setdefault_kwargs(kwargs: Dict[str, Any]) -> None: + for arg in ('stdin', 'stdout', 'stderr'): + kwargs.setdefault(arg, subprocess.PIPE) + + +def _oserror_to_output(e: OSError) -> Tuple[int, bytes, None]: + return 1, force_bytes(e).rstrip(b'\n') + b'\n', None + + +def cmd_output_b( + *cmd: str, + retcode: Optional[int] = 0, + **kwargs: Any, +) -> Tuple[int, bytes, Optional[bytes]]: + _setdefault_kwargs(kwargs) + + try: + cmd = parse_shebang.normalize_cmd(cmd) + except parse_shebang.ExecutableNotFoundError as e: + returncode, stdout_b, stderr_b = e.to_output() + else: + try: + proc = subprocess.Popen(cmd, **kwargs) + except OSError as e: + returncode, stdout_b, stderr_b = _oserror_to_output(e) + else: + stdout_b, stderr_b = proc.communicate() + returncode = proc.returncode + + if retcode is not None and retcode != returncode: + raise CalledProcessError(returncode, cmd, retcode, stdout_b, stderr_b) + + return returncode, stdout_b, stderr_b + + +def cmd_output(*cmd: str, **kwargs: Any) -> Tuple[int, str, Optional[str]]: + returncode, stdout_b, stderr_b = cmd_output_b(*cmd, **kwargs) + stdout = stdout_b.decode() if stdout_b is not None else None + stderr = stderr_b.decode() if stderr_b is not None else None + return returncode, stdout, stderr + + +if os.name != 'nt': # pragma: win32 no cover + from os import openpty + import termios + + class Pty: + def __init__(self) -> None: + self.r: Optional[int] = None + self.w: Optional[int] = None + + def __enter__(self) -> 'Pty': + self.r, self.w = openpty() + + # tty flags normally change \n to \r\n + attrs = termios.tcgetattr(self.r) + assert isinstance(attrs[1], int) + attrs[1] &= ~(termios.ONLCR | termios.OPOST) + termios.tcsetattr(self.r, termios.TCSANOW, attrs) + + return self + + def close_w(self) -> None: + if self.w is not None: + os.close(self.w) + self.w = None + + def close_r(self) -> None: + assert self.r is not None + os.close(self.r) + self.r = None + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + self.close_w() + self.close_r() + + def cmd_output_p( + *cmd: str, + retcode: Optional[int] = 0, + **kwargs: Any, + ) -> Tuple[int, bytes, Optional[bytes]]: + assert retcode is None + assert kwargs['stderr'] == subprocess.STDOUT, kwargs['stderr'] + _setdefault_kwargs(kwargs) + + try: + cmd = parse_shebang.normalize_cmd(cmd) + except parse_shebang.ExecutableNotFoundError as e: + return e.to_output() + + with open(os.devnull) as devnull, Pty() as pty: + assert pty.r is not None + kwargs.update({'stdin': devnull, 'stdout': pty.w, 'stderr': pty.w}) + try: + proc = subprocess.Popen(cmd, **kwargs) + except OSError as e: + return _oserror_to_output(e) + + pty.close_w() + + buf = b'' + while True: + try: + bts = os.read(pty.r, 4096) + except OSError as e: + if e.errno == errno.EIO: + bts = b'' + else: + raise + else: + buf += bts + if not bts: + break + + return proc.wait(), buf, None +else: # pragma: no cover + cmd_output_p = cmd_output_b + + +def rmtree(path: str) -> None: + """On windows, rmtree fails for readonly dirs.""" + def handle_remove_readonly( + func: Callable[..., Any], + path: str, + exc: Tuple[Type[OSError], OSError, TracebackType], + ) -> None: + excvalue = exc[1] + if ( + func in (os.rmdir, os.remove, os.unlink) and + excvalue.errno == errno.EACCES + ): + for p in (path, os.path.dirname(path)): + os.chmod(p, os.stat(p).st_mode | stat.S_IWUSR) + func(path) + else: + raise + shutil.rmtree(path, ignore_errors=False, onerror=handle_remove_readonly) + + +def parse_version(s: str) -> Tuple[int, ...]: + """poor man's version comparison""" + return tuple(int(p) for p in s.split('.')) |