summaryrefslogtreecommitdiffstats
path: root/tests/test_progress.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_progress.py')
-rw-r--r--tests/test_progress.py430
1 files changed, 430 insertions, 0 deletions
diff --git a/tests/test_progress.py b/tests/test_progress.py
new file mode 100644
index 0000000..565e971
--- /dev/null
+++ b/tests/test_progress.py
@@ -0,0 +1,430 @@
+# encoding=utf-8
+
+import io
+from time import sleep
+
+import pytest
+
+from rich.progress_bar import ProgressBar
+from rich.console import Console
+from rich.highlighter import NullHighlighter
+from rich.progress import (
+ BarColumn,
+ FileSizeColumn,
+ TotalFileSizeColumn,
+ DownloadColumn,
+ TransferSpeedColumn,
+ RenderableColumn,
+ SpinnerColumn,
+ Progress,
+ Task,
+ TextColumn,
+ TimeElapsedColumn,
+ TimeRemainingColumn,
+ track,
+ _TrackThread,
+ TaskID,
+)
+from rich.text import Text
+
+
+class MockClock:
+ """A clock that is manually advanced."""
+
+ def __init__(self, time=0.0, auto=True) -> None:
+ self.time = time
+ self.auto = auto
+
+ def __call__(self) -> float:
+ try:
+ return self.time
+ finally:
+ if self.auto:
+ self.time += 1
+
+ def tick(self, advance: float = 1) -> None:
+ self.time += advance
+
+
+def test_bar_columns():
+ bar_column = BarColumn(100)
+ assert bar_column.bar_width == 100
+ task = Task(1, "test", 100, 20, _get_time=lambda: 1.0)
+ bar = bar_column(task)
+ assert isinstance(bar, ProgressBar)
+ assert bar.completed == 20
+ assert bar.total == 100
+
+
+def test_text_column():
+ text_column = TextColumn("[b]foo", highlighter=NullHighlighter())
+ task = Task(1, "test", 100, 20, _get_time=lambda: 1.0)
+ text = text_column.render(task)
+ assert str(text) == "foo"
+
+ text_column = TextColumn("[b]bar", markup=False)
+ task = Task(1, "test", 100, 20, _get_time=lambda: 1.0)
+ text = text_column.render(task)
+ assert text == Text("[b]bar")
+
+
+def test_time_remaining_column():
+ class FakeTask(Task):
+ time_remaining = 60
+
+ column = TimeRemainingColumn()
+ task = Task(1, "test", 100, 20, _get_time=lambda: 1.0)
+ text = column(task)
+ assert str(text) == "-:--:--"
+
+ text = column(FakeTask(1, "test", 100, 20, _get_time=lambda: 1.0))
+ assert str(text) == "0:01:00"
+
+
+def test_renderable_column():
+ column = RenderableColumn("foo")
+ task = Task(1, "test", 100, 20, _get_time=lambda: 1.0)
+ assert column.render(task) == "foo"
+
+
+def test_spinner_column():
+ column = SpinnerColumn()
+ column.set_spinner("dots2")
+ task = Task(1, "test", 100, 20, _get_time=lambda: 1.0)
+ result = column.render(task)
+ print(repr(result))
+ expected = "⡿"
+ assert str(result) == expected
+
+
+def test_download_progress_uses_decimal_units() -> None:
+
+ column = DownloadColumn()
+ test_task = Task(1, "test", 1000, 500, _get_time=lambda: 1.0)
+ rendered_progress = str(column.render(test_task))
+ expected = "0.5/1.0 KB"
+ assert rendered_progress == expected
+
+
+def test_download_progress_uses_binary_units() -> None:
+
+ column = DownloadColumn(binary_units=True)
+ test_task = Task(1, "test", 1024, 512, _get_time=lambda: 1.0)
+ rendered_progress = str(column.render(test_task))
+ expected = "0.5/1.0 KiB"
+ assert rendered_progress == expected
+
+
+def test_task_ids():
+ progress = make_progress()
+ assert progress.task_ids == [0, 1, 2, 4]
+
+
+def test_finished():
+ progress = make_progress()
+ assert not progress.finished
+
+
+def make_progress() -> Progress:
+ _time = 0.0
+
+ def fake_time():
+ nonlocal _time
+ try:
+ return _time
+ finally:
+ _time += 1
+
+ console = Console(
+ file=io.StringIO(),
+ force_terminal=True,
+ color_system="truecolor",
+ width=80,
+ legacy_windows=False,
+ )
+ progress = Progress(console=console, get_time=fake_time, auto_refresh=False)
+ task1 = progress.add_task("foo")
+ task2 = progress.add_task("bar", total=30)
+ progress.advance(task2, 16)
+ task3 = progress.add_task("baz", visible=False)
+ task4 = progress.add_task("egg")
+ progress.remove_task(task4)
+ task4 = progress.add_task("foo2", completed=50, start=False)
+ progress.stop_task(task4)
+ progress.start_task(task4)
+ progress.update(
+ task4, total=200, advance=50, completed=200, visible=True, refresh=True
+ )
+ progress.stop_task(task4)
+ return progress
+
+
+def render_progress() -> str:
+ progress = make_progress()
+ progress.start() # superfluous noop
+ with progress:
+ pass
+ progress.stop() # superfluous noop
+ progress_render = progress.console.file.getvalue()
+ return progress_render
+
+
+def test_expand_bar() -> None:
+ console = Console(
+ file=io.StringIO(),
+ force_terminal=True,
+ width=10,
+ color_system="truecolor",
+ legacy_windows=False,
+ )
+ progress = Progress(
+ BarColumn(bar_width=None),
+ console=console,
+ get_time=lambda: 1.0,
+ auto_refresh=False,
+ )
+ progress.add_task("foo")
+ with progress:
+ pass
+ expected = "\x1b[?25l\x1b[38;5;237m━━━━━━━━━━\x1b[0m\r\x1b[2K\x1b[38;5;237m━━━━━━━━━━\x1b[0m\n\x1b[?25h"
+ render_result = console.file.getvalue()
+ print("RESULT\n", repr(render_result))
+ print("EXPECTED\n", repr(expected))
+ assert render_result == expected
+
+
+def test_render() -> None:
+ expected = "\x1b[?25lfoo \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m 0%\x1b[0m \x1b[36m-:--:--\x1b[0m\nbar \x1b[38;2;249;38;114m━━━━━━━━━━━━━━━━━━━━━\x1b[0m\x1b[38;5;237m╺\x1b[0m\x1b[38;5;237m━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m 53%\x1b[0m \x1b[36m-:--:--\x1b[0m\nfoo2 \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m100%\x1b[0m \x1b[36m0:00:00\x1b[0m\r\x1b[2K\x1b[1A\x1b[2K\x1b[1A\x1b[2Kfoo \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m 0%\x1b[0m \x1b[36m-:--:--\x1b[0m\nbar \x1b[38;2;249;38;114m━━━━━━━━━━━━━━━━━━━━━\x1b[0m\x1b[38;5;237m╺\x1b[0m\x1b[38;5;237m━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m 53%\x1b[0m \x1b[36m-:--:--\x1b[0m\nfoo2 \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m100%\x1b[0m \x1b[36m0:00:00\x1b[0m\n\x1b[?25h"
+ render_result = render_progress()
+ print(repr(render_result))
+ assert render_result == expected
+
+
+def test_track() -> None:
+
+ console = Console(
+ file=io.StringIO(),
+ force_terminal=True,
+ width=60,
+ color_system="truecolor",
+ legacy_windows=False,
+ )
+ test = ["foo", "bar", "baz"]
+ expected_values = iter(test)
+ for value in track(
+ test, "test", console=console, auto_refresh=False, get_time=MockClock(auto=True)
+ ):
+ assert value == next(expected_values)
+ result = console.file.getvalue()
+ print(repr(result))
+ expected = "\x1b[?25l\r\x1b[2Ktest \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m 0%\x1b[0m \x1b[36m-:--:--\x1b[0m\r\x1b[2Ktest \x1b[38;2;249;38;114m━━━━━━━━━━━━━\x1b[0m\x1b[38;5;237m╺\x1b[0m\x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m 33%\x1b[0m \x1b[36m-:--:--\x1b[0m\r\x1b[2Ktest \x1b[38;2;249;38;114m━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m\x1b[38;2;249;38;114m╸\x1b[0m\x1b[38;5;237m━━━━━━━━━━━━━\x1b[0m \x1b[35m 67%\x1b[0m \x1b[36m0:00:06\x1b[0m\r\x1b[2Ktest \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m100%\x1b[0m \x1b[36m0:00:00\x1b[0m\r\x1b[2Ktest \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m100%\x1b[0m \x1b[36m0:00:00\x1b[0m\n\x1b[?25h"
+ print("--")
+ print("RESULT:")
+ print(result)
+ print(repr(result))
+ print("EXPECTED:")
+ print(expected)
+ print(repr(expected))
+
+ assert result == expected
+
+ with pytest.raises(ValueError):
+ for n in track(5):
+ pass
+
+
+def test_progress_track() -> None:
+ console = Console(
+ file=io.StringIO(),
+ force_terminal=True,
+ width=60,
+ color_system="truecolor",
+ legacy_windows=False,
+ )
+ progress = Progress(
+ console=console, auto_refresh=False, get_time=MockClock(auto=True)
+ )
+ test = ["foo", "bar", "baz"]
+ expected_values = iter(test)
+ with progress:
+ for value in progress.track(test, description="test"):
+ assert value == next(expected_values)
+ result = console.file.getvalue()
+ print(repr(result))
+ expected = "\x1b[?25l\r\x1b[2Ktest \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m 0%\x1b[0m \x1b[36m-:--:--\x1b[0m\r\x1b[2Ktest \x1b[38;2;249;38;114m━━━━━━━━━━━━━\x1b[0m\x1b[38;5;237m╺\x1b[0m\x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m 33%\x1b[0m \x1b[36m-:--:--\x1b[0m\r\x1b[2Ktest \x1b[38;2;249;38;114m━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m\x1b[38;2;249;38;114m╸\x1b[0m\x1b[38;5;237m━━━━━━━━━━━━━\x1b[0m \x1b[35m 67%\x1b[0m \x1b[36m0:00:06\x1b[0m\r\x1b[2Ktest \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m100%\x1b[0m \x1b[36m0:00:00\x1b[0m\r\x1b[2Ktest \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[35m100%\x1b[0m \x1b[36m0:00:00\x1b[0m\n\x1b[?25h"
+
+ print(expected)
+ print(repr(expected))
+ print(result)
+ print(repr(result))
+
+ assert result == expected
+
+ with pytest.raises(ValueError):
+ for n in progress.track(5):
+ pass
+
+
+def test_columns() -> None:
+
+ console = Console(
+ file=io.StringIO(),
+ force_terminal=True,
+ width=80,
+ log_time_format="[TIME]",
+ color_system="truecolor",
+ legacy_windows=False,
+ log_path=False,
+ )
+ progress = Progress(
+ "test",
+ TextColumn("{task.description}"),
+ BarColumn(bar_width=None),
+ TimeRemainingColumn(),
+ TimeElapsedColumn(),
+ FileSizeColumn(),
+ TotalFileSizeColumn(),
+ DownloadColumn(),
+ TransferSpeedColumn(),
+ transient=True,
+ console=console,
+ auto_refresh=False,
+ get_time=MockClock(),
+ )
+ task1 = progress.add_task("foo", total=10)
+ task2 = progress.add_task("bar", total=7)
+ with progress:
+ for n in range(4):
+ progress.advance(task1, 3)
+ progress.advance(task2, 4)
+ print("foo")
+ console.log("hello")
+ console.print("world")
+ progress.refresh()
+ from .render import replace_link_ids
+
+ result = replace_link_ids(console.file.getvalue())
+ print(repr(result))
+ expected = "\x1b[?25ltest foo \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m-:--:--\x1b[0m \x1b[33m0:00:37\x1b[0m \x1b[32m0 bytes\x1b[0m \x1b[32m10 bytes\x1b[0m \x1b[32m0/10 bytes\x1b[0m \x1b[31m?\x1b[0m\ntest bar \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m-:--:--\x1b[0m \x1b[33m0:00:36\x1b[0m \x1b[32m0 bytes\x1b[0m \x1b[32m7 bytes \x1b[0m \x1b[32m0/7 bytes \x1b[0m \x1b[31m?\x1b[0m\r\x1b[2K\x1b[1A\x1b[2Kfoo\ntest foo \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m-:--:--\x1b[0m \x1b[33m0:00:37\x1b[0m \x1b[32m0 bytes\x1b[0m \x1b[32m10 bytes\x1b[0m \x1b[32m0/10 bytes\x1b[0m \x1b[31m?\x1b[0m\ntest bar \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m-:--:--\x1b[0m \x1b[33m0:00:36\x1b[0m \x1b[32m0 bytes\x1b[0m \x1b[32m7 bytes \x1b[0m \x1b[32m0/7 bytes \x1b[0m \x1b[31m?\x1b[0m\r\x1b[2K\x1b[1A\x1b[2K\x1b[2;36m[TIME]\x1b[0m\x1b[2;36m \x1b[0mhello \ntest foo \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m-:--:--\x1b[0m \x1b[33m0:00:37\x1b[0m \x1b[32m0 bytes\x1b[0m \x1b[32m10 bytes\x1b[0m \x1b[32m0/10 bytes\x1b[0m \x1b[31m?\x1b[0m\ntest bar \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m-:--:--\x1b[0m \x1b[33m0:00:36\x1b[0m \x1b[32m0 bytes\x1b[0m \x1b[32m7 bytes \x1b[0m \x1b[32m0/7 bytes \x1b[0m \x1b[31m?\x1b[0m\r\x1b[2K\x1b[1A\x1b[2Kworld\ntest foo \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m-:--:--\x1b[0m \x1b[33m0:00:37\x1b[0m \x1b[32m0 bytes\x1b[0m \x1b[32m10 bytes\x1b[0m \x1b[32m0/10 bytes\x1b[0m \x1b[31m?\x1b[0m\ntest bar \x1b[38;5;237m━━━━━━━━━━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m-:--:--\x1b[0m \x1b[33m0:00:36\x1b[0m \x1b[32m0 bytes\x1b[0m \x1b[32m7 bytes \x1b[0m \x1b[32m0/7 bytes \x1b[0m \x1b[31m?\x1b[0m\r\x1b[2K\x1b[1A\x1b[2Ktest foo \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m0:00:00\x1b[0m \x1b[33m0:01:00\x1b[0m \x1b[32m12 bytes\x1b[0m \x1b[32m10 bytes\x1b[0m \x1b[32m12/10 bytes\x1b[0m \x1b[31m1 byte/s\x1b[0m\ntest bar \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m0:00:00\x1b[0m \x1b[33m0:00:45\x1b[0m \x1b[32m16 bytes\x1b[0m \x1b[32m7 bytes \x1b[0m \x1b[32m16/7 bytes \x1b[0m \x1b[31m1 byte/s\x1b[0m\r\x1b[2K\x1b[1A\x1b[2Ktest foo \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m0:00:00\x1b[0m \x1b[33m0:01:00\x1b[0m \x1b[32m12 bytes\x1b[0m \x1b[32m10 bytes\x1b[0m \x1b[32m12/10 bytes\x1b[0m \x1b[31m1 byte/s\x1b[0m\ntest bar \x1b[38;2;114;156;31m━━━━━━━━━━━━━━━━\x1b[0m \x1b[36m0:00:00\x1b[0m \x1b[33m0:00:45\x1b[0m \x1b[32m16 bytes\x1b[0m \x1b[32m7 bytes \x1b[0m \x1b[32m16/7 bytes \x1b[0m \x1b[31m1 byte/s\x1b[0m\n\x1b[?25h\r\x1b[1A\x1b[2K\x1b[1A\x1b[2K"
+ assert result == expected
+
+
+def test_task_create() -> None:
+ task = Task(TaskID(1), "foo", 100, 0, _get_time=lambda: 1)
+ assert task.elapsed is None
+ assert not task.finished
+ assert task.percentage == 0.0
+ assert task.speed is None
+ assert task.time_remaining is None
+
+
+def test_task_start() -> None:
+ current_time = 1
+
+ def get_time():
+ nonlocal current_time
+ return current_time
+
+ task = Task(TaskID(1), "foo", 100, 0, _get_time=get_time)
+ task.start_time = get_time()
+ assert task.started == True
+ assert task.elapsed == 0
+ current_time += 1
+ assert task.elapsed == 1
+ current_time += 1
+ task.stop_time = get_time()
+ current_time += 1
+ assert task.elapsed == 2
+
+
+def test_task_zero_total() -> None:
+ task = Task(TaskID(1), "foo", 0, 0, _get_time=lambda: 1)
+ assert task.percentage == 0
+
+
+def test_progress_create() -> None:
+ progress = Progress()
+ assert progress.finished
+ assert progress.tasks == []
+ assert progress.task_ids == []
+
+
+def test_track_thread() -> None:
+ progress = Progress()
+ task_id = progress.add_task("foo")
+ track_thread = _TrackThread(progress, task_id, 0.1)
+ assert track_thread.completed == 0
+ from time import sleep
+
+ with track_thread:
+ track_thread.completed = 1
+ sleep(0.3)
+ assert progress.tasks[task_id].completed >= 1
+ track_thread.completed += 1
+
+
+def test_reset() -> None:
+ progress = Progress()
+ task_id = progress.add_task("foo")
+ progress.advance(task_id, 1)
+ progress.advance(task_id, 1)
+ progress.advance(task_id, 1)
+ progress.advance(task_id, 7)
+ task = progress.tasks[task_id]
+ assert task.completed == 10
+ progress.reset(
+ task_id,
+ total=200,
+ completed=20,
+ visible=False,
+ description="bar",
+ example="egg",
+ )
+ assert task.total == 200
+ assert task.completed == 20
+ assert task.visible == False
+ assert task.description == "bar"
+ assert task.fields == {"example": "egg"}
+ assert not task._progress
+
+
+def test_progress_max_refresh() -> None:
+ """Test max_refresh argment."""
+ time = 0.0
+
+ def get_time() -> float:
+ nonlocal time
+ try:
+ return time
+ finally:
+ time = time + 1.0
+
+ console = Console(
+ color_system=None, width=80, legacy_windows=False, force_terminal=True
+ )
+ column = TextColumn("{task.description}")
+ column.max_refresh = 3
+ progress = Progress(
+ column,
+ get_time=get_time,
+ auto_refresh=False,
+ console=console,
+ )
+ console.begin_capture()
+ with progress:
+ task_id = progress.add_task("start")
+ for tick in range(6):
+ progress.update(task_id, description=f"tick {tick}")
+ progress.refresh()
+ result = console.end_capture()
+ print(repr(result))
+ assert (
+ result
+ == "\x1b[?25l\r\x1b[2Kstart\r\x1b[2Kstart\r\x1b[2Ktick 1\r\x1b[2Ktick 1\r\x1b[2Ktick 3\r\x1b[2Ktick 3\r\x1b[2Ktick 5\r\x1b[2Ktick 5\n\x1b[?25h"
+ )
+
+
+if __name__ == "__main__":
+ _render = render_progress()
+ print(_render)
+ print(repr(_render))