author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 17:41:08 +0000
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 17:41:08 +0000
commit | 506ed8899b3a97e512be3fd6d44d5b11463bf9bf (patch)
tree | 808913770c5e6935d3714058c2a066c57b4632ec /tests/scripts
parent | Initial commit. (diff)
download | psycopg3-7acd1f75a918595b0fe5366910f80c2a8527072c.tar.xz, psycopg3-7acd1f75a918595b0fe5366910f80c2a8527072c.zip
Adding upstream version 3.1.7. (upstream/3.1.7, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/scripts')
-rw-r--r-- | tests/scripts/bench-411.py | 300
-rw-r--r-- | tests/scripts/dectest.py | 51
-rw-r--r-- | tests/scripts/pipeline-demo.py | 340
-rw-r--r-- | tests/scripts/spiketest.py | 156
4 files changed, 847 insertions, 0 deletions
diff --git a/tests/scripts/bench-411.py b/tests/scripts/bench-411.py
new file mode 100644
index 0000000..82ea451
--- /dev/null
+++ b/tests/scripts/bench-411.py
@@ -0,0 +1,300 @@
+import os
+import sys
+import time
+import random
+import asyncio
+import logging
+from enum import Enum
+from typing import Any, Dict, List, Generator
+from argparse import ArgumentParser, Namespace
+from contextlib import contextmanager
+
+logger = logging.getLogger()
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s %(levelname)s %(message)s",
+)
+
+
+class Driver(str, Enum):
+    psycopg2 = "psycopg2"
+    psycopg = "psycopg"
+    psycopg_async = "psycopg_async"
+    asyncpg = "asyncpg"
+
+
+ids: List[int] = []
+data: List[Dict[str, Any]] = []
+
+
+def main() -> None:
+
+    args = parse_cmdline()
+
+    ids[:] = range(args.ntests)
+    data[:] = [
+        dict(
+            id=i,
+            name="c%d" % i,
+            description="c%d" % i,
+            q=i * 10,
+            p=i * 20,
+            x=i * 30,
+            y=i * 40,
+        )
+        for i in ids
+    ]
+
+    # Must be done just on end
+    drop_at_the_end = args.drop
+    args.drop = False
+
+    for i, name in enumerate(args.drivers):
+        if i == len(args.drivers) - 1:
+            args.drop = drop_at_the_end
+
+        if name == Driver.psycopg2:
+            import psycopg2  # type: ignore
+
+            run_psycopg2(psycopg2, args)
+
+        elif name == Driver.psycopg:
+            import psycopg
+
+            run_psycopg(psycopg, args)
+
+        elif name == Driver.psycopg_async:
+            import psycopg
+
+            if sys.platform == "win32":
+                if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
+                    asyncio.set_event_loop_policy(
+                        asyncio.WindowsSelectorEventLoopPolicy()
+                    )
+
+            asyncio.run(run_psycopg_async(psycopg, args))
+
+        elif name == Driver.asyncpg:
+            import asyncpg  # type: ignore
+
+            asyncio.run(run_asyncpg(asyncpg, args))
+
+        else:
+            raise AssertionError(f"unknown driver: {name!r}")
+
+        # Must be done just on start
+        args.create = False
+
+
+table = """
+CREATE TABLE customer (
+    id SERIAL NOT NULL,
+    name VARCHAR(255),
+    description VARCHAR(255),
+    q INTEGER,
+    p INTEGER,
+    x INTEGER,
+    y INTEGER,
+    z INTEGER,
+    PRIMARY KEY (id)
+)
+"""
+drop = "DROP TABLE IF EXISTS customer"
+
+insert = """
+INSERT INTO customer (id, name, description, q, p, x, y) VALUES
+(%(id)s, %(name)s, %(description)s, %(q)s, %(p)s, %(x)s, %(y)s)
+"""
+
+select = """
+SELECT customer.id, customer.name, customer.description, customer.q,
+    customer.p, customer.x, customer.y, customer.z
+FROM customer
+WHERE customer.id = %(id)s
+"""
+
+
+@contextmanager
+def time_log(message: str) -> Generator[None, None, None]:
+    start = time.monotonic()
+    yield
+    end = time.monotonic()
+    logger.info(f"Run {message} in {end-start} s")
+
+
+def run_psycopg2(psycopg2: Any, args: Namespace) -> None:
+    logger.info("Running psycopg2")
+
+    if args.create:
+        logger.info(f"inserting {args.ntests} test records")
+        with psycopg2.connect(args.dsn) as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(drop)
+                cursor.execute(table)
+                cursor.executemany(insert, data)
+            conn.commit()
+
+    logger.info(f"running {args.ntests} queries")
+    to_query = random.choices(ids, k=args.ntests)
+    with psycopg2.connect(args.dsn) as conn:
+        with time_log("psycopg2"):
+            for id_ in to_query:
+                with conn.cursor() as cursor:
+                    cursor.execute(select, {"id": id_})
+                    cursor.fetchall()
+                # conn.rollback()
+
+    if args.drop:
+        logger.info("dropping test records")
+        with psycopg2.connect(args.dsn) as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(drop)
+            conn.commit()
+
+
+def run_psycopg(psycopg: Any, args: Namespace) -> None:
+    logger.info("Running psycopg sync")
+
+    if args.create:
+        logger.info(f"inserting {args.ntests} test records")
+        with psycopg.connect(args.dsn) as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(drop)
+                cursor.execute(table)
+                cursor.executemany(insert, data)
+            conn.commit()
+
+    logger.info(f"running {args.ntests} queries")
+    to_query = random.choices(ids, k=args.ntests)
+    with psycopg.connect(args.dsn) as conn:
+        with time_log("psycopg"):
+            for id_ in to_query:
+                with conn.cursor() as cursor:
+                    cursor.execute(select, {"id": id_})
+                    cursor.fetchall()
+                # conn.rollback()
+
+    if args.drop:
+        logger.info("dropping test records")
+        with psycopg.connect(args.dsn) as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(drop)
+            conn.commit()
+
+
+async def run_psycopg_async(psycopg: Any, args: Namespace) -> None:
+    logger.info("Running psycopg async")
+
+    conn: Any
+
+    if args.create:
+        logger.info(f"inserting {args.ntests} test records")
+        async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(drop)
+                await cursor.execute(table)
+                await cursor.executemany(insert, data)
+            await conn.commit()
+
+    logger.info(f"running {args.ntests} queries")
+    to_query = random.choices(ids, k=args.ntests)
+    async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+        with time_log("psycopg_async"):
+            for id_ in to_query:
+                cursor = await conn.execute(select, {"id": id_})
+                await cursor.fetchall()
+                await cursor.close()
+                # await conn.rollback()
+
+    if args.drop:
+        logger.info("dropping test records")
+        async with await psycopg.AsyncConnection.connect(args.dsn) as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(drop)
+            await conn.commit()
+
+
+async def run_asyncpg(asyncpg: Any, args: Namespace) -> None:
+    logger.info("Running asyncpg")
+
+    places = dict(id="$1", name="$2", description="$3", q="$4", p="$5", x="$6", y="$7")
+    a_insert = insert % places
+    a_select = select % {"id": "$1"}
+
+    conn: Any
+
+    if args.create:
+        logger.info(f"inserting {args.ntests} test records")
+        conn = await asyncpg.connect(args.dsn)
+        async with conn.transaction():
+            await conn.execute(drop)
+            await conn.execute(table)
+            await conn.executemany(a_insert, [tuple(d.values()) for d in data])
+        await conn.close()
+
+    logger.info(f"running {args.ntests} queries")
+    to_query = random.choices(ids, k=args.ntests)
+    conn = await asyncpg.connect(args.dsn)
+    with time_log("asyncpg"):
+        for id_ in to_query:
+            tr = conn.transaction()
+            await tr.start()
+            await conn.fetch(a_select, id_)
+            # await tr.rollback()
+    await conn.close()
+
+    if args.drop:
+        logger.info("dropping test records")
+        conn = await asyncpg.connect(args.dsn)
+        async with conn.transaction():
+            await conn.execute(drop)
+        await conn.close()
+
+
+def parse_cmdline() -> Namespace:
+    parser = ArgumentParser(description=__doc__)
+    parser.add_argument(
+        "drivers",
+        nargs="+",
+        metavar="DRIVER",
+        type=Driver,
+        help=f"the drivers to test [choices: {', '.join(d.value for d in Driver)}]",
+    )
+
+    parser.add_argument(
+        "--ntests",
+        type=int,
+        default=10_000,
+        help="number of tests to perform [default: %(default)s]",
+    )
+
+    parser.add_argument(
+        "--dsn",
+        default=os.environ.get("PSYCOPG_TEST_DSN", ""),
+        help="database connection string"
+        " [default: %(default)r (from PSYCOPG_TEST_DSN env var)]",
+    )
+
+    parser.add_argument(
+        "--no-create",
+        dest="create",
+        action="store_false",
+        default="True",
+        help="skip data creation before tests (it must exist already)",
+    )
+
+    parser.add_argument(
+        "--no-drop",
+        dest="drop",
+        action="store_false",
+        default="True",
+        help="skip data drop after tests",
+    )
+
+    opt = parser.parse_args()
+
+    return opt
+
+
+if __name__ == "__main__":
+    main()
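bench-411.py times the same workload (repeated single-row lookups by primary key) against each selected driver. As a rough illustration of the timing pattern it uses, here is a minimal sketch, assuming `PSYCOPG_TEST_DSN` points to a reachable database and the `customer` table has already been created by a previous run of the script; the row count is illustrative:

```python
# Minimal sketch of the query loop timed by bench-411.py (assumes the
# `customer` table already exists, e.g. created by running the script once
# without --no-create, and that PSYCOPG_TEST_DSN points to a database).
import os
import time
import random

import psycopg

dsn = os.environ.get("PSYCOPG_TEST_DSN", "")
ids = list(range(1000))  # illustrative; the script uses --ntests ids

with psycopg.connect(dsn) as conn:
    start = time.monotonic()
    for id_ in random.choices(ids, k=len(ids)):
        cur = conn.execute(
            "SELECT id, name FROM customer WHERE id = %(id)s", {"id": id_}
        )
        cur.fetchall()
    print(f"{time.monotonic() - start:.3f} s for {len(ids)} queries")
```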
diff --git a/tests/scripts/dectest.py b/tests/scripts/dectest.py
new file mode 100644
index 0000000..a49f116
--- /dev/null
+++ b/tests/scripts/dectest.py
@@ -0,0 +1,51 @@
+"""
+A quick and rough performance comparison of text vs. binary Decimal adaptation
+"""
+from random import randrange
+from decimal import Decimal
+
+import psycopg
+from psycopg import sql
+
+ncols = 10
+nrows = 500000
+format = psycopg.pq.Format.BINARY
+test = "copy"
+
+
+def main() -> None:
+    cnn = psycopg.connect()
+
+    cnn.execute(
+        sql.SQL("create table testdec ({})").format(
+            sql.SQL(", ").join(
+                [
+                    sql.SQL("{} numeric(10,2)").format(sql.Identifier(f"t{i}"))
+                    for i in range(ncols)
+                ]
+            )
+        )
+    )
+    cur = cnn.cursor()
+
+    if test == "copy":
+        with cur.copy(f"copy testdec from stdin (format {format.name})") as copy:
+            for j in range(nrows):
+                copy.write_row(
+                    [Decimal(randrange(10000000000)) / 100 for i in range(ncols)]
+                )
+
+    elif test == "insert":
+        ph = ["%t", "%b"][format]
+        cur.executemany(
+            "insert into testdec values (%s)" % ", ".join([ph] * ncols),
+            (
+                [Decimal(randrange(10000000000)) / 100 for i in range(ncols)]
+                for j in range(nrows)
+            ),
+        )
+    else:
+        raise Exception(f"bad test: {test}")
+
+
+if __name__ == "__main__":
+    main()
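dectest.py switches between text and binary Decimal adaptation either through the COPY format or through the `%t`/`%b` placeholders it splices into its INSERT statement. A minimal sketch of those placeholders on a single connection, assuming a database reachable through the usual libpq environment variables; the temp table name is illustrative:

```python
# Sketch of forcing text vs. binary parameter adaptation per placeholder,
# the same %t/%b markers dectest.py builds its INSERT statement from.
from decimal import Decimal

import psycopg

with psycopg.connect() as conn:
    conn.execute("CREATE TEMP TABLE dec_demo (d numeric(10,2))")
    conn.execute("INSERT INTO dec_demo VALUES (%t)", [Decimal("123.45")])  # text format
    conn.execute("INSERT INTO dec_demo VALUES (%b)", [Decimal("678.90")])  # binary format
    print(conn.execute("SELECT d FROM dec_demo").fetchall())
```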
diff --git a/tests/scripts/pipeline-demo.py b/tests/scripts/pipeline-demo.py
new file mode 100644
index 0000000..ec95229
--- /dev/null
+++ b/tests/scripts/pipeline-demo.py
@@ -0,0 +1,340 @@
+"""Pipeline mode demo
+
+This reproduces libpq_pipeline::pipelined_insert PostgreSQL test at
+src/test/modules/libpq_pipeline/libpq_pipeline.c::test_pipelined_insert().
+
+We do not fetch results explicitly (using cursor.fetch*()), this is
+handled by execute() calls when pgconn socket is read-ready, which
+happens when the output buffer is full.
+"""
+import argparse
+import asyncio
+import logging
+from contextlib import contextmanager
+from functools import partial
+from typing import Any, Iterator, Optional, Sequence, Tuple
+
+from psycopg import AsyncConnection, Connection
+from psycopg import pq, waiting
+from psycopg import errors as e
+from psycopg.abc import PipelineCommand
+from psycopg.generators import pipeline_communicate
+from psycopg.pq import Format, DiagnosticField
+from psycopg._compat import Deque
+
+psycopg_logger = logging.getLogger("psycopg")
+pipeline_logger = logging.getLogger("pipeline")
+args: argparse.Namespace
+
+
+class LoggingPGconn:
+    """Wrapper for PGconn that logs fetched results."""
+
+    def __init__(self, pgconn: pq.abc.PGconn, logger: logging.Logger):
+        self._pgconn = pgconn
+        self._logger = logger
+
+        def log_notice(result: pq.abc.PGresult) -> None:
+            def get_field(field: DiagnosticField) -> Optional[str]:
+                value = result.error_field(field)
+                return value.decode("utf-8", "replace") if value else None
+
+            logger.info(
+                "notice %s %s",
+                get_field(DiagnosticField.SEVERITY),
+                get_field(DiagnosticField.MESSAGE_PRIMARY),
+            )
+
+        pgconn.notice_handler = log_notice
+
+        if args.trace:
+            self._trace_file = open(args.trace, "w")
+            pgconn.trace(self._trace_file.fileno())
+
+    def __del__(self) -> None:
+        if hasattr(self, "_trace_file"):
+            self._pgconn.untrace()
+            self._trace_file.close()
+
+    def __getattr__(self, name: str) -> Any:
+        return getattr(self._pgconn, name)
+
+    def send_query(self, command: bytes) -> None:
+        self._logger.warning("PQsendQuery broken in libpq 14.5")
+        self._pgconn.send_query(command)
+        self._logger.info("sent %s", command.decode())
+
+    def send_query_params(
+        self,
+        command: bytes,
+        param_values: Optional[Sequence[Optional[bytes]]],
+        param_types: Optional[Sequence[int]] = None,
+        param_formats: Optional[Sequence[int]] = None,
+        result_format: int = Format.TEXT,
+    ) -> None:
+        self._pgconn.send_query_params(
+            command, param_values, param_types, param_formats, result_format
+        )
+        self._logger.info("sent %s", command.decode())
+
+    def send_query_prepared(
+        self,
+        name: bytes,
+        param_values: Optional[Sequence[Optional[bytes]]],
+        param_formats: Optional[Sequence[int]] = None,
+        result_format: int = Format.TEXT,
+    ) -> None:
+        self._pgconn.send_query_prepared(
+            name, param_values, param_formats, result_format
+        )
+        self._logger.info("sent prepared '%s' with %s", name.decode(), param_values)
+
+    def send_prepare(
+        self,
+        name: bytes,
+        command: bytes,
+        param_types: Optional[Sequence[int]] = None,
+    ) -> None:
+        self._pgconn.send_prepare(name, command, param_types)
+        self._logger.info("prepare %s as '%s'", command.decode(), name.decode())
+
+    def get_result(self) -> Optional[pq.abc.PGresult]:
+        r = self._pgconn.get_result()
+        if r is not None:
+            self._logger.info("got %s result", pq.ExecStatus(r.status).name)
+        return r
+
+
+@contextmanager
+def prepare_pipeline_demo_pq(
+    pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger
+) -> Iterator[Tuple[Deque[PipelineCommand], Deque[str]]]:
+    """Set up pipeline demo with initial queries and yield commands and
+    results queue for pipeline_communicate().
+    """
+    logger.debug("enter pipeline")
+    pgconn.enter_pipeline_mode()
+
+    setup_queries = [
+        ("begin", "BEGIN TRANSACTION"),
+        ("drop table", "DROP TABLE IF EXISTS pq_pipeline_demo"),
+        (
+            "create table",
+            (
+                "CREATE UNLOGGED TABLE pq_pipeline_demo("
+                " id serial primary key,"
+                " itemno integer,"
+                " int8filler int8"
+                ")"
+            ),
+        ),
+        (
+            "prepare",
+            ("INSERT INTO pq_pipeline_demo(itemno, int8filler)" " VALUES ($1, $2)"),
+        ),
+    ]
+
+    commands = Deque[PipelineCommand]()
+    results_queue = Deque[str]()
+
+    for qname, query in setup_queries:
+        if qname == "prepare":
+            pgconn.send_prepare(qname.encode(), query.encode())
+        else:
+            pgconn.send_query_params(query.encode(), None)
+        results_queue.append(qname)
+
+    committed = False
+    synced = False
+
+    while True:
+        if rows_to_send:
+            params = [f"{rows_to_send}".encode(), f"{1 << 62}".encode()]
+            commands.append(partial(pgconn.send_query_prepared, b"prepare", params))
+            results_queue.append(f"row {rows_to_send}")
+            rows_to_send -= 1
+
+        elif not committed:
+            committed = True
+            commands.append(partial(pgconn.send_query_params, b"COMMIT", None))
+            results_queue.append("commit")
+
+        elif not synced:
+
+            def sync() -> None:
+                pgconn.pipeline_sync()
+                logger.info("pipeline sync sent")
+
+            synced = True
+            commands.append(sync)
+            results_queue.append("sync")
+
+        else:
+            break
+
+    try:
+        yield commands, results_queue
+    finally:
+        logger.debug("exit pipeline")
+        pgconn.exit_pipeline_mode()
+
+
+def pipeline_demo_pq(rows_to_send: int, logger: logging.Logger) -> None:
+    pgconn = LoggingPGconn(Connection.connect().pgconn, logger)
+    with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as (
+        commands,
+        results_queue,
+    ):
+        while results_queue:
+            fetched = waiting.wait(
+                pipeline_communicate(
+                    pgconn,  # type: ignore[arg-type]
+                    commands,
+                ),
+                pgconn.socket,
+            )
+            assert not commands, commands
+            for results in fetched:
+                results_queue.popleft()
+                for r in results:
+                    if r.status in (
+                        pq.ExecStatus.FATAL_ERROR,
+                        pq.ExecStatus.PIPELINE_ABORTED,
+                    ):
+                        raise e.error_from_result(r)
+
+
+async def pipeline_demo_pq_async(rows_to_send: int, logger: logging.Logger) -> None:
+    pgconn = LoggingPGconn((await AsyncConnection.connect()).pgconn, logger)
+
+    with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as (
+        commands,
+        results_queue,
+    ):
+        while results_queue:
+            fetched = await waiting.wait_async(
+                pipeline_communicate(
+                    pgconn,  # type: ignore[arg-type]
+                    commands,
+                ),
+                pgconn.socket,
+            )
+            assert not commands, commands
+            for results in fetched:
+                results_queue.popleft()
+                for r in results:
+                    if r.status in (
+                        pq.ExecStatus.FATAL_ERROR,
+                        pq.ExecStatus.PIPELINE_ABORTED,
+                    ):
+                        raise e.error_from_result(r)
+
+
+def pipeline_demo(rows_to_send: int, many: bool, logger: logging.Logger) -> None:
+    """Pipeline demo using sync API."""
+    conn = Connection.connect()
+    conn.autocommit = True
+    conn.pgconn = LoggingPGconn(conn.pgconn, logger)  # type: ignore[assignment]
+    with conn.pipeline():
+        with conn.transaction():
+            conn.execute("DROP TABLE IF EXISTS pq_pipeline_demo")
+            conn.execute(
+                "CREATE UNLOGGED TABLE pq_pipeline_demo("
+                " id serial primary key,"
+                " itemno integer,"
+                " int8filler int8"
+                ")"
+            )
+            query = "INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES (%s, %s)"
+            params = ((r, 1 << 62) for r in range(rows_to_send, 0, -1))
+            if many:
+                cur = conn.cursor()
+                cur.executemany(query, list(params))
+            else:
+                for p in params:
+                    conn.execute(query, p)
+
+
+async def pipeline_demo_async(
+    rows_to_send: int, many: bool, logger: logging.Logger
+) -> None:
+    """Pipeline demo using async API."""
+    aconn = await AsyncConnection.connect()
+    await aconn.set_autocommit(True)
+    aconn.pgconn = LoggingPGconn(aconn.pgconn, logger)  # type: ignore[assignment]
+    async with aconn.pipeline():
+        async with aconn.transaction():
+            await aconn.execute("DROP TABLE IF EXISTS pq_pipeline_demo")
+            await aconn.execute(
+                "CREATE UNLOGGED TABLE pq_pipeline_demo("
+                " id serial primary key,"
+                " itemno integer,"
+                " int8filler int8"
+                ")"
+            )
+            query = "INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES (%s, %s)"
+            params = ((r, 1 << 62) for r in range(rows_to_send, 0, -1))
+            if many:
+                cur = aconn.cursor()
+                await cur.executemany(query, list(params))
+            else:
+                for p in params:
+                    await aconn.execute(query, p)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "-n",
+        dest="nrows",
+        metavar="ROWS",
+        default=10_000,
+        type=int,
+        help="number of rows to insert",
+    )
+    parser.add_argument(
+        "--pq", action="store_true", help="use low-level psycopg.pq API"
+    )
+    parser.add_argument(
+        "--async", dest="async_", action="store_true", help="use async API"
+    )
+    parser.add_argument(
+        "--many",
+        action="store_true",
+        help="use executemany() (not applicable for --pq)",
+    )
+    parser.add_argument("--trace", help="write trace info into TRACE file")
+    parser.add_argument("-l", "--log", help="log file (stderr by default)")
+
+    global args
+    args = parser.parse_args()
+
+    psycopg_logger.setLevel(logging.DEBUG)
+    pipeline_logger.setLevel(logging.DEBUG)
+    if args.log:
+        psycopg_logger.addHandler(logging.FileHandler(args.log))
+        pipeline_logger.addHandler(logging.FileHandler(args.log))
+    else:
+        psycopg_logger.addHandler(logging.StreamHandler())
+        pipeline_logger.addHandler(logging.StreamHandler())
+
+    if args.pq:
+        if args.many:
+            parser.error("--many cannot be used with --pq")
+        if args.async_:
+            asyncio.run(pipeline_demo_pq_async(args.nrows, pipeline_logger))
+        else:
+            pipeline_demo_pq(args.nrows, pipeline_logger)
+    else:
+        if pq.__impl__ != "python":
+            parser.error(
+                "only supported for Python implementation (set PSYCOPG_IMPL=python)"
+            )
+        if args.async_:
+            asyncio.run(pipeline_demo_async(args.nrows, args.many, pipeline_logger))
+        else:
+            pipeline_demo(args.nrows, args.many, pipeline_logger)
+
+
+if __name__ == "__main__":
+    main()
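The `pipeline_demo()`/`pipeline_demo_async()` functions above drive psycopg's high-level pipeline API rather than the raw `pq` calls. A minimal sketch of that API, assuming a database reachable through the usual libpq environment variables and a libpq new enough for pipeline mode (14 or later); the table name and row count are illustrative:

```python
# Minimal sketch of psycopg's high-level pipeline mode, the API that
# pipeline_demo()/pipeline_demo_async() above exercise with a larger workload.
import psycopg

with psycopg.connect(autocommit=True) as conn:
    with conn.pipeline():
        conn.execute("CREATE TEMP TABLE demo (id serial PRIMARY KEY, n int8)")
        for n in range(100):
            # Statements are queued and sent in batches without waiting for
            # each result; leaving the block syncs the pipeline.
            conn.execute("INSERT INTO demo (n) VALUES (%s)", [n])
    print(conn.execute("SELECT count(*) FROM demo").fetchone())
```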
diff --git a/tests/scripts/spiketest.py b/tests/scripts/spiketest.py
new file mode 100644
index 0000000..2c9cc16
--- /dev/null
+++ b/tests/scripts/spiketest.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+"""
+Run a connection pool spike test.
+
+The test is inspired to the `spike analysis`__ illustrated by HikariCP
+
+.. __: https://github.com/brettwooldridge/HikariCP/blob/dev/documents/
+    Welcome-To-The-Jungle.md
+
+"""
+# mypy: allow-untyped-defs
+# mypy: allow-untyped-calls
+
+import time
+import threading
+
+import psycopg
+import psycopg_pool
+from psycopg.rows import Row
+
+import logging
+
+
+def main() -> None:
+    opt = parse_cmdline()
+    if opt.loglevel:
+        loglevel = getattr(logging, opt.loglevel.upper())
+        logging.basicConfig(
+            level=loglevel, format="%(asctime)s %(levelname)s %(message)s"
+        )
+
+        logging.getLogger("psycopg2.pool").setLevel(loglevel)
+
+    with psycopg_pool.ConnectionPool(
+        opt.dsn,
+        min_size=opt.min_size,
+        max_size=opt.max_size,
+        connection_class=DelayedConnection,
+        kwargs={"conn_delay": 0.150},
+    ) as pool:
+        pool.wait()
+        measurer = Measurer(pool)
+
+        # Create and start all the thread: they will get stuck on the event
+        ev = threading.Event()
+        threads = [
+            threading.Thread(target=worker, args=(pool, 0.002, ev), daemon=True)
+            for i in range(opt.num_clients)
+        ]
+        for t in threads:
+            t.start()
+        time.sleep(0.2)
+
+        # Release the threads!
+        measurer.start(0.00025)
+        t0 = time.time()
+        ev.set()
+
+        # Wait for the threads to finish
+        for t in threads:
+            t.join()
+        t1 = time.time()
+        measurer.stop()
+
+    print(f"time: {(t1 - t0) * 1000} msec")
+    print("active,idle,total,waiting")
+    recs = [
+        f'{m["pool_size"] - m["pool_available"]}'
+        f',{m["pool_available"]}'
+        f',{m["pool_size"]}'
+        f',{m["requests_waiting"]}'
+        for m in measurer.measures
+    ]
+    print("\n".join(recs))
+
+
+def worker(p, t, ev):
+    ev.wait()
+    with p.connection():
+        time.sleep(t)
+
+
+class Measurer:
+    def __init__(self, pool):
+        self.pool = pool
+        self.worker = None
+        self.stopped = False
+        self.measures = []
+
+    def start(self, interval):
+        self.worker = threading.Thread(target=self._run, args=(interval,), daemon=True)
+        self.worker.start()
+
+    def stop(self):
+        self.stopped = True
+        if self.worker:
+            self.worker.join()
+            self.worker = None
+
+    def _run(self, interval):
+        while not self.stopped:
+            self.measures.append(self.pool.get_stats())
+            time.sleep(interval)
+
+
+class DelayedConnection(psycopg.Connection[Row]):
+    """A connection adding a delay to the connection time."""
+
+    @classmethod
+    def connect(cls, conninfo, conn_delay=0, **kwargs):
+        t0 = time.time()
+        conn = super().connect(conninfo, **kwargs)
+        t1 = time.time()
+        wait = max(0.0, conn_delay - (t1 - t0))
+        if wait:
+            time.sleep(wait)
+        return conn
+
+
+def parse_cmdline():
+    from argparse import ArgumentParser
+
+    parser = ArgumentParser(description=__doc__)
+    parser.add_argument("--dsn", default="", help="connection string to the database")
+    parser.add_argument(
+        "--min_size",
+        default=5,
+        type=int,
+        help="minimum number of connections in the pool",
+    )
+    parser.add_argument(
+        "--max_size",
+        default=20,
+        type=int,
+        help="maximum number of connections in the pool",
+    )
+    parser.add_argument(
+        "--num-clients",
+        default=50,
+        type=int,
+        help="number of threads making a request",
+    )
+    parser.add_argument(
+        "--loglevel",
+        default=None,
+        choices=("DEBUG", "INFO", "WARNING", "ERROR"),
+        help="level to log at [default: no log]",
+    )
+
+    opt = parser.parse_args()
+
+    return opt
+
+
+if __name__ == "__main__":
+    main()
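spiketest.py repeatedly samples `ConnectionPool.get_stats()` while worker threads grab connections during the spike. A minimal sketch of the pool calls it relies on, assuming a database reachable through the usual libpq environment variables; the pool sizes and the query are illustrative:

```python
# Minimal sketch of the psycopg_pool calls sampled by spiketest.py.
import psycopg_pool

with psycopg_pool.ConnectionPool("", min_size=2, max_size=4) as pool:
    pool.wait()  # block until min_size connections have been established
    with pool.connection() as conn:
        conn.execute("SELECT 1")
    stats = pool.get_stats()
    # The same keys the script prints: active = pool_size - pool_available.
    print(stats["pool_size"], stats["pool_available"], stats["requests_waiting"])
```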