diff options
Diffstat (limited to '')
-rw-r--r-- | pgcli/pgexecute.py | 857 |
1 files changed, 857 insertions, 0 deletions
diff --git a/pgcli/pgexecute.py b/pgcli/pgexecute.py new file mode 100644 index 0000000..d34bf26 --- /dev/null +++ b/pgcli/pgexecute.py @@ -0,0 +1,857 @@ +import traceback +import logging +import psycopg2 +import psycopg2.extras +import psycopg2.errorcodes +import psycopg2.extensions as ext +import sqlparse +import pgspecial as special +import select +from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE, make_dsn +from .packages.parseutils.meta import FunctionMetadata, ForeignKey + +_logger = logging.getLogger(__name__) + +# Cast all database input to unicode automatically. +# See http://initd.org/psycopg/docs/usage.html#unicode-handling for more info. +ext.register_type(ext.UNICODE) +ext.register_type(ext.UNICODEARRAY) +ext.register_type(ext.new_type((705,), "UNKNOWN", ext.UNICODE)) +# See https://github.com/dbcli/pgcli/issues/426 for more details. +# This registers a unicode type caster for datatype 'RECORD'. +ext.register_type(ext.new_type((2249,), "RECORD", ext.UNICODE)) + +# Cast bytea fields to text. By default, this will render as hex strings with +# Postgres 9+ and as escaped binary in earlier versions. +ext.register_type(ext.new_type((17,), "BYTEA_TEXT", psycopg2.STRING)) + +# TODO: Get default timeout from pgclirc? +_WAIT_SELECT_TIMEOUT = 1 + + +def _wait_select(conn): + """ + copy-pasted from psycopg2.extras.wait_select + the default implementation doesn't define a timeout in the select calls + """ + while 1: + try: + state = conn.poll() + if state == POLL_OK: + break + elif state == POLL_READ: + select.select([conn.fileno()], [], [], _WAIT_SELECT_TIMEOUT) + elif state == POLL_WRITE: + select.select([], [conn.fileno()], [], _WAIT_SELECT_TIMEOUT) + else: + raise conn.OperationalError("bad state from poll: %s" % state) + except KeyboardInterrupt: + conn.cancel() + # the loop will be broken by a server error + continue + except select.error as e: + errno = e.args[0] + if errno != 4: + raise + + +# When running a query, make pressing CTRL+C raise a KeyboardInterrupt +# See http://initd.org/psycopg/articles/2014/07/20/cancelling-postgresql-statements-python/ +# See also https://github.com/psycopg/psycopg2/issues/468 +ext.set_wait_callback(_wait_select) + + +def register_date_typecasters(connection): + """ + Casts date and timestamp values to string, resolves issues with out of + range dates (e.g. BC) which psycopg2 can't handle + """ + + def cast_date(value, cursor): + return value + + cursor = connection.cursor() + cursor.execute("SELECT NULL::date") + date_oid = cursor.description[0][1] + cursor.execute("SELECT NULL::timestamp") + timestamp_oid = cursor.description[0][1] + cursor.execute("SELECT NULL::timestamp with time zone") + timestamptz_oid = cursor.description[0][1] + oids = (date_oid, timestamp_oid, timestamptz_oid) + new_type = psycopg2.extensions.new_type(oids, "DATE", cast_date) + psycopg2.extensions.register_type(new_type) + + +def register_json_typecasters(conn, loads_fn): + """Set the function for converting JSON data for a connection. + + Use the supplied function to decode JSON data returned from the database + via the given connection. The function should accept a single argument of + the data as a string encoded in the database's character encoding. + psycopg2's default handler for JSON data is json.loads. + http://initd.org/psycopg/docs/extras.html#json-adaptation + + This function attempts to register the typecaster for both JSON and JSONB + types. + + Returns a set that is a subset of {'json', 'jsonb'} indicating which types + (if any) were successfully registered. + """ + available = set() + + for name in ["json", "jsonb"]: + try: + psycopg2.extras.register_json(conn, loads=loads_fn, name=name) + available.add(name) + except psycopg2.ProgrammingError: + pass + + return available + + +def register_hstore_typecaster(conn): + """ + Instead of using register_hstore() which converts hstore into a python + dict, we query the 'oid' of hstore which will be different for each + database and register a type caster that converts it to unicode. + http://initd.org/psycopg/docs/extras.html#psycopg2.extras.register_hstore + """ + with conn.cursor() as cur: + try: + cur.execute( + "select t.oid FROM pg_type t WHERE t.typname = 'hstore' and t.typisdefined" + ) + oid = cur.fetchone()[0] + ext.register_type(ext.new_type((oid,), "HSTORE", ext.UNICODE)) + except Exception: + pass + + +class PGExecute(object): + + # The boolean argument to the current_schemas function indicates whether + # implicit schemas, e.g. pg_catalog + search_path_query = """ + SELECT * FROM unnest(current_schemas(true))""" + + schemata_query = """ + SELECT nspname + FROM pg_catalog.pg_namespace + ORDER BY 1 """ + + tables_query = """ + SELECT n.nspname schema_name, + c.relname table_name + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n + ON n.oid = c.relnamespace + WHERE c.relkind = ANY(%s) + ORDER BY 1,2;""" + + databases_query = """ + SELECT d.datname + FROM pg_catalog.pg_database d + ORDER BY 1""" + + full_databases_query = """ + SELECT d.datname as "Name", + pg_catalog.pg_get_userbyid(d.datdba) as "Owner", + pg_catalog.pg_encoding_to_char(d.encoding) as "Encoding", + d.datcollate as "Collate", + d.datctype as "Ctype", + pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges" + FROM pg_catalog.pg_database d + ORDER BY 1""" + + socket_directory_query = """ + SELECT setting + FROM pg_settings + WHERE name = 'unix_socket_directories' + """ + + view_definition_query = """ + WITH v AS (SELECT %s::pg_catalog.regclass::pg_catalog.oid AS v_oid) + SELECT nspname, relname, relkind, + pg_catalog.pg_get_viewdef(c.oid, true), + array_remove(array_remove(c.reloptions,'check_option=local'), + 'check_option=cascaded') AS reloptions, + CASE + WHEN 'check_option=local' = ANY (c.reloptions) THEN 'LOCAL'::text + WHEN 'check_option=cascaded' = ANY (c.reloptions) THEN 'CASCADED'::text + ELSE NULL + END AS checkoption + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) + JOIN v ON (c.oid = v.v_oid)""" + + function_definition_query = """ + WITH f AS + (SELECT %s::pg_catalog.regproc::pg_catalog.oid AS f_oid) + SELECT pg_catalog.pg_get_functiondef(f.f_oid) + FROM f""" + + version_query = "SELECT version();" + + def __init__( + self, + database=None, + user=None, + password=None, + host=None, + port=None, + dsn=None, + **kwargs, + ): + self._conn_params = {} + self.conn = None + self.dbname = None + self.user = None + self.password = None + self.host = None + self.port = None + self.server_version = None + self.extra_args = None + self.connect(database, user, password, host, port, dsn, **kwargs) + self.reset_expanded = None + + def copy(self): + """Returns a clone of the current executor.""" + return self.__class__(**self._conn_params) + + def connect( + self, + database=None, + user=None, + password=None, + host=None, + port=None, + dsn=None, + **kwargs, + ): + + conn_params = self._conn_params.copy() + + new_params = { + "database": database, + "user": user, + "password": password, + "host": host, + "port": port, + "dsn": dsn, + } + new_params.update(kwargs) + + if new_params["dsn"]: + new_params = {"dsn": new_params["dsn"], "password": new_params["password"]} + + if new_params["password"]: + new_params["dsn"] = make_dsn( + new_params["dsn"], password=new_params.pop("password") + ) + + conn_params.update({k: v for k, v in new_params.items() if v}) + + conn = psycopg2.connect(**conn_params) + cursor = conn.cursor() + conn.set_client_encoding("utf8") + + self._conn_params = conn_params + if self.conn: + self.conn.close() + self.conn = conn + self.conn.autocommit = True + + # When we connect using a DSN, we don't really know what db, + # user, etc. we connected to. Let's read it. + # Note: moved this after setting autocommit because of #664. + libpq_version = psycopg2.__libpq_version__ + dsn_parameters = {} + if libpq_version >= 93000: + # use actual connection info from psycopg2.extensions.Connection.info + # as libpq_version > 9.3 is available and required dependency + dsn_parameters = conn.info.dsn_parameters + else: + try: + dsn_parameters = conn.get_dsn_parameters() + except Exception as x: + # https://github.com/dbcli/pgcli/issues/1110 + # PQconninfo not available in libpq < 9.3 + _logger.info("Exception in get_dsn_parameters: %r", x) + + if dsn_parameters: + self.dbname = dsn_parameters.get("dbname") + self.user = dsn_parameters.get("user") + self.host = dsn_parameters.get("host") + self.port = dsn_parameters.get("port") + else: + self.dbname = conn_params.get("database") + self.user = conn_params.get("user") + self.host = conn_params.get("host") + self.port = conn_params.get("port") + + self.password = password + self.extra_args = kwargs + + if not self.host: + self.host = self.get_socket_directory() + + pid = self._select_one(cursor, "select pg_backend_pid()")[0] + self.pid = pid + self.superuser = conn.get_parameter_status("is_superuser") in ("on", "1") + self.server_version = conn.get_parameter_status("server_version") + + register_date_typecasters(conn) + register_json_typecasters(self.conn, self._json_typecaster) + register_hstore_typecaster(self.conn) + + @property + def short_host(self): + if "," in self.host: + host, _, _ = self.host.partition(",") + else: + host = self.host + short_host, _, _ = host.partition(".") + return short_host + + def _select_one(self, cur, sql): + """ + Helper method to run a select and retrieve a single field value + :param cur: cursor + :param sql: string + :return: string + """ + cur.execute(sql) + return cur.fetchone() + + def _json_typecaster(self, json_data): + """Interpret incoming JSON data as a string. + + The raw data is decoded using the connection's encoding, which defaults + to the database's encoding. + + See http://initd.org/psycopg/docs/connection.html#connection.encoding + """ + + return json_data + + def failed_transaction(self): + status = self.conn.get_transaction_status() + return status == ext.TRANSACTION_STATUS_INERROR + + def valid_transaction(self): + status = self.conn.get_transaction_status() + return ( + status == ext.TRANSACTION_STATUS_ACTIVE + or status == ext.TRANSACTION_STATUS_INTRANS + ) + + def run( + self, statement, pgspecial=None, exception_formatter=None, on_error_resume=False + ): + """Execute the sql in the database and return the results. + + :param statement: A string containing one or more sql statements + :param pgspecial: PGSpecial object + :param exception_formatter: A callable that accepts an Exception and + returns a formatted (title, rows, headers, status) tuple that can + act as a query result. If an exception_formatter is not supplied, + psycopg2 exceptions are always raised. + :param on_error_resume: Bool. If true, queries following an exception + (assuming exception_formatter has been supplied) continue to + execute. + + :return: Generator yielding tuples containing + (title, rows, headers, status, query, success, is_special) + """ + + # Remove spaces and EOL + statement = statement.strip() + if not statement: # Empty string + yield (None, None, None, None, statement, False, False) + + # Split the sql into separate queries and run each one. + for sql in sqlparse.split(statement): + # Remove spaces, eol and semi-colons. + sql = sql.rstrip(";") + sql = sqlparse.format(sql, strip_comments=True).strip() + if not sql: + continue + try: + if pgspecial: + # \G is treated specially since we have to set the expanded output. + if sql.endswith("\\G"): + if not pgspecial.expanded_output: + pgspecial.expanded_output = True + self.reset_expanded = True + sql = sql[:-2].strip() + + # First try to run each query as special + _logger.debug("Trying a pgspecial command. sql: %r", sql) + try: + cur = self.conn.cursor() + except psycopg2.InterfaceError: + # edge case when connection is already closed, but we + # don't need cursor for special_cmd.arg_type == NO_QUERY. + # See https://github.com/dbcli/pgcli/issues/1014. + cur = None + try: + for result in pgspecial.execute(cur, sql): + # e.g. execute_from_file already appends these + if len(result) < 7: + yield result + (sql, True, True) + else: + yield result + continue + except special.CommandNotFound: + pass + + # Not a special command, so execute as normal sql + yield self.execute_normal_sql(sql) + (sql, True, False) + except psycopg2.DatabaseError as e: + _logger.error("sql: %r, error: %r", sql, e) + _logger.error("traceback: %r", traceback.format_exc()) + + if self._must_raise(e) or not exception_formatter: + raise + + yield None, None, None, exception_formatter(e), sql, False, False + + if not on_error_resume: + break + finally: + if self.reset_expanded: + pgspecial.expanded_output = False + self.reset_expanded = None + + def _must_raise(self, e): + """Return true if e is an error that should not be caught in ``run``. + + An uncaught error will prompt the user to reconnect; as long as we + detect that the connection is stil open, we catch the error, as + reconnecting won't solve that problem. + + :param e: DatabaseError. An exception raised while executing a query. + + :return: Bool. True if ``run`` must raise this exception. + + """ + return self.conn.closed != 0 + + def execute_normal_sql(self, split_sql): + """Returns tuple (title, rows, headers, status)""" + _logger.debug("Regular sql statement. sql: %r", split_sql) + cur = self.conn.cursor() + cur.execute(split_sql) + + # conn.notices persist between queies, we use pop to clear out the list + title = "" + while len(self.conn.notices) > 0: + title = self.conn.notices.pop() + title + + # cur.description will be None for operations that do not return + # rows. + if cur.description: + headers = [x[0] for x in cur.description] + return title, cur, headers, cur.statusmessage + else: + _logger.debug("No rows in result.") + return title, None, None, cur.statusmessage + + def search_path(self): + """Returns the current search path as a list of schema names""" + + try: + with self.conn.cursor() as cur: + _logger.debug("Search path query. sql: %r", self.search_path_query) + cur.execute(self.search_path_query) + return [x[0] for x in cur.fetchall()] + except psycopg2.ProgrammingError: + fallback = "SELECT * FROM current_schemas(true)" + with self.conn.cursor() as cur: + _logger.debug("Search path query. sql: %r", fallback) + cur.execute(fallback) + return cur.fetchone()[0] + + def view_definition(self, spec): + """Returns the SQL defining views described by `spec`""" + + template = "CREATE OR REPLACE {6} VIEW {0}.{1} AS \n{3}" + # 2: relkind, v or m (materialized) + # 4: reloptions, null + # 5: checkoption: local or cascaded + with self.conn.cursor() as cur: + sql = self.view_definition_query + _logger.debug("View Definition Query. sql: %r\nspec: %r", sql, spec) + try: + cur.execute(sql, (spec,)) + except psycopg2.ProgrammingError: + raise RuntimeError("View {} does not exist.".format(spec)) + result = cur.fetchone() + view_type = "MATERIALIZED" if result[2] == "m" else "" + return template.format(*result + (view_type,)) + + def function_definition(self, spec): + """Returns the SQL defining functions described by `spec`""" + + with self.conn.cursor() as cur: + sql = self.function_definition_query + _logger.debug("Function Definition Query. sql: %r\nspec: %r", sql, spec) + try: + cur.execute(sql, (spec,)) + result = cur.fetchone() + return result[0] + except psycopg2.ProgrammingError: + raise RuntimeError("Function {} does not exist.".format(spec)) + + def schemata(self): + """Returns a list of schema names in the database""" + + with self.conn.cursor() as cur: + _logger.debug("Schemata Query. sql: %r", self.schemata_query) + cur.execute(self.schemata_query) + return [x[0] for x in cur.fetchall()] + + def _relations(self, kinds=("r", "p", "f", "v", "m")): + """Get table or view name metadata + + :param kinds: list of postgres relkind filters: + 'r' - table + 'p' - partitioned table + 'f' - foreign table + 'v' - view + 'm' - materialized view + :return: (schema_name, rel_name) tuples + """ + + with self.conn.cursor() as cur: + sql = cur.mogrify(self.tables_query, [kinds]) + _logger.debug("Tables Query. sql: %r", sql) + cur.execute(sql) + for row in cur: + yield row + + def tables(self): + """Yields (schema_name, table_name) tuples""" + for row in self._relations(kinds=["r", "p", "f"]): + yield row + + def views(self): + """Yields (schema_name, view_name) tuples. + + Includes both views and and materialized views + """ + for row in self._relations(kinds=["v", "m"]): + yield row + + def _columns(self, kinds=("r", "p", "f", "v", "m")): + """Get column metadata for tables and views + + :param kinds: kinds: list of postgres relkind filters: + 'r' - table + 'p' - partitioned table + 'f' - foreign table + 'v' - view + 'm' - materialized view + :return: list of (schema_name, relation_name, column_name, column_type) tuples + """ + + if self.conn.server_version >= 80400: + columns_query = """ + SELECT nsp.nspname schema_name, + cls.relname table_name, + att.attname column_name, + att.atttypid::regtype::text type_name, + att.atthasdef AS has_default, + pg_catalog.pg_get_expr(def.adbin, def.adrelid, true) as default + FROM pg_catalog.pg_attribute att + INNER JOIN pg_catalog.pg_class cls + ON att.attrelid = cls.oid + INNER JOIN pg_catalog.pg_namespace nsp + ON cls.relnamespace = nsp.oid + LEFT OUTER JOIN pg_attrdef def + ON def.adrelid = att.attrelid + AND def.adnum = att.attnum + WHERE cls.relkind = ANY(%s) + AND NOT att.attisdropped + AND att.attnum > 0 + ORDER BY 1, 2, att.attnum""" + else: + columns_query = """ + SELECT nsp.nspname schema_name, + cls.relname table_name, + att.attname column_name, + typ.typname type_name, + NULL AS has_default, + NULL AS default + FROM pg_catalog.pg_attribute att + INNER JOIN pg_catalog.pg_class cls + ON att.attrelid = cls.oid + INNER JOIN pg_catalog.pg_namespace nsp + ON cls.relnamespace = nsp.oid + INNER JOIN pg_catalog.pg_type typ + ON typ.oid = att.atttypid + WHERE cls.relkind = ANY(%s) + AND NOT att.attisdropped + AND att.attnum > 0 + ORDER BY 1, 2, att.attnum""" + + with self.conn.cursor() as cur: + sql = cur.mogrify(columns_query, [kinds]) + _logger.debug("Columns Query. sql: %r", sql) + cur.execute(sql) + for row in cur: + yield row + + def table_columns(self): + for row in self._columns(kinds=["r", "p", "f"]): + yield row + + def view_columns(self): + for row in self._columns(kinds=["v", "m"]): + yield row + + def databases(self): + with self.conn.cursor() as cur: + _logger.debug("Databases Query. sql: %r", self.databases_query) + cur.execute(self.databases_query) + return [x[0] for x in cur.fetchall()] + + def full_databases(self): + with self.conn.cursor() as cur: + _logger.debug("Databases Query. sql: %r", self.full_databases_query) + cur.execute(self.full_databases_query) + headers = [x[0] for x in cur.description] + return cur.fetchall(), headers, cur.statusmessage + + def get_socket_directory(self): + with self.conn.cursor() as cur: + _logger.debug( + "Socket directory Query. sql: %r", self.socket_directory_query + ) + cur.execute(self.socket_directory_query) + result = cur.fetchone() + return result[0] if result else "" + + def foreignkeys(self): + """Yields ForeignKey named tuples""" + + if self.conn.server_version < 90000: + return + + with self.conn.cursor() as cur: + query = """ + SELECT s_p.nspname AS parentschema, + t_p.relname AS parenttable, + unnest(( + select + array_agg(attname ORDER BY i) + from + (select unnest(confkey) as attnum, generate_subscripts(confkey, 1) as i) x + JOIN pg_catalog.pg_attribute c USING(attnum) + WHERE c.attrelid = fk.confrelid + )) AS parentcolumn, + s_c.nspname AS childschema, + t_c.relname AS childtable, + unnest(( + select + array_agg(attname ORDER BY i) + from + (select unnest(conkey) as attnum, generate_subscripts(conkey, 1) as i) x + JOIN pg_catalog.pg_attribute c USING(attnum) + WHERE c.attrelid = fk.conrelid + )) AS childcolumn + FROM pg_catalog.pg_constraint fk + JOIN pg_catalog.pg_class t_p ON t_p.oid = fk.confrelid + JOIN pg_catalog.pg_namespace s_p ON s_p.oid = t_p.relnamespace + JOIN pg_catalog.pg_class t_c ON t_c.oid = fk.conrelid + JOIN pg_catalog.pg_namespace s_c ON s_c.oid = t_c.relnamespace + WHERE fk.contype = 'f'; + """ + _logger.debug("Functions Query. sql: %r", query) + cur.execute(query) + for row in cur: + yield ForeignKey(*row) + + def functions(self): + """Yields FunctionMetadata named tuples""" + + if self.conn.server_version >= 110000: + query = """ + SELECT n.nspname schema_name, + p.proname func_name, + p.proargnames, + COALESCE(proallargtypes::regtype[], proargtypes::regtype[])::text[], + p.proargmodes, + prorettype::regtype::text return_type, + p.prokind = 'a' is_aggregate, + p.prokind = 'w' is_window, + p.proretset is_set_returning, + d.deptype = 'e' is_extension, + pg_get_expr(proargdefaults, 0) AS arg_defaults + FROM pg_catalog.pg_proc p + INNER JOIN pg_catalog.pg_namespace n + ON n.oid = p.pronamespace + LEFT JOIN pg_depend d ON d.objid = p.oid and d.deptype = 'e' + WHERE p.prorettype::regtype != 'trigger'::regtype + ORDER BY 1, 2 + """ + elif self.conn.server_version > 90000: + query = """ + SELECT n.nspname schema_name, + p.proname func_name, + p.proargnames, + COALESCE(proallargtypes::regtype[], proargtypes::regtype[])::text[], + p.proargmodes, + prorettype::regtype::text return_type, + p.proisagg is_aggregate, + p.proiswindow is_window, + p.proretset is_set_returning, + d.deptype = 'e' is_extension, + pg_get_expr(proargdefaults, 0) AS arg_defaults + FROM pg_catalog.pg_proc p + INNER JOIN pg_catalog.pg_namespace n + ON n.oid = p.pronamespace + LEFT JOIN pg_depend d ON d.objid = p.oid and d.deptype = 'e' + WHERE p.prorettype::regtype != 'trigger'::regtype + ORDER BY 1, 2 + """ + elif self.conn.server_version >= 80400: + query = """ + SELECT n.nspname schema_name, + p.proname func_name, + p.proargnames, + COALESCE(proallargtypes::regtype[], proargtypes::regtype[])::text[], + p.proargmodes, + prorettype::regtype::text, + p.proisagg is_aggregate, + false is_window, + p.proretset is_set_returning, + d.deptype = 'e' is_extension, + NULL AS arg_defaults + FROM pg_catalog.pg_proc p + INNER JOIN pg_catalog.pg_namespace n + ON n.oid = p.pronamespace + LEFT JOIN pg_depend d ON d.objid = p.oid and d.deptype = 'e' + WHERE p.prorettype::regtype != 'trigger'::regtype + ORDER BY 1, 2 + """ + else: + query = """ + SELECT n.nspname schema_name, + p.proname func_name, + p.proargnames, + NULL arg_types, + NULL arg_modes, + '' ret_type, + p.proisagg is_aggregate, + false is_window, + p.proretset is_set_returning, + d.deptype = 'e' is_extension, + NULL AS arg_defaults + FROM pg_catalog.pg_proc p + INNER JOIN pg_catalog.pg_namespace n + ON n.oid = p.pronamespace + LEFT JOIN pg_depend d ON d.objid = p.oid and d.deptype = 'e' + WHERE p.prorettype::regtype != 'trigger'::regtype + ORDER BY 1, 2 + """ + + with self.conn.cursor() as cur: + _logger.debug("Functions Query. sql: %r", query) + cur.execute(query) + for row in cur: + yield FunctionMetadata(*row) + + def datatypes(self): + """Yields tuples of (schema_name, type_name)""" + + with self.conn.cursor() as cur: + if self.conn.server_version > 90000: + query = """ + SELECT n.nspname schema_name, + t.typname type_name + FROM pg_catalog.pg_type t + INNER JOIN pg_catalog.pg_namespace n + ON n.oid = t.typnamespace + WHERE ( t.typrelid = 0 -- non-composite types + OR ( -- composite type, but not a table + SELECT c.relkind = 'c' + FROM pg_catalog.pg_class c + WHERE c.oid = t.typrelid + ) + ) + AND NOT EXISTS( -- ignore array types + SELECT 1 + FROM pg_catalog.pg_type el + WHERE el.oid = t.typelem AND el.typarray = t.oid + ) + AND n.nspname <> 'pg_catalog' + AND n.nspname <> 'information_schema' + ORDER BY 1, 2; + """ + else: + query = """ + SELECT n.nspname schema_name, + pg_catalog.format_type(t.oid, NULL) type_name + FROM pg_catalog.pg_type t + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace + WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid)) + AND t.typname !~ '^_' + AND n.nspname <> 'pg_catalog' + AND n.nspname <> 'information_schema' + AND pg_catalog.pg_type_is_visible(t.oid) + ORDER BY 1, 2; + """ + _logger.debug("Datatypes Query. sql: %r", query) + cur.execute(query) + for row in cur: + yield row + + def casing(self): + """Yields the most common casing for names used in db functions""" + with self.conn.cursor() as cur: + query = r""" + WITH Words AS ( + SELECT regexp_split_to_table(prosrc, '\W+') AS Word, COUNT(1) + FROM pg_catalog.pg_proc P + JOIN pg_catalog.pg_namespace N ON N.oid = P.pronamespace + JOIN pg_catalog.pg_language L ON L.oid = P.prolang + WHERE L.lanname IN ('sql', 'plpgsql') + AND N.nspname NOT IN ('pg_catalog', 'information_schema') + GROUP BY Word + ), + OrderWords AS ( + SELECT Word, + ROW_NUMBER() OVER(PARTITION BY LOWER(Word) ORDER BY Count DESC) + FROM Words + WHERE Word ~* '.*[a-z].*' + ), + Names AS ( + --Column names + SELECT attname AS Name + FROM pg_catalog.pg_attribute + UNION -- Table/view names + SELECT relname + FROM pg_catalog.pg_class + UNION -- Function names + SELECT proname + FROM pg_catalog.pg_proc + UNION -- Type names + SELECT typname + FROM pg_catalog.pg_type + UNION -- Schema names + SELECT nspname + FROM pg_catalog.pg_namespace + UNION -- Parameter names + SELECT unnest(proargnames) + FROM pg_proc + ) + SELECT Word + FROM OrderWords + WHERE LOWER(Word) IN (SELECT Name FROM Names) + AND Row_Number = 1; + """ + _logger.debug("Casing Query. sql: %r", query) + cur.execute(query) + for row in cur: + yield row[0] |