summaryrefslogtreecommitdiffstats
path: root/tests/scripts
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 17:41:08 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 17:41:08 +0000
commit506ed8899b3a97e512be3fd6d44d5b11463bf9bf (patch)
tree808913770c5e6935d3714058c2a066c57b4632ec /tests/scripts
parentInitial commit. (diff)
downloadpsycopg3-7acd1f75a918595b0fe5366910f80c2a8527072c.tar.xz
psycopg3-7acd1f75a918595b0fe5366910f80c2a8527072c.zip
Adding upstream version 3.1.7.upstream/3.1.7upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/scripts')
-rw-r--r--tests/scripts/bench-411.py300
-rw-r--r--tests/scripts/dectest.py51
-rw-r--r--tests/scripts/pipeline-demo.py340
-rw-r--r--tests/scripts/spiketest.py156
4 files changed, 847 insertions, 0 deletions
diff --git a/tests/scripts/bench-411.py b/tests/scripts/bench-411.py
new file mode 100644
index 0000000..82ea451
--- /dev/null
+++ b/tests/scripts/bench-411.py
@@ -0,0 +1,300 @@
+import os
+import sys
+import time
+import random
+import asyncio
+import logging
+from enum import Enum
+from typing import Any, Dict, List, Generator
+from argparse import ArgumentParser, Namespace
+from contextlib import contextmanager
+
+logger = logging.getLogger()
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s %(levelname)s %(message)s",
+)
+
+
+class Driver(str, Enum):
+ psycopg2 = "psycopg2"
+ psycopg = "psycopg"
+ psycopg_async = "psycopg_async"
+ asyncpg = "asyncpg"
+
+
+ids: List[int] = []
+data: List[Dict[str, Any]] = []
+
+
+def main() -> None:
+
+ args = parse_cmdline()
+
+ ids[:] = range(args.ntests)
+ data[:] = [
+ dict(
+ id=i,
+ name="c%d" % i,
+ description="c%d" % i,
+ q=i * 10,
+ p=i * 20,
+ x=i * 30,
+ y=i * 40,
+ )
+ for i in ids
+ ]
+
+ # Must be done just on end
+ drop_at_the_end = args.drop
+ args.drop = False
+
+ for i, name in enumerate(args.drivers):
+ if i == len(args.drivers) - 1:
+ args.drop = drop_at_the_end
+
+ if name == Driver.psycopg2:
+ import psycopg2 # type: ignore
+
+ run_psycopg2(psycopg2, args)
+
+ elif name == Driver.psycopg:
+ import psycopg
+
+ run_psycopg(psycopg, args)
+
+ elif name == Driver.psycopg_async:
+ import psycopg
+
+ if sys.platform == "win32":
+ if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
+ asyncio.set_event_loop_policy(
+ asyncio.WindowsSelectorEventLoopPolicy()
+ )
+
+ asyncio.run(run_psycopg_async(psycopg, args))
+
+ elif name == Driver.asyncpg:
+ import asyncpg # type: ignore
+
+ asyncio.run(run_asyncpg(asyncpg, args))
+
+ else:
+ raise AssertionError(f"unknown driver: {name!r}")
+
+ # Must be done just on start
+ args.create = False
+
+
+table = """
+CREATE TABLE customer (
+ id SERIAL NOT NULL,
+ name VARCHAR(255),
+ description VARCHAR(255),
+ q INTEGER,
+ p INTEGER,
+ x INTEGER,
+ y INTEGER,
+ z INTEGER,
+ PRIMARY KEY (id)
+)
+"""
+drop = "DROP TABLE IF EXISTS customer"
+
+insert = """
+INSERT INTO customer (id, name, description, q, p, x, y) VALUES
+(%(id)s, %(name)s, %(description)s, %(q)s, %(p)s, %(x)s, %(y)s)
+"""
+
+select = """
+SELECT customer.id, customer.name, customer.description, customer.q,
+ customer.p, customer.x, customer.y, customer.z
+FROM customer
+WHERE customer.id = %(id)s
+"""
+
+
+@contextmanager
+def time_log(message: str) -> Generator[None, None, None]:
+ start = time.monotonic()
+ yield
+ end = time.monotonic()
+ logger.info(f"Run {message} in {end-start} s")
+
+
+def run_psycopg2(psycopg2: Any, args: Namespace) -> None:
+ logger.info("Running psycopg2")
+
+ if args.create:
+ logger.info(f"inserting {args.ntests} test records")
+ with psycopg2.connect(args.dsn) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(drop)
+ cursor.execute(table)
+ cursor.executemany(insert, data)
+ conn.commit()
+
+ logger.info(f"running {args.ntests} queries")
+ to_query = random.choices(ids, k=args.ntests)
+ with psycopg2.connect(args.dsn) as conn:
+ with time_log("psycopg2"):
+ for id_ in to_query:
+ with conn.cursor() as cursor:
+ cursor.execute(select, {"id": id_})
+ cursor.fetchall()
+ # conn.rollback()
+
+ if args.drop:
+ logger.info("dropping test records")
+ with psycopg2.connect(args.dsn) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(drop)
+ conn.commit()
+
+
+def run_psycopg(psycopg: Any, args: Namespace) -> None:
+ logger.info("Running psycopg sync")
+
+ if args.create:
+ logger.info(f"inserting {args.ntests} test records")
+ with psycopg.connect(args.dsn) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(drop)
+ cursor.execute(table)
+ cursor.executemany(insert, data)
+ conn.commit()
+
+ logger.info(f"running {args.ntests} queries")
+ to_query = random.choices(ids, k=args.ntests)
+ with psycopg.connect(args.dsn) as conn:
+ with time_log("psycopg"):
+ for id_ in to_query:
+ with conn.cursor() as cursor:
+ cursor.execute(select, {"id": id_})
+ cursor.fetchall()
+ # conn.rollback()
+
+ if args.drop:
+ logger.info("dropping test records")
+ with psycopg.connect(args.dsn) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(drop)
+ conn.commit()
+
+
+async def run_psycopg_async(psycopg: Any, args: Namespace) -> None:
+ logger.info("Running psycopg async")
+
+ conn: Any
+
+ if args.create:
+ logger.info(f"inserting {args.ntests} test records")
+ async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+ async with conn.cursor() as cursor:
+ await cursor.execute(drop)
+ await cursor.execute(table)
+ await cursor.executemany(insert, data)
+ await conn.commit()
+
+ logger.info(f"running {args.ntests} queries")
+ to_query = random.choices(ids, k=args.ntests)
+ async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+ with time_log("psycopg_async"):
+ for id_ in to_query:
+ cursor = await conn.execute(select, {"id": id_})
+ await cursor.fetchall()
+ await cursor.close()
+ # await conn.rollback()
+
+ if args.drop:
+ logger.info("dropping test records")
+ async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+ async with conn.cursor() as cursor:
+ await cursor.execute(drop)
+ await conn.commit()
+
+
+async def run_asyncpg(asyncpg: Any, args: Namespace) -> None:
+ logger.info("Running asyncpg")
+
+ places = dict(id="$1", name="$2", description="$3", q="$4", p="$5", x="$6", y="$7")
+ a_insert = insert % places
+ a_select = select % {"id": "$1"}
+
+ conn: Any
+
+ if args.create:
+ logger.info(f"inserting {args.ntests} test records")
+ conn = await asyncpg.connect(args.dsn)
+ async with conn.transaction():
+ await conn.execute(drop)
+ await conn.execute(table)
+ await conn.executemany(a_insert, [tuple(d.values()) for d in data])
+ await conn.close()
+
+ logger.info(f"running {args.ntests} queries")
+ to_query = random.choices(ids, k=args.ntests)
+ conn = await asyncpg.connect(args.dsn)
+ with time_log("asyncpg"):
+ for id_ in to_query:
+ tr = conn.transaction()
+ await tr.start()
+ await conn.fetch(a_select, id_)
+ # await tr.rollback()
+ await conn.close()
+
+ if args.drop:
+ logger.info("dropping test records")
+ conn = await asyncpg.connect(args.dsn)
+ async with conn.transaction():
+ await conn.execute(drop)
+ await conn.close()
+
+
+def parse_cmdline() -> Namespace:
+ parser = ArgumentParser(description=__doc__)
+ parser.add_argument(
+ "drivers",
+ nargs="+",
+ metavar="DRIVER",
+ type=Driver,
+ help=f"the drivers to test [choices: {', '.join(d.value for d in Driver)}]",
+ )
+
+ parser.add_argument(
+ "--ntests",
+ type=int,
+ default=10_000,
+ help="number of tests to perform [default: %(default)s]",
+ )
+
+ parser.add_argument(
+ "--dsn",
+ default=os.environ.get("PSYCOPG_TEST_DSN", ""),
+ help="database connection string"
+ " [default: %(default)r (from PSYCOPG_TEST_DSN env var)]",
+ )
+
+ parser.add_argument(
+ "--no-create",
+ dest="create",
+ action="store_false",
+ default="True",
+ help="skip data creation before tests (it must exist already)",
+ )
+
+ parser.add_argument(
+ "--no-drop",
+ dest="drop",
+ action="store_false",
+ default="True",
+ help="skip data drop after tests",
+ )
+
+ opt = parser.parse_args()
+
+ return opt
+
+
+if __name__ == "__main__":
+ main()
diff --git a/tests/scripts/dectest.py b/tests/scripts/dectest.py
new file mode 100644
index 0000000..a49f116
--- /dev/null
+++ b/tests/scripts/dectest.py
@@ -0,0 +1,51 @@
+"""
+A quick and rough performance comparison of text vs. binary Decimal adaptation
+"""
+from random import randrange
+from decimal import Decimal
+import psycopg
+from psycopg import sql
+
+ncols = 10
+nrows = 500000
+format = psycopg.pq.Format.BINARY
+test = "copy"
+
+
+def main() -> None:
+ cnn = psycopg.connect()
+
+ cnn.execute(
+ sql.SQL("create table testdec ({})").format(
+ sql.SQL(", ").join(
+ [
+ sql.SQL("{} numeric(10,2)").format(sql.Identifier(f"t{i}"))
+ for i in range(ncols)
+ ]
+ )
+ )
+ )
+ cur = cnn.cursor()
+
+ if test == "copy":
+ with cur.copy(f"copy testdec from stdin (format {format.name})") as copy:
+ for j in range(nrows):
+ copy.write_row(
+ [Decimal(randrange(10000000000)) / 100 for i in range(ncols)]
+ )
+
+ elif test == "insert":
+ ph = ["%t", "%b"][format]
+ cur.executemany(
+ "insert into testdec values (%s)" % ", ".join([ph] * ncols),
+ (
+ [Decimal(randrange(10000000000)) / 100 for i in range(ncols)]
+ for j in range(nrows)
+ ),
+ )
+ else:
+ raise Exception(f"bad test: {test}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/tests/scripts/pipeline-demo.py b/tests/scripts/pipeline-demo.py
new file mode 100644
index 0000000..ec95229
--- /dev/null
+++ b/tests/scripts/pipeline-demo.py
@@ -0,0 +1,340 @@
+"""Pipeline mode demo
+
+This reproduces libpq_pipeline::pipelined_insert PostgreSQL test at
+src/test/modules/libpq_pipeline/libpq_pipeline.c::test_pipelined_insert().
+
+We do not fetch results explicitly (using cursor.fetch*()), this is
+handled by execute() calls when pgconn socket is read-ready, which
+happens when the output buffer is full.
+"""
+import argparse
+import asyncio
+import logging
+from contextlib import contextmanager
+from functools import partial
+from typing import Any, Iterator, Optional, Sequence, Tuple
+
+from psycopg import AsyncConnection, Connection
+from psycopg import pq, waiting
+from psycopg import errors as e
+from psycopg.abc import PipelineCommand
+from psycopg.generators import pipeline_communicate
+from psycopg.pq import Format, DiagnosticField
+from psycopg._compat import Deque
+
+psycopg_logger = logging.getLogger("psycopg")
+pipeline_logger = logging.getLogger("pipeline")
+args: argparse.Namespace
+
+
+class LoggingPGconn:
+ """Wrapper for PGconn that logs fetched results."""
+
+ def __init__(self, pgconn: pq.abc.PGconn, logger: logging.Logger):
+ self._pgconn = pgconn
+ self._logger = logger
+
+ def log_notice(result: pq.abc.PGresult) -> None:
+ def get_field(field: DiagnosticField) -> Optional[str]:
+ value = result.error_field(field)
+ return value.decode("utf-8", "replace") if value else None
+
+ logger.info(
+ "notice %s %s",
+ get_field(DiagnosticField.SEVERITY),
+ get_field(DiagnosticField.MESSAGE_PRIMARY),
+ )
+
+ pgconn.notice_handler = log_notice
+
+ if args.trace:
+ self._trace_file = open(args.trace, "w")
+ pgconn.trace(self._trace_file.fileno())
+
+ def __del__(self) -> None:
+ if hasattr(self, "_trace_file"):
+ self._pgconn.untrace()
+ self._trace_file.close()
+
+ def __getattr__(self, name: str) -> Any:
+ return getattr(self._pgconn, name)
+
+ def send_query(self, command: bytes) -> None:
+ self._logger.warning("PQsendQuery broken in libpq 14.5")
+ self._pgconn.send_query(command)
+ self._logger.info("sent %s", command.decode())
+
+ def send_query_params(
+ self,
+ command: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_types: Optional[Sequence[int]] = None,
+ param_formats: Optional[Sequence[int]] = None,
+ result_format: int = Format.TEXT,
+ ) -> None:
+ self._pgconn.send_query_params(
+ command, param_values, param_types, param_formats, result_format
+ )
+ self._logger.info("sent %s", command.decode())
+
+ def send_query_prepared(
+ self,
+ name: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_formats: Optional[Sequence[int]] = None,
+ result_format: int = Format.TEXT,
+ ) -> None:
+ self._pgconn.send_query_prepared(
+ name, param_values, param_formats, result_format
+ )
+ self._logger.info("sent prepared '%s' with %s", name.decode(), param_values)
+
+ def send_prepare(
+ self,
+ name: bytes,
+ command: bytes,
+ param_types: Optional[Sequence[int]] = None,
+ ) -> None:
+ self._pgconn.send_prepare(name, command, param_types)
+ self._logger.info("prepare %s as '%s'", command.decode(), name.decode())
+
+ def get_result(self) -> Optional[pq.abc.PGresult]:
+ r = self._pgconn.get_result()
+ if r is not None:
+ self._logger.info("got %s result", pq.ExecStatus(r.status).name)
+ return r
+
+
+@contextmanager
+def prepare_pipeline_demo_pq(
+ pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger
+) -> Iterator[Tuple[Deque[PipelineCommand], Deque[str]]]:
+ """Set up pipeline demo with initial queries and yield commands and
+ results queue for pipeline_communicate().
+ """
+ logger.debug("enter pipeline")
+ pgconn.enter_pipeline_mode()
+
+ setup_queries = [
+ ("begin", "BEGIN TRANSACTION"),
+ ("drop table", "DROP TABLE IF EXISTS pq_pipeline_demo"),
+ (
+ "create table",
+ (
+ "CREATE UNLOGGED TABLE pq_pipeline_demo("
+ " id serial primary key,"
+ " itemno integer,"
+ " int8filler int8"
+ ")"
+ ),
+ ),
+ (
+ "prepare",
+ ("INSERT INTO pq_pipeline_demo(itemno, int8filler)" " VALUES ($1, $2)"),
+ ),
+ ]
+
+ commands = Deque[PipelineCommand]()
+ results_queue = Deque[str]()
+
+ for qname, query in setup_queries:
+ if qname == "prepare":
+ pgconn.send_prepare(qname.encode(), query.encode())
+ else:
+ pgconn.send_query_params(query.encode(), None)
+ results_queue.append(qname)
+
+ committed = False
+ synced = False
+
+ while True:
+ if rows_to_send:
+ params = [f"{rows_to_send}".encode(), f"{1 << 62}".encode()]
+ commands.append(partial(pgconn.send_query_prepared, b"prepare", params))
+ results_queue.append(f"row {rows_to_send}")
+ rows_to_send -= 1
+
+ elif not committed:
+ committed = True
+ commands.append(partial(pgconn.send_query_params, b"COMMIT", None))
+ results_queue.append("commit")
+
+ elif not synced:
+
+ def sync() -> None:
+ pgconn.pipeline_sync()
+ logger.info("pipeline sync sent")
+
+ synced = True
+ commands.append(sync)
+ results_queue.append("sync")
+
+ else:
+ break
+
+ try:
+ yield commands, results_queue
+ finally:
+ logger.debug("exit pipeline")
+ pgconn.exit_pipeline_mode()
+
+
+def pipeline_demo_pq(rows_to_send: int, logger: logging.Logger) -> None:
+ pgconn = LoggingPGconn(Connection.connect().pgconn, logger)
+ with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as (
+ commands,
+ results_queue,
+ ):
+ while results_queue:
+ fetched = waiting.wait(
+ pipeline_communicate(
+ pgconn, # type: ignore[arg-type]
+ commands,
+ ),
+ pgconn.socket,
+ )
+ assert not commands, commands
+ for results in fetched:
+ results_queue.popleft()
+ for r in results:
+ if r.status in (
+ pq.ExecStatus.FATAL_ERROR,
+ pq.ExecStatus.PIPELINE_ABORTED,
+ ):
+ raise e.error_from_result(r)
+
+
+async def pipeline_demo_pq_async(rows_to_send: int, logger: logging.Logger) -> None:
+ pgconn = LoggingPGconn((await AsyncConnection.connect()).pgconn, logger)
+
+ with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as (
+ commands,
+ results_queue,
+ ):
+ while results_queue:
+ fetched = await waiting.wait_async(
+ pipeline_communicate(
+ pgconn, # type: ignore[arg-type]
+ commands,
+ ),
+ pgconn.socket,
+ )
+ assert not commands, commands
+ for results in fetched:
+ results_queue.popleft()
+ for r in results:
+ if r.status in (
+ pq.ExecStatus.FATAL_ERROR,
+ pq.ExecStatus.PIPELINE_ABORTED,
+ ):
+ raise e.error_from_result(r)
+
+
+def pipeline_demo(rows_to_send: int, many: bool, logger: logging.Logger) -> None:
+ """Pipeline demo using sync API."""
+ conn = Connection.connect()
+ conn.autocommit = True
+ conn.pgconn = LoggingPGconn(conn.pgconn, logger) # type: ignore[assignment]
+ with conn.pipeline():
+ with conn.transaction():
+ conn.execute("DROP TABLE IF EXISTS pq_pipeline_demo")
+ conn.execute(
+ "CREATE UNLOGGED TABLE pq_pipeline_demo("
+ " id serial primary key,"
+ " itemno integer,"
+ " int8filler int8"
+ ")"
+ )
+ query = "INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES (%s, %s)"
+ params = ((r, 1 << 62) for r in range(rows_to_send, 0, -1))
+ if many:
+ cur = conn.cursor()
+ cur.executemany(query, list(params))
+ else:
+ for p in params:
+ conn.execute(query, p)
+
+
+async def pipeline_demo_async(
+ rows_to_send: int, many: bool, logger: logging.Logger
+) -> None:
+ """Pipeline demo using async API."""
+ aconn = await AsyncConnection.connect()
+ await aconn.set_autocommit(True)
+ aconn.pgconn = LoggingPGconn(aconn.pgconn, logger) # type: ignore[assignment]
+ async with aconn.pipeline():
+ async with aconn.transaction():
+ await aconn.execute("DROP TABLE IF EXISTS pq_pipeline_demo")
+ await aconn.execute(
+ "CREATE UNLOGGED TABLE pq_pipeline_demo("
+ " id serial primary key,"
+ " itemno integer,"
+ " int8filler int8"
+ ")"
+ )
+ query = "INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES (%s, %s)"
+ params = ((r, 1 << 62) for r in range(rows_to_send, 0, -1))
+ if many:
+ cur = aconn.cursor()
+ await cur.executemany(query, list(params))
+ else:
+ for p in params:
+ await aconn.execute(query, p)
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "-n",
+ dest="nrows",
+ metavar="ROWS",
+ default=10_000,
+ type=int,
+ help="number of rows to insert",
+ )
+ parser.add_argument(
+ "--pq", action="store_true", help="use low-level psycopg.pq API"
+ )
+ parser.add_argument(
+ "--async", dest="async_", action="store_true", help="use async API"
+ )
+ parser.add_argument(
+ "--many",
+ action="store_true",
+ help="use executemany() (not applicable for --pq)",
+ )
+ parser.add_argument("--trace", help="write trace info into TRACE file")
+ parser.add_argument("-l", "--log", help="log file (stderr by default)")
+
+ global args
+ args = parser.parse_args()
+
+ psycopg_logger.setLevel(logging.DEBUG)
+ pipeline_logger.setLevel(logging.DEBUG)
+ if args.log:
+ psycopg_logger.addHandler(logging.FileHandler(args.log))
+ pipeline_logger.addHandler(logging.FileHandler(args.log))
+ else:
+ psycopg_logger.addHandler(logging.StreamHandler())
+ pipeline_logger.addHandler(logging.StreamHandler())
+
+ if args.pq:
+ if args.many:
+ parser.error("--many cannot be used with --pq")
+ if args.async_:
+ asyncio.run(pipeline_demo_pq_async(args.nrows, pipeline_logger))
+ else:
+ pipeline_demo_pq(args.nrows, pipeline_logger)
+ else:
+ if pq.__impl__ != "python":
+ parser.error(
+ "only supported for Python implementation (set PSYCOPG_IMPL=python)"
+ )
+ if args.async_:
+ asyncio.run(pipeline_demo_async(args.nrows, args.many, pipeline_logger))
+ else:
+ pipeline_demo(args.nrows, args.many, pipeline_logger)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/tests/scripts/spiketest.py b/tests/scripts/spiketest.py
new file mode 100644
index 0000000..2c9cc16
--- /dev/null
+++ b/tests/scripts/spiketest.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+"""
+Run a connection pool spike test.
+
+The test is inspired to the `spike analysis`__ illustrated by HikariCP
+
+.. __: https://github.com/brettwooldridge/HikariCP/blob/dev/documents/
+ Welcome-To-The-Jungle.md
+
+"""
+# mypy: allow-untyped-defs
+# mypy: allow-untyped-calls
+
+import time
+import threading
+
+import psycopg
+import psycopg_pool
+from psycopg.rows import Row
+
+import logging
+
+
+def main() -> None:
+ opt = parse_cmdline()
+ if opt.loglevel:
+ loglevel = getattr(logging, opt.loglevel.upper())
+ logging.basicConfig(
+ level=loglevel, format="%(asctime)s %(levelname)s %(message)s"
+ )
+
+ logging.getLogger("psycopg2.pool").setLevel(loglevel)
+
+ with psycopg_pool.ConnectionPool(
+ opt.dsn,
+ min_size=opt.min_size,
+ max_size=opt.max_size,
+ connection_class=DelayedConnection,
+ kwargs={"conn_delay": 0.150},
+ ) as pool:
+ pool.wait()
+ measurer = Measurer(pool)
+
+ # Create and start all the thread: they will get stuck on the event
+ ev = threading.Event()
+ threads = [
+ threading.Thread(target=worker, args=(pool, 0.002, ev), daemon=True)
+ for i in range(opt.num_clients)
+ ]
+ for t in threads:
+ t.start()
+ time.sleep(0.2)
+
+ # Release the threads!
+ measurer.start(0.00025)
+ t0 = time.time()
+ ev.set()
+
+ # Wait for the threads to finish
+ for t in threads:
+ t.join()
+ t1 = time.time()
+ measurer.stop()
+
+ print(f"time: {(t1 - t0) * 1000} msec")
+ print("active,idle,total,waiting")
+ recs = [
+ f'{m["pool_size"] - m["pool_available"]}'
+ f',{m["pool_available"]}'
+ f',{m["pool_size"]}'
+ f',{m["requests_waiting"]}'
+ for m in measurer.measures
+ ]
+ print("\n".join(recs))
+
+
+def worker(p, t, ev):
+ ev.wait()
+ with p.connection():
+ time.sleep(t)
+
+
+class Measurer:
+ def __init__(self, pool):
+ self.pool = pool
+ self.worker = None
+ self.stopped = False
+ self.measures = []
+
+ def start(self, interval):
+ self.worker = threading.Thread(target=self._run, args=(interval,), daemon=True)
+ self.worker.start()
+
+ def stop(self):
+ self.stopped = True
+ if self.worker:
+ self.worker.join()
+ self.worker = None
+
+ def _run(self, interval):
+ while not self.stopped:
+ self.measures.append(self.pool.get_stats())
+ time.sleep(interval)
+
+
+class DelayedConnection(psycopg.Connection[Row]):
+ """A connection adding a delay to the connection time."""
+
+ @classmethod
+ def connect(cls, conninfo, conn_delay=0, **kwargs):
+ t0 = time.time()
+ conn = super().connect(conninfo, **kwargs)
+ t1 = time.time()
+ wait = max(0.0, conn_delay - (t1 - t0))
+ if wait:
+ time.sleep(wait)
+ return conn
+
+
+def parse_cmdline():
+ from argparse import ArgumentParser
+
+ parser = ArgumentParser(description=__doc__)
+ parser.add_argument("--dsn", default="", help="connection string to the database")
+ parser.add_argument(
+ "--min_size",
+ default=5,
+ type=int,
+ help="minimum number of connections in the pool",
+ )
+ parser.add_argument(
+ "--max_size",
+ default=20,
+ type=int,
+ help="maximum number of connections in the pool",
+ )
+ parser.add_argument(
+ "--num-clients",
+ default=50,
+ type=int,
+ help="number of threads making a request",
+ )
+ parser.add_argument(
+ "--loglevel",
+ default=None,
+ choices=("DEBUG", "INFO", "WARNING", "ERROR"),
+ help="level to log at [default: no log]",
+ )
+
+ opt = parser.parse_args()
+
+ return opt
+
+
+if __name__ == "__main__":
+ main()