summaryrefslogtreecommitdiffstats
path: root/litecli/packages/special/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'litecli/packages/special/main.py')
-rw-r--r--litecli/packages/special/main.py160
1 files changed, 160 insertions, 0 deletions
diff --git a/litecli/packages/special/main.py b/litecli/packages/special/main.py
new file mode 100644
index 0000000..3dd0e77
--- /dev/null
+++ b/litecli/packages/special/main.py
@@ -0,0 +1,160 @@
+from __future__ import unicode_literals
+import logging
+from collections import namedtuple
+
+from . import export
+
+log = logging.getLogger(__name__)
+
+NO_QUERY = 0
+PARSED_QUERY = 1
+RAW_QUERY = 2
+
+SpecialCommand = namedtuple(
+ "SpecialCommand",
+ [
+ "handler",
+ "command",
+ "shortcut",
+ "description",
+ "arg_type",
+ "hidden",
+ "case_sensitive",
+ ],
+)
+
+COMMANDS = {}
+
+
+@export
+class ArgumentMissing(Exception):
+ pass
+
+
+@export
+class CommandNotFound(Exception):
+ pass
+
+
+@export
+def parse_special_command(sql):
+ command, _, arg = sql.partition(" ")
+ verbose = "+" in command
+ command = command.strip().replace("+", "")
+ return (command, verbose, arg.strip())
+
+
+@export
+def special_command(
+ command,
+ shortcut,
+ description,
+ arg_type=PARSED_QUERY,
+ hidden=False,
+ case_sensitive=False,
+ aliases=(),
+):
+ def wrapper(wrapped):
+ register_special_command(
+ wrapped,
+ command,
+ shortcut,
+ description,
+ arg_type,
+ hidden,
+ case_sensitive,
+ aliases,
+ )
+ return wrapped
+
+ return wrapper
+
+
+@export
+def register_special_command(
+ handler,
+ command,
+ shortcut,
+ description,
+ arg_type=PARSED_QUERY,
+ hidden=False,
+ case_sensitive=False,
+ aliases=(),
+):
+ cmd = command.lower() if not case_sensitive else command
+ COMMANDS[cmd] = SpecialCommand(
+ handler, command, shortcut, description, arg_type, hidden, case_sensitive
+ )
+ for alias in aliases:
+ cmd = alias.lower() if not case_sensitive else alias
+ COMMANDS[cmd] = SpecialCommand(
+ handler,
+ command,
+ shortcut,
+ description,
+ arg_type,
+ case_sensitive=case_sensitive,
+ hidden=True,
+ )
+
+
+@export
+def execute(cur, sql):
+ """Execute a special command and return the results. If the special command
+ is not supported a KeyError will be raised.
+ """
+ command, verbose, arg = parse_special_command(sql)
+
+ if (command not in COMMANDS) and (command.lower() not in COMMANDS):
+ raise CommandNotFound
+
+ try:
+ special_cmd = COMMANDS[command]
+ except KeyError:
+ special_cmd = COMMANDS[command.lower()]
+ if special_cmd.case_sensitive:
+ raise CommandNotFound("Command not found: %s" % command)
+
+ if special_cmd.arg_type == NO_QUERY:
+ return special_cmd.handler()
+ elif special_cmd.arg_type == PARSED_QUERY:
+ return special_cmd.handler(cur=cur, arg=arg, verbose=verbose)
+ elif special_cmd.arg_type == RAW_QUERY:
+ return special_cmd.handler(cur=cur, query=sql)
+
+
+@special_command(
+ "help", "\\?", "Show this help.", arg_type=NO_QUERY, aliases=("\\?", "?")
+)
+def show_help(): # All the parameters are ignored.
+ headers = ["Command", "Shortcut", "Description"]
+ result = []
+
+ for _, value in sorted(COMMANDS.items()):
+ if not value.hidden:
+ result.append((value.command, value.shortcut, value.description))
+ return [(None, result, headers, None)]
+
+
+@special_command(".exit", "\\q", "Exit.", arg_type=NO_QUERY, aliases=("\\q", "exit"))
+@special_command("quit", "\\q", "Quit.", arg_type=NO_QUERY)
+def quit(*_args):
+ raise EOFError
+
+
+@special_command(
+ "\\e",
+ "\\e",
+ "Edit command with editor (uses $EDITOR).",
+ arg_type=NO_QUERY,
+ case_sensitive=True,
+)
+@special_command(
+ "\\G",
+ "\\G",
+ "Display current query results vertically.",
+ arg_type=NO_QUERY,
+ case_sensitive=True,
+)
+def stub():
+ raise NotImplementedError