summaryrefslogtreecommitdiffstats
path: root/test/lib/ansible_test/_internal/cli/argparsing/parsers.py
blob: d07e03cbc86700b9ca2d6743ba1983c0db3eaa75 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
"""General purpose composite argument parsing and completion."""
from __future__ import annotations

import abc
import collections.abc as c
import contextlib
import dataclasses
import enum
import os
import re
import typing as t

# NOTE: When choosing delimiters, take into account Bash and argcomplete behavior.
#
# Recommended characters for assignment and/or continuation: `/` `:` `=`
#
# The recommended assignment_character list is due to how argcomplete handles continuation characters.
# see: https://github.com/kislyuk/argcomplete/blob/5a20d6165fbb4d4d58559378919b05964870cc16/argcomplete/__init__.py#L557-L558

PAIR_DELIMITER = ','
ASSIGNMENT_DELIMITER = '='
PATH_DELIMITER = '/'


# This class was originally frozen. However, that causes issues when running under Python 3.11.
# See: https://github.com/python/cpython/issues/99856
@dataclasses.dataclass
class Completion(Exception):
    """Base class for argument completion results."""


@dataclasses.dataclass
class CompletionUnavailable(Completion):
    """Argument completion unavailable."""
    message: str = 'No completions available.'


@dataclasses.dataclass
class CompletionError(Completion):
    """Argument completion error."""
    message: t.Optional[str] = None


@dataclasses.dataclass
class CompletionSuccess(Completion):
    """Successful argument completion result."""
    list_mode: bool
    consumed: str
    continuation: str
    matches: list[str] = dataclasses.field(default_factory=list)

    @property
    def preserve(self) -> bool:
        """
        True if argcomplete should not mangle completion values, otherwise False.
        Only used when more than one completion exists to avoid overwriting the word undergoing completion.
        """
        return len(self.matches) > 1 and self.list_mode

    @property
    def completions(self) -> list[str]:
        """List of completion values to return to argcomplete."""
        completions = self.matches
        continuation = '' if self.list_mode else self.continuation

        if not self.preserve:
            # include the existing prefix to avoid rewriting the word undergoing completion
            completions = [f'{self.consumed}{completion}{continuation}' for completion in completions]

        return completions


class ParserMode(enum.Enum):
    """Mode the parser is operating in."""
    PARSE = enum.auto()
    COMPLETE = enum.auto()
    LIST = enum.auto()


class ParserError(Exception):
    """Base class for all parsing exceptions."""


@dataclasses.dataclass
class ParserBoundary:
    """Boundary details for parsing composite input."""
    delimiters: str
    required: bool
    match: t.Optional[str] = None
    ready: bool = True


@dataclasses.dataclass
class ParserState:
    """State of the composite argument parser."""
    mode: ParserMode
    remainder: str = ''
    consumed: str = ''
    boundaries: list[ParserBoundary] = dataclasses.field(default_factory=list)
    namespaces: list[t.Any] = dataclasses.field(default_factory=list)
    parts: list[str] = dataclasses.field(default_factory=list)

    @property
    def incomplete(self) -> bool:
        """True if parsing is incomplete (unparsed input remains), otherwise False."""
        return self.remainder is not None

    def match(self, value: str, choices: list[str]) -> bool:
        """Return True if the given value matches the provided choices, taking into account parsing boundaries, otherwise return False."""
        if self.current_boundary:
            delimiters, delimiter = self.current_boundary.delimiters, self.current_boundary.match
        else:
            delimiters, delimiter = '', None

        for choice in choices:
            if choice.rstrip(delimiters) == choice:
                # choice is not delimited
                if value == choice:
                    return True  # value matched
            else:
                # choice is delimited
                if f'{value}{delimiter}' == choice:
                    return True  # value and delimiter matched

        return False

    def read(self) -> str:
        """Read and return the next input segment, taking into account parsing boundaries."""
        delimiters = "".join(boundary.delimiters for boundary in self.boundaries)

        if delimiters:
            pattern = '([' + re.escape(delimiters) + '])'
            regex = re.compile(pattern)
            parts = regex.split(self.remainder, 1)
        else:
            parts = [self.remainder]

        if len(parts) > 1:
            value, delimiter, remainder = parts
        else:
            value, delimiter, remainder = parts[0], None, None

        for boundary in reversed(self.boundaries):
            if delimiter and delimiter in boundary.delimiters:
                boundary.match = delimiter
                self.consumed += value + delimiter
                break

            boundary.match = None
            boundary.ready = False

            if boundary.required:
                break

        self.remainder = remainder

        return value

    @property
    def root_namespace(self) -> t.Any:
        """THe root namespace."""
        return self.namespaces[0]

    @property
    def current_namespace(self) -> t.Any:
        """The current namespace."""
        return self.namespaces[-1]

    @property
    def current_boundary(self) -> t.Optional[ParserBoundary]:
        """The current parser boundary, if any, otherwise None."""
        return self.boundaries[-1] if self.boundaries else None

    def set_namespace(self, namespace: t.Any) -> None:
        """Set the current namespace."""
        self.namespaces.append(namespace)

    @contextlib.contextmanager
    def delimit(self, delimiters: str, required: bool = True) -> c.Iterator[ParserBoundary]:
        """Context manager for delimiting parsing of input."""
        boundary = ParserBoundary(delimiters=delimiters, required=required)

        self.boundaries.append(boundary)

        try:
            yield boundary
        finally:
            self.boundaries.pop()

        if boundary.required and not boundary.match:
            raise ParserError('required delimiter not found, hit up-level delimiter or end of input instead')


@dataclasses.dataclass
class DocumentationState:
    """State of the composite argument parser's generated documentation."""
    sections: dict[str, str] = dataclasses.field(default_factory=dict)


class Parser(metaclass=abc.ABCMeta):
    """Base class for all composite argument parsers."""
    @abc.abstractmethod
    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""

    def document(self, state: DocumentationState) -> t.Optional[str]:
        """Generate and return documentation for this parser."""
        raise Exception(f'Undocumented parser: {type(self)}')


class MatchConditions(enum.Flag):
    """Acceptable condition(s) for matching user input to available choices."""
    CHOICE = enum.auto()
    """Match any choice."""
    ANY = enum.auto()
    """Match any non-empty string."""
    NOTHING = enum.auto()
    """Match an empty string which is not followed by a boundary match."""


class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta):
    """Base class for composite argument parsers which use a list of choices that can be generated during completion."""
    def __init__(self, conditions: MatchConditions = MatchConditions.CHOICE) -> None:
        self.conditions = conditions

    @abc.abstractmethod
    def get_choices(self, value: str) -> list[str]:
        """Return a list of valid choices based on the given input value."""

    def no_completion_match(self, value: str) -> CompletionUnavailable:  # pylint: disable=unused-argument
        """Return an instance of CompletionUnavailable when no match was found for the given value."""
        return CompletionUnavailable()

    def no_choices_available(self, value: str) -> ParserError:  # pylint: disable=unused-argument
        """Return an instance of ParserError when parsing fails and no choices are available."""
        return ParserError('No choices available.')

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        value = state.read()
        choices = self.get_choices(value)

        if state.mode == ParserMode.PARSE or state.incomplete:
            if self.conditions & MatchConditions.CHOICE and state.match(value, choices):
                return value

            if self.conditions & MatchConditions.ANY and value:
                return value

            if self.conditions & MatchConditions.NOTHING and not value and state.current_boundary and not state.current_boundary.match:
                return value

            if state.mode == ParserMode.PARSE:
                if choices:
                    raise ParserError(f'"{value}" not in: {", ".join(choices)}')

                raise self.no_choices_available(value)

            raise CompletionUnavailable()

        matches = [choice for choice in choices if choice.startswith(value)]

        if not matches:
            raise self.no_completion_match(value)

        continuation = state.current_boundary.delimiters if state.current_boundary and state.current_boundary.required else ''

        raise CompletionSuccess(
            list_mode=state.mode == ParserMode.LIST,
            consumed=state.consumed,
            continuation=continuation,
            matches=matches,
        )


class ChoicesParser(DynamicChoicesParser):
    """Composite argument parser which relies on a static list of choices."""
    def __init__(self, choices: list[str], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
        self.choices = choices

        super().__init__(conditions=conditions)

    def get_choices(self, value: str) -> list[str]:
        """Return a list of valid choices based on the given input value."""
        return self.choices

    def document(self, state: DocumentationState) -> t.Optional[str]:
        """Generate and return documentation for this parser."""
        return '|'.join(self.choices)


class EnumValueChoicesParser(ChoicesParser):
    """Composite argument parser which relies on a static list of choices derived from the values of an enum."""
    def __init__(self, enum_type: t.Type[enum.Enum], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
        self.enum_type = enum_type

        super().__init__(choices=[str(item.value) for item in enum_type], conditions=conditions)

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        value = super().parse(state)
        return self.enum_type(value)


class IntegerParser(DynamicChoicesParser):
    """Composite argument parser for integers."""
    PATTERN = re.compile('^[1-9][0-9]*$')

    def __init__(self, maximum: t.Optional[int] = None) -> None:
        self.maximum = maximum

        super().__init__()

    def get_choices(self, value: str) -> list[str]:
        """Return a list of valid choices based on the given input value."""
        if not value:
            numbers = list(range(1, 10))
        elif self.PATTERN.search(value):
            int_prefix = int(value)
            base = int_prefix * 10
            numbers = [int_prefix] + [base + i for i in range(0, 10)]
        else:
            numbers = []

        # NOTE: the minimum is currently fixed at 1

        if self.maximum is not None:
            numbers = [n for n in numbers if n <= self.maximum]

        return [str(n) for n in numbers]

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        value = super().parse(state)
        return int(value)

    def document(self, state: DocumentationState) -> t.Optional[str]:
        """Generate and return documentation for this parser."""
        return '{integer}'


class BooleanParser(ChoicesParser):
    """Composite argument parser for boolean (yes/no) values."""
    def __init__(self) -> None:
        super().__init__(['yes', 'no'])

    def parse(self, state: ParserState) -> bool:
        """Parse the input from the given state and return the result."""
        value = super().parse(state)
        return value == 'yes'


class AnyParser(ChoicesParser):
    """Composite argument parser which accepts any input value."""
    def __init__(self, nothing: bool = False, no_match_message: t.Optional[str] = None) -> None:
        self.no_match_message = no_match_message

        conditions = MatchConditions.ANY

        if nothing:
            conditions |= MatchConditions.NOTHING

        super().__init__([], conditions=conditions)

    def no_completion_match(self, value: str) -> CompletionUnavailable:
        """Return an instance of CompletionUnavailable when no match was found for the given value."""
        if self.no_match_message:
            return CompletionUnavailable(message=self.no_match_message)

        return super().no_completion_match(value)

    def no_choices_available(self, value: str) -> ParserError:
        """Return an instance of ParserError when parsing fails and no choices are available."""
        if self.no_match_message:
            return ParserError(self.no_match_message)

        return super().no_choices_available(value)


class RelativePathNameParser(DynamicChoicesParser):
    """Composite argument parser for relative path names."""
    RELATIVE_NAMES = ['.', '..']

    def __init__(self, choices: list[str]) -> None:
        self.choices = choices

        super().__init__()

    def get_choices(self, value: str) -> list[str]:
        """Return a list of valid choices based on the given input value."""
        choices = list(self.choices)

        if value in self.RELATIVE_NAMES:
            # complete relative names, but avoid suggesting them unless the current name is relative
            # unfortunately this will be sorted in reverse of what bash presents ("../ ./" instead of "./ ../")
            choices.extend(f'{item}{PATH_DELIMITER}' for item in self.RELATIVE_NAMES)

        return choices


class FileParser(Parser):
    """Composite argument parser for absolute or relative file paths."""
    def parse(self, state: ParserState) -> str:
        """Parse the input from the given state and return the result."""
        if state.mode == ParserMode.PARSE:
            path = AnyParser().parse(state)

            if not os.path.isfile(path):
                raise ParserError(f'Not a file: {path}')
        else:
            path = ''

            with state.delimit(PATH_DELIMITER, required=False) as boundary:  # type: ParserBoundary
                while boundary.ready:
                    directory = path or '.'

                    try:
                        with os.scandir(directory) as scan:  # type: c.Iterator[os.DirEntry]
                            choices = [f'{item.name}{PATH_DELIMITER}' if item.is_dir() else item.name for item in scan]
                    except OSError:
                        choices = []

                    if not path:
                        choices.append(PATH_DELIMITER)  # allow absolute paths
                        choices.append('../')  # suggest relative paths

                    part = RelativePathNameParser(choices).parse(state)
                    path += f'{part}{boundary.match or ""}'

        return path


class AbsolutePathParser(Parser):
    """Composite argument parser for absolute file paths. Paths are only verified for proper syntax, not for existence."""
    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        path = ''

        with state.delimit(PATH_DELIMITER, required=False) as boundary:  # type: ParserBoundary
            while boundary.ready:
                if path:
                    path += AnyParser(nothing=True).parse(state)
                else:
                    path += ChoicesParser([PATH_DELIMITER]).parse(state)

                path += (boundary.match or '')

        return path


class NamespaceParser(Parser, metaclass=abc.ABCMeta):
    """Base class for composite argument parsers that store their results in a namespace."""
    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        namespace = state.current_namespace
        current = getattr(namespace, self.dest)

        if current and self.limit_one:
            if state.mode == ParserMode.PARSE:
                raise ParserError('Option cannot be specified more than once.')

            raise CompletionError('Option cannot be specified more than once.')

        value = self.get_value(state)

        if self.use_list:
            if not current:
                current = []
                setattr(namespace, self.dest, current)

            current.append(value)
        else:
            setattr(namespace, self.dest, value)

        return value

    def get_value(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result, without storing the result in the namespace."""
        return super().parse(state)

    @property
    def use_list(self) -> bool:
        """True if the destination is a list, otherwise False."""
        return False

    @property
    def limit_one(self) -> bool:
        """True if only one target is allowed, otherwise False."""
        return not self.use_list

    @property
    @abc.abstractmethod
    def dest(self) -> str:
        """The name of the attribute where the value should be stored."""


class NamespaceWrappedParser(NamespaceParser):
    """Composite argument parser that wraps a non-namespace parser and stores the result in a namespace."""
    def __init__(self, dest: str, parser: Parser) -> None:
        self._dest = dest
        self.parser = parser

    def get_value(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result, without storing the result in the namespace."""
        return self.parser.parse(state)

    @property
    def dest(self) -> str:
        """The name of the attribute where the value should be stored."""
        return self._dest


class KeyValueParser(Parser, metaclass=abc.ABCMeta):
    """Base class for key/value composite argument parsers."""
    @abc.abstractmethod
    def get_parsers(self, state: ParserState) -> dict[str, Parser]:
        """Return a dictionary of key names and value parsers."""

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        namespace = state.current_namespace
        parsers = self.get_parsers(state)
        keys = list(parsers)

        with state.delimit(PAIR_DELIMITER, required=False) as pair:  # type: ParserBoundary
            while pair.ready:
                with state.delimit(ASSIGNMENT_DELIMITER):
                    key = ChoicesParser(keys).parse(state)

                value = parsers[key].parse(state)

                setattr(namespace, key, value)

                keys.remove(key)

        return namespace


class PairParser(Parser, metaclass=abc.ABCMeta):
    """Base class for composite argument parsers consisting of a left and right argument parser, with input separated by a delimiter."""
    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        namespace = self.create_namespace()

        state.set_namespace(namespace)

        with state.delimit(self.delimiter, self.required) as boundary:  # type: ParserBoundary
            choice = self.get_left_parser(state).parse(state)

        if boundary.match:
            self.get_right_parser(choice).parse(state)

        return namespace

    @property
    def required(self) -> bool:
        """True if the delimiter (and thus right parser) is required, otherwise False."""
        return False

    @property
    def delimiter(self) -> str:
        """The delimiter to use between the left and right parser."""
        return PAIR_DELIMITER

    @abc.abstractmethod
    def create_namespace(self) -> t.Any:
        """Create and return a namespace."""

    @abc.abstractmethod
    def get_left_parser(self, state: ParserState) -> Parser:
        """Return the parser for the left side."""

    @abc.abstractmethod
    def get_right_parser(self, choice: t.Any) -> Parser:
        """Return the parser for the right side."""


class TypeParser(Parser, metaclass=abc.ABCMeta):
    """Base class for composite argument parsers which parse a type name, a colon and then parse results based on the type given by the type name."""
    def get_parsers(self, state: ParserState) -> dict[str, Parser]:  # pylint: disable=unused-argument
        """Return a dictionary of type names and type parsers."""
        return self.get_stateless_parsers()

    @abc.abstractmethod
    def get_stateless_parsers(self) -> dict[str, Parser]:
        """Return a dictionary of type names and type parsers."""

    def parse(self, state: ParserState) -> t.Any:
        """Parse the input from the given state and return the result."""
        parsers = self.get_parsers(state)

        with state.delimit(':'):
            key = ChoicesParser(list(parsers)).parse(state)

        value = parsers[key].parse(state)

        return value