diff options
Diffstat (limited to 'tests/scripts/bench-411.py')
-rw-r--r-- | tests/scripts/bench-411.py | 300 |
1 files changed, 300 insertions, 0 deletions
diff --git a/tests/scripts/bench-411.py b/tests/scripts/bench-411.py new file mode 100644 index 0000000..82ea451 --- /dev/null +++ b/tests/scripts/bench-411.py @@ -0,0 +1,300 @@ +import os +import sys +import time +import random +import asyncio +import logging +from enum import Enum +from typing import Any, Dict, List, Generator +from argparse import ArgumentParser, Namespace +from contextlib import contextmanager + +logger = logging.getLogger() +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", +) + + +class Driver(str, Enum): + psycopg2 = "psycopg2" + psycopg = "psycopg" + psycopg_async = "psycopg_async" + asyncpg = "asyncpg" + + +ids: List[int] = [] +data: List[Dict[str, Any]] = [] + + +def main() -> None: + + args = parse_cmdline() + + ids[:] = range(args.ntests) + data[:] = [ + dict( + id=i, + name="c%d" % i, + description="c%d" % i, + q=i * 10, + p=i * 20, + x=i * 30, + y=i * 40, + ) + for i in ids + ] + + # Must be done just on end + drop_at_the_end = args.drop + args.drop = False + + for i, name in enumerate(args.drivers): + if i == len(args.drivers) - 1: + args.drop = drop_at_the_end + + if name == Driver.psycopg2: + import psycopg2 # type: ignore + + run_psycopg2(psycopg2, args) + + elif name == Driver.psycopg: + import psycopg + + run_psycopg(psycopg, args) + + elif name == Driver.psycopg_async: + import psycopg + + if sys.platform == "win32": + if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): + asyncio.set_event_loop_policy( + asyncio.WindowsSelectorEventLoopPolicy() + ) + + asyncio.run(run_psycopg_async(psycopg, args)) + + elif name == Driver.asyncpg: + import asyncpg # type: ignore + + asyncio.run(run_asyncpg(asyncpg, args)) + + else: + raise AssertionError(f"unknown driver: {name!r}") + + # Must be done just on start + args.create = False + + +table = """ +CREATE TABLE customer ( + id SERIAL NOT NULL, + name VARCHAR(255), + description VARCHAR(255), + q INTEGER, + p INTEGER, + x INTEGER, + y INTEGER, + z INTEGER, + PRIMARY KEY (id) +) +""" +drop = "DROP TABLE IF EXISTS customer" + +insert = """ +INSERT INTO customer (id, name, description, q, p, x, y) VALUES +(%(id)s, %(name)s, %(description)s, %(q)s, %(p)s, %(x)s, %(y)s) +""" + +select = """ +SELECT customer.id, customer.name, customer.description, customer.q, + customer.p, customer.x, customer.y, customer.z +FROM customer +WHERE customer.id = %(id)s +""" + + +@contextmanager +def time_log(message: str) -> Generator[None, None, None]: + start = time.monotonic() + yield + end = time.monotonic() + logger.info(f"Run {message} in {end-start} s") + + +def run_psycopg2(psycopg2: Any, args: Namespace) -> None: + logger.info("Running psycopg2") + + if args.create: + logger.info(f"inserting {args.ntests} test records") + with psycopg2.connect(args.dsn) as conn: + with conn.cursor() as cursor: + cursor.execute(drop) + cursor.execute(table) + cursor.executemany(insert, data) + conn.commit() + + logger.info(f"running {args.ntests} queries") + to_query = random.choices(ids, k=args.ntests) + with psycopg2.connect(args.dsn) as conn: + with time_log("psycopg2"): + for id_ in to_query: + with conn.cursor() as cursor: + cursor.execute(select, {"id": id_}) + cursor.fetchall() + # conn.rollback() + + if args.drop: + logger.info("dropping test records") + with psycopg2.connect(args.dsn) as conn: + with conn.cursor() as cursor: + cursor.execute(drop) + conn.commit() + + +def run_psycopg(psycopg: Any, args: Namespace) -> None: + logger.info("Running psycopg sync") + + if args.create: + logger.info(f"inserting {args.ntests} test records") + with psycopg.connect(args.dsn) as conn: + with conn.cursor() as cursor: + cursor.execute(drop) + cursor.execute(table) + cursor.executemany(insert, data) + conn.commit() + + logger.info(f"running {args.ntests} queries") + to_query = random.choices(ids, k=args.ntests) + with psycopg.connect(args.dsn) as conn: + with time_log("psycopg"): + for id_ in to_query: + with conn.cursor() as cursor: + cursor.execute(select, {"id": id_}) + cursor.fetchall() + # conn.rollback() + + if args.drop: + logger.info("dropping test records") + with psycopg.connect(args.dsn) as conn: + with conn.cursor() as cursor: + cursor.execute(drop) + conn.commit() + + +async def run_psycopg_async(psycopg: Any, args: Namespace) -> None: + logger.info("Running psycopg async") + + conn: Any + + if args.create: + logger.info(f"inserting {args.ntests} test records") + async with await psycopg.AsyncConnection.connect(args.dsn) as conn: + async with conn.cursor() as cursor: + await cursor.execute(drop) + await cursor.execute(table) + await cursor.executemany(insert, data) + await conn.commit() + + logger.info(f"running {args.ntests} queries") + to_query = random.choices(ids, k=args.ntests) + async with await psycopg.AsyncConnection.connect(args.dsn) as conn: + with time_log("psycopg_async"): + for id_ in to_query: + cursor = await conn.execute(select, {"id": id_}) + await cursor.fetchall() + await cursor.close() + # await conn.rollback() + + if args.drop: + logger.info("dropping test records") + async with await psycopg.AsyncConnection.connect(args.dsn) as conn: + async with conn.cursor() as cursor: + await cursor.execute(drop) + await conn.commit() + + +async def run_asyncpg(asyncpg: Any, args: Namespace) -> None: + logger.info("Running asyncpg") + + places = dict(id="$1", name="$2", description="$3", q="$4", p="$5", x="$6", y="$7") + a_insert = insert % places + a_select = select % {"id": "$1"} + + conn: Any + + if args.create: + logger.info(f"inserting {args.ntests} test records") + conn = await asyncpg.connect(args.dsn) + async with conn.transaction(): + await conn.execute(drop) + await conn.execute(table) + await conn.executemany(a_insert, [tuple(d.values()) for d in data]) + await conn.close() + + logger.info(f"running {args.ntests} queries") + to_query = random.choices(ids, k=args.ntests) + conn = await asyncpg.connect(args.dsn) + with time_log("asyncpg"): + for id_ in to_query: + tr = conn.transaction() + await tr.start() + await conn.fetch(a_select, id_) + # await tr.rollback() + await conn.close() + + if args.drop: + logger.info("dropping test records") + conn = await asyncpg.connect(args.dsn) + async with conn.transaction(): + await conn.execute(drop) + await conn.close() + + +def parse_cmdline() -> Namespace: + parser = ArgumentParser(description=__doc__) + parser.add_argument( + "drivers", + nargs="+", + metavar="DRIVER", + type=Driver, + help=f"the drivers to test [choices: {', '.join(d.value for d in Driver)}]", + ) + + parser.add_argument( + "--ntests", + type=int, + default=10_000, + help="number of tests to perform [default: %(default)s]", + ) + + parser.add_argument( + "--dsn", + default=os.environ.get("PSYCOPG_TEST_DSN", ""), + help="database connection string" + " [default: %(default)r (from PSYCOPG_TEST_DSN env var)]", + ) + + parser.add_argument( + "--no-create", + dest="create", + action="store_false", + default="True", + help="skip data creation before tests (it must exist already)", + ) + + parser.add_argument( + "--no-drop", + dest="drop", + action="store_false", + default="True", + help="skip data drop after tests", + ) + + opt = parser.parse_args() + + return opt + + +if __name__ == "__main__": + main() |