summaryrefslogtreecommitdiffstats
path: root/pgspecial/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'pgspecial/main.py')
-rw-r--r--pgspecial/main.py349
1 files changed, 349 insertions, 0 deletions
diff --git a/pgspecial/main.py b/pgspecial/main.py
new file mode 100644
index 0000000..b13191e
--- /dev/null
+++ b/pgspecial/main.py
@@ -0,0 +1,349 @@
+from __future__ import unicode_literals
+import os
+import logging
+from collections import namedtuple
+
+from . import export
+from .help.commands import helpcommands
+
+log = logging.getLogger(__name__)
+
+NO_QUERY = 0
+PARSED_QUERY = 1
+RAW_QUERY = 2
+
+PAGER_ALWAYS = 2
+PAGER_LONG_OUTPUT = 1
+PAGER_OFF = 0
+
+PAGER_MSG = {
+ PAGER_OFF: "Pager usage is off.",
+ PAGER_LONG_OUTPUT: "Pager is used for long output.",
+ PAGER_ALWAYS: "Pager is always used.",
+}
+
+SpecialCommand = namedtuple(
+ "SpecialCommand",
+ ["handler", "syntax", "description", "arg_type", "hidden", "case_sensitive"],
+)
+
+
+@export
+class CommandNotFound(Exception):
+ pass
+
+
+@export
+class PGSpecial(object):
+ # Default static commands that don't rely on PGSpecial state are registered
+ # via the special_command decorator and stored in default_commands
+ default_commands = {}
+
+ def __init__(self):
+ self.timing_enabled = True
+
+ self.commands = self.default_commands.copy()
+ self.timing_enabled = False
+ self.expanded_output = False
+ self.auto_expand = False
+ self.pager_config = PAGER_ALWAYS
+ self.pager = os.environ.get("PAGER", "")
+
+ self.register(
+ self.show_help, "\\?", "\\?", "Show Commands.", arg_type=PARSED_QUERY
+ )
+
+ self.register(
+ self.toggle_expanded_output,
+ "\\x",
+ "\\x",
+ "Toggle expanded output.",
+ arg_type=PARSED_QUERY,
+ )
+
+ self.register(
+ self.call_pset,
+ "\\pset",
+ "\\pset [key] [value]",
+ "A limited version of traditional \\pset",
+ arg_type=PARSED_QUERY,
+ )
+
+ self.register(
+ self.show_command_help,
+ "\\h",
+ "\\h",
+ "Show SQL syntax and help.",
+ arg_type=PARSED_QUERY,
+ )
+
+ self.register(
+ self.toggle_timing,
+ "\\timing",
+ "\\timing",
+ "Toggle timing of commands.",
+ arg_type=NO_QUERY,
+ )
+
+ self.register(
+ self.set_pager,
+ "\\pager",
+ "\\pager [command]",
+ "Set PAGER. Print the query results via PAGER.",
+ arg_type=PARSED_QUERY,
+ )
+
+ def register(self, *args, **kwargs):
+ register_special_command(*args, command_dict=self.commands, **kwargs)
+
+ def execute(self, cur, sql):
+ commands = self.commands
+ command, verbose, pattern = parse_special_command(sql)
+
+ if (command not in commands) and (command.lower() not in commands):
+ raise CommandNotFound
+
+ try:
+ special_cmd = commands[command]
+ except KeyError:
+ special_cmd = commands[command.lower()]
+ if special_cmd.case_sensitive:
+ raise CommandNotFound("Command not found: %s" % command)
+
+ if special_cmd.arg_type == NO_QUERY:
+ return special_cmd.handler()
+ elif special_cmd.arg_type == PARSED_QUERY:
+ return special_cmd.handler(cur=cur, pattern=pattern, verbose=verbose)
+ elif special_cmd.arg_type == RAW_QUERY:
+ return special_cmd.handler(cur=cur, query=sql)
+
+ def show_help(self, pattern, **_):
+ if pattern.strip():
+ return self.show_command_help(pattern)
+
+ headers = ["Command", "Description"]
+ result = []
+
+ for _, value in sorted(self.commands.items()):
+ if not value.hidden:
+ result.append((value.syntax, value.description))
+ return [(None, result, headers, None)]
+
+ def show_command_help_listing(self):
+ table = chunks(sorted(helpcommands.keys()), 6)
+ return [(None, table, [], None)]
+
+ def show_command_help(self, pattern, **_):
+ command = pattern.strip().upper()
+ message = ""
+
+ if not command:
+ return self.show_command_help_listing()
+
+ if command in helpcommands:
+ helpcommand = helpcommands[command]
+
+ if "description" in helpcommand:
+ message += helpcommand["description"]
+ if "synopsis" in helpcommand:
+ message += "\nSyntax:\n"
+ message += helpcommand["synopsis"]
+ else:
+ message = 'No help available for "%s"' % pattern
+ message += "\nTry \\h with no arguments to see available help."
+
+ return [(None, None, None, message)]
+
+ def toggle_expanded_output(self, pattern, **_):
+ flag = pattern.strip()
+ if flag == "auto":
+ self.auto_expand = True
+ self.expanded_output = False
+ return [(None, None, None, "Expanded display is used automatically.")]
+ elif flag == "off":
+ self.expanded_output = False
+ elif flag == "on":
+ self.expanded_output = True
+ else:
+ self.expanded_output = not (self.expanded_output or self.auto_expand)
+
+ self.auto_expand = self.expanded_output
+ message = "Expanded display is "
+ message += "on." if self.expanded_output else "off."
+ return [(None, None, None, message)]
+
+ def toggle_timing(self):
+ self.timing_enabled = not self.timing_enabled
+ message = "Timing is "
+ message += "on." if self.timing_enabled else "off."
+ return [(None, None, None, message)]
+
+ def call_pset(self, pattern, **_):
+ pattern = pattern.split(" ", 2)
+ val = pattern[1] if len(pattern) > 1 else ""
+ key = pattern[0]
+ if hasattr(self, "pset_" + key):
+ return getattr(self, "pset_" + key)(val)
+ else:
+ return [(None, None, None, "'%s' is currently not supported by pset" % key)]
+
+ def pset_pager(self, value):
+ if value == "always":
+ self.pager_config = PAGER_ALWAYS
+ elif value == "off":
+ self.pager_config = PAGER_OFF
+ elif value == "on":
+ self.pager_config = PAGER_LONG_OUTPUT
+ elif self.pager_config == PAGER_LONG_OUTPUT:
+ self.pager_config = PAGER_OFF
+ else:
+ self.pager_config = PAGER_LONG_OUTPUT
+ return [(None, None, None, "%s" % PAGER_MSG[self.pager_config])]
+
+ def set_pager(self, pattern, **_):
+ if not pattern:
+ if not self.pager:
+ os.environ.pop("PAGER", None)
+ msg = "Pager reset to system default."
+ else:
+ os.environ["PAGER"] = self.pager
+ msg = "Reset pager back to default. Default: %s" % self.pager
+ else:
+ os.environ["PAGER"] = pattern
+ msg = "PAGER set to %s." % pattern
+
+ return [(None, None, None, msg)]
+
+
+@export
+def content_exceeds_width(row, width):
+ # Account for 3 characters between each column
+ separator_space = len(row) * 3
+ # Add 2 columns for a bit of buffer
+ line_len = sum([len(x) for x in row]) + separator_space + 2
+ return line_len > width
+
+
+@export
+def parse_special_command(sql):
+ command, _, arg = sql.partition(" ")
+ verbose = "+" in command
+
+ command = command.strip().replace("+", "")
+ return (command, verbose, arg.strip())
+
+
+def show_extra_help_command(command, syntax, description):
+ r"""
+ A decorator used internally for registering help for a command that is not
+ automatically executed via PGSpecial.execute, but invoked manually by the
+ caller (e.g. \watch).
+ """
+
+ @special_command(command, syntax, description, arg_type=NO_QUERY)
+ def placeholder():
+ raise RuntimeError
+
+ def wrapper(wrapped):
+ return wrapped
+
+ return wrapper
+
+
+def special_command(
+ command,
+ syntax,
+ description,
+ arg_type=PARSED_QUERY,
+ hidden=False,
+ case_sensitive=True,
+ aliases=(),
+):
+ """A decorator used internally for static special commands"""
+
+ def wrapper(wrapped):
+ register_special_command(
+ wrapped,
+ command,
+ syntax,
+ description,
+ arg_type,
+ hidden,
+ case_sensitive,
+ aliases,
+ command_dict=PGSpecial.default_commands,
+ )
+ return wrapped
+
+ return wrapper
+
+
+def register_special_command(
+ handler,
+ command,
+ syntax,
+ description,
+ arg_type=PARSED_QUERY,
+ hidden=False,
+ case_sensitive=True,
+ aliases=(),
+ command_dict=None,
+):
+ cmd = command.lower() if not case_sensitive else command
+ command_dict[cmd] = SpecialCommand(
+ handler, syntax, description, arg_type, hidden, case_sensitive
+ )
+ for alias in aliases:
+ cmd = alias.lower() if not case_sensitive else alias
+ command_dict[cmd] = SpecialCommand(
+ handler,
+ syntax,
+ description,
+ arg_type,
+ case_sensitive=case_sensitive,
+ hidden=True,
+ )
+
+
+def chunks(l, n):
+ n = max(1, n)
+ return [l[i : i + n] for i in range(0, len(l), n)]
+
+
+@special_command(
+ "\\e", "\\e [file]", "Edit the query with external editor.", arg_type=NO_QUERY
+)
+@special_command(
+ "\\ef",
+ "\\ef [funcname [line]]",
+ "Edit the contents of the query buffer.",
+ arg_type=NO_QUERY,
+ hidden=True,
+)
+@special_command(
+ "\\ev",
+ "\\ev [viewname [line]]",
+ "Edit the contents of the query buffer.",
+ arg_type=NO_QUERY,
+ hidden=True,
+)
+def doc_only():
+ "Documention placeholder. Implemented in pgcli.main.handle_editor_command"
+ raise RuntimeError
+
+
+@special_command(
+ "\\do", "\\do[S] [pattern]", "List operators.", arg_type=NO_QUERY, hidden=True
+)
+@special_command(
+ "\\dp",
+ "\\dp [pattern]",
+ "List table, view, and sequence access privileges.",
+ arg_type=NO_QUERY,
+ hidden=True,
+)
+@special_command(
+ "\\z", "\\z [pattern]", "Same as \\dp.", arg_type=NO_QUERY, hidden=True
+)
+def place_holder():
+ raise NotImplementedError