-- -- nested calls -- CREATE FUNCTION nested_call_one(a text) RETURNS text AS 'q = "SELECT nested_call_two(''%s'')" % a r = plpy.execute(q) return r[0]' LANGUAGE plpython3u ; CREATE FUNCTION nested_call_two(a text) RETURNS text AS 'q = "SELECT nested_call_three(''%s'')" % a r = plpy.execute(q) return r[0]' LANGUAGE plpython3u ; CREATE FUNCTION nested_call_three(a text) RETURNS text AS 'return a' LANGUAGE plpython3u ; -- some spi stuff CREATE FUNCTION spi_prepared_plan_test_one(a text) RETURNS text AS 'if "myplan" not in SD: q = "SELECT count(*) FROM users WHERE lname = $1" SD["myplan"] = plpy.prepare(q, [ "text" ]) try: rv = plpy.execute(SD["myplan"], [a]) return "there are " + str(rv[0]["count"]) + " " + str(a) + "s" except Exception as ex: plpy.error(str(ex)) return None ' LANGUAGE plpython3u; CREATE FUNCTION spi_prepared_plan_test_two(a text) RETURNS text AS 'if "myplan" not in SD: q = "SELECT count(*) FROM users WHERE lname = $1" SD["myplan"] = plpy.prepare(q, [ "text" ]) try: rv = SD["myplan"].execute([a]) return "there are " + str(rv[0]["count"]) + " " + str(a) + "s" except Exception as ex: plpy.error(str(ex)) return None ' LANGUAGE plpython3u; CREATE FUNCTION spi_prepared_plan_test_nested(a text) RETURNS text AS 'if "myplan" not in SD: q = "SELECT spi_prepared_plan_test_one(''%s'') as count" % a SD["myplan"] = plpy.prepare(q) try: rv = plpy.execute(SD["myplan"]) if len(rv): return rv[0]["count"] except Exception as ex: plpy.error(str(ex)) return None ' LANGUAGE plpython3u; CREATE FUNCTION join_sequences(s sequences) RETURNS text AS 'if not s["multipart"]: return s["sequence"] q = "SELECT sequence FROM xsequences WHERE pid = ''%s''" % s["pid"] rv = plpy.execute(q) seq = s["sequence"] for r in rv: seq = seq + r["sequence"] return seq ' LANGUAGE plpython3u; CREATE FUNCTION spi_recursive_sum(a int) RETURNS int AS 'r = 0 if a > 1: r = plpy.execute("SELECT spi_recursive_sum(%d) as a" % (a-1))[0]["a"] return a + r ' LANGUAGE plpython3u; -- -- spi and nested calls -- select nested_call_one('pass this along'); nested_call_one ----------------------------------------------------------------- {'nested_call_two': "{'nested_call_three': 'pass this along'}"} (1 row) select spi_prepared_plan_test_one('doe'); spi_prepared_plan_test_one ---------------------------- there are 3 does (1 row) select spi_prepared_plan_test_two('smith'); spi_prepared_plan_test_two ---------------------------- there are 1 smiths (1 row) select spi_prepared_plan_test_nested('smith'); spi_prepared_plan_test_nested ------------------------------- there are 1 smiths (1 row) SELECT join_sequences(sequences) FROM sequences; join_sequences ---------------- ABCDEFGHIJKL ABCDEF ABCDEF ABCDEF ABCDEF ABCDEF (6 rows) SELECT join_sequences(sequences) FROM sequences WHERE join_sequences(sequences) ~* '^A'; join_sequences ---------------- ABCDEFGHIJKL ABCDEF ABCDEF ABCDEF ABCDEF ABCDEF (6 rows) SELECT join_sequences(sequences) FROM sequences WHERE join_sequences(sequences) ~* '^B'; join_sequences ---------------- (0 rows) SELECT spi_recursive_sum(10); spi_recursive_sum ------------------- 55 (1 row) -- -- plan and result objects -- CREATE FUNCTION result_metadata_test(cmd text) RETURNS int AS $$ plan = plpy.prepare(cmd) plpy.info(plan.status()) # not really documented or useful result = plpy.execute(plan) if result.status() > 0: plpy.info(result.colnames()) plpy.info(result.coltypes()) plpy.info(result.coltypmods()) return result.nrows() else: return None $$ LANGUAGE plpython3u; SELECT result_metadata_test($$SELECT 1 AS foo, '11'::text AS bar UNION SELECT 2, '22'$$); INFO: True INFO: ['foo', 'bar'] INFO: [23, 25] INFO: [-1, -1] result_metadata_test ---------------------- 2 (1 row) SELECT result_metadata_test($$CREATE TEMPORARY TABLE foo1 (a int, b text)$$); INFO: True ERROR: plpy.Error: command did not produce a result set CONTEXT: Traceback (most recent call last): PL/Python function "result_metadata_test", line 6, in plpy.info(result.colnames()) PL/Python function "result_metadata_test" CREATE FUNCTION result_nrows_test(cmd text) RETURNS int AS $$ result = plpy.execute(cmd) return result.nrows() $$ LANGUAGE plpython3u; SELECT result_nrows_test($$SELECT 1$$); result_nrows_test ------------------- 1 (1 row) SELECT result_nrows_test($$CREATE TEMPORARY TABLE foo2 (a int, b text)$$); result_nrows_test ------------------- 0 (1 row) SELECT result_nrows_test($$INSERT INTO foo2 VALUES (1, 'one'), (2, 'two')$$); result_nrows_test ------------------- 2 (1 row) SELECT result_nrows_test($$UPDATE foo2 SET b = '' WHERE a = 2$$); result_nrows_test ------------------- 1 (1 row) CREATE FUNCTION result_len_test(cmd text) RETURNS int AS $$ result = plpy.execute(cmd) return len(result) $$ LANGUAGE plpython3u; SELECT result_len_test($$SELECT 1$$); result_len_test ----------------- 1 (1 row) SELECT result_len_test($$CREATE TEMPORARY TABLE foo3 (a int, b text)$$); result_len_test ----------------- 0 (1 row) SELECT result_len_test($$INSERT INTO foo3 VALUES (1, 'one'), (2, 'two')$$); result_len_test ----------------- 0 (1 row) SELECT result_len_test($$UPDATE foo3 SET b= '' WHERE a = 2$$); result_len_test ----------------- 0 (1 row) CREATE FUNCTION result_subscript_test() RETURNS void AS $$ result = plpy.execute("SELECT 1 AS c UNION ALL SELECT 2 " "UNION ALL SELECT 3 UNION ALL SELECT 4") plpy.info(result[1]['c']) plpy.info(result[-1]['c']) plpy.info([item['c'] for item in result[1:3]]) plpy.info([item['c'] for item in result[::2]]) result[-1] = {'c': 1000} result[:2] = [{'c': 10}, {'c': 100}] plpy.info([item['c'] for item in result[:]]) # raises TypeError, catch so further tests could be added try: plpy.info(result['foo']) except TypeError: pass else: assert False, "TypeError not raised" $$ LANGUAGE plpython3u; SELECT result_subscript_test(); INFO: 2 INFO: 4 INFO: [2, 3] INFO: [1, 3] INFO: [10, 100, 3, 1000] result_subscript_test ----------------------- (1 row) CREATE FUNCTION result_empty_test() RETURNS void AS $$ result = plpy.execute("select 1 where false") plpy.info(result[:]) $$ LANGUAGE plpython3u; SELECT result_empty_test(); INFO: [] result_empty_test ------------------- (1 row) CREATE FUNCTION result_str_test(cmd text) RETURNS text AS $$ plan = plpy.prepare(cmd) result = plpy.execute(plan) return str(result) $$ LANGUAGE plpython3u; SELECT result_str_test($$SELECT 1 AS foo UNION SELECT 2$$); result_str_test ------------------------------------------------------------ (1 row) SELECT result_str_test($$CREATE TEMPORARY TABLE foo1 (a int, b text)$$); result_str_test -------------------------------------- (1 row) -- cursor objects CREATE FUNCTION simple_cursor_test() RETURNS int AS $$ res = plpy.cursor("select fname, lname from users") does = 0 for row in res: if row['lname'] == 'doe': does += 1 return does $$ LANGUAGE plpython3u; CREATE FUNCTION double_cursor_close() RETURNS int AS $$ res = plpy.cursor("select fname, lname from users") res.close() res.close() $$ LANGUAGE plpython3u; CREATE FUNCTION cursor_fetch() RETURNS int AS $$ res = plpy.cursor("select fname, lname from users") assert len(res.fetch(3)) == 3 assert len(res.fetch(3)) == 1 assert len(res.fetch(3)) == 0 assert len(res.fetch(3)) == 0 try: # use next() or __next__(), the method name changed in # http://www.python.org/dev/peps/pep-3114/ try: res.next() except AttributeError: res.__next__() except StopIteration: pass else: assert False, "StopIteration not raised" $$ LANGUAGE plpython3u; CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$ res = plpy.cursor("select fname, lname from users order by fname") assert len(res.fetch(2)) == 2 item = None try: item = res.next() except AttributeError: item = res.__next__() assert item['fname'] == 'rick' assert len(res.fetch(2)) == 1 $$ LANGUAGE plpython3u; CREATE FUNCTION fetch_after_close() RETURNS int AS $$ res = plpy.cursor("select fname, lname from users") res.close() try: res.fetch(1) except ValueError: pass else: assert False, "ValueError not raised" $$ LANGUAGE plpython3u; CREATE FUNCTION next_after_close() RETURNS int AS $$ res = plpy.cursor("select fname, lname from users") res.close() try: try: res.next() except AttributeError: res.__next__() except ValueError: pass else: assert False, "ValueError not raised" $$ LANGUAGE plpython3u; CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$ res = plpy.cursor("select fname, lname from users where false") assert len(res.fetch(1)) == 0 try: try: res.next() except AttributeError: res.__next__() except StopIteration: pass else: assert False, "StopIteration not raised" $$ LANGUAGE plpython3u; CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$ plan = plpy.prepare( "select fname, lname from users where fname like $1 || '%' order by fname", ["text"]) for row in plpy.cursor(plan, ["w"]): yield row['fname'] for row in plan.cursor(["j"]): yield row['fname'] $$ LANGUAGE plpython3u; CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$ plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'", ["text"]) c = plpy.cursor(plan, ["a", "b"]) $$ LANGUAGE plpython3u; CREATE TYPE test_composite_type AS ( a1 int, a2 varchar ); CREATE OR REPLACE FUNCTION plan_composite_args() RETURNS test_composite_type AS $$ plan = plpy.prepare("select $1 as c1", ["test_composite_type"]) res = plpy.execute(plan, [{"a1": 3, "a2": "label"}]) return res[0]["c1"] $$ LANGUAGE plpython3u; SELECT simple_cursor_test(); simple_cursor_test -------------------- 3 (1 row) SELECT double_cursor_close(); double_cursor_close --------------------- (1 row) SELECT cursor_fetch(); cursor_fetch -------------- (1 row) SELECT cursor_mix_next_and_fetch(); cursor_mix_next_and_fetch --------------------------- (1 row) SELECT fetch_after_close(); fetch_after_close ------------------- (1 row) SELECT next_after_close(); next_after_close ------------------ (1 row) SELECT cursor_fetch_next_empty(); cursor_fetch_next_empty ------------------------- (1 row) SELECT cursor_plan(); cursor_plan ------------- willem jane john (3 rows) SELECT cursor_plan_wrong_args(); ERROR: TypeError: Expected sequence of 1 argument, got 2: ['a', 'b'] CONTEXT: Traceback (most recent call last): PL/Python function "cursor_plan_wrong_args", line 4, in c = plpy.cursor(plan, ["a", "b"]) PL/Python function "cursor_plan_wrong_args" SELECT plan_composite_args(); plan_composite_args --------------------- (3,label) (1 row)