diff options
Diffstat (limited to 'tests/tests_tqdm.py')
-rw-r--r-- | tests/tests_tqdm.py | 1996 |
1 files changed, 1996 insertions, 0 deletions
diff --git a/tests/tests_tqdm.py b/tests/tests_tqdm.py new file mode 100644 index 0000000..bba457a --- /dev/null +++ b/tests/tests_tqdm.py @@ -0,0 +1,1996 @@ +# -*- coding: utf-8 -*- +# Advice: use repr(our_file.read()) to print the full output of tqdm +# (else '\r' will replace the previous lines and you'll see only the latest. +from __future__ import print_function + +import csv +import os +import re +import sys +from contextlib import contextmanager +from functools import wraps +from warnings import catch_warnings, simplefilter + +from pytest import importorskip, mark, raises, skip + +from tqdm import TqdmDeprecationWarning, TqdmWarning, tqdm, trange +from tqdm.contrib import DummyTqdmFile +from tqdm.std import EMA, Bar + +try: + from StringIO import StringIO +except ImportError: + from io import StringIO + +from io import IOBase # to support unicode strings +from io import BytesIO + + +class DeprecationError(Exception): + pass + + +# Ensure we can use `with closing(...) as ... :` syntax +if getattr(StringIO, '__exit__', False) and getattr(StringIO, '__enter__', False): + def closing(arg): + return arg +else: + from contextlib import closing + +try: + _range = xrange +except NameError: + _range = range + +try: + _unicode = unicode +except NameError: + _unicode = str + +nt_and_no_colorama = False +if os.name == 'nt': + try: + import colorama # NOQA + except ImportError: + nt_and_no_colorama = True + +# Regex definitions +# List of control characters +CTRLCHR = [r'\r', r'\n', r'\x1b\[A'] # Need to escape [ for regex +# Regular expressions compilation +RE_rate = re.compile(r'[^\d](\d[.\d]+)it/s') +RE_ctrlchr = re.compile("(%s)" % '|'.join(CTRLCHR)) # Match control chars +RE_ctrlchr_excl = re.compile('|'.join(CTRLCHR)) # Match and exclude ctrl chars +RE_pos = re.compile(r'([\r\n]+((pos\d+) bar:\s+\d+%|\s{3,6})?[^\r\n]*)') + + +def pos_line_diff(res_list, expected_list, raise_nonempty=True): + """ + Return differences between two bar output lists. + To be used with `RE_pos` + """ + res = [(r, e) for r, e in zip(res_list, expected_list) + for pos in [len(e) - len(e.lstrip('\n'))] # bar position + if r != e # simple comparison + if not r.startswith(e) # start matches + or not ( + # move up at end (maybe less due to closing bars) + any(r.endswith(end + i * '\x1b[A') for i in range(pos + 1) + for end in [ + ']', # bar + ' ']) # cleared + or '100%' in r # completed bar + or r == '\n') # final bar + or r[(-1 - pos) * len('\x1b[A'):] == '\x1b[A'] # too many moves up + if raise_nonempty and (res or len(res_list) != len(expected_list)): + if len(res_list) < len(expected_list): + res.extend([(None, e) for e in expected_list[len(res_list):]]) + elif len(res_list) > len(expected_list): + res.extend([(r, None) for r in res_list[len(expected_list):]]) + raise AssertionError( + "Got => Expected\n" + '\n'.join('%r => %r' % i for i in res)) + return res + + +class DiscreteTimer(object): + """Virtual discrete time manager, to precisely control time for tests""" + def __init__(self): + self.t = 0.0 + + def sleep(self, t): + """Sleep = increment the time counter (almost no CPU used)""" + self.t += t + + def time(self): + """Get the current time""" + return self.t + + +def cpu_timify(t, timer=None): + """Force tqdm to use the specified timer instead of system-wide time()""" + if timer is None: + timer = DiscreteTimer() + t._time = timer.time + t._sleep = timer.sleep + t.start_t = t.last_print_t = t._time() + return timer + + +class UnicodeIO(IOBase): + """Unicode version of StringIO""" + def __init__(self, *args, **kwargs): + super(UnicodeIO, self).__init__(*args, **kwargs) + self.encoding = 'U8' # io.StringIO supports unicode, but no encoding + self.text = '' + self.cursor = 0 + + def __len__(self): + return len(self.text) + + def seek(self, offset): + self.cursor = offset + + def tell(self): + return self.cursor + + def write(self, s): + self.text = self.text[:self.cursor] + s + self.text[self.cursor + len(s):] + self.cursor += len(s) + + def read(self, n=-1): + _cur = self.cursor + self.cursor = len(self) if n < 0 else min(_cur + n, len(self)) + return self.text[_cur:self.cursor] + + def getvalue(self): + return self.text + + +def get_bar(all_bars, i=None): + """Get a specific update from a whole bar traceback""" + # Split according to any used control characters + bars_split = RE_ctrlchr_excl.split(all_bars) + bars_split = list(filter(None, bars_split)) # filter out empty splits + return bars_split if i is None else bars_split[i] + + +def progressbar_rate(bar_str): + return float(RE_rate.search(bar_str).group(1)) + + +def squash_ctrlchars(s): + """Apply control characters in a string just like a terminal display""" + curline = 0 + lines = [''] # state of fake terminal + for nextctrl in filter(None, RE_ctrlchr.split(s)): + # apply control chars + if nextctrl == '\r': + # go to line beginning (simplified here: just empty the string) + lines[curline] = '' + elif nextctrl == '\n': + if curline >= len(lines) - 1: + # wrap-around creates newline + lines.append('') + # move cursor down + curline += 1 + elif nextctrl == '\x1b[A': + # move cursor up + if curline > 0: + curline -= 1 + else: + raise ValueError("Cannot go further up") + else: + # print message on current line + lines[curline] += nextctrl + return lines + + +def test_format_interval(): + """Test time interval format""" + format_interval = tqdm.format_interval + + assert format_interval(60) == '01:00' + assert format_interval(6160) == '1:42:40' + assert format_interval(238113) == '66:08:33' + + +def test_format_num(): + """Test number format""" + format_num = tqdm.format_num + + assert float(format_num(1337)) == 1337 + assert format_num(int(1e6)) == '1e+6' + assert format_num(1239876) == '1' '239' '876' + + +def test_format_meter(): + """Test statistics and progress bar formatting""" + try: + unich = unichr + except NameError: + unich = chr + + format_meter = tqdm.format_meter + + assert format_meter(0, 1000, 13) == " 0%| | 0/1000 [00:13<?, ?it/s]" + # If not implementing any changes to _tqdm.py, set prefix='desc' + # or else ": : " will be in output, so assertion should change + assert format_meter(0, 1000, 13, ncols=68, prefix='desc: ') == ( + "desc: 0%| | 0/1000 [00:13<?, ?it/s]") + assert format_meter(231, 1000, 392) == (" 23%|" + unich(0x2588) * 2 + unich(0x258e) + + " | 231/1000 [06:32<21:44, 1.70s/it]") + assert format_meter(10000, 1000, 13) == "10000it [00:13, 769.23it/s]" + assert format_meter(231, 1000, 392, ncols=56, ascii=True) == " 23%|" + '#' * 3 + '6' + ( + " | 231/1000 [06:32<21:44, 1.70s/it]") + assert format_meter(100000, 1000, 13, unit_scale=True, + unit='iB') == "100kiB [00:13, 7.69kiB/s]" + assert format_meter(100, 1000, 12, ncols=0, + rate=7.33) == " 10% 100/1000 [00:12<02:02, 7.33it/s]" + # ncols is small, l_bar is too large + # l_bar gets chopped + # no bar + # no r_bar + # 10/12 stars since ncols is 10 + assert format_meter( + 0, 1000, 13, ncols=10, + bar_format="************{bar:10}$$$$$$$$$$") == "**********" + # n_cols allows for l_bar and some of bar + # l_bar displays + # bar gets chopped + # no r_bar + # all 12 stars and 8/10 bar parts + assert format_meter( + 0, 1000, 13, ncols=20, + bar_format="************{bar:10}$$$$$$$$$$") == "************ " + # n_cols allows for l_bar, bar, and some of r_bar + # l_bar displays + # bar displays + # r_bar gets chopped + # all 12 stars and 10 bar parts, but only 8/10 dollar signs + assert format_meter( + 0, 1000, 13, ncols=30, + bar_format="************{bar:10}$$$$$$$$$$") == "************ $$$$$$$$" + # trim left ANSI; escape is before trim zone + # we only know it has ANSI codes, so we append an END code anyway + assert format_meter( + 0, 1000, 13, ncols=10, bar_format="*****\033[22m****\033[0m***{bar:10}$$$$$$$$$$" + ) == "*****\033[22m****\033[0m*\033[0m" + # trim left ANSI; escape is at trim zone + assert format_meter( + 0, 1000, 13, ncols=10, + bar_format="*****\033[22m*****\033[0m**{bar:10}$$$$$$$$$$") == "*****\033[22m*****\033[0m" + # trim left ANSI; escape is after trim zone + assert format_meter( + 0, 1000, 13, ncols=10, + bar_format="*****\033[22m******\033[0m*{bar:10}$$$$$$$$$$") == "*****\033[22m*****\033[0m" + # Check that bar_format correctly adapts {bar} size to the rest + assert format_meter( + 20, 100, 12, ncols=13, rate=8.1, + bar_format=r'{l_bar}{bar}|{n_fmt}/{total_fmt}') == " 20%|" + unich(0x258f) + "|20/100" + assert format_meter( + 20, 100, 12, ncols=14, rate=8.1, + bar_format=r'{l_bar}{bar}|{n_fmt}/{total_fmt}') == " 20%|" + unich(0x258d) + " |20/100" + # Check wide characters + if sys.version_info >= (3,): + assert format_meter(0, 1000, 13, ncols=68, prefix='fullwidth: ') == ( + "fullwidth: 0%| | 0/1000 [00:13<?, ?it/s]") + assert format_meter(0, 1000, 13, ncols=68, prefix='ニッポン [ニッポン]: ') == ( + "ニッポン [ニッポン]: 0%| | 0/1000 [00:13<?, ?it/s]") + # Check that bar_format can print only {bar} or just one side + assert format_meter(20, 100, 12, ncols=2, rate=8.1, + bar_format=r'{bar}') == unich(0x258d) + " " + assert format_meter(20, 100, 12, ncols=7, rate=8.1, + bar_format=r'{l_bar}{bar}') == " 20%|" + unich(0x258d) + " " + assert format_meter(20, 100, 12, ncols=6, rate=8.1, + bar_format=r'{bar}|test') == unich(0x258f) + "|test" + + +def test_ansi_escape_codes(): + """Test stripping of ANSI escape codes""" + ansi = {'BOLD': '\033[1m', 'RED': '\033[91m', 'END': '\033[0m'} + desc_raw = '{BOLD}{RED}Colored{END} description' + ncols = 123 + + desc_stripped = desc_raw.format(BOLD='', RED='', END='') + meter = tqdm.format_meter(0, 100, 0, ncols=ncols, prefix=desc_stripped) + assert len(meter) == ncols + + desc = desc_raw.format(**ansi) + meter = tqdm.format_meter(0, 100, 0, ncols=ncols, prefix=desc) + # `format_meter` inserts an extra END for safety + ansi_len = len(desc) - len(desc_stripped) + len(ansi['END']) + assert len(meter) == ncols + ansi_len + + +def test_si_format(): + """Test SI unit prefixes""" + format_meter = tqdm.format_meter + + assert '9.00 ' in format_meter(1, 9, 1, unit_scale=True, unit='B') + assert '99.0 ' in format_meter(1, 99, 1, unit_scale=True) + assert '999 ' in format_meter(1, 999, 1, unit_scale=True) + assert '9.99k ' in format_meter(1, 9994, 1, unit_scale=True) + assert '10.0k ' in format_meter(1, 9999, 1, unit_scale=True) + assert '99.5k ' in format_meter(1, 99499, 1, unit_scale=True) + assert '100k ' in format_meter(1, 99999, 1, unit_scale=True) + assert '1.00M ' in format_meter(1, 999999, 1, unit_scale=True) + assert '1.00G ' in format_meter(1, 999999999, 1, unit_scale=True) + assert '1.00T ' in format_meter(1, 999999999999, 1, unit_scale=True) + assert '1.00P ' in format_meter(1, 999999999999999, 1, unit_scale=True) + assert '1.00E ' in format_meter(1, 999999999999999999, 1, unit_scale=True) + assert '1.00Z ' in format_meter(1, 999999999999999999999, 1, unit_scale=True) + assert '1.0Y ' in format_meter(1, 999999999999999999999999, 1, unit_scale=True) + assert '10.0Y ' in format_meter(1, 9999999999999999999999999, 1, unit_scale=True) + assert '100.0Y ' in format_meter(1, 99999999999999999999999999, 1, unit_scale=True) + assert '1000.0Y ' in format_meter(1, 999999999999999999999999999, 1, + unit_scale=True) + + +def test_bar_formatspec(): + """Test Bar.__format__ spec""" + assert "{0:5a}".format(Bar(0.3)) == "#5 " + assert "{0:2}".format(Bar(0.5, charset=" .oO0")) == "0 " + assert "{0:2a}".format(Bar(0.5, charset=" .oO0")) == "# " + assert "{0:-6a}".format(Bar(0.5, 10)) == '## ' + assert "{0:2b}".format(Bar(0.5, 10)) == ' ' + + +def test_all_defaults(): + """Test default kwargs""" + with closing(UnicodeIO()) as our_file: + with tqdm(range(10), file=our_file) as progressbar: + assert len(progressbar) == 10 + for _ in progressbar: + pass + # restore stdout/stderr output for `nosetest` interface + # try: + # sys.stderr.write('\x1b[A') + # except: + # pass + sys.stderr.write('\rTest default kwargs ... ') + + +class WriteTypeChecker(BytesIO): + """File-like to assert the expected type is written""" + def __init__(self, expected_type): + super(WriteTypeChecker, self).__init__() + self.expected_type = expected_type + + def write(self, s): + assert isinstance(s, self.expected_type) + + +def test_native_string_io_for_default_file(): + """Native strings written to unspecified files""" + stderr = sys.stderr + try: + sys.stderr = WriteTypeChecker(expected_type=type('')) + for _ in tqdm(range(3)): + pass + sys.stderr.encoding = None # py2 behaviour + for _ in tqdm(range(3)): + pass + finally: + sys.stderr = stderr + + +def test_unicode_string_io_for_specified_file(): + """Unicode strings written to specified files""" + for _ in tqdm(range(3), file=WriteTypeChecker(expected_type=type(u''))): + pass + + +def test_write_bytes(): + """Test write_bytes argument with and without `file`""" + # specified file (and bytes) + for _ in tqdm(range(3), file=WriteTypeChecker(expected_type=type(b'')), + write_bytes=True): + pass + # unspecified file (and unicode) + stderr = sys.stderr + try: + sys.stderr = WriteTypeChecker(expected_type=type(u'')) + for _ in tqdm(range(3), write_bytes=False): + pass + finally: + sys.stderr = stderr + + +def test_iterate_over_csv_rows(): + """Test csv iterator""" + # Create a test csv pseudo file + with closing(StringIO()) as test_csv_file: + writer = csv.writer(test_csv_file) + for _ in _range(3): + writer.writerow(['test'] * 3) + test_csv_file.seek(0) + + # Test that nothing fails if we iterate over rows + reader = csv.DictReader(test_csv_file, fieldnames=('row1', 'row2', 'row3')) + with closing(StringIO()) as our_file: + for _ in tqdm(reader, file=our_file): + pass + + +def test_file_output(): + """Test output to arbitrary file-like objects""" + with closing(StringIO()) as our_file: + for i in tqdm(_range(3), file=our_file): + if i == 1: + our_file.seek(0) + assert '0/3' in our_file.read() + + +def test_leave_option(): + """Test `leave=True` always prints info about the last iteration""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, leave=True): + pass + res = our_file.getvalue() + assert '| 3/3 ' in res + assert '\n' == res[-1] # not '\r' + + with closing(StringIO()) as our_file2: + for _ in tqdm(_range(3), file=our_file2, leave=False): + pass + assert '| 3/3 ' not in our_file2.getvalue() + + +def test_trange(): + """Test trange""" + with closing(StringIO()) as our_file: + for _ in trange(3, file=our_file, leave=True): + pass + assert '| 3/3 ' in our_file.getvalue() + + with closing(StringIO()) as our_file2: + for _ in trange(3, file=our_file2, leave=False): + pass + assert '| 3/3 ' not in our_file2.getvalue() + + +def test_min_interval(): + """Test mininterval""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, mininterval=1e-10): + pass + assert " 0%| | 0/3 [00:00<" in our_file.getvalue() + + +def test_max_interval(): + """Test maxinterval""" + total = 100 + bigstep = 10 + smallstep = 5 + + # Test without maxinterval + timer = DiscreteTimer() + with closing(StringIO()) as our_file: + with closing(StringIO()) as our_file2: + # with maxinterval but higher than loop sleep time + t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, + smoothing=1, maxinterval=1e-2) + cpu_timify(t, timer) + + # without maxinterval + t2 = tqdm(total=total, file=our_file2, miniters=None, mininterval=0, + smoothing=1, maxinterval=None) + cpu_timify(t2, timer) + + assert t.dynamic_miniters + assert t2.dynamic_miniters + + # Increase 10 iterations at once + t.update(bigstep) + t2.update(bigstep) + # The next iterations should not trigger maxinterval (step 10) + for _ in _range(4): + t.update(smallstep) + t2.update(smallstep) + timer.sleep(1e-5) + t.close() # because PyPy doesn't gc immediately + t2.close() # as above + + assert "25%" not in our_file2.getvalue() + assert "25%" not in our_file.getvalue() + + # Test with maxinterval effect + timer = DiscreteTimer() + with closing(StringIO()) as our_file: + with tqdm(total=total, file=our_file, miniters=None, mininterval=0, + smoothing=1, maxinterval=1e-4) as t: + cpu_timify(t, timer) + + # Increase 10 iterations at once + t.update(bigstep) + # The next iterations should trigger maxinterval (step 5) + for _ in _range(4): + t.update(smallstep) + timer.sleep(1e-2) + + assert "25%" in our_file.getvalue() + + # Test iteration based tqdm with maxinterval effect + timer = DiscreteTimer() + with closing(StringIO()) as our_file: + with tqdm(_range(total), file=our_file, miniters=None, + mininterval=1e-5, smoothing=1, maxinterval=1e-4) as t2: + cpu_timify(t2, timer) + + for i in t2: + if i >= (bigstep - 1) and ((i - (bigstep - 1)) % smallstep) == 0: + timer.sleep(1e-2) + if i >= 3 * bigstep: + break + + assert "15%" in our_file.getvalue() + + # Test different behavior with and without mininterval + timer = DiscreteTimer() + total = 1000 + mininterval = 0.1 + maxinterval = 10 + with closing(StringIO()) as our_file: + with tqdm(total=total, file=our_file, miniters=None, smoothing=1, + mininterval=mininterval, maxinterval=maxinterval) as tm1: + with tqdm(total=total, file=our_file, miniters=None, smoothing=1, + mininterval=0, maxinterval=maxinterval) as tm2: + + cpu_timify(tm1, timer) + cpu_timify(tm2, timer) + + # Fast iterations, check if dynamic_miniters triggers + timer.sleep(mininterval) # to force update for t1 + tm1.update(total / 2) + tm2.update(total / 2) + assert int(tm1.miniters) == tm2.miniters == total / 2 + + # Slow iterations, check different miniters if mininterval + timer.sleep(maxinterval * 2) + tm1.update(total / 2) + tm2.update(total / 2) + res = [tm1.miniters, tm2.miniters] + assert res == [(total / 2) * mininterval / (maxinterval * 2), + (total / 2) * maxinterval / (maxinterval * 2)] + + # Same with iterable based tqdm + timer1 = DiscreteTimer() # need 2 timers for each bar because zip not work + timer2 = DiscreteTimer() + total = 100 + mininterval = 0.1 + maxinterval = 10 + with closing(StringIO()) as our_file: + t1 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1, + mininterval=mininterval, maxinterval=maxinterval) + t2 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1, + mininterval=0, maxinterval=maxinterval) + + cpu_timify(t1, timer1) + cpu_timify(t2, timer2) + + for i in t1: + if i == ((total / 2) - 2): + timer1.sleep(mininterval) + if i == (total - 1): + timer1.sleep(maxinterval * 2) + + for i in t2: + if i == ((total / 2) - 2): + timer2.sleep(mininterval) + if i == (total - 1): + timer2.sleep(maxinterval * 2) + + assert t1.miniters == 0.255 + assert t2.miniters == 0.5 + + t1.close() + t2.close() + + +def test_delay(): + """Test delay""" + timer = DiscreteTimer() + with closing(StringIO()) as our_file: + t = tqdm(total=2, file=our_file, leave=True, delay=3) + cpu_timify(t, timer) + timer.sleep(2) + t.update(1) + assert not our_file.getvalue() + timer.sleep(2) + t.update(1) + assert our_file.getvalue() + t.close() + + +def test_min_iters(): + """Test miniters""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, leave=True, mininterval=0, miniters=2): + pass + + out = our_file.getvalue() + assert '| 0/3 ' in out + assert '| 1/3 ' not in out + assert '| 2/3 ' in out + assert '| 3/3 ' in out + + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, leave=True, mininterval=0, miniters=1): + pass + + out = our_file.getvalue() + assert '| 0/3 ' in out + assert '| 1/3 ' in out + assert '| 2/3 ' in out + assert '| 3/3 ' in out + + +def test_dynamic_min_iters(): + """Test purely dynamic miniters (and manual updates and __del__)""" + with closing(StringIO()) as our_file: + total = 10 + t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, smoothing=1) + + t.update() + # Increase 3 iterations + t.update(3) + # The next two iterations should be skipped because of dynamic_miniters + t.update() + t.update() + # The third iteration should be displayed + t.update() + + out = our_file.getvalue() + assert t.dynamic_miniters + t.__del__() # simulate immediate del gc + + assert ' 0%| | 0/10 [00:00<' in out + assert '40%' in out + assert '50%' not in out + assert '60%' not in out + assert '70%' in out + + # Check with smoothing=0, miniters should be set to max update seen so far + with closing(StringIO()) as our_file: + total = 10 + t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, smoothing=0) + + t.update() + t.update(2) + t.update(5) # this should be stored as miniters + t.update(1) + + out = our_file.getvalue() + assert all(i in out for i in ("0/10", "1/10", "3/10")) + assert "2/10" not in out + assert t.dynamic_miniters and not t.smoothing + assert t.miniters == 5 + t.close() + + # Check iterable based tqdm + with closing(StringIO()) as our_file: + t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None, + smoothing=0.5) + for _ in t: + pass + assert t.dynamic_miniters + + # No smoothing + with closing(StringIO()) as our_file: + t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None, + smoothing=0) + for _ in t: + pass + assert t.dynamic_miniters + + # No dynamic_miniters (miniters is fixed manually) + with closing(StringIO()) as our_file: + t = tqdm(_range(10), file=our_file, miniters=1, mininterval=None) + for _ in t: + pass + assert not t.dynamic_miniters + + +def test_big_min_interval(): + """Test large mininterval""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(2), file=our_file, mininterval=1E10): + pass + assert '50%' not in our_file.getvalue() + + with closing(StringIO()) as our_file: + with tqdm(_range(2), file=our_file, mininterval=1E10) as t: + t.update() + t.update() + assert '50%' not in our_file.getvalue() + + +def test_smoothed_dynamic_min_iters(): + """Test smoothed dynamic miniters""" + timer = DiscreteTimer() + + with closing(StringIO()) as our_file: + with tqdm(total=100, file=our_file, miniters=None, mininterval=1, + smoothing=0.5, maxinterval=0) as t: + cpu_timify(t, timer) + + # Increase 10 iterations at once + timer.sleep(1) + t.update(10) + # The next iterations should be partially skipped + for _ in _range(2): + timer.sleep(1) + t.update(4) + for _ in _range(20): + timer.sleep(1) + t.update() + + assert t.dynamic_miniters + out = our_file.getvalue() + assert ' 0%| | 0/100 [00:00<' in out + assert '20%' in out + assert '23%' not in out + assert '25%' in out + assert '26%' not in out + assert '28%' in out + + +def test_smoothed_dynamic_min_iters_with_min_interval(): + """Test smoothed dynamic miniters with mininterval""" + timer = DiscreteTimer() + + # In this test, `miniters` should gradually decline + total = 100 + + with closing(StringIO()) as our_file: + # Test manual updating tqdm + with tqdm(total=total, file=our_file, miniters=None, mininterval=1e-3, + smoothing=1, maxinterval=0) as t: + cpu_timify(t, timer) + + t.update(10) + timer.sleep(1e-2) + for _ in _range(4): + t.update() + timer.sleep(1e-2) + out = our_file.getvalue() + assert t.dynamic_miniters + + with closing(StringIO()) as our_file: + # Test iteration-based tqdm + with tqdm(_range(total), file=our_file, miniters=None, + mininterval=0.01, smoothing=1, maxinterval=0) as t2: + cpu_timify(t2, timer) + + for i in t2: + if i >= 10: + timer.sleep(0.1) + if i >= 14: + break + out2 = our_file.getvalue() + + assert t.dynamic_miniters + assert ' 0%| | 0/100 [00:00<' in out + assert '11%' in out and '11%' in out2 + # assert '12%' not in out and '12%' in out2 + assert '13%' in out and '13%' in out2 + assert '14%' in out and '14%' in out2 + + +@mark.slow +def test_rlock_creation(): + """Test that importing tqdm does not create multiprocessing objects.""" + mp = importorskip('multiprocessing') + if not hasattr(mp, 'get_context'): + skip("missing multiprocessing.get_context") + + # Use 'spawn' instead of 'fork' so that the process does not inherit any + # globals that have been constructed by running other tests + ctx = mp.get_context('spawn') + with ctx.Pool(1) as pool: + # The pool will propagate the error if the target method fails + pool.apply(_rlock_creation_target) + + +def _rlock_creation_target(): + """Check that the RLock has not been constructed.""" + import multiprocessing as mp + patch = importorskip('unittest.mock').patch + + # Patch the RLock class/method but use the original implementation + with patch('multiprocessing.RLock', wraps=mp.RLock) as rlock_mock: + # Importing the module should not create a lock + from tqdm import tqdm + assert rlock_mock.call_count == 0 + # Creating a progress bar should initialize the lock + with closing(StringIO()) as our_file: + with tqdm(file=our_file) as _: # NOQA + pass + assert rlock_mock.call_count == 1 + # Creating a progress bar again should reuse the lock + with closing(StringIO()) as our_file: + with tqdm(file=our_file) as _: # NOQA + pass + assert rlock_mock.call_count == 1 + + +def test_disable(): + """Test disable""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, disable=True): + pass + assert our_file.getvalue() == '' + + with closing(StringIO()) as our_file: + progressbar = tqdm(total=3, file=our_file, miniters=1, disable=True) + progressbar.update(3) + progressbar.close() + assert our_file.getvalue() == '' + + +def test_infinite_total(): + """Test treatment of infinite total""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, total=float("inf")): + pass + + +def test_nototal(): + """Test unknown total length""" + with closing(StringIO()) as our_file: + for _ in tqdm(iter(range(10)), file=our_file, unit_scale=10): + pass + assert "100it" in our_file.getvalue() + + with closing(StringIO()) as our_file: + for _ in tqdm(iter(range(10)), file=our_file, + bar_format="{l_bar}{bar}{r_bar}"): + pass + assert "10/?" in our_file.getvalue() + + +def test_unit(): + """Test SI unit prefix""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), file=our_file, miniters=1, unit="bytes"): + pass + assert 'bytes/s' in our_file.getvalue() + + +def test_ascii(): + """Test ascii/unicode bar""" + # Test ascii autodetection + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, ascii=None) as t: + assert t.ascii # TODO: this may fail in the future + + # Test ascii bar + with closing(StringIO()) as our_file: + for _ in tqdm(_range(3), total=15, file=our_file, miniters=1, + mininterval=0, ascii=True): + pass + res = our_file.getvalue().strip("\r").split("\r") + assert '7%|6' in res[1] + assert '13%|#3' in res[2] + assert '20%|##' in res[3] + + # Test unicode bar + with closing(UnicodeIO()) as our_file: + with tqdm(total=15, file=our_file, ascii=False, mininterval=0) as t: + for _ in _range(3): + t.update() + res = our_file.getvalue().strip("\r").split("\r") + assert u"7%|\u258b" in res[1] + assert u"13%|\u2588\u258e" in res[2] + assert u"20%|\u2588\u2588" in res[3] + + # Test custom bar + for bars in [" .oO0", " #"]: + with closing(StringIO()) as our_file: + for _ in tqdm(_range(len(bars) - 1), file=our_file, miniters=1, + mininterval=0, ascii=bars, ncols=27): + pass + res = our_file.getvalue().strip("\r").split("\r") + for b, line in zip(bars, res): + assert '|' + b + '|' in line + + +def test_update(): + """Test manual creation and updates""" + res = None + with closing(StringIO()) as our_file: + with tqdm(total=2, file=our_file, miniters=1, mininterval=0) as progressbar: + assert len(progressbar) == 2 + progressbar.update(2) + assert '| 2/2' in our_file.getvalue() + progressbar.desc = 'dynamically notify of 4 increments in total' + progressbar.total = 4 + progressbar.update(-1) + progressbar.update(2) + res = our_file.getvalue() + assert '| 3/4 ' in res + assert 'dynamically notify of 4 increments in total' in res + + +def test_close(): + """Test manual creation and closure and n_instances""" + + # With `leave` option + with closing(StringIO()) as our_file: + progressbar = tqdm(total=3, file=our_file, miniters=10) + progressbar.update(3) + assert '| 3/3 ' not in our_file.getvalue() # Should be blank + assert len(tqdm._instances) == 1 + progressbar.close() + assert len(tqdm._instances) == 0 + assert '| 3/3 ' in our_file.getvalue() + + # Without `leave` option + with closing(StringIO()) as our_file: + progressbar = tqdm(total=3, file=our_file, miniters=10, leave=False) + progressbar.update(3) + progressbar.close() + assert '| 3/3 ' not in our_file.getvalue() # Should be blank + + # With all updates + with closing(StringIO()) as our_file: + assert len(tqdm._instances) == 0 + with tqdm(total=3, file=our_file, miniters=0, mininterval=0, + leave=True) as progressbar: + assert len(tqdm._instances) == 1 + progressbar.update(3) + res = our_file.getvalue() + assert '| 3/3 ' in res # Should be blank + assert '\n' not in res + # close() called + assert len(tqdm._instances) == 0 + + exres = res.rsplit(', ', 1)[0] + res = our_file.getvalue() + assert res[-1] == '\n' + if not res.startswith(exres): + raise AssertionError("\n<<< Expected:\n{0}\n>>> Got:\n{1}\n===".format( + exres + ', ...it/s]\n', our_file.getvalue())) + + # Closing after the output stream has closed + with closing(StringIO()) as our_file: + t = tqdm(total=2, file=our_file) + t.update() + t.update() + t.close() + + +def test_ema(): + """Test exponential weighted average""" + ema = EMA(0.01) + assert round(ema(10), 2) == 10 + assert round(ema(1), 2) == 5.48 + assert round(ema(), 2) == 5.48 + assert round(ema(1), 2) == 3.97 + assert round(ema(1), 2) == 3.22 + + +def test_smoothing(): + """Test exponential weighted average smoothing""" + timer = DiscreteTimer() + + # -- Test disabling smoothing + with closing(StringIO()) as our_file: + with tqdm(_range(3), file=our_file, smoothing=None, leave=True) as t: + cpu_timify(t, timer) + + for _ in t: + pass + assert '| 3/3 ' in our_file.getvalue() + + # -- Test smoothing + # 1st case: no smoothing (only use average) + with closing(StringIO()) as our_file2: + with closing(StringIO()) as our_file: + t = tqdm(_range(3), file=our_file2, smoothing=None, leave=True, + miniters=1, mininterval=0) + cpu_timify(t, timer) + + with tqdm(_range(3), file=our_file, smoothing=None, leave=True, + miniters=1, mininterval=0) as t2: + cpu_timify(t2, timer) + + for i in t2: + # Sleep more for first iteration and + # see how quickly rate is updated + if i == 0: + timer.sleep(0.01) + else: + # Need to sleep in all iterations + # to calculate smoothed rate + # (else delta_t is 0!) + timer.sleep(0.001) + t.update() + n_old = len(tqdm._instances) + t.close() + assert len(tqdm._instances) == n_old - 1 + # Get result for iter-based bar + a = progressbar_rate(get_bar(our_file.getvalue(), 3)) + # Get result for manually updated bar + a2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) + + # 2nd case: use max smoothing (= instant rate) + with closing(StringIO()) as our_file2: + with closing(StringIO()) as our_file: + t = tqdm(_range(3), file=our_file2, smoothing=1, leave=True, + miniters=1, mininterval=0) + cpu_timify(t, timer) + + with tqdm(_range(3), file=our_file, smoothing=1, leave=True, + miniters=1, mininterval=0) as t2: + cpu_timify(t2, timer) + + for i in t2: + if i == 0: + timer.sleep(0.01) + else: + timer.sleep(0.001) + t.update() + t.close() + # Get result for iter-based bar + b = progressbar_rate(get_bar(our_file.getvalue(), 3)) + # Get result for manually updated bar + b2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) + + # 3rd case: use medium smoothing + with closing(StringIO()) as our_file2: + with closing(StringIO()) as our_file: + t = tqdm(_range(3), file=our_file2, smoothing=0.5, leave=True, + miniters=1, mininterval=0) + cpu_timify(t, timer) + + t2 = tqdm(_range(3), file=our_file, smoothing=0.5, leave=True, + miniters=1, mininterval=0) + cpu_timify(t2, timer) + + for i in t2: + if i == 0: + timer.sleep(0.01) + else: + timer.sleep(0.001) + t.update() + t2.close() + t.close() + # Get result for iter-based bar + c = progressbar_rate(get_bar(our_file.getvalue(), 3)) + # Get result for manually updated bar + c2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) + + # Check that medium smoothing's rate is between no and max smoothing rates + assert a <= c <= b + assert a2 <= c2 <= b2 + + +@mark.skipif(nt_and_no_colorama, reason="Windows without colorama") +def test_deprecated_nested(): + """Test nested progress bars""" + # TODO: test degradation on windows without colorama? + + # Artificially test nested loop printing + # Without leave + our_file = StringIO() + try: + tqdm(total=2, file=our_file, nested=True) + except TqdmDeprecationWarning: + if """`nested` is deprecated and automated. +Use `position` instead for manual control.""" not in our_file.getvalue(): + raise + else: + raise DeprecationError("Should not allow nested kwarg") + + +def test_bar_format(): + """Test custom bar formatting""" + with closing(StringIO()) as our_file: + bar_format = ('{l_bar}{bar}|{n_fmt}/{total_fmt}-{n}/{total}' + '{percentage}{rate}{rate_fmt}{elapsed}{remaining}') + for _ in trange(2, file=our_file, leave=True, bar_format=bar_format): + pass + out = our_file.getvalue() + assert "\r 0%| |0/2-0/20.0None?it/s00:00?\r" in out + + # Test unicode string auto conversion + with closing(StringIO()) as our_file: + bar_format = r'hello world' + with tqdm(ascii=False, bar_format=bar_format, file=our_file) as t: + assert isinstance(t.bar_format, _unicode) + + +def test_custom_format(): + """Test adding additional derived format arguments""" + class TqdmExtraFormat(tqdm): + """Provides a `total_time` format parameter""" + @property + def format_dict(self): + d = super(TqdmExtraFormat, self).format_dict + total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1) + d.update(total_time=self.format_interval(total_time) + " in total") + return d + + with closing(StringIO()) as our_file: + for _ in TqdmExtraFormat( + range(10), file=our_file, + bar_format="{total_time}: {percentage:.0f}%|{bar}{r_bar}"): + pass + assert "00:00 in total" in our_file.getvalue() + + +def test_eta(capsys): + """Test eta bar_format""" + from datetime import datetime as dt + for _ in trange(999, miniters=1, mininterval=0, leave=True, + bar_format='{l_bar}{eta:%Y-%m-%d}'): + pass + _, err = capsys.readouterr() + assert "\r100%|{eta:%Y-%m-%d}\n".format(eta=dt.now()) in err + + +def test_unpause(): + """Test unpause""" + timer = DiscreteTimer() + with closing(StringIO()) as our_file: + t = trange(10, file=our_file, leave=True, mininterval=0) + cpu_timify(t, timer) + timer.sleep(0.01) + t.update() + timer.sleep(0.01) + t.update() + timer.sleep(0.1) # longer wait time + t.unpause() + timer.sleep(0.01) + t.update() + timer.sleep(0.01) + t.update() + t.close() + r_before = progressbar_rate(get_bar(our_file.getvalue(), 2)) + r_after = progressbar_rate(get_bar(our_file.getvalue(), 3)) + assert r_before == r_after + + +def test_disabled_unpause(capsys): + """Test disabled unpause""" + with tqdm(total=10, disable=True) as t: + t.update() + t.unpause() + t.update() + print(t) + out, err = capsys.readouterr() + assert not err + assert out == ' 0%| | 0/10 [00:00<?, ?it/s]\n' + + +def test_reset(): + """Test resetting a bar for re-use""" + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, + miniters=1, mininterval=0, maxinterval=0) as t: + t.update(9) + t.reset() + t.update() + t.reset(total=12) + t.update(10) + assert '| 1/10' in our_file.getvalue() + assert '| 10/12' in our_file.getvalue() + + +def test_disabled_reset(capsys): + """Test disabled reset""" + with tqdm(total=10, disable=True) as t: + t.update(9) + t.reset() + t.update() + t.reset(total=12) + t.update(10) + print(t) + out, err = capsys.readouterr() + assert not err + assert out == ' 0%| | 0/12 [00:00<?, ?it/s]\n' + + +@mark.skipif(nt_and_no_colorama, reason="Windows without colorama") +def test_position(): + """Test positioned progress bars""" + # Artificially test nested loop printing + # Without leave + our_file = StringIO() + kwargs = {'file': our_file, 'miniters': 1, 'mininterval': 0, 'maxinterval': 0} + t = tqdm(total=2, desc='pos2 bar', leave=False, position=2, **kwargs) + t.update() + t.close() + out = our_file.getvalue() + res = [m[0] for m in RE_pos.findall(out)] + exres = ['\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\r '] + + pos_line_diff(res, exres) + + # Test iteration-based tqdm positioning + our_file = StringIO() + kwargs["file"] = our_file + for _ in trange(2, desc='pos0 bar', position=0, **kwargs): + for _ in trange(2, desc='pos1 bar', position=1, **kwargs): + for _ in trange(2, desc='pos2 bar', position=2, **kwargs): + pass + out = our_file.getvalue() + res = [m[0] for m in RE_pos.findall(out)] + exres = ['\rpos0 bar: 0%', + '\n\rpos1 bar: 0%', + '\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\rpos2 bar: 100%', + '\rpos2 bar: 100%', + '\n\n\rpos1 bar: 50%', + '\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\rpos2 bar: 100%', + '\rpos2 bar: 100%', + '\n\n\rpos1 bar: 100%', + '\rpos1 bar: 100%', + '\n\rpos0 bar: 50%', + '\n\rpos1 bar: 0%', + '\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\rpos2 bar: 100%', + '\rpos2 bar: 100%', + '\n\n\rpos1 bar: 50%', + '\n\n\rpos2 bar: 0%', + '\n\n\rpos2 bar: 50%', + '\n\n\rpos2 bar: 100%', + '\rpos2 bar: 100%', + '\n\n\rpos1 bar: 100%', + '\rpos1 bar: 100%', + '\n\rpos0 bar: 100%', + '\rpos0 bar: 100%', + '\n'] + pos_line_diff(res, exres) + + # Test manual tqdm positioning + our_file = StringIO() + kwargs["file"] = our_file + kwargs["total"] = 2 + t1 = tqdm(desc='pos0 bar', position=0, **kwargs) + t2 = tqdm(desc='pos1 bar', position=1, **kwargs) + t3 = tqdm(desc='pos2 bar', position=2, **kwargs) + for _ in _range(2): + t1.update() + t3.update() + t2.update() + out = our_file.getvalue() + res = [m[0] for m in RE_pos.findall(out)] + exres = ['\rpos0 bar: 0%', + '\n\rpos1 bar: 0%', + '\n\n\rpos2 bar: 0%', + '\rpos0 bar: 50%', + '\n\n\rpos2 bar: 50%', + '\n\rpos1 bar: 50%', + '\rpos0 bar: 100%', + '\n\n\rpos2 bar: 100%', + '\n\rpos1 bar: 100%'] + pos_line_diff(res, exres) + t1.close() + t2.close() + t3.close() + + # Test auto repositioning of bars when a bar is prematurely closed + # tqdm._instances.clear() # reset number of instances + with closing(StringIO()) as our_file: + t1 = tqdm(total=10, file=our_file, desc='1.pos0 bar', mininterval=0) + t2 = tqdm(total=10, file=our_file, desc='2.pos1 bar', mininterval=0) + t3 = tqdm(total=10, file=our_file, desc='3.pos2 bar', mininterval=0) + res = [m[0] for m in RE_pos.findall(our_file.getvalue())] + exres = ['\r1.pos0 bar: 0%', + '\n\r2.pos1 bar: 0%', + '\n\n\r3.pos2 bar: 0%'] + pos_line_diff(res, exres) + + t2.close() + t4 = tqdm(total=10, file=our_file, desc='4.pos2 bar', mininterval=0) + t1.update(1) + t3.update(1) + t4.update(1) + res = [m[0] for m in RE_pos.findall(our_file.getvalue())] + exres = ['\r1.pos0 bar: 0%', + '\n\r2.pos1 bar: 0%', + '\n\n\r3.pos2 bar: 0%', + '\r2.pos1 bar: 0%', + '\n\n\r4.pos2 bar: 0%', + '\r1.pos0 bar: 10%', + '\n\n\r3.pos2 bar: 10%', + '\n\r4.pos2 bar: 10%'] + pos_line_diff(res, exres) + t4.close() + t3.close() + t1.close() + + +def test_set_description(): + """Test set description""" + with closing(StringIO()) as our_file: + with tqdm(desc='Hello', file=our_file) as t: + assert t.desc == 'Hello' + t.set_description_str('World') + assert t.desc == 'World' + t.set_description() + assert t.desc == '' + t.set_description('Bye') + assert t.desc == 'Bye: ' + assert "World" in our_file.getvalue() + + # without refresh + with closing(StringIO()) as our_file: + with tqdm(desc='Hello', file=our_file) as t: + assert t.desc == 'Hello' + t.set_description_str('World', False) + assert t.desc == 'World' + t.set_description(None, False) + assert t.desc == '' + assert "World" not in our_file.getvalue() + + # unicode + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file) as t: + t.set_description(u"\xe1\xe9\xed\xf3\xfa") + + +def test_deprecated_gui(): + """Test internal GUI properties""" + # Check: StatusPrinter iff gui is disabled + with closing(StringIO()) as our_file: + t = tqdm(total=2, gui=True, file=our_file, miniters=1, mininterval=0) + assert not hasattr(t, "sp") + try: + t.update(1) + except TqdmDeprecationWarning as e: + if ( + 'Please use `tqdm.gui.tqdm(...)` instead of `tqdm(..., gui=True)`' + not in our_file.getvalue() + ): + raise e + else: + raise DeprecationError('Should not allow manual gui=True without' + ' overriding __iter__() and update()') + finally: + t._instances.clear() + # t.close() + # len(tqdm._instances) += 1 # undo the close() decrement + + t = tqdm(_range(3), gui=True, file=our_file, miniters=1, mininterval=0) + try: + for _ in t: + pass + except TqdmDeprecationWarning as e: + if ( + 'Please use `tqdm.gui.tqdm(...)` instead of `tqdm(..., gui=True)`' + not in our_file.getvalue() + ): + raise e + else: + raise DeprecationError('Should not allow manual gui=True without' + ' overriding __iter__() and update()') + finally: + t._instances.clear() + # t.close() + # len(tqdm._instances) += 1 # undo the close() decrement + + with tqdm(total=1, gui=False, file=our_file) as t: + assert hasattr(t, "sp") + + +def test_cmp(): + """Test comparison functions""" + with closing(StringIO()) as our_file: + t0 = tqdm(total=10, file=our_file) + t1 = tqdm(total=10, file=our_file) + t2 = tqdm(total=10, file=our_file) + + assert t0 < t1 + assert t2 >= t0 + assert t0 <= t2 + + t3 = tqdm(total=10, file=our_file) + t4 = tqdm(total=10, file=our_file) + t5 = tqdm(total=10, file=our_file) + t5.close() + t6 = tqdm(total=10, file=our_file) + + assert t3 != t4 + assert t3 > t2 + assert t5 == t6 + t6.close() + t4.close() + t3.close() + t2.close() + t1.close() + t0.close() + + +def test_repr(): + """Test representation""" + with closing(StringIO()) as our_file: + with tqdm(total=10, ascii=True, file=our_file) as t: + assert str(t) == ' 0%| | 0/10 [00:00<?, ?it/s]' + + +def test_clear(): + """Test clearing bar display""" + with closing(StringIO()) as our_file: + t1 = tqdm(total=10, file=our_file, desc='pos0 bar', bar_format='{l_bar}') + t2 = trange(10, file=our_file, desc='pos1 bar', bar_format='{l_bar}') + before = squash_ctrlchars(our_file.getvalue()) + t2.clear() + t1.clear() + after = squash_ctrlchars(our_file.getvalue()) + t1.close() + t2.close() + assert before == ['pos0 bar: 0%|', 'pos1 bar: 0%|'] + assert after == ['', ''] + + +def test_clear_disabled(): + """Test disabled clear""" + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, desc='pos0 bar', disable=True, + bar_format='{l_bar}') as t: + t.clear() + assert our_file.getvalue() == '' + + +def test_refresh(): + """Test refresh bar display""" + with closing(StringIO()) as our_file: + t1 = tqdm(total=10, file=our_file, desc='pos0 bar', + bar_format='{l_bar}', mininterval=999, miniters=999) + t2 = tqdm(total=10, file=our_file, desc='pos1 bar', + bar_format='{l_bar}', mininterval=999, miniters=999) + t1.update() + t2.update() + before = squash_ctrlchars(our_file.getvalue()) + t1.refresh() + t2.refresh() + after = squash_ctrlchars(our_file.getvalue()) + t1.close() + t2.close() + + # Check that refreshing indeed forced the display to use realtime state + assert before == [u'pos0 bar: 0%|', u'pos1 bar: 0%|'] + assert after == [u'pos0 bar: 10%|', u'pos1 bar: 10%|'] + + +def test_disabled_repr(capsys): + """Test disabled repr""" + with tqdm(total=10, disable=True) as t: + str(t) + t.update() + print(t) + out, err = capsys.readouterr() + assert not err + assert out == ' 0%| | 0/10 [00:00<?, ?it/s]\n' + + +def test_disabled_refresh(): + """Test disabled refresh""" + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, desc='pos0 bar', disable=True, + bar_format='{l_bar}', mininterval=999, miniters=999) as t: + t.update() + t.refresh() + + assert our_file.getvalue() == '' + + +def test_write(): + """Test write messages""" + s = "Hello world" + with closing(StringIO()) as our_file: + # Change format to keep only left part w/o bar and it/s rate + t1 = tqdm(total=10, file=our_file, desc='pos0 bar', + bar_format='{l_bar}', mininterval=0, miniters=1) + t2 = trange(10, file=our_file, desc='pos1 bar', bar_format='{l_bar}', + mininterval=0, miniters=1) + t3 = tqdm(total=10, file=our_file, desc='pos2 bar', + bar_format='{l_bar}', mininterval=0, miniters=1) + t1.update() + t2.update() + t3.update() + before = our_file.getvalue() + + # Write msg and see if bars are correctly redrawn below the msg + t1.write(s, file=our_file) # call as an instance method + tqdm.write(s, file=our_file) # call as a class method + after = our_file.getvalue() + + t1.close() + t2.close() + t3.close() + + before_squashed = squash_ctrlchars(before) + after_squashed = squash_ctrlchars(after) + + assert after_squashed == [s, s] + before_squashed + + # Check that no bar clearing if different file + with closing(StringIO()) as our_file_bar: + with closing(StringIO()) as our_file_write: + t1 = tqdm(total=10, file=our_file_bar, desc='pos0 bar', + bar_format='{l_bar}', mininterval=0, miniters=1) + + t1.update() + before_bar = our_file_bar.getvalue() + + tqdm.write(s, file=our_file_write) + + after_bar = our_file_bar.getvalue() + t1.close() + + assert before_bar == after_bar + + # Test stdout/stderr anti-mixup strategy + # Backup stdout/stderr + stde = sys.stderr + stdo = sys.stdout + # Mock stdout/stderr + with closing(StringIO()) as our_stderr: + with closing(StringIO()) as our_stdout: + sys.stderr = our_stderr + sys.stdout = our_stdout + t1 = tqdm(total=10, file=sys.stderr, desc='pos0 bar', + bar_format='{l_bar}', mininterval=0, miniters=1) + + t1.update() + before_err = sys.stderr.getvalue() + before_out = sys.stdout.getvalue() + + tqdm.write(s, file=sys.stdout) + after_err = sys.stderr.getvalue() + after_out = sys.stdout.getvalue() + + t1.close() + + assert before_err == '\rpos0 bar: 0%|\rpos0 bar: 10%|' + assert before_out == '' + after_err_res = [m[0] for m in RE_pos.findall(after_err)] + exres = ['\rpos0 bar: 0%|', + '\rpos0 bar: 10%|', + '\r ', + '\r\rpos0 bar: 10%|'] + pos_line_diff(after_err_res, exres) + assert after_out == s + '\n' + # Restore stdout and stderr + sys.stderr = stde + sys.stdout = stdo + + +def test_len(): + """Test advance len (numpy array shape)""" + np = importorskip('numpy') + with closing(StringIO()) as f: + with tqdm(np.zeros((3, 4)), file=f) as t: + assert len(t) == 3 + + +def test_autodisable_disable(): + """Test autodisable will disable on non-TTY""" + with closing(StringIO()) as our_file: + with tqdm(total=10, disable=None, file=our_file) as t: + t.update(3) + assert our_file.getvalue() == '' + + +def test_autodisable_enable(): + """Test autodisable will not disable on TTY""" + with closing(StringIO()) as our_file: + our_file.isatty = lambda: True + with tqdm(total=10, disable=None, file=our_file) as t: + t.update() + assert our_file.getvalue() != '' + + +def test_deprecation_exception(): + def test_TqdmDeprecationWarning(): + with closing(StringIO()) as our_file: + raise (TqdmDeprecationWarning('Test!', fp_write=getattr( + our_file, 'write', sys.stderr.write))) + + def test_TqdmDeprecationWarning_nofpwrite(): + raise TqdmDeprecationWarning('Test!', fp_write=None) + + raises(TqdmDeprecationWarning, test_TqdmDeprecationWarning) + raises(Exception, test_TqdmDeprecationWarning_nofpwrite) + + +def test_postfix(): + """Test postfix""" + postfix = {'float': 0.321034, 'gen': 543, 'str': 'h', 'lst': [2]} + postfix_order = (('w', 'w'), ('a', 0)) # no need for OrderedDict + expected = ['float=0.321', 'gen=543', 'lst=[2]', 'str=h'] + expected_order = ['w=w', 'a=0', 'float=0.321', 'gen=543', 'lst=[2]', 'str=h'] + + # Test postfix set at init + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, desc='pos0 bar', + bar_format='{r_bar}', postfix=postfix) as t1: + t1.refresh() + out = our_file.getvalue() + + # Test postfix set after init + with closing(StringIO()) as our_file: + with trange(10, file=our_file, desc='pos1 bar', bar_format='{r_bar}', + postfix=None) as t2: + t2.set_postfix(**postfix) + t2.refresh() + out2 = our_file.getvalue() + + # Order of items in dict may change, so need a loop to check per item + for res in expected: + assert res in out + assert res in out2 + + # Test postfix (with ordered dict and no refresh) set after init + with closing(StringIO()) as our_file: + with trange(10, file=our_file, desc='pos2 bar', bar_format='{r_bar}', + postfix=None) as t3: + t3.set_postfix(postfix_order, False, **postfix) + t3.refresh() # explicit external refresh + out3 = our_file.getvalue() + + out3 = out3[1:-1].split(', ')[3:] + assert out3 == expected_order + + # Test postfix (with ordered dict and refresh) set after init + with closing(StringIO()) as our_file: + with trange(10, file=our_file, desc='pos2 bar', + bar_format='{r_bar}', postfix=None) as t4: + t4.set_postfix(postfix_order, True, **postfix) + t4.refresh() # double refresh + out4 = our_file.getvalue() + + assert out4.count('\r') > out3.count('\r') + assert out4.count(", ".join(expected_order)) == 2 + + # Test setting postfix string directly + with closing(StringIO()) as our_file: + with trange(10, file=our_file, desc='pos2 bar', bar_format='{r_bar}', + postfix=None) as t5: + t5.set_postfix_str("Hello", False) + t5.set_postfix_str("World") + out5 = our_file.getvalue() + + assert "Hello" not in out5 + out5 = out5[1:-1].split(', ')[3:] + assert out5 == ["World"] + + +def test_postfix_direct(): + """Test directly assigning non-str objects to postfix""" + with closing(StringIO()) as our_file: + with tqdm(total=10, file=our_file, miniters=1, mininterval=0, + bar_format="{postfix[0][name]} {postfix[1]:>5.2f}", + postfix=[{'name': "foo"}, 42]) as t: + for i in range(10): + if i % 2: + t.postfix[0]["name"] = "abcdefghij"[i] + else: + t.postfix[1] = i + t.update() + res = our_file.getvalue() + assert "f 6.00" in res + assert "h 6.00" in res + assert "h 8.00" in res + assert "j 8.00" in res + + +@contextmanager +def std_out_err_redirect_tqdm(tqdm_file=sys.stderr): + orig_out_err = sys.stdout, sys.stderr + try: + sys.stdout = sys.stderr = DummyTqdmFile(tqdm_file) + yield orig_out_err[0] + # Relay exceptions + except Exception as exc: + raise exc + # Always restore sys.stdout/err if necessary + finally: + sys.stdout, sys.stderr = orig_out_err + + +def test_file_redirection(): + """Test redirection of output""" + with closing(StringIO()) as our_file: + # Redirect stdout to tqdm.write() + with std_out_err_redirect_tqdm(tqdm_file=our_file): + with tqdm(total=3) as pbar: + print("Such fun") + pbar.update(1) + print("Such", "fun") + pbar.update(1) + print("Such ", end="") + print("fun") + pbar.update(1) + res = our_file.getvalue() + assert res.count("Such fun\n") == 3 + assert "0/3" in res + assert "3/3" in res + + +def test_external_write(): + """Test external write mode""" + with closing(StringIO()) as our_file: + # Redirect stdout to tqdm.write() + for _ in trange(3, file=our_file): + del tqdm._lock # classmethod should be able to recreate lock + with tqdm.external_write_mode(file=our_file): + our_file.write("Such fun\n") + res = our_file.getvalue() + assert res.count("Such fun\n") == 3 + assert "0/3" in res + assert "3/3" in res + + +def test_unit_scale(): + """Test numeric `unit_scale`""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(9), unit_scale=9, file=our_file, + miniters=1, mininterval=0): + pass + out = our_file.getvalue() + assert '81/81' in out + + +def patch_lock(thread=True): + """decorator replacing tqdm's lock with vanilla threading/multiprocessing""" + try: + if thread: + from threading import RLock + else: + from multiprocessing import RLock + lock = RLock() + except (ImportError, OSError) as err: + skip(str(err)) + + def outer(func): + """actual decorator""" + @wraps(func) + def inner(*args, **kwargs): + """set & reset lock even if exceptions occur""" + default_lock = tqdm.get_lock() + try: + tqdm.set_lock(lock) + return func(*args, **kwargs) + finally: + tqdm.set_lock(default_lock) + return inner + return outer + + +@patch_lock(thread=False) +def test_threading(): + """Test multiprocess/thread-realted features""" + pass # TODO: test interleaved output #445 + + +def test_bool(): + """Test boolean cast""" + def internal(our_file, disable): + kwargs = {'file': our_file, 'disable': disable} + with trange(10, **kwargs) as t: + assert t + with trange(0, **kwargs) as t: + assert not t + with tqdm(total=10, **kwargs) as t: + assert bool(t) + with tqdm(total=0, **kwargs) as t: + assert not bool(t) + with tqdm([], **kwargs) as t: + assert not t + with tqdm([0], **kwargs) as t: + assert t + with tqdm(iter([]), **kwargs) as t: + assert t + with tqdm(iter([1, 2, 3]), **kwargs) as t: + assert t + with tqdm(**kwargs) as t: + try: + print(bool(t)) + except TypeError: + pass + else: + raise TypeError("Expected bool(tqdm()) to fail") + + # test with and without disable + with closing(StringIO()) as our_file: + internal(our_file, False) + internal(our_file, True) + + +def backendCheck(module): + """Test tqdm-like module fallback""" + tn = module.tqdm + tr = module.trange + + with closing(StringIO()) as our_file: + with tn(total=10, file=our_file) as t: + assert len(t) == 10 + with tr(1337) as t: + assert len(t) == 1337 + + +def test_auto(): + """Test auto fallback""" + from tqdm import auto, autonotebook + backendCheck(autonotebook) + backendCheck(auto) + + +def test_wrapattr(): + """Test wrapping file-like objects""" + data = "a twenty-char string" + + with closing(StringIO()) as our_file: + with closing(StringIO()) as writer: + with tqdm.wrapattr(writer, "write", file=our_file, bytes=True) as wrap: + wrap.write(data) + res = writer.getvalue() + assert data == res + res = our_file.getvalue() + assert '%.1fB [' % len(data) in res + + with closing(StringIO()) as our_file: + with closing(StringIO()) as writer: + with tqdm.wrapattr(writer, "write", file=our_file, bytes=False) as wrap: + wrap.write(data) + res = our_file.getvalue() + assert '%dit [' % len(data) in res + + +def test_float_progress(): + """Test float totals""" + with closing(StringIO()) as our_file: + with trange(10, total=9.6, file=our_file) as t: + with catch_warnings(record=True) as w: + simplefilter("always", category=TqdmWarning) + for i in t: + if i < 9: + assert not w + assert w + assert "clamping frac" in str(w[-1].message) + + +def test_screen_shape(): + """Test screen shape""" + # ncols + with closing(StringIO()) as our_file: + with trange(10, file=our_file, ncols=50) as t: + list(t) + + res = our_file.getvalue() + assert all(len(i) == 50 for i in get_bar(res)) + + # no second/third bar, leave=False + with closing(StringIO()) as our_file: + kwargs = {'file': our_file, 'ncols': 50, 'nrows': 2, 'miniters': 0, + 'mininterval': 0, 'leave': False} + with trange(10, desc="one", **kwargs) as t1: + with trange(10, desc="two", **kwargs) as t2: + with trange(10, desc="three", **kwargs) as t3: + list(t3) + list(t2) + list(t1) + + res = our_file.getvalue() + assert "one" in res + assert "two" not in res + assert "three" not in res + assert "\n\n" not in res + assert "more hidden" in res + # double-check ncols + assert all(len(i) == 50 for i in get_bar(res) + if i.strip() and "more hidden" not in i) + + # all bars, leave=True + with closing(StringIO()) as our_file: + kwargs = {'file': our_file, 'ncols': 50, 'nrows': 2, + 'miniters': 0, 'mininterval': 0} + with trange(10, desc="one", **kwargs) as t1: + with trange(10, desc="two", **kwargs) as t2: + assert "two" not in our_file.getvalue() + with trange(10, desc="three", **kwargs) as t3: + assert "three" not in our_file.getvalue() + list(t3) + list(t2) + list(t1) + + res = our_file.getvalue() + assert "one" in res + assert "two" in res + assert "three" in res + assert "\n\n" not in res + assert "more hidden" in res + # double-check ncols + assert all(len(i) == 50 for i in get_bar(res) + if i.strip() and "more hidden" not in i) + + # second bar becomes first, leave=False + with closing(StringIO()) as our_file: + kwargs = {'file': our_file, 'ncols': 50, 'nrows': 2, 'miniters': 0, + 'mininterval': 0, 'leave': False} + t1 = tqdm(total=10, desc="one", **kwargs) + with tqdm(total=10, desc="two", **kwargs) as t2: + t1.update() + t2.update() + t1.close() + res = our_file.getvalue() + assert "one" in res + assert "two" not in res + assert "more hidden" in res + t2.update() + + res = our_file.getvalue() + assert "two" in res + + +def test_initial(): + """Test `initial`""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(9), initial=10, total=19, file=our_file, + miniters=1, mininterval=0): + pass + out = our_file.getvalue() + assert '10/19' in out + assert '19/19' in out + + +def test_colour(): + """Test `colour`""" + with closing(StringIO()) as our_file: + for _ in tqdm(_range(9), file=our_file, colour="#beefed"): + pass + out = our_file.getvalue() + assert '\x1b[38;2;%d;%d;%dm' % (0xbe, 0xef, 0xed) in out + + with catch_warnings(record=True) as w: + simplefilter("always", category=TqdmWarning) + with tqdm(total=1, file=our_file, colour="charm") as t: + assert w + t.update() + assert "Unknown colour" in str(w[-1].message) + + with closing(StringIO()) as our_file2: + for _ in tqdm(_range(9), file=our_file2, colour="blue"): + pass + out = our_file2.getvalue() + assert '\x1b[34m' in out + + +def test_closed(): + """Test writing to closed file""" + with closing(StringIO()) as our_file: + for i in trange(9, file=our_file, miniters=1, mininterval=0): + if i == 5: + our_file.close() + + +def test_reversed(capsys): + """Test reversed()""" + for _ in reversed(tqdm(_range(9))): + pass + out, err = capsys.readouterr() + assert not out + assert ' 0%' in err + assert '100%' in err + + +def test_contains(capsys): + """Test __contains__ doesn't iterate""" + with tqdm(list(range(9))) as t: + assert 9 not in t + assert all(i in t for i in _range(9)) + out, err = capsys.readouterr() + assert not out + assert ' 0%' in err + assert '100%' not in err |