diff options
Diffstat (limited to 'litecli/packages/special')
-rw-r--r-- | litecli/packages/special/__init__.py | 12 | ||||
-rw-r--r-- | litecli/packages/special/dbcommands.py | 273 | ||||
-rw-r--r-- | litecli/packages/special/favoritequeries.py | 59 | ||||
-rw-r--r-- | litecli/packages/special/iocommands.py | 479 | ||||
-rw-r--r-- | litecli/packages/special/main.py | 160 | ||||
-rw-r--r-- | litecli/packages/special/utils.py | 48 |
6 files changed, 1031 insertions, 0 deletions
diff --git a/litecli/packages/special/__init__.py b/litecli/packages/special/__init__.py new file mode 100644 index 0000000..fd2b18c --- /dev/null +++ b/litecli/packages/special/__init__.py @@ -0,0 +1,12 @@ +__all__ = [] + + +def export(defn): + """Decorator to explicitly mark functions that are exposed in a lib.""" + globals()[defn.__name__] = defn + __all__.append(defn.__name__) + return defn + + +from . import dbcommands +from . import iocommands diff --git a/litecli/packages/special/dbcommands.py b/litecli/packages/special/dbcommands.py new file mode 100644 index 0000000..a7eaa0c --- /dev/null +++ b/litecli/packages/special/dbcommands.py @@ -0,0 +1,273 @@ +from __future__ import unicode_literals, print_function +import csv +import logging +import os +import sys +import platform +import shlex +from sqlite3 import ProgrammingError + +from litecli import __version__ +from litecli.packages.special import iocommands +from litecli.packages.special.utils import format_uptime +from .main import special_command, RAW_QUERY, PARSED_QUERY, ArgumentMissing + +log = logging.getLogger(__name__) + + +@special_command( + ".tables", + "\\dt", + "List tables.", + arg_type=PARSED_QUERY, + case_sensitive=True, + aliases=("\\dt",), +) +def list_tables(cur, arg=None, arg_type=PARSED_QUERY, verbose=False): + if arg: + args = ("{0}%".format(arg),) + query = """ + SELECT name FROM sqlite_master + WHERE type IN ('table','view') AND name LIKE ? AND name NOT LIKE 'sqlite_%' + ORDER BY 1 + """ + else: + args = tuple() + query = """ + SELECT name FROM sqlite_master + WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' + ORDER BY 1 + """ + + log.debug(query) + cur.execute(query, args) + tables = cur.fetchall() + status = "" + if cur.description: + headers = [x[0] for x in cur.description] + else: + return [(None, None, None, "")] + + # if verbose and arg: + # query = "SELECT sql FROM sqlite_master WHERE name LIKE ?" + # log.debug(query) + # cur.execute(query) + # status = cur.fetchone()[1] + + return [(None, tables, headers, status)] + + +@special_command( + ".schema", + ".schema[+] [table]", + "The complete schema for the database or a single table", + arg_type=PARSED_QUERY, + case_sensitive=True, +) +def show_schema(cur, arg=None, **_): + if arg: + args = (arg,) + query = """ + SELECT sql FROM sqlite_master + WHERE name==? + ORDER BY tbl_name, type DESC, name + """ + else: + args = tuple() + query = """ + SELECT sql FROM sqlite_master + ORDER BY tbl_name, type DESC, name + """ + + log.debug(query) + cur.execute(query, args) + tables = cur.fetchall() + status = "" + if cur.description: + headers = [x[0] for x in cur.description] + else: + return [(None, None, None, "")] + + return [(None, tables, headers, status)] + + +@special_command( + ".databases", + ".databases", + "List databases.", + arg_type=RAW_QUERY, + case_sensitive=True, + aliases=("\\l",), +) +def list_databases(cur, **_): + query = "PRAGMA database_list" + log.debug(query) + cur.execute(query) + if cur.description: + headers = [x[0] for x in cur.description] + return [(None, cur, headers, "")] + else: + return [(None, None, None, "")] + + +@special_command( + ".status", + "\\s", + "Show current settings.", + arg_type=RAW_QUERY, + aliases=("\\s",), + case_sensitive=True, +) +def status(cur, **_): + # Create output buffers. + footer = [] + footer.append("--------------") + + # Output the litecli client information. + implementation = platform.python_implementation() + version = platform.python_version() + client_info = [] + client_info.append("litecli {0},".format(__version__)) + client_info.append("running on {0} {1}".format(implementation, version)) + footer.append(" ".join(client_info)) + + # Build the output that will be displayed as a table. + query = "SELECT file from pragma_database_list() where name = 'main';" + log.debug(query) + cur.execute(query) + db = cur.fetchone()[0] + if db is None: + db = "" + + footer.append("Current database: " + db) + if iocommands.is_pager_enabled(): + if "PAGER" in os.environ: + pager = os.environ["PAGER"] + else: + pager = "System default" + else: + pager = "stdout" + footer.append("Current pager:" + pager) + + footer.append("--------------") + return [(None, None, "", "\n".join(footer))] + + +@special_command( + ".load", + ".load path", + "Load an extension library.", + arg_type=PARSED_QUERY, + case_sensitive=True, +) +def load_extension(cur, arg, **_): + args = shlex.split(arg) + if len(args) != 1: + raise TypeError(".load accepts exactly one path") + path = args[0] + conn = cur.connection + conn.enable_load_extension(True) + conn.load_extension(path) + return [(None, None, None, "")] + + +@special_command( + "describe", + "\\d [table]", + "Description of a table", + arg_type=PARSED_QUERY, + case_sensitive=True, + aliases=("\\d", "describe", "desc"), +) +def describe(cur, arg, **_): + if arg: + args = (arg,) + query = """ + PRAGMA table_info({}) + """.format( + arg + ) + else: + raise ArgumentMissing("Table name required.") + + log.debug(query) + cur.execute(query) + tables = cur.fetchall() + status = "" + if cur.description: + headers = [x[0] for x in cur.description] + else: + return [(None, None, None, "")] + + return [(None, tables, headers, status)] + + +@special_command( + ".read", + ".read path", + "Read input from path", + arg_type=PARSED_QUERY, + case_sensitive=True, +) +def read_script(cur, arg, **_): + args = shlex.split(arg) + if len(args) != 1: + raise TypeError(".read accepts exactly one path") + path = args[0] + with open(path, "r") as f: + script = f.read() + cur.executescript(script) + return [(None, None, None, "")] + + +@special_command( + ".import", + ".import filename table", + "Import data from filename into an existing table", + arg_type=PARSED_QUERY, + case_sensitive=True, +) +def import_file(cur, arg=None, **_): + def split(s): + # this is a modification of shlex.split function, just to make it support '`', + # because table name might contain '`' character. + lex = shlex.shlex(s, posix=True) + lex.whitespace_split = True + lex.commenters = "" + lex.quotes += "`" + return list(lex) + + args = split(arg) + log.debug("[arg = %r], [args = %r]", arg, args) + if len(args) != 2: + raise TypeError("Usage: .import filename table") + + filename, table = args + cur.execute('PRAGMA table_info("%s")' % table) + ncols = len(cur.fetchall()) + insert_tmpl = 'INSERT INTO "%s" VALUES (?%s)' % (table, ",?" * (ncols - 1)) + + with open(filename, "r") as csvfile: + dialect = csv.Sniffer().sniff(csvfile.read(1024)) + csvfile.seek(0) + reader = csv.reader(csvfile, dialect) + + cur.execute("BEGIN") + ninserted, nignored = 0, 0 + for i, row in enumerate(reader): + if len(row) != ncols: + print( + "%s:%d expected %d columns but found %d - ignored" + % (filename, i, ncols, len(row)), + file=sys.stderr, + ) + nignored += 1 + continue + cur.execute(insert_tmpl, row) + ninserted += 1 + cur.execute("COMMIT") + + status = "Inserted %d rows into %s" % (ninserted, table) + if nignored > 0: + status += " (%d rows are ignored)" % nignored + return [(None, None, None, status)] diff --git a/litecli/packages/special/favoritequeries.py b/litecli/packages/special/favoritequeries.py new file mode 100644 index 0000000..7da6fbf --- /dev/null +++ b/litecli/packages/special/favoritequeries.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + + +class FavoriteQueries(object): + + section_name = "favorite_queries" + + usage = """ +Favorite Queries are a way to save frequently used queries +with a short name. +Examples: + + # Save a new favorite query. + > \\fs simple select * from abc where a is not Null; + + # List all favorite queries. + > \\f + ╒════════╤═══════════════════════════════════════╕ + │ Name │ Query │ + ╞════════╪═══════════════════════════════════════╡ + │ simple │ SELECT * FROM abc where a is not NULL │ + ╘════════╧═══════════════════════════════════════╛ + + # Run a favorite query. + > \\f simple + ╒════════╤════════╕ + │ a │ b │ + ╞════════╪════════╡ + │ 日本語 │ 日本語 │ + ╘════════╧════════╛ + + # Delete a favorite query. + > \\fd simple + simple: Deleted +""" + + def __init__(self, config): + self.config = config + + def list(self): + return self.config.get(self.section_name, []) + + def get(self, name): + return self.config.get(self.section_name, {}).get(name, None) + + def save(self, name, query): + if self.section_name not in self.config: + self.config[self.section_name] = {} + self.config[self.section_name][name] = query + self.config.write() + + def delete(self, name): + try: + del self.config[self.section_name][name] + except KeyError: + return "%s: Not Found." % name + self.config.write() + return "%s: Deleted" % name diff --git a/litecli/packages/special/iocommands.py b/litecli/packages/special/iocommands.py new file mode 100644 index 0000000..8940057 --- /dev/null +++ b/litecli/packages/special/iocommands.py @@ -0,0 +1,479 @@ +from __future__ import unicode_literals +import os +import re +import locale +import logging +import subprocess +import shlex +from io import open +from time import sleep + +import click +import sqlparse +from configobj import ConfigObj + +from . import export +from .main import special_command, NO_QUERY, PARSED_QUERY +from .favoritequeries import FavoriteQueries +from .utils import handle_cd_command +from litecli.packages.prompt_utils import confirm_destructive_query + +use_expanded_output = False +PAGER_ENABLED = True +tee_file = None +once_file = written_to_once_file = None +favoritequeries = FavoriteQueries(ConfigObj()) + + +@export +def set_favorite_queries(config): + global favoritequeries + favoritequeries = FavoriteQueries(config) + + +@export +def set_pager_enabled(val): + global PAGER_ENABLED + PAGER_ENABLED = val + + +@export +def is_pager_enabled(): + return PAGER_ENABLED + + +@export +@special_command( + "pager", + "\\P [command]", + "Set PAGER. Print the query results via PAGER.", + arg_type=PARSED_QUERY, + aliases=("\\P",), + case_sensitive=True, +) +def set_pager(arg, **_): + if arg: + os.environ["PAGER"] = arg + msg = "PAGER set to %s." % arg + set_pager_enabled(True) + else: + if "PAGER" in os.environ: + msg = "PAGER set to %s." % os.environ["PAGER"] + else: + # This uses click's default per echo_via_pager. + msg = "Pager enabled." + set_pager_enabled(True) + + return [(None, None, None, msg)] + + +@export +@special_command( + "nopager", + "\\n", + "Disable pager, print to stdout.", + arg_type=NO_QUERY, + aliases=("\\n",), + case_sensitive=True, +) +def disable_pager(): + set_pager_enabled(False) + return [(None, None, None, "Pager disabled.")] + + +@export +def set_expanded_output(val): + global use_expanded_output + use_expanded_output = val + + +@export +def is_expanded_output(): + return use_expanded_output + + +_logger = logging.getLogger(__name__) + + +@export +def editor_command(command): + """ + Is this an external editor command? + :param command: string + """ + # It is possible to have `\e filename` or `SELECT * FROM \e`. So we check + # for both conditions. + return command.strip().endswith("\\e") or command.strip().startswith("\\e") + + +@export +def get_filename(sql): + if sql.strip().startswith("\\e"): + command, _, filename = sql.partition(" ") + return filename.strip() or None + + +@export +def get_editor_query(sql): + """Get the query part of an editor command.""" + sql = sql.strip() + + # The reason we can't simply do .strip('\e') is that it strips characters, + # not a substring. So it'll strip "e" in the end of the sql also! + # Ex: "select * from style\e" -> "select * from styl". + pattern = re.compile("(^\\\e|\\\e$)") + while pattern.search(sql): + sql = pattern.sub("", sql) + + return sql + + +@export +def open_external_editor(filename=None, sql=None): + """Open external editor, wait for the user to type in their query, return + the query. + + :return: list with one tuple, query as first element. + + """ + + message = None + filename = filename.strip().split(" ", 1)[0] if filename else None + + sql = sql or "" + MARKER = "# Type your query above this line.\n" + + # Populate the editor buffer with the partial sql (if available) and a + # placeholder comment. + query = click.edit( + "{sql}\n\n{marker}".format(sql=sql, marker=MARKER), + filename=filename, + extension=".sql", + ) + + if filename: + try: + with open(filename, encoding="utf-8") as f: + query = f.read() + except IOError: + message = "Error reading file: %s." % filename + + if query is not None: + query = query.split(MARKER, 1)[0].rstrip("\n") + else: + # Don't return None for the caller to deal with. + # Empty string is ok. + query = sql + + return (query, message) + + +@special_command( + "\\f", + "\\f [name [args..]]", + "List or execute favorite queries.", + arg_type=PARSED_QUERY, + case_sensitive=True, +) +def execute_favorite_query(cur, arg, verbose=False, **_): + """Returns (title, rows, headers, status)""" + if arg == "": + for result in list_favorite_queries(): + yield result + + """Parse out favorite name and optional substitution parameters""" + name, _, arg_str = arg.partition(" ") + args = shlex.split(arg_str) + + query = favoritequeries.get(name) + if query is None: + message = "No favorite query: %s" % (name) + yield (None, None, None, message) + elif "?" in query: + for sql in sqlparse.split(query): + sql = sql.rstrip(";") + title = "> %s" % (sql) if verbose else None + cur.execute(sql, args) + if cur.description: + headers = [x[0] for x in cur.description] + yield (title, cur, headers, None) + else: + yield (title, None, None, None) + else: + query, arg_error = subst_favorite_query_args(query, args) + if arg_error: + yield (None, None, None, arg_error) + else: + for sql in sqlparse.split(query): + sql = sql.rstrip(";") + title = "> %s" % (sql) if verbose else None + cur.execute(sql) + if cur.description: + headers = [x[0] for x in cur.description] + yield (title, cur, headers, None) + else: + yield (title, None, None, None) + + +def list_favorite_queries(): + """List of all favorite queries. + Returns (title, rows, headers, status)""" + + headers = ["Name", "Query"] + rows = [(r, favoritequeries.get(r)) for r in favoritequeries.list()] + + if not rows: + status = "\nNo favorite queries found." + favoritequeries.usage + else: + status = "" + return [("", rows, headers, status)] + + +def subst_favorite_query_args(query, args): + """replace positional parameters ($1...$N) in query.""" + for idx, val in enumerate(args): + shell_subst_var = "$" + str(idx + 1) + question_subst_var = "?" + if shell_subst_var in query: + query = query.replace(shell_subst_var, val) + elif question_subst_var in query: + query = query.replace(question_subst_var, val, 1) + else: + return [ + None, + "Too many arguments.\nQuery does not have enough place holders to substitute.\n" + + query, + ] + + match = re.search("\\?|\\$\d+", query) + if match: + return [ + None, + "missing substitution for " + match.group(0) + " in query:\n " + query, + ] + + return [query, None] + + +@special_command("\\fs", "\\fs name query", "Save a favorite query.") +def save_favorite_query(arg, **_): + """Save a new favorite query. + Returns (title, rows, headers, status)""" + + usage = "Syntax: \\fs name query.\n\n" + favoritequeries.usage + if not arg: + return [(None, None, None, usage)] + + name, _, query = arg.partition(" ") + + # If either name or query is missing then print the usage and complain. + if (not name) or (not query): + return [(None, None, None, usage + "Err: Both name and query are required.")] + + favoritequeries.save(name, query) + return [(None, None, None, "Saved.")] + + +@special_command("\\fd", "\\fd [name]", "Delete a favorite query.") +def delete_favorite_query(arg, **_): + """Delete an existing favorite query. + """ + usage = "Syntax: \\fd name.\n\n" + favoritequeries.usage + if not arg: + return [(None, None, None, usage)] + + status = favoritequeries.delete(arg) + + return [(None, None, None, status)] + + +@special_command("system", "system [command]", "Execute a system shell commmand.") +def execute_system_command(arg, **_): + """Execute a system shell command.""" + usage = "Syntax: system [command].\n" + + if not arg: + return [(None, None, None, usage)] + + try: + command = arg.strip() + if command.startswith("cd"): + ok, error_message = handle_cd_command(arg) + if not ok: + return [(None, None, None, error_message)] + return [(None, None, None, "")] + + args = arg.split(" ") + process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + output, error = process.communicate() + response = output if not error else error + + # Python 3 returns bytes. This needs to be decoded to a string. + if isinstance(response, bytes): + encoding = locale.getpreferredencoding(False) + response = response.decode(encoding) + + return [(None, None, None, response)] + except OSError as e: + return [(None, None, None, "OSError: %s" % e.strerror)] + + +def parseargfile(arg): + if arg.startswith("-o "): + mode = "w" + filename = arg[3:] + else: + mode = "a" + filename = arg + + if not filename: + raise TypeError("You must provide a filename.") + + return {"file": os.path.expanduser(filename), "mode": mode} + + +@special_command( + "tee", + "tee [-o] filename", + "Append all results to an output file (overwrite using -o).", +) +def set_tee(arg, **_): + global tee_file + + try: + tee_file = open(**parseargfile(arg)) + except (IOError, OSError) as e: + raise OSError("Cannot write to file '{}': {}".format(e.filename, e.strerror)) + + return [(None, None, None, "")] + + +@export +def close_tee(): + global tee_file + if tee_file: + tee_file.close() + tee_file = None + + +@special_command("notee", "notee", "Stop writing results to an output file.") +def no_tee(arg, **_): + close_tee() + return [(None, None, None, "")] + + +@export +def write_tee(output): + global tee_file + if tee_file: + click.echo(output, file=tee_file, nl=False) + click.echo("\n", file=tee_file, nl=False) + tee_file.flush() + + +@special_command( + ".once", + "\\o [-o] filename", + "Append next result to an output file (overwrite using -o).", + aliases=("\\o", "\\once"), +) +def set_once(arg, **_): + global once_file + + once_file = parseargfile(arg) + + return [(None, None, None, "")] + + +@export +def write_once(output): + global once_file, written_to_once_file + if output and once_file: + try: + f = open(**once_file) + except (IOError, OSError) as e: + once_file = None + raise OSError( + "Cannot write to file '{}': {}".format(e.filename, e.strerror) + ) + + with f: + click.echo(output, file=f, nl=False) + click.echo("\n", file=f, nl=False) + written_to_once_file = True + + +@export +def unset_once_if_written(): + """Unset the once file, if it has been written to.""" + global once_file + if written_to_once_file: + once_file = None + + +@special_command( + "watch", + "watch [seconds] [-c] query", + "Executes the query every [seconds] seconds (by default 5).", +) +def watch_query(arg, **kwargs): + usage = """Syntax: watch [seconds] [-c] query. + * seconds: The interval at the query will be repeated, in seconds. + By default 5. + * -c: Clears the screen between every iteration. +""" + if not arg: + yield (None, None, None, usage) + raise StopIteration + seconds = 5 + clear_screen = False + statement = None + while statement is None: + arg = arg.strip() + if not arg: + # Oops, we parsed all the arguments without finding a statement + yield (None, None, None, usage) + raise StopIteration + (current_arg, _, arg) = arg.partition(" ") + try: + seconds = float(current_arg) + continue + except ValueError: + pass + if current_arg == "-c": + clear_screen = True + continue + statement = "{0!s} {1!s}".format(current_arg, arg) + destructive_prompt = confirm_destructive_query(statement) + if destructive_prompt is False: + click.secho("Wise choice!") + raise StopIteration + elif destructive_prompt is True: + click.secho("Your call!") + cur = kwargs["cur"] + sql_list = [ + (sql.rstrip(";"), "> {0!s}".format(sql)) for sql in sqlparse.split(statement) + ] + old_pager_enabled = is_pager_enabled() + while True: + if clear_screen: + click.clear() + try: + # Somewhere in the code the pager its activated after every yield, + # so we disable it in every iteration + set_pager_enabled(False) + for (sql, title) in sql_list: + cur.execute(sql) + if cur.description: + headers = [x[0] for x in cur.description] + yield (title, cur, headers, None) + else: + yield (title, None, None, None) + sleep(seconds) + except KeyboardInterrupt: + # This prints the Ctrl-C character in its own line, which prevents + # to print a line with the cursor positioned behind the prompt + click.secho("", nl=True) + raise StopIteration + finally: + set_pager_enabled(old_pager_enabled) diff --git a/litecli/packages/special/main.py b/litecli/packages/special/main.py new file mode 100644 index 0000000..3dd0e77 --- /dev/null +++ b/litecli/packages/special/main.py @@ -0,0 +1,160 @@ +from __future__ import unicode_literals +import logging +from collections import namedtuple + +from . import export + +log = logging.getLogger(__name__) + +NO_QUERY = 0 +PARSED_QUERY = 1 +RAW_QUERY = 2 + +SpecialCommand = namedtuple( + "SpecialCommand", + [ + "handler", + "command", + "shortcut", + "description", + "arg_type", + "hidden", + "case_sensitive", + ], +) + +COMMANDS = {} + + +@export +class ArgumentMissing(Exception): + pass + + +@export +class CommandNotFound(Exception): + pass + + +@export +def parse_special_command(sql): + command, _, arg = sql.partition(" ") + verbose = "+" in command + command = command.strip().replace("+", "") + return (command, verbose, arg.strip()) + + +@export +def special_command( + command, + shortcut, + description, + arg_type=PARSED_QUERY, + hidden=False, + case_sensitive=False, + aliases=(), +): + def wrapper(wrapped): + register_special_command( + wrapped, + command, + shortcut, + description, + arg_type, + hidden, + case_sensitive, + aliases, + ) + return wrapped + + return wrapper + + +@export +def register_special_command( + handler, + command, + shortcut, + description, + arg_type=PARSED_QUERY, + hidden=False, + case_sensitive=False, + aliases=(), +): + cmd = command.lower() if not case_sensitive else command + COMMANDS[cmd] = SpecialCommand( + handler, command, shortcut, description, arg_type, hidden, case_sensitive + ) + for alias in aliases: + cmd = alias.lower() if not case_sensitive else alias + COMMANDS[cmd] = SpecialCommand( + handler, + command, + shortcut, + description, + arg_type, + case_sensitive=case_sensitive, + hidden=True, + ) + + +@export +def execute(cur, sql): + """Execute a special command and return the results. If the special command + is not supported a KeyError will be raised. + """ + command, verbose, arg = parse_special_command(sql) + + if (command not in COMMANDS) and (command.lower() not in COMMANDS): + raise CommandNotFound + + try: + special_cmd = COMMANDS[command] + except KeyError: + special_cmd = COMMANDS[command.lower()] + if special_cmd.case_sensitive: + raise CommandNotFound("Command not found: %s" % command) + + if special_cmd.arg_type == NO_QUERY: + return special_cmd.handler() + elif special_cmd.arg_type == PARSED_QUERY: + return special_cmd.handler(cur=cur, arg=arg, verbose=verbose) + elif special_cmd.arg_type == RAW_QUERY: + return special_cmd.handler(cur=cur, query=sql) + + +@special_command( + "help", "\\?", "Show this help.", arg_type=NO_QUERY, aliases=("\\?", "?") +) +def show_help(): # All the parameters are ignored. + headers = ["Command", "Shortcut", "Description"] + result = [] + + for _, value in sorted(COMMANDS.items()): + if not value.hidden: + result.append((value.command, value.shortcut, value.description)) + return [(None, result, headers, None)] + + +@special_command(".exit", "\\q", "Exit.", arg_type=NO_QUERY, aliases=("\\q", "exit")) +@special_command("quit", "\\q", "Quit.", arg_type=NO_QUERY) +def quit(*_args): + raise EOFError + + +@special_command( + "\\e", + "\\e", + "Edit command with editor (uses $EDITOR).", + arg_type=NO_QUERY, + case_sensitive=True, +) +@special_command( + "\\G", + "\\G", + "Display current query results vertically.", + arg_type=NO_QUERY, + case_sensitive=True, +) +def stub(): + raise NotImplementedError diff --git a/litecli/packages/special/utils.py b/litecli/packages/special/utils.py new file mode 100644 index 0000000..eed9306 --- /dev/null +++ b/litecli/packages/special/utils.py @@ -0,0 +1,48 @@ +import os +import subprocess + + +def handle_cd_command(arg): + """Handles a `cd` shell command by calling python's os.chdir.""" + CD_CMD = "cd" + tokens = arg.split(CD_CMD + " ") + directory = tokens[-1] if len(tokens) > 1 else None + if not directory: + return False, "No folder name was provided." + try: + os.chdir(directory) + subprocess.call(["pwd"]) + return True, None + except OSError as e: + return False, e.strerror + + +def format_uptime(uptime_in_seconds): + """Format number of seconds into human-readable string. + + :param uptime_in_seconds: The server uptime in seconds. + :returns: A human-readable string representing the uptime. + + >>> uptime = format_uptime('56892') + >>> print(uptime) + 15 hours 48 min 12 sec + """ + + m, s = divmod(int(uptime_in_seconds), 60) + h, m = divmod(m, 60) + d, h = divmod(h, 24) + + uptime_values = [] + + for value, unit in ((d, "days"), (h, "hours"), (m, "min"), (s, "sec")): + if value == 0 and not uptime_values: + # Don't include a value/unit if the unit isn't applicable to + # the uptime. E.g. don't do 0 days 0 hours 1 min 30 sec. + continue + elif value == 1 and unit.endswith("s"): + # Remove the "s" if the unit is singular. + unit = unit[:-1] + uptime_values.append("{0} {1}".format(value, unit)) + + uptime = " ".join(uptime_values) + return uptime |