summaryrefslogtreecommitdiffstats
path: root/third_party/python/pip/pip/_vendor/rich/progress.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/python/pip/pip/_vendor/rich/progress.py')
-rw-r--r--third_party/python/pip/pip/_vendor/rich/progress.py1707
1 files changed, 1707 insertions, 0 deletions
diff --git a/third_party/python/pip/pip/_vendor/rich/progress.py b/third_party/python/pip/pip/_vendor/rich/progress.py
new file mode 100644
index 0000000000..e7d163c137
--- /dev/null
+++ b/third_party/python/pip/pip/_vendor/rich/progress.py
@@ -0,0 +1,1707 @@
+import io
+import sys
+import typing
+import warnings
+from abc import ABC, abstractmethod
+from collections import deque
+from collections.abc import Sized
+from dataclasses import dataclass, field
+from datetime import timedelta
+from io import RawIOBase, UnsupportedOperation
+from math import ceil
+from mmap import mmap
+from os import PathLike, stat
+from threading import Event, RLock, Thread
+from types import TracebackType
+from typing import (
+ Any,
+ BinaryIO,
+ Callable,
+ ContextManager,
+ Deque,
+ Dict,
+ Generic,
+ Iterable,
+ List,
+ NamedTuple,
+ NewType,
+ Optional,
+ Sequence,
+ TextIO,
+ Tuple,
+ Type,
+ TypeVar,
+ Union,
+)
+
+if sys.version_info >= (3, 8):
+ from typing import Literal
+else:
+ from pip._vendor.typing_extensions import Literal # pragma: no cover
+
+from . import filesize, get_console
+from .console import Console, Group, JustifyMethod, RenderableType
+from .highlighter import Highlighter
+from .jupyter import JupyterMixin
+from .live import Live
+from .progress_bar import ProgressBar
+from .spinner import Spinner
+from .style import StyleType
+from .table import Column, Table
+from .text import Text, TextType
+
+TaskID = NewType("TaskID", int)
+
+ProgressType = TypeVar("ProgressType")
+
+GetTimeCallable = Callable[[], float]
+
+
+_I = typing.TypeVar("_I", TextIO, BinaryIO)
+
+
+class _TrackThread(Thread):
+ """A thread to periodically update progress."""
+
+ def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float):
+ self.progress = progress
+ self.task_id = task_id
+ self.update_period = update_period
+ self.done = Event()
+
+ self.completed = 0
+ super().__init__()
+
+ def run(self) -> None:
+ task_id = self.task_id
+ advance = self.progress.advance
+ update_period = self.update_period
+ last_completed = 0
+ wait = self.done.wait
+ while not wait(update_period):
+ completed = self.completed
+ if last_completed != completed:
+ advance(task_id, completed - last_completed)
+ last_completed = completed
+
+ self.progress.update(self.task_id, completed=self.completed, refresh=True)
+
+ def __enter__(self) -> "_TrackThread":
+ self.start()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self.done.set()
+ self.join()
+
+
+def track(
+ sequence: Union[Sequence[ProgressType], Iterable[ProgressType]],
+ description: str = "Working...",
+ total: Optional[float] = None,
+ auto_refresh: bool = True,
+ console: Optional[Console] = None,
+ transient: bool = False,
+ get_time: Optional[Callable[[], float]] = None,
+ refresh_per_second: float = 10,
+ style: StyleType = "bar.back",
+ complete_style: StyleType = "bar.complete",
+ finished_style: StyleType = "bar.finished",
+ pulse_style: StyleType = "bar.pulse",
+ update_period: float = 0.1,
+ disable: bool = False,
+ show_speed: bool = True,
+) -> Iterable[ProgressType]:
+ """Track progress by iterating over a sequence.
+
+ Args:
+ sequence (Iterable[ProgressType]): A sequence (must support "len") you wish to iterate over.
+ description (str, optional): Description of task show next to progress bar. Defaults to "Working".
+ total: (float, optional): Total number of steps. Default is len(sequence).
+ auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
+ transient: (bool, optional): Clear the progress on exit. Defaults to False.
+ console (Console, optional): Console to write to. Default creates internal Console instance.
+ refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
+ style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
+ complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
+ finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
+ pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
+ update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1.
+ disable (bool, optional): Disable display of progress.
+ show_speed (bool, optional): Show speed if total isn't known. Defaults to True.
+ Returns:
+ Iterable[ProgressType]: An iterable of the values in the sequence.
+
+ """
+
+ columns: List["ProgressColumn"] = (
+ [TextColumn("[progress.description]{task.description}")] if description else []
+ )
+ columns.extend(
+ (
+ BarColumn(
+ style=style,
+ complete_style=complete_style,
+ finished_style=finished_style,
+ pulse_style=pulse_style,
+ ),
+ TaskProgressColumn(show_speed=show_speed),
+ TimeRemainingColumn(),
+ )
+ )
+ progress = Progress(
+ *columns,
+ auto_refresh=auto_refresh,
+ console=console,
+ transient=transient,
+ get_time=get_time,
+ refresh_per_second=refresh_per_second or 10,
+ disable=disable,
+ )
+
+ with progress:
+ yield from progress.track(
+ sequence, total=total, description=description, update_period=update_period
+ )
+
+
+class _Reader(RawIOBase, BinaryIO):
+ """A reader that tracks progress while it's being read from."""
+
+ def __init__(
+ self,
+ handle: BinaryIO,
+ progress: "Progress",
+ task: TaskID,
+ close_handle: bool = True,
+ ) -> None:
+ self.handle = handle
+ self.progress = progress
+ self.task = task
+ self.close_handle = close_handle
+ self._closed = False
+
+ def __enter__(self) -> "_Reader":
+ self.handle.__enter__()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self.close()
+
+ def __iter__(self) -> BinaryIO:
+ return self
+
+ def __next__(self) -> bytes:
+ line = next(self.handle)
+ self.progress.advance(self.task, advance=len(line))
+ return line
+
+ @property
+ def closed(self) -> bool:
+ return self._closed
+
+ def fileno(self) -> int:
+ return self.handle.fileno()
+
+ def isatty(self) -> bool:
+ return self.handle.isatty()
+
+ @property
+ def mode(self) -> str:
+ return self.handle.mode
+
+ @property
+ def name(self) -> str:
+ return self.handle.name
+
+ def readable(self) -> bool:
+ return self.handle.readable()
+
+ def seekable(self) -> bool:
+ return self.handle.seekable()
+
+ def writable(self) -> bool:
+ return False
+
+ def read(self, size: int = -1) -> bytes:
+ block = self.handle.read(size)
+ self.progress.advance(self.task, advance=len(block))
+ return block
+
+ def readinto(self, b: Union[bytearray, memoryview, mmap]): # type: ignore[no-untyped-def, override]
+ n = self.handle.readinto(b) # type: ignore[attr-defined]
+ self.progress.advance(self.task, advance=n)
+ return n
+
+ def readline(self, size: int = -1) -> bytes: # type: ignore[override]
+ line = self.handle.readline(size)
+ self.progress.advance(self.task, advance=len(line))
+ return line
+
+ def readlines(self, hint: int = -1) -> List[bytes]:
+ lines = self.handle.readlines(hint)
+ self.progress.advance(self.task, advance=sum(map(len, lines)))
+ return lines
+
+ def close(self) -> None:
+ if self.close_handle:
+ self.handle.close()
+ self._closed = True
+
+ def seek(self, offset: int, whence: int = 0) -> int:
+ pos = self.handle.seek(offset, whence)
+ self.progress.update(self.task, completed=pos)
+ return pos
+
+ def tell(self) -> int:
+ return self.handle.tell()
+
+ def write(self, s: Any) -> int:
+ raise UnsupportedOperation("write")
+
+
+class _ReadContext(ContextManager[_I], Generic[_I]):
+ """A utility class to handle a context for both a reader and a progress."""
+
+ def __init__(self, progress: "Progress", reader: _I) -> None:
+ self.progress = progress
+ self.reader: _I = reader
+
+ def __enter__(self) -> _I:
+ self.progress.start()
+ return self.reader.__enter__()
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self.progress.stop()
+ self.reader.__exit__(exc_type, exc_val, exc_tb)
+
+
+def wrap_file(
+ file: BinaryIO,
+ total: int,
+ *,
+ description: str = "Reading...",
+ auto_refresh: bool = True,
+ console: Optional[Console] = None,
+ transient: bool = False,
+ get_time: Optional[Callable[[], float]] = None,
+ refresh_per_second: float = 10,
+ style: StyleType = "bar.back",
+ complete_style: StyleType = "bar.complete",
+ finished_style: StyleType = "bar.finished",
+ pulse_style: StyleType = "bar.pulse",
+ disable: bool = False,
+) -> ContextManager[BinaryIO]:
+ """Read bytes from a file while tracking progress.
+
+ Args:
+ file (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode.
+ total (int): Total number of bytes to read.
+ description (str, optional): Description of task show next to progress bar. Defaults to "Reading".
+ auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
+ transient: (bool, optional): Clear the progress on exit. Defaults to False.
+ console (Console, optional): Console to write to. Default creates internal Console instance.
+ refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
+ style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
+ complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
+ finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
+ pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
+ disable (bool, optional): Disable display of progress.
+ Returns:
+ ContextManager[BinaryIO]: A context manager yielding a progress reader.
+
+ """
+
+ columns: List["ProgressColumn"] = (
+ [TextColumn("[progress.description]{task.description}")] if description else []
+ )
+ columns.extend(
+ (
+ BarColumn(
+ style=style,
+ complete_style=complete_style,
+ finished_style=finished_style,
+ pulse_style=pulse_style,
+ ),
+ DownloadColumn(),
+ TimeRemainingColumn(),
+ )
+ )
+ progress = Progress(
+ *columns,
+ auto_refresh=auto_refresh,
+ console=console,
+ transient=transient,
+ get_time=get_time,
+ refresh_per_second=refresh_per_second or 10,
+ disable=disable,
+ )
+
+ reader = progress.wrap_file(file, total=total, description=description)
+ return _ReadContext(progress, reader)
+
+
+@typing.overload
+def open(
+ file: Union[str, "PathLike[str]", bytes],
+ mode: Union[Literal["rt"], Literal["r"]],
+ buffering: int = -1,
+ encoding: Optional[str] = None,
+ errors: Optional[str] = None,
+ newline: Optional[str] = None,
+ *,
+ total: Optional[int] = None,
+ description: str = "Reading...",
+ auto_refresh: bool = True,
+ console: Optional[Console] = None,
+ transient: bool = False,
+ get_time: Optional[Callable[[], float]] = None,
+ refresh_per_second: float = 10,
+ style: StyleType = "bar.back",
+ complete_style: StyleType = "bar.complete",
+ finished_style: StyleType = "bar.finished",
+ pulse_style: StyleType = "bar.pulse",
+ disable: bool = False,
+) -> ContextManager[TextIO]:
+ pass
+
+
+@typing.overload
+def open(
+ file: Union[str, "PathLike[str]", bytes],
+ mode: Literal["rb"],
+ buffering: int = -1,
+ encoding: Optional[str] = None,
+ errors: Optional[str] = None,
+ newline: Optional[str] = None,
+ *,
+ total: Optional[int] = None,
+ description: str = "Reading...",
+ auto_refresh: bool = True,
+ console: Optional[Console] = None,
+ transient: bool = False,
+ get_time: Optional[Callable[[], float]] = None,
+ refresh_per_second: float = 10,
+ style: StyleType = "bar.back",
+ complete_style: StyleType = "bar.complete",
+ finished_style: StyleType = "bar.finished",
+ pulse_style: StyleType = "bar.pulse",
+ disable: bool = False,
+) -> ContextManager[BinaryIO]:
+ pass
+
+
+def open(
+ file: Union[str, "PathLike[str]", bytes],
+ mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
+ buffering: int = -1,
+ encoding: Optional[str] = None,
+ errors: Optional[str] = None,
+ newline: Optional[str] = None,
+ *,
+ total: Optional[int] = None,
+ description: str = "Reading...",
+ auto_refresh: bool = True,
+ console: Optional[Console] = None,
+ transient: bool = False,
+ get_time: Optional[Callable[[], float]] = None,
+ refresh_per_second: float = 10,
+ style: StyleType = "bar.back",
+ complete_style: StyleType = "bar.complete",
+ finished_style: StyleType = "bar.finished",
+ pulse_style: StyleType = "bar.pulse",
+ disable: bool = False,
+) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]:
+ """Read bytes from a file while tracking progress.
+
+ Args:
+ path (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode.
+ mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt".
+ buffering (int): The buffering strategy to use, see :func:`io.open`.
+ encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`.
+ errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`.
+ newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`
+ total: (int, optional): Total number of bytes to read. Must be provided if reading from a file handle. Default for a path is os.stat(file).st_size.
+ description (str, optional): Description of task show next to progress bar. Defaults to "Reading".
+ auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
+ transient: (bool, optional): Clear the progress on exit. Defaults to False.
+ console (Console, optional): Console to write to. Default creates internal Console instance.
+ refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
+ style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
+ complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
+ finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
+ pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
+ disable (bool, optional): Disable display of progress.
+ encoding (str, optional): The encoding to use when reading in text mode.
+
+ Returns:
+ ContextManager[BinaryIO]: A context manager yielding a progress reader.
+
+ """
+
+ columns: List["ProgressColumn"] = (
+ [TextColumn("[progress.description]{task.description}")] if description else []
+ )
+ columns.extend(
+ (
+ BarColumn(
+ style=style,
+ complete_style=complete_style,
+ finished_style=finished_style,
+ pulse_style=pulse_style,
+ ),
+ DownloadColumn(),
+ TimeRemainingColumn(),
+ )
+ )
+ progress = Progress(
+ *columns,
+ auto_refresh=auto_refresh,
+ console=console,
+ transient=transient,
+ get_time=get_time,
+ refresh_per_second=refresh_per_second or 10,
+ disable=disable,
+ )
+
+ reader = progress.open(
+ file,
+ mode=mode,
+ buffering=buffering,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ total=total,
+ description=description,
+ )
+ return _ReadContext(progress, reader) # type: ignore[return-value, type-var]
+
+
+class ProgressColumn(ABC):
+ """Base class for a widget to use in progress display."""
+
+ max_refresh: Optional[float] = None
+
+ def __init__(self, table_column: Optional[Column] = None) -> None:
+ self._table_column = table_column
+ self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {}
+ self._update_time: Optional[float] = None
+
+ def get_table_column(self) -> Column:
+ """Get a table column, used to build tasks table."""
+ return self._table_column or Column()
+
+ def __call__(self, task: "Task") -> RenderableType:
+ """Called by the Progress object to return a renderable for the given task.
+
+ Args:
+ task (Task): An object containing information regarding the task.
+
+ Returns:
+ RenderableType: Anything renderable (including str).
+ """
+ current_time = task.get_time()
+ if self.max_refresh is not None and not task.completed:
+ try:
+ timestamp, renderable = self._renderable_cache[task.id]
+ except KeyError:
+ pass
+ else:
+ if timestamp + self.max_refresh > current_time:
+ return renderable
+
+ renderable = self.render(task)
+ self._renderable_cache[task.id] = (current_time, renderable)
+ return renderable
+
+ @abstractmethod
+ def render(self, task: "Task") -> RenderableType:
+ """Should return a renderable object."""
+
+
+class RenderableColumn(ProgressColumn):
+ """A column to insert an arbitrary column.
+
+ Args:
+ renderable (RenderableType, optional): Any renderable. Defaults to empty string.
+ """
+
+ def __init__(
+ self, renderable: RenderableType = "", *, table_column: Optional[Column] = None
+ ):
+ self.renderable = renderable
+ super().__init__(table_column=table_column)
+
+ def render(self, task: "Task") -> RenderableType:
+ return self.renderable
+
+
+class SpinnerColumn(ProgressColumn):
+ """A column with a 'spinner' animation.
+
+ Args:
+ spinner_name (str, optional): Name of spinner animation. Defaults to "dots".
+ style (StyleType, optional): Style of spinner. Defaults to "progress.spinner".
+ speed (float, optional): Speed factor of spinner. Defaults to 1.0.
+ finished_text (TextType, optional): Text used when task is finished. Defaults to " ".
+ """
+
+ def __init__(
+ self,
+ spinner_name: str = "dots",
+ style: Optional[StyleType] = "progress.spinner",
+ speed: float = 1.0,
+ finished_text: TextType = " ",
+ table_column: Optional[Column] = None,
+ ):
+ self.spinner = Spinner(spinner_name, style=style, speed=speed)
+ self.finished_text = (
+ Text.from_markup(finished_text)
+ if isinstance(finished_text, str)
+ else finished_text
+ )
+ super().__init__(table_column=table_column)
+
+ def set_spinner(
+ self,
+ spinner_name: str,
+ spinner_style: Optional[StyleType] = "progress.spinner",
+ speed: float = 1.0,
+ ) -> None:
+ """Set a new spinner.
+
+ Args:
+ spinner_name (str): Spinner name, see python -m rich.spinner.
+ spinner_style (Optional[StyleType], optional): Spinner style. Defaults to "progress.spinner".
+ speed (float, optional): Speed factor of spinner. Defaults to 1.0.
+ """
+ self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed)
+
+ def render(self, task: "Task") -> RenderableType:
+ text = (
+ self.finished_text
+ if task.finished
+ else self.spinner.render(task.get_time())
+ )
+ return text
+
+
+class TextColumn(ProgressColumn):
+ """A column containing text."""
+
+ def __init__(
+ self,
+ text_format: str,
+ style: StyleType = "none",
+ justify: JustifyMethod = "left",
+ markup: bool = True,
+ highlighter: Optional[Highlighter] = None,
+ table_column: Optional[Column] = None,
+ ) -> None:
+ self.text_format = text_format
+ self.justify: JustifyMethod = justify
+ self.style = style
+ self.markup = markup
+ self.highlighter = highlighter
+ super().__init__(table_column=table_column or Column(no_wrap=True))
+
+ def render(self, task: "Task") -> Text:
+ _text = self.text_format.format(task=task)
+ if self.markup:
+ text = Text.from_markup(_text, style=self.style, justify=self.justify)
+ else:
+ text = Text(_text, style=self.style, justify=self.justify)
+ if self.highlighter:
+ self.highlighter.highlight(text)
+ return text
+
+
+class BarColumn(ProgressColumn):
+ """Renders a visual progress bar.
+
+ Args:
+ bar_width (Optional[int], optional): Width of bar or None for full width. Defaults to 40.
+ style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
+ complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
+ finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
+ pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
+ """
+
+ def __init__(
+ self,
+ bar_width: Optional[int] = 40,
+ style: StyleType = "bar.back",
+ complete_style: StyleType = "bar.complete",
+ finished_style: StyleType = "bar.finished",
+ pulse_style: StyleType = "bar.pulse",
+ table_column: Optional[Column] = None,
+ ) -> None:
+ self.bar_width = bar_width
+ self.style = style
+ self.complete_style = complete_style
+ self.finished_style = finished_style
+ self.pulse_style = pulse_style
+ super().__init__(table_column=table_column)
+
+ def render(self, task: "Task") -> ProgressBar:
+ """Gets a progress bar widget for a task."""
+ return ProgressBar(
+ total=max(0, task.total) if task.total is not None else None,
+ completed=max(0, task.completed),
+ width=None if self.bar_width is None else max(1, self.bar_width),
+ pulse=not task.started,
+ animation_time=task.get_time(),
+ style=self.style,
+ complete_style=self.complete_style,
+ finished_style=self.finished_style,
+ pulse_style=self.pulse_style,
+ )
+
+
+class TimeElapsedColumn(ProgressColumn):
+ """Renders time elapsed."""
+
+ def render(self, task: "Task") -> Text:
+ """Show time remaining."""
+ elapsed = task.finished_time if task.finished else task.elapsed
+ if elapsed is None:
+ return Text("-:--:--", style="progress.elapsed")
+ delta = timedelta(seconds=int(elapsed))
+ return Text(str(delta), style="progress.elapsed")
+
+
+class TaskProgressColumn(TextColumn):
+ """Show task progress as a percentage.
+
+ Args:
+ text_format (str, optional): Format for percentage display. Defaults to "[progress.percentage]{task.percentage:>3.0f}%".
+ text_format_no_percentage (str, optional): Format if percentage is unknown. Defaults to "".
+ style (StyleType, optional): Style of output. Defaults to "none".
+ justify (JustifyMethod, optional): Text justification. Defaults to "left".
+ markup (bool, optional): Enable markup. Defaults to True.
+ highlighter (Optional[Highlighter], optional): Highlighter to apply to output. Defaults to None.
+ table_column (Optional[Column], optional): Table Column to use. Defaults to None.
+ show_speed (bool, optional): Show speed if total is unknown. Defaults to False.
+ """
+
+ def __init__(
+ self,
+ text_format: str = "[progress.percentage]{task.percentage:>3.0f}%",
+ text_format_no_percentage: str = "",
+ style: StyleType = "none",
+ justify: JustifyMethod = "left",
+ markup: bool = True,
+ highlighter: Optional[Highlighter] = None,
+ table_column: Optional[Column] = None,
+ show_speed: bool = False,
+ ) -> None:
+
+ self.text_format_no_percentage = text_format_no_percentage
+ self.show_speed = show_speed
+ super().__init__(
+ text_format=text_format,
+ style=style,
+ justify=justify,
+ markup=markup,
+ highlighter=highlighter,
+ table_column=table_column,
+ )
+
+ @classmethod
+ def render_speed(cls, speed: Optional[float]) -> Text:
+ """Render the speed in iterations per second.
+
+ Args:
+ task (Task): A Task object.
+
+ Returns:
+ Text: Text object containing the task speed.
+ """
+ if speed is None:
+ return Text("", style="progress.percentage")
+ unit, suffix = filesize.pick_unit_and_suffix(
+ int(speed),
+ ["", "×10³", "×10⁶", "×10⁹", "×10¹²"],
+ 1000,
+ )
+ data_speed = speed / unit
+ return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage")
+
+ def render(self, task: "Task") -> Text:
+ if task.total is None and self.show_speed:
+ return self.render_speed(task.finished_speed or task.speed)
+ text_format = (
+ self.text_format_no_percentage if task.total is None else self.text_format
+ )
+ _text = text_format.format(task=task)
+ if self.markup:
+ text = Text.from_markup(_text, style=self.style, justify=self.justify)
+ else:
+ text = Text(_text, style=self.style, justify=self.justify)
+ if self.highlighter:
+ self.highlighter.highlight(text)
+ return text
+
+
+class TimeRemainingColumn(ProgressColumn):
+ """Renders estimated time remaining.
+
+ Args:
+ compact (bool, optional): Render MM:SS when time remaining is less than an hour. Defaults to False.
+ elapsed_when_finished (bool, optional): Render time elapsed when the task is finished. Defaults to False.
+ """
+
+ # Only refresh twice a second to prevent jitter
+ max_refresh = 0.5
+
+ def __init__(
+ self,
+ compact: bool = False,
+ elapsed_when_finished: bool = False,
+ table_column: Optional[Column] = None,
+ ):
+ self.compact = compact
+ self.elapsed_when_finished = elapsed_when_finished
+ super().__init__(table_column=table_column)
+
+ def render(self, task: "Task") -> Text:
+ """Show time remaining."""
+ if self.elapsed_when_finished and task.finished:
+ task_time = task.finished_time
+ style = "progress.elapsed"
+ else:
+ task_time = task.time_remaining
+ style = "progress.remaining"
+
+ if task.total is None:
+ return Text("", style=style)
+
+ if task_time is None:
+ return Text("--:--" if self.compact else "-:--:--", style=style)
+
+ # Based on https://github.com/tqdm/tqdm/blob/master/tqdm/std.py
+ minutes, seconds = divmod(int(task_time), 60)
+ hours, minutes = divmod(minutes, 60)
+
+ if self.compact and not hours:
+ formatted = f"{minutes:02d}:{seconds:02d}"
+ else:
+ formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}"
+
+ return Text(formatted, style=style)
+
+
+class FileSizeColumn(ProgressColumn):
+ """Renders completed filesize."""
+
+ def render(self, task: "Task") -> Text:
+ """Show data completed."""
+ data_size = filesize.decimal(int(task.completed))
+ return Text(data_size, style="progress.filesize")
+
+
+class TotalFileSizeColumn(ProgressColumn):
+ """Renders total filesize."""
+
+ def render(self, task: "Task") -> Text:
+ """Show data completed."""
+ data_size = filesize.decimal(int(task.total)) if task.total is not None else ""
+ return Text(data_size, style="progress.filesize.total")
+
+
+class MofNCompleteColumn(ProgressColumn):
+ """Renders completed count/total, e.g. ' 10/1000'.
+
+ Best for bounded tasks with int quantities.
+
+ Space pads the completed count so that progress length does not change as task progresses
+ past powers of 10.
+
+ Args:
+ separator (str, optional): Text to separate completed and total values. Defaults to "/".
+ """
+
+ def __init__(self, separator: str = "/", table_column: Optional[Column] = None):
+ self.separator = separator
+ super().__init__(table_column=table_column)
+
+ def render(self, task: "Task") -> Text:
+ """Show completed/total."""
+ completed = int(task.completed)
+ total = int(task.total) if task.total is not None else "?"
+ total_width = len(str(total))
+ return Text(
+ f"{completed:{total_width}d}{self.separator}{total}",
+ style="progress.download",
+ )
+
+
+class DownloadColumn(ProgressColumn):
+ """Renders file size downloaded and total, e.g. '0.5/2.3 GB'.
+
+ Args:
+ binary_units (bool, optional): Use binary units, KiB, MiB etc. Defaults to False.
+ """
+
+ def __init__(
+ self, binary_units: bool = False, table_column: Optional[Column] = None
+ ) -> None:
+ self.binary_units = binary_units
+ super().__init__(table_column=table_column)
+
+ def render(self, task: "Task") -> Text:
+ """Calculate common unit for completed and total."""
+ completed = int(task.completed)
+
+ unit_and_suffix_calculation_base = (
+ int(task.total) if task.total is not None else completed
+ )
+ if self.binary_units:
+ unit, suffix = filesize.pick_unit_and_suffix(
+ unit_and_suffix_calculation_base,
+ ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"],
+ 1024,
+ )
+ else:
+ unit, suffix = filesize.pick_unit_and_suffix(
+ unit_and_suffix_calculation_base,
+ ["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"],
+ 1000,
+ )
+ precision = 0 if unit == 1 else 1
+
+ completed_ratio = completed / unit
+ completed_str = f"{completed_ratio:,.{precision}f}"
+
+ if task.total is not None:
+ total = int(task.total)
+ total_ratio = total / unit
+ total_str = f"{total_ratio:,.{precision}f}"
+ else:
+ total_str = "?"
+
+ download_status = f"{completed_str}/{total_str} {suffix}"
+ download_text = Text(download_status, style="progress.download")
+ return download_text
+
+
+class TransferSpeedColumn(ProgressColumn):
+ """Renders human readable transfer speed."""
+
+ def render(self, task: "Task") -> Text:
+ """Show data transfer speed."""
+ speed = task.finished_speed or task.speed
+ if speed is None:
+ return Text("?", style="progress.data.speed")
+ data_speed = filesize.decimal(int(speed))
+ return Text(f"{data_speed}/s", style="progress.data.speed")
+
+
+class ProgressSample(NamedTuple):
+ """Sample of progress for a given time."""
+
+ timestamp: float
+ """Timestamp of sample."""
+ completed: float
+ """Number of steps completed."""
+
+
+@dataclass
+class Task:
+ """Information regarding a progress task.
+
+ This object should be considered read-only outside of the :class:`~Progress` class.
+
+ """
+
+ id: TaskID
+ """Task ID associated with this task (used in Progress methods)."""
+
+ description: str
+ """str: Description of the task."""
+
+ total: Optional[float]
+ """Optional[float]: Total number of steps in this task."""
+
+ completed: float
+ """float: Number of steps completed"""
+
+ _get_time: GetTimeCallable
+ """Callable to get the current time."""
+
+ finished_time: Optional[float] = None
+ """float: Time task was finished."""
+
+ visible: bool = True
+ """bool: Indicates if this task is visible in the progress display."""
+
+ fields: Dict[str, Any] = field(default_factory=dict)
+ """dict: Arbitrary fields passed in via Progress.update."""
+
+ start_time: Optional[float] = field(default=None, init=False, repr=False)
+ """Optional[float]: Time this task was started, or None if not started."""
+
+ stop_time: Optional[float] = field(default=None, init=False, repr=False)
+ """Optional[float]: Time this task was stopped, or None if not stopped."""
+
+ finished_speed: Optional[float] = None
+ """Optional[float]: The last speed for a finished task."""
+
+ _progress: Deque[ProgressSample] = field(
+ default_factory=lambda: deque(maxlen=1000), init=False, repr=False
+ )
+
+ _lock: RLock = field(repr=False, default_factory=RLock)
+ """Thread lock."""
+
+ def get_time(self) -> float:
+ """float: Get the current time, in seconds."""
+ return self._get_time()
+
+ @property
+ def started(self) -> bool:
+ """bool: Check if the task as started."""
+ return self.start_time is not None
+
+ @property
+ def remaining(self) -> Optional[float]:
+ """Optional[float]: Get the number of steps remaining, if a non-None total was set."""
+ if self.total is None:
+ return None
+ return self.total - self.completed
+
+ @property
+ def elapsed(self) -> Optional[float]:
+ """Optional[float]: Time elapsed since task was started, or ``None`` if the task hasn't started."""
+ if self.start_time is None:
+ return None
+ if self.stop_time is not None:
+ return self.stop_time - self.start_time
+ return self.get_time() - self.start_time
+
+ @property
+ def finished(self) -> bool:
+ """Check if the task has finished."""
+ return self.finished_time is not None
+
+ @property
+ def percentage(self) -> float:
+ """float: Get progress of task as a percentage. If a None total was set, returns 0"""
+ if not self.total:
+ return 0.0
+ completed = (self.completed / self.total) * 100.0
+ completed = min(100.0, max(0.0, completed))
+ return completed
+
+ @property
+ def speed(self) -> Optional[float]:
+ """Optional[float]: Get the estimated speed in steps per second."""
+ if self.start_time is None:
+ return None
+ with self._lock:
+ progress = self._progress
+ if not progress:
+ return None
+ total_time = progress[-1].timestamp - progress[0].timestamp
+ if total_time == 0:
+ return None
+ iter_progress = iter(progress)
+ next(iter_progress)
+ total_completed = sum(sample.completed for sample in iter_progress)
+ speed = total_completed / total_time
+ return speed
+
+ @property
+ def time_remaining(self) -> Optional[float]:
+ """Optional[float]: Get estimated time to completion, or ``None`` if no data."""
+ if self.finished:
+ return 0.0
+ speed = self.speed
+ if not speed:
+ return None
+ remaining = self.remaining
+ if remaining is None:
+ return None
+ estimate = ceil(remaining / speed)
+ return estimate
+
+ def _reset(self) -> None:
+ """Reset progress."""
+ self._progress.clear()
+ self.finished_time = None
+ self.finished_speed = None
+
+
+class Progress(JupyterMixin):
+ """Renders an auto-updating progress bar(s).
+
+ Args:
+ console (Console, optional): Optional Console instance. Default will an internal Console instance writing to stdout.
+ auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`.
+ refresh_per_second (Optional[float], optional): Number of times per second to refresh the progress information or None to use default (10). Defaults to None.
+ speed_estimate_period: (float, optional): Period (in seconds) used to calculate the speed estimate. Defaults to 30.
+ transient: (bool, optional): Clear the progress on exit. Defaults to False.
+ redirect_stdout: (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
+ redirect_stderr: (bool, optional): Enable redirection of stderr. Defaults to True.
+ get_time: (Callable, optional): A callable that gets the current time, or None to use Console.get_time. Defaults to None.
+ disable (bool, optional): Disable progress display. Defaults to False
+ expand (bool, optional): Expand tasks table to fit width. Defaults to False.
+ """
+
+ def __init__(
+ self,
+ *columns: Union[str, ProgressColumn],
+ console: Optional[Console] = None,
+ auto_refresh: bool = True,
+ refresh_per_second: float = 10,
+ speed_estimate_period: float = 30.0,
+ transient: bool = False,
+ redirect_stdout: bool = True,
+ redirect_stderr: bool = True,
+ get_time: Optional[GetTimeCallable] = None,
+ disable: bool = False,
+ expand: bool = False,
+ ) -> None:
+ assert refresh_per_second > 0, "refresh_per_second must be > 0"
+ self._lock = RLock()
+ self.columns = columns or self.get_default_columns()
+ self.speed_estimate_period = speed_estimate_period
+
+ self.disable = disable
+ self.expand = expand
+ self._tasks: Dict[TaskID, Task] = {}
+ self._task_index: TaskID = TaskID(0)
+ self.live = Live(
+ console=console or get_console(),
+ auto_refresh=auto_refresh,
+ refresh_per_second=refresh_per_second,
+ transient=transient,
+ redirect_stdout=redirect_stdout,
+ redirect_stderr=redirect_stderr,
+ get_renderable=self.get_renderable,
+ )
+ self.get_time = get_time or self.console.get_time
+ self.print = self.console.print
+ self.log = self.console.log
+
+ @classmethod
+ def get_default_columns(cls) -> Tuple[ProgressColumn, ...]:
+ """Get the default columns used for a new Progress instance:
+ - a text column for the description (TextColumn)
+ - the bar itself (BarColumn)
+ - a text column showing completion percentage (TextColumn)
+ - an estimated-time-remaining column (TimeRemainingColumn)
+ If the Progress instance is created without passing a columns argument,
+ the default columns defined here will be used.
+
+ You can also create a Progress instance using custom columns before
+ and/or after the defaults, as in this example:
+
+ progress = Progress(
+ SpinnerColumn(),
+ *Progress.default_columns(),
+ "Elapsed:",
+ TimeElapsedColumn(),
+ )
+
+ This code shows the creation of a Progress display, containing
+ a spinner to the left, the default columns, and a labeled elapsed
+ time column.
+ """
+ return (
+ TextColumn("[progress.description]{task.description}"),
+ BarColumn(),
+ TaskProgressColumn(),
+ TimeRemainingColumn(),
+ )
+
+ @property
+ def console(self) -> Console:
+ return self.live.console
+
+ @property
+ def tasks(self) -> List[Task]:
+ """Get a list of Task instances."""
+ with self._lock:
+ return list(self._tasks.values())
+
+ @property
+ def task_ids(self) -> List[TaskID]:
+ """A list of task IDs."""
+ with self._lock:
+ return list(self._tasks.keys())
+
+ @property
+ def finished(self) -> bool:
+ """Check if all tasks have been completed."""
+ with self._lock:
+ if not self._tasks:
+ return True
+ return all(task.finished for task in self._tasks.values())
+
+ def start(self) -> None:
+ """Start the progress display."""
+ if not self.disable:
+ self.live.start(refresh=True)
+
+ def stop(self) -> None:
+ """Stop the progress display."""
+ self.live.stop()
+ if not self.console.is_interactive:
+ self.console.print()
+
+ def __enter__(self) -> "Progress":
+ self.start()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self.stop()
+
+ def track(
+ self,
+ sequence: Union[Iterable[ProgressType], Sequence[ProgressType]],
+ total: Optional[float] = None,
+ task_id: Optional[TaskID] = None,
+ description: str = "Working...",
+ update_period: float = 0.1,
+ ) -> Iterable[ProgressType]:
+ """Track progress by iterating over a sequence.
+
+ Args:
+ sequence (Sequence[ProgressType]): A sequence of values you want to iterate over and track progress.
+ total: (float, optional): Total number of steps. Default is len(sequence).
+ task_id: (TaskID): Task to track. Default is new task.
+ description: (str, optional): Description of task, if new task is created.
+ update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1.
+
+ Returns:
+ Iterable[ProgressType]: An iterable of values taken from the provided sequence.
+ """
+
+ task_total: Optional[float] = None
+ if total is None:
+ if isinstance(sequence, Sized):
+ task_total = float(len(sequence))
+ else:
+ task_total = total
+
+ if task_id is None:
+ task_id = self.add_task(description, total=task_total)
+ else:
+ self.update(task_id, total=task_total)
+
+ if self.live.auto_refresh:
+ with _TrackThread(self, task_id, update_period) as track_thread:
+ for value in sequence:
+ yield value
+ track_thread.completed += 1
+ else:
+ advance = self.advance
+ refresh = self.refresh
+ for value in sequence:
+ yield value
+ advance(task_id, 1)
+ refresh()
+
+ def wrap_file(
+ self,
+ file: BinaryIO,
+ total: Optional[int] = None,
+ *,
+ task_id: Optional[TaskID] = None,
+ description: str = "Reading...",
+ ) -> BinaryIO:
+ """Track progress file reading from a binary file.
+
+ Args:
+ file (BinaryIO): A file-like object opened in binary mode.
+ total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given.
+ task_id (TaskID): Task to track. Default is new task.
+ description (str, optional): Description of task, if new task is created.
+
+ Returns:
+ BinaryIO: A readable file-like object in binary mode.
+
+ Raises:
+ ValueError: When no total value can be extracted from the arguments or the task.
+ """
+ # attempt to recover the total from the task
+ total_bytes: Optional[float] = None
+ if total is not None:
+ total_bytes = total
+ elif task_id is not None:
+ with self._lock:
+ total_bytes = self._tasks[task_id].total
+ if total_bytes is None:
+ raise ValueError(
+ f"unable to get the total number of bytes, please specify 'total'"
+ )
+
+ # update total of task or create new task
+ if task_id is None:
+ task_id = self.add_task(description, total=total_bytes)
+ else:
+ self.update(task_id, total=total_bytes)
+
+ return _Reader(file, self, task_id, close_handle=False)
+
+ @typing.overload
+ def open(
+ self,
+ file: Union[str, "PathLike[str]", bytes],
+ mode: Literal["rb"],
+ buffering: int = -1,
+ encoding: Optional[str] = None,
+ errors: Optional[str] = None,
+ newline: Optional[str] = None,
+ *,
+ total: Optional[int] = None,
+ task_id: Optional[TaskID] = None,
+ description: str = "Reading...",
+ ) -> BinaryIO:
+ pass
+
+ @typing.overload
+ def open(
+ self,
+ file: Union[str, "PathLike[str]", bytes],
+ mode: Union[Literal["r"], Literal["rt"]],
+ buffering: int = -1,
+ encoding: Optional[str] = None,
+ errors: Optional[str] = None,
+ newline: Optional[str] = None,
+ *,
+ total: Optional[int] = None,
+ task_id: Optional[TaskID] = None,
+ description: str = "Reading...",
+ ) -> TextIO:
+ pass
+
+ def open(
+ self,
+ file: Union[str, "PathLike[str]", bytes],
+ mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
+ buffering: int = -1,
+ encoding: Optional[str] = None,
+ errors: Optional[str] = None,
+ newline: Optional[str] = None,
+ *,
+ total: Optional[int] = None,
+ task_id: Optional[TaskID] = None,
+ description: str = "Reading...",
+ ) -> Union[BinaryIO, TextIO]:
+ """Track progress while reading from a binary file.
+
+ Args:
+ path (Union[str, PathLike[str]]): The path to the file to read.
+ mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt".
+ buffering (int): The buffering strategy to use, see :func:`io.open`.
+ encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`.
+ errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`.
+ newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`.
+ total (int, optional): Total number of bytes to read. If none given, os.stat(path).st_size is used.
+ task_id (TaskID): Task to track. Default is new task.
+ description (str, optional): Description of task, if new task is created.
+
+ Returns:
+ BinaryIO: A readable file-like object in binary mode.
+
+ Raises:
+ ValueError: When an invalid mode is given.
+ """
+ # normalize the mode (always rb, rt)
+ _mode = "".join(sorted(mode, reverse=False))
+ if _mode not in ("br", "rt", "r"):
+ raise ValueError("invalid mode {!r}".format(mode))
+
+ # patch buffering to provide the same behaviour as the builtin `open`
+ line_buffering = buffering == 1
+ if _mode == "br" and buffering == 1:
+ warnings.warn(
+ "line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used",
+ RuntimeWarning,
+ )
+ buffering = -1
+ elif _mode == "rt" or _mode == "r":
+ if buffering == 0:
+ raise ValueError("can't have unbuffered text I/O")
+ elif buffering == 1:
+ buffering = -1
+
+ # attempt to get the total with `os.stat`
+ if total is None:
+ total = stat(file).st_size
+
+ # update total of task or create new task
+ if task_id is None:
+ task_id = self.add_task(description, total=total)
+ else:
+ self.update(task_id, total=total)
+
+ # open the file in binary mode,
+ handle = io.open(file, "rb", buffering=buffering)
+ reader = _Reader(handle, self, task_id, close_handle=True)
+
+ # wrap the reader in a `TextIOWrapper` if text mode
+ if mode == "r" or mode == "rt":
+ return io.TextIOWrapper(
+ reader,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ line_buffering=line_buffering,
+ )
+
+ return reader
+
+ def start_task(self, task_id: TaskID) -> None:
+ """Start a task.
+
+ Starts a task (used when calculating elapsed time). You may need to call this manually,
+ if you called ``add_task`` with ``start=False``.
+
+ Args:
+ task_id (TaskID): ID of task.
+ """
+ with self._lock:
+ task = self._tasks[task_id]
+ if task.start_time is None:
+ task.start_time = self.get_time()
+
+ def stop_task(self, task_id: TaskID) -> None:
+ """Stop a task.
+
+ This will freeze the elapsed time on the task.
+
+ Args:
+ task_id (TaskID): ID of task.
+ """
+ with self._lock:
+ task = self._tasks[task_id]
+ current_time = self.get_time()
+ if task.start_time is None:
+ task.start_time = current_time
+ task.stop_time = current_time
+
+ def update(
+ self,
+ task_id: TaskID,
+ *,
+ total: Optional[float] = None,
+ completed: Optional[float] = None,
+ advance: Optional[float] = None,
+ description: Optional[str] = None,
+ visible: Optional[bool] = None,
+ refresh: bool = False,
+ **fields: Any,
+ ) -> None:
+ """Update information associated with a task.
+
+ Args:
+ task_id (TaskID): Task id (returned by add_task).
+ total (float, optional): Updates task.total if not None.
+ completed (float, optional): Updates task.completed if not None.
+ advance (float, optional): Add a value to task.completed if not None.
+ description (str, optional): Change task description if not None.
+ visible (bool, optional): Set visible flag if not None.
+ refresh (bool): Force a refresh of progress information. Default is False.
+ **fields (Any): Additional data fields required for rendering.
+ """
+ with self._lock:
+ task = self._tasks[task_id]
+ completed_start = task.completed
+
+ if total is not None and total != task.total:
+ task.total = total
+ task._reset()
+ if advance is not None:
+ task.completed += advance
+ if completed is not None:
+ task.completed = completed
+ if description is not None:
+ task.description = description
+ if visible is not None:
+ task.visible = visible
+ task.fields.update(fields)
+ update_completed = task.completed - completed_start
+
+ current_time = self.get_time()
+ old_sample_time = current_time - self.speed_estimate_period
+ _progress = task._progress
+
+ popleft = _progress.popleft
+ while _progress and _progress[0].timestamp < old_sample_time:
+ popleft()
+ if update_completed > 0:
+ _progress.append(ProgressSample(current_time, update_completed))
+ if (
+ task.total is not None
+ and task.completed >= task.total
+ and task.finished_time is None
+ ):
+ task.finished_time = task.elapsed
+
+ if refresh:
+ self.refresh()
+
+ def reset(
+ self,
+ task_id: TaskID,
+ *,
+ start: bool = True,
+ total: Optional[float] = None,
+ completed: int = 0,
+ visible: Optional[bool] = None,
+ description: Optional[str] = None,
+ **fields: Any,
+ ) -> None:
+ """Reset a task so completed is 0 and the clock is reset.
+
+ Args:
+ task_id (TaskID): ID of task.
+ start (bool, optional): Start the task after reset. Defaults to True.
+ total (float, optional): New total steps in task, or None to use current total. Defaults to None.
+ completed (int, optional): Number of steps completed. Defaults to 0.
+ visible (bool, optional): Enable display of the task. Defaults to True.
+ description (str, optional): Change task description if not None. Defaults to None.
+ **fields (str): Additional data fields required for rendering.
+ """
+ current_time = self.get_time()
+ with self._lock:
+ task = self._tasks[task_id]
+ task._reset()
+ task.start_time = current_time if start else None
+ if total is not None:
+ task.total = total
+ task.completed = completed
+ if visible is not None:
+ task.visible = visible
+ if fields:
+ task.fields = fields
+ if description is not None:
+ task.description = description
+ task.finished_time = None
+ self.refresh()
+
+ def advance(self, task_id: TaskID, advance: float = 1) -> None:
+ """Advance task by a number of steps.
+
+ Args:
+ task_id (TaskID): ID of task.
+ advance (float): Number of steps to advance. Default is 1.
+ """
+ current_time = self.get_time()
+ with self._lock:
+ task = self._tasks[task_id]
+ completed_start = task.completed
+ task.completed += advance
+ update_completed = task.completed - completed_start
+ old_sample_time = current_time - self.speed_estimate_period
+ _progress = task._progress
+
+ popleft = _progress.popleft
+ while _progress and _progress[0].timestamp < old_sample_time:
+ popleft()
+ while len(_progress) > 1000:
+ popleft()
+ _progress.append(ProgressSample(current_time, update_completed))
+ if (
+ task.total is not None
+ and task.completed >= task.total
+ and task.finished_time is None
+ ):
+ task.finished_time = task.elapsed
+ task.finished_speed = task.speed
+
+ def refresh(self) -> None:
+ """Refresh (render) the progress information."""
+ if not self.disable and self.live.is_started:
+ self.live.refresh()
+
+ def get_renderable(self) -> RenderableType:
+ """Get a renderable for the progress display."""
+ renderable = Group(*self.get_renderables())
+ return renderable
+
+ def get_renderables(self) -> Iterable[RenderableType]:
+ """Get a number of renderables for the progress display."""
+ table = self.make_tasks_table(self.tasks)
+ yield table
+
+ def make_tasks_table(self, tasks: Iterable[Task]) -> Table:
+ """Get a table to render the Progress display.
+
+ Args:
+ tasks (Iterable[Task]): An iterable of Task instances, one per row of the table.
+
+ Returns:
+ Table: A table instance.
+ """
+ table_columns = (
+ (
+ Column(no_wrap=True)
+ if isinstance(_column, str)
+ else _column.get_table_column().copy()
+ )
+ for _column in self.columns
+ )
+ table = Table.grid(*table_columns, padding=(0, 1), expand=self.expand)
+
+ for task in tasks:
+ if task.visible:
+ table.add_row(
+ *(
+ (
+ column.format(task=task)
+ if isinstance(column, str)
+ else column(task)
+ )
+ for column in self.columns
+ )
+ )
+ return table
+
+ def __rich__(self) -> RenderableType:
+ """Makes the Progress class itself renderable."""
+ with self._lock:
+ return self.get_renderable()
+
+ def add_task(
+ self,
+ description: str,
+ start: bool = True,
+ total: Optional[float] = 100.0,
+ completed: int = 0,
+ visible: bool = True,
+ **fields: Any,
+ ) -> TaskID:
+ """Add a new 'task' to the Progress display.
+
+ Args:
+ description (str): A description of the task.
+ start (bool, optional): Start the task immediately (to calculate elapsed time). If set to False,
+ you will need to call `start` manually. Defaults to True.
+ total (float, optional): Number of total steps in the progress if known.
+ Set to None to render a pulsing animation. Defaults to 100.
+ completed (int, optional): Number of steps completed so far. Defaults to 0.
+ visible (bool, optional): Enable display of the task. Defaults to True.
+ **fields (str): Additional data fields required for rendering.
+
+ Returns:
+ TaskID: An ID you can use when calling `update`.
+ """
+ with self._lock:
+ task = Task(
+ self._task_index,
+ description,
+ total,
+ completed,
+ visible=visible,
+ fields=fields,
+ _get_time=self.get_time,
+ _lock=self._lock,
+ )
+ self._tasks[self._task_index] = task
+ if start:
+ self.start_task(self._task_index)
+ new_task_index = self._task_index
+ self._task_index = TaskID(int(self._task_index) + 1)
+ self.refresh()
+ return new_task_index
+
+ def remove_task(self, task_id: TaskID) -> None:
+ """Delete a task if it exists.
+
+ Args:
+ task_id (TaskID): A task ID.
+
+ """
+ with self._lock:
+ del self._tasks[task_id]
+
+
+if __name__ == "__main__": # pragma: no coverage
+
+ import random
+ import time
+
+ from .panel import Panel
+ from .rule import Rule
+ from .syntax import Syntax
+ from .table import Table
+
+ syntax = Syntax(
+ '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
+ """Iterate and generate a tuple with a flag for last value."""
+ iter_values = iter(values)
+ try:
+ previous_value = next(iter_values)
+ except StopIteration:
+ return
+ for value in iter_values:
+ yield False, previous_value
+ previous_value = value
+ yield True, previous_value''',
+ "python",
+ line_numbers=True,
+ )
+
+ table = Table("foo", "bar", "baz")
+ table.add_row("1", "2", "3")
+
+ progress_renderables = [
+ "Text may be printed while the progress bars are rendering.",
+ Panel("In fact, [i]any[/i] renderable will work"),
+ "Such as [magenta]tables[/]...",
+ table,
+ "Pretty printed structures...",
+ {"type": "example", "text": "Pretty printed"},
+ "Syntax...",
+ syntax,
+ Rule("Give it a try!"),
+ ]
+
+ from itertools import cycle
+
+ examples = cycle(progress_renderables)
+
+ console = Console(record=True)
+
+ with Progress(
+ SpinnerColumn(),
+ *Progress.get_default_columns(),
+ TimeElapsedColumn(),
+ console=console,
+ transient=False,
+ ) as progress:
+
+ task1 = progress.add_task("[red]Downloading", total=1000)
+ task2 = progress.add_task("[green]Processing", total=1000)
+ task3 = progress.add_task("[yellow]Thinking", total=None)
+
+ while not progress.finished:
+ progress.update(task1, advance=0.5)
+ progress.update(task2, advance=0.3)
+ time.sleep(0.01)
+ if random.randint(0, 100) < 1:
+ progress.log(next(examples))