From 76926159194e180003aa78de97e5f287bf4325a5 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 15 Apr 2024 20:07:41 +0200 Subject: Adding upstream version 2.7.6. Signed-off-by: Daniel Baumann --- apt/progress/__init__.py | 28 ++++ apt/progress/base.py | 332 +++++++++++++++++++++++++++++++++++++++++++++++ apt/progress/text.py | 294 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 654 insertions(+) create mode 100644 apt/progress/__init__.py create mode 100644 apt/progress/base.py create mode 100644 apt/progress/text.py (limited to 'apt/progress') diff --git a/apt/progress/__init__.py b/apt/progress/__init__.py new file mode 100644 index 0000000..d1687d5 --- /dev/null +++ b/apt/progress/__init__.py @@ -0,0 +1,28 @@ +# apt/progress/__init__.py - Initialization file for apt.progress. +# +# Copyright (c) 2009 Julian Andres Klode +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Progress reporting. + +This package provides progress reporting for the python-apt package. The module +'base' provides classes with no output, and the module 'text' provides classes +for terminals, etc. +""" + +from collections.abc import Sequence + +__all__: Sequence[str] = [] diff --git a/apt/progress/base.py b/apt/progress/base.py new file mode 100644 index 0000000..ede5e5c --- /dev/null +++ b/apt/progress/base.py @@ -0,0 +1,332 @@ +# apt/progress/base.py - Base classes for progress reporting. +# +# Copyright (C) 2009 Julian Andres Klode +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +# pylint: disable-msg = R0201 +"""Base classes for progress reporting. + +Custom progress classes should inherit from these classes. They can also be +used as dummy progress classes which simply do nothing. +""" +from __future__ import annotations + +import errno +import fcntl +import io +import os +import re +import select +import sys + +import apt_pkg + +__all__ = ["AcquireProgress", "CdromProgress", "InstallProgress", "OpProgress"] + + +class AcquireProgress: + """Monitor object for downloads controlled by the Acquire class. + + This is an mostly abstract class. You should subclass it and implement the + methods to get something useful. + """ + + current_bytes = current_cps = fetched_bytes = last_bytes = total_bytes = 0.0 + current_items = elapsed_time = total_items = 0 + + def done(self, item: apt_pkg.AcquireItemDesc) -> None: + """Invoked when an item is successfully and completely fetched.""" + + def fail(self, item: apt_pkg.AcquireItemDesc) -> None: + """Invoked when an item could not be fetched.""" + + def fetch(self, item: apt_pkg.AcquireItemDesc) -> None: + """Invoked when some of the item's data is fetched.""" + + def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None: + """Invoked when an item is confirmed to be up-to-date. + + Invoked when an item is confirmed to be up-to-date. For instance, + when an HTTP download is informed that the file on the server was + not modified. + """ + + def media_change(self, media: str, drive: str) -> bool: + """Prompt the user to change the inserted removable media. + + The parameter 'media' decribes the name of the media type that + should be changed, whereas the parameter 'drive' should be the + identifying name of the drive whose media should be changed. + + This method should not return until the user has confirmed to the user + interface that the media change is complete. It must return True if + the user confirms the media change, or False to cancel it. + """ + return False + + def pulse(self, owner: apt_pkg.Acquire) -> bool: + """Periodically invoked while the Acquire process is underway. + + This method gets invoked while the Acquire progress given by the + parameter 'owner' is underway. It should display information about + the current state. + + This function returns a boolean value indicating whether the + acquisition should be continued (True) or cancelled (False). + """ + return True + + def start(self) -> None: + """Invoked when the Acquire process starts running.""" + # Reset all our values. + self.current_bytes = 0.0 + self.current_cps = 0.0 + self.current_items = 0 + self.elapsed_time = 0 + self.fetched_bytes = 0.0 + self.last_bytes = 0.0 + self.total_bytes = 0.0 + self.total_items = 0 + + def stop(self) -> None: + """Invoked when the Acquire process stops running.""" + + +class CdromProgress: + """Base class for reporting the progress of adding a cdrom. + + Can be used with apt_pkg.Cdrom to produce an utility like apt-cdrom. The + attribute 'total_steps' defines the total number of steps and can be used + in update() to display the current progress. + """ + + total_steps = 0 + + def ask_cdrom_name(self) -> str | None: + """Ask for the name of the cdrom. + + If a name has been provided, return it. Otherwise, return None to + cancel the operation. + """ + + def change_cdrom(self) -> bool: + """Ask for the CD-ROM to be changed. + + Return True once the cdrom has been changed or False to cancel the + operation. + """ + return False + + def update(self, text: str, current: int) -> None: + """Periodically invoked to update the interface. + + The string 'text' defines the text which should be displayed. The + integer 'current' defines the number of completed steps. + """ + + +class InstallProgress: + """Class to report the progress of installing packages.""" + + child_pid, percent, select_timeout, status = 0, 0.0, 0.1, "" + + def __init__(self) -> None: + (self.statusfd, self.writefd) = os.pipe() + # These will leak fds, but fixing this safely requires API changes. + self.write_stream: io.TextIOBase = os.fdopen(self.writefd, "w") + self.status_stream: io.TextIOBase = os.fdopen(self.statusfd, "r") # noqa + fcntl.fcntl(self.statusfd, fcntl.F_SETFL, os.O_NONBLOCK) + + def start_update(self) -> None: + """(Abstract) Start update.""" + + def finish_update(self) -> None: + """(Abstract) Called when update has finished.""" + + def __enter__(self) -> InstallProgress: + return self + + def __exit__(self, type: object, value: object, traceback: object) -> None: + self.write_stream.close() + self.status_stream.close() + + def error(self, pkg: str, errormsg: str) -> None: + """(Abstract) Called when a error is detected during the install.""" + + def conffile(self, current: str, new: str) -> None: + """(Abstract) Called when a conffile question from dpkg is detected.""" + + def status_change(self, pkg: str, percent: float, status: str) -> None: + """(Abstract) Called when the APT status changed.""" + + def dpkg_status_change(self, pkg: str, status: str) -> None: + """(Abstract) Called when the dpkg status changed.""" + + def processing(self, pkg: str, stage: str) -> None: + """(Abstract) Sent just before a processing stage starts. + + The parameter 'stage' is one of "upgrade", "install" + (both sent before unpacking), "configure", "trigproc", "remove", + "purge". This method is used for dpkg only. + """ + + def run(self, obj: apt_pkg.PackageManager | bytes | str) -> int: + """Install using the object 'obj'. + + This functions runs install actions. The parameter 'obj' may either + be a PackageManager object in which case its do_install() method is + called or the path to a deb file. + + If the object is a PackageManager, the functions returns the result + of calling its do_install() method. Otherwise, the function returns + the exit status of dpkg. In both cases, 0 means that there were no + problems. + """ + pid = self.fork() + if pid == 0: + try: + # PEP-446 implemented in Python 3.4 made all descriptors + # CLOEXEC, but we need to be able to pass writefd to dpkg + # when we spawn it + os.set_inheritable(self.writefd, True) + except AttributeError: # if we don't have os.set_inheritable() + pass + # pm.do_install might raise a exception, + # when this happens, we need to catch + # it, otherwise os._exit() is not run + # and the execution continues in the + # parent code leading to very confusing bugs + try: + os._exit(obj.do_install(self.write_stream.fileno())) # type: ignore # noqa + except AttributeError: + os._exit( + os.spawnlp( + os.P_WAIT, + "dpkg", + "dpkg", + "--status-fd", + str(self.write_stream.fileno()), + "-i", + obj, # type: ignore # noqa + ) + ) + except Exception as e: + sys.stderr.write("%s\n" % e) + os._exit(apt_pkg.PackageManager.RESULT_FAILED) + + self.child_pid = pid + res = self.wait_child() + return os.WEXITSTATUS(res) + + def fork(self) -> int: + """Fork.""" + return os.fork() + + def update_interface(self) -> None: + """Update the interface.""" + try: + line = self.status_stream.readline() + except OSError as err: + # resource temporarly unavailable is ignored + if err.errno != errno.EAGAIN and err.errno != errno.EWOULDBLOCK: + print(err.strerror) + return + + pkgname = status = status_str = percent = base = "" + + if line.startswith("pm"): + try: + (status, pkgname, percent, status_str) = line.split(":", 3) + except ValueError: + # silently ignore lines that can't be parsed + return + elif line.startswith("status"): + try: + (base, pkgname, status, status_str) = line.split(":", 3) + except ValueError: + (base, pkgname, status) = line.split(":", 2) + elif line.startswith("processing"): + (status, status_str, pkgname) = line.split(":", 2) + self.processing(pkgname.strip(), status_str.strip()) + + # Always strip the status message + pkgname = pkgname.strip() + status_str = status_str.strip() + status = status.strip() + + if status == "pmerror" or status == "error": + self.error(pkgname, status_str) + elif status == "conffile-prompt" or status == "pmconffile": + match = re.match("\\s*'(.*)'\\s*'(.*)'.*", status_str) + if match: + self.conffile(match.group(1), match.group(2)) + elif status == "pmstatus": + # FIXME: Float comparison + if float(percent) != self.percent or status_str != self.status: + self.status_change(pkgname, float(percent), status_str.strip()) + self.percent = float(percent) + self.status = status_str.strip() + elif base == "status": + self.dpkg_status_change(pkgname, status) + + def wait_child(self) -> int: + """Wait for child progress to exit. + + This method is responsible for calling update_interface() from time to + time. It exits once the child has exited. The return values is the + full status returned from os.waitpid() (not only the return code). + """ + (pid, res) = (0, 0) + while True: + try: + select.select([self.status_stream], [], [], self.select_timeout) + except OSError as error: + (errno_, _errstr) = error.args + if errno_ != errno.EINTR: + raise + + self.update_interface() + try: + (pid, res) = os.waitpid(self.child_pid, os.WNOHANG) + if pid == self.child_pid: + break + except OSError as err: + if err.errno == errno.ECHILD: + break + if err.errno != errno.EINTR: + raise + + return res + + +class OpProgress: + """Monitor objects for operations. + + Display the progress of operations such as opening the cache.""" + + major_change, op, percent, subop = False, "", 0.0, "" + + def update(self, percent: float | None = None) -> None: + """Called periodically to update the user interface. + + You may use the optional argument 'percent' to set the attribute + 'percent' in this call. + """ + if percent is not None: + self.percent = percent + + def done(self) -> None: + """Called once an operation has been completed.""" diff --git a/apt/progress/text.py b/apt/progress/text.py new file mode 100644 index 0000000..ea1a176 --- /dev/null +++ b/apt/progress/text.py @@ -0,0 +1,294 @@ +# Copyright (c) 2009 Julian Andres Klode +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Progress reporting for text interfaces.""" +import io +import os +import signal +import sys +import types +from collections.abc import Callable + +import apt_pkg + +from apt.progress import base + +__all__ = ["AcquireProgress", "CdromProgress", "OpProgress"] + + +def _(msg: str) -> str: + """Translate the message, also try apt if translation is missing.""" + res = apt_pkg.gettext(msg) + if res == msg: + res = apt_pkg.gettext(msg, "apt") + return res + + +class TextProgress: + """Internal Base class for text progress classes.""" + + def __init__(self, outfile: io.TextIOBase | None = None) -> None: + self._file = outfile or sys.stdout + self._width = 0 + + def _write(self, msg: str, newline: bool = True, maximize: bool = False) -> None: + """Write the message on the terminal, fill remaining space.""" + self._file.write("\r") + self._file.write(msg) + + # Fill remaining stuff with whitespace + if self._width > len(msg): + self._file.write((self._width - len(msg)) * " ") + elif maximize: # Needed for OpProgress. + self._width = max(self._width, len(msg)) + if newline: + self._file.write("\n") + else: + # self._file.write("\r") + self._file.flush() + + +class OpProgress(base.OpProgress, TextProgress): + """Operation progress reporting. + + This closely resembles OpTextProgress in libapt-pkg. + """ + + def __init__(self, outfile: io.TextIOBase | None = None) -> None: + TextProgress.__init__(self, outfile) + base.OpProgress.__init__(self) + self.old_op = "" + + def update(self, percent: float | None = None) -> None: + """Called periodically to update the user interface.""" + base.OpProgress.update(self, percent) + if self.major_change and self.old_op: + self._write(self.old_op) + self._write("%s... %i%%\r" % (self.op, self.percent), False, True) + self.old_op = self.op + + def done(self) -> None: + """Called once an operation has been completed.""" + base.OpProgress.done(self) + if self.old_op: + self._write(_("%c%s... Done") % ("\r", self.old_op), True, True) + self.old_op = "" + + +class AcquireProgress(base.AcquireProgress, TextProgress): + """AcquireProgress for the text interface.""" + + def __init__(self, outfile: io.TextIOBase | None = None) -> None: + TextProgress.__init__(self, outfile) + base.AcquireProgress.__init__(self) + self._signal: ( + Callable[[int, types.FrameType | None], None] | int | signal.Handlers | None + ) = None # noqa + self._width = 80 + self._id = 1 + + def start(self) -> None: + """Start an Acquire progress. + + In this case, the function sets up a signal handler for SIGWINCH, i.e. + window resize signals. And it also sets id to 1. + """ + base.AcquireProgress.start(self) + self._signal = signal.signal(signal.SIGWINCH, self._winch) + # Get the window size. + self._winch() + self._id = 1 + + def _winch(self, *dummy: object) -> None: + """Signal handler for window resize signals.""" + if hasattr(self._file, "fileno") and os.isatty(self._file.fileno()): + import fcntl + import struct + import termios + + buf = fcntl.ioctl(self._file, termios.TIOCGWINSZ, 8 * b" ") # noqa + dummy, col, dummy, dummy = struct.unpack("hhhh", buf) + self._width = col - 1 # 1 for the cursor + + def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None: + """Called when an item is update (e.g. not modified on the server).""" + base.AcquireProgress.ims_hit(self, item) + line = _("Hit ") + item.description + if item.owner.filesize: + line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize) + self._write(line) + + def fail(self, item: apt_pkg.AcquireItemDesc) -> None: + """Called when an item is failed.""" + base.AcquireProgress.fail(self, item) + if item.owner.status == item.owner.STAT_DONE: + self._write(_("Ign ") + item.description) + else: + self._write(_("Err ") + item.description) + self._write(" %s" % item.owner.error_text) + + def fetch(self, item: apt_pkg.AcquireItemDesc) -> None: + """Called when some of the item's data is fetched.""" + base.AcquireProgress.fetch(self, item) + # It's complete already (e.g. Hit) + if item.owner.complete: + return + item.owner.id = self._id + self._id += 1 + line = _("Get:") + f"{item.owner.id} {item.description}" + if item.owner.filesize: + line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize) + + self._write(line) + + def pulse(self, owner: apt_pkg.Acquire) -> bool: + """Periodically invoked while the Acquire process is underway. + + Return False if the user asked to cancel the whole Acquire process.""" + base.AcquireProgress.pulse(self, owner) + # only show progress on a tty to not clutter log files etc + if hasattr(self._file, "fileno") and not os.isatty(self._file.fileno()): + return True + + # calculate progress + percent = ((self.current_bytes + self.current_items) * 100.0) / float( + self.total_bytes + self.total_items + ) + + shown = False + tval = "%i%%" % percent + end = "" + if self.current_cps: + eta = int(float(self.total_bytes - self.current_bytes) / self.current_cps) + end = " {}B/s {}".format( + apt_pkg.size_to_str(self.current_cps), + apt_pkg.time_to_str(eta), + ) + + for worker in owner.workers: + val = "" + if not worker.current_item: + if worker.status: + val = " [%s]" % worker.status + if len(tval) + len(val) + len(end) >= self._width: + break + tval += val + shown = True + continue + shown = True + + if worker.current_item.owner.id: + val += " [%i %s" % ( + worker.current_item.owner.id, + worker.current_item.shortdesc, + ) + else: + val += " [%s" % worker.current_item.description + if worker.current_item.owner.active_subprocess: + val += " %s" % worker.current_item.owner.active_subprocess + + val += " %sB" % apt_pkg.size_to_str(worker.current_size) + + # Add the total size and percent + if worker.total_size and not worker.current_item.owner.complete: + val += "/%sB %i%%" % ( + apt_pkg.size_to_str(worker.total_size), + worker.current_size * 100.0 / worker.total_size, + ) + + val += "]" + + if len(tval) + len(val) + len(end) >= self._width: + # Display as many items as screen width + break + else: + tval += val + + if not shown: + tval += _(" [Working]") + + if self.current_cps: + tval += (self._width - len(end) - len(tval)) * " " + end + + self._write(tval, False) + return True + + def media_change(self, medium: str, drive: str) -> bool: + """Prompt the user to change the inserted removable media.""" + base.AcquireProgress.media_change(self, medium, drive) + self._write( + _( + "Media change: please insert the disc labeled\n" + " '%s'\n" + "in the drive '%s' and press enter\n" + ) + % (medium, drive) + ) + return input() not in ("c", "C") + + def stop(self) -> None: + """Invoked when the Acquire process stops running.""" + base.AcquireProgress.stop(self) + # Trick for getting a translation from apt + self._write( + ( + _("Fetched %sB in %s (%sB/s)\n") + % ( + apt_pkg.size_to_str(self.fetched_bytes), + apt_pkg.time_to_str(self.elapsed_time), + apt_pkg.size_to_str(self.current_cps), + ) + ).rstrip("\n") + ) + + # Delete the signal again. + import signal + + signal.signal(signal.SIGWINCH, self._signal) + + +class CdromProgress(base.CdromProgress, TextProgress): + """Text CD-ROM progress.""" + + def ask_cdrom_name(self) -> str | None: + """Ask the user to provide a name for the disc.""" + base.CdromProgress.ask_cdrom_name(self) + self._write( + _( + "Please provide a name for this medium, such as " + "'Debian 2.1r1 Disk 1'" + ), + False, + ) + try: + return str(input(":")) + except KeyboardInterrupt: + return None + + def update(self, text: str, current: int) -> None: + """Set the current progress.""" + base.CdromProgress.update(self, text, current) + if text: + self._write(text, False) + + def change_cdrom(self) -> bool: + """Ask the user to change the CD-ROM.""" + base.CdromProgress.change_cdrom(self) + self._write(_("Please insert an installation medium and press enter"), False) + try: + return bool(input() == "") + except KeyboardInterrupt: + return False -- cgit v1.2.3