diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 18:07:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-15 18:07:41 +0000 |
commit | 76926159194e180003aa78de97e5f287bf4325a5 (patch) | |
tree | 2cea7245cdc3f66355900c820c145eba90598766 /apt/cache.py | |
parent | Initial commit. (diff) | |
download | python-apt-76926159194e180003aa78de97e5f287bf4325a5.tar.xz python-apt-76926159194e180003aa78de97e5f287bf4325a5.zip |
Adding upstream version 2.7.6.upstream/2.7.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'apt/cache.py')
-rw-r--r-- | apt/cache.py | 1004 |
1 files changed, 1004 insertions, 0 deletions
diff --git a/apt/cache.py b/apt/cache.py new file mode 100644 index 0000000..cf78026 --- /dev/null +++ b/apt/cache.py @@ -0,0 +1,1004 @@ +# cache.py - apt cache abstraction +# +# Copyright (c) 2005-2009 Canonical +# +# Author: Michael Vogt <michael.vogt@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +from __future__ import annotations + +import fnmatch +import os +import warnings +import weakref +from collections.abc import Callable, Iterator, KeysView +from typing import Any, cast + +import apt_pkg + +import apt.progress.text +from apt.package import Package, Version +from apt.progress.base import AcquireProgress, InstallProgress, OpProgress + + +class FetchCancelledException(IOError): + """Exception that is thrown when the user cancels a fetch operation.""" + + +class FetchFailedException(IOError): + """Exception that is thrown when fetching fails.""" + + +class UntrustedException(FetchFailedException): + """Exception that is thrown when fetching fails for trust reasons""" + + +class LockFailedException(IOError): + """Exception that is thrown when locking fails.""" + + +class CacheClosedException(Exception): + """Exception that is thrown when the cache is used after close().""" + + +class _WrappedLock: + """Wraps an apt_pkg.FileLock to raise LockFailedException. + + Initialized using a directory path.""" + + def __init__(self, path: str) -> None: + self._path = path + self._lock = apt_pkg.FileLock(os.path.join(path, "lock")) + + def __enter__(self) -> None: + try: + return self._lock.__enter__() + except apt_pkg.Error as e: + raise LockFailedException( + ("Failed to lock directory %s: %s") % (self._path, e) + ) + + def __exit__(self, typ: object, value: object, traceback: object) -> None: + return self._lock.__exit__(typ, value, traceback) + + +class Cache: + """Dictionary-like package cache. + + The APT cache file contains a hash table mapping names of binary + packages to their metadata. A Cache object is the in-core + representation of the same. It provides access to APTs idea of the + list of available packages. + + The cache can be used like a mapping from package names to Package + objects (although only getting items is supported). + + Keyword arguments: + progress -- a OpProgress object, + rootdir -- an alternative root directory. if that is given the system + sources.list and system lists/files are not read, only file relative + to the given rootdir, + memonly -- build the cache in memory only. + + + .. versionchanged:: 1.0 + + The cache now supports package names with special architecture + qualifiers such as :all and :native. It does not export them + in :meth:`keys()`, though, to keep :meth:`keys()` a unique set. + """ + + def __init__( + self, + progress: OpProgress | None = None, + rootdir: str | None = None, + memonly: bool = False, + ) -> None: + self._cache: apt_pkg.Cache = cast(apt_pkg.Cache, None) + self._depcache: apt_pkg.DepCache = cast(apt_pkg.DepCache, None) + self._records: apt_pkg.PackageRecords = cast( + apt_pkg.PackageRecords, None + ) # noqa + self._list: apt_pkg.SourceList = cast(apt_pkg.SourceList, None) + self._callbacks: dict[str, list[Callable[..., None] | str]] = {} # noqa + self._callbacks2: dict[ + str, list[tuple[Callable[..., Any], tuple[Any, ...], dict[Any, Any]]] + ] = {} # noqa + self._weakref: weakref.WeakValueDictionary[ + str, apt.Package + ] = weakref.WeakValueDictionary() # noqa + self._weakversions: weakref.WeakSet[Version] = weakref.WeakSet() # noqa + self._changes_count = -1 + self._sorted_set: list[str] | None = None + + self.connect("cache_post_open", "_inc_changes_count") + self.connect("cache_post_change", "_inc_changes_count") + if memonly: + # force apt to build its caches in memory + apt_pkg.config.set("Dir::Cache::pkgcache", "") + if rootdir: + rootdir = os.path.abspath(rootdir) + if os.path.exists(rootdir + "/etc/apt/apt.conf"): + apt_pkg.read_config_file(apt_pkg.config, rootdir + "/etc/apt/apt.conf") + if os.path.isdir(rootdir + "/etc/apt/apt.conf.d"): + apt_pkg.read_config_dir(apt_pkg.config, rootdir + "/etc/apt/apt.conf.d") + apt_pkg.config.set("Dir", rootdir) + apt_pkg.config.set("Dir::State::status", rootdir + "/var/lib/dpkg/status") + # also set dpkg to the rootdir path so that its called for the + # --print-foreign-architectures call + apt_pkg.config.set( + "Dir::bin::dpkg", os.path.join(rootdir, "usr", "bin", "dpkg") + ) + # create required dirs/files when run with special rootdir + # automatically + self._check_and_create_required_dirs(rootdir) + # Call InitSystem so the change to Dir::State::Status is actually + # recognized (LP: #320665) + apt_pkg.init_system() + + # Prepare a lock object (context manager for archive lock) + archive_dir = apt_pkg.config.find_dir("Dir::Cache::Archives") + self._archive_lock = _WrappedLock(archive_dir) + + self.open(progress) + + def fix_broken(self) -> None: + """Fix broken packages.""" + self._depcache.fix_broken() + + def _inc_changes_count(self) -> None: + """Increase the number of changes""" + self._changes_count += 1 + + def _check_and_create_required_dirs(self, rootdir: str) -> None: + """ + check if the required apt directories/files are there and if + not create them + """ + files = [ + "/var/lib/dpkg/status", + "/etc/apt/sources.list", + ] + dirs = [ + "/var/lib/dpkg", + "/etc/apt/", + "/var/cache/apt/archives/partial", + "/var/lib/apt/lists/partial", + ] + for d in dirs: + if not os.path.exists(rootdir + d): + # print "creating: ", rootdir + d + os.makedirs(rootdir + d) + for f in files: + if not os.path.exists(rootdir + f): + open(rootdir + f, "w").close() + + def _run_callbacks(self, name: str) -> None: + """internal helper to run a callback""" + if name in self._callbacks: + for callback in self._callbacks[name]: + if callback == "_inc_changes_count": + self._inc_changes_count() + else: + callback() # type: ignore + + if name in self._callbacks2: + for callback, args, kwds in self._callbacks2[name]: + callback(self, *args, **kwds) + + def open(self, progress: OpProgress | None = None) -> None: + """Open the package cache, after that it can be used like + a dictionary + """ + if progress is None: + progress = apt.progress.base.OpProgress() + # close old cache on (re)open + self.close() + self.op_progress = progress + self._run_callbacks("cache_pre_open") + + self._cache = apt_pkg.Cache(progress) + self._depcache = apt_pkg.DepCache(self._cache) + self._records = apt_pkg.PackageRecords(self._cache) + self._list = apt_pkg.SourceList() + self._list.read_main_list() + self._sorted_set = None + self.__remap() + + self._have_multi_arch = len(apt_pkg.get_architectures()) > 1 + + progress.done() + self._run_callbacks("cache_post_open") + + def __remap(self) -> None: + """Called after cache reopen() to relocate to new cache. + + Relocate objects like packages and versions from the old + underlying cache to the new one. + """ + for key in list(self._weakref.keys()): + try: + pkg = self._weakref[key] + except KeyError: + continue + + try: + pkg._pkg = self._cache[pkg._pkg.name, pkg._pkg.architecture] + except LookupError: + del self._weakref[key] + + for ver in list(self._weakversions): + # Package has been reseated above, reseat version + for v in ver.package._pkg.version_list: + # Requirements as in debListParser::SameVersion + if ( + v.hash == ver._cand.hash + and (v.size == 0 or ver._cand.size == 0 or v.size == ver._cand.size) + and v.multi_arch == ver._cand.multi_arch + and v.ver_str == ver._cand.ver_str + ): + ver._cand = v + break + else: + self._weakversions.remove(ver) + + def close(self) -> None: + """Close the package cache""" + # explicitely free the FDs that _records has open + del self._records + self._records = cast(apt_pkg.PackageRecords, None) + + def __enter__(self) -> Cache: + """Enter the with statement""" + return self + + def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> None: + """Exit the with statement""" + self.close() + + def __getitem__(self, key: object) -> Package: + """look like a dictionary (get key)""" + try: + key = str(key) + rawpkg = self._cache[key] + except KeyError: + raise KeyError("The cache has no package named %r" % key) + + # It might be excluded due to not having a version or something + if not self.__is_real_pkg(rawpkg): + raise KeyError("The cache has no package named %r" % key) + + pkg = self._rawpkg_to_pkg(rawpkg) + + return pkg + + def get(self, key: object, default: object = None) -> Any: + """Return *self*[*key*] or *default* if *key* not in *self*. + + .. versionadded:: 1.1 + """ + try: + return self[key] + except KeyError: + return default + + def _rawpkg_to_pkg(self, rawpkg: apt_pkg.Package) -> Package: + """Returns the apt.Package object for an apt_pkg.Package object. + + .. versionadded:: 1.0.0 + """ + fullname = rawpkg.get_fullname(pretty=True) + + return self._weakref.setdefault(fullname, Package(self, rawpkg)) + + def __iter__(self) -> Iterator[Package]: + # We iterate sorted over package names here. With this we read the + # package lists linearly if we need to access the package records, + # instead of having to do thousands of random seeks; the latter + # is disastrous if we use compressed package indexes, and slower than + # necessary for uncompressed indexes. + for pkgname in self.keys(): + pkg = Package(self, self._cache[pkgname]) + yield self._weakref.setdefault(pkgname, pkg) + + def __is_real_pkg(self, rawpkg: apt_pkg.Package) -> bool: + """Check if the apt_pkg.Package provided is a real package.""" + return rawpkg.has_versions + + def has_key(self, key: object) -> bool: + return key in self + + def __contains__(self, key: object) -> bool: + try: + return self.__is_real_pkg(self._cache[str(key)]) + except KeyError: + return False + + def __len__(self) -> int: + return len(self.keys()) + + def keys(self) -> list[str]: + if self._sorted_set is None: + self._sorted_set = sorted( + p.get_fullname(pretty=True) + for p in self._cache.packages + if self.__is_real_pkg(p) + ) + return list(self._sorted_set) # We need a copy here, caller may modify + + def get_changes(self) -> list[Package]: + """Get the marked changes""" + changes = [] + marked_keep = self._depcache.marked_keep + for rawpkg in self._cache.packages: + if not marked_keep(rawpkg): + changes.append(self._rawpkg_to_pkg(rawpkg)) + return changes + + def upgrade(self, dist_upgrade: bool = False) -> None: + """Upgrade all packages. + + If the parameter *dist_upgrade* is True, new dependencies will be + installed as well (and conflicting packages may be removed). The + default value is False. + """ + self.cache_pre_change() + self._depcache.upgrade(dist_upgrade) + self.cache_post_change() + + @property + def required_download(self) -> int: + """Get the size of the packages that are required to download.""" + if self._records is None: + raise CacheClosedException("Cache object used after close() called") + pm = apt_pkg.PackageManager(self._depcache) + fetcher = apt_pkg.Acquire() + pm.get_archives(fetcher, self._list, self._records) + return fetcher.fetch_needed + + @property + def required_space(self) -> int: + """Get the size of the additional required space on the fs.""" + return self._depcache.usr_size + + @property + def req_reinstall_pkgs(self) -> set[str]: + """Return the packages not downloadable packages in reqreinst state.""" + reqreinst = set() + get_candidate_ver = self._depcache.get_candidate_ver + states = frozenset( + (apt_pkg.INSTSTATE_REINSTREQ, apt_pkg.INSTSTATE_HOLD_REINSTREQ) + ) + for pkg in self._cache.packages: + cand = get_candidate_ver(pkg) + if cand and not cand.downloadable and pkg.inst_state in states: + reqreinst.add(pkg.get_fullname(pretty=True)) + return reqreinst + + def _run_fetcher( + self, fetcher: apt_pkg.Acquire, allow_unauthenticated: bool | None + ) -> int: + if allow_unauthenticated is None: + allow_unauthenticated = apt_pkg.config.find_b( + "APT::Get::" "AllowUnauthenticated", False + ) + + untrusted = [item for item in fetcher.items if not item.is_trusted] + if untrusted and not allow_unauthenticated: + raise UntrustedException( + "Untrusted packages:\n%s" % "\n".join(i.desc_uri for i in untrusted) + ) + + # do the actual fetching + res = fetcher.run() + + # now check the result (this is the code from apt-get.cc) + failed = False + err_msg = "" + for item in fetcher.items: + if item.status == item.STAT_DONE: + continue + if item.STAT_IDLE: + continue + err_msg += f"Failed to fetch {item.desc_uri} {item.error_text}\n" + failed = True + + # we raise a exception if the download failed or it was cancelt + if res == fetcher.RESULT_CANCELLED: + raise FetchCancelledException(err_msg) + elif failed: + raise FetchFailedException(err_msg) + return res + + def _fetch_archives( + self, + fetcher: apt_pkg.Acquire, + pm: apt_pkg.PackageManager, + allow_unauthenticated: bool | None = None, + ) -> int: + """fetch the needed archives""" + if self._records is None: + raise CacheClosedException("Cache object used after close() called") + + # this may as well throw a SystemError exception + if not pm.get_archives(fetcher, self._list, self._records): + return False + + # now run the fetcher, throw exception if something fails to be + # fetched + return self._run_fetcher(fetcher, allow_unauthenticated) + + def fetch_archives( + self, + progress: AcquireProgress | None = None, + fetcher: apt_pkg.Acquire | None = None, + allow_unauthenticated: bool | None = None, + ) -> int: + """Fetch the archives for all packages marked for install/upgrade. + + You can specify either an :class:`apt.progress.base.AcquireProgress()` + object for the parameter *progress*, or specify an already + existing :class:`apt_pkg.Acquire` object for the parameter *fetcher*. + + The return value of the function is undefined. If an error occurred, + an exception of type :class:`FetchFailedException` or + :class:`FetchCancelledException` is raised. + + The keyword-only parameter *allow_unauthenticated* specifies whether + to allow unauthenticated downloads. If not specified, it defaults to + the configuration option `APT::Get::AllowUnauthenticated`. + + .. versionadded:: 0.8.0 + """ + if progress is not None and fetcher is not None: + raise ValueError("Takes a progress or a an Acquire object") + if progress is None: + progress = apt.progress.text.AcquireProgress() + if fetcher is None: + fetcher = apt_pkg.Acquire(progress) + + with self._archive_lock: + return self._fetch_archives( + fetcher, apt_pkg.PackageManager(self._depcache), allow_unauthenticated + ) + + def is_virtual_package(self, pkgname: str) -> bool: + """Return whether the package is a virtual package.""" + try: + pkg = self._cache[pkgname] + except KeyError: + return False + else: + return bool(pkg.has_provides and not pkg.has_versions) + + def get_providing_packages( + self, + pkgname: str, + candidate_only: bool = True, + include_nonvirtual: bool = False, + ) -> list[Package]: + """Return a list of all packages providing a package. + + Return a list of packages which provide the virtual package of the + specified name. + + If 'candidate_only' is False, return all packages with at + least one version providing the virtual package. Otherwise, + return only those packages where the candidate version + provides the virtual package. + + If 'include_nonvirtual' is True then it will search for all + packages providing pkgname, even if pkgname is not itself + a virtual pkg. + """ + + providers: set[Package] = set() + get_candidate_ver = self._depcache.get_candidate_ver + try: + vp = self._cache[pkgname] + if vp.has_versions and not include_nonvirtual: + return list(providers) + except KeyError: + return list(providers) + + for provides, providesver, version in vp.provides_list: + rawpkg = version.parent_pkg + if not candidate_only or (version == get_candidate_ver(rawpkg)): + providers.add(self._rawpkg_to_pkg(rawpkg)) + return list(providers) + + def update( + self, + fetch_progress: AcquireProgress | None = None, + pulse_interval: int = 0, + raise_on_error: bool = True, + sources_list: str | None = None, + ) -> int: + """Run the equivalent of apt-get update. + + You probably want to call open() afterwards, in order to utilise the + new cache. Otherwise, the old cache will be used which can lead to + strange bugs. + + The first parameter *fetch_progress* may be set to an instance of + apt.progress.FetchProgress, the default is apt.progress.FetchProgress() + . + sources_list -- Update a alternative sources.list than the default. + Note that the sources.list.d directory is ignored in this case + """ + with _WrappedLock(apt_pkg.config.find_dir("Dir::State::Lists")): + if sources_list: + old_sources_list = apt_pkg.config.find("Dir::Etc::sourcelist") + old_sources_list_d = apt_pkg.config.find("Dir::Etc::sourceparts") + old_cleanup = apt_pkg.config.find("APT::List-Cleanup") + apt_pkg.config.set( + "Dir::Etc::sourcelist", os.path.abspath(sources_list) + ) + apt_pkg.config.set("Dir::Etc::sourceparts", "xxx") + apt_pkg.config.set("APT::List-Cleanup", "0") + slist = apt_pkg.SourceList() + slist.read_main_list() + else: + slist = self._list + + try: + if fetch_progress is None: + fetch_progress = apt.progress.base.AcquireProgress() + try: + res = self._cache.update(fetch_progress, slist, pulse_interval) + except SystemError as e: + raise FetchFailedException(e) + if not res and raise_on_error: + raise FetchFailedException() + else: + return res + finally: + if sources_list: + apt_pkg.config.set("Dir::Etc::sourcelist", old_sources_list) + apt_pkg.config.set("Dir::Etc::sourceparts", old_sources_list_d) + apt_pkg.config.set("APT::List-Cleanup", old_cleanup) + + def install_archives( + self, pm: apt_pkg.PackageManager, install_progress: InstallProgress + ) -> int: + """ + The first parameter *pm* refers to an object returned by + apt_pkg.PackageManager(). + + The second parameter *install_progress* refers to an InstallProgress() + object of the module apt.progress. + + This releases a system lock in newer versions, if there is any, + and reestablishes it afterwards. + """ + # compat with older API + try: + install_progress.startUpdate() # type: ignore + except AttributeError: + install_progress.start_update() + + did_unlock = apt_pkg.pkgsystem_is_locked() + if did_unlock: + apt_pkg.pkgsystem_unlock_inner() + + try: + res = install_progress.run(pm) + finally: + if did_unlock: + apt_pkg.pkgsystem_lock_inner() + + try: + install_progress.finishUpdate() # type: ignore + except AttributeError: + install_progress.finish_update() + return res + + def commit( + self, + fetch_progress: AcquireProgress | None = None, + install_progress: InstallProgress | None = None, + allow_unauthenticated: bool | None = None, + ) -> bool: + """Apply the marked changes to the cache. + + The first parameter, *fetch_progress*, refers to a FetchProgress() + object as found in apt.progress, the default being + apt.progress.FetchProgress(). + + The second parameter, *install_progress*, is a + apt.progress.InstallProgress() object. + + The keyword-only parameter *allow_unauthenticated* specifies whether + to allow unauthenticated downloads. If not specified, it defaults to + the configuration option `APT::Get::AllowUnauthenticated`. + """ + # FIXME: + # use the new acquire/pkgmanager interface here, + # raise exceptions when a download or install fails + # and send proper error strings to the application. + # Current a failed download will just display "error" + # which is less than optimal! + + if fetch_progress is None: + fetch_progress = apt.progress.base.AcquireProgress() + if install_progress is None: + install_progress = apt.progress.base.InstallProgress() + + assert install_progress is not None + + with apt_pkg.SystemLock(): + pm = apt_pkg.PackageManager(self._depcache) + fetcher = apt_pkg.Acquire(fetch_progress) + with self._archive_lock: + while True: + # fetch archives first + res = self._fetch_archives(fetcher, pm, allow_unauthenticated) + + # then install + res = self.install_archives(pm, install_progress) + if res == pm.RESULT_COMPLETED: + break + elif res == pm.RESULT_FAILED: + raise SystemError("installArchives() failed") + elif res == pm.RESULT_INCOMPLETE: + pass + else: + raise SystemError( + "internal-error: unknown result " + "code from InstallArchives: %s" % res + ) + # reload the fetcher for media swaping + fetcher.shutdown() + return res == pm.RESULT_COMPLETED + + def clear(self) -> None: + """Unmark all changes""" + self._depcache.init() + + # cache changes + + def cache_post_change(self) -> None: + "called internally if the cache has changed, emit a signal then" + self._run_callbacks("cache_post_change") + + def cache_pre_change(self) -> None: + """called internally if the cache is about to change, emit + a signal then""" + self._run_callbacks("cache_pre_change") + + def connect(self, name: str, callback: Callable[..., None] | str) -> None: + """Connect to a signal. + + .. deprecated:: 1.0 + + Please use connect2() instead, as this function is very + likely to cause a memory leak. + """ + if callback != "_inc_changes_count": + warnings.warn( + "connect() likely causes a reference" " cycle, use connect2() instead", + RuntimeWarning, + 2, + ) + if name not in self._callbacks: + self._callbacks[name] = [] + self._callbacks[name].append(callback) + + def connect2( + self, name: str, callback: Callable[..., Any], *args: object, **kwds: object + ) -> None: + """Connect to a signal. + + The callback will be passed the cache as an argument, and + any arguments passed to this function. Make sure that, if you + pass a method of a class as your callback, your class does not + contain a reference to the cache. + + Cyclic references to the cache can cause issues if the Cache object + is replaced by a new one, because the cache keeps a lot of objects and + tens of open file descriptors. + + currently only used for cache_{post,pre}_{changed,open}. + + .. versionadded:: 1.0 + """ + if name not in self._callbacks2: + self._callbacks2[name] = [] + self._callbacks2[name].append((callback, args, kwds)) + + def actiongroup(self) -> apt_pkg.ActionGroup: + """Return an `ActionGroup` object for the current cache. + + Action groups can be used to speedup actions. The action group is + active as soon as it is created, and disabled when the object is + deleted or when release() is called. + + You can use the action group as a context manager, this is the + recommended way:: + + with cache.actiongroup(): + for package in my_selected_packages: + package.mark_install() + + This way, the action group is automatically released as soon as the + with statement block is left. It also has the benefit of making it + clear which parts of the code run with a action group and which + don't. + """ + return apt_pkg.ActionGroup(self._depcache) + + @property + def dpkg_journal_dirty(self) -> bool: + """Return True if the dpkg was interrupted + + All dpkg operations will fail until this is fixed, the action to + fix the system if dpkg got interrupted is to run + 'dpkg --configure -a' as root. + """ + dpkg_status_dir = os.path.dirname( + apt_pkg.config.find_file("Dir::State::status") + ) + for f in os.listdir(os.path.join(dpkg_status_dir, "updates")): + if fnmatch.fnmatch(f, "[0-9]*"): + return True + return False + + @property + def broken_count(self) -> int: + """Return the number of packages with broken dependencies.""" + return self._depcache.broken_count + + @property + def delete_count(self) -> int: + """Return the number of packages marked for deletion.""" + return self._depcache.del_count + + @property + def install_count(self) -> int: + """Return the number of packages marked for installation.""" + return self._depcache.inst_count + + @property + def keep_count(self) -> int: + """Return the number of packages marked as keep.""" + return self._depcache.keep_count + + +class ProblemResolver: + """Resolve problems due to dependencies and conflicts. + + The first argument 'cache' is an instance of apt.Cache. + """ + + def __init__(self, cache: Cache) -> None: + self._resolver = apt_pkg.ProblemResolver(cache._depcache) + self._cache = cache + + def clear(self, package: Package) -> None: + """Reset the package to the default state.""" + self._resolver.clear(package._pkg) + + def protect(self, package: Package) -> None: + """Protect a package so it won't be removed.""" + self._resolver.protect(package._pkg) + + def remove(self, package: Package) -> None: + """Mark a package for removal.""" + self._resolver.remove(package._pkg) + + def resolve(self) -> None: + """Resolve dependencies, try to remove packages where needed.""" + self._cache.cache_pre_change() + self._resolver.resolve() + self._cache.cache_post_change() + + def resolve_by_keep(self) -> None: + """Resolve dependencies, do not try to remove packages.""" + self._cache.cache_pre_change() + self._resolver.resolve_by_keep() + self._cache.cache_post_change() + + def keep_phased_updates(self) -> None: + """Keep back phased updates.""" + self._cache.cache_pre_change() + self._resolver.keep_phased_updates() + self._cache.cache_post_change() + + +# ----------------------------- experimental interface + + +class Filter: + """Filter base class""" + + def apply(self, pkg: Package) -> bool: + """Filter function, return True if the package matchs a + filter criteria and False otherwise + """ + return True + + +class MarkedChangesFilter(Filter): + """Filter that returns all marked changes""" + + def apply(self, pkg: Package) -> bool: + if pkg.marked_install or pkg.marked_delete or pkg.marked_upgrade: + return True + else: + return False + + +class InstalledFilter(Filter): + """Filter that returns all installed packages. + + .. versionadded:: 1.0.0 + """ + + def apply(self, pkg: Package) -> bool: + return pkg.is_installed + + +class _FilteredCacheHelper: + """Helper class for FilteredCache to break a reference cycle.""" + + def __init__(self, cache: Cache) -> None: + # Do not keep a reference to the cache, or you have a cycle! + + self._filtered: dict[str, bool] = {} + self._filters: list[Filter] = [] + cache.connect2("cache_post_change", self.filter_cache_post_change) + cache.connect2("cache_post_open", self.filter_cache_post_change) + + def _reapply_filter(self, cache: Cache) -> None: + "internal helper to refilter" + # Do not keep a reference to the cache, or you have a cycle! + self._filtered = {} + for pkg in cache: + for f in self._filters: + if f.apply(pkg): + self._filtered[pkg.name] = True + break + + def set_filter(self, filter: Filter) -> None: + """Set the current active filter.""" + self._filters = [] + self._filters.append(filter) + + def filter_cache_post_change(self, cache: Cache) -> None: + """Called internally if the cache changes, emit a signal then.""" + # Do not keep a reference to the cache, or you have a cycle! + self._reapply_filter(cache) + + +class FilteredCache: + """A package cache that is filtered. + + Can work on a existing cache or create a new one + """ + + def __init__( + self, cache: Cache | None = None, progress: OpProgress | None = None + ) -> None: + if cache is None: + self.cache = Cache(progress) + else: + self.cache = cache + self._helper = _FilteredCacheHelper(self.cache) + + def __len__(self) -> int: + return len(self._helper._filtered) + + def __getitem__(self, key: str) -> Package: + return self.cache[key] + + def __iter__(self) -> Iterator[Package]: + for pkgname in self._helper._filtered: + yield self.cache[pkgname] + + def keys(self) -> KeysView[str]: + return self._helper._filtered.keys() + + def has_key(self, key: object) -> bool: + return key in self + + def __contains__(self, key: object) -> bool: + try: + # Normalize package name for multi arch + return self.cache[key].name in self._helper._filtered + except KeyError: + return False + + def set_filter(self, filter: Filter) -> None: + """Set the current active filter.""" + self._helper.set_filter(filter) + self.cache.cache_post_change() + + def filter_cache_post_change(self) -> None: + """Called internally if the cache changes, emit a signal then.""" + self._helper.filter_cache_post_change(self.cache) + + def __getattr__(self, key: str) -> Any: + """we try to look exactly like a real cache.""" + return getattr(self.cache, key) + + +def cache_pre_changed(cache: Cache) -> None: + print("cache pre changed") + + +def cache_post_changed(cache: Cache) -> None: + print("cache post changed") + + +def _test() -> None: + """Internal test code.""" + print("Cache self test") + apt_pkg.init() + cache = Cache(apt.progress.text.OpProgress()) + cache.connect2("cache_pre_change", cache_pre_changed) + cache.connect2("cache_post_change", cache_post_changed) + print("aptitude" in cache) + pkg = cache["aptitude"] + print(pkg.name) + print(len(cache)) + + for pkgname in cache.keys(): + assert cache[pkgname].name == pkgname + + cache.upgrade() + changes = cache.get_changes() + print(len(changes)) + for pkg in changes: + assert pkg.name + + # see if fetching works + for dirname in ["/tmp/pytest", "/tmp/pytest/partial"]: + if not os.path.exists(dirname): + os.mkdir(dirname) + apt_pkg.config.set("Dir::Cache::Archives", "/tmp/pytest") + pm = apt_pkg.PackageManager(cache._depcache) + fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress()) + cache._fetch_archives(fetcher, pm, None) + # sys.exit(1) + + print("Testing filtered cache (argument is old cache)") + filtered = FilteredCache(cache) + filtered.cache.connect2("cache_pre_change", cache_pre_changed) + filtered.cache.connect2("cache_post_change", cache_post_changed) + filtered.cache.upgrade() + filtered.set_filter(MarkedChangesFilter()) + print(len(filtered)) + for pkgname in filtered.keys(): + assert pkgname == filtered[pkgname].name + + print(len(filtered)) + + print("Testing filtered cache (no argument)") + filtered = FilteredCache(progress=apt.progress.base.OpProgress()) + filtered.cache.connect2("cache_pre_change", cache_pre_changed) + filtered.cache.connect2("cache_post_change", cache_post_changed) + filtered.cache.upgrade() + filtered.set_filter(MarkedChangesFilter()) + print(len(filtered)) + for pkgname in filtered.keys(): + assert pkgname == filtered[pkgname].name + + print(len(filtered)) + + +if __name__ == "__main__": + _test() |