summaryrefslogtreecommitdiffstats
path: root/tests/test_query.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/test_query.py162
1 files changed, 162 insertions, 0 deletions
diff --git a/tests/test_query.py b/tests/test_query.py
new file mode 100644
index 0000000..7263a80
--- /dev/null
+++ b/tests/test_query.py
@@ -0,0 +1,162 @@
+import pytest
+
+import psycopg
+from psycopg import pq
+from psycopg.adapt import Transformer, PyFormat
+from psycopg._queries import PostgresQuery, _split_query
+
+
+@pytest.mark.parametrize(
+ "input, want",
+ [
+ (b"", [(b"", 0, PyFormat.AUTO)]),
+ (b"foo bar", [(b"foo bar", 0, PyFormat.AUTO)]),
+ (b"foo %% bar", [(b"foo % bar", 0, PyFormat.AUTO)]),
+ (b"%s", [(b"", 0, PyFormat.AUTO), (b"", 0, PyFormat.AUTO)]),
+ (b"%s foo", [(b"", 0, PyFormat.AUTO), (b" foo", 0, PyFormat.AUTO)]),
+ (b"%b foo", [(b"", 0, PyFormat.BINARY), (b" foo", 0, PyFormat.AUTO)]),
+ (b"foo %s", [(b"foo ", 0, PyFormat.AUTO), (b"", 0, PyFormat.AUTO)]),
+ (
+ b"foo %%%s bar",
+ [(b"foo %", 0, PyFormat.AUTO), (b" bar", 0, PyFormat.AUTO)],
+ ),
+ (
+ b"foo %(name)s bar",
+ [(b"foo ", "name", PyFormat.AUTO), (b" bar", 0, PyFormat.AUTO)],
+ ),
+ (
+ b"foo %(name)s %(name)b bar",
+ [
+ (b"foo ", "name", PyFormat.AUTO),
+ (b" ", "name", PyFormat.BINARY),
+ (b" bar", 0, PyFormat.AUTO),
+ ],
+ ),
+ (
+ b"foo %s%b bar %s baz",
+ [
+ (b"foo ", 0, PyFormat.AUTO),
+ (b"", 1, PyFormat.BINARY),
+ (b" bar ", 2, PyFormat.AUTO),
+ (b" baz", 0, PyFormat.AUTO),
+ ],
+ ),
+ ],
+)
+def test_split_query(input, want):
+ assert _split_query(input) == want
+
+
+@pytest.mark.parametrize(
+ "input",
+ [
+ b"foo %d bar",
+ b"foo % bar",
+ b"foo %%% bar",
+ b"foo %(foo)d bar",
+ b"foo %(foo)s bar %s baz",
+ b"foo %(foo) bar",
+ b"foo %(foo bar",
+ b"3%2",
+ ],
+)
+def test_split_query_bad(input):
+ with pytest.raises(psycopg.ProgrammingError):
+ _split_query(input)
+
+
+@pytest.mark.parametrize(
+ "query, params, want, wformats, wparams",
+ [
+ (b"", None, b"", None, None),
+ (b"", [], b"", [], []),
+ (b"%%", [], b"%", [], []),
+ (b"select %t", (1,), b"select $1", [pq.Format.TEXT], [b"1"]),
+ (
+ b"%t %% %t",
+ (1, 2),
+ b"$1 % $2",
+ [pq.Format.TEXT, pq.Format.TEXT],
+ [b"1", b"2"],
+ ),
+ (
+ b"%t %% %t",
+ ("a", 2),
+ b"$1 % $2",
+ [pq.Format.TEXT, pq.Format.TEXT],
+ [b"a", b"2"],
+ ),
+ ],
+)
+def test_pg_query_seq(query, params, want, wformats, wparams):
+ pq = PostgresQuery(Transformer())
+ pq.convert(query, params)
+ assert pq.query == want
+ assert pq.formats == wformats
+ assert pq.params == wparams
+
+
+@pytest.mark.parametrize(
+ "query, params, want, wformats, wparams",
+ [
+ (b"", {}, b"", [], []),
+ (b"hello %%", {"a": 1}, b"hello %", [], []),
+ (
+ b"select %(hello)t",
+ {"hello": 1, "world": 2},
+ b"select $1",
+ [pq.Format.TEXT],
+ [b"1"],
+ ),
+ (
+ b"select %(hi)s %(there)s %(hi)s",
+ {"hi": 0, "there": "a"},
+ b"select $1 $2 $1",
+ [pq.Format.BINARY, pq.Format.TEXT],
+ [b"\x00" * 2, b"a"],
+ ),
+ ],
+)
+def test_pg_query_map(query, params, want, wformats, wparams):
+ pq = PostgresQuery(Transformer())
+ pq.convert(query, params)
+ assert pq.query == want
+ assert pq.formats == wformats
+ assert pq.params == wparams
+
+
+@pytest.mark.parametrize(
+ "query, params",
+ [
+ (b"select %s", {"a": 1}),
+ (b"select %(name)s", [1]),
+ (b"select %s", "a"),
+ (b"select %s", 1),
+ (b"select %s", b"a"),
+ (b"select %s", set()),
+ ],
+)
+def test_pq_query_badtype(query, params):
+ pq = PostgresQuery(Transformer())
+ with pytest.raises(TypeError):
+ pq.convert(query, params)
+
+
+@pytest.mark.parametrize(
+ "query, params",
+ [
+ (b"", [1]),
+ (b"%s", []),
+ (b"%%", [1]),
+ (b"$1", [1]),
+ (b"select %(", {"a": 1}),
+ (b"select %(a", {"a": 1}),
+ (b"select %(a)", {"a": 1}),
+ (b"select %s %(hi)s", [1]),
+ (b"select %(hi)s %(hi)b", {"hi": 1}),
+ ],
+)
+def test_pq_query_badprog(query, params):
+ pq = PostgresQuery(Transformer())
+ with pytest.raises(psycopg.ProgrammingError):
+ pq.convert(query, params)