summaryrefslogtreecommitdiffstats
path: root/src/debputy/plugin/debputy/strip_non_determinism.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/plugin/debputy/strip_non_determinism.py')
-rw-r--r--src/debputy/plugin/debputy/strip_non_determinism.py264
1 files changed, 264 insertions, 0 deletions
diff --git a/src/debputy/plugin/debputy/strip_non_determinism.py b/src/debputy/plugin/debputy/strip_non_determinism.py
new file mode 100644
index 0000000..2f8fd39
--- /dev/null
+++ b/src/debputy/plugin/debputy/strip_non_determinism.py
@@ -0,0 +1,264 @@
+import dataclasses
+import os.path
+import re
+import subprocess
+from contextlib import ExitStack
+from enum import IntEnum
+from typing import Iterator, Optional, List, Callable, Any, Tuple, Union
+
+from debputy.plugin.api import VirtualPath
+from debputy.plugin.api.impl_types import PackageProcessingContextProvider
+from debputy.util import xargs, _info, escape_shell, _error
+
+
class DetectionVerdict(IntEnum):
    """Outcome of asking a detection rule about a single path.

    The members are integers so that verdicts can be ordered: a larger
    value is a stronger verdict.
    """

    # The rule has nothing to say about the path.
    NOT_RELEVANT = 1
    # The rule cannot decide from its initial check alone; it wants its
    # `file_output_verdict` consulted with a file analysis.
    NEEDS_FILE_OUTPUT = 2
    # The path should be handed to strip-nondeterminism.
    PROCESS = 3
+
+
def _file_starts_with(
    sequences: Union[bytes, Tuple[bytes, ...]]
) -> Callable[[VirtualPath], bool]:
    """Build a content check that matches files by their leading bytes.

    :param sequences: A single byte sequence, or a tuple of alternative byte
      sequences. The alternatives may have different lengths.
    :return: A callable that takes a path and returns True if the content of
      the path starts with at least one of the given sequences.
    :raises ValueError: If an empty tuple is given.
    """
    if isinstance(sequences, bytes):
        prefixes: Tuple[bytes, ...] = (sequences,)
    else:
        # `bytes.startswith` insists on a real tuple for alternatives.
        prefixes = tuple(sequences)
    longest_sequence = max(len(s) for s in prefixes)

    def _checker(path: VirtualPath) -> bool:
        with path.open(byte_io=True, buffering=4096) as fd:
            buffer = fd.read(longest_sequence)
        # This must be a prefix test rather than an equality/membership test:
        # the buffer holds as many bytes as the *longest* alternative, so a
        # shorter alternative would otherwise only ever match a file that is
        # exactly as long as that alternative.
        return buffer.startswith(prefixes)

    return _checker
+
+
def _is_javadoc_file(path: VirtualPath) -> bool:
    """Check whether the start of the file carries the javadoc marker.

    The file is read as raw bytes. The marker is plain ASCII, and a byte-wise
    search works no matter which encoding the page uses, whereas decoding the
    content as text could fail on pages in a legacy encoding.

    :param path: The path to inspect.
    :return: True if `<!-- Generated by javadoc` occurs within the first 1024
      bytes of the file.
    """
    with path.open(byte_io=True, buffering=4096) as fd:
        head = fd.read(1024)
    return b"<!-- Generated by javadoc" in head
+
+
class SndDetectionRule:
    """Base class for rules that decide if a path is of interest to strip-nondeterminism.

    Subclasses must provide `initial_verdict`. A subclass that can return
    `DetectionVerdict.NEEDS_FILE_OUTPUT` must provide `file_output_verdict`
    as well.
    """

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Give the verdict for `path` based on what the rule can check itself.

        :param path: The path to judge.
        :return: The verdict of this rule for the path.
        :raises NotImplementedError: Always in this base class.
        """
        raise NotImplementedError()

    def file_output_verdict(
        self,
        path: VirtualPath,
        file_analysis: Optional[str],
    ) -> bool:
        """Give the final verdict for `path` based on a file analysis.

        :param path: The path to judge.
        :param file_analysis: The analysis text for the path (presumably the
          output of file(1) - to be confirmed once the support is implemented).
        :return: True if the path should be processed.
        :raises TypeError: Always in this base class, as reaching it means the
          rule does not support this step.
        """
        message = (
            "Should not have been called or the rule forgot to implement this method"
        )
        raise TypeError(message)
+
+
@dataclasses.dataclass(frozen=True, slots=True)
class ExtensionPlusFileOutputRule(SndDetectionRule):
    """Rule matching on the file extension and, optionally, on a file analysis.

    Without a `file_pattern`, a matching extension is enough for the path to
    be processed. With a `file_pattern`, a matching extension only requests
    the file analysis step, and the pattern decides the outcome there.
    """

    # File name extensions (with the leading dot) that this rule covers.
    extensions: Tuple[str, ...]
    # Pattern searched for in the file analysis text (None => not needed).
    file_pattern: Optional[re.Pattern[str]] = None

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Judge `path` by its extension only.

        :param path: The path to judge.
        :return: `NOT_RELEVANT` if the extension is not covered, `PROCESS`
          if it is covered and there is no pattern, otherwise
          `NEEDS_FILE_OUTPUT`.
        """
        extension = os.path.splitext(path.name)[1]
        if extension in self.extensions:
            if self.file_pattern is None:
                return DetectionVerdict.PROCESS
            return DetectionVerdict.NEEDS_FILE_OUTPUT
        return DetectionVerdict.NOT_RELEVANT

    def file_output_verdict(
        self,
        path: VirtualPath,
        file_analysis: str,
    ) -> bool:
        """Judge `path` by searching the file analysis for the pattern.

        :param path: The path to judge (not used by this rule).
        :param file_analysis: The analysis text to search in.
        :return: True if the pattern occurs anywhere in `file_analysis`.
        """
        pattern = self.file_pattern
        # Only rules with a pattern ever ask for the file analysis.
        assert pattern is not None
        return pattern.search(file_analysis) is not None
+
+
@dataclasses.dataclass(frozen=True, slots=True)
class ExtensionPlusContentCheck(SndDetectionRule):
    """Rule matching on the file extension plus a check of the file content.

    The content check is only invoked for paths with a covered extension.
    """

    # File name extensions (with the leading dot) that this rule covers.
    extensions: Tuple[str, ...]
    # Callable inspecting the path; a true result means "process this path".
    content_check: Callable[[VirtualPath], bool]

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Judge `path` by its extension and then by its content.

        :param path: The path to judge.
        :return: `PROCESS` if the extension is covered and the content check
          accepts the path, otherwise `NOT_RELEVANT`.
        """
        extension = os.path.splitext(path.name)[1]
        if extension in self.extensions and self.content_check(path):
            return DetectionVerdict.PROCESS
        return DetectionVerdict.NOT_RELEVANT
+
+
class PyzipFileCheck(SndDetectionRule):
    """Rule detecting a `#!` line followed by zip data, regardless of file name."""

    def _is_pyzip_file(self, path: VirtualPath) -> bool:
        """Check the first 32 bytes for `#!` followed by a zip local file header.

        :param path: The path to inspect.
        :return: True if the content starts with `#!` and a newline directly
          followed by `PK\\x03\\x04` occurs within the first 32 bytes.
        """
        with path.open(byte_io=True, buffering=4096) as fd:
            head = fd.read(32)
        return head.startswith(b"#!") and b"\nPK\x03\x04" in head

    def initial_verdict(self, path: VirtualPath) -> DetectionVerdict:
        """Judge `path` purely by its content.

        :param path: The path to judge.
        :return: `PROCESS` if the content check matches, otherwise `NOT_RELEVANT`.
        """
        return (
            DetectionVerdict.PROCESS
            if self._is_pyzip_file(path)
            else DetectionVerdict.NOT_RELEVANT
        )
+
+
# These detection rules should be aligned with `get_normalizer_for_file` in File::StripNondeterminism.
# Note that if we send a file too much, it is just bad for performance. If we send a file too little,
# we risk non-determinism in the final output.
#
# The rules are tried in the listed order for every file.
SND_DETECTION_RULES: List[SndDetectionRule] = [
    # `.a` files starting with one of the two listed archive headers.
    ExtensionPlusContentCheck(
        extensions=(".a",),
        content_check=_file_starts_with(
            (
                b"!<arch>\n",
                b"!<thin>\n",
            ),
        ),
    ),
    # `.png` files starting with the PNG signature.
    ExtensionPlusContentCheck(
        extensions=(".png",),
        content_check=_file_starts_with(b"\x89PNG\x0D\x0A\x1A\x0A"),
    ),
    # `.gz` / `.dz` files starting with the two magic bytes listed.
    ExtensionPlusContentCheck(
        extensions=(".gz", ".dz"),
        content_check=_file_starts_with(b"\x1F\x8B"),
    ),
    # Files with a zip- or jar-like extension that start with one of the
    # listed `PK` signatures.
    ExtensionPlusContentCheck(
        extensions=(
            # .zip related
            ".zip",
            ".pk3",
            ".epub",
            ".whl",
            ".xpi",
            ".htb",
            ".zhfst",
            ".par",
            ".codadef",
            # .jar related
            ".jar",
            ".war",
            ".hpi",
            ".apk",
            ".sym",
        ),
        content_check=_file_starts_with(
            (
                b"PK\x03\x04\x1F",
                b"PK\x05\x06",
                b"PK\x07\x08",
            )
        ),
    ),
    # `.mo` / `.gmo` files; the two alternatives are the same four bytes in
    # opposite order.
    ExtensionPlusContentCheck(
        extensions=(
            ".mo",
            ".gmo",
        ),
        content_check=_file_starts_with(
            (
                b"\x95\x04\x12\xde",
                b"\xde\x12\x04\x95",
            )
        ),
    ),
    # `.uimage` files starting with the listed four magic bytes.
    ExtensionPlusContentCheck(
        extensions=(".uimage",),
        content_check=_file_starts_with(b"\x27\x05\x19\x56"),
    ),
    # `.bflt` files starting with the listed four magic bytes (ASCII "bFLT").
    ExtensionPlusContentCheck(
        extensions=(".bflt",),
        content_check=_file_starts_with(b"\x62\x46\x4C\x54"),
    ),
    # `.jmod` files starting with "JM".
    ExtensionPlusContentCheck(
        extensions=(".jmod",),
        content_check=_file_starts_with(b"JM"),
    ),
    # `.html` files that carry the javadoc marker near the top.
    ExtensionPlusContentCheck(
        extensions=(".html",),
        content_check=_is_javadoc_file,
    ),
    # Content-only check; applies to files regardless of their extension.
    PyzipFileCheck(),
    # No pattern is given here, so every `.cpio` file is selected based on
    # the extension alone.
    ExtensionPlusFileOutputRule(
        extensions=(".cpio",),
        # XXX: Add file output check (requires the file output support)
    ),
]
+
+
def _detect_paths_with_possible_non_determinism(
    fs_root: VirtualPath,
) -> Iterator[VirtualPath]:
    """Find the files that should be passed to strip-nondeterminism.

    Every file below `fs_root` is run through `SND_DETECTION_RULES` in order.
    A file is yielded as soon as one rule gives the `PROCESS` verdict; the
    remaining rules are then skipped for that file.

    :param fs_root: The root of the file system tree to scan.
    :return: An iterator over the files to process.
    :raises AssertionError: Once the scan is complete, if any file was left
      with rules asking for the (not yet implemented) file output check.
    """
    needs_file_output = []
    for path in fs_root.all_paths():
        if not path.is_file:
            continue
        needs_file_output_rules = []
        for rule in SND_DETECTION_RULES:
            verdict = rule.initial_verdict(path)
            if verdict == DetectionVerdict.PROCESS:
                yield path
                break
            if verdict == DetectionVerdict.NEEDS_FILE_OUTPUT:
                # Record *every* rule that wants the file output for this
                # path, not only the first one; any of them may end up
                # accepting the path.
                needs_file_output_rules.append(rule)
        else:
            # No rule selected the path outright; remember it if at least one
            # rule wanted to have a look at the file output.
            if needs_file_output_rules:
                needs_file_output.append((path, needs_file_output_rules))

    assert (
        not needs_file_output
    ), "Rules requiring file output are not supported yet"
    # FIXME: Implement file check
+
+
def _apply_strip_non_determinism(timestamp: str, paths: List[VirtualPath]) -> None:
    """Run strip-nondeterminism on the given paths.

    The content of every path is opened for replacement up front, and all of
    these replacements are kept open until every strip-nondeterminism
    invocation has completed. `xargs` decides how the files are divided
    between invocations.

    :param timestamp: Value passed verbatim as `--timestamp=<value>`.
    :param paths: The paths whose content should be normalized.
    """
    static_cmd = [
        "strip-nondeterminism",
        f"--timestamp={timestamp}",
        "-v",
        "--normalizers=+all",
    ]
    with ExitStack() as manager:
        # What the context manager hands back is passed on as the file
        # arguments of the command (presumably file system paths of the
        # replacement content - confirm against `replace_fs_path_content`).
        affected_files = [
            manager.enter_context(p.replace_fs_path_content()) for p in paths
        ]
        for cmd in xargs(static_cmd, affected_files):
            _info(
                f"Removing (possible) unnecessary non-deterministic content via: {escape_shell(*cmd)}"
            )
            try:
                subprocess.check_call(
                    cmd,
                    stdin=subprocess.DEVNULL,
                    restore_signals=True,
                )
            except subprocess.CalledProcessError:
                # NOTE(review): `_error` presumably terminates the run; if it
                # does not, the remaining commands are still executed.
                _error(
                    "Attempting to remove unnecessary non-deterministic content failed. Please review"
                    " the error from strip-nondeterminism above to understand what went wrong."
                )
+
+
def strip_non_determinism(
    fs_root: VirtualPath, _: Any, context: PackageProcessingContextProvider
) -> None:
    """Detect files with possible non-determinism and normalize them.

    :param fs_root: The root of the file system tree to process.
    :param _: Not used.
    :param context: Provides the manifest whose substitution is used to
      resolve the timestamp given to strip-nondeterminism.
    """
    candidates = [*_detect_paths_with_possible_non_determinism(fs_root)]

    if candidates:
        # The timestamp is only resolved when there is something to process.
        timestamp = context._manifest.substitution.substitute(
            "{{_DEBPUTY_SND_SOURCE_DATE_EPOCH}}", "Internal; strip-nondeterminism"
        )
        _apply_strip_non_determinism(timestamp, candidates)
        return

    _info("Detected no paths to be processed by strip-nondeterminism")