summaryrefslogtreecommitdiffstats
path: root/pgcli/pgexecute.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-02-08 10:31:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-02-08 10:31:05 +0000
commit6884720fae8a2622b14e93d9e35ca5fcc2283b40 (patch)
treedf6f736bb623cdd7932bbe2256101a6ac4ef7f35 /pgcli/pgexecute.py
parentInitial commit. (diff)
downloadpgcli-6884720fae8a2622b14e93d9e35ca5fcc2283b40.tar.xz
pgcli-6884720fae8a2622b14e93d9e35ca5fcc2283b40.zip
Adding upstream version 3.1.0.upstream/3.1.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'pgcli/pgexecute.py')
-rw-r--r--pgcli/pgexecute.py857
1 files changed, 857 insertions, 0 deletions
diff --git a/pgcli/pgexecute.py b/pgcli/pgexecute.py
new file mode 100644
index 0000000..d34bf26
--- /dev/null
+++ b/pgcli/pgexecute.py
@@ -0,0 +1,857 @@
+import traceback
+import logging
+import psycopg2
+import psycopg2.extras
+import psycopg2.errorcodes
+import psycopg2.extensions as ext
+import sqlparse
+import pgspecial as special
+import select
+from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE, make_dsn
+from .packages.parseutils.meta import FunctionMetadata, ForeignKey
+
+_logger = logging.getLogger(__name__)
+
+# Cast all database input to unicode automatically.
+# See http://initd.org/psycopg/docs/usage.html#unicode-handling for more info.
+ext.register_type(ext.UNICODE)
+ext.register_type(ext.UNICODEARRAY)
+ext.register_type(ext.new_type((705,), "UNKNOWN", ext.UNICODE))
+# See https://github.com/dbcli/pgcli/issues/426 for more details.
+# This registers a unicode type caster for datatype 'RECORD'.
+ext.register_type(ext.new_type((2249,), "RECORD", ext.UNICODE))
+
+# Cast bytea fields to text. By default, this will render as hex strings with
+# Postgres 9+ and as escaped binary in earlier versions.
+ext.register_type(ext.new_type((17,), "BYTEA_TEXT", psycopg2.STRING))
+
+# TODO: Get default timeout from pgclirc?
+_WAIT_SELECT_TIMEOUT = 1
+
+
+def _wait_select(conn):
+ """
+ copy-pasted from psycopg2.extras.wait_select
+ the default implementation doesn't define a timeout in the select calls
+ """
+ while 1:
+ try:
+ state = conn.poll()
+ if state == POLL_OK:
+ break
+ elif state == POLL_READ:
+ select.select([conn.fileno()], [], [], _WAIT_SELECT_TIMEOUT)
+ elif state == POLL_WRITE:
+ select.select([], [conn.fileno()], [], _WAIT_SELECT_TIMEOUT)
+ else:
+ raise conn.OperationalError("bad state from poll: %s" % state)
+ except KeyboardInterrupt:
+ conn.cancel()
+ # the loop will be broken by a server error
+ continue
+ except select.error as e:
+ errno = e.args[0]
+ if errno != 4:
+ raise
+
+
+# When running a query, make pressing CTRL+C raise a KeyboardInterrupt
+# See http://initd.org/psycopg/articles/2014/07/20/cancelling-postgresql-statements-python/
+# See also https://github.com/psycopg/psycopg2/issues/468
+ext.set_wait_callback(_wait_select)
+
+
+def register_date_typecasters(connection):
+ """
+ Casts date and timestamp values to string, resolves issues with out of
+ range dates (e.g. BC) which psycopg2 can't handle
+ """
+
+ def cast_date(value, cursor):
+ return value
+
+ cursor = connection.cursor()
+ cursor.execute("SELECT NULL::date")
+ date_oid = cursor.description[0][1]
+ cursor.execute("SELECT NULL::timestamp")
+ timestamp_oid = cursor.description[0][1]
+ cursor.execute("SELECT NULL::timestamp with time zone")
+ timestamptz_oid = cursor.description[0][1]
+ oids = (date_oid, timestamp_oid, timestamptz_oid)
+ new_type = psycopg2.extensions.new_type(oids, "DATE", cast_date)
+ psycopg2.extensions.register_type(new_type)
+
+
+def register_json_typecasters(conn, loads_fn):
+ """Set the function for converting JSON data for a connection.
+
+ Use the supplied function to decode JSON data returned from the database
+ via the given connection. The function should accept a single argument of
+ the data as a string encoded in the database's character encoding.
+ psycopg2's default handler for JSON data is json.loads.
+ http://initd.org/psycopg/docs/extras.html#json-adaptation
+
+ This function attempts to register the typecaster for both JSON and JSONB
+ types.
+
+ Returns a set that is a subset of {'json', 'jsonb'} indicating which types
+ (if any) were successfully registered.
+ """
+ available = set()
+
+ for name in ["json", "jsonb"]:
+ try:
+ psycopg2.extras.register_json(conn, loads=loads_fn, name=name)
+ available.add(name)
+ except psycopg2.ProgrammingError:
+ pass
+
+ return available
+
+
+def register_hstore_typecaster(conn):
+ """
+ Instead of using register_hstore() which converts hstore into a python
+ dict, we query the 'oid' of hstore which will be different for each
+ database and register a type caster that converts it to unicode.
+ http://initd.org/psycopg/docs/extras.html#psycopg2.extras.register_hstore
+ """
+ with conn.cursor() as cur:
+ try:
+ cur.execute(
+ "select t.oid FROM pg_type t WHERE t.typname = 'hstore' and t.typisdefined"
+ )
+ oid = cur.fetchone()[0]
+ ext.register_type(ext.new_type((oid,), "HSTORE", ext.UNICODE))
+ except Exception:
+ pass
+
+
+class PGExecute(object):
+
+ # The boolean argument to the current_schemas function indicates whether
+ # implicit schemas, e.g. pg_catalog
+ search_path_query = """
+ SELECT * FROM unnest(current_schemas(true))"""
+
+ schemata_query = """
+ SELECT nspname
+ FROM pg_catalog.pg_namespace
+ ORDER BY 1 """
+
+ tables_query = """
+ SELECT n.nspname schema_name,
+ c.relname table_name
+ FROM pg_catalog.pg_class c
+ LEFT JOIN pg_catalog.pg_namespace n
+ ON n.oid = c.relnamespace
+ WHERE c.relkind = ANY(%s)
+ ORDER BY 1,2;"""
+
+ databases_query = """
+ SELECT d.datname
+ FROM pg_catalog.pg_database d
+ ORDER BY 1"""
+
+ full_databases_query = """
+ SELECT d.datname as "Name",
+ pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
+ pg_catalog.pg_encoding_to_char(d.encoding) as "Encoding",
+ d.datcollate as "Collate",
+ d.datctype as "Ctype",
+ pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges"
+ FROM pg_catalog.pg_database d
+ ORDER BY 1"""
+
+ socket_directory_query = """
+ SELECT setting
+ FROM pg_settings
+ WHERE name = 'unix_socket_directories'
+ """
+
+ view_definition_query = """
+ WITH v AS (SELECT %s::pg_catalog.regclass::pg_catalog.oid AS v_oid)
+ SELECT nspname, relname, relkind,
+ pg_catalog.pg_get_viewdef(c.oid, true),
+ array_remove(array_remove(c.reloptions,'check_option=local'),
+ 'check_option=cascaded') AS reloptions,
+ CASE
+ WHEN 'check_option=local' = ANY (c.reloptions) THEN 'LOCAL'::text
+ WHEN 'check_option=cascaded' = ANY (c.reloptions) THEN 'CASCADED'::text
+ ELSE NULL
+ END AS checkoption
+ FROM pg_catalog.pg_class c
+ LEFT JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid)
+ JOIN v ON (c.oid = v.v_oid)"""
+
+ function_definition_query = """
+ WITH f AS
+ (SELECT %s::pg_catalog.regproc::pg_catalog.oid AS f_oid)
+ SELECT pg_catalog.pg_get_functiondef(f.f_oid)
+ FROM f"""
+
+ version_query = "SELECT version();"
+
+ def __init__(
+ self,
+ database=None,
+ user=None,
+ password=None,
+ host=None,
+ port=None,
+ dsn=None,
+ **kwargs,
+ ):
+ self._conn_params = {}
+ self.conn = None
+ self.dbname = None
+ self.user = None
+ self.password = None
+ self.host = None
+ self.port = None
+ self.server_version = None
+ self.extra_args = None
+ self.connect(database, user, password, host, port, dsn, **kwargs)
+ self.reset_expanded = None
+
+ def copy(self):
+ """Returns a clone of the current executor."""
+ return self.__class__(**self._conn_params)
+
+ def connect(
+ self,
+ database=None,
+ user=None,
+ password=None,
+ host=None,
+ port=None,
+ dsn=None,
+ **kwargs,
+ ):
+
+ conn_params = self._conn_params.copy()
+
+ new_params = {
+ "database": database,
+ "user": user,
+ "password": password,
+ "host": host,
+ "port": port,
+ "dsn": dsn,
+ }
+ new_params.update(kwargs)
+
+ if new_params["dsn"]:
+ new_params = {"dsn": new_params["dsn"], "password": new_params["password"]}
+
+ if new_params["password"]:
+ new_params["dsn"] = make_dsn(
+ new_params["dsn"], password=new_params.pop("password")
+ )
+
+ conn_params.update({k: v for k, v in new_params.items() if v})
+
+ conn = psycopg2.connect(**conn_params)
+ cursor = conn.cursor()
+ conn.set_client_encoding("utf8")
+
+ self._conn_params = conn_params
+ if self.conn:
+ self.conn.close()
+ self.conn = conn
+ self.conn.autocommit = True
+
+ # When we connect using a DSN, we don't really know what db,
+ # user, etc. we connected to. Let's read it.
+ # Note: moved this after setting autocommit because of #664.
+ libpq_version = psycopg2.__libpq_version__
+ dsn_parameters = {}
+ if libpq_version >= 93000:
+ # use actual connection info from psycopg2.extensions.Connection.info
+ # as libpq_version > 9.3 is available and required dependency
+ dsn_parameters = conn.info.dsn_parameters
+ else:
+ try:
+ dsn_parameters = conn.get_dsn_parameters()
+ except Exception as x:
+ # https://github.com/dbcli/pgcli/issues/1110
+ # PQconninfo not available in libpq < 9.3
+ _logger.info("Exception in get_dsn_parameters: %r", x)
+
+ if dsn_parameters:
+ self.dbname = dsn_parameters.get("dbname")
+ self.user = dsn_parameters.get("user")
+ self.host = dsn_parameters.get("host")
+ self.port = dsn_parameters.get("port")
+ else:
+ self.dbname = conn_params.get("database")
+ self.user = conn_params.get("user")
+ self.host = conn_params.get("host")
+ self.port = conn_params.get("port")
+
+ self.password = password
+ self.extra_args = kwargs
+
+ if not self.host:
+ self.host = self.get_socket_directory()
+
+ pid = self._select_one(cursor, "select pg_backend_pid()")[0]
+ self.pid = pid
+ self.superuser = conn.get_parameter_status("is_superuser") in ("on", "1")
+ self.server_version = conn.get_parameter_status("server_version")
+
+ register_date_typecasters(conn)
+ register_json_typecasters(self.conn, self._json_typecaster)
+ register_hstore_typecaster(self.conn)
+
+ @property
+ def short_host(self):
+ if "," in self.host:
+ host, _, _ = self.host.partition(",")
+ else:
+ host = self.host
+ short_host, _, _ = host.partition(".")
+ return short_host
+
+ def _select_one(self, cur, sql):
+ """
+ Helper method to run a select and retrieve a single field value
+ :param cur: cursor
+ :param sql: string
+ :return: string
+ """
+ cur.execute(sql)
+ return cur.fetchone()
+
+ def _json_typecaster(self, json_data):
+ """Interpret incoming JSON data as a string.
+
+ The raw data is decoded using the connection's encoding, which defaults
+ to the database's encoding.
+
+ See http://initd.org/psycopg/docs/connection.html#connection.encoding
+ """
+
+ return json_data
+
+ def failed_transaction(self):
+ status = self.conn.get_transaction_status()
+ return status == ext.TRANSACTION_STATUS_INERROR
+
+ def valid_transaction(self):
+ status = self.conn.get_transaction_status()
+ return (
+ status == ext.TRANSACTION_STATUS_ACTIVE
+ or status == ext.TRANSACTION_STATUS_INTRANS
+ )
+
+ def run(
+ self, statement, pgspecial=None, exception_formatter=None, on_error_resume=False
+ ):
+ """Execute the sql in the database and return the results.
+
+ :param statement: A string containing one or more sql statements
+ :param pgspecial: PGSpecial object
+ :param exception_formatter: A callable that accepts an Exception and
+ returns a formatted (title, rows, headers, status) tuple that can
+ act as a query result. If an exception_formatter is not supplied,
+ psycopg2 exceptions are always raised.
+ :param on_error_resume: Bool. If true, queries following an exception
+ (assuming exception_formatter has been supplied) continue to
+ execute.
+
+ :return: Generator yielding tuples containing
+ (title, rows, headers, status, query, success, is_special)
+ """
+
+ # Remove spaces and EOL
+ statement = statement.strip()
+ if not statement: # Empty string
+ yield (None, None, None, None, statement, False, False)
+
+ # Split the sql into separate queries and run each one.
+ for sql in sqlparse.split(statement):
+ # Remove spaces, eol and semi-colons.
+ sql = sql.rstrip(";")
+ sql = sqlparse.format(sql, strip_comments=True).strip()
+ if not sql:
+ continue
+ try:
+ if pgspecial:
+ # \G is treated specially since we have to set the expanded output.
+ if sql.endswith("\\G"):
+ if not pgspecial.expanded_output:
+ pgspecial.expanded_output = True
+ self.reset_expanded = True
+ sql = sql[:-2].strip()
+
+ # First try to run each query as special
+ _logger.debug("Trying a pgspecial command. sql: %r", sql)
+ try:
+ cur = self.conn.cursor()
+ except psycopg2.InterfaceError:
+ # edge case when connection is already closed, but we
+ # don't need cursor for special_cmd.arg_type == NO_QUERY.
+ # See https://github.com/dbcli/pgcli/issues/1014.
+ cur = None
+ try:
+ for result in pgspecial.execute(cur, sql):
+ # e.g. execute_from_file already appends these
+ if len(result) < 7:
+ yield result + (sql, True, True)
+ else:
+ yield result
+ continue
+ except special.CommandNotFound:
+ pass
+
+ # Not a special command, so execute as normal sql
+ yield self.execute_normal_sql(sql) + (sql, True, False)
+ except psycopg2.DatabaseError as e:
+ _logger.error("sql: %r, error: %r", sql, e)
+ _logger.error("traceback: %r", traceback.format_exc())
+
+ if self._must_raise(e) or not exception_formatter:
+ raise
+
+ yield None, None, None, exception_formatter(e), sql, False, False
+
+ if not on_error_resume:
+ break
+ finally:
+ if self.reset_expanded:
+ pgspecial.expanded_output = False
+ self.reset_expanded = None
+
+ def _must_raise(self, e):
+ """Return true if e is an error that should not be caught in ``run``.
+
+ An uncaught error will prompt the user to reconnect; as long as we
+ detect that the connection is stil open, we catch the error, as
+ reconnecting won't solve that problem.
+
+ :param e: DatabaseError. An exception raised while executing a query.
+
+ :return: Bool. True if ``run`` must raise this exception.
+
+ """
+ return self.conn.closed != 0
+
+ def execute_normal_sql(self, split_sql):
+ """Returns tuple (title, rows, headers, status)"""
+ _logger.debug("Regular sql statement. sql: %r", split_sql)
+ cur = self.conn.cursor()
+ cur.execute(split_sql)
+
+ # conn.notices persist between queies, we use pop to clear out the list
+ title = ""
+ while len(self.conn.notices) > 0:
+ title = self.conn.notices.pop() + title
+
+ # cur.description will be None for operations that do not return
+ # rows.
+ if cur.description:
+ headers = [x[0] for x in cur.description]
+ return title, cur, headers, cur.statusmessage
+ else:
+ _logger.debug("No rows in result.")
+ return title, None, None, cur.statusmessage
+
+ def search_path(self):
+ """Returns the current search path as a list of schema names"""
+
+ try:
+ with self.conn.cursor() as cur:
+ _logger.debug("Search path query. sql: %r", self.search_path_query)
+ cur.execute(self.search_path_query)
+ return [x[0] for x in cur.fetchall()]
+ except psycopg2.ProgrammingError:
+ fallback = "SELECT * FROM current_schemas(true)"
+ with self.conn.cursor() as cur:
+ _logger.debug("Search path query. sql: %r", fallback)
+ cur.execute(fallback)
+ return cur.fetchone()[0]
+
+ def view_definition(self, spec):
+ """Returns the SQL defining views described by `spec`"""
+
+ template = "CREATE OR REPLACE {6} VIEW {0}.{1} AS \n{3}"
+ # 2: relkind, v or m (materialized)
+ # 4: reloptions, null
+ # 5: checkoption: local or cascaded
+ with self.conn.cursor() as cur:
+ sql = self.view_definition_query
+ _logger.debug("View Definition Query. sql: %r\nspec: %r", sql, spec)
+ try:
+ cur.execute(sql, (spec,))
+ except psycopg2.ProgrammingError:
+ raise RuntimeError("View {} does not exist.".format(spec))
+ result = cur.fetchone()
+ view_type = "MATERIALIZED" if result[2] == "m" else ""
+ return template.format(*result + (view_type,))
+
+ def function_definition(self, spec):
+ """Returns the SQL defining functions described by `spec`"""
+
+ with self.conn.cursor() as cur:
+ sql = self.function_definition_query
+ _logger.debug("Function Definition Query. sql: %r\nspec: %r", sql, spec)
+ try:
+ cur.execute(sql, (spec,))
+ result = cur.fetchone()
+ return result[0]
+ except psycopg2.ProgrammingError:
+ raise RuntimeError("Function {} does not exist.".format(spec))
+
+ def schemata(self):
+ """Returns a list of schema names in the database"""
+
+ with self.conn.cursor() as cur:
+ _logger.debug("Schemata Query. sql: %r", self.schemata_query)
+ cur.execute(self.schemata_query)
+ return [x[0] for x in cur.fetchall()]
+
+ def _relations(self, kinds=("r", "p", "f", "v", "m")):
+ """Get table or view name metadata
+
+ :param kinds: list of postgres relkind filters:
+ 'r' - table
+ 'p' - partitioned table
+ 'f' - foreign table
+ 'v' - view
+ 'm' - materialized view
+ :return: (schema_name, rel_name) tuples
+ """
+
+ with self.conn.cursor() as cur:
+ sql = cur.mogrify(self.tables_query, [kinds])
+ _logger.debug("Tables Query. sql: %r", sql)
+ cur.execute(sql)
+ for row in cur:
+ yield row
+
+ def tables(self):
+ """Yields (schema_name, table_name) tuples"""
+ for row in self._relations(kinds=["r", "p", "f"]):
+ yield row
+
+ def views(self):
+ """Yields (schema_name, view_name) tuples.
+
+ Includes both views and and materialized views
+ """
+ for row in self._relations(kinds=["v", "m"]):
+ yield row
+
+ def _columns(self, kinds=("r", "p", "f", "v", "m")):
+ """Get column metadata for tables and views
+
+ :param kinds: kinds: list of postgres relkind filters:
+ 'r' - table
+ 'p' - partitioned table
+ 'f' - foreign table
+ 'v' - view
+ 'm' - materialized view
+ :return: list of (schema_name, relation_name, column_name, column_type) tuples
+ """
+
+ if self.conn.server_version >= 80400:
+ columns_query = """
+ SELECT nsp.nspname schema_name,
+ cls.relname table_name,
+ att.attname column_name,
+ att.atttypid::regtype::text type_name,
+ att.atthasdef AS has_default,
+ pg_catalog.pg_get_expr(def.adbin, def.adrelid, true) as default
+ FROM pg_catalog.pg_attribute att
+ INNER JOIN pg_catalog.pg_class cls
+ ON att.attrelid = cls.oid
+ INNER JOIN pg_catalog.pg_namespace nsp
+ ON cls.relnamespace = nsp.oid
+ LEFT OUTER JOIN pg_attrdef def
+ ON def.adrelid = att.attrelid
+ AND def.adnum = att.attnum
+ WHERE cls.relkind = ANY(%s)
+ AND NOT att.attisdropped
+ AND att.attnum > 0
+ ORDER BY 1, 2, att.attnum"""
+ else:
+ columns_query = """
+ SELECT nsp.nspname schema_name,
+ cls.relname table_name,
+ att.attname column_name,
+ typ.typname type_name,
+ NULL AS has_default,
+ NULL AS default
+ FROM pg_catalog.pg_attribute att
+ INNER JOIN pg_catalog.pg_class cls
+ ON att.attrelid = cls.oid
+ INNER JOIN pg_catalog.pg_namespace nsp
+ ON cls.relnamespace = nsp.oid
+ INNER JOIN pg_catalog.pg_type typ
+ ON typ.oid = att.atttypid
+ WHERE cls.relkind = ANY(%s)
+ AND NOT att.attisdropped
+ AND att.attnum > 0
+ ORDER BY 1, 2, att.attnum"""
+
+ with self.conn.cursor() as cur:
+ sql = cur.mogrify(columns_query, [kinds])
+ _logger.debug("Columns Query. sql: %r", sql)
+ cur.execute(sql)
+ for row in cur:
+ yield row
+
+ def table_columns(self):
+ for row in self._columns(kinds=["r", "p", "f"]):
+ yield row
+
+ def view_columns(self):
+ for row in self._columns(kinds=["v", "m"]):
+ yield row
+
+ def databases(self):
+ with self.conn.cursor() as cur:
+ _logger.debug("Databases Query. sql: %r", self.databases_query)
+ cur.execute(self.databases_query)
+ return [x[0] for x in cur.fetchall()]
+
+ def full_databases(self):
+ with self.conn.cursor() as cur:
+ _logger.debug("Databases Query. sql: %r", self.full_databases_query)
+ cur.execute(self.full_databases_query)
+ headers = [x[0] for x in cur.description]
+ return cur.fetchall(), headers, cur.statusmessage
+
+ def get_socket_directory(self):
+ with self.conn.cursor() as cur:
+ _logger.debug(
+ "Socket directory Query. sql: %r", self.socket_directory_query
+ )
+ cur.execute(self.socket_directory_query)
+ result = cur.fetchone()
+ return result[0] if result else ""
+
+ def foreignkeys(self):
+ """Yields ForeignKey named tuples"""
+
+ if self.conn.server_version < 90000:
+ return
+
+ with self.conn.cursor() as cur:
+ query = """
+ SELECT s_p.nspname AS parentschema,
+ t_p.relname AS parenttable,
+ unnest((
+ select
+ array_agg(attname ORDER BY i)
+ from
+ (select unnest(confkey) as attnum, generate_subscripts(confkey, 1) as i) x
+ JOIN pg_catalog.pg_attribute c USING(attnum)
+ WHERE c.attrelid = fk.confrelid
+ )) AS parentcolumn,
+ s_c.nspname AS childschema,
+ t_c.relname AS childtable,
+ unnest((
+ select
+ array_agg(attname ORDER BY i)
+ from
+ (select unnest(conkey) as attnum, generate_subscripts(conkey, 1) as i) x
+ JOIN pg_catalog.pg_attribute c USING(attnum)
+ WHERE c.attrelid = fk.conrelid
+ )) AS childcolumn
+ FROM pg_catalog.pg_constraint fk
+ JOIN pg_catalog.pg_class t_p ON t_p.oid = fk.confrelid
+ JOIN pg_catalog.pg_namespace s_p ON s_p.oid = t_p.relnamespace
+ JOIN pg_catalog.pg_class t_c ON t_c.oid = fk.conrelid
+ JOIN pg_catalog.pg_namespace s_c ON s_c.oid = t_c.relnamespace
+ WHERE fk.contype = 'f';
+ """
+ _logger.debug("Functions Query. sql: %r", query)
+ cur.execute(query)
+ for row in cur:
+ yield ForeignKey(*row)
+
+ def functions(self):
+ """Yields FunctionMetadata named tuples"""
+
+ if self.conn.server_version >= 110000:
+ query = """
+ SELECT n.nspname schema_name,
+ p.proname func_name,
+ p.proargnames,
+ COALESCE(proallargtypes::regtype[], proargtypes::regtype[])::text[],
+ p.proargmodes,
+ prorettype::regtype::text return_type,
+ p.prokind = 'a' is_aggregate,
+ p.prokind = 'w' is_window,
+ p.proretset is_set_returning,
+ d.deptype = 'e' is_extension,
+ pg_get_expr(proargdefaults, 0) AS arg_defaults
+ FROM pg_catalog.pg_proc p
+ INNER JOIN pg_catalog.pg_namespace n
+ ON n.oid = p.pronamespace
+ LEFT JOIN pg_depend d ON d.objid = p.oid and d.deptype = 'e'
+ WHERE p.prorettype::regtype != 'trigger'::regtype
+ ORDER BY 1, 2
+ """
+ elif self.conn.server_version > 90000:
+ query = """
+ SELECT n.nspname schema_name,
+ p.proname func_name,
+ p.proargnames,
+ COALESCE(proallargtypes::regtype[], proargtypes::regtype[])::text[],
+ p.proargmodes,
+ prorettype::regtype::text return_type,
+ p.proisagg is_aggregate,
+ p.proiswindow is_window,
+ p.proretset is_set_returning,
+ d.deptype = 'e' is_extension,
+ pg_get_expr(proargdefaults, 0) AS arg_defaults
+ FROM pg_catalog.pg_proc p
+ INNER JOIN pg_catalog.pg_namespace n
+ ON n.oid = p.pronamespace
+ LEFT JOIN pg_depend d ON d.objid = p.oid and d.deptype = 'e'
+ WHERE p.prorettype::regtype != 'trigger'::regtype
+ ORDER BY 1, 2
+ """
+ elif self.conn.server_version >= 80400:
+ query = """
+ SELECT n.nspname schema_name,
+ p.proname func_name,
+ p.proargnames,
+ COALESCE(proallargtypes::regtype[], proargtypes::regtype[])::text[],
+ p.proargmodes,
+ prorettype::regtype::text,
+ p.proisagg is_aggregate,
+ false is_window,
+ p.proretset is_set_returning,
+ d.deptype = 'e' is_extension,
+ NULL AS arg_defaults
+ FROM pg_catalog.pg_proc p
+ INNER JOIN pg_catalog.pg_namespace n
+ ON n.oid = p.pronamespace
+ LEFT JOIN pg_depend d ON d.objid = p.oid and d.deptype = 'e'
+ WHERE p.prorettype::regtype != 'trigger'::regtype
+ ORDER BY 1, 2
+ """
+ else:
+ query = """
+ SELECT n.nspname schema_name,
+ p.proname func_name,
+ p.proargnames,
+ NULL arg_types,
+ NULL arg_modes,
+ '' ret_type,
+ p.proisagg is_aggregate,
+ false is_window,
+ p.proretset is_set_returning,
+ d.deptype = 'e' is_extension,
+ NULL AS arg_defaults
+ FROM pg_catalog.pg_proc p
+ INNER JOIN pg_catalog.pg_namespace n
+ ON n.oid = p.pronamespace
+ LEFT JOIN pg_depend d ON d.objid = p.oid and d.deptype = 'e'
+ WHERE p.prorettype::regtype != 'trigger'::regtype
+ ORDER BY 1, 2
+ """
+
+ with self.conn.cursor() as cur:
+ _logger.debug("Functions Query. sql: %r", query)
+ cur.execute(query)
+ for row in cur:
+ yield FunctionMetadata(*row)
+
+ def datatypes(self):
+ """Yields tuples of (schema_name, type_name)"""
+
+ with self.conn.cursor() as cur:
+ if self.conn.server_version > 90000:
+ query = """
+ SELECT n.nspname schema_name,
+ t.typname type_name
+ FROM pg_catalog.pg_type t
+ INNER JOIN pg_catalog.pg_namespace n
+ ON n.oid = t.typnamespace
+ WHERE ( t.typrelid = 0 -- non-composite types
+ OR ( -- composite type, but not a table
+ SELECT c.relkind = 'c'
+ FROM pg_catalog.pg_class c
+ WHERE c.oid = t.typrelid
+ )
+ )
+ AND NOT EXISTS( -- ignore array types
+ SELECT 1
+ FROM pg_catalog.pg_type el
+ WHERE el.oid = t.typelem AND el.typarray = t.oid
+ )
+ AND n.nspname <> 'pg_catalog'
+ AND n.nspname <> 'information_schema'
+ ORDER BY 1, 2;
+ """
+ else:
+ query = """
+ SELECT n.nspname schema_name,
+ pg_catalog.format_type(t.oid, NULL) type_name
+ FROM pg_catalog.pg_type t
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
+ WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid))
+ AND t.typname !~ '^_'
+ AND n.nspname <> 'pg_catalog'
+ AND n.nspname <> 'information_schema'
+ AND pg_catalog.pg_type_is_visible(t.oid)
+ ORDER BY 1, 2;
+ """
+ _logger.debug("Datatypes Query. sql: %r", query)
+ cur.execute(query)
+ for row in cur:
+ yield row
+
+ def casing(self):
+ """Yields the most common casing for names used in db functions"""
+ with self.conn.cursor() as cur:
+ query = r"""
+ WITH Words AS (
+ SELECT regexp_split_to_table(prosrc, '\W+') AS Word, COUNT(1)
+ FROM pg_catalog.pg_proc P
+ JOIN pg_catalog.pg_namespace N ON N.oid = P.pronamespace
+ JOIN pg_catalog.pg_language L ON L.oid = P.prolang
+ WHERE L.lanname IN ('sql', 'plpgsql')
+ AND N.nspname NOT IN ('pg_catalog', 'information_schema')
+ GROUP BY Word
+ ),
+ OrderWords AS (
+ SELECT Word,
+ ROW_NUMBER() OVER(PARTITION BY LOWER(Word) ORDER BY Count DESC)
+ FROM Words
+ WHERE Word ~* '.*[a-z].*'
+ ),
+ Names AS (
+ --Column names
+ SELECT attname AS Name
+ FROM pg_catalog.pg_attribute
+ UNION -- Table/view names
+ SELECT relname
+ FROM pg_catalog.pg_class
+ UNION -- Function names
+ SELECT proname
+ FROM pg_catalog.pg_proc
+ UNION -- Type names
+ SELECT typname
+ FROM pg_catalog.pg_type
+ UNION -- Schema names
+ SELECT nspname
+ FROM pg_catalog.pg_namespace
+ UNION -- Parameter names
+ SELECT unnest(proargnames)
+ FROM pg_proc
+ )
+ SELECT Word
+ FROM OrderWords
+ WHERE LOWER(Word) IN (SELECT Name FROM Names)
+ AND Row_Number = 1;
+ """
+ _logger.debug("Casing Query. sql: %r", query)
+ cur.execute(query)
+ for row in cur:
+ yield row[0]