diff options
Diffstat (limited to 'tests/test_query.py')
-rw-r--r-- | tests/test_query.py | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/tests/test_query.py b/tests/test_query.py new file mode 100644 index 0000000..7263a80 --- /dev/null +++ b/tests/test_query.py @@ -0,0 +1,162 @@ +import pytest + +import psycopg +from psycopg import pq +from psycopg.adapt import Transformer, PyFormat +from psycopg._queries import PostgresQuery, _split_query + + +@pytest.mark.parametrize( + "input, want", + [ + (b"", [(b"", 0, PyFormat.AUTO)]), + (b"foo bar", [(b"foo bar", 0, PyFormat.AUTO)]), + (b"foo %% bar", [(b"foo % bar", 0, PyFormat.AUTO)]), + (b"%s", [(b"", 0, PyFormat.AUTO), (b"", 0, PyFormat.AUTO)]), + (b"%s foo", [(b"", 0, PyFormat.AUTO), (b" foo", 0, PyFormat.AUTO)]), + (b"%b foo", [(b"", 0, PyFormat.BINARY), (b" foo", 0, PyFormat.AUTO)]), + (b"foo %s", [(b"foo ", 0, PyFormat.AUTO), (b"", 0, PyFormat.AUTO)]), + ( + b"foo %%%s bar", + [(b"foo %", 0, PyFormat.AUTO), (b" bar", 0, PyFormat.AUTO)], + ), + ( + b"foo %(name)s bar", + [(b"foo ", "name", PyFormat.AUTO), (b" bar", 0, PyFormat.AUTO)], + ), + ( + b"foo %(name)s %(name)b bar", + [ + (b"foo ", "name", PyFormat.AUTO), + (b" ", "name", PyFormat.BINARY), + (b" bar", 0, PyFormat.AUTO), + ], + ), + ( + b"foo %s%b bar %s baz", + [ + (b"foo ", 0, PyFormat.AUTO), + (b"", 1, PyFormat.BINARY), + (b" bar ", 2, PyFormat.AUTO), + (b" baz", 0, PyFormat.AUTO), + ], + ), + ], +) +def test_split_query(input, want): + assert _split_query(input) == want + + +@pytest.mark.parametrize( + "input", + [ + b"foo %d bar", + b"foo % bar", + b"foo %%% bar", + b"foo %(foo)d bar", + b"foo %(foo)s bar %s baz", + b"foo %(foo) bar", + b"foo %(foo bar", + b"3%2", + ], +) +def test_split_query_bad(input): + with pytest.raises(psycopg.ProgrammingError): + _split_query(input) + + +@pytest.mark.parametrize( + "query, params, want, wformats, wparams", + [ + (b"", None, b"", None, None), + (b"", [], b"", [], []), + (b"%%", [], b"%", [], []), + (b"select %t", (1,), b"select $1", [pq.Format.TEXT], [b"1"]), + ( + b"%t %% %t", + (1, 2), + b"$1 % $2", + [pq.Format.TEXT, pq.Format.TEXT], + [b"1", b"2"], + ), + ( + b"%t %% %t", + ("a", 2), + b"$1 % $2", + [pq.Format.TEXT, pq.Format.TEXT], + [b"a", b"2"], + ), + ], +) +def test_pg_query_seq(query, params, want, wformats, wparams): + pq = PostgresQuery(Transformer()) + pq.convert(query, params) + assert pq.query == want + assert pq.formats == wformats + assert pq.params == wparams + + +@pytest.mark.parametrize( + "query, params, want, wformats, wparams", + [ + (b"", {}, b"", [], []), + (b"hello %%", {"a": 1}, b"hello %", [], []), + ( + b"select %(hello)t", + {"hello": 1, "world": 2}, + b"select $1", + [pq.Format.TEXT], + [b"1"], + ), + ( + b"select %(hi)s %(there)s %(hi)s", + {"hi": 0, "there": "a"}, + b"select $1 $2 $1", + [pq.Format.BINARY, pq.Format.TEXT], + [b"\x00" * 2, b"a"], + ), + ], +) +def test_pg_query_map(query, params, want, wformats, wparams): + pq = PostgresQuery(Transformer()) + pq.convert(query, params) + assert pq.query == want + assert pq.formats == wformats + assert pq.params == wparams + + +@pytest.mark.parametrize( + "query, params", + [ + (b"select %s", {"a": 1}), + (b"select %(name)s", [1]), + (b"select %s", "a"), + (b"select %s", 1), + (b"select %s", b"a"), + (b"select %s", set()), + ], +) +def test_pq_query_badtype(query, params): + pq = PostgresQuery(Transformer()) + with pytest.raises(TypeError): + pq.convert(query, params) + + +@pytest.mark.parametrize( + "query, params", + [ + (b"", [1]), + (b"%s", []), + (b"%%", [1]), + (b"$1", [1]), + (b"select %(", {"a": 1}), + (b"select %(a", {"a": 1}), + (b"select %(a)", {"a": 1}), + (b"select %s %(hi)s", [1]), + (b"select %(hi)s %(hi)b", {"hi": 1}), + ], +) +def test_pq_query_badprog(query, params): + pq = PostgresQuery(Transformer()) + with pytest.raises(psycopg.ProgrammingError): + pq.convert(query, params) |