summaryrefslogtreecommitdiffstats
path: root/tests/pool/test_sched.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/pool/test_sched.py')
-rw-r--r--tests/pool/test_sched.py154
1 files changed, 154 insertions, 0 deletions
diff --git a/tests/pool/test_sched.py b/tests/pool/test_sched.py
new file mode 100644
index 0000000..b3d2572
--- /dev/null
+++ b/tests/pool/test_sched.py
@@ -0,0 +1,154 @@
+import logging
+from time import time, sleep
+from functools import partial
+from threading import Thread
+
+import pytest
+
+try:
+ from psycopg_pool.sched import Scheduler
+except ImportError:
+ # Tests should have been skipped if the package is not available
+ pass
+
+pytestmark = [pytest.mark.timing]
+
+
+@pytest.mark.slow
+def test_sched():
+ s = Scheduler()
+ results = []
+
+ def worker(i):
+ results.append((i, time()))
+
+ t0 = time()
+ s.enter(0.1, partial(worker, 1))
+ s.enter(0.4, partial(worker, 3))
+ s.enter(0.3, None)
+ s.enter(0.2, partial(worker, 2))
+ s.run()
+ assert len(results) == 2
+ assert results[0][0] == 1
+ assert results[0][1] - t0 == pytest.approx(0.1, 0.1)
+ assert results[1][0] == 2
+ assert results[1][1] - t0 == pytest.approx(0.2, 0.1)
+
+
+@pytest.mark.slow
+def test_sched_thread():
+ s = Scheduler()
+ t = Thread(target=s.run, daemon=True)
+ t.start()
+
+ results = []
+
+ def worker(i):
+ results.append((i, time()))
+
+ t0 = time()
+ s.enter(0.1, partial(worker, 1))
+ s.enter(0.4, partial(worker, 3))
+ s.enter(0.3, None)
+ s.enter(0.2, partial(worker, 2))
+
+ t.join()
+ t1 = time()
+ assert t1 - t0 == pytest.approx(0.3, 0.2)
+
+ assert len(results) == 2
+ assert results[0][0] == 1
+ assert results[0][1] - t0 == pytest.approx(0.1, 0.2)
+ assert results[1][0] == 2
+ assert results[1][1] - t0 == pytest.approx(0.2, 0.2)
+
+
+@pytest.mark.slow
+def test_sched_error(caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+ s = Scheduler()
+ t = Thread(target=s.run, daemon=True)
+ t.start()
+
+ results = []
+
+ def worker(i):
+ results.append((i, time()))
+
+ def error():
+ 1 / 0
+
+ t0 = time()
+ s.enter(0.1, partial(worker, 1))
+ s.enter(0.4, None)
+ s.enter(0.3, partial(worker, 2))
+ s.enter(0.2, error)
+
+ t.join()
+ t1 = time()
+ assert t1 - t0 == pytest.approx(0.4, 0.1)
+
+ assert len(results) == 2
+ assert results[0][0] == 1
+ assert results[0][1] - t0 == pytest.approx(0.1, 0.1)
+ assert results[1][0] == 2
+ assert results[1][1] - t0 == pytest.approx(0.3, 0.1)
+
+ assert len(caplog.records) == 1
+ assert "ZeroDivisionError" in caplog.records[0].message
+
+
+@pytest.mark.slow
+def test_empty_queue_timeout():
+ s = Scheduler()
+
+ t0 = time()
+ times = []
+
+ wait_orig = s._event.wait
+
+ def wait_logging(timeout=None):
+ rv = wait_orig(timeout)
+ times.append(time() - t0)
+ return rv
+
+ setattr(s._event, "wait", wait_logging)
+ s.EMPTY_QUEUE_TIMEOUT = 0.2
+
+ t = Thread(target=s.run)
+ t.start()
+ sleep(0.5)
+ s.enter(0.5, None)
+ t.join()
+ times.append(time() - t0)
+ for got, want in zip(times, [0.2, 0.4, 0.5, 1.0]):
+ assert got == pytest.approx(want, 0.2), times
+
+
+@pytest.mark.slow
+def test_first_task_rescheduling():
+ s = Scheduler()
+
+ t0 = time()
+ times = []
+
+ wait_orig = s._event.wait
+
+ def wait_logging(timeout=None):
+ rv = wait_orig(timeout)
+ times.append(time() - t0)
+ return rv
+
+ setattr(s._event, "wait", wait_logging)
+ s.EMPTY_QUEUE_TIMEOUT = 0.1
+
+ s.enter(0.4, lambda: None)
+ t = Thread(target=s.run)
+ t.start()
+ s.enter(0.6, None) # this task doesn't trigger a reschedule
+ sleep(0.1)
+ s.enter(0.1, lambda: None) # this triggers a reschedule
+ t.join()
+ times.append(time() - t0)
+ for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]):
+ assert got == pytest.approx(want, 0.2), times