summaryrefslogtreecommitdiffstats
path: root/apt
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 18:07:41 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 18:07:41 +0000
commit76926159194e180003aa78de97e5f287bf4325a5 (patch)
tree2cea7245cdc3f66355900c820c145eba90598766 /apt
parentInitial commit. (diff)
downloadpython-apt-76926159194e180003aa78de97e5f287bf4325a5.tar.xz
python-apt-76926159194e180003aa78de97e5f287bf4325a5.zip
Adding upstream version 2.7.6.upstream/2.7.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'apt')
-rw-r--r--apt/__init__.py40
-rw-r--r--apt/auth.py311
-rw-r--r--apt/cache.py1004
-rw-r--r--apt/cdrom.py91
-rw-r--r--apt/debfile.py861
-rw-r--r--apt/package.py1559
-rw-r--r--apt/progress/__init__.py28
-rw-r--r--apt/progress/base.py332
-rw-r--r--apt/progress/text.py294
-rw-r--r--apt/py.typed0
-rw-r--r--apt/utils.py100
11 files changed, 4620 insertions, 0 deletions
diff --git a/apt/__init__.py b/apt/__init__.py
new file mode 100644
index 0000000..f22c9a0
--- /dev/null
+++ b/apt/__init__.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2005-2009 Canonical
+#
+# Author: Michael Vogt <michael.vogt@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+# import the core of apt_pkg
+"""High-Level Interface for working with apt."""
+import apt_pkg
+
+from apt.cache import Cache as Cache
+from apt.cache import ProblemResolver as ProblemResolver
+
+# import some fancy classes
+from apt.package import Package as Package
+from apt.package import Version as Version
+
+Cache # pyflakes
+ProblemResolver # pyflakes
+Version # pyflakes
+from apt.cdrom import Cdrom as Cdrom
+
+# init the package system, but do not re-initialize config
+if "APT" not in apt_pkg.config:
+ apt_pkg.init_config()
+apt_pkg.init_system()
+
+__all__ = ["Cache", "Cdrom", "Package"]
diff --git a/apt/auth.py b/apt/auth.py
new file mode 100644
index 0000000..6d50616
--- /dev/null
+++ b/apt/auth.py
@@ -0,0 +1,311 @@
+#!/usr/bin/python3
+# auth - authentication key management
+#
+# Copyright (c) 2004 Canonical
+# Copyright (c) 2012 Sebastian Heinlein
+#
+# Author: Michael Vogt <mvo@debian.org>
+# Sebastian Heinlein <devel@glatzor.de>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""Handle GnuPG keys used to trust signed repositories."""
+
+import errno
+import os
+import os.path
+import shutil
+import subprocess
+import sys
+import tempfile
+
+import apt_pkg
+from apt_pkg import gettext as _
+
+
+class AptKeyError(Exception):
+ pass
+
+
+class AptKeyIDTooShortError(AptKeyError):
+ """Internal class do not rely on it."""
+
+
+class TrustedKey:
+
+ """Represents a trusted key."""
+
+ def __init__(self, name: str, keyid: str, date: str) -> None:
+ self.raw_name = name
+ # Allow to translated some known keys
+ self.name = _(name)
+ self.keyid = keyid
+ self.date = date
+
+ def __str__(self) -> str:
+ return f"{self.name}\n{self.keyid} {self.date}"
+
+
+def _call_apt_key_script(*args: str, **kwargs: str | None) -> str:
+ """Run the apt-key script with the given arguments."""
+ conf = None
+ cmd = [apt_pkg.config.find_file("Dir::Bin::Apt-Key", "/usr/bin/apt-key")]
+ cmd.extend(args)
+ env = os.environ.copy()
+ env["LANG"] = "C"
+ env["APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE"] = "1"
+ try:
+ if apt_pkg.config.find_dir("Dir") != "/":
+ # If the key is to be installed into a chroot we have to export the
+ # configuration from the chroot to the apt-key script by using
+ # a temporary APT_CONFIG file. The apt-key script uses apt-config
+ # shell internally
+ conf = tempfile.NamedTemporaryFile(prefix="apt-key", suffix=".conf")
+ conf.write(apt_pkg.config.dump().encode("UTF-8"))
+ conf.flush()
+ env["APT_CONFIG"] = conf.name
+ proc = subprocess.Popen(
+ cmd,
+ env=env,
+ universal_newlines=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+
+ stdin = kwargs.get("stdin", None)
+
+ output, stderr = proc.communicate(stdin) # type: str, str
+
+ if proc.returncode:
+ raise AptKeyError(
+ "The apt-key script failed with return code %s:\n"
+ "%s\n"
+ "stdout: %s\n"
+ "stderr: %s" % (proc.returncode, " ".join(cmd), output, stderr)
+ )
+ elif stderr:
+ sys.stderr.write(stderr) # Forward stderr
+
+ return output.strip()
+ finally:
+ if conf is not None:
+ conf.close()
+
+
+def add_key_from_file(filename: str) -> None:
+ """Import a GnuPG key file to trust repositores signed by it.
+
+ Keyword arguments:
+ filename -- the absolute path to the public GnuPG key file
+ """
+ if not os.path.abspath(filename):
+ raise AptKeyError("An absolute path is required: %s" % filename)
+ if not os.access(filename, os.R_OK):
+ raise AptKeyError("Key file cannot be accessed: %s" % filename)
+ _call_apt_key_script("add", filename)
+
+
+def add_key_from_keyserver(keyid: str, keyserver: str) -> None:
+ """Import a GnuPG key file to trust repositores signed by it.
+
+ Keyword arguments:
+ keyid -- the long keyid (fingerprint) of the key, e.g.
+ A1BD8E9D78F7FE5C3E65D8AF8B48AD6246925553
+ keyserver -- the URL or hostname of the key server
+ """
+ tmp_keyring_dir = tempfile.mkdtemp()
+ try:
+ _add_key_from_keyserver(keyid, keyserver, tmp_keyring_dir)
+ except Exception:
+ raise
+ finally:
+ # We are racing with gpg when removing sockets, so ignore
+ # failure to delete non-existing files.
+ def onerror(
+ func: object, path: str, exc_info: tuple[type, Exception, object]
+ ) -> None:
+ if isinstance(exc_info[1], OSError) and exc_info[1].errno == errno.ENOENT:
+ return
+ raise
+
+ shutil.rmtree(tmp_keyring_dir, onerror=onerror)
+
+
+def _add_key_from_keyserver(keyid: str, keyserver: str, tmp_keyring_dir: str) -> None:
+ if len(keyid.replace(" ", "").replace("0x", "")) < (160 / 4):
+ raise AptKeyIDTooShortError("Only fingerprints (v4, 160bit) are supported")
+ # create a temp keyring dir
+ tmp_secret_keyring = os.path.join(tmp_keyring_dir, "secring.gpg")
+ tmp_keyring = os.path.join(tmp_keyring_dir, "pubring.gpg")
+ # default options for gpg
+ gpg_default_options = [
+ "gpg",
+ "--no-default-keyring",
+ "--no-options",
+ "--homedir",
+ tmp_keyring_dir,
+ ]
+ # download the key to a temp keyring first
+ res = subprocess.call(
+ gpg_default_options
+ + [
+ "--secret-keyring",
+ tmp_secret_keyring,
+ "--keyring",
+ tmp_keyring,
+ "--keyserver",
+ keyserver,
+ "--recv",
+ keyid,
+ ]
+ )
+ if res != 0:
+ raise AptKeyError(f"recv from '{keyserver}' failed for '{keyid}'")
+ # FIXME:
+ # - with gnupg 1.4.18 the downloaded key is actually checked(!),
+ # i.e. gnupg will not import anything that the server sends
+ # into the keyring, so the below checks are now redundant *if*
+ # gnupg 1.4.18 is used
+
+ # now export again using the long key id (to ensure that there is
+ # really only this one key in our keyring) and not someone MITM us
+ tmp_export_keyring = os.path.join(tmp_keyring_dir, "export-keyring.gpg")
+ res = subprocess.call(
+ gpg_default_options
+ + [
+ "--keyring",
+ tmp_keyring,
+ "--output",
+ tmp_export_keyring,
+ "--export",
+ keyid,
+ ]
+ )
+ if res != 0:
+ raise AptKeyError("export of '%s' failed", keyid)
+ # now verify the fingerprint, this is probably redundant as we
+ # exported by the fingerprint in the previous command but its
+ # still good paranoia
+ output = subprocess.Popen(
+ gpg_default_options
+ + [
+ "--keyring",
+ tmp_export_keyring,
+ "--fingerprint",
+ "--batch",
+ "--fixed-list-mode",
+ "--with-colons",
+ ],
+ stdout=subprocess.PIPE,
+ universal_newlines=True,
+ ).communicate()[0]
+ got_fingerprint = None
+ for line in output.splitlines():
+ if line.startswith("fpr:"):
+ got_fingerprint = line.split(":")[9]
+ # stop after the first to ensure no subkey trickery
+ break
+ # strip the leading "0x" is there is one and uppercase (as this is
+ # what gnupg is using)
+ signing_key_fingerprint = keyid.replace("0x", "").upper()
+ if got_fingerprint != signing_key_fingerprint:
+ # make the error match what gnupg >= 1.4.18 will output when
+ # it checks the key itself before importing it
+ raise AptKeyError(
+ f"recv from '{keyserver}' failed for '{signing_key_fingerprint}'"
+ )
+ # finally add it
+ add_key_from_file(tmp_export_keyring)
+
+
+def add_key(content: str) -> None:
+ """Import a GnuPG key to trust repositores signed by it.
+
+ Keyword arguments:
+ content -- the content of the GnuPG public key
+ """
+ _call_apt_key_script("adv", "--quiet", "--batch", "--import", "-", stdin=content)
+
+
+def remove_key(fingerprint: str) -> None:
+ """Remove a GnuPG key to no longer trust repositores signed by it.
+
+ Keyword arguments:
+ fingerprint -- the fingerprint identifying the key
+ """
+ _call_apt_key_script("rm", fingerprint)
+
+
+def export_key(fingerprint: str) -> str:
+ """Return the GnuPG key in text format.
+
+ Keyword arguments:
+ fingerprint -- the fingerprint identifying the key
+ """
+ return _call_apt_key_script("export", fingerprint)
+
+
+def update() -> str:
+ """Update the local keyring with the archive keyring and remove from
+ the local keyring the archive keys which are no longer valid. The
+ archive keyring is shipped in the archive-keyring package of your
+ distribution, e.g. the debian-archive-keyring package in Debian.
+ """
+ return _call_apt_key_script("update")
+
+
+def net_update() -> str:
+ """Work similar to the update command above, but get the archive
+ keyring from an URI instead and validate it against a master key.
+ This requires an installed wget(1) and an APT build configured to
+ have a server to fetch from and a master keyring to validate. APT
+ in Debian does not support this command and relies on update
+ instead, but Ubuntu's APT does.
+ """
+ return _call_apt_key_script("net-update")
+
+
+def list_keys() -> list[TrustedKey]:
+ """Returns a list of TrustedKey instances for each key which is
+ used to trust repositories.
+ """
+ # The output of `apt-key list` is difficult to parse since the
+ # --with-colons parameter isn't user
+ output = _call_apt_key_script(
+ "adv", "--with-colons", "--batch", "--fixed-list-mode", "--list-keys"
+ )
+ res = []
+ for line in output.split("\n"):
+ fields = line.split(":")
+ if fields[0] == "pub":
+ keyid = fields[4]
+ if fields[0] == "uid":
+ uid = fields[9]
+ creation_date = fields[5]
+ key = TrustedKey(uid, keyid, creation_date)
+ res.append(key)
+ return res
+
+
+if __name__ == "__main__":
+ # Add some known keys we would like to see translated so that they get
+ # picked up by gettext
+ lambda: _("Ubuntu Archive Automatic Signing Key <ftpmaster@ubuntu.com>")
+ lambda: _("Ubuntu CD Image Automatic Signing Key <cdimage@ubuntu.com>")
+
+ apt_pkg.init()
+ for trusted_key in list_keys():
+ print(trusted_key)
diff --git a/apt/cache.py b/apt/cache.py
new file mode 100644
index 0000000..cf78026
--- /dev/null
+++ b/apt/cache.py
@@ -0,0 +1,1004 @@
+# cache.py - apt cache abstraction
+#
+# Copyright (c) 2005-2009 Canonical
+#
+# Author: Michael Vogt <michael.vogt@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+
+from __future__ import annotations
+
+import fnmatch
+import os
+import warnings
+import weakref
+from collections.abc import Callable, Iterator, KeysView
+from typing import Any, cast
+
+import apt_pkg
+
+import apt.progress.text
+from apt.package import Package, Version
+from apt.progress.base import AcquireProgress, InstallProgress, OpProgress
+
+
+class FetchCancelledException(IOError):
+ """Exception that is thrown when the user cancels a fetch operation."""
+
+
+class FetchFailedException(IOError):
+ """Exception that is thrown when fetching fails."""
+
+
+class UntrustedException(FetchFailedException):
+ """Exception that is thrown when fetching fails for trust reasons"""
+
+
+class LockFailedException(IOError):
+ """Exception that is thrown when locking fails."""
+
+
+class CacheClosedException(Exception):
+ """Exception that is thrown when the cache is used after close()."""
+
+
+class _WrappedLock:
+ """Wraps an apt_pkg.FileLock to raise LockFailedException.
+
+ Initialized using a directory path."""
+
+ def __init__(self, path: str) -> None:
+ self._path = path
+ self._lock = apt_pkg.FileLock(os.path.join(path, "lock"))
+
+ def __enter__(self) -> None:
+ try:
+ return self._lock.__enter__()
+ except apt_pkg.Error as e:
+ raise LockFailedException(
+ ("Failed to lock directory %s: %s") % (self._path, e)
+ )
+
+ def __exit__(self, typ: object, value: object, traceback: object) -> None:
+ return self._lock.__exit__(typ, value, traceback)
+
+
+class Cache:
+ """Dictionary-like package cache.
+
+ The APT cache file contains a hash table mapping names of binary
+ packages to their metadata. A Cache object is the in-core
+ representation of the same. It provides access to APTs idea of the
+ list of available packages.
+
+ The cache can be used like a mapping from package names to Package
+ objects (although only getting items is supported).
+
+ Keyword arguments:
+ progress -- a OpProgress object,
+ rootdir -- an alternative root directory. if that is given the system
+ sources.list and system lists/files are not read, only file relative
+ to the given rootdir,
+ memonly -- build the cache in memory only.
+
+
+ .. versionchanged:: 1.0
+
+ The cache now supports package names with special architecture
+ qualifiers such as :all and :native. It does not export them
+ in :meth:`keys()`, though, to keep :meth:`keys()` a unique set.
+ """
+
+ def __init__(
+ self,
+ progress: OpProgress | None = None,
+ rootdir: str | None = None,
+ memonly: bool = False,
+ ) -> None:
+ self._cache: apt_pkg.Cache = cast(apt_pkg.Cache, None)
+ self._depcache: apt_pkg.DepCache = cast(apt_pkg.DepCache, None)
+ self._records: apt_pkg.PackageRecords = cast(
+ apt_pkg.PackageRecords, None
+ ) # noqa
+ self._list: apt_pkg.SourceList = cast(apt_pkg.SourceList, None)
+ self._callbacks: dict[str, list[Callable[..., None] | str]] = {} # noqa
+ self._callbacks2: dict[
+ str, list[tuple[Callable[..., Any], tuple[Any, ...], dict[Any, Any]]]
+ ] = {} # noqa
+ self._weakref: weakref.WeakValueDictionary[
+ str, apt.Package
+ ] = weakref.WeakValueDictionary() # noqa
+ self._weakversions: weakref.WeakSet[Version] = weakref.WeakSet() # noqa
+ self._changes_count = -1
+ self._sorted_set: list[str] | None = None
+
+ self.connect("cache_post_open", "_inc_changes_count")
+ self.connect("cache_post_change", "_inc_changes_count")
+ if memonly:
+ # force apt to build its caches in memory
+ apt_pkg.config.set("Dir::Cache::pkgcache", "")
+ if rootdir:
+ rootdir = os.path.abspath(rootdir)
+ if os.path.exists(rootdir + "/etc/apt/apt.conf"):
+ apt_pkg.read_config_file(apt_pkg.config, rootdir + "/etc/apt/apt.conf")
+ if os.path.isdir(rootdir + "/etc/apt/apt.conf.d"):
+ apt_pkg.read_config_dir(apt_pkg.config, rootdir + "/etc/apt/apt.conf.d")
+ apt_pkg.config.set("Dir", rootdir)
+ apt_pkg.config.set("Dir::State::status", rootdir + "/var/lib/dpkg/status")
+ # also set dpkg to the rootdir path so that its called for the
+ # --print-foreign-architectures call
+ apt_pkg.config.set(
+ "Dir::bin::dpkg", os.path.join(rootdir, "usr", "bin", "dpkg")
+ )
+ # create required dirs/files when run with special rootdir
+ # automatically
+ self._check_and_create_required_dirs(rootdir)
+ # Call InitSystem so the change to Dir::State::Status is actually
+ # recognized (LP: #320665)
+ apt_pkg.init_system()
+
+ # Prepare a lock object (context manager for archive lock)
+ archive_dir = apt_pkg.config.find_dir("Dir::Cache::Archives")
+ self._archive_lock = _WrappedLock(archive_dir)
+
+ self.open(progress)
+
+ def fix_broken(self) -> None:
+ """Fix broken packages."""
+ self._depcache.fix_broken()
+
+ def _inc_changes_count(self) -> None:
+ """Increase the number of changes"""
+ self._changes_count += 1
+
+ def _check_and_create_required_dirs(self, rootdir: str) -> None:
+ """
+ check if the required apt directories/files are there and if
+ not create them
+ """
+ files = [
+ "/var/lib/dpkg/status",
+ "/etc/apt/sources.list",
+ ]
+ dirs = [
+ "/var/lib/dpkg",
+ "/etc/apt/",
+ "/var/cache/apt/archives/partial",
+ "/var/lib/apt/lists/partial",
+ ]
+ for d in dirs:
+ if not os.path.exists(rootdir + d):
+ # print "creating: ", rootdir + d
+ os.makedirs(rootdir + d)
+ for f in files:
+ if not os.path.exists(rootdir + f):
+ open(rootdir + f, "w").close()
+
+ def _run_callbacks(self, name: str) -> None:
+ """internal helper to run a callback"""
+ if name in self._callbacks:
+ for callback in self._callbacks[name]:
+ if callback == "_inc_changes_count":
+ self._inc_changes_count()
+ else:
+ callback() # type: ignore
+
+ if name in self._callbacks2:
+ for callback, args, kwds in self._callbacks2[name]:
+ callback(self, *args, **kwds)
+
+ def open(self, progress: OpProgress | None = None) -> None:
+ """Open the package cache, after that it can be used like
+ a dictionary
+ """
+ if progress is None:
+ progress = apt.progress.base.OpProgress()
+ # close old cache on (re)open
+ self.close()
+ self.op_progress = progress
+ self._run_callbacks("cache_pre_open")
+
+ self._cache = apt_pkg.Cache(progress)
+ self._depcache = apt_pkg.DepCache(self._cache)
+ self._records = apt_pkg.PackageRecords(self._cache)
+ self._list = apt_pkg.SourceList()
+ self._list.read_main_list()
+ self._sorted_set = None
+ self.__remap()
+
+ self._have_multi_arch = len(apt_pkg.get_architectures()) > 1
+
+ progress.done()
+ self._run_callbacks("cache_post_open")
+
+ def __remap(self) -> None:
+ """Called after cache reopen() to relocate to new cache.
+
+ Relocate objects like packages and versions from the old
+ underlying cache to the new one.
+ """
+ for key in list(self._weakref.keys()):
+ try:
+ pkg = self._weakref[key]
+ except KeyError:
+ continue
+
+ try:
+ pkg._pkg = self._cache[pkg._pkg.name, pkg._pkg.architecture]
+ except LookupError:
+ del self._weakref[key]
+
+ for ver in list(self._weakversions):
+ # Package has been reseated above, reseat version
+ for v in ver.package._pkg.version_list:
+ # Requirements as in debListParser::SameVersion
+ if (
+ v.hash == ver._cand.hash
+ and (v.size == 0 or ver._cand.size == 0 or v.size == ver._cand.size)
+ and v.multi_arch == ver._cand.multi_arch
+ and v.ver_str == ver._cand.ver_str
+ ):
+ ver._cand = v
+ break
+ else:
+ self._weakversions.remove(ver)
+
+ def close(self) -> None:
+ """Close the package cache"""
+ # explicitely free the FDs that _records has open
+ del self._records
+ self._records = cast(apt_pkg.PackageRecords, None)
+
+ def __enter__(self) -> Cache:
+ """Enter the with statement"""
+ return self
+
+ def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None:
+ """Exit the with statement"""
+ self.close()
+
+ def __getitem__(self, key: object) -> Package:
+ """look like a dictionary (get key)"""
+ try:
+ key = str(key)
+ rawpkg = self._cache[key]
+ except KeyError:
+ raise KeyError("The cache has no package named %r" % key)
+
+ # It might be excluded due to not having a version or something
+ if not self.__is_real_pkg(rawpkg):
+ raise KeyError("The cache has no package named %r" % key)
+
+ pkg = self._rawpkg_to_pkg(rawpkg)
+
+ return pkg
+
+ def get(self, key: object, default: object = None) -> Any:
+ """Return *self*[*key*] or *default* if *key* not in *self*.
+
+ .. versionadded:: 1.1
+ """
+ try:
+ return self[key]
+ except KeyError:
+ return default
+
+ def _rawpkg_to_pkg(self, rawpkg: apt_pkg.Package) -> Package:
+ """Returns the apt.Package object for an apt_pkg.Package object.
+
+ .. versionadded:: 1.0.0
+ """
+ fullname = rawpkg.get_fullname(pretty=True)
+
+ return self._weakref.setdefault(fullname, Package(self, rawpkg))
+
+ def __iter__(self) -> Iterator[Package]:
+ # We iterate sorted over package names here. With this we read the
+ # package lists linearly if we need to access the package records,
+ # instead of having to do thousands of random seeks; the latter
+ # is disastrous if we use compressed package indexes, and slower than
+ # necessary for uncompressed indexes.
+ for pkgname in self.keys():
+ pkg = Package(self, self._cache[pkgname])
+ yield self._weakref.setdefault(pkgname, pkg)
+
+ def __is_real_pkg(self, rawpkg: apt_pkg.Package) -> bool:
+ """Check if the apt_pkg.Package provided is a real package."""
+ return rawpkg.has_versions
+
+ def has_key(self, key: object) -> bool:
+ return key in self
+
+ def __contains__(self, key: object) -> bool:
+ try:
+ return self.__is_real_pkg(self._cache[str(key)])
+ except KeyError:
+ return False
+
+ def __len__(self) -> int:
+ return len(self.keys())
+
+ def keys(self) -> list[str]:
+ if self._sorted_set is None:
+ self._sorted_set = sorted(
+ p.get_fullname(pretty=True)
+ for p in self._cache.packages
+ if self.__is_real_pkg(p)
+ )
+ return list(self._sorted_set) # We need a copy here, caller may modify
+
+ def get_changes(self) -> list[Package]:
+ """Get the marked changes"""
+ changes = []
+ marked_keep = self._depcache.marked_keep
+ for rawpkg in self._cache.packages:
+ if not marked_keep(rawpkg):
+ changes.append(self._rawpkg_to_pkg(rawpkg))
+ return changes
+
+ def upgrade(self, dist_upgrade: bool = False) -> None:
+ """Upgrade all packages.
+
+ If the parameter *dist_upgrade* is True, new dependencies will be
+ installed as well (and conflicting packages may be removed). The
+ default value is False.
+ """
+ self.cache_pre_change()
+ self._depcache.upgrade(dist_upgrade)
+ self.cache_post_change()
+
+ @property
+ def required_download(self) -> int:
+ """Get the size of the packages that are required to download."""
+ if self._records is None:
+ raise CacheClosedException("Cache object used after close() called")
+ pm = apt_pkg.PackageManager(self._depcache)
+ fetcher = apt_pkg.Acquire()
+ pm.get_archives(fetcher, self._list, self._records)
+ return fetcher.fetch_needed
+
+ @property
+ def required_space(self) -> int:
+ """Get the size of the additional required space on the fs."""
+ return self._depcache.usr_size
+
+ @property
+ def req_reinstall_pkgs(self) -> set[str]:
+ """Return the packages not downloadable packages in reqreinst state."""
+ reqreinst = set()
+ get_candidate_ver = self._depcache.get_candidate_ver
+ states = frozenset(
+ (apt_pkg.INSTSTATE_REINSTREQ, apt_pkg.INSTSTATE_HOLD_REINSTREQ)
+ )
+ for pkg in self._cache.packages:
+ cand = get_candidate_ver(pkg)
+ if cand and not cand.downloadable and pkg.inst_state in states:
+ reqreinst.add(pkg.get_fullname(pretty=True))
+ return reqreinst
+
+ def _run_fetcher(
+ self, fetcher: apt_pkg.Acquire, allow_unauthenticated: bool | None
+ ) -> int:
+ if allow_unauthenticated is None:
+ allow_unauthenticated = apt_pkg.config.find_b(
+ "APT::Get::" "AllowUnauthenticated", False
+ )
+
+ untrusted = [item for item in fetcher.items if not item.is_trusted]
+ if untrusted and not allow_unauthenticated:
+ raise UntrustedException(
+ "Untrusted packages:\n%s" % "\n".join(i.desc_uri for i in untrusted)
+ )
+
+ # do the actual fetching
+ res = fetcher.run()
+
+ # now check the result (this is the code from apt-get.cc)
+ failed = False
+ err_msg = ""
+ for item in fetcher.items:
+ if item.status == item.STAT_DONE:
+ continue
+ if item.STAT_IDLE:
+ continue
+ err_msg += f"Failed to fetch {item.desc_uri} {item.error_text}\n"
+ failed = True
+
+ # we raise a exception if the download failed or it was cancelt
+ if res == fetcher.RESULT_CANCELLED:
+ raise FetchCancelledException(err_msg)
+ elif failed:
+ raise FetchFailedException(err_msg)
+ return res
+
+ def _fetch_archives(
+ self,
+ fetcher: apt_pkg.Acquire,
+ pm: apt_pkg.PackageManager,
+ allow_unauthenticated: bool | None = None,
+ ) -> int:
+ """fetch the needed archives"""
+ if self._records is None:
+ raise CacheClosedException("Cache object used after close() called")
+
+ # this may as well throw a SystemError exception
+ if not pm.get_archives(fetcher, self._list, self._records):
+ return False
+
+ # now run the fetcher, throw exception if something fails to be
+ # fetched
+ return self._run_fetcher(fetcher, allow_unauthenticated)
+
+ def fetch_archives(
+ self,
+ progress: AcquireProgress | None = None,
+ fetcher: apt_pkg.Acquire | None = None,
+ allow_unauthenticated: bool | None = None,
+ ) -> int:
+ """Fetch the archives for all packages marked for install/upgrade.
+
+ You can specify either an :class:`apt.progress.base.AcquireProgress()`
+ object for the parameter *progress*, or specify an already
+ existing :class:`apt_pkg.Acquire` object for the parameter *fetcher*.
+
+ The return value of the function is undefined. If an error occurred,
+ an exception of type :class:`FetchFailedException` or
+ :class:`FetchCancelledException` is raised.
+
+ The keyword-only parameter *allow_unauthenticated* specifies whether
+ to allow unauthenticated downloads. If not specified, it defaults to
+ the configuration option `APT::Get::AllowUnauthenticated`.
+
+ .. versionadded:: 0.8.0
+ """
+ if progress is not None and fetcher is not None:
+ raise ValueError("Takes a progress or a an Acquire object")
+ if progress is None:
+ progress = apt.progress.text.AcquireProgress()
+ if fetcher is None:
+ fetcher = apt_pkg.Acquire(progress)
+
+ with self._archive_lock:
+ return self._fetch_archives(
+ fetcher, apt_pkg.PackageManager(self._depcache), allow_unauthenticated
+ )
+
+ def is_virtual_package(self, pkgname: str) -> bool:
+ """Return whether the package is a virtual package."""
+ try:
+ pkg = self._cache[pkgname]
+ except KeyError:
+ return False
+ else:
+ return bool(pkg.has_provides and not pkg.has_versions)
+
+ def get_providing_packages(
+ self,
+ pkgname: str,
+ candidate_only: bool = True,
+ include_nonvirtual: bool = False,
+ ) -> list[Package]:
+ """Return a list of all packages providing a package.
+
+ Return a list of packages which provide the virtual package of the
+ specified name.
+
+ If 'candidate_only' is False, return all packages with at
+ least one version providing the virtual package. Otherwise,
+ return only those packages where the candidate version
+ provides the virtual package.
+
+ If 'include_nonvirtual' is True then it will search for all
+ packages providing pkgname, even if pkgname is not itself
+ a virtual pkg.
+ """
+
+ providers: set[Package] = set()
+ get_candidate_ver = self._depcache.get_candidate_ver
+ try:
+ vp = self._cache[pkgname]
+ if vp.has_versions and not include_nonvirtual:
+ return list(providers)
+ except KeyError:
+ return list(providers)
+
+ for provides, providesver, version in vp.provides_list:
+ rawpkg = version.parent_pkg
+ if not candidate_only or (version == get_candidate_ver(rawpkg)):
+ providers.add(self._rawpkg_to_pkg(rawpkg))
+ return list(providers)
+
+ def update(
+ self,
+ fetch_progress: AcquireProgress | None = None,
+ pulse_interval: int = 0,
+ raise_on_error: bool = True,
+ sources_list: str | None = None,
+ ) -> int:
+ """Run the equivalent of apt-get update.
+
+ You probably want to call open() afterwards, in order to utilise the
+ new cache. Otherwise, the old cache will be used which can lead to
+ strange bugs.
+
+ The first parameter *fetch_progress* may be set to an instance of
+ apt.progress.FetchProgress, the default is apt.progress.FetchProgress()
+ .
+ sources_list -- Update a alternative sources.list than the default.
+ Note that the sources.list.d directory is ignored in this case
+ """
+ with _WrappedLock(apt_pkg.config.find_dir("Dir::State::Lists")):
+ if sources_list:
+ old_sources_list = apt_pkg.config.find("Dir::Etc::sourcelist")
+ old_sources_list_d = apt_pkg.config.find("Dir::Etc::sourceparts")
+ old_cleanup = apt_pkg.config.find("APT::List-Cleanup")
+ apt_pkg.config.set(
+ "Dir::Etc::sourcelist", os.path.abspath(sources_list)
+ )
+ apt_pkg.config.set("Dir::Etc::sourceparts", "xxx")
+ apt_pkg.config.set("APT::List-Cleanup", "0")
+ slist = apt_pkg.SourceList()
+ slist.read_main_list()
+ else:
+ slist = self._list
+
+ try:
+ if fetch_progress is None:
+ fetch_progress = apt.progress.base.AcquireProgress()
+ try:
+ res = self._cache.update(fetch_progress, slist, pulse_interval)
+ except SystemError as e:
+ raise FetchFailedException(e)
+ if not res and raise_on_error:
+ raise FetchFailedException()
+ else:
+ return res
+ finally:
+ if sources_list:
+ apt_pkg.config.set("Dir::Etc::sourcelist", old_sources_list)
+ apt_pkg.config.set("Dir::Etc::sourceparts", old_sources_list_d)
+ apt_pkg.config.set("APT::List-Cleanup", old_cleanup)
+
+ def install_archives(
+ self, pm: apt_pkg.PackageManager, install_progress: InstallProgress
+ ) -> int:
+ """
+ The first parameter *pm* refers to an object returned by
+ apt_pkg.PackageManager().
+
+ The second parameter *install_progress* refers to an InstallProgress()
+ object of the module apt.progress.
+
+ This releases a system lock in newer versions, if there is any,
+ and reestablishes it afterwards.
+ """
+ # compat with older API
+ try:
+ install_progress.startUpdate() # type: ignore
+ except AttributeError:
+ install_progress.start_update()
+
+ did_unlock = apt_pkg.pkgsystem_is_locked()
+ if did_unlock:
+ apt_pkg.pkgsystem_unlock_inner()
+
+ try:
+ res = install_progress.run(pm)
+ finally:
+ if did_unlock:
+ apt_pkg.pkgsystem_lock_inner()
+
+ try:
+ install_progress.finishUpdate() # type: ignore
+ except AttributeError:
+ install_progress.finish_update()
+ return res
+
+ def commit(
+ self,
+ fetch_progress: AcquireProgress | None = None,
+ install_progress: InstallProgress | None = None,
+ allow_unauthenticated: bool | None = None,
+ ) -> bool:
+ """Apply the marked changes to the cache.
+
+ The first parameter, *fetch_progress*, refers to a FetchProgress()
+ object as found in apt.progress, the default being
+ apt.progress.FetchProgress().
+
+ The second parameter, *install_progress*, is a
+ apt.progress.InstallProgress() object.
+
+ The keyword-only parameter *allow_unauthenticated* specifies whether
+ to allow unauthenticated downloads. If not specified, it defaults to
+ the configuration option `APT::Get::AllowUnauthenticated`.
+ """
+ # FIXME:
+ # use the new acquire/pkgmanager interface here,
+ # raise exceptions when a download or install fails
+ # and send proper error strings to the application.
+ # Current a failed download will just display "error"
+ # which is less than optimal!
+
+ if fetch_progress is None:
+ fetch_progress = apt.progress.base.AcquireProgress()
+ if install_progress is None:
+ install_progress = apt.progress.base.InstallProgress()
+
+ assert install_progress is not None
+
+ with apt_pkg.SystemLock():
+ pm = apt_pkg.PackageManager(self._depcache)
+ fetcher = apt_pkg.Acquire(fetch_progress)
+ with self._archive_lock:
+ while True:
+ # fetch archives first
+ res = self._fetch_archives(fetcher, pm, allow_unauthenticated)
+
+ # then install
+ res = self.install_archives(pm, install_progress)
+ if res == pm.RESULT_COMPLETED:
+ break
+ elif res == pm.RESULT_FAILED:
+ raise SystemError("installArchives() failed")
+ elif res == pm.RESULT_INCOMPLETE:
+ pass
+ else:
+ raise SystemError(
+ "internal-error: unknown result "
+ "code from InstallArchives: %s" % res
+ )
+ # reload the fetcher for media swaping
+ fetcher.shutdown()
+ return res == pm.RESULT_COMPLETED
+
+ def clear(self) -> None:
+ """Unmark all changes"""
+ self._depcache.init()
+
+ # cache changes
+
+ def cache_post_change(self) -> None:
+ "called internally if the cache has changed, emit a signal then"
+ self._run_callbacks("cache_post_change")
+
+ def cache_pre_change(self) -> None:
+ """called internally if the cache is about to change, emit
+ a signal then"""
+ self._run_callbacks("cache_pre_change")
+
+ def connect(self, name: str, callback: Callable[..., None] | str) -> None:
+ """Connect to a signal.
+
+ .. deprecated:: 1.0
+
+ Please use connect2() instead, as this function is very
+ likely to cause a memory leak.
+ """
+ if callback != "_inc_changes_count":
+ warnings.warn(
+ "connect() likely causes a reference" " cycle, use connect2() instead",
+ RuntimeWarning,
+ 2,
+ )
+ if name not in self._callbacks:
+ self._callbacks[name] = []
+ self._callbacks[name].append(callback)
+
+ def connect2(
+ self, name: str, callback: Callable[..., Any], *args: object, **kwds: object
+ ) -> None:
+ """Connect to a signal.
+
+ The callback will be passed the cache as an argument, and
+ any arguments passed to this function. Make sure that, if you
+ pass a method of a class as your callback, your class does not
+ contain a reference to the cache.
+
+ Cyclic references to the cache can cause issues if the Cache object
+ is replaced by a new one, because the cache keeps a lot of objects and
+ tens of open file descriptors.
+
+ currently only used for cache_{post,pre}_{changed,open}.
+
+ .. versionadded:: 1.0
+ """
+ if name not in self._callbacks2:
+ self._callbacks2[name] = []
+ self._callbacks2[name].append((callback, args, kwds))
+
+ def actiongroup(self) -> apt_pkg.ActionGroup:
+ """Return an `ActionGroup` object for the current cache.
+
+ Action groups can be used to speedup actions. The action group is
+ active as soon as it is created, and disabled when the object is
+ deleted or when release() is called.
+
+ You can use the action group as a context manager, this is the
+ recommended way::
+
+ with cache.actiongroup():
+ for package in my_selected_packages:
+ package.mark_install()
+
+ This way, the action group is automatically released as soon as the
+ with statement block is left. It also has the benefit of making it
+ clear which parts of the code run with a action group and which
+ don't.
+ """
+ return apt_pkg.ActionGroup(self._depcache)
+
+ @property
+ def dpkg_journal_dirty(self) -> bool:
+ """Return True if the dpkg was interrupted
+
+ All dpkg operations will fail until this is fixed, the action to
+ fix the system if dpkg got interrupted is to run
+ 'dpkg --configure -a' as root.
+ """
+ dpkg_status_dir = os.path.dirname(
+ apt_pkg.config.find_file("Dir::State::status")
+ )
+ for f in os.listdir(os.path.join(dpkg_status_dir, "updates")):
+ if fnmatch.fnmatch(f, "[0-9]*"):
+ return True
+ return False
+
+ @property
+ def broken_count(self) -> int:
+ """Return the number of packages with broken dependencies."""
+ return self._depcache.broken_count
+
+ @property
+ def delete_count(self) -> int:
+ """Return the number of packages marked for deletion."""
+ return self._depcache.del_count
+
+ @property
+ def install_count(self) -> int:
+ """Return the number of packages marked for installation."""
+ return self._depcache.inst_count
+
+ @property
+ def keep_count(self) -> int:
+ """Return the number of packages marked as keep."""
+ return self._depcache.keep_count
+
+
+class ProblemResolver:
+ """Resolve problems due to dependencies and conflicts.
+
+ The first argument 'cache' is an instance of apt.Cache.
+ """
+
+ def __init__(self, cache: Cache) -> None:
+ self._resolver = apt_pkg.ProblemResolver(cache._depcache)
+ self._cache = cache
+
+ def clear(self, package: Package) -> None:
+ """Reset the package to the default state."""
+ self._resolver.clear(package._pkg)
+
+ def protect(self, package: Package) -> None:
+ """Protect a package so it won't be removed."""
+ self._resolver.protect(package._pkg)
+
+ def remove(self, package: Package) -> None:
+ """Mark a package for removal."""
+ self._resolver.remove(package._pkg)
+
+ def resolve(self) -> None:
+ """Resolve dependencies, try to remove packages where needed."""
+ self._cache.cache_pre_change()
+ self._resolver.resolve()
+ self._cache.cache_post_change()
+
+ def resolve_by_keep(self) -> None:
+ """Resolve dependencies, do not try to remove packages."""
+ self._cache.cache_pre_change()
+ self._resolver.resolve_by_keep()
+ self._cache.cache_post_change()
+
+ def keep_phased_updates(self) -> None:
+ """Keep back phased updates."""
+ self._cache.cache_pre_change()
+ self._resolver.keep_phased_updates()
+ self._cache.cache_post_change()
+
+
+# ----------------------------- experimental interface
+
+
+class Filter:
+ """Filter base class"""
+
+ def apply(self, pkg: Package) -> bool:
+ """Filter function, return True if the package matchs a
+ filter criteria and False otherwise
+ """
+ return True
+
+
+class MarkedChangesFilter(Filter):
+ """Filter that returns all marked changes"""
+
+ def apply(self, pkg: Package) -> bool:
+ if pkg.marked_install or pkg.marked_delete or pkg.marked_upgrade:
+ return True
+ else:
+ return False
+
+
+class InstalledFilter(Filter):
+ """Filter that returns all installed packages.
+
+ .. versionadded:: 1.0.0
+ """
+
+ def apply(self, pkg: Package) -> bool:
+ return pkg.is_installed
+
+
+class _FilteredCacheHelper:
+ """Helper class for FilteredCache to break a reference cycle."""
+
+ def __init__(self, cache: Cache) -> None:
+ # Do not keep a reference to the cache, or you have a cycle!
+
+ self._filtered: dict[str, bool] = {}
+ self._filters: list[Filter] = []
+ cache.connect2("cache_post_change", self.filter_cache_post_change)
+ cache.connect2("cache_post_open", self.filter_cache_post_change)
+
+ def _reapply_filter(self, cache: Cache) -> None:
+ "internal helper to refilter"
+ # Do not keep a reference to the cache, or you have a cycle!
+ self._filtered = {}
+ for pkg in cache:
+ for f in self._filters:
+ if f.apply(pkg):
+ self._filtered[pkg.name] = True
+ break
+
+ def set_filter(self, filter: Filter) -> None:
+ """Set the current active filter."""
+ self._filters = []
+ self._filters.append(filter)
+
+ def filter_cache_post_change(self, cache: Cache) -> None:
+ """Called internally if the cache changes, emit a signal then."""
+ # Do not keep a reference to the cache, or you have a cycle!
+ self._reapply_filter(cache)
+
+
+class FilteredCache:
+ """A package cache that is filtered.
+
+ Can work on a existing cache or create a new one
+ """
+
+ def __init__(
+ self, cache: Cache | None = None, progress: OpProgress | None = None
+ ) -> None:
+ if cache is None:
+ self.cache = Cache(progress)
+ else:
+ self.cache = cache
+ self._helper = _FilteredCacheHelper(self.cache)
+
+ def __len__(self) -> int:
+ return len(self._helper._filtered)
+
+ def __getitem__(self, key: str) -> Package:
+ return self.cache[key]
+
+ def __iter__(self) -> Iterator[Package]:
+ for pkgname in self._helper._filtered:
+ yield self.cache[pkgname]
+
+ def keys(self) -> KeysView[str]:
+ return self._helper._filtered.keys()
+
+ def has_key(self, key: object) -> bool:
+ return key in self
+
+ def __contains__(self, key: object) -> bool:
+ try:
+ # Normalize package name for multi arch
+ return self.cache[key].name in self._helper._filtered
+ except KeyError:
+ return False
+
+ def set_filter(self, filter: Filter) -> None:
+ """Set the current active filter."""
+ self._helper.set_filter(filter)
+ self.cache.cache_post_change()
+
+ def filter_cache_post_change(self) -> None:
+ """Called internally if the cache changes, emit a signal then."""
+ self._helper.filter_cache_post_change(self.cache)
+
+ def __getattr__(self, key: str) -> Any:
+ """we try to look exactly like a real cache."""
+ return getattr(self.cache, key)
+
+
+def cache_pre_changed(cache: Cache) -> None:
+ print("cache pre changed")
+
+
+def cache_post_changed(cache: Cache) -> None:
+ print("cache post changed")
+
+
+def _test() -> None:
+ """Internal test code."""
+ print("Cache self test")
+ apt_pkg.init()
+ cache = Cache(apt.progress.text.OpProgress())
+ cache.connect2("cache_pre_change", cache_pre_changed)
+ cache.connect2("cache_post_change", cache_post_changed)
+ print("aptitude" in cache)
+ pkg = cache["aptitude"]
+ print(pkg.name)
+ print(len(cache))
+
+ for pkgname in cache.keys():
+ assert cache[pkgname].name == pkgname
+
+ cache.upgrade()
+ changes = cache.get_changes()
+ print(len(changes))
+ for pkg in changes:
+ assert pkg.name
+
+ # see if fetching works
+ for dirname in ["/tmp/pytest", "/tmp/pytest/partial"]:
+ if not os.path.exists(dirname):
+ os.mkdir(dirname)
+ apt_pkg.config.set("Dir::Cache::Archives", "/tmp/pytest")
+ pm = apt_pkg.PackageManager(cache._depcache)
+ fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress())
+ cache._fetch_archives(fetcher, pm, None)
+ # sys.exit(1)
+
+ print("Testing filtered cache (argument is old cache)")
+ filtered = FilteredCache(cache)
+ filtered.cache.connect2("cache_pre_change", cache_pre_changed)
+ filtered.cache.connect2("cache_post_change", cache_post_changed)
+ filtered.cache.upgrade()
+ filtered.set_filter(MarkedChangesFilter())
+ print(len(filtered))
+ for pkgname in filtered.keys():
+ assert pkgname == filtered[pkgname].name
+
+ print(len(filtered))
+
+ print("Testing filtered cache (no argument)")
+ filtered = FilteredCache(progress=apt.progress.base.OpProgress())
+ filtered.cache.connect2("cache_pre_change", cache_pre_changed)
+ filtered.cache.connect2("cache_post_change", cache_post_changed)
+ filtered.cache.upgrade()
+ filtered.set_filter(MarkedChangesFilter())
+ print(len(filtered))
+ for pkgname in filtered.keys():
+ assert pkgname == filtered[pkgname].name
+
+ print(len(filtered))
+
+
+if __name__ == "__main__":
+ _test()
diff --git a/apt/cdrom.py b/apt/cdrom.py
new file mode 100644
index 0000000..dc15c5b
--- /dev/null
+++ b/apt/cdrom.py
@@ -0,0 +1,91 @@
+# cdrom.py - CDROM handling
+#
+# Copyright (c) 2005-2009 Canonical
+# Copyright (c) 2009 Julian Andres Klode <jak@debian.org>
+#
+# Author: Michael Vogt <michael.vogt@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""Classes related to cdrom handling."""
+import glob
+
+import apt_pkg
+
+from apt.progress.base import CdromProgress
+
+
+class Cdrom(apt_pkg.Cdrom):
+ """Support for apt-cdrom like features.
+
+ This class has several optional parameters for initialisation, which may
+ be used to influence the behaviour of the object:
+
+ The optional parameter `progress` is a CdromProgress() subclass, which will
+ ask for the correct cdrom, etc. If not specified or None, a CdromProgress()
+ object will be used.
+
+ The optional parameter `mountpoint` may be used to specify an alternative
+ mountpoint.
+
+ If the optional parameter `nomount` is True, the cdroms will not be
+ mounted. This is the default behaviour.
+ """
+
+ def __init__(
+ self,
+ progress: CdromProgress | None = None,
+ mountpoint: str | None = None,
+ nomount: bool = True,
+ ) -> None:
+ apt_pkg.Cdrom.__init__(self)
+ if progress is None:
+ self._progress = CdromProgress()
+ else:
+ self._progress = progress
+ # see if we have a alternative mountpoint
+ if mountpoint is not None:
+ apt_pkg.config.set("Acquire::cdrom::mount", mountpoint)
+ # do not mess with mount points by default
+ if nomount:
+ apt_pkg.config.set("APT::CDROM::NoMount", "true")
+ else:
+ apt_pkg.config.set("APT::CDROM::NoMount", "false")
+
+ def add(self, progress: CdromProgress | None = None) -> bool:
+ """Add cdrom to the sources.list."""
+ return apt_pkg.Cdrom.add(self, progress or self._progress)
+
+ def ident(self, progress: CdromProgress | None = None) -> str:
+ """Identify the cdrom."""
+ return apt_pkg.Cdrom.ident(self, progress or self._progress)
+
+ @property
+ def in_sources_list(self) -> bool:
+ """Check if the cdrom is already in the current sources.list."""
+ cd_id = self.ident()
+ if cd_id is None:
+ # FIXME: throw exception instead
+ return False
+ # Get a list of files
+ src = glob.glob(apt_pkg.config.find_dir("Dir::Etc::sourceparts") + "*")
+ src.append(apt_pkg.config.find_file("Dir::Etc::sourcelist"))
+ # Check each file
+ for fname in src:
+ with open(fname) as fobj:
+ for line in fobj:
+ if not line.lstrip().startswith("#") and cd_id in line:
+ return True
+ return False
diff --git a/apt/debfile.py b/apt/debfile.py
new file mode 100644
index 0000000..b3ef733
--- /dev/null
+++ b/apt/debfile.py
@@ -0,0 +1,861 @@
+# Copyright (c) 2005-2010 Canonical
+#
+# Author: Michael Vogt <michael.vogt@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""Classes for working with locally available Debian packages."""
+
+import gzip
+import os
+import sys
+from collections.abc import Iterable
+from io import BytesIO
+from typing import cast
+
+import apt_inst
+import apt_pkg
+from apt_pkg import gettext as _
+
+import apt
+
+
+class NoDebArchiveException(IOError):
+ """Exception which is raised if a file is no Debian archive."""
+
+
+class DebPackage:
+ """A Debian Package (.deb file)."""
+
+ # Constants for comparing the local package file with the version
+ # in the cache
+ (VERSION_NONE, VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER) = range(4)
+
+ debug = 0
+
+ def __init__(
+ self, filename: str | None = None, cache: apt.Cache | None = None
+ ) -> None:
+ if cache is None:
+ cache = apt.Cache()
+ self._cache = cache
+ self._debfile = cast(apt_inst.DebFile, None)
+ self.pkgname = ""
+ self.filename: str | None = None
+ self._sections: dict[str, str] | apt_pkg.TagSection[str] = {} # noqa
+ self._need_pkgs: list[str] = []
+ self._check_was_run = False
+ self._failure_string = ""
+ self._multiarch: str | None = None
+ if filename:
+ self.open(filename)
+
+ def open(self, filename: str) -> None:
+ """open given debfile"""
+ self._dbg(3, "open '%s'" % filename)
+ self._need_pkgs = []
+ self._installed_conflicts: set[str] = set()
+ self._failure_string = ""
+ self.filename = filename
+ self._debfile = apt_inst.DebFile(self.filename)
+ control = self._debfile.control.extractdata("control")
+ self._sections = apt_pkg.TagSection(control)
+ self.pkgname = self._sections["Package"]
+ self._check_was_run = False
+
+ def __getitem__(self, key: str) -> str:
+ return self._sections[key]
+
+ def __contains__(self, key: str) -> bool:
+ return key in self._sections
+
+ @property
+ def filelist(self) -> list[str]:
+ """return the list of files in the deb."""
+ files = []
+ try:
+ self._debfile.data.go(lambda item, data: files.append(item.name))
+ except SystemError:
+ return [_("List of files for '%s' could not be read") % self.filename]
+ return files
+
+ @property
+ def control_filelist(self) -> list[str]:
+ """return the list of files in control.tar.gz"""
+ control = []
+ try:
+ self._debfile.control.go(lambda item, data: control.append(item.name))
+ except SystemError:
+ return [
+ _("List of control files for '%s' could not be read") % self.filename
+ ]
+ return sorted(control)
+
+ # helper that will return a pkgname with a multiarch suffix if needed
+ def _maybe_append_multiarch_suffix(
+ self, pkgname: str, in_conflict_checking: bool = False
+ ) -> str:
+ # trivial cases
+ if ":" in pkgname:
+ return pkgname
+ if not self._multiarch:
+ return pkgname
+ elif self._cache.is_virtual_package(pkgname):
+ return pkgname
+ elif (
+ pkgname in self._cache
+ and self._cache[pkgname].candidate is not None
+ and cast(apt.package.Version, self._cache[pkgname].candidate).architecture
+ == "all"
+ ):
+ return pkgname
+ # now do the real multiarch checking
+ multiarch_pkgname = f"{pkgname}:{self._multiarch}"
+ # the upper layers will handle this
+ if multiarch_pkgname not in self._cache:
+ return multiarch_pkgname
+ multiarch_pkg = self._cache[multiarch_pkgname]
+ if multiarch_pkg.candidate is None:
+ return multiarch_pkgname
+ # now check the multiarch state
+ cand = multiarch_pkg.candidate._cand
+ # print pkgname, multiarch_pkgname, cand.multi_arch
+ # the default is to add the suffix, unless its a pkg that can satify
+ # foreign dependencies
+ if cand.multi_arch & cand.MULTI_ARCH_FOREIGN:
+ return pkgname
+ # for conflicts we need a special case here, any not multiarch enabled
+ # package has a implicit conflict
+ if in_conflict_checking and not (cand.multi_arch & cand.MULTI_ARCH_SAME):
+ return pkgname
+ return multiarch_pkgname
+
+ def _is_or_group_satisfied(self, or_group: list[tuple[str, str, str]]) -> bool:
+ """Return True if at least one dependency of the or-group is satisfied.
+
+ This method gets an 'or_group' and analyzes if at least one dependency
+ of this group is already satisfied.
+ """
+ self._dbg(2, "_checkOrGroup(): %s " % (or_group))
+
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # multiarch
+ depname = self._maybe_append_multiarch_suffix(depname)
+
+ # check for virtual pkgs
+ if depname not in self._cache:
+ if self._cache.is_virtual_package(depname):
+ self._dbg(
+ 3, "_is_or_group_satisfied(): %s is virtual dep" % depname
+ )
+ for pkg in self._cache.get_providing_packages(depname):
+ if pkg.is_installed:
+ return True
+ continue
+ # check real dependency
+ inst = self._cache[depname].installed
+ if inst is not None and apt_pkg.check_dep(inst.version, oper, ver):
+ return True
+
+ # if no real dependency is installed, check if there is
+ # a package installed that provides this dependency
+ # (e.g. scrollkeeper dependecies are provided by rarian-compat)
+ # but only do that if there is no version required in the
+ # dependency (we do not supprot versionized dependencies)
+ if not oper:
+ for ppkg in self._cache.get_providing_packages(
+ depname, include_nonvirtual=True
+ ):
+ if ppkg.is_installed:
+ self._dbg(
+ 3,
+ "found installed '%s' that provides '%s'"
+ % (ppkg.name, depname),
+ )
+ return True
+ return False
+
+ def _satisfy_or_group(self, or_group: list[tuple[str, str, str]]) -> bool:
+ """Try to satisfy the or_group."""
+ for dep in or_group:
+ depname, ver, oper = dep
+
+ # multiarch
+ depname = self._maybe_append_multiarch_suffix(depname)
+
+ # if we don't have it in the cache, it may be virtual
+ if depname not in self._cache:
+ if not self._cache.is_virtual_package(depname):
+ continue
+ providers = self._cache.get_providing_packages(depname)
+ # if a package just has a single virtual provider, we
+ # just pick that (just like apt)
+ if len(providers) != 1:
+ continue
+ depname = providers[0].name
+
+ # now check if we can satisfy the deps with the candidate(s)
+ # in the cache
+ pkg = self._cache[depname]
+ cand = self._cache._depcache.get_candidate_ver(pkg._pkg)
+ if not cand:
+ continue
+ if not apt_pkg.check_dep(cand.ver_str, oper, ver):
+ continue
+
+ # check if we need to install it
+ self._dbg(2, "Need to get: %s" % depname)
+ self._need_pkgs.append(depname)
+ return True
+
+ # if we reach this point, we failed
+ or_str = ""
+ for dep in or_group:
+ or_str += dep[0]
+ if ver and oper:
+ or_str += f" ({dep[2]} {dep[1]})"
+ if dep != or_group[len(or_group) - 1]:
+ or_str += "|"
+ self._failure_string += _("Dependency is not satisfiable: %s\n") % or_str
+ return False
+
+ def _check_single_pkg_conflict(self, pkgname: str, ver: str, oper: str) -> bool:
+ """Return True if a pkg conflicts with a real installed/marked pkg."""
+ # FIXME: deal with conflicts against its own provides
+ # (e.g. Provides: ftp-server, Conflicts: ftp-server)
+ self._dbg(
+ 3,
+ "_check_single_pkg_conflict() pkg='%s' ver='%s' oper='%s'"
+ % (pkgname, ver, oper),
+ )
+ pkg = self._cache[pkgname]
+ if pkg.is_installed:
+ assert pkg.installed is not None
+ pkgver = pkg.installed.version
+ elif pkg.marked_install:
+ assert pkg.candidate is not None
+ pkgver = pkg.candidate.version
+ else:
+ return False
+ # print "pkg: %s" % pkgname
+ # print "ver: %s" % ver
+ # print "pkgver: %s " % pkgver
+ # print "oper: %s " % oper
+ if apt_pkg.check_dep(pkgver, oper, ver) and not self.replaces_real_pkg(
+ pkgname, oper, ver
+ ):
+ self._failure_string += (
+ _("Conflicts with the installed package " "'%s'") % pkg.name
+ )
+ self._dbg(3, "conflicts with installed pkg '%s'" % pkg.name)
+ return True
+ return False
+
+ def _check_conflicts_or_group(self, or_group: list[tuple[str, str, str]]) -> bool:
+ """Check the or-group for conflicts with installed pkgs."""
+ self._dbg(2, "_check_conflicts_or_group(): %s " % (or_group))
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # FIXME: is this good enough? i.e. will apt always populate
+ # the cache with conflicting pkgnames for our arch?
+ depname = self._maybe_append_multiarch_suffix(
+ depname, in_conflict_checking=True
+ )
+
+ # check conflicts with virtual pkgs
+ if depname not in self._cache:
+ # FIXME: we have to check for virtual replaces here as
+ # well (to pass tests/gdebi-test8.deb)
+ if self._cache.is_virtual_package(depname):
+ for pkg in self._cache.get_providing_packages(depname):
+ self._dbg(3, "conflicts virtual check: %s" % pkg.name)
+ # P/C/R on virtal pkg, e.g. ftpd
+ if self.pkgname == pkg.name:
+ self._dbg(3, "conflict on self, ignoring")
+ continue
+ if self._check_single_pkg_conflict(pkg.name, ver, oper):
+ self._installed_conflicts.add(pkg.name)
+ continue
+ if self._check_single_pkg_conflict(depname, ver, oper):
+ self._installed_conflicts.add(depname)
+ return bool(self._installed_conflicts)
+
+ @property
+ def conflicts(self) -> list[list[tuple[str, str, str]]]:
+ """List of packages conflicting with this package."""
+ key = "Conflicts"
+ try:
+ return apt_pkg.parse_depends(self._sections[key], False)
+ except KeyError:
+ return []
+
+ @property
+ def depends(self) -> list[list[tuple[str, str, str]]]:
+ """List of packages on which this package depends on."""
+ depends = []
+ # find depends
+ for key in "Depends", "Pre-Depends":
+ try:
+ depends.extend(apt_pkg.parse_depends(self._sections[key], False))
+ except KeyError:
+ pass
+ return depends
+
+ @property
+ def provides(self) -> list[list[tuple[str, str, str]]]:
+ """List of virtual packages which are provided by this package."""
+ key = "Provides"
+ try:
+ return apt_pkg.parse_depends(self._sections[key], False)
+ except KeyError:
+ return []
+
+ @property
+ def replaces(self) -> list[list[tuple[str, str, str]]]:
+ """List of packages which are replaced by this package."""
+ key = "Replaces"
+ try:
+ return apt_pkg.parse_depends(self._sections[key], False)
+ except KeyError:
+ return []
+
+ def replaces_real_pkg(self, pkgname: str, oper: str, ver: str) -> bool:
+ """Return True if a given non-virtual package is replaced.
+
+ Return True if the deb packages replaces a real (not virtual)
+ packages named (pkgname, oper, ver).
+ """
+ self._dbg(3, f"replaces_real_pkg() {pkgname} {oper} {ver}")
+ pkg = self._cache[pkgname]
+ pkgver: str | None = None
+ if pkg.is_installed:
+ assert pkg.installed is not None
+ pkgver = pkg.installed.version
+ elif pkg.marked_install:
+ assert pkg.candidate is not None
+ pkgver = pkg.candidate.version
+ else:
+ pkgver = None
+ for or_group in self.replaces:
+ for name, ver, oper in or_group:
+ if name == pkgname and (
+ pkgver is None or apt_pkg.check_dep(pkgver, oper, ver)
+ ):
+ self._dbg(
+ 3,
+ "we have a replaces in our package for the "
+ "conflict against '%s'" % (pkgname),
+ )
+ return True
+ return False
+
+ def check_conflicts(self) -> bool:
+ """Check if there are conflicts with existing or selected packages.
+
+ Check if the package conflicts with a existing or to be installed
+ package. Return True if the pkg is OK.
+ """
+ res = True
+ for or_group in self.conflicts:
+ if self._check_conflicts_or_group(or_group):
+ # print "Conflicts with a exisiting pkg!"
+ # self._failure_string = "Conflicts with a exisiting pkg!"
+ res = False
+ return res
+
+ def check_breaks_existing_packages(self) -> bool:
+ """
+ check if installing the package would break exsisting
+ package on the system, e.g. system has:
+ smc depends on smc-data (= 1.4)
+ and user tries to installs smc-data 1.6
+ """
+ # show progress information as this step may take some time
+ size = float(len(self._cache))
+ steps = max(int(size / 50), 1)
+ debver = self._sections["Version"]
+ debarch = self._sections["Architecture"]
+ # store what we provide so that we can later check against that
+ provides = [x[0][0] for x in self.provides]
+ for i, pkg in enumerate(self._cache):
+ if i % steps == 0:
+ self._cache.op_progress.update(float(i) / size * 100.0)
+ if not pkg.is_installed:
+ continue
+ assert pkg.installed is not None
+ # check if the exising dependencies are still satisfied
+ # with the package
+ ver = pkg._pkg.current_ver
+ for dep_or in pkg.installed.dependencies:
+ for dep in dep_or.or_dependencies:
+ if dep.name == self.pkgname:
+ if not apt_pkg.check_dep(debver, dep.relation, dep.version):
+ self._dbg(2, "would break (depends) %s" % pkg.name)
+ # TRANSLATORS: the first '%s' is the package that
+ # breaks, the second the dependency that makes it
+ # break, the third the relation (e.g. >=) and the
+ # latest the version for the releation
+ self._failure_string += _(
+ "Breaks existing package '%(pkgname)s' "
+ "dependency %(depname)s "
+ "(%(deprelation)s %(depversion)s)"
+ ) % {
+ "pkgname": pkg.name,
+ "depname": dep.name,
+ "deprelation": dep.relation,
+ "depversion": dep.version,
+ }
+ self._cache.op_progress.done()
+ return False
+ # now check if there are conflicts against this package on
+ # the existing system
+ if "Conflicts" in ver.depends_list:
+ for conflicts_ver_list in ver.depends_list["Conflicts"]:
+ for c_or in conflicts_ver_list:
+ if (
+ c_or.target_pkg.name == self.pkgname
+ and c_or.target_pkg.architecture == debarch
+ ):
+ if apt_pkg.check_dep(
+ debver, c_or.comp_type, c_or.target_ver
+ ):
+ self._dbg(2, "would break (conflicts) %s" % pkg.name)
+ # TRANSLATORS: the first '%s' is the package
+ # that conflicts, the second the packagename
+ # that it conflicts with (so the name of the
+ # deb the user tries to install), the third is
+ # the relation (e.g. >=) and the last is the
+ # version for the relation
+ self._failure_string += _(
+ "Breaks existing package '%(pkgname)s' "
+ "conflict: %(targetpkg)s "
+ "(%(comptype)s %(targetver)s)"
+ ) % {
+ "pkgname": pkg.name,
+ "targetpkg": c_or.target_pkg.name,
+ "comptype": c_or.comp_type,
+ "targetver": c_or.target_ver,
+ }
+ self._cache.op_progress.done()
+ return False
+ if (
+ c_or.target_pkg.name in provides
+ and self.pkgname != pkg.name
+ ):
+ self._dbg(2, "would break (conflicts) %s" % provides)
+ self._failure_string += _(
+ "Breaks existing package '%(pkgname)s' "
+ "that conflict: '%(targetpkg)s'. But the "
+ "'%(debfile)s' provides it via: "
+ "'%(provides)s'"
+ ) % {
+ "provides": ",".join(provides),
+ "debfile": self.filename,
+ "targetpkg": c_or.target_pkg.name,
+ "pkgname": pkg.name,
+ }
+ self._cache.op_progress.done()
+ return False
+ self._cache.op_progress.done()
+ return True
+
+ def compare_to_version_in_cache(self, use_installed: bool = True) -> int:
+ """Compare the package to the version available in the cache.
+
+ Checks if the package is already installed or availabe in the cache
+ and if so in what version, returns one of (VERSION_NONE,
+ VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER).
+ """
+ self._dbg(3, "compare_to_version_in_cache")
+ pkgname = self._sections["Package"]
+ architecture = self._sections["Architecture"]
+
+ # Arch qualify the package name
+ pkgname = ":".join([pkgname, architecture])
+
+ debver = self._sections["Version"]
+ self._dbg(1, "debver: %s" % debver)
+ if pkgname in self._cache:
+ pkg = self._cache[pkgname]
+ if use_installed and pkg.installed is not None:
+ cachever = pkg.installed.version
+ elif not use_installed and pkg.candidate is not None:
+ cachever = pkg.candidate.version
+ else:
+ return self.VERSION_NONE
+ if cachever is not None:
+ cmp = apt_pkg.version_compare(cachever, debver)
+ self._dbg(1, "CompareVersion(debver,instver): %s" % cmp)
+ if cmp == 0:
+ return self.VERSION_SAME
+ elif cmp < 0:
+ return self.VERSION_NEWER
+ elif cmp > 0:
+ return self.VERSION_OUTDATED
+ return self.VERSION_NONE
+
+ def check(self, allow_downgrade: bool = False) -> bool:
+ """Check if the package is installable."""
+ self._dbg(3, "check")
+
+ self._check_was_run = True
+
+ # check arch
+ if "Architecture" not in self._sections:
+ self._dbg(1, "ERROR: no architecture field")
+ self._failure_string = _("No Architecture field in the package")
+ return False
+ arch = self._sections["Architecture"]
+ if arch != "all" and arch != apt_pkg.config.find("APT::Architecture"):
+ if arch in apt_pkg.get_architectures():
+ self._multiarch = arch
+ self.pkgname = f"{self.pkgname}:{self._multiarch}"
+ self._dbg(1, "Found multiarch arch: '%s'" % arch)
+ else:
+ self._dbg(1, "ERROR: Wrong architecture dude!")
+ self._failure_string = (
+ _(
+ "Wrong architecture '%s' "
+ "-- Run dpkg --add-architecture to "
+ "add it and update afterwards"
+ )
+ % arch
+ )
+ return False
+
+ # check version
+ if (
+ not allow_downgrade
+ and self.compare_to_version_in_cache() == self.VERSION_OUTDATED
+ ):
+ if self._cache[self.pkgname].installed:
+ # the deb is older than the installed
+ self._failure_string = _("A later version is already installed")
+ return False
+
+ # FIXME: this sort of error handling sux
+ self._failure_string = ""
+
+ # check conflicts
+ if not self.check_conflicts():
+ return False
+
+ # check if installing it would break anything on the
+ # current system
+ if not self.check_breaks_existing_packages():
+ return False
+
+ # try to satisfy the dependencies
+ if not self._satisfy_depends(self.depends):
+ return False
+
+ # check for conflicts again (this time with the packages that are
+ # makeed for install)
+ if not self.check_conflicts():
+ return False
+
+ if self._cache._depcache.broken_count > 0:
+ self._failure_string = _(
+ "Failed to satisfy all dependencies " "(broken cache)"
+ )
+ # clean the cache again
+ self._cache.clear()
+ return False
+ return True
+
+ def satisfy_depends_str(self, dependsstr: str) -> bool:
+ """Satisfy the dependencies in the given string."""
+ return self._satisfy_depends(apt_pkg.parse_depends(dependsstr, False))
+
+ def _satisfy_depends(self, depends: list[list[tuple[str, str, str]]]) -> bool:
+ """Satisfy the dependencies."""
+ # turn off MarkAndSweep via a action group (if available)
+ try:
+ _actiongroup = apt_pkg.ActionGroup(self._cache._depcache)
+ _actiongroup # pyflakes
+ except AttributeError:
+ pass
+ # check depends
+ for or_group in depends:
+ if not self._is_or_group_satisfied(or_group):
+ if not self._satisfy_or_group(or_group):
+ return False
+ # now try it out in the cache
+ for pkg in self._need_pkgs:
+ try:
+ self._cache[pkg].mark_install(from_user=False)
+ except SystemError:
+ self._failure_string = _("Cannot install '%s'") % pkg
+ self._cache.clear()
+ return False
+ return True
+
+ @property
+ def missing_deps(self) -> list[str]:
+ """Return missing dependencies."""
+ self._dbg(1, "Installing: %s" % self._need_pkgs)
+ if not self._check_was_run:
+ raise AttributeError("property only available after check() was run")
+ return self._need_pkgs
+
+ @property
+ def required_changes(self) -> tuple[list[str], list[str], list[str]]:
+ """Get the changes required to satisfy the dependencies.
+
+ Returns: a tuple with (install, remove, unauthenticated)
+ """
+ install = []
+ remove = []
+ unauthenticated = []
+ if not self._check_was_run:
+ raise AttributeError("property only available after check() was run")
+ for pkg in self._cache:
+ if pkg.marked_install or pkg.marked_upgrade:
+ assert pkg.candidate is not None
+ install.append(pkg.name)
+ # check authentication, one authenticated origin is enough
+ # libapt will skip non-authenticated origins then
+ authenticated = False
+ for origin in pkg.candidate.origins:
+ authenticated |= origin.trusted
+ if not authenticated:
+ unauthenticated.append(pkg.name)
+ if pkg.marked_delete:
+ remove.append(pkg.name)
+ return (install, remove, unauthenticated)
+
+ @staticmethod
+ def to_hex(in_data: str) -> str:
+ hex = ""
+ for i, c in enumerate(in_data):
+ if i % 80 == 0:
+ hex += "\n"
+ hex += "%2.2x " % ord(c)
+ return hex
+
+ @staticmethod
+ def to_strish(in_data: str | Iterable[int]) -> str:
+ s = ""
+ # py2 compat, in_data is type string
+ if isinstance(in_data, str):
+ for c in in_data:
+ if ord(c) < 10 or ord(c) > 127:
+ s += " "
+ else:
+ s += c
+ # py3 compat, in_data is type bytes
+ else:
+ for b in in_data:
+ if b < 10 or b > 127:
+ s += " "
+ else:
+ s += chr(b)
+ return s
+
+ def _get_content(
+ self,
+ part: apt_inst.TarFile,
+ name: str,
+ auto_decompress: bool = True,
+ auto_hex: bool = True,
+ ) -> str:
+ if name.startswith("./"):
+ name = name[2:]
+ data = part.extractdata(name)
+ # check for zip content
+ if name.endswith(".gz") and auto_decompress:
+ io = BytesIO(data)
+ gz = gzip.GzipFile(fileobj=io)
+ data = _("Automatically decompressed:\n\n").encode("utf-8")
+ data += gz.read()
+ # auto-convert to hex
+ try:
+ return data.decode("utf-8")
+ except Exception:
+ new_data = _("Automatically converted to printable ascii:\n")
+ new_data += self.to_strish(data)
+ return new_data
+
+ def control_content(self, name: str) -> str:
+ """return the content of a specific control.tar.gz file"""
+ try:
+ return self._get_content(self._debfile.control, name)
+ except LookupError:
+ return ""
+
+ def data_content(self, name: str) -> str:
+ """return the content of a specific control.tar.gz file"""
+ try:
+ return self._get_content(self._debfile.data, name)
+ except LookupError:
+ return ""
+
+ def _dbg(self, level: int, msg: str) -> None:
+ """Write debugging output to sys.stderr."""
+ if level <= self.debug:
+ print(msg, file=sys.stderr)
+
+ def install(
+ self, install_progress: apt.progress.base.InstallProgress | None = None
+ ) -> int:
+ """Install the package."""
+ if self.filename is None:
+ raise apt_pkg.Error("No filename specified")
+ if install_progress is None:
+ return os.spawnlp(os.P_WAIT, "dpkg", "dpkg", "-i", self.filename)
+ else:
+ try:
+ install_progress.start_update()
+ except AttributeError:
+ install_progress.startUpdate() # type: ignore
+ res = install_progress.run(self.filename)
+ try:
+ install_progress.finish_update()
+ except AttributeError:
+ install_progress.finishUpdate() # type: ignore
+ return res
+
+
+class DscSrcPackage(DebPackage):
+ """A locally available source package."""
+
+ def __init__(
+ self, filename: str | None = None, cache: apt.Cache | None = None
+ ) -> None:
+ DebPackage.__init__(self, None, cache)
+ self.filename: str | None = filename
+ self._depends: list[list[tuple[str, str, str]]] = []
+ self._conflicts: list[list[tuple[str, str, str]]] = []
+ self._installed_conflicts: set[str] = set()
+ self.pkgname = ""
+ self.binaries: list[str] = []
+ self._sections: dict[str, str] = {}
+ if self.filename is not None:
+ self.open(self.filename)
+
+ @property
+ def depends(self) -> list[list[tuple[str, str, str]]]:
+ """Return the dependencies of the package"""
+ return self._depends
+
+ @property
+ def conflicts(self) -> list[list[tuple[str, str, str]]]:
+ """Return the dependencies of the package"""
+ return self._conflicts
+
+ @property
+ def filelist(self) -> list[str]:
+ """Return the list of files associated with this dsc file"""
+ # Files stanza looks like (hash, size, filename, ...)
+ return self._sections["Files"].split()[2::3]
+
+ def open(self, file: str) -> None:
+ """Open the package."""
+ depends_tags = ["Build-Depends", "Build-Depends-Indep"]
+ conflicts_tags = ["Build-Conflicts", "Build-Conflicts-Indep"]
+ fd = apt_pkg.open_maybe_clear_signed_file(file)
+ fobj = os.fdopen(fd)
+ tagfile = apt_pkg.TagFile(fobj)
+ try:
+ for sec in tagfile:
+ for tag in depends_tags:
+ if tag not in sec:
+ continue
+ self._depends.extend(apt_pkg.parse_src_depends(sec[tag]))
+ for tag in conflicts_tags:
+ if tag not in sec:
+ continue
+ self._conflicts.extend(apt_pkg.parse_src_depends(sec[tag]))
+ if "Source" in sec:
+ self.pkgname = sec["Source"]
+ if "Binary" in sec:
+ self.binaries = [b.strip() for b in sec["Binary"].split(",")]
+ for tag in sec.keys():
+ if tag in sec:
+ self._sections[tag] = sec[tag]
+ finally:
+ del tagfile
+ fobj.close()
+
+ s = _(
+ "Install Build-Dependencies for " "source package '%s' that builds %s\n"
+ ) % (self.pkgname, " ".join(self.binaries))
+ self._sections["Description"] = s
+ self._check_was_run = False
+
+ def check(self, allow_downgrade: bool = False) -> bool:
+ """Check if the package is installable.
+
+ The second parameter is ignored and only exists for compatibility
+ with parent type."""
+ if not self.check_conflicts():
+ for pkgname in self._installed_conflicts:
+ if self._cache[pkgname]._pkg.essential:
+ raise Exception(_("An essential package would be removed"))
+ self._cache[pkgname].mark_delete()
+ # properties are ok now
+ self._check_was_run = True
+ # FIXME: a additional run of the check_conflicts()
+ # after _satisfy_depends() should probably be done
+ return self._satisfy_depends(self.depends)
+
+
+def _test() -> None:
+ """Test function"""
+ from apt.cache import Cache
+ from apt.progress.base import InstallProgress
+
+ cache = Cache()
+
+ vp = "www-browser"
+ print(f"{vp} virtual: {cache.is_virtual_package(vp)}")
+ providers = cache.get_providing_packages(vp)
+ print("Providers for %s :" % vp)
+ for pkg in providers:
+ print(" %s" % pkg.name)
+
+ d = DebPackage(sys.argv[1], cache)
+ print("Deb: %s" % d.pkgname)
+ if not d.check():
+ print("can't be satified")
+ print(d._failure_string)
+ print("missing deps: %s" % d.missing_deps)
+ print(d.required_changes)
+
+ print(d.filelist)
+
+ print("Installing ...")
+ ret = d.install(InstallProgress())
+ print(ret)
+
+ # s = DscSrcPackage(cache, "../tests/3ddesktop_0.2.9-6.dsc")
+ # s.check_dep()
+ # print "Missing deps: ",s.missingDeps
+ # print "Print required changes: ", s.requiredChanges
+
+ s = DscSrcPackage(cache=cache)
+ ds = "libc6 (>= 2.3.2), libaio (>= 0.3.96) | libaio1 (>= 0.3.96)"
+ print(s._satisfy_depends(apt_pkg.parse_depends(ds, False)))
+
+
+if __name__ == "__main__":
+ _test()
diff --git a/apt/package.py b/apt/package.py
new file mode 100644
index 0000000..50ed6d1
--- /dev/null
+++ b/apt/package.py
@@ -0,0 +1,1559 @@
+# package.py - apt package abstraction
+#
+# Copyright (c) 2005-2009 Canonical
+#
+# Author: Michael Vogt <michael.vogt@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""Functionality related to packages."""
+from __future__ import annotations
+
+import logging
+import os
+import re
+import socket
+import subprocess
+import sys
+import threading
+from collections.abc import Iterable, Iterator, Mapping, Sequence
+from http.client import BadStatusLine
+from typing import Any, no_type_check
+from urllib.error import HTTPError
+from urllib.request import urlopen
+
+import apt_pkg
+from apt_pkg import gettext as _
+
+import apt.progress.text
+from apt.progress.base import AcquireProgress, InstallProgress
+
+__all__ = (
+ "BaseDependency",
+ "Dependency",
+ "Origin",
+ "Package",
+ "Record",
+ "Version",
+ "VersionList",
+)
+
+
+def _file_is_same(path: str, size: int, hashes: apt_pkg.HashStringList) -> bool:
+ """Return ``True`` if the file is the same."""
+ if os.path.exists(path) and os.path.getsize(path) == size:
+ with open(path) as fobj:
+ return apt_pkg.Hashes(fobj).hashes == hashes
+ return False
+
+
+class FetchError(Exception):
+ """Raised when a file could not be fetched."""
+
+
+class UntrustedError(FetchError):
+ """Raised when a file did not have a trusted hash."""
+
+
+class BaseDependency:
+ """A single dependency."""
+
+ class __dstr(str):
+ """Compare helper for compatibility with old third-party code.
+
+ Old third-party code might still compare the relation with the
+ previously used relations (<<,<=,==,!=,>=,>>,) instead of the curently
+ used ones (<,<=,=,!=,>=,>,). This compare helper lets < match to <<,
+ > match to >> and = match to ==.
+ """
+
+ def __eq__(self, other: object) -> bool:
+ if str.__eq__(self, other):
+ return True
+ elif str.__eq__(self, "<"):
+ return str.__eq__("<<", other)
+ elif str.__eq__(self, ">"):
+ return str.__eq__(">>", other)
+ elif str.__eq__(self, "="):
+ return str.__eq__("==", other)
+ else:
+ return False
+
+ def __ne__(self, other: object) -> bool:
+ return not self.__eq__(other)
+
+ def __init__(self, version: Version, dep: apt_pkg.Dependency) -> None:
+ self._version = version # apt.package.Version
+ self._dep = dep # apt_pkg.Dependency
+
+ def __str__(self) -> str:
+ return f"{self.rawtype}: {self.rawstr}"
+
+ def __repr__(self) -> str:
+ return "<BaseDependency: name:{!r} relation:{!r} version:{!r} rawtype:{!r}>".format(
+ self.name,
+ self.relation,
+ self.version,
+ self.rawtype,
+ )
+
+ @property
+ def name(self) -> str:
+ """The name of the target package."""
+ return self._dep.target_pkg.name
+
+ @property
+ def relation(self) -> str:
+ """The relation (<, <=, =, !=, >=, >, '') in mathematical notation.
+
+ The empty string will be returned in case of an unversioned dependency.
+ """
+ return self.__dstr(self._dep.comp_type)
+
+ @property
+ def relation_deb(self) -> str:
+ """The relation (<<, <=, =, !=, >=, >>, '') in Debian notation.
+
+ The empty string will be returned in case of an unversioned dependency.
+ For more details see the Debian Policy Manual on the syntax of
+ relationship fields:
+ https://www.debian.org/doc/debian-policy/ch-relationships.html#s-depsyntax # noqa
+
+ .. versionadded:: 1.0.0
+ """
+ return self._dep.comp_type_deb
+
+ @property
+ def version(self) -> str:
+ """The target version or an empty string.
+
+ Note that the version is only an empty string in case of an unversioned
+ dependency. In this case the relation is also an empty string.
+ """
+ return self._dep.target_ver
+
+ @property
+ def target_versions(self) -> list[Version]:
+ """A list of all Version objects which satisfy this dependency.
+
+ .. versionadded:: 1.0.0
+ """
+ tvers = []
+ _tvers: list[apt_pkg.Version] = self._dep.all_targets()
+ for _tver in _tvers: # type: apt_pkg.Version
+ _pkg: apt_pkg.Package = _tver.parent_pkg
+ cache = self._version.package._pcache # apt.cache.Cache
+ pkg = cache._rawpkg_to_pkg(_pkg) # apt.package.Package
+ tver = Version(pkg, _tver) # apt.package.Version
+ tvers.append(tver)
+ return tvers
+
+ @property
+ def installed_target_versions(self) -> list[Version]:
+ """A list of all installed Version objects which satisfy this dep.
+
+ .. versionadded:: 1.0.0
+ """
+ return [tver for tver in self.target_versions if tver.is_installed]
+
+ @property
+ def rawstr(self) -> str:
+ """String represenation of the dependency.
+
+ Returns the string representation of the dependency as it would be
+ written in the debian/control file. The string representation does not
+ include the type of the dependency.
+
+ Example for an unversioned dependency:
+ python3
+
+ Example for a versioned dependency:
+ python3 >= 3.2
+
+ .. versionadded:: 1.0.0
+ """
+ if self.version:
+ return f"{self.name} {self.relation_deb} {self.version}"
+ else:
+ return self.name
+
+ @property
+ def rawtype(self) -> str:
+ """Type of the dependency.
+
+ This should be one of 'Breaks', 'Conflicts', 'Depends', 'Enhances',
+ 'PreDepends', 'Recommends', 'Replaces', 'Suggests'.
+
+ Additional types might be added in the future.
+ """
+ return self._dep.dep_type_untranslated
+
+ @property
+ def pre_depend(self) -> bool:
+ """Whether this is a PreDepends."""
+ return self._dep.dep_type_untranslated == "PreDepends"
+
+
+class Dependency(list[BaseDependency]):
+ """Represent an Or-group of dependencies.
+
+ Attributes defined here:
+ or_dependencies - The possible choices
+ rawstr - String represenation of the Or-group of dependencies
+ rawtype - The type of the dependencies in the Or-group
+ target_version - A list of Versions which satisfy this Or-group of deps
+ """
+
+ def __init__(
+ self, version: Version, base_deps: list[BaseDependency], rawtype: str
+ ) -> None:
+ super().__init__(base_deps)
+ self._version = version # apt.package.Version
+ self._rawtype = rawtype
+
+ def __str__(self) -> str:
+ return f"{self.rawtype}: {self.rawstr}"
+
+ def __repr__(self) -> str:
+ return "<Dependency: [%s]>" % (", ".join(repr(bd) for bd in self))
+
+ @property
+ def or_dependencies(self) -> Dependency:
+ return self
+
+ @property
+ def rawstr(self) -> str:
+ """String represenation of the Or-group of dependencies.
+
+ Returns the string representation of the Or-group of dependencies as it
+ would be written in the debian/control file. The string representation
+ does not include the type of the Or-group of dependencies.
+
+ Example:
+ python2 >= 2.7 | python3
+
+ .. versionadded:: 1.0.0
+ """
+ return " | ".join(bd.rawstr for bd in self)
+
+ @property
+ def rawtype(self) -> str:
+ """Type of the Or-group of dependency.
+
+ This should be one of 'Breaks', 'Conflicts', 'Depends', 'Enhances',
+ 'PreDepends', 'Recommends', 'Replaces', 'Suggests'.
+
+ Additional types might be added in the future.
+
+ .. versionadded:: 1.0.0
+ """
+ return self._rawtype
+
+ @property
+ def target_versions(self) -> list[Version]:
+ """A list of all Version objects which satisfy this Or-group of deps.
+
+ .. versionadded:: 1.0.0
+ """
+ tvers: list[Version] = []
+ for bd in self: # apt.package.Dependency
+ for tver in bd.target_versions: # apt.package.Version
+ if tver not in tvers:
+ tvers.append(tver)
+ return tvers
+
+ @property
+ def installed_target_versions(self) -> list[Version]:
+ """A list of all installed Version objects which satisfy this dep.
+
+ .. versionadded:: 1.0.0
+ """
+ return [tver for tver in self.target_versions if tver.is_installed]
+
+
+class Origin:
+ """The origin of a version.
+
+ Attributes defined here:
+ archive - The archive (eg. unstable)
+ component - The component (eg. main)
+ label - The Label, as set in the Release file
+ origin - The Origin, as set in the Release file
+ codename - The Codename, as set in the Release file
+ site - The hostname of the site.
+ trusted - Boolean value whether this is trustworthy.
+ """
+
+ def __init__(self, pkg: Package, packagefile: apt_pkg.PackageFile) -> None:
+ self.archive = packagefile.archive
+ self.component = packagefile.component
+ self.label = packagefile.label
+ self.origin = packagefile.origin
+ self.codename = packagefile.codename
+ self.site = packagefile.site
+ self.not_automatic = packagefile.not_automatic
+ # check the trust
+ indexfile = pkg._pcache._list.find_index(packagefile)
+ if indexfile and indexfile.is_trusted:
+ self.trusted = True
+ else:
+ self.trusted = False
+
+ def __repr__(self) -> str:
+ return (
+ "<Origin component:%r archive:%r origin:%r label:%r "
+ "site:%r isTrusted:%r>"
+ ) % (
+ self.component,
+ self.archive,
+ self.origin,
+ self.label,
+ self.site,
+ self.trusted,
+ )
+
+
+class Record(Mapping[Any, Any]):
+ """Record in a Packages file
+
+ Represent a record as stored in a Packages file. You can use this like
+ a dictionary mapping the field names of the record to their values::
+
+ >>> record = Record("Package: python-apt\\nVersion: 0.8.0\\n\\n")
+ >>> record["Package"]
+ 'python-apt'
+ >>> record["Version"]
+ '0.8.0'
+
+ For example, to get the tasks of a package from a cache, you could do::
+
+ package.candidate.record["Tasks"].split()
+
+ Of course, you can also use the :attr:`Version.tasks` property.
+
+ """
+
+ def __init__(self, record_str: str) -> None:
+ self._rec = apt_pkg.TagSection(record_str)
+
+ def __hash__(self) -> int:
+ return hash(self._rec)
+
+ def __str__(self) -> str:
+ return str(self._rec)
+
+ def __getitem__(self, key: str) -> str:
+ return self._rec[key]
+
+ def __contains__(self, key: object) -> bool:
+ return key in self._rec
+
+ def __iter__(self) -> Iterator[str]:
+ return iter(self._rec.keys())
+
+ def iteritems(self) -> Iterable[tuple[object, str]]:
+ """An iterator over the (key, value) items of the record."""
+ for key in self._rec.keys():
+ yield key, self._rec[key]
+
+ def get(self, key: str, default: object = None) -> object:
+ """Return record[key] if key in record, else *default*.
+
+ The parameter *default* must be either a string or None.
+ """
+ return self._rec.get(key, default)
+
+ def has_key(self, key: str) -> bool:
+ """deprecated form of ``key in x``."""
+ return key in self._rec
+
+ def __len__(self) -> int:
+ return len(self._rec)
+
+
+class Version:
+ """Representation of a package version.
+
+ The Version class contains all information related to a
+ specific package version.
+
+ .. versionadded:: 0.7.9
+ """
+
+ def __init__(self, package: Package, cand: apt_pkg.Version) -> None:
+ self.package = package
+ self._cand = cand
+ self.package._pcache._weakversions.add(self)
+
+ def _cmp(self, other: Any) -> int | Any:
+ """Compares against another apt.Version object or a version string.
+
+ This method behaves like Python 2's cmp builtin and returns an integer
+ according to the outcome. The return value is negative in case of
+ self < other, zero if self == other and positive if self > other.
+
+ The comparison includes the package name and architecture if other is
+ an apt.Version object. If other isn't an apt.Version object it'll be
+ assumed that other is a version string (without package name/arch).
+
+ .. versionchanged:: 1.0.0
+ """
+ # Assume that other is an apt.Version object.
+ try:
+ self_name = self.package.fullname
+ other_name = other.package.fullname
+ if self_name < other_name:
+ return -1
+ elif self_name > other_name:
+ return 1
+ return apt_pkg.version_compare(self._cand.ver_str, other.version)
+ except AttributeError:
+ # Assume that other is a string that only contains the version.
+ try:
+ return apt_pkg.version_compare(self._cand.ver_str, other)
+ except TypeError:
+ return NotImplemented
+
+ def __eq__(self, other: object) -> bool:
+ return self._cmp(other) == 0
+
+ def __ge__(self, other: Version) -> bool:
+ return self._cmp(other) >= 0
+
+ def __gt__(self, other: Version) -> bool:
+ return self._cmp(other) > 0
+
+ def __le__(self, other: Version) -> bool:
+ return self._cmp(other) <= 0
+
+ def __lt__(self, other: Version) -> bool:
+ return self._cmp(other) < 0
+
+ def __ne__(self, other: object) -> bool | Any:
+ try:
+ return self._cmp(other) != 0
+ except TypeError:
+ return NotImplemented
+
+ def __hash__(self) -> int:
+ return self._cand.hash
+
+ def __str__(self) -> str:
+ return f"{self.package.name}={self.version}"
+
+ def __repr__(self) -> str:
+ return f"<Version: package:{self.package.name!r} version:{self.version!r}>"
+
+ @property
+ def _records(self) -> apt_pkg.PackageRecords:
+ """Internal helper that moves the Records to the right position."""
+ # If changing lookup, change fetch_binary() as well
+ if not self.package._pcache._records.lookup(self._cand.file_list[0]):
+ raise LookupError("Could not lookup record")
+
+ return self.package._pcache._records
+
+ @property
+ def _translated_records(self) -> apt_pkg.PackageRecords | None:
+ """Internal helper to get the translated description."""
+ desc_iter = self._cand.translated_description
+ if self.package._pcache._records.lookup(desc_iter.file_list.pop(0)):
+ return self.package._pcache._records
+ return None
+
+ @property
+ def is_security_update(self) -> bool:
+ """Return whether this version is a security update."""
+ return bool(self._cand.is_security_update)
+
+ @property
+ def installed_size(self) -> int:
+ """Return the size of the package when installed."""
+ return self._cand.installed_size
+
+ @property
+ def homepage(self) -> str:
+ """Return the homepage for the package."""
+ return self._records.homepage
+
+ @property
+ def size(self) -> int:
+ """Return the size of the package."""
+ return self._cand.size
+
+ @property
+ def architecture(self) -> str:
+ """Return the architecture of the package version."""
+ return self._cand.arch
+
+ @property
+ def downloadable(self) -> bool:
+ """Return whether the version of the package is downloadable."""
+ return bool(self._cand.downloadable)
+
+ @property
+ def is_installed(self) -> bool:
+ """Return wether this version of the package is currently installed.
+
+ .. versionadded:: 1.0.0
+ """
+ inst_ver = self.package.installed
+ return inst_ver is not None and inst_ver._cand.id == self._cand.id
+
+ @property
+ def version(self) -> str:
+ """Return the version as a string."""
+ return self._cand.ver_str
+
+ @property
+ def summary(self) -> str | None:
+ """Return the short description (one line summary)."""
+ records = self._translated_records
+ return records.short_desc if records is not None else None
+
+ @property
+ def raw_description(self) -> str:
+ """return the long description (raw)."""
+ return self._records.long_desc
+
+ @property
+ def section(self) -> str:
+ """Return the section of the package."""
+ return self._cand.section
+
+ @property
+ def description(self) -> str:
+ """Return the formatted long description.
+
+ Return the formatted long description according to the Debian policy
+ (Chapter 5.6.13).
+ See http://www.debian.org/doc/debian-policy/ch-controlfields.html
+ for more information.
+ """
+ desc = ""
+ records = self._translated_records
+ dsc = records.long_desc if records is not None else None
+
+ if not dsc:
+ return _("Missing description for '%s'." "Please report.") % (
+ self.package.name
+ )
+
+ try:
+ if not isinstance(dsc, str):
+ # Only convert where needed (i.e. Python 2.X)
+ dsc = dsc.decode("utf-8")
+ except UnicodeDecodeError as err:
+ return _(
+ "Invalid unicode in description for '%s' (%s). " "Please report."
+ ) % (self.package.name, err)
+
+ lines = iter(dsc.split("\n"))
+ # Skip the first line, since its a duplication of the summary
+ next(lines)
+ for raw_line in lines:
+ if raw_line.strip() == ".":
+ # The line is just line break
+ if not desc.endswith("\n"):
+ desc += "\n\n"
+ continue
+ if raw_line.startswith(" "):
+ # The line should be displayed verbatim without word wrapping
+ if not desc.endswith("\n"):
+ line = "\n%s\n" % raw_line[2:]
+ else:
+ line = "%s\n" % raw_line[2:]
+ elif raw_line.startswith(" "):
+ # The line is part of a paragraph.
+ if desc.endswith("\n") or desc == "":
+ # Skip the leading white space
+ line = raw_line[1:]
+ else:
+ line = raw_line
+ else:
+ line = raw_line
+ # Add current line to the description
+ desc += line
+ return desc
+
+ @property
+ def source_name(self) -> str:
+ """Return the name of the source package."""
+ try:
+ return self._records.source_pkg or self.package.shortname
+ except IndexError:
+ return self.package.shortname
+
+ @property
+ def source_version(self) -> str:
+ """Return the version of the source package."""
+ try:
+ return self._records.source_ver or self._cand.ver_str
+ except IndexError:
+ return self._cand.ver_str
+
+ @property
+ def priority(self) -> str:
+ """Return the priority of the package, as string."""
+ return self._cand.priority_str
+
+ @property
+ def policy_priority(self) -> int:
+ """Return the internal policy priority as a number.
+ See apt_preferences(5) for more information about what it means.
+ """
+ return self.package._pcache._depcache.policy.get_priority(self._cand)
+
+ @property
+ def record(self) -> Record:
+ """Return a Record() object for this version.
+
+ Return a Record() object for this version which provides access
+ to the raw attributes of the candidate version
+ """
+ return Record(self._records.record)
+
+ def get_dependencies(self, *types: str) -> list[Dependency]:
+ """Return a list of Dependency objects for the given types.
+
+ Multiple types can be specified. Possible types are:
+ 'Breaks', 'Conflicts', 'Depends', 'Enhances', 'PreDepends',
+ 'Recommends', 'Replaces', 'Suggests'
+
+ Additional types might be added in the future.
+ """
+ depends_list = []
+ depends = self._cand.depends_list
+ for type_ in types:
+ try:
+ for dep_ver_list in depends[type_]:
+ base_deps = []
+ for dep_or in dep_ver_list:
+ base_deps.append(BaseDependency(self, dep_or))
+ depends_list.append(Dependency(self, base_deps, type_))
+ except KeyError:
+ pass
+ return depends_list
+
+ @property
+ def provides(self) -> list[str]:
+ """Return a list of names that this version provides."""
+ return [p[0] for p in self._cand.provides_list]
+
+ @property
+ def enhances(self) -> list[Dependency]:
+ """Return the list of enhances for the package version."""
+ return self.get_dependencies("Enhances")
+
+ @property
+ def dependencies(self) -> list[Dependency]:
+ """Return the dependencies of the package version."""
+ return self.get_dependencies("PreDepends", "Depends")
+
+ @property
+ def recommends(self) -> list[Dependency]:
+ """Return the recommends of the package version."""
+ return self.get_dependencies("Recommends")
+
+ @property
+ def suggests(self) -> list[Dependency]:
+ """Return the suggests of the package version."""
+ return self.get_dependencies("Suggests")
+
+ @property
+ def origins(self) -> list[Origin]:
+ """Return a list of origins for the package version."""
+ origins = []
+ for packagefile, _unused in self._cand.file_list:
+ origins.append(Origin(self.package, packagefile))
+ return origins
+
+ @property
+ def filename(self) -> str:
+ """Return the path to the file inside the archive.
+
+ .. versionadded:: 0.7.10
+ """
+ return self._records.filename
+
+ @property
+ def md5(self) -> str:
+ """Return the md5sum of the binary.
+
+ .. versionadded:: 0.7.10
+ """
+ return self._records.md5_hash
+
+ @property
+ def sha1(self) -> str:
+ """Return the sha1sum of the binary.
+
+ .. versionadded:: 0.7.10
+ """
+ return self._records.sha1_hash
+
+ @property
+ def sha256(self) -> str:
+ """Return the sha256sum of the binary.
+
+ .. versionadded:: 0.7.10
+ """
+ return self._records.sha256_hash
+
+ @property
+ def tasks(self) -> set[str]:
+ """Get the tasks of the package.
+
+ A set of the names of the tasks this package belongs to.
+
+ .. versionadded:: 0.8.0
+ """
+ return set(self.record["Task"].split())
+
+ def _uris(self) -> Iterator[str]:
+ """Return an iterator over all available urls.
+
+ .. versionadded:: 0.7.10
+ """
+ for packagefile, _unused in self._cand.file_list:
+ indexfile = self.package._pcache._list.find_index(packagefile)
+ if indexfile:
+ yield indexfile.archive_uri(self._records.filename)
+
+ @property
+ def uris(self) -> list[str]:
+ """Return a list of all available uris for the binary.
+
+ .. versionadded:: 0.7.10
+ """
+ return list(self._uris())
+
+ @property
+ def uri(self) -> str | None:
+ """Return a single URI for the binary.
+
+ .. versionadded:: 0.7.10
+ """
+ try:
+ return next(iter(self._uris()))
+ except StopIteration:
+ return None
+
+ def fetch_binary(
+ self,
+ destdir: str = "",
+ progress: AcquireProgress | None = None,
+ allow_unauthenticated: bool | None = None,
+ ) -> str:
+ """Fetch the binary version of the package.
+
+ The parameter *destdir* specifies the directory where the package will
+ be fetched to.
+
+ The parameter *progress* may refer to an apt_pkg.AcquireProgress()
+ object. If not specified or None, apt.progress.text.AcquireProgress()
+ is used.
+
+ The keyword-only parameter *allow_unauthenticated* specifies whether
+ to allow unauthenticated downloads. If not specified, it defaults to
+ the configuration option `APT::Get::AllowUnauthenticated`.
+
+ .. versionadded:: 0.7.10
+ """
+ if allow_unauthenticated is None:
+ allow_unauthenticated = apt_pkg.config.find_b(
+ "APT::Get::" "AllowUnauthenticated", False
+ )
+ base = os.path.basename(self._records.filename)
+ destfile = os.path.join(destdir, base)
+ if _file_is_same(destfile, self.size, self._records.hashes):
+ logging.debug("Ignoring already existing file: %s" % destfile)
+ return os.path.abspath(destfile)
+
+ # Verify that the index is actually trusted
+ pfile, offset = self._cand.file_list[0]
+ index = self.package._pcache._list.find_index(pfile)
+
+ if not (allow_unauthenticated or (index and index.is_trusted)):
+ raise UntrustedError(
+ "Could not fetch %s %s source package: "
+ "Source %r is not trusted"
+ % (
+ self.package.name,
+ self.version,
+ getattr(index, "describe", "<unknown>"),
+ )
+ )
+ if not self.uri:
+ raise ValueError("No URI for this binary.")
+ hashes = self._records.hashes
+ if not (allow_unauthenticated or hashes.usable):
+ raise UntrustedError(
+ "The item %r could not be fetched: " "No trusted hash found." % destfile
+ )
+ acq = apt_pkg.Acquire(progress or apt.progress.text.AcquireProgress())
+ acqfile = apt_pkg.AcquireFile(
+ acq, self.uri, hashes, self.size, base, destfile=destfile
+ )
+ acq.run()
+
+ if acqfile.status != acqfile.STAT_DONE:
+ raise FetchError(
+ "The item %r could not be fetched: %s"
+ % (acqfile.destfile, acqfile.error_text)
+ )
+
+ return os.path.abspath(destfile)
+
+ def fetch_source(
+ self,
+ destdir: str = "",
+ progress: AcquireProgress | None = None,
+ unpack: bool = True,
+ allow_unauthenticated: bool | None = None,
+ ) -> str:
+ """Get the source code of a package.
+
+ The parameter *destdir* specifies the directory where the source will
+ be fetched to.
+
+ The parameter *progress* may refer to an apt_pkg.AcquireProgress()
+ object. If not specified or None, apt.progress.text.AcquireProgress()
+ is used.
+
+ The parameter *unpack* describes whether the source should be unpacked
+ (``True``) or not (``False``). By default, it is unpacked.
+
+ If *unpack* is ``True``, the path to the extracted directory is
+ returned. Otherwise, the path to the .dsc file is returned.
+
+ The keyword-only parameter *allow_unauthenticated* specifies whether
+ to allow unauthenticated downloads. If not specified, it defaults to
+ the configuration option `APT::Get::AllowUnauthenticated`.
+ """
+ if allow_unauthenticated is None:
+ allow_unauthenticated = apt_pkg.config.find_b(
+ "APT::Get::" "AllowUnauthenticated", False
+ )
+
+ src = apt_pkg.SourceRecords()
+ acq = apt_pkg.Acquire(progress or apt.progress.text.AcquireProgress())
+
+ dsc = None
+ record = self._records
+ source_name = record.source_pkg or self.package.shortname
+ source_version = record.source_ver or self._cand.ver_str
+ source_lookup = src.lookup(source_name)
+
+ while source_lookup and source_version != src.version:
+ source_lookup = src.lookup(source_name)
+ if not source_lookup:
+ raise ValueError("No source for %r" % self)
+ files = list()
+
+ if not (allow_unauthenticated or src.index.is_trusted):
+ raise UntrustedError(
+ "Could not fetch %s %s source package: "
+ "Source %r is not trusted"
+ % (self.package.name, self.version, src.index.describe)
+ )
+ for fil in src.files:
+ base = os.path.basename(fil.path)
+ destfile = os.path.join(destdir, base)
+ if fil.type == "dsc":
+ dsc = destfile
+ if _file_is_same(destfile, fil.size, fil.hashes):
+ logging.debug("Ignoring already existing file: %s" % destfile)
+ continue
+
+ if not (allow_unauthenticated or fil.hashes.usable):
+ raise UntrustedError(
+ "The item %r could not be fetched: "
+ "No trusted hash found." % destfile
+ )
+ files.append(
+ apt_pkg.AcquireFile(
+ acq,
+ src.index.archive_uri(fil.path),
+ fil.hashes,
+ fil.size,
+ base,
+ destfile=destfile,
+ )
+ )
+ acq.run()
+
+ if dsc is None:
+ raise ValueError("No source for %r" % self)
+
+ for item in acq.items:
+ if item.status != item.STAT_DONE:
+ raise FetchError(
+ "The item %r could not be fetched: %s"
+ % (item.destfile, item.error_text)
+ )
+
+ if unpack:
+ outdir = src.package + "-" + apt_pkg.upstream_version(src.version)
+ outdir = os.path.join(destdir, outdir)
+ subprocess.check_call(["dpkg-source", "-x", dsc, outdir])
+ return os.path.abspath(outdir)
+ else:
+ return os.path.abspath(dsc)
+
+
+class VersionList(Sequence[Version]):
+ """Provide a mapping & sequence interface to all versions of a package.
+
+ This class can be used like a dictionary, where version strings are the
+ keys. It can also be used as a sequence, where integers are the keys.
+
+ You can also convert this to a dictionary or a list, using the usual way
+ of dict(version_list) or list(version_list). This is useful if you need
+ to access the version objects multiple times, because they do not have to
+ be recreated this way.
+
+ Examples ('package.versions' being a version list):
+ '0.7.92' in package.versions # Check whether 0.7.92 is a valid version.
+ package.versions[0] # Return first version or raise IndexError
+ package.versions[0:2] # Return a new VersionList for objects 0-2
+ package.versions['0.7.92'] # Return version 0.7.92 or raise KeyError
+ package.versions.keys() # All keys, as strings.
+ max(package.versions)
+ """
+
+ def __init__(self, package: Package, slice_: slice | None = None) -> None:
+ self._package = package # apt.package.Package()
+ self._versions = package._pkg.version_list # [apt_pkg.Version(), ...]
+ if slice_:
+ self._versions = self._versions[slice_]
+
+ def __getitem__(self, item: int | slice | str) -> Any:
+ # FIXME: Should not be returning Any, should have overloads; but
+ # pyflakes complains
+ if isinstance(item, slice):
+ return self.__class__(self._package, item)
+ try:
+ # Sequence interface, item is an integer
+ return Version(self._package, self._versions[item]) # type: ignore
+ except TypeError:
+ # Dictionary interface item is a string.
+ for ver in self._versions:
+ if ver.ver_str == item:
+ return Version(self._package, ver)
+ raise KeyError("Version: %r not found." % (item))
+
+ def __str__(self) -> str:
+ return "[%s]" % (", ".join(str(ver) for ver in self))
+
+ def __repr__(self) -> str:
+ return "<VersionList: %r>" % self.keys()
+
+ def __iter__(self) -> Iterator[Version]:
+ """Return an iterator over all value objects."""
+ return (Version(self._package, ver) for ver in self._versions)
+
+ def __contains__(self, item: object) -> bool:
+ if isinstance(item, Version): # Sequence interface
+ item = item.version
+ # Dictionary interface.
+ for ver in self._versions:
+ if ver.ver_str == item:
+ return True
+ return False
+
+ def __eq__(self, other: Any) -> bool:
+ return list(self) == list(other)
+
+ def __len__(self) -> int:
+ return len(self._versions)
+
+ # Mapping interface
+
+ def keys(self) -> list[str]:
+ """Return a list of all versions, as strings."""
+ return [ver.ver_str for ver in self._versions]
+
+ def get(self, key: str, default: Version | None = None) -> Version | None:
+ """Return the key or the default."""
+ try:
+ return self[key] # type: ignore # FIXME: should be deterined automatically # noqa
+ except LookupError:
+ return default
+
+
+class Package:
+ """Representation of a package in a cache.
+
+ This class provides methods and properties for working with a package. It
+ lets you mark the package for installation, check if it is installed, and
+ much more.
+ """
+
+ def __init__(self, pcache: apt.Cache, pkgiter: apt_pkg.Package) -> None:
+ """Init the Package object"""
+ self._pkg = pkgiter
+ self._pcache = pcache # python cache in cache.py
+ self._changelog = "" # Cached changelog
+
+ def __str__(self) -> str:
+ return self.name
+
+ def __repr__(self) -> str:
+ return "<Package: name:{!r} architecture={!r} id:{!r}>".format(
+ self._pkg.name,
+ self._pkg.architecture,
+ self._pkg.id,
+ )
+
+ def __lt__(self, other: Package) -> bool:
+ return self.name < other.name
+
+ @property
+ def candidate(self) -> Version | None:
+ """Return the candidate version of the package.
+
+ This property is writeable to allow you to set the candidate version
+ of the package. Just assign a Version() object, and it will be set as
+ the candidate version.
+ """
+ cand = self._pcache._depcache.get_candidate_ver(self._pkg)
+ if cand is not None:
+ return Version(self, cand)
+ return None
+
+ @candidate.setter
+ def candidate(self, version: Version) -> None:
+ """Set the candidate version of the package."""
+ self._pcache.cache_pre_change()
+ self._pcache._depcache.set_candidate_ver(self._pkg, version._cand)
+ self._pcache.cache_post_change()
+
+ @property
+ def installed(self) -> Version | None:
+ """Return the currently installed version of the package.
+
+ .. versionadded:: 0.7.9
+ """
+ if self._pkg.current_ver is not None:
+ return Version(self, self._pkg.current_ver)
+ return None
+
+ @property
+ def name(self) -> str:
+ """Return the name of the package, possibly including architecture.
+
+ If the package is not part of the system's preferred architecture,
+ return the same as :attr:`fullname`, otherwise return the same
+ as :attr:`shortname`
+
+ .. versionchanged:: 0.7.100.3
+
+ As part of multi-arch, this field now may include architecture
+ information.
+ """
+ return self._pkg.get_fullname(True)
+
+ @property
+ def fullname(self) -> str:
+ """Return the name of the package, including architecture.
+
+ Note that as for :meth:`architecture`, this returns the
+ native architecture for Architecture: all packages.
+
+ .. versionadded:: 0.7.100.3"""
+ return self._pkg.get_fullname(False)
+
+ @property
+ def shortname(self) -> str:
+ """Return the name of the package, without architecture.
+
+ .. versionadded:: 0.7.100.3"""
+ return self._pkg.name
+
+ @property
+ def id(self) -> int:
+ """Return a uniq ID for the package.
+
+ This can be used eg. to store additional information about the pkg."""
+ return self._pkg.id
+
+ @property
+ def essential(self) -> bool:
+ """Return True if the package is an essential part of the system."""
+ return self._pkg.essential
+
+ def architecture(self) -> str:
+ """Return the Architecture of the package.
+
+ Note that for Architecture: all packages, this returns the
+ native architecture, as they are internally treated like native
+ packages. To get the concrete architecture, look at the
+ :attr:`Version.architecture` attribute.
+
+ .. versionchanged:: 0.7.100.3
+ This is now the package's architecture in the multi-arch sense,
+ previously it was the architecture of the candidate version
+ and deprecated.
+ """
+ return self._pkg.architecture
+
+ # depcache states
+
+ @property
+ def marked_install(self) -> bool:
+ """Return ``True`` if the package is marked for install."""
+ return self._pcache._depcache.marked_install(self._pkg)
+
+ @property
+ def marked_upgrade(self) -> bool:
+ """Return ``True`` if the package is marked for upgrade."""
+ return self._pcache._depcache.marked_upgrade(self._pkg)
+
+ @property
+ def marked_delete(self) -> bool:
+ """Return ``True`` if the package is marked for delete."""
+ return self._pcache._depcache.marked_delete(self._pkg)
+
+ @property
+ def marked_keep(self) -> bool:
+ """Return ``True`` if the package is marked for keep."""
+ return self._pcache._depcache.marked_keep(self._pkg)
+
+ @property
+ def marked_downgrade(self) -> bool:
+ """Package is marked for downgrade"""
+ return self._pcache._depcache.marked_downgrade(self._pkg)
+
+ @property
+ def marked_reinstall(self) -> bool:
+ """Return ``True`` if the package is marked for reinstall."""
+ return self._pcache._depcache.marked_reinstall(self._pkg)
+
+ @property
+ def is_installed(self) -> bool:
+ """Return ``True`` if the package is installed."""
+ return self._pkg.current_ver is not None
+
+ @property
+ def is_upgradable(self) -> bool:
+ """Return ``True`` if the package is upgradable."""
+ return self.is_installed and self._pcache._depcache.is_upgradable(self._pkg)
+
+ @property
+ def is_auto_removable(self) -> bool:
+ """Return ``True`` if the package is no longer required.
+
+ If the package has been installed automatically as a dependency of
+ another package, and if no packages depend on it anymore, the package
+ is no longer required.
+ """
+ return (
+ self.is_installed or self.marked_install
+ ) and self._pcache._depcache.is_garbage(self._pkg)
+
+ @property
+ def is_auto_installed(self) -> bool:
+ """Return whether the package is marked as automatically installed."""
+ return self._pcache._depcache.is_auto_installed(self._pkg)
+
+ @property
+ def phasing_applied(self) -> bool:
+ """Return ``True`` if the package update is being phased."""
+ return self._pcache._depcache.phasing_applied(self._pkg)
+
+ # sizes
+
+ @property
+ def installed_files(self) -> list[str]:
+ """Return a list of files installed by the package.
+
+ Return a list of unicode names of the files which have
+ been installed by this package
+ """
+ for name in self.name, self.fullname:
+ path = "/var/lib/dpkg/info/%s.list" % name
+ try:
+ with open(path, "rb") as file_list:
+ return file_list.read().decode("utf-8").strip().split("\n")
+ except OSError:
+ continue
+
+ return []
+
+ def get_changelog(
+ self, uri: str | None = None, cancel_lock: threading.Event | None = None
+ ) -> str:
+ """
+ Download the changelog of the package and return it as unicode
+ string.
+
+ The parameter *uri* refers to the uri of the changelog file. It may
+ contain multiple named variables which will be substitued. These
+ variables are (src_section, prefix, src_pkg, src_ver). An example is
+ the Ubuntu changelog::
+
+ "http://changelogs.ubuntu.com/changelogs/pool" \\
+ "/%(src_section)s/%(prefix)s/%(src_pkg)s" \\
+ "/%(src_pkg)s_%(src_ver)s/changelog"
+
+ The parameter *cancel_lock* refers to an instance of threading.Event,
+ which if set, prevents the download.
+ """
+ # Return a cached changelog if available
+ if self._changelog != "":
+ return self._changelog
+
+ if not self.candidate:
+ return _("The list of changes is not available")
+
+ if uri is None:
+ if self.candidate.origins[0].origin == "Debian":
+ uri = (
+ "http://packages.debian.org/changelogs/pool"
+ "/%(src_section)s/%(prefix)s/%(src_pkg)s"
+ "/%(src_pkg)s_%(src_ver)s/changelog"
+ )
+ elif self.candidate.origins[0].origin == "Ubuntu":
+ uri = (
+ "http://changelogs.ubuntu.com/changelogs/pool"
+ "/%(src_section)s/%(prefix)s/%(src_pkg)s"
+ "/%(src_pkg)s_%(src_ver)s/changelog"
+ )
+ else:
+ res = _("The list of changes is not available")
+ if isinstance(res, str):
+ return res
+ else:
+ return res.decode("utf-8")
+
+ # get the src package name
+ src_pkg = self.candidate.source_name
+
+ # assume "main" section
+ src_section = "main"
+ # use the section of the candidate as a starting point
+ section = self.candidate.section
+
+ # get the source version
+ src_ver = self.candidate.source_version
+
+ try:
+ # try to get the source version of the pkg, this differs
+ # for some (e.g. libnspr4 on ubuntu)
+ # this feature only works if the correct deb-src are in the
+ # sources.list otherwise we fall back to the binary version number
+ src_records = apt_pkg.SourceRecords()
+ except SystemError:
+ pass
+ else:
+ while src_records.lookup(src_pkg):
+ if not src_records.version:
+ continue
+ if self.candidate.source_version == src_records.version:
+ # Direct match, use it and do not do more lookups.
+ src_ver = src_records.version
+ section = src_records.section
+ break
+ if apt_pkg.version_compare(src_records.version, src_ver) > 0:
+ # The version is higher, it seems to match.
+ src_ver = src_records.version
+ section = src_records.section
+
+ section_split = section.split("/", 1)
+ if len(section_split) > 1:
+ src_section = section_split[0]
+ del section_split
+
+ # lib is handled special
+ prefix = src_pkg[0]
+ if src_pkg.startswith("lib"):
+ prefix = "lib" + src_pkg[3]
+
+ # stip epoch
+ src_ver_split = src_ver.split(":", 1)
+ if len(src_ver_split) > 1:
+ src_ver = "".join(src_ver_split[1:])
+ del src_ver_split
+
+ uri = uri % {
+ "src_section": src_section,
+ "prefix": prefix,
+ "src_pkg": src_pkg,
+ "src_ver": src_ver,
+ }
+
+ timeout = socket.getdefaulttimeout()
+
+ # FIXME: when python2.4 vanishes from the archive,
+ # merge this into a single try..finally block (pep 341)
+ try:
+ try:
+ # Set a timeout for the changelog download
+ socket.setdefaulttimeout(2)
+
+ # Check if the download was canceled
+ if cancel_lock and cancel_lock.is_set():
+ return ""
+ # FIXME: python3.2: Should be closed manually
+ changelog_file = urlopen(uri)
+ # do only get the lines that are new
+ changelog = ""
+ regexp = "^%s \\((.*)\\)(.*)$" % (re.escape(src_pkg))
+ while True:
+ # Check if the download was canceled
+ if cancel_lock and cancel_lock.is_set():
+ return ""
+ # Read changelog line by line
+ line_raw = changelog_file.readline()
+ if not line_raw:
+ break
+ # The changelog is encoded in utf-8, but since there isn't
+ # any http header, urllib2 seems to treat it as ascii
+ line = line_raw.decode("utf-8")
+
+ # print line.encode('utf-8')
+ match = re.match(regexp, line)
+ if match:
+ # strip epoch from installed version
+ # and from changelog too
+ installed = getattr(self.installed, "version", None)
+ if installed and ":" in installed:
+ installed = installed.split(":", 1)[1]
+ changelog_ver = match.group(1)
+ if changelog_ver and ":" in changelog_ver:
+ changelog_ver = changelog_ver.split(":", 1)[1]
+
+ if (
+ installed
+ and apt_pkg.version_compare(changelog_ver, installed) <= 0
+ ):
+ break
+ # EOF (shouldn't really happen)
+ changelog += line
+
+ # Print an error if we failed to extract a changelog
+ if len(changelog) == 0:
+ changelog = _("The list of changes is not available")
+ if not isinstance(changelog, str):
+ changelog = changelog.decode("utf-8")
+ self._changelog = changelog
+
+ except HTTPError:
+ if self.candidate.origins[0].origin == "Ubuntu":
+ res = _(
+ "The list of changes is not available yet.\n\n"
+ "Please use "
+ "http://launchpad.net/ubuntu/+source/%s/"
+ "%s/+changelog\n"
+ "until the changes become available or try again "
+ "later."
+ ) % (src_pkg, src_ver)
+ else:
+ res = _("The list of changes is not available")
+ if isinstance(res, str):
+ return res
+ else:
+ return res.decode("utf-8")
+ except (OSError, BadStatusLine):
+ res = _(
+ "Failed to download the list of changes. \nPlease "
+ "check your Internet connection."
+ )
+ if isinstance(res, str):
+ return res
+ else:
+ return res.decode("utf-8")
+ finally:
+ socket.setdefaulttimeout(timeout)
+ return self._changelog
+
+ @property
+ def versions(self) -> VersionList:
+ """Return a VersionList() object for all available versions.
+
+ .. versionadded:: 0.7.9
+ """
+ return VersionList(self)
+
+ @property
+ def is_inst_broken(self) -> bool:
+ """Return True if the to-be-installed package is broken."""
+ return self._pcache._depcache.is_inst_broken(self._pkg)
+
+ @property
+ def is_now_broken(self) -> bool:
+ """Return True if the installed package is broken."""
+ return self._pcache._depcache.is_now_broken(self._pkg)
+
+ @property
+ def has_config_files(self) -> bool:
+ """Checks whether the package is is the config-files state."""
+ return self._pkg.current_state == apt_pkg.CURSTATE_CONFIG_FILES
+
+ # depcache actions
+
+ def mark_keep(self) -> None:
+ """Mark a package for keep."""
+ self._pcache.cache_pre_change()
+ self._pcache._depcache.mark_keep(self._pkg)
+ self._pcache.cache_post_change()
+
+ def mark_delete(self, auto_fix: bool = True, purge: bool = False) -> None:
+ """Mark a package for deletion.
+
+ If *auto_fix* is ``True``, the resolver will be run, trying to fix
+ broken packages. This is the default.
+
+ If *purge* is ``True``, remove the configuration files of the package
+ as well. The default is to keep the configuration.
+ """
+ self._pcache.cache_pre_change()
+ self._pcache._depcache.mark_delete(self._pkg, purge)
+ # try to fix broken stuffsta
+ if auto_fix and self._pcache._depcache.broken_count > 0:
+ fix = apt_pkg.ProblemResolver(self._pcache._depcache)
+ fix.clear(self._pkg)
+ fix.protect(self._pkg)
+ fix.remove(self._pkg)
+ fix.resolve()
+ self._pcache.cache_post_change()
+
+ def mark_install(
+ self, auto_fix: bool = True, auto_inst: bool = True, from_user: bool = True
+ ) -> None:
+ """Mark a package for install.
+
+ If *autoFix* is ``True``, the resolver will be run, trying to fix
+ broken packages. This is the default.
+
+ If *autoInst* is ``True``, the dependencies of the packages will be
+ installed automatically. This is the default.
+
+ If *fromUser* is ``True``, this package will not be marked as
+ automatically installed. This is the default. Set it to False if you
+ want to be able to automatically remove the package at a later stage
+ when no other package depends on it.
+ """
+ self._pcache.cache_pre_change()
+ self._pcache._depcache.mark_install(self._pkg, auto_inst, from_user)
+ # try to fix broken stuff
+ if auto_fix and self._pcache._depcache.broken_count > 0:
+ fixer = apt_pkg.ProblemResolver(self._pcache._depcache)
+ fixer.clear(self._pkg)
+ fixer.protect(self._pkg)
+ fixer.resolve(True)
+ self._pcache.cache_post_change()
+
+ def mark_upgrade(self, from_user: bool = True) -> None:
+ """Mark a package for upgrade."""
+ if self.is_upgradable:
+ auto = self.is_auto_installed
+ self.mark_install(from_user=from_user)
+ self.mark_auto(auto)
+ else:
+ # FIXME: we may want to throw a exception here
+ sys.stderr.write(
+ ("MarkUpgrade() called on a non-upgradeable pkg: " "'%s'\n")
+ % self._pkg.name
+ )
+
+ def mark_auto(self, auto: bool = True) -> None:
+ """Mark a package as automatically installed.
+
+ Call this function to mark a package as automatically installed. If the
+ optional parameter *auto* is set to ``False``, the package will not be
+ marked as automatically installed anymore. The default is ``True``.
+ """
+ self._pcache._depcache.mark_auto(self._pkg, auto)
+
+ def commit(self, fprogress: AcquireProgress, iprogress: InstallProgress) -> None:
+ """Commit the changes.
+
+ The parameter *fprogress* refers to a apt_pkg.AcquireProgress() object,
+ like apt.progress.text.AcquireProgress().
+
+ The parameter *iprogress* refers to an InstallProgress() object, as
+ found in apt.progress.base.
+ """
+ self._pcache._depcache.commit(fprogress, iprogress)
+
+
+@no_type_check
+def _test():
+ """Self-test."""
+ print("Self-test for the Package modul")
+ import random
+
+ apt_pkg.init()
+ progress = apt.progress.text.OpProgress()
+ cache = apt.Cache(progress)
+ pkg = cache["apt-utils"]
+ print("Name: %s " % pkg.name)
+ print("ID: %s " % pkg.id)
+ print("Priority (Candidate): %s " % pkg.candidate.priority)
+ print("Priority (Installed): %s " % pkg.installed.priority)
+ print("Installed: %s " % pkg.installed.version)
+ print("Candidate: %s " % pkg.candidate.version)
+ print("CandidateDownloadable: %s" % pkg.candidate.downloadable)
+ print("CandidateOrigins: %s" % pkg.candidate.origins)
+ print("SourcePkg: %s " % pkg.candidate.source_name)
+ print("Section: %s " % pkg.section)
+ print("Summary: %s" % pkg.candidate.summary)
+ print("Description (formatted) :\n%s" % pkg.candidate.description)
+ print("Description (unformatted):\n%s" % pkg.candidate.raw_description)
+ print("InstalledSize: %s " % pkg.candidate.installed_size)
+ print("PackageSize: %s " % pkg.candidate.size)
+ print("Dependencies: %s" % pkg.installed.dependencies)
+ print("Recommends: %s" % pkg.installed.recommends)
+ for dep in pkg.candidate.dependencies:
+ print(
+ ",".join(
+ f"{o.name} ({o.version}) ({o.relation}) ({o.pre_depend})"
+ for o in dep.or_dependencies
+ )
+ )
+ print("arch: %s" % pkg.candidate.architecture)
+ print("homepage: %s" % pkg.candidate.homepage)
+ print("rec: ", pkg.candidate.record)
+
+ print(cache["2vcard"].get_changelog())
+ for i in True, False:
+ print("Running install on random upgradable pkgs with AutoFix: ", i)
+ for pkg in cache:
+ if pkg.is_upgradable:
+ if random.randint(0, 1) == 1:
+ pkg.mark_install(i)
+ print("Broken: %s " % cache._depcache.broken_count)
+ print("InstCount: %s " % cache._depcache.inst_count)
+
+ print()
+ # get a new cache
+ for i in True, False:
+ print("Randomly remove some packages with AutoFix: %s" % i)
+ cache = apt.Cache(progress)
+ for name in cache.keys():
+ if random.randint(0, 1) == 1:
+ try:
+ cache[name].mark_delete(i)
+ except SystemError:
+ print("Error trying to remove: %s " % name)
+ print("Broken: %s " % cache._depcache.broken_count)
+ print("DelCount: %s " % cache._depcache.del_count)
+
+
+# self-test
+if __name__ == "__main__":
+ _test()
diff --git a/apt/progress/__init__.py b/apt/progress/__init__.py
new file mode 100644
index 0000000..d1687d5
--- /dev/null
+++ b/apt/progress/__init__.py
@@ -0,0 +1,28 @@
+# apt/progress/__init__.py - Initialization file for apt.progress.
+#
+# Copyright (c) 2009 Julian Andres Klode <jak@debian.org>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""Progress reporting.
+
+This package provides progress reporting for the python-apt package. The module
+'base' provides classes with no output, and the module 'text' provides classes
+for terminals, etc.
+"""
+
+from collections.abc import Sequence
+
+__all__: Sequence[str] = []
diff --git a/apt/progress/base.py b/apt/progress/base.py
new file mode 100644
index 0000000..ede5e5c
--- /dev/null
+++ b/apt/progress/base.py
@@ -0,0 +1,332 @@
+# apt/progress/base.py - Base classes for progress reporting.
+#
+# Copyright (C) 2009 Julian Andres Klode <jak@debian.org>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+# pylint: disable-msg = R0201
+"""Base classes for progress reporting.
+
+Custom progress classes should inherit from these classes. They can also be
+used as dummy progress classes which simply do nothing.
+"""
+from __future__ import annotations
+
+import errno
+import fcntl
+import io
+import os
+import re
+import select
+import sys
+
+import apt_pkg
+
+__all__ = ["AcquireProgress", "CdromProgress", "InstallProgress", "OpProgress"]
+
+
+class AcquireProgress:
+ """Monitor object for downloads controlled by the Acquire class.
+
+ This is an mostly abstract class. You should subclass it and implement the
+ methods to get something useful.
+ """
+
+ current_bytes = current_cps = fetched_bytes = last_bytes = total_bytes = 0.0
+ current_items = elapsed_time = total_items = 0
+
+ def done(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Invoked when an item is successfully and completely fetched."""
+
+ def fail(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Invoked when an item could not be fetched."""
+
+ def fetch(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Invoked when some of the item's data is fetched."""
+
+ def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Invoked when an item is confirmed to be up-to-date.
+
+ Invoked when an item is confirmed to be up-to-date. For instance,
+ when an HTTP download is informed that the file on the server was
+ not modified.
+ """
+
+ def media_change(self, media: str, drive: str) -> bool:
+ """Prompt the user to change the inserted removable media.
+
+ The parameter 'media' decribes the name of the media type that
+ should be changed, whereas the parameter 'drive' should be the
+ identifying name of the drive whose media should be changed.
+
+ This method should not return until the user has confirmed to the user
+ interface that the media change is complete. It must return True if
+ the user confirms the media change, or False to cancel it.
+ """
+ return False
+
+ def pulse(self, owner: apt_pkg.Acquire) -> bool:
+ """Periodically invoked while the Acquire process is underway.
+
+ This method gets invoked while the Acquire progress given by the
+ parameter 'owner' is underway. It should display information about
+ the current state.
+
+ This function returns a boolean value indicating whether the
+ acquisition should be continued (True) or cancelled (False).
+ """
+ return True
+
+ def start(self) -> None:
+ """Invoked when the Acquire process starts running."""
+ # Reset all our values.
+ self.current_bytes = 0.0
+ self.current_cps = 0.0
+ self.current_items = 0
+ self.elapsed_time = 0
+ self.fetched_bytes = 0.0
+ self.last_bytes = 0.0
+ self.total_bytes = 0.0
+ self.total_items = 0
+
+ def stop(self) -> None:
+ """Invoked when the Acquire process stops running."""
+
+
+class CdromProgress:
+ """Base class for reporting the progress of adding a cdrom.
+
+ Can be used with apt_pkg.Cdrom to produce an utility like apt-cdrom. The
+ attribute 'total_steps' defines the total number of steps and can be used
+ in update() to display the current progress.
+ """
+
+ total_steps = 0
+
+ def ask_cdrom_name(self) -> str | None:
+ """Ask for the name of the cdrom.
+
+ If a name has been provided, return it. Otherwise, return None to
+ cancel the operation.
+ """
+
+ def change_cdrom(self) -> bool:
+ """Ask for the CD-ROM to be changed.
+
+ Return True once the cdrom has been changed or False to cancel the
+ operation.
+ """
+ return False
+
+ def update(self, text: str, current: int) -> None:
+ """Periodically invoked to update the interface.
+
+ The string 'text' defines the text which should be displayed. The
+ integer 'current' defines the number of completed steps.
+ """
+
+
+class InstallProgress:
+ """Class to report the progress of installing packages."""
+
+ child_pid, percent, select_timeout, status = 0, 0.0, 0.1, ""
+
+ def __init__(self) -> None:
+ (self.statusfd, self.writefd) = os.pipe()
+ # These will leak fds, but fixing this safely requires API changes.
+ self.write_stream: io.TextIOBase = os.fdopen(self.writefd, "w")
+ self.status_stream: io.TextIOBase = os.fdopen(self.statusfd, "r") # noqa
+ fcntl.fcntl(self.statusfd, fcntl.F_SETFL, os.O_NONBLOCK)
+
+ def start_update(self) -> None:
+ """(Abstract) Start update."""
+
+ def finish_update(self) -> None:
+ """(Abstract) Called when update has finished."""
+
+ def __enter__(self) -> InstallProgress:
+ return self
+
+ def __exit__(self, type: object, value: object, traceback: object) -> None:
+ self.write_stream.close()
+ self.status_stream.close()
+
+ def error(self, pkg: str, errormsg: str) -> None:
+ """(Abstract) Called when a error is detected during the install."""
+
+ def conffile(self, current: str, new: str) -> None:
+ """(Abstract) Called when a conffile question from dpkg is detected."""
+
+ def status_change(self, pkg: str, percent: float, status: str) -> None:
+ """(Abstract) Called when the APT status changed."""
+
+ def dpkg_status_change(self, pkg: str, status: str) -> None:
+ """(Abstract) Called when the dpkg status changed."""
+
+ def processing(self, pkg: str, stage: str) -> None:
+ """(Abstract) Sent just before a processing stage starts.
+
+ The parameter 'stage' is one of "upgrade", "install"
+ (both sent before unpacking), "configure", "trigproc", "remove",
+ "purge". This method is used for dpkg only.
+ """
+
+ def run(self, obj: apt_pkg.PackageManager | bytes | str) -> int:
+ """Install using the object 'obj'.
+
+ This functions runs install actions. The parameter 'obj' may either
+ be a PackageManager object in which case its do_install() method is
+ called or the path to a deb file.
+
+ If the object is a PackageManager, the functions returns the result
+ of calling its do_install() method. Otherwise, the function returns
+ the exit status of dpkg. In both cases, 0 means that there were no
+ problems.
+ """
+ pid = self.fork()
+ if pid == 0:
+ try:
+ # PEP-446 implemented in Python 3.4 made all descriptors
+ # CLOEXEC, but we need to be able to pass writefd to dpkg
+ # when we spawn it
+ os.set_inheritable(self.writefd, True)
+ except AttributeError: # if we don't have os.set_inheritable()
+ pass
+ # pm.do_install might raise a exception,
+ # when this happens, we need to catch
+ # it, otherwise os._exit() is not run
+ # and the execution continues in the
+ # parent code leading to very confusing bugs
+ try:
+ os._exit(obj.do_install(self.write_stream.fileno())) # type: ignore # noqa
+ except AttributeError:
+ os._exit(
+ os.spawnlp(
+ os.P_WAIT,
+ "dpkg",
+ "dpkg",
+ "--status-fd",
+ str(self.write_stream.fileno()),
+ "-i",
+ obj, # type: ignore # noqa
+ )
+ )
+ except Exception as e:
+ sys.stderr.write("%s\n" % e)
+ os._exit(apt_pkg.PackageManager.RESULT_FAILED)
+
+ self.child_pid = pid
+ res = self.wait_child()
+ return os.WEXITSTATUS(res)
+
+ def fork(self) -> int:
+ """Fork."""
+ return os.fork()
+
+ def update_interface(self) -> None:
+ """Update the interface."""
+ try:
+ line = self.status_stream.readline()
+ except OSError as err:
+ # resource temporarly unavailable is ignored
+ if err.errno != errno.EAGAIN and err.errno != errno.EWOULDBLOCK:
+ print(err.strerror)
+ return
+
+ pkgname = status = status_str = percent = base = ""
+
+ if line.startswith("pm"):
+ try:
+ (status, pkgname, percent, status_str) = line.split(":", 3)
+ except ValueError:
+ # silently ignore lines that can't be parsed
+ return
+ elif line.startswith("status"):
+ try:
+ (base, pkgname, status, status_str) = line.split(":", 3)
+ except ValueError:
+ (base, pkgname, status) = line.split(":", 2)
+ elif line.startswith("processing"):
+ (status, status_str, pkgname) = line.split(":", 2)
+ self.processing(pkgname.strip(), status_str.strip())
+
+ # Always strip the status message
+ pkgname = pkgname.strip()
+ status_str = status_str.strip()
+ status = status.strip()
+
+ if status == "pmerror" or status == "error":
+ self.error(pkgname, status_str)
+ elif status == "conffile-prompt" or status == "pmconffile":
+ match = re.match("\\s*'(.*)'\\s*'(.*)'.*", status_str)
+ if match:
+ self.conffile(match.group(1), match.group(2))
+ elif status == "pmstatus":
+ # FIXME: Float comparison
+ if float(percent) != self.percent or status_str != self.status:
+ self.status_change(pkgname, float(percent), status_str.strip())
+ self.percent = float(percent)
+ self.status = status_str.strip()
+ elif base == "status":
+ self.dpkg_status_change(pkgname, status)
+
+ def wait_child(self) -> int:
+ """Wait for child progress to exit.
+
+ This method is responsible for calling update_interface() from time to
+ time. It exits once the child has exited. The return values is the
+ full status returned from os.waitpid() (not only the return code).
+ """
+ (pid, res) = (0, 0)
+ while True:
+ try:
+ select.select([self.status_stream], [], [], self.select_timeout)
+ except OSError as error:
+ (errno_, _errstr) = error.args
+ if errno_ != errno.EINTR:
+ raise
+
+ self.update_interface()
+ try:
+ (pid, res) = os.waitpid(self.child_pid, os.WNOHANG)
+ if pid == self.child_pid:
+ break
+ except OSError as err:
+ if err.errno == errno.ECHILD:
+ break
+ if err.errno != errno.EINTR:
+ raise
+
+ return res
+
+
+class OpProgress:
+ """Monitor objects for operations.
+
+ Display the progress of operations such as opening the cache."""
+
+ major_change, op, percent, subop = False, "", 0.0, ""
+
+ def update(self, percent: float | None = None) -> None:
+ """Called periodically to update the user interface.
+
+ You may use the optional argument 'percent' to set the attribute
+ 'percent' in this call.
+ """
+ if percent is not None:
+ self.percent = percent
+
+ def done(self) -> None:
+ """Called once an operation has been completed."""
diff --git a/apt/progress/text.py b/apt/progress/text.py
new file mode 100644
index 0000000..ea1a176
--- /dev/null
+++ b/apt/progress/text.py
@@ -0,0 +1,294 @@
+# Copyright (c) 2009 Julian Andres Klode <jak@debian.org>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""Progress reporting for text interfaces."""
+import io
+import os
+import signal
+import sys
+import types
+from collections.abc import Callable
+
+import apt_pkg
+
+from apt.progress import base
+
+__all__ = ["AcquireProgress", "CdromProgress", "OpProgress"]
+
+
+def _(msg: str) -> str:
+ """Translate the message, also try apt if translation is missing."""
+ res = apt_pkg.gettext(msg)
+ if res == msg:
+ res = apt_pkg.gettext(msg, "apt")
+ return res
+
+
+class TextProgress:
+ """Internal Base class for text progress classes."""
+
+ def __init__(self, outfile: io.TextIOBase | None = None) -> None:
+ self._file = outfile or sys.stdout
+ self._width = 0
+
+ def _write(self, msg: str, newline: bool = True, maximize: bool = False) -> None:
+ """Write the message on the terminal, fill remaining space."""
+ self._file.write("\r")
+ self._file.write(msg)
+
+ # Fill remaining stuff with whitespace
+ if self._width > len(msg):
+ self._file.write((self._width - len(msg)) * " ")
+ elif maximize: # Needed for OpProgress.
+ self._width = max(self._width, len(msg))
+ if newline:
+ self._file.write("\n")
+ else:
+ # self._file.write("\r")
+ self._file.flush()
+
+
+class OpProgress(base.OpProgress, TextProgress):
+ """Operation progress reporting.
+
+ This closely resembles OpTextProgress in libapt-pkg.
+ """
+
+ def __init__(self, outfile: io.TextIOBase | None = None) -> None:
+ TextProgress.__init__(self, outfile)
+ base.OpProgress.__init__(self)
+ self.old_op = ""
+
+ def update(self, percent: float | None = None) -> None:
+ """Called periodically to update the user interface."""
+ base.OpProgress.update(self, percent)
+ if self.major_change and self.old_op:
+ self._write(self.old_op)
+ self._write("%s... %i%%\r" % (self.op, self.percent), False, True)
+ self.old_op = self.op
+
+ def done(self) -> None:
+ """Called once an operation has been completed."""
+ base.OpProgress.done(self)
+ if self.old_op:
+ self._write(_("%c%s... Done") % ("\r", self.old_op), True, True)
+ self.old_op = ""
+
+
+class AcquireProgress(base.AcquireProgress, TextProgress):
+ """AcquireProgress for the text interface."""
+
+ def __init__(self, outfile: io.TextIOBase | None = None) -> None:
+ TextProgress.__init__(self, outfile)
+ base.AcquireProgress.__init__(self)
+ self._signal: (
+ Callable[[int, types.FrameType | None], None] | int | signal.Handlers | None
+ ) = None # noqa
+ self._width = 80
+ self._id = 1
+
+ def start(self) -> None:
+ """Start an Acquire progress.
+
+ In this case, the function sets up a signal handler for SIGWINCH, i.e.
+ window resize signals. And it also sets id to 1.
+ """
+ base.AcquireProgress.start(self)
+ self._signal = signal.signal(signal.SIGWINCH, self._winch)
+ # Get the window size.
+ self._winch()
+ self._id = 1
+
+ def _winch(self, *dummy: object) -> None:
+ """Signal handler for window resize signals."""
+ if hasattr(self._file, "fileno") and os.isatty(self._file.fileno()):
+ import fcntl
+ import struct
+ import termios
+
+ buf = fcntl.ioctl(self._file, termios.TIOCGWINSZ, 8 * b" ") # noqa
+ dummy, col, dummy, dummy = struct.unpack("hhhh", buf)
+ self._width = col - 1 # 1 for the cursor
+
+ def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Called when an item is update (e.g. not modified on the server)."""
+ base.AcquireProgress.ims_hit(self, item)
+ line = _("Hit ") + item.description
+ if item.owner.filesize:
+ line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize)
+ self._write(line)
+
+ def fail(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Called when an item is failed."""
+ base.AcquireProgress.fail(self, item)
+ if item.owner.status == item.owner.STAT_DONE:
+ self._write(_("Ign ") + item.description)
+ else:
+ self._write(_("Err ") + item.description)
+ self._write(" %s" % item.owner.error_text)
+
+ def fetch(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Called when some of the item's data is fetched."""
+ base.AcquireProgress.fetch(self, item)
+ # It's complete already (e.g. Hit)
+ if item.owner.complete:
+ return
+ item.owner.id = self._id
+ self._id += 1
+ line = _("Get:") + f"{item.owner.id} {item.description}"
+ if item.owner.filesize:
+ line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize)
+
+ self._write(line)
+
+ def pulse(self, owner: apt_pkg.Acquire) -> bool:
+ """Periodically invoked while the Acquire process is underway.
+
+ Return False if the user asked to cancel the whole Acquire process."""
+ base.AcquireProgress.pulse(self, owner)
+ # only show progress on a tty to not clutter log files etc
+ if hasattr(self._file, "fileno") and not os.isatty(self._file.fileno()):
+ return True
+
+ # calculate progress
+ percent = ((self.current_bytes + self.current_items) * 100.0) / float(
+ self.total_bytes + self.total_items
+ )
+
+ shown = False
+ tval = "%i%%" % percent
+ end = ""
+ if self.current_cps:
+ eta = int(float(self.total_bytes - self.current_bytes) / self.current_cps)
+ end = " {}B/s {}".format(
+ apt_pkg.size_to_str(self.current_cps),
+ apt_pkg.time_to_str(eta),
+ )
+
+ for worker in owner.workers:
+ val = ""
+ if not worker.current_item:
+ if worker.status:
+ val = " [%s]" % worker.status
+ if len(tval) + len(val) + len(end) >= self._width:
+ break
+ tval += val
+ shown = True
+ continue
+ shown = True
+
+ if worker.current_item.owner.id:
+ val += " [%i %s" % (
+ worker.current_item.owner.id,
+ worker.current_item.shortdesc,
+ )
+ else:
+ val += " [%s" % worker.current_item.description
+ if worker.current_item.owner.active_subprocess:
+ val += " %s" % worker.current_item.owner.active_subprocess
+
+ val += " %sB" % apt_pkg.size_to_str(worker.current_size)
+
+ # Add the total size and percent
+ if worker.total_size and not worker.current_item.owner.complete:
+ val += "/%sB %i%%" % (
+ apt_pkg.size_to_str(worker.total_size),
+ worker.current_size * 100.0 / worker.total_size,
+ )
+
+ val += "]"
+
+ if len(tval) + len(val) + len(end) >= self._width:
+ # Display as many items as screen width
+ break
+ else:
+ tval += val
+
+ if not shown:
+ tval += _(" [Working]")
+
+ if self.current_cps:
+ tval += (self._width - len(end) - len(tval)) * " " + end
+
+ self._write(tval, False)
+ return True
+
+ def media_change(self, medium: str, drive: str) -> bool:
+ """Prompt the user to change the inserted removable media."""
+ base.AcquireProgress.media_change(self, medium, drive)
+ self._write(
+ _(
+ "Media change: please insert the disc labeled\n"
+ " '%s'\n"
+ "in the drive '%s' and press enter\n"
+ )
+ % (medium, drive)
+ )
+ return input() not in ("c", "C")
+
+ def stop(self) -> None:
+ """Invoked when the Acquire process stops running."""
+ base.AcquireProgress.stop(self)
+ # Trick for getting a translation from apt
+ self._write(
+ (
+ _("Fetched %sB in %s (%sB/s)\n")
+ % (
+ apt_pkg.size_to_str(self.fetched_bytes),
+ apt_pkg.time_to_str(self.elapsed_time),
+ apt_pkg.size_to_str(self.current_cps),
+ )
+ ).rstrip("\n")
+ )
+
+ # Delete the signal again.
+ import signal
+
+ signal.signal(signal.SIGWINCH, self._signal)
+
+
+class CdromProgress(base.CdromProgress, TextProgress):
+ """Text CD-ROM progress."""
+
+ def ask_cdrom_name(self) -> str | None:
+ """Ask the user to provide a name for the disc."""
+ base.CdromProgress.ask_cdrom_name(self)
+ self._write(
+ _(
+ "Please provide a name for this medium, such as "
+ "'Debian 2.1r1 Disk 1'"
+ ),
+ False,
+ )
+ try:
+ return str(input(":"))
+ except KeyboardInterrupt:
+ return None
+
+ def update(self, text: str, current: int) -> None:
+ """Set the current progress."""
+ base.CdromProgress.update(self, text, current)
+ if text:
+ self._write(text, False)
+
+ def change_cdrom(self) -> bool:
+ """Ask the user to change the CD-ROM."""
+ base.CdromProgress.change_cdrom(self)
+ self._write(_("Please insert an installation medium and press enter"), False)
+ try:
+ return bool(input() == "")
+ except KeyboardInterrupt:
+ return False
diff --git a/apt/py.typed b/apt/py.typed
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/apt/py.typed
diff --git a/apt/utils.py b/apt/utils.py
new file mode 100644
index 0000000..5b1fd46
--- /dev/null
+++ b/apt/utils.py
@@ -0,0 +1,100 @@
+# Copyright (C) 2009 Canonical
+#
+# Authors:
+# Michael Vogt
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+# details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+import datetime
+import os
+
+import apt_pkg
+
+import apt
+
+
+def get_maintenance_end_date(
+ release_date: datetime.datetime, m_months: int
+) -> tuple[int, int]:
+ """
+ get the (year, month) tuple when the maintenance for the distribution
+ ends. Needs the data of the release and the number of months that
+ its is supported as input
+ """
+ # calc end date
+ years = m_months // 12
+ months = m_months % 12
+ support_end_year = release_date.year + years + (release_date.month + months) // 12
+ support_end_month = (release_date.month + months) % 12
+ # special case: this happens when e.g. doing 2010-06 + 18 months
+ if support_end_month == 0:
+ support_end_month = 12
+ support_end_year -= 1
+ return (support_end_year, support_end_month)
+
+
+def get_release_date_from_release_file(path: str) -> int | None:
+ """
+ return the release date as time_t for the given release file
+ """
+ if not path or not os.path.exists(path):
+ return None
+
+ with os.fdopen(apt_pkg.open_maybe_clear_signed_file(path)) as data:
+ tag = apt_pkg.TagFile(data)
+ section = next(tag)
+ if "Date" not in section:
+ return None
+ date = section["Date"]
+ return apt_pkg.str_to_time(date)
+
+
+def get_release_filename_for_pkg(
+ cache: apt.Cache, pkgname: str, label: str, release: str
+) -> str | None:
+ "get the release file that provides this pkg"
+ if pkgname not in cache:
+ return None
+ pkg = cache[pkgname]
+ ver = None
+ # look for the version that comes from the repos with
+ # the given label and origin
+ for aver in pkg._pkg.version_list:
+ if aver is None or aver.file_list is None:
+ continue
+ for ver_file, _index in aver.file_list:
+ # print verFile
+ if (
+ ver_file.origin == label
+ and ver_file.label == label
+ and ver_file.archive == release
+ ):
+ ver = aver
+ if not ver:
+ return None
+ indexfile = cache._list.find_index(ver.file_list[0][0])
+ for metaindex in cache._list.list:
+ for m in metaindex.index_files:
+ if indexfile and indexfile.describe == m.describe and indexfile.is_trusted:
+ dirname = apt_pkg.config.find_dir("Dir::State::lists")
+ for relfile in ["InRelease", "Release"]:
+ name = apt_pkg.uri_to_filename(
+ metaindex.uri
+ ) + "dists_{}_{}".format(
+ metaindex.dist,
+ relfile,
+ )
+ if os.path.exists(dirname + name):
+ return dirname + name
+ return None