summaryrefslogtreecommitdiffstats
path: root/tests/test_client_cursor.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_client_cursor.py')
-rw-r--r--tests/test_client_cursor.py855
1 files changed, 855 insertions, 0 deletions
diff --git a/tests/test_client_cursor.py b/tests/test_client_cursor.py
new file mode 100644
index 0000000..b355604
--- /dev/null
+++ b/tests/test_client_cursor.py
@@ -0,0 +1,855 @@
+import pickle
+import weakref
+import datetime as dt
+from typing import List
+
+import pytest
+
+import psycopg
+from psycopg import sql, rows
+from psycopg.adapt import PyFormat
+from psycopg.postgres import types as builtins
+
+from .utils import gc_collect, gc_count
+from .test_cursor import my_row_factory
+from .fix_crdb import is_crdb, crdb_encoding, crdb_time_precision
+
+
+@pytest.fixture
+def conn(conn):
+ conn.cursor_factory = psycopg.ClientCursor
+ return conn
+
+
+def test_init(conn):
+ cur = psycopg.ClientCursor(conn)
+ cur.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+ conn.row_factory = rows.dict_row
+ cur = psycopg.ClientCursor(conn)
+ cur.execute("select 1 as a")
+ assert cur.fetchone() == {"a": 1}
+
+
+def test_init_factory(conn):
+ cur = psycopg.ClientCursor(conn, row_factory=rows.dict_row)
+ cur.execute("select 1 as a")
+ assert cur.fetchone() == {"a": 1}
+
+
+def test_from_cursor_factory(conn_cls, dsn):
+ with conn_cls.connect(dsn, cursor_factory=psycopg.ClientCursor) as conn:
+ cur = conn.cursor()
+ assert type(cur) is psycopg.ClientCursor
+
+ cur.execute("select %s", (1,))
+ assert cur.fetchone() == (1,)
+ assert cur._query
+ assert cur._query.query == b"select 1"
+
+
+def test_close(conn):
+ cur = conn.cursor()
+ assert not cur.closed
+ cur.close()
+ assert cur.closed
+
+ with pytest.raises(psycopg.InterfaceError):
+ cur.execute("select 'foo'")
+
+ cur.close()
+ assert cur.closed
+
+
+def test_cursor_close_fetchone(conn):
+ cur = conn.cursor()
+ assert not cur.closed
+
+ query = "select * from generate_series(1, 10)"
+ cur.execute(query)
+ for _ in range(5):
+ cur.fetchone()
+
+ cur.close()
+ assert cur.closed
+
+ with pytest.raises(psycopg.InterfaceError):
+ cur.fetchone()
+
+
+def test_cursor_close_fetchmany(conn):
+ cur = conn.cursor()
+ assert not cur.closed
+
+ query = "select * from generate_series(1, 10)"
+ cur.execute(query)
+ assert len(cur.fetchmany(2)) == 2
+
+ cur.close()
+ assert cur.closed
+
+ with pytest.raises(psycopg.InterfaceError):
+ cur.fetchmany(2)
+
+
+def test_cursor_close_fetchall(conn):
+ cur = conn.cursor()
+ assert not cur.closed
+
+ query = "select * from generate_series(1, 10)"
+ cur.execute(query)
+ assert len(cur.fetchall()) == 10
+
+ cur.close()
+ assert cur.closed
+
+ with pytest.raises(psycopg.InterfaceError):
+ cur.fetchall()
+
+
+def test_context(conn):
+ with conn.cursor() as cur:
+ assert not cur.closed
+
+ assert cur.closed
+
+
+@pytest.mark.slow
+def test_weakref(conn):
+ cur = conn.cursor()
+ w = weakref.ref(cur)
+ cur.close()
+ del cur
+ gc_collect()
+ assert w() is None
+
+
+def test_pgresult(conn):
+ cur = conn.cursor()
+ cur.execute("select 1")
+ assert cur.pgresult
+ cur.close()
+ assert not cur.pgresult
+
+
+def test_statusmessage(conn):
+ cur = conn.cursor()
+ assert cur.statusmessage is None
+
+ cur.execute("select generate_series(1, 10)")
+ assert cur.statusmessage == "SELECT 10"
+
+ cur.execute("create table statusmessage ()")
+ assert cur.statusmessage == "CREATE TABLE"
+
+ with pytest.raises(psycopg.ProgrammingError):
+ cur.execute("wat")
+ assert cur.statusmessage is None
+
+
+def test_execute_sql(conn):
+ cur = conn.cursor()
+ cur.execute(sql.SQL("select {value}").format(value="hello"))
+ assert cur.fetchone() == ("hello",)
+
+
+def test_execute_many_results(conn):
+ cur = conn.cursor()
+ assert cur.nextset() is None
+
+ rv = cur.execute("select %s; select generate_series(1,%s)", ("foo", 3))
+ assert rv is cur
+ assert cur.fetchall() == [("foo",)]
+ assert cur.rowcount == 1
+ assert cur.nextset()
+ assert cur.fetchall() == [(1,), (2,), (3,)]
+ assert cur.nextset() is None
+
+ cur.close()
+ assert cur.nextset() is None
+
+
+def test_execute_sequence(conn):
+ cur = conn.cursor()
+ rv = cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+ assert rv is cur
+ assert len(cur._results) == 1
+ assert cur.pgresult.get_value(0, 0) == b"1"
+ assert cur.pgresult.get_value(0, 1) == b"foo"
+ assert cur.pgresult.get_value(0, 2) is None
+ assert cur.nextset() is None
+
+
+@pytest.mark.parametrize("query", ["", " ", ";"])
+def test_execute_empty_query(conn, query):
+ cur = conn.cursor()
+ cur.execute(query)
+ assert cur.pgresult.status == cur.ExecStatus.EMPTY_QUERY
+ with pytest.raises(psycopg.ProgrammingError):
+ cur.fetchone()
+
+
+def test_execute_type_change(conn):
+ # issue #112
+ conn.execute("create table bug_112 (num integer)")
+ sql = "insert into bug_112 (num) values (%s)"
+ cur = conn.cursor()
+ cur.execute(sql, (1,))
+ cur.execute(sql, (100_000,))
+ cur.execute("select num from bug_112 order by num")
+ assert cur.fetchall() == [(1,), (100_000,)]
+
+
+def test_executemany_type_change(conn):
+ conn.execute("create table bug_112 (num integer)")
+ sql = "insert into bug_112 (num) values (%s)"
+ cur = conn.cursor()
+ cur.executemany(sql, [(1,), (100_000,)])
+ cur.execute("select num from bug_112 order by num")
+ assert cur.fetchall() == [(1,), (100_000,)]
+
+
+@pytest.mark.parametrize(
+ "query", ["copy testcopy from stdin", "copy testcopy to stdout"]
+)
+def test_execute_copy(conn, query):
+ cur = conn.cursor()
+ cur.execute("create table testcopy (id int)")
+ with pytest.raises(psycopg.ProgrammingError):
+ cur.execute(query)
+
+
+def test_fetchone(conn):
+ cur = conn.cursor()
+ cur.execute("select %s::int, %s::text, %s::text", [1, "foo", None])
+ assert cur.pgresult.fformat(0) == 0
+
+ row = cur.fetchone()
+ assert row == (1, "foo", None)
+ row = cur.fetchone()
+ assert row is None
+
+
+def test_binary_cursor_execute(conn):
+ with pytest.raises(psycopg.NotSupportedError):
+ cur = conn.cursor(binary=True)
+ cur.execute("select %s, %s", [1, None])
+
+
+def test_execute_binary(conn):
+ with pytest.raises(psycopg.NotSupportedError):
+ cur = conn.cursor()
+ cur.execute("select %s, %s", [1, None], binary=True)
+
+
+def test_binary_cursor_text_override(conn):
+ cur = conn.cursor(binary=True)
+ cur.execute("select %s, %s", [1, None], binary=False)
+ assert cur.fetchone() == (1, None)
+ assert cur.pgresult.fformat(0) == 0
+ assert cur.pgresult.get_value(0, 0) == b"1"
+
+
+@pytest.mark.parametrize("encoding", ["utf8", crdb_encoding("latin9")])
+def test_query_encode(conn, encoding):
+ conn.execute(f"set client_encoding to {encoding}")
+ cur = conn.cursor()
+ (res,) = cur.execute("select '\u20ac'").fetchone()
+ assert res == "\u20ac"
+
+
+@pytest.mark.parametrize("encoding", [crdb_encoding("latin1")])
+def test_query_badenc(conn, encoding):
+ conn.execute(f"set client_encoding to {encoding}")
+ cur = conn.cursor()
+ with pytest.raises(UnicodeEncodeError):
+ cur.execute("select '\u20ac'")
+
+
+@pytest.fixture(scope="session")
+def _execmany(svcconn):
+ cur = svcconn.cursor()
+ cur.execute(
+ """
+ drop table if exists execmany;
+ create table execmany (id serial primary key, num integer, data text)
+ """
+ )
+
+
+@pytest.fixture(scope="function")
+def execmany(svcconn, _execmany):
+ cur = svcconn.cursor()
+ cur.execute("truncate table execmany")
+
+
+def test_executemany(conn, execmany):
+ cur = conn.cursor()
+ cur.executemany(
+ "insert into execmany(num, data) values (%s, %s)",
+ [(10, "hello"), (20, "world")],
+ )
+ cur.execute("select num, data from execmany order by 1")
+ assert cur.fetchall() == [(10, "hello"), (20, "world")]
+
+
+def test_executemany_name(conn, execmany):
+ cur = conn.cursor()
+ cur.executemany(
+ "insert into execmany(num, data) values (%(num)s, %(data)s)",
+ [{"num": 11, "data": "hello", "x": 1}, {"num": 21, "data": "world"}],
+ )
+ cur.execute("select num, data from execmany order by 1")
+ assert cur.fetchall() == [(11, "hello"), (21, "world")]
+
+
+def test_executemany_no_data(conn, execmany):
+ cur = conn.cursor()
+ cur.executemany("insert into execmany(num, data) values (%s, %s)", [])
+ assert cur.rowcount == 0
+
+
+def test_executemany_rowcount(conn, execmany):
+ cur = conn.cursor()
+ cur.executemany(
+ "insert into execmany(num, data) values (%s, %s)",
+ [(10, "hello"), (20, "world")],
+ )
+ assert cur.rowcount == 2
+
+
+def test_executemany_returning(conn, execmany):
+ cur = conn.cursor()
+ cur.executemany(
+ "insert into execmany(num, data) values (%s, %s) returning num",
+ [(10, "hello"), (20, "world")],
+ returning=True,
+ )
+ assert cur.rowcount == 2
+ assert cur.fetchone() == (10,)
+ assert cur.nextset()
+ assert cur.fetchone() == (20,)
+ assert cur.nextset() is None
+
+
+def test_executemany_returning_discard(conn, execmany):
+ cur = conn.cursor()
+ cur.executemany(
+ "insert into execmany(num, data) values (%s, %s) returning num",
+ [(10, "hello"), (20, "world")],
+ )
+ assert cur.rowcount == 2
+ with pytest.raises(psycopg.ProgrammingError):
+ cur.fetchone()
+ assert cur.nextset() is None
+
+
+def test_executemany_no_result(conn, execmany):
+ cur = conn.cursor()
+ cur.executemany(
+ "insert into execmany(num, data) values (%s, %s)",
+ [(10, "hello"), (20, "world")],
+ returning=True,
+ )
+ assert cur.rowcount == 2
+ assert cur.statusmessage.startswith("INSERT")
+ with pytest.raises(psycopg.ProgrammingError):
+ cur.fetchone()
+ pgresult = cur.pgresult
+ assert cur.nextset()
+ assert cur.statusmessage.startswith("INSERT")
+ assert pgresult is not cur.pgresult
+ assert cur.nextset() is None
+
+
+def test_executemany_rowcount_no_hit(conn, execmany):
+ cur = conn.cursor()
+ cur.executemany("delete from execmany where id = %s", [(-1,), (-2,)])
+ assert cur.rowcount == 0
+ cur.executemany("delete from execmany where id = %s", [])
+ assert cur.rowcount == 0
+ cur.executemany("delete from execmany where id = %s returning num", [(-1,), (-2,)])
+ assert cur.rowcount == 0
+
+
+@pytest.mark.parametrize(
+ "query",
+ [
+ "insert into nosuchtable values (%s, %s)",
+ # This fails, but only because we try to copy in pipeline mode,
+ # crashing the connection. Which would be even fine, but with
+ # the async cursor it's worse... See test_client_cursor_async.py.
+ # "copy (select %s, %s) to stdout",
+ "wat (%s, %s)",
+ ],
+)
+def test_executemany_badquery(conn, query):
+ cur = conn.cursor()
+ with pytest.raises(psycopg.DatabaseError):
+ cur.executemany(query, [(10, "hello"), (20, "world")])
+
+
+@pytest.mark.parametrize("fmt_in", PyFormat)
+def test_executemany_null_first(conn, fmt_in):
+ cur = conn.cursor()
+ cur.execute("create table testmany (a bigint, b bigint)")
+ cur.executemany(
+ f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
+ [[1, None], [3, 4]],
+ )
+ with pytest.raises((psycopg.DataError, psycopg.ProgrammingError)):
+ cur.executemany(
+ f"insert into testmany values (%{fmt_in.value}, %{fmt_in.value})",
+ [[1, ""], [3, 4]],
+ )
+
+
+def test_rowcount(conn):
+ cur = conn.cursor()
+
+ cur.execute("select 1 from generate_series(1, 0)")
+ assert cur.rowcount == 0
+
+ cur.execute("select 1 from generate_series(1, 42)")
+ assert cur.rowcount == 42
+
+ cur.execute("create table test_rowcount_notuples (id int primary key)")
+ assert cur.rowcount == -1
+
+ cur.execute("insert into test_rowcount_notuples select generate_series(1, 42)")
+ assert cur.rowcount == 42
+
+
+def test_rownumber(conn):
+ cur = conn.cursor()
+ assert cur.rownumber is None
+
+ cur.execute("select 1 from generate_series(1, 42)")
+ assert cur.rownumber == 0
+
+ cur.fetchone()
+ assert cur.rownumber == 1
+ cur.fetchone()
+ assert cur.rownumber == 2
+ cur.fetchmany(10)
+ assert cur.rownumber == 12
+ rns: List[int] = []
+ for i in cur:
+ assert cur.rownumber
+ rns.append(cur.rownumber)
+ if len(rns) >= 3:
+ break
+ assert rns == [13, 14, 15]
+ assert len(cur.fetchall()) == 42 - rns[-1]
+ assert cur.rownumber == 42
+
+
+def test_iter(conn):
+ cur = conn.cursor()
+ cur.execute("select generate_series(1, 3)")
+ assert list(cur) == [(1,), (2,), (3,)]
+
+
+def test_iter_stop(conn):
+ cur = conn.cursor()
+ cur.execute("select generate_series(1, 3)")
+ for rec in cur:
+ assert rec == (1,)
+ break
+
+ for rec in cur:
+ assert rec == (2,)
+ break
+
+ assert cur.fetchone() == (3,)
+ assert list(cur) == []
+
+
+def test_row_factory(conn):
+ cur = conn.cursor(row_factory=my_row_factory)
+
+ cur.execute("reset search_path")
+ with pytest.raises(psycopg.ProgrammingError):
+ cur.fetchone()
+
+ cur.execute("select 'foo' as bar")
+ (r,) = cur.fetchone()
+ assert r == "FOObar"
+
+ cur.execute("select 'x' as x; select 'y' as y, 'z' as z")
+ assert cur.fetchall() == [["Xx"]]
+ assert cur.nextset()
+ assert cur.fetchall() == [["Yy", "Zz"]]
+
+ cur.scroll(-1)
+ cur.row_factory = rows.dict_row
+ assert cur.fetchone() == {"y": "y", "z": "z"}
+
+
+def test_row_factory_none(conn):
+ cur = conn.cursor(row_factory=None)
+ assert cur.row_factory is rows.tuple_row
+ r = cur.execute("select 1 as a, 2 as b").fetchone()
+ assert type(r) is tuple
+ assert r == (1, 2)
+
+
+def test_bad_row_factory(conn):
+ def broken_factory(cur):
+ 1 / 0
+
+ cur = conn.cursor(row_factory=broken_factory)
+ with pytest.raises(ZeroDivisionError):
+ cur.execute("select 1")
+
+ def broken_maker(cur):
+ def make_row(seq):
+ 1 / 0
+
+ return make_row
+
+ cur = conn.cursor(row_factory=broken_maker)
+ cur.execute("select 1")
+ with pytest.raises(ZeroDivisionError):
+ cur.fetchone()
+
+
+def test_scroll(conn):
+ cur = conn.cursor()
+ with pytest.raises(psycopg.ProgrammingError):
+ cur.scroll(0)
+
+ cur.execute("select generate_series(0,9)")
+ cur.scroll(2)
+ assert cur.fetchone() == (2,)
+ cur.scroll(2)
+ assert cur.fetchone() == (5,)
+ cur.scroll(2, mode="relative")
+ assert cur.fetchone() == (8,)
+ cur.scroll(-1)
+ assert cur.fetchone() == (8,)
+ cur.scroll(-2)
+ assert cur.fetchone() == (7,)
+ cur.scroll(2, mode="absolute")
+ assert cur.fetchone() == (2,)
+
+ # on the boundary
+ cur.scroll(0, mode="absolute")
+ assert cur.fetchone() == (0,)
+ with pytest.raises(IndexError):
+ cur.scroll(-1, mode="absolute")
+
+ cur.scroll(0, mode="absolute")
+ with pytest.raises(IndexError):
+ cur.scroll(-1)
+
+ cur.scroll(9, mode="absolute")
+ assert cur.fetchone() == (9,)
+ with pytest.raises(IndexError):
+ cur.scroll(10, mode="absolute")
+
+ cur.scroll(9, mode="absolute")
+ with pytest.raises(IndexError):
+ cur.scroll(1)
+
+ with pytest.raises(ValueError):
+ cur.scroll(1, "wat")
+
+
+def test_query_params_execute(conn):
+ cur = conn.cursor()
+ assert cur._query is None
+
+ cur.execute("select %t, %s::text", [1, None])
+ assert cur._query is not None
+ assert cur._query.query == b"select 1, NULL::text"
+ assert cur._query.params == (b"1", b"NULL")
+
+ cur.execute("select 1")
+ assert cur._query.query == b"select 1"
+ assert not cur._query.params
+
+ with pytest.raises(psycopg.DataError):
+ cur.execute("select %t::int", ["wat"])
+
+ assert cur._query.query == b"select 'wat'::int"
+ assert cur._query.params == (b"'wat'",)
+
+
+@pytest.mark.parametrize(
+ "query, params, want",
+ [
+ ("select %(x)s", {"x": 1}, (1,)),
+ ("select %(x)s, %(y)s", {"x": 1, "y": 2}, (1, 2)),
+ ("select %(x)s, %(x)s", {"x": 1}, (1, 1)),
+ ],
+)
+def test_query_params_named(conn, query, params, want):
+ cur = conn.cursor()
+ cur.execute(query, params)
+ rec = cur.fetchone()
+ assert rec == want
+
+
+def test_query_params_executemany(conn):
+ cur = conn.cursor()
+
+ cur.executemany("select %t, %t", [[1, 2], [3, 4]])
+ assert cur._query.query == b"select 3, 4"
+ assert cur._query.params == (b"3", b"4")
+
+
+@pytest.mark.crdb_skip("copy")
+@pytest.mark.parametrize("ph, params", [("%s", (10,)), ("%(n)s", {"n": 10})])
+def test_copy_out_param(conn, ph, params):
+ cur = conn.cursor()
+ with cur.copy(
+ f"copy (select * from generate_series(1, {ph})) to stdout", params
+ ) as copy:
+ copy.set_types(["int4"])
+ assert list(copy.rows()) == [(i + 1,) for i in range(10)]
+
+ assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+
+
+def test_stream(conn):
+ cur = conn.cursor()
+ recs = []
+ for rec in cur.stream(
+ "select i, '2021-01-01'::date + i from generate_series(1, %s) as i",
+ [2],
+ ):
+ recs.append(rec)
+
+ assert recs == [(1, dt.date(2021, 1, 2)), (2, dt.date(2021, 1, 3))]
+
+
+class TestColumn:
+ def test_description_attribs(self, conn):
+ curs = conn.cursor()
+ curs.execute(
+ """select
+ 3.14::decimal(10,2) as pi,
+ 'hello'::text as hi,
+ '2010-02-18'::date as now
+ """
+ )
+ assert len(curs.description) == 3
+ for c in curs.description:
+ len(c) == 7 # DBAPI happy
+ for i, a in enumerate(
+ """
+ name type_code display_size internal_size precision scale null_ok
+ """.split()
+ ):
+ assert c[i] == getattr(c, a)
+
+ # Won't fill them up
+ assert c.null_ok is None
+
+ c = curs.description[0]
+ assert c.name == "pi"
+ assert c.type_code == builtins["numeric"].oid
+ assert c.display_size is None
+ assert c.internal_size is None
+ assert c.precision == 10
+ assert c.scale == 2
+
+ c = curs.description[1]
+ assert c.name == "hi"
+ assert c.type_code == builtins["text"].oid
+ assert c.display_size is None
+ assert c.internal_size is None
+ assert c.precision is None
+ assert c.scale is None
+
+ c = curs.description[2]
+ assert c.name == "now"
+ assert c.type_code == builtins["date"].oid
+ assert c.display_size is None
+ if is_crdb(conn):
+ assert c.internal_size == 16
+ else:
+ assert c.internal_size == 4
+ assert c.precision is None
+ assert c.scale is None
+
+ def test_description_slice(self, conn):
+ curs = conn.cursor()
+ curs.execute("select 1::int as a")
+ curs.description[0][0:2] == ("a", 23)
+
+ @pytest.mark.parametrize(
+ "type, precision, scale, dsize, isize",
+ [
+ ("text", None, None, None, None),
+ ("varchar", None, None, None, None),
+ ("varchar(42)", None, None, 42, None),
+ ("int4", None, None, None, 4),
+ ("numeric", None, None, None, None),
+ ("numeric(10)", 10, 0, None, None),
+ ("numeric(10, 3)", 10, 3, None, None),
+ ("time", None, None, None, 8),
+ crdb_time_precision("time(4)", 4, None, None, 8),
+ crdb_time_precision("time(10)", 6, None, None, 8),
+ ],
+ )
+ def test_details(self, conn, type, precision, scale, dsize, isize):
+ cur = conn.cursor()
+ cur.execute(f"select null::{type}")
+ col = cur.description[0]
+ repr(col)
+ assert col.precision == precision
+ assert col.scale == scale
+ assert col.display_size == dsize
+ assert col.internal_size == isize
+
+ def test_pickle(self, conn):
+ curs = conn.cursor()
+ curs.execute(
+ """select
+ 3.14::decimal(10,2) as pi,
+ 'hello'::text as hi,
+ '2010-02-18'::date as now
+ """
+ )
+ description = curs.description
+ pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL)
+ unpickled = pickle.loads(pickled)
+ assert [tuple(d) for d in description] == [tuple(d) for d in unpickled]
+
+ @pytest.mark.crdb_skip("no col query")
+ def test_no_col_query(self, conn):
+ cur = conn.execute("select")
+ assert cur.description == []
+ assert cur.fetchall() == [()]
+
+ def test_description_closed_connection(self, conn):
+ # If we have reasons to break this test we will (e.g. we really need
+ # the connection). In #172 it fails just by accident.
+ cur = conn.execute("select 1::int4 as foo")
+ conn.close()
+ assert len(cur.description) == 1
+ col = cur.description[0]
+ assert col.name == "foo"
+ assert col.type_code == 23
+
+ def test_name_not_a_name(self, conn):
+ cur = conn.cursor()
+ (res,) = cur.execute("""select 'x' as "foo-bar" """).fetchone()
+ assert res == "x"
+ assert cur.description[0].name == "foo-bar"
+
+ @pytest.mark.parametrize("encoding", ["utf8", crdb_encoding("latin9")])
+ def test_name_encode(self, conn, encoding):
+ conn.execute(f"set client_encoding to {encoding}")
+ cur = conn.cursor()
+ (res,) = cur.execute("""select 'x' as "\u20ac" """).fetchone()
+ assert res == "x"
+ assert cur.description[0].name == "\u20ac"
+
+
+def test_str(conn):
+ cur = conn.cursor()
+ assert "psycopg.ClientCursor" in str(cur)
+ assert "[IDLE]" in str(cur)
+ assert "[closed]" not in str(cur)
+ assert "[no result]" in str(cur)
+ cur.execute("select 1")
+ assert "[INTRANS]" in str(cur)
+ assert "[TUPLES_OK]" in str(cur)
+ assert "[closed]" not in str(cur)
+ assert "[no result]" not in str(cur)
+ cur.close()
+ assert "[closed]" in str(cur)
+ assert "[INTRANS]" in str(cur)
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+def test_leak(conn_cls, dsn, faker, fetch, row_factory):
+ faker.choose_schema(ncols=5)
+ faker.make_records(10)
+ row_factory = getattr(rows, row_factory)
+
+ def work():
+ with conn_cls.connect(dsn) as conn, conn.transaction(force_rollback=True):
+ with psycopg.ClientCursor(conn, row_factory=row_factory) as cur:
+ cur.execute(faker.drop_stmt)
+ cur.execute(faker.create_stmt)
+ with faker.find_insert_problem(conn):
+ cur.executemany(faker.insert_stmt, faker.records)
+
+ cur.execute(faker.select_stmt)
+
+ if fetch == "one":
+ while True:
+ tmp = cur.fetchone()
+ if tmp is None:
+ break
+ elif fetch == "many":
+ while True:
+ tmp = cur.fetchmany(3)
+ if not tmp:
+ break
+ elif fetch == "all":
+ cur.fetchall()
+ elif fetch == "iter":
+ for rec in cur:
+ pass
+
+ n = []
+ gc_collect()
+ for i in range(3):
+ work()
+ gc_collect()
+ n.append(gc_count())
+ assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+
+
+@pytest.mark.parametrize(
+ "query, params, want",
+ [
+ ("select 'hello'", (), "select 'hello'"),
+ ("select %s, %s", ([1, dt.date(2020, 1, 1)],), "select 1, '2020-01-01'::date"),
+ ("select %(foo)s, %(foo)s", ({"foo": "x"},), "select 'x', 'x'"),
+ ("select %%", (), "select %%"),
+ ("select %%, %s", (["a"],), "select %, 'a'"),
+ ("select %%, %(foo)s", ({"foo": "x"},), "select %, 'x'"),
+ ("select %%s, %(foo)s", ({"foo": "x"},), "select %s, 'x'"),
+ ],
+)
+def test_mogrify(conn, query, params, want):
+ cur = conn.cursor()
+ got = cur.mogrify(query, *params)
+ assert got == want
+
+
+@pytest.mark.parametrize("encoding", ["utf8", crdb_encoding("latin9")])
+def test_mogrify_encoding(conn, encoding):
+ conn.execute(f"set client_encoding to {encoding}")
+ q = conn.cursor().mogrify("select %(s)s", {"s": "\u20ac"})
+ assert q == "select '\u20ac'"
+
+
+@pytest.mark.parametrize("encoding", [crdb_encoding("latin1")])
+def test_mogrify_badenc(conn, encoding):
+ conn.execute(f"set client_encoding to {encoding}")
+ with pytest.raises(UnicodeEncodeError):
+ conn.cursor().mogrify("select %(s)s", {"s": "\u20ac"})
+
+
+@pytest.mark.pipeline
+def test_message_0x33(conn):
+ # https://github.com/psycopg/psycopg/issues/314
+ notices = []
+ conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+
+ conn.autocommit = True
+ with conn.pipeline():
+ cur = conn.execute("select 'test'")
+ assert cur.fetchone() == ("test",)
+
+ assert not notices