summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/cli/argparsing/__init__.py
blob: 540cf5529ea6fff76855067de12819c8cc38f523 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
"""Completion finder which brings together custom options and completion logic."""
from __future__ import annotations

import abc
import argparse
import os
import re
import typing as t

from .argcompletion import (
    OptionCompletionFinder,
    get_comp_type,
    register_safe_action,
    warn,
)

from .parsers import (
    Completion,
    CompletionError,
    CompletionSuccess,
    CompletionUnavailable,
    DocumentationState,
    NamespaceParser,
    Parser,
    ParserError,
    ParserMode,
    ParserState,
)


class RegisteredCompletionFinder(OptionCompletionFinder):
    """
    Option completion finder for argcomplete which lets a completer register the results it produced.
    When results have been registered, the final completion results are restricted to those exact values.
    This works around a known bug: https://github.com/kislyuk/argcomplete/issues/221
    """
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        # None means nothing has been registered, in which case quote_completions applies no filtering.
        self.registered_completions: t.Optional[list[str]] = None

    def completer(
        self,
        prefix: str,
        action: argparse.Action,
        parsed_args: argparse.Namespace,
        **kwargs,
    ) -> list[str]:
        """
        Return the completions for the given prefix and action.
        This method is intended to be used as the completer function for argcomplete.
        Any additional keyword arguments are accepted and discarded.
        """
        kwargs.clear()
        del kwargs

        results = self.get_completions(prefix, action, parsed_args)
        nargs = action.nargs

        if nargs and not isinstance(nargs, int):
            # registering the results keeps argcomplete from mixing unrelated arguments into the final results
            self.registered_completions = results

        return results

    @abc.abstractmethod
    def get_completions(
        self,
        prefix: str,
        action: argparse.Action,
        parsed_args: argparse.Namespace,
    ) -> list[str]:
        """
        Return the completions for the given prefix and action.
        Invoked by the completer method.
        """

    def quote_completions(self, completions, cword_prequote, last_wordbreak_pos):
        """Filter the completion results against any registered completions, then defer to the parent implementation."""
        registered = self.registered_completions

        if registered is not None:
            # A completion handler registered its results, so anything else is known to be invalid and is dropped.
            # This keeps results contributed by other completers out of the final list.
            permitted = set(registered)
            completions = [candidate for candidate in completions if candidate in permitted]

        return super().quote_completions(completions, cword_prequote, last_wordbreak_pos)


class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
    """Abstract argparse action which parses a composite argument using a namespace parser."""
    # Documentation state keyed by action class; the entry is replaced whenever an instance of that class is created.
    documentation_state: dict[t.Type[CompositeAction], DocumentationState] = {}

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        self.definition = self.create_parser()

        # record the parser's documentation under the concrete action type
        doc_state = DocumentationState()
        self.documentation_state[type(self)] = doc_state
        self.definition.document(doc_state)

        # the destination always comes from the parser definition, overriding any value given by the caller
        kwargs['dest'] = self.definition.dest

        super().__init__(*args, **kwargs)

        register_safe_action(type(self))

    @abc.abstractmethod
    def create_parser(self) -> NamespaceParser:
        """Return the namespace parser used to parse the argument handled by this action."""

    def __call__(
        self,
        parser,
        namespace,
        values,
        option_string=None,
    ):
        """Parse the given values into the namespace, raising an argparse error on failure unless completion is in progress."""
        state = ParserState(mode=ParserMode.PARSE, namespaces=[namespace], remainder=values)

        try:
            self.definition.parse(state)
            return
        except ParserError as ex:
            message = str(ex)
        except CompletionError as ex:
            message = ex.message

        # Parse errors are ignored during completion to avoid breaking downstream completion.
        # FUTURE: It may be possible to enhance error handling by surfacing this error message during downstream completion.
        if not get_comp_type():
            raise argparse.ArgumentError(self, message)


class CompositeActionCompletionFinder(RegisteredCompletionFinder):
    """Completion finder which obtains completions from the parser definition of a composite action."""
    def get_completions(
        self,
        prefix: str,
        action: argparse.Action,
        parsed_args: argparse.Namespace,
    ) -> list[str]:
        """Return the completions for the given prefix and composite action, using the already parsed arguments as the namespace."""
        assert isinstance(action, CompositeAction)

        parser_mode = ParserMode.LIST if self.list_mode else ParserMode.COMPLETE
        parser_state = ParserState(mode=parser_mode, remainder=prefix, namespaces=[parsed_args])

        outcome = complete(action.definition, parser_state)

        results = []

        if isinstance(outcome, CompletionSuccess):
            # the completion result decides whether the completions should be left unmangled
            self.disable_completion_mangling = outcome.preserve
            results = outcome.completions

        if isinstance(outcome, CompletionError):
            # errors produce a warning and no completions
            warn(outcome.message)

        return results


def detect_file_listing(value: str, mode: ParserMode) -> bool:
    """
    Return True if Bash will show a file listing and redraw the prompt, otherwise return False.

    If there are no list results, a file listing will be shown if the value after the last `=` or `:` character:

        - is empty
        - matches a full path
        - matches a partial path

    Otherwise Bash will play the bell sound and display nothing.

    Only `ParserMode.LIST` is evaluated; False is returned for every other mode.
    If the directory portion of the value cannot be listed, the value is treated as not matching a partial path.

    see: https://github.com/kislyuk/argcomplete/issues/328
    see: https://github.com/kislyuk/argcomplete/pull/284
    """
    listing = False

    if mode == ParserMode.LIST:
        right = re.split('[=:]', value)[-1]
        listing = not right or os.path.exists(right)

        if not listing:
            # os.listdir returns bare entry names without the directory, so a partial path must be matched
            # using only its final component instead of the whole value (which may include a directory).
            directory, prefix = os.path.split(right)

            # noinspection PyBroadException
            try:
                filenames = os.listdir(directory or '.')
            except Exception:  # pylint: disable=broad-except
                pass  # best effort: a directory which cannot be listed has no partial matches
            else:
                listing = any(filename.startswith(prefix) for filename in filenames)

    return listing


def detect_false_file_completion(value: str, mode: ParserMode) -> bool:
    """
    Return True if Bash will provide an incorrect file completion, otherwise return False.

    When no completion results are available, Bash automatically completes a filename if the value after the last `=` or `:` character:

        - matches exactly one partial path

    In every other case Bash plays the bell sound and displays nothing.

    Only `ParserMode.COMPLETE` is evaluated; False is returned for every other mode.
    If the directory portion of the value cannot be listed, True is returned.

    see: https://github.com/kislyuk/argcomplete/issues/328
    see: https://github.com/kislyuk/argcomplete/pull/284
    """
    if mode != ParserMode.COMPLETE:
        return False

    tail = re.split('[=:]', value)[-1]
    parent, stem = os.path.split(tail)

    # noinspection PyBroadException
    try:
        entries = os.listdir(parent or '.')
    except Exception:  # pylint: disable=broad-except
        return True

    # a false completion only happens when exactly one directory entry starts with the typed name
    return sum(1 for entry in entries if entry.startswith(stem)) == 1


def complete(
    completer: Parser,
    state: ParserState,
) -> Completion:
    """Run the given completer against the parser state and return the resulting completion."""
    original = state.remainder  # captured before parsing, for use by the file listing and file completion checks

    result: Completion

    try:
        completer.parse(state)
        # parsing finished without raising a completion result
        raise ParserError('completion expected')
    except CompletionUnavailable as unavailable:
        # default to passing the result through unchanged, unless one of the Bash work-arounds below applies
        result = unavailable

        if detect_file_listing(original, state.mode):
            # A warning shown before the file listing tells the user the listing is invalid. Bash redraws the prompt after the list.
            # Without a file listing a warning might still help, but it would add noise to the terminal since the prompt is not redrawn.
            result = CompletionError(unavailable.message)
        elif detect_false_file_completion(original, state.mode):
            # When the current prefix has no matches, but matches a single file on disk, Bash will perform an incorrect completion.
            # Returning several invalid matches, rather than none, stops Bash from applying its own completion logic here.
            result = CompletionSuccess(
                list_mode=True,  # abuse list mode to enable preservation of the literal results
                consumed='',
                continuation='',
                matches=['completion', 'invalid'],
            )
    except Completion as other:
        result = other

    return result