+from __future__ import division, print_function
+import sys
+from contextlib import contextmanager
+from functools import wraps
+from time import sleep, time
+# Use relative/cpu timer to have reliable timings when there is a sudden load
+ from time import process_time
+except ImportError:
+ from time import clock
+ process_time = clock
+from tqdm import tqdm, trange
+from .tests_tqdm import _range, importorskip, mark, patch_lock, skip
+pytestmark = mark.slow
+def cpu_sleep(t):
+ """Sleep the given amount of cpu time"""
+ start = process_time()
+ while (process_time() - start) < t:
+ pass
+def checkCpuTime(sleeptime=0.2):
+ """Check if cpu time works correctly"""
+ if checkCpuTime.passed:
+ return True
+ # First test that sleeping does not consume cputime
+ start1 = process_time()
+ sleep(sleeptime)
+ t1 = process_time() - start1
+ # secondly check by comparing to cpusleep (where we actually do something)
+ start2 = process_time()
+ cpu_sleep(sleeptime)
+ t2 = process_time() - start2
+ if abs(t1) < 0.0001 and t1 < t2 / 10:
+ checkCpuTime.passed = True
+ return True
+ skip("cpu time not reliable on this machine")
+checkCpuTime.passed = False
+def relative_timer():
+ """yields a context timer function which stops ticking on exit"""
+ start = process_time()
+ def elapser():
+ return process_time() - start
+ yield lambda: elapser()
+ spent = elapser()
+ def elapser(): # NOQA
+ return spent
+def retry_on_except(n=3, check_cpu_time=True):
+ """decroator for retrying `n` times before raising Exceptions"""
+ def wrapper(func):
+ """actual decorator"""
+ @wraps(func)
+ def test_inner(*args, **kwargs):
+ """may skip if `check_cpu_time` fails"""
+ for i in range(1, n + 1):
+ try:
+ if check_cpu_time:
+ checkCpuTime()
+ func(*args, **kwargs)
+ except Exception:
+ if i >= n:
+ raise
+ else:
+ return
+ return test_inner
+ return wrapper
+def simple_progress(iterable=None, total=None, file=sys.stdout, desc='',
+ leave=False, miniters=1, mininterval=0.1, width=60):
+ """Simple progress bar reproducing tqdm's major features"""
+ n = [0] # use a closure
+ start_t = [time()]
+ last_n = [0]
+ last_t = [0]
+ if iterable is not None:
+ total = len(iterable)
+ def format_interval(t):
+ mins, s = divmod(int(t), 60)
+ h, m = divmod(mins, 60)
+ if h:
+ return '{0:d}:{1:02d}:{2:02d}'.format(h, m, s)
+ else:
+ return '{0:02d}:{1:02d}'.format(m, s)
+ def update_and_print(i=1):
+ n[0] += i
+ if (n[0] - last_n[0]) >= miniters:
+ last_n[0] = n[0]
+ if (time() - last_t[0]) >= mininterval:
+ last_t[0] = time() # last_t[0] == current time
+ spent = last_t[0] - start_t[0]
+ spent_fmt = format_interval(spent)
+ rate = n[0] / spent if spent > 0 else 0
+ rate_fmt = "%.2fs/it" % (1.0 / rate) if 0.0 < rate < 1.0 else "%.2fit/s" % rate
+ frac = n[0] / total
+ percentage = int(frac * 100)
+ eta = (total - n[0]) / rate if rate > 0 else 0
+ eta_fmt = format_interval(eta)
+ # full_bar = "#" * int(frac * width)
+ barfill = " " * int((1.0 - frac) * width)
+ bar_length, frac_bar_length = divmod(int(frac * width * 10), 10)
+ full_bar = '#' * bar_length
+ frac_bar = chr(48 + frac_bar_length) if frac_bar_length else ' '
+ file.write("\r%s %i%%|%s%s%s| %i/%i [%s<%s, %s]" %
+ (desc, percentage, full_bar, frac_bar, barfill, n[0],
+ total, spent_fmt, eta_fmt, rate_fmt))
+ if n[0] == total and leave:
+ file.write("\n")
+ file.flush()
+ def update_and_yield():
+ for elt in iterable:
+ yield elt
+ update_and_print()
+ update_and_print(0)
+ if iterable is not None:
+ return update_and_yield()
+ else:
+ return update_and_print
+def assert_performance(thresh, name_left, time_left, name_right, time_right):
+ """raises if time_left > thresh * time_right"""
+ if time_left > thresh * time_right:
+ raise ValueError(
+ ('{name[0]}: {time[0]:f}, '
+ '{name[1]}: {time[1]:f}, '
+ 'ratio {ratio:f} > {thresh:f}').format(
+ name=(name_left, name_right),
+ time=(time_left, time_right),
+ ratio=time_left / time_right, thresh=thresh))
+def test_iter_basic_overhead():
+ """Test overhead of iteration based tqdm"""
+ total = int(1e6)
+ a = 0
+ with trange(total) as t:
+ with relative_timer() as time_tqdm:
+ for i in t:
+ a += i
+ assert a == (total ** 2 - total) / 2.0
+ a = 0
+ with relative_timer() as time_bench:
+ for i in _range(total):
+ a += i
+ sys.stdout.write(str(a))
+ assert_performance(3, 'trange', time_tqdm(), 'range', time_bench())
+def test_manual_basic_overhead():
+ """Test overhead of manual tqdm"""
+ total = int(1e6)
+ with tqdm(total=total * 10, leave=True) as t:
+ a = 0
+ with relative_timer() as time_tqdm:
+ for i in _range(total):
+ a += i
+ t.update(10)
+ a = 0
+ with relative_timer() as time_bench:
+ for i in _range(total):
+ a += i
+ sys.stdout.write(str(a))
+ assert_performance(5, 'tqdm', time_tqdm(), 'range', time_bench())
+def worker(total, blocking=True):
+ def incr_bar(x):
+ for _ in trange(total, lock_args=None if blocking else (False,),
+ miniters=1, mininterval=0, maxinterval=0):
+ pass
+ return x + 1
+ return incr_bar
+def test_lock_args():
+ """Test overhead of nonblocking threads"""
+ ThreadPoolExecutor = importorskip('concurrent.futures').ThreadPoolExecutor
+ total = 16
+ subtotal = 10000
+ with ThreadPoolExecutor() as pool:
+ sys.stderr.write('block ... ')
+ sys.stderr.flush()
+ with relative_timer() as time_tqdm:
+ res = list(, True), range(total)))
+ assert sum(res) == sum(range(total)) + total
+ sys.stderr.write('noblock ... ')
+ sys.stderr.flush()
+ with relative_timer() as time_noblock:
+ res = list(, False), range(total)))
+ assert sum(res) == sum(range(total)) + total
+ assert_performance(0.5, 'noblock', time_noblock(), 'tqdm', time_tqdm())
+def test_iter_overhead_hard():
+ """Test overhead of iteration based tqdm (hard)"""
+ total = int(1e5)
+ a = 0
+ with trange(total, leave=True, miniters=1,
+ mininterval=0, maxinterval=0) as t:
+ with relative_timer() as time_tqdm:
+ for i in t:
+ a += i
+ assert a == (total ** 2 - total) / 2.0
+ a = 0
+ with relative_timer() as time_bench:
+ for i in _range(total):
+ a += i
+ sys.stdout.write(("%i" % a) * 40)
+ assert_performance(130, 'trange', time_tqdm(), 'range', time_bench())
+def test_manual_overhead_hard():
+ """Test overhead of manual tqdm (hard)"""
+ total = int(1e5)
+ with tqdm(total=total * 10, leave=True, miniters=1,
+ mininterval=0, maxinterval=0) as t:
+ a = 0
+ with relative_timer() as time_tqdm:
+ for i in _range(total):
+ a += i
+ t.update(10)
+ a = 0
+ with relative_timer() as time_bench:
+ for i in _range(total):
+ a += i
+ sys.stdout.write(("%i" % a) * 40)
+ assert_performance(130, 'tqdm', time_tqdm(), 'range', time_bench())
+def test_iter_overhead_simplebar_hard():
+ """Test overhead of iteration based tqdm vs simple progress bar (hard)"""
+ total = int(1e4)
+ a = 0
+ with trange(total, leave=True, miniters=1,
+ mininterval=0, maxinterval=0) as t:
+ with relative_timer() as time_tqdm:
+ for i in t:
+ a += i
+ assert a == (total ** 2 - total) / 2.0
+ a = 0
+ s = simple_progress(_range(total), leave=True,
+ miniters=1, mininterval=0)
+ with relative_timer() as time_bench:
+ for i in s:
+ a += i
+ assert_performance(10, 'trange', time_tqdm(), 'simple_progress', time_bench())
+def test_manual_overhead_simplebar_hard():
+ """Test overhead of manual tqdm vs simple progress bar (hard)"""
+ total = int(1e4)
+ with tqdm(total=total * 10, leave=True, miniters=1,
+ mininterval=0, maxinterval=0) as t:
+ a = 0
+ with relative_timer() as time_tqdm:
+ for i in _range(total):
+ a += i
+ t.update(10)
+ simplebar_update = simple_progress(total=total * 10, leave=True,
+ miniters=1, mininterval=0)
+ a = 0
+ with relative_timer() as time_bench:
+ for i in _range(total):
+ a += i
+ simplebar_update(10)
+ assert_performance(10, 'tqdm', time_tqdm(), 'simple_progress', time_bench())