summaryrefslogtreecommitdiffstats
path: root/src/debputy/util.py
blob: 4da2772d3d93f14f837a2f5ea4a2382042cf2f5b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
import argparse
import collections
import functools
import glob
import logging
import os
import re
import shutil
import subprocess
import sys
import time
from itertools import zip_longest
from pathlib import Path
from typing import (
    NoReturn,
    TYPE_CHECKING,
    Union,
    Set,
    FrozenSet,
    Optional,
    TypeVar,
    Dict,
    Iterator,
    Iterable,
    Literal,
    Tuple,
    Sequence,
    List,
    Mapping,
    Any,
)

from debian.deb822 import Deb822

from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
from debputy.exceptions import DebputySubstitutionError

if TYPE_CHECKING:
    from debputy.packages import BinaryPackage
    from debputy.substitution import Substitution


T = TypeVar("T")


SLASH_PRUNE = re.compile("//+")
PKGNAME_REGEX = re.compile(r"[a-z0-9][-+.a-z0-9]+", re.ASCII)
PKGVERSION_REGEX = re.compile(
    r"""
                 (?: \d+ : )?                # Optional epoch
                 \d[0-9A-Za-z.+:~]*          # Upstream version (with no hyphens)
                 (?: - [0-9A-Za-z.+:~]+ )*   # Optional debian revision (+ upstreams versions with hyphens)
""",
    re.VERBOSE | re.ASCII,
)
DEFAULT_PACKAGE_TYPE = "deb"
DBGSYM_PACKAGE_TYPE = "deb"
UDEB_PACKAGE_TYPE = "udeb"

POSTINST_DEFAULT_CONDITION = (
    '[ "$1" = "configure" ]'
    ' || [ "$1" = "abort-upgrade" ]'
    ' || [ "$1" = "abort-deconfigure" ]'
    ' || [ "$1" = "abort-remove" ]'
)


_SPACE_RE = re.compile(r"\s")
_DOUBLE_ESCAPEES = re.compile(r'([\n`$"\\])')
_REGULAR_ESCAPEES = re.compile(r'([\s!"$()*+#;<>?@\[\]\\`|~])')
_PROFILE_GROUP_SPLIT = re.compile(r">\s+<")
_DEFAULT_LOGGER: Optional[logging.Logger] = None
_STDOUT_HANDLER: Optional[logging.StreamHandler] = None
_STDERR_HANDLER: Optional[logging.StreamHandler] = None


def assume_not_none(x: Optional[T]) -> T:
    if x is None:  # pragma: no cover
        raise ValueError(
            'Internal error: None was given, but the receiver assumed "not None" here'
        )
    return x


def _info(msg: str) -> None:
    global _DEFAULT_LOGGER
    logger = _DEFAULT_LOGGER
    if logger:
        logger.info(msg)
    # No fallback print for info


def _error(msg: str, *, prog: Optional[str] = None) -> "NoReturn":
    global _DEFAULT_LOGGER
    logger = _DEFAULT_LOGGER
    if logger:
        logger.error(msg)
    else:
        me = os.path.basename(sys.argv[0]) if prog is None else prog
        print(
            f"{me}: error: {msg}",
            file=sys.stderr,
        )
    sys.exit(1)


def _warn(msg: str, *, prog: Optional[str] = None) -> None:
    global _DEFAULT_LOGGER
    logger = _DEFAULT_LOGGER
    if logger:
        logger.warning(msg)
    else:
        me = os.path.basename(sys.argv[0]) if prog is None else prog

        print(
            f"{me}: warning: {msg}",
            file=sys.stderr,
        )


class ColorizedArgumentParser(argparse.ArgumentParser):
    def error(self, message: str) -> NoReturn:
        self.print_usage(sys.stderr)
        _error(message, prog=self.prog)


def ensure_dir(path: str) -> None:
    if not os.path.isdir(path):
        os.makedirs(path, mode=0o755, exist_ok=True)


def _clean_path(orig_p: str) -> str:
    p = SLASH_PRUNE.sub("/", orig_p)
    if "." in p:
        path_base = p
        # We permit a single leading "./" because we add that when we normalize a path, and we want normalization
        # of a normalized path to be a no-op.
        if path_base.startswith("./"):
            path_base = path_base[2:]
            assert path_base
        for segment in path_base.split("/"):
            if segment in (".", ".."):
                raise ValueError(
                    'Please provide paths that are normalized (i.e., no ".." or ".").'
                    f' Offending input "{orig_p}"'
                )
    return p


def _normalize_path(path: str, with_prefix: bool = True) -> str:
    path = path.strip("/")
    if not path or path == ".":
        return "."
    if "//" in path or "." in path:
        path = _clean_path(path)
    if with_prefix ^ path.startswith("./"):
        if with_prefix:
            path = "./" + path
        else:
            path = path[2:]
    return path


def _normalize_link_target(link_target: str) -> str:
    link_target = SLASH_PRUNE.sub("/", link_target.lstrip("/"))
    result: List[str] = []
    for segment in link_target.split("/"):
        if segment in (".", ""):
            # Ignore these - the empty string is generally a trailing slash
            continue
        if segment == "..":
            # We ignore "root escape attempts" like the OS would (mapping /.. -> /)
            if result:
                result.pop()
        else:
            result.append(segment)
    return "/".join(result)


def _backslash_escape(m: re.Match[str]) -> str:
    return "\\" + m.group(0)


def _escape_shell_word(w: str) -> str:
    if _SPACE_RE.match(w):
        w = _DOUBLE_ESCAPEES.sub(_backslash_escape, w)
        return f'"{w}"'
    return _REGULAR_ESCAPEES.sub(_backslash_escape, w)


def escape_shell(*args: str) -> str:
    return " ".join(_escape_shell_word(w) for w in args)


def print_command(*args: str) -> None:
    print(f"   {escape_shell(*args)}")


def debian_policy_normalize_symlink_target(
    link_path: str,
    link_target: str,
    normalize_link_path: bool = False,
) -> str:
    if normalize_link_path:
        link_path = _normalize_path(link_path)
    elif not link_path.startswith("./"):
        raise ValueError("Link part was not normalized")

    link_path = link_path[2:]

    if not link_target.startswith("/"):
        link_target = "/" + os.path.dirname(link_path) + "/" + link_target

    link_path_parts = link_path.split("/")
    link_target_parts = [
        s for s in _normalize_link_target(link_target).split("/") if s != "."
    ]

    assert link_path_parts

    if link_target_parts and link_path_parts[0] == link_target_parts[0]:
        # Per Debian Policy, must be relative

        # First determine the length of the overlap
        common_segment_count = 1
        shortest_path_length = min(len(link_target_parts), len(link_path_parts))
        while (
            common_segment_count < shortest_path_length
            and link_target_parts[common_segment_count]
            == link_path_parts[common_segment_count]
        ):
            common_segment_count += 1

        if common_segment_count == shortest_path_length and len(
            link_path_parts
        ) - 1 == len(link_target_parts):
            normalized_link_target = "."
        else:
            up_dir_count = len(link_path_parts) - 1 - common_segment_count
            normalized_link_target_parts = []
            if up_dir_count:
                up_dir_part = "../" * up_dir_count
                # We overshoot with a single '/', so rstrip it away
                normalized_link_target_parts.append(up_dir_part.rstrip("/"))
            # Add the relevant down parts
            normalized_link_target_parts.extend(
                link_target_parts[common_segment_count:]
            )

            normalized_link_target = "/".join(normalized_link_target_parts)
    else:
        # Per Debian Policy, must be absolute
        normalized_link_target = "/" + "/".join(link_target_parts)

    return normalized_link_target


def has_glob_magic(pattern: str) -> bool:
    return glob.has_magic(pattern) or "{" in pattern


def glob_escape(replacement_value: str) -> str:
    if not glob.has_magic(replacement_value) or "{" not in replacement_value:
        return replacement_value
    return (
        replacement_value.replace("[", "[[]")
        .replace("]", "[]]")
        .replace("*", "[*]")
        .replace("?", "[?]")
        .replace("{", "[{]")
        .replace("}", "[}]")
    )


# TODO: This logic should probably be moved to `python-debian`
def active_profiles_match(
    profiles_raw: str,
    active_build_profiles: Union[Set[str], FrozenSet[str]],
) -> bool:
    profiles_raw = profiles_raw.strip()
    if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":
        raise ValueError(
            'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"'
        )
    profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
    for profile_group_raw in profile_groups:
        should_process_package = True
        for profile_name in profile_group_raw.split():
            negation = False
            if profile_name[0] == "!":
                negation = True
                profile_name = profile_name[1:]

            matched_profile = profile_name in active_build_profiles
            if matched_profile == negation:
                should_process_package = False
                break

        if should_process_package:
            return True

    return False


def _parse_build_profiles(build_profiles_raw: str) -> FrozenSet[FrozenSet[str]]:
    profiles_raw = build_profiles_raw.strip()
    if profiles_raw[0] != "<" or profiles_raw[-1] != ">" or profiles_raw == "<>":
        raise ValueError(
            'Invalid Build-Profiles: Must start start and end with "<" + ">" but cannot be a literal "<>"'
        )
    profile_groups = _PROFILE_GROUP_SPLIT.split(profiles_raw[1:-1])
    return frozenset(frozenset(g.split()) for g in profile_groups)


def resolve_source_date_epoch(
    command_line_value: Optional[int],
    *,
    substitution: Optional["Substitution"] = None,
) -> int:
    mtime = command_line_value
    if mtime is None and "SOURCE_DATE_EPOCH" in os.environ:
        sde_raw = os.environ["SOURCE_DATE_EPOCH"]
        if sde_raw == "":
            _error("SOURCE_DATE_EPOCH is set but empty.")
        mtime = int(sde_raw)
    if mtime is None and substitution is not None:
        try:
            sde_raw = substitution.substitute(
                "{{SOURCE_DATE_EPOCH}}",
                "Internal resolution",
            )
            mtime = int(sde_raw)
        except (DebputySubstitutionError, ValueError):
            pass
    if mtime is None:
        mtime = int(time.time())
    os.environ["SOURCE_DATE_EPOCH"] = str(mtime)
    return mtime


def compute_output_filename(control_root_dir: str, is_udeb: bool) -> str:
    with open(os.path.join(control_root_dir, "control"), "rt") as fd:
        control_file = Deb822(fd)

    package_name = control_file["Package"]
    package_version = control_file["Version"]
    package_architecture = control_file["Architecture"]
    extension = control_file.get("Package-Type") or "deb"
    if ":" in package_version:
        package_version = package_version.split(":", 1)[1]
    if is_udeb:
        extension = "udeb"

    return f"{package_name}_{package_version}_{package_architecture}.{extension}"


_SCRATCH_DIR = None
_DH_INTEGRATION_MODE = False


def integrated_with_debhelper() -> None:
    global _DH_INTEGRATION_MODE
    _DH_INTEGRATION_MODE = True


def scratch_dir() -> str:
    global _SCRATCH_DIR
    if _SCRATCH_DIR is not None:
        return _SCRATCH_DIR
    debputy_scratch_dir = "debian/.debputy/scratch-dir"
    is_debputy_dir = True
    if os.path.isdir("debian/.debputy") and not _DH_INTEGRATION_MODE:
        _SCRATCH_DIR = debputy_scratch_dir
    elif os.path.isdir("debian/.debhelper") or _DH_INTEGRATION_MODE:
        _SCRATCH_DIR = "debian/.debhelper/_debputy/scratch-dir"
        is_debputy_dir = False
    else:
        _SCRATCH_DIR = debputy_scratch_dir
    ensure_dir(_SCRATCH_DIR)
    if is_debputy_dir:
        Path("debian/.debputy/.gitignore").write_text("*\n")
    return _SCRATCH_DIR


_RUNTIME_CONTAINER_DIR_KEY: Optional[str] = None


def generated_content_dir(
    *,
    package: Optional["BinaryPackage"] = None,
    subdir_key: Optional[str] = None,
) -> str:
    global _RUNTIME_CONTAINER_DIR_KEY
    container_dir = _RUNTIME_CONTAINER_DIR_KEY
    first_run = False

    if container_dir is None:
        first_run = True
        container_dir = f"_pb-{os.getpid()}"
        _RUNTIME_CONTAINER_DIR_KEY = container_dir

    directory = os.path.join(scratch_dir(), container_dir)

    if first_run and os.path.isdir(directory):
        # In the unlikely case there is a re-run with exactly the same pid, `debputy` should not
        # see "stale" data.
        # TODO: Ideally, we would always clean up this directory on failure, but `atexit` is not
        #  reliable enough for that and we do not have an obvious hook for it.
        shutil.rmtree(directory)

    directory = os.path.join(
        directory,
        "generated-fs-content",
        f"pkg_{package.name}" if package else "no-package",
    )
    if subdir_key is not None:
        directory = os.path.join(directory, subdir_key)

    os.makedirs(directory, exist_ok=True)
    return directory


PerlIncDir = collections.namedtuple("PerlIncDir", ["vendorlib", "vendorarch"])
PerlConfigData = collections.namedtuple("PerlConfigData", ["version", "debian_abi"])
_PERL_MODULE_DIRS: Dict[str, PerlIncDir] = {}


@functools.lru_cache(1)
def _perl_config_data() -> PerlConfigData:
    d = (
        subprocess.check_output(
            [
                "perl",
                "-MConfig",
                "-e",
                'print "$Config{version}\n$Config{debian_abi}\n"',
            ]
        )
        .decode("utf-8")
        .splitlines()
    )
    return PerlConfigData(*d)


def _perl_version() -> str:
    return _perl_config_data().version


def perlxs_api_dependency() -> str:
    # dh_perl used the build version of perl for this, so we will too.  Most of the perl cross logic
    # assumes that the major version of build variant of Perl is the same as the host variant of Perl.
    config = _perl_config_data()
    if config.debian_abi is not None and config.debian_abi != "":
        return f"perlapi-{config.debian_abi}"
    return f"perlapi-{config.version}"


def perl_module_dirs(
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dctrl_bin: "BinaryPackage",
) -> PerlIncDir:
    global _PERL_MODULE_DIRS
    arch = (
        dctrl_bin.resolved_architecture
        if dpkg_architecture_variables.is_cross_compiling
        else "_default_"
    )
    module_dir = _PERL_MODULE_DIRS.get(arch)
    if module_dir is None:
        cmd = ["perl"]
        if dpkg_architecture_variables.is_cross_compiling:
            version = _perl_version()
            inc_dir = f"/usr/lib/{dctrl_bin.deb_multiarch}/perl/cross-config-{version}"
            # FIXME: This should not fallback to "build-arch" but on the other hand, we use the perl module dirs
            #  for every package at the moment. So mandating correct perl dirs implies mandating perl-xs-dev in
            #  cross builds... meh.
            if os.path.exists(os.path.join(inc_dir, "Config.pm")):
                cmd.append(f"-I{inc_dir}")
        cmd.extend(
            ["-MConfig", "-e", 'print "$Config{vendorlib}\n$Config{vendorarch}\n"']
        )
        output = subprocess.check_output(cmd).decode("utf-8").splitlines(keepends=False)
        if len(output) != 2:
            raise ValueError(
                "Internal error: Unable to determine the perl include directories:"
                f" Raw output from perl snippet: {output}"
            )
        module_dir = PerlIncDir(
            vendorlib=_normalize_path(output[0]),
            vendorarch=_normalize_path(output[1]),
        )
        _PERL_MODULE_DIRS[arch] = module_dir
    return module_dir


@functools.lru_cache(1)
def detect_fakeroot() -> bool:
    if os.getuid() != 0 or "LD_PRELOAD" not in os.environ:
        return False
    env = dict(os.environ)
    del env["LD_PRELOAD"]
    try:
        return subprocess.check_output(["id", "-u"], env=env).strip() != b"0"
    except subprocess.CalledProcessError:
        print(
            'Could not run "id -u" with LD_PRELOAD unset; assuming we are not run under fakeroot',
            file=sys.stderr,
        )
        return False


@functools.lru_cache(1)
def _sc_arg_max() -> Optional[int]:
    try:
        return os.sysconf("SC_ARG_MAX")
    except RuntimeError:
        _warn("Could not resolve SC_ARG_MAX, falling back to a hard-coded limit")
        return None


def _split_xargs_args(
    static_cmd: Sequence[str],
    max_args_byte_len: int,
    varargs: Iterable[str],
    reuse_list_ok: bool,
) -> Iterator[List[str]]:
    static_cmd_len = len(static_cmd)
    remaining_len = max_args_byte_len
    pending_args = list(static_cmd)
    for arg in varargs:
        arg_len = len(arg.encode("utf-8")) + 1  # +1 for leading space
        remaining_len -= arg_len
        if not remaining_len:
            if len(pending_args) <= static_cmd_len:
                raise ValueError(
                    f"Could not fit a single argument into the command line !?"
                    f" {max_args_byte_len} (variable argument limit) < {arg_len} (argument length)"
                )
            yield pending_args
            remaining_len = max_args_byte_len - arg_len
            if reuse_list_ok:
                pending_args.clear()
                pending_args.extend(static_cmd)
            else:
                pending_args = list(static_cmd)
        pending_args.append(arg)

    if len(pending_args) > static_cmd_len:
        yield pending_args


def xargs(
    static_cmd: Sequence[str],
    varargs: Iterable[str],
    *,
    env: Optional[Mapping[str, str]] = None,
    reuse_list_ok: bool = False,
) -> Iterator[List[str]]:
    max_args_bytes = _sc_arg_max()
    # len overshoots with one space explaining the -1.  The _split_xargs_args
    # will account for the space for the first argument
    static_byte_len = (
        len(static_cmd) - 1 + sum(len(a.encode("utf-8")) for a in static_cmd)
    )
    if max_args_bytes is not None:
        if env is None:
            # +2 for nul bytes after key and value
            static_byte_len += sum(len(k) + len(v) + 2 for k, v in os.environb.items())
        else:
            # +2 for nul bytes after key and value
            static_byte_len += sum(
                len(k.encode("utf-8")) + len(v.encode("utf-8")) + 2
                for k, v in env.items()
            )
        # Add a fixed buffer for OS overhead here (in case env and cmd both must be page-aligned or something like
        # that)
        static_byte_len += 2 * 4096
    else:
        # The 20 000 limit is from debhelper, and it did not account for environment.  So neither will we here.
        max_args_bytes = 20_000
    remain_len = max_args_bytes - static_byte_len
    yield from _split_xargs_args(static_cmd, remain_len, varargs, reuse_list_ok)


# itertools recipe
def grouper(
    iterable: Iterable[T],
    n: int,
    *,
    incomplete: Literal["fill", "strict", "ignore"] = "fill",
    fillvalue: Optional[T] = None,
) -> Iterator[Tuple[T, ...]]:
    """Collect data into non-overlapping fixed-length chunks or blocks"""
    # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
    # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
    # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
    args = [iter(iterable)] * n
    if incomplete == "fill":
        return zip_longest(*args, fillvalue=fillvalue)
    if incomplete == "strict":
        return zip(*args, strict=True)
    if incomplete == "ignore":
        return zip(*args)
    else:
        raise ValueError("Expected fill, strict, or ignore")


_LOGGING_SET_UP = False


def _check_color() -> Tuple[bool, bool, Optional[str]]:
    dpkg_or_default = os.environ.get(
        "DPKG_COLORS", "never" if "NO_COLOR" in os.environ else "auto"
    )
    requested_color = os.environ.get("DEBPUTY_COLORS", dpkg_or_default)
    bad_request = None
    if requested_color not in {"auto", "always", "never"}:
        bad_request = requested_color
        requested_color = "auto"

    if requested_color == "auto":
        stdout_color = sys.stdout.isatty()
        stderr_color = sys.stdout.isatty()
    else:
        enable = requested_color == "always"
        stdout_color = enable
        stderr_color = enable
    return stdout_color, stderr_color, bad_request


def program_name() -> str:
    name = os.path.basename(sys.argv[0])
    if name.endswith(".py"):
        name = name[:-3]
    if name == "__main__":
        name = os.path.basename(os.path.dirname(sys.argv[0]))
    # FIXME: Not optimal that we have to hardcode these kind of things here
    if name == "debputy_cmd":
        name = "debputy"
    return name


def package_cross_check_precheck(
    pkg_a: "BinaryPackage",
    pkg_b: "BinaryPackage",
) -> Tuple[bool, bool]:
    """Whether these two packages can do content cross-checks

    :param pkg_a: The first package
    :param pkg_b: The second package
    :return: A tuple if two booleans. If the first is True, then binary_package_a may do content cross-checks
      that invoĺves binary_package_b. If the second is True, then binary_package_b may do content cross-checks
      that involves binary_package_a. Both can be True and both can be False at the same time, which
      happens in common cases (arch:all + arch:any cases both to be False as a common example).
    """

    # Handle the two most obvious base-cases
    if not pkg_a.should_be_acted_on or not pkg_b.should_be_acted_on:
        return False, False
    if pkg_a.is_arch_all ^ pkg_b.is_arch_all:
        return False, False

    a_may_see_b = True
    b_may_see_a = True

    a_bp = pkg_a.fields.get("Build-Profiles", "")
    b_bp = pkg_b.fields.get("Build-Profiles", "")

    if a_bp != b_bp:
        a_bp_set = _parse_build_profiles(a_bp) if a_bp != "" else frozenset()
        b_bp_set = _parse_build_profiles(b_bp) if b_bp != "" else frozenset()

        # Check for build profiles being identically but just ordered differently.
        if a_bp_set != b_bp_set:
            # For simplicity, we let groups cancel each other out. If one side has no clauses
            # left, then it will always be built when the other is built.
            #
            # Eventually, someone will be here with a special case where more complex logic is
            # required. Good luck to you! Remember to add test cases for it (the existing logic
            # has some for a reason and if the logic is going to be more complex, it will need
            # tests cases to assert it fixes the problem and does not regress)
            if a_bp_set - b_bp_set:
                a_may_see_b = False
            if b_bp_set - a_bp_set:
                b_may_see_a = False

    if pkg_a.declared_architecture != pkg_b.declared_architecture:
        # Also here we could do a subset check, but wildcards vs. non-wildcards make that a pain
        if pkg_a.declared_architecture != "any":
            b_may_see_a = False
        if pkg_a.declared_architecture != "any":
            a_may_see_b = False

    return a_may_see_b, b_may_see_a


def setup_logging(
    *, log_only_to_stderr: bool = False, reconfigure_logging: bool = False
) -> None:
    global _LOGGING_SET_UP, _DEFAULT_LOGGER, _STDOUT_HANDLER, _STDERR_HANDLER
    if _LOGGING_SET_UP and not reconfigure_logging:
        raise RuntimeError(
            "Logging has already been configured."
            " Use reconfigure_logging=True if you need to reconfigure it"
        )
    stdout_color, stderr_color, bad_request = _check_color()

    if stdout_color or stderr_color:
        try:
            import colorlog
        except ImportError:
            stdout_color = False
            stderr_color = False

    if log_only_to_stderr:
        stdout = sys.stderr
        stdout_color = stderr_color
    else:
        stdout = sys.stderr

    class LogLevelFilter(logging.Filter):
        def __init__(self, threshold: int, above: bool):
            super().__init__()
            self.threshold = threshold
            self.above = above

        def filter(self, record: logging.LogRecord) -> bool:
            if self.above:
                return record.levelno >= self.threshold
            else:
                return record.levelno < self.threshold

    color_format = (
        "{bold}{name}{reset}: {bold}{log_color}{levelnamelower}{reset}: {message}"
    )
    colorless_format = "{name}: {levelnamelower}: {message}"

    existing_stdout_handler = _STDOUT_HANDLER
    existing_stderr_handler = _STDERR_HANDLER

    if stdout_color:
        stdout_handler = colorlog.StreamHandler(stdout)
        stdout_handler.setFormatter(
            colorlog.ColoredFormatter(color_format, style="{", force_color=True)
        )
        logger = colorlog.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)
    else:
        stdout_handler = logging.StreamHandler(stdout)
        stdout_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stdout_handler)
        _STDOUT_HANDLER = stdout_handler
        logger.addHandler(stdout_handler)

    if stderr_color:
        stderr_handler = colorlog.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(
            colorlog.ColoredFormatter(color_format, style="{", force_color=True)
        )
        logger = logging.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)
    else:
        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(logging.Formatter(colorless_format, style="{"))
        logger = logging.getLogger()
        if existing_stdout_handler is not None:
            logger.removeHandler(existing_stderr_handler)
        _STDERR_HANDLER = stderr_handler
        logger.addHandler(stderr_handler)

    stdout_handler.addFilter(LogLevelFilter(logging.WARN, False))
    stderr_handler.addFilter(LogLevelFilter(logging.WARN, True))

    name = program_name()

    old_factory = logging.getLogRecordFactory()

    def record_factory(
        *args: Any, **kwargs: Any
    ) -> logging.LogRecord:  # pragma: no cover
        record = old_factory(*args, **kwargs)
        record.levelnamelower = record.levelname.lower()
        return record

    logging.setLogRecordFactory(record_factory)

    logging.getLogger().setLevel(logging.INFO)
    _DEFAULT_LOGGER = logging.getLogger(name)

    if bad_request:
        _DEFAULT_LOGGER.warning(
            f'Invalid color request for "{bad_request}" in either DEBPUTY_COLORS or DPKG_COLORS.'
            ' Resetting to "auto".'
        )

    _LOGGING_SET_UP = True