summaryrefslogtreecommitdiffstats
path: root/apt/debfile.py
diff options
context:
space:
mode:
Diffstat (limited to 'apt/debfile.py')
-rw-r--r--apt/debfile.py861
1 files changed, 861 insertions, 0 deletions
diff --git a/apt/debfile.py b/apt/debfile.py
new file mode 100644
index 0000000..b3ef733
--- /dev/null
+++ b/apt/debfile.py
@@ -0,0 +1,861 @@
+# Copyright (c) 2005-2010 Canonical
+#
+# Author: Michael Vogt <michael.vogt@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""Classes for working with locally available Debian packages."""
+
+import gzip
+import os
+import sys
+from collections.abc import Iterable
+from io import BytesIO
+from typing import cast
+
+import apt_inst
+import apt_pkg
+from apt_pkg import gettext as _
+
+import apt
+
+
+class NoDebArchiveException(IOError):
+ """Exception which is raised if a file is no Debian archive."""
+
+
+class DebPackage:
+ """A Debian Package (.deb file)."""
+
+ # Constants for comparing the local package file with the version
+ # in the cache
+ (VERSION_NONE, VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER) = range(4)
+
+ debug = 0
+
+ def __init__(
+ self, filename: str | None = None, cache: apt.Cache | None = None
+ ) -> None:
+ if cache is None:
+ cache = apt.Cache()
+ self._cache = cache
+ self._debfile = cast(apt_inst.DebFile, None)
+ self.pkgname = ""
+ self.filename: str | None = None
+ self._sections: dict[str, str] | apt_pkg.TagSection[str] = {} # noqa
+ self._need_pkgs: list[str] = []
+ self._check_was_run = False
+ self._failure_string = ""
+ self._multiarch: str | None = None
+ if filename:
+ self.open(filename)
+
+ def open(self, filename: str) -> None:
+ """open given debfile"""
+ self._dbg(3, "open '%s'" % filename)
+ self._need_pkgs = []
+ self._installed_conflicts: set[str] = set()
+ self._failure_string = ""
+ self.filename = filename
+ self._debfile = apt_inst.DebFile(self.filename)
+ control = self._debfile.control.extractdata("control")
+ self._sections = apt_pkg.TagSection(control)
+ self.pkgname = self._sections["Package"]
+ self._check_was_run = False
+
+ def __getitem__(self, key: str) -> str:
+ return self._sections[key]
+
+ def __contains__(self, key: str) -> bool:
+ return key in self._sections
+
+ @property
+ def filelist(self) -> list[str]:
+ """return the list of files in the deb."""
+ files = []
+ try:
+ self._debfile.data.go(lambda item, data: files.append(item.name))
+ except SystemError:
+ return [_("List of files for '%s' could not be read") % self.filename]
+ return files
+
+ @property
+ def control_filelist(self) -> list[str]:
+ """return the list of files in control.tar.gz"""
+ control = []
+ try:
+ self._debfile.control.go(lambda item, data: control.append(item.name))
+ except SystemError:
+ return [
+ _("List of control files for '%s' could not be read") % self.filename
+ ]
+ return sorted(control)
+
+ # helper that will return a pkgname with a multiarch suffix if needed
+ def _maybe_append_multiarch_suffix(
+ self, pkgname: str, in_conflict_checking: bool = False
+ ) -> str:
+ # trivial cases
+ if ":" in pkgname:
+ return pkgname
+ if not self._multiarch:
+ return pkgname
+ elif self._cache.is_virtual_package(pkgname):
+ return pkgname
+ elif (
+ pkgname in self._cache
+ and self._cache[pkgname].candidate is not None
+ and cast(apt.package.Version, self._cache[pkgname].candidate).architecture
+ == "all"
+ ):
+ return pkgname
+ # now do the real multiarch checking
+ multiarch_pkgname = f"{pkgname}:{self._multiarch}"
+ # the upper layers will handle this
+ if multiarch_pkgname not in self._cache:
+ return multiarch_pkgname
+ multiarch_pkg = self._cache[multiarch_pkgname]
+ if multiarch_pkg.candidate is None:
+ return multiarch_pkgname
+ # now check the multiarch state
+ cand = multiarch_pkg.candidate._cand
+ # print pkgname, multiarch_pkgname, cand.multi_arch
+ # the default is to add the suffix, unless its a pkg that can satify
+ # foreign dependencies
+ if cand.multi_arch & cand.MULTI_ARCH_FOREIGN:
+ return pkgname
+ # for conflicts we need a special case here, any not multiarch enabled
+ # package has a implicit conflict
+ if in_conflict_checking and not (cand.multi_arch & cand.MULTI_ARCH_SAME):
+ return pkgname
+ return multiarch_pkgname
+
+ def _is_or_group_satisfied(self, or_group: list[tuple[str, str, str]]) -> bool:
+ """Return True if at least one dependency of the or-group is satisfied.
+
+ This method gets an 'or_group' and analyzes if at least one dependency
+ of this group is already satisfied.
+ """
+ self._dbg(2, "_checkOrGroup(): %s " % (or_group))
+
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # multiarch
+ depname = self._maybe_append_multiarch_suffix(depname)
+
+ # check for virtual pkgs
+ if depname not in self._cache:
+ if self._cache.is_virtual_package(depname):
+ self._dbg(
+ 3, "_is_or_group_satisfied(): %s is virtual dep" % depname
+ )
+ for pkg in self._cache.get_providing_packages(depname):
+ if pkg.is_installed:
+ return True
+ continue
+ # check real dependency
+ inst = self._cache[depname].installed
+ if inst is not None and apt_pkg.check_dep(inst.version, oper, ver):
+ return True
+
+ # if no real dependency is installed, check if there is
+ # a package installed that provides this dependency
+ # (e.g. scrollkeeper dependecies are provided by rarian-compat)
+ # but only do that if there is no version required in the
+ # dependency (we do not supprot versionized dependencies)
+ if not oper:
+ for ppkg in self._cache.get_providing_packages(
+ depname, include_nonvirtual=True
+ ):
+ if ppkg.is_installed:
+ self._dbg(
+ 3,
+ "found installed '%s' that provides '%s'"
+ % (ppkg.name, depname),
+ )
+ return True
+ return False
+
+ def _satisfy_or_group(self, or_group: list[tuple[str, str, str]]) -> bool:
+ """Try to satisfy the or_group."""
+ for dep in or_group:
+ depname, ver, oper = dep
+
+ # multiarch
+ depname = self._maybe_append_multiarch_suffix(depname)
+
+ # if we don't have it in the cache, it may be virtual
+ if depname not in self._cache:
+ if not self._cache.is_virtual_package(depname):
+ continue
+ providers = self._cache.get_providing_packages(depname)
+ # if a package just has a single virtual provider, we
+ # just pick that (just like apt)
+ if len(providers) != 1:
+ continue
+ depname = providers[0].name
+
+ # now check if we can satisfy the deps with the candidate(s)
+ # in the cache
+ pkg = self._cache[depname]
+ cand = self._cache._depcache.get_candidate_ver(pkg._pkg)
+ if not cand:
+ continue
+ if not apt_pkg.check_dep(cand.ver_str, oper, ver):
+ continue
+
+ # check if we need to install it
+ self._dbg(2, "Need to get: %s" % depname)
+ self._need_pkgs.append(depname)
+ return True
+
+ # if we reach this point, we failed
+ or_str = ""
+ for dep in or_group:
+ or_str += dep[0]
+ if ver and oper:
+ or_str += f" ({dep[2]} {dep[1]})"
+ if dep != or_group[len(or_group) - 1]:
+ or_str += "|"
+ self._failure_string += _("Dependency is not satisfiable: %s\n") % or_str
+ return False
+
+ def _check_single_pkg_conflict(self, pkgname: str, ver: str, oper: str) -> bool:
+ """Return True if a pkg conflicts with a real installed/marked pkg."""
+ # FIXME: deal with conflicts against its own provides
+ # (e.g. Provides: ftp-server, Conflicts: ftp-server)
+ self._dbg(
+ 3,
+ "_check_single_pkg_conflict() pkg='%s' ver='%s' oper='%s'"
+ % (pkgname, ver, oper),
+ )
+ pkg = self._cache[pkgname]
+ if pkg.is_installed:
+ assert pkg.installed is not None
+ pkgver = pkg.installed.version
+ elif pkg.marked_install:
+ assert pkg.candidate is not None
+ pkgver = pkg.candidate.version
+ else:
+ return False
+ # print "pkg: %s" % pkgname
+ # print "ver: %s" % ver
+ # print "pkgver: %s " % pkgver
+ # print "oper: %s " % oper
+ if apt_pkg.check_dep(pkgver, oper, ver) and not self.replaces_real_pkg(
+ pkgname, oper, ver
+ ):
+ self._failure_string += (
+ _("Conflicts with the installed package " "'%s'") % pkg.name
+ )
+ self._dbg(3, "conflicts with installed pkg '%s'" % pkg.name)
+ return True
+ return False
+
+ def _check_conflicts_or_group(self, or_group: list[tuple[str, str, str]]) -> bool:
+ """Check the or-group for conflicts with installed pkgs."""
+ self._dbg(2, "_check_conflicts_or_group(): %s " % (or_group))
+ for dep in or_group:
+ depname = dep[0]
+ ver = dep[1]
+ oper = dep[2]
+
+ # FIXME: is this good enough? i.e. will apt always populate
+ # the cache with conflicting pkgnames for our arch?
+ depname = self._maybe_append_multiarch_suffix(
+ depname, in_conflict_checking=True
+ )
+
+ # check conflicts with virtual pkgs
+ if depname not in self._cache:
+ # FIXME: we have to check for virtual replaces here as
+ # well (to pass tests/gdebi-test8.deb)
+ if self._cache.is_virtual_package(depname):
+ for pkg in self._cache.get_providing_packages(depname):
+ self._dbg(3, "conflicts virtual check: %s" % pkg.name)
+ # P/C/R on virtal pkg, e.g. ftpd
+ if self.pkgname == pkg.name:
+ self._dbg(3, "conflict on self, ignoring")
+ continue
+ if self._check_single_pkg_conflict(pkg.name, ver, oper):
+ self._installed_conflicts.add(pkg.name)
+ continue
+ if self._check_single_pkg_conflict(depname, ver, oper):
+ self._installed_conflicts.add(depname)
+ return bool(self._installed_conflicts)
+
+ @property
+ def conflicts(self) -> list[list[tuple[str, str, str]]]:
+ """List of packages conflicting with this package."""
+ key = "Conflicts"
+ try:
+ return apt_pkg.parse_depends(self._sections[key], False)
+ except KeyError:
+ return []
+
+ @property
+ def depends(self) -> list[list[tuple[str, str, str]]]:
+ """List of packages on which this package depends on."""
+ depends = []
+ # find depends
+ for key in "Depends", "Pre-Depends":
+ try:
+ depends.extend(apt_pkg.parse_depends(self._sections[key], False))
+ except KeyError:
+ pass
+ return depends
+
+ @property
+ def provides(self) -> list[list[tuple[str, str, str]]]:
+ """List of virtual packages which are provided by this package."""
+ key = "Provides"
+ try:
+ return apt_pkg.parse_depends(self._sections[key], False)
+ except KeyError:
+ return []
+
+ @property
+ def replaces(self) -> list[list[tuple[str, str, str]]]:
+ """List of packages which are replaced by this package."""
+ key = "Replaces"
+ try:
+ return apt_pkg.parse_depends(self._sections[key], False)
+ except KeyError:
+ return []
+
+ def replaces_real_pkg(self, pkgname: str, oper: str, ver: str) -> bool:
+ """Return True if a given non-virtual package is replaced.
+
+ Return True if the deb packages replaces a real (not virtual)
+ packages named (pkgname, oper, ver).
+ """
+ self._dbg(3, f"replaces_real_pkg() {pkgname} {oper} {ver}")
+ pkg = self._cache[pkgname]
+ pkgver: str | None = None
+ if pkg.is_installed:
+ assert pkg.installed is not None
+ pkgver = pkg.installed.version
+ elif pkg.marked_install:
+ assert pkg.candidate is not None
+ pkgver = pkg.candidate.version
+ else:
+ pkgver = None
+ for or_group in self.replaces:
+ for name, ver, oper in or_group:
+ if name == pkgname and (
+ pkgver is None or apt_pkg.check_dep(pkgver, oper, ver)
+ ):
+ self._dbg(
+ 3,
+ "we have a replaces in our package for the "
+ "conflict against '%s'" % (pkgname),
+ )
+ return True
+ return False
+
+ def check_conflicts(self) -> bool:
+ """Check if there are conflicts with existing or selected packages.
+
+ Check if the package conflicts with a existing or to be installed
+ package. Return True if the pkg is OK.
+ """
+ res = True
+ for or_group in self.conflicts:
+ if self._check_conflicts_or_group(or_group):
+ # print "Conflicts with a exisiting pkg!"
+ # self._failure_string = "Conflicts with a exisiting pkg!"
+ res = False
+ return res
+
+ def check_breaks_existing_packages(self) -> bool:
+ """
+ check if installing the package would break exsisting
+ package on the system, e.g. system has:
+ smc depends on smc-data (= 1.4)
+ and user tries to installs smc-data 1.6
+ """
+ # show progress information as this step may take some time
+ size = float(len(self._cache))
+ steps = max(int(size / 50), 1)
+ debver = self._sections["Version"]
+ debarch = self._sections["Architecture"]
+ # store what we provide so that we can later check against that
+ provides = [x[0][0] for x in self.provides]
+ for i, pkg in enumerate(self._cache):
+ if i % steps == 0:
+ self._cache.op_progress.update(float(i) / size * 100.0)
+ if not pkg.is_installed:
+ continue
+ assert pkg.installed is not None
+ # check if the exising dependencies are still satisfied
+ # with the package
+ ver = pkg._pkg.current_ver
+ for dep_or in pkg.installed.dependencies:
+ for dep in dep_or.or_dependencies:
+ if dep.name == self.pkgname:
+ if not apt_pkg.check_dep(debver, dep.relation, dep.version):
+ self._dbg(2, "would break (depends) %s" % pkg.name)
+ # TRANSLATORS: the first '%s' is the package that
+ # breaks, the second the dependency that makes it
+ # break, the third the relation (e.g. >=) and the
+ # latest the version for the releation
+ self._failure_string += _(
+ "Breaks existing package '%(pkgname)s' "
+ "dependency %(depname)s "
+ "(%(deprelation)s %(depversion)s)"
+ ) % {
+ "pkgname": pkg.name,
+ "depname": dep.name,
+ "deprelation": dep.relation,
+ "depversion": dep.version,
+ }
+ self._cache.op_progress.done()
+ return False
+ # now check if there are conflicts against this package on
+ # the existing system
+ if "Conflicts" in ver.depends_list:
+ for conflicts_ver_list in ver.depends_list["Conflicts"]:
+ for c_or in conflicts_ver_list:
+ if (
+ c_or.target_pkg.name == self.pkgname
+ and c_or.target_pkg.architecture == debarch
+ ):
+ if apt_pkg.check_dep(
+ debver, c_or.comp_type, c_or.target_ver
+ ):
+ self._dbg(2, "would break (conflicts) %s" % pkg.name)
+ # TRANSLATORS: the first '%s' is the package
+ # that conflicts, the second the packagename
+ # that it conflicts with (so the name of the
+ # deb the user tries to install), the third is
+ # the relation (e.g. >=) and the last is the
+ # version for the relation
+ self._failure_string += _(
+ "Breaks existing package '%(pkgname)s' "
+ "conflict: %(targetpkg)s "
+ "(%(comptype)s %(targetver)s)"
+ ) % {
+ "pkgname": pkg.name,
+ "targetpkg": c_or.target_pkg.name,
+ "comptype": c_or.comp_type,
+ "targetver": c_or.target_ver,
+ }
+ self._cache.op_progress.done()
+ return False
+ if (
+ c_or.target_pkg.name in provides
+ and self.pkgname != pkg.name
+ ):
+ self._dbg(2, "would break (conflicts) %s" % provides)
+ self._failure_string += _(
+ "Breaks existing package '%(pkgname)s' "
+ "that conflict: '%(targetpkg)s'. But the "
+ "'%(debfile)s' provides it via: "
+ "'%(provides)s'"
+ ) % {
+ "provides": ",".join(provides),
+ "debfile": self.filename,
+ "targetpkg": c_or.target_pkg.name,
+ "pkgname": pkg.name,
+ }
+ self._cache.op_progress.done()
+ return False
+ self._cache.op_progress.done()
+ return True
+
+ def compare_to_version_in_cache(self, use_installed: bool = True) -> int:
+ """Compare the package to the version available in the cache.
+
+ Checks if the package is already installed or availabe in the cache
+ and if so in what version, returns one of (VERSION_NONE,
+ VERSION_OUTDATED, VERSION_SAME, VERSION_NEWER).
+ """
+ self._dbg(3, "compare_to_version_in_cache")
+ pkgname = self._sections["Package"]
+ architecture = self._sections["Architecture"]
+
+ # Arch qualify the package name
+ pkgname = ":".join([pkgname, architecture])
+
+ debver = self._sections["Version"]
+ self._dbg(1, "debver: %s" % debver)
+ if pkgname in self._cache:
+ pkg = self._cache[pkgname]
+ if use_installed and pkg.installed is not None:
+ cachever = pkg.installed.version
+ elif not use_installed and pkg.candidate is not None:
+ cachever = pkg.candidate.version
+ else:
+ return self.VERSION_NONE
+ if cachever is not None:
+ cmp = apt_pkg.version_compare(cachever, debver)
+ self._dbg(1, "CompareVersion(debver,instver): %s" % cmp)
+ if cmp == 0:
+ return self.VERSION_SAME
+ elif cmp < 0:
+ return self.VERSION_NEWER
+ elif cmp > 0:
+ return self.VERSION_OUTDATED
+ return self.VERSION_NONE
+
+ def check(self, allow_downgrade: bool = False) -> bool:
+ """Check if the package is installable."""
+ self._dbg(3, "check")
+
+ self._check_was_run = True
+
+ # check arch
+ if "Architecture" not in self._sections:
+ self._dbg(1, "ERROR: no architecture field")
+ self._failure_string = _("No Architecture field in the package")
+ return False
+ arch = self._sections["Architecture"]
+ if arch != "all" and arch != apt_pkg.config.find("APT::Architecture"):
+ if arch in apt_pkg.get_architectures():
+ self._multiarch = arch
+ self.pkgname = f"{self.pkgname}:{self._multiarch}"
+ self._dbg(1, "Found multiarch arch: '%s'" % arch)
+ else:
+ self._dbg(1, "ERROR: Wrong architecture dude!")
+ self._failure_string = (
+ _(
+ "Wrong architecture '%s' "
+ "-- Run dpkg --add-architecture to "
+ "add it and update afterwards"
+ )
+ % arch
+ )
+ return False
+
+ # check version
+ if (
+ not allow_downgrade
+ and self.compare_to_version_in_cache() == self.VERSION_OUTDATED
+ ):
+ if self._cache[self.pkgname].installed:
+ # the deb is older than the installed
+ self._failure_string = _("A later version is already installed")
+ return False
+
+ # FIXME: this sort of error handling sux
+ self._failure_string = ""
+
+ # check conflicts
+ if not self.check_conflicts():
+ return False
+
+ # check if installing it would break anything on the
+ # current system
+ if not self.check_breaks_existing_packages():
+ return False
+
+ # try to satisfy the dependencies
+ if not self._satisfy_depends(self.depends):
+ return False
+
+ # check for conflicts again (this time with the packages that are
+ # makeed for install)
+ if not self.check_conflicts():
+ return False
+
+ if self._cache._depcache.broken_count > 0:
+ self._failure_string = _(
+ "Failed to satisfy all dependencies " "(broken cache)"
+ )
+ # clean the cache again
+ self._cache.clear()
+ return False
+ return True
+
+ def satisfy_depends_str(self, dependsstr: str) -> bool:
+ """Satisfy the dependencies in the given string."""
+ return self._satisfy_depends(apt_pkg.parse_depends(dependsstr, False))
+
+ def _satisfy_depends(self, depends: list[list[tuple[str, str, str]]]) -> bool:
+ """Satisfy the dependencies."""
+ # turn off MarkAndSweep via a action group (if available)
+ try:
+ _actiongroup = apt_pkg.ActionGroup(self._cache._depcache)
+ _actiongroup # pyflakes
+ except AttributeError:
+ pass
+ # check depends
+ for or_group in depends:
+ if not self._is_or_group_satisfied(or_group):
+ if not self._satisfy_or_group(or_group):
+ return False
+ # now try it out in the cache
+ for pkg in self._need_pkgs:
+ try:
+ self._cache[pkg].mark_install(from_user=False)
+ except SystemError:
+ self._failure_string = _("Cannot install '%s'") % pkg
+ self._cache.clear()
+ return False
+ return True
+
+ @property
+ def missing_deps(self) -> list[str]:
+ """Return missing dependencies."""
+ self._dbg(1, "Installing: %s" % self._need_pkgs)
+ if not self._check_was_run:
+ raise AttributeError("property only available after check() was run")
+ return self._need_pkgs
+
+ @property
+ def required_changes(self) -> tuple[list[str], list[str], list[str]]:
+ """Get the changes required to satisfy the dependencies.
+
+ Returns: a tuple with (install, remove, unauthenticated)
+ """
+ install = []
+ remove = []
+ unauthenticated = []
+ if not self._check_was_run:
+ raise AttributeError("property only available after check() was run")
+ for pkg in self._cache:
+ if pkg.marked_install or pkg.marked_upgrade:
+ assert pkg.candidate is not None
+ install.append(pkg.name)
+ # check authentication, one authenticated origin is enough
+ # libapt will skip non-authenticated origins then
+ authenticated = False
+ for origin in pkg.candidate.origins:
+ authenticated |= origin.trusted
+ if not authenticated:
+ unauthenticated.append(pkg.name)
+ if pkg.marked_delete:
+ remove.append(pkg.name)
+ return (install, remove, unauthenticated)
+
+ @staticmethod
+ def to_hex(in_data: str) -> str:
+ hex = ""
+ for i, c in enumerate(in_data):
+ if i % 80 == 0:
+ hex += "\n"
+ hex += "%2.2x " % ord(c)
+ return hex
+
+ @staticmethod
+ def to_strish(in_data: str | Iterable[int]) -> str:
+ s = ""
+ # py2 compat, in_data is type string
+ if isinstance(in_data, str):
+ for c in in_data:
+ if ord(c) < 10 or ord(c) > 127:
+ s += " "
+ else:
+ s += c
+ # py3 compat, in_data is type bytes
+ else:
+ for b in in_data:
+ if b < 10 or b > 127:
+ s += " "
+ else:
+ s += chr(b)
+ return s
+
+ def _get_content(
+ self,
+ part: apt_inst.TarFile,
+ name: str,
+ auto_decompress: bool = True,
+ auto_hex: bool = True,
+ ) -> str:
+ if name.startswith("./"):
+ name = name[2:]
+ data = part.extractdata(name)
+ # check for zip content
+ if name.endswith(".gz") and auto_decompress:
+ io = BytesIO(data)
+ gz = gzip.GzipFile(fileobj=io)
+ data = _("Automatically decompressed:\n\n").encode("utf-8")
+ data += gz.read()
+ # auto-convert to hex
+ try:
+ return data.decode("utf-8")
+ except Exception:
+ new_data = _("Automatically converted to printable ascii:\n")
+ new_data += self.to_strish(data)
+ return new_data
+
+ def control_content(self, name: str) -> str:
+ """return the content of a specific control.tar.gz file"""
+ try:
+ return self._get_content(self._debfile.control, name)
+ except LookupError:
+ return ""
+
+ def data_content(self, name: str) -> str:
+ """return the content of a specific control.tar.gz file"""
+ try:
+ return self._get_content(self._debfile.data, name)
+ except LookupError:
+ return ""
+
+ def _dbg(self, level: int, msg: str) -> None:
+ """Write debugging output to sys.stderr."""
+ if level <= self.debug:
+ print(msg, file=sys.stderr)
+
+ def install(
+ self, install_progress: apt.progress.base.InstallProgress | None = None
+ ) -> int:
+ """Install the package."""
+ if self.filename is None:
+ raise apt_pkg.Error("No filename specified")
+ if install_progress is None:
+ return os.spawnlp(os.P_WAIT, "dpkg", "dpkg", "-i", self.filename)
+ else:
+ try:
+ install_progress.start_update()
+ except AttributeError:
+ install_progress.startUpdate() # type: ignore
+ res = install_progress.run(self.filename)
+ try:
+ install_progress.finish_update()
+ except AttributeError:
+ install_progress.finishUpdate() # type: ignore
+ return res
+
+
+class DscSrcPackage(DebPackage):
+ """A locally available source package."""
+
+ def __init__(
+ self, filename: str | None = None, cache: apt.Cache | None = None
+ ) -> None:
+ DebPackage.__init__(self, None, cache)
+ self.filename: str | None = filename
+ self._depends: list[list[tuple[str, str, str]]] = []
+ self._conflicts: list[list[tuple[str, str, str]]] = []
+ self._installed_conflicts: set[str] = set()
+ self.pkgname = ""
+ self.binaries: list[str] = []
+ self._sections: dict[str, str] = {}
+ if self.filename is not None:
+ self.open(self.filename)
+
+ @property
+ def depends(self) -> list[list[tuple[str, str, str]]]:
+ """Return the dependencies of the package"""
+ return self._depends
+
+ @property
+ def conflicts(self) -> list[list[tuple[str, str, str]]]:
+ """Return the dependencies of the package"""
+ return self._conflicts
+
+ @property
+ def filelist(self) -> list[str]:
+ """Return the list of files associated with this dsc file"""
+ # Files stanza looks like (hash, size, filename, ...)
+ return self._sections["Files"].split()[2::3]
+
+ def open(self, file: str) -> None:
+ """Open the package."""
+ depends_tags = ["Build-Depends", "Build-Depends-Indep"]
+ conflicts_tags = ["Build-Conflicts", "Build-Conflicts-Indep"]
+ fd = apt_pkg.open_maybe_clear_signed_file(file)
+ fobj = os.fdopen(fd)
+ tagfile = apt_pkg.TagFile(fobj)
+ try:
+ for sec in tagfile:
+ for tag in depends_tags:
+ if tag not in sec:
+ continue
+ self._depends.extend(apt_pkg.parse_src_depends(sec[tag]))
+ for tag in conflicts_tags:
+ if tag not in sec:
+ continue
+ self._conflicts.extend(apt_pkg.parse_src_depends(sec[tag]))
+ if "Source" in sec:
+ self.pkgname = sec["Source"]
+ if "Binary" in sec:
+ self.binaries = [b.strip() for b in sec["Binary"].split(",")]
+ for tag in sec.keys():
+ if tag in sec:
+ self._sections[tag] = sec[tag]
+ finally:
+ del tagfile
+ fobj.close()
+
+ s = _(
+ "Install Build-Dependencies for " "source package '%s' that builds %s\n"
+ ) % (self.pkgname, " ".join(self.binaries))
+ self._sections["Description"] = s
+ self._check_was_run = False
+
+ def check(self, allow_downgrade: bool = False) -> bool:
+ """Check if the package is installable.
+
+ The second parameter is ignored and only exists for compatibility
+ with parent type."""
+ if not self.check_conflicts():
+ for pkgname in self._installed_conflicts:
+ if self._cache[pkgname]._pkg.essential:
+ raise Exception(_("An essential package would be removed"))
+ self._cache[pkgname].mark_delete()
+ # properties are ok now
+ self._check_was_run = True
+ # FIXME: a additional run of the check_conflicts()
+ # after _satisfy_depends() should probably be done
+ return self._satisfy_depends(self.depends)
+
+
+def _test() -> None:
+ """Test function"""
+ from apt.cache import Cache
+ from apt.progress.base import InstallProgress
+
+ cache = Cache()
+
+ vp = "www-browser"
+ print(f"{vp} virtual: {cache.is_virtual_package(vp)}")
+ providers = cache.get_providing_packages(vp)
+ print("Providers for %s :" % vp)
+ for pkg in providers:
+ print(" %s" % pkg.name)
+
+ d = DebPackage(sys.argv[1], cache)
+ print("Deb: %s" % d.pkgname)
+ if not d.check():
+ print("can't be satified")
+ print(d._failure_string)
+ print("missing deps: %s" % d.missing_deps)
+ print(d.required_changes)
+
+ print(d.filelist)
+
+ print("Installing ...")
+ ret = d.install(InstallProgress())
+ print(ret)
+
+ # s = DscSrcPackage(cache, "../tests/3ddesktop_0.2.9-6.dsc")
+ # s.check_dep()
+ # print "Missing deps: ",s.missingDeps
+ # print "Print required changes: ", s.requiredChanges
+
+ s = DscSrcPackage(cache=cache)
+ ds = "libc6 (>= 2.3.2), libaio (>= 0.3.96) | libaio1 (>= 0.3.96)"
+ print(s._satisfy_depends(apt_pkg.parse_depends(ds, False)))
+
+
+if __name__ == "__main__":
+ _test()