import os import logging import sys import time from pathlib import Path import platform import click from prompt_toolkit import PromptSession from prompt_toolkit.history import FileHistory from prompt_toolkit.auto_suggest import AutoSuggestFromHistory from prompt_toolkit import print_formatted_text from prompt_toolkit.formatted_text import FormattedText from prompt_toolkit.key_binding.bindings.named_commands import ( register as prompt_register, ) from .client import Client from .key_bindings import kb as key_bindings from .style import STYLE from .config import config, load_config_files from .processors import UserInputCommand, UpdateBottomProcessor, PasswordProcessor from .bottom import BottomToolbar from .utils import timer, exit, convert_formatted_text_to_bytes, parse_url from .completers import IRedisCompleter from .lexer import IRedisLexer from . import __version__ logger = logging.getLogger(__name__) class SkipAuthFileHistory(FileHistory): """Exactlly like FileHistory, but won't save `AUTH` command into history file.""" def append_string(self, string: str) -> None: if string.lstrip().upper().startswith("AUTH"): return super().append_string(string) def setup_log(): if config.log_location: logging.basicConfig( filename=os.path.expanduser(config.log_location), filemode="a", format="%(levelname)5s %(message)s", level="DEBUG", ) else: logging.disable(logging.CRITICAL) logger.info("------ iRedis ------") def greetings(): iredis_version = f"iredis {__version__} (Python {platform.python_version()})" if config.no_version_reason: reason = f"({config.no_version_reason})" else: reason = "" server_version = f"redis-server {config.version} {reason}" home_page = "Home: https://iredis.xbin.io/" issues = "Issues: https://github.com/laixintao/iredis/issues" display = "\n".join([iredis_version, server_version, home_page, issues]) if config.raw: display = display.encode() write_result(display) def print_help_msg(command): with click.Context(command) as ctx: click.echo(command.get_help(ctx)) def is_too_tall(text, max_height): if isinstance(text, FormattedText): text = convert_formatted_text_to_bytes(text) lines = len(text.split(b"\n")) return lines > max_height def write_result(text, max_height=None): """ When config.raw set to True, write text(must be bytes in that case) directly to stdout, same if text is bytes. :param text: is_raw: bytes or str, not raw: FormattedText :is_raw: bool """ logger.info(f"Print result {type(text)}: {text}"[:200]) # this function only handle bytes or FormattedText # if it's str, convert to bytes if isinstance(text, str): if config.decode: text = text.encode(config.decode) else: text = text.encode() # using pager if too tall if max_height and config.enable_pager and is_too_tall(text, max_height): if isinstance(text, FormattedText): text = convert_formatted_text_to_bytes(text) os.environ["LESS"] = "-SRX" # click.echo_via_pager only accepts str if config.decode: text = text.decode(config.decode) else: text = text.decode() # TODO current pager doesn't support colors click.echo_via_pager(text) return if isinstance(text, bytes): sys.stdout.buffer.write(text) sys.stdout.write("\n") else: print_formatted_text(text, end="", style=STYLE) print_formatted_text() class Rainbow: color = [ "#cc2244", "#bb4444", "#996644", "#cc8844", "#ccaa44", "#bbaa44", "#99aa44", "#778844", "#55aa44", "#33aa44", "#11aa44", "#11aa66", "#11aa88", "#11aaaa", "#11aacc", "#11aaee", ] def __init__(self): self.current = -1 self.forword = 1 def __iter__(self): return self def __next__(self): self.current += self.forword if 0 <= self.current < len(self.color): # not to the end return self.color[self.current] else: self.forword = -self.forword self.current += 2 * self.forword return self.color[self.current] def prompt_message(client): text = str(client) if config.rainbow: return list(zip(Rainbow(), text)) return text def repl(client, session, start_time): command_holder = UserInputCommand() timer(f"First REPL command enter, time cost: {time.time() - start_time}") while True: logger.info("↓↓↓↓" * 10) logger.info("REPL waiting for command...") try: command = session.prompt( prompt_message(client), bottom_toolbar=( BottomToolbar(command_holder).render if config.bottom_bar else None ), input_processors=[ UpdateBottomProcessor(command_holder, session), PasswordProcessor(), ], rprompt=lambda: "" if config.transaction else None, key_bindings=key_bindings, enable_suspend=True, ) except KeyboardInterrupt: logger.warning("KeyboardInterrupt!") continue except EOFError: exit() command = command.strip() logger.info(f"[Command] {command}") # blank input if not command: continue try: answers = client.send_command(command, session.completer) for answer in answers: write_result( answer, # -1 is because 127.0.0.1:6379> takes one line session.output.get_size().rows - session.reserve_space_for_menu - 1, ) # Error with previous command or exception except Exception as e: logger.exception(e) # TODO red error color print("(error)", str(e)) RAW_HELP = """ Use raw formatting for replies (default when STDOUT is not a tty). \ However, you can use --no-raw to force formatted output even \ when STDOUT is not a tty. """ DECODE_HELP = """ decode response, default is No decode, which will output all bytes literals. """ RAINBOW = "Display colorful prompt." DSN_HELP = """ Use DSN configured into the [alias_dsn] section of iredisrc file. \ (Can set with env `IREDIS_DSN`) """ URL_HELP = """ Use Redis URL to indicate connection(Can set with env `IREDIS_URL`), Example: redis://[[username]:[password]]@localhost:6379/0 rediss://[[username]:[password]]@localhost:6379/0 unix://[[username]:[password]]@/path/to/socket.sock?db=0 """ SHELL = """Allow to run shell commands, default to True.""" PAGER_HELP = """Using pager when output is too tall for your window, default to True.""" VERIFY_SSL_HELP = """Set the TLS certificate verification strategy""" # command line entry here... @click.command() @click.pass_context @click.option("-h", help="Server hostname (default: 127.0.0.1).", default="127.0.0.1") @click.option("-p", help="Server port (default: 6379).", default="6379") @click.option( "-s", "--socket", default=None, help="Server socket (overrides hostname and port)." ) @click.option("-n", help="Database number.(overwrites dsn/url's db number)", default=0) @click.option( "-u", "--username", help="User name used to auth, will be ignore for redis version < 6.", ) @click.option("-a", "--password", help="Password to use when connecting to the server.") @click.option("--url", default=None, envvar="IREDIS_URL", help=URL_HELP) @click.option("-d", "--dsn", default=None, envvar="IREDIS_DSN", help=DSN_HELP) @click.option( "--newbie/--no-newbie", default=None, is_flag=True, help="Show command hints and useful helps.", ) @click.option( "--iredisrc", default="~/.iredisrc", help="Config file for iredis, default is ~/.iredisrc.", ) @click.option("--decode", default=None, help=DECODE_HELP) @click.option("--client_name", help="Assign a name to the current connection.") @click.option("--raw/--no-raw", default=None, is_flag=True, help=RAW_HELP) @click.option("--rainbow/--no-rainbow", default=None, is_flag=True, help=RAINBOW) @click.option("--shell/--no-shell", default=None, is_flag=True, help=SHELL) @click.option("--pager/--no-pager", default=None, is_flag=True, help=PAGER_HELP) @click.option( "--greetings/--no-greetings", default=None, is_flag=True, help="Enable or disable greeting messages", ) @click.option( "--verify-ssl", default=None, type=click.Choice(["none", "optional", "required"]), help=VERIFY_SSL_HELP, ) @click.option( "--prompt", default=None, help=( "Prompt format (supported interpolations: {client_name}, {db}, {host}, {path}," " {port}, {username}, {client_addr}, {client_id})." ), ) @click.version_option() @click.argument("cmd", nargs=-1) def gather_args( ctx, h, p, n, username, password, client_name, newbie, iredisrc, decode, raw, rainbow, cmd, dsn, url, socket, shell, pager, greetings, verify_ssl, prompt, ): """ IRedis: Interactive Redis When no command is given, IRedis starts in interactive mode. \b Examples: - iredis - iredis -d dsn - iredis -h 127.0.0.1 -p 6379 - iredis -h 127.0.0.1 -p 6379 -a - iredis --url redis://localhost:7890/3 Type "help" in interactive mode for information on available commands and settings. """ load_config_files(iredisrc) setup_log() logger.info( f"[commandline args] host={h}, port={p}, db={n}, user={username}," f" newbie={newbie}, iredisrc={iredisrc}, decode={decode}, raw={raw}, cmd={cmd}," f" rainbow={rainbow}." ) # raw config if raw is not None: config.raw = raw if not sys.stdout.isatty(): config.raw = True if newbie is not None: config.newbie_mode = newbie if decode is not None: config.decode = decode if rainbow is not None: config.rainbow = rainbow if shell is not None: config.shell = shell if pager is not None: config.enable_pager = pager if verify_ssl is not None: config.verify_ssl = verify_ssl if greetings is not None: config.greetings = greetings return ctx @prompt_register("edit-and-execute-command") def edit_and_execute(event): """Different from the prompt-toolkit default, we want to have a choice not to execute a query after editing, hence validate_and_handle=False.""" buff = event.current_buffer # this will prevent running command immediately when exit editor. buff.open_in_editor(validate_and_handle=False) def resolve_dsn(dsn): try: dsn_uri = config.alias_dsn[dsn] except KeyError: click.secho( "Could not find the specified DSN in the config file. " 'Please check the "[alias_dsn]" section in your ' "iredisrc.", err=True, fg="red", ) sys.exit(1) return dsn_uri def create_client(params): """ Create a Client. :param params: commandline params. """ host = params["h"] port = params["p"] db = params["n"] username = params["username"] password = params["password"] client_name = params["client_name"] prompt = params["prompt"] verify_ssl = params["verify_ssl"] dsn_from_url = None dsn = params["dsn"] if config.alias_dsn and dsn: dsn_uri = resolve_dsn(dsn) dsn_from_url = parse_url(dsn_uri) if params["url"]: dsn_from_url = parse_url(params["url"]) if dsn_from_url: # db from command lint options should be high priority db = db if db else dsn_from_url.db verify_ssl = verify_ssl or dsn_from_url.verify_ssl return Client( host=dsn_from_url.host, port=dsn_from_url.port, db=db, password=dsn_from_url.password, path=dsn_from_url.path, scheme=dsn_from_url.scheme, username=dsn_from_url.username, client_name=client_name, prompt=prompt, verify_ssl=verify_ssl, ) if params["socket"]: return Client( scheme="unix", path=params["socket"], db=db, username=username, password=password, client_name=client_name, prompt=prompt, ) return Client( host=host, port=port, db=db, username=username, password=password, client_name=client_name, prompt=prompt, verify_ssl=verify_ssl, ) def main(): enter_main_time = time.time() # just for logs # invoke in non-standalone mode to gather args ctx = None try: ctx = gather_args.main(standalone_mode=False) except click.exceptions.NoSuchOption as nosuchoption: nosuchoption.show() except click.exceptions.BadOptionUsage as badoption: if badoption.option_name == "-h": # -h without host, is short command for --help # like redis-cli print_help_msg(gather_args) return if not ctx: # called help return # redis client client = create_client(ctx.params) if not sys.stdin.isatty(): for line in sys.stdin.readlines(): logger.debug(f"[Command stdin] {line}") for answer in client.send_command(line, None): write_result(answer) return # no interactive mode, directly run a command if ctx.params["cmd"]: answers = client.send_command(" ".join(ctx.params["cmd"]), None) for answer in answers: write_result(answer) logger.warning("[OVER] command executed, exit...") return # prompt session session = PromptSession( history=SkipAuthFileHistory(Path(os.path.expanduser(config.history_location))), style=STYLE, auto_suggest=AutoSuggestFromHistory(), complete_while_typing=True, lexer=IRedisLexer(), completer=IRedisCompleter( hint=config.newbie_mode, completion_casing=config.completion_casing ), enable_open_in_editor=True, tempfile_suffix=".redis", ) # print hello message if config.greetings: greetings() repl(client, session, enter_main_time)