diff options
Diffstat (limited to '')
-rw-r--r-- | apt/__init__.py | 40 | ||||
-rw-r--r-- | apt/auth.py | 311 | ||||
-rw-r--r-- | apt/cache.py | 1004 | ||||
-rw-r--r-- | apt/cdrom.py | 91 | ||||
-rw-r--r-- | apt/debfile.py | 861 | ||||
-rw-r--r-- | apt/package.py | 1559 | ||||
-rw-r--r-- | apt/progress/__init__.py | 28 | ||||
-rw-r--r-- | apt/progress/base.py | 332 | ||||
-rw-r--r-- | apt/progress/text.py | 294 | ||||
-rw-r--r-- | apt/py.typed | 0 | ||||
-rw-r--r-- | apt/utils.py | 100 | ||||
-rw-r--r-- | aptsources/__init__.py | 6 | ||||
-rw-r--r-- | aptsources/_deb822.py | 144 | ||||
-rw-r--r-- | aptsources/distinfo.py | 415 | ||||
-rw-r--r-- | aptsources/distro.py | 648 | ||||
-rw-r--r-- | aptsources/sourceslist.py | 1083 |
16 files changed, 6916 insertions, 0 deletions
diff --git a/apt/__init__.py b/apt/__init__.py new file mode 100644 index 0000000..f22c9a0 --- /dev/null +++ b/apt/__init__.py @@ -0,0 +1,40 @@ +# Copyright (c) 2005-2009 Canonical +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +# import the core of apt_pkg +"""High-Level Interface for working with apt.""" +import apt_pkg + +from apt.cache import Cache as Cache +from apt.cache import ProblemResolver as ProblemResolver + +# import some fancy classes +from apt.package import Package as Package +from apt.package import Version as Version + +Cache # pyflakes +ProblemResolver # pyflakes +Version # pyflakes +from apt.cdrom import Cdrom as Cdrom + +# init the package system, but do not re-initialize config +if "APT" not in apt_pkg.config: + apt_pkg.init_config() +apt_pkg.init_system() + +__all__ = ["Cache", "Cdrom", "Package"] diff --git a/apt/auth.py b/apt/auth.py new file mode 100644 index 0000000..6d50616 --- /dev/null +++ b/apt/auth.py @@ -0,0 +1,311 @@ +#!/usr/bin/python3 +# auth - authentication key management +# +# Copyright (c) 2004 Canonical +# Copyright (c) 2012 Sebastian Heinlein +# +# Author: Michael Vogt <mvo@debian.org> +# Sebastian Heinlein <devel@glatzor.de> +# +# This program is free software; you can redistribute it and/or +# modify it under the 
terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Handle GnuPG keys used to trust signed repositories.""" + +import errno +import os +import os.path +import shutil +import subprocess +import sys +import tempfile + +import apt_pkg +from apt_pkg import gettext as _ + + +class AptKeyError(Exception): + pass + + +class AptKeyIDTooShortError(AptKeyError): + """Internal class do not rely on it.""" + + +class TrustedKey: + + """Represents a trusted key.""" + + def __init__(self, name: str, keyid: str, date: str) -> None: + self.raw_name = name + # Allow to translated some known keys + self.name = _(name) + self.keyid = keyid + self.date = date + + def __str__(self) -> str: + return f"{self.name}\n{self.keyid} {self.date}" + + +def _call_apt_key_script(*args: str, **kwargs: str | None) -> str: + """Run the apt-key script with the given arguments.""" + conf = None + cmd = [apt_pkg.config.find_file("Dir::Bin::Apt-Key", "/usr/bin/apt-key")] + cmd.extend(args) + env = os.environ.copy() + env["LANG"] = "C" + env["APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE"] = "1" + try: + if apt_pkg.config.find_dir("Dir") != "/": + # If the key is to be installed into a chroot we have to export the + # configuration from the chroot to the apt-key script by using + # a temporary APT_CONFIG file. 
The apt-key script uses apt-config + # shell internally + conf = tempfile.NamedTemporaryFile(prefix="apt-key", suffix=".conf") + conf.write(apt_pkg.config.dump().encode("UTF-8")) + conf.flush() + env["APT_CONFIG"] = conf.name + proc = subprocess.Popen( + cmd, + env=env, + universal_newlines=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + stdin = kwargs.get("stdin", None) + + output, stderr = proc.communicate(stdin) # type: str, str + + if proc.returncode: + raise AptKeyError( + "The apt-key script failed with return code %s:\n" + "%s\n" + "stdout: %s\n" + "stderr: %s" % (proc.returncode, " ".join(cmd), output, stderr) + ) + elif stderr: + sys.stderr.write(stderr) # Forward stderr + + return output.strip() + finally: + if conf is not None: + conf.close() + + +def add_key_from_file(filename: str) -> None: + """Import a GnuPG key file to trust repositores signed by it. + + Keyword arguments: + filename -- the absolute path to the public GnuPG key file + """ + if not os.path.abspath(filename): + raise AptKeyError("An absolute path is required: %s" % filename) + if not os.access(filename, os.R_OK): + raise AptKeyError("Key file cannot be accessed: %s" % filename) + _call_apt_key_script("add", filename) + + +def add_key_from_keyserver(keyid: str, keyserver: str) -> None: + """Import a GnuPG key file to trust repositores signed by it. + + Keyword arguments: + keyid -- the long keyid (fingerprint) of the key, e.g. + A1BD8E9D78F7FE5C3E65D8AF8B48AD6246925553 + keyserver -- the URL or hostname of the key server + """ + tmp_keyring_dir = tempfile.mkdtemp() + try: + _add_key_from_keyserver(keyid, keyserver, tmp_keyring_dir) + except Exception: + raise + finally: + # We are racing with gpg when removing sockets, so ignore + # failure to delete non-existing files. 
+ def onerror( + func: object, path: str, exc_info: tuple[type, Exception, object] + ) -> None: + if isinstance(exc_info[1], OSError) and exc_info[1].errno == errno.ENOENT: + return + raise + + shutil.rmtree(tmp_keyring_dir, onerror=onerror) + + +def _add_key_from_keyserver(keyid: str, keyserver: str, tmp_keyring_dir: str) -> None: + if len(keyid.replace(" ", "").replace("0x", "")) < (160 / 4): + raise AptKeyIDTooShortError("Only fingerprints (v4, 160bit) are supported") + # create a temp keyring dir + tmp_secret_keyring = os.path.join(tmp_keyring_dir, "secring.gpg") + tmp_keyring = os.path.join(tmp_keyring_dir, "pubring.gpg") + # default options for gpg + gpg_default_options = [ + "gpg", + "--no-default-keyring", + "--no-options", + "--homedir", + tmp_keyring_dir, + ] + # download the key to a temp keyring first + res = subprocess.call( + gpg_default_options + + [ + "--secret-keyring", + tmp_secret_keyring, + "--keyring", + tmp_keyring, + "--keyserver", + keyserver, + "--recv", + keyid, + ] + ) + if res != 0: + raise AptKeyError(f"recv from '{keyserver}' failed for '{keyid}'") + # FIXME: + # - with gnupg 1.4.18 the downloaded key is actually checked(!), + # i.e. 
gnupg will not import anything that the server sends + # into the keyring, so the below checks are now redundant *if* + # gnupg 1.4.18 is used + + # now export again using the long key id (to ensure that there is + # really only this one key in our keyring) and not someone MITM us + tmp_export_keyring = os.path.join(tmp_keyring_dir, "export-keyring.gpg") + res = subprocess.call( + gpg_default_options + + [ + "--keyring", + tmp_keyring, + "--output", + tmp_export_keyring, + "--export", + keyid, + ] + ) + if res != 0: + raise AptKeyError("export of '%s' failed", keyid) + # now verify the fingerprint, this is probably redundant as we + # exported by the fingerprint in the previous command but its + # still good paranoia + output = subprocess.Popen( + gpg_default_options + + [ + "--keyring", + tmp_export_keyring, + "--fingerprint", + "--batch", + "--fixed-list-mode", + "--with-colons", + ], + stdout=subprocess.PIPE, + universal_newlines=True, + ).communicate()[0] + got_fingerprint = None + for line in output.splitlines(): + if line.startswith("fpr:"): + got_fingerprint = line.split(":")[9] + # stop after the first to ensure no subkey trickery + break + # strip the leading "0x" is there is one and uppercase (as this is + # what gnupg is using) + signing_key_fingerprint = keyid.replace("0x", "").upper() + if got_fingerprint != signing_key_fingerprint: + # make the error match what gnupg >= 1.4.18 will output when + # it checks the key itself before importing it + raise AptKeyError( + f"recv from '{keyserver}' failed for '{signing_key_fingerprint}'" + ) + # finally add it + add_key_from_file(tmp_export_keyring) + + +def add_key(content: str) -> None: + """Import a GnuPG key to trust repositores signed by it. 
+ + Keyword arguments: + content -- the content of the GnuPG public key + """ + _call_apt_key_script("adv", "--quiet", "--batch", "--import", "-", stdin=content) + + +def remove_key(fingerprint: str) -> None: + """Remove a GnuPG key to no longer trust repositores signed by it. + + Keyword arguments: + fingerprint -- the fingerprint identifying the key + """ + _call_apt_key_script("rm", fingerprint) + + +def export_key(fingerprint: str) -> str: + """Return the GnuPG key in text format. + + Keyword arguments: + fingerprint -- the fingerprint identifying the key + """ + return _call_apt_key_script("export", fingerprint) + + +def update() -> str: + """Update the local keyring with the archive keyring and remove from + the local keyring the archive keys which are no longer valid. The + archive keyring is shipped in the archive-keyring package of your + distribution, e.g. the debian-archive-keyring package in Debian. + """ + return _call_apt_key_script("update") + + +def net_update() -> str: + """Work similar to the update command above, but get the archive + keyring from an URI instead and validate it against a master key. + This requires an installed wget(1) and an APT build configured to + have a server to fetch from and a master keyring to validate. APT + in Debian does not support this command and relies on update + instead, but Ubuntu's APT does. + """ + return _call_apt_key_script("net-update") + + +def list_keys() -> list[TrustedKey]: + """Returns a list of TrustedKey instances for each key which is + used to trust repositories. 
+ """ + # The output of `apt-key list` is difficult to parse since the + # --with-colons parameter isn't user + output = _call_apt_key_script( + "adv", "--with-colons", "--batch", "--fixed-list-mode", "--list-keys" + ) + res = [] + for line in output.split("\n"): + fields = line.split(":") + if fields[0] == "pub": + keyid = fields[4] + if fields[0] == "uid": + uid = fields[9] + creation_date = fields[5] + key = TrustedKey(uid, keyid, creation_date) + res.append(key) + return res + + +if __name__ == "__main__": + # Add some known keys we would like to see translated so that they get + # picked up by gettext + lambda: _("Ubuntu Archive Automatic Signing Key <ftpmaster@ubuntu.com>") + lambda: _("Ubuntu CD Image Automatic Signing Key <cdimage@ubuntu.com>") + + apt_pkg.init() + for trusted_key in list_keys(): + print(trusted_key) diff --git a/apt/cache.py b/apt/cache.py new file mode 100644 index 0000000..cf78026 --- /dev/null +++ b/apt/cache.py @@ -0,0 +1,1004 @@ +# cache.py - apt cache abstraction +# +# Copyright (c) 2005-2009 Canonical +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +from __future__ import annotations + +import fnmatch +import os +import warnings +import weakref +from collections.abc import Callable, Iterator, KeysView +from typing import Any, cast + +import apt_pkg + +import apt.progress.text +from apt.package import Package, Version +from apt.progress.base import AcquireProgress, InstallProgress, OpProgress + + +class FetchCancelledException(IOError): + """Exception that is thrown when the user cancels a fetch operation.""" + + +class FetchFailedException(IOError): + """Exception that is thrown when fetching fails.""" + + +class UntrustedException(FetchFailedException): + """Exception that is thrown when fetching fails for trust reasons""" + + +class LockFailedException(IOError): + """Exception that is thrown when locking fails.""" + + +class CacheClosedException(Exception): + """Exception that is thrown when the cache is used after close().""" + + +class _WrappedLock: + """Wraps an apt_pkg.FileLock to raise LockFailedException. + + Initialized using a directory path.""" + + def __init__(self, path: str) -> None: + self._path = path + self._lock = apt_pkg.FileLock(os.path.join(path, "lock")) + + def __enter__(self) -> None: + try: + return self._lock.__enter__() + except apt_pkg.Error as e: + raise LockFailedException( + ("Failed to lock directory %s: %s") % (self._path, e) + ) + + def __exit__(self, typ: object, value: object, traceback: object) -> None: + return self._lock.__exit__(typ, value, traceback) + + +class Cache: + """Dictionary-like package cache. + + The APT cache file contains a hash table mapping names of binary + packages to their metadata. A Cache object is the in-core + representation of the same. It provides access to APTs idea of the + list of available packages. 
+ + The cache can be used like a mapping from package names to Package + objects (although only getting items is supported). + + Keyword arguments: + progress -- a OpProgress object, + rootdir -- an alternative root directory. if that is given the system + sources.list and system lists/files are not read, only file relative + to the given rootdir, + memonly -- build the cache in memory only. + + + .. versionchanged:: 1.0 + + The cache now supports package names with special architecture + qualifiers such as :all and :native. It does not export them + in :meth:`keys()`, though, to keep :meth:`keys()` a unique set. + """ + + def __init__( + self, + progress: OpProgress | None = None, + rootdir: str | None = None, + memonly: bool = False, + ) -> None: + self._cache: apt_pkg.Cache = cast(apt_pkg.Cache, None) + self._depcache: apt_pkg.DepCache = cast(apt_pkg.DepCache, None) + self._records: apt_pkg.PackageRecords = cast( + apt_pkg.PackageRecords, None + ) # noqa + self._list: apt_pkg.SourceList = cast(apt_pkg.SourceList, None) + self._callbacks: dict[str, list[Callable[..., None] | str]] = {} # noqa + self._callbacks2: dict[ + str, list[tuple[Callable[..., Any], tuple[Any, ...], dict[Any, Any]]] + ] = {} # noqa + self._weakref: weakref.WeakValueDictionary[ + str, apt.Package + ] = weakref.WeakValueDictionary() # noqa + self._weakversions: weakref.WeakSet[Version] = weakref.WeakSet() # noqa + self._changes_count = -1 + self._sorted_set: list[str] | None = None + + self.connect("cache_post_open", "_inc_changes_count") + self.connect("cache_post_change", "_inc_changes_count") + if memonly: + # force apt to build its caches in memory + apt_pkg.config.set("Dir::Cache::pkgcache", "") + if rootdir: + rootdir = os.path.abspath(rootdir) + if os.path.exists(rootdir + "/etc/apt/apt.conf"): + apt_pkg.read_config_file(apt_pkg.config, rootdir + "/etc/apt/apt.conf") + if os.path.isdir(rootdir + "/etc/apt/apt.conf.d"): + apt_pkg.read_config_dir(apt_pkg.config, rootdir + 
"/etc/apt/apt.conf.d") + apt_pkg.config.set("Dir", rootdir) + apt_pkg.config.set("Dir::State::status", rootdir + "/var/lib/dpkg/status") + # also set dpkg to the rootdir path so that its called for the + # --print-foreign-architectures call + apt_pkg.config.set( + "Dir::bin::dpkg", os.path.join(rootdir, "usr", "bin", "dpkg") + ) + # create required dirs/files when run with special rootdir + # automatically + self._check_and_create_required_dirs(rootdir) + # Call InitSystem so the change to Dir::State::Status is actually + # recognized (LP: #320665) + apt_pkg.init_system() + + # Prepare a lock object (context manager for archive lock) + archive_dir = apt_pkg.config.find_dir("Dir::Cache::Archives") + self._archive_lock = _WrappedLock(archive_dir) + + self.open(progress) + + def fix_broken(self) -> None: + """Fix broken packages.""" + self._depcache.fix_broken() + + def _inc_changes_count(self) -> None: + """Increase the number of changes""" + self._changes_count += 1 + + def _check_and_create_required_dirs(self, rootdir: str) -> None: + """ + check if the required apt directories/files are there and if + not create them + """ + files = [ + "/var/lib/dpkg/status", + "/etc/apt/sources.list", + ] + dirs = [ + "/var/lib/dpkg", + "/etc/apt/", + "/var/cache/apt/archives/partial", + "/var/lib/apt/lists/partial", + ] + for d in dirs: + if not os.path.exists(rootdir + d): + # print "creating: ", rootdir + d + os.makedirs(rootdir + d) + for f in files: + if not os.path.exists(rootdir + f): + open(rootdir + f, "w").close() + + def _run_callbacks(self, name: str) -> None: + """internal helper to run a callback""" + if name in self._callbacks: + for callback in self._callbacks[name]: + if callback == "_inc_changes_count": + self._inc_changes_count() + else: + callback() # type: ignore + + if name in self._callbacks2: + for callback, args, kwds in self._callbacks2[name]: + callback(self, *args, **kwds) + + def open(self, progress: OpProgress | None = None) -> None: + """Open the 
package cache, after that it can be used like + a dictionary + """ + if progress is None: + progress = apt.progress.base.OpProgress() + # close old cache on (re)open + self.close() + self.op_progress = progress + self._run_callbacks("cache_pre_open") + + self._cache = apt_pkg.Cache(progress) + self._depcache = apt_pkg.DepCache(self._cache) + self._records = apt_pkg.PackageRecords(self._cache) + self._list = apt_pkg.SourceList() + self._list.read_main_list() + self._sorted_set = None + self.__remap() + + self._have_multi_arch = len(apt_pkg.get_architectures()) > 1 + + progress.done() + self._run_callbacks("cache_post_open") + + def __remap(self) -> None: + """Called after cache reopen() to relocate to new cache. + + Relocate objects like packages and versions from the old + underlying cache to the new one. + """ + for key in list(self._weakref.keys()): + try: + pkg = self._weakref[key] + except KeyError: + continue + + try: + pkg._pkg = self._cache[pkg._pkg.name, pkg._pkg.architecture] + except LookupError: + del self._weakref[key] + + for ver in list(self._weakversions): + # Package has been reseated above, reseat version + for v in ver.package._pkg.version_list: + # Requirements as in debListParser::SameVersion + if ( + v.hash == ver._cand.hash + and (v.size == 0 or ver._cand.size == 0 or v.size == ver._cand.size) + and v.multi_arch == ver._cand.multi_arch + and v.ver_str == ver._cand.ver_str + ): + ver._cand = v + break + else: + self._weakversions.remove(ver) + + def close(self) -> None: + """Close the package cache""" + # explicitely free the FDs that _records has open + del self._records + self._records = cast(apt_pkg.PackageRecords, None) + + def __enter__(self) -> Cache: + """Enter the with statement""" + return self + + def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None: + """Exit the with statement""" + self.close() + + def __getitem__(self, key: object) -> Package: + """look like a dictionary (get key)""" + try: + key = 
str(key) + rawpkg = self._cache[key] + except KeyError: + raise KeyError("The cache has no package named %r" % key) + + # It might be excluded due to not having a version or something + if not self.__is_real_pkg(rawpkg): + raise KeyError("The cache has no package named %r" % key) + + pkg = self._rawpkg_to_pkg(rawpkg) + + return pkg + + def get(self, key: object, default: object = None) -> Any: + """Return *self*[*key*] or *default* if *key* not in *self*. + + .. versionadded:: 1.1 + """ + try: + return self[key] + except KeyError: + return default + + def _rawpkg_to_pkg(self, rawpkg: apt_pkg.Package) -> Package: + """Returns the apt.Package object for an apt_pkg.Package object. + + .. versionadded:: 1.0.0 + """ + fullname = rawpkg.get_fullname(pretty=True) + + return self._weakref.setdefault(fullname, Package(self, rawpkg)) + + def __iter__(self) -> Iterator[Package]: + # We iterate sorted over package names here. With this we read the + # package lists linearly if we need to access the package records, + # instead of having to do thousands of random seeks; the latter + # is disastrous if we use compressed package indexes, and slower than + # necessary for uncompressed indexes. 
+ for pkgname in self.keys(): + pkg = Package(self, self._cache[pkgname]) + yield self._weakref.setdefault(pkgname, pkg) + + def __is_real_pkg(self, rawpkg: apt_pkg.Package) -> bool: + """Check if the apt_pkg.Package provided is a real package.""" + return rawpkg.has_versions + + def has_key(self, key: object) -> bool: + return key in self + + def __contains__(self, key: object) -> bool: + try: + return self.__is_real_pkg(self._cache[str(key)]) + except KeyError: + return False + + def __len__(self) -> int: + return len(self.keys()) + + def keys(self) -> list[str]: + if self._sorted_set is None: + self._sorted_set = sorted( + p.get_fullname(pretty=True) + for p in self._cache.packages + if self.__is_real_pkg(p) + ) + return list(self._sorted_set) # We need a copy here, caller may modify + + def get_changes(self) -> list[Package]: + """Get the marked changes""" + changes = [] + marked_keep = self._depcache.marked_keep + for rawpkg in self._cache.packages: + if not marked_keep(rawpkg): + changes.append(self._rawpkg_to_pkg(rawpkg)) + return changes + + def upgrade(self, dist_upgrade: bool = False) -> None: + """Upgrade all packages. + + If the parameter *dist_upgrade* is True, new dependencies will be + installed as well (and conflicting packages may be removed). The + default value is False. 
+ """ + self.cache_pre_change() + self._depcache.upgrade(dist_upgrade) + self.cache_post_change() + + @property + def required_download(self) -> int: + """Get the size of the packages that are required to download.""" + if self._records is None: + raise CacheClosedException("Cache object used after close() called") + pm = apt_pkg.PackageManager(self._depcache) + fetcher = apt_pkg.Acquire() + pm.get_archives(fetcher, self._list, self._records) + return fetcher.fetch_needed + + @property + def required_space(self) -> int: + """Get the size of the additional required space on the fs.""" + return self._depcache.usr_size + + @property + def req_reinstall_pkgs(self) -> set[str]: + """Return the packages not downloadable packages in reqreinst state.""" + reqreinst = set() + get_candidate_ver = self._depcache.get_candidate_ver + states = frozenset( + (apt_pkg.INSTSTATE_REINSTREQ, apt_pkg.INSTSTATE_HOLD_REINSTREQ) + ) + for pkg in self._cache.packages: + cand = get_candidate_ver(pkg) + if cand and not cand.downloadable and pkg.inst_state in states: + reqreinst.add(pkg.get_fullname(pretty=True)) + return reqreinst + + def _run_fetcher( + self, fetcher: apt_pkg.Acquire, allow_unauthenticated: bool | None + ) -> int: + if allow_unauthenticated is None: + allow_unauthenticated = apt_pkg.config.find_b( + "APT::Get::" "AllowUnauthenticated", False + ) + + untrusted = [item for item in fetcher.items if not item.is_trusted] + if untrusted and not allow_unauthenticated: + raise UntrustedException( + "Untrusted packages:\n%s" % "\n".join(i.desc_uri for i in untrusted) + ) + + # do the actual fetching + res = fetcher.run() + + # now check the result (this is the code from apt-get.cc) + failed = False + err_msg = "" + for item in fetcher.items: + if item.status == item.STAT_DONE: + continue + if item.STAT_IDLE: + continue + err_msg += f"Failed to fetch {item.desc_uri} {item.error_text}\n" + failed = True + + # we raise a exception if the download failed or it was cancelt + if res == 
fetcher.RESULT_CANCELLED: + raise FetchCancelledException(err_msg) + elif failed: + raise FetchFailedException(err_msg) + return res + + def _fetch_archives( + self, + fetcher: apt_pkg.Acquire, + pm: apt_pkg.PackageManager, + allow_unauthenticated: bool | None = None, + ) -> int: + """fetch the needed archives""" + if self._records is None: + raise CacheClosedException("Cache object used after close() called") + + # this may as well throw a SystemError exception + if not pm.get_archives(fetcher, self._list, self._records): + return False + + # now run the fetcher, throw exception if something fails to be + # fetched + return self._run_fetcher(fetcher, allow_unauthenticated) + + def fetch_archives( + self, + progress: AcquireProgress | None = None, + fetcher: apt_pkg.Acquire | None = None, + allow_unauthenticated: bool | None = None, + ) -> int: + """Fetch the archives for all packages marked for install/upgrade. + + You can specify either an :class:`apt.progress.base.AcquireProgress()` + object for the parameter *progress*, or specify an already + existing :class:`apt_pkg.Acquire` object for the parameter *fetcher*. + + The return value of the function is undefined. If an error occurred, + an exception of type :class:`FetchFailedException` or + :class:`FetchCancelledException` is raised. + + The keyword-only parameter *allow_unauthenticated* specifies whether + to allow unauthenticated downloads. If not specified, it defaults to + the configuration option `APT::Get::AllowUnauthenticated`. + + .. 
versionadded:: 0.8.0 + """ + if progress is not None and fetcher is not None: + raise ValueError("Takes a progress or a an Acquire object") + if progress is None: + progress = apt.progress.text.AcquireProgress() + if fetcher is None: + fetcher = apt_pkg.Acquire(progress) + + with self._archive_lock: + return self._fetch_archives( + fetcher, apt_pkg.PackageManager(self._depcache), allow_unauthenticated + ) + + def is_virtual_package(self, pkgname: str) -> bool: + """Return whether the package is a virtual package.""" + try: + pkg = self._cache[pkgname] + except KeyError: + return False + else: + return bool(pkg.has_provides and not pkg.has_versions) + + def get_providing_packages( + self, + pkgname: str, + candidate_only: bool = True, + include_nonvirtual: bool = False, + ) -> list[Package]: + """Return a list of all packages providing a package. + + Return a list of packages which provide the virtual package of the + specified name. + + If 'candidate_only' is False, return all packages with at + least one version providing the virtual package. Otherwise, + return only those packages where the candidate version + provides the virtual package. + + If 'include_nonvirtual' is True then it will search for all + packages providing pkgname, even if pkgname is not itself + a virtual pkg. + """ + + providers: set[Package] = set() + get_candidate_ver = self._depcache.get_candidate_ver + try: + vp = self._cache[pkgname] + if vp.has_versions and not include_nonvirtual: + return list(providers) + except KeyError: + return list(providers) + + for provides, providesver, version in vp.provides_list: + rawpkg = version.parent_pkg + if not candidate_only or (version == get_candidate_ver(rawpkg)): + providers.add(self._rawpkg_to_pkg(rawpkg)) + return list(providers) + + def update( + self, + fetch_progress: AcquireProgress | None = None, + pulse_interval: int = 0, + raise_on_error: bool = True, + sources_list: str | None = None, + ) -> int: + """Run the equivalent of apt-get update. 
+ + You probably want to call open() afterwards, in order to utilise the + new cache. Otherwise, the old cache will be used which can lead to + strange bugs. + + The first parameter *fetch_progress* may be set to an instance of + apt.progress.FetchProgress, the default is apt.progress.FetchProgress() + . + sources_list -- Update a alternative sources.list than the default. + Note that the sources.list.d directory is ignored in this case + """ + with _WrappedLock(apt_pkg.config.find_dir("Dir::State::Lists")): + if sources_list: + old_sources_list = apt_pkg.config.find("Dir::Etc::sourcelist") + old_sources_list_d = apt_pkg.config.find("Dir::Etc::sourceparts") + old_cleanup = apt_pkg.config.find("APT::List-Cleanup") + apt_pkg.config.set( + "Dir::Etc::sourcelist", os.path.abspath(sources_list) + ) + apt_pkg.config.set("Dir::Etc::sourceparts", "xxx") + apt_pkg.config.set("APT::List-Cleanup", "0") + slist = apt_pkg.SourceList() + slist.read_main_list() + else: + slist = self._list + + try: + if fetch_progress is None: + fetch_progress = apt.progress.base.AcquireProgress() + try: + res = self._cache.update(fetch_progress, slist, pulse_interval) + except SystemError as e: + raise FetchFailedException(e) + if not res and raise_on_error: + raise FetchFailedException() + else: + return res + finally: + if sources_list: + apt_pkg.config.set("Dir::Etc::sourcelist", old_sources_list) + apt_pkg.config.set("Dir::Etc::sourceparts", old_sources_list_d) + apt_pkg.config.set("APT::List-Cleanup", old_cleanup) + + def install_archives( + self, pm: apt_pkg.PackageManager, install_progress: InstallProgress + ) -> int: + """ + The first parameter *pm* refers to an object returned by + apt_pkg.PackageManager(). + + The second parameter *install_progress* refers to an InstallProgress() + object of the module apt.progress. + + This releases a system lock in newer versions, if there is any, + and reestablishes it afterwards. 
+ """ + # compat with older API + try: + install_progress.startUpdate() # type: ignore + except AttributeError: + install_progress.start_update() + + did_unlock = apt_pkg.pkgsystem_is_locked() + if did_unlock: + apt_pkg.pkgsystem_unlock_inner() + + try: + res = install_progress.run(pm) + finally: + if did_unlock: + apt_pkg.pkgsystem_lock_inner() + + try: + install_progress.finishUpdate() # type: ignore + except AttributeError: + install_progress.finish_update() + return res + + def commit( + self, + fetch_progress: AcquireProgress | None = None, + install_progress: InstallProgress | None = None, + allow_unauthenticated: bool | None = None, + ) -> bool: + """Apply the marked changes to the cache. + + The first parameter, *fetch_progress*, refers to a FetchProgress() + object as found in apt.progress, the default being + apt.progress.FetchProgress(). + + The second parameter, *install_progress*, is a + apt.progress.InstallProgress() object. + + The keyword-only parameter *allow_unauthenticated* specifies whether + to allow unauthenticated downloads. If not specified, it defaults to + the configuration option `APT::Get::AllowUnauthenticated`. + """ + # FIXME: + # use the new acquire/pkgmanager interface here, + # raise exceptions when a download or install fails + # and send proper error strings to the application. + # Current a failed download will just display "error" + # which is less than optimal! 
+ + if fetch_progress is None: + fetch_progress = apt.progress.base.AcquireProgress() + if install_progress is None: + install_progress = apt.progress.base.InstallProgress() + + assert install_progress is not None + + with apt_pkg.SystemLock(): + pm = apt_pkg.PackageManager(self._depcache) + fetcher = apt_pkg.Acquire(fetch_progress) + with self._archive_lock: + while True: + # fetch archives first + res = self._fetch_archives(fetcher, pm, allow_unauthenticated) + + # then install + res = self.install_archives(pm, install_progress) + if res == pm.RESULT_COMPLETED: + break + elif res == pm.RESULT_FAILED: + raise SystemError("installArchives() failed") + elif res == pm.RESULT_INCOMPLETE: + pass + else: + raise SystemError( + "internal-error: unknown result " + "code from InstallArchives: %s" % res + ) + # reload the fetcher for media swaping + fetcher.shutdown() + return res == pm.RESULT_COMPLETED + + def clear(self) -> None: + """Unmark all changes""" + self._depcache.init() + + # cache changes + + def cache_post_change(self) -> None: + "called internally if the cache has changed, emit a signal then" + self._run_callbacks("cache_post_change") + + def cache_pre_change(self) -> None: + """called internally if the cache is about to change, emit + a signal then""" + self._run_callbacks("cache_pre_change") + + def connect(self, name: str, callback: Callable[..., None] | str) -> None: + """Connect to a signal. + + .. deprecated:: 1.0 + + Please use connect2() instead, as this function is very + likely to cause a memory leak. + """ + if callback != "_inc_changes_count": + warnings.warn( + "connect() likely causes a reference" " cycle, use connect2() instead", + RuntimeWarning, + 2, + ) + if name not in self._callbacks: + self._callbacks[name] = [] + self._callbacks[name].append(callback) + + def connect2( + self, name: str, callback: Callable[..., Any], *args: object, **kwds: object + ) -> None: + """Connect to a signal. 
+ + The callback will be passed the cache as an argument, and + any arguments passed to this function. Make sure that, if you + pass a method of a class as your callback, your class does not + contain a reference to the cache. + + Cyclic references to the cache can cause issues if the Cache object + is replaced by a new one, because the cache keeps a lot of objects and + tens of open file descriptors. + + currently only used for cache_{post,pre}_{changed,open}. + + .. versionadded:: 1.0 + """ + if name not in self._callbacks2: + self._callbacks2[name] = [] + self._callbacks2[name].append((callback, args, kwds)) + + def actiongroup(self) -> apt_pkg.ActionGroup: + """Return an `ActionGroup` object for the current cache. + + Action groups can be used to speedup actions. The action group is + active as soon as it is created, and disabled when the object is + deleted or when release() is called. + + You can use the action group as a context manager, this is the + recommended way:: + + with cache.actiongroup(): + for package in my_selected_packages: + package.mark_install() + + This way, the action group is automatically released as soon as the + with statement block is left. It also has the benefit of making it + clear which parts of the code run with a action group and which + don't. + """ + return apt_pkg.ActionGroup(self._depcache) + + @property + def dpkg_journal_dirty(self) -> bool: + """Return True if the dpkg was interrupted + + All dpkg operations will fail until this is fixed, the action to + fix the system if dpkg got interrupted is to run + 'dpkg --configure -a' as root. 
+ """ + dpkg_status_dir = os.path.dirname( + apt_pkg.config.find_file("Dir::State::status") + ) + for f in os.listdir(os.path.join(dpkg_status_dir, "updates")): + if fnmatch.fnmatch(f, "[0-9]*"): + return True + return False + + @property + def broken_count(self) -> int: + """Return the number of packages with broken dependencies.""" + return self._depcache.broken_count + + @property + def delete_count(self) -> int: + """Return the number of packages marked for deletion.""" + return self._depcache.del_count + + @property + def install_count(self) -> int: + """Return the number of packages marked for installation.""" + return self._depcache.inst_count + + @property + def keep_count(self) -> int: + """Return the number of packages marked as keep.""" + return self._depcache.keep_count + + +class ProblemResolver: + """Resolve problems due to dependencies and conflicts. + + The first argument 'cache' is an instance of apt.Cache. + """ + + def __init__(self, cache: Cache) -> None: + self._resolver = apt_pkg.ProblemResolver(cache._depcache) + self._cache = cache + + def clear(self, package: Package) -> None: + """Reset the package to the default state.""" + self._resolver.clear(package._pkg) + + def protect(self, package: Package) -> None: + """Protect a package so it won't be removed.""" + self._resolver.protect(package._pkg) + + def remove(self, package: Package) -> None: + """Mark a package for removal.""" + self._resolver.remove(package._pkg) + + def resolve(self) -> None: + """Resolve dependencies, try to remove packages where needed.""" + self._cache.cache_pre_change() + self._resolver.resolve() + self._cache.cache_post_change() + + def resolve_by_keep(self) -> None: + """Resolve dependencies, do not try to remove packages.""" + self._cache.cache_pre_change() + self._resolver.resolve_by_keep() + self._cache.cache_post_change() + + def keep_phased_updates(self) -> None: + """Keep back phased updates.""" + self._cache.cache_pre_change() + 
self._resolver.keep_phased_updates() + self._cache.cache_post_change() + + +# ----------------------------- experimental interface + + +class Filter: + """Filter base class""" + + def apply(self, pkg: Package) -> bool: + """Filter function, return True if the package matchs a + filter criteria and False otherwise + """ + return True + + +class MarkedChangesFilter(Filter): + """Filter that returns all marked changes""" + + def apply(self, pkg: Package) -> bool: + if pkg.marked_install or pkg.marked_delete or pkg.marked_upgrade: + return True + else: + return False + + +class InstalledFilter(Filter): + """Filter that returns all installed packages. + + .. versionadded:: 1.0.0 + """ + + def apply(self, pkg: Package) -> bool: + return pkg.is_installed + + +class _FilteredCacheHelper: + """Helper class for FilteredCache to break a reference cycle.""" + + def __init__(self, cache: Cache) -> None: + # Do not keep a reference to the cache, or you have a cycle! + + self._filtered: dict[str, bool] = {} + self._filters: list[Filter] = [] + cache.connect2("cache_post_change", self.filter_cache_post_change) + cache.connect2("cache_post_open", self.filter_cache_post_change) + + def _reapply_filter(self, cache: Cache) -> None: + "internal helper to refilter" + # Do not keep a reference to the cache, or you have a cycle! + self._filtered = {} + for pkg in cache: + for f in self._filters: + if f.apply(pkg): + self._filtered[pkg.name] = True + break + + def set_filter(self, filter: Filter) -> None: + """Set the current active filter.""" + self._filters = [] + self._filters.append(filter) + + def filter_cache_post_change(self, cache: Cache) -> None: + """Called internally if the cache changes, emit a signal then.""" + # Do not keep a reference to the cache, or you have a cycle! + self._reapply_filter(cache) + + +class FilteredCache: + """A package cache that is filtered. 
class FilteredCache:
    """A package cache restricted by a Filter.

    Operates on an existing cache when one is passed in; otherwise a
    new Cache is created internally.
    """

    def __init__(
        self, cache: Cache | None = None, progress: OpProgress | None = None
    ) -> None:
        self.cache = Cache(progress) if cache is None else cache
        self._helper = _FilteredCacheHelper(self.cache)

    def __len__(self) -> int:
        return len(self._helper._filtered)

    def __getitem__(self, key: str) -> Package:
        return self.cache[key]

    def __iter__(self) -> Iterator[Package]:
        return (self.cache[name] for name in self._helper._filtered)

    def keys(self) -> KeysView[str]:
        return self._helper._filtered.keys()

    def has_key(self, key: object) -> bool:
        return key in self

    def __contains__(self, key: object) -> bool:
        # Look the key up in the real cache first so multi-arch names
        # are normalized; unknown keys are simply not contained.
        try:
            pkg = self.cache[key]
        except KeyError:
            return False
        return pkg.name in self._helper._filtered

    def set_filter(self, filter: Filter) -> None:
        """Set the current active filter."""
        self._helper.set_filter(filter)
        self.cache.cache_post_change()

    def filter_cache_post_change(self) -> None:
        """Called internally if the cache changes, emit a signal then."""
        self._helper.filter_cache_post_change(self.cache)

    def __getattr__(self, key: str) -> Any:
        """Delegate everything else to the real cache."""
        return getattr(self.cache, key)
changes: + assert pkg.name + + # see if fetching works + for dirname in ["/tmp/pytest", "/tmp/pytest/partial"]: + if not os.path.exists(dirname): + os.mkdir(dirname) + apt_pkg.config.set("Dir::Cache::Archives", "/tmp/pytest") + pm = apt_pkg.PackageManager(cache._depcache) + fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress()) + cache._fetch_archives(fetcher, pm, None) + # sys.exit(1) + + print("Testing filtered cache (argument is old cache)") + filtered = FilteredCache(cache) + filtered.cache.connect2("cache_pre_change", cache_pre_changed) + filtered.cache.connect2("cache_post_change", cache_post_changed) + filtered.cache.upgrade() + filtered.set_filter(MarkedChangesFilter()) + print(len(filtered)) + for pkgname in filtered.keys(): + assert pkgname == filtered[pkgname].name + + print(len(filtered)) + + print("Testing filtered cache (no argument)") + filtered = FilteredCache(progress=apt.progress.base.OpProgress()) + filtered.cache.connect2("cache_pre_change", cache_pre_changed) + filtered.cache.connect2("cache_post_change", cache_post_changed) + filtered.cache.upgrade() + filtered.set_filter(MarkedChangesFilter()) + print(len(filtered)) + for pkgname in filtered.keys(): + assert pkgname == filtered[pkgname].name + + print(len(filtered)) + + +if __name__ == "__main__": + _test() diff --git a/apt/cdrom.py b/apt/cdrom.py new file mode 100644 index 0000000..dc15c5b --- /dev/null +++ b/apt/cdrom.py @@ -0,0 +1,91 @@ +# cdrom.py - CDROM handling +# +# Copyright (c) 2005-2009 Canonical +# Copyright (c) 2009 Julian Andres Klode <jak@debian.org> +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. 
class Cdrom(apt_pkg.Cdrom):
    """Support for apt-cdrom like features.

    This class has several optional parameters for initialisation, which may
    be used to influence the behaviour of the object:

    The optional parameter `progress` is a CdromProgress() subclass, which will
    ask for the correct cdrom, etc. If not specified or None, a CdromProgress()
    object will be used.

    The optional parameter `mountpoint` may be used to specify an alternative
    mountpoint.

    If the optional parameter `nomount` is True, the cdroms will not be
    mounted. This is the default behaviour.
    """

    def __init__(
        self,
        progress: CdromProgress | None = None,
        mountpoint: str | None = None,
        nomount: bool = True,
    ) -> None:
        apt_pkg.Cdrom.__init__(self)
        # keep the progress object so add()/ident() can fall back to it
        if progress is None:
            self._progress = CdromProgress()
        else:
            self._progress = progress
        # see if we have a alternative mountpoint
        # NOTE(review): this mutates the process-global apt_pkg.config,
        # so it affects every Cdrom instance -- confirm this is intended.
        if mountpoint is not None:
            apt_pkg.config.set("Acquire::cdrom::mount", mountpoint)
        # do not mess with mount points by default
        if nomount:
            apt_pkg.config.set("APT::CDROM::NoMount", "true")
        else:
            apt_pkg.config.set("APT::CDROM::NoMount", "false")

    def add(self, progress: CdromProgress | None = None) -> bool:
        """Add cdrom to the sources.list.

        Uses *progress* if given, otherwise the progress object passed
        at construction time.
        """
        return apt_pkg.Cdrom.add(self, progress or self._progress)

    def ident(self, progress: CdromProgress | None = None) -> str:
        """Identify the cdrom.

        Uses *progress* if given, otherwise the progress object passed
        at construction time.
        """
        return apt_pkg.Cdrom.ident(self, progress or self._progress)

    @property
    def in_sources_list(self) -> bool:
        """Check if the cdrom is already in the current sources.list."""
        cd_id = self.ident()
        # NOTE(review): ident() is annotated as returning str, but this
        # branch treats None as possible -- confirm the C extension's
        # actual behaviour on identification failure.
        if cd_id is None:
            # FIXME: throw exception instead
            return False
        # Get a list of files: every sources.list.d fragment plus the
        # main sources.list file.
        src = glob.glob(apt_pkg.config.find_dir("Dir::Etc::sourceparts") + "*")
        src.append(apt_pkg.config.find_file("Dir::Etc::sourcelist"))
        # Check each file: the cdrom is considered present if its id
        # appears on any non-comment line.
        for fname in src:
            with open(fname) as fobj:
                for line in fobj:
                    if not line.lstrip().startswith("#") and cd_id in line:
                        return True
        return False
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Classes for working with locally available Debian packages.""" + +import gzip +import os +import sys +from collections.abc import Iterable +from io import BytesIO +from typing import cast + +import apt_inst +import apt_pkg +from apt_pkg import gettext as _ + +import apt + + +class NoDebArchiveException(IOError): + """Exception which is raised if a file is no Debian archive.""" + + +class DebPackage: + """A Debian Package (.deb file).""" + + # Constants for comparing the local package file with the version + # in the cache + (VERSION_NONE, VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER) = range(4) + + debug = 0 + + def __init__( + self, filename: str | None = None, cache: apt.Cache | None = None + ) -> None: + if cache is None: + cache = apt.Cache() + self._cache = cache + self._debfile = cast(apt_inst.DebFile, None) + self.pkgname = "" + self.filename: str | None = None + self._sections: dict[str, str] | apt_pkg.TagSection[str] = {} # noqa + self._need_pkgs: list[str] = [] + self._check_was_run = False + self._failure_string = "" + self._multiarch: str | None = None + if filename: + self.open(filename) + + def open(self, filename: str) -> None: + """open given debfile""" + self._dbg(3, "open '%s'" % filename) + self._need_pkgs = [] + self._installed_conflicts: set[str] = set() + self._failure_string = "" + self.filename = filename + self._debfile = apt_inst.DebFile(self.filename) + control = self._debfile.control.extractdata("control") + self._sections = apt_pkg.TagSection(control) + self.pkgname = 
self._sections["Package"] + self._check_was_run = False + + def __getitem__(self, key: str) -> str: + return self._sections[key] + + def __contains__(self, key: str) -> bool: + return key in self._sections + + @property + def filelist(self) -> list[str]: + """return the list of files in the deb.""" + files = [] + try: + self._debfile.data.go(lambda item, data: files.append(item.name)) + except SystemError: + return [_("List of files for '%s' could not be read") % self.filename] + return files + + @property + def control_filelist(self) -> list[str]: + """return the list of files in control.tar.gz""" + control = [] + try: + self._debfile.control.go(lambda item, data: control.append(item.name)) + except SystemError: + return [ + _("List of control files for '%s' could not be read") % self.filename + ] + return sorted(control) + + # helper that will return a pkgname with a multiarch suffix if needed + def _maybe_append_multiarch_suffix( + self, pkgname: str, in_conflict_checking: bool = False + ) -> str: + # trivial cases + if ":" in pkgname: + return pkgname + if not self._multiarch: + return pkgname + elif self._cache.is_virtual_package(pkgname): + return pkgname + elif ( + pkgname in self._cache + and self._cache[pkgname].candidate is not None + and cast(apt.package.Version, self._cache[pkgname].candidate).architecture + == "all" + ): + return pkgname + # now do the real multiarch checking + multiarch_pkgname = f"{pkgname}:{self._multiarch}" + # the upper layers will handle this + if multiarch_pkgname not in self._cache: + return multiarch_pkgname + multiarch_pkg = self._cache[multiarch_pkgname] + if multiarch_pkg.candidate is None: + return multiarch_pkgname + # now check the multiarch state + cand = multiarch_pkg.candidate._cand + # print pkgname, multiarch_pkgname, cand.multi_arch + # the default is to add the suffix, unless its a pkg that can satify + # foreign dependencies + if cand.multi_arch & cand.MULTI_ARCH_FOREIGN: + return pkgname + # for conflicts we 
need a special case here, any not multiarch enabled + # package has a implicit conflict + if in_conflict_checking and not (cand.multi_arch & cand.MULTI_ARCH_SAME): + return pkgname + return multiarch_pkgname + + def _is_or_group_satisfied(self, or_group: list[tuple[str, str, str]]) -> bool: + """Return True if at least one dependency of the or-group is satisfied. + + This method gets an 'or_group' and analyzes if at least one dependency + of this group is already satisfied. + """ + self._dbg(2, "_checkOrGroup(): %s " % (or_group)) + + for dep in or_group: + depname = dep[0] + ver = dep[1] + oper = dep[2] + + # multiarch + depname = self._maybe_append_multiarch_suffix(depname) + + # check for virtual pkgs + if depname not in self._cache: + if self._cache.is_virtual_package(depname): + self._dbg( + 3, "_is_or_group_satisfied(): %s is virtual dep" % depname + ) + for pkg in self._cache.get_providing_packages(depname): + if pkg.is_installed: + return True + continue + # check real dependency + inst = self._cache[depname].installed + if inst is not None and apt_pkg.check_dep(inst.version, oper, ver): + return True + + # if no real dependency is installed, check if there is + # a package installed that provides this dependency + # (e.g. 
scrollkeeper dependecies are provided by rarian-compat) + # but only do that if there is no version required in the + # dependency (we do not supprot versionized dependencies) + if not oper: + for ppkg in self._cache.get_providing_packages( + depname, include_nonvirtual=True + ): + if ppkg.is_installed: + self._dbg( + 3, + "found installed '%s' that provides '%s'" + % (ppkg.name, depname), + ) + return True + return False + + def _satisfy_or_group(self, or_group: list[tuple[str, str, str]]) -> bool: + """Try to satisfy the or_group.""" + for dep in or_group: + depname, ver, oper = dep + + # multiarch + depname = self._maybe_append_multiarch_suffix(depname) + + # if we don't have it in the cache, it may be virtual + if depname not in self._cache: + if not self._cache.is_virtual_package(depname): + continue + providers = self._cache.get_providing_packages(depname) + # if a package just has a single virtual provider, we + # just pick that (just like apt) + if len(providers) != 1: + continue + depname = providers[0].name + + # now check if we can satisfy the deps with the candidate(s) + # in the cache + pkg = self._cache[depname] + cand = self._cache._depcache.get_candidate_ver(pkg._pkg) + if not cand: + continue + if not apt_pkg.check_dep(cand.ver_str, oper, ver): + continue + + # check if we need to install it + self._dbg(2, "Need to get: %s" % depname) + self._need_pkgs.append(depname) + return True + + # if we reach this point, we failed + or_str = "" + for dep in or_group: + or_str += dep[0] + if ver and oper: + or_str += f" ({dep[2]} {dep[1]})" + if dep != or_group[len(or_group) - 1]: + or_str += "|" + self._failure_string += _("Dependency is not satisfiable: %s\n") % or_str + return False + + def _check_single_pkg_conflict(self, pkgname: str, ver: str, oper: str) -> bool: + """Return True if a pkg conflicts with a real installed/marked pkg.""" + # FIXME: deal with conflicts against its own provides + # (e.g. 
Provides: ftp-server, Conflicts: ftp-server) + self._dbg( + 3, + "_check_single_pkg_conflict() pkg='%s' ver='%s' oper='%s'" + % (pkgname, ver, oper), + ) + pkg = self._cache[pkgname] + if pkg.is_installed: + assert pkg.installed is not None + pkgver = pkg.installed.version + elif pkg.marked_install: + assert pkg.candidate is not None + pkgver = pkg.candidate.version + else: + return False + # print "pkg: %s" % pkgname + # print "ver: %s" % ver + # print "pkgver: %s " % pkgver + # print "oper: %s " % oper + if apt_pkg.check_dep(pkgver, oper, ver) and not self.replaces_real_pkg( + pkgname, oper, ver + ): + self._failure_string += ( + _("Conflicts with the installed package " "'%s'") % pkg.name + ) + self._dbg(3, "conflicts with installed pkg '%s'" % pkg.name) + return True + return False + + def _check_conflicts_or_group(self, or_group: list[tuple[str, str, str]]) -> bool: + """Check the or-group for conflicts with installed pkgs.""" + self._dbg(2, "_check_conflicts_or_group(): %s " % (or_group)) + for dep in or_group: + depname = dep[0] + ver = dep[1] + oper = dep[2] + + # FIXME: is this good enough? i.e. will apt always populate + # the cache with conflicting pkgnames for our arch? + depname = self._maybe_append_multiarch_suffix( + depname, in_conflict_checking=True + ) + + # check conflicts with virtual pkgs + if depname not in self._cache: + # FIXME: we have to check for virtual replaces here as + # well (to pass tests/gdebi-test8.deb) + if self._cache.is_virtual_package(depname): + for pkg in self._cache.get_providing_packages(depname): + self._dbg(3, "conflicts virtual check: %s" % pkg.name) + # P/C/R on virtal pkg, e.g. 
ftpd + if self.pkgname == pkg.name: + self._dbg(3, "conflict on self, ignoring") + continue + if self._check_single_pkg_conflict(pkg.name, ver, oper): + self._installed_conflicts.add(pkg.name) + continue + if self._check_single_pkg_conflict(depname, ver, oper): + self._installed_conflicts.add(depname) + return bool(self._installed_conflicts) + + @property + def conflicts(self) -> list[list[tuple[str, str, str]]]: + """List of packages conflicting with this package.""" + key = "Conflicts" + try: + return apt_pkg.parse_depends(self._sections[key], False) + except KeyError: + return [] + + @property + def depends(self) -> list[list[tuple[str, str, str]]]: + """List of packages on which this package depends on.""" + depends = [] + # find depends + for key in "Depends", "Pre-Depends": + try: + depends.extend(apt_pkg.parse_depends(self._sections[key], False)) + except KeyError: + pass + return depends + + @property + def provides(self) -> list[list[tuple[str, str, str]]]: + """List of virtual packages which are provided by this package.""" + key = "Provides" + try: + return apt_pkg.parse_depends(self._sections[key], False) + except KeyError: + return [] + + @property + def replaces(self) -> list[list[tuple[str, str, str]]]: + """List of packages which are replaced by this package.""" + key = "Replaces" + try: + return apt_pkg.parse_depends(self._sections[key], False) + except KeyError: + return [] + + def replaces_real_pkg(self, pkgname: str, oper: str, ver: str) -> bool: + """Return True if a given non-virtual package is replaced. + + Return True if the deb packages replaces a real (not virtual) + packages named (pkgname, oper, ver). 
+ """ + self._dbg(3, f"replaces_real_pkg() {pkgname} {oper} {ver}") + pkg = self._cache[pkgname] + pkgver: str | None = None + if pkg.is_installed: + assert pkg.installed is not None + pkgver = pkg.installed.version + elif pkg.marked_install: + assert pkg.candidate is not None + pkgver = pkg.candidate.version + else: + pkgver = None + for or_group in self.replaces: + for name, ver, oper in or_group: + if name == pkgname and ( + pkgver is None or apt_pkg.check_dep(pkgver, oper, ver) + ): + self._dbg( + 3, + "we have a replaces in our package for the " + "conflict against '%s'" % (pkgname), + ) + return True + return False + + def check_conflicts(self) -> bool: + """Check if there are conflicts with existing or selected packages. + + Check if the package conflicts with a existing or to be installed + package. Return True if the pkg is OK. + """ + res = True + for or_group in self.conflicts: + if self._check_conflicts_or_group(or_group): + # print "Conflicts with a exisiting pkg!" + # self._failure_string = "Conflicts with a exisiting pkg!" + res = False + return res + + def check_breaks_existing_packages(self) -> bool: + """ + check if installing the package would break exsisting + package on the system, e.g. 
system has: + smc depends on smc-data (= 1.4) + and user tries to installs smc-data 1.6 + """ + # show progress information as this step may take some time + size = float(len(self._cache)) + steps = max(int(size / 50), 1) + debver = self._sections["Version"] + debarch = self._sections["Architecture"] + # store what we provide so that we can later check against that + provides = [x[0][0] for x in self.provides] + for i, pkg in enumerate(self._cache): + if i % steps == 0: + self._cache.op_progress.update(float(i) / size * 100.0) + if not pkg.is_installed: + continue + assert pkg.installed is not None + # check if the exising dependencies are still satisfied + # with the package + ver = pkg._pkg.current_ver + for dep_or in pkg.installed.dependencies: + for dep in dep_or.or_dependencies: + if dep.name == self.pkgname: + if not apt_pkg.check_dep(debver, dep.relation, dep.version): + self._dbg(2, "would break (depends) %s" % pkg.name) + # TRANSLATORS: the first '%s' is the package that + # breaks, the second the dependency that makes it + # break, the third the relation (e.g. 
>=) and the + # latest the version for the releation + self._failure_string += _( + "Breaks existing package '%(pkgname)s' " + "dependency %(depname)s " + "(%(deprelation)s %(depversion)s)" + ) % { + "pkgname": pkg.name, + "depname": dep.name, + "deprelation": dep.relation, + "depversion": dep.version, + } + self._cache.op_progress.done() + return False + # now check if there are conflicts against this package on + # the existing system + if "Conflicts" in ver.depends_list: + for conflicts_ver_list in ver.depends_list["Conflicts"]: + for c_or in conflicts_ver_list: + if ( + c_or.target_pkg.name == self.pkgname + and c_or.target_pkg.architecture == debarch + ): + if apt_pkg.check_dep( + debver, c_or.comp_type, c_or.target_ver + ): + self._dbg(2, "would break (conflicts) %s" % pkg.name) + # TRANSLATORS: the first '%s' is the package + # that conflicts, the second the packagename + # that it conflicts with (so the name of the + # deb the user tries to install), the third is + # the relation (e.g. >=) and the last is the + # version for the relation + self._failure_string += _( + "Breaks existing package '%(pkgname)s' " + "conflict: %(targetpkg)s " + "(%(comptype)s %(targetver)s)" + ) % { + "pkgname": pkg.name, + "targetpkg": c_or.target_pkg.name, + "comptype": c_or.comp_type, + "targetver": c_or.target_ver, + } + self._cache.op_progress.done() + return False + if ( + c_or.target_pkg.name in provides + and self.pkgname != pkg.name + ): + self._dbg(2, "would break (conflicts) %s" % provides) + self._failure_string += _( + "Breaks existing package '%(pkgname)s' " + "that conflict: '%(targetpkg)s'. 
But the " + "'%(debfile)s' provides it via: " + "'%(provides)s'" + ) % { + "provides": ",".join(provides), + "debfile": self.filename, + "targetpkg": c_or.target_pkg.name, + "pkgname": pkg.name, + } + self._cache.op_progress.done() + return False + self._cache.op_progress.done() + return True + + def compare_to_version_in_cache(self, use_installed: bool = True) -> int: + """Compare the package to the version available in the cache. + + Checks if the package is already installed or availabe in the cache + and if so in what version, returns one of (VERSION_NONE, + VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER). + """ + self._dbg(3, "compare_to_version_in_cache") + pkgname = self._sections["Package"] + architecture = self._sections["Architecture"] + + # Arch qualify the package name + pkgname = ":".join([pkgname, architecture]) + + debver = self._sections["Version"] + self._dbg(1, "debver: %s" % debver) + if pkgname in self._cache: + pkg = self._cache[pkgname] + if use_installed and pkg.installed is not None: + cachever = pkg.installed.version + elif not use_installed and pkg.candidate is not None: + cachever = pkg.candidate.version + else: + return self.VERSION_NONE + if cachever is not None: + cmp = apt_pkg.version_compare(cachever, debver) + self._dbg(1, "CompareVersion(debver,instver): %s" % cmp) + if cmp == 0: + return self.VERSION_SAME + elif cmp < 0: + return self.VERSION_NEWER + elif cmp > 0: + return self.VERSION_OUTDATED + return self.VERSION_NONE + + def check(self, allow_downgrade: bool = False) -> bool: + """Check if the package is installable.""" + self._dbg(3, "check") + + self._check_was_run = True + + # check arch + if "Architecture" not in self._sections: + self._dbg(1, "ERROR: no architecture field") + self._failure_string = _("No Architecture field in the package") + return False + arch = self._sections["Architecture"] + if arch != "all" and arch != apt_pkg.config.find("APT::Architecture"): + if arch in apt_pkg.get_architectures(): + 
self._multiarch = arch + self.pkgname = f"{self.pkgname}:{self._multiarch}" + self._dbg(1, "Found multiarch arch: '%s'" % arch) + else: + self._dbg(1, "ERROR: Wrong architecture dude!") + self._failure_string = ( + _( + "Wrong architecture '%s' " + "-- Run dpkg --add-architecture to " + "add it and update afterwards" + ) + % arch + ) + return False + + # check version + if ( + not allow_downgrade + and self.compare_to_version_in_cache() == self.VERSION_OUTDATED + ): + if self._cache[self.pkgname].installed: + # the deb is older than the installed + self._failure_string = _("A later version is already installed") + return False + + # FIXME: this sort of error handling sux + self._failure_string = "" + + # check conflicts + if not self.check_conflicts(): + return False + + # check if installing it would break anything on the + # current system + if not self.check_breaks_existing_packages(): + return False + + # try to satisfy the dependencies + if not self._satisfy_depends(self.depends): + return False + + # check for conflicts again (this time with the packages that are + # makeed for install) + if not self.check_conflicts(): + return False + + if self._cache._depcache.broken_count > 0: + self._failure_string = _( + "Failed to satisfy all dependencies " "(broken cache)" + ) + # clean the cache again + self._cache.clear() + return False + return True + + def satisfy_depends_str(self, dependsstr: str) -> bool: + """Satisfy the dependencies in the given string.""" + return self._satisfy_depends(apt_pkg.parse_depends(dependsstr, False)) + + def _satisfy_depends(self, depends: list[list[tuple[str, str, str]]]) -> bool: + """Satisfy the dependencies.""" + # turn off MarkAndSweep via a action group (if available) + try: + _actiongroup = apt_pkg.ActionGroup(self._cache._depcache) + _actiongroup # pyflakes + except AttributeError: + pass + # check depends + for or_group in depends: + if not self._is_or_group_satisfied(or_group): + if not self._satisfy_or_group(or_group): + 
            return False
        # now try it out in the cache
        for pkg in self._need_pkgs:
            try:
                self._cache[pkg].mark_install(from_user=False)
            except SystemError:
                self._failure_string = _("Cannot install '%s'") % pkg
                self._cache.clear()
                return False
        return True

    @property
    def missing_deps(self) -> list[str]:
        """Return missing dependencies."""
        # NOTE(review): debug message says "Installing" although this
        # property only reports the missing packages.
        self._dbg(1, "Installing: %s" % self._need_pkgs)
        if not self._check_was_run:
            raise AttributeError("property only available after check() was run")
        return self._need_pkgs

    @property
    def required_changes(self) -> tuple[list[str], list[str], list[str]]:
        """Get the changes required to satisfy the dependencies.

        Returns: a tuple with (install, remove, unauthenticated)
        """
        install = []
        remove = []
        unauthenticated = []
        if not self._check_was_run:
            raise AttributeError("property only available after check() was run")
        for pkg in self._cache:
            if pkg.marked_install or pkg.marked_upgrade:
                assert pkg.candidate is not None
                install.append(pkg.name)
                # check authentication, one authenticated origin is enough
                # libapt will skip non-authenticated origins then
                authenticated = False
                for origin in pkg.candidate.origins:
                    authenticated |= origin.trusted
                if not authenticated:
                    unauthenticated.append(pkg.name)
            if pkg.marked_delete:
                remove.append(pkg.name)
        return (install, remove, unauthenticated)

    @staticmethod
    def to_hex(in_data: str) -> str:
        """Return a hex dump of *in_data*, one byte group per character,
        wrapped every 80 input characters.

        Note: the very first group is preceded by a newline (i % 80 == 0
        holds for i == 0).
        """
        # NOTE(review): local name shadows the builtin hex().
        hex = ""
        for i, c in enumerate(in_data):
            if i % 80 == 0:
                hex += "\n"
            hex += "%2.2x " % ord(c)
        return hex

    @staticmethod
    def to_strish(in_data: str | Iterable[int]) -> str:
        """Return *in_data* with non-printable characters replaced by spaces.

        Accepts both text (legacy) and bytes-like input.
        """
        s = ""
        # py2 compat, in_data is type string
        if isinstance(in_data, str):
            for c in in_data:
                if ord(c) < 10 or ord(c) > 127:
                    s += " "
                else:
                    s += c
        # py3 compat, in_data is type bytes
        else:
            for b in in_data:
                if b < 10 or b > 127:
                    s += " "
                else:
                    s += chr(b)
        return s

    def _get_content(
        self,
        part: apt_inst.TarFile,
        name: str,
        auto_decompress: bool = True,
        auto_hex: bool = True,
    ) -> str:
        """Return the content of archive member *name* from *part* as text.

        Gzip members are transparently decompressed when *auto_decompress*
        is set; undecodable content falls back to a printable-ascii dump.

        NOTE(review): the *auto_hex* parameter is never consulted in the
        body below; the undecodable fallback always runs.
        """
        if name.startswith("./"):
            name = name[2:]
        data = part.extractdata(name)
        # check for zip content
        if name.endswith(".gz") and auto_decompress:
            io = BytesIO(data)
            gz = gzip.GzipFile(fileobj=io)
            data = _("Automatically decompressed:\n\n").encode("utf-8")
            data += gz.read()
        # auto-convert to hex
        try:
            return data.decode("utf-8")
        except Exception:
            new_data = _("Automatically converted to printable ascii:\n")
            new_data += self.to_strish(data)
            return new_data

    def control_content(self, name: str) -> str:
        """return the content of a specific control.tar.gz file"""
        try:
            return self._get_content(self._debfile.control, name)
        except LookupError:
            # member not present in the control archive
            return ""

    def data_content(self, name: str) -> str:
        """return the content of a specific control.tar.gz file"""
        try:
            return self._get_content(self._debfile.data, name)
        except LookupError:
            # member not present in the data archive
            return ""

    def _dbg(self, level: int, msg: str) -> None:
        """Write debugging output to sys.stderr."""
        if level <= self.debug:
            print(msg, file=sys.stderr)

    def install(
        self, install_progress: apt.progress.base.InstallProgress | None = None
    ) -> int:
        """Install the package.

        Without a progress object dpkg is spawned directly; otherwise the
        progress object drives the installation and its return code is
        returned.
        """
        if self.filename is None:
            raise apt_pkg.Error("No filename specified")
        if install_progress is None:
            return os.spawnlp(os.P_WAIT, "dpkg", "dpkg", "-i", self.filename)
        else:
            # Fall back to the legacy camelCase progress API if the given
            # object predates the snake_case names.
            try:
                install_progress.start_update()
            except AttributeError:
                install_progress.startUpdate()  # type: ignore
            res = install_progress.run(self.filename)
            try:
                install_progress.finish_update()
            except AttributeError:
                install_progress.finishUpdate()  # type: ignore
            return res


class DscSrcPackage(DebPackage):
    """A locally available source package."""

    def __init__(
        self, filename: str | None = None, cache: apt.Cache | None = None
    ) -> None:
        DebPackage.__init__(self, None, cache)
        self.filename: str | None = filename
        # Build-Depends / Build-Conflicts as parsed Or-groups of
        # (name, version, relation) tuples.
        self._depends: list[list[tuple[str, str, str]]] = []
        self._conflicts: list[list[tuple[str, str, str]]] = []
        self._installed_conflicts: set[str] = set()
        self.pkgname = ""
        self.binaries: list[str] = []
        self._sections: dict[str, str] = {}
        if self.filename is not None:
            self.open(self.filename)

    @property
    def depends(self) -> list[list[tuple[str, str, str]]]:
        """Return the dependencies of the package"""
        return self._depends

    @property
    def conflicts(self) -> list[list[tuple[str, str, str]]]:
        """Return the dependencies of the package"""
        return self._conflicts

    @property
    def filelist(self) -> list[str]:
        """Return the list of files associated with this dsc file"""
        # Files stanza looks like (hash, size, filename, ...)
        return self._sections["Files"].split()[2::3]

    def open(self, file: str) -> None:
        """Open the package.

        Parses the (possibly clear-signed) .dsc file, collecting the build
        dependencies/conflicts, source name, binary package list and all
        raw stanza fields.
        """
        depends_tags = ["Build-Depends", "Build-Depends-Indep"]
        conflicts_tags = ["Build-Conflicts", "Build-Conflicts-Indep"]
        fd = apt_pkg.open_maybe_clear_signed_file(file)
        fobj = os.fdopen(fd)
        tagfile = apt_pkg.TagFile(fobj)
        try:
            for sec in tagfile:
                for tag in depends_tags:
                    if tag not in sec:
                        continue
                    self._depends.extend(apt_pkg.parse_src_depends(sec[tag]))
                for tag in conflicts_tags:
                    if tag not in sec:
                        continue
                    self._conflicts.extend(apt_pkg.parse_src_depends(sec[tag]))
                if "Source" in sec:
                    self.pkgname = sec["Source"]
                if "Binary" in sec:
                    self.binaries = [b.strip() for b in sec["Binary"].split(",")]
                # NOTE(review): the "if tag in sec" guard is always true
                # while iterating sec.keys(); it is redundant.
                for tag in sec.keys():
                    if tag in sec:
                        self._sections[tag] = sec[tag]
        finally:
            # Ensure the TagFile is destroyed before its backing file
            # object is closed.
            del tagfile
            fobj.close()

        s = _(
            "Install Build-Dependencies for " "source package '%s' that builds %s\n"
        ) % (self.pkgname, " ".join(self.binaries))
        self._sections["Description"] = s
        self._check_was_run = False

    def check(self, allow_downgrade: bool = False) -> bool:
        """Check if the package is installable.

        The second parameter is ignored and only exists for compatibility
        with parent type."""
        if not self.check_conflicts():
            for pkgname in self._installed_conflicts:
                if self._cache[pkgname]._pkg.essential:
                    raise Exception(_("An essential package would be removed"))
                self._cache[pkgname].mark_delete()
        # properties are ok now
        self._check_was_run = True
        # FIXME: an additional run of the check_conflicts()
        # after _satisfy_depends() should probably be done
        return self._satisfy_depends(self.depends)


def _test() -> None:
    """Test function"""
    from apt.cache import Cache
    from apt.progress.base import InstallProgress

    cache = Cache()

    vp = "www-browser"
    print(f"{vp} virtual: {cache.is_virtual_package(vp)}")
    providers = cache.get_providing_packages(vp)
    print("Providers for %s :" % vp)
    for pkg in providers:
        print(" %s" % pkg.name)

    d = DebPackage(sys.argv[1], cache)
    print("Deb: %s" % d.pkgname)
    if not d.check():
        print("can't be satified")
        print(d._failure_string)
    print("missing deps: %s" % d.missing_deps)
    print(d.required_changes)

    print(d.filelist)

    print("Installing ...")
    ret = d.install(InstallProgress())
    print(ret)

    # s = DscSrcPackage(cache, "../tests/3ddesktop_0.2.9-6.dsc")
    # s.check_dep()
    # print "Missing deps: ",s.missingDeps
    # print "Print required changes: ", s.requiredChanges

    s = DscSrcPackage(cache=cache)
    ds = "libc6 (>= 2.3.2), libaio (>= 0.3.96) | libaio1 (>= 0.3.96)"
    print(s._satisfy_depends(apt_pkg.parse_depends(ds, False)))


if __name__ == "__main__":
    _test()
diff --git a/apt/package.py b/apt/package.py
new file mode 100644
index 0000000..50ed6d1
--- /dev/null
+++ b/apt/package.py
@@ -0,0 +1,1559 @@
# package.py - apt package abstraction
#
# Copyright (c) 2005-2009 Canonical
#
# Author: Michael Vogt <michael.vogt@ubuntu.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
"""Functionality related to packages."""
from __future__ import annotations

import logging
import os
import re
import socket
import subprocess
import sys
import threading
from collections.abc import Iterable, Iterator, Mapping, Sequence
from http.client import BadStatusLine
from typing import Any, no_type_check
from urllib.error import HTTPError
from urllib.request import urlopen

import apt_pkg
from apt_pkg import gettext as _

import apt.progress.text
from apt.progress.base import AcquireProgress, InstallProgress

__all__ = (
    "BaseDependency",
    "Dependency",
    "Origin",
    "Package",
    "Record",
    "Version",
    "VersionList",
)


def _file_is_same(path: str, size: int, hashes: apt_pkg.HashStringList) -> bool:
    """Return ``True`` if the file is the same.

    A file is considered the same when it exists, has the expected size
    and its checksums match *hashes*.
    """
    if os.path.exists(path) and os.path.getsize(path) == size:
        # NOTE(review): text-mode open; apt_pkg.Hashes appears to hash the
        # underlying file descriptor, but "rb" would be the safer mode for
        # binary archives — confirm against apt_pkg before changing.
        with open(path) as fobj:
            return apt_pkg.Hashes(fobj).hashes == hashes
    return False


class FetchError(Exception):
    """Raised when a file could not be fetched."""


class UntrustedError(FetchError):
    """Raised when a file did not have a trusted hash."""


class BaseDependency:
    """A single dependency."""

    class __dstr(str):
        """Compare helper for compatibility with old third-party code.
+ + Old third-party code might still compare the relation with the + previously used relations (<<,<=,==,!=,>=,>>,) instead of the curently + used ones (<,<=,=,!=,>=,>,). This compare helper lets < match to <<, + > match to >> and = match to ==. + """ + + def __eq__(self, other: object) -> bool: + if str.__eq__(self, other): + return True + elif str.__eq__(self, "<"): + return str.__eq__("<<", other) + elif str.__eq__(self, ">"): + return str.__eq__(">>", other) + elif str.__eq__(self, "="): + return str.__eq__("==", other) + else: + return False + + def __ne__(self, other: object) -> bool: + return not self.__eq__(other) + + def __init__(self, version: Version, dep: apt_pkg.Dependency) -> None: + self._version = version # apt.package.Version + self._dep = dep # apt_pkg.Dependency + + def __str__(self) -> str: + return f"{self.rawtype}: {self.rawstr}" + + def __repr__(self) -> str: + return "<BaseDependency: name:{!r} relation:{!r} version:{!r} rawtype:{!r}>".format( + self.name, + self.relation, + self.version, + self.rawtype, + ) + + @property + def name(self) -> str: + """The name of the target package.""" + return self._dep.target_pkg.name + + @property + def relation(self) -> str: + """The relation (<, <=, =, !=, >=, >, '') in mathematical notation. + + The empty string will be returned in case of an unversioned dependency. + """ + return self.__dstr(self._dep.comp_type) + + @property + def relation_deb(self) -> str: + """The relation (<<, <=, =, !=, >=, >>, '') in Debian notation. + + The empty string will be returned in case of an unversioned dependency. + For more details see the Debian Policy Manual on the syntax of + relationship fields: + https://www.debian.org/doc/debian-policy/ch-relationships.html#s-depsyntax # noqa + + .. versionadded:: 1.0.0 + """ + return self._dep.comp_type_deb + + @property + def version(self) -> str: + """The target version or an empty string. 
+ + Note that the version is only an empty string in case of an unversioned + dependency. In this case the relation is also an empty string. + """ + return self._dep.target_ver + + @property + def target_versions(self) -> list[Version]: + """A list of all Version objects which satisfy this dependency. + + .. versionadded:: 1.0.0 + """ + tvers = [] + _tvers: list[apt_pkg.Version] = self._dep.all_targets() + for _tver in _tvers: # type: apt_pkg.Version + _pkg: apt_pkg.Package = _tver.parent_pkg + cache = self._version.package._pcache # apt.cache.Cache + pkg = cache._rawpkg_to_pkg(_pkg) # apt.package.Package + tver = Version(pkg, _tver) # apt.package.Version + tvers.append(tver) + return tvers + + @property + def installed_target_versions(self) -> list[Version]: + """A list of all installed Version objects which satisfy this dep. + + .. versionadded:: 1.0.0 + """ + return [tver for tver in self.target_versions if tver.is_installed] + + @property + def rawstr(self) -> str: + """String represenation of the dependency. + + Returns the string representation of the dependency as it would be + written in the debian/control file. The string representation does not + include the type of the dependency. + + Example for an unversioned dependency: + python3 + + Example for a versioned dependency: + python3 >= 3.2 + + .. versionadded:: 1.0.0 + """ + if self.version: + return f"{self.name} {self.relation_deb} {self.version}" + else: + return self.name + + @property + def rawtype(self) -> str: + """Type of the dependency. + + This should be one of 'Breaks', 'Conflicts', 'Depends', 'Enhances', + 'PreDepends', 'Recommends', 'Replaces', 'Suggests'. + + Additional types might be added in the future. + """ + return self._dep.dep_type_untranslated + + @property + def pre_depend(self) -> bool: + """Whether this is a PreDepends.""" + return self._dep.dep_type_untranslated == "PreDepends" + + +class Dependency(list[BaseDependency]): + """Represent an Or-group of dependencies. 

    Attributes defined here:
        or_dependencies - The possible choices
        rawstr - String representation of the Or-group of dependencies
        rawtype - The type of the dependencies in the Or-group
        target_version - A list of Versions which satisfy this Or-group of deps
    """

    def __init__(
        self, version: Version, base_deps: list[BaseDependency], rawtype: str
    ) -> None:
        super().__init__(base_deps)
        self._version = version  # apt.package.Version
        self._rawtype = rawtype

    def __str__(self) -> str:
        return f"{self.rawtype}: {self.rawstr}"

    def __repr__(self) -> str:
        return "<Dependency: [%s]>" % (", ".join(repr(bd) for bd in self))

    @property
    def or_dependencies(self) -> Dependency:
        # The object is its own list of alternatives (kept for backwards
        # compatibility with the old attribute-based API).
        return self

    @property
    def rawstr(self) -> str:
        """String representation of the Or-group of dependencies.

        Returns the string representation of the Or-group of dependencies as it
        would be written in the debian/control file. The string representation
        does not include the type of the Or-group of dependencies.

        Example:
          python2 >= 2.7 | python3

        .. versionadded:: 1.0.0
        """
        return " | ".join(bd.rawstr for bd in self)

    @property
    def rawtype(self) -> str:
        """Type of the Or-group of dependency.

        This should be one of 'Breaks', 'Conflicts', 'Depends', 'Enhances',
        'PreDepends', 'Recommends', 'Replaces', 'Suggests'.

        Additional types might be added in the future.

        .. versionadded:: 1.0.0
        """
        return self._rawtype

    @property
    def target_versions(self) -> list[Version]:
        """A list of all Version objects which satisfy this Or-group of deps.

        .. versionadded:: 1.0.0
        """
        tvers: list[Version] = []
        for bd in self:  # apt.package.Dependency
            for tver in bd.target_versions:  # apt.package.Version
                # Deduplicate versions reachable through several alternatives.
                if tver not in tvers:
                    tvers.append(tver)
        return tvers

    @property
    def installed_target_versions(self) -> list[Version]:
        """A list of all installed Version objects which satisfy this dep.

        .. versionadded:: 1.0.0
        """
        return [tver for tver in self.target_versions if tver.is_installed]


class Origin:
    """The origin of a version.

    Attributes defined here:
        archive   - The archive (eg. unstable)
        component - The component (eg. main)
        label     - The Label, as set in the Release file
        origin    - The Origin, as set in the Release file
        codename  - The Codename, as set in the Release file
        site      - The hostname of the site.
        not_automatic - Whether the NotAutomatic flag is set for this origin.
        trusted   - Boolean value whether this is trustworthy.
    """

    def __init__(self, pkg: Package, packagefile: apt_pkg.PackageFile) -> None:
        self.archive = packagefile.archive
        self.component = packagefile.component
        self.label = packagefile.label
        self.origin = packagefile.origin
        self.codename = packagefile.codename
        self.site = packagefile.site
        self.not_automatic = packagefile.not_automatic
        # check the trust: an origin is trusted iff its index file is.
        indexfile = pkg._pcache._list.find_index(packagefile)
        if indexfile and indexfile.is_trusted:
            self.trusted = True
        else:
            self.trusted = False

    def __repr__(self) -> str:
        return (
            "<Origin component:%r archive:%r origin:%r label:%r "
            "site:%r isTrusted:%r>"
        ) % (
            self.component,
            self.archive,
            self.origin,
            self.label,
            self.site,
            self.trusted,
        )


class Record(Mapping[Any, Any]):
    """Record in a Packages file

    Represent a record as stored in a Packages file. You can use this like
    a dictionary mapping the field names of the record to their values::

        >>> record = Record("Package: python-apt\\nVersion: 0.8.0\\n\\n")
        >>> record["Package"]
        'python-apt'
        >>> record["Version"]
        '0.8.0'

    For example, to get the tasks of a package from a cache, you could do::

        package.candidate.record["Tasks"].split()

    Of course, you can also use the :attr:`Version.tasks` property.
+ + """ + + def __init__(self, record_str: str) -> None: + self._rec = apt_pkg.TagSection(record_str) + + def __hash__(self) -> int: + return hash(self._rec) + + def __str__(self) -> str: + return str(self._rec) + + def __getitem__(self, key: str) -> str: + return self._rec[key] + + def __contains__(self, key: object) -> bool: + return key in self._rec + + def __iter__(self) -> Iterator[str]: + return iter(self._rec.keys()) + + def iteritems(self) -> Iterable[tuple[object, str]]: + """An iterator over the (key, value) items of the record.""" + for key in self._rec.keys(): + yield key, self._rec[key] + + def get(self, key: str, default: object = None) -> object: + """Return record[key] if key in record, else *default*. + + The parameter *default* must be either a string or None. + """ + return self._rec.get(key, default) + + def has_key(self, key: str) -> bool: + """deprecated form of ``key in x``.""" + return key in self._rec + + def __len__(self) -> int: + return len(self._rec) + + +class Version: + """Representation of a package version. + + The Version class contains all information related to a + specific package version. + + .. versionadded:: 0.7.9 + """ + + def __init__(self, package: Package, cand: apt_pkg.Version) -> None: + self.package = package + self._cand = cand + self.package._pcache._weakversions.add(self) + + def _cmp(self, other: Any) -> int | Any: + """Compares against another apt.Version object or a version string. + + This method behaves like Python 2's cmp builtin and returns an integer + according to the outcome. The return value is negative in case of + self < other, zero if self == other and positive if self > other. + + The comparison includes the package name and architecture if other is + an apt.Version object. If other isn't an apt.Version object it'll be + assumed that other is a version string (without package name/arch). + + .. versionchanged:: 1.0.0 + """ + # Assume that other is an apt.Version object. 
        try:
            # Order by full package name first, then by version.
            self_name = self.package.fullname
            other_name = other.package.fullname
            if self_name < other_name:
                return -1
            elif self_name > other_name:
                return 1
            return apt_pkg.version_compare(self._cand.ver_str, other.version)
        except AttributeError:
            # Assume that other is a string that only contains the version.
            try:
                return apt_pkg.version_compare(self._cand.ver_str, other)
            except TypeError:
                return NotImplemented

    def __eq__(self, other: object) -> bool:
        return self._cmp(other) == 0

    def __ge__(self, other: Version) -> bool:
        return self._cmp(other) >= 0

    def __gt__(self, other: Version) -> bool:
        return self._cmp(other) > 0

    def __le__(self, other: Version) -> bool:
        return self._cmp(other) <= 0

    def __lt__(self, other: Version) -> bool:
        return self._cmp(other) < 0

    def __ne__(self, other: object) -> bool | Any:
        try:
            return self._cmp(other) != 0
        except TypeError:
            return NotImplemented

    def __hash__(self) -> int:
        return self._cand.hash

    def __str__(self) -> str:
        return f"{self.package.name}={self.version}"

    def __repr__(self) -> str:
        return f"<Version: package:{self.package.name!r} version:{self.version!r}>"

    @property
    def _records(self) -> apt_pkg.PackageRecords:
        """Internal helper that moves the Records to the right position."""
        # If changing lookup, change fetch_binary() as well
        if not self.package._pcache._records.lookup(self._cand.file_list[0]):
            raise LookupError("Could not lookup record")

        return self.package._pcache._records

    @property
    def _translated_records(self) -> apt_pkg.PackageRecords | None:
        """Internal helper to get the translated description."""
        desc_iter = self._cand.translated_description
        if self.package._pcache._records.lookup(desc_iter.file_list.pop(0)):
            return self.package._pcache._records
        return None

    @property
    def is_security_update(self) -> bool:
        """Return whether this version is a security update."""
        return bool(self._cand.is_security_update)

    @property
    def installed_size(self) -> int:
        """Return the size of the package when installed."""
        return self._cand.installed_size

    @property
    def homepage(self) -> str:
        """Return the homepage for the package."""
        return self._records.homepage

    @property
    def size(self) -> int:
        """Return the size of the package."""
        return self._cand.size

    @property
    def architecture(self) -> str:
        """Return the architecture of the package version."""
        return self._cand.arch

    @property
    def downloadable(self) -> bool:
        """Return whether the version of the package is downloadable."""
        return bool(self._cand.downloadable)

    @property
    def is_installed(self) -> bool:
        """Return wether this version of the package is currently installed.

        .. versionadded:: 1.0.0
        """
        inst_ver = self.package.installed
        return inst_ver is not None and inst_ver._cand.id == self._cand.id

    @property
    def version(self) -> str:
        """Return the version as a string."""
        return self._cand.ver_str

    @property
    def summary(self) -> str | None:
        """Return the short description (one line summary)."""
        records = self._translated_records
        return records.short_desc if records is not None else None

    @property
    def raw_description(self) -> str:
        """return the long description (raw)."""
        return self._records.long_desc

    @property
    def section(self) -> str:
        """Return the section of the package."""
        return self._cand.section

    @property
    def description(self) -> str:
        """Return the formatted long description.

        Return the formatted long description according to the Debian policy
        (Chapter 5.6.13).
        See http://www.debian.org/doc/debian-policy/ch-controlfields.html
        for more information.
        """
        desc = ""
        records = self._translated_records
        dsc = records.long_desc if records is not None else None

        if not dsc:
            # NOTE(review): the implicit literal concatenation below yields
            # "...'%s'.Please report." with no separator — likely a missing
            # space; not fixed here because changing the msgid would break
            # existing gettext translations.
            return _("Missing description for '%s'." "Please report.") % (
                self.package.name
            )

        try:
            if not isinstance(dsc, str):
                # Only convert where needed (i.e. Python 2.X)
                dsc = dsc.decode("utf-8")
        except UnicodeDecodeError as err:
            return _(
                "Invalid unicode in description for '%s' (%s). " "Please report."
            ) % (self.package.name, err)

        lines = iter(dsc.split("\n"))
        # Skip the first line, since its a duplication of the summary
        next(lines)
        for raw_line in lines:
            if raw_line.strip() == ".":
                # The line is just line break
                if not desc.endswith("\n"):
                    desc += "\n\n"
                continue
            # Two leading spaces: verbatim display (per Debian Policy 5.6.13).
            if raw_line.startswith("  "):
                # The line should be displayed verbatim without word wrapping
                if not desc.endswith("\n"):
                    line = "\n%s\n" % raw_line[2:]
                else:
                    line = "%s\n" % raw_line[2:]
            elif raw_line.startswith(" "):
                # The line is part of a paragraph.
                if desc.endswith("\n") or desc == "":
                    # Skip the leading white space
                    line = raw_line[1:]
                else:
                    line = raw_line
            else:
                line = raw_line
            # Add current line to the description
            desc += line
        return desc

    @property
    def source_name(self) -> str:
        """Return the name of the source package."""
        try:
            # Fall back to the binary name if no Source field is present.
            return self._records.source_pkg or self.package.shortname
        except IndexError:
            return self.package.shortname

    @property
    def source_version(self) -> str:
        """Return the version of the source package."""
        try:
            return self._records.source_ver or self._cand.ver_str
        except IndexError:
            return self._cand.ver_str

    @property
    def priority(self) -> str:
        """Return the priority of the package, as string."""
        return self._cand.priority_str

    @property
    def policy_priority(self) -> int:
        """Return the internal policy priority as a number.
        See apt_preferences(5) for more information about what it means.
        """
        return self.package._pcache._depcache.policy.get_priority(self._cand)

    @property
    def record(self) -> Record:
        """Return a Record() object for this version.

        Return a Record() object for this version which provides access
        to the raw attributes of the candidate version
        """
        return Record(self._records.record)

    def get_dependencies(self, *types: str) -> list[Dependency]:
        """Return a list of Dependency objects for the given types.

        Multiple types can be specified. Possible types are:
        'Breaks', 'Conflicts', 'Depends', 'Enhances', 'PreDepends',
        'Recommends', 'Replaces', 'Suggests'

        Additional types might be added in the future.
        """
        depends_list = []
        depends = self._cand.depends_list
        for type_ in types:
            try:
                for dep_ver_list in depends[type_]:
                    base_deps = []
                    for dep_or in dep_ver_list:
                        base_deps.append(BaseDependency(self, dep_or))
                    depends_list.append(Dependency(self, base_deps, type_))
            except KeyError:
                # No dependency of this type — skip silently.
                pass
        return depends_list

    @property
    def provides(self) -> list[str]:
        """Return a list of names that this version provides."""
        return [p[0] for p in self._cand.provides_list]

    @property
    def enhances(self) -> list[Dependency]:
        """Return the list of enhances for the package version."""
        return self.get_dependencies("Enhances")

    @property
    def dependencies(self) -> list[Dependency]:
        """Return the dependencies of the package version."""
        return self.get_dependencies("PreDepends", "Depends")

    @property
    def recommends(self) -> list[Dependency]:
        """Return the recommends of the package version."""
        return self.get_dependencies("Recommends")

    @property
    def suggests(self) -> list[Dependency]:
        """Return the suggests of the package version."""
        return self.get_dependencies("Suggests")

    @property
    def origins(self) -> list[Origin]:
        """Return a list of origins for the package version."""
        origins = []
        for packagefile, _unused in self._cand.file_list:
            origins.append(Origin(self.package, packagefile))
        return origins

    @property
    def filename(self) -> str:
        """Return the path to the file inside the archive.

        .. versionadded:: 0.7.10
        """
        return self._records.filename

    @property
    def md5(self) -> str:
        """Return the md5sum of the binary.

        .. versionadded:: 0.7.10
        """
        return self._records.md5_hash

    @property
    def sha1(self) -> str:
        """Return the sha1sum of the binary.

        .. versionadded:: 0.7.10
        """
        return self._records.sha1_hash

    @property
    def sha256(self) -> str:
        """Return the sha256sum of the binary.

        .. versionadded:: 0.7.10
        """
        return self._records.sha256_hash

    @property
    def tasks(self) -> set[str]:
        """Get the tasks of the package.

        A set of the names of the tasks this package belongs to.

        .. versionadded:: 0.8.0
        """
        return set(self.record["Task"].split())

    def _uris(self) -> Iterator[str]:
        """Return an iterator over all available urls.

        .. versionadded:: 0.7.10
        """
        for packagefile, _unused in self._cand.file_list:
            indexfile = self.package._pcache._list.find_index(packagefile)
            if indexfile:
                yield indexfile.archive_uri(self._records.filename)

    @property
    def uris(self) -> list[str]:
        """Return a list of all available uris for the binary.

        .. versionadded:: 0.7.10
        """
        return list(self._uris())

    @property
    def uri(self) -> str | None:
        """Return a single URI for the binary.

        .. versionadded:: 0.7.10
        """
        try:
            return next(iter(self._uris()))
        except StopIteration:
            return None

    def fetch_binary(
        self,
        destdir: str = "",
        progress: AcquireProgress | None = None,
        allow_unauthenticated: bool | None = None,
    ) -> str:
        """Fetch the binary version of the package.

        The parameter *destdir* specifies the directory where the package will
        be fetched to.

        The parameter *progress* may refer to an apt_pkg.AcquireProgress()
        object. If not specified or None, apt.progress.text.AcquireProgress()
        is used.

        The keyword-only parameter *allow_unauthenticated* specifies whether
        to allow unauthenticated downloads. If not specified, it defaults to
        the configuration option `APT::Get::AllowUnauthenticated`.

        Returns the absolute path of the downloaded .deb.

        Raises UntrustedError when the source index or the hashes are not
        trusted (unless *allow_unauthenticated*), ValueError when no URI is
        available, and FetchError when the download fails.

        .. versionadded:: 0.7.10
        """
        if allow_unauthenticated is None:
            allow_unauthenticated = apt_pkg.config.find_b(
                "APT::Get::" "AllowUnauthenticated", False
            )
        base = os.path.basename(self._records.filename)
        destfile = os.path.join(destdir, base)
        # Skip the download if an identical file is already present.
        if _file_is_same(destfile, self.size, self._records.hashes):
            logging.debug("Ignoring already existing file: %s" % destfile)
            return os.path.abspath(destfile)

        # Verify that the index is actually trusted
        pfile, offset = self._cand.file_list[0]
        index = self.package._pcache._list.find_index(pfile)

        if not (allow_unauthenticated or (index and index.is_trusted)):
            raise UntrustedError(
                "Could not fetch %s %s source package: "
                "Source %r is not trusted"
                % (
                    self.package.name,
                    self.version,
                    getattr(index, "describe", "<unknown>"),
                )
            )
        if not self.uri:
            raise ValueError("No URI for this binary.")
        hashes = self._records.hashes
        if not (allow_unauthenticated or hashes.usable):
            raise UntrustedError(
                "The item %r could not be fetched: " "No trusted hash found." % destfile
            )
        acq = apt_pkg.Acquire(progress or apt.progress.text.AcquireProgress())
        acqfile = apt_pkg.AcquireFile(
            acq, self.uri, hashes, self.size, base, destfile=destfile
        )
        acq.run()

        if acqfile.status != acqfile.STAT_DONE:
            raise FetchError(
                "The item %r could not be fetched: %s"
                % (acqfile.destfile, acqfile.error_text)
            )

        return os.path.abspath(destfile)

    def fetch_source(
        self,
        destdir: str = "",
        progress: AcquireProgress | None = None,
        unpack: bool = True,
        allow_unauthenticated: bool | None = None,
    ) -> str:
        """Get the source code of a package.

        The parameter *destdir* specifies the directory where the source will
        be fetched to.

        The parameter *progress* may refer to an apt_pkg.AcquireProgress()
        object. If not specified or None, apt.progress.text.AcquireProgress()
        is used.

        The parameter *unpack* describes whether the source should be unpacked
        (``True``) or not (``False``). By default, it is unpacked.

        If *unpack* is ``True``, the path to the extracted directory is
        returned. Otherwise, the path to the .dsc file is returned.

        The keyword-only parameter *allow_unauthenticated* specifies whether
        to allow unauthenticated downloads. If not specified, it defaults to
        the configuration option `APT::Get::AllowUnauthenticated`.
        """
        if allow_unauthenticated is None:
            allow_unauthenticated = apt_pkg.config.find_b(
                "APT::Get::" "AllowUnauthenticated", False
            )

        src = apt_pkg.SourceRecords()
        acq = apt_pkg.Acquire(progress or apt.progress.text.AcquireProgress())

        dsc = None
        record = self._records
        source_name = record.source_pkg or self.package.shortname
        source_version = record.source_ver or self._cand.ver_str
        source_lookup = src.lookup(source_name)

        # Repeated lookup() calls advance through successive matching source
        # records until the wanted version is found (presumably — behaviour
        # of apt_pkg.SourceRecords.lookup; confirm against apt_pkg docs).
        while source_lookup and source_version != src.version:
            source_lookup = src.lookup(source_name)
        if not source_lookup:
            raise ValueError("No source for %r" % self)
        files = list()

        if not (allow_unauthenticated or src.index.is_trusted):
            raise UntrustedError(
                "Could not fetch %s %s source package: "
                "Source %r is not trusted"
                % (self.package.name, self.version, src.index.describe)
            )
        for fil in src.files:
            base = os.path.basename(fil.path)
            destfile = os.path.join(destdir, base)
            if fil.type == "dsc":
                dsc = destfile
            # Skip files that already exist with matching size and hashes.
            if _file_is_same(destfile, fil.size, fil.hashes):
                logging.debug("Ignoring already existing file: %s" % destfile)
                continue

            if not (allow_unauthenticated or fil.hashes.usable):
                raise UntrustedError(
                    "The item %r could not be fetched: "
                    "No trusted hash found." % destfile
                )
            files.append(
                apt_pkg.AcquireFile(
                    acq,
                    src.index.archive_uri(fil.path),
                    fil.hashes,
                    fil.size,
                    base,
                    destfile=destfile,
                )
            )
        acq.run()

        if dsc is None:
            raise ValueError("No source for %r" % self)

        for item in acq.items:
            if item.status != item.STAT_DONE:
                raise FetchError(
                    "The item %r could not be fetched: %s"
                    % (item.destfile, item.error_text)
                )

        if unpack:
            outdir = src.package + "-" + apt_pkg.upstream_version(src.version)
            outdir = os.path.join(destdir, outdir)
            subprocess.check_call(["dpkg-source", "-x", dsc, outdir])
            return os.path.abspath(outdir)
        else:
            return os.path.abspath(dsc)


class VersionList(Sequence[Version]):
    """Provide a mapping & sequence interface to all versions of a package.

    This class can be used like a dictionary, where version strings are the
    keys. It can also be used as a sequence, where integers are the keys.

    You can also convert this to a dictionary or a list, using the usual way
    of dict(version_list) or list(version_list). This is useful if you need
    to access the version objects multiple times, because they do not have to
    be recreated this way.

    Examples ('package.versions' being a version list):
        '0.7.92' in package.versions # Check whether 0.7.92 is a valid version.
        package.versions[0] # Return first version or raise IndexError
        package.versions[0:2] # Return a new VersionList for objects 0-2
        package.versions['0.7.92'] # Return version 0.7.92 or raise KeyError
        package.versions.keys() # All keys, as strings.
        max(package.versions)
    """

    def __init__(self, package: Package, slice_: slice | None = None) -> None:
        self._package = package  # apt.package.Package()
        self._versions = package._pkg.version_list  # [apt_pkg.Version(), ...]
+ if slice_: + self._versions = self._versions[slice_] + + def __getitem__(self, item: int | slice | str) -> Any: + # FIXME: Should not be returning Any, should have overloads; but + # pyflakes complains + if isinstance(item, slice): + return self.__class__(self._package, item) + try: + # Sequence interface, item is an integer + return Version(self._package, self._versions[item]) # type: ignore + except TypeError: + # Dictionary interface item is a string. + for ver in self._versions: + if ver.ver_str == item: + return Version(self._package, ver) + raise KeyError("Version: %r not found." % (item)) + + def __str__(self) -> str: + return "[%s]" % (", ".join(str(ver) for ver in self)) + + def __repr__(self) -> str: + return "<VersionList: %r>" % self.keys() + + def __iter__(self) -> Iterator[Version]: + """Return an iterator over all value objects.""" + return (Version(self._package, ver) for ver in self._versions) + + def __contains__(self, item: object) -> bool: + if isinstance(item, Version): # Sequence interface + item = item.version + # Dictionary interface. + for ver in self._versions: + if ver.ver_str == item: + return True + return False + + def __eq__(self, other: Any) -> bool: + return list(self) == list(other) + + def __len__(self) -> int: + return len(self._versions) + + # Mapping interface + + def keys(self) -> list[str]: + """Return a list of all versions, as strings.""" + return [ver.ver_str for ver in self._versions] + + def get(self, key: str, default: Version | None = None) -> Version | None: + """Return the key or the default.""" + try: + return self[key] # type: ignore # FIXME: should be deterined automatically # noqa + except LookupError: + return default + + +class Package: + """Representation of a package in a cache. + + This class provides methods and properties for working with a package. It + lets you mark the package for installation, check if it is installed, and + much more. 
+ """ + + def __init__(self, pcache: apt.Cache, pkgiter: apt_pkg.Package) -> None: + """Init the Package object""" + self._pkg = pkgiter + self._pcache = pcache # python cache in cache.py + self._changelog = "" # Cached changelog + + def __str__(self) -> str: + return self.name + + def __repr__(self) -> str: + return "<Package: name:{!r} architecture={!r} id:{!r}>".format( + self._pkg.name, + self._pkg.architecture, + self._pkg.id, + ) + + def __lt__(self, other: Package) -> bool: + return self.name < other.name + + @property + def candidate(self) -> Version | None: + """Return the candidate version of the package. + + This property is writeable to allow you to set the candidate version + of the package. Just assign a Version() object, and it will be set as + the candidate version. + """ + cand = self._pcache._depcache.get_candidate_ver(self._pkg) + if cand is not None: + return Version(self, cand) + return None + + @candidate.setter + def candidate(self, version: Version) -> None: + """Set the candidate version of the package.""" + self._pcache.cache_pre_change() + self._pcache._depcache.set_candidate_ver(self._pkg, version._cand) + self._pcache.cache_post_change() + + @property + def installed(self) -> Version | None: + """Return the currently installed version of the package. + + .. versionadded:: 0.7.9 + """ + if self._pkg.current_ver is not None: + return Version(self, self._pkg.current_ver) + return None + + @property + def name(self) -> str: + """Return the name of the package, possibly including architecture. + + If the package is not part of the system's preferred architecture, + return the same as :attr:`fullname`, otherwise return the same + as :attr:`shortname` + + .. versionchanged:: 0.7.100.3 + + As part of multi-arch, this field now may include architecture + information. + """ + return self._pkg.get_fullname(True) + + @property + def fullname(self) -> str: + """Return the name of the package, including architecture. 
+ + Note that as for :meth:`architecture`, this returns the + native architecture for Architecture: all packages. + + .. versionadded:: 0.7.100.3""" + return self._pkg.get_fullname(False) + + @property + def shortname(self) -> str: + """Return the name of the package, without architecture. + + .. versionadded:: 0.7.100.3""" + return self._pkg.name + + @property + def id(self) -> int: + """Return a uniq ID for the package. + + This can be used eg. to store additional information about the pkg.""" + return self._pkg.id + + @property + def essential(self) -> bool: + """Return True if the package is an essential part of the system.""" + return self._pkg.essential + + def architecture(self) -> str: + """Return the Architecture of the package. + + Note that for Architecture: all packages, this returns the + native architecture, as they are internally treated like native + packages. To get the concrete architecture, look at the + :attr:`Version.architecture` attribute. + + .. versionchanged:: 0.7.100.3 + This is now the package's architecture in the multi-arch sense, + previously it was the architecture of the candidate version + and deprecated. 
+ """ + return self._pkg.architecture + + # depcache states + + @property + def marked_install(self) -> bool: + """Return ``True`` if the package is marked for install.""" + return self._pcache._depcache.marked_install(self._pkg) + + @property + def marked_upgrade(self) -> bool: + """Return ``True`` if the package is marked for upgrade.""" + return self._pcache._depcache.marked_upgrade(self._pkg) + + @property + def marked_delete(self) -> bool: + """Return ``True`` if the package is marked for delete.""" + return self._pcache._depcache.marked_delete(self._pkg) + + @property + def marked_keep(self) -> bool: + """Return ``True`` if the package is marked for keep.""" + return self._pcache._depcache.marked_keep(self._pkg) + + @property + def marked_downgrade(self) -> bool: + """Package is marked for downgrade""" + return self._pcache._depcache.marked_downgrade(self._pkg) + + @property + def marked_reinstall(self) -> bool: + """Return ``True`` if the package is marked for reinstall.""" + return self._pcache._depcache.marked_reinstall(self._pkg) + + @property + def is_installed(self) -> bool: + """Return ``True`` if the package is installed.""" + return self._pkg.current_ver is not None + + @property + def is_upgradable(self) -> bool: + """Return ``True`` if the package is upgradable.""" + return self.is_installed and self._pcache._depcache.is_upgradable(self._pkg) + + @property + def is_auto_removable(self) -> bool: + """Return ``True`` if the package is no longer required. + + If the package has been installed automatically as a dependency of + another package, and if no packages depend on it anymore, the package + is no longer required. 
+ """ + return ( + self.is_installed or self.marked_install + ) and self._pcache._depcache.is_garbage(self._pkg) + + @property + def is_auto_installed(self) -> bool: + """Return whether the package is marked as automatically installed.""" + return self._pcache._depcache.is_auto_installed(self._pkg) + + @property + def phasing_applied(self) -> bool: + """Return ``True`` if the package update is being phased.""" + return self._pcache._depcache.phasing_applied(self._pkg) + + # sizes + + @property + def installed_files(self) -> list[str]: + """Return a list of files installed by the package. + + Return a list of unicode names of the files which have + been installed by this package + """ + for name in self.name, self.fullname: + path = "/var/lib/dpkg/info/%s.list" % name + try: + with open(path, "rb") as file_list: + return file_list.read().decode("utf-8").strip().split("\n") + except OSError: + continue + + return [] + + def get_changelog( + self, uri: str | None = None, cancel_lock: threading.Event | None = None + ) -> str: + """ + Download the changelog of the package and return it as unicode + string. + + The parameter *uri* refers to the uri of the changelog file. It may + contain multiple named variables which will be substitued. These + variables are (src_section, prefix, src_pkg, src_ver). An example is + the Ubuntu changelog:: + + "http://changelogs.ubuntu.com/changelogs/pool" \\ + "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\ + "/%(src_pkg)s_%(src_ver)s/changelog" + + The parameter *cancel_lock* refers to an instance of threading.Event, + which if set, prevents the download. 
+ """ + # Return a cached changelog if available + if self._changelog != "": + return self._changelog + + if not self.candidate: + return _("The list of changes is not available") + + if uri is None: + if self.candidate.origins[0].origin == "Debian": + uri = ( + "http://packages.debian.org/changelogs/pool" + "/%(src_section)s/%(prefix)s/%(src_pkg)s" + "/%(src_pkg)s_%(src_ver)s/changelog" + ) + elif self.candidate.origins[0].origin == "Ubuntu": + uri = ( + "http://changelogs.ubuntu.com/changelogs/pool" + "/%(src_section)s/%(prefix)s/%(src_pkg)s" + "/%(src_pkg)s_%(src_ver)s/changelog" + ) + else: + res = _("The list of changes is not available") + if isinstance(res, str): + return res + else: + return res.decode("utf-8") + + # get the src package name + src_pkg = self.candidate.source_name + + # assume "main" section + src_section = "main" + # use the section of the candidate as a starting point + section = self.candidate.section + + # get the source version + src_ver = self.candidate.source_version + + try: + # try to get the source version of the pkg, this differs + # for some (e.g. libnspr4 on ubuntu) + # this feature only works if the correct deb-src are in the + # sources.list otherwise we fall back to the binary version number + src_records = apt_pkg.SourceRecords() + except SystemError: + pass + else: + while src_records.lookup(src_pkg): + if not src_records.version: + continue + if self.candidate.source_version == src_records.version: + # Direct match, use it and do not do more lookups. + src_ver = src_records.version + section = src_records.section + break + if apt_pkg.version_compare(src_records.version, src_ver) > 0: + # The version is higher, it seems to match. 
+ src_ver = src_records.version + section = src_records.section + + section_split = section.split("/", 1) + if len(section_split) > 1: + src_section = section_split[0] + del section_split + + # lib is handled special + prefix = src_pkg[0] + if src_pkg.startswith("lib"): + prefix = "lib" + src_pkg[3] + + # stip epoch + src_ver_split = src_ver.split(":", 1) + if len(src_ver_split) > 1: + src_ver = "".join(src_ver_split[1:]) + del src_ver_split + + uri = uri % { + "src_section": src_section, + "prefix": prefix, + "src_pkg": src_pkg, + "src_ver": src_ver, + } + + timeout = socket.getdefaulttimeout() + + # FIXME: when python2.4 vanishes from the archive, + # merge this into a single try..finally block (pep 341) + try: + try: + # Set a timeout for the changelog download + socket.setdefaulttimeout(2) + + # Check if the download was canceled + if cancel_lock and cancel_lock.is_set(): + return "" + # FIXME: python3.2: Should be closed manually + changelog_file = urlopen(uri) + # do only get the lines that are new + changelog = "" + regexp = "^%s \\((.*)\\)(.*)$" % (re.escape(src_pkg)) + while True: + # Check if the download was canceled + if cancel_lock and cancel_lock.is_set(): + return "" + # Read changelog line by line + line_raw = changelog_file.readline() + if not line_raw: + break + # The changelog is encoded in utf-8, but since there isn't + # any http header, urllib2 seems to treat it as ascii + line = line_raw.decode("utf-8") + + # print line.encode('utf-8') + match = re.match(regexp, line) + if match: + # strip epoch from installed version + # and from changelog too + installed = getattr(self.installed, "version", None) + if installed and ":" in installed: + installed = installed.split(":", 1)[1] + changelog_ver = match.group(1) + if changelog_ver and ":" in changelog_ver: + changelog_ver = changelog_ver.split(":", 1)[1] + + if ( + installed + and apt_pkg.version_compare(changelog_ver, installed) <= 0 + ): + break + # EOF (shouldn't really happen) + changelog += 
line + + # Print an error if we failed to extract a changelog + if len(changelog) == 0: + changelog = _("The list of changes is not available") + if not isinstance(changelog, str): + changelog = changelog.decode("utf-8") + self._changelog = changelog + + except HTTPError: + if self.candidate.origins[0].origin == "Ubuntu": + res = _( + "The list of changes is not available yet.\n\n" + "Please use " + "http://launchpad.net/ubuntu/+source/%s/" + "%s/+changelog\n" + "until the changes become available or try again " + "later." + ) % (src_pkg, src_ver) + else: + res = _("The list of changes is not available") + if isinstance(res, str): + return res + else: + return res.decode("utf-8") + except (OSError, BadStatusLine): + res = _( + "Failed to download the list of changes. \nPlease " + "check your Internet connection." + ) + if isinstance(res, str): + return res + else: + return res.decode("utf-8") + finally: + socket.setdefaulttimeout(timeout) + return self._changelog + + @property + def versions(self) -> VersionList: + """Return a VersionList() object for all available versions. + + .. versionadded:: 0.7.9 + """ + return VersionList(self) + + @property + def is_inst_broken(self) -> bool: + """Return True if the to-be-installed package is broken.""" + return self._pcache._depcache.is_inst_broken(self._pkg) + + @property + def is_now_broken(self) -> bool: + """Return True if the installed package is broken.""" + return self._pcache._depcache.is_now_broken(self._pkg) + + @property + def has_config_files(self) -> bool: + """Checks whether the package is is the config-files state.""" + return self._pkg.current_state == apt_pkg.CURSTATE_CONFIG_FILES + + # depcache actions + + def mark_keep(self) -> None: + """Mark a package for keep.""" + self._pcache.cache_pre_change() + self._pcache._depcache.mark_keep(self._pkg) + self._pcache.cache_post_change() + + def mark_delete(self, auto_fix: bool = True, purge: bool = False) -> None: + """Mark a package for deletion. 
+ + If *auto_fix* is ``True``, the resolver will be run, trying to fix + broken packages. This is the default. + + If *purge* is ``True``, remove the configuration files of the package + as well. The default is to keep the configuration. + """ + self._pcache.cache_pre_change() + self._pcache._depcache.mark_delete(self._pkg, purge) + # try to fix broken stuffsta + if auto_fix and self._pcache._depcache.broken_count > 0: + fix = apt_pkg.ProblemResolver(self._pcache._depcache) + fix.clear(self._pkg) + fix.protect(self._pkg) + fix.remove(self._pkg) + fix.resolve() + self._pcache.cache_post_change() + + def mark_install( + self, auto_fix: bool = True, auto_inst: bool = True, from_user: bool = True + ) -> None: + """Mark a package for install. + + If *autoFix* is ``True``, the resolver will be run, trying to fix + broken packages. This is the default. + + If *autoInst* is ``True``, the dependencies of the packages will be + installed automatically. This is the default. + + If *fromUser* is ``True``, this package will not be marked as + automatically installed. This is the default. Set it to False if you + want to be able to automatically remove the package at a later stage + when no other package depends on it. 
+ """ + self._pcache.cache_pre_change() + self._pcache._depcache.mark_install(self._pkg, auto_inst, from_user) + # try to fix broken stuff + if auto_fix and self._pcache._depcache.broken_count > 0: + fixer = apt_pkg.ProblemResolver(self._pcache._depcache) + fixer.clear(self._pkg) + fixer.protect(self._pkg) + fixer.resolve(True) + self._pcache.cache_post_change() + + def mark_upgrade(self, from_user: bool = True) -> None: + """Mark a package for upgrade.""" + if self.is_upgradable: + auto = self.is_auto_installed + self.mark_install(from_user=from_user) + self.mark_auto(auto) + else: + # FIXME: we may want to throw a exception here + sys.stderr.write( + ("MarkUpgrade() called on a non-upgradeable pkg: " "'%s'\n") + % self._pkg.name + ) + + def mark_auto(self, auto: bool = True) -> None: + """Mark a package as automatically installed. + + Call this function to mark a package as automatically installed. If the + optional parameter *auto* is set to ``False``, the package will not be + marked as automatically installed anymore. The default is ``True``. + """ + self._pcache._depcache.mark_auto(self._pkg, auto) + + def commit(self, fprogress: AcquireProgress, iprogress: InstallProgress) -> None: + """Commit the changes. + + The parameter *fprogress* refers to a apt_pkg.AcquireProgress() object, + like apt.progress.text.AcquireProgress(). + + The parameter *iprogress* refers to an InstallProgress() object, as + found in apt.progress.base. 
+ """ + self._pcache._depcache.commit(fprogress, iprogress) + + +@no_type_check +def _test(): + """Self-test.""" + print("Self-test for the Package modul") + import random + + apt_pkg.init() + progress = apt.progress.text.OpProgress() + cache = apt.Cache(progress) + pkg = cache["apt-utils"] + print("Name: %s " % pkg.name) + print("ID: %s " % pkg.id) + print("Priority (Candidate): %s " % pkg.candidate.priority) + print("Priority (Installed): %s " % pkg.installed.priority) + print("Installed: %s " % pkg.installed.version) + print("Candidate: %s " % pkg.candidate.version) + print("CandidateDownloadable: %s" % pkg.candidate.downloadable) + print("CandidateOrigins: %s" % pkg.candidate.origins) + print("SourcePkg: %s " % pkg.candidate.source_name) + print("Section: %s " % pkg.section) + print("Summary: %s" % pkg.candidate.summary) + print("Description (formatted) :\n%s" % pkg.candidate.description) + print("Description (unformatted):\n%s" % pkg.candidate.raw_description) + print("InstalledSize: %s " % pkg.candidate.installed_size) + print("PackageSize: %s " % pkg.candidate.size) + print("Dependencies: %s" % pkg.installed.dependencies) + print("Recommends: %s" % pkg.installed.recommends) + for dep in pkg.candidate.dependencies: + print( + ",".join( + f"{o.name} ({o.version}) ({o.relation}) ({o.pre_depend})" + for o in dep.or_dependencies + ) + ) + print("arch: %s" % pkg.candidate.architecture) + print("homepage: %s" % pkg.candidate.homepage) + print("rec: ", pkg.candidate.record) + + print(cache["2vcard"].get_changelog()) + for i in True, False: + print("Running install on random upgradable pkgs with AutoFix: ", i) + for pkg in cache: + if pkg.is_upgradable: + if random.randint(0, 1) == 1: + pkg.mark_install(i) + print("Broken: %s " % cache._depcache.broken_count) + print("InstCount: %s " % cache._depcache.inst_count) + + print() + # get a new cache + for i in True, False: + print("Randomly remove some packages with AutoFix: %s" % i) + cache = apt.Cache(progress) + for 
name in cache.keys(): + if random.randint(0, 1) == 1: + try: + cache[name].mark_delete(i) + except SystemError: + print("Error trying to remove: %s " % name) + print("Broken: %s " % cache._depcache.broken_count) + print("DelCount: %s " % cache._depcache.del_count) + + +# self-test +if __name__ == "__main__": + _test() diff --git a/apt/progress/__init__.py b/apt/progress/__init__.py new file mode 100644 index 0000000..d1687d5 --- /dev/null +++ b/apt/progress/__init__.py @@ -0,0 +1,28 @@ +# apt/progress/__init__.py - Initialization file for apt.progress. +# +# Copyright (c) 2009 Julian Andres Klode <jak@debian.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Progress reporting. + +This package provides progress reporting for the python-apt package. The module +'base' provides classes with no output, and the module 'text' provides classes +for terminals, etc. +""" + +from collections.abc import Sequence + +__all__: Sequence[str] = [] diff --git a/apt/progress/base.py b/apt/progress/base.py new file mode 100644 index 0000000..ede5e5c --- /dev/null +++ b/apt/progress/base.py @@ -0,0 +1,332 @@ +# apt/progress/base.py - Base classes for progress reporting. 
class AcquireProgress:
    """Monitor object for downloads controlled by the Acquire class.

    This is a mostly abstract class: subclass it and implement the
    callback methods to get something useful.
    """

    current_bytes = current_cps = fetched_bytes = last_bytes = total_bytes = 0.0
    current_items = elapsed_time = total_items = 0

    def done(self, item: apt_pkg.AcquireItemDesc) -> None:
        """Invoked when an item is successfully and completely fetched."""

    def fail(self, item: apt_pkg.AcquireItemDesc) -> None:
        """Invoked when an item could not be fetched."""

    def fetch(self, item: apt_pkg.AcquireItemDesc) -> None:
        """Invoked when some of the item's data is fetched."""

    def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None:
        """Invoked when an item is confirmed to be up-to-date.

        For instance, when an HTTP download is informed that the file on
        the server was not modified.
        """

    def media_change(self, media: str, drive: str) -> bool:
        """Prompt the user to change the inserted removable media.

        The parameter 'media' describes the name of the media type that
        should be changed, whereas the parameter 'drive' should be the
        identifying name of the drive whose media should be changed.

        This method should not return until the user has confirmed to the
        user interface that the media change is complete.  It must return
        True if the user confirms the media change, or False to cancel it.
        """
        return False

    def pulse(self, owner: apt_pkg.Acquire) -> bool:
        """Periodically invoked while the Acquire process is underway.

        It should display information about the current state of the
        Acquire process given by the parameter 'owner'.  Return True to
        continue the acquisition or False to cancel it.
        """
        return True

    def start(self) -> None:
        """Invoked when the Acquire process starts running."""
        # Reset all counters: byte counters are floats, item counters
        # are plain integers.
        self.current_bytes = self.current_cps = 0.0
        self.fetched_bytes = self.last_bytes = self.total_bytes = 0.0
        self.current_items = self.elapsed_time = self.total_items = 0

    def stop(self) -> None:
        """Invoked when the Acquire process stops running."""


class CdromProgress:
    """Base class for reporting the progress of adding a cdrom.

    Can be used with apt_pkg.Cdrom to produce a utility like apt-cdrom.
    The attribute 'total_steps' defines the total number of steps and can
    be used in update() to display the current progress.
    """

    total_steps = 0

    def ask_cdrom_name(self) -> str | None:
        """Ask for the name of the cdrom.

        If a name has been provided, return it.  Otherwise, return None
        to cancel the operation.
        """

    def change_cdrom(self) -> bool:
        """Ask for the CD-ROM to be changed.

        Return True once the cdrom has been changed or False to cancel
        the operation.
        """
        return False

    def update(self, text: str, current: int) -> None:
        """Periodically invoked to update the interface.

        The string 'text' defines the text which should be displayed and
        the integer 'current' the number of completed steps.
        """
class InstallProgress:
    """Class to report the progress of installing packages.

    Communicates with apt/dpkg over a status pipe; subclass and override
    the callback methods to build a user interface.
    """

    child_pid, percent, select_timeout, status = 0, 0.0, 0.1, ""

    def __init__(self) -> None:
        # Pipe over which apt/dpkg report status lines back to us.
        (self.statusfd, self.writefd) = os.pipe()
        # These will leak fds, but fixing this safely requires API changes.
        self.write_stream: io.TextIOBase = os.fdopen(self.writefd, "w")
        self.status_stream: io.TextIOBase = os.fdopen(self.statusfd, "r")  # noqa
        fcntl.fcntl(self.statusfd, fcntl.F_SETFL, os.O_NONBLOCK)

    def start_update(self) -> None:
        """(Abstract) Start update."""

    def finish_update(self) -> None:
        """(Abstract) Called when update has finished."""

    def __enter__(self) -> InstallProgress:
        return self

    def __exit__(self, type: object, value: object, traceback: object) -> None:
        self.write_stream.close()
        self.status_stream.close()

    def error(self, pkg: str, errormsg: str) -> None:
        """(Abstract) Called when a error is detected during the install."""

    def conffile(self, current: str, new: str) -> None:
        """(Abstract) Called when a conffile question from dpkg is detected."""

    def status_change(self, pkg: str, percent: float, status: str) -> None:
        """(Abstract) Called when the APT status changed."""

    def dpkg_status_change(self, pkg: str, status: str) -> None:
        """(Abstract) Called when the dpkg status changed."""

    def processing(self, pkg: str, stage: str) -> None:
        """(Abstract) Sent just before a processing stage starts.

        The parameter 'stage' is one of "upgrade", "install"
        (both sent before unpacking), "configure", "trigproc", "remove",
        "purge". This method is used for dpkg only.
        """

    def run(self, obj: apt_pkg.PackageManager | bytes | str) -> int:
        """Install using the object 'obj'.

        This functions runs install actions. The parameter 'obj' may either
        be a PackageManager object in which case its do_install() method is
        called or the path to a deb file.

        If the object is a PackageManager, the functions returns the result
        of calling its do_install() method. Otherwise, the function returns
        the exit status of dpkg. In both cases, 0 means that there were no
        problems.
        """
        pid = self.fork()
        if pid == 0:
            try:
                # PEP-446 implemented in Python 3.4 made all descriptors
                # CLOEXEC, but we need to be able to pass writefd to dpkg
                # when we spawn it
                os.set_inheritable(self.writefd, True)
            except AttributeError:  # if we don't have os.set_inheritable()
                pass
            # pm.do_install might raise a exception,
            # when this happens, we need to catch
            # it, otherwise os._exit() is not run
            # and the execution continues in the
            # parent code leading to very confusing bugs
            try:
                os._exit(obj.do_install(self.write_stream.fileno()))  # type: ignore # noqa
            except AttributeError:
                # obj is not a PackageManager; treat it as a .deb path and
                # hand it to dpkg directly.
                os._exit(
                    os.spawnlp(
                        os.P_WAIT,
                        "dpkg",
                        "dpkg",
                        "--status-fd",
                        str(self.write_stream.fileno()),
                        "-i",
                        obj,  # type: ignore # noqa
                    )
                )
            except Exception as e:
                sys.stderr.write("%s\n" % e)
                os._exit(apt_pkg.PackageManager.RESULT_FAILED)

        self.child_pid = pid
        res = self.wait_child()
        return os.WEXITSTATUS(res)

    def fork(self) -> int:
        """Fork."""
        return os.fork()

    def update_interface(self) -> None:
        """Read one status line from the child and dispatch callbacks."""
        try:
            line = self.status_stream.readline()
        except OSError as err:
            # resource temporarily unavailable is ignored
            if err.errno != errno.EAGAIN and err.errno != errno.EWOULDBLOCK:
                print(err.strerror)
            return

        pkgname = status = status_str = percent = base = ""

        if line.startswith("pm"):
            try:
                (status, pkgname, percent, status_str) = line.split(":", 3)
            except ValueError:
                # silently ignore lines that can't be parsed
                return
        elif line.startswith("status"):
            try:
                (base, pkgname, status, status_str) = line.split(":", 3)
            except ValueError:
                (base, pkgname, status) = line.split(":", 2)
        elif line.startswith("processing"):
            try:
                (status, status_str, pkgname) = line.split(":", 2)
            except ValueError:
                # Bug fix: previously a malformed "processing" line raised
                # an uncaught ValueError; ignore it like the other branches.
                return
            self.processing(pkgname.strip(), status_str.strip())

        # Always strip the status message
        pkgname = pkgname.strip()
        status_str = status_str.strip()
        status = status.strip()

        if status == "pmerror" or status == "error":
            self.error(pkgname, status_str)
        elif status == "conffile-prompt" or status == "pmconffile":
            match = re.match("\\s*'(.*)'\\s*'(.*)'.*", status_str)
            if match:
                self.conffile(match.group(1), match.group(2))
        elif status == "pmstatus":
            # FIXME: Float comparison
            if float(percent) != self.percent or status_str != self.status:
                self.status_change(pkgname, float(percent), status_str.strip())
                self.percent = float(percent)
                self.status = status_str.strip()
        elif base == "status":
            self.dpkg_status_change(pkgname, status)

    def wait_child(self) -> int:
        """Wait for child progress to exit.

        This method is responsible for calling update_interface() from time
        to time. It exits once the child has exited. The return values is
        the full status returned from os.waitpid() (not only the return
        code).
        """
        (pid, res) = (0, 0)
        while True:
            try:
                select.select([self.status_stream], [], [], self.select_timeout)
            except OSError as error:
                (errno_, _errstr) = error.args
                if errno_ != errno.EINTR:
                    raise

            self.update_interface()
            try:
                (pid, res) = os.waitpid(self.child_pid, os.WNOHANG)
                if pid == self.child_pid:
                    break
            except OSError as err:
                if err.errno == errno.ECHILD:
                    break
                if err.errno != errno.EINTR:
                    raise

        return res
+ """ + if percent is not None: + self.percent = percent + + def done(self) -> None: + """Called once an operation has been completed.""" diff --git a/apt/progress/text.py b/apt/progress/text.py new file mode 100644 index 0000000..ea1a176 --- /dev/null +++ b/apt/progress/text.py @@ -0,0 +1,294 @@ +# Copyright (c) 2009 Julian Andres Klode <jak@debian.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Progress reporting for text interfaces.""" +import io +import os +import signal +import sys +import types +from collections.abc import Callable + +import apt_pkg + +from apt.progress import base + +__all__ = ["AcquireProgress", "CdromProgress", "OpProgress"] + + +def _(msg: str) -> str: + """Translate the message, also try apt if translation is missing.""" + res = apt_pkg.gettext(msg) + if res == msg: + res = apt_pkg.gettext(msg, "apt") + return res + + +class TextProgress: + """Internal Base class for text progress classes.""" + + def __init__(self, outfile: io.TextIOBase | None = None) -> None: + self._file = outfile or sys.stdout + self._width = 0 + + def _write(self, msg: str, newline: bool = True, maximize: bool = False) -> None: + """Write the message on the terminal, fill remaining space.""" + self._file.write("\r") + self._file.write(msg) + + # Fill remaining stuff with whitespace + if self._width > 
class OpProgress(base.OpProgress, TextProgress):
    """Operation progress reporting.

    This closely resembles OpTextProgress in libapt-pkg.
    """

    def __init__(self, outfile: io.TextIOBase | None = None) -> None:
        # Both bases need explicit initialisation because of the multiple
        # inheritance (neither calls super().__init__()).
        TextProgress.__init__(self, outfile)
        base.OpProgress.__init__(self)
        # Name of the operation shown on the current terminal line; kept so
        # its line can be committed (with a newline) when a new major
        # operation starts.
        self.old_op = ""

    def update(self, percent: float | None = None) -> None:
        """Called periodically to update the user interface."""
        base.OpProgress.update(self, percent)
        if self.major_change and self.old_op:
            # A new major operation started: finish the previous line.
            self._write(self.old_op)
        # Redraw the current line in place (trailing \r, newline=False);
        # maximize=True lets _write track the widest line for padding.
        self._write("%s... %i%%\r" % (self.op, self.percent), False, True)
        self.old_op = self.op

    def done(self) -> None:
        """Called once an operation has been completed."""
        base.OpProgress.done(self)
        if self.old_op:
            # The leading \r returns to the start of the progress line
            # before printing the final "... Done" message.
            self._write(_("%c%s... Done") % ("\r", self.old_op), True, True)
            self.old_op = ""
+ self._winch() + self._id = 1 + + def _winch(self, *dummy: object) -> None: + """Signal handler for window resize signals.""" + if hasattr(self._file, "fileno") and os.isatty(self._file.fileno()): + import fcntl + import struct + import termios + + buf = fcntl.ioctl(self._file, termios.TIOCGWINSZ, 8 * b" ") # noqa + dummy, col, dummy, dummy = struct.unpack("hhhh", buf) + self._width = col - 1 # 1 for the cursor + + def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None: + """Called when an item is update (e.g. not modified on the server).""" + base.AcquireProgress.ims_hit(self, item) + line = _("Hit ") + item.description + if item.owner.filesize: + line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize) + self._write(line) + + def fail(self, item: apt_pkg.AcquireItemDesc) -> None: + """Called when an item is failed.""" + base.AcquireProgress.fail(self, item) + if item.owner.status == item.owner.STAT_DONE: + self._write(_("Ign ") + item.description) + else: + self._write(_("Err ") + item.description) + self._write(" %s" % item.owner.error_text) + + def fetch(self, item: apt_pkg.AcquireItemDesc) -> None: + """Called when some of the item's data is fetched.""" + base.AcquireProgress.fetch(self, item) + # It's complete already (e.g. Hit) + if item.owner.complete: + return + item.owner.id = self._id + self._id += 1 + line = _("Get:") + f"{item.owner.id} {item.description}" + if item.owner.filesize: + line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize) + + self._write(line) + + def pulse(self, owner: apt_pkg.Acquire) -> bool: + """Periodically invoked while the Acquire process is underway. 
+ + Return False if the user asked to cancel the whole Acquire process.""" + base.AcquireProgress.pulse(self, owner) + # only show progress on a tty to not clutter log files etc + if hasattr(self._file, "fileno") and not os.isatty(self._file.fileno()): + return True + + # calculate progress + percent = ((self.current_bytes + self.current_items) * 100.0) / float( + self.total_bytes + self.total_items + ) + + shown = False + tval = "%i%%" % percent + end = "" + if self.current_cps: + eta = int(float(self.total_bytes - self.current_bytes) / self.current_cps) + end = " {}B/s {}".format( + apt_pkg.size_to_str(self.current_cps), + apt_pkg.time_to_str(eta), + ) + + for worker in owner.workers: + val = "" + if not worker.current_item: + if worker.status: + val = " [%s]" % worker.status + if len(tval) + len(val) + len(end) >= self._width: + break + tval += val + shown = True + continue + shown = True + + if worker.current_item.owner.id: + val += " [%i %s" % ( + worker.current_item.owner.id, + worker.current_item.shortdesc, + ) + else: + val += " [%s" % worker.current_item.description + if worker.current_item.owner.active_subprocess: + val += " %s" % worker.current_item.owner.active_subprocess + + val += " %sB" % apt_pkg.size_to_str(worker.current_size) + + # Add the total size and percent + if worker.total_size and not worker.current_item.owner.complete: + val += "/%sB %i%%" % ( + apt_pkg.size_to_str(worker.total_size), + worker.current_size * 100.0 / worker.total_size, + ) + + val += "]" + + if len(tval) + len(val) + len(end) >= self._width: + # Display as many items as screen width + break + else: + tval += val + + if not shown: + tval += _(" [Working]") + + if self.current_cps: + tval += (self._width - len(end) - len(tval)) * " " + end + + self._write(tval, False) + return True + + def media_change(self, medium: str, drive: str) -> bool: + """Prompt the user to change the inserted removable media.""" + base.AcquireProgress.media_change(self, medium, drive) + self._write( 
class CdromProgress(base.CdromProgress, TextProgress):
    """Text CD-ROM progress.

    Interactive progress handler that prompts on the terminal while apt
    scans or adds a CD-ROM/installation medium.
    """

    def ask_cdrom_name(self) -> str | None:
        """Ask the user to provide a name for the disc.

        Returns None if the user aborted the prompt with Ctrl-C.
        """
        base.CdromProgress.ask_cdrom_name(self)
        self._write(
            _(
                "Please provide a name for this medium, such as "
                "'Debian 2.1r1 Disk 1'"
            ),
            False,
        )
        try:
            return str(input(":"))
        except KeyboardInterrupt:
            return None

    def update(self, text: str, current: int) -> None:
        """Set the current progress."""
        base.CdromProgress.update(self, text, current)
        if text:
            # Redraw in place; no newline so successive updates overwrite.
            self._write(text, False)

    def change_cdrom(self) -> bool:
        """Ask the user to change the CD-ROM.

        Returns True only when the user confirms with a bare Enter;
        any other input or Ctrl-C aborts.
        """
        base.CdromProgress.change_cdrom(self)
        self._write(_("Please insert an installation medium and press enter"), False)
        try:
            return bool(input() == "")
        except KeyboardInterrupt:
            return False
Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +import datetime +import os + +import apt_pkg + +import apt + + +def get_maintenance_end_date( + release_date: datetime.datetime, m_months: int +) -> tuple[int, int]: + """ + get the (year, month) tuple when the maintenance for the distribution + ends. Needs the data of the release and the number of months that + its is supported as input + """ + # calc end date + years = m_months // 12 + months = m_months % 12 + support_end_year = release_date.year + years + (release_date.month + months) // 12 + support_end_month = (release_date.month + months) % 12 + # special case: this happens when e.g. 
doing 2010-06 + 18 months + if support_end_month == 0: + support_end_month = 12 + support_end_year -= 1 + return (support_end_year, support_end_month) + + +def get_release_date_from_release_file(path: str) -> int | None: + """ + return the release date as time_t for the given release file + """ + if not path or not os.path.exists(path): + return None + + with os.fdopen(apt_pkg.open_maybe_clear_signed_file(path)) as data: + tag = apt_pkg.TagFile(data) + section = next(tag) + if "Date" not in section: + return None + date = section["Date"] + return apt_pkg.str_to_time(date) + + +def get_release_filename_for_pkg( + cache: apt.Cache, pkgname: str, label: str, release: str +) -> str | None: + "get the release file that provides this pkg" + if pkgname not in cache: + return None + pkg = cache[pkgname] + ver = None + # look for the version that comes from the repos with + # the given label and origin + for aver in pkg._pkg.version_list: + if aver is None or aver.file_list is None: + continue + for ver_file, _index in aver.file_list: + # print verFile + if ( + ver_file.origin == label + and ver_file.label == label + and ver_file.archive == release + ): + ver = aver + if not ver: + return None + indexfile = cache._list.find_index(ver.file_list[0][0]) + for metaindex in cache._list.list: + for m in metaindex.index_files: + if indexfile and indexfile.describe == m.describe and indexfile.is_trusted: + dirname = apt_pkg.config.find_dir("Dir::State::lists") + for relfile in ["InRelease", "Release"]: + name = apt_pkg.uri_to_filename( + metaindex.uri + ) + "dists_{}_{}".format( + metaindex.dist, + relfile, + ) + if os.path.exists(dirname + name): + return dirname + name + return None diff --git a/aptsources/__init__.py b/aptsources/__init__.py new file mode 100644 index 0000000..2ccf4fc --- /dev/null +++ b/aptsources/__init__.py @@ -0,0 +1,6 @@ +import apt_pkg + +# init the package system, but do not re-initialize config +if "APT" not in apt_pkg.config: + apt_pkg.init_config() 
#!/usr/bin/python3
#
# Copyright (C) Canonical Ltd
#
# SPDX-License-Identifier: GPL-2.0+

"""deb822 parser with support for comment headers and footers."""

import collections
import io
import typing

import apt_pkg

T = typing.TypeVar("T")


class Section:
    """A single deb822 section, possibly with comments.

    This represents a single deb822 section.
    """

    # Field name -> value, preserving file order.
    tags: collections.OrderedDict[str, str]
    # casefolded field name -> original spelling; enables case-insensitive
    # field access while keeping the author's capitalisation on output.
    _case_mapping: dict[str, str]
    # Comment text (leading '#' stripped) found before the first field.
    header: str
    # Comment text found after the first field.
    footer: str

    def __init__(self, section: typing.Union[str, "Section"]):
        # Copy construction: duplicate tags and comments from the other
        # Section without re-parsing.
        if isinstance(section, Section):
            self.tags = collections.OrderedDict(section.tags)
            self._case_mapping = {k.casefold(): k for k in self.tags}
            self.header = section.header
            self.footer = section.footer
            return

        # comments[0] collects the header, comments[1] the footer;
        # in_section (a bool) doubles as the list index (False==0, True==1).
        comments = ["", ""]
        in_section = False
        trimmed_section = ""

        for line in section.split("\n"):
            # Only lines starting with '#' in column 0 are comments; an
            # indented '#' stays part of the section body.
            if line.startswith("#"):
                # remove the leading #
                line = line[1:]
                comments[in_section] += line + "\n"
                continue

            in_section = True
            trimmed_section += line + "\n"

        self.tags = collections.OrderedDict(apt_pkg.TagSection(trimmed_section))
        self._case_mapping = {k.casefold(): k for k in self.tags}
        self.header, self.footer = comments

    def __getitem__(self, key: str) -> str:
        """Get the value of a field (case-insensitive lookup)."""
        return self.tags[self._case_mapping.get(key.casefold(), key)]

    def __delitem__(self, key: str) -> None:
        """Delete a field (case-insensitive lookup)."""
        del self.tags[self._case_mapping.get(key.casefold(), key)]

    def __setitem__(self, key: str, val: str) -> None:
        """Set the value of a field, reusing an existing spelling if any."""
        if key.casefold() not in self._case_mapping:
            self._case_mapping[key.casefold()] = key
        self.tags[self._case_mapping[key.casefold()]] = val

    def __bool__(self) -> bool:
        # A section is truthy iff it contains at least one field.
        return bool(self.tags)

    @typing.overload
    def get(self, key: str) -> str | None:
        ...

    @typing.overload
    def get(self, key: str, default: T) -> T | str:
        ...

    def get(self, key: str, default: T | None = None) -> T | None | str:
        """Return the field's value, or *default* if the field is missing."""
        try:
            return self[key]
        except KeyError:
            return default

    @staticmethod
    def __comment_lines(content: str) -> str:
        # Re-add the '#' prefix that __init__ stripped while parsing.
        return (
            "\n".join("#" + line for line in content.splitlines()) + "\n"
            if content
            else ""
        )

    def __str__(self) -> str:
        """Canonical string rendering of this section."""
        return (
            self.__comment_lines(self.header)
            + "".join(f"{k}: {v}\n" for k, v in self.tags.items())
            + self.__comment_lines(self.footer)
        )


class File:
    """
    Parse a given file object into a list of Section objects.
    """

    def __init__(self, fobj: io.TextIOBase):
        # Sections are separated by blank (whitespace-only) lines.
        self.sections: list[Section] = []
        section = ""
        for line in fobj:
            if not line.isspace():
                # A line is part of the section if it has non-whitespace characters
                section += line
            elif section:
                # Our line is just whitespace and we have gathered section content, so let's write out the section
                self.sections.append(Section(section))
                section = ""

        # The final section may not be terminated by an empty line
        if section:
            self.sections.append(Section(section))

    def __iter__(self) -> typing.Iterator[Section]:
        return iter(self.sections)

    def __str__(self) -> str:
        return "\n".join(str(s) for s in self.sections)


if __name__ == "__main__":
    # Small self-test / demo: parse a section with header, inline and
    # footer comments and print its canonical rendering.
    st = """# Header
# More header
K1: V1
# Inline
K2: V2
 # not a comment
# Footer
# More footer
"""

    s = Section(st)

    print(s)
Sebastian Heinlein <glatzor@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +import csv +import errno +import logging +import os +import re +from collections.abc import Iterator +from subprocess import PIPE, Popen +from typing import cast + +import apt_pkg +from apt_pkg import gettext as _ + + +def _expand_template(template: str, csv_path: str) -> Iterator[str]: + """Expand the given template. + + A template file consists of a header, followed by paragraphs + of templated suites, followed by a footer. A templated suite + is any paragraph where the Suite field contains {. + + This function expands all templated suites using the information + found in the CSV file supplied by distro-info-data. + + It yields lines of template info. 
+ """ + + known_suites = set() + + # Copy out any header, and gather all hardcoded suites + with apt_pkg.TagFile(template) as tmpl: + for section in tmpl: + if "X-Exclude-Suites" in section: + known_suites.update(section["X-Exclude-Suites"].split(", ")) + if "Suite" in section: + if "{" in section["Suite"]: + break + + known_suites.add(section["Suite"]) + + yield from str(section).splitlines() + else: + # We did not break, so we did copy all of them + return + + for section in tmpl: + if "Suite" in section: + known_suites.add(section["Suite"]) + + with open(csv_path) as csv_object: + releases = reversed(list(csv.DictReader(csv_object))) + + # Perform template substitution on the middle of the list + for rel in releases: + if rel["series"] in known_suites: + continue + yield "" + rel["version"] = rel["version"].replace(" LTS", "") + with apt_pkg.TagFile(template) as tmpl: + for section in tmpl: + # Only work on template sections, this skips head and tails + if "Suite" not in section or "{" not in section["Suite"]: + continue + if "X-Version" in section: + # Version requirements. 
Maybe should be made nicer + ver = rel["version"] + if any( + ( + field.startswith("le") + and apt_pkg.version_compare(field[3:], ver) < 0 + ) + or ( + field.startswith("ge") + and apt_pkg.version_compare(field[3:], ver) > 0 + ) + for field in section["X-Version"].split(", ") + ): + continue + + for line in str(section).format(**rel).splitlines(): + if line.startswith("X-Version"): + continue + yield line + + # Copy out remaining suites + with apt_pkg.TagFile(template) as tmpl: + # Skip the head again, we don't want to copy it twice + for section in tmpl: + if "Suite" in section and "{" in section["Suite"]: + break + + for section in tmpl: + # Ignore any template parts and copy the rest out, + # this is the inverse of the template substitution loop + if "Suite" in section and "{" in section["Suite"]: + continue + + yield from str(section).splitlines() + + +class Template: + def __init__(self) -> None: + self.name: str | None = None + self.child = False + self.parents: list[Template] = [] # ref to parent template(s) + self.match_name: str | None = None + self.description: str | None = None + self.base_uri: str | None = None + self.type: str | None = None + self.components: list[Component] = [] + self.children: list[Template] = [] + self.match_uri: str | None = None + self.mirror_set: dict[str, Mirror] = {} + self.distribution: str | None = None + self.available = True + self.official = True + + def has_component(self, comp: str) -> bool: + """Check if the distribution provides the given component""" + return comp in (c.name for c in self.components) + + def is_mirror(self, url: str) -> bool: + """Check if a given url of a repository is a valid mirror""" + proto, hostname, dir = split_url(url) + if hostname in self.mirror_set: + return self.mirror_set[hostname].has_repository(proto, dir) + else: + return False + + +class Component: + def __init__( + self, + name: str, + desc: str | None = None, + long_desc: str | None = None, + parent_component: str | None = None, + 
): + self.name = name + self.description = desc + self.description_long = long_desc + self.parent_component = parent_component + + def get_parent_component(self) -> str | None: + return self.parent_component + + def set_parent_component(self, parent: str) -> None: + self.parent_component = parent + + def get_description(self) -> str | None: + if self.description_long is not None: + return self.description_long + elif self.description is not None: + return self.description + else: + return None + + def set_description(self, desc: str) -> None: + self.description = desc + + def set_description_long(self, desc: str) -> None: + self.description_long = desc + + def get_description_long(self) -> str | None: + return self.description_long + + +class Mirror: + """Storage for mirror related information""" + + def __init__( + self, proto: str, hostname: str, dir: str, location: str | None = None + ): + self.hostname = hostname + self.repositories: list[Repository] = [] + self.add_repository(proto, dir) + self.location = location + + def add_repository(self, proto: str, dir: str) -> None: + self.repositories.append(Repository(proto, dir)) + + def get_repositories_for_proto(self, proto: str) -> list["Repository"]: + return [r for r in self.repositories if r.proto == proto] + + def has_repository(self, proto: str, dir: str) -> bool: + if dir is None: + return False + for r in self.repositories: + if r.proto == proto and dir in r.dir: + return True + return False + + def get_repo_urls(self) -> list[str]: + return [r.get_url(self.hostname) for r in self.repositories] + + def get_location(self) -> str | None: + return self.location + + def set_location(self, location: str) -> None: + self.location = location + + +class Repository: + def __init__(self, proto: str, dir: str) -> None: + self.proto = proto + self.dir = dir + + def get_info(self) -> tuple[str, str]: + return self.proto, self.dir + + def get_url(self, hostname: str) -> str: + return 
f"{self.proto}://{hostname}/{self.dir}" + + +def split_url(url: str) -> list[str]: + """split a given URL into the protocoll, the hostname and the dir part""" + split = re.split(":*\\/+", url, maxsplit=2) + while len(split) < 3: + split.append(None) + return split + + +class DistInfo: + def __init__( + self, + dist: str | None = None, + base_dir: str = "/usr/share/python-apt/templates", + ): + self.metarelease_uri = "" + self.templates: list[Template] = [] + self.arch = apt_pkg.config.find("APT::Architecture") + + location = None + match_loc = re.compile(r"^#LOC:(.+)$") + match_mirror_line = re.compile( + r"^(#LOC:.+)|(((http)|(ftp)|(rsync)|(file)|(mirror)|(https))://" + r"[A-Za-z0-9/\.:\-_@]+)$" + ) + # match_mirror_line = re.compile(r".+") + + if not dist: + try: + dist = ( + Popen( + ["lsb_release", "-i", "-s"], + universal_newlines=True, + stdout=PIPE, + ) + .communicate()[0] + .strip() + ) + except OSError as exc: + if exc.errno != errno.ENOENT: + logging.warning("lsb_release failed, using defaults: %s" % exc) + dist = "Debian" + + self.dist = dist + + map_mirror_sets = {} + + dist_fname = f"{base_dir}/{dist}.info" + csv_fname = f"/usr/share/distro-info/{dist.lower()}.csv" + + # FIXME: Logic doesn't work with types. 
+ template = cast(Template, None) + component = cast(Component, None) + for line in _expand_template(dist_fname, csv_fname): + tokens = line.split(":", 1) + if len(tokens) < 2: + continue + field = tokens[0].strip() + value = tokens[1].strip() + if field == "ChangelogURI": + self.changelogs_uri = _(value) + elif field == "MetaReleaseURI": + self.metarelease_uri = value + elif field == "Suite": + self.finish_template(template, component) + component = cast(Component, None) # FIXME + template = Template() + template.name = value + template.distribution = dist + template.match_name = "^%s$" % value + elif field == "MatchName": + template.match_name = value + elif field == "ParentSuite": + template.child = True + for nanny in self.templates: + # look for parent and add back ref to it + if nanny.name == value: + template.parents.append(nanny) + nanny.children.append(template) + elif field == "Available": + template.available = apt_pkg.string_to_bool(value) + elif field == "Official": + template.official = apt_pkg.string_to_bool(value) + elif field == "RepositoryType": + template.type = value + elif field == "BaseURI" and not template.base_uri: + template.base_uri = value + elif field == "BaseURI-%s" % self.arch: + template.base_uri = value + elif field == "MatchURI" and not template.match_uri: + template.match_uri = value + elif field == "MatchURI-%s" % self.arch: + template.match_uri = value + elif field == "MirrorsFile" or field == "MirrorsFile-%s" % self.arch: + # Make the path absolute. 
+ value = ( + os.path.isabs(value) + and value + or os.path.abspath(os.path.join(base_dir, value)) + ) + if value not in map_mirror_sets: + mirror_set: dict[str, Mirror] = {} + try: + with open(value) as value_f: + mirror_data = list( + filter( + match_mirror_line.match, + [x.strip() for x in value_f], + ) + ) + except Exception: + print(f"WARNING: Failed to read mirror file {value}") + mirror_data = [] + for line in mirror_data: + if line.startswith("#LOC:"): + location = match_loc.sub(r"\1", line) + continue + (proto, hostname, dir) = split_url(line) + if hostname in mirror_set: + mirror_set[hostname].add_repository(proto, dir) + else: + mirror_set[hostname] = Mirror( + proto, hostname, dir, location + ) + map_mirror_sets[value] = mirror_set + template.mirror_set = map_mirror_sets[value] + elif field == "Description": + template.description = _(value) + elif field == "Component": + if component and not template.has_component(component.name): + template.components.append(component) + component = Component(value) + elif field == "CompDescription": + component.set_description(_(value)) + elif field == "CompDescriptionLong": + component.set_description_long(_(value)) + elif field == "ParentComponent": + component.set_parent_component(value) + self.finish_template(template, component) + template = cast(Template, None) + component = cast(Component, None) + + def finish_template(self, template: Template, component: Component | None) -> None: + "finish the current tempalte" + if not template: + return + # reuse some properties of the parent template + if template.match_uri is None and template.child: + for t in template.parents: + if t.match_uri: + template.match_uri = t.match_uri + break + if template.mirror_set == {} and template.child: + for t in template.parents: + if t.match_uri: + template.mirror_set = t.mirror_set + break + if component and not template.has_component(component.name): + template.components.append(component) + component = None + # the official 
attribute is inherited + for t in template.parents: + template.official = t.official + self.templates.append(template) + + +if __name__ == "__main__": + d = DistInfo("Ubuntu", "/usr/share/python-apt/templates") + logging.info(d.changelogs_uri) + for template in d.templates: + logging.info("\nSuite: %s" % template.name) + logging.info("Desc: %s" % template.description) + logging.info("BaseURI: %s" % template.base_uri) + logging.info("MatchURI: %s" % template.match_uri) + if template.mirror_set != {}: + logging.info("Mirrors: %s" % list(template.mirror_set.keys())) + for comp in template.components: + logging.info(f" {comp.name} -{comp.description} -{comp.description_long}") + for child in template.children: + logging.info(" %s" % child.description) diff --git a/aptsources/distro.py b/aptsources/distro.py new file mode 100644 index 0000000..546d0e7 --- /dev/null +++ b/aptsources/distro.py @@ -0,0 +1,648 @@ +# distro.py - Provide a distro abstraction of the sources.list +# +# Copyright (c) 2004-2009 Canonical Ltd. +# Copyright (c) 2006-2007 Sebastian Heinlein +# Copyright (c) 2016 Harald Sitter +# +# Authors: Sebastian Heinlein <glatzor@ubuntu.com> +# Michael Vogt <mvo@debian.org> +# Harald Sitter <sitter@kde.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +import gettext +import logging +import os +import re +import shlex +import warnings +from xml.etree.ElementTree import ElementTree + +from apt_pkg import gettext as _ + + +class NoDistroTemplateException(Exception): + pass + + +class Distribution: + def __init__(self, id, codename, description, release, is_like=[]): + """Container for distribution specific informations""" + # LSB information + self.id = id + self.codename = codename + self.description = description + self.release = release + self.is_like = is_like + + self.binary_type = "deb" + self.source_type = "deb-src" + + def get_sources(self, sourceslist): + """ + Find the corresponding template, main and child sources + for the distribution + """ + + self.sourceslist = sourceslist + # corresponding sources + self.source_template = None + self.child_sources = [] + self.main_sources = [] + self.disabled_sources = [] + self.cdrom_sources = [] + self.download_comps = [] + self.enabled_comps = [] + self.cdrom_comps = [] + self.used_media = [] + self.get_source_code = False + self.source_code_sources = [] + + # location of the sources + self.default_server = "" + self.main_server = "" + self.nearest_server = "" + self.used_servers = [] + + # find the distro template + for template in self.sourceslist.matcher.templates: + if self.is_codename(template.name) and template.distribution == self.id: + # print "yeah! 
found a template for %s" % self.description + # print template.description, template.base_uri, \ + # template.components + self.source_template = template + break + if self.source_template is None: + raise NoDistroTemplateException( + "Error: could not find a distribution template for %s/%s" + % (self.id, self.codename) + ) + + # find main and child sources + media = [] + comps = [] + cdrom_comps = [] + enabled_comps = [] + # source_code = [] + for source in self.sourceslist.exploded_list(): + if ( + not source.invalid + and self.is_codename(source.dist) + and source.template + and source.template.official + and self.is_codename(source.template.name) + ): + # print "yeah! found a distro repo: %s" % source.line + # cdroms need do be handled differently + if source.uri.startswith("cdrom:") and not source.disabled: + self.cdrom_sources.append(source) + cdrom_comps.extend(source.comps) + elif source.uri.startswith("cdrom:") and source.disabled: + self.cdrom_sources.append(source) + elif source.type == self.binary_type and not source.disabled: + self.main_sources.append(source) + comps.extend(source.comps) + media.append(source.uri) + elif source.type == self.binary_type and source.disabled: + self.disabled_sources.append(source) + elif source.type == self.source_type and not source.disabled: + self.source_code_sources.append(source) + elif source.type == self.source_type and source.disabled: + self.disabled_sources.append(source) + if not source.invalid and source.template in self.source_template.children: + if not source.disabled and source.type == self.binary_type: + self.child_sources.append(source) + elif not source.disabled and source.type == self.source_type: + self.source_code_sources.append(source) + else: + self.disabled_sources.append(source) + self.download_comps = set(comps) + self.cdrom_comps = set(cdrom_comps) + enabled_comps.extend(comps) + enabled_comps.extend(cdrom_comps) + self.enabled_comps = set(enabled_comps) + self.used_media = set(media) + 
self.get_mirrors() + + def get_mirrors(self, mirror_template=None): + """ + Provide a set of mirrors where you can get the distribution from + """ + # the main server is stored in the template + self.main_server = self.source_template.base_uri + + # other used servers + for medium in self.used_media: + if not medium.startswith("cdrom:"): + # seems to be a network source + self.used_servers.append(medium) + + if len(self.main_sources) == 0: + self.default_server = self.main_server + else: + self.default_server = self.main_sources[0].uri + + # get a list of country codes and real names + self.countries = {} + fname = "/usr/share/xml/iso-codes/iso_3166.xml" + if os.path.exists(fname): + et = ElementTree(file=fname) + # python2.6 compat, the next two lines can get removed + # once we do not use py2.6 anymore + if getattr(et, "iter", None) is None: + et.iter = et.getiterator + it = et.iter("iso_3166_entry") + for elm in it: + try: + descr = elm.attrib["common_name"] + except KeyError: + descr = elm.attrib["name"] + try: + code = elm.attrib["alpha_2_code"] + except KeyError: + code = elm.attrib["alpha_3_code"] + self.countries[code.lower()] = gettext.dgettext("iso_3166", descr) + + # try to guess the nearest mirror from the locale + self.country = None + self.country_code = None + locale = os.getenv("LANG", default="en_UK") + a = locale.find("_") + z = locale.find(".") + if z == -1: + z = len(locale) + country_code = locale[a + 1 : z].lower() + + if mirror_template: + self.nearest_server = mirror_template % country_code + + if country_code in self.countries: + self.country = self.countries[country_code] + self.country_code = country_code + + def _get_mirror_name(self, server): + """Try to get a human readable name for the main mirror of a country + Customize for different distributions""" + country = None + i = server.find("://") + li = server.find(".archive.ubuntu.com") + if i != -1 and li != -1: + country = server[i + len("://") : li] + if country in self.countries: + 
    def get_server_list(self):
        """Return a list of used and suggested servers

        Each entry is a ``[name, uri, active]`` triple; ``uri`` is None
        for the synthetic "Custom servers" entry.
        """

        def compare_mirrors(mir1, mir2):
            """Helper function that handles comparison of mirror urls
            that could contain trailing slashes"""
            # NOTE(review): re.match treats mir1 as a regular expression,
            # so "." in the hostname matches any character; presumably
            # good enough for mirror URIs, but worth confirming.
            return re.match(mir1.strip("/ "), mir2.rstrip("/ "))

        # Store all available servers:
        # Name, URI, active
        mirrors = []
        if len(self.used_servers) < 1 or (
            len(self.used_servers) == 1
            and compare_mirrors(self.used_servers[0], self.main_server)
        ):
            # no server used, or only the main server: offer main as the
            # active entry plus the nearest country mirror (if known)
            mirrors.append([_("Main server"), self.main_server, True])
            if self.nearest_server:
                mirrors.append(
                    [
                        self._get_mirror_name(self.nearest_server),
                        self.nearest_server,
                        False,
                    ]
                )
        elif len(self.used_servers) == 1 and not compare_mirrors(
            self.used_servers[0], self.main_server
        ):
            mirrors.append([_("Main server"), self.main_server, False])
            # Only one server is used
            server = self.used_servers[0]

            # Append the nearest server if it's not already used
            if self.nearest_server:
                if not compare_mirrors(server, self.nearest_server):
                    mirrors.append(
                        [
                            self._get_mirror_name(self.nearest_server),
                            self.nearest_server,
                            False,
                        ]
                    )
            if server:
                mirrors.append([self._get_mirror_name(server), server, True])

        elif len(self.used_servers) > 1:
            # More than one server is used. Since we don't handle this case
            # in the user interface we set "custom servers" to true and
            # append a list of all used servers
            mirrors.append([_("Main server"), self.main_server, False])
            if self.nearest_server:
                mirrors.append(
                    [
                        self._get_mirror_name(self.nearest_server),
                        self.nearest_server,
                        False,
                    ]
                )
            mirrors.append([_("Custom servers"), None, True])
            for server in self.used_servers:
                mirror_entry = [self._get_mirror_name(server), server, False]
                # skip servers already represented by the main/nearest
                # entries, and avoid listing a server twice
                if compare_mirrors(server, self.nearest_server) or compare_mirrors(
                    server, self.main_server
                ):
                    continue
                elif mirror_entry not in mirrors:
                    mirrors.append(mirror_entry)

        return mirrors
    def _enable_component(self, comp):
        # Enable `comp` on every binary (and, when get_source_code is
        # set, every deb-src) entry, creating a new source line when no
        # main source exists at all.

        def add_component_only_once(source, comps_per_dist):
            """
            Check if we already added the component to the repository, since
            a repository could be split into different apt lines. If not
            add the component
            """
            # if we don't have that distro, just return (can happen for e.g.
            # dapper-update only in deb-src
            if source.dist not in comps_per_dist:
                return
            # if we have seen this component already for this distro,
            # return (nothing to do)
            if comp in comps_per_dist[source.dist]:
                return
            # add it
            source.comps = source.comps + [comp]
            comps_per_dist[source.dist].add(comp)

        sources = []
        sources.extend(self.main_sources)
        sources.extend(self.child_sources)
        # store what comps are enabled already per distro (where distro is
        # e.g. "dapper", "dapper-updates")
        comps_per_dist = {}
        comps_per_sdist = {}
        for s in sources:
            if s.type == self.binary_type:
                if s.dist not in comps_per_dist:
                    comps_per_dist[s.dist] = set()
                for c in s.comps:
                    comps_per_dist[s.dist].add(c)
        for s in self.source_code_sources:
            if s.type == self.source_type:
                if s.dist not in comps_per_sdist:
                    comps_per_sdist[s.dist] = set()
                for c in s.comps:
                    comps_per_sdist[s.dist].add(c)

        # check if there is a main source at all
        if len(self.main_sources) < 1:
            # create a new main source
            self.add_source(comps=["%s" % comp])
        else:
            # add the comp to all main, child and source code sources
            for source in sources:
                add_component_only_once(source, comps_per_dist)

            for source in self.source_code_sources:
                add_component_only_once(source, comps_per_sdist)

        # check if there is a main source code source at all
        if self.get_source_code:
            if len(self.source_code_sources) < 1:
                # create a new main source
                self.add_source(type=self.source_type, comps=["%s" % comp])
            else:
                # add the comp to all main, child and source code sources
                for source in self.source_code_sources:
                    add_component_only_once(source, comps_per_sdist)
child and source code sources + for source in self.source_code_sources: + add_component_only_once(source, comps_per_sdist) + + def disable_component(self, comp): + """ + Disable a component in all main, child and source code sources + (excluding cdrom based sources) + """ + sources = [] + sources.extend(self.main_sources) + sources.extend(self.child_sources) + sources.extend(self.source_code_sources) + if comp in self.cdrom_comps: + sources = [] + sources.extend(self.main_sources) + for source in sources: + if comp in source.comps: + comps = source.comps + comps.remove(comp) + source.comps = comps + if len(source.comps) < 1: + self.sourceslist.remove(source) + + def change_server(self, uri): + """Change the server of all distro specific sources to + a given host""" + + def change_server_of_source(source, uri, seen): + # Avoid creating duplicate entries + source.uri = uri + for comp in source.comps: + if [source.uri, source.dist, comp] in seen: + source.comps.remove(comp) + else: + seen.append([source.uri, source.dist, comp]) + if len(source.comps) < 1: + self.sourceslist.remove(source) + + seen_binary = [] + seen_source = [] + self.default_server = uri + for source in self.main_sources: + change_server_of_source(source, uri, seen_binary) + for source in self.child_sources: + # Do not change the forces server of a child source + if ( + source.template.base_uri is None + or source.template.base_uri != source.uri + ): + change_server_of_source(source, uri, seen_binary) + for source in self.source_code_sources: + change_server_of_source(source, uri, seen_source) + + def is_codename(self, name): + """Compare a given name with the release codename.""" + if name == self.codename: + return True + else: + return False + + +class DebianDistribution(Distribution): + """Class to support specific Debian features""" + + def is_codename(self, name): + """Compare a given name with the release codename and check if + if it can be used as a synonym for a development releases""" + if 
name == self.codename or self.release in ("testing", "unstable"): + return True + else: + return False + + def _get_mirror_name(self, server): + """Try to get a human readable name for the main mirror of a country + Debian specific""" + country = None + i = server.find("://ftp.") + li = server.find(".debian.org") + if i != -1 and li != -1: + country = server[i + len("://ftp.") : li] + if country in self.countries: + # TRANSLATORS: %s is a country + return ( + _("Server for %s") + % gettext.dgettext( + "iso_3166", self.countries[country].rstrip() + ).rstrip() + ) + else: + return "%s" % server.rstrip("/ ") + + def get_mirrors(self): + Distribution.get_mirrors( + self, mirror_template="http://ftp.%s.debian.org/debian/" + ) + + +class UbuntuDistribution(Distribution): + """Class to support specific Ubuntu features""" + + def get_mirrors(self): + Distribution.get_mirrors( + self, mirror_template="http://%s.archive.ubuntu.com/ubuntu/" + ) + + +class UbuntuRTMDistribution(UbuntuDistribution): + """Class to support specific Ubuntu RTM features""" + + def get_mirrors(self): + self.main_server = self.source_template.base_uri + + +def _lsb_release(): + """Call lsb_release --idrc and return a mapping.""" + import errno + from subprocess import PIPE, Popen + + result = { + "Codename": "sid", + "Distributor ID": "Debian", + "Description": "Debian GNU/Linux unstable (sid)", + "Release": "unstable", + } + try: + out = Popen(["lsb_release", "-idrc"], stdout=PIPE).communicate()[0] + # Convert to unicode string, needed for Python 3.1 + out = out.decode("utf-8") + result.update(line.split(":\t") for line in out.split("\n") if ":\t" in line) + except OSError as exc: + if exc.errno != errno.ENOENT: + logging.warning("lsb_release failed, using defaults:" % exc) + return result + + +def _system_image_channel(): + """Get the current channel from system-image-cli -i if possible.""" + import errno + from subprocess import DEVNULL, PIPE, Popen + + try: + out = Popen( + ["system-image-cli", 
"-i"], + stdout=PIPE, + stderr=DEVNULL, + universal_newlines=True, + ).communicate()[0] + for line in out.splitlines(): + if line.startswith("channel: "): + return line.split(": ", 1)[1] + except OSError as exc: + if exc.errno != errno.ENOENT: + logging.warning("system-image-cli failed, using defaults: %s" % exc) + return None + + +class _OSRelease: + DEFAULT_OS_RELEASE_FILE = "/etc/os-release" + OS_RELEASE_FILE = "/etc/os-release" + + def __init__(self, lsb_compat=True): + self.result = {} + self.valid = False + self.file = _OSRelease.OS_RELEASE_FILE + + if not os.path.isfile(self.file): + return + + self.parse() + self.valid = True + + if lsb_compat: + self.inject_lsb_compat() + + def inject_lsb_compat(self): + self.result["Distributor ID"] = self.result["ID"] + self.result["Description"] = self.result["PRETTY_NAME"] + # Optionals as per os-release spec. + self.result["Codename"] = self.result.get("VERSION_CODENAME") + if not self.result["Codename"]: + # Transient Ubuntu 16.04 field (LP: #1598212) + self.result["Codename"] = self.result.get("UBUNTU_CODENAME") + self.result["Release"] = self.result.get("VERSION_ID") + + def parse(self): + f = open(self.file) + for line in f: + line = line.strip() + if not line: + continue + self.parse_entry(*line.split("=", 1)) + f.close() + + def parse_entry(self, key, value): + value = self.parse_value(value) # Values can be shell strings... + if key == "ID_LIKE" and isinstance(value, str): + # ID_LIKE is specified as quoted space-separated list. This will + # be parsed as string that we need to split manually. + value = value.split(" ") + self.result[key] = value + + def parse_value(self, value): + values = shlex.split(value) + if len(values) == 1: + return values[0] + return values + + +def get_distro(id=None, codename=None, description=None, release=None, is_like=[]): + """ + Check the currently used distribution and return the corresponding + distriubtion class that supports distro specific features. 
def get_distro(id=None, codename=None, description=None, release=None, is_like=None):
    """
    Check the currently used distribution and return the corresponding
    distribution class that supports distro specific features.

    If no parameters are given the distro will be auto detected via
    a call to lsb-release / os-release.
    """
    # Mutable default arguments are shared between calls; accept None as
    # the sentinel instead of the original `is_like=[]`.
    if is_like is None:
        is_like = []
    # make testing easier
    if not (id and codename and description and release):
        if id or codename or description or release:
            warnings.warn(
                "Provided only a subset of arguments", DeprecationWarning, stacklevel=2
            )
        os_release = _OSRelease()
        # Must be a dict: the `.get("ID_LIKE", ...)` below crashed on the
        # original empty-list default when /etc/os-release was missing.
        os_result = {}
        if os_release.valid:
            os_result = os_release.result
        # TODO: We cannot presently use os-release to fully replace lsb_release
        # because os-release's ID, VERSION_ID and VERSION_CODENAME fields
        # are specified as lowercase. In lsb_release they can be upcase
        # or capitalized. So, switching to os-release would constitute
        # a behavior break, at which point lsb_release support should be
        # fully removed.
        # This in particular is a problem for template matching, as this
        # matches against Distribution objects and depends on string
        # case.
        # Call lsb_release exactly once (the original invoked the
        # subprocess twice in a row and discarded the first result).
        lsb_result = _lsb_release()
        id = lsb_result["Distributor ID"]
        codename = lsb_result["Codename"]
        description = lsb_result["Description"]
        release = lsb_result["Release"]
        # Not available with LSB, use get directly.
        is_like = os_result.get("ID_LIKE", [])
        if id == "Ubuntu":
            channel = _system_image_channel()
            if channel is not None and "ubuntu-rtm/" in channel:
                id = "Ubuntu-RTM"
                codename = channel.rsplit("/", 1)[1].split("-", 1)[0]
                description = codename
                release = codename
    if id == "Ubuntu":
        return UbuntuDistribution(id, codename, description, release, is_like)
    if id == "Ubuntu-RTM":
        return UbuntuRTMDistribution(id, codename, description, release, is_like)
    elif id == "Debian":
        return DebianDistribution(id, codename, description, release, is_like)
    else:
        return Distribution(id, codename, description, release, is_like)
def is_mirror(master_uri: str, compare_uri: str) -> bool:
    """Return True if compare_uri is identical to, or a country mirror
    of, master_uri.

    e.g. master_uri = archive.ubuntu.com,
         compare_uri = de.archive.ubuntu.com -> True
    """
    # ignore trailing slashes and spaces while comparing
    master = master_uri.rstrip("/ ")
    candidate = compare_uri.rstrip("/ ")
    if candidate == master:
        return True
    # extract the part after the scheme separator; if either URI lacks
    # "//" they cannot be in a mirror relationship
    try:
        candidate_host = candidate.split("//")[1]
        master_host = master.split("//")[1]
    except IndexError:
        return False
    # a country mirror looks like "<cc>.<master-host>": drop the first
    # label and compare the remainder against the master host
    _, dot, remainder = candidate_host.partition(".")
    return bool(dot) and remainder == master_host
It does not have a leading underscore to + not break any old code that uses it; but it should not be used + in new code (and is not listed in __all__).""" + return list(set(s)) + + +class SingleValueProperty(property): + def __init__(self, key: str, doc: str): + self.key = key + self.__doc__ = doc + + def __get__( + self, obj: Optional["Deb822SourceEntry"], objtype: type | None = None + ) -> str | None: + if obj is None: + return self # type: ignore + return obj.section.get(self.key, None) + + def __set__(self, obj: "Deb822SourceEntry", value: str | None) -> None: + if value is None: + del obj.section[self.key] + else: + obj.section[self.key] = value + + +class MultiValueProperty(property): + def __init__(self, key: str, doc: str): + self.key = key + self.__doc__ = doc + + def __get__( + self, obj: Optional["Deb822SourceEntry"], objtype: type | None = None + ) -> list[str]: + if obj is None: + return self # type: ignore + return SourceEntry.mysplit(obj.section.get(self.key, "")) + + def __set__(self, obj: "Deb822SourceEntry", values: list[str]) -> None: + obj.section[self.key] = " ".join(values) + + +class ExplodedEntryProperty(property, Generic[T]): + def __init__(self, parent: T): + self.parent = parent + + def __get__( + self, obj: Optional["ExplodedDeb822SourceEntry"], objtype: type | None = None + ) -> T: + if obj is None: + return self # type: ignore + return self.parent.__get__(obj.parent) # type: ignore + + def __set__(self, obj: "ExplodedDeb822SourceEntry", value: T) -> None: + obj.split_out() + self.parent.__set__(obj.parent, value) # type: ignore + + +def DeprecatedProperty(prop: T) -> T: + return prop + + +def _null_weakref() -> None: + """Behaves like an expired weakref.ref, returning None""" + return None + + +class Deb822SourceEntry: + def __init__( + self, + section: _deb822.Section | str | None, + file: str, + list: Optional["SourcesList"] = None, + ): + if section is None: + self.section = _deb822.Section("") + elif isinstance(section, str): + 
    @property
    def trusted(self) -> bool | None:
        """Return the value of the Trusted field.

        None means the field is absent from the paragraph; otherwise
        the field's value is parsed as an apt boolean.
        """
        try:
            return apt_pkg.string_to_bool(self.section["Trusted"])
        except KeyError:
            return None

    @trusted.setter
    def trusted(self, value: bool | None) -> None:
        # Assigning None removes the field (deleting an absent field is
        # a no-op); booleans are serialized in canonical "yes"/"no" form.
        if value is None:
            try:
                del self.section["Trusted"]
            except KeyError:
                pass
        else:
            self.section["Trusted"] = "yes" if value else "no"
    def _reparent_children(self, to: "Deb822SourceEntry") -> None:
        """If we end up being split, check if any of our children need to be reparented to the new parent."""
        # Each child (an ExplodedDeb822SourceEntry) represents exactly
        # one (type, uri, suite) triple; hand it over to `to` only when
        # that triple is among the combinations `to` now describes.
        for child in self._children:
            for typ in to.types:
                for uri in to.uris:
                    for suite in to.suites:
                        if (child._type, child._uri, child._suite) == (typ, uri, suite):
                            assert child.parent == self
                            # store a weak reference so the child does not
                            # keep the paragraph alive
                            child._parent = weakref.ref(to)
    def split_out(self) -> None:
        """Split the parent paragraph so this (type, uri, suite) triple
        gets a deb822 paragraph of its own.

        The parent is removed from the SourcesList and replaced by one
        new single-triple paragraph per (type, uri, suite) combination;
        this object is re-attached to the new paragraph carrying its
        own triple.  No-op when the parent already describes exactly
        this one triple.
        """
        parent = self.parent
        if (parent.types, parent.uris, parent.suites) == (
            [self._type],
            [self._uri],
            [self._suite],
        ):
            return
        sources_list = parent._list()
        if sources_list is None:
            raise ValueError("The parent entry is dead")

        try:
            index = sources_list.list.index(parent)
        except ValueError as e:
            raise ValueError(
                f"Parent entry for partial deb822 {self} no longer valid"
            ) from e

        sources_list.remove(parent)

        reparented = False
        # iterate in reverse so that repeated insert(index, ...) leaves
        # the new paragraphs in the original (type, uri, suite) order
        for type in reversed(parent.types):
            for uri in reversed(parent.uris):
                for suite in reversed(parent.suites):
                    new = Deb822SourceEntry(
                        section=_deb822.Section(parent.section),
                        file=parent.file,
                        list=sources_list,
                    )
                    new.types = [type]
                    new.uris = [uri]
                    new.suites = [suite]
                    # single-triple fragments may be re-merged later
                    new.may_merge = True

                    parent._reparent_children(new)
                    sources_list.list.insert(index, new)
                    if (type, uri, suite) == (self._type, self._uri, self._suite):
                        self._parent = weakref.ref(new)
                        reparented = True
        if not reparented:
            raise ValueError(f"Could not find parent of {self}")
self.comps == other.comps + ) + + @staticmethod + def mysplit(line: str) -> list[str]: + """a split() implementation that understands the sources.list + format better and takes [] into account (for e.g. cdroms)""" + line = line.strip() + pieces = [] + tmp = "" + # we are inside a [..] block + p_found = False + space_found = False + for i in range(len(line)): + if line[i] == "[": + if space_found: + space_found = False + p_found = True + pieces.append(tmp) + tmp = line[i] + else: + p_found = True + tmp += line[i] + elif line[i] == "]": + p_found = False + tmp += line[i] + elif space_found and not line[i].isspace(): + # we skip one or more space + space_found = False + pieces.append(tmp) + tmp = line[i] + elif line[i].isspace() and not p_found: + # found a whitespace + space_found = True + else: + tmp += line[i] + # append last piece + if len(tmp) > 0: + pieces.append(tmp) + return pieces + + def parse(self, line: str) -> None: + """parse a given sources.list (textual) line and break it up + into the field we have""" + self.line = line + line = line.strip() + # check if the source is enabled/disabled + if line == "" or line == "#": # empty line + self.invalid = True + return + if line[0] == "#": + self.disabled = True + pieces = line[1:].strip().split() + # if it looks not like a disabled deb line return + if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"): + self.invalid = True + return + else: + line = line[1:] + # check for another "#" in the line (this is treated as a comment) + i = line.find("#") + if i > 0: + self.comment = line[i + 1 :] + line = line[:i] + # source is ok, split it and see what we have + pieces = self.mysplit(line) + # Sanity check + if len(pieces) < 3: + self.invalid = True + return + # Type, deb or deb-src + self.type = pieces[0].strip() + # Sanity check + if self.type not in ("deb", "deb-src", "rpm", "rpm-src"): + self.invalid = True + return + + if pieces[1].strip()[0] == "[": + options = pieces.pop(1).strip("[]").split() + for option 
in options: + try: + key, value = option.split("=", 1) + except Exception: + self.invalid = True + else: + if key == "arch": + self.architectures = value.split(",") + elif key == "trusted": + self.trusted = apt_pkg.string_to_bool(value) + else: + self.invalid = True + + # URI + self.uri = pieces[1].strip() + if len(self.uri) < 1: + self.invalid = True + # distro and components (optional) + # Directory or distro + self.dist = pieces[2].strip() + if len(pieces) > 3: + # List of components + self.comps = pieces[3:] + else: + self.comps = [] + + def set_enabled(self, new_value: bool) -> None: + """set a line to enabled or disabled""" + self.disabled = not new_value + # enable, remove all "#" from the start of the line + if new_value: + self.line = self.line.lstrip().lstrip("#") + else: + # disabled, add a "#" + if self.line.strip()[0] != "#": + self.line = "#" + self.line + + def __str__(self) -> str: + """debug helper""" + return self.str().strip() + + def str(self) -> str: + """return the current line as string""" + if self.invalid: + return self.line + line = "" + if self.disabled: + line = "# " + + line += self.type + + if self.architectures and self.trusted is not None: + line += " [arch={} trusted={}]".format( + ",".join(self.architectures), + "yes" if self.trusted else "no", + ) + elif self.trusted is not None: + line += " [trusted=%s]" % ("yes" if self.trusted else "no") + elif self.architectures: + line += " [arch=%s]" % ",".join(self.architectures) + line += f" {self.uri} {self.dist}" + if len(self.comps) > 0: + line += " " + " ".join(self.comps) + if self.comment != "": + line += " #" + self.comment + line += "\n" + return line + + @property + def types(self) -> list[builtins.str]: + """deb822 compatible accessor for the type""" + return [self.type] + + @property + def uris(self) -> list[builtins.str]: + """deb822 compatible accessor for the uri""" + return [self.uri] + + @property + def suites(self) -> list[builtins.str]: + """deb822 compatible accessor for 
the suite"""
        return [self.dist]


# Union aliases covering both the classic one-line and the deb822 entry types.
AnySourceEntry = Union[SourceEntry, Deb822SourceEntry]
AnyExplodedSourceEntry = Union[
    SourceEntry, Deb822SourceEntry, ExplodedDeb822SourceEntry
]


class NullMatcher:
    """a Matcher that does nothing"""

    def match(self, s: AnyExplodedSourceEntry) -> bool:
        return True


class SourcesList:
    """represents the full sources.list + sources.list.d file"""

    def __init__(
        self,
        withMatcher: bool = True,
        matcherPath: str = "/usr/share/python-apt/templates/",
        *,
        deb822: bool = False,
    ):
        self.list: list[AnySourceEntry] = []  # the actual SourceEntries Type
        self.matcher: NullMatcher | SourceEntryMatcher
        if withMatcher:
            self.matcher = SourceEntryMatcher(matcherPath)
        else:
            self.matcher = NullMatcher()
        # when True, also read deb822 ".sources" files from sources.list.d
        self.deb822 = deb822
        self.refresh()

    def refresh(self) -> None:
        """update the list of known entries"""
        self.list = []
        # read sources.list
        file = apt_pkg.config.find_file("Dir::Etc::sourcelist")
        if file != "/dev/null" and os.path.exists(file):
            self.load(file)
        # read sources.list.d
        partsdir = apt_pkg.config.find_dir("Dir::Etc::sourceparts")
        if partsdir != "/dev/null" and os.path.exists(partsdir):
            for file in os.listdir(partsdir):
                # ".sources" files are only honoured when deb822 was requested
                if (self.deb822 and file.endswith(".sources")) or file.endswith(
                    ".list"
                ):
                    self.load(os.path.join(partsdir, file))
        # check if the source item fits a predefined template
        for source in self.list:
            if not source.invalid:
                self.matcher.match(source)

    def __iter__(self) -> Iterator[AnySourceEntry]:
        """simple iterator to go over self.list, returns SourceEntry
        types"""
        yield from self.list

    def __find(
        self, *predicates: Callable[[AnyExplodedSourceEntry], bool], **attrs: Any
    ) -> Iterator[AnyExplodedSourceEntry]:
        """Yield exploded entries whose attributes equal every ``attrs``
        item and that satisfy every predicate; a ``uri`` keyword is
        compared with trailing slashes stripped."""
        uri = attrs.pop("uri", None)
        for source in self.exploded_list():
            if uri and source.uri and uri.rstrip("/") != source.uri.rstrip("/"):
                continue
            if all(getattr(source, key) == attrs[key] for key in attrs) and 
all(
                predicate(source) for predicate in predicates
            ):
                yield source

    def add(
        self,
        type: str,
        uri: str,
        dist: str,
        orig_comps: list[str],
        comment: str = "",
        pos: int = -1,
        file: str | None = None,
        architectures: Iterable[str] = [],
        parent: AnyExplodedSourceEntry | None = None,
    ) -> AnyExplodedSourceEntry:
        """
        Add a new source to the sources.list.
        The method will search for existing matching repos and will try to
        reuse them as far as possible
        """

        # A leading "#" on the type means "add the entry disabled".
        type = type.strip()
        disabled = type.startswith("#")
        if disabled:
            type = type[1:].lstrip()
        architectures = set(architectures)
        # create a working copy of the component list so that
        # we can modify it later
        comps = orig_comps[:]
        sources = self.__find(
            lambda s: set(s.architectures) == architectures,
            disabled=disabled,
            invalid=False,
            type=type,
            uri=uri,
            dist=dist,
        )
        # check if we have this source already in the sources.list
        for source in sources:
            for new_comp in comps:
                if new_comp in source.comps:
                    # we have this component already, delete it
                    # from the new_comps list
                    del comps[comps.index(new_comp)]
                    if len(comps) == 0:
                        return source

        # Second pass ignores the disabled flag so a disabled twin can be
        # reused (re-enabled) or extended instead of duplicated.
        sources = self.__find(
            lambda s: set(s.architectures) == architectures,
            invalid=False,
            type=type,
            uri=uri,
            dist=dist,
        )
        for source in sources:
            if source.disabled == disabled:
                # if there is a repo with the same (disabled, type, uri, dist)
                # just add the components
                if set(source.comps) != set(comps):
                    source.comps = uniq(source.comps + comps)
                return source
            elif source.disabled and not disabled:
                # enable any matching (type, uri, dist), but disabled repo
                if set(source.comps) == set(comps):
                    source.disabled = False
                    return source

        new_entry: AnySourceEntry
        if file is None:
            file = apt_pkg.config.find_file("Dir::Etc::sourcelist")
        if file.endswith(".sources"):
            # target is a deb822 file: build a stanza, optionally copying
            # all tags from the given parent stanza
            new_entry = Deb822SourceEntry(None, file=file, list=self)
            if parent:
                parent = getattr(parent, "parent", 
parent)
                assert isinstance(parent, Deb822SourceEntry)
                for k in parent.section.tags:
                    new_entry.section[k] = parent.section[k]

            new_entry.types = [type]
            new_entry.uris = [uri]
            new_entry.suites = [dist]
            new_entry.comps = comps
            if architectures:
                new_entry.architectures = list(architectures)
            new_entry.section.header = comment
            new_entry.disabled = disabled
        else:
            # there isn't any matching source, so create a new line and parse it
            parts = [
                "#" if disabled else "",
                type,
                ("[arch=%s]" % ",".join(architectures)) if architectures else "",
                uri,
                dist,
            ]
            parts.extend(comps)
            if comment:
                parts.append("#" + comment)
            line = " ".join(part for part in parts if part) + "\n"

            new_entry = SourceEntry(line)
            if file is not None:
                new_entry.file = file

        self.matcher.match(new_entry)
        # pos < 0 appends, otherwise insert at the requested position
        if pos < 0:
            self.list.append(new_entry)
        else:
            self.list.insert(pos, new_entry)
        return new_entry

    def remove(self, source_entry: AnyExplodedSourceEntry) -> None:
        """remove the specified entry from the sources.list"""
        # An exploded view must first be split out of its parent stanza so
        # that only the matching (type, uri, suite) combination is removed.
        if isinstance(source_entry, ExplodedDeb822SourceEntry):
            source_entry.split_out()
            source_entry = source_entry.parent
        self.list.remove(source_entry)

    def restore_backup(self, backup_ext: str) -> None:
        "restore sources.list files based on the backup extension"
        file = apt_pkg.config.find_file("Dir::Etc::sourcelist")
        if os.path.exists(file + backup_ext) and os.path.exists(file):
            shutil.copy(file + backup_ext, file)
        # now sources.list.d
        partsdir = apt_pkg.config.find_dir("Dir::Etc::sourceparts")
        for file in glob.glob("%s/*" % partsdir):
            if os.path.exists(file + backup_ext):
                shutil.copy(file + backup_ext, file)

    def backup(self, backup_ext: str | None = None) -> str:
        """make a backup of the current source files, if no backup extension
        is given, the current date/time is used (and returned)"""
        # set of file names already copied in this run
        already_backuped: set[str] = set()
        if backup_ext is None:
            backup_ext = time.strftime("%y%m%d.%H%M")
for source in self.list: + if source.file not in already_backuped and os.path.exists(source.file): + shutil.copy(source.file, f"{source.file}{backup_ext}") + return backup_ext + + def load(self, file: str) -> None: + """(re)load the current sources""" + try: + with open(file) as f: + if file.endswith(".sources"): + for section in _deb822.File(f): + self.list.append(Deb822SourceEntry(section, file, list=self)) + else: + for line in f: + source = SourceEntry(line, file) + self.list.append(source) + except Exception as exc: + logging.warning(f"could not open file '{file}': {exc}\n") + + def index(self, entry: AnyExplodedSourceEntry) -> int: + if isinstance(entry, ExplodedDeb822SourceEntry): + return self.list.index(entry.parent) + return self.list.index(entry) + + def merge(self) -> None: + """Merge consecutive entries that have been split back together.""" + merged = True + while merged: + i = 0 + merged = False + while i + 1 < len(self.list): + entry = self.list[i] + if isinstance(entry, Deb822SourceEntry): + j = i + 1 + while j < len(self.list): + if entry.merge(self.list[j]): + del self.list[j] + merged = True + else: + j += 1 + i += 1 + + def save(self) -> None: + """save the current sources""" + files: dict[str, io.TextIOWrapper] = {} + # write an empty default config file if there aren't any sources + if len(self.list) == 0: + path = apt_pkg.config.find_file("Dir::Etc::sourcelist") + header = ( + "## See sources.list(5) for more information, especialy\n" + "# Remember that you can only use http, ftp or file URIs\n" + "# CDROMs are managed through the apt-cdrom tool.\n" + ) + + with open(path, "w") as f: + f.write(header) + return + + self.merge() + try: + for source in self.list: + if source.file not in files: + files[source.file] = open(source.file, "w") + elif isinstance(source, Deb822SourceEntry): + files[source.file].write("\n") + files[source.file].write(source.str()) + finally: + for f in files.values(): + f.close() + + def check_for_relations( + self, 
sources_list: Iterable[AnySourceEntry]
    ) -> tuple[list[AnySourceEntry], dict[Template, list[AnySourceEntry]]]:
        """get all parent and child channels in the sources list"""
        parents = []
        used_child_templates: dict[Template, list[AnySourceEntry]] = {}
        for source in sources_list:
            # try to avoid checking uninteresting sources
            if source.template is None:
                continue
            # set up a dict with all used child templates and corresponding
            # source entries
            if source.template.child:
                key = source.template
                if key not in used_child_templates:
                    used_child_templates[key] = []
                temp = used_child_templates[key]
                temp.append(source)
            else:
                # store each source with children aka. a parent :)
                if len(source.template.children) > 0:
                    parents.append(source)
        return (parents, used_child_templates)

    def exploded_list(self) -> list[AnyExplodedSourceEntry]:
        """Present an exploded view of the list where each entry corresponds exactly to a Release file.

        A release file is uniquely identified by the triplet (type, uri, suite). Old style entries
        always referred to a single release file, but deb822 entries allow multiple values for each
        of those fields.
+ """ + res: list[AnyExplodedSourceEntry] = [] + for entry in self.list: + if isinstance(entry, SourceEntry): + res.append(entry) + elif ( + len(entry.types) == 1 + and len(entry.uris) == 1 + and len(entry.suites) == 1 + ): + res.append(entry) + else: + for typ in entry.types: + for uri in entry.uris: + for sui in entry.suites: + res.append(ExplodedDeb822SourceEntry(entry, typ, uri, sui)) + self.matcher.match(res[-1]) + + return res + + +class SourceEntryMatcher: + """matcher class to make a source entry look nice + lots of predefined matchers to make it i18n/gettext friendly + """ + + def __init__(self, matcherPath: str): + self.templates: list[Template] = [] + # Get the human readable channel and comp names from the channel .infos + spec_files = glob.glob("%s/*.info" % matcherPath) + for f in spec_files: + f = os.path.basename(f) + i = f.find(".info") + f = f[0:i] + dist = DistInfo(f, base_dir=matcherPath) + for template in dist.templates: + if template.match_uri is not None: + self.templates.append(template) + return + + def match(self, source: AnyExplodedSourceEntry) -> bool: + """Add a matching template to the source""" + found = False + for template in self.templates: + if source.uri is None or source.dist is None: + continue + if ( + template.match_uri is not None + and template.match_name is not None + and source.uri is not None + and source.dist is not None + and re.search(template.match_uri, source.uri) + and re.match(template.match_name, source.dist) + and + # deb is a valid fallback for deb-src (if that is not + # definied, see #760035 + (source.type == template.type or template.type == "deb") + ): + found = True + source.template = template + break + elif ( + template.is_mirror(source.uri) + and template.match_name is not None + and source.dist is not None + and re.match(template.match_name, source.dist) + ): + found = True + source.template = template + break + return found + + +# some simple tests +if __name__ == "__main__": + apt_pkg.init_config() + 
    sources = SourcesList()

    # dump every parsed entry from the system sources.list
    for entry in sources:
        logging.info("entry %s" % entry.str())

    # exercise is_mirror() with and without trailing slashes on either side
    mirror = is_mirror(
        "http://archive.ubuntu.com/ubuntu/", "http://de.archive.ubuntu.com/ubuntu/"
    )
    logging.info("is_mirror(): %s" % mirror)

    logging.info(
        is_mirror(
            "http://archive.ubuntu.com/ubuntu", "http://de.archive.ubuntu.com/ubuntu/"
        )
    )
    logging.info(
        is_mirror(
            "http://archive.ubuntu.com/ubuntu/", "http://de.archive.ubuntu.com/ubuntu"
        )
    )