1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
|
"""General purpose composite argument parsing and completion."""
from __future__ import annotations
import abc
import collections.abc as c
import contextlib
import dataclasses
import enum
import os
import re
import typing as t
# NOTE: When choosing delimiters, take into account Bash and argcomplete behavior.
#
# Recommended characters for assignment and/or continuation: `/` `:` `=`
#
# That list of recommended characters is due to how argcomplete handles continuation characters.
# see: https://github.com/kislyuk/argcomplete/blob/5a20d6165fbb4d4d58559378919b05964870cc16/argcomplete/__init__.py#L557-L558
# Separates successive key/value pairs in KeyValueParser and is the default left/right separator of PairParser.
PAIR_DELIMITER = ','
# Separates a key from its value inside a single pair parsed by KeyValueParser.
ASSIGNMENT_DELIMITER = '='
# Separates path segments in FileParser, AbsolutePathParser and RelativePathNameParser.
PATH_DELIMITER = '/'
# This class was originally frozen. However, that causes issues when running under Python 3.11.
# See: https://github.com/python/cpython/issues/99856
@dataclasses.dataclass
class Completion(Exception):
    """
    Common base for every argument completion outcome.

    Completion outcomes are exceptions: the parsers in this module raise them rather than return them.
    """
@dataclasses.dataclass
class CompletionUnavailable(Completion):
    """Completion outcome reporting that no candidates can be offered."""

    # Human-readable explanation; parsers may supply their own text in place of the default.
    message: str = 'No completions available.'
@dataclasses.dataclass
class CompletionError(Completion):
    """Completion outcome reporting that completion failed."""

    # Optional human-readable description of the failure.
    message: t.Optional[str] = None
@dataclasses.dataclass
class CompletionSuccess(Completion):
    """Completion outcome carrying the candidate values found for the word being completed."""

    list_mode: bool  # True when the candidates are being listed rather than completed
    consumed: str  # input that precedes the segment being completed
    continuation: str  # text appended to each candidate outside of list mode
    matches: list[str] = dataclasses.field(default_factory=list)  # candidate values for the current segment

    @property
    def preserve(self) -> bool:
        """
        Whether argcomplete must leave the completion values untouched.

        This only applies when several candidates exist, so that the word being completed is not overwritten.
        """
        several = len(self.matches) > 1
        return several and self.list_mode

    @property
    def completions(self) -> list[str]:
        """The completion values to hand back to argcomplete."""
        if self.preserve:
            return self.matches

        suffix = '' if self.list_mode else self.continuation

        # prepend the already consumed input so the word being completed is not rewritten
        return [f'{self.consumed}{match}{suffix}' for match in self.matches]
class ParserMode(enum.Enum):
    """The kind of work the composite argument parser has been asked to do."""

    # validate the input and return parsed values
    PARSE = enum.auto()
    # produce completions for the input
    COMPLETE = enum.auto()
    # produce completions in list mode
    LIST = enum.auto()
class ParserError(Exception):
    """Root of the exception hierarchy used to report parsing failures."""
@dataclasses.dataclass
class ParserBoundary:
    """One delimiter boundary in effect while composite input is being parsed."""

    delimiters: str  # characters, any one of which ends a segment at this boundary
    required: bool  # when True, ParserState.delimit raises ParserError if no delimiter was matched
    match: t.Optional[str] = None  # delimiter matched by the most recent ParserState.read, if any
    ready: bool = True  # cleared by ParserState.read when the segment did not end on one of these delimiters
@dataclasses.dataclass
class ParserState:
    """
    Mutable state shared by the composite argument parsers while they consume the input.

    Parsers call read() to take the next segment of input, delimit() to declare which delimiter
    characters end a segment, and match() to compare a segment against a list of choices.
    """

    mode: ParserMode
    remainder: t.Optional[str] = ''  # input not yet read; read() sets this to None once no delimiter remains
    consumed: str = ''  # segments read so far, each followed by the delimiter that ended it
    boundaries: list[ParserBoundary] = dataclasses.field(default_factory=list)  # active boundaries, innermost last
    namespaces: list[t.Any] = dataclasses.field(default_factory=list)  # namespace stack, root first
    parts: list[str] = dataclasses.field(default_factory=list)

    @property
    def incomplete(self) -> bool:
        """True if parsing is incomplete (unparsed input remains), otherwise False."""
        return self.remainder is not None

    def match(self, value: str, choices: list[str]) -> bool:
        """
        Return True if the given value matches one of the choices, taking the current boundary into account.

        A choice that ends with one of the current boundary's delimiter characters only matches when
        the value followed by the delimiter actually read equals the choice. Any other choice must equal the value.
        """
        boundary = self.current_boundary

        if boundary:
            delimiters, delimiter = boundary.delimiters, boundary.match
        else:
            delimiters, delimiter = '', None

        for choice in choices:
            if choice.rstrip(delimiters) == choice:
                # choice is not delimited
                if value == choice:
                    return True
            elif delimiter is not None and f'{value}{delimiter}' == choice:
                # choice is delimited; without a matched delimiter there is nothing to compare,
                # and formatting None into the string would compare against the literal text "None"
                return True

        return False

    def read(self) -> str:
        """
        Read and return the next input segment, taking into account parsing boundaries.

        The segment ends at the first delimiter belonging to any active boundary, or at the end of the input.
        Boundaries are then updated from the innermost outwards: the boundary owning the delimiter records it
        and the segment is added to the consumed input; every boundary visited before it is marked as
        unmatched and no longer ready. The walk stops early at a required boundary that does not own the delimiter.
        """
        delimiters = ''.join(boundary.delimiters for boundary in self.boundaries)

        if delimiters:
            # the capture group makes split() return the delimiter between the value and the rest
            splitter = re.compile(f'([{re.escape(delimiters)}])')
            parts = splitter.split(self.remainder, maxsplit=1)
        else:
            parts = [self.remainder]

        if len(parts) > 1:
            value, delimiter, remainder = parts
        else:
            value, delimiter, remainder = parts[0], None, None

        for boundary in reversed(self.boundaries):
            if delimiter and delimiter in boundary.delimiters:
                boundary.match = delimiter
                self.consumed += value + delimiter
                break

            boundary.match = None
            boundary.ready = False

            if boundary.required:
                break

        self.remainder = remainder

        return value

    @property
    def root_namespace(self) -> t.Any:
        """The root namespace."""
        return self.namespaces[0]

    @property
    def current_namespace(self) -> t.Any:
        """The current namespace."""
        return self.namespaces[-1]

    @property
    def current_boundary(self) -> t.Optional[ParserBoundary]:
        """The current parser boundary, if any, otherwise None."""
        return self.boundaries[-1] if self.boundaries else None

    def set_namespace(self, namespace: t.Any) -> None:
        """Set the current namespace."""
        self.namespaces.append(namespace)

    @contextlib.contextmanager
    def delimit(self, delimiters: str, required: bool = True) -> c.Iterator[ParserBoundary]:
        """
        Context manager for delimiting parsing of input.

        A boundary for the given delimiters is active inside the context and removed on exit.
        Raises ParserError on normal exit if the boundary is required and no delimiter was matched.
        """
        boundary = ParserBoundary(delimiters=delimiters, required=required)

        self.boundaries.append(boundary)

        try:
            yield boundary
        finally:
            self.boundaries.pop()

        if boundary.required and not boundary.match:
            raise ParserError('required delimiter not found, hit up-level delimiter or end of input instead')
@dataclasses.dataclass
class DocumentationState:
    """Holds the documentation generated so far for the composite argument parser."""

    # generated documentation sections, stored as string keys mapped to string values
    sections: dict[str, str] = dataclasses.field(default_factory=dict)
class Parser(metaclass=abc.ABCMeta):
    """Abstract root of every composite argument parser."""

    @abc.abstractmethod
    def parse(self, state: ParserState) -> t.Any:
        """Consume input from the given state and return the parsed result."""

    def document(self, state: DocumentationState) -> t.Optional[str]:
        """
        Produce the documentation for this parser.

        The base implementation always raises, so parsers that can be documented must override it.
        """
        parser_type = type(self)

        raise Exception(f'Undocumented parser: {parser_type}')
class MatchConditions(enum.Flag):
    """Combinable conditions under which user input is accepted as a match for the available choices."""

    CHOICE = enum.auto()
    """Match any choice."""

    ANY = enum.auto()
    """Match any non-empty string."""

    NOTHING = enum.auto()
    """Match an empty string which is not followed by a boundary match."""
class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta):
    """Abstract parser whose list of choices is computed from the input, possibly during completion."""

    def __init__(self, conditions: MatchConditions = MatchConditions.CHOICE) -> None:
        self.conditions = conditions

    @abc.abstractmethod
    def get_choices(self, value: str) -> list[str]:
        """Return the valid choices for the given input value."""

    def no_completion_match(self, value: str) -> CompletionUnavailable:  # pylint: disable=unused-argument
        """Build the CompletionUnavailable to raise when completion finds no match for the given value."""
        return CompletionUnavailable()

    def no_choices_available(self, value: str) -> ParserError:  # pylint: disable=unused-argument
        """Build the ParserError to raise when parsing fails and there are no choices at all."""
        return ParserError('No choices available.')

    def parse(self, state: ParserState) -> t.Any:
        """
        Read the next input segment and either return it or raise a completion outcome.

        When parsing, or when more input follows the segment, the segment is returned if it satisfies one of
        the configured match conditions; otherwise ParserError (parse mode) or CompletionUnavailable is raised.
        When completing the last segment, CompletionSuccess is raised with every choice starting with the segment,
        or the result of no_completion_match() is raised if there is none.
        """
        value = state.read()
        choices = self.get_choices(value)
        parsing = state.mode == ParserMode.PARSE

        if not parsing and not state.incomplete:
            # completing the final segment of the input
            matches = [choice for choice in choices if choice.startswith(value)]

            if not matches:
                raise self.no_completion_match(value)

            boundary = state.current_boundary
            continuation = boundary.delimiters if boundary and boundary.required else ''

            raise CompletionSuccess(
                list_mode=state.mode == ParserMode.LIST,
                consumed=state.consumed,
                continuation=continuation,
                matches=matches,
            )

        if self.conditions & MatchConditions.CHOICE and state.match(value, choices):
            return value

        if self.conditions & MatchConditions.ANY and value:
            return value

        if self.conditions & MatchConditions.NOTHING and not value and state.current_boundary and not state.current_boundary.match:
            return value

        if not parsing:
            raise CompletionUnavailable()

        if choices:
            raise ParserError(f'"{value}" not in: {", ".join(choices)}')

        raise self.no_choices_available(value)
class ChoicesParser(DynamicChoicesParser):
    """Parser backed by a fixed list of choices supplied at construction time."""

    def __init__(self, choices: list[str], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
        self.choices = choices

        super().__init__(conditions=conditions)

    def get_choices(self, value: str) -> list[str]:
        """Return the fixed list of choices; the input value is not consulted."""
        return self.choices

    def document(self, state: DocumentationState) -> t.Optional[str]:
        """Return the choices joined with `|` as the documentation for this parser."""
        separator = '|'

        return separator.join(self.choices)
class EnumValueChoicesParser(ChoicesParser):
    """Parser whose fixed choices are the stringified values of an enum's members."""

    def __init__(self, enum_type: t.Type[enum.Enum], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
        self.enum_type = enum_type

        member_values = [str(member.value) for member in enum_type]

        super().__init__(choices=member_values, conditions=conditions)

    def parse(self, state: ParserState) -> t.Any:
        """Parse the next segment and return the enum member looked up by that value."""
        chosen = super().parse(state)

        return self.enum_type(chosen)
class IntegerParser(DynamicChoicesParser):
    """Parser for positive integers, with an optional upper bound."""

    # a positive integer without leading zeros
    PATTERN = re.compile('^[1-9][0-9]*$')

    def __init__(self, maximum: t.Optional[int] = None) -> None:
        self.maximum = maximum

        super().__init__()

    def get_choices(self, value: str) -> list[str]:
        """
        Return the integers that the given input could be or become, as strings.

        Empty input yields 1 through 9. A valid integer yields itself plus itself followed by each digit.
        Anything else yields no choices. Choices above the maximum, when one is set, are dropped.
        """
        if not value:
            candidates = list(range(1, 10))
        elif self.PATTERN.search(value):
            prefix = int(value)
            candidates = [prefix, *(prefix * 10 + digit for digit in range(10))]
        else:
            candidates = []

        # NOTE: the minimum is currently fixed at 1

        if self.maximum is not None:
            candidates = [number for number in candidates if number <= self.maximum]

        return [str(number) for number in candidates]

    def parse(self, state: ParserState) -> t.Any:
        """Parse the next segment and return it converted to an int."""
        text = super().parse(state)

        return int(text)

    def document(self, state: DocumentationState) -> t.Optional[str]:
        """Return the placeholder used to document an integer value."""
        return '{integer}'
class BooleanParser(ChoicesParser):
    """Parser for a yes/no answer, returned as a bool."""

    def __init__(self) -> None:
        super().__init__(['yes', 'no'])

    def parse(self, state: ParserState) -> bool:
        """Parse the next segment and return True for `yes`, otherwise False."""
        answer = super().parse(state)

        return answer == 'yes'
class AnyParser(ChoicesParser):
    """Parser that accepts any non-empty value, and optionally an empty one, without offering choices."""

    def __init__(self, nothing: bool = False, no_match_message: t.Optional[str] = None) -> None:
        self.no_match_message = no_match_message

        conditions = MatchConditions.ANY | MatchConditions.NOTHING if nothing else MatchConditions.ANY

        super().__init__([], conditions=conditions)

    def no_completion_match(self, value: str) -> CompletionUnavailable:
        """Build the CompletionUnavailable to raise, using the custom message when one was configured."""
        if not self.no_match_message:
            return super().no_completion_match(value)

        return CompletionUnavailable(message=self.no_match_message)

    def no_choices_available(self, value: str) -> ParserError:
        """Build the ParserError to raise, using the custom message when one was configured."""
        if not self.no_match_message:
            return super().no_choices_available(value)

        return ParserError(self.no_match_message)
class RelativePathNameParser(DynamicChoicesParser):
    """Parser for a single path name, which may be one of the relative names `.` or `..`."""

    RELATIVE_NAMES = ['.', '..']

    def __init__(self, choices: list[str]) -> None:
        self.choices = choices

        super().__init__()

    def get_choices(self, value: str) -> list[str]:
        """Return a copy of the configured choices, plus the relative names when the input is itself a relative name."""
        if value not in self.RELATIVE_NAMES:
            return list(self.choices)

        # complete relative names, but avoid suggesting them unless the current name is relative
        # unfortunately this will be sorted in reverse of what bash presents ("../ ./" instead of "./ ../")
        relative = (f'{name}{PATH_DELIMITER}' for name in self.RELATIVE_NAMES)

        return [*self.choices, *relative]
class FileParser(Parser):
    """Parser for an absolute or relative path to a file."""

    def parse(self, state: ParserState) -> str:
        """
        Parse a file path from the given state and return it.

        In parse mode the whole value is read at once and must name an existing file, else ParserError is raised.
        In the other modes the path is walked one segment at a time, offering the entries of the directory reached so far.
        """
        if state.mode == ParserMode.PARSE:
            path = AnyParser().parse(state)

            if not os.path.isfile(path):
                raise ParserError(f'Not a file: {path}')

            return path

        path = ''

        with state.delimit(PATH_DELIMITER, required=False) as boundary:  # type: ParserBoundary
            while boundary.ready:
                try:
                    with os.scandir(path or '.') as scan:  # type: c.Iterator[os.DirEntry]
                        choices = [f'{entry.name}{PATH_DELIMITER}' if entry.is_dir() else entry.name for entry in scan]
                except OSError:
                    # an unreadable or missing directory simply offers no entries
                    choices = []

                if not path:
                    choices.extend([
                        PATH_DELIMITER,  # allow absolute paths
                        '../',  # suggest relative paths
                    ])

                part = RelativePathNameParser(choices).parse(state)
                path += f'{part}{boundary.match or ""}'

        return path
class AbsolutePathParser(Parser):
    """Parser for an absolute file path. Only the syntax of the path is checked, not whether it exists."""

    def parse(self, state: ParserState) -> t.Any:
        """Parse an absolute path from the given state, one segment at a time, and return it."""
        path = ''

        with state.delimit(PATH_DELIMITER, required=False) as boundary:  # type: ParserBoundary
            while boundary.ready:
                # until something has been accumulated, only the path delimiter is offered as a choice
                segment_parser = AnyParser(nothing=True) if path else ChoicesParser([PATH_DELIMITER])

                path += segment_parser.parse(state)
                path += boundary.match or ''

        return path
class NamespaceParser(Parser, metaclass=abc.ABCMeta):
    """Abstract parser that stores what it parses on the current namespace."""

    def parse(self, state: ParserState) -> t.Any:
        """
        Parse a value, store it on the current namespace under `dest`, and return it.

        Raises ParserError (parse mode) or CompletionError when a value is already present and only one is allowed.
        """
        namespace = state.current_namespace
        existing = getattr(namespace, self.dest)

        if existing and self.limit_one:
            message = 'Option cannot be specified more than once.'

            if state.mode == ParserMode.PARSE:
                raise ParserError(message)

            raise CompletionError(message)

        value = self.get_value(state)

        if not self.use_list:
            setattr(namespace, self.dest, value)
            return value

        if not existing:
            # first value for a list destination: create the list and attach it before appending
            existing = []
            setattr(namespace, self.dest, existing)

        existing.append(value)

        return value

    def get_value(self, state: ParserState) -> t.Any:
        """Parse and return the value without storing it on the namespace."""
        return super().parse(state)

    @property
    def use_list(self) -> bool:
        """Whether the destination attribute holds a list of values."""
        return False

    @property
    def limit_one(self) -> bool:
        """Whether at most one value may be given."""
        return not self.use_list

    @property
    @abc.abstractmethod
    def dest(self) -> str:
        """Name of the namespace attribute that receives the value."""
class NamespaceWrappedParser(NamespaceParser):
    """Adapter that lets an ordinary parser store its result on a namespace."""

    def __init__(self, dest: str, parser: Parser) -> None:
        self._dest = dest
        self.parser = parser

    def get_value(self, state: ParserState) -> t.Any:
        """Delegate parsing to the wrapped parser and return its result without storing it."""
        wrapped = self.parser

        return wrapped.parse(state)

    @property
    def dest(self) -> str:
        """Name of the namespace attribute that receives the value."""
        return self._dest
class KeyValueParser(Parser, metaclass=abc.ABCMeta):
    """Abstract parser for a delimited series of key/value pairs."""

    @abc.abstractmethod
    def get_parsers(self, state: ParserState) -> dict[str, Parser]:
        """Return a mapping from each accepted key name to the parser for its value."""

    def parse(self, state: ParserState) -> t.Any:
        """
        Parse key/value pairs, set each value on the current namespace, and return that namespace.

        A key is removed from the available choices once it has been parsed.
        """
        namespace = state.current_namespace
        parsers = self.get_parsers(state)
        available_keys = list(parsers)

        with state.delimit(PAIR_DELIMITER, required=False) as pair:  # type: ParserBoundary
            while pair.ready:
                # the assignment delimiter is required, so a key must be followed by it
                with state.delimit(ASSIGNMENT_DELIMITER):
                    key = ChoicesParser(available_keys).parse(state)

                value_parser = parsers[key]
                setattr(namespace, key, value_parser.parse(state))

                available_keys.remove(key)

        return namespace
class PairParser(Parser, metaclass=abc.ABCMeta):
    """Abstract parser made of a left and a right parser whose inputs are separated by a delimiter."""

    def parse(self, state: ParserState) -> t.Any:
        """
        Parse the left side and, if the delimiter was matched, the right side; return the new namespace.

        The namespace from create_namespace() becomes the current namespace before either side is parsed.
        """
        namespace = self.create_namespace()

        state.set_namespace(namespace)

        with state.delimit(self.delimiter, self.required) as boundary:  # type: ParserBoundary
            left_result = self.get_left_parser(state).parse(state)

        if boundary.match:
            right_parser = self.get_right_parser(left_result)
            right_parser.parse(state)

        return namespace

    @property
    def required(self) -> bool:
        """Whether the delimiter, and with it the right side, must be present."""
        return False

    @property
    def delimiter(self) -> str:
        """Delimiter placed between the left and right sides."""
        return PAIR_DELIMITER

    @abc.abstractmethod
    def create_namespace(self) -> t.Any:
        """Create and return the namespace for this pair."""

    @abc.abstractmethod
    def get_left_parser(self, state: ParserState) -> Parser:
        """Return the parser for the left side."""

    @abc.abstractmethod
    def get_right_parser(self, choice: t.Any) -> Parser:
        """Return the parser for the right side, given the result of the left side."""
class TypeParser(Parser, metaclass=abc.ABCMeta):
    """Abstract parser that reads a type name, then a colon, then hands the rest to the parser registered for that type."""

    def get_parsers(self, state: ParserState) -> dict[str, Parser]:  # pylint: disable=unused-argument
        """Return a mapping from type names to type parsers; by default the stateless mapping."""
        return self.get_stateless_parsers()

    @abc.abstractmethod
    def get_stateless_parsers(self) -> dict[str, Parser]:
        """Return a mapping from type names to type parsers that does not depend on parser state."""

    def parse(self, state: ParserState) -> t.Any:
        """Parse the type name and return the result of the parser selected by it."""
        parsers = self.get_parsers(state)
        type_names = list(parsers)

        # the colon is required, so the type name must be followed by it
        with state.delimit(':'):
            key = ChoicesParser(type_names).parse(state)

        return parsers[key].parse(state)
|