Diffstat (limited to 'src/debputy/util.py')
-rw-r--r--  src/debputy/util.py  804
1 file changed, 804 insertions, 0 deletions
diff --git a/src/debputy/util.py b/src/debputy/util.py
new file mode 100644
index 0000000..4da2772
--- /dev/null
+++ b/src/debputy/util.py
@@ -0,0 +1,804 @@
+import argparse
+import collections
+import functools
+import glob
+import logging
+import os
+import re
+import shutil
+import subprocess
+import sys
+import time
+from itertools import zip_longest
+from pathlib import Path
+from typing import (
+ NoReturn,
+ TYPE_CHECKING,
+ Union,
+ Set,
+ FrozenSet,
+ Optional,
+ TypeVar,
+ Dict,
+ Iterator,
+ Iterable,
+ Literal,
+ Tuple,
+ Sequence,
+ List,
+ Mapping,
+ Any,
+)
+
+from debian.deb822 import Deb822
+
+from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
+from debputy.exceptions import DebputySubstitutionError
+
+if TYPE_CHECKING:
+ from debputy.packages import BinaryPackage
+ from debputy.substitution import Substitution
+
+
+T = TypeVar("T")
+
+
+SLASH_PRUNE = re.compile("//+")
+PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
+PKGVERSION_REGEX = re.compile(
+ r"""
+ (?: \d+ : )? # Optional epoch
+ \d[0-9A-Za-z.+:~]* # Upstream version (with no hyphens)
+    (?: - [0-9A-Za-z.+:~]+ )* # Optional debian revision (+ upstream versions with hyphens)
+""",
+ re.VERBOSE | re.ASCII,
+)
+DEFAULT_PACKAGE_TYPE = "deb"
+DBGSYM_PACKAGE_TYPE = "deb"
+UDEB_PACKAGE_TYPE = "udeb"
+
+POSTINST_DEFAULT_CONDITION = (
+ '[ "$1" = "configure" ]'
+ ' || [ "$1" = "abort-upgrade" ]'
+ ' || [ "$1" = "abort-deconfigure" ]'
+ ' || [ "$1" = "abort-remove" ]'
+)
+
+
+_SPACE_RE = re.compile(r"\s")
+_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
+_REGULAR_ESCAPEES = re.compile(r'([\s!"$()*+#;<>?@\[\]\\`|~])')
+_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
+_DEFAULT_LOGGER: Optional[logging.Logger] = None
+_STDOUT_HANDLER: Optional[logging.StreamHandler] = None
+_STDERR_HANDLER: Optional[logging.StreamHandler] = None
+
+
+def assume_not_none(x: Optional[T]) -> T:
+ if x is None: # pragma: no cover
+ raise ValueError(
+ 'Internal error: None was given, but the receiver assumed "not None" here'
+ )
+ return x
+
+
+def _info(msg: str) -> None:
+ global _DEFAULT_LOGGER
+ logger = _DEFAULT_LOGGER
+ if logger:
+ logger.info(msg)
+    # Unlike _warn/_error, there is no fallback print for info messages
+
+
+def _error(msg: str, *, prog: Optional[str] = None) -> "NoReturn":
+ global _DEFAULT_LOGGER
+ logger = _DEFAULT_LOGGER
+ if logger:
+ logger.error(msg)
+ else:
+ me = os.path.basename(sys.argv[0]) if prog is None else prog
+ print(
+ f"{me}: error: {msg}",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+
+def _warn(msg: str, *, prog: Optional[str] = None) -> None:
+ global _DEFAULT_LOGGER
+ logger = _DEFAULT_LOGGER
+ if logger:
+ logger.warning(msg)
+ else:
+ me = os.path.basename(sys.argv[0]) if prog is None else prog
+
+ print(
+ f"{me}: warning: {msg}",
+ file=sys.stderr,
+ )
+
+
+class ColorizedArgumentParser(argparse.ArgumentParser):
+ def error(self, message: str) -> NoReturn:
+ self.print_usage(sys.stderr)
+ _error(message, prog=self.prog)
+
+
+def ensure_dir(path: str) -> None:
+ if not os.path.isdir(path):
+ os.makedirs(path, mode=0o755, exist_ok=True)
+
+
+def _clean_path(orig_p: str) -> str:
+ p = SLASH_PRUNE.sub("/", orig_p)
+ if "." in p:
+ path_base = p
+ # We permit a single leading "./" because we add that when we normalize a path, and we want normalization
+ # of a normalized path to be a no-op.
+ if path_base.startswith("./"):
+ path_base = path_base[2:]
+ assert path_base
+ for segment in path_base.split("/"):
+ if segment in (".", ".."):
+ raise ValueError(
+ 'Please provide paths that are normalized (i.e., no ".." or ".").'
+ f' Offending input "{orig_p}"'
+ )
+ return p
+
+
+def _normalize_path(path: str, with_prefix: bool = True) -> str:
+ path = path.strip("/")
+ if not path or path == ".":
+ return "."
+ if "//" in path or "." in path:
+ path = _clean_path(path)
+ if with_prefix ^ path.startswith("./"):
+ if with_prefix:
+ path = "./" + path
+ else:
+ path = path[2:]
+ return path
+
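+# Illustrative examples of _normalize_path, derived from the implementation above:
+#
+#   _normalize_path("usr/bin/")                      -> "./usr/bin"
+#   _normalize_path("/usr//bin", with_prefix=False)  -> "usr/bin"
+#   _normalize_path("")                              -> "."
+#
+# Paths containing "." or ".." segments (e.g. "usr/./bin") raise a ValueError.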
+
+def _normalize_link_target(link_target: str) -> str:
+ link_target = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
+ result: List[str] = []
+ for segment in link_target.split("/"):
+ if segment in (".", ""):
+ # Ignore these - the empty string is generally a trailing slash
+ continue
+ if segment == "..":
+ # We ignore "root escape attempts" like the OS would (mapping /.. -> /)
+ if result:
+ result.pop()
+ else:
+ result.append(segment)
+ return "/".join(result)
+
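+# Illustrative example of _normalize_link_target, derived from the code above:
+#
+#   _normalize_link_target("/../usr//bin/./tool")  -> "usr/bin/tool"
+#
+# Leading slashes are dropped, ".." at the root is ignored (as the OS would),
+# and "." segments plus duplicate slashes are pruned.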
+
+def _backslash_escape(m: re.Match[str]) -> str:
+ return "\\" + m.group(0)
+
+
+def _escape_shell_word(w: str) -> str:
+    if _SPACE_RE.search(w):
+ w = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
+ return f'"{w}"'
+ return _REGULAR_ESCAPEES.sub(_backslash_escape, w)
+
+
+def escape_shell(*args: str) -> str:
+ return " ".join(_escape_shell_word(w) for w in args)
+
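+# Illustrative example of escape_shell, derived from the code above:
+#
+#   escape_shell("cp", "a file", "/target/dir")  -> 'cp "a file" /target/dir'
+#
+# Words containing whitespace are double-quoted (escaping `, $, ", \ and
+# newlines inside them); other words have their shell metacharacters
+# backslash-escaped individually.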
+
+def print_command(*args: str) -> None:
+ print(f" {escape_shell(*args)}")
+
+
+def debian_policy_normalize_symlink_target(
+ link_path: str,
+ link_target: str,
+ normalize_link_path: bool = False,
+) -> str:
+ if normalize_link_path:
+ link_path = _normalize_path(link_path)
+ elif not link_path.startswith("./"):
+ raise ValueError("Link part was not normalized")
+
+ link_path = link_path[2:]
+
+ if not link_target.startswith("/"):
+ link_target = "/" + os.path.dirname(link_path) + "/" + link_target
+
+ link_path_parts = link_path.split("/")
+ link_target_parts = [
+ s for s in _normalize_link_target(link_target).split("/") if s != "."
+ ]
+
+ assert link_path_parts
+
+ if link_target_parts and link_path_parts[0] == link_target_parts[0]:
+ # Per Debian Policy, must be relative
+
+ # First determine the length of the overlap
+ common_segment_count = 1
+ shortest_path_length = min(len(link_target_parts), len(link_path_parts))
+ while (
+ common_segment_count < shortest_path_length
+ and link_target_parts[common_segment_count]
+ == link_path_parts[common_segment_count]
+ ):
+ common_segment_count += 1
+
+ if common_segment_count == shortest_path_length and len(
+ link_path_parts
+ ) - 1 == len(link_target_parts):
+ normalized_link_target = "."
+ else:
+ up_dir_count = len(link_path_parts) - 1 - common_segment_count
+ normalized_link_target_parts = []
+ if up_dir_count:
+ up_dir_part = "../" * up_dir_count
+ # We overshoot with a single '/', so rstrip it away
+ normalized_link_target_parts.append(up_dir_part.rstrip("/"))
+ # Add the relevant down parts
+ normalized_link_target_parts.extend(
+ link_target_parts[common_segment_count:]
+ )
+
+ normalized_link_target = "/".join(normalized_link_target_parts)
+ else:
+ # Per Debian Policy, must be absolute
+ normalized_link_target = "/" + "/".join(link_target_parts)
+
+ return normalized_link_target
+
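+# Illustrative examples of debian_policy_normalize_symlink_target, matching
+# Debian Policy's relative-vs-absolute symlink rules:
+#
+#   ("./usr/bin/foo", "/usr/bin/bar")        -> "bar"            (shared prefix => relative)
+#   ("./usr/bin/tool", "/usr/bin")           -> "."              (target is the link's own directory)
+#   ("./usr/share/doc/foo", "/etc/foo.conf") -> "/etc/foo.conf"  (no shared prefix => absolute)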
+
+def has_glob_magic(pattern: str) -> bool:
+ return glob.has_magic(pattern) or "{" in pattern
+
+
+def glob_escape(replacement_value: str) -> str:
+    if not glob.has_magic(replacement_value) and "{" not in replacement_value:
+        return replacement_value
+    # Note: "[" must be escaped, but a lone "]" is not glob magic on its own and
+    # escaping it here would corrupt the "[[]" sequences produced by the first
+    # replacement below.
+    return (
+        replacement_value.replace("[", "[[]")
+        .replace("*", "[*]")
+        .replace("?", "[?]")
+        .replace("{", "[{]")
+        .replace("}", "[}]")
+    )
+
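+# Illustrative example of glob_escape, derived from the code above:
+#
+#   glob_escape("foo[1]*.txt")  -> "foo[[]1][*].txt"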
+
+# TODO: This logic should probably be moved to `python-debian`
+def active_profiles_match(
+ profiles_raw: str,
+ active_build_profiles: Union[Set[str], FrozenSet[str]],
+) -> bool:
+ profiles_raw = profiles_raw.strip()
+ if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":
+ raise ValueError(
+            'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
+ )
+ profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
+ for profile_group_raw in profile_groups:
+ should_process_package = True
+ for profile_name in profile_group_raw.split():
+ negation = False
+ if profile_name[0] == "!":
+ negation = True
+ profile_name = profile_name[1:]
+
+ matched_profile = profile_name in active_build_profiles
+ if matched_profile == negation:
+ should_process_package = False
+ break
+
+ if should_process_package:
+ return True
+
+ return False
+
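+# Illustrative examples of active_profiles_match: groups in "<...> <...>" are
+# OR'ed while profiles inside a single group are AND'ed:
+#
+#   active_profiles_match("<!nocheck>", frozenset())                           -> True
+#   active_profiles_match("<!nocheck>", {"nocheck"})                           -> False
+#   active_profiles_match("<pkg.foo.x> <!nocheck>", {"pkg.foo.x", "nocheck"})  -> True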
+
+def _parse_build_profiles(build_profiles_raw: str) -> FrozenSet[FrozenSet[str]]:
+ profiles_raw = build_profiles_raw.strip()
+ if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":
+ raise ValueError(
+            'Invalid Build-Profiles: Must start and end with "<" + ">" but cannot be a literal "<>"'
+ )
+ profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
+ return frozenset(frozenset(g.split()) for g in profile_groups)
+
+
+def resolve_source_date_epoch(
+ command_line_value: Optional[int],
+ *,
+ substitution: Optional["Substitution"] = None,
+) -> int:
+ mtime = command_line_value
+ if mtime is None and "SOURCE_DATE_EPOCH" in os.environ:
+ sde_raw = os.environ["SOURCE_DATE_EPOCH"]
+ if sde_raw == "":
+ _error("SOURCE_DATE_EPOCH is set but empty.")
+ mtime = int(sde_raw)
+ if mtime is None and substitution is not None:
+ try:
+ sde_raw = substitution.substitute(
+ "{{SOURCE_DATE_EPOCH}}",
+ "Internal resolution",
+ )
+ mtime = int(sde_raw)
+ except (DebputySubstitutionError, ValueError):
+ pass
+ if mtime is None:
+ mtime = int(time.time())
+ os.environ["SOURCE_DATE_EPOCH"] = str(mtime)
+ return mtime
+
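+# Resolution order in resolve_source_date_epoch: an explicit command line value
+# wins; otherwise SOURCE_DATE_EPOCH from the environment (an error if set but
+# empty); then a {{SOURCE_DATE_EPOCH}} substitution when one is available; and
+# finally the current time. The result is exported via SOURCE_DATE_EPOCH so
+# child processes see the same timestamp.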
+
+def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
+ with open(os.path.join(control_root_dir, "control"), "rt") as fd:
+ control_file = Deb822(fd)
+
+ package_name = control_file["Package"]
+ package_version = control_file["Version"]
+ package_architecture = control_file["Architecture"]
+ extension = control_file.get("Package-Type") or "deb"
+ if ":" in package_version:
+ package_version = package_version.split(":", 1)[1]
+ if is_udeb:
+ extension = "udeb"
+
+ return f"{package_name}_{package_version}_{package_architecture}.{extension}"
+
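+# Illustrative example of compute_output_filename: a control file with
+# "Package: foo", "Version: 1:2.0-1" and "Architecture: amd64" yields
+# "foo_2.0-1_amd64.deb" (the epoch is stripped), or "foo_2.0-1_amd64.udeb"
+# when is_udeb is True.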
+
+_SCRATCH_DIR = None
+_DH_INTEGRATION_MODE = False
+
+
+def integrated_with_debhelper() -> None:
+ global _DH_INTEGRATION_MODE
+ _DH_INTEGRATION_MODE = True
+
+
+def scratch_dir() -> str:
+ global _SCRATCH_DIR
+ if _SCRATCH_DIR is not None:
+ return _SCRATCH_DIR
+ debputy_scratch_dir = "debian/.debputy/scratch-dir"
+ is_debputy_dir = True
+ if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE:
+ _SCRATCH_DIR = debputy_scratch_dir
+ elif os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE:
+ _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"
+ is_debputy_dir = False
+ else:
+ _SCRATCH_DIR = debputy_scratch_dir
+ ensure_dir(_SCRATCH_DIR)
+ if is_debputy_dir:
+ Path("debian/.debputy/.gitignore").write_text("*\n")
+ return _SCRATCH_DIR
+
+
+_RUNTIME_CONTAINER_DIR_KEY: Optional[str] = None
+
+
+def generated_content_dir(
+ *,
+ package: Optional["BinaryPackage"] = None,
+ subdir_key: Optional[str] = None,
+) -> str:
+ global _RUNTIME_CONTAINER_DIR_KEY
+ container_dir = _RUNTIME_CONTAINER_DIR_KEY
+ first_run = False
+
+ if container_dir is None:
+ first_run = True
+ container_dir = f"_pb-{os.getpid()}"
+ _RUNTIME_CONTAINER_DIR_KEY = container_dir
+
+ directory = os.path.join(scratch_dir(), container_dir)
+
+ if first_run and os.path.isdir(directory):
+ # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
+ # see "stale" data.
+ # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
+ # reliable enough for that and we do not have an obvious hook for it.
+ shutil.rmtree(directory)
+
+ directory = os.path.join(
+ directory,
+ "generated-fs-content",
+ f"pkg_{package.name}" if package else "no-package",
+ )
+ if subdir_key is not None:
+ directory = os.path.join(directory, subdir_key)
+
+ os.makedirs(directory, exist_ok=True)
+ return directory
+
+
+PerlIncDir = collections.namedtuple("PerlIncDir", ["vendorlib", "vendorarch"])
+PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
+_PERL_MODULE_DIRS: Dict[str, PerlIncDir] = {}
+
+
+@functools.lru_cache(1)
+def _perl_config_data() -> PerlConfigData:
+ d = (
+ subprocess.check_output(
+ [
+ "perl",
+ "-MConfig",
+ "-e",
+ 'print "$Config{version}\n$Config{debian_abi}\n"',
+ ]
+ )
+ .decode("utf-8")
+ .splitlines()
+ )
+ return PerlConfigData(*d)
+
+
+def _perl_version() -> str:
+ return _perl_config_data().version
+
+
+def perlxs_api_dependency() -> str:
+    # dh_perl used the build version of perl for this, so we will too. Most of the perl cross logic
+    # assumes that the version of the build variant of Perl matches that of the host variant.
+ config = _perl_config_data()
+ if config.debian_abi is not None and config.debian_abi != "":
+ return f"perlapi-{config.debian_abi}"
+ return f"perlapi-{config.version}"
+
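+# Example: with $Config{debian_abi} = "5.36.0" this resolves to "perlapi-5.36.0";
+# on perls without a debian_abi, it falls back to "perlapi-$Config{version}".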
+
+def perl_module_dirs(
+ dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
+ dctrl_bin: "BinaryPackage",
+) -> PerlIncDir:
+ global _PERL_MODULE_DIRS
+ arch = (
+ dctrl_bin.resolved_architecture
+ if dpkg_architecture_variables.is_cross_compiling
+ else "_default_"
+ )
+ module_dir = _PERL_MODULE_DIRS.get(arch)
+ if module_dir is None:
+ cmd = ["perl"]
+ if dpkg_architecture_variables.is_cross_compiling:
+ version = _perl_version()
+ inc_dir = f"/usr/lib/{dctrl_bin.deb_multiarch}/perl/cross-config-{version}"
+            # FIXME: This should not fall back to "build-arch" but on the other hand, we use the perl module dirs
+ # for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
+ # cross builds... meh.
+ if os.path.exists(os.path.join(inc_dir, "Config.pm")):
+ cmd.append(f"-I{inc_dir}")
+ cmd.extend(
+ ["-MConfig", "-e", 'print "$Config{vendorlib}\n$Config{vendorarch}\n"']
+ )
+ output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
+ if len(output) != 2:
+ raise ValueError(
+ "Internal error: Unable to determine the perl include directories:"
+ f" Raw output from perl snippet: {output}"
+ )
+ module_dir = PerlIncDir(
+ vendorlib=_normalize_path(output[0]),
+ vendorarch=_normalize_path(output[1]),
+ )
+ _PERL_MODULE_DIRS[arch] = module_dir
+ return module_dir
+
+
+@functools.lru_cache(1)
+def detect_fakeroot() -> bool:
+ if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
+ return False
+ env = dict(os.environ)
+ del env["LD_PRELOAD"]
+ try:
+ return subprocess.check_output(["id", "-u"], env=env).strip() != b"0"
+ except subprocess.CalledProcessError:
+ print(
+ 'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
+ file=sys.stderr,
+ )
+ return False
+
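+# Rationale for detect_fakeroot: fakeroot works by LD_PRELOAD-ing a library that
+# makes getuid() and friends report uid 0. Re-running "id -u" with LD_PRELOAD
+# removed therefore reveals the real uid; a non-zero answer means we are only
+# pretending to be root.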
+
+@functools.lru_cache(1)
+def _sc_arg_max() -> Optional[int]:
+ try:
+ return os.sysconf("SC_ARG_MAX")
+ except RuntimeError:
+ _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
+ return None
+
+
+def _split_xargs_args(
+ static_cmd: Sequence[str],
+ max_args_byte_len: int,
+ varargs: Iterable[str],
+ reuse_list_ok: bool,
+) -> Iterator[List[str]]:
+ static_cmd_len = len(static_cmd)
+ remaining_len = max_args_byte_len
+ pending_args = list(static_cmd)
+ for arg in varargs:
+ arg_len = len(arg.encode("utf-8")) + 1 # +1 for leading space
+ remaining_len -= arg_len
+        if remaining_len < 0:
+            if len(pending_args) <= static_cmd_len:
+                raise ValueError(
+                    "Could not fit a single argument into the command line!?"
+                    f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)"
+                )
+ yield pending_args
+ remaining_len = max_args_byte_len - arg_len
+ if reuse_list_ok:
+ pending_args.clear()
+ pending_args.extend(static_cmd)
+ else:
+ pending_args = list(static_cmd)
+ pending_args.append(arg)
+
+ if len(pending_args) > static_cmd_len:
+ yield pending_args
+
+
+def xargs(
+ static_cmd: Sequence[str],
+ varargs: Iterable[str],
+ *,
+ env: Optional[Mapping[str, str]] = None,
+ reuse_list_ok: bool = False,
+) -> Iterator[List[str]]:
+ max_args_bytes = _sc_arg_max()
+    # len(static_cmd) counts one separating space per argument, which is one too many
+    # (the first argument has no leading space), hence the -1. _split_xargs_args
+    # accounts for the space in front of each variable argument.
+ static_byte_len = (
+ len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd)
+ )
+ if max_args_bytes is not None:
+ if env is None:
+ # +2 for nul bytes after key and value
+ static_byte_len += sum(len(k) + len(v) + 2 for k, v in os.environb.items())
+ else:
+ # +2 for nul bytes after key and value
+ static_byte_len += sum(
+ len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
+ for k, v in env.items()
+ )
+ # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
+ # that)
+ static_byte_len += 2 * 4096
+ else:
+        # The 20,000 limit comes from debhelper, which did not account for the environment; neither will we.
+ max_args_bytes = 20_000
+ remain_len = max_args_bytes - static_byte_len
+ yield from _split_xargs_args(static_cmd, remain_len, varargs, reuse_list_ok)
+
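+# Illustrative usage of xargs (files_to_fix is a hypothetical iterable of paths):
+#
+#   for cmd in xargs(["chmod", "0644"], files_to_fix):
+#       subprocess.check_call(cmd)
+#
+# Each yielded command is the static prefix plus as many variable arguments as
+# fit within the resolved argument-length budget.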
+
+# itertools recipe
+def grouper(
+ iterable: Iterable[T],
+ n: int,
+ *,
+ incomplete: Literal["fill", "strict", "ignore"] = "fill",
+ fillvalue: Optional[T] = None,
+) -> Iterator[Tuple[T, ...]]:
+ """Collect data into non-overlapping fixed-length chunks or blocks"""
+ # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
+ # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
+ # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
+ args = [iter(iterable)] * n
+ if incomplete == "fill":
+ return zip_longest(*args, fillvalue=fillvalue)
+ if incomplete == "strict":
+ return zip(*args, strict=True)
+ if incomplete == "ignore":
+ return zip(*args)
+ else:
+ raise ValueError("Expected fill, strict, or ignore")
+
+
+_LOGGING_SET_UP = False
+
+
+def _check_color() -> Tuple[bool, bool, Optional[str]]:
+ dpkg_or_default = os.environ.get(
+ "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto"
+ )
+ requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default)
+ bad_request = None
+ if requested_color not in {"auto", "always", "never"}:
+ bad_request = requested_color
+ requested_color = "auto"
+
+ if requested_color == "auto":
+ stdout_color = sys.stdout.isatty()
+        stderr_color = sys.stderr.isatty()
+ else:
+ enable = requested_color == "always"
+ stdout_color = enable
+ stderr_color = enable
+ return stdout_color, stderr_color, bad_request
+
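+# Color resolution order in _check_color: DEBPUTY_COLORS overrides DPKG_COLORS,
+# which defaults to "never" when NO_COLOR is set and "auto" otherwise. Invalid
+# values fall back to "auto" and are reported by setup_logging() once logging
+# is available.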
+
+def program_name() -> str:
+ name = os.path.basename(sys.argv[0])
+ if name.endswith(".py"):
+ name = name[:-3]
+ if name == "__main__":
+ name = os.path.basename(os.path.dirname(sys.argv[0]))
+ # FIXME: Not optimal that we have to hardcode these kind of things here
+ if name == "debputy_cmd":
+ name = "debputy"
+ return name
+
+
+def package_cross_check_precheck(
+ pkg_a: "BinaryPackage",
+ pkg_b: "BinaryPackage",
+) -> Tuple[bool, bool]:
+ """Whether these two packages can do content cross-checks
+
+ :param pkg_a: The first package
+ :param pkg_b: The second package
+    :return: A tuple of two booleans. If the first is True, then pkg_a may do content cross-checks
+      that involve pkg_b. If the second is True, then pkg_b may do content cross-checks
+      that involve pkg_a. Both can be True and both can be False at the same time, which
+      happens in common cases (as an example, arch:all + arch:any commonly makes both False).
+    """
+
+ # Handle the two most obvious base-cases
+ if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
+ return False, False
+ if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
+ return False, False
+
+ a_may_see_b = True
+ b_may_see_a = True
+
+ a_bp = pkg_a.fields.get("Build-Profiles", "")
+ b_bp = pkg_b.fields.get("Build-Profiles", "")
+
+ if a_bp != b_bp:
+ a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
+ b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()
+
+        # Check for build profiles being identical but just ordered differently.
+ if a_bp_set != b_bp_set:
+ # For simplicity, we let groups cancel each other out. If one side has no clauses
+ # left, then it will always be built when the other is built.
+ #
+ # Eventually, someone will be here with a special case where more complex logic is
+ # required. Good luck to you! Remember to add test cases for it (the existing logic
+ # has some for a reason and if the logic is going to be more complex, it will need
+            # test cases to assert it fixes the problem and does not regress)
+ if a_bp_set - b_bp_set:
+ a_may_see_b = False
+ if b_bp_set - a_bp_set:
+ b_may_see_a = False
+
+ if pkg_a.declared_architecture != pkg_b.declared_architecture:
+ # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
+ if pkg_a.declared_architecture != "any":
+ b_may_see_a = False
+        if pkg_b.declared_architecture != "any":
+ a_may_see_b = False
+
+ return a_may_see_b, b_may_see_a
+
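+# Illustrative outcomes of package_cross_check_precheck (assuming both packages
+# are acted on):
+#
+#   * identical Build-Profiles and identical architectures  -> (True, True)
+#   * arch:all vs. arch:any                                 -> (False, False)
+#   * one side has Build-Profiles groups the other lacks    -> that side loses its
+#     cross-check privilege per the set-subtraction checks above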
+
+def setup_logging(
+ *, log_only_to_stderr: bool = False, reconfigure_logging: bool = False
+) -> None:
+ global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
+ if _LOGGING_SET_UP and not reconfigure_logging:
+ raise RuntimeError(
+ "Logging has already been configured."
+ " Use reconfigure_logging=True if you need to reconfigure it"
+ )
+ stdout_color, stderr_color, bad_request = _check_color()
+
+ if stdout_color or stderr_color:
+ try:
+ import colorlog
+ except ImportError:
+ stdout_color = False
+ stderr_color = False
+
+    if log_only_to_stderr:
+        stdout = sys.stderr
+        stdout_color = stderr_color
+    else:
+        stdout = sys.stdout
+
+ class LogLevelFilter(logging.Filter):
+ def __init__(self, threshold: int, above: bool):
+ super().__init__()
+ self.threshold = threshold
+ self.above = above
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ if self.above:
+ return record.levelno >= self.threshold
+ else:
+ return record.levelno < self.threshold
+
+ color_format = (
+ "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
+ )
+ colorless_format = "{name}: {levelnamelower}: {message}"
+
+ existing_stdout_handler = _STDOUT_HANDLER
+ existing_stderr_handler = _STDERR_HANDLER
+
+ if stdout_color:
+ stdout_handler = colorlog.StreamHandler(stdout)
+ stdout_handler.setFormatter(
+ colorlog.ColoredFormatter(color_format, style="{", force_color=True)
+ )
+ logger = colorlog.getLogger()
+ if existing_stdout_handler is not None:
+ logger.removeHandler(existing_stdout_handler)
+ _STDOUT_HANDLER = stdout_handler
+ logger.addHandler(stdout_handler)
+ else:
+ stdout_handler = logging.StreamHandler(stdout)
+ stdout_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
+ logger = logging.getLogger()
+ if existing_stdout_handler is not None:
+ logger.removeHandler(existing_stdout_handler)
+ _STDOUT_HANDLER = stdout_handler
+ logger.addHandler(stdout_handler)
+
+ if stderr_color:
+ stderr_handler = colorlog.StreamHandler(sys.stderr)
+ stderr_handler.setFormatter(
+ colorlog.ColoredFormatter(color_format, style="{", force_color=True)
+ )
+ logger = logging.getLogger()
+        if existing_stderr_handler is not None:
+ logger.removeHandler(existing_stderr_handler)
+ _STDERR_HANDLER = stderr_handler
+ logger.addHandler(stderr_handler)
+ else:
+ stderr_handler = logging.StreamHandler(sys.stderr)
+ stderr_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
+ logger = logging.getLogger()
+        if existing_stderr_handler is not None:
+ logger.removeHandler(existing_stderr_handler)
+ _STDERR_HANDLER = stderr_handler
+ logger.addHandler(stderr_handler)
+
+ stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
+ stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))
+
+ name = program_name()
+
+ old_factory = logging.getLogRecordFactory()
+
+ def record_factory(
+ *args: Any, **kwargs: Any
+ ) -> logging.LogRecord: # pragma: no cover
+ record = old_factory(*args, **kwargs)
+ record.levelnamelower = record.levelname.lower()
+ return record
+
+ logging.setLogRecordFactory(record_factory)
+
+ logging.getLogger().setLevel(logging.INFO)
+ _DEFAULT_LOGGER = logging.getLogger(name)
+
+ if bad_request:
+ _DEFAULT_LOGGER.warning(
+ f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
+ ' Resetting to "auto".'
+ )
+
+ _LOGGING_SET_UP = True
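+
+
+# Typical bootstrap (illustrative): call setup_logging() once at program
+# start-up; afterwards _info(), _warn() and _error() route through the
+# configured logger instead of the bare stderr print() fallbacks above.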