diff options
Diffstat (limited to 'src')
21 files changed, 1414 insertions, 410 deletions
diff --git a/src/debputy/commands/debputy_cmd/lint_and_lsp_cmds.py b/src/debputy/commands/debputy_cmd/lint_and_lsp_cmds.py index e72a6ce..dcb5063 100644 --- a/src/debputy/commands/debputy_cmd/lint_and_lsp_cmds.py +++ b/src/debputy/commands/debputy_cmd/lint_and_lsp_cmds.py @@ -13,7 +13,7 @@ _EDITOR_SNIPPETS = { ;; ;; Add to ~/.emacs or ~/.emacs.d/init.el and then activate via `M-x eglot`. ;; - ;; Requires: apt install elpa-dpkg-dev-el + ;; Requires: apt install elpa-dpkg-dev-el elpa-yaml-mode ;; Recommends: apt install elpa-markdown-mode ;; Make emacs recognize debian/debputy.manifest as a YAML file @@ -29,6 +29,8 @@ _EDITOR_SNIPPETS = { ;; The debian/rules file uses the qmake mode. (add-to-list 'eglot-server-programs '(makefile-gmake-mode . ("debputy" "lsp" "server"))) + (add-to-list 'eglot-server-programs + '(yaml-mode . ("debputy" "lsp" "server"))) ) ;; Auto-start eglot for the relevant modes. @@ -39,6 +41,7 @@ _EDITOR_SNIPPETS = { ;; (add-hook 'debian-changelog-mode-hook 'eglot-ensure) (add-hook 'debian-copyright-mode-hook 'eglot-ensure) (add-hook 'makefile-gmake-mode-hook 'eglot-ensure) + (add-hook 'yaml-mode-hook 'eglot-ensure) """ ), "vim": "vim+youcompleteme", @@ -112,9 +115,15 @@ def lsp_server_cmd(context: CommandContext) -> None: "This feature requires lsprotocol and pygls (apt-get install python3-lsprotocol python3-pygls)" ) - from debputy.lsp.lsp_features import ensure_lsp_features_are_loaded + feature_set = context.load_plugins() + + from debputy.lsp.lsp_features import ( + ensure_lsp_features_are_loaded, + lsp_set_plugin_features, + ) from debputy.lsp.lsp_dispatch import DEBPUTY_LANGUAGE_SERVER + lsp_set_plugin_features(feature_set) ensure_lsp_features_are_loaded() debputy_language_server = DEBPUTY_LANGUAGE_SERVER @@ -134,12 +143,27 @@ def lsp_server_cmd(context: CommandContext) -> None: "editor_name", metavar="editor", choices=_EDITOR_SNIPPETS, + default=None, + nargs="?", help="The editor to provide a snippet for", ), ], ) def lsp_editor_glue(context: CommandContext) -> None: editor_name = context.parsed_args.editor_name + + if editor_name is None: + content = [] + for editor_name, payload in _EDITOR_SNIPPETS.items(): + alias_of = "" + if payload in _EDITOR_SNIPPETS: + alias_of = f" (short for: {payload})" + content.append((editor_name, alias_of)) + max_name = max(len(c[0]) for c in content) + print("This version of debputy has editor snippets for the following editors: ") + for editor_name, alias_of in content: + print(f" * {editor_name:<{max_name}}{alias_of}") + return result = _EDITOR_SNIPPETS[editor_name] while result in _EDITOR_SNIPPETS: result = _EDITOR_SNIPPETS[result] @@ -200,6 +224,12 @@ def lint_cmd(context: CommandContext) -> None: from debputy.linting.lint_impl import perform_linting context.must_be_called_in_source_root() + feature_set = context.load_plugins() + + from debputy.lsp.lsp_features import lsp_set_plugin_features + + lsp_set_plugin_features(feature_set) + perform_linting(context) diff --git a/src/debputy/commands/debputy_cmd/plugin_cmds.py b/src/debputy/commands/debputy_cmd/plugin_cmds.py index 2456902..69b2a2a 100644 --- a/src/debputy/commands/debputy_cmd/plugin_cmds.py +++ b/src/debputy/commands/debputy_cmd/plugin_cmds.py @@ -37,6 +37,7 @@ from debputy.manifest_parser.declarative_parser import ( BASIC_SIMPLE_TYPES, ) from debputy.manifest_parser.parser_data import ParserContextData +from debputy.manifest_parser.parser_doc import render_rule from debputy.manifest_parser.util import unpack_type, AttributePath from debputy.packager_provided_files import detect_all_packager_provided_files from debputy.plugin.api.example_processing import ( @@ -449,8 +450,9 @@ def _plugin_cmd_list_manifest_rules(context: CommandContext) -> None: # to derive to this common base type on its own. base_type = Iterable[Tuple[Union[str, Type[Any]], DispatchingParserBase[Any]]] - table_parsers: base_type = feature_set.dispatchable_table_parsers.items() - object_parsers: base_type = feature_set.dispatchable_object_parsers.items() + parser_generator = feature_set.manifest_parser_generator + table_parsers: base_type = parser_generator.dispatchable_table_parsers.items() + object_parsers: base_type = parser_generator.dispatchable_object_parsers.items() parsers = chain( table_parsers, @@ -493,215 +495,6 @@ def _plugin_cmd_list_automatic_discard_rules(context: CommandContext) -> None: ) -def _provide_placeholder_parser_doc( - parser_doc: Optional[ParserDocumentation], - attributes: Iterable[str], -) -> ParserDocumentation: - if parser_doc is None: - parser_doc = reference_documentation() - changes = {} - if parser_doc.attribute_doc is None: - changes["attribute_doc"] = [undocumented_attr(attr) for attr in attributes] - - if changes: - return parser_doc.replace(**changes) - return parser_doc - - -def _doc_args_parser_doc( - rule_name: str, - declarative_parser: DeclarativeInputParser[Any], - plugin_metadata: DebputyPluginMetadata, -) -> Tuple[Mapping[str, str], ParserDocumentation]: - attributes: Iterable[str] - if isinstance(declarative_parser, DeclarativeMappingInputParser): - attributes = declarative_parser.source_attributes.keys() - else: - attributes = [] - doc_args = { - "RULE_NAME": rule_name, - "MANIFEST_FORMAT_DOC": f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md", - "PLUGIN_NAME": plugin_metadata.plugin_name, - } - parser_doc = _provide_placeholder_parser_doc( - declarative_parser.inline_reference_documentation, - attributes, - ) - return doc_args, parser_doc - - -def _render_rule( - rule_name: str, - rule_type: str, - declarative_parser: DeclarativeInputParser[Any], - plugin_metadata: DebputyPluginMetadata, - manifest_attribute_path: str, -) -> None: - is_root_rule = rule_name == "::" - - doc_args, parser_doc = _doc_args_parser_doc( - "the manifest root" if is_root_rule else rule_name, - declarative_parser, - plugin_metadata, - ) - t = assume_not_none(parser_doc.title).format(**doc_args) - print(t) - print("=" * len(t)) - print() - - print(assume_not_none(parser_doc.description).format(**doc_args).rstrip()) - - print() - alt_form_parser = getattr(declarative_parser, "alt_form_parser", None) - if isinstance( - declarative_parser, (DeclarativeMappingInputParser, DispatchingObjectParser) - ): - if isinstance(declarative_parser, DeclarativeMappingInputParser): - attributes = declarative_parser.source_attributes - required = declarative_parser.input_time_required_parameters - conditionally_required = declarative_parser.at_least_one_of - mutually_exclusive = declarative_parser.mutually_exclusive_attributes - is_list_wrapped = declarative_parser.is_list_wrapped - else: - attributes = {} - required = frozenset() - conditionally_required = frozenset() - mutually_exclusive = frozenset() - is_list_wrapped = False - if is_list_wrapped: - print("List where each element has the following attributes:") - else: - print("Attributes:") - attribute_docs = ( - parser_doc.attribute_doc if parser_doc.attribute_doc is not None else [] - ) - for attr_doc in assume_not_none(attribute_docs): - attr_description = attr_doc.description - prefix = " - " - - for parameter in sorted(attr_doc.attributes): - parameter_details = attributes.get(parameter) - if parameter_details is not None: - source_name = parameter_details.source_attribute_name - describe_type = parameter_details.type_validator.describe_type() - else: - assert isinstance(declarative_parser, DispatchingObjectParser) - source_name = parameter - subparser = declarative_parser.parser_for(source_name).parser - if isinstance(subparser, DispatchingObjectParser): - rule_prefix = rule_name if rule_name != "::" else "" - describe_type = f"Object (see `{rule_prefix}::{subparser.manifest_attribute_path_template}`)" - elif isinstance(subparser, DeclarativeMappingInputParser): - describe_type = "<Type definition not implemented yet>" # TODO: Derive from subparser - elif isinstance(subparser, DeclarativeNonMappingInputParser): - describe_type = ( - subparser.alt_form_parser.type_validator.describe_type() - ) - else: - describe_type = f"<Unknown: Non-introspectable subparser - {subparser.__class__.__name__}>" - - if source_name in required: - req_str = "required" - elif any(source_name in s for s in conditionally_required): - req_str = "conditional" - else: - req_str = "optional" - print(f"{prefix}`{source_name}` ({req_str}): {describe_type}") - prefix = " " - - if attr_description: - print() - for line in attr_description.format(**doc_args).splitlines( - keepends=False - ): - print(f" {line}") - print() - - if ( - bool(conditionally_required) - or bool(mutually_exclusive) - or any(pd.conflicting_attributes for pd in attributes.values()) - ): - print() - if is_list_wrapped: - print( - "This rule enforces the following restrictions on each element in the list:" - ) - else: - print("This rule enforces the following restrictions:") - - if conditionally_required or mutually_exclusive: - all_groups = set( - itertools.chain(conditionally_required, mutually_exclusive) - ) - for g in all_groups: - anames = "`, `".join(g) - is_mx = g in mutually_exclusive - is_cr = g in conditionally_required - if is_mx and is_cr: - print(f" - The rule must use exactly one of: `{anames}`") - elif is_cr: - print(f" - The rule must use at least one of: `{anames}`") - else: - assert is_mx - print( - f" - The following attributes are mutually exclusive: `{anames}`" - ) - - if mutually_exclusive or any( - pd.conflicting_attributes for pd in attributes.values() - ): - for parameter, parameter_details in sorted(attributes.items()): - source_name = parameter_details.source_attribute_name - conflicts = set(parameter_details.conflicting_attributes) - for mx in mutually_exclusive: - if parameter in mx and mx not in conditionally_required: - conflicts |= mx - if conflicts: - conflicts.discard(parameter) - cnames = "`, `".join( - attributes[a].source_attribute_name for a in conflicts - ) - print( - f" - The attribute `{source_name}` cannot be used with any of: `{cnames}`" - ) - print() - if alt_form_parser is not None: - # FIXME: Mapping[str, Any] ends here, which is ironic given the headline. - print(f"Non-mapping format: {alt_form_parser.type_validator.describe_type()}") - alt_parser_desc = parser_doc.alt_parser_description - if alt_parser_desc: - for line in alt_parser_desc.format(**doc_args).splitlines(keepends=False): - print(f" {line}") - print() - - if declarative_parser.reference_documentation_url is not None: - print( - f"Reference documentation: {declarative_parser.reference_documentation_url}" - ) - else: - print( - "Reference documentation: No reference documentation link provided by the plugin" - ) - - if not is_root_rule: - print( - f"Used in: {manifest_attribute_path if manifest_attribute_path != '<ROOT>' else 'The manifest root'}" - ) - print(f"Rule reference: {rule_type}::{rule_name}") - print(f"Plugin: {plugin_metadata.plugin_name}") - else: - print(f"Rule reference: {rule_name}") - - print() - print( - "PS: If you want to know more about a non-trivial type of an attribute such as `FileSystemMatchRule`," - ) - print( - "you can use `debputy plugin show type-mappings FileSystemMatchRule` to look it up " - ) - - def _render_manifest_variable_value(v: Optional[str]) -> str: if v is None: return "(N/A: Cannot resolve the variable)" @@ -991,8 +784,9 @@ def _plugin_cmd_show_manifest_rule(context: CommandContext) -> None: matched = [] base_type = Iterable[Tuple[Union[str, Type[Any]], DispatchingParserBase[Any]]] - table_parsers: base_type = feature_set.dispatchable_table_parsers.items() - object_parsers: base_type = feature_set.dispatchable_object_parsers.items() + parser_generator = feature_set.manifest_parser_generator + table_parsers: base_type = parser_generator.dispatchable_table_parsers.items() + object_parsers: base_type = parser_generator.dispatchable_object_parsers.items() parsers = chain( table_parsers, @@ -1034,17 +828,36 @@ def _plugin_cmd_show_manifest_rule(context: CommandContext) -> None: plugin_metadata = plugin_provided_parser.plugin_metadata else: rule_name = "::" - parser = feature_set.dispatchable_object_parsers[OPARSER_MANIFEST_ROOT] + parser = parser_generator.dispatchable_object_parsers[OPARSER_MANIFEST_ROOT] parser_type_name = "" plugin_metadata = plugin_metadata_for_debputys_own_plugin() manifest_attribute_path = "" - _render_rule( - rule_name, - parser_type_name, - parser, - plugin_metadata, - manifest_attribute_path, + is_root_rule = rule_name == "::" + print( + render_rule( + rule_name, + parser, + plugin_metadata, + is_root_rule=is_root_rule, + ) + ) + + if not is_root_rule: + print( + f"Used in: {manifest_attribute_path if manifest_attribute_path != '<ROOT>' else 'The manifest root'}" + ) + print(f"Rule reference: {parser_type_name}::{rule_name}") + print(f"Plugin: {plugin_metadata.plugin_name}") + else: + print(f"Rule reference: {rule_name}") + + print() + print( + "PS: If you want to know more about a non-trivial type of an attribute such as `FileSystemMatchRule`," + ) + print( + "you can use `debputy plugin show type-mappings FileSystemMatchRule` to look it up " ) diff --git a/src/debputy/highlevel_manifest.py b/src/debputy/highlevel_manifest.py index 1e92210..30440f1 100644 --- a/src/debputy/highlevel_manifest.py +++ b/src/debputy/highlevel_manifest.py @@ -22,9 +22,6 @@ from typing import ( ) from debian.debian_support import DpkgArchTable -from ruamel.yaml import YAML -from ruamel.yaml.comments import CommentedMap, CommentedSeq - from ._deb_options_profiles import DebBuildOptionsAndProfiles from ._manifest_constants import * from .architecture_support import DpkgArchitectureBuildProcessValuesTable @@ -77,8 +74,8 @@ from .util import ( generated_content_dir, _info, ) - -MANIFEST_YAML = YAML() +from .yaml import MANIFEST_YAML +from .yaml.compat import CommentedMap, CommentedSeq @dataclass(slots=True) diff --git a/src/debputy/highlevel_manifest_parser.py b/src/debputy/highlevel_manifest_parser.py index 24d05c7..28a3f80 100644 --- a/src/debputy/highlevel_manifest_parser.py +++ b/src/debputy/highlevel_manifest_parser.py @@ -15,13 +15,11 @@ from typing import ( ) from debian.debian_support import DpkgArchTable -from ruamel.yaml import YAMLError from debputy.highlevel_manifest import ( HighLevelManifest, PackageTransformationDefinition, MutableYAMLManifest, - MANIFEST_YAML, ) from debputy.maintscript_snippet import ( MaintscriptSnippet, @@ -54,10 +52,11 @@ from .plugin.api.impl_types import ( TP, TTP, DispatchingTableParser, - OPARSER_PACKAGES, OPARSER_MANIFEST_ROOT, + PackageContextData, ) from .plugin.api.feature_set import PluginProvidedFeatureSet +from .yaml import YAMLError, MANIFEST_YAML try: from Levenshtein import distance @@ -273,7 +272,9 @@ class HighLevelManifestParser(ParserContextData): self._package_state_stack.pop() def dispatch_parser_table_for(self, rule_type: TTP) -> DispatchingTableParser[TP]: - t = self._plugin_provided_feature_set.dispatchable_table_parsers.get(rule_type) + t = self._plugin_provided_feature_set.manifest_parser_generator.dispatch_parser_table_for( + rule_type + ) if t is None: raise AssertionError( f"Internal error: No dispatching parser for {rule_type.__name__}" @@ -440,33 +441,30 @@ class YAMLManifestParser(HighLevelManifestParser): def from_yaml_dict(self, yaml_data: object) -> "HighLevelManifest": attribute_path = AttributePath.root_path() - manifest_root_parser = ( - self._plugin_provided_feature_set.dispatchable_object_parsers[ - OPARSER_MANIFEST_ROOT - ] - ) + parser_generator = self._plugin_provided_feature_set.manifest_parser_generator + dispatchable_object_parsers = parser_generator.dispatchable_object_parsers + manifest_root_parser = dispatchable_object_parsers[OPARSER_MANIFEST_ROOT] parsed_data = cast( "ManifestRootRule", - manifest_root_parser.parse( + manifest_root_parser.parse_input( yaml_data, attribute_path, parser_context=self, ), ) - packages_dict = parsed_data.get("packages", {}) + packages_dict: Mapping[str, PackageContextData[Mapping[str, Any]]] = cast( + "Mapping[str, PackageContextData[Mapping[str, Any]]]", + parsed_data.get("packages", {}), + ) install_rules = parsed_data.get("installations") if install_rules: self._install_rules = install_rules packages_parent_path = attribute_path["packages"] - for package_name_raw, v in packages_dict.items(): + for package_name_raw, pcd in packages_dict.items(): definition_source = packages_parent_path[package_name_raw] - package_name = package_name_raw - if "{{" in package_name: - package_name = self.substitution.substitute( - package_name_raw, - definition_source.path, - ) + package_name = pcd.resolved_package_name + parsed = pcd.value package_state: PackageTransformationDefinition with self.binary_package_context(package_name) as package_state: @@ -476,17 +474,6 @@ class YAMLManifestParser(HighLevelManifestParser): f'Cannot define rules for package "{package_name}" (at {definition_source.path}). It is an' " auto-generated package." ) - package_rule_parser = ( - self._plugin_provided_feature_set.dispatchable_object_parsers[ - OPARSER_PACKAGES - ] - ) - parsed = cast( - "Mapping[str, Any]", - package_rule_parser.parse( - v, definition_source, parser_context=self - ), - ) binary_version = parsed.get("binary-version") if binary_version is not None: package_state.binary_version = ( diff --git a/src/debputy/linting/lint_impl.py b/src/debputy/linting/lint_impl.py index 5df76c4..66ff635 100644 --- a/src/debputy/linting/lint_impl.py +++ b/src/debputy/linting/lint_impl.py @@ -75,7 +75,7 @@ def perform_linting(context: CommandContext) -> None: if os.path.isfile("debian/debputy.manifest"): _info("Note: Due to a limitation in the linter, debian/debputy.manifest is") _info("only **partially** checked by this command at the time of writing.") - _info("Please use `debputy check-manifest` for checking the manifest.") + _info("Please use `debputy check-manifest` to fully check the manifest.") if linter_exit_code: _exit_with_lint_code(lint_report) diff --git a/src/debputy/lsp/logins-and-people.dic b/src/debputy/lsp/logins-and-people.dic index 8c231b2..dca29cf 100644 --- a/src/debputy/lsp/logins-and-people.dic +++ b/src/debputy/lsp/logins-and-people.dic @@ -132,6 +132,7 @@ Jorgen Josip Josselin Jover +Kalnischkies Kastner Kel Kis diff --git a/src/debputy/lsp/lsp_debian_debputy_manifest.py b/src/debputy/lsp/lsp_debian_debputy_manifest.py index 97fffcc..1dc5fb3 100644 --- a/src/debputy/lsp/lsp_debian_debputy_manifest.py +++ b/src/debputy/lsp/lsp_debian_debputy_manifest.py @@ -1,7 +1,14 @@ -import re from typing import ( Optional, List, + Any, + Tuple, + Union, + Iterable, + Sequence, + Literal, + get_args, + get_origin, ) from lsprotocol.types import ( @@ -10,17 +17,63 @@ from lsprotocol.types import ( Position, Range, DiagnosticSeverity, + HoverParams, + Hover, + MarkupKind, + MarkupContent, + TEXT_DOCUMENT_CODE_ACTION, + CompletionParams, + CompletionList, + CompletionItem, +) +from debputy.lsp.quickfixes import propose_correct_text_quick_fix +from debputy.manifest_parser.base_types import DebputyDispatchableType +from debputy.plugin.api.feature_set import PluginProvidedFeatureSet +from debputy.yaml.compat import ( + Node, + CommentedMap, + LineCol, + CommentedSeq, + CommentedBase, + MarkedYAMLError, + YAMLError, ) -from ruamel.yaml.error import MarkedYAMLError, YAMLError from debputy.highlevel_manifest import MANIFEST_YAML from debputy.lsp.lsp_features import ( lint_diagnostics, lsp_standard_handler, + lsp_hover, + lsp_get_plugin_features, + lsp_completer, ) from debputy.lsp.text_util import ( LintCapablePositionCodec, + detect_possible_typo, +) +from debputy.manifest_parser.declarative_parser import ( + AttributeDescription, + ParserGenerator, +) +from debputy.manifest_parser.declarative_parser import DeclarativeMappingInputParser +from debputy.manifest_parser.parser_doc import ( + render_rule, + render_attribute_doc, + doc_args_for_parser_doc, ) +from debputy.manifest_parser.util import AttributePath +from debputy.plugin.api.impl import plugin_metadata_for_debputys_own_plugin +from debputy.plugin.api.impl_types import ( + OPARSER_MANIFEST_ROOT, + DeclarativeInputParser, + DispatchingParserBase, + DebputyPluginMetadata, + ListWrappedDeclarativeInputParser, + InPackageContextParser, + DeclarativeValuelessKeywordInputParser, +) +from debputy.util import _info, _warn + try: from pygls.server import LanguageServer @@ -28,11 +81,6 @@ except ImportError: pass -_CONTAINS_TAB_OR_COLON = re.compile(r"[\t:]") -_WORDS_RE = re.compile("([a-zA-Z0-9_-]+)") -_MAKE_ERROR_RE = re.compile(r"^[^:]+:(\d+):\s*(\S.+)") - - _LANGUAGE_IDS = [ "debian/debputy.manifest", "debputy.manifest", @@ -41,7 +89,7 @@ _LANGUAGE_IDS = [ ] -# lsp_standard_handler(_LANGUAGE_IDS, TEXT_DOCUMENT_CODE_ACTION) +lsp_standard_handler(_LANGUAGE_IDS, TEXT_DOCUMENT_CODE_ACTION) lsp_standard_handler(_LANGUAGE_IDS, TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL) @@ -83,14 +131,20 @@ def _lint_debian_debputy_manifest( return None diagnostics = [] try: - MANIFEST_YAML.load("".join(lines)) + content = MANIFEST_YAML.load("".join(lines)) except MarkedYAMLError as e: + if e.context_mark: + line = e.context_mark.line + column = e.context_mark.column + 1 + else: + line = e.problem_mark.line + column = e.problem_mark.column + 1 error_range = position_codec.range_to_client_units( lines, _word_range_at_position( lines, - e.problem_mark.line, - e.problem_mark.column, + line, + column, ), ) diagnostics.append( @@ -115,5 +169,629 @@ def _lint_debian_debputy_manifest( DiagnosticSeverity.Error, ), ) - + else: + feature_set = lsp_get_plugin_features() + root_parser = feature_set.manifest_parser_generator.dispatchable_object_parsers[ + OPARSER_MANIFEST_ROOT + ] + diagnostics.extend(_lint_content(root_parser, content, lines, position_codec)) return diagnostics + + +def _lint_content( + parser: DeclarativeInputParser[Any], + content: Any, + lines: List[str], + position_codec: LintCapablePositionCodec, +) -> Iterable[Diagnostic]: + if isinstance(parser, DispatchingParserBase): + if not isinstance(content, CommentedMap): + return + lc = content.lc + for key, value in content.items(): + if not parser.is_known_keyword(key): + line, col = lc.key(key) + key_range = position_codec.range_to_client_units( + lines, + Range( + Position( + line, + col, + ), + Position( + line, + col + len(key), + ), + ), + ) + + candidates = detect_possible_typo(key, parser.registered_keywords()) + + yield Diagnostic( + key_range, + f"Unknown or unsupported key {key}", + DiagnosticSeverity.Error, + source="debputy", + data=[propose_correct_text_quick_fix(n) for n in candidates], + ) + else: + subparser = parser.parser_for(key) + assert subparser is not None + yield from _lint_content(subparser.parser, value, lines, position_codec) + elif isinstance(parser, ListWrappedDeclarativeInputParser): + if not isinstance(content, CommentedSeq): + return + subparser = parser.delegate + for value in content: + yield from _lint_content(subparser, value, lines, position_codec) + elif isinstance(parser, InPackageContextParser): + if not isinstance(content, CommentedMap): + return + for v in content.values(): + yield from _lint_content(parser.delegate, v, lines, position_codec) + elif isinstance(parser, DeclarativeMappingInputParser): + if not isinstance(content, CommentedMap): + return + lc = content.lc + for key, value in content.items(): + attr = parser.manifest_attributes.get(key) + if attr is None: + line, col = lc.key(key) + key_range = position_codec.range_to_client_units( + lines, + Range( + Position( + line, + col, + ), + Position( + line, + col + len(key), + ), + ), + ) + + candidates = detect_possible_typo(key, parser.manifest_attributes) + yield Diagnostic( + key_range, + f"Unknown or unsupported key {key}", + DiagnosticSeverity.Error, + source="debputy", + data=[propose_correct_text_quick_fix(n) for n in candidates], + ) + + +def is_at(position: Position, lc_pos: Tuple[int, int]) -> bool: + return position.line == lc_pos[0] and position.character == lc_pos[1] + + +def is_before(position: Position, lc_pos: Tuple[int, int]) -> bool: + line, column = lc_pos + if position.line < line: + return True + if position.line == line and position.character < column: + return True + return False + + +def is_after(position: Position, lc_pos: Tuple[int, int]) -> bool: + line, column = lc_pos + if position.line > line: + return True + if position.line == line and position.character > column: + return True + return False + + +def _trace_cursor( + content: Any, + attribute_path: AttributePath, + server_position: Position, +) -> Optional[Tuple[bool, AttributePath, Any, Any]]: + matched_key: Optional[Union[str, int]] = None + matched: Optional[Node] = None + matched_was_key: bool = False + + if isinstance(content, CommentedMap): + dict_lc: LineCol = content.lc + for k, v in content.items(): + k_lc = dict_lc.key(k) + if is_before(server_position, k_lc): + break + v_lc = dict_lc.value(k) + if is_before(server_position, v_lc): + # TODO: Handle ":" and "whitespace" + matched = k + matched_key = k + matched_was_key = True + break + matched = v + matched_key = k + matched_was_key = False + elif isinstance(content, CommentedSeq): + list_lc: LineCol = content.lc + for idx, value in enumerate(content): + i_lc = list_lc.item(idx) + if is_before(server_position, i_lc): + break + matched_key = idx + matched = value + + if matched is not None: + assert matched_key is not None + sub_path = attribute_path[matched_key] + if not matched_was_key and isinstance(matched, CommentedBase): + return _trace_cursor(matched, sub_path, server_position) + return matched_was_key, sub_path, matched, content + return None + + +_COMPLETION_HINT_KEY = "___COMPLETE:" +_COMPLETION_HINT_VALUE = "___COMPLETE" + + +def resolve_keyword( + current_parser: Union[DeclarativeInputParser[Any], DispatchingParserBase], + current_plugin: DebputyPluginMetadata, + segments: List[Union[str, int]], + segment_idx: int, + parser_generator: ParserGenerator, + *, + is_completion_attempt: bool = False, +) -> Optional[ + Tuple[ + Union[DeclarativeInputParser[Any], DispatchingParserBase], + DebputyPluginMetadata, + int, + ] +]: + if segment_idx >= len(segments): + return current_parser, current_plugin, segment_idx + current_segment = segments[segment_idx] + if isinstance(current_parser, ListWrappedDeclarativeInputParser): + if isinstance(current_segment, int): + current_parser = current_parser.delegate + segment_idx += 1 + if segment_idx >= len(segments): + return current_parser, current_plugin, segment_idx + current_segment = segments[segment_idx] + + if not isinstance(current_segment, str): + return None + + if is_completion_attempt and current_segment.endswith( + (_COMPLETION_HINT_KEY, _COMPLETION_HINT_VALUE) + ): + return current_parser, current_plugin, segment_idx + + if isinstance(current_parser, InPackageContextParser): + return resolve_keyword( + current_parser.delegate, + current_plugin, + segments, + segment_idx + 1, + parser_generator, + is_completion_attempt=is_completion_attempt, + ) + elif isinstance(current_parser, DispatchingParserBase): + if not current_parser.is_known_keyword(current_segment): + if is_completion_attempt: + return current_parser, current_plugin, segment_idx + return None + subparser = current_parser.parser_for(current_segment) + segment_idx += 1 + if segment_idx < len(segments): + return resolve_keyword( + subparser.parser, + subparser.plugin_metadata, + segments, + segment_idx, + parser_generator, + is_completion_attempt=is_completion_attempt, + ) + return subparser.parser, subparser.plugin_metadata, segment_idx + elif isinstance(current_parser, DeclarativeMappingInputParser): + attr = current_parser.manifest_attributes.get(current_segment) + attr_type = attr.attribute_type if attr is not None else None + if ( + attr_type is not None + and isinstance(attr_type, type) + and issubclass(attr_type, DebputyDispatchableType) + ): + subparser = parser_generator.dispatch_parser_table_for(attr_type) + if subparser is not None and ( + is_completion_attempt or segment_idx + 1 < len(segments) + ): + return resolve_keyword( + subparser, + current_plugin, + segments, + segment_idx + 1, + parser_generator, + is_completion_attempt=is_completion_attempt, + ) + return current_parser, current_plugin, segment_idx + else: + _info(f"Unknown parser: {current_parser.__class__}") + return None + + +def _render_param_doc( + rule_name: str, + declarative_parser: DeclarativeMappingInputParser, + plugin_metadata: DebputyPluginMetadata, + attribute: str, +) -> Optional[str]: + attr = declarative_parser.source_attributes.get(attribute) + if attr is None: + return None + + doc_args, parser_doc = doc_args_for_parser_doc( + rule_name, + declarative_parser, + plugin_metadata, + ) + rendered_docs = render_attribute_doc( + declarative_parser, + declarative_parser.source_attributes, + declarative_parser.input_time_required_parameters, + declarative_parser.at_least_one_of, + parser_doc, + doc_args, + is_interactive=True, + rule_name=rule_name, + ) + + for attributes, rendered_doc in rendered_docs: + if attribute in attributes: + full_doc = [ + f"# Attribute `{attribute}`", + "", + ] + full_doc.extend(rendered_doc) + + return "\n".join(full_doc) + return None + + +DEBPUTY_PLUGIN_METADATA = plugin_metadata_for_debputys_own_plugin() + + +def _guess_rule_name(segments: List[Union[str, int]], idx: int) -> str: + orig_idx = idx + idx -= 1 + while idx >= 0: + segment = segments[idx] + if isinstance(segment, str): + return segment + idx -= 1 + _warn(f"Unable to derive rule name from {segments} [{orig_idx}]") + return "<Bug: unknown rule name>" + + +def _ecsape(v: str) -> str: + return '"' + v.replace("\n", "\\n") + '"' + + +def _insert_snippet(lines: List[str], server_position: Position) -> bool: + _info(f"Complete at {server_position}") + line_no = server_position.line + line = lines[line_no] + pos_rhs = line[server_position.character :] + if pos_rhs and not pos_rhs.isspace(): + _info(f"No insertion: {_ecsape(line[server_position.character:])}") + return False + lhs = line[: server_position.character].strip() + if not lhs: + _info(f"Insertion of key: {_ecsape(line[server_position.character:])}") + # Respect the provided indentation + new_line = line[: server_position.character] + _COMPLETION_HINT_KEY + elif lhs.endswith(":"): + new_line = line[: server_position.character] + _COMPLETION_HINT_VALUE + else: + c = line[server_position.character] + _info(f"Not touching line: {_ecsape(line)} -- {_ecsape(c)}") + return False + _info(f'Evaluating complete on synthetic line: "{new_line}"') + lines[line_no] = new_line + return True + + +@lsp_completer(_LANGUAGE_IDS) +def debputy_manifest_completer( + ls: "LanguageServer", + params: CompletionParams, +) -> Optional[Union[CompletionList, Sequence[CompletionItem]]]: + doc = ls.workspace.get_text_document(params.text_document.uri) + if not is_valid_file(doc.path): + return None + lines = doc.lines + server_position = doc.position_codec.position_from_client_units( + lines, params.position + ) + attribute_root_path = AttributePath.root_path() + added_key = _insert_snippet(lines, server_position) + attempts = 1 if added_key else 2 + content = None + while attempts > 0: + attempts -= 1 + try: + content = MANIFEST_YAML.load("".join(lines)) + break + except MarkedYAMLError as e: + context_line = ( + e.context_mark.line if e.context_mark else e.problem_mark.line + ) + if ( + e.problem_mark.line != server_position.line + and context_line != server_position.line + ): + l_data = ( + lines[e.problem_mark.line].rstrip() + if e.problem_mark.line < len(lines) + else "N/A (OOB)" + ) + _info(f"Parse error on line: {e.problem_mark.line}: {l_data}") + return None + + if attempts > 0: + # Try to make it a key and see if that fixes the problem + new_line = lines[server_position.line].rstrip() + _COMPLETION_HINT_KEY + lines[server_position.line] = new_line + except YAMLError: + break + + if content is None: + context = lines[server_position.line].replace("\n", "\\n") + _info(f"Completion failed: parse error: Line in question: {context}") + return None + + m = _trace_cursor(content, attribute_root_path, server_position) + if m is None: + _info("No match") + return None + matched_key, attr_path, matched, parent = m + _info(f"Matched path: {matched} (path: {attr_path.path}) [{matched_key=}]") + + feature_set = lsp_get_plugin_features() + root_parser = feature_set.manifest_parser_generator.dispatchable_object_parsers[ + OPARSER_MANIFEST_ROOT + ] + segments = list(attr_path.path_segments()) + if added_key: + segments.pop() + km = resolve_keyword( + root_parser, + DEBPUTY_PLUGIN_METADATA, + segments, + 0, + feature_set.manifest_parser_generator, + is_completion_attempt=True, + ) + if km is None: + return None + parser, _, at_depth_idx = km + _info(f"Match leaf parser {at_depth_idx} -- {parser.__class__}") + items = [] + if at_depth_idx + 1 >= len(segments): + if isinstance(parser, DispatchingParserBase): + if matched_key: + items = [ + CompletionItem(f"{k}:") + for k in parser.registered_keywords() + if k not in parent + and not isinstance( + parser.parser_for(k).parser, + DeclarativeValuelessKeywordInputParser, + ) + ] + else: + _info("TODO: Match value") + elif isinstance(parser, InPackageContextParser): + # doc = ls.workspace.get_text_document(params.text_document.uri) + _info(f"TODO: Match package - {parent} -- {matched} -- {matched_key=}") + elif isinstance(parser, DeclarativeMappingInputParser): + print(f"MMM: {matched} - {parent}") + if matched_key: + _info("Match attributes") + locked = set(parent) + for mx in parser.mutually_exclusive_attributes: + if not mx.isdisjoint(parent.keys()): + locked.update(mx) + items = [ + CompletionItem(f"{k}:") + for k in parser.manifest_attributes + if k not in locked + ] + else: + # Value + key = segments[at_depth_idx] if len(segments) > at_depth_idx else None + attr = parser.manifest_attributes.get(key) + if attr is not None: + _info(f"Expand value / key: {key} -- {attr.attribute_type}") + items = _completion_from_attr( + attr, + feature_set.manifest_parser_generator, + matched, + ) + else: + _info( + f"Expand value / key: {key} -- !! {list(parser.manifest_attributes)}" + ) + return items + + +def _completion_from_attr( + attr: AttributeDescription, + pg: ParserGenerator, + matched: Any, +) -> Optional[Union[CompletionList, Sequence[CompletionItem]]]: + orig = get_origin(attr.attribute_type) + valid_values: Sequence[Any] = tuple() + if orig == Literal: + valid_values = get_args(attr.attribute_type) + elif orig == bool or attr.attribute_type == bool: + valid_values = ("true", "false") + elif isinstance(orig, type) and issubclass(orig, DebputyDispatchableType): + parser = pg.dispatch_parser_table_for(orig) + _info(f"M: {parser}") + + if matched in valid_values: + _info(f"Already filled: {matched} is one of {valid_values}") + return None + if valid_values: + return [CompletionItem(x) for x in valid_values] + return None + + +@lsp_hover(_LANGUAGE_IDS) +def debputy_manifest_hover( + ls: "LanguageServer", + params: HoverParams, +) -> Optional[Hover]: + doc = ls.workspace.get_text_document(params.text_document.uri) + if not is_valid_file(doc.path): + return None + lines = doc.lines + position_codec = doc.position_codec + attribute_root_path = AttributePath.root_path() + server_position = position_codec.position_from_client_units(lines, params.position) + + try: + content = MANIFEST_YAML.load("".join(lines)) + except YAMLError: + return None + m = _trace_cursor(content, attribute_root_path, server_position) + if m is None: + _info("No match") + return None + matched_key, attr_path, matched, _ = m + _info(f"Matched path: {matched} (path: {attr_path.path}) [{matched_key=}]") + + feature_set = lsp_get_plugin_features() + parser_generator = feature_set.manifest_parser_generator + root_parser = parser_generator.dispatchable_object_parsers[OPARSER_MANIFEST_ROOT] + segments = list(attr_path.path_segments()) + km = resolve_keyword( + root_parser, + DEBPUTY_PLUGIN_METADATA, + segments, + 0, + parser_generator, + ) + if km is None: + _info("No keyword match") + return + parser, plugin_metadata, at_depth_idx = km + _info(f"Match leaf parser {at_depth_idx}/{len(segments)} -- {parser.__class__}") + hover_doc_text = resolve_hover_text( + feature_set, + parser, + plugin_metadata, + segments, + at_depth_idx, + matched, + matched_key, + ) + return _hover_doc(ls, hover_doc_text) + + +def resolve_hover_text_for_value( + feature_set: PluginProvidedFeatureSet, + parser: DeclarativeMappingInputParser, + plugin_metadata: DebputyPluginMetadata, + segment: Union[str, int], + matched: Any, +) -> Optional[str]: + + hover_doc_text: Optional[str] = None + attr = parser.manifest_attributes.get(segment) + attr_type = attr.attribute_type if attr is not None else None + if attr_type is None: + _info(f"Matched value for {segment} -- No attr or type") + return None + if isinstance(attr_type, type) and issubclass(attr_type, DebputyDispatchableType): + parser_generator = feature_set.manifest_parser_generator + parser = parser_generator.dispatch_parser_table_for(attr_type) + if parser is None or not isinstance(matched, str): + _info( + f"Unknown parser for {segment} or matched is not a str -- {attr_type} {type(matched)=}" + ) + return None + subparser = parser.parser_for(matched) + if subparser is None: + _info(f"Unknown parser for {matched} (subparser)") + return None + hover_doc_text = render_rule( + matched, + subparser.parser, + plugin_metadata, + ) + else: + _info(f"Unknown value: {matched} -- {segment}") + return hover_doc_text + + +def resolve_hover_text( + feature_set: PluginProvidedFeatureSet, + parser: Optional[Union[DeclarativeInputParser[Any], DispatchingParserBase]], + plugin_metadata: DebputyPluginMetadata, + segments: List[Union[str, int]], + at_depth_idx: int, + matched: Any, + matched_key: bool, +) -> Optional[str]: + hover_doc_text: Optional[str] = None + if at_depth_idx == len(segments): + segment = segments[at_depth_idx - 1] + _info(f"Matched {segment} at ==, {matched_key=} ") + hover_doc_text = render_rule( + segment, + parser, + plugin_metadata, + is_root_rule=False, + ) + elif at_depth_idx + 1 == len(segments) and isinstance( + parser, DeclarativeMappingInputParser + ): + segment = segments[at_depth_idx] + _info(f"Matched {segment} at -1, {matched_key=} ") + if isinstance(segment, str): + if not matched_key: + hover_doc_text = resolve_hover_text_for_value( + feature_set, + parser, + plugin_metadata, + segment, + matched, + ) + if matched_key or hover_doc_text is None: + rule_name = _guess_rule_name(segments, at_depth_idx) + hover_doc_text = _render_param_doc( + rule_name, + parser, + plugin_metadata, + segment, + ) + else: + _info(f"No doc: {at_depth_idx=} {len(segments)=}") + + return hover_doc_text + + +def _hover_doc(ls: "LanguageServer", hover_doc_text: Optional[str]) -> Optional[Hover]: + if hover_doc_text is None: + return None + try: + supported_formats = ls.client_capabilities.text_document.hover.content_format + except AttributeError: + supported_formats = [] + markup_kind = MarkupKind.Markdown + if markup_kind not in supported_formats: + markup_kind = MarkupKind.PlainText + return Hover( + contents=MarkupContent( + kind=markup_kind, + value=hover_doc_text, + ), + ) diff --git a/src/debputy/lsp/lsp_dispatch.py b/src/debputy/lsp/lsp_dispatch.py index b7b744c..7a20ae8 100644 --- a/src/debputy/lsp/lsp_dispatch.py +++ b/src/debputy/lsp/lsp_dispatch.py @@ -1,14 +1,15 @@ import asyncio +import os.path from typing import ( Dict, Sequence, Union, Optional, - Any, TypeVar, Callable, Mapping, List, + Tuple, ) from lsprotocol.types import ( @@ -31,7 +32,6 @@ from lsprotocol.types import ( TEXT_DOCUMENT_CODE_ACTION, Command, CodeAction, - TextDocumentCodeActionRequest, CodeActionParams, SemanticTokensRegistrationOptions, ) @@ -51,6 +51,7 @@ _DOCUMENT_VERSION_TABLE: Dict[str, int] = {} try: from pygls.server import LanguageServer + from pygls.workspace import TextDocument DEBPUTY_LANGUAGE_SERVER = LanguageServer("debputy", f"v{__version__}") except ImportError: @@ -72,6 +73,19 @@ def is_doc_at_version(uri: str, version: int) -> bool: return dv == version +def determine_language_id(doc: "TextDocument") -> Tuple[str, str]: + lang_id = doc.language_id + if lang_id and not lang_id.isspace(): + return "declared", lang_id + path = doc.path + try: + last_idx = path.rindex("debian/") + except ValueError: + return "filename", os.path.basename(path) + guess_language_id = path[last_idx:] + return "filename", guess_language_id + + @DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_DID_OPEN) @DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_DID_CHANGE) async def _open_or_changed_document( @@ -83,15 +97,15 @@ async def _open_or_changed_document( doc = ls.workspace.get_text_document(doc_uri) _DOCUMENT_VERSION_TABLE[doc_uri] = version - - handler = DIAGNOSTIC_HANDLERS.get(doc.language_id) + id_source, language_id = determine_language_id(doc) + handler = DIAGNOSTIC_HANDLERS.get(language_id) if handler is None: _info( - f"Opened/Changed document: {doc.path} ({doc.language_id}) - no diagnostics handler" + f"Opened/Changed document: {doc.path} ({language_id}, {id_source}) - no diagnostics handler" ) return _info( - f"Opened/Changed document: {doc.path} ({doc.language_id}) - running diagnostics for doc version {version}" + f"Opened/Changed document: {doc.path} ({language_id}, {id_source}) - running diagnostics for doc version {version}" ) last_publish_count = -1 @@ -198,14 +212,15 @@ def _dispatch_standard_handler( ) -> R: doc = ls.workspace.get_text_document(doc_uri) - handler = handler_table.get(doc.language_id) + id_source, language_id = determine_language_id(doc) + handler = handler_table.get(language_id) if handler is None: _info( - f"{request_type} for document: {doc.path} ({doc.language_id}) - no handler" + f"{request_type} for document: {doc.path} ({language_id}, {id_source}) - no handler" ) return _info( - f"{request_type} for document: {doc.path} ({doc.language_id}) - delegating to handler" + f"{request_type} for document: {doc.path} ({language_id}, {id_source}) - delegating to handler" ) return handler( diff --git a/src/debputy/lsp/lsp_features.py b/src/debputy/lsp/lsp_features.py index 5b01266..00bed1b 100644 --- a/src/debputy/lsp/lsp_features.py +++ b/src/debputy/lsp/lsp_features.py @@ -11,6 +11,8 @@ from lsprotocol.types import ( SemanticTokensLegend, ) +from debputy.plugin.api.feature_set import PluginProvidedFeatureSet + try: from pygls.server import LanguageServer except ImportError: @@ -30,6 +32,7 @@ SEMANTIC_TOKEN_TYPES_IDS = { t: idx for idx, t in enumerate(SEMANTIC_TOKENS_LEGEND.token_types) } +LSP_PLUGIN_FEATURE_SET: Optional[PluginProvidedFeatureSet] = None DIAGNOSTIC_HANDLERS = {} COMPLETER_HANDLERS = {} HOVER_HANDLERS = {} @@ -173,6 +176,21 @@ def _register_handler( handler_dict[file_format] = handler +def lsp_set_plugin_features(feature_set: Optional[PluginProvidedFeatureSet]) -> None: + global LSP_PLUGIN_FEATURE_SET + LSP_PLUGIN_FEATURE_SET = feature_set + + +def lsp_get_plugin_features() -> PluginProvidedFeatureSet: + global LSP_PLUGIN_FEATURE_SET + features = LSP_PLUGIN_FEATURE_SET + if features is None: + raise RuntimeError( + "Initialization error: The plugin feature set has not been initialized before it was needed." + ) + return features + + def ensure_lsp_features_are_loaded() -> None: # FIXME: This import is needed to force loading of the LSP files. But it only works # for files with a linter (which currently happens to be all of them, but this is diff --git a/src/debputy/manifest_parser/declarative_parser.py b/src/debputy/manifest_parser/declarative_parser.py index bb901fc..f18dc1c 100644 --- a/src/debputy/manifest_parser/declarative_parser.py +++ b/src/debputy/manifest_parser/declarative_parser.py @@ -25,6 +25,7 @@ from typing import ( Iterable, Literal, Sequence, + Container, ) from debputy.manifest_parser.base_types import ( @@ -49,6 +50,12 @@ from debputy.plugin.api.impl_types import ( TD, _ALL_PACKAGE_TYPES, resolve_package_type_selectors, + ListWrappedDeclarativeInputParser, + DispatchingObjectParser, + DispatchingTableParser, + TTP, + TP, + InPackageContextParser, ) from debputy.plugin.api.spec import ParserDocumentation, PackageTypeSelector from debputy.util import _info, _warn, assume_not_none @@ -279,8 +286,6 @@ class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]) _per_attribute_conflicts_cache: Optional[Mapping[str, FrozenSet[str]]] = None inline_reference_documentation: Optional[ParserDocumentation] = None path_hint_source_attributes: Sequence[str] = tuple() - # TODO: List-wrapping should probably be its own parser that delegetes to subparsers - is_list_wrapped: bool = False def _parse_alt_form( self, @@ -410,64 +415,29 @@ class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF]) return f" (Documentation: {doc_url})" return "" - def _parse_input( + def parse_input( self, value: object, path: AttributePath, *, parser_context: Optional["ParserContextData"] = None, - is_list_wrapped: bool, ) -> TD: if value is None: - if is_list_wrapped: - form_note = " The attribute must be a list of mappings" - else: - form_note = " The attribute must be a mapping." - if self.alt_form_parser is not None: - form_note = ( - " The attribute can be a mapping or a non-mapping format" - ' (usually, "non-mapping format" means a string or a list of strings).' - ) + form_note = " The attribute must be a mapping." + if self.alt_form_parser is not None: + form_note = ( + " The attribute can be a mapping or a non-mapping format" + ' (usually, "non-mapping format" means a string or a list of strings).' + ) doc_ref = self._doc_url_error_suffix(see_url_version=True) raise ManifestParseException( f"The attribute {path.path} was missing a value. {form_note}{doc_ref}" ) - if is_list_wrapped: - if not isinstance(value, list) or not value: - doc_ref = self._doc_url_error_suffix(see_url_version=True) - raise ManifestParseException( - f"The attribute {path.path} must be a non-empty list.{doc_ref}" - ) - result = [] - for idx, element in enumerate(value): - element_path = path[idx] - result.append( - self._parse_input( - element, - element_path, - parser_context=parser_context, - is_list_wrapped=False, - ) - ) - return result + if not isinstance(value, dict): return self._parse_alt_form(value, path, parser_context=parser_context) return self._parse_typed_dict_form(value, path, parser_context=parser_context) - def parse_input( - self, - value: object, - path: AttributePath, - *, - parser_context: Optional["ParserContextData"] = None, - ) -> TD: - return self._parse_input( - value, - path, - parser_context=parser_context, - is_list_wrapped=self.is_list_wrapped, - ) - def _per_attribute_conflicts(self) -> Mapping[str, FrozenSet[str]]: conflicts = self._per_attribute_conflicts_cache if conflicts is not None: @@ -496,7 +466,7 @@ class DebputyParseHint: >>> class TargetType(TypedDict): ... sources: List[str] >>> pg = ParserGenerator() - >>> parser = pg.parser_from_typed_dict(TargetType, source_content=SourceType) + >>> parser = pg.generate_parser(TargetType, source_content=SourceType) In this example, the user can provide either `source` or `sources` and the parser will map them to the `sources` attribute in the `TargetType`. Note this example relies on @@ -545,7 +515,7 @@ class DebputyParseHint: ... into_dir: NotRequired[str] ... renamed_to: NotRequired[str] >>> pg = ParserGenerator() - >>> parser = pg.parser_from_typed_dict(TargetType, source_content=SourceType) + >>> parser = pg.generate_parser(TargetType, source_content=SourceType) In this example, if the user was to provide `renamed_to` with `sources` or `into_dir` the parser would report an error. However, the parser will allow `renamed_to` with `source` as the conflict is considered only for @@ -738,6 +708,11 @@ def _is_path_attribute_candidate( class ParserGenerator: def __init__(self) -> None: self._registered_types: Dict[Any, TypeMapping[Any, Any]] = {} + self._object_parsers: Dict[str, DispatchingObjectParser] = {} + self._table_parsers: Dict[ + Type[DebputyDispatchableType], DispatchingTableParser[Any] + ] = {} + self._in_package_context_parser: Dict[str, Any] = {} def register_mapped_type(self, mapped_type: TypeMapping) -> None: existing = self._registered_types.get(mapped_type.target_type) @@ -748,7 +723,49 @@ class ParserGenerator: def discard_mapped_type(self, mapped_type: Type[T]) -> None: del self._registered_types[mapped_type] - def parser_from_typed_dict( + def add_table_parser(self, rt: Type[DebputyDispatchableType], path: str) -> None: + assert rt not in self._table_parsers + self._table_parsers[rt] = DispatchingTableParser(rt, path) + + def add_object_parser( + self, + path: str, + *, + parser_documentation: Optional[ParserDocumentation] = None, + ) -> None: + assert path not in self._in_package_context_parser + assert path not in self._object_parsers + self._object_parsers[path] = DispatchingObjectParser( + path, parser_documentation=parser_documentation + ) + + def add_in_package_context_parser( + self, + path: str, + delegate: DeclarativeInputParser[Any], + ) -> None: + assert path not in self._in_package_context_parser + assert path not in self._object_parsers + self._in_package_context_parser[path] = InPackageContextParser(path, delegate) + + @property + def dispatchable_table_parsers( + self, + ) -> Mapping[Type[DebputyDispatchableType], DispatchingTableParser[Any]]: + return self._table_parsers + + @property + def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]: + return self._object_parsers + + def dispatch_parser_table_for( + self, rule_type: TTP + ) -> Optional[DispatchingTableParser[TP]]: + return cast( + "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type) + ) + + def generate_parser( self, parsed_content: Type[TD], *, @@ -768,7 +785,7 @@ class ParserGenerator: ... sources: List[str] ... into: List[str] >>> pg = ParserGenerator() - >>> simple_parser = pg.parser_from_typed_dict(InstallDocsRule) + >>> simple_parser = pg.generate_parser(InstallDocsRule) This will create a parser that would be able to interpret something like: @@ -789,7 +806,7 @@ class ParserGenerator: ... sources: NotRequired[List[str]] ... into: Union[str, List[str]] >>> pg = ParserGenerator() - >>> flexible_parser = pg.parser_from_typed_dict( + >>> flexible_parser = pg.generate_parser( ... InstallDocsRule, ... source_content=InputDocsRuleInputFormat, ... ) @@ -826,7 +843,7 @@ class ParserGenerator: ... List[str], ... ] >>> pg = ParserGenerator() - >>> flexible_parser = pg.parser_from_typed_dict( + >>> flexible_parser = pg.generate_parser( ... DiscardRule, ... source_content=DiscardRuleInputWithAltFormat, ... ) @@ -894,10 +911,28 @@ class ParserGenerator: if get_origin(orig_parsed_content) == list: parsed_content = get_args(orig_parsed_content)[0] is_list_wrapped = True + + if isinstance(parsed_content, type) and issubclass( + parsed_content, DebputyDispatchableType + ): + parser = self.dispatch_parser_table_for(parsed_content) + if parser is None: + raise ValueError( + f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." + f" The class {parsed_content.__qualname__} is not a pre-registered type." + ) + # FIXME: Only the list wrapped version has documentation. + if is_list_wrapped: + parser = ListWrappedDeclarativeInputParser( + parser, + inline_reference_documentation=inline_reference_documentation, + ) + return parser + if not is_typeddict(parsed_content): raise ValueError( f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}." - ' Only "TypedDict"-based types supported.' + ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.' ) if is_list_wrapped: if get_origin(source_content) != list: @@ -1060,17 +1095,12 @@ class ParserGenerator: parsed_alt_form is not None, ) if non_mapping_source_only: - if is_list_wrapped: - raise ValueError( - f"Unsupported case: {non_mapping_source_only=} + {is_list_wrapped=}" - " (TODO: Look whether it is feasible)" - ) - return DeclarativeNonMappingInputParser( + parser = DeclarativeNonMappingInputParser( assume_not_none(parsed_alt_form), inline_reference_documentation=inline_reference_documentation, ) else: - return DeclarativeMappingInputParser( + parser = DeclarativeMappingInputParser( _as_attr_names(source_typed_dict.__required_keys__), _as_attr_names(all_parameters), manifest_attributes, @@ -1080,8 +1110,10 @@ class ParserGenerator: at_least_one_of=at_least_one_of, inline_reference_documentation=inline_reference_documentation, path_hint_source_attributes=tuple(path_hint_source_attributes), - is_list_wrapped=is_list_wrapped, ) + if is_list_wrapped: + parser = ListWrappedDeclarativeInputParser(parser) + return parser def _as_type_validator( self, @@ -1192,7 +1224,7 @@ class ParserGenerator: mapper=type_normalizer, ).combine_mapper(list_mapper) if is_typeddict(provided_type): - subparser = self.parser_from_typed_dict(cast("Type[TD]", provided_type)) + subparser = self.generate_parser(cast("Type[TD]", provided_type)) return AttributeTypeHandler( description=f"{provided_type.__name__} (Typed Mapping)", ensure_type=lambda v, ap: None, @@ -1430,7 +1462,7 @@ class ParserGenerator: ) parsed_annotations = DetectedDebputyParseHint.parse_annotations( anno, - f" The alternative for source_format.", + " The alternative for source_format.", None, False, default_target_attribute=default_target_attribute, @@ -1578,7 +1610,7 @@ class ParserGenerator: if target_orig == list and target_args: mapped = self._registered_types.get(target_args[0]) if mapped is not None: - # mypy is dense and forgots `mapped` cannot be optional in the comprehensions. + # mypy is dense and forgot `mapped` cannot be optional in the comprehensions. mapped_type: TypeMapping = mapped if input_type == mapped.source_type: # Source -> List[Target] @@ -1824,7 +1856,7 @@ def _dispatch_parse_generator( ): assert parser_context is not None dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type) - return dispatching_parser.parse( + return dispatching_parser.parse_input( value, attribute_path, parser_context=parser_context ) diff --git a/src/debputy/manifest_parser/parser_doc.py b/src/debputy/manifest_parser/parser_doc.py new file mode 100644 index 0000000..7046d7b --- /dev/null +++ b/src/debputy/manifest_parser/parser_doc.py @@ -0,0 +1,273 @@ +import itertools +from typing import Optional, Iterable, Any, Tuple, Mapping, Sequence, FrozenSet + +from debputy import DEBPUTY_DOC_ROOT_DIR +from debputy.manifest_parser.declarative_parser import ( + DeclarativeMappingInputParser, + DeclarativeNonMappingInputParser, + AttributeDescription, +) +from debputy.plugin.api.impl_types import ( + DebputyPluginMetadata, + DeclarativeInputParser, + DispatchingObjectParser, + ListWrappedDeclarativeInputParser, + InPackageContextParser, +) +from debputy.plugin.api.spec import ( + ParserDocumentation, + reference_documentation, + undocumented_attr, +) +from debputy.util import assume_not_none + + +def _provide_placeholder_parser_doc( + parser_doc: Optional[ParserDocumentation], + attributes: Iterable[str], +) -> ParserDocumentation: + if parser_doc is None: + parser_doc = reference_documentation() + changes = {} + if parser_doc.attribute_doc is None: + changes["attribute_doc"] = [undocumented_attr(attr) for attr in attributes] + + if changes: + return parser_doc.replace(**changes) + return parser_doc + + +def doc_args_for_parser_doc( + rule_name: str, + declarative_parser: DeclarativeInputParser[Any], + plugin_metadata: DebputyPluginMetadata, +) -> Tuple[Mapping[str, str], ParserDocumentation]: + attributes: Iterable[str] + if isinstance(declarative_parser, DeclarativeMappingInputParser): + attributes = declarative_parser.source_attributes.keys() + else: + attributes = [] + doc_args = { + "RULE_NAME": rule_name, + "MANIFEST_FORMAT_DOC": f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md", + "PLUGIN_NAME": plugin_metadata.plugin_name, + } + parser_doc = _provide_placeholder_parser_doc( + declarative_parser.inline_reference_documentation, + attributes, + ) + return doc_args, parser_doc + + +def render_attribute_doc( + parser: Any, + attributes: Mapping[str, "AttributeDescription"], + required_attributes: FrozenSet[str], + conditionally_required_attributes: FrozenSet[FrozenSet[str]], + parser_doc: ParserDocumentation, + doc_args: Mapping[str, str], + *, + rule_name: str = "<unset>", + is_root_rule: bool = False, + is_interactive: bool = False, +) -> Iterable[Tuple[FrozenSet[str], Sequence[str]]]: + provided_attribute_docs = ( + parser_doc.attribute_doc if parser_doc.attribute_doc is not None else [] + ) + + for attr_doc in assume_not_none(provided_attribute_docs): + attr_description = attr_doc.description + rendered_doc = [] + + for parameter in sorted(attr_doc.attributes): + parameter_details = attributes.get(parameter) + if parameter_details is not None: + source_name = parameter_details.source_attribute_name + describe_type = parameter_details.type_validator.describe_type() + else: + assert isinstance(parser, DispatchingObjectParser) + source_name = parameter + subparser = parser.parser_for(source_name).parser + if isinstance(subparser, InPackageContextParser): + if is_interactive: + describe_type = "PackageContext" + else: + rule_prefix = rule_name if not is_root_rule else "" + describe_type = f"PackageContext (chains to `{rule_prefix}::{subparser.manifest_attribute_path_template}`)" + + elif isinstance(subparser, DispatchingObjectParser): + if is_interactive: + describe_type = "Object" + else: + rule_prefix = rule_name if not is_root_rule else "" + describe_type = f"Object (see `{rule_prefix}::{subparser.manifest_attribute_path_template}`)" + elif isinstance(subparser, DeclarativeMappingInputParser): + describe_type = "<Type definition not implemented yet>" # TODO: Derive from subparser + elif isinstance(subparser, DeclarativeNonMappingInputParser): + describe_type = ( + subparser.alt_form_parser.type_validator.describe_type() + ) + else: + describe_type = f"<Unknown: Non-introspectable subparser - {subparser.__class__.__name__}>" + + if source_name in required_attributes: + req_str = "required" + elif any(source_name in s for s in conditionally_required_attributes): + req_str = "conditional" + else: + req_str = "optional" + rendered_doc.append(f"`{source_name}` ({req_str}): {describe_type}") + + if attr_description: + rendered_doc.append("") + rendered_doc.extend( + line + for line in attr_description.format(**doc_args).splitlines( + keepends=False + ) + ) + rendered_doc.append("") + yield attr_doc.attributes, rendered_doc + + +def render_rule( + rule_name: str, + declarative_parser: DeclarativeInputParser[Any], + plugin_metadata: DebputyPluginMetadata, + *, + is_root_rule: bool = False, +) -> str: + doc_args, parser_doc = doc_args_for_parser_doc( + "the manifest root" if is_root_rule else rule_name, + declarative_parser, + plugin_metadata, + ) + t = assume_not_none(parser_doc.title).format(**doc_args) + r = [ + t, + "=" * len(t), + "", + assume_not_none(parser_doc.description).format(**doc_args).rstrip(), + "", + ] + + alt_form_parser = getattr(declarative_parser, "alt_form_parser", None) + is_list_wrapped = False + unwrapped_parser = declarative_parser + if isinstance(declarative_parser, ListWrappedDeclarativeInputParser): + is_list_wrapped = True + unwrapped_parser = declarative_parser.delegate + + if isinstance( + unwrapped_parser, (DeclarativeMappingInputParser, DispatchingObjectParser) + ): + + if isinstance(unwrapped_parser, DeclarativeMappingInputParser): + attributes = unwrapped_parser.source_attributes + required = unwrapped_parser.input_time_required_parameters + conditionally_required = unwrapped_parser.at_least_one_of + mutually_exclusive = unwrapped_parser.mutually_exclusive_attributes + else: + attributes = {} + required = frozenset() + conditionally_required = frozenset() + mutually_exclusive = frozenset() + if is_list_wrapped: + r.append("List where each element has the following attributes:") + else: + r.append("Attributes:") + + rendered_attr_doc = render_attribute_doc( + unwrapped_parser, + attributes, + required, + conditionally_required, + parser_doc, + doc_args, + is_root_rule=is_root_rule, + rule_name=rule_name, + is_interactive=False, + ) + for _, rendered_doc in rendered_attr_doc: + prefix = " - " + for line in rendered_doc: + if line: + r.append(f"{prefix}{line}") + else: + r.append("") + prefix = " " + + if ( + bool(conditionally_required) + or bool(mutually_exclusive) + or any(pd.conflicting_attributes for pd in attributes.values()) + ): + r.append("") + if is_list_wrapped: + r.append( + "This rule enforces the following restrictions on each element in the list:" + ) + else: + r.append("This rule enforces the following restrictions:") + + if conditionally_required or mutually_exclusive: + all_groups = set( + itertools.chain(conditionally_required, mutually_exclusive) + ) + for g in all_groups: + anames = "`, `".join(g) + is_mx = g in mutually_exclusive + is_cr = g in conditionally_required + if is_mx and is_cr: + r.append(f" - The rule must use exactly one of: `{anames}`") + elif is_cr: + r.append(f" - The rule must use at least one of: `{anames}`") + else: + assert is_mx + r.append( + f" - The following attributes are mutually exclusive: `{anames}`" + ) + + if mutually_exclusive or any( + pd.conflicting_attributes for pd in attributes.values() + ): + for parameter, parameter_details in sorted(attributes.items()): + source_name = parameter_details.source_attribute_name + conflicts = set(parameter_details.conflicting_attributes) + for mx in mutually_exclusive: + if parameter in mx and mx not in conditionally_required: + conflicts |= mx + if conflicts: + conflicts.discard(parameter) + cnames = "`, `".join( + attributes[a].source_attribute_name for a in conflicts + ) + r.append( + f" - The attribute `{source_name}` cannot be used with any of: `{cnames}`" + ) + r.append("") + if alt_form_parser is not None: + # FIXME: Mapping[str, Any] ends here, which is ironic given the headline. + r.append( + f"Non-mapping format: {alt_form_parser.type_validator.describe_type()}" + ) + alt_parser_desc = parser_doc.alt_parser_description + if alt_parser_desc: + r.extend( + f" {line}" + for line in alt_parser_desc.format(**doc_args).splitlines( + keepends=False + ) + ) + r.append("") + + if declarative_parser.reference_documentation_url is not None: + r.append( + f"Reference documentation: {declarative_parser.reference_documentation_url}" + ) + else: + r.append( + "Reference documentation: No reference documentation link provided by the plugin" + ) + + return "\n".join(r) diff --git a/src/debputy/manifest_parser/util.py b/src/debputy/manifest_parser/util.py index 1600a90..ad214e2 100644 --- a/src/debputy/manifest_parser/util.py +++ b/src/debputy/manifest_parser/util.py @@ -13,6 +13,7 @@ from typing import ( Type, TypeVar, TYPE_CHECKING, + Iterable, ) if TYPE_CHECKING: @@ -61,6 +62,11 @@ class AttributePath(object): p.path_hint = path_hint return p + def path_segments(self) -> Iterable[Union[str, int]]: + segments = list(self._iter_path()) + segments.reverse() + yield from (s.name for s in segments) + @property def path(self) -> str: segments = list(self._iter_path()) diff --git a/src/debputy/plugin/api/feature_set.py b/src/debputy/plugin/api/feature_set.py index 6552361..a56f37b 100644 --- a/src/debputy/plugin/api/feature_set.py +++ b/src/debputy/plugin/api/feature_set.py @@ -1,7 +1,10 @@ import dataclasses +import textwrap from typing import Dict, List, Tuple, Sequence, Any +from debputy import DEBPUTY_DOC_ROOT_DIR from debputy.manifest_parser.declarative_parser import ParserGenerator +from debputy.plugin.api import reference_documentation from debputy.plugin.api.impl_types import ( DebputyPluginMetadata, PackagerProvidedFileClassSpec, @@ -18,9 +21,23 @@ from debputy.plugin.api.impl_types import ( ServiceManagerDetails, PluginProvidedKnownPackagingFile, PluginProvidedTypeMapping, + OPARSER_PACKAGES, + OPARSER_PACKAGES_ROOT, ) +def _initialize_parser_generator() -> ParserGenerator: + pg = ParserGenerator() + + for path, ref_doc in SUPPORTED_DISPATCHABLE_OBJECT_PARSERS.items(): + pg.add_object_parser(path, parser_documentation=ref_doc) + + for rt, path in SUPPORTED_DISPATCHABLE_TABLE_PARSERS.items(): + pg.add_table_parser(rt, path) + + return pg + + @dataclasses.dataclass(slots=True) class PluginProvidedFeatureSet: plugin_data: Dict[str, DebputyPluginMetadata] = dataclasses.field( @@ -32,22 +49,6 @@ class PluginProvidedFeatureSet: metadata_maintscript_detectors: Dict[str, List[MetadataOrMaintscriptDetector]] = ( dataclasses.field(default_factory=dict) ) - dispatchable_table_parsers: Dict[TTP, "DispatchingTableParser[TP]"] = ( - dataclasses.field( - default_factory=lambda: { - rt: DispatchingTableParser(rt, path) - for rt, path in SUPPORTED_DISPATCHABLE_TABLE_PARSERS.items() - } - ) - ) - dispatchable_object_parsers: Dict[str, "DispatchingObjectParser"] = ( - dataclasses.field( - default_factory=lambda: { - path: DispatchingObjectParser(path, parser_documentation=ref_doc) - for path, ref_doc in SUPPORTED_DISPATCHABLE_OBJECT_PARSERS.items() - } - ) - ) manifest_variables: Dict[str, PluginProvidedManifestVariable] = dataclasses.field( default_factory=dict ) @@ -67,7 +68,7 @@ class PluginProvidedFeatureSet: default_factory=dict ) manifest_parser_generator: ParserGenerator = dataclasses.field( - default_factory=ParserGenerator + default_factory=_initialize_parser_generator ) def package_processors_in_order(self) -> Sequence[PluginProvidedPackageProcessor]: diff --git a/src/debputy/plugin/api/impl.py b/src/debputy/plugin/api/impl.py index 3c9da60..64a1ca8 100644 --- a/src/debputy/plugin/api/impl.py +++ b/src/debputy/plugin/api/impl.py @@ -762,14 +762,15 @@ class DebputyPluginInitializerProvider(DebputyPluginInitializer): inline_reference_documentation: Optional[ParserDocumentation] = None, ) -> None: self._restricted_api() - if rule_type not in self._feature_set.dispatchable_table_parsers: + parser_generator = self._feature_set.manifest_parser_generator + if rule_type not in parser_generator.dispatchable_table_parsers: types = ", ".join( - sorted(x.__name__ for x in self._feature_set.dispatchable_table_parsers) + sorted(x.__name__ for x in parser_generator.dispatchable_table_parsers) ) raise ValueError( f"The rule_type was not a supported type. It must be one of {types}" ) - dispatching_parser = self._feature_set.dispatchable_table_parsers[rule_type] + dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] dispatching_parser.register_keyword( rule_name, handler, @@ -796,11 +797,14 @@ class DebputyPluginInitializerProvider(DebputyPluginInitializer): None, ] ] = None, + nested_in_package_context: bool = False, ) -> None: self._restricted_api() if object_parser_key is None: object_parser_key = rule_name - dispatchable_object_parsers = self._feature_set.dispatchable_object_parsers + + parser_generator = self._feature_set.manifest_parser_generator + dispatchable_object_parsers = parser_generator.dispatchable_object_parsers if rule_type not in dispatchable_object_parsers: types = ", ".join(sorted(dispatchable_object_parsers)) raise ValueError( @@ -818,6 +822,7 @@ class DebputyPluginInitializerProvider(DebputyPluginInitializer): child_dispatcher, self._plugin_metadata, on_end_parse_step=on_end_parse_step, + nested_in_package_context=nested_in_package_context, ) def _unload() -> None: @@ -839,24 +844,27 @@ class DebputyPluginInitializerProvider(DebputyPluginInitializer): ) -> None: self._restricted_api() feature_set = self._feature_set + parser_generator = feature_set.manifest_parser_generator if isinstance(rule_type, str): - if rule_type not in feature_set.dispatchable_object_parsers: - types = ", ".join(sorted(feature_set.dispatchable_object_parsers)) + if rule_type not in parser_generator.dispatchable_object_parsers: + types = ", ".join(sorted(parser_generator.dispatchable_object_parsers)) raise ValueError( f"The rule_type was not a supported type. It must be one of {types}" ) - dispatching_parser = feature_set.dispatchable_object_parsers[rule_type] + dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type] else: - if rule_type not in feature_set.dispatchable_table_parsers: + if rule_type not in parser_generator.dispatchable_table_parsers: types = ", ".join( - sorted(x.__name__ for x in feature_set.dispatchable_table_parsers) + sorted( + x.__name__ for x in parser_generator.dispatchable_table_parsers + ) ) raise ValueError( f"The rule_type was not a supported type. It must be one of {types}" ) - dispatching_parser = feature_set.dispatchable_table_parsers[rule_type] + dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type] - parser = feature_set.manifest_parser_generator.parser_from_typed_dict( + parser = feature_set.manifest_parser_generator.generate_parser( parsed_format, source_content=source_format, inline_reference_documentation=inline_reference_documentation, diff --git a/src/debputy/plugin/api/impl_types.py b/src/debputy/plugin/api/impl_types.py index 76579fb..5aca980 100644 --- a/src/debputy/plugin/api/impl_types.py +++ b/src/debputy/plugin/api/impl_types.py @@ -291,6 +291,63 @@ class DeclarativeInputParser(Generic[TD]): raise NotImplementedError +class DelegatingDeclarativeInputParser(DeclarativeInputParser[TD]): + __slots__ = ("delegate", "_reference_documentation") + + def __init__( + self, + delegate: DeclarativeInputParser[TD], + *, + inline_reference_documentation: Optional[ParserDocumentation] = None, + ) -> None: + self.delegate = delegate + self._reference_documentation = inline_reference_documentation + + @property + def inline_reference_documentation(self) -> Optional[ParserDocumentation]: + doc = self._reference_documentation + if doc is None: + return self.delegate.inline_reference_documentation + return doc + + +class ListWrappedDeclarativeInputParser(DelegatingDeclarativeInputParser[TD]): + __slots__ = () + + def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str: + doc_url = self.reference_documentation_url + if doc_url is not None: + if see_url_version: + return f" Please see {doc_url} for the documentation." + return f" (Documentation: {doc_url})" + return "" + + def parse_input( + self, + value: object, + path: "AttributePath", + *, + parser_context: Optional["ParserContextData"] = None, + ) -> TD: + if not isinstance(value, list): + doc_ref = self._doc_url_error_suffix(see_url_version=True) + raise ManifestParseException( + f"The attribute {path.path} must be a list.{doc_ref}" + ) + result = [] + delegate = self.delegate + for idx, element in enumerate(value): + element_path = path[idx] + result.append( + delegate.parse_input( + element, + element_path, + parser_context=parser_context, + ) + ) + return result + + class DispatchingParserBase(Generic[TP]): def __init__(self, manifest_attribute_path_template: str) -> None: self.manifest_attribute_path_template = manifest_attribute_path_template @@ -385,7 +442,7 @@ class DispatchingParserBase(Generic[TP]): def _new_parser(self, keyword: str, ppp: "PluginProvidedParser[PF, TP]") -> None: self._parsers[keyword] = ppp - def parse( + def parse_input( self, orig_value: object, attribute_path: "AttributePath", @@ -447,6 +504,7 @@ class DispatchingObjectParser( None, ] ] = None, + nested_in_package_context: bool = False, ) -> None: def _handler( name: str, @@ -457,6 +515,12 @@ class DispatchingObjectParser( on_end_parse_step(name, value, path, parser_context) return value + if nested_in_package_context: + parser = InPackageContextParser( + keyword, + parser, + ) + p = PluginProvidedParser( parser, _handler, @@ -464,18 +528,8 @@ class DispatchingObjectParser( ) self._add_parser(keyword, p) - # FIXME: Agree on naming (parse vs. parse_input) def parse_input( self, - value: object, - path: "AttributePath", - *, - parser_context: Optional["ParserContextData"] = None, - ) -> TD: - return self.parse(value, path, parser_context=parser_context) - - def parse( - self, orig_value: object, attribute_path: "AttributePath", *, @@ -534,12 +588,80 @@ class DispatchingObjectParser( return result -class DispatchingTableParser(DispatchingParserBase[TP]): +@dataclasses.dataclass(slots=True, frozen=True) +class PackageContextData(Generic[TP]): + resolved_package_name: str + value: TP + + +class InPackageContextParser( + DelegatingDeclarativeInputParser[Mapping[str, PackageContextData[TP]]] +): + def __init__( + self, + manifest_attribute_path_template: str, + delegate: DeclarativeInputParser[TP], + *, + parser_documentation: Optional[ParserDocumentation] = None, + ) -> None: + self.manifest_attribute_path_template = manifest_attribute_path_template + self._attribute_documentation: List[ParserAttributeDocumentation] = [] + super().__init__(delegate, inline_reference_documentation=parser_documentation) + + def parse_input( + self, + orig_value: object, + attribute_path: "AttributePath", + *, + parser_context: Optional["ParserContextData"] = None, + ) -> TP: + assert parser_context is not None + doc_ref = "" + if self.reference_documentation_url is not None: + doc_ref = ( + f" Please see {self.reference_documentation_url} for the documentation." + ) + if not isinstance(orig_value, dict) or not orig_value: + raise ManifestParseException( + f"The attribute {attribute_path.path} must be a non-empty mapping.{doc_ref}" + ) + delegate = self.delegate + result = {} + for package_name_raw, value in orig_value.items(): + + definition_source = attribute_path[package_name_raw] + package_name = package_name_raw + if "{{" in package_name: + package_name = parser_context.substitution.substitute( + package_name_raw, + definition_source.path, + ) + package_state: PackageTransformationDefinition + with parser_context.binary_package_context(package_name) as package_state: + if package_state.is_auto_generated_package: + # Maybe lift (part) of this restriction. + raise ManifestParseException( + f'Cannot define rules for package "{package_name}" (at {definition_source.path}). It is an' + " auto-generated package." + ) + parsed_value = delegate.parse_input( + value, definition_source, parser_context=parser_context + ) + result[package_name_raw] = PackageContextData( + package_name, parsed_value + ) + return result + + +class DispatchingTableParser( + DispatchingParserBase[TP], + DeclarativeInputParser[TP], +): def __init__(self, base_type: TTP, manifest_attribute_path_template: str) -> None: super().__init__(manifest_attribute_path_template) self.base_type = base_type - def parse( + def parse_input( self, orig_value: object, attribute_path: "AttributePath", @@ -608,6 +730,7 @@ SUPPORTED_DISPATCHABLE_TABLE_PARSERS = { } OPARSER_MANIFEST_ROOT = "<ROOT>" +OPARSER_PACKAGES_ROOT = "packages" OPARSER_PACKAGES = "packages.{{PACKAGE}}" OPARSER_MANIFEST_DEFINITIONS = "definitions" diff --git a/src/debputy/plugin/api/plugin_parser.py b/src/debputy/plugin/api/plugin_parser.py index ad2489f..dd5c0d0 100644 --- a/src/debputy/plugin/api/plugin_parser.py +++ b/src/debputy/plugin/api/plugin_parser.py @@ -52,15 +52,15 @@ def _initialize_plugin_metadata_parser_generator() -> ParserGenerator: PLUGIN_METADATA_PARSER_GENERATOR = _initialize_plugin_metadata_parser_generator() -PLUGIN_METADATA_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.parser_from_typed_dict( +PLUGIN_METADATA_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.generate_parser( PluginJsonMetadata ) -PLUGIN_PPF_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.parser_from_typed_dict( +PLUGIN_PPF_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.generate_parser( PackagerProvidedFileJsonDescription ) -PLUGIN_MANIFEST_VARS_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.parser_from_typed_dict( +PLUGIN_MANIFEST_VARS_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.generate_parser( ManifestVariableJsonDescription ) -PLUGIN_KNOWN_PACKAGING_FILES_PARSER = ( - PLUGIN_METADATA_PARSER_GENERATOR.parser_from_typed_dict(KnownPackagingFileInfo) +PLUGIN_KNOWN_PACKAGING_FILES_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.generate_parser( + KnownPackagingFileInfo ) diff --git a/src/debputy/plugin/debputy/binary_package_rules.py b/src/debputy/plugin/debputy/binary_package_rules.py index 4753c79..14d9b91 100644 --- a/src/debputy/plugin/debputy/binary_package_rules.py +++ b/src/debputy/plugin/debputy/binary_package_rules.py @@ -109,11 +109,10 @@ def register_binary_package_rules(api: DebputyPluginInitializerProvider) -> None api.pluggable_manifest_rule( OPARSER_PACKAGES, "transformations", - ListOfTransformationRulesFormat, + List[TransformationRule], _unpack_list, - source_format=List[TransformationRule], inline_reference_documentation=reference_documentation( - title="Transformations (`packages.{{PACKAGE}}.transformations`)", + title="Transformations (`transformations`)", description=textwrap.dedent( """\ You can define a `transformations` under the package definition, which is a list a transformation @@ -140,16 +139,15 @@ def register_binary_package_rules(api: DebputyPluginInitializerProvider) -> None overlap or conflict. """ ), - reference_documentation_url=f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md#transformations-packagespackagetransformations", + reference_documentation_url=f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md#transformations-transformations", ), ) api.pluggable_manifest_rule( OPARSER_PACKAGES, "conffile-management", - ListOfDpkgMaintscriptHelperCommandFormat, + List[DpkgMaintscriptHelperCommand], _unpack_list, - source_format=List[DpkgMaintscriptHelperCommand], ) api.pluggable_manifest_rule( @@ -520,11 +518,11 @@ def _process_service_rules( def _unpack_list( _name: str, - parsed_data: ListParsedFormat, + parsed_data: List[Any], _attribute_path: AttributePath, _parser_context: ParserContextData, ) -> List[Any]: - return parsed_data["elements"] + return parsed_data class CleanAfterRemovalRuleSourceFormat(TypedDict): @@ -544,7 +542,7 @@ class CleanAfterRemovalRule(DebputyParsedContent): # FIXME: Not optimal that we are doing an initialization of ParserGenerator here. But the rule is not depending on any # complex types that is registered by plugins, so it will work for now. -_CLEAN_AFTER_REMOVAL_RULE_PARSER = ParserGenerator().parser_from_typed_dict( +_CLEAN_AFTER_REMOVAL_RULE_PARSER = ParserGenerator().generate_parser( CleanAfterRemovalRule, source_content=Union[CleanAfterRemovalRuleSourceFormat, str, List[str]], inline_reference_documentation=reference_documentation( diff --git a/src/debputy/plugin/debputy/manifest_root_rules.py b/src/debputy/plugin/debputy/manifest_root_rules.py index 86a1c27..ca8cf1e 100644 --- a/src/debputy/plugin/debputy/manifest_root_rules.py +++ b/src/debputy/plugin/debputy/manifest_root_rules.py @@ -108,9 +108,8 @@ def register_manifest_root_rules(api: DebputyPluginInitializerProvider) -> None: api.pluggable_manifest_rule( OPARSER_MANIFEST_ROOT, MK_INSTALLATIONS, - ListOfInstallRulesFormat, + List[InstallRule], _handle_installation_rules, - source_format=List[InstallRule], inline_reference_documentation=reference_documentation( title="Installations", description=textwrap.dedent( @@ -156,15 +155,12 @@ def register_manifest_root_rules(api: DebputyPluginInitializerProvider) -> None: ), ), ) - api.pluggable_manifest_rule( + api.pluggable_object_parser( OPARSER_MANIFEST_ROOT, MK_PACKAGES, - DictFormat, - _handle_opaque_dict, - source_format=Dict[str, Any], - inline_reference_documentation=SUPPORTED_DISPATCHABLE_OBJECT_PARSERS[ - OPARSER_PACKAGES - ], + object_parser_key=OPARSER_PACKAGES, + on_end_parse_step=lambda _a, _b, _c, mp: mp._ensure_package_states_is_initialized(), + nested_in_package_context=True, ) @@ -238,11 +234,11 @@ def _handle_manifest_variables( def _handle_installation_rules( _name: str, - parsed_data: ListOfInstallRulesFormat, + parsed_data: List[InstallRule], _attribute_path: AttributePath, _parser_context: ParserContextData, ) -> List[Any]: - return parsed_data["elements"] + return parsed_data def _handle_opaque_dict( diff --git a/src/debputy/plugin/debputy/private_api.py b/src/debputy/plugin/debputy/private_api.py index b9aa043..3b5087b 100644 --- a/src/debputy/plugin/debputy/private_api.py +++ b/src/debputy/plugin/debputy/private_api.py @@ -921,7 +921,7 @@ def register_install_rules(api: DebputyPluginInitializerProvider) -> None: as-is. When a directory is matched, then the directory is installed along with all the contents that have not already been installed somewhere. - - **CAVEAT**: Specifying `source: examples` where `examples` resolves to a + - **CAVEAT**: Specifying `source: examples` where `examples` resolves to a directory for `install-examples` will give you an `examples/examples` directory in the package, which is rarely what you want. Often, you can solve this by using `examples/*` instead. Similar for `install-docs` @@ -1023,7 +1023,7 @@ def register_install_rules(api: DebputyPluginInitializerProvider) -> None: as-is. When a directory is matched, then the directory is installed along with all the contents that have not already been installed somewhere. - - **CAVEAT**: Specifying `source: examples` where `examples` resolves to a + - **CAVEAT**: Specifying `source: examples` where `examples` resolves to a directory for `install-examples` will give you an `examples/examples` directory in the package, which is rarely what you want. Often, you can solve this by using `examples/*` instead. Similar for `install-docs` diff --git a/src/debputy/yaml/__init__.py b/src/debputy/yaml/__init__.py new file mode 100644 index 0000000..325dff5 --- /dev/null +++ b/src/debputy/yaml/__init__.py @@ -0,0 +1,9 @@ +from .compat import YAML, YAMLError, MarkedYAMLError + +MANIFEST_YAML = YAML() + +__all__ = [ + "MANIFEST_YAML", + "YAMLError", + "MarkedYAMLError", +] diff --git a/src/debputy/yaml/compat.py b/src/debputy/yaml/compat.py new file mode 100644 index 0000000..f26af02 --- /dev/null +++ b/src/debputy/yaml/compat.py @@ -0,0 +1,19 @@ +__all__ = [ + "YAML", + "YAMLError", + "MarkedYAMLError", + "Node", + "LineCol", + "CommentedBase", + "CommentedMap", + "CommentedSeq", +] + +try: + from ruyaml import YAMLError, YAML, Node + from ruyaml.comments import LineCol, CommentedBase, CommentedMap, CommentedSeq + from ruyaml.error import MarkedYAMLError +except (ImportError, ModuleNotFoundError): + from ruamel.yaml import YAMLError, YAML, Node + from ruamel.yaml.comments import LineCol, CommentedBase, CommentedMap, CommentedSeq + from ruamel.yaml.error import MarkedYAMLError |