import logging from typing import Iterable from datetime import datetime, timezone from dateutil.relativedelta import relativedelta from prompt_toolkit.completion import ( CompleteEvent, Completer, Completion, FuzzyWordCompleter, WordCompleter, ) from prompt_toolkit.contrib.regular_languages.completion import GrammarCompleter from prompt_toolkit.document import Document from .commands import split_command_args, commands_summary, all_commands from .config import config from .exceptions import InvalidArguments, AmbiguousCommand from .redis_grammar import CONST, command_grammar, get_command_grammar from .utils import strip_quote_args, ensure_str logger = logging.getLogger(__name__) class MostRecentlyUsedFirstWordMixin: """ A Mixin for WordCompleter, with a `touch()` method can make latest used word appears first. And evict old completion word when `max_words` reached. Not thread safe. """ def __init__(self, max_words, words, *args, **kwargs): self.words = words self.max_words = max_words super().__init__(words, *args, **kwargs) def touch(self, word): """ Make sure word is in the first place of the completer list. """ if word in self.words: self.words.remove(word) else: # not in words if len(self.words) == self.max_words: # full self.words.pop() self.words.insert(0, word) def touch_words(self, words): for word in words: self.touch(word) class MostRecentlyUsedFirstWordCompleter( MostRecentlyUsedFirstWordMixin, FuzzyWordCompleter ): pass class IntegerTypeCompleter(MostRecentlyUsedFirstWordMixin, WordCompleter): def __init__(self): words = [] for i in range(1, 64): words.append(f"i{i}") # signed integer, 64 bit max words.append(f"u{i}") # unsigned integer, 63 bit max words.append("i64") super().__init__(len(words), list(reversed(words))) class TimestampCompleter(Completer): """ Completer for timestamp based on input. Features: * Auto complete humanize time, like 3 -> 3 minutes ago, 3 hours ago. * Auto guess datetime, complete by its timestamp. 2020-01-01 12:00 -> 1577851200. The timezone is read from system. """ def __init__(self, is_milliseconds, future_time, *args, **kwargs): if is_milliseconds: self.factor = 1000 else: self.factor = 1 self.future_time = future_time super().__init__(*args, **kwargs) when_lower_than = { "year": 20, "month": 12, "day": 31, "hour": 100, "minute": 1000, "second": 1000_000, } def _completion_humanize_time(self, document: Document) -> Iterable[Completion]: text = document.text if not text.isnumeric(): return current = int(text) now = datetime.now() for unit, minimum in self.when_lower_than.items(): if current <= minimum: if self.future_time: dt = now + relativedelta(**{f"{unit}s": current}) offset_text = "later" else: dt = now - relativedelta(**{f"{unit}s": current}) offset_text = "ago" meta = f"{text} {unit}{'s' if current > 1 else ''} {offset_text} ({dt.strftime('%Y-%m-%d %H:%M:%S')})" yield Completion( str(int(dt.timestamp()) * self.factor), start_position=-len(document.text_before_cursor), display_meta=meta, ) def _completion_formatted_time(self, document: Document) -> Iterable[Completion]: text = document.text try: dt = datetime.fromisoformat(text).replace(tzinfo=timezone.utc) except Exception: return yield Completion( str(int(dt.timestamp()) * self.factor), start_position=-len(document.text_before_cursor), display_meta=str(dt), ) def get_completions( self, document: Document, complete_event: CompleteEvent ) -> Iterable[Completion]: completions = list(self._completion_humanize_time(document)) + list( self._completion_formatted_time(document) ) # here we yield bigger timestamp first. yield from sorted(completions, key=lambda a: a.text) class IRedisCompleter(Completer): """ Completer class that can dynamically returns any Completer. :param get_completer: Callable that returns a :class:`.Completer` instance. """ def __init__(self, hint=False, completion_casing="upper"): super().__init__() self.completer_mapping = self.get_completer_mapping(hint, completion_casing) self.current_completer = self.root_completer = GrammarCompleter( command_grammar, self.completer_mapping ) @property def key_completer(self) -> MostRecentlyUsedFirstWordCompleter: return self.completer_mapping["key"] @property def member_completer(self) -> MostRecentlyUsedFirstWordCompleter: return self.completer_mapping["member"] @property def field_completer(self) -> MostRecentlyUsedFirstWordCompleter: return self.completer_mapping["field"] @property def group_completer(self) -> MostRecentlyUsedFirstWordCompleter: return self.completer_mapping["group"] @property def catetoryname_completer(self) -> MostRecentlyUsedFirstWordCompleter: return self.completer_mapping["categoryname"] @property def username_completer(self) -> MostRecentlyUsedFirstWordCompleter: return self.completer_mapping["username"] def get_completer(self, input_text): try: command, _ = split_command_args(input_text) # here will compile grammar for this command grammar = get_command_grammar(command) completer = GrammarCompleter( compiled_grammar=grammar, completers=self.completer_mapping ) except (InvalidArguments, AmbiguousCommand): completer = self.root_completer return completer def get_completions( self, document: Document, complete_event: CompleteEvent ) -> Iterable[Completion]: input_text = document.text self.current_completer = self.get_completer(input_text) return self.current_completer.get_completions(document, complete_event) def update_completer_for_input(self, command): completer = self.get_completer(command) grammar = completer.compiled_grammar m = grammar.match(command) if not m: # invalid command! return variables = m.variables() # auto update completion words, if it's LRU strategy. for _token, _completer in self.completer_mapping.items(): if not isinstance(_completer, MostRecentlyUsedFirstWordMixin): continue # getall always returns a [] tokens_in_command = variables.getall(_token) for _token_in_command in tokens_in_command: # prompt_toolkit didn't support multi tokens # like DEL key1 key2 key3 # so we have to split them manually for single_token in strip_quote_args(_token_in_command): _completer.touch(single_token) def update_completer_for_response(self, command_name, args, response): command_name = " ".join(command_name.split()).upper() logger.info( f"Try update completer using response... command_name is {command_name}" ) if response is None: return response = ensure_str(response) if command_name in ("HKEYS",): self.field_completer.touch_words(response) logger.debug(f"[Completer] field completer updated with {response}.") if command_name in ("HGETALL",): self.field_completer.touch_words(response[::2]) logger.debug(f"[Completer] field completer updated with {response[::2]}.") if command_name in ("ZPOPMAX", "ZPOPMIN", "ZRANGE", "ZRANGE", "ZRANGEBYSCORE"): if config.withscores: self.member_completer.touch_words(response[::2]) logger.debug( f"[Completer] member completer updated with {response[::2]}." ) else: self.member_completer.touch_words(response) logger.debug(f"[Completer] member completer updated with {response}.") if command_name in ("KEYS",): self.key_completer.touch_words(response) logger.debug(f"[Completer] key completer updated with {response}.") if command_name in ("SCAN",): self.key_completer.touch_words(response[1]) logger.debug(f"[Completer] key completer updated with {response[1]}.") if command_name in ("SSCAN", "ZSCAN"): self.member_completer.touch_words(response[1]) logger.debug(f"[Completer] member completer updated with {response[1]}.") if command_name in ("HSCAN",): self.field_completer.touch_words(response[1][::2]) logger.debug( f"[Completer] field completer updated with {response[1][::2]}." ) # only update categoryname completer when `ACL CAT` without args. if command_name == "ACL CAT" and not args: self.catetoryname_completer.touch_words(response) if command_name == "ACL USERS": self.username_completer.touch_words(response) def _touch_members(self, items): _step = 1 if config.withscores: _step = 2 self.member_completer.touch_words(ensure_str(items)[::_step]) def _touch_hash_pairs(self, items): self.field_completer.touch_words(ensure_str(items)[::2]) def _touch_keys(self, items): self.key_completer.touch_words(ensure_str(items)) def __repr__(self) -> str: return "DynamicCompleter({!r} -> {!r})".format( self.get_completer, self.current_completer, ) def get_completer_mapping(self, hint_on, completion_casing): completer_mapping = {} completer_mapping.update( { key: WordCompleter(tokens.split(" "), ignore_case=True) for key, tokens in CONST.items() } ) key_completer = MostRecentlyUsedFirstWordCompleter(config.completer_max, []) member_completer = MostRecentlyUsedFirstWordCompleter(config.completer_max, []) field_completer = MostRecentlyUsedFirstWordCompleter(config.completer_max, []) group_completer = MostRecentlyUsedFirstWordCompleter(config.completer_max, []) username_completer = MostRecentlyUsedFirstWordCompleter( config.completer_max, [] ) categoryname_completer = MostRecentlyUsedFirstWordCompleter(100, []) timestamp_ms_ago_completer = TimestampCompleter( is_milliseconds=True, future_time=False ) timestamp_ms_after_completer = TimestampCompleter( is_milliseconds=True, future_time=True ) timestamp_after_completer = TimestampCompleter( is_milliseconds=False, future_time=True ) integer_type_completer = IntegerTypeCompleter() completer_mapping.update( { # all key related completers share the same completer "keys": key_completer, "key": key_completer, "destination": key_completer, "newkey": key_completer, # member "member": member_completer, "members": member_completer, # zmember # TODO separate sorted set and set # hash fields "field": field_completer, "fields": field_completer, # stream groups "group": group_completer, # stream id "stream_id": timestamp_ms_ago_completer, "timestampms": timestamp_ms_after_completer, "timestamp": timestamp_after_completer, "inttype": integer_type_completer, "categoryname": categoryname_completer, "username": username_completer, } ) # command completer if hint_on: command_hint = { key: info["summary"] for key, info in commands_summary.items() } hint = { command: command_hint.get(command.upper()) for command in all_commands } hint.update( { command.lower(): command_hint.get(command.upper()) for command in all_commands } ) else: hint = {} upper_commands = all_commands[::-1] lower_commands = [command.lower() for command in all_commands[::-1]] auto_commands = upper_commands + lower_commands ignore_case = completion_casing != "auto" command_completions = { "auto": auto_commands, "upper": upper_commands, "lower": lower_commands, }.get(completion_casing) completer_mapping["command"] = WordCompleter( command_completions, ignore_case=ignore_case, sentence=True, meta_dict=hint ) return completer_mapping