summaryrefslogtreecommitdiffstats
path: root/apt/progress/text.py
diff options
context:
space:
mode:
Diffstat (limited to 'apt/progress/text.py')
-rw-r--r--apt/progress/text.py294
1 files changed, 294 insertions, 0 deletions
diff --git a/apt/progress/text.py b/apt/progress/text.py
new file mode 100644
index 0000000..ea1a176
--- /dev/null
+++ b/apt/progress/text.py
@@ -0,0 +1,294 @@
+# Copyright (c) 2009 Julian Andres Klode <jak@debian.org>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+"""Progress reporting for text interfaces."""
+import io
+import os
+import signal
+import sys
+import types
+from collections.abc import Callable
+
+import apt_pkg
+
+from apt.progress import base
+
+__all__ = ["AcquireProgress", "CdromProgress", "OpProgress"]
+
+
+def _(msg: str) -> str:
+ """Translate the message, also try apt if translation is missing."""
+ res = apt_pkg.gettext(msg)
+ if res == msg:
+ res = apt_pkg.gettext(msg, "apt")
+ return res
+
+
+class TextProgress:
+ """Internal Base class for text progress classes."""
+
+ def __init__(self, outfile: io.TextIOBase | None = None) -> None:
+ self._file = outfile or sys.stdout
+ self._width = 0
+
+ def _write(self, msg: str, newline: bool = True, maximize: bool = False) -> None:
+ """Write the message on the terminal, fill remaining space."""
+ self._file.write("\r")
+ self._file.write(msg)
+
+ # Fill remaining stuff with whitespace
+ if self._width > len(msg):
+ self._file.write((self._width - len(msg)) * " ")
+ elif maximize: # Needed for OpProgress.
+ self._width = max(self._width, len(msg))
+ if newline:
+ self._file.write("\n")
+ else:
+ # self._file.write("\r")
+ self._file.flush()
+
+
+class OpProgress(base.OpProgress, TextProgress):
+ """Operation progress reporting.
+
+ This closely resembles OpTextProgress in libapt-pkg.
+ """
+
+ def __init__(self, outfile: io.TextIOBase | None = None) -> None:
+ TextProgress.__init__(self, outfile)
+ base.OpProgress.__init__(self)
+ self.old_op = ""
+
+ def update(self, percent: float | None = None) -> None:
+ """Called periodically to update the user interface."""
+ base.OpProgress.update(self, percent)
+ if self.major_change and self.old_op:
+ self._write(self.old_op)
+ self._write("%s... %i%%\r" % (self.op, self.percent), False, True)
+ self.old_op = self.op
+
+ def done(self) -> None:
+ """Called once an operation has been completed."""
+ base.OpProgress.done(self)
+ if self.old_op:
+ self._write(_("%c%s... Done") % ("\r", self.old_op), True, True)
+ self.old_op = ""
+
+
+class AcquireProgress(base.AcquireProgress, TextProgress):
+ """AcquireProgress for the text interface."""
+
+ def __init__(self, outfile: io.TextIOBase | None = None) -> None:
+ TextProgress.__init__(self, outfile)
+ base.AcquireProgress.__init__(self)
+ self._signal: (
+ Callable[[int, types.FrameType | None], None] | int | signal.Handlers | None
+ ) = None # noqa
+ self._width = 80
+ self._id = 1
+
+ def start(self) -> None:
+ """Start an Acquire progress.
+
+ In this case, the function sets up a signal handler for SIGWINCH, i.e.
+ window resize signals. And it also sets id to 1.
+ """
+ base.AcquireProgress.start(self)
+ self._signal = signal.signal(signal.SIGWINCH, self._winch)
+ # Get the window size.
+ self._winch()
+ self._id = 1
+
+ def _winch(self, *dummy: object) -> None:
+ """Signal handler for window resize signals."""
+ if hasattr(self._file, "fileno") and os.isatty(self._file.fileno()):
+ import fcntl
+ import struct
+ import termios
+
+ buf = fcntl.ioctl(self._file, termios.TIOCGWINSZ, 8 * b" ") # noqa
+ dummy, col, dummy, dummy = struct.unpack("hhhh", buf)
+ self._width = col - 1 # 1 for the cursor
+
+ def ims_hit(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Called when an item is update (e.g. not modified on the server)."""
+ base.AcquireProgress.ims_hit(self, item)
+ line = _("Hit ") + item.description
+ if item.owner.filesize:
+ line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize)
+ self._write(line)
+
+ def fail(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Called when an item is failed."""
+ base.AcquireProgress.fail(self, item)
+ if item.owner.status == item.owner.STAT_DONE:
+ self._write(_("Ign ") + item.description)
+ else:
+ self._write(_("Err ") + item.description)
+ self._write(" %s" % item.owner.error_text)
+
+ def fetch(self, item: apt_pkg.AcquireItemDesc) -> None:
+ """Called when some of the item's data is fetched."""
+ base.AcquireProgress.fetch(self, item)
+ # It's complete already (e.g. Hit)
+ if item.owner.complete:
+ return
+ item.owner.id = self._id
+ self._id += 1
+ line = _("Get:") + f"{item.owner.id} {item.description}"
+ if item.owner.filesize:
+ line += " [%sB]" % apt_pkg.size_to_str(item.owner.filesize)
+
+ self._write(line)
+
+ def pulse(self, owner: apt_pkg.Acquire) -> bool:
+ """Periodically invoked while the Acquire process is underway.
+
+ Return False if the user asked to cancel the whole Acquire process."""
+ base.AcquireProgress.pulse(self, owner)
+ # only show progress on a tty to not clutter log files etc
+ if hasattr(self._file, "fileno") and not os.isatty(self._file.fileno()):
+ return True
+
+ # calculate progress
+ percent = ((self.current_bytes + self.current_items) * 100.0) / float(
+ self.total_bytes + self.total_items
+ )
+
+ shown = False
+ tval = "%i%%" % percent
+ end = ""
+ if self.current_cps:
+ eta = int(float(self.total_bytes - self.current_bytes) / self.current_cps)
+ end = " {}B/s {}".format(
+ apt_pkg.size_to_str(self.current_cps),
+ apt_pkg.time_to_str(eta),
+ )
+
+ for worker in owner.workers:
+ val = ""
+ if not worker.current_item:
+ if worker.status:
+ val = " [%s]" % worker.status
+ if len(tval) + len(val) + len(end) >= self._width:
+ break
+ tval += val
+ shown = True
+ continue
+ shown = True
+
+ if worker.current_item.owner.id:
+ val += " [%i %s" % (
+ worker.current_item.owner.id,
+ worker.current_item.shortdesc,
+ )
+ else:
+ val += " [%s" % worker.current_item.description
+ if worker.current_item.owner.active_subprocess:
+ val += " %s" % worker.current_item.owner.active_subprocess
+
+ val += " %sB" % apt_pkg.size_to_str(worker.current_size)
+
+ # Add the total size and percent
+ if worker.total_size and not worker.current_item.owner.complete:
+ val += "/%sB %i%%" % (
+ apt_pkg.size_to_str(worker.total_size),
+ worker.current_size * 100.0 / worker.total_size,
+ )
+
+ val += "]"
+
+ if len(tval) + len(val) + len(end) >= self._width:
+ # Display as many items as screen width
+ break
+ else:
+ tval += val
+
+ if not shown:
+ tval += _(" [Working]")
+
+ if self.current_cps:
+ tval += (self._width - len(end) - len(tval)) * " " + end
+
+ self._write(tval, False)
+ return True
+
+ def media_change(self, medium: str, drive: str) -> bool:
+ """Prompt the user to change the inserted removable media."""
+ base.AcquireProgress.media_change(self, medium, drive)
+ self._write(
+ _(
+ "Media change: please insert the disc labeled\n"
+ " '%s'\n"
+ "in the drive '%s' and press enter\n"
+ )
+ % (medium, drive)
+ )
+ return input() not in ("c", "C")
+
+ def stop(self) -> None:
+ """Invoked when the Acquire process stops running."""
+ base.AcquireProgress.stop(self)
+ # Trick for getting a translation from apt
+ self._write(
+ (
+ _("Fetched %sB in %s (%sB/s)\n")
+ % (
+ apt_pkg.size_to_str(self.fetched_bytes),
+ apt_pkg.time_to_str(self.elapsed_time),
+ apt_pkg.size_to_str(self.current_cps),
+ )
+ ).rstrip("\n")
+ )
+
+ # Delete the signal again.
+ import signal
+
+ signal.signal(signal.SIGWINCH, self._signal)
+
+
+class CdromProgress(base.CdromProgress, TextProgress):
+ """Text CD-ROM progress."""
+
+ def ask_cdrom_name(self) -> str | None:
+ """Ask the user to provide a name for the disc."""
+ base.CdromProgress.ask_cdrom_name(self)
+ self._write(
+ _(
+ "Please provide a name for this medium, such as "
+ "'Debian 2.1r1 Disk 1'"
+ ),
+ False,
+ )
+ try:
+ return str(input(":"))
+ except KeyboardInterrupt:
+ return None
+
+ def update(self, text: str, current: int) -> None:
+ """Set the current progress."""
+ base.CdromProgress.update(self, text, current)
+ if text:
+ self._write(text, False)
+
+ def change_cdrom(self) -> bool:
+ """Ask the user to change the CD-ROM."""
+ base.CdromProgress.change_cdrom(self)
+ self._write(_("Please insert an installation medium and press enter"), False)
+ try:
+ return bool(input() == "")
+ except KeyboardInterrupt:
+ return False