summaryrefslogtreecommitdiffstats
path: root/mycli/packages/special/main.py
blob: ab04f30d858425d5d5583ebc3b9cb9eb5ad46681 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import logging
from collections import namedtuple

from . import export

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# How a special command wants to be invoked (see execute()):
NO_QUERY = 0        # handler is called without any arguments
PARSED_QUERY = 1    # handler receives cur, arg and verbose
RAW_QUERY = 2       # handler receives cur and the unparsed input as query

# Registry record describing one special command.
SpecialCommand = namedtuple(
    'SpecialCommand',
    'handler command shortcut description arg_type hidden case_sensitive')

# Maps a command name or alias to its SpecialCommand record.
COMMANDS = {}

@export
class CommandNotFound(Exception):
    """Raised when the input does not match any registered special command."""

@export
def parse_special_command(sql):
    """Split the text of a special command into its components.

    The command is everything up to the first space; whatever follows is its
    argument. A ``+`` anywhere in the command marks it as verbose and is
    removed from the returned command name.

    :param sql: string, the text entered by the user
    :return: tuple ``(command, verbose, arg)`` where ``command`` and ``arg``
        are stripped strings and ``verbose`` is a bool
    """
    # Drop leading whitespace first: otherwise input such as "  help" is
    # partitioned on its very first space and the command comes back empty.
    command, _, arg = sql.lstrip().partition(' ')
    verbose = '+' in command
    command = command.strip().replace('+', '')
    return (command, verbose, arg.strip())

@export
def special_command(command, shortcut, description, arg_type=PARSED_QUERY,
        hidden=False, case_sensitive=False, aliases=()):
    """Build a decorator that registers a function as a special command.

    All arguments are forwarded unchanged to register_special_command(),
    with the decorated function supplied as the handler.

    :return: a decorator that registers the function it is applied to and
        then returns that same function
    """
    def decorator(func):
        register_special_command(
            func, command, shortcut, description,
            arg_type=arg_type, hidden=hidden,
            case_sensitive=case_sensitive, aliases=aliases)
        return func
    return decorator

@export
def register_special_command(handler, command, shortcut, description,
        arg_type=PARSED_QUERY, hidden=False, case_sensitive=False, aliases=()):
    """Add a special command, and every alias for it, to COMMANDS.

    Keys are stored lower-cased unless ``case_sensitive`` is true. Alias
    entries carry the same data as the main entry but are always hidden.

    :param handler: callable invoked when the command is executed
    :param command: string, the primary name of the command
    :param aliases: iterable of additional names for the same command
    """
    def lookup_key(name):
        # Case-insensitive commands are always stored under a lower-case key.
        return name if case_sensitive else name.lower()

    entry = SpecialCommand(handler=handler, command=command,
                           shortcut=shortcut, description=description,
                           arg_type=arg_type, hidden=hidden,
                           case_sensitive=case_sensitive)
    COMMANDS[lookup_key(command)] = entry
    for alias in aliases:
        COMMANDS[lookup_key(alias)] = entry._replace(hidden=True)

@export
def execute(cur, sql):
    """Execute a special command and return the results.

    :param cur: cursor, handed to handlers that take a query
    :param sql: string, the full text entered by the user
    :return: the value returned by the command's handler
    :raises CommandNotFound: if ``sql`` does not start with a registered
        special command, or if it only matches a case-sensitive command
        once letter case is ignored
    """
    command, verbose, arg = parse_special_command(sql)

    # Try the exact spelling first, then fall back to a lower-cased lookup.
    # The fallback must not match commands registered as case sensitive.
    special_cmd = COMMANDS.get(command)
    if special_cmd is None:
        special_cmd = COMMANDS.get(command.lower())
        if special_cmd is None or special_cmd.case_sensitive:
            raise CommandNotFound('Command not found: %s' % command)

    # "help <SQL KEYWORD>" is a special case. We want built-in help, not
    # mycli help here. The name is compared case-insensitively, like the
    # lookup above, so "HELP select" behaves the same as "help select".
    if command.lower() == 'help' and arg:
        return show_keyword_help(cur=cur, arg=arg)

    # Each arg_type defines which arguments the handler is called with.
    if special_cmd.arg_type == NO_QUERY:
        return special_cmd.handler()
    elif special_cmd.arg_type == PARSED_QUERY:
        return special_cmd.handler(cur=cur, arg=arg, verbose=verbose)
    elif special_cmd.arg_type == RAW_QUERY:
        return special_cmd.handler(cur=cur, query=sql)

@special_command('help', '\\?', 'Show this help.', arg_type=NO_QUERY, aliases=('\\?', '?'))
def show_help():
    """List every non-hidden special command, ordered by its key in COMMANDS.

    :return: list holding a single ``(None, rows, headers, None)`` tuple,
        where each row is ``(command, shortcut, description)``
    """
    ordered = (COMMANDS[key] for key in sorted(COMMANDS))
    rows = [(cmd.command, cmd.shortcut, cmd.description)
            for cmd in ordered if not cmd.hidden]
    return [(None, rows, ['Command', 'Shortcut', 'Description'], None)]

def show_keyword_help(cur, arg):
    """
    Run the built-in "help <keyword>" statement, to display help for an SQL
    keyword.
    :param cur: cursor
    :param arg: string, the keyword; surrounding quotes are removed
    :return: list with one tuple: the cursor and its column headers when rows
             were returned, otherwise a "No help found" message
    """
    keyword = arg.strip('"').strip("'")
    log.debug("help '%s'", keyword)
    # Pass the keyword as a query parameter so the driver quotes and escapes
    # it; formatting it into the statement text broke on keywords containing
    # a quote character.
    # NOTE(review): assumes the cursor's driver uses the %s paramstyle, as
    # MySQL DB-API drivers do -- confirm.
    cur.execute('help %s', (keyword,))
    if cur.description and cur.rowcount > 0:
        headers = [x[0] for x in cur.description]
        return [(None, cur, headers, '')]
    return [(None, None, None, 'No help found for {0}.'.format(keyword))]


@special_command('exit', '\\q', 'Exit.', arg_type=NO_QUERY, aliases=('\\q', ))
@special_command('quit', '\\q', 'Quit.', arg_type=NO_QUERY)
def quit(*_args):
    """Handler for the exit and quit commands: always raises EOFError.

    Any positional arguments are accepted and ignored.
    """
    raise EOFError()


@special_command('\\e', '\\e', 'Edit command with editor (uses $EDITOR).',
                 arg_type=NO_QUERY, case_sensitive=True)
@special_command('\\clip', '\\clip', 'Copy query to the system clipboard.',
                 arg_type=NO_QUERY, case_sensitive=True)
@special_command('\\G', '\\G', 'Display current query results vertically.',
                 arg_type=NO_QUERY, case_sensitive=True)
def stub():
    """Placeholder handler that always raises NotImplementedError.

    NOTE(review): presumably the commands registered with this handler are
    processed elsewhere and only listed here so they show up in the help
    output -- verify against the callers.
    """
    raise NotImplementedError()