from __future__ import unicode_literals import logging from collections import namedtuple from . import export log = logging.getLogger(__name__) NO_QUERY = 0 PARSED_QUERY = 1 RAW_QUERY = 2 SpecialCommand = namedtuple( "SpecialCommand", [ "handler", "command", "shortcut", "description", "arg_type", "hidden", "case_sensitive", ], ) COMMANDS = {} @export class ArgumentMissing(Exception): pass @export class CommandNotFound(Exception): pass @export def parse_special_command(sql): command, _, arg = sql.partition(" ") verbose = "+" in command command = command.strip().replace("+", "") return (command, verbose, arg.strip()) @export def special_command( command, shortcut, description, arg_type=PARSED_QUERY, hidden=False, case_sensitive=False, aliases=(), ): def wrapper(wrapped): register_special_command( wrapped, command, shortcut, description, arg_type, hidden, case_sensitive, aliases, ) return wrapped return wrapper @export def register_special_command( handler, command, shortcut, description, arg_type=PARSED_QUERY, hidden=False, case_sensitive=False, aliases=(), ): cmd = command.lower() if not case_sensitive else command COMMANDS[cmd] = SpecialCommand( handler, command, shortcut, description, arg_type, hidden, case_sensitive ) for alias in aliases: cmd = alias.lower() if not case_sensitive else alias COMMANDS[cmd] = SpecialCommand( handler, command, shortcut, description, arg_type, case_sensitive=case_sensitive, hidden=True, ) @export def execute(cur, sql): """Execute a special command and return the results. If the special command is not supported a KeyError will be raised. """ command, verbose, arg = parse_special_command(sql) if (command not in COMMANDS) and (command.lower() not in COMMANDS): raise CommandNotFound try: special_cmd = COMMANDS[command] except KeyError: special_cmd = COMMANDS[command.lower()] if special_cmd.case_sensitive: raise CommandNotFound("Command not found: %s" % command) if special_cmd.arg_type == NO_QUERY: return special_cmd.handler() elif special_cmd.arg_type == PARSED_QUERY: return special_cmd.handler(cur=cur, arg=arg, verbose=verbose) elif special_cmd.arg_type == RAW_QUERY: return special_cmd.handler(cur=cur, query=sql) @special_command( "help", "\\?", "Show this help.", arg_type=NO_QUERY, aliases=("\\?", "?") ) def show_help(): # All the parameters are ignored. headers = ["Command", "Shortcut", "Description"] result = [] for _, value in sorted(COMMANDS.items()): if not value.hidden: result.append((value.command, value.shortcut, value.description)) return [(None, result, headers, None)] @special_command(".exit", "\\q", "Exit.", arg_type=NO_QUERY, aliases=("\\q", "exit")) @special_command("quit", "\\q", "Quit.", arg_type=NO_QUERY) def quit(*_args): raise EOFError @special_command( "\\e", "\\e", "Edit command with editor (uses $EDITOR).", arg_type=NO_QUERY, case_sensitive=True, ) @special_command( "\\G", "\\G", "Display current query results vertically.", arg_type=NO_QUERY, case_sensitive=True, ) def stub(): raise NotImplementedError