summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/conftest.py18
-rw-r--r--tests/py37_asyncio.py128
-rw-r--r--tests/tests_asyncio.py140
-rw-r--r--tests/tests_contrib.py22
-rw-r--r--tests/tests_contrib_logging.py2
-rw-r--r--tests/tests_dask.py2
-rw-r--r--tests/tests_keras.py18
-rw-r--r--tests/tests_main.py63
-rw-r--r--tests/tests_pandas.py54
-rw-r--r--tests/tests_perf.py38
-rw-r--r--tests/tests_rich.py5
-rw-r--r--tests/tests_synchronisation.py25
-rw-r--r--tests/tests_tqdm.py124
-rw-r--r--tests/tests_utils.py51
14 files changed, 335 insertions, 355 deletions
diff --git a/tests/conftest.py b/tests/conftest.py
index 6717044..7960fa0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -18,24 +18,10 @@ def pretest_posttest():
n = len(tqdm._instances)
if n:
tqdm._instances.clear()
- raise EnvironmentError(
- "{0} `tqdm` instances still in existence PRE-test".format(n))
+ raise EnvironmentError(f"{n} `tqdm` instances still in existence PRE-test")
yield
if getattr(tqdm, "_instances", False):
n = len(tqdm._instances)
if n:
tqdm._instances.clear()
- raise EnvironmentError(
- "{0} `tqdm` instances still in existence POST-test".format(n))
-
-
-if sys.version_info[0] > 2:
- @fixture
- def capsysbin(capsysbinary):
- """alias for capsysbinary (py3)"""
- return capsysbinary
-else:
- @fixture
- def capsysbin(capsys):
- """alias for capsys (py2)"""
- return capsys
+ raise EnvironmentError(f"{n} `tqdm` instances still in existence POST-test")
diff --git a/tests/py37_asyncio.py b/tests/py37_asyncio.py
deleted file mode 100644
index 8bf61e7..0000000
--- a/tests/py37_asyncio.py
+++ /dev/null
@@ -1,128 +0,0 @@
-import asyncio
-from functools import partial
-from sys import platform
-from time import time
-
-from tqdm.asyncio import tarange, tqdm_asyncio
-
-from .tests_tqdm import StringIO, closing, mark
-
-tqdm = partial(tqdm_asyncio, miniters=0, mininterval=0)
-trange = partial(tarange, miniters=0, mininterval=0)
-as_completed = partial(tqdm_asyncio.as_completed, miniters=0, mininterval=0)
-gather = partial(tqdm_asyncio.gather, miniters=0, mininterval=0)
-
-
-def count(start=0, step=1):
- i = start
- while True:
- new_start = yield i
- if new_start is None:
- i += step
- else:
- i = new_start
-
-
-async def acount(*args, **kwargs):
- for i in count(*args, **kwargs):
- yield i
-
-
-@mark.asyncio
-async def test_break():
- """Test asyncio break"""
- pbar = tqdm(count())
- async for _ in pbar:
- break
- pbar.close()
-
-
-@mark.asyncio
-async def test_generators(capsys):
- """Test asyncio generators"""
- with tqdm(count(), desc="counter") as pbar:
- async for i in pbar:
- if i >= 8:
- break
- _, err = capsys.readouterr()
- assert '9it' in err
-
- with tqdm(acount(), desc="async_counter") as pbar:
- async for i in pbar:
- if i >= 8:
- break
- _, err = capsys.readouterr()
- assert '9it' in err
-
-
-@mark.asyncio
-async def test_range():
- """Test asyncio range"""
- with closing(StringIO()) as our_file:
- async for _ in tqdm(range(9), desc="range", file=our_file):
- pass
- assert '9/9' in our_file.getvalue()
- our_file.seek(0)
- our_file.truncate()
-
- async for _ in trange(9, desc="trange", file=our_file):
- pass
- assert '9/9' in our_file.getvalue()
-
-
-@mark.asyncio
-async def test_nested():
- """Test asyncio nested"""
- with closing(StringIO()) as our_file:
- async for _ in tqdm(trange(9, desc="inner", file=our_file),
- desc="outer", file=our_file):
- pass
- assert 'inner: 100%' in our_file.getvalue()
- assert 'outer: 100%' in our_file.getvalue()
-
-
-@mark.asyncio
-async def test_coroutines():
- """Test asyncio coroutine.send"""
- with closing(StringIO()) as our_file:
- with tqdm(count(), file=our_file) as pbar:
- async for i in pbar:
- if i == 9:
- pbar.send(-10)
- elif i < 0:
- assert i == -9
- break
- assert '10it' in our_file.getvalue()
-
-
-@mark.slow
-@mark.asyncio
-@mark.parametrize("tol", [0.2 if platform.startswith("darwin") else 0.1])
-async def test_as_completed(capsys, tol):
- """Test asyncio as_completed"""
- for retry in range(3):
- t = time()
- skew = time() - t
- for i in as_completed([asyncio.sleep(0.01 * i) for i in range(30, 0, -1)]):
- await i
- t = time() - t - 2 * skew
- try:
- assert 0.3 * (1 - tol) < t < 0.3 * (1 + tol), t
- _, err = capsys.readouterr()
- assert '30/30' in err
- except AssertionError:
- if retry == 2:
- raise
-
-
-async def double(i):
- return i * 2
-
-
-@mark.asyncio
-async def test_gather(capsys):
- """Test asyncio gather"""
- res = await gather(*map(double, range(30)))
- _, err = capsys.readouterr()
- assert '30/30' in err
- assert res == list(range(0, 30 * 2, 2))
diff --git a/tests/tests_asyncio.py b/tests/tests_asyncio.py
index 6f08926..bdef569 100644
--- a/tests/tests_asyncio.py
+++ b/tests/tests_asyncio.py
@@ -1,11 +1,129 @@
-"""Tests `tqdm.asyncio` on `python>=3.7`."""
-import sys
-
-if sys.version_info[:2] > (3, 6):
- from .py37_asyncio import * # NOQA, pylint: disable=wildcard-import
-else:
- from .tests_tqdm import skip
- try:
- skip("async not supported", allow_module_level=True)
- except TypeError:
- pass
+"""Tests `tqdm.asyncio`."""
+import asyncio
+from functools import partial
+from sys import platform
+from time import time
+
+from tqdm.asyncio import tarange, tqdm_asyncio
+
+from .tests_tqdm import StringIO, closing, mark
+
+tqdm = partial(tqdm_asyncio, miniters=0, mininterval=0)
+trange = partial(tarange, miniters=0, mininterval=0)
+as_completed = partial(tqdm_asyncio.as_completed, miniters=0, mininterval=0)
+gather = partial(tqdm_asyncio.gather, miniters=0, mininterval=0)
+
+
+def count(start=0, step=1):
+ i = start
+ while True:
+ new_start = yield i
+ if new_start is None:
+ i += step
+ else:
+ i = new_start
+
+
+async def acount(*args, **kwargs):
+ for i in count(*args, **kwargs):
+ yield i
+
+
+@mark.asyncio
+async def test_break():
+ """Test asyncio break"""
+ pbar = tqdm(count())
+ async for _ in pbar:
+ break
+ pbar.close()
+
+
+@mark.asyncio
+async def test_generators(capsys):
+ """Test asyncio generators"""
+ with tqdm(count(), desc="counter") as pbar:
+ async for i in pbar:
+ if i >= 8:
+ break
+ _, err = capsys.readouterr()
+ assert '9it' in err
+
+ with tqdm(acount(), desc="async_counter") as pbar:
+ async for i in pbar:
+ if i >= 8:
+ break
+ _, err = capsys.readouterr()
+ assert '9it' in err
+
+
+@mark.asyncio
+async def test_range():
+ """Test asyncio range"""
+ with closing(StringIO()) as our_file:
+ async for _ in tqdm(range(9), desc="range", file=our_file):
+ pass
+ assert '9/9' in our_file.getvalue()
+ our_file.seek(0)
+ our_file.truncate()
+
+ async for _ in trange(9, desc="trange", file=our_file):
+ pass
+ assert '9/9' in our_file.getvalue()
+
+
+@mark.asyncio
+async def test_nested():
+ """Test asyncio nested"""
+ with closing(StringIO()) as our_file:
+ async for _ in tqdm(trange(9, desc="inner", file=our_file),
+ desc="outer", file=our_file):
+ pass
+ assert 'inner: 100%' in our_file.getvalue()
+ assert 'outer: 100%' in our_file.getvalue()
+
+
+@mark.asyncio
+async def test_coroutines():
+ """Test asyncio coroutine.send"""
+ with closing(StringIO()) as our_file:
+ with tqdm(count(), file=our_file) as pbar:
+ async for i in pbar:
+ if i == 9:
+ pbar.send(-10)
+ elif i < 0:
+ assert i == -9
+ break
+ assert '10it' in our_file.getvalue()
+
+
+@mark.slow
+@mark.asyncio
+@mark.parametrize("tol", [0.2 if platform.startswith("darwin") else 0.1])
+async def test_as_completed(capsys, tol):
+ """Test asyncio as_completed"""
+ for retry in range(3):
+ t = time()
+ skew = time() - t
+ for i in as_completed([asyncio.sleep(0.01 * i) for i in range(30, 0, -1)]):
+ await i
+ t = time() - t - 2 * skew
+ try:
+ assert 0.3 * (1 - tol) < t < 0.3 * (1 + tol), t
+ _, err = capsys.readouterr()
+ assert '30/30' in err
+ except AssertionError:
+ if retry == 2:
+ raise
+
+
+async def double(i):
+ return i * 2
+
+
+@mark.asyncio
+async def test_gather(capsys):
+ """Test asyncio gather"""
+ res = await gather(*map(double, range(30)))
+ _, err = capsys.readouterr()
+ assert '30/30' in err
+ assert res == list(range(0, 30 * 2, 2))
diff --git a/tests/tests_contrib.py b/tests/tests_contrib.py
index 69a1cad..65c8cd5 100644
--- a/tests/tests_contrib.py
+++ b/tests/tests_contrib.py
@@ -1,8 +1,6 @@
"""
Tests for `tqdm.contrib`.
"""
-import sys
-
import pytest
from tqdm import tqdm
@@ -47,12 +45,9 @@ def test_zip(tqdm_kwargs):
with closing(StringIO()) as our_file:
a = range(9)
b = [i + 1 for i in a]
- if sys.version_info[:1] < (3,):
- assert tzip(a, b, file=our_file, **tqdm_kwargs) == zip(a, b)
- else:
- gen = tzip(a, b, file=our_file, **tqdm_kwargs)
- assert gen != list(zip(a, b))
- assert list(gen) == list(zip(a, b))
+ gen = tzip(a, b, file=our_file, **tqdm_kwargs)
+ assert gen != list(zip(a, b))
+ assert list(gen) == list(zip(a, b))
@pytest.mark.parametrize("tqdm_kwargs", [{}, {"tqdm_class": tqdm}])
@@ -61,11 +56,6 @@ def test_map(tqdm_kwargs):
with closing(StringIO()) as our_file:
a = range(9)
b = [i + 1 for i in a]
- if sys.version_info[:1] < (3,):
- assert tmap(lambda x: x + 1, a, file=our_file, **tqdm_kwargs) == map(
- incr, a
- )
- else:
- gen = tmap(lambda x: x + 1, a, file=our_file, **tqdm_kwargs)
- assert gen != b
- assert list(gen) == b
+ gen = tmap(lambda x: x + 1, a, file=our_file, **tqdm_kwargs)
+ assert gen != b
+ assert list(gen) == b
diff --git a/tests/tests_contrib_logging.py b/tests/tests_contrib_logging.py
index 6f675dd..b8d60b6 100644
--- a/tests/tests_contrib_logging.py
+++ b/tests/tests_contrib_logging.py
@@ -1,7 +1,5 @@
# pylint: disable=missing-module-docstring, missing-class-docstring
# pylint: disable=missing-function-docstring, no-self-use
-from __future__ import absolute_import
-
import logging
import logging.handlers
import sys
diff --git a/tests/tests_dask.py b/tests/tests_dask.py
index 8bf4b64..16992bf 100644
--- a/tests/tests_dask.py
+++ b/tests/tests_dask.py
@@ -1,5 +1,3 @@
-from __future__ import division
-
from time import sleep
from .tests_tqdm import importorskip, mark
diff --git a/tests/tests_keras.py b/tests/tests_keras.py
index 220f946..5b7db28 100644
--- a/tests/tests_keras.py
+++ b/tests/tests_keras.py
@@ -1,5 +1,3 @@
-from __future__ import division
-
from .tests_tqdm import importorskip, mark
pytestmark = mark.slow
@@ -41,8 +39,8 @@ def test_keras(capsys):
verbose=0)])
_, res = capsys.readouterr()
assert "training: " in res
- assert "{epochs}/{epochs}".format(epochs=epochs) in res
- assert "{batches}/{batches}".format(batches=batches) not in res
+ assert f"{epochs}/{epochs}" in res
+ assert f"{batches}/{batches}" not in res
# full (epoch and batch) progress
model.fit(
@@ -60,8 +58,8 @@ def test_keras(capsys):
verbose=2)])
_, res = capsys.readouterr()
assert "training: " in res
- assert "{epochs}/{epochs}".format(epochs=epochs) in res
- assert "{batches}/{batches}".format(batches=batches) in res
+ assert f"{epochs}/{epochs}" in res
+ assert f"{batches}/{batches}" in res
# auto-detect epochs and batches
model.fit(
@@ -73,8 +71,8 @@ def test_keras(capsys):
callbacks=[TqdmCallback(desc="training", verbose=2)])
_, res = capsys.readouterr()
assert "training: " in res
- assert "{epochs}/{epochs}".format(epochs=epochs) in res
- assert "{batches}/{batches}".format(batches=batches) in res
+ assert f"{epochs}/{epochs}" in res
+ assert f"{batches}/{batches}" in res
# continue training (start from epoch != 0)
initial_epoch = 3
@@ -89,5 +87,5 @@ def test_keras(capsys):
miniters=1, mininterval=0, maxinterval=0)])
_, res = capsys.readouterr()
assert "training: " in res
- assert "{epochs}/{epochs}".format(epochs=initial_epoch - 1) not in res
- assert "{epochs}/{epochs}".format(epochs=epochs) in res
+ assert f"{initial_epoch - 1}/{initial_epoch - 1}" not in res
+ assert f"{epochs}/{epochs}" in res
diff --git a/tests/tests_main.py b/tests/tests_main.py
index 0523cc7..039ce33 100644
--- a/tests/tests_main.py
+++ b/tests/tests_main.py
@@ -8,17 +8,17 @@ from os import linesep
from tqdm.cli import TqdmKeyError, TqdmTypeError, main
from tqdm.utils import IS_WIN
-from .tests_tqdm import BytesIO, _range, closing, mark, raises
+from .tests_tqdm import BytesIO, closing, mark, raises
def restore_sys(func):
- """Decorates `func(capsysbin)` to save & restore `sys.(stdin|argv)`."""
+ """Decorates `func(capsysbinary)` to save & restore `sys.(stdin|argv)`."""
@wraps(func)
- def inner(capsysbin):
- """function requiring capsysbin which may alter `sys.(stdin|argv)`"""
+ def inner(capsysbinary):
+ """function requiring capsysbinary which may alter `sys.(stdin|argv)`"""
_SYS = sys.stdin, sys.argv
try:
- res = func(capsysbin)
+ res = func(capsysbinary)
finally:
sys.stdin, sys.argv = _SYS
return res
@@ -58,7 +58,7 @@ def test_main_import():
N = 123
_SYS = sys.stdin, sys.argv
# test direct import
- sys.stdin = [str(i).encode() for i in _range(N)]
+ sys.stdin = [str(i).encode() for i in range(N)]
sys.argv = ['', '--desc', 'Test CLI import',
'--ascii', 'True', '--unit_scale', 'True']
try:
@@ -68,19 +68,19 @@ def test_main_import():
@restore_sys
-def test_main_bytes(capsysbin):
+def test_main_bytes(capsysbinary):
"""Test CLI --bytes"""
N = 123
# test --delim
- IN_DATA = '\0'.join(map(str, _range(N))).encode()
+ IN_DATA = '\0'.join(map(str, range(N))).encode()
with closing(BytesIO()) as sys.stdin:
sys.stdin.write(IN_DATA)
# sys.stdin.write(b'\xff') # TODO
sys.stdin.seek(0)
main(sys.stderr, ['--desc', 'Test CLI delim', '--ascii', 'True',
'--delim', r'\0', '--buf_size', '64'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert out == IN_DATA
assert str(N) + "it" in err.decode("U8")
@@ -90,27 +90,26 @@ def test_main_bytes(capsysbin):
sys.stdin.write(IN_DATA)
sys.stdin.seek(0)
main(sys.stderr, ['--ascii', '--bytes=True', '--unit_scale', 'False'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert out == IN_DATA
assert str(len(IN_DATA)) + "B" in err.decode("U8")
-@mark.skipif(sys.version_info[0] == 2, reason="no caplog on py2")
-def test_main_log(capsysbin, caplog):
+def test_main_log(capsysbinary, caplog):
"""Test CLI --log"""
_SYS = sys.stdin, sys.argv
N = 123
- sys.stdin = [(str(i) + '\n').encode() for i in _range(N)]
+ sys.stdin = [(str(i) + '\n').encode() for i in range(N)]
IN_DATA = b''.join(sys.stdin)
try:
with caplog.at_level(logging.INFO):
main(sys.stderr, ['--log', 'INFO'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA and b"123/123" in err
assert not caplog.record_tuples
with caplog.at_level(logging.DEBUG):
main(sys.stderr, ['--log', 'DEBUG'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA and b"123/123" in err
assert caplog.record_tuples
finally:
@@ -118,39 +117,39 @@ def test_main_log(capsysbin, caplog):
@restore_sys
-def test_main(capsysbin):
+def test_main(capsysbinary):
"""Test misc CLI options"""
N = 123
- sys.stdin = [(str(i) + '\n').encode() for i in _range(N)]
+ sys.stdin = [(str(i) + '\n').encode() for i in range(N)]
IN_DATA = b''.join(sys.stdin)
# test --tee
main(sys.stderr, ['--mininterval', '0', '--miniters', '1'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA and b"123/123" in err
assert N <= len(err.split(b"\r")) < N + 5
len_err = len(err)
main(sys.stderr, ['--tee', '--mininterval', '0', '--miniters', '1'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA and b"123/123" in err
# spaces to clear intermediate lines could increase length
assert len_err + len(norm(out)) <= len(err)
# test --null
main(sys.stderr, ['--null'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert not out and b"123/123" in err
# test integer --update
main(sys.stderr, ['--update'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA
assert (str(N // 2 * N) + "it").encode() in err, "expected arithmetic sum formula"
# test integer --update_to
main(sys.stderr, ['--update-to'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA
assert (str(N - 1) + "it").encode() in err
assert (str(N) + "it").encode() not in err
@@ -161,23 +160,23 @@ def test_main(capsysbin):
# test integer --update --delim
sys.stdin.seek(0)
main(sys.stderr, ['--update', '--delim', 'D'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert out == IN_DATA.replace(b'\n', b'D')
assert (str(N // 2 * N) + "it").encode() in err, "expected arithmetic sum"
# test integer --update_to --delim
sys.stdin.seek(0)
main(sys.stderr, ['--update-to', '--delim', 'D'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert out == IN_DATA.replace(b'\n', b'D')
assert (str(N - 1) + "it").encode() in err
assert (str(N) + "it").encode() not in err
# test float --update_to
- sys.stdin = [(str(i / 2.0) + '\n').encode() for i in _range(N)]
+ sys.stdin = [(str(i / 2.0) + '\n').encode() for i in range(N)]
IN_DATA = b''.join(sys.stdin)
main(sys.stderr, ['--update-to'])
- out, err = capsysbin.readouterr()
+ out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA
assert (str((N - 1) / 2.0) + "it").encode() in err
assert (str(N / 2.0) + "it").encode() not in err
@@ -213,30 +212,30 @@ def test_comppath(tmp_path):
@restore_sys
-def test_exceptions(capsysbin):
+def test_exceptions(capsysbinary):
"""Test CLI Exceptions"""
N = 123
- sys.stdin = [str(i) + '\n' for i in _range(N)]
+ sys.stdin = [str(i) + '\n' for i in range(N)]
IN_DATA = ''.join(sys.stdin).encode()
with raises(TqdmKeyError, match="bad_arg_u_ment"):
main(sys.stderr, argv=['-ascii', '-unit_scale', '--bad_arg_u_ment', 'foo'])
- out, _ = capsysbin.readouterr()
+ out, _ = capsysbinary.readouterr()
assert norm(out) == IN_DATA
with raises(TqdmTypeError, match="invalid_bool_value"):
main(sys.stderr, argv=['-ascii', '-unit_scale', 'invalid_bool_value'])
- out, _ = capsysbin.readouterr()
+ out, _ = capsysbinary.readouterr()
assert norm(out) == IN_DATA
with raises(TqdmTypeError, match="invalid_int_value"):
main(sys.stderr, argv=['-ascii', '--total', 'invalid_int_value'])
- out, _ = capsysbin.readouterr()
+ out, _ = capsysbinary.readouterr()
assert norm(out) == IN_DATA
with raises(TqdmKeyError, match="Can only have one of --"):
main(sys.stderr, argv=['--update', '--update_to'])
- out, _ = capsysbin.readouterr()
+ out, _ = capsysbinary.readouterr()
assert norm(out) == IN_DATA
# test SystemExits
diff --git a/tests/tests_pandas.py b/tests/tests_pandas.py
index 334a97c..09dff1e 100644
--- a/tests/tests_pandas.py
+++ b/tests/tests_pandas.py
@@ -4,6 +4,7 @@ from .tests_tqdm import StringIO, closing, importorskip, mark, skip
pytestmark = mark.slow
+np = importorskip('numpy')
random = importorskip('numpy.random')
rand = random.rand
randint = random.randint
@@ -39,8 +40,8 @@ def test_pandas_rolling_expanding():
our_file.seek(0)
if our_file.getvalue().count(exres) < 2:
our_file.seek(0)
- raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format(
- exres + " at least twice.", our_file.read()))
+ raise AssertionError(
+ f"\nExpected:\n{exres} at least twice.\nIn:\n{our_file.read()}\n")
def test_pandas_series():
@@ -62,10 +63,11 @@ def test_pandas_series():
our_file.seek(0)
if our_file.getvalue().count(exres) < 2:
our_file.seek(0)
- raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format(
- exres + " at least twice.", our_file.read()))
+ raise AssertionError(
+ f"\nExpected:\n{exres} at least twice.\nIn:\n{our_file.read()}\n")
+@mark.filterwarnings("ignore:DataFrame.applymap has been deprecated:FutureWarning")
def test_pandas_data_frame():
"""Test pandas.DataFrame.progress_apply and .progress_applymap"""
with closing(StringIO()) as our_file:
@@ -80,6 +82,12 @@ def test_pandas_data_frame():
res2 = df.applymap(task_func)
assert res1.equals(res2)
+ # map
+ if hasattr(df, 'map'): # pandas>=2.1.0
+ res1 = df.progress_map(task_func)
+ res2 = df.map(task_func)
+ assert res1.equals(res2)
+
# apply unhashable
res1 = []
df.progress_apply(res1.extend)
@@ -94,8 +102,8 @@ def test_pandas_data_frame():
our_file.seek(0)
if our_file.read().count('100%') < 3:
our_file.seek(0)
- raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format(
- '100% at least three times', our_file.read()))
+ raise AssertionError(
+ f"\nExpected:\n100% at least three times\nIn:\n{our_file.read()}\n")
# apply_map, apply axis=0, apply axis=1
expects = ['20000/20000', '200/200', '100/100']
@@ -103,10 +111,12 @@ def test_pandas_data_frame():
our_file.seek(0)
if our_file.getvalue().count(exres) < 1:
our_file.seek(0)
- raise AssertionError("\nExpected:\n{0}\nIn:\n {1}\n".format(
- exres + " at least once.", our_file.read()))
+ raise AssertionError(
+ f"\nExpected:\n{exres} at least once.\nIn:\n{our_file.read()}\n")
+@mark.filterwarnings(
+ "ignore:DataFrameGroupBy.apply operated on the grouping columns:DeprecationWarning")
def test_pandas_groupby_apply():
"""Test pandas.DataFrame.groupby(...).progress_apply"""
with closing(StringIO()) as our_file:
@@ -119,8 +129,8 @@ def test_pandas_groupby_apply():
dfs.groupby(['a']).progress_apply(lambda x: None)
df2 = df = pd.DataFrame({'a': randint(1, 8, 10000), 'b': rand(10000)})
- res1 = df2.groupby("a").apply(max)
- res2 = df2.groupby("a").progress_apply(max)
+ res1 = df2.groupby("a").apply(np.maximum.reduce)
+ res2 = df2.groupby("a").progress_apply(np.maximum.reduce)
assert res1.equals(res2)
our_file.seek(0)
@@ -130,8 +140,7 @@ def test_pandas_groupby_apply():
nexres = '100%|##########|'
if nexres in our_file.read():
our_file.seek(0)
- raise AssertionError("\nDid not expect:\n{0}\nIn:{1}\n".format(
- nexres, our_file.read()))
+ raise AssertionError(f"\nDid not expect:\n{nexres}\nIn:{our_file.read()}\n")
with closing(StringIO()) as our_file:
tqdm.pandas(file=our_file, leave=True, ascii=True)
@@ -140,26 +149,28 @@ def test_pandas_groupby_apply():
dfs.loc[0] = [2, 1, 1]
dfs['d'] = 100
- expects = ['500/500', '1/1', '4/4', '2/2']
+ expects = ['500/500', '1/1', '4/4', '4/4']
dfs.groupby(dfs.index).progress_apply(lambda x: None)
dfs.groupby('d').progress_apply(lambda x: None)
- dfs.groupby(dfs.columns, axis=1).progress_apply(lambda x: None)
- dfs.groupby([2, 2, 1, 1], axis=1).progress_apply(lambda x: None)
+ dfs.T.groupby(dfs.columns).progress_apply(lambda x: None)
+ dfs.T.groupby([2, 2, 1, 1]).progress_apply(lambda x: None)
our_file.seek(0)
if our_file.read().count('100%') < 4:
our_file.seek(0)
- raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format(
- '100% at least four times', our_file.read()))
+ raise AssertionError(
+ f"\nExpected:\n100% at least four times\nIn:\n{our_file.read()}\n")
for exres in expects:
our_file.seek(0)
if our_file.getvalue().count(exres) < 1:
our_file.seek(0)
- raise AssertionError("\nExpected:\n{0}\nIn:\n {1}\n".format(
- exres + " at least once.", our_file.read()))
+ raise AssertionError(
+ f"\nExpected:\n{exres} at least once.\nIn:\n{our_file.read()}\n")
+@mark.filterwarnings(
+ "ignore:DataFrameGroupBy.apply operated on the grouping columns:DeprecationWarning")
def test_pandas_leave():
"""Test pandas with `leave=True`"""
with closing(StringIO()) as our_file:
@@ -172,8 +183,7 @@ def test_pandas_leave():
exres = '100%|##########| 100/100'
if exres not in our_file.read():
our_file.seek(0)
- raise AssertionError("\nExpected:\n{0}\nIn:{1}\n".format(
- exres, our_file.read()))
+ raise AssertionError(f"\nExpected:\n{exres}\nIn:{our_file.read()}\n")
def test_pandas_apply_args_deprecation():
@@ -195,6 +205,8 @@ def test_pandas_apply_args_deprecation():
"keyword arguments instead"))
+@mark.filterwarnings(
+ "ignore:DataFrameGroupBy.apply operated on the grouping columns:DeprecationWarning")
def test_pandas_deprecation():
"""Test bar object instance as argument deprecation"""
try:
diff --git a/tests/tests_perf.py b/tests/tests_perf.py
index 552a169..a6c4823 100644
--- a/tests/tests_perf.py
+++ b/tests/tests_perf.py
@@ -1,5 +1,3 @@
-from __future__ import division, print_function
-
import sys
from contextlib import contextmanager
from functools import wraps
@@ -14,7 +12,7 @@ except ImportError:
from tqdm import tqdm, trange
-from .tests_tqdm import _range, importorskip, mark, patch_lock, skip
+from .tests_tqdm import importorskip, mark, patch_lock, skip
pytestmark = mark.slow
@@ -98,10 +96,7 @@ def simple_progress(iterable=None, total=None, file=sys.stdout, desc='',
def format_interval(t):
mins, s = divmod(int(t), 60)
h, m = divmod(mins, 60)
- if h:
- return '{0:d}:{1:02d}:{2:02d}'.format(h, m, s)
- else:
- return '{0:02d}:{1:02d}'.format(m, s)
+ return f'{h:d}:{m:02d}:{s:02d}' if h else f'{m:02d}:{s:02d}'
def update_and_print(i=1):
n[0] += i
@@ -143,20 +138,15 @@ def simple_progress(iterable=None, total=None, file=sys.stdout, desc='',
update_and_print(0)
if iterable is not None:
return update_and_yield()
- else:
- return update_and_print
+ return update_and_print
def assert_performance(thresh, name_left, time_left, name_right, time_right):
"""raises if time_left > thresh * time_right"""
if time_left > thresh * time_right:
raise ValueError(
- ('{name[0]}: {time[0]:f}, '
- '{name[1]}: {time[1]:f}, '
- 'ratio {ratio:f} > {thresh:f}').format(
- name=(name_left, name_right),
- time=(time_left, time_right),
- ratio=time_left / time_right, thresh=thresh))
+ f'{name_left}: {time_left:f}, {name_right}: {time_right:f}'
+ f', ratio {time_left / time_right:f} > {thresh:f}')
@retry_on_except()
@@ -173,7 +163,7 @@ def test_iter_basic_overhead():
a = 0
with relative_timer() as time_bench:
- for i in _range(total):
+ for i in range(total):
a += i
sys.stdout.write(str(a))
@@ -188,13 +178,13 @@ def test_manual_basic_overhead():
with tqdm(total=total * 10, leave=True) as t:
a = 0
with relative_timer() as time_tqdm:
- for i in _range(total):
+ for i in range(total):
a += i
t.update(10)
a = 0
with relative_timer() as time_bench:
- for i in _range(total):
+ for i in range(total):
a += i
sys.stdout.write(str(a))
@@ -249,7 +239,7 @@ def test_iter_overhead_hard():
a = 0
with relative_timer() as time_bench:
- for i in _range(total):
+ for i in range(total):
a += i
sys.stdout.write(("%i" % a) * 40)
@@ -265,13 +255,13 @@ def test_manual_overhead_hard():
mininterval=0, maxinterval=0) as t:
a = 0
with relative_timer() as time_tqdm:
- for i in _range(total):
+ for i in range(total):
a += i
t.update(10)
a = 0
with relative_timer() as time_bench:
- for i in _range(total):
+ for i in range(total):
a += i
sys.stdout.write(("%i" % a) * 40)
@@ -292,7 +282,7 @@ def test_iter_overhead_simplebar_hard():
assert a == (total ** 2 - total) / 2.0
a = 0
- s = simple_progress(_range(total), leave=True,
+ s = simple_progress(range(total), leave=True,
miniters=1, mininterval=0)
with relative_timer() as time_bench:
for i in s:
@@ -310,7 +300,7 @@ def test_manual_overhead_simplebar_hard():
mininterval=0, maxinterval=0) as t:
a = 0
with relative_timer() as time_tqdm:
- for i in _range(total):
+ for i in range(total):
a += i
t.update(10)
@@ -318,7 +308,7 @@ def test_manual_overhead_simplebar_hard():
miniters=1, mininterval=0)
a = 0
with relative_timer() as time_bench:
- for i in _range(total):
+ for i in range(total):
a += i
simplebar_update(10)
diff --git a/tests/tests_rich.py b/tests/tests_rich.py
index c75e246..2fff78c 100644
--- a/tests/tests_rich.py
+++ b/tests/tests_rich.py
@@ -1,10 +1,7 @@
"""Test `tqdm.rich`."""
-import sys
+from .tests_tqdm import importorskip
-from .tests_tqdm import importorskip, mark
-
-@mark.skipif(sys.version_info[:3] < (3, 6, 1), reason="`rich` needs py>=3.6.1")
def test_rich_import():
"""Test `tqdm.rich` import"""
importorskip('tqdm.rich')
diff --git a/tests/tests_synchronisation.py b/tests/tests_synchronisation.py
index 7ee55fb..0cd9190 100644
--- a/tests/tests_synchronisation.py
+++ b/tests/tests_synchronisation.py
@@ -1,13 +1,9 @@
-from __future__ import division
-
-import sys
from functools import wraps
from threading import Event
from time import sleep, time
from tqdm import TMonitor, tqdm, trange
-from .tests_perf import retry_on_except
from .tests_tqdm import StringIO, closing, importorskip, patch_lock, skip
@@ -37,18 +33,13 @@ class Time(object):
sleep(0.000001) # sleep to allow interrupt (instead of pass)
-def FakeEvent():
+class FakeEvent(Event):
"""patched `threading.Event` where `wait()` uses `Time.fake_sleep()`"""
- event = Event() # not a class in py2 so can't inherit
-
- def wait(timeout=None):
+ def wait(self, timeout=None):
"""uses Time.fake_sleep"""
if timeout is not None:
Time.fake_sleep(timeout)
- return event.is_set()
-
- event.wait = wait
- return event
+ return self.is_set()
def patch_sleep(func):
@@ -206,19 +197,11 @@ def test_imap():
assert res[-1] == 100
-# py2: locks won't propagate to incr_bar so may cause `AttributeError`
-@retry_on_except(n=3 if sys.version_info < (3,) else 1, check_cpu_time=False)
@patch_lock(thread=True)
def test_threadpool():
"""Test concurrent.futures.ThreadPoolExecutor"""
ThreadPoolExecutor = importorskip('concurrent.futures').ThreadPoolExecutor
with ThreadPoolExecutor(8) as pool:
- try:
- res = list(tqdm(pool.map(incr_bar, range(100)), disable=True))
- except AttributeError:
- if sys.version_info < (3,):
- skip("not supported on py2")
- else:
- raise
+ res = list(tqdm(pool.map(incr_bar, range(100)), disable=True))
assert sum(res) == sum(range(1, 101))
diff --git a/tests/tests_tqdm.py b/tests/tests_tqdm.py
index bba457a..d0ba14f 100644
--- a/tests/tests_tqdm.py
+++ b/tests/tests_tqdm.py
@@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
# Advice: use repr(our_file.read()) to print the full output of tqdm
# (else '\r' will replace the previous lines and you'll see only the latest.
-from __future__ import print_function
-
import csv
import os
import re
@@ -37,16 +35,6 @@ if getattr(StringIO, '__exit__', False) and getattr(StringIO, '__enter__', False
else:
from contextlib import closing
-try:
- _range = xrange
-except NameError:
- _range = range
-
-try:
- _unicode = unicode
-except NameError:
- _unicode = str
-
nt_and_no_colorama = False
if os.name == 'nt':
try:
@@ -201,6 +189,8 @@ def test_format_num():
assert float(format_num(1337)) == 1337
assert format_num(int(1e6)) == '1e+6'
assert format_num(1239876) == '1' '239' '876'
+ assert format_num(0.00001234) == '1.23e-5'
+ assert format_num(-0.1234) == '-0.123'
def test_format_meter():
@@ -271,11 +261,10 @@ def test_format_meter():
20, 100, 12, ncols=14, rate=8.1,
bar_format=r'{l_bar}{bar}|{n_fmt}/{total_fmt}') == " 20%|" + unich(0x258d) + " |20/100"
# Check wide characters
- if sys.version_info >= (3,):
- assert format_meter(0, 1000, 13, ncols=68, prefix='fullwidth: ') == (
- "fullwidth: 0%| | 0/1000 [00:13<?, ?it/s]")
- assert format_meter(0, 1000, 13, ncols=68, prefix='ニッポン [ニッポン]: ') == (
- "ニッポン [ニッポン]: 0%| | 0/1000 [00:13<?, ?it/s]")
+ assert format_meter(0, 1000, 13, ncols=68, prefix='fullwidth: ') == (
+ "fullwidth: 0%| | 0/1000 [00:13<?, ?it/s]")
+ assert format_meter(0, 1000, 13, ncols=68, prefix='ニッポン [ニッポン]: ') == (
+ "ニッポン [ニッポン]: 0%| | 0/1000 [00:13<?, ?it/s]")
# Check that bar_format can print only {bar} or just one side
assert format_meter(20, 100, 12, ncols=2, rate=8.1,
bar_format=r'{bar}') == unich(0x258d) + " "
@@ -328,11 +317,11 @@ def test_si_format():
def test_bar_formatspec():
"""Test Bar.__format__ spec"""
- assert "{0:5a}".format(Bar(0.3)) == "#5 "
- assert "{0:2}".format(Bar(0.5, charset=" .oO0")) == "0 "
- assert "{0:2a}".format(Bar(0.5, charset=" .oO0")) == "# "
- assert "{0:-6a}".format(Bar(0.5, 10)) == '## '
- assert "{0:2b}".format(Bar(0.5, 10)) == ' '
+ assert f"{Bar(0.3):5a}" == "#5 "
+ assert f"{Bar(0.5, charset=' .oO0'):2}" == "0 "
+ assert f"{Bar(0.5, charset=' .oO0'):2a}" == "# "
+ assert f"{Bar(0.5, 10):-6a}" == '## '
+ assert f"{Bar(0.5, 10):2b}" == ' '
def test_all_defaults():
@@ -401,7 +390,7 @@ def test_iterate_over_csv_rows():
# Create a test csv pseudo file
with closing(StringIO()) as test_csv_file:
writer = csv.writer(test_csv_file)
- for _ in _range(3):
+ for _ in range(3):
writer.writerow(['test'] * 3)
test_csv_file.seek(0)
@@ -415,7 +404,7 @@ def test_iterate_over_csv_rows():
def test_file_output():
"""Test output to arbitrary file-like objects"""
with closing(StringIO()) as our_file:
- for i in tqdm(_range(3), file=our_file):
+ for i in tqdm(range(3), file=our_file):
if i == 1:
our_file.seek(0)
assert '0/3' in our_file.read()
@@ -424,14 +413,14 @@ def test_file_output():
def test_leave_option():
"""Test `leave=True` always prints info about the last iteration"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(3), file=our_file, leave=True):
+ for _ in tqdm(range(3), file=our_file, leave=True):
pass
res = our_file.getvalue()
assert '| 3/3 ' in res
assert '\n' == res[-1] # not '\r'
with closing(StringIO()) as our_file2:
- for _ in tqdm(_range(3), file=our_file2, leave=False):
+ for _ in tqdm(range(3), file=our_file2, leave=False):
pass
assert '| 3/3 ' not in our_file2.getvalue()
@@ -452,7 +441,7 @@ def test_trange():
def test_min_interval():
"""Test mininterval"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(3), file=our_file, mininterval=1e-10):
+ for _ in tqdm(range(3), file=our_file, mininterval=1e-10):
pass
assert " 0%| | 0/3 [00:00<" in our_file.getvalue()
@@ -484,7 +473,7 @@ def test_max_interval():
t.update(bigstep)
t2.update(bigstep)
# The next iterations should not trigger maxinterval (step 10)
- for _ in _range(4):
+ for _ in range(4):
t.update(smallstep)
t2.update(smallstep)
timer.sleep(1e-5)
@@ -504,7 +493,7 @@ def test_max_interval():
# Increase 10 iterations at once
t.update(bigstep)
# The next iterations should trigger maxinterval (step 5)
- for _ in _range(4):
+ for _ in range(4):
t.update(smallstep)
timer.sleep(1e-2)
@@ -513,7 +502,7 @@ def test_max_interval():
# Test iteration based tqdm with maxinterval effect
timer = DiscreteTimer()
with closing(StringIO()) as our_file:
- with tqdm(_range(total), file=our_file, miniters=None,
+ with tqdm(range(total), file=our_file, miniters=None,
mininterval=1e-5, smoothing=1, maxinterval=1e-4) as t2:
cpu_timify(t2, timer)
@@ -560,9 +549,9 @@ def test_max_interval():
mininterval = 0.1
maxinterval = 10
with closing(StringIO()) as our_file:
- t1 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1,
+ t1 = tqdm(range(total), file=our_file, miniters=None, smoothing=1,
mininterval=mininterval, maxinterval=maxinterval)
- t2 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1,
+ t2 = tqdm(range(total), file=our_file, miniters=None, smoothing=1,
mininterval=0, maxinterval=maxinterval)
cpu_timify(t1, timer1)
@@ -605,7 +594,7 @@ def test_delay():
def test_min_iters():
"""Test miniters"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(3), file=our_file, leave=True, mininterval=0, miniters=2):
+ for _ in tqdm(range(3), file=our_file, leave=True, mininterval=0, miniters=2):
pass
out = our_file.getvalue()
@@ -615,7 +604,7 @@ def test_min_iters():
assert '| 3/3 ' in out
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(3), file=our_file, leave=True, mininterval=0, miniters=1):
+ for _ in tqdm(range(3), file=our_file, leave=True, mininterval=0, miniters=1):
pass
out = our_file.getvalue()
@@ -669,7 +658,7 @@ def test_dynamic_min_iters():
# Check iterable based tqdm
with closing(StringIO()) as our_file:
- t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None,
+ t = tqdm(range(10), file=our_file, miniters=None, mininterval=None,
smoothing=0.5)
for _ in t:
pass
@@ -677,7 +666,7 @@ def test_dynamic_min_iters():
# No smoothing
with closing(StringIO()) as our_file:
- t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None,
+ t = tqdm(range(10), file=our_file, miniters=None, mininterval=None,
smoothing=0)
for _ in t:
pass
@@ -685,7 +674,7 @@ def test_dynamic_min_iters():
# No dynamic_miniters (miniters is fixed manually)
with closing(StringIO()) as our_file:
- t = tqdm(_range(10), file=our_file, miniters=1, mininterval=None)
+ t = tqdm(range(10), file=our_file, miniters=1, mininterval=None)
for _ in t:
pass
assert not t.dynamic_miniters
@@ -694,12 +683,12 @@ def test_dynamic_min_iters():
def test_big_min_interval():
"""Test large mininterval"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(2), file=our_file, mininterval=1E10):
+ for _ in tqdm(range(2), file=our_file, mininterval=1E10):
pass
assert '50%' not in our_file.getvalue()
with closing(StringIO()) as our_file:
- with tqdm(_range(2), file=our_file, mininterval=1E10) as t:
+ with tqdm(range(2), file=our_file, mininterval=1E10) as t:
t.update()
t.update()
assert '50%' not in our_file.getvalue()
@@ -718,10 +707,10 @@ def test_smoothed_dynamic_min_iters():
timer.sleep(1)
t.update(10)
# The next iterations should be partially skipped
- for _ in _range(2):
+ for _ in range(2):
timer.sleep(1)
t.update(4)
- for _ in _range(20):
+ for _ in range(20):
timer.sleep(1)
t.update()
@@ -750,7 +739,7 @@ def test_smoothed_dynamic_min_iters_with_min_interval():
t.update(10)
timer.sleep(1e-2)
- for _ in _range(4):
+ for _ in range(4):
t.update()
timer.sleep(1e-2)
out = our_file.getvalue()
@@ -758,7 +747,7 @@ def test_smoothed_dynamic_min_iters_with_min_interval():
with closing(StringIO()) as our_file:
# Test iteration-based tqdm
- with tqdm(_range(total), file=our_file, miniters=None,
+ with tqdm(range(total), file=our_file, miniters=None,
mininterval=0.01, smoothing=1, maxinterval=0) as t2:
cpu_timify(t2, timer)
@@ -817,7 +806,7 @@ def _rlock_creation_target():
def test_disable():
"""Test disable"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(3), file=our_file, disable=True):
+ for _ in tqdm(range(3), file=our_file, disable=True):
pass
assert our_file.getvalue() == ''
@@ -831,7 +820,7 @@ def test_disable():
def test_infinite_total():
"""Test treatment of infinite total"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(3), file=our_file, total=float("inf")):
+ for _ in tqdm(range(3), file=our_file, total=float("inf")):
pass
@@ -852,7 +841,7 @@ def test_nototal():
def test_unit():
"""Test SI unit prefix"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(3), file=our_file, miniters=1, unit="bytes"):
+ for _ in tqdm(range(3), file=our_file, miniters=1, unit="bytes"):
pass
assert 'bytes/s' in our_file.getvalue()
@@ -866,7 +855,7 @@ def test_ascii():
# Test ascii bar
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(3), total=15, file=our_file, miniters=1,
+ for _ in tqdm(range(3), total=15, file=our_file, miniters=1,
mininterval=0, ascii=True):
pass
res = our_file.getvalue().strip("\r").split("\r")
@@ -877,7 +866,7 @@ def test_ascii():
# Test unicode bar
with closing(UnicodeIO()) as our_file:
with tqdm(total=15, file=our_file, ascii=False, mininterval=0) as t:
- for _ in _range(3):
+ for _ in range(3):
t.update()
res = our_file.getvalue().strip("\r").split("\r")
assert u"7%|\u258b" in res[1]
@@ -887,7 +876,7 @@ def test_ascii():
# Test custom bar
for bars in [" .oO0", " #"]:
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(len(bars) - 1), file=our_file, miniters=1,
+ for _ in tqdm(range(len(bars) - 1), file=our_file, miniters=1,
mininterval=0, ascii=bars, ncols=27):
pass
res = our_file.getvalue().strip("\r").split("\r")
@@ -949,8 +938,7 @@ def test_close():
res = our_file.getvalue()
assert res[-1] == '\n'
if not res.startswith(exres):
- raise AssertionError("\n<<< Expected:\n{0}\n>>> Got:\n{1}\n===".format(
- exres + ', ...it/s]\n', our_file.getvalue()))
+ raise AssertionError(f"\n<<< Expected:\n{exres}, ...it/s]\n>>> Got:\n{res}\n===")
# Closing after the output stream has closed
with closing(StringIO()) as our_file:
@@ -976,7 +964,7 @@ def test_smoothing():
# -- Test disabling smoothing
with closing(StringIO()) as our_file:
- with tqdm(_range(3), file=our_file, smoothing=None, leave=True) as t:
+ with tqdm(range(3), file=our_file, smoothing=None, leave=True) as t:
cpu_timify(t, timer)
for _ in t:
@@ -987,11 +975,11 @@ def test_smoothing():
# 1st case: no smoothing (only use average)
with closing(StringIO()) as our_file2:
with closing(StringIO()) as our_file:
- t = tqdm(_range(3), file=our_file2, smoothing=None, leave=True,
+ t = tqdm(range(3), file=our_file2, smoothing=None, leave=True,
miniters=1, mininterval=0)
cpu_timify(t, timer)
- with tqdm(_range(3), file=our_file, smoothing=None, leave=True,
+ with tqdm(range(3), file=our_file, smoothing=None, leave=True,
miniters=1, mininterval=0) as t2:
cpu_timify(t2, timer)
@@ -1017,11 +1005,11 @@ def test_smoothing():
# 2nd case: use max smoothing (= instant rate)
with closing(StringIO()) as our_file2:
with closing(StringIO()) as our_file:
- t = tqdm(_range(3), file=our_file2, smoothing=1, leave=True,
+ t = tqdm(range(3), file=our_file2, smoothing=1, leave=True,
miniters=1, mininterval=0)
cpu_timify(t, timer)
- with tqdm(_range(3), file=our_file, smoothing=1, leave=True,
+ with tqdm(range(3), file=our_file, smoothing=1, leave=True,
miniters=1, mininterval=0) as t2:
cpu_timify(t2, timer)
@@ -1040,11 +1028,11 @@ def test_smoothing():
# 3rd case: use medium smoothing
with closing(StringIO()) as our_file2:
with closing(StringIO()) as our_file:
- t = tqdm(_range(3), file=our_file2, smoothing=0.5, leave=True,
+ t = tqdm(range(3), file=our_file2, smoothing=0.5, leave=True,
miniters=1, mininterval=0)
cpu_timify(t, timer)
- t2 = tqdm(_range(3), file=our_file, smoothing=0.5, leave=True,
+ t2 = tqdm(range(3), file=our_file, smoothing=0.5, leave=True,
miniters=1, mininterval=0)
cpu_timify(t2, timer)
@@ -1098,7 +1086,7 @@ def test_bar_format():
with closing(StringIO()) as our_file:
bar_format = r'hello world'
with tqdm(ascii=False, bar_format=bar_format, file=our_file) as t:
- assert isinstance(t.bar_format, _unicode)
+ assert isinstance(t.bar_format, str)
def test_custom_format():
@@ -1127,7 +1115,7 @@ def test_eta(capsys):
bar_format='{l_bar}{eta:%Y-%m-%d}'):
pass
_, err = capsys.readouterr()
- assert "\r100%|{eta:%Y-%m-%d}\n".format(eta=dt.now()) in err
+ assert f"\r100%|{dt.now():%Y-%m-%d}\n" in err
def test_unpause():
@@ -1257,7 +1245,7 @@ def test_position():
t1 = tqdm(desc='pos0 bar', position=0, **kwargs)
t2 = tqdm(desc='pos1 bar', position=1, **kwargs)
t3 = tqdm(desc='pos2 bar', position=2, **kwargs)
- for _ in _range(2):
+ for _ in range(2):
t1.update()
t3.update()
t2.update()
@@ -1360,7 +1348,7 @@ def test_deprecated_gui():
# t.close()
# len(tqdm._instances) += 1 # undo the close() decrement
- t = tqdm(_range(3), gui=True, file=our_file, miniters=1, mininterval=0)
+ t = tqdm(range(3), gui=True, file=our_file, miniters=1, mininterval=0)
try:
for _ in t:
pass
@@ -1735,7 +1723,7 @@ def test_external_write():
def test_unit_scale():
"""Test numeric `unit_scale`"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(9), unit_scale=9, file=our_file,
+ for _ in tqdm(range(9), unit_scale=9, file=our_file,
miniters=1, mininterval=0):
pass
out = our_file.getvalue()
@@ -1937,7 +1925,7 @@ def test_screen_shape():
def test_initial():
"""Test `initial`"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(9), initial=10, total=19, file=our_file,
+ for _ in tqdm(range(9), initial=10, total=19, file=our_file,
miniters=1, mininterval=0):
pass
out = our_file.getvalue()
@@ -1948,7 +1936,7 @@ def test_initial():
def test_colour():
"""Test `colour`"""
with closing(StringIO()) as our_file:
- for _ in tqdm(_range(9), file=our_file, colour="#beefed"):
+ for _ in tqdm(range(9), file=our_file, colour="#beefed"):
pass
out = our_file.getvalue()
assert '\x1b[38;2;%d;%d;%dm' % (0xbe, 0xef, 0xed) in out
@@ -1961,7 +1949,7 @@ def test_colour():
assert "Unknown colour" in str(w[-1].message)
with closing(StringIO()) as our_file2:
- for _ in tqdm(_range(9), file=our_file2, colour="blue"):
+ for _ in tqdm(range(9), file=our_file2, colour="blue"):
pass
out = our_file2.getvalue()
assert '\x1b[34m' in out
@@ -1977,7 +1965,7 @@ def test_closed():
def test_reversed(capsys):
"""Test reversed()"""
- for _ in reversed(tqdm(_range(9))):
+ for _ in reversed(tqdm(range(9))):
pass
out, err = capsys.readouterr()
assert not out
@@ -1989,7 +1977,7 @@ def test_contains(capsys):
"""Test __contains__ doesn't iterate"""
with tqdm(list(range(9))) as t:
assert 9 not in t
- assert all(i in t for i in _range(9))
+ assert all(i in t for i in range(9))
out, err = capsys.readouterr()
assert not out
assert ' 0%' in err
diff --git a/tests/tests_utils.py b/tests/tests_utils.py
new file mode 100644
index 0000000..6cf1e6c
--- /dev/null
+++ b/tests/tests_utils.py
@@ -0,0 +1,51 @@
+from ast import literal_eval
+from collections import defaultdict
+from typing import Union # py<3.10
+
+from tqdm.utils import envwrap
+
+
+def test_envwrap(monkeypatch):
+ """Test @envwrap (basic)"""
+ monkeypatch.setenv('FUNC_A', "42")
+ monkeypatch.setenv('FUNC_TyPe_HiNt', "1337")
+ monkeypatch.setenv('FUNC_Unused', "x")
+
+ @envwrap("FUNC_")
+ def func(a=1, b=2, type_hint: int = None):
+ return a, b, type_hint
+
+ assert (42, 2, 1337) == func()
+ assert (99, 2, 1337) == func(a=99)
+
+
+def test_envwrap_types(monkeypatch):
+ """Test @envwrap(types)"""
+ monkeypatch.setenv('FUNC_notype', "3.14159")
+
+ @envwrap("FUNC_", types=defaultdict(lambda: literal_eval))
+ def func(notype=None):
+ return notype
+
+ assert 3.14159 == func()
+
+ monkeypatch.setenv('FUNC_number', "1")
+ monkeypatch.setenv('FUNC_string', "1")
+
+ @envwrap("FUNC_", types={'number': int})
+ def nofallback(number=None, string=None):
+ return number, string
+
+ assert 1, "1" == nofallback()
+
+
+def test_envwrap_annotations(monkeypatch):
+ """Test @envwrap with typehints"""
+ monkeypatch.setenv('FUNC_number', "1.1")
+ monkeypatch.setenv('FUNC_string', "1.1")
+
+ @envwrap("FUNC_")
+ def annotated(number: Union[int, float] = None, string: int = None):
+ return number, string
+
+ assert 1.1, "1.1" == annotated()