diff options
Diffstat (limited to 'tests/pool/test_sched.py')
-rw-r--r-- | tests/pool/test_sched.py | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/tests/pool/test_sched.py b/tests/pool/test_sched.py new file mode 100644 index 0000000..b3d2572 --- /dev/null +++ b/tests/pool/test_sched.py @@ -0,0 +1,154 @@ +import logging +from time import time, sleep +from functools import partial +from threading import Thread + +import pytest + +try: + from psycopg_pool.sched import Scheduler +except ImportError: + # Tests should have been skipped if the package is not available + pass + +pytestmark = [pytest.mark.timing] + + +@pytest.mark.slow +def test_sched(): + s = Scheduler() + results = [] + + def worker(i): + results.append((i, time())) + + t0 = time() + s.enter(0.1, partial(worker, 1)) + s.enter(0.4, partial(worker, 3)) + s.enter(0.3, None) + s.enter(0.2, partial(worker, 2)) + s.run() + assert len(results) == 2 + assert results[0][0] == 1 + assert results[0][1] - t0 == pytest.approx(0.1, 0.1) + assert results[1][0] == 2 + assert results[1][1] - t0 == pytest.approx(0.2, 0.1) + + +@pytest.mark.slow +def test_sched_thread(): + s = Scheduler() + t = Thread(target=s.run, daemon=True) + t.start() + + results = [] + + def worker(i): + results.append((i, time())) + + t0 = time() + s.enter(0.1, partial(worker, 1)) + s.enter(0.4, partial(worker, 3)) + s.enter(0.3, None) + s.enter(0.2, partial(worker, 2)) + + t.join() + t1 = time() + assert t1 - t0 == pytest.approx(0.3, 0.2) + + assert len(results) == 2 + assert results[0][0] == 1 + assert results[0][1] - t0 == pytest.approx(0.1, 0.2) + assert results[1][0] == 2 + assert results[1][1] - t0 == pytest.approx(0.2, 0.2) + + +@pytest.mark.slow +def test_sched_error(caplog): + caplog.set_level(logging.WARNING, logger="psycopg") + s = Scheduler() + t = Thread(target=s.run, daemon=True) + t.start() + + results = [] + + def worker(i): + results.append((i, time())) + + def error(): + 1 / 0 + + t0 = time() + s.enter(0.1, partial(worker, 1)) + s.enter(0.4, None) + s.enter(0.3, partial(worker, 2)) + s.enter(0.2, error) + + t.join() + t1 = time() + assert t1 - t0 == pytest.approx(0.4, 0.1) + + assert len(results) == 2 + assert results[0][0] == 1 + assert results[0][1] - t0 == pytest.approx(0.1, 0.1) + assert results[1][0] == 2 + assert results[1][1] - t0 == pytest.approx(0.3, 0.1) + + assert len(caplog.records) == 1 + assert "ZeroDivisionError" in caplog.records[0].message + + +@pytest.mark.slow +def test_empty_queue_timeout(): + s = Scheduler() + + t0 = time() + times = [] + + wait_orig = s._event.wait + + def wait_logging(timeout=None): + rv = wait_orig(timeout) + times.append(time() - t0) + return rv + + setattr(s._event, "wait", wait_logging) + s.EMPTY_QUEUE_TIMEOUT = 0.2 + + t = Thread(target=s.run) + t.start() + sleep(0.5) + s.enter(0.5, None) + t.join() + times.append(time() - t0) + for got, want in zip(times, [0.2, 0.4, 0.5, 1.0]): + assert got == pytest.approx(want, 0.2), times + + +@pytest.mark.slow +def test_first_task_rescheduling(): + s = Scheduler() + + t0 = time() + times = [] + + wait_orig = s._event.wait + + def wait_logging(timeout=None): + rv = wait_orig(timeout) + times.append(time() - t0) + return rv + + setattr(s._event, "wait", wait_logging) + s.EMPTY_QUEUE_TIMEOUT = 0.1 + + s.enter(0.4, lambda: None) + t = Thread(target=s.run) + t.start() + s.enter(0.6, None) # this task doesn't trigger a reschedule + sleep(0.1) + s.enter(0.1, lambda: None) # this triggers a reschedule + t.join() + times.append(time() - t0) + for got, want in zip(times, [0.1, 0.2, 0.4, 0.6, 0.6]): + assert got == pytest.approx(want, 0.2), times |