summaryrefslogtreecommitdiffstats
path: root/tests/tests_synchronisation.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/tests_synchronisation.py')
-rw-r--r--tests/tests_synchronisation.py224
1 files changed, 224 insertions, 0 deletions
diff --git a/tests/tests_synchronisation.py b/tests/tests_synchronisation.py
new file mode 100644
index 0000000..7ee55fb
--- /dev/null
+++ b/tests/tests_synchronisation.py
@@ -0,0 +1,224 @@
+from __future__ import division
+
+import sys
+from functools import wraps
+from threading import Event
+from time import sleep, time
+
+from tqdm import TMonitor, tqdm, trange
+
+from .tests_perf import retry_on_except
+from .tests_tqdm import StringIO, closing, importorskip, patch_lock, skip
+
+
+class Time(object):
+ """Fake time class class providing an offset"""
+ offset = 0
+
+ @classmethod
+ def reset(cls):
+ """zeroes internal offset"""
+ cls.offset = 0
+
+ @classmethod
+ def time(cls):
+ """time.time() + offset"""
+ return time() + cls.offset
+
+ @staticmethod
+ def sleep(dur):
+ """identical to time.sleep()"""
+ sleep(dur)
+
+ @classmethod
+ def fake_sleep(cls, dur):
+ """adds `dur` to internal offset"""
+ cls.offset += dur
+ sleep(0.000001) # sleep to allow interrupt (instead of pass)
+
+
+def FakeEvent():
+ """patched `threading.Event` where `wait()` uses `Time.fake_sleep()`"""
+ event = Event() # not a class in py2 so can't inherit
+
+ def wait(timeout=None):
+ """uses Time.fake_sleep"""
+ if timeout is not None:
+ Time.fake_sleep(timeout)
+ return event.is_set()
+
+ event.wait = wait
+ return event
+
+
+def patch_sleep(func):
+ """Temporarily makes TMonitor use Time.fake_sleep"""
+ @wraps(func)
+ def inner(*args, **kwargs):
+ """restores TMonitor on completion regardless of Exceptions"""
+ TMonitor._test["time"] = Time.time
+ TMonitor._test["Event"] = FakeEvent
+ if tqdm.monitor:
+ assert not tqdm.monitor.get_instances()
+ tqdm.monitor.exit()
+ del tqdm.monitor
+ tqdm.monitor = None
+ try:
+ return func(*args, **kwargs)
+ finally:
+ # Check that class var monitor is deleted if no instance left
+ tqdm.monitor_interval = 10
+ if tqdm.monitor:
+ assert not tqdm.monitor.get_instances()
+ tqdm.monitor.exit()
+ del tqdm.monitor
+ tqdm.monitor = None
+ TMonitor._test.pop("Event")
+ TMonitor._test.pop("time")
+
+ return inner
+
+
+def cpu_timify(t, timer=Time):
+ """Force tqdm to use the specified timer instead of system-wide time"""
+ t._time = timer.time
+ t._sleep = timer.fake_sleep
+ t.start_t = t.last_print_t = t._time()
+ return timer
+
+
+class FakeTqdm(object):
+ _instances = set()
+ get_lock = tqdm.get_lock
+
+
+def incr(x):
+ return x + 1
+
+
+def incr_bar(x):
+ with closing(StringIO()) as our_file:
+ for _ in trange(x, lock_args=(False,), file=our_file):
+ pass
+ return incr(x)
+
+
+@patch_sleep
+def test_monitor_thread():
+ """Test dummy monitoring thread"""
+ monitor = TMonitor(FakeTqdm, 10)
+ # Test if alive, then killed
+ assert monitor.report()
+ monitor.exit()
+ assert not monitor.report()
+ assert not monitor.is_alive()
+ del monitor
+
+
+@patch_sleep
+def test_monitoring_and_cleanup():
+ """Test for stalled tqdm instance and monitor deletion"""
+ # Note: should fix miniters for these tests, else with dynamic_miniters
+ # it's too complicated to handle with monitoring update and maxinterval...
+ maxinterval = tqdm.monitor_interval
+ assert maxinterval == 10
+ total = 1000
+
+ with closing(StringIO()) as our_file:
+ with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
+ maxinterval=maxinterval) as t:
+ cpu_timify(t, Time)
+ # Do a lot of iterations in a small timeframe
+ # (smaller than monitor interval)
+ Time.fake_sleep(maxinterval / 10) # monitor won't wake up
+ t.update(500)
+ # check that our fixed miniters is still there
+ assert t.miniters <= 500 # TODO: should really be == 500
+ # Then do 1 it after monitor interval, so that monitor kicks in
+ Time.fake_sleep(maxinterval)
+ t.update(1)
+ # Wait for the monitor to get out of sleep's loop and update tqdm.
+ timeend = Time.time()
+ while not (t.monitor.woken >= timeend and t.miniters == 1):
+ Time.fake_sleep(1) # Force awake up if it woken too soon
+ assert t.miniters == 1 # check that monitor corrected miniters
+ # Note: at this point, there may be a race condition: monitor saved
+ # current woken time but Time.sleep() happen just before monitor
+ # sleep. To fix that, either sleep here or increase time in a loop
+ # to ensure that monitor wakes up at some point.
+
+ # Try again but already at miniters = 1 so nothing will be done
+ Time.fake_sleep(maxinterval)
+ t.update(2)
+ timeend = Time.time()
+ while t.monitor.woken < timeend:
+ Time.fake_sleep(1) # Force awake if it woken too soon
+ # Wait for the monitor to get out of sleep's loop and update
+ # tqdm
+ assert t.miniters == 1 # check that monitor corrected miniters
+
+
+@patch_sleep
+def test_monitoring_multi():
+ """Test on multiple bars, one not needing miniters adjustment"""
+ # Note: should fix miniters for these tests, else with dynamic_miniters
+ # it's too complicated to handle with monitoring update and maxinterval...
+ maxinterval = tqdm.monitor_interval
+ assert maxinterval == 10
+ total = 1000
+
+ with closing(StringIO()) as our_file:
+ with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
+ maxinterval=maxinterval) as t1:
+ # Set high maxinterval for t2 so monitor does not need to adjust it
+ with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1,
+ maxinterval=1E5) as t2:
+ cpu_timify(t1, Time)
+ cpu_timify(t2, Time)
+ # Do a lot of iterations in a small timeframe
+ Time.fake_sleep(maxinterval / 10)
+ t1.update(500)
+ t2.update(500)
+ assert t1.miniters <= 500 # TODO: should really be == 500
+ assert t2.miniters == 500
+ # Then do 1 it after monitor interval, so that monitor kicks in
+ Time.fake_sleep(maxinterval)
+ t1.update(1)
+ t2.update(1)
+ # Wait for the monitor to get out of sleep and update tqdm
+ timeend = Time.time()
+ while not (t1.monitor.woken >= timeend and t1.miniters == 1):
+ Time.fake_sleep(1)
+ assert t1.miniters == 1 # check that monitor corrected miniters
+ assert t2.miniters == 500 # check that t2 was not adjusted
+
+
+def test_imap():
+ """Test multiprocessing.Pool"""
+ try:
+ from multiprocessing import Pool
+ except ImportError as err:
+ skip(str(err))
+
+ pool = Pool()
+ res = list(tqdm(pool.imap(incr, range(100)), disable=True))
+ pool.close()
+ assert res[-1] == 100
+
+
+# py2: locks won't propagate to incr_bar so may cause `AttributeError`
+@retry_on_except(n=3 if sys.version_info < (3,) else 1, check_cpu_time=False)
+@patch_lock(thread=True)
+def test_threadpool():
+ """Test concurrent.futures.ThreadPoolExecutor"""
+ ThreadPoolExecutor = importorskip('concurrent.futures').ThreadPoolExecutor
+
+ with ThreadPoolExecutor(8) as pool:
+ try:
+ res = list(tqdm(pool.map(incr_bar, range(100)), disable=True))
+ except AttributeError:
+ if sys.version_info < (3,):
+ skip("not supported on py2")
+ else:
+ raise
+ assert sum(res) == sum(range(1, 101))