diff options
Diffstat (limited to 'apt')
-rw-r--r-- | apt/__init__.py | 40 | ||||
-rw-r--r-- | apt/auth.py | 311 | ||||
-rw-r--r-- | apt/cache.py | 1004 | ||||
-rw-r--r-- | apt/cdrom.py | 91 | ||||
-rw-r--r-- | apt/debfile.py | 861 | ||||
-rw-r--r-- | apt/package.py | 1559 | ||||
-rw-r--r-- | apt/progress/__init__.py | 28 | ||||
-rw-r--r-- | apt/progress/base.py | 332 | ||||
-rw-r--r-- | apt/progress/text.py | 294 | ||||
-rw-r--r-- | apt/py.typed | 0 | ||||
-rw-r--r-- | apt/utils.py | 100 |
11 files changed, 4620 insertions, 0 deletions
diff --git a/apt/__init__.py b/apt/__init__.py new file mode 100644 index 0000000..f22c9a0 --- /dev/null +++ b/apt/__init__.py @@ -0,0 +1,40 @@ +# Copyright (c) 2005-2009 Canonical +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +# import the core of apt_pkg +"""High-Level Interface for working with apt.""" +import apt_pkg + +from apt.cache import Cache as Cache +from apt.cache import ProblemResolver as ProblemResolver + +# import some fancy classes +from apt.package import Package as Package +from apt.package import Version as Version + +Cache # pyflakes +ProblemResolver # pyflakes +Version # pyflakes +from apt.cdrom import Cdrom as Cdrom + +# init the package system, but do not re-initialize config +if "APT" not in apt_pkg.config: + apt_pkg.init_config() +apt_pkg.init_system() + +__all__ = ["Cache", "Cdrom", "Package"] diff --git a/apt/auth.py b/apt/auth.py new file mode 100644 index 0000000..6d50616 --- /dev/null +++ b/apt/auth.py @@ -0,0 +1,311 @@ +#!/usr/bin/python3 +# auth - authentication key management +# +# Copyright (c) 2004 Canonical +# Copyright (c) 2012 Sebastian Heinlein +# +# Author: Michael Vogt <mvo@debian.org> +# Sebastian Heinlein <devel@glatzor.de> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Handle GnuPG keys used to trust signed repositories.""" + +import errno +import os +import os.path +import shutil +import subprocess +import sys +import tempfile + +import apt_pkg +from apt_pkg import gettext as _ + + +class AptKeyError(Exception): + pass + + +class AptKeyIDTooShortError(AptKeyError): + """Internal class do not rely on it.""" + + +class TrustedKey: + + """Represents a trusted key.""" + + def __init__(self, name: str, keyid: str, date: str) -> None: + self.raw_name = name + # Allow to translated some known keys + self.name = _(name) + self.keyid = keyid + self.date = date + + def __str__(self) -> str: + return f"{self.name}\n{self.keyid} {self.date}" + + +def _call_apt_key_script(*args: str, **kwargs: str | None) -> str: + """Run the apt-key script with the given arguments.""" + conf = None + cmd = [apt_pkg.config.find_file("Dir::Bin::Apt-Key", "/usr/bin/apt-key")] + cmd.extend(args) + env = os.environ.copy() + env["LANG"] = "C" + env["APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE"] = "1" + try: + if apt_pkg.config.find_dir("Dir") != "/": + # If the key is to be installed into a chroot we have to export the + # configuration from the chroot to the apt-key script by using + # a temporary APT_CONFIG file. The apt-key script uses apt-config + # shell internally + conf = tempfile.NamedTemporaryFile(prefix="apt-key", suffix=".conf") + conf.write(apt_pkg.config.dump().encode("UTF-8")) + conf.flush() + env["APT_CONFIG"] = conf.name + proc = subprocess.Popen( + cmd, + env=env, + universal_newlines=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + stdin = kwargs.get("stdin", None) + + output, stderr = proc.communicate(stdin) # type: str, str + + if proc.returncode: + raise AptKeyError( + "The apt-key script failed with return code %s:\n" + "%s\n" + "stdout: %s\n" + "stderr: %s" % (proc.returncode, " ".join(cmd), output, stderr) + ) + elif stderr: + sys.stderr.write(stderr) # Forward stderr + + return output.strip() + finally: + if conf is not None: + conf.close() + + +def add_key_from_file(filename: str) -> None: + """Import a GnuPG key file to trust repositores signed by it. + + Keyword arguments: + filename -- the absolute path to the public GnuPG key file + """ + if not os.path.abspath(filename): + raise AptKeyError("An absolute path is required: %s" % filename) + if not os.access(filename, os.R_OK): + raise AptKeyError("Key file cannot be accessed: %s" % filename) + _call_apt_key_script("add", filename) + + +def add_key_from_keyserver(keyid: str, keyserver: str) -> None: + """Import a GnuPG key file to trust repositores signed by it. + + Keyword arguments: + keyid -- the long keyid (fingerprint) of the key, e.g. + A1BD8E9D78F7FE5C3E65D8AF8B48AD6246925553 + keyserver -- the URL or hostname of the key server + """ + tmp_keyring_dir = tempfile.mkdtemp() + try: + _add_key_from_keyserver(keyid, keyserver, tmp_keyring_dir) + except Exception: + raise + finally: + # We are racing with gpg when removing sockets, so ignore + # failure to delete non-existing files. + def onerror( + func: object, path: str, exc_info: tuple[type, Exception, object] + ) -> None: + if isinstance(exc_info[1], OSError) and exc_info[1].errno == errno.ENOENT: + return + raise + + shutil.rmtree(tmp_keyring_dir, onerror=onerror) + + +def _add_key_from_keyserver(keyid: str, keyserver: str, tmp_keyring_dir: str) -> None: + if len(keyid.replace(" ", "").replace("0x", "")) < (160 / 4): + raise AptKeyIDTooShortError("Only fingerprints (v4, 160bit) are supported") + # create a temp keyring dir + tmp_secret_keyring = os.path.join(tmp_keyring_dir, "secring.gpg") + tmp_keyring = os.path.join(tmp_keyring_dir, "pubring.gpg") + # default options for gpg + gpg_default_options = [ + "gpg", + "--no-default-keyring", + "--no-options", + "--homedir", + tmp_keyring_dir, + ] + # download the key to a temp keyring first + res = subprocess.call( + gpg_default_options + + [ + "--secret-keyring", + tmp_secret_keyring, + "--keyring", + tmp_keyring, + "--keyserver", + keyserver, + "--recv", + keyid, + ] + ) + if res != 0: + raise AptKeyError(f"recv from '{keyserver}' failed for '{keyid}'") + # FIXME: + # - with gnupg 1.4.18 the downloaded key is actually checked(!), + # i.e. gnupg will not import anything that the server sends + # into the keyring, so the below checks are now redundant *if* + # gnupg 1.4.18 is used + + # now export again using the long key id (to ensure that there is + # really only this one key in our keyring) and not someone MITM us + tmp_export_keyring = os.path.join(tmp_keyring_dir, "export-keyring.gpg") + res = subprocess.call( + gpg_default_options + + [ + "--keyring", + tmp_keyring, + "--output", + tmp_export_keyring, + "--export", + keyid, + ] + ) + if res != 0: + raise AptKeyError("export of '%s' failed", keyid) + # now verify the fingerprint, this is probably redundant as we + # exported by the fingerprint in the previous command but its + # still good paranoia + output = subprocess.Popen( + gpg_default_options + + [ + "--keyring", + tmp_export_keyring, + "--fingerprint", + "--batch", + "--fixed-list-mode", + "--with-colons", + ], + stdout=subprocess.PIPE, + universal_newlines=True, + ).communicate()[0] + got_fingerprint = None + for line in output.splitlines(): + if line.startswith("fpr:"): + got_fingerprint = line.split(":")[9] + # stop after the first to ensure no subkey trickery + break + # strip the leading "0x" is there is one and uppercase (as this is + # what gnupg is using) + signing_key_fingerprint = keyid.replace("0x", "").upper() + if got_fingerprint != signing_key_fingerprint: + # make the error match what gnupg >= 1.4.18 will output when + # it checks the key itself before importing it + raise AptKeyError( + f"recv from '{keyserver}' failed for '{signing_key_fingerprint}'" + ) + # finally add it + add_key_from_file(tmp_export_keyring) + + +def add_key(content: str) -> None: + """Import a GnuPG key to trust repositores signed by it. + + Keyword arguments: + content -- the content of the GnuPG public key + """ + _call_apt_key_script("adv", "--quiet", "--batch", "--import", "-", stdin=content) + + +def remove_key(fingerprint: str) -> None: + """Remove a GnuPG key to no longer trust repositores signed by it. + + Keyword arguments: + fingerprint -- the fingerprint identifying the key + """ + _call_apt_key_script("rm", fingerprint) + + +def export_key(fingerprint: str) -> str: + """Return the GnuPG key in text format. + + Keyword arguments: + fingerprint -- the fingerprint identifying the key + """ + return _call_apt_key_script("export", fingerprint) + + +def update() -> str: + """Update the local keyring with the archive keyring and remove from + the local keyring the archive keys which are no longer valid. The + archive keyring is shipped in the archive-keyring package of your + distribution, e.g. the debian-archive-keyring package in Debian. + """ + return _call_apt_key_script("update") + + +def net_update() -> str: + """Work similar to the update command above, but get the archive + keyring from an URI instead and validate it against a master key. + This requires an installed wget(1) and an APT build configured to + have a server to fetch from and a master keyring to validate. APT + in Debian does not support this command and relies on update + instead, but Ubuntu's APT does. + """ + return _call_apt_key_script("net-update") + + +def list_keys() -> list[TrustedKey]: + """Returns a list of TrustedKey instances for each key which is + used to trust repositories. + """ + # The output of `apt-key list` is difficult to parse since the + # --with-colons parameter isn't user + output = _call_apt_key_script( + "adv", "--with-colons", "--batch", "--fixed-list-mode", "--list-keys" + ) + res = [] + for line in output.split("\n"): + fields = line.split(":") + if fields[0] == "pub": + keyid = fields[4] + if fields[0] == "uid": + uid = fields[9] + creation_date = fields[5] + key = TrustedKey(uid, keyid, creation_date) + res.append(key) + return res + + +if __name__ == "__main__": + # Add some known keys we would like to see translated so that they get + # picked up by gettext + lambda: _("Ubuntu Archive Automatic Signing Key <ftpmaster@ubuntu.com>") + lambda: _("Ubuntu CD Image Automatic Signing Key <cdimage@ubuntu.com>") + + apt_pkg.init() + for trusted_key in list_keys(): + print(trusted_key) diff --git a/apt/cache.py b/apt/cache.py new file mode 100644 index 0000000..cf78026 --- /dev/null +++ b/apt/cache.py @@ -0,0 +1,1004 @@ +# cache.py - apt cache abstraction +# +# Copyright (c) 2005-2009 Canonical +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +from __future__ import annotations + +import fnmatch +import os +import warnings +import weakref +from collections.abc import Callable, Iterator, KeysView +from typing import Any, cast + +import apt_pkg + +import apt.progress.text +from apt.package import Package, Version +from apt.progress.base import AcquireProgress, InstallProgress, OpProgress + + +class FetchCancelledException(IOError): + """Exception that is thrown when the user cancels a fetch operation.""" + + +class FetchFailedException(IOError): + """Exception that is thrown when fetching fails.""" + + +class UntrustedException(FetchFailedException): + """Exception that is thrown when fetching fails for trust reasons""" + + +class LockFailedException(IOError): + """Exception that is thrown when locking fails.""" + + +class CacheClosedException(Exception): + """Exception that is thrown when the cache is used after close().""" + + +class _WrappedLock: + """Wraps an apt_pkg.FileLock to raise LockFailedException. + + Initialized using a directory path.""" + + def __init__(self, path: str) -> None: + self._path = path + self._lock = apt_pkg.FileLock(os.path.join(path, "lock")) + + def __enter__(self) -> None: + try: + return self._lock.__enter__() + except apt_pkg.Error as e: + raise LockFailedException( + ("Failed to lock directory %s: %s") % (self._path, e) + ) + + def __exit__(self, typ: object, value: object, traceback: object) -> None: + return self._lock.__exit__(typ, value, traceback) + + +class Cache: + """Dictionary-like package cache. + + The APT cache file contains a hash table mapping names of binary + packages to their metadata. A Cache object is the in-core + representation of the same. It provides access to APTs idea of the + list of available packages. + + The cache can be used like a mapping from package names to Package + objects (although only getting items is supported). + + Keyword arguments: + progress -- a OpProgress object, + rootdir -- an alternative root directory. if that is given the system + sources.list and system lists/files are not read, only file relative + to the given rootdir, + memonly -- build the cache in memory only. + + + .. versionchanged:: 1.0 + + The cache now supports package names with special architecture + qualifiers such as :all and :native. It does not export them + in :meth:`keys()`, though, to keep :meth:`keys()` a unique set. + """ + + def __init__( + self, + progress: OpProgress | None = None, + rootdir: str | None = None, + memonly: bool = False, + ) -> None: + self._cache: apt_pkg.Cache = cast(apt_pkg.Cache, None) + self._depcache: apt_pkg.DepCache = cast(apt_pkg.DepCache, None) + self._records: apt_pkg.PackageRecords = cast( + apt_pkg.PackageRecords, None + ) # noqa + self._list: apt_pkg.SourceList = cast(apt_pkg.SourceList, None) + self._callbacks: dict[str, list[Callable[..., None] | str]] = {} # noqa + self._callbacks2: dict[ + str, list[tuple[Callable[..., Any], tuple[Any, ...], dict[Any, Any]]] + ] = {} # noqa + self._weakref: weakref.WeakValueDictionary[ + str, apt.Package + ] = weakref.WeakValueDictionary() # noqa + self._weakversions: weakref.WeakSet[Version] = weakref.WeakSet() # noqa + self._changes_count = -1 + self._sorted_set: list[str] | None = None + + self.connect("cache_post_open", "_inc_changes_count") + self.connect("cache_post_change", "_inc_changes_count") + if memonly: + # force apt to build its caches in memory + apt_pkg.config.set("Dir::Cache::pkgcache", "") + if rootdir: + rootdir = os.path.abspath(rootdir) + if os.path.exists(rootdir + "/etc/apt/apt.conf"): + apt_pkg.read_config_file(apt_pkg.config, rootdir + "/etc/apt/apt.conf") + if os.path.isdir(rootdir + "/etc/apt/apt.conf.d"): + apt_pkg.read_config_dir(apt_pkg.config, rootdir + "/etc/apt/apt.conf.d") + apt_pkg.config.set("Dir", rootdir) + apt_pkg.config.set("Dir::State::status", rootdir + "/var/lib/dpkg/status") + # also set dpkg to the rootdir path so that its called for the + # --print-foreign-architectures call + apt_pkg.config.set( + "Dir::bin::dpkg", os.path.join(rootdir, "usr", "bin", "dpkg") + ) + # create required dirs/files when run with special rootdir + # automatically + self._check_and_create_required_dirs(rootdir) + # Call InitSystem so the change to Dir::State::Status is actually + # recognized (LP: #320665) + apt_pkg.init_system() + + # Prepare a lock object (context manager for archive lock) + archive_dir = apt_pkg.config.find_dir("Dir::Cache::Archives") + self._archive_lock = _WrappedLock(archive_dir) + + self.open(progress) + + def fix_broken(self) -> None: + """Fix broken packages.""" + self._depcache.fix_broken() + + def _inc_changes_count(self) -> None: + """Increase the number of changes""" + self._changes_count += 1 + + def _check_and_create_required_dirs(self, rootdir: str) -> None: + """ + check if the required apt directories/files are there and if + not create them + """ + files = [ + "/var/lib/dpkg/status", + "/etc/apt/sources.list", + ] + dirs = [ + "/var/lib/dpkg", + "/etc/apt/", + "/var/cache/apt/archives/partial", + "/var/lib/apt/lists/partial", + ] + for d in dirs: + if not os.path.exists(rootdir + d): + # print "creating: ", rootdir + d + os.makedirs(rootdir + d) + for f in files: + if not os.path.exists(rootdir + f): + open(rootdir + f, "w").close() + + def _run_callbacks(self, name: str) -> None: + """internal helper to run a callback""" + if name in self._callbacks: + for callback in self._callbacks[name]: + if callback == "_inc_changes_count": + self._inc_changes_count() + else: + callback() # type: ignore + + if name in self._callbacks2: + for callback, args, kwds in self._callbacks2[name]: + callback(self, *args, **kwds) + + def open(self, progress: OpProgress | None = None) -> None: + """Open the package cache, after that it can be used like + a dictionary + """ + if progress is None: + progress = apt.progress.base.OpProgress() + # close old cache on (re)open + self.close() + self.op_progress = progress + self._run_callbacks("cache_pre_open") + + self._cache = apt_pkg.Cache(progress) + self._depcache = apt_pkg.DepCache(self._cache) + self._records = apt_pkg.PackageRecords(self._cache) + self._list = apt_pkg.SourceList() + self._list.read_main_list() + self._sorted_set = None + self.__remap() + + self._have_multi_arch = len(apt_pkg.get_architectures()) > 1 + + progress.done() + self._run_callbacks("cache_post_open") + + def __remap(self) -> None: + """Called after cache reopen() to relocate to new cache. + + Relocate objects like packages and versions from the old + underlying cache to the new one. + """ + for key in list(self._weakref.keys()): + try: + pkg = self._weakref[key] + except KeyError: + continue + + try: + pkg._pkg = self._cache[pkg._pkg.name, pkg._pkg.architecture] + except LookupError: + del self._weakref[key] + + for ver in list(self._weakversions): + # Package has been reseated above, reseat version + for v in ver.package._pkg.version_list: + # Requirements as in debListParser::SameVersion + if ( + v.hash == ver._cand.hash + and (v.size == 0 or ver._cand.size == 0 or v.size == ver._cand.size) + and v.multi_arch == ver._cand.multi_arch + and v.ver_str == ver._cand.ver_str + ): + ver._cand = v + break + else: + self._weakversions.remove(ver) + + def close(self) -> None: + """Close the package cache""" + # explicitely free the FDs that _records has open + del self._records + self._records = cast(apt_pkg.PackageRecords, None) + + def __enter__(self) -> Cache: + """Enter the with statement""" + return self + + def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None: + """Exit the with statement""" + self.close() + + def __getitem__(self, key: object) -> Package: + """look like a dictionary (get key)""" + try: + key = str(key) + rawpkg = self._cache[key] + except KeyError: + raise KeyError("The cache has no package named %r" % key) + + # It might be excluded due to not having a version or something + if not self.__is_real_pkg(rawpkg): + raise KeyError("The cache has no package named %r" % key) + + pkg = self._rawpkg_to_pkg(rawpkg) + + return pkg + + def get(self, key: object, default: object = None) -> Any: + """Return *self*[*key*] or *default* if *key* not in *self*. + + .. versionadded:: 1.1 + """ + try: + return self[key] + except KeyError: + return default + + def _rawpkg_to_pkg(self, rawpkg: apt_pkg.Package) -> Package: + """Returns the apt.Package object for an apt_pkg.Package object. + + .. versionadded:: 1.0.0 + """ + fullname = rawpkg.get_fullname(pretty=True) + + return self._weakref.setdefault(fullname, Package(self, rawpkg)) + + def __iter__(self) -> Iterator[Package]: + # We iterate sorted over package names here. With this we read the + # package lists linearly if we need to access the package records, + # instead of having to do thousands of random seeks; the latter + # is disastrous if we use compressed package indexes, and slower than + # necessary for uncompressed indexes. + for pkgname in self.keys(): + pkg = Package(self, self._cache[pkgname]) + yield self._weakref.setdefault(pkgname, pkg) + + def __is_real_pkg(self, rawpkg: apt_pkg.Package) -> bool: + """Check if the apt_pkg.Package provided is a real package.""" + return rawpkg.has_versions + + def has_key(self, key: object) -> bool: + return key in self + + def __contains__(self, key: object) -> bool: + try: + return self.__is_real_pkg(self._cache[str(key)]) + except KeyError: + return False + + def __len__(self) -> int: + return len(self.keys()) + + def keys(self) -> list[str]: + if self._sorted_set is None: + self._sorted_set = sorted( + p.get_fullname(pretty=True) + for p in self._cache.packages + if self.__is_real_pkg(p) + ) + return list(self._sorted_set) # We need a copy here, caller may modify + + def get_changes(self) -> list[Package]: + """Get the marked changes""" + changes = [] + marked_keep = self._depcache.marked_keep + for rawpkg in self._cache.packages: + if not marked_keep(rawpkg): + changes.append(self._rawpkg_to_pkg(rawpkg)) + return changes + + def upgrade(self, dist_upgrade: bool = False) -> None: + """Upgrade all packages. + + If the parameter *dist_upgrade* is True, new dependencies will be + installed as well (and conflicting packages may be removed). The + default value is False. + """ + self.cache_pre_change() + self._depcache.upgrade(dist_upgrade) + self.cache_post_change() + + @property + def required_download(self) -> int: + """Get the size of the packages that are required to download.""" + if self._records is None: + raise CacheClosedException("Cache object used after close() called") + pm = apt_pkg.PackageManager(self._depcache) + fetcher = apt_pkg.Acquire() + pm.get_archives(fetcher, self._list, self._records) + return fetcher.fetch_needed + + @property + def required_space(self) -> int: + """Get the size of the additional required space on the fs.""" + return self._depcache.usr_size + + @property + def req_reinstall_pkgs(self) -> set[str]: + """Return the packages not downloadable packages in reqreinst state.""" + reqreinst = set() + get_candidate_ver = self._depcache.get_candidate_ver + states = frozenset( + (apt_pkg.INSTSTATE_REINSTREQ, apt_pkg.INSTSTATE_HOLD_REINSTREQ) + ) + for pkg in self._cache.packages: + cand = get_candidate_ver(pkg) + if cand and not cand.downloadable and pkg.inst_state in states: + reqreinst.add(pkg.get_fullname(pretty=True)) + return reqreinst + + def _run_fetcher( + self, fetcher: apt_pkg.Acquire, allow_unauthenticated: bool | None + ) -> int: + if allow_unauthenticated is None: + allow_unauthenticated = apt_pkg.config.find_b( + "APT::Get::" "AllowUnauthenticated", False + ) + + untrusted = [item for item in fetcher.items if not item.is_trusted] + if untrusted and not allow_unauthenticated: + raise UntrustedException( + "Untrusted packages:\n%s" % "\n".join(i.desc_uri for i in untrusted) + ) + + # do the actual fetching + res = fetcher.run() + + # now check the result (this is the code from apt-get.cc) + failed = False + err_msg = "" + for item in fetcher.items: + if item.status == item.STAT_DONE: + continue + if item.STAT_IDLE: + continue + err_msg += f"Failed to fetch {item.desc_uri} {item.error_text}\n" + failed = True + + # we raise a exception if the download failed or it was cancelt + if res == fetcher.RESULT_CANCELLED: + raise FetchCancelledException(err_msg) + elif failed: + raise FetchFailedException(err_msg) + return res + + def _fetch_archives( + self, + fetcher: apt_pkg.Acquire, + pm: apt_pkg.PackageManager, + allow_unauthenticated: bool | None = None, + ) -> int: + """fetch the needed archives""" + if self._records is None: + raise CacheClosedException("Cache object used after close() called") + + # this may as well throw a SystemError exception + if not pm.get_archives(fetcher, self._list, self._records): + return False + + # now run the fetcher, throw exception if something fails to be + # fetched + return self._run_fetcher(fetcher, allow_unauthenticated) + + def fetch_archives( + self, + progress: AcquireProgress | None = None, + fetcher: apt_pkg.Acquire | None = None, + allow_unauthenticated: bool | None = None, + ) -> int: + """Fetch the archives for all packages marked for install/upgrade. + + You can specify either an :class:`apt.progress.base.AcquireProgress()` + object for the parameter *progress*, or specify an already + existing :class:`apt_pkg.Acquire` object for the parameter *fetcher*. + + The return value of the function is undefined. If an error occurred, + an exception of type :class:`FetchFailedException` or + :class:`FetchCancelledException` is raised. + + The keyword-only parameter *allow_unauthenticated* specifies whether + to allow unauthenticated downloads. If not specified, it defaults to + the configuration option `APT::Get::AllowUnauthenticated`. + + .. versionadded:: 0.8.0 + """ + if progress is not None and fetcher is not None: + raise ValueError("Takes a progress or a an Acquire object") + if progress is None: + progress = apt.progress.text.AcquireProgress() + if fetcher is None: + fetcher = apt_pkg.Acquire(progress) + + with self._archive_lock: + return self._fetch_archives( + fetcher, apt_pkg.PackageManager(self._depcache), allow_unauthenticated + ) + + def is_virtual_package(self, pkgname: str) -> bool: + """Return whether the package is a virtual package.""" + try: + pkg = self._cache[pkgname] + except KeyError: + return False + else: + return bool(pkg.has_provides and not pkg.has_versions) + + def get_providing_packages( + self, + pkgname: str, + candidate_only: bool = True, + include_nonvirtual: bool = False, + ) -> list[Package]: + """Return a list of all packages providing a package. + + Return a list of packages which provide the virtual package of the + specified name. + + If 'candidate_only' is False, return all packages with at + least one version providing the virtual package. Otherwise, + return only those packages where the candidate version + provides the virtual package. + + If 'include_nonvirtual' is True then it will search for all + packages providing pkgname, even if pkgname is not itself + a virtual pkg. + """ + + providers: set[Package] = set() + get_candidate_ver = self._depcache.get_candidate_ver + try: + vp = self._cache[pkgname] + if vp.has_versions and not include_nonvirtual: + return list(providers) + except KeyError: + return list(providers) + + for provides, providesver, version in vp.provides_list: + rawpkg = version.parent_pkg + if not candidate_only or (version == get_candidate_ver(rawpkg)): + providers.add(self._rawpkg_to_pkg(rawpkg)) + return list(providers) + + def update( + self, + fetch_progress: AcquireProgress | None = None, + pulse_interval: int = 0, + raise_on_error: bool = True, + sources_list: str | None = None, + ) -> int: + """Run the equivalent of apt-get update. + + You probably want to call open() afterwards, in order to utilise the + new cache. Otherwise, the old cache will be used which can lead to + strange bugs. + + The first parameter *fetch_progress* may be set to an instance of + apt.progress.FetchProgress, the default is apt.progress.FetchProgress() + . + sources_list -- Update a alternative sources.list than the default. + Note that the sources.list.d directory is ignored in this case + """ + with _WrappedLock(apt_pkg.config.find_dir("Dir::State::Lists")): + if sources_list: + old_sources_list = apt_pkg.config.find("Dir::Etc::sourcelist") + old_sources_list_d = apt_pkg.config.find("Dir::Etc::sourceparts") + old_cleanup = apt_pkg.config.find("APT::List-Cleanup") + apt_pkg.config.set( + "Dir::Etc::sourcelist", os.path.abspath(sources_list) + ) + apt_pkg.config.set("Dir::Etc::sourceparts", "xxx") + apt_pkg.config.set("APT::List-Cleanup", "0") + slist = apt_pkg.SourceList() + slist.read_main_list() + else: + slist = self._list + + try: + if fetch_progress is None: + fetch_progress = apt.progress.base.AcquireProgress() + try: + res = self._cache.update(fetch_progress, slist, pulse_interval) + except SystemError as e: + raise FetchFailedException(e) + if not res and raise_on_error: + raise FetchFailedException() + else: + return res + finally: + if sources_list: + apt_pkg.config.set("Dir::Etc::sourcelist", old_sources_list) + apt_pkg.config.set("Dir::Etc::sourceparts", old_sources_list_d) + apt_pkg.config.set("APT::List-Cleanup", old_cleanup) + + def install_archives( + self, pm: apt_pkg.PackageManager, install_progress: InstallProgress + ) -> int: + """ + The first parameter *pm* refers to an object returned by + apt_pkg.PackageManager(). + + The second parameter *install_progress* refers to an InstallProgress() + object of the module apt.progress. + + This releases a system lock in newer versions, if there is any, + and reestablishes it afterwards. + """ + # compat with older API + try: + install_progress.startUpdate() # type: ignore + except AttributeError: + install_progress.start_update() + + did_unlock = apt_pkg.pkgsystem_is_locked() + if did_unlock: + apt_pkg.pkgsystem_unlock_inner() + + try: + res = install_progress.run(pm) + finally: + if did_unlock: + apt_pkg.pkgsystem_lock_inner() + + try: + install_progress.finishUpdate() # type: ignore + except AttributeError: + install_progress.finish_update() + return res + + def commit( + self, + fetch_progress: AcquireProgress | None = None, + install_progress: InstallProgress | None = None, + allow_unauthenticated: bool | None = None, + ) -> bool: + """Apply the marked changes to the cache. + + The first parameter, *fetch_progress*, refers to a FetchProgress() + object as found in apt.progress, the default being + apt.progress.FetchProgress(). + + The second parameter, *install_progress*, is a + apt.progress.InstallProgress() object. + + The keyword-only parameter *allow_unauthenticated* specifies whether + to allow unauthenticated downloads. If not specified, it defaults to + the configuration option `APT::Get::AllowUnauthenticated`. + """ + # FIXME: + # use the new acquire/pkgmanager interface here, + # raise exceptions when a download or install fails + # and send proper error strings to the application. + # Current a failed download will just display "error" + # which is less than optimal! + + if fetch_progress is None: + fetch_progress = apt.progress.base.AcquireProgress() + if install_progress is None: + install_progress = apt.progress.base.InstallProgress() + + assert install_progress is not None + + with apt_pkg.SystemLock(): + pm = apt_pkg.PackageManager(self._depcache) + fetcher = apt_pkg.Acquire(fetch_progress) + with self._archive_lock: + while True: + # fetch archives first + res = self._fetch_archives(fetcher, pm, allow_unauthenticated) + + # then install + res = self.install_archives(pm, install_progress) + if res == pm.RESULT_COMPLETED: + break + elif res == pm.RESULT_FAILED: + raise SystemError("installArchives() failed") + elif res == pm.RESULT_INCOMPLETE: + pass + else: + raise SystemError( + "internal-error: unknown result " + "code from InstallArchives: %s" % res + ) + # reload the fetcher for media swaping + fetcher.shutdown() + return res == pm.RESULT_COMPLETED + + def clear(self) -> None: + """Unmark all changes""" + self._depcache.init() + + # cache changes + + def cache_post_change(self) -> None: + "called internally if the cache has changed, emit a signal then" + self._run_callbacks("cache_post_change") + + def cache_pre_change(self) -> None: + """called internally if the cache is about to change, emit + a signal then""" + self._run_callbacks("cache_pre_change") + + def connect(self, name: str, callback: Callable[..., None] | str) -> None: + """Connect to a signal. + + .. deprecated:: 1.0 + + Please use connect2() instead, as this function is very + likely to cause a memory leak. + """ + if callback != "_inc_changes_count": + warnings.warn( + "connect() likely causes a reference" " cycle, use connect2() instead", + RuntimeWarning, + 2, + ) + if name not in self._callbacks: + self._callbacks[name] = [] + self._callbacks[name].append(callback) + + def connect2( + self, name: str, callback: Callable[..., Any], *args: object, **kwds: object + ) -> None: + """Connect to a signal. + + The callback will be passed the cache as an argument, and + any arguments passed to this function. Make sure that, if you + pass a method of a class as your callback, your class does not + contain a reference to the cache. + + Cyclic references to the cache can cause issues if the Cache object + is replaced by a new one, because the cache keeps a lot of objects and + tens of open file descriptors. + + currently only used for cache_{post,pre}_{changed,open}. + + .. versionadded:: 1.0 + """ + if name not in self._callbacks2: + self._callbacks2[name] = [] + self._callbacks2[name].append((callback, args, kwds)) + + def actiongroup(self) -> apt_pkg.ActionGroup: + """Return an `ActionGroup` object for the current cache. + + Action groups can be used to speedup actions. The action group is + active as soon as it is created, and disabled when the object is + deleted or when release() is called. + + You can use the action group as a context manager, this is the + recommended way:: + + with cache.actiongroup(): + for package in my_selected_packages: + package.mark_install() + + This way, the action group is automatically released as soon as the + with statement block is left. It also has the benefit of making it + clear which parts of the code run with a action group and which + don't. + """ + return apt_pkg.ActionGroup(self._depcache) + + @property + def dpkg_journal_dirty(self) -> bool: + """Return True if the dpkg was interrupted + + All dpkg operations will fail until this is fixed, the action to + fix the system if dpkg got interrupted is to run + 'dpkg --configure -a' as root. + """ + dpkg_status_dir = os.path.dirname( + apt_pkg.config.find_file("Dir::State::status") + ) + for f in os.listdir(os.path.join(dpkg_status_dir, "updates")): + if fnmatch.fnmatch(f, "[0-9]*"): + return True + return False + + @property + def broken_count(self) -> int: + """Return the number of packages with broken dependencies.""" + return self._depcache.broken_count + + @property + def delete_count(self) -> int: + """Return the number of packages marked for deletion.""" + return self._depcache.del_count + + @property + def install_count(self) -> int: + """Return the number of packages marked for installation.""" + return self._depcache.inst_count + + @property + def keep_count(self) -> int: + """Return the number of packages marked as keep.""" + return self._depcache.keep_count + + +class ProblemResolver: + """Resolve problems due to dependencies and conflicts. + + The first argument 'cache' is an instance of apt.Cache. + """ + + def __init__(self, cache: Cache) -> None: + self._resolver = apt_pkg.ProblemResolver(cache._depcache) + self._cache = cache + + def clear(self, package: Package) -> None: + """Reset the package to the default state.""" + self._resolver.clear(package._pkg) + + def protect(self, package: Package) -> None: + """Protect a package so it won't be removed.""" + self._resolver.protect(package._pkg) + + def remove(self, package: Package) -> None: + """Mark a package for removal.""" + self._resolver.remove(package._pkg) + + def resolve(self) -> None: + """Resolve dependencies, try to remove packages where needed.""" + self._cache.cache_pre_change() + self._resolver.resolve() + self._cache.cache_post_change() + + def resolve_by_keep(self) -> None: + """Resolve dependencies, do not try to remove packages.""" + self._cache.cache_pre_change() + self._resolver.resolve_by_keep() + self._cache.cache_post_change() + + def keep_phased_updates(self) -> None: + """Keep back phased updates.""" + self._cache.cache_pre_change() + self._resolver.keep_phased_updates() + self._cache.cache_post_change() + + +# ----------------------------- experimental interface + + +class Filter: + """Filter base class""" + + def apply(self, pkg: Package) -> bool: + """Filter function, return True if the package matchs a + filter criteria and False otherwise + """ + return True + + +class MarkedChangesFilter(Filter): + """Filter that returns all marked changes""" + + def apply(self, pkg: Package) -> bool: + if pkg.marked_install or pkg.marked_delete or pkg.marked_upgrade: + return True + else: + return False + + +class InstalledFilter(Filter): + """Filter that returns all installed packages. + + .. versionadded:: 1.0.0 + """ + + def apply(self, pkg: Package) -> bool: + return pkg.is_installed + + +class _FilteredCacheHelper: + """Helper class for FilteredCache to break a reference cycle.""" + + def __init__(self, cache: Cache) -> None: + # Do not keep a reference to the cache, or you have a cycle! + + self._filtered: dict[str, bool] = {} + self._filters: list[Filter] = [] + cache.connect2("cache_post_change", self.filter_cache_post_change) + cache.connect2("cache_post_open", self.filter_cache_post_change) + + def _reapply_filter(self, cache: Cache) -> None: + "internal helper to refilter" + # Do not keep a reference to the cache, or you have a cycle! + self._filtered = {} + for pkg in cache: + for f in self._filters: + if f.apply(pkg): + self._filtered[pkg.name] = True + break + + def set_filter(self, filter: Filter) -> None: + """Set the current active filter.""" + self._filters = [] + self._filters.append(filter) + + def filter_cache_post_change(self, cache: Cache) -> None: + """Called internally if the cache changes, emit a signal then.""" + # Do not keep a reference to the cache, or you have a cycle! + self._reapply_filter(cache) + + +class FilteredCache: + """A package cache that is filtered. + + Can work on a existing cache or create a new one + """ + + def __init__( + self, cache: Cache | None = None, progress: OpProgress | None = None + ) -> None: + if cache is None: + self.cache = Cache(progress) + else: + self.cache = cache + self._helper = _FilteredCacheHelper(self.cache) + + def __len__(self) -> int: + return len(self._helper._filtered) + + def __getitem__(self, key: str) -> Package: + return self.cache[key] + + def __iter__(self) -> Iterator[Package]: + for pkgname in self._helper._filtered: + yield self.cache[pkgname] + + def keys(self) -> KeysView[str]: + return self._helper._filtered.keys() + + def has_key(self, key: object) -> bool: + return key in self + + def __contains__(self, key: object) -> bool: + try: + # Normalize package name for multi arch + return self.cache[key].name in self._helper._filtered + except KeyError: + return False + + def set_filter(self, filter: Filter) -> None: + """Set the current active filter.""" + self._helper.set_filter(filter) + self.cache.cache_post_change() + + def filter_cache_post_change(self) -> None: + """Called internally if the cache changes, emit a signal then.""" + self._helper.filter_cache_post_change(self.cache) + + def __getattr__(self, key: str) -> Any: + """we try to look exactly like a real cache.""" + return getattr(self.cache, key) + + +def cache_pre_changed(cache: Cache) -> None: + print("cache pre changed") + + +def cache_post_changed(cache: Cache) -> None: + print("cache post changed") + + +def _test() -> None: + """Internal test code.""" + print("Cache self test") + apt_pkg.init() + cache = Cache(apt.progress.text.OpProgress()) + cache.connect2("cache_pre_change", cache_pre_changed) + cache.connect2("cache_post_change", cache_post_changed) + print("aptitude" in cache) + pkg = cache["aptitude"] + print(pkg.name) + print(len(cache)) + + for pkgname in cache.keys(): + assert cache[pkgname].name == pkgname + + cache.upgrade() + changes = cache.get_changes() + print(len(changes)) + for pkg in changes: + assert pkg.name + + # see if fetching works + for dirname in ["/tmp/pytest", "/tmp/pytest/partial"]: + if not os.path.exists(dirname): + os.mkdir(dirname) + apt_pkg.config.set("Dir::Cache::Archives", "/tmp/pytest") + pm = apt_pkg.PackageManager(cache._depcache) + fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress()) + cache._fetch_archives(fetcher, pm, None) + # sys.exit(1) + + print("Testing filtered cache (argument is old cache)") + filtered = FilteredCache(cache) + filtered.cache.connect2("cache_pre_change", cache_pre_changed) + filtered.cache.connect2("cache_post_change", cache_post_changed) + filtered.cache.upgrade() + filtered.set_filter(MarkedChangesFilter()) + print(len(filtered)) + for pkgname in filtered.keys(): + assert pkgname == filtered[pkgname].name + + print(len(filtered)) + + print("Testing filtered cache (no argument)") + filtered = FilteredCache(progress=apt.progress.base.OpProgress()) + filtered.cache.connect2("cache_pre_change", cache_pre_changed) + filtered.cache.connect2("cache_post_change", cache_post_changed) + filtered.cache.upgrade() + filtered.set_filter(MarkedChangesFilter()) + print(len(filtered)) + for pkgname in filtered.keys(): + assert pkgname == filtered[pkgname].name + + print(len(filtered)) + + +if __name__ == "__main__": + _test() diff --git a/apt/cdrom.py b/apt/cdrom.py new file mode 100644 index 0000000..dc15c5b --- /dev/null +++ b/apt/cdrom.py @@ -0,0 +1,91 @@ +# cdrom.py - CDROM handling +# +# Copyright (c) 2005-2009 Canonical +# Copyright (c) 2009 Julian Andres Klode <jak@debian.org> +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Classes related to cdrom handling.""" +import glob + +import apt_pkg + +from apt.progress.base import CdromProgress + + +class Cdrom(apt_pkg.Cdrom): + """Support for apt-cdrom like features. + + This class has several optional parameters for initialisation, which may + be used to influence the behaviour of the object: + + The optional parameter `progress` is a CdromProgress() subclass, which will + ask for the correct cdrom, etc. If not specified or None, a CdromProgress() + object will be used. + + The optional parameter `mountpoint` may be used to specify an alternative + mountpoint. + + If the optional parameter `nomount` is True, the cdroms will not be + mounted. This is the default behaviour. + """ + + def __init__( + self, + progress: CdromProgress | None = None, + mountpoint: str | None = None, + nomount: bool = True, + ) -> None: + apt_pkg.Cdrom.__init__(self) + if progress is None: + self._progress = CdromProgress() + else: + self._progress = progress + # see if we have a alternative mountpoint + if mountpoint is not None: + apt_pkg.config.set("Acquire::cdrom::mount", mountpoint) + # do not mess with mount points by default + if nomount: + apt_pkg.config.set("APT::CDROM::NoMount", "true") + else: + apt_pkg.config.set("APT::CDROM::NoMount", "false") + + def add(self, progress: CdromProgress | None = None) -> bool: + """Add cdrom to the sources.list.""" + return apt_pkg.Cdrom.add(self, progress or self._progress) + + def ident(self, progress: CdromProgress | None = None) -> str: + """Identify the cdrom.""" + return apt_pkg.Cdrom.ident(self, progress or self._progress) + + @property + def in_sources_list(self) -> bool: + """Check if the cdrom is already in the current sources.list.""" + cd_id = self.ident() + if cd_id is None: + # FIXME: throw exception instead + return False + # Get a list of files + src = glob.glob(apt_pkg.config.find_dir("Dir::Etc::sourceparts") + "*") + src.append(apt_pkg.config.find_file("Dir::Etc::sourcelist")) + # Check each file + for fname in src: + with open(fname) as fobj: + for line in fobj: + if not line.lstrip().startswith("#") and cd_id in line: + return True + return False diff --git a/apt/debfile.py b/apt/debfile.py new file mode 100644 index 0000000..b3ef733 --- /dev/null +++ b/apt/debfile.py @@ -0,0 +1,861 @@ +# Copyright (c) 2005-2010 Canonical +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Classes for working with locally available Debian packages.""" + +import gzip +import os +import sys +from collections.abc import Iterable +from io import BytesIO +from typing import cast + +import apt_inst +import apt_pkg +from apt_pkg import gettext as _ + +import apt + + +class NoDebArchiveException(IOError): + """Exception which is raised if a file is no Debian archive.""" + + +class DebPackage: + """A Debian Package (.deb file).""" + + # Constants for comparing the local package file with the version + # in the cache + (VERSION_NONE, VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER) = range(4) + + debug = 0 + + def __init__( + self, filename: str | None = None, cache: apt.Cache | None = None + ) -> None: + if cache is None: + cache = apt.Cache() + self._cache = cache + self._debfile = cast(apt_inst.DebFile, None) + self.pkgname = "" + self.filename: str | None = None + self._sections: dict[str, str] | apt_pkg.TagSection[str] = {} # noqa + self._need_pkgs: list[str] = [] + self._check_was_run = False + self._failure_string = "" + self._multiarch: str | None = None + if filename: + self.open(filename) + + def open(self, filename: str) -> None: + """open given debfile""" + self._dbg(3, "open '%s'" % filename) + self._need_pkgs = [] + self._installed_conflicts: set[str] = set() + self._failure_string = "" + self.filename = filename + self._debfile = apt_inst.DebFile(self.filename) + control = self._debfile.control.extractdata("control") + self._sections = apt_pkg.TagSection(control) + self.pkgname = self._sections["Package"] + self._check_was_run = False + + def __getitem__(self, key: str) -> str: + return self._sections[key] + + def __contains__(self, key: str) -> bool: + return key in self._sections + + @property + def filelist(self) -> list[str]: + """return the list of files in the deb.""" + files = [] + try: + self._debfile.data.go(lambda item, data: files.append(item.name)) + except SystemError: + return [_("List of files for '%s' could not be read") % self.filename] + return files + + @property + def control_filelist(self) -> list[str]: + """return the list of files in control.tar.gz""" + control = [] + try: + self._debfile.control.go(lambda item, data: control.append(item.name)) + except SystemError: + return [ + _("List of control files for '%s' could not be read") % self.filename + ] + return sorted(control) + + # helper that will return a pkgname with a multiarch suffix if needed + def _maybe_append_multiarch_suffix( + self, pkgname: str, in_conflict_checking: bool = False + ) -> str: + # trivial cases + if ":" in pkgname: + return pkgname + if not self._multiarch: + return pkgname + elif self._cache.is_virtual_package(pkgname): + return pkgname + elif ( + pkgname in self._cache + and self._cache[pkgname].candidate is not None + and cast(apt.package.Version, self._cache[pkgname].candidate).architecture + == "all" + ): + return pkgname + # now do the real multiarch checking + multiarch_pkgname = f"{pkgname}:{self._multiarch}" + # the upper layers will handle this + if multiarch_pkgname not in self._cache: + return multiarch_pkgname + multiarch_pkg = self._cache[multiarch_pkgname] + if multiarch_pkg.candidate is None: + return multiarch_pkgname + # now check the multiarch state + cand = multiarch_pkg.candidate._cand + # print pkgname, multiarch_pkgname, cand.multi_arch + # the default is to add the suffix, unless its a pkg that can satify + # foreign dependencies + if cand.multi_arch & cand.MULTI_ARCH_FOREIGN: + return pkgname + # for conflicts we need a special case here, any not multiarch enabled + # package has a implicit conflict + if in_conflict_checking and not (cand.multi_arch & cand.MULTI_ARCH_SAME): + return pkgname + return multiarch_pkgname + + def _is_or_group_satisfied(self, or_group: list[tuple[str, str, str]]) -> bool: + """Return True if at least one dependency of the or-group is satisfied. + + This method gets an 'or_group' and analyzes if at least one dependency + of this group is already satisfied. + """ + self._dbg(2, "_checkOrGroup(): %s " % (or_group)) + + for dep in or_group: + depname = dep[0] + ver = dep[1] + oper = dep[2] + + # multiarch + depname = self._maybe_append_multiarch_suffix(depname) + + # check for virtual pkgs + if depname not in self._cache: + if self._cache.is_virtual_package(depname): + self._dbg( + 3, "_is_or_group_satisfied(): %s is virtual dep" % depname + ) + for pkg in self._cache.get_providing_packages(depname): + if pkg.is_installed: + return True + continue + # check real dependency + inst = self._cache[depname].installed + if inst is not None and apt_pkg.check_dep(inst.version, oper, ver): + return True + + # if no real dependency is installed, check if there is + # a package installed that provides this dependency + # (e.g. scrollkeeper dependecies are provided by rarian-compat) + # but only do that if there is no version required in the + # dependency (we do not supprot versionized dependencies) + if not oper: + for ppkg in self._cache.get_providing_packages( + depname, include_nonvirtual=True + ): + if ppkg.is_installed: + self._dbg( + 3, + "found installed '%s' that provides '%s'" + % (ppkg.name, depname), + ) + return True + return False + + def _satisfy_or_group(self, or_group: list[tuple[str, str, str]]) -> bool: + """Try to satisfy the or_group.""" + for dep in or_group: + depname, ver, oper = dep + + # multiarch + depname = self._maybe_append_multiarch_suffix(depname) + + # if we don't have it in the cache, it may be virtual + if depname not in self._cache: + if not self._cache.is_virtual_package(depname): + continue + providers = self._cache.get_providing_packages(depname) + # if a package just has a single virtual provider, we + # just pick that (just like apt) + if len(providers) != 1: + continue + depname = providers[0].name + + # now check if we can satisfy the deps with the candidate(s) + # in the cache + pkg = self._cache[depname] + cand = self._cache._depcache.get_candidate_ver(pkg._pkg) + if not cand: + continue + if not apt_pkg.check_dep(cand.ver_str, oper, ver): + continue + + # check if we need to install it + self._dbg(2, "Need to get: %s" % depname) + self._need_pkgs.append(depname) + return True + + # if we reach this point, we failed + or_str = "" + for dep in or_group: + or_str += dep[0] + if ver and oper: + or_str += f" ({dep[2]} {dep[1]})" + if dep != or_group[len(or_group) - 1]: + or_str += "|" + self._failure_string += _("Dependency is not satisfiable: %s\n") % or_str + return False + + def _check_single_pkg_conflict(self, pkgname: str, ver: str, oper: str) -> bool: + """Return True if a pkg conflicts with a real installed/marked pkg.""" + # FIXME: deal with conflicts against its own provides + # (e.g. Provides: ftp-server, Conflicts: ftp-server) + self._dbg( + 3, + "_check_single_pkg_conflict() pkg='%s' ver='%s' oper='%s'" + % (pkgname, ver, oper), + ) + pkg = self._cache[pkgname] + if pkg.is_installed: + assert pkg.installed is not None + pkgver = pkg.installed.version + elif pkg.marked_install: + assert pkg.candidate is not None + pkgver = pkg.candidate.version + else: + return False + # print "pkg: %s" % pkgname + # print "ver: %s" % ver + # print "pkgver: %s " % pkgver + # print "oper: %s " % oper + if apt_pkg.check_dep(pkgver, oper, ver) and not self.replaces_real_pkg( + pkgname, oper, ver + ): + self._failure_string += ( + _("Conflicts with the installed package " "'%s'") % pkg.name + ) + self._dbg(3, "conflicts with installed pkg '%s'" % pkg.name) + return True + return False + + def _check_conflicts_or_group(self, or_group: list[tuple[str, str, str]]) -> bool: + """Check the or-group for conflicts with installed pkgs.""" + self._dbg(2, "_check_conflicts_or_group(): %s " % (or_group)) + for dep in or_group: + depname = dep[0] + ver = dep[1] + oper = dep[2] + + # FIXME: is this good enough? i.e. will apt always populate + # the cache with conflicting pkgnames for our arch? + depname = self._maybe_append_multiarch_suffix( + depname, in_conflict_checking=True + ) + + # check conflicts with virtual pkgs + if depname not in self._cache: + # FIXME: we have to check for virtual replaces here as + # well (to pass tests/gdebi-test8.deb) + if self._cache.is_virtual_package(depname): + for pkg in self._cache.get_providing_packages(depname): + self._dbg(3, "conflicts virtual check: %s" % pkg.name) + # P/C/R on virtal pkg, e.g. ftpd + if self.pkgname == pkg.name: + self._dbg(3, "conflict on self, ignoring") + continue + if self._check_single_pkg_conflict(pkg.name, ver, oper): + self._installed_conflicts.add(pkg.name) + continue + if self._check_single_pkg_conflict(depname, ver, oper): + self._installed_conflicts.add(depname) + return bool(self._installed_conflicts) + + @property + def conflicts(self) -> list[list[tuple[str, str, str]]]: + """List of packages conflicting with this package.""" + key = "Conflicts" + try: + return apt_pkg.parse_depends(self._sections[key], False) + except KeyError: + return [] + + @property + def depends(self) -> list[list[tuple[str, str, str]]]: + """List of packages on which this package depends on.""" + depends = [] + # find depends + for key in "Depends", "Pre-Depends": + try: + depends.extend(apt_pkg.parse_depends(self._sections[key], False)) + except KeyError: + pass + return depends + + @property + def provides(self) -> list[list[tuple[str, str, str]]]: + """List of virtual packages which are provided by this package.""" + key = "Provides" + try: + return apt_pkg.parse_depends(self._sections[key], False) + except KeyError: + return [] + + @property + def replaces(self) -> list[list[tuple[str, str, str]]]: + """List of packages which are replaced by this package.""" + key = "Replaces" + try: + return apt_pkg.parse_depends(self._sections[key], False) + except KeyError: + return [] + + def replaces_real_pkg(self, pkgname: str, oper: str, ver: str) -> bool: + """Return True if a given non-virtual package is replaced. + + Return True if the deb packages replaces a real (not virtual) + packages named (pkgname, oper, ver). + """ + self._dbg(3, f"replaces_real_pkg() {pkgname} {oper} {ver}") + pkg = self._cache[pkgname] + pkgver: str | None = None + if pkg.is_installed: + assert pkg.installed is not None + pkgver = pkg.installed.version + elif pkg.marked_install: + assert pkg.candidate is not None + pkgver = pkg.candidate.version + else: + pkgver = None + for or_group in self.replaces: + for name, ver, oper in or_group: + if name == pkgname and ( + pkgver is None or apt_pkg.check_dep(pkgver, oper, ver) + ): + self._dbg( + 3, + "we have a replaces in our package for the " + "conflict against '%s'" % (pkgname), + ) + return True + return False + + def check_conflicts(self) -> bool: + """Check if there are conflicts with existing or selected packages. + + Check if the package conflicts with a existing or to be installed + package. Return True if the pkg is OK. + """ + res = True + for or_group in self.conflicts: + if self._check_conflicts_or_group(or_group): + # print "Conflicts with a exisiting pkg!" + # self._failure_string = "Conflicts with a exisiting pkg!" + res = False + return res + + def check_breaks_existing_packages(self) -> bool: + """ + check if installing the package would break exsisting + package on the system, e.g. system has: + smc depends on smc-data (= 1.4) + and user tries to installs smc-data 1.6 + """ + # show progress information as this step may take some time + size = float(len(self._cache)) + steps = max(int(size / 50), 1) + debver = self._sections["Version"] + debarch = self._sections["Architecture"] + # store what we provide so that we can later check against that + provides = [x[0][0] for x in self.provides] + for i, pkg in enumerate(self._cache): + if i % steps == 0: + self._cache.op_progress.update(float(i) / size * 100.0) + if not pkg.is_installed: + continue + assert pkg.installed is not None + # check if the exising dependencies are still satisfied + # with the package + ver = pkg._pkg.current_ver + for dep_or in pkg.installed.dependencies: + for dep in dep_or.or_dependencies: + if dep.name == self.pkgname: + if not apt_pkg.check_dep(debver, dep.relation, dep.version): + self._dbg(2, "would break (depends) %s" % pkg.name) + # TRANSLATORS: the first '%s' is the package that + # breaks, the second the dependency that makes it + # break, the third the relation (e.g. >=) and the + # latest the version for the releation + self._failure_string += _( + "Breaks existing package '%(pkgname)s' " + "dependency %(depname)s " + "(%(deprelation)s %(depversion)s)" + ) % { + "pkgname": pkg.name, + "depname": dep.name, + "deprelation": dep.relation, + "depversion": dep.version, + } + self._cache.op_progress.done() + return False + # now check if there are conflicts against this package on + # the existing system + if "Conflicts" in ver.depends_list: + for conflicts_ver_list in ver.depends_list["Conflicts"]: + for c_or in conflicts_ver_list: + if ( + c_or.target_pkg.name == self.pkgname + and c_or.target_pkg.architecture == debarch + ): + if apt_pkg.check_dep( + debver, c_or.comp_type, c_or.target_ver + ): + self._dbg(2, "would break (conflicts) %s" % pkg.name) + # TRANSLATORS: the first '%s' is the package + # that conflicts, the second the packagename + # that it conflicts with (so the name of the + # deb the user tries to install), the third is + # the relation (e.g. >=) and the last is the + # version for the relation + self._failure_string += _( + "Breaks existing package '%(pkgname)s' " + "conflict: %(targetpkg)s " + "(%(comptype)s %(targetver)s)" + ) % { + "pkgname": pkg.name, + "targetpkg": c_or.target_pkg.name, + "comptype": c_or.comp_type, + "targetver": c_or.target_ver, + } + self._cache.op_progress.done() + return False + if ( + c_or.target_pkg.name in provides + and self.pkgname != pkg.name + ): + self._dbg(2, "would break (conflicts) %s" % provides) + self._failure_string += _( + "Breaks existing package '%(pkgname)s' " + "that conflict: '%(targetpkg)s'. But the " + "'%(debfile)s' provides it via: " + "'%(provides)s'" + ) % { + "provides": ",".join(provides), + "debfile": self.filename, + "targetpkg": c_or.target_pkg.name, + "pkgname": pkg.name, + } + self._cache.op_progress.done() + return False + self._cache.op_progress.done() + return True + + def compare_to_version_in_cache(self, use_installed: bool = True) -> int: + """Compare the package to the version available in the cache. + + Checks if the package is already installed or availabe in the cache + and if so in what version, returns one of (VERSION_NONE, + VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER). + """ + self._dbg(3, "compare_to_version_in_cache") + pkgname = self._sections["Package"] + architecture = self._sections["Architecture"] + + # Arch qualify the package name + pkgname = ":".join([pkgname, architecture]) + + debver = self._sections["Version"] + self._dbg(1, "debver: %s" % debver) + if pkgname in self._cache: + pkg = self._cache[pkgname] + if use_installed and pkg.installed is not None: + cachever = pkg.installed.version + elif not use_installed and pkg.candidate is not None: + cachever = pkg.candidate.version + else: + return self.VERSION_NONE + if cachever is not None: + cmp = apt_pkg.version_compare(cachever, debver) + self._dbg(1, "CompareVersion(debver,instver): %s" % cmp) + if cmp == 0: + return self.VERSION_SAME + elif cmp < 0: + return self.VERSION_NEWER + elif cmp > 0: + return self.VERSION_OUTDATED + return self.VERSION_NONE + + def check(self, allow_downgrade: bool = False) -> bool: + """Check if the package is installable.""" + self._dbg(3, "check") + + self._check_was_run = True + + # check arch + if "Architecture" not in self._sections: + self._dbg(1, "ERROR: no architecture field") + self._failure_string = _("No Architecture field in the package") + return False + arch = self._sections["Architecture"] + if arch != "all" and arch != apt_pkg.config.find("APT::Architecture"): + if arch in apt_pkg.get_architectures(): + self._multiarch = arch + self.pkgname = f"{self.pkgname}:{self._multiarch}" + self._dbg(1, "Found multiarch arch: '%s'" % arch) + else: + self._dbg(1, "ERROR: Wrong architecture dude!") + self._failure_string = ( + _( + "Wrong architecture '%s' " + "-- Run dpkg --add-architecture to " + "add it and update afterwards" + ) + % arch + ) + return False + + # check version + if ( + not allow_downgrade + and self.compare_to_version_in_cache() == self.VERSION_OUTDATED + ): + if self._cache[self.pkgname].installed: + # the deb is older than the installed + self._failure_string = _("A later version is already installed") + return False + + # FIXME: this sort of error handling sux + self._failure_string = "" + + # check conflicts + if not self.check_conflicts(): + return False + + # check if installing it would break anything on the + # current system + if not self.check_breaks_existing_packages(): + return False + + # try to satisfy the dependencies + if not self._satisfy_depends(self.depends): + return False + + # check for conflicts again (this time with the packages that are + # makeed for install) + if not self.check_conflicts(): + return False + + if self._cache._depcache.broken_count > 0: + self._failure_string = _( + "Failed to satisfy all dependencies " "(broken cache)" + ) + # clean the cache again + self._cache.clear() + return False + return True + + def satisfy_depends_str(self, dependsstr: str) -> bool: + """Satisfy the dependencies in the given string.""" + return self._satisfy_depends(apt_pkg.parse_depends(dependsstr, False)) + + def _satisfy_depends(self, depends: list[list[tuple[str, str, str]]]) -> bool: + """Satisfy the dependencies.""" + # turn off MarkAndSweep via a action group (if available) + try: + _actiongroup = apt_pkg.ActionGroup(self._cache._depcache) + _actiongroup # pyflakes + except AttributeError: + pass + # check depends + for or_group in depends: + if not self._is_or_group_satisfied(or_group): + if not self._satisfy_or_group(or_group): + return False + # now try it out in the cache + for pkg in self._need_pkgs: + try: + self._cache[pkg].mark_install(from_user=False) + except SystemError: + self._failure_string = _("Cannot install '%s'") % pkg + self._cache.clear() + return False + return True + + @property + def missing_deps(self) -> list[str]: + """Return missing dependencies.""" + self._dbg(1, "Installing: %s" % self._need_pkgs) + if not self._check_was_run: + raise AttributeError("property only available after check() was run") + return self._need_pkgs + + @property + def required_changes(self) -> tuple[list[str], list[str], list[str]]: + """Get the changes required to satisfy the dependencies. + + Returns: a tuple with (install, remove, unauthenticated) + """ + install = [] + remove = [] + unauthenticated = [] + if not self._check_was_run: + raise AttributeError("property only available after check() was run") + for pkg in self._cache: + if pkg.marked_install or pkg.marked_upgrade: + assert pkg.candidate is not None + install.append(pkg.name) + # check authentication, one authenticated origin is enough + # libapt will skip non-authenticated origins then + authenticated = False + for origin in pkg.candidate.origins: + authenticated |= origin.trusted + if not authenticated: + unauthenticated.append(pkg.name) + if pkg.marked_delete: + remove.append(pkg.name) + return (install, remove, unauthenticated) + + @staticmethod + def to_hex(in_data: str) -> str: + hex = "" + for i, c in enumerate(in_data): + if i % 80 == 0: + hex += "\n" + hex += "%2.2x " % ord(c) + return hex + + @staticmethod + def to_strish(in_data: str | Iterable[int]) -> str: + s = "" + # py2 compat, in_data is type string + if isinstance(in_data, str): + for c in in_data: + if ord(c) < 10 or ord(c) > 127: + s += " " + else: + s += c + # py3 compat, in_data is type bytes + else: + for b in in_data: + if b < 10 or b > 127: + s += " " + else: + s += chr(b) + return s + + def _get_content( + self, + part: apt_inst.TarFile, + name: str, + auto_decompress: bool = True, + auto_hex: bool = True, + ) -> str: + if name.startswith("./"): + name = name[2:] + data = part.extractdata(name) + # check for zip content + if name.endswith(".gz") and auto_decompress: + io = BytesIO(data) + gz = gzip.GzipFile(fileobj=io) + data = _("Automatically decompressed:\n\n").encode("utf-8") + data += gz.read() + # auto-convert to hex + try: + return data.decode("utf-8") + except Exception: + new_data = _("Automatically converted to printable ascii:\n") + new_data += self.to_strish(data) + return new_data + + def control_content(self, name: str) -> str: + """return the content of a specific control.tar.gz file""" + try: + return self._get_content(self._debfile.control, name) + except LookupError: + return "" + + def data_content(self, name: str) -> str: + """return the content of a specific control.tar.gz file""" + try: + return self._get_content(self._debfile.data, name) + except LookupError: + return "" + + def _dbg(self, level: int, msg: str) -> None: + """Write debugging output to sys.stderr.""" + if level <= self.debug: + print(msg, file=sys.stderr) + + def install( + self, install_progress: apt.progress.base.InstallProgress | None = None + ) -> int: + """Install the package.""" + if self.filename is None: + raise apt_pkg.Error("No filename specified") + if install_progress is None: + return os.spawnlp(os.P_WAIT, "dpkg", "dpkg", "-i", self.filename) + else: + try: + install_progress.start_update() + except AttributeError: + install_progress.startUpdate() # type: ignore + res = install_progress.run(self.filename) + try: + install_progress.finish_update() + except AttributeError: + install_progress.finishUpdate() # type: ignore + return res + + +class DscSrcPackage(DebPackage): + """A locally available source package.""" + + def __init__( + self, filename: str | None = None, cache: apt.Cache | None = None + ) -> None: + DebPackage.__init__(self, None, cache) + self.filename: str | None = filename + self._depends: list[list[tuple[str, str, str]]] = [] + self._conflicts: list[list[tuple[str, str, str]]] = [] + self._installed_conflicts: set[str] = set() + self.pkgname = "" + self.binaries: list[str] = [] + self._sections: dict[str, str] = {} + if self.filename is not None: + self.open(self.filename) + + @property + def depends(self) -> list[list[tuple[str, str, str]]]: + """Return the dependencies of the package""" + return self._depends + + @property + def conflicts(self) -> list[list[tuple[str, str, str]]]: + """Return the dependencies of the package""" + return self._conflicts + + @property + def filelist(self) -> list[str]: + """Return the list of files associated with this dsc file""" + # Files stanza looks like (hash, size, filename, ...) + return self._sections["Files"].split()[2::3] + + def open(self, file: str) -> None: + """Open the package.""" + depends_tags = ["Build-Depends", "Build-Depends-Indep"] + conflicts_tags = ["Build-Conflicts", "Build-Conflicts-Indep"] + fd = apt_pkg.open_maybe_clear_signed_file(file) + fobj = os.fdopen(fd) + tagfile = apt_pkg.TagFile(fobj) + try: + for sec in tagfile: + for tag in depends_tags: + if tag not in sec: + continue + self._depends.extend(apt_pkg.parse_src_depends(sec[tag])) + for tag in conflicts_tags: + if tag not in sec: + continue + self._conflicts.extend(apt_pkg.parse_src_depends(sec[tag])) + if "Source" in sec: + self.pkgname = sec["Source"] + if "Binary" in sec: + self.binaries = [b.strip() for b in sec["Binary"].split(",")] + for tag in sec.keys(): + if tag in sec: + self._sections[tag] = sec[tag] + finally: + del tagfile + fobj.close() + + s = _( + "Install Build-Dependencies for " "source package '%s' that builds %s\n" + ) % (self.pkgname, " ".join(self.binaries)) + self._sections["Description"] = s + self._check_was_run = False + + def check(self, allow_downgrade: bool = False) -> bool: + """Check if the package is installable. + + The second parameter is ignored and only exists for compatibility + with parent type.""" + if not self.check_conflicts(): + for pkgname in self._installed_conflicts: + if self._cache[pkgname]._pkg.essential: + raise Exception(_("An essential package would be removed")) + self._cache[pkgname].mark_delete() + # properties are ok now + self._check_was_run = True + # FIXME: a additional run of the check_conflicts() + # after _satisfy_depends() should probably be done + return self._satisfy_depends(self.depends) + + +def _test() -> None: + """Test function""" + from apt.cache import Cache + from apt.progress.base import InstallProgress + + cache = Cache() + + vp = "www-browser" + print(f"{vp} virtual: {cache.is_virtual_package(vp)}") + providers = cache.get_providing_packages(vp) + print("Providers for %s :" % vp) + for pkg in providers: + print(" %s" % pkg.name) + + d = DebPackage(sys.argv[1], cache) + print("Deb: %s" % d.pkgname) + if not d.check(): + print("can't be satified") + print(d._failure_string) + print("missing deps: %s" % d.missing_deps) + print(d.required_changes) + + print(d.filelist) + + print("Installing ...") + ret = d.install(InstallProgress()) + print(ret) + + # s = DscSrcPackage(cache, "../tests/3ddesktop_0.2.9-6.dsc") + # s.check_dep() + # print "Missing deps: ",s.missingDeps + # print "Print required changes: ", s.requiredChanges + + s = DscSrcPackage(cache=cache) + ds = "libc6 (>= 2.3.2), libaio (>= 0.3.96) | libaio1 (>= 0.3.96)" + print(s._satisfy_depends(apt_pkg.parse_depends(ds, False))) + + +if __name__ == "__main__": + _test() diff --git a/apt/package.py b/apt/package.py new file mode 100644 index 0000000..50ed6d1 --- /dev/null +++ b/apt/package.py @@ -0,0 +1,1559 @@ +# package.py - apt package abstraction +# +# Copyright (c) 2005-2009 Canonical +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Functionality related to packages.""" +from __future__ import annotations + +import logging +import os +import re +import socket +import subprocess +import sys +import threading +from collections.abc import Iterable, Iterator, Mapping, Sequence +from http.client import BadStatusLine +from typing import Any, no_type_check +from urllib.error import HTTPError +from urllib.request import urlopen + +import apt_pkg +from apt_pkg import gettext as _ + +import apt.progress.text +from apt.progress.base import AcquireProgress, InstallProgress + +__all__ = ( + "BaseDependency", + "Dependency", + "Origin", + "Package", + "Record", + "Version", + "VersionList", +) + + +def _file_is_same(path: str, size: int, hashes: apt_pkg.HashStringList) -> bool: + """Return ``True`` if the file is the same.""" + if os.path.exists(path) and os.path.getsize(path) == size: + with open(path) as fobj: + return apt_pkg.Hashes(fobj).hashes == hashes + return False + + +class FetchError(Exception): + """Raised when a file could not be fetched.""" + + +class UntrustedError(FetchError): + """Raised when a file did not have a trusted hash.""" + + +class BaseDependency: + """A single dependency.""" + + class __dstr(str): + """Compare helper for compatibility with old third-party code. + + Old third-party code might still compare the relation with the + previously used relations (<<,<=,==,!=,>=,>>,) instead of the curently + used ones (<,<=,=,!=,>=,>,). This compare helper lets < match to <<, + > match to >> and = match to ==. + """ + + def __eq__(self, other: object) -> bool: + if str.__eq__(self, other): + return True + elif str.__eq__(self, "<"): + return str.__eq__("<<", other) + elif str.__eq__(self, ">"): + return str.__eq__(">>", other) + elif str.__eq__(self, "="): + return str.__eq__("==", other) + else: + return False + + def __ne__(self, other: object) -> bool: + return not self.__eq__(other) + + def __init__(self, version: Version, dep: apt_pkg.Dependency) -> None: + self._version = version # apt.package.Version + self._dep = dep # apt_pkg.Dependency + + def __str__(self) -> str: + return f"{self.rawtype}: {self.rawstr}" + + def __repr__(self) -> str: + return "<BaseDependency: name:{!r} relation:{!r} version:{!r} rawtype:{!r}>".format( + self.name, + self.relation, + self.version, + self.rawtype, + ) + + @property + def name(self) -> str: + """The name of the target package.""" + return self._dep.target_pkg.name + + @property + def relation(self) -> str: + """The relation (<, <=, =, !=, >=, >, '') in mathematical notation. + + The empty string will be returned in case of an unversioned dependency. + """ + return self.__dstr(self._dep.comp_type) + + @property + def relation_deb(self) -> str: + """The relation (<<, <=, =, !=, >=, >>, '') in Debian notation. + + The empty string will be returned in case of an unversioned dependency. + For more details see the Debian Policy Manual on the syntax of + relationship fields: + https://www.debian.org/doc/debian-policy/ch-relationships.html#s-depsyntax # noqa + + .. versionadded:: 1.0.0 + """ + return self._dep.comp_type_deb + + @property + def version(self) -> str: + """The target version or an empty string. + + Note that the version is only an empty string in case of an unversioned + dependency. In this case the relation is also an empty string. + """ + return self._dep.target_ver + + @property + def target_versions(self) -> list[Version]: + """A list of all Version objects which satisfy this dependency. + + .. versionadded:: 1.0.0 + """ + tvers = [] + _tvers: list[apt_pkg.Version] = self._dep.all_targets() + for _tver in _tvers: # type: apt_pkg.Version + _pkg: apt_pkg.Package = _tver.parent_pkg + cache = self._version.package._pcache # apt.cache.Cache + pkg = cache._rawpkg_to_pkg(_pkg) # apt.package.Package + tver = Version(pkg, _tver) # apt.package.Version + tvers.append(tver) + return tvers + + @property + def installed_target_versions(self) -> list[Version]: + """A list of all installed Version objects which satisfy this dep. + + .. versionadded:: 1.0.0 + """ + return [tver for tver in self.target_versions if tver.is_installed] + + @property + def rawstr(self) -> str: + """String represenation of the dependency. + + Returns the string representation of the dependency as it would be + written in the debian/control file. The string representation does not + include the type of the dependency. + + Example for an unversioned dependency: + python3 + + Example for a versioned dependency: + python3 >= 3.2 + + .. versionadded:: 1.0.0 + """ + if self.version: + return f"{self.name} {self.relation_deb} {self.version}" + else: + return self.name + + @property + def rawtype(self) -> str: + """Type of the dependency. + + This should be one of 'Breaks', 'Conflicts', 'Depends', 'Enhances', + 'PreDepends', 'Recommends', 'Replaces', 'Suggests'. + + Additional types might be added in the future. + """ + return self._dep.dep_type_untranslated + + @property + def pre_depend(self) -> bool: + """Whether this is a PreDepends.""" + return self._dep.dep_type_untranslated == "PreDepends" + + +class Dependency(list[BaseDependency]): + """Represent an Or-group of dependencies. + + Attributes defined here: + or_dependencies - The possible choices + rawstr - String represenation of the Or-group of dependencies + rawtype - The type of the dependencies in the Or-group + target_version - A list of Versions which satisfy this Or-group of deps + """ + + def __init__( + self, version: Version, base_deps: list[BaseDependency], rawtype: str + ) -> None: + super().__init__(base_deps) + self._version = version # apt.package.Version + self._rawtype = rawtype + + def __str__(self) -> str: + return f"{self.rawtype}: {self.rawstr}" + + def __repr__(self) -> str: + return "<Dependency: [%s]>" % (", ".join(repr(bd) for bd in self)) + + @property + def or_dependencies(self) -> Dependency: + return self + + @property + def rawstr(self) -> str: + """String represenation of the Or-group of dependencies. + + Returns the string representation of the Or-group of dependencies as it + would be written in the debian/control file. The string representation + does not include the type of the Or-group of dependencies. + + Example: + python2 >= 2.7 | python3 + + .. versionadded:: 1.0.0 + """ + return " | ".join(bd.rawstr for bd in self) + + @property + def rawtype(self) -> str: + """Type of the Or-group of dependency. + + This should be one of 'Breaks', 'Conflicts', 'Depends', 'Enhances', + 'PreDepends', 'Recommends', 'Replaces', 'Suggests'. + + Additional types might be added in the future. + + .. versionadded:: 1.0.0 + """ + return self._rawtype + + @property + def target_versions(self) -> list[Version]: + """A list of all Version objects which satisfy this Or-group of deps. + + .. versionadded:: 1.0.0 + """ + tvers: list[Version] = [] + for bd in self: # apt.package.Dependency + for tver in bd.target_versions: # apt.package.Version + if tver not in tvers: + tvers.append(tver) + return tvers + + @property + def installed_target_versions(self) -> list[Version]: + """A list of all installed Version objects which satisfy this dep. + + .. versionadded:: 1.0.0 + """ + return [tver for tver in self.target_versions if tver.is_installed] + + +class Origin: + """The origin of a version. + + Attributes defined here: + archive - The archive (eg. unstable) + component - The component (eg. main) + label - The Label, as set in the Release file + origin - The Origin, as set in the Release file + codename - The Codename, as set in the Release file + site - The hostname of the site. + trusted - Boolean value whether this is trustworthy. + """ + + def __init__(self, pkg: Package, packagefile: apt_pkg.PackageFile) -> None: + self.archive = packagefile.archive + self.component = packagefile.component + self.label = packagefile.label + self.origin = packagefile.origin + self.codename = packagefile.codename + self.site = packagefile.site + self.not_automatic = packagefile.not_automatic + # check the trust + indexfile = pkg._pcache._list.find_index(packagefile) + if indexfile and indexfile.is_trusted: + self.trusted = True + else: + self.trusted = False + + def __repr__(self) -> str: + return ( + "<Origin component:%r archive:%r origin:%r label:%r " + "site:%r isTrusted:%r>" + ) % ( + self.component, + self.archive, + self.origin, + self.label, + self.site, + self.trusted, + ) + + +class Record(Mapping[Any, Any]): + """Record in a Packages file + + Represent a record as stored in a Packages file. You can use this like + a dictionary mapping the field names of the record to their values:: + + >>> record = Record("Package: python-apt\\nVersion: 0.8.0\\n\\n") + >>> record["Package"] + 'python-apt' + >>> record["Version"] + '0.8.0' + + For example, to get the tasks of a package from a cache, you could do:: + + package.candidate.record["Tasks"].split() + + Of course, you can also use the :attr:`Version.tasks` property. + + """ + + def __init__(self, record_str: str) -> None: + self._rec = apt_pkg.TagSection(record_str) + + def __hash__(self) -> int: + return hash(self._rec) + + def __str__(self) -> str: + return str(self._rec) + + def __getitem__(self, key: str) -> str: + return self._rec[key] + + def __contains__(self, key: object) -> bool: + return key in self._rec + + def __iter__(self) -> Iterator[str]: + return iter(self._rec.keys()) + + def iteritems(self) -> Iterable[tuple[object, str]]: + """An iterator over the (key, value) items of the record.""" + for key in self._rec.keys(): + yield key, self._rec[key] + + def get(self, key: str, default: object = None) -> object: + """Return record[key] if key in record, else *default*. + + The parameter *default* must be either a string or None. + """ + return self._rec.get(key, default) + + def has_key(self, key: str) -> bool: + """deprecated form of ``key in x``.""" + return key in self._rec + + def __len__(self) -> int: + return len(self._rec) + + +class Version: + """Representation of a package version. + + The Version class contains all information related to a + specific package version. + + .. versionadded:: 0.7.9 + """ + + def __init__(self, package: Package, cand: apt_pkg.Version) -> None: + self.package = package + self._cand = cand + self.package._pcache._weakversions.add(self) + + def _cmp(self, other: Any) -> int | Any: + """Compares against another apt.Version object or a version string. + + This method behaves like Python 2's cmp builtin and returns an integer + according to the outcome. The return value is negative in case of + self < other, zero if self == other and positive if self > other. + + The comparison includes the package name and architecture if other is + an apt.Version object. If other isn't an apt.Version object it'll be + assumed that other is a version string (without package name/arch). + + .. versionchanged:: 1.0.0 + """ + # Assume that other is an apt.Version object. + try: + self_name = self.package.fullname + other_name = other.package.fullname + if self_name < other_name: + return -1 + elif self_name > other_name: + return 1 + return apt_pkg.version_compare(self._cand.ver_str, other.version) + except AttributeError: + # Assume that other is a string that only contains the version. + try: + return apt_pkg.version_compare(self._cand.ver_str, other) + except TypeError: + return NotImplemented + + def __eq__(self, other: object) -> bool: + return self._cmp(other) == 0 + + def __ge__(self, other: Version) -> bool: + return self._cmp(other) >= 0 + + def __gt__(self, other: Version) -> bool: + return self._cmp(other) > 0 + + def __le__(self, other: Version) -> bool: + return self._cmp(other) <= 0 + + def __lt__(self, other: Version) -> bool: + return self._cmp(other) < 0 + + def __ne__(self, other: object) -> bool | Any: + try: + return self._cmp(other) != 0 + except TypeError: + return NotImplemented + + def __hash__(self) -> int: + return self._cand.hash + + def __str__(self) -> str: + return f"{self.package.name}={self.version}" + + def __repr__(self) -> str: + return f"<Version: package:{self.package.name!r} version:{self.version!r}>" + + @property + def _records(self) -> apt_pkg.PackageRecords: + """Internal helper that moves the Records to the right position.""" + # If changing lookup, change fetch_binary() as well + if not self.package._pcache._records.lookup(self._cand.file_list[0]): + raise LookupError("Could not lookup record") + + return self.package._pcache._records + + @property + def _translated_records(self) -> apt_pkg.PackageRecords | None: + """Internal helper to get the translated description.""" + desc_iter = self._cand.translated_description + if self.package._pcache._records.lookup(desc_iter.file_list.pop(0)): + return self.package._pcache._records + return None + + @property + def is_security_update(self) -> bool: + """Return whether this version is a security update.""" + return bool(self._cand.is_security_update) + + @property + def installed_size(self) -> int: + """Return the size of the package when installed.""" + return self._cand.installed_size + + @property + def homepage(self) -> str: + """Return the homepage for the package.""" + return self._records.homepage + + @property + def size(self) -> int: + """Return the size of the package.""" + return self._cand.size + + @property + def architecture(self) -> str: + """Return the architecture of the package version.""" + return self._cand.arch + + @property + def downloadable(self) -> bool: + """Return whether the version of the package is downloadable.""" + return bool(self._cand.downloadable) + + @property + def is_installed(self) -> bool: + """Return wether this version of the package is currently installed. + + .. versionadded:: 1.0.0 + """ + inst_ver = self.package.installed + return inst_ver is not None and inst_ver._cand.id == self._cand.id + + @property + def version(self) -> str: + """Return the version as a string.""" + return self._cand.ver_str + + @property + def summary(self) -> str | None: + """Return the short description (one line summary).""" + records = self._translated_records + return records.short_desc if records is not None else None + + @property + def raw_description(self) -> str: + """return the long description (raw).""" + return self._records.long_desc + + @property + def section(self) -> str: + """Return the section of the package.""" + return self._cand.section + + @property + def description(self) -> str: + """Return the formatted long description. + + Return the formatted long description according to the Debian policy + (Chapter 5.6.13). + See http://www.debian.org/doc/debian-policy/ch-controlfields.html + for more information. + """ + desc = "" + records = self._translated_records + dsc = records.long_desc if records is not None else None + + if not dsc: + return _("Missing description for '%s'." "Please report.") % ( + self.package.name + ) + + try: + if not isinstance(dsc, str): + # Only convert where needed (i.e. Python 2.X) + dsc = dsc.decode("utf-8") + except UnicodeDecodeError as err: + return _( + "Invalid unicode in description for '%s' (%s). " "Please report." + ) % (self.package.name, err) + + lines = iter(dsc.split("\n")) + # Skip the first line, since its a duplication of the summary + next(lines) + for raw_line in lines: + if raw_line.strip() == ".": + # The line is just line break + if not desc.endswith("\n"): + desc += "\n\n" + continue + if raw_line.startswith(" "): + # The line should be displayed verbatim without word wrapping + if not desc.endswith("\n"): + line = "\n%s\n" % raw_line[2:] + else: + line = "%s\n" % raw_line[2:] + elif raw_line.startswith(" "): + # The line is part of a paragraph. + if desc.endswith("\n") or desc == "": + # Skip the leading white space + line = raw_line[1:] + else: + line = raw_line + else: + line = raw_line + # Add current line to the description + desc += line + return desc + + @property + def source_name(self) -> str: + """Return the name of the source package.""" + try: + return self._records.source_pkg or self.package.shortname + except IndexError: + return self.package.shortname + + @property + def source_version(self) -> str: + """Return the version of the source package.""" + try: + return self._records.source_ver or self._cand.ver_str + except IndexError: + return self._cand.ver_str + + @property + def priority(self) -> str: + """Return the priority of the package, as string.""" + return self._cand.priority_str + + @property + def policy_priority(self) -> int: + """Return the internal policy priority as a number. + See apt_preferences(5) for more information about what it means. + """ + return self.package._pcache._depcache.policy.get_priority(self._cand) + + @property + def record(self) -> Record: + """Return a Record() object for this version. + + Return a Record() object for this version which provides access + to the raw attributes of the candidate version + """ + return Record(self._records.record) + + def get_dependencies(self, *types: str) -> list[Dependency]: + """Return a list of Dependency objects for the given types. + + Multiple types can be specified. Possible types are: + 'Breaks', 'Conflicts', 'Depends', 'Enhances', 'PreDepends', + 'Recommends', 'Replaces', 'Suggests' + + Additional types might be added in the future. + """ + depends_list = [] + depends = self._cand.depends_list + for type_ in types: + try: + for dep_ver_list in depends[type_]: + base_deps = [] + for dep_or in dep_ver_list: + base_deps.append(BaseDependency(self, dep_or)) + depends_list.append(Dependency(self, base_deps, type_)) + except KeyError: + pass + return depends_list + + @property + def provides(self) -> list[str]: + """Return a list of names that this version provides.""" + return [p[0] for p in self._cand.provides_list] + + @property + def enhances(self) -> list[Dependency]: + """Return the list of enhances for the package version.""" + return self.get_dependencies("Enhances") + + @property + def dependencies(self) -> list[Dependency]: + """Return the dependencies of the package version.""" + return self.get_dependencies("PreDepends", "Depends") + + @property + def recommends(self) -> list[Dependency]: + """Return the recommends of the package version.""" + return self.get_dependencies("Recommends") + + @property + def suggests(self) -> list[Dependency]: + """Return the suggests of the package version.""" + return self.get_dependencies("Suggests") + + @property + def origins(self) -> list[Origin]: + """Return a list of origins for the package version.""" + origins = [] + for packagefile, _unused in self._cand.file_list: + origins.append(Origin(self.package, packagefile)) + return origins + + @property + def filename(self) -> str: + """Return the path to the file inside the archive. + + .. versionadded:: 0.7.10 + """ + return self._records.filename + + @property + def md5(self) -> str: + """Return the md5sum of the binary. + + .. versionadded:: 0.7.10 + """ + return self._records.md5_hash + + @property + def sha1(self) -> str: + """Return the sha1sum of the binary. + + .. versionadded:: 0.7.10 + """ + return self._records.sha1_hash + + @property + def sha256(self) -> str: + """Return the sha256sum of the binary. + + .. versionadded:: 0.7.10 + """ + return self._records.sha256_hash + + @property + def tasks(self) -> set[str]: + """Get the tasks of the package. + + A set of the names of the tasks this package belongs to. + + .. versionadded:: 0.8.0 + """ + return set(self.record["Task"].split()) + + def _uris(self) -> Iterator[str]: + """Return an iterator over all available urls. + + .. versionadded:: 0.7.10 + """ + for packagefile, _unused in self._cand.file_list: + indexfile = self.package._pcache._list.find_index(packagefile) + if indexfile: + yield indexfile.archive_uri(self._records.filename) + + @property + def uris(self) -> list[str]: + """Return a list of all available uris for the binary. + + .. versionadded:: 0.7.10 + """ + return list(self._uris()) + + @property + def uri(self) -> str | None: + """Return a single URI for the binary. + + .. versionadded:: 0.7.10 + """ + try: + return next(iter(self._uris())) + except StopIteration: + return None + + def fetch_binary( + self, + destdir: str = "", + progress: AcquireProgress | None = None, + allow_unauthenticated: bool | None = None, + ) -> str: + """Fetch the binary version of the package. + + The parameter *destdir* specifies the directory where the package will + be fetched to. + + The parameter *progress* may refer to an apt_pkg.AcquireProgress() + object. If not specified or None, apt.progress.text.AcquireProgress() + is used. + + The keyword-only parameter *allow_unauthenticated* specifies whether + to allow unauthenticated downloads. If not specified, it defaults to + the configuration option `APT::Get::AllowUnauthenticated`. + + .. versionadded:: 0.7.10 + """ + if allow_unauthenticated is None: + allow_unauthenticated = apt_pkg.config.find_b( + "APT::Get::" "AllowUnauthenticated", False + ) + base = os.path.basename(self._records.filename) + destfile = os.path.join(destdir, base) + if _file_is_same(destfile, self.size, self._records.hashes): + logging.debug("Ignoring already existing file: %s" % destfile) + return os.path.abspath(destfile) + + # Verify that the index is actually trusted + pfile, offset = self._cand.file_list[0] + index = self.package._pcache._list.find_index(pfile) + + if not (allow_unauthenticated or (index and index.is_trusted)): + raise UntrustedError( + "Could not fetch %s %s source package: " + "Source %r is not trusted" + % ( + self.package.name, + self.version, + getattr(index, "describe", "<unknown>"), + ) + ) + if not self.uri: + raise ValueError("No URI for this binary.") + hashes = self._records.hashes + if not (allow_unauthenticated or hashes.usable): + raise UntrustedError( + "The item %r could not be fetched: " "No trusted hash found." % destfile + ) + acq = apt_pkg.Acquire(progress or apt.progress.text.AcquireProgress()) + acqfile = apt_pkg.AcquireFile( + acq, self.uri, hashes, self.size, base, destfile=destfile + ) + acq.run() + + if acqfile.status != acqfile.STAT_DONE: + raise FetchError( + "The item %r could not be fetched: %s" + % (acqfile.destfile, acqfile.error_text) + ) + + return os.path.abspath(destfile) + + def fetch_source( + self, + destdir: str = "", + progress: AcquireProgress | None = None, + unpack: bool = True, + allow_unauthenticated: bool | None = None, + ) -> str: + """Get the source code of a package. + + The parameter *destdir* specifies the directory where the source will + be fetched to. + + The parameter *progress* may refer to an apt_pkg.AcquireProgress() + object. If not specified or None, apt.progress.text.AcquireProgress() + is used. + + The parameter *unpack* describes whether the source should be unpacked + (``True``) or not (``False``). By default, it is unpacked. + + If *unpack* is ``True``, the path to the extracted directory is + returned. Otherwise, the path to the .dsc file is returned. + + The keyword-only parameter *allow_unauthenticated* specifies whether + to allow unauthenticated downloads. If not specified, it defaults to + the configuration option `APT::Get::AllowUnauthenticated`. + """ + if allow_unauthenticated is None: + allow_unauthenticated = apt_pkg.config.find_b( + "APT::Get::" "AllowUnauthenticated", False + ) + + src = apt_pkg.SourceRecords() + acq = apt_pkg.Acquire(progress or apt.progress.text.AcquireProgress()) + + dsc = None + record = self._records + source_name = record.source_pkg or self.package.shortname + source_version = record.source_ver or self._cand.ver_str + source_lookup = src.lookup(source_name) + + while source_lookup and source_version != src.version: + source_lookup = src.lookup(source_name) + if not source_lookup: + raise ValueError("No source for %r" % self) + files = list() + + if not (allow_unauthenticated or src.index.is_trusted): + raise UntrustedError( + "Could not fetch %s %s source package: " + "Source %r is not trusted" + % (self.package.name, self.version, src.index.describe) + ) + for fil in src.files: + base = os.path.basename(fil.path) + destfile = os.path.join(destdir, base) + if fil.type == "dsc": + dsc = destfile + if _file_is_same(destfile, fil.size, fil.hashes): + logging.debug("Ignoring already existing file: %s" % destfile) + continue + + if not (allow_unauthenticated or fil.hashes.usable): + raise UntrustedError( + "The item %r could not be fetched: " + "No trusted hash found." % destfile + ) + files.append( + apt_pkg.AcquireFile( + acq, + src.index.archive_uri(fil.path), + fil.hashes, + fil.size, + base, + destfile=destfile, + ) + ) + acq.run() + + if dsc is None: + raise ValueError("No source for %r" % self) + + for item in acq.items: + if item.status != item.STAT_DONE: + raise FetchError( + "The item %r could not be fetched: %s" + % (item.destfile, item.error_text) + ) + + if unpack: + outdir = src.package + "-" + apt_pkg.upstream_version(src.version) + outdir = os.path.join(destdir, outdir) + subprocess.check_call(["dpkg-source", "-x", dsc, outdir]) + return os.path.abspath(outdir) + else: + return os.path.abspath(dsc) + + +class VersionList(Sequence[Version]): + """Provide a mapping & sequence interface to all versions of a package. + + This class can be used like a dictionary, where version strings are the + keys. It can also be used as a sequence, where integers are the keys. + + You can also convert this to a dictionary or a list, using the usual way + of dict(version_list) or list(version_list). This is useful if you need + to access the version objects multiple times, because they do not have to + be recreated this way. + + Examples ('package.versions' being a version list): + '0.7.92' in package.versions # Check whether 0.7.92 is a valid version. + package.versions[0] # Return first version or raise IndexError + package.versions[0:2] # Return a new VersionList for objects 0-2 + package.versions['0.7.92'] # Return version 0.7.92 or raise KeyError + package.versions.keys() # All keys, as strings. + max(package.versions) + """ + + def __init__(self, package: Package, slice_: slice | None = None) -> None: + self._package = package # apt.package.Package() + self._versions = package._pkg.version_list # [apt_pkg.Version(), ...] + if slice_: + self._versions = self._versions[slice_] + + def __getitem__(self, item: int | slice | str) -> Any: + # FIXME: Should not be returning Any, should have overloads; but + # pyflakes complains + if isinstance(item, slice): + return self.__class__(self._package, item) + try: + # Sequence interface, item is an integer + return Version(self._package, self._versions[item]) # type: ignore + except TypeError: + # Dictionary interface item is a string. + for ver in self._versions: + if ver.ver_str == item: + return Version(self._package, ver) + raise KeyError("Version: %r not found." % (item)) + + def __str__(self) -> str: + return "[%s]" % (", ".join(str(ver) for ver in self)) + + def __repr__(self) -> str: + return "<VersionList: %r>" % self.keys() + + def __iter__(self) -> Iterator[Version]: + """Return an iterator over all value objects.""" + return (Version(self._package, ver) for ver in self._versions) + + def __contains__(self, item: object) -> bool: + if isinstance(item, Version): # Sequence interface + item = item.version + # Dictionary interface. + for ver in self._versions: + if ver.ver_str == item: + return True + return False + + def __eq__(self, other: Any) -> bool: + return list(self) == list(other) + + def __len__(self) -> int: + return len(self._versions) + + # Mapping interface + + def keys(self) -> list[str]: + """Return a list of all versions, as strings.""" + return [ver.ver_str for ver in self._versions] + + def get(self, key: str, default: Version | None = None) -> Version | None: + """Return the key or the default.""" + try: + return self[key] # type: ignore # FIXME: should be deterined automatically # noqa + except LookupError: + return default + + +class Package: + """Representation of a package in a cache. + + This class provides methods and properties for working with a package. It + lets you mark the package for installation, check if it is installed, and + much more. + """ + + def __init__(self, pcache: apt.Cache, pkgiter: apt_pkg.Package) -> None: + """Init the Package object""" + self._pkg = pkgiter + self._pcache = pcache # python cache in cache.py + self._changelog = "" # Cached changelog + + def __str__(self) -> str: + return self.name + + def __repr__(self) -> str: + return "<Package: name:{!r} architecture={!r} id:{!r}>".format( + self._pkg.name, + self._pkg.architecture, + self._pkg.id, + ) + + def __lt__(self, other: Package) -> bool: + return self.name < other.name + + @property + def candidate(self) -> Version | None: + """Return the candidate version of the package. + + This property is writeable to allow you to set the candidate version + of the package. Just assign a Version() object, and it will be set as + the candidate version. + """ + cand = self._pcache._depcache.get_candidate_ver(self._pkg) + if cand is not None: + return Version(self, cand) + return None + + @candidate.setter + def candidate(self, version: Version) -> None: + """Set the candidate version of the package.""" + self._pcache.cache_pre_change() + self._pcache._depcache.set_candidate_ver(self._pkg, version._cand) + self._pcache.cache_post_change() + + @property + def installed(self) -> Version | None: + """Return the currently installed version of the package. + + .. versionadded:: 0.7.9 + """ + if self._pkg.current_ver is not None: + return Version(self, self._pkg.current_ver) + return None + + @property + def name(self) -> str: + """Return the name of the package, possibly including architecture. + + If the package is not part of the system's preferred architecture, + return the same as :attr:`fullname`, otherwise return the same + as :attr:`shortname` + + .. versionchanged:: 0.7.100.3 + + As part of multi-arch, this field now may include architecture + information. + """ + return self._pkg.get_fullname(True) + + @property + def fullname(self) -> str: + """Return the name of the package, including architecture. + + Note that as for :meth:`architecture`, this returns the + native architecture for Architecture: all packages. + + .. versionadded:: 0.7.100.3""" + return self._pkg.get_fullname(False) + + @property + def shortname(self) -> str: + """Return the name of the package, without architecture. + + .. versionadded:: 0.7.100.3""" + return self._pkg.name + + @property + def id(self) -> int: + """Return a uniq ID for the package. + + This can be used eg. to store additional information about the pkg.""" + return self._pkg.id + + @property + def essential(self) -> bool: + """Return True if the package is an essential part of the system.""" + return self._pkg.essential + + def architecture(self) -> str: + """Return the Architecture of the package. + + Note that for Architecture: all packages, this returns the + native architecture, as they are internally treated like native + packages. To get the concrete architecture, look at the + :attr:`Version.architecture` attribute. + + .. versionchanged:: 0.7.100.3 + This is now the package's architecture in the multi-arch sense, + previously it was the architecture of the candidate version + and deprecated. + """ + return self._pkg.architecture + + # depcache states + + @property + def marked_install(self) -> bool: + """Return ``True`` if the package is marked for install.""" + return self._pcache._depcache.marked_install(self._pkg) + + @property + def marked_upgrade(self) -> bool: + """Return ``True`` if the package is marked for upgrade.""" + return self._pcache._depcache.marked_upgrade(self._pkg) + + @property + def marked_delete(self) -> bool: + """Return ``True`` if the package is marked for delete.""" + return self._pcache._depcache.marked_delete(self._pkg) + + @property + def marked_keep(self) -> bool: + """Return ``True`` if the package is marked for keep.""" + return self._pcache._depcache.marked_keep(self._pkg) + + @property + def marked_downgrade(self) -> bool: + """Package is marked for downgrade""" + return self._pcache._depcache.marked_downgrade(self._pkg) + + @property + def marked_reinstall(self) -> bool: + """Return ``True`` if the package is marked for reinstall.""" + return self._pcache._depcache.marked_reinstall(self._pkg) + + @property + def is_installed(self) -> bool: + """Return ``True`` if the package is installed.""" + return self._pkg.current_ver is not None + + @property + def is_upgradable(self) -> bool: + """Return ``True`` if the package is upgradable.""" + return self.is_installed and self._pcache._depcache.is_upgradable(self._pkg) + + @property + def is_auto_removable(self) -> bool: + """Return ``True`` if the package is no longer required. + + If the package has been installed automatically as a dependency of + another package, and if no packages depend on it anymore, the package + is no longer required. + """ + return ( + self.is_installed or self.marked_install + ) and self._pcache._depcache.is_garbage(self._pkg) + + @property + def is_auto_installed(self) -> bool: + """Return whether the package is marked as automatically installed.""" + return self._pcache._depcache.is_auto_installed(self._pkg) + + @property + def phasing_applied(self) -> bool: + """Return ``True`` if the package update is being phased.""" + return self._pcache._depcache.phasing_applied(self._pkg) + + # sizes + + @property + def installed_files(self) -> list[str]: + """Return a list of files installed by the package. + + Return a list of unicode names of the files which have + been installed by this package + """ + for name in self.name, self.fullname: + path = "/var/lib/dpkg/info/%s.list" % name + try: + with open(path, "rb") as file_list: + return file_list.read().decode("utf-8").strip().split("\n") + except OSError: + continue + + return [] + + def get_changelog( + self, uri: str | None = None, cancel_lock: threading.Event | None = None + ) -> str: + """ + Download the changelog of the package and return it as unicode + string. + + The parameter *uri* refers to the uri of the changelog file. It may + contain multiple named variables which will be substitued. These + variables are (src_section, prefix, src_pkg, src_ver). An example is + the Ubuntu changelog:: + + "http://changelogs.ubuntu.com/changelogs/pool" \\ + "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\ + "/%(src_pkg)s_%(src_ver)s/changelog" + + The parameter *cancel_lock* refers to an instance of threading.Event, + which if set, prevents the download. + """ + # Return a cached changelog if available + if self._changelog != "": + return self._changelog + + if not self.candidate: + return _("The list of changes is not available") + + if uri is None: + if self.candidate.origins[0].origin == "Debian": + uri = ( + "http://packages.debian.org/changelogs/pool" + "/%(src_section)s/%(prefix)s/%(src_pkg)s" + "/%(src_pkg)s_%(src_ver)s/changelog" + ) + elif self.candidate.origins[0].origin == "Ubuntu": + uri = ( + "http://changelogs.ubuntu.com/changelogs/pool" + "/%(src_section)s/%(prefix)s/%(src_pkg)s" + "/%(src_pkg)s_%(src_ver)s/changelog" + ) + else: + res = _("The list of changes is not available") + if isinstance(res, str): + return res + else: + return res.decode("utf-8") + + # get the src package name + src_pkg = self.candidate.source_name + + # assume "main" section + src_section = "main" + # use the section of the candidate as a starting point + section = self.candidate.section + + # get the source version + src_ver = self.candidate.source_version + + try: + # try to get the source version of the pkg, this differs + # for some (e.g. libnspr4 on ubuntu) + # this feature only works if the correct deb-src are in the + # sources.list otherwise we fall back to the binary version number + src_records = apt_pkg.SourceRecords() + except SystemError: + pass + else: + while src_records.lookup(src_pkg): + if not src_records.version: + continue + if self.candidate.source_version == src_records.version: + # Direct match, use it and do not do more lookups. + src_ver = src_records.version + section = src_records.section + break + if apt_pkg.version_compare(src_records.version, src_ver) > 0: + # The version is higher, it seems to match. + src_ver = src_records.version + section = src_records.section + + section_split = section.split("/", 1) + if len(section_split) > 1: + src_section = section_split[0] + del section_split + + # lib is handled special + prefix = src_pkg[0] + if src_pkg.startswith("lib"): + prefix = "lib" + src_pkg[3] + + # stip epoch + src_ver_split = src_ver.split(":", 1) + if len(src_ver_split) > 1: + src_ver = "".join(src_ver_split[1:]) + del src_ver_split + + uri = uri % { + "src_section": src_section, + "prefix": prefix, + "src_pkg": src_pkg, + "src_ver": src_ver, + } + + timeout = socket.getdefaulttimeout() + + # FIXME: when python2.4 vanishes from the archive, + # merge this into a single try..finally block (pep 341) + try: + try: + # Set a timeout for the changelog download + socket.setdefaulttimeout(2) + + # Check if the download was canceled + if cancel_lock and cancel_lock.is_set(): + return "" + # FIXME: python3.2: Should be closed manually + changelog_file = urlopen(uri) + # do only get the lines that are new + changelog = "" + regexp = "^%s \\((.*)\\)(.*)$" % (re.escape(src_pkg)) + while True: + # Check if the download was canceled + if cancel_lock and cancel_lock.is_set(): + return "" + # Read changelog line by line + line_raw = changelog_file.readline() + if not line_raw: + break + # The changelog is encoded in utf-8, but since there isn't + # any http header, urllib2 seems to treat it as ascii + line = line_raw.decode("utf-8") + + # print line.encode('utf-8') + match = re.match(regexp, line) + if match: + # strip epoch from installed version + # and from changelog too + installed = getattr(self.installed, "version", None) + if installed and ":" in installed: + installed = installed.split(":", 1)[1] + changelog_ver = match.group(1) + if changelog_ver and ":" in changelog_ver: + changelog_ver = changelog_ver.split(":", 1)[1] + + if ( + installed + and apt_pkg.version_compare(changelog_ver, installed) <= 0 + ): + break + # EOF (shouldn't really happen) + changelog += line + + # Print an error if we failed to extract a changelog + if len(changelog) == 0: + changelog = _("The list of changes is not available") + if not isinstance(changelog, str): + changelog = changelog.decode("utf-8") + self._changelog = changelog + + except HTTPError: + if self.candidate.origins[0].origin == "Ubuntu": + res = _( + "The list of changes is not available yet.\n\n" + "Please use " + "http://launchpad.net/ubuntu/+source/%s/" + "%s/+changelog\n" + "until the changes become available or try again " + "later." + ) % (src_pkg, src_ver) + else: + res = _("The list of changes is not available") + if isinstance(res, str): + return res + else: + return res.decode("utf-8") + except (OSError, BadStatusLine): + res = _( + "Failed to download the list of changes. \nPlease " + "check your Internet connection." + ) + if isinstance(res, str): + return res + else: + return res.decode("utf-8") + finally: + socket.setdefaulttimeout(timeout) + return self._changelog + + @property + def versions(self) -> VersionList: + """Return a VersionList() object for all available versions. + + .. versionadded:: 0.7.9 + """ + return VersionList(self) + + @property + def is_inst_broken(self) -> bool: + """Return True if the to-be-installed package is broken.""" + return self._pcache._depcache.is_inst_broken(self._pkg) + + @property + def is_now_broken(self) -> bool: + """Return True if the installed package is broken.""" + return self._pcache._depcache.is_now_broken(self._pkg) + + @property + def has_config_files(self) -> bool: + """Checks whether the package is is the config-files state.""" + return self._pkg.current_state == apt_pkg.CURSTATE_CONFIG_FILES + + # depcache actions + + def mark_keep(self) -> None: + """Mark a package for keep.""" + self._pcache.cache_pre_change() + self._pcache._depcache.mark_keep(self._pkg) + self._pcache.cache_post_change() + + def mark_delete(self, auto_fix: bool = True, purge: bool = False) -> None: + """Mark a package for deletion. + + If *auto_fix* is ``True``, the resolver will be run, trying to fix + broken packages. This is the default. + + If *purge* is ``True``, remove the configuration files of the package + as well. The default is to keep the configuration. + """ + self._pcache.cache_pre_change() + self._pcache._depcache.mark_delete(self._pkg, purge) + # try to fix broken stuffsta + if auto_fix and self._pcache._depcache.broken_count > 0: + fix = apt_pkg.ProblemResolver(self._pcache._depcache) + fix.clear(self._pkg) + fix.protect(self._pkg) + fix.remove(self._pkg) + fix.resolve() + self._pcache.cache_post_change() + + def mark_install( + self, auto_fix: bool = True, auto_inst: bool = True, from_user: bool = True + ) -> None: + """Mark a package for install. + + If *autoFix* is ``True``, the resolver will be run, trying to fix + broken packages. This is the default. + + If *autoInst* is ``True``, the dependencies of the packages will be + installed automatically. This is the default. + + If *fromUser* is ``True``, this package will not be marked as + automatically installed. This is the default. Set it to False if you + want to be able to automatically remove the package at a later stage + when no other package depends on it. + """ + self._pcache.cache_pre_change() + self._pcache._depcache.mark_install(self._pkg, auto_inst, from_user) + # try to fix broken stuff + if auto_fix and self._pcache._depcache.broken_count > 0: + fixer = apt_pkg.ProblemResolver(self._pcache._depcache) + fixer.clear(self._pkg) + fixer.protect(self._pkg) + fixer.resolve(True) + self._pcache.cache_post_change() + + def mark_upgrade(self, from_user: bool = True) -> None: + """Mark a package for upgrade.""" + if self.is_upgradable: + auto = self.is_auto_installed + self.mark_install(from_user=from_user) + self.mark_auto(auto) + else: + # FIXME: we may want to throw a exception here + sys.stderr.write( + ("MarkUpgrade() called on a non-upgradeable pkg: " "'%s'\n") + % self._pkg.name + ) + + def mark_auto(self, auto: bool = True) -> None: + """Mark a package as automatically installed. + + Call this function to mark a package as automatically installed. If the + optional parameter *auto* is set to ``False``, the package will not be + marked as automatically installed anymore. The default is ``True``. + """ + self._pcache._depcache.mark_auto(self._pkg, auto) + + def commit(self, fprogress: AcquireProgress, iprogress: InstallProgress) -> None: + """Commit the changes. + + The parameter *fprogress* refers to a apt_pkg.AcquireProgress() object, + like apt.progress.text.AcquireProgress(). + + The parameter *iprogress* refers to an InstallProgress() object, as + found in apt.progress.base. + """ + self._pcache._depcache.commit(fprogress, iprogress) + + +@no_type_check +def _test(): + """Self-test.""" + print("Self-test for the Package modul") + import random + + apt_pkg.init() + progress = apt.progress.text.OpProgress() + cache = apt.Cache(progress) + pkg = cache["apt-utils"] + print("Name: %s " % pkg.name) + print("ID: %s " % pkg.id) + print("Priority (Candidate): %s " % pkg.candidate.priority) + print("Priority (Installed): %s " % pkg.installed.priority) + print("Installed: %s " % pkg.installed.version) + print("Candidate: %s " % pkg.candidate.version) + print("CandidateDownloadable: %s" % pkg.candidate.downloadable) + print("CandidateOrigins: %s" % pkg.candidate.origins) + print("SourcePkg: %s " % pkg.candidate.source_name) + print("Section: %s " % pkg.section) + print("Summary: %s" % pkg.candidate.summary) + print("Description (formatted) :\n%s" % pkg.candidate.description) + print("Description (unformatted):\n%s" % pkg.candidate.raw_description) + print("InstalledSize: %s " % pkg.candidate.installed_size) + print("PackageSize: %s " % pkg.candidate.size) + print("Dependencies: %s" % pkg.installed.dependencies) + print("Recommends: %s" % pkg.installed.recommends) + for dep in pkg.candidate.dependencies: + print( + ",".join( + f"{o.name} ({o.version}) ({o.relation}) ({o.pre_depend})" + for o in dep.or_dependencies + ) + ) + print("arch: %s" % pkg.candidate.architecture) + print("homepage: %s" % pkg.candidate.homepage) + print("rec: ", pkg.candidate.record) + + print(cache["2vcard"].get_changelog()) + for i in True, False: + print("Running install on random upgradable pkgs with AutoFix: ", i) + for pkg in cache: + if pkg.is_upgradable: + if random.randint(0, 1) == 1: + pkg.mark_install(i) + print("Broken: %s " % cache._depcache.broken_count) + print("InstCount: %s " % cache._depcache.inst_count) + + print() + # get a new cache + for i in True, False: + print("Randomly remove some packages with AutoFix: %s" % i) + cache = apt.Cache(progress) + for name in cache.keys(): + if random.randint(0, 1) == 1: + try: + cache[name].mark_delete(i) + except SystemError: + print("Error trying to remove: %s " % name) + print("Broken: %s " % cache._depcache.broken_count) + print("DelCount: %s " % cache._depcache.del_count) + + +# self-test +if __name__ == "__main__": + _test() diff --git a/apt/progress/__init__.py b/apt/progress/__init__.py new file mode 100644 index 0000000..d1687d5 --- /dev/null +++ b/apt/progress/__init__.py @@ -0,0 +1,28 @@ +# apt/progress/__init__.py - Initialization file for apt.progress. +# +# Copyright (c) 2009 Julian Andres Klode <jak@debian.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Progress reporting. + +This package provides progress reporting for the python-apt package. The module +'base' provides classes with no output, and the module 'text' provides classes +for terminals, etc. +""" + +from collections.abc import Sequence + +__all__: Sequence[str] = [] diff --git a/apt/progress/base.py b/apt/progress/base.py new file mode 100644 index 0000000..ede5e5c --- /dev/null +++ b/apt/progress/base.py @@ -0,0 +1,332 @@ +# apt/progress/base.py - Base classes for progress reporting. +# +# Copyright (C) 2009 Julian Andres Klode <jak@debian.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +# pylint: disable-msg = R0201 +"""Base classes for progress reporting. + +Custom progress classes should inherit from these classes. They can also be +used as dummy progress classes which simply do nothing. +""" +from __future__ import annotations + +import errno +import fcntl +import io +import os +import re +import select +import sys + +import apt_pkg + +__all__ = ["AcquireProgress", "CdromProgress", "InstallProgress", "OpProgress"] + + +class AcquireProgress: + """Monitor object for downloads controlled by the Acquire class. + + This is an mostly abstract class. You should subclass it and implement the + methods to get something useful. + """ + + current_bytes = current_cps = fetched_bytes = last_bytes = total_bytes = 0.0 + current_items = elapsed_time = total_items = 0 + + def done(self, item: apt_pkg.AcquireItemDesc) -> None: + """Invoked when an item is successfully and completely fetched.""" + + def fail(self, item: apt_pkg.AcquireItemDesc) -> None: + """Invoked when an item could not be fetched.""" + + def fetch(self, item: apt_pkg.AcquireItemDesc) -> None: + """Invoked when some of the item's data is fetched.""" + + def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None: + """Invoked when an item is confirmed to be up-to-date. + + Invoked when an item is confirmed to be up-to-date. For instance, + when an HTTP download is informed that the file on the server was + not modified. + """ + + def media_change(self, media: str, drive: str) -> bool: + """Prompt the user to change the inserted removable media. + + The parameter 'media' decribes the name of the media type that + should be changed, whereas the parameter 'drive' should be the + identifying name of the drive whose media should be changed. + + This method should not return until the user has confirmed to the user + interface that the media change is complete. It must return True if + the user confirms the media change, or False to cancel it. + """ + return False + + def pulse(self, owner: apt_pkg.Acquire) -> bool: + """Periodically invoked while the Acquire process is underway. + + This method gets invoked while the Acquire progress given by the + parameter 'owner' is underway. It should display information about + the current state. + + This function returns a boolean value indicating whether the + acquisition should be continued (True) or cancelled (False). + """ + return True + + def start(self) -> None: + """Invoked when the Acquire process starts running.""" + # Reset all our values. + self.current_bytes = 0.0 + self.current_cps = 0.0 + self.current_items = 0 + self.elapsed_time = 0 + self.fetched_bytes = 0.0 + self.last_bytes = 0.0 + self.total_bytes = 0.0 + self.total_items = 0 + + def stop(self) -> None: + """Invoked when the Acquire process stops running.""" + + +class CdromProgress: + """Base class for reporting the progress of adding a cdrom. + + Can be used with apt_pkg.Cdrom to produce an utility like apt-cdrom. The + attribute 'total_steps' defines the total number of steps and can be used + in update() to display the current progress. + """ + + total_steps = 0 + + def ask_cdrom_name(self) -> str | None: + """Ask for the name of the cdrom. + + If a name has been provided, return it. Otherwise, return None to + cancel the operation. + """ + + def change_cdrom(self) -> bool: + """Ask for the CD-ROM to be changed. + + Return True once the cdrom has been changed or False to cancel the + operation. + """ + return False + + def update(self, text: str, current: int) -> None: + """Periodically invoked to update the interface. + + The string 'text' defines the text which should be displayed. The + integer 'current' defines the number of completed steps. + """ + + +class InstallProgress: + """Class to report the progress of installing packages.""" + + child_pid, percent, select_timeout, status = 0, 0.0, 0.1, "" + + def __init__(self) -> None: + (self.statusfd, self.writefd) = os.pipe() + # These will leak fds, but fixing this safely requires API changes. + self.write_stream: io.TextIOBase = os.fdopen(self.writefd, "w") + self.status_stream: io.TextIOBase = os.fdopen(self.statusfd, "r") # noqa + fcntl.fcntl(self.statusfd, fcntl.F_SETFL, os.O_NONBLOCK) + + def start_update(self) -> None: + """(Abstract) Start update.""" + + def finish_update(self) -> None: + """(Abstract) Called when update has finished.""" + + def __enter__(self) -> InstallProgress: + return self + + def __exit__(self, type: object, value: object, traceback: object) -> None: + self.write_stream.close() + self.status_stream.close() + + def error(self, pkg: str, errormsg: str) -> None: + """(Abstract) Called when a error is detected during the install.""" + + def conffile(self, current: str, new: str) -> None: + """(Abstract) Called when a conffile question from dpkg is detected.""" + + def status_change(self, pkg: str, percent: float, status: str) -> None: + """(Abstract) Called when the APT status changed.""" + + def dpkg_status_change(self, pkg: str, status: str) -> None: + """(Abstract) Called when the dpkg status changed.""" + + def processing(self, pkg: str, stage: str) -> None: + """(Abstract) Sent just before a processing stage starts. + + The parameter 'stage' is one of "upgrade", "install" + (both sent before unpacking), "configure", "trigproc", "remove", + "purge". This method is used for dpkg only. + """ + + def run(self, obj: apt_pkg.PackageManager | bytes | str) -> int: + """Install using the object 'obj'. + + This functions runs install actions. The parameter 'obj' may either + be a PackageManager object in which case its do_install() method is + called or the path to a deb file. + + If the object is a PackageManager, the functions returns the result + of calling its do_install() method. Otherwise, the function returns + the exit status of dpkg. In both cases, 0 means that there were no + problems. + """ + pid = self.fork() + if pid == 0: + try: + # PEP-446 implemented in Python 3.4 made all descriptors + # CLOEXEC, but we need to be able to pass writefd to dpkg + # when we spawn it + os.set_inheritable(self.writefd, True) + except AttributeError: # if we don't have os.set_inheritable() + pass + # pm.do_install might raise a exception, + # when this happens, we need to catch + # it, otherwise os._exit() is not run + # and the execution continues in the + # parent code leading to very confusing bugs + try: + os._exit(obj.do_install(self.write_stream.fileno())) # type: ignore # noqa + except AttributeError: + os._exit( + os.spawnlp( + os.P_WAIT, + "dpkg", + "dpkg", + "--status-fd", + str(self.write_stream.fileno()), + "-i", + obj, # type: ignore # noqa + ) + ) + except Exception as e: + sys.stderr.write("%s\n" % e) + os._exit(apt_pkg.PackageManager.RESULT_FAILED) + + self.child_pid = pid + res = self.wait_child() + return os.WEXITSTATUS(res) + + def fork(self) -> int: + """Fork.""" + return os.fork() + + def update_interface(self) -> None: + """Update the interface.""" + try: + line = self.status_stream.readline() + except OSError as err: + # resource temporarly unavailable is ignored + if err.errno != errno.EAGAIN and err.errno != errno.EWOULDBLOCK: + print(err.strerror) + return + + pkgname = status = status_str = percent = base = "" + + if line.startswith("pm"): + try: + (status, pkgname, percent, status_str) = line.split(":", 3) + except ValueError: + # silently ignore lines that can't be parsed + return + elif line.startswith("status"): + try: + (base, pkgname, status, status_str) = line.split(":", 3) + except ValueError: + (base, pkgname, status) = line.split(":", 2) + elif line.startswith("processing"): + (status, status_str, pkgname) = line.split(":", 2) + self.processing(pkgname.strip(), status_str.strip()) + + # Always strip the status message + pkgname = pkgname.strip() + status_str = status_str.strip() + status = status.strip() + + if status == "pmerror" or status == "error": + self.error(pkgname, status_str) + elif status == "conffile-prompt" or status == "pmconffile": + match = re.match("\\s*'(.*)'\\s*'(.*)'.*", status_str) + if match: + self.conffile(match.group(1), match.group(2)) + elif status == "pmstatus": + # FIXME: Float comparison + if float(percent) != self.percent or status_str != self.status: + self.status_change(pkgname, float(percent), status_str.strip()) + self.percent = float(percent) + self.status = status_str.strip() + elif base == "status": + self.dpkg_status_change(pkgname, status) + + def wait_child(self) -> int: + """Wait for child progress to exit. + + This method is responsible for calling update_interface() from time to + time. It exits once the child has exited. The return values is the + full status returned from os.waitpid() (not only the return code). + """ + (pid, res) = (0, 0) + while True: + try: + select.select([self.status_stream], [], [], self.select_timeout) + except OSError as error: + (errno_, _errstr) = error.args + if errno_ != errno.EINTR: + raise + + self.update_interface() + try: + (pid, res) = os.waitpid(self.child_pid, os.WNOHANG) + if pid == self.child_pid: + break + except OSError as err: + if err.errno == errno.ECHILD: + break + if err.errno != errno.EINTR: + raise + + return res + + +class OpProgress: + """Monitor objects for operations. + + Display the progress of operations such as opening the cache.""" + + major_change, op, percent, subop = False, "", 0.0, "" + + def update(self, percent: float | None = None) -> None: + """Called periodically to update the user interface. + + You may use the optional argument 'percent' to set the attribute + 'percent' in this call. + """ + if percent is not None: + self.percent = percent + + def done(self) -> None: + """Called once an operation has been completed.""" diff --git a/apt/progress/text.py b/apt/progress/text.py new file mode 100644 index 0000000..ea1a176 --- /dev/null +++ b/apt/progress/text.py @@ -0,0 +1,294 @@ +# Copyright (c) 2009 Julian Andres Klode <jak@debian.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Progress reporting for text interfaces.""" +import io +import os +import signal +import sys +import types +from collections.abc import Callable + +import apt_pkg + +from apt.progress import base + +__all__ = ["AcquireProgress", "CdromProgress", "OpProgress"] + + +def _(msg: str) -> str: + """Translate the message, also try apt if translation is missing.""" + res = apt_pkg.gettext(msg) + if res == msg: + res = apt_pkg.gettext(msg, "apt") + return res + + +class TextProgress: + """Internal Base class for text progress classes.""" + + def __init__(self, outfile: io.TextIOBase | None = None) -> None: + self._file = outfile or sys.stdout + self._width = 0 + + def _write(self, msg: str, newline: bool = True, maximize: bool = False) -> None: + """Write the message on the terminal, fill remaining space.""" + self._file.write("\r") + self._file.write(msg) + + # Fill remaining stuff with whitespace + if self._width > len(msg): + self._file.write((self._width - len(msg)) * " ") + elif maximize: # Needed for OpProgress. + self._width = max(self._width, len(msg)) + if newline: + self._file.write("\n") + else: + # self._file.write("\r") + self._file.flush() + + +class OpProgress(base.OpProgress, TextProgress): + """Operation progress reporting. + + This closely resembles OpTextProgress in libapt-pkg. + """ + + def __init__(self, outfile: io.TextIOBase | None = None) -> None: + TextProgress.__init__(self, outfile) + base.OpProgress.__init__(self) + self.old_op = "" + + def update(self, percent: float | None = None) -> None: + """Called periodically to update the user interface.""" + base.OpProgress.update(self, percent) + if self.major_change and self.old_op: + self._write(self.old_op) + self._write("%s... %i%%\r" % (self.op, self.percent), False, True) + self.old_op = self.op + + def done(self) -> None: + """Called once an operation has been completed.""" + base.OpProgress.done(self) + if self.old_op: + self._write(_("%c%s... Done") % ("\r", self.old_op), True, True) + self.old_op = "" + + +class AcquireProgress(base.AcquireProgress, TextProgress): + """AcquireProgress for the text interface.""" + + def __init__(self, outfile: io.TextIOBase | None = None) -> None: + TextProgress.__init__(self, outfile) + base.AcquireProgress.__init__(self) + self._signal: ( + Callable[[int, types.FrameType | None], None] | int | signal.Handlers | None + ) = None # noqa + self._width = 80 + self._id = 1 + + def start(self) -> None: + """Start an Acquire progress. + + In this case, the function sets up a signal handler for SIGWINCH, i.e. + window resize signals. And it also sets id to 1. + """ + base.AcquireProgress.start(self) + self._signal = signal.signal(signal.SIGWINCH, self._winch) + # Get the window size. + self._winch() + self._id = 1 + + def _winch(self, *dummy: object) -> None: + """Signal handler for window resize signals.""" + if hasattr(self._file, "fileno") and os.isatty(self._file.fileno()): + import fcntl + import struct + import termios + + buf = fcntl.ioctl(self._file, termios.TIOCGWINSZ, 8 * b" ") # noqa + dummy, col, dummy, dummy = struct.unpack("hhhh", buf) + self._width = col - 1 # 1 for the cursor + + def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None: + """Called when an item is update (e.g. not modified on the server).""" + base.AcquireProgress.ims_hit(self, item) + line = _("Hit ") + item.description + if item.owner.filesize: + line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize) + self._write(line) + + def fail(self, item: apt_pkg.AcquireItemDesc) -> None: + """Called when an item is failed.""" + base.AcquireProgress.fail(self, item) + if item.owner.status == item.owner.STAT_DONE: + self._write(_("Ign ") + item.description) + else: + self._write(_("Err ") + item.description) + self._write(" %s" % item.owner.error_text) + + def fetch(self, item: apt_pkg.AcquireItemDesc) -> None: + """Called when some of the item's data is fetched.""" + base.AcquireProgress.fetch(self, item) + # It's complete already (e.g. Hit) + if item.owner.complete: + return + item.owner.id = self._id + self._id += 1 + line = _("Get:") + f"{item.owner.id} {item.description}" + if item.owner.filesize: + line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize) + + self._write(line) + + def pulse(self, owner: apt_pkg.Acquire) -> bool: + """Periodically invoked while the Acquire process is underway. + + Return False if the user asked to cancel the whole Acquire process.""" + base.AcquireProgress.pulse(self, owner) + # only show progress on a tty to not clutter log files etc + if hasattr(self._file, "fileno") and not os.isatty(self._file.fileno()): + return True + + # calculate progress + percent = ((self.current_bytes + self.current_items) * 100.0) / float( + self.total_bytes + self.total_items + ) + + shown = False + tval = "%i%%" % percent + end = "" + if self.current_cps: + eta = int(float(self.total_bytes - self.current_bytes) / self.current_cps) + end = " {}B/s {}".format( + apt_pkg.size_to_str(self.current_cps), + apt_pkg.time_to_str(eta), + ) + + for worker in owner.workers: + val = "" + if not worker.current_item: + if worker.status: + val = " [%s]" % worker.status + if len(tval) + len(val) + len(end) >= self._width: + break + tval += val + shown = True + continue + shown = True + + if worker.current_item.owner.id: + val += " [%i %s" % ( + worker.current_item.owner.id, + worker.current_item.shortdesc, + ) + else: + val += " [%s" % worker.current_item.description + if worker.current_item.owner.active_subprocess: + val += " %s" % worker.current_item.owner.active_subprocess + + val += " %sB" % apt_pkg.size_to_str(worker.current_size) + + # Add the total size and percent + if worker.total_size and not worker.current_item.owner.complete: + val += "/%sB %i%%" % ( + apt_pkg.size_to_str(worker.total_size), + worker.current_size * 100.0 / worker.total_size, + ) + + val += "]" + + if len(tval) + len(val) + len(end) >= self._width: + # Display as many items as screen width + break + else: + tval += val + + if not shown: + tval += _(" [Working]") + + if self.current_cps: + tval += (self._width - len(end) - len(tval)) * " " + end + + self._write(tval, False) + return True + + def media_change(self, medium: str, drive: str) -> bool: + """Prompt the user to change the inserted removable media.""" + base.AcquireProgress.media_change(self, medium, drive) + self._write( + _( + "Media change: please insert the disc labeled\n" + " '%s'\n" + "in the drive '%s' and press enter\n" + ) + % (medium, drive) + ) + return input() not in ("c", "C") + + def stop(self) -> None: + """Invoked when the Acquire process stops running.""" + base.AcquireProgress.stop(self) + # Trick for getting a translation from apt + self._write( + ( + _("Fetched %sB in %s (%sB/s)\n") + % ( + apt_pkg.size_to_str(self.fetched_bytes), + apt_pkg.time_to_str(self.elapsed_time), + apt_pkg.size_to_str(self.current_cps), + ) + ).rstrip("\n") + ) + + # Delete the signal again. + import signal + + signal.signal(signal.SIGWINCH, self._signal) + + +class CdromProgress(base.CdromProgress, TextProgress): + """Text CD-ROM progress.""" + + def ask_cdrom_name(self) -> str | None: + """Ask the user to provide a name for the disc.""" + base.CdromProgress.ask_cdrom_name(self) + self._write( + _( + "Please provide a name for this medium, such as " + "'Debian 2.1r1 Disk 1'" + ), + False, + ) + try: + return str(input(":")) + except KeyboardInterrupt: + return None + + def update(self, text: str, current: int) -> None: + """Set the current progress.""" + base.CdromProgress.update(self, text, current) + if text: + self._write(text, False) + + def change_cdrom(self) -> bool: + """Ask the user to change the CD-ROM.""" + base.CdromProgress.change_cdrom(self) + self._write(_("Please insert an installation medium and press enter"), False) + try: + return bool(input() == "") + except KeyboardInterrupt: + return False diff --git a/apt/py.typed b/apt/py.typed new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/apt/py.typed diff --git a/apt/utils.py b/apt/utils.py new file mode 100644 index 0000000..5b1fd46 --- /dev/null +++ b/apt/utils.py @@ -0,0 +1,100 @@ +# Copyright (C) 2009 Canonical +# +# Authors: +# Michael Vogt +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +import datetime +import os + +import apt_pkg + +import apt + + +def get_maintenance_end_date( + release_date: datetime.datetime, m_months: int +) -> tuple[int, int]: + """ + get the (year, month) tuple when the maintenance for the distribution + ends. Needs the data of the release and the number of months that + its is supported as input + """ + # calc end date + years = m_months // 12 + months = m_months % 12 + support_end_year = release_date.year + years + (release_date.month + months) // 12 + support_end_month = (release_date.month + months) % 12 + # special case: this happens when e.g. doing 2010-06 + 18 months + if support_end_month == 0: + support_end_month = 12 + support_end_year -= 1 + return (support_end_year, support_end_month) + + +def get_release_date_from_release_file(path: str) -> int | None: + """ + return the release date as time_t for the given release file + """ + if not path or not os.path.exists(path): + return None + + with os.fdopen(apt_pkg.open_maybe_clear_signed_file(path)) as data: + tag = apt_pkg.TagFile(data) + section = next(tag) + if "Date" not in section: + return None + date = section["Date"] + return apt_pkg.str_to_time(date) + + +def get_release_filename_for_pkg( + cache: apt.Cache, pkgname: str, label: str, release: str +) -> str | None: + "get the release file that provides this pkg" + if pkgname not in cache: + return None + pkg = cache[pkgname] + ver = None + # look for the version that comes from the repos with + # the given label and origin + for aver in pkg._pkg.version_list: + if aver is None or aver.file_list is None: + continue + for ver_file, _index in aver.file_list: + # print verFile + if ( + ver_file.origin == label + and ver_file.label == label + and ver_file.archive == release + ): + ver = aver + if not ver: + return None + indexfile = cache._list.find_index(ver.file_list[0][0]) + for metaindex in cache._list.list: + for m in metaindex.index_files: + if indexfile and indexfile.describe == m.describe and indexfile.is_trusted: + dirname = apt_pkg.config.find_dir("Dir::State::lists") + for relfile in ["InRelease", "Release"]: + name = apt_pkg.uri_to_filename( + metaindex.uri + ) + "dists_{}_{}".format( + metaindex.dist, + relfile, + ) + if os.path.exists(dirname + name): + return dirname + name + return None |