diff options
Diffstat (limited to 'pgspecial/main.py')
-rw-r--r-- | pgspecial/main.py | 349 |
1 files changed, 349 insertions, 0 deletions
diff --git a/pgspecial/main.py b/pgspecial/main.py new file mode 100644 index 0000000..b13191e --- /dev/null +++ b/pgspecial/main.py @@ -0,0 +1,349 @@ +from __future__ import unicode_literals +import os +import logging +from collections import namedtuple + +from . import export +from .help.commands import helpcommands + +log = logging.getLogger(__name__) + +NO_QUERY = 0 +PARSED_QUERY = 1 +RAW_QUERY = 2 + +PAGER_ALWAYS = 2 +PAGER_LONG_OUTPUT = 1 +PAGER_OFF = 0 + +PAGER_MSG = { + PAGER_OFF: "Pager usage is off.", + PAGER_LONG_OUTPUT: "Pager is used for long output.", + PAGER_ALWAYS: "Pager is always used.", +} + +SpecialCommand = namedtuple( + "SpecialCommand", + ["handler", "syntax", "description", "arg_type", "hidden", "case_sensitive"], +) + + +@export +class CommandNotFound(Exception): + pass + + +@export +class PGSpecial(object): + # Default static commands that don't rely on PGSpecial state are registered + # via the special_command decorator and stored in default_commands + default_commands = {} + + def __init__(self): + self.timing_enabled = True + + self.commands = self.default_commands.copy() + self.timing_enabled = False + self.expanded_output = False + self.auto_expand = False + self.pager_config = PAGER_ALWAYS + self.pager = os.environ.get("PAGER", "") + + self.register( + self.show_help, "\\?", "\\?", "Show Commands.", arg_type=PARSED_QUERY + ) + + self.register( + self.toggle_expanded_output, + "\\x", + "\\x", + "Toggle expanded output.", + arg_type=PARSED_QUERY, + ) + + self.register( + self.call_pset, + "\\pset", + "\\pset [key] [value]", + "A limited version of traditional \\pset", + arg_type=PARSED_QUERY, + ) + + self.register( + self.show_command_help, + "\\h", + "\\h", + "Show SQL syntax and help.", + arg_type=PARSED_QUERY, + ) + + self.register( + self.toggle_timing, + "\\timing", + "\\timing", + "Toggle timing of commands.", + arg_type=NO_QUERY, + ) + + self.register( + self.set_pager, + "\\pager", + "\\pager [command]", + "Set PAGER. Print the query results via PAGER.", + arg_type=PARSED_QUERY, + ) + + def register(self, *args, **kwargs): + register_special_command(*args, command_dict=self.commands, **kwargs) + + def execute(self, cur, sql): + commands = self.commands + command, verbose, pattern = parse_special_command(sql) + + if (command not in commands) and (command.lower() not in commands): + raise CommandNotFound + + try: + special_cmd = commands[command] + except KeyError: + special_cmd = commands[command.lower()] + if special_cmd.case_sensitive: + raise CommandNotFound("Command not found: %s" % command) + + if special_cmd.arg_type == NO_QUERY: + return special_cmd.handler() + elif special_cmd.arg_type == PARSED_QUERY: + return special_cmd.handler(cur=cur, pattern=pattern, verbose=verbose) + elif special_cmd.arg_type == RAW_QUERY: + return special_cmd.handler(cur=cur, query=sql) + + def show_help(self, pattern, **_): + if pattern.strip(): + return self.show_command_help(pattern) + + headers = ["Command", "Description"] + result = [] + + for _, value in sorted(self.commands.items()): + if not value.hidden: + result.append((value.syntax, value.description)) + return [(None, result, headers, None)] + + def show_command_help_listing(self): + table = chunks(sorted(helpcommands.keys()), 6) + return [(None, table, [], None)] + + def show_command_help(self, pattern, **_): + command = pattern.strip().upper() + message = "" + + if not command: + return self.show_command_help_listing() + + if command in helpcommands: + helpcommand = helpcommands[command] + + if "description" in helpcommand: + message += helpcommand["description"] + if "synopsis" in helpcommand: + message += "\nSyntax:\n" + message += helpcommand["synopsis"] + else: + message = 'No help available for "%s"' % pattern + message += "\nTry \\h with no arguments to see available help." + + return [(None, None, None, message)] + + def toggle_expanded_output(self, pattern, **_): + flag = pattern.strip() + if flag == "auto": + self.auto_expand = True + self.expanded_output = False + return [(None, None, None, "Expanded display is used automatically.")] + elif flag == "off": + self.expanded_output = False + elif flag == "on": + self.expanded_output = True + else: + self.expanded_output = not (self.expanded_output or self.auto_expand) + + self.auto_expand = self.expanded_output + message = "Expanded display is " + message += "on." if self.expanded_output else "off." + return [(None, None, None, message)] + + def toggle_timing(self): + self.timing_enabled = not self.timing_enabled + message = "Timing is " + message += "on." if self.timing_enabled else "off." + return [(None, None, None, message)] + + def call_pset(self, pattern, **_): + pattern = pattern.split(" ", 2) + val = pattern[1] if len(pattern) > 1 else "" + key = pattern[0] + if hasattr(self, "pset_" + key): + return getattr(self, "pset_" + key)(val) + else: + return [(None, None, None, "'%s' is currently not supported by pset" % key)] + + def pset_pager(self, value): + if value == "always": + self.pager_config = PAGER_ALWAYS + elif value == "off": + self.pager_config = PAGER_OFF + elif value == "on": + self.pager_config = PAGER_LONG_OUTPUT + elif self.pager_config == PAGER_LONG_OUTPUT: + self.pager_config = PAGER_OFF + else: + self.pager_config = PAGER_LONG_OUTPUT + return [(None, None, None, "%s" % PAGER_MSG[self.pager_config])] + + def set_pager(self, pattern, **_): + if not pattern: + if not self.pager: + os.environ.pop("PAGER", None) + msg = "Pager reset to system default." + else: + os.environ["PAGER"] = self.pager + msg = "Reset pager back to default. Default: %s" % self.pager + else: + os.environ["PAGER"] = pattern + msg = "PAGER set to %s." % pattern + + return [(None, None, None, msg)] + + +@export +def content_exceeds_width(row, width): + # Account for 3 characters between each column + separator_space = len(row) * 3 + # Add 2 columns for a bit of buffer + line_len = sum([len(x) for x in row]) + separator_space + 2 + return line_len > width + + +@export +def parse_special_command(sql): + command, _, arg = sql.partition(" ") + verbose = "+" in command + + command = command.strip().replace("+", "") + return (command, verbose, arg.strip()) + + +def show_extra_help_command(command, syntax, description): + r""" + A decorator used internally for registering help for a command that is not + automatically executed via PGSpecial.execute, but invoked manually by the + caller (e.g. \watch). + """ + + @special_command(command, syntax, description, arg_type=NO_QUERY) + def placeholder(): + raise RuntimeError + + def wrapper(wrapped): + return wrapped + + return wrapper + + +def special_command( + command, + syntax, + description, + arg_type=PARSED_QUERY, + hidden=False, + case_sensitive=True, + aliases=(), +): + """A decorator used internally for static special commands""" + + def wrapper(wrapped): + register_special_command( + wrapped, + command, + syntax, + description, + arg_type, + hidden, + case_sensitive, + aliases, + command_dict=PGSpecial.default_commands, + ) + return wrapped + + return wrapper + + +def register_special_command( + handler, + command, + syntax, + description, + arg_type=PARSED_QUERY, + hidden=False, + case_sensitive=True, + aliases=(), + command_dict=None, +): + cmd = command.lower() if not case_sensitive else command + command_dict[cmd] = SpecialCommand( + handler, syntax, description, arg_type, hidden, case_sensitive + ) + for alias in aliases: + cmd = alias.lower() if not case_sensitive else alias + command_dict[cmd] = SpecialCommand( + handler, + syntax, + description, + arg_type, + case_sensitive=case_sensitive, + hidden=True, + ) + + +def chunks(l, n): + n = max(1, n) + return [l[i : i + n] for i in range(0, len(l), n)] + + +@special_command( + "\\e", "\\e [file]", "Edit the query with external editor.", arg_type=NO_QUERY +) +@special_command( + "\\ef", + "\\ef [funcname [line]]", + "Edit the contents of the query buffer.", + arg_type=NO_QUERY, + hidden=True, +) +@special_command( + "\\ev", + "\\ev [viewname [line]]", + "Edit the contents of the query buffer.", + arg_type=NO_QUERY, + hidden=True, +) +def doc_only(): + "Documention placeholder. Implemented in pgcli.main.handle_editor_command" + raise RuntimeError + + +@special_command( + "\\do", "\\do[S] [pattern]", "List operators.", arg_type=NO_QUERY, hidden=True +) +@special_command( + "\\dp", + "\\dp [pattern]", + "List table, view, and sequence access privileges.", + arg_type=NO_QUERY, + hidden=True, +) +@special_command( + "\\z", "\\z [pattern]", "Same as \\dp.", arg_type=NO_QUERY, hidden=True +) +def place_holder(): + raise NotImplementedError |