#!/usr/bin/env python3
#
# Copyright © 2017, 2020 Chris Lamb
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# pylint: disable=invalid-name
# pylint: enable=invalid-name

import argparse
import bz2
import collections
import json
import logging
import os
import re
import subprocess
import sys
import time

import apt
import requests

try:
    from xdg.BaseDirectory import xdg_cache_home
except ImportError:
    print("This script requires the xdg python3 module.", file=sys.stderr)
    print(
        "Please install the python3-xdg Debian package in order to use this utility.",
        file=sys.stderr,
    )
    sys.exit(1)


class ReproducibleCheck:
    """Report which installed Debian packages are not marked reproducible.

    The status data is downloaded from ``STATUS_URL``, cached under the XDG
    cache directory for ``CACHE_AGE_SECONDS`` and compared against the
    packages that python-apt reports as installed.
    """

    # NOTE(review): the link in this text was missing from the reviewed copy;
    # the project homepage is inferred from STATUS_URL's domain -- confirm.
    HELP = """
        Reports on the reproducible status of installed packages.
        For more details please see <https://reproducible-builds.org/>.
    """

    NAME = os.path.basename(__file__)
    VERSION = 1

    STATUS_URL = "https://tests.reproducible-builds.org/debian/reproducible.json.bz2"

    CACHE = os.path.join(xdg_cache_home, NAME, os.path.basename(STATUS_URL))
    CACHE_AGE_SECONDS = 86400

    @classmethod
    def parse(cls):
        """Parse ``sys.argv`` and return a configured instance."""
        parser = argparse.ArgumentParser(description=cls.HELP)

        parser.add_argument(
            "-d",
            "--debug",
            help="show debugging messages",
            default=False,
            action="store_true",
        )

        parser.add_argument(
            "-r",
            "--raw",
            help="print unreproducible binary packages only (for dd-list -i)",
            default=False,
            action="store_true",
        )

        parser.add_argument(
            "--version",
            help="print version and exit",
            default=False,
            action="store_true",
        )

        return cls(parser.parse_args())

    def __init__(self, args):
        """Store the parsed arguments and configure the root logger."""
        self.args = args

        logging.basicConfig(
            format="%(asctime).19s %(levelname).1s: %(message)s",
            level=logging.DEBUG if args.debug else logging.INFO,
        )

        self.log = logging.getLogger()

    def main(self):
        """Run the check and return the process exit status.

        Returns 0 on success, 1 if the status data could not be downloaded
        and 2 when not running on Debian.
        """
        if self.args.version:
            print(f"{self.NAME} version {self.VERSION}")
            return 0

        if self.get_distributor_id() != "Debian":
            self.log.error("Refusing to return results for non-Debian distributions")
            return 2

        try:
            self.update_cache()
        except requests.RequestException as exc:
            # Report network/HTTP failures as a message, not a traceback.
            self.log.error("Could not download %s: %s", self.STATUS_URL, exc)
            return 1

        installed = self.get_installed_packages()
        reproducible = self.get_reproducible_packages()

        if self.args.raw:
            self.output_raw(installed, reproducible)
        else:
            self.output_by_source(installed, reproducible)

        self.log.info(
            "These results are based on data from the Reproducible Builds "
            "CI framework, showing only the theoretical (and "
            "unofficial) reproducibility of these Debian packages."
        )

        return 0

    def get_distributor_id(self):
        """Return the output of ``lsb_release -is``, or "" if unavailable."""
        try:
            distribution_id = (
                subprocess.check_output(("lsb_release", "-is")).decode("utf-8").strip()
            )
        except (subprocess.CalledProcessError, OSError):
            # OSError covers lsb_release not being installed or executable.
            distribution_id = ""

        self.log.debug("Detected distribution %s", distribution_id or "(unknown)")

        return distribution_id

    def update_cache(self):
        """Download the status file unless the cached copy is recent enough.

        Raises:
            requests.RequestException: if the download fails or the server
                answers with an HTTP error status.
        """
        self.log.debug("Checking cache file %s ...", self.CACHE)

        try:
            if os.path.getmtime(self.CACHE) >= time.time() - self.CACHE_AGE_SECONDS:
                self.log.debug("Cache is up to date")
                return
        except OSError:
            # No usable cache file yet; fall through and download it.
            pass

        new_cache = f"{self.CACHE}.new"
        self.log.info("Updating cache to %s...", new_cache)

        response = requests.get(self.STATUS_URL, timeout=60)
        # Never store an HTTP error page as if it were the bz2 payload: it
        # would break decoding and be considered fresh for a whole day.
        response.raise_for_status()

        os.makedirs(os.path.dirname(self.CACHE), exist_ok=True)

        # Write to a temporary name first so that an interrupted download
        # cannot leave a truncated file at the real cache path.
        with open(new_cache, "wb") as f:
            for content in response.iter_content(chunk_size=2**16):
                f.write(content)

        os.replace(new_cache, self.CACHE)

    def get_reproducible_packages(self):
        """
        Return (source, architecture, version) triplets for reproducible source
        packages.
        """
        self.log.debug("Loading data from cache %s", self.CACHE)

        data = set()
        source_packages = set()

        with bz2.open(self.CACHE) as f:
            all_packages = json.loads(f.read().decode("utf-8"))

        for pkg in all_packages:
            if pkg["status"] != "reproducible":
                continue

            data.add((pkg["package"], pkg["architecture"], pkg["version"]))
            source_packages.add(pkg["package"])

        self.log.debug("Parsed data about %d source packages", len(source_packages))

        return data

    def get_installed_packages(self):
        """
        Return (binary_package, architecture, version) triplets, mapped to
        their corresponding source package.
        """
        result = {}

        for pkg in apt.Cache():
            for pkg_ver in pkg.versions:
                if not pkg_ver.is_installed:
                    continue

                # We may have installed a binNMU version locally so we need to
                # strip these off when looking up against the JSON of results.
                version = re.sub(r"\+b\d+$", "", pkg_ver.version)

                result[
                    (pkg.shortname, pkg_ver.architecture, version)
                ] = pkg_ver.source_name

        self.log.debug("Parsed %d installed binary packages", len(result))

        return result

    def iter_installed_unreproducible(self, installed, reproducible):
        """Yield (binary, source, version) for each unreproducible package.

        ``installed`` is the mapping from get_installed_packages() and
        ``reproducible`` the set from get_reproducible_packages().  Results
        are yielded in sorted order of the installed keys.
        """
        # "Architecture: all" binary packages should pretend to the system's
        # default architecture for lookup purposes.
        default_architecture = apt.apt_pkg.config.find("APT::Architecture")
        self.log.debug(
            "Using %s as our 'Architecture: all' lookup", default_architecture
        )

        for (binary, architecture, version), source in sorted(installed.items()):
            if architecture == "all":
                architecture = default_architecture

            lookup_key = (source, architecture, version)

            if lookup_key not in reproducible:
                yield binary, source, version

    def output_by_source(self, installed, reproducible):
        """Print unreproducible packages grouped by source, then a summary."""
        by_source = collections.defaultdict(set)
        num_unreproducible = 0

        for binary, source, version in self.iter_installed_unreproducible(
            installed, reproducible
        ):
            by_source[(source, version)].add(binary)
            num_unreproducible += 1

        for (source, version), binaries in sorted(by_source.items()):
            # Calculate some clarifying suffixes/prefixes
            src = ""
            pkgs = ""
            if binaries != {source}:
                src = "src:"
                # Sort so that the output is stable between runs.
                pkgs = f" ({', '.join(sorted(binaries))})"

            # NOTE(review): the link was missing from the reviewed copy and is
            # reconstructed from STATUS_URL's host -- confirm the exact URL.
            print(
                f"{src}{source} ({version}){pkgs} is not reproducible "
                f"<https://tests.reproducible-builds.org/debian/{source}>"
            )

        num_installed = len(installed)
        num_reproducible = num_installed - num_unreproducible
        # Guard against an empty package list to avoid dividing by zero.
        percent = 100.0 * num_reproducible / num_installed if num_installed else 0.0

        print(
            f"{num_reproducible}/{num_installed} ({percent:.2f}%) of "
            f"installed binary packages are reproducible."
        )

    def output_raw(self, installed, reproducible):
        """Print only the names of unreproducible binary packages."""
        for binary, _, _ in self.iter_installed_unreproducible(installed, reproducible):
            print(binary)


if __name__ == "__main__":
    try:
        sys.exit(ReproducibleCheck.parse().main())
    except (KeyboardInterrupt, BrokenPipeError):
        sys.exit(1)