summaryrefslogtreecommitdiffstats
path: root/pre_commit/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'pre_commit/util.py')
-rw-r--r--pre_commit/util.py238
1 files changed, 238 insertions, 0 deletions
diff --git a/pre_commit/util.py b/pre_commit/util.py
new file mode 100644
index 0000000..b3682d4
--- /dev/null
+++ b/pre_commit/util.py
@@ -0,0 +1,238 @@
+from __future__ import annotations
+
+import contextlib
+import errno
+import importlib.resources
+import os.path
+import shutil
+import stat
+import subprocess
+import sys
+from collections.abc import Generator
+from types import TracebackType
+from typing import Any
+from typing import Callable
+
+from pre_commit import parse_shebang
+
+
+def force_bytes(exc: Any) -> bytes:
+ with contextlib.suppress(TypeError):
+ return bytes(exc)
+ with contextlib.suppress(Exception):
+ return str(exc).encode()
+ return f'<unprintable {type(exc).__name__} object>'.encode()
+
+
+@contextlib.contextmanager
+def clean_path_on_failure(path: str) -> Generator[None, None, None]:
+ """Cleans up the directory on an exceptional failure."""
+ try:
+ yield
+ except BaseException:
+ if os.path.exists(path):
+ rmtree(path)
+ raise
+
+
+def resource_text(filename: str) -> str:
+ files = importlib.resources.files('pre_commit.resources')
+ return files.joinpath(filename).read_text()
+
+
+def make_executable(filename: str) -> None:
+ original_mode = os.stat(filename).st_mode
+ new_mode = original_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
+ os.chmod(filename, new_mode)
+
+
+class CalledProcessError(RuntimeError):
+ def __init__(
+ self,
+ returncode: int,
+ cmd: tuple[str, ...],
+ stdout: bytes,
+ stderr: bytes | None,
+ ) -> None:
+ super().__init__(returncode, cmd, stdout, stderr)
+ self.returncode = returncode
+ self.cmd = cmd
+ self.stdout = stdout
+ self.stderr = stderr
+
+ def __bytes__(self) -> bytes:
+ def _indent_or_none(part: bytes | None) -> bytes:
+ if part:
+ return b'\n ' + part.replace(b'\n', b'\n ').rstrip()
+ else:
+ return b' (none)'
+
+ return b''.join((
+ f'command: {self.cmd!r}\n'.encode(),
+ f'return code: {self.returncode}\n'.encode(),
+ b'stdout:', _indent_or_none(self.stdout), b'\n',
+ b'stderr:', _indent_or_none(self.stderr),
+ ))
+
+ def __str__(self) -> str:
+ return self.__bytes__().decode()
+
+
+def _setdefault_kwargs(kwargs: dict[str, Any]) -> None:
+ for arg in ('stdin', 'stdout', 'stderr'):
+ kwargs.setdefault(arg, subprocess.PIPE)
+
+
+def _oserror_to_output(e: OSError) -> tuple[int, bytes, None]:
+ return 1, force_bytes(e).rstrip(b'\n') + b'\n', None
+
+
+def cmd_output_b(
+ *cmd: str,
+ check: bool = True,
+ **kwargs: Any,
+) -> tuple[int, bytes, bytes | None]:
+ _setdefault_kwargs(kwargs)
+
+ try:
+ cmd = parse_shebang.normalize_cmd(cmd, env=kwargs.get('env'))
+ except parse_shebang.ExecutableNotFoundError as e:
+ returncode, stdout_b, stderr_b = e.to_output()
+ else:
+ try:
+ proc = subprocess.Popen(cmd, **kwargs)
+ except OSError as e:
+ returncode, stdout_b, stderr_b = _oserror_to_output(e)
+ else:
+ stdout_b, stderr_b = proc.communicate()
+ returncode = proc.returncode
+
+ if check and returncode:
+ raise CalledProcessError(returncode, cmd, stdout_b, stderr_b)
+
+ return returncode, stdout_b, stderr_b
+
+
+def cmd_output(*cmd: str, **kwargs: Any) -> tuple[int, str, str | None]:
+ returncode, stdout_b, stderr_b = cmd_output_b(*cmd, **kwargs)
+ stdout = stdout_b.decode() if stdout_b is not None else None
+ stderr = stderr_b.decode() if stderr_b is not None else None
+ return returncode, stdout, stderr
+
+
+if sys.platform != 'win32': # pragma: win32 no cover
+ from os import openpty
+ import termios
+
+ class Pty:
+ def __init__(self) -> None:
+ self.r: int | None = None
+ self.w: int | None = None
+
+ def __enter__(self) -> Pty:
+ self.r, self.w = openpty()
+
+ # tty flags normally change \n to \r\n
+ attrs = termios.tcgetattr(self.w)
+ assert isinstance(attrs[1], int)
+ attrs[1] &= ~(termios.ONLCR | termios.OPOST)
+ termios.tcsetattr(self.w, termios.TCSANOW, attrs)
+
+ return self
+
+ def close_w(self) -> None:
+ if self.w is not None:
+ os.close(self.w)
+ self.w = None
+
+ def close_r(self) -> None:
+ assert self.r is not None
+ os.close(self.r)
+ self.r = None
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> None:
+ self.close_w()
+ self.close_r()
+
+ def cmd_output_p(
+ *cmd: str,
+ check: bool = True,
+ **kwargs: Any,
+ ) -> tuple[int, bytes, bytes | None]:
+ assert check is False
+ assert kwargs['stderr'] == subprocess.STDOUT, kwargs['stderr']
+ _setdefault_kwargs(kwargs)
+
+ try:
+ cmd = parse_shebang.normalize_cmd(cmd)
+ except parse_shebang.ExecutableNotFoundError as e:
+ return e.to_output()
+
+ with open(os.devnull) as devnull, Pty() as pty:
+ assert pty.r is not None
+ kwargs.update({'stdin': devnull, 'stdout': pty.w, 'stderr': pty.w})
+ try:
+ proc = subprocess.Popen(cmd, **kwargs)
+ except OSError as e:
+ return _oserror_to_output(e)
+
+ pty.close_w()
+
+ buf = b''
+ while True:
+ try:
+ bts = os.read(pty.r, 4096)
+ except OSError as e:
+ if e.errno == errno.EIO:
+ bts = b''
+ else:
+ raise
+ else:
+ buf += bts
+ if not bts:
+ break
+
+ return proc.wait(), buf, None
+else: # pragma: no cover
+ cmd_output_p = cmd_output_b
+
+
+def _handle_readonly(
+ func: Callable[[str], object],
+ path: str,
+ exc: OSError,
+) -> None:
+ if (
+ func in (os.rmdir, os.remove, os.unlink) and
+ exc.errno in {errno.EACCES, errno.EPERM}
+ ):
+ for p in (path, os.path.dirname(path)):
+ os.chmod(p, os.stat(p).st_mode | stat.S_IWUSR)
+ func(path)
+ else:
+ raise
+
+
+if sys.version_info < (3, 12): # pragma: <3.12 cover
+ def _handle_readonly_old(
+ func: Callable[[str], object],
+ path: str,
+ excinfo: tuple[type[OSError], OSError, TracebackType],
+ ) -> None:
+ return _handle_readonly(func, path, excinfo[1])
+
+ def rmtree(path: str) -> None:
+ shutil.rmtree(path, ignore_errors=False, onerror=_handle_readonly_old)
+else: # pragma: >=3.12 cover
+ def rmtree(path: str) -> None:
+ """On windows, rmtree fails for readonly dirs."""
+ shutil.rmtree(path, ignore_errors=False, onexc=_handle_readonly)
+
+
+def win_exe(s: str) -> str:
+ return s if sys.platform != 'win32' else f'{s}.exe'