summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/debputy/commands/debputy_cmd/lint_and_lsp_cmds.py34
-rw-r--r--src/debputy/commands/debputy_cmd/plugin_cmds.py253
-rw-r--r--src/debputy/highlevel_manifest.py7
-rw-r--r--src/debputy/highlevel_manifest_parser.py45
-rw-r--r--src/debputy/linting/lint_impl.py2
-rw-r--r--src/debputy/lsp/logins-and-people.dic1
-rw-r--r--src/debputy/lsp/lsp_debian_debputy_manifest.py702
-rw-r--r--src/debputy/lsp/lsp_dispatch.py33
-rw-r--r--src/debputy/lsp/lsp_features.py18
-rw-r--r--src/debputy/manifest_parser/declarative_parser.py160
-rw-r--r--src/debputy/manifest_parser/parser_doc.py273
-rw-r--r--src/debputy/manifest_parser/util.py6
-rw-r--r--src/debputy/plugin/api/feature_set.py35
-rw-r--r--src/debputy/plugin/api/impl.py30
-rw-r--r--src/debputy/plugin/api/impl_types.py149
-rw-r--r--src/debputy/plugin/api/plugin_parser.py10
-rw-r--r--src/debputy/plugin/debputy/binary_package_rules.py16
-rw-r--r--src/debputy/plugin/debputy/manifest_root_rules.py18
-rw-r--r--src/debputy/plugin/debputy/private_api.py4
-rw-r--r--src/debputy/yaml/__init__.py9
-rw-r--r--src/debputy/yaml/compat.py19
21 files changed, 1414 insertions, 410 deletions
diff --git a/src/debputy/commands/debputy_cmd/lint_and_lsp_cmds.py b/src/debputy/commands/debputy_cmd/lint_and_lsp_cmds.py
index e72a6ce..dcb5063 100644
--- a/src/debputy/commands/debputy_cmd/lint_and_lsp_cmds.py
+++ b/src/debputy/commands/debputy_cmd/lint_and_lsp_cmds.py
@@ -13,7 +13,7 @@ _EDITOR_SNIPPETS = {
;;
;; Add to ~/.emacs or ~/.emacs.d/init.el and then activate via `M-x eglot`.
;;
- ;; Requires: apt install elpa-dpkg-dev-el
+ ;; Requires: apt install elpa-dpkg-dev-el elpa-yaml-mode
;; Recommends: apt install elpa-markdown-mode
;; Make emacs recognize debian/debputy.manifest as a YAML file
@@ -29,6 +29,8 @@ _EDITOR_SNIPPETS = {
;; The debian/rules file uses the qmake mode.
(add-to-list 'eglot-server-programs
'(makefile-gmake-mode . ("debputy" "lsp" "server")))
+ (add-to-list 'eglot-server-programs
+ '(yaml-mode . ("debputy" "lsp" "server")))
)
;; Auto-start eglot for the relevant modes.
@@ -39,6 +41,7 @@ _EDITOR_SNIPPETS = {
;; (add-hook 'debian-changelog-mode-hook 'eglot-ensure)
(add-hook 'debian-copyright-mode-hook 'eglot-ensure)
(add-hook 'makefile-gmake-mode-hook 'eglot-ensure)
+ (add-hook 'yaml-mode-hook 'eglot-ensure)
"""
),
"vim": "vim+youcompleteme",
@@ -112,9 +115,15 @@ def lsp_server_cmd(context: CommandContext) -> None:
"This feature requires lsprotocol and pygls (apt-get install python3-lsprotocol python3-pygls)"
)
- from debputy.lsp.lsp_features import ensure_lsp_features_are_loaded
+ feature_set = context.load_plugins()
+
+ from debputy.lsp.lsp_features import (
+ ensure_lsp_features_are_loaded,
+ lsp_set_plugin_features,
+ )
from debputy.lsp.lsp_dispatch import DEBPUTY_LANGUAGE_SERVER
+ lsp_set_plugin_features(feature_set)
ensure_lsp_features_are_loaded()
debputy_language_server = DEBPUTY_LANGUAGE_SERVER
@@ -134,12 +143,27 @@ def lsp_server_cmd(context: CommandContext) -> None:
"editor_name",
metavar="editor",
choices=_EDITOR_SNIPPETS,
+ default=None,
+ nargs="?",
help="The editor to provide a snippet for",
),
],
)
def lsp_editor_glue(context: CommandContext) -> None:
editor_name = context.parsed_args.editor_name
+
+ if editor_name is None:
+ content = []
+ for editor_name, payload in _EDITOR_SNIPPETS.items():
+ alias_of = ""
+ if payload in _EDITOR_SNIPPETS:
+ alias_of = f" (short for: {payload})"
+ content.append((editor_name, alias_of))
+ max_name = max(len(c[0]) for c in content)
+ print("This version of debputy has editor snippets for the following editors: ")
+ for editor_name, alias_of in content:
+ print(f" * {editor_name:<{max_name}}{alias_of}")
+ return
result = _EDITOR_SNIPPETS[editor_name]
while result in _EDITOR_SNIPPETS:
result = _EDITOR_SNIPPETS[result]
@@ -200,6 +224,12 @@ def lint_cmd(context: CommandContext) -> None:
from debputy.linting.lint_impl import perform_linting
context.must_be_called_in_source_root()
+ feature_set = context.load_plugins()
+
+ from debputy.lsp.lsp_features import lsp_set_plugin_features
+
+ lsp_set_plugin_features(feature_set)
+
perform_linting(context)
diff --git a/src/debputy/commands/debputy_cmd/plugin_cmds.py b/src/debputy/commands/debputy_cmd/plugin_cmds.py
index 2456902..69b2a2a 100644
--- a/src/debputy/commands/debputy_cmd/plugin_cmds.py
+++ b/src/debputy/commands/debputy_cmd/plugin_cmds.py
@@ -37,6 +37,7 @@ from debputy.manifest_parser.declarative_parser import (
BASIC_SIMPLE_TYPES,
)
from debputy.manifest_parser.parser_data import ParserContextData
+from debputy.manifest_parser.parser_doc import render_rule
from debputy.manifest_parser.util import unpack_type, AttributePath
from debputy.packager_provided_files import detect_all_packager_provided_files
from debputy.plugin.api.example_processing import (
@@ -449,8 +450,9 @@ def _plugin_cmd_list_manifest_rules(context: CommandContext) -> None:
# to derive to this common base type on its own.
base_type = Iterable[Tuple[Union[str, Type[Any]], DispatchingParserBase[Any]]]
- table_parsers: base_type = feature_set.dispatchable_table_parsers.items()
- object_parsers: base_type = feature_set.dispatchable_object_parsers.items()
+ parser_generator = feature_set.manifest_parser_generator
+ table_parsers: base_type = parser_generator.dispatchable_table_parsers.items()
+ object_parsers: base_type = parser_generator.dispatchable_object_parsers.items()
parsers = chain(
table_parsers,
@@ -493,215 +495,6 @@ def _plugin_cmd_list_automatic_discard_rules(context: CommandContext) -> None:
)
-def _provide_placeholder_parser_doc(
- parser_doc: Optional[ParserDocumentation],
- attributes: Iterable[str],
-) -> ParserDocumentation:
- if parser_doc is None:
- parser_doc = reference_documentation()
- changes = {}
- if parser_doc.attribute_doc is None:
- changes["attribute_doc"] = [undocumented_attr(attr) for attr in attributes]
-
- if changes:
- return parser_doc.replace(**changes)
- return parser_doc
-
-
-def _doc_args_parser_doc(
- rule_name: str,
- declarative_parser: DeclarativeInputParser[Any],
- plugin_metadata: DebputyPluginMetadata,
-) -> Tuple[Mapping[str, str], ParserDocumentation]:
- attributes: Iterable[str]
- if isinstance(declarative_parser, DeclarativeMappingInputParser):
- attributes = declarative_parser.source_attributes.keys()
- else:
- attributes = []
- doc_args = {
- "RULE_NAME": rule_name,
- "MANIFEST_FORMAT_DOC": f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md",
- "PLUGIN_NAME": plugin_metadata.plugin_name,
- }
- parser_doc = _provide_placeholder_parser_doc(
- declarative_parser.inline_reference_documentation,
- attributes,
- )
- return doc_args, parser_doc
-
-
-def _render_rule(
- rule_name: str,
- rule_type: str,
- declarative_parser: DeclarativeInputParser[Any],
- plugin_metadata: DebputyPluginMetadata,
- manifest_attribute_path: str,
-) -> None:
- is_root_rule = rule_name == "::"
-
- doc_args, parser_doc = _doc_args_parser_doc(
- "the manifest root" if is_root_rule else rule_name,
- declarative_parser,
- plugin_metadata,
- )
- t = assume_not_none(parser_doc.title).format(**doc_args)
- print(t)
- print("=" * len(t))
- print()
-
- print(assume_not_none(parser_doc.description).format(**doc_args).rstrip())
-
- print()
- alt_form_parser = getattr(declarative_parser, "alt_form_parser", None)
- if isinstance(
- declarative_parser, (DeclarativeMappingInputParser, DispatchingObjectParser)
- ):
- if isinstance(declarative_parser, DeclarativeMappingInputParser):
- attributes = declarative_parser.source_attributes
- required = declarative_parser.input_time_required_parameters
- conditionally_required = declarative_parser.at_least_one_of
- mutually_exclusive = declarative_parser.mutually_exclusive_attributes
- is_list_wrapped = declarative_parser.is_list_wrapped
- else:
- attributes = {}
- required = frozenset()
- conditionally_required = frozenset()
- mutually_exclusive = frozenset()
- is_list_wrapped = False
- if is_list_wrapped:
- print("List where each element has the following attributes:")
- else:
- print("Attributes:")
- attribute_docs = (
- parser_doc.attribute_doc if parser_doc.attribute_doc is not None else []
- )
- for attr_doc in assume_not_none(attribute_docs):
- attr_description = attr_doc.description
- prefix = " - "
-
- for parameter in sorted(attr_doc.attributes):
- parameter_details = attributes.get(parameter)
- if parameter_details is not None:
- source_name = parameter_details.source_attribute_name
- describe_type = parameter_details.type_validator.describe_type()
- else:
- assert isinstance(declarative_parser, DispatchingObjectParser)
- source_name = parameter
- subparser = declarative_parser.parser_for(source_name).parser
- if isinstance(subparser, DispatchingObjectParser):
- rule_prefix = rule_name if rule_name != "::" else ""
- describe_type = f"Object (see `{rule_prefix}::{subparser.manifest_attribute_path_template}`)"
- elif isinstance(subparser, DeclarativeMappingInputParser):
- describe_type = "<Type definition not implemented yet>" # TODO: Derive from subparser
- elif isinstance(subparser, DeclarativeNonMappingInputParser):
- describe_type = (
- subparser.alt_form_parser.type_validator.describe_type()
- )
- else:
- describe_type = f"<Unknown: Non-introspectable subparser - {subparser.__class__.__name__}>"
-
- if source_name in required:
- req_str = "required"
- elif any(source_name in s for s in conditionally_required):
- req_str = "conditional"
- else:
- req_str = "optional"
- print(f"{prefix}`{source_name}` ({req_str}): {describe_type}")
- prefix = " "
-
- if attr_description:
- print()
- for line in attr_description.format(**doc_args).splitlines(
- keepends=False
- ):
- print(f" {line}")
- print()
-
- if (
- bool(conditionally_required)
- or bool(mutually_exclusive)
- or any(pd.conflicting_attributes for pd in attributes.values())
- ):
- print()
- if is_list_wrapped:
- print(
- "This rule enforces the following restrictions on each element in the list:"
- )
- else:
- print("This rule enforces the following restrictions:")
-
- if conditionally_required or mutually_exclusive:
- all_groups = set(
- itertools.chain(conditionally_required, mutually_exclusive)
- )
- for g in all_groups:
- anames = "`, `".join(g)
- is_mx = g in mutually_exclusive
- is_cr = g in conditionally_required
- if is_mx and is_cr:
- print(f" - The rule must use exactly one of: `{anames}`")
- elif is_cr:
- print(f" - The rule must use at least one of: `{anames}`")
- else:
- assert is_mx
- print(
- f" - The following attributes are mutually exclusive: `{anames}`"
- )
-
- if mutually_exclusive or any(
- pd.conflicting_attributes for pd in attributes.values()
- ):
- for parameter, parameter_details in sorted(attributes.items()):
- source_name = parameter_details.source_attribute_name
- conflicts = set(parameter_details.conflicting_attributes)
- for mx in mutually_exclusive:
- if parameter in mx and mx not in conditionally_required:
- conflicts |= mx
- if conflicts:
- conflicts.discard(parameter)
- cnames = "`, `".join(
- attributes[a].source_attribute_name for a in conflicts
- )
- print(
- f" - The attribute `{source_name}` cannot be used with any of: `{cnames}`"
- )
- print()
- if alt_form_parser is not None:
- # FIXME: Mapping[str, Any] ends here, which is ironic given the headline.
- print(f"Non-mapping format: {alt_form_parser.type_validator.describe_type()}")
- alt_parser_desc = parser_doc.alt_parser_description
- if alt_parser_desc:
- for line in alt_parser_desc.format(**doc_args).splitlines(keepends=False):
- print(f" {line}")
- print()
-
- if declarative_parser.reference_documentation_url is not None:
- print(
- f"Reference documentation: {declarative_parser.reference_documentation_url}"
- )
- else:
- print(
- "Reference documentation: No reference documentation link provided by the plugin"
- )
-
- if not is_root_rule:
- print(
- f"Used in: {manifest_attribute_path if manifest_attribute_path != '<ROOT>' else 'The manifest root'}"
- )
- print(f"Rule reference: {rule_type}::{rule_name}")
- print(f"Plugin: {plugin_metadata.plugin_name}")
- else:
- print(f"Rule reference: {rule_name}")
-
- print()
- print(
- "PS: If you want to know more about a non-trivial type of an attribute such as `FileSystemMatchRule`,"
- )
- print(
- "you can use `debputy plugin show type-mappings FileSystemMatchRule` to look it up "
- )
-
-
def _render_manifest_variable_value(v: Optional[str]) -> str:
if v is None:
return "(N/A: Cannot resolve the variable)"
@@ -991,8 +784,9 @@ def _plugin_cmd_show_manifest_rule(context: CommandContext) -> None:
matched = []
base_type = Iterable[Tuple[Union[str, Type[Any]], DispatchingParserBase[Any]]]
- table_parsers: base_type = feature_set.dispatchable_table_parsers.items()
- object_parsers: base_type = feature_set.dispatchable_object_parsers.items()
+ parser_generator = feature_set.manifest_parser_generator
+ table_parsers: base_type = parser_generator.dispatchable_table_parsers.items()
+ object_parsers: base_type = parser_generator.dispatchable_object_parsers.items()
parsers = chain(
table_parsers,
@@ -1034,17 +828,36 @@ def _plugin_cmd_show_manifest_rule(context: CommandContext) -> None:
plugin_metadata = plugin_provided_parser.plugin_metadata
else:
rule_name = "::"
- parser = feature_set.dispatchable_object_parsers[OPARSER_MANIFEST_ROOT]
+ parser = parser_generator.dispatchable_object_parsers[OPARSER_MANIFEST_ROOT]
parser_type_name = ""
plugin_metadata = plugin_metadata_for_debputys_own_plugin()
manifest_attribute_path = ""
- _render_rule(
- rule_name,
- parser_type_name,
- parser,
- plugin_metadata,
- manifest_attribute_path,
+ is_root_rule = rule_name == "::"
+ print(
+ render_rule(
+ rule_name,
+ parser,
+ plugin_metadata,
+ is_root_rule=is_root_rule,
+ )
+ )
+
+ if not is_root_rule:
+ print(
+ f"Used in: {manifest_attribute_path if manifest_attribute_path != '<ROOT>' else 'The manifest root'}"
+ )
+ print(f"Rule reference: {parser_type_name}::{rule_name}")
+ print(f"Plugin: {plugin_metadata.plugin_name}")
+ else:
+ print(f"Rule reference: {rule_name}")
+
+ print()
+ print(
+ "PS: If you want to know more about a non-trivial type of an attribute such as `FileSystemMatchRule`,"
+ )
+ print(
+ "you can use `debputy plugin show type-mappings FileSystemMatchRule` to look it up "
)
diff --git a/src/debputy/highlevel_manifest.py b/src/debputy/highlevel_manifest.py
index 1e92210..30440f1 100644
--- a/src/debputy/highlevel_manifest.py
+++ b/src/debputy/highlevel_manifest.py
@@ -22,9 +22,6 @@ from typing import (
)
from debian.debian_support import DpkgArchTable
-from ruamel.yaml import YAML
-from ruamel.yaml.comments import CommentedMap, CommentedSeq
-
from ._deb_options_profiles import DebBuildOptionsAndProfiles
from ._manifest_constants import *
from .architecture_support import DpkgArchitectureBuildProcessValuesTable
@@ -77,8 +74,8 @@ from .util import (
generated_content_dir,
_info,
)
-
-MANIFEST_YAML = YAML()
+from .yaml import MANIFEST_YAML
+from .yaml.compat import CommentedMap, CommentedSeq
@dataclass(slots=True)
diff --git a/src/debputy/highlevel_manifest_parser.py b/src/debputy/highlevel_manifest_parser.py
index 24d05c7..28a3f80 100644
--- a/src/debputy/highlevel_manifest_parser.py
+++ b/src/debputy/highlevel_manifest_parser.py
@@ -15,13 +15,11 @@ from typing import (
)
from debian.debian_support import DpkgArchTable
-from ruamel.yaml import YAMLError
from debputy.highlevel_manifest import (
HighLevelManifest,
PackageTransformationDefinition,
MutableYAMLManifest,
- MANIFEST_YAML,
)
from debputy.maintscript_snippet import (
MaintscriptSnippet,
@@ -54,10 +52,11 @@ from .plugin.api.impl_types import (
TP,
TTP,
DispatchingTableParser,
- OPARSER_PACKAGES,
OPARSER_MANIFEST_ROOT,
+ PackageContextData,
)
from .plugin.api.feature_set import PluginProvidedFeatureSet
+from .yaml import YAMLError, MANIFEST_YAML
try:
from Levenshtein import distance
@@ -273,7 +272,9 @@ class HighLevelManifestParser(ParserContextData):
self._package_state_stack.pop()
def dispatch_parser_table_for(self, rule_type: TTP) -> DispatchingTableParser[TP]:
- t = self._plugin_provided_feature_set.dispatchable_table_parsers.get(rule_type)
+ t = self._plugin_provided_feature_set.manifest_parser_generator.dispatch_parser_table_for(
+ rule_type
+ )
if t is None:
raise AssertionError(
f"Internal error: No dispatching parser for {rule_type.__name__}"
@@ -440,33 +441,30 @@ class YAMLManifestParser(HighLevelManifestParser):
def from_yaml_dict(self, yaml_data: object) -> "HighLevelManifest":
attribute_path = AttributePath.root_path()
- manifest_root_parser = (
- self._plugin_provided_feature_set.dispatchable_object_parsers[
- OPARSER_MANIFEST_ROOT
- ]
- )
+ parser_generator = self._plugin_provided_feature_set.manifest_parser_generator
+ dispatchable_object_parsers = parser_generator.dispatchable_object_parsers
+ manifest_root_parser = dispatchable_object_parsers[OPARSER_MANIFEST_ROOT]
parsed_data = cast(
"ManifestRootRule",
- manifest_root_parser.parse(
+ manifest_root_parser.parse_input(
yaml_data,
attribute_path,
parser_context=self,
),
)
- packages_dict = parsed_data.get("packages", {})
+ packages_dict: Mapping[str, PackageContextData[Mapping[str, Any]]] = cast(
+ "Mapping[str, PackageContextData[Mapping[str, Any]]]",
+ parsed_data.get("packages", {}),
+ )
install_rules = parsed_data.get("installations")
if install_rules:
self._install_rules = install_rules
packages_parent_path = attribute_path["packages"]
- for package_name_raw, v in packages_dict.items():
+ for package_name_raw, pcd in packages_dict.items():
definition_source = packages_parent_path[package_name_raw]
- package_name = package_name_raw
- if "{{" in package_name:
- package_name = self.substitution.substitute(
- package_name_raw,
- definition_source.path,
- )
+ package_name = pcd.resolved_package_name
+ parsed = pcd.value
package_state: PackageTransformationDefinition
with self.binary_package_context(package_name) as package_state:
@@ -476,17 +474,6 @@ class YAMLManifestParser(HighLevelManifestParser):
f'Cannot define rules for package "{package_name}" (at {definition_source.path}). It is an'
" auto-generated package."
)
- package_rule_parser = (
- self._plugin_provided_feature_set.dispatchable_object_parsers[
- OPARSER_PACKAGES
- ]
- )
- parsed = cast(
- "Mapping[str, Any]",
- package_rule_parser.parse(
- v, definition_source, parser_context=self
- ),
- )
binary_version = parsed.get("binary-version")
if binary_version is not None:
package_state.binary_version = (
diff --git a/src/debputy/linting/lint_impl.py b/src/debputy/linting/lint_impl.py
index 5df76c4..66ff635 100644
--- a/src/debputy/linting/lint_impl.py
+++ b/src/debputy/linting/lint_impl.py
@@ -75,7 +75,7 @@ def perform_linting(context: CommandContext) -> None:
if os.path.isfile("debian/debputy.manifest"):
_info("Note: Due to a limitation in the linter, debian/debputy.manifest is")
_info("only **partially** checked by this command at the time of writing.")
- _info("Please use `debputy check-manifest` for checking the manifest.")
+ _info("Please use `debputy check-manifest` to fully check the manifest.")
if linter_exit_code:
_exit_with_lint_code(lint_report)
diff --git a/src/debputy/lsp/logins-and-people.dic b/src/debputy/lsp/logins-and-people.dic
index 8c231b2..dca29cf 100644
--- a/src/debputy/lsp/logins-and-people.dic
+++ b/src/debputy/lsp/logins-and-people.dic
@@ -132,6 +132,7 @@ Jorgen
Josip
Josselin
Jover
+Kalnischkies
Kastner
Kel
Kis
diff --git a/src/debputy/lsp/lsp_debian_debputy_manifest.py b/src/debputy/lsp/lsp_debian_debputy_manifest.py
index 97fffcc..1dc5fb3 100644
--- a/src/debputy/lsp/lsp_debian_debputy_manifest.py
+++ b/src/debputy/lsp/lsp_debian_debputy_manifest.py
@@ -1,7 +1,14 @@
-import re
from typing import (
Optional,
List,
+ Any,
+ Tuple,
+ Union,
+ Iterable,
+ Sequence,
+ Literal,
+ get_args,
+ get_origin,
)
from lsprotocol.types import (
@@ -10,17 +17,63 @@ from lsprotocol.types import (
Position,
Range,
DiagnosticSeverity,
+ HoverParams,
+ Hover,
+ MarkupKind,
+ MarkupContent,
+ TEXT_DOCUMENT_CODE_ACTION,
+ CompletionParams,
+ CompletionList,
+ CompletionItem,
+)
+from debputy.lsp.quickfixes import propose_correct_text_quick_fix
+from debputy.manifest_parser.base_types import DebputyDispatchableType
+from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
+from debputy.yaml.compat import (
+ Node,
+ CommentedMap,
+ LineCol,
+ CommentedSeq,
+ CommentedBase,
+ MarkedYAMLError,
+ YAMLError,
)
-from ruamel.yaml.error import MarkedYAMLError, YAMLError
from debputy.highlevel_manifest import MANIFEST_YAML
from debputy.lsp.lsp_features import (
lint_diagnostics,
lsp_standard_handler,
+ lsp_hover,
+ lsp_get_plugin_features,
+ lsp_completer,
)
from debputy.lsp.text_util import (
LintCapablePositionCodec,
+ detect_possible_typo,
+)
+from debputy.manifest_parser.declarative_parser import (
+ AttributeDescription,
+ ParserGenerator,
+)
+from debputy.manifest_parser.declarative_parser import DeclarativeMappingInputParser
+from debputy.manifest_parser.parser_doc import (
+ render_rule,
+ render_attribute_doc,
+ doc_args_for_parser_doc,
)
+from debputy.manifest_parser.util import AttributePath
+from debputy.plugin.api.impl import plugin_metadata_for_debputys_own_plugin
+from debputy.plugin.api.impl_types import (
+ OPARSER_MANIFEST_ROOT,
+ DeclarativeInputParser,
+ DispatchingParserBase,
+ DebputyPluginMetadata,
+ ListWrappedDeclarativeInputParser,
+ InPackageContextParser,
+ DeclarativeValuelessKeywordInputParser,
+)
+from debputy.util import _info, _warn
+
try:
from pygls.server import LanguageServer
@@ -28,11 +81,6 @@ except ImportError:
pass
-_CONTAINS_TAB_OR_COLON = re.compile(r"[\t:]")
-_WORDS_RE = re.compile("([a-zA-Z0-9_-]+)")
-_MAKE_ERROR_RE = re.compile(r"^[^:]+:(\d+):\s*(\S.+)")
-
-
_LANGUAGE_IDS = [
"debian/debputy.manifest",
"debputy.manifest",
@@ -41,7 +89,7 @@ _LANGUAGE_IDS = [
]
-# lsp_standard_handler(_LANGUAGE_IDS, TEXT_DOCUMENT_CODE_ACTION)
+lsp_standard_handler(_LANGUAGE_IDS, TEXT_DOCUMENT_CODE_ACTION)
lsp_standard_handler(_LANGUAGE_IDS, TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL)
@@ -83,14 +131,20 @@ def _lint_debian_debputy_manifest(
return None
diagnostics = []
try:
- MANIFEST_YAML.load("".join(lines))
+ content = MANIFEST_YAML.load("".join(lines))
except MarkedYAMLError as e:
+ if e.context_mark:
+ line = e.context_mark.line
+ column = e.context_mark.column + 1
+ else:
+ line = e.problem_mark.line
+ column = e.problem_mark.column + 1
error_range = position_codec.range_to_client_units(
lines,
_word_range_at_position(
lines,
- e.problem_mark.line,
- e.problem_mark.column,
+ line,
+ column,
),
)
diagnostics.append(
@@ -115,5 +169,629 @@ def _lint_debian_debputy_manifest(
DiagnosticSeverity.Error,
),
)
-
+ else:
+ feature_set = lsp_get_plugin_features()
+ root_parser = feature_set.manifest_parser_generator.dispatchable_object_parsers[
+ OPARSER_MANIFEST_ROOT
+ ]
+ diagnostics.extend(_lint_content(root_parser, content, lines, position_codec))
return diagnostics
+
+
+def _lint_content(
+ parser: DeclarativeInputParser[Any],
+ content: Any,
+ lines: List[str],
+ position_codec: LintCapablePositionCodec,
+) -> Iterable[Diagnostic]:
+ if isinstance(parser, DispatchingParserBase):
+ if not isinstance(content, CommentedMap):
+ return
+ lc = content.lc
+ for key, value in content.items():
+ if not parser.is_known_keyword(key):
+ line, col = lc.key(key)
+ key_range = position_codec.range_to_client_units(
+ lines,
+ Range(
+ Position(
+ line,
+ col,
+ ),
+ Position(
+ line,
+ col + len(key),
+ ),
+ ),
+ )
+
+ candidates = detect_possible_typo(key, parser.registered_keywords())
+
+ yield Diagnostic(
+ key_range,
+ f"Unknown or unsupported key {key}",
+ DiagnosticSeverity.Error,
+ source="debputy",
+ data=[propose_correct_text_quick_fix(n) for n in candidates],
+ )
+ else:
+ subparser = parser.parser_for(key)
+ assert subparser is not None
+ yield from _lint_content(subparser.parser, value, lines, position_codec)
+ elif isinstance(parser, ListWrappedDeclarativeInputParser):
+ if not isinstance(content, CommentedSeq):
+ return
+ subparser = parser.delegate
+ for value in content:
+ yield from _lint_content(subparser, value, lines, position_codec)
+ elif isinstance(parser, InPackageContextParser):
+ if not isinstance(content, CommentedMap):
+ return
+ for v in content.values():
+ yield from _lint_content(parser.delegate, v, lines, position_codec)
+ elif isinstance(parser, DeclarativeMappingInputParser):
+ if not isinstance(content, CommentedMap):
+ return
+ lc = content.lc
+ for key, value in content.items():
+ attr = parser.manifest_attributes.get(key)
+ if attr is None:
+ line, col = lc.key(key)
+ key_range = position_codec.range_to_client_units(
+ lines,
+ Range(
+ Position(
+ line,
+ col,
+ ),
+ Position(
+ line,
+ col + len(key),
+ ),
+ ),
+ )
+
+ candidates = detect_possible_typo(key, parser.manifest_attributes)
+ yield Diagnostic(
+ key_range,
+ f"Unknown or unsupported key {key}",
+ DiagnosticSeverity.Error,
+ source="debputy",
+ data=[propose_correct_text_quick_fix(n) for n in candidates],
+ )
+
+
+def is_at(position: Position, lc_pos: Tuple[int, int]) -> bool:
+ return position.line == lc_pos[0] and position.character == lc_pos[1]
+
+
+def is_before(position: Position, lc_pos: Tuple[int, int]) -> bool:
+ line, column = lc_pos
+ if position.line < line:
+ return True
+ if position.line == line and position.character < column:
+ return True
+ return False
+
+
+def is_after(position: Position, lc_pos: Tuple[int, int]) -> bool:
+ line, column = lc_pos
+ if position.line > line:
+ return True
+ if position.line == line and position.character > column:
+ return True
+ return False
+
+
+def _trace_cursor(
+ content: Any,
+ attribute_path: AttributePath,
+ server_position: Position,
+) -> Optional[Tuple[bool, AttributePath, Any, Any]]:
+ matched_key: Optional[Union[str, int]] = None
+ matched: Optional[Node] = None
+ matched_was_key: bool = False
+
+ if isinstance(content, CommentedMap):
+ dict_lc: LineCol = content.lc
+ for k, v in content.items():
+ k_lc = dict_lc.key(k)
+ if is_before(server_position, k_lc):
+ break
+ v_lc = dict_lc.value(k)
+ if is_before(server_position, v_lc):
+ # TODO: Handle ":" and "whitespace"
+ matched = k
+ matched_key = k
+ matched_was_key = True
+ break
+ matched = v
+ matched_key = k
+ matched_was_key = False
+ elif isinstance(content, CommentedSeq):
+ list_lc: LineCol = content.lc
+ for idx, value in enumerate(content):
+ i_lc = list_lc.item(idx)
+ if is_before(server_position, i_lc):
+ break
+ matched_key = idx
+ matched = value
+
+ if matched is not None:
+ assert matched_key is not None
+ sub_path = attribute_path[matched_key]
+ if not matched_was_key and isinstance(matched, CommentedBase):
+ return _trace_cursor(matched, sub_path, server_position)
+ return matched_was_key, sub_path, matched, content
+ return None
+
+
+_COMPLETION_HINT_KEY = "___COMPLETE:"
+_COMPLETION_HINT_VALUE = "___COMPLETE"
+
+
+def resolve_keyword(
+ current_parser: Union[DeclarativeInputParser[Any], DispatchingParserBase],
+ current_plugin: DebputyPluginMetadata,
+ segments: List[Union[str, int]],
+ segment_idx: int,
+ parser_generator: ParserGenerator,
+ *,
+ is_completion_attempt: bool = False,
+) -> Optional[
+ Tuple[
+ Union[DeclarativeInputParser[Any], DispatchingParserBase],
+ DebputyPluginMetadata,
+ int,
+ ]
+]:
+ if segment_idx >= len(segments):
+ return current_parser, current_plugin, segment_idx
+ current_segment = segments[segment_idx]
+ if isinstance(current_parser, ListWrappedDeclarativeInputParser):
+ if isinstance(current_segment, int):
+ current_parser = current_parser.delegate
+ segment_idx += 1
+ if segment_idx >= len(segments):
+ return current_parser, current_plugin, segment_idx
+ current_segment = segments[segment_idx]
+
+ if not isinstance(current_segment, str):
+ return None
+
+ if is_completion_attempt and current_segment.endswith(
+ (_COMPLETION_HINT_KEY, _COMPLETION_HINT_VALUE)
+ ):
+ return current_parser, current_plugin, segment_idx
+
+ if isinstance(current_parser, InPackageContextParser):
+ return resolve_keyword(
+ current_parser.delegate,
+ current_plugin,
+ segments,
+ segment_idx + 1,
+ parser_generator,
+ is_completion_attempt=is_completion_attempt,
+ )
+ elif isinstance(current_parser, DispatchingParserBase):
+ if not current_parser.is_known_keyword(current_segment):
+ if is_completion_attempt:
+ return current_parser, current_plugin, segment_idx
+ return None
+ subparser = current_parser.parser_for(current_segment)
+ segment_idx += 1
+ if segment_idx < len(segments):
+ return resolve_keyword(
+ subparser.parser,
+ subparser.plugin_metadata,
+ segments,
+ segment_idx,
+ parser_generator,
+ is_completion_attempt=is_completion_attempt,
+ )
+ return subparser.parser, subparser.plugin_metadata, segment_idx
+ elif isinstance(current_parser, DeclarativeMappingInputParser):
+ attr = current_parser.manifest_attributes.get(current_segment)
+ attr_type = attr.attribute_type if attr is not None else None
+ if (
+ attr_type is not None
+ and isinstance(attr_type, type)
+ and issubclass(attr_type, DebputyDispatchableType)
+ ):
+ subparser = parser_generator.dispatch_parser_table_for(attr_type)
+ if subparser is not None and (
+ is_completion_attempt or segment_idx + 1 < len(segments)
+ ):
+ return resolve_keyword(
+ subparser,
+ current_plugin,
+ segments,
+ segment_idx + 1,
+ parser_generator,
+ is_completion_attempt=is_completion_attempt,
+ )
+ return current_parser, current_plugin, segment_idx
+ else:
+ _info(f"Unknown parser: {current_parser.__class__}")
+ return None
+
+
+def _render_param_doc(
+ rule_name: str,
+ declarative_parser: DeclarativeMappingInputParser,
+ plugin_metadata: DebputyPluginMetadata,
+ attribute: str,
+) -> Optional[str]:
+ attr = declarative_parser.source_attributes.get(attribute)
+ if attr is None:
+ return None
+
+ doc_args, parser_doc = doc_args_for_parser_doc(
+ rule_name,
+ declarative_parser,
+ plugin_metadata,
+ )
+ rendered_docs = render_attribute_doc(
+ declarative_parser,
+ declarative_parser.source_attributes,
+ declarative_parser.input_time_required_parameters,
+ declarative_parser.at_least_one_of,
+ parser_doc,
+ doc_args,
+ is_interactive=True,
+ rule_name=rule_name,
+ )
+
+ for attributes, rendered_doc in rendered_docs:
+ if attribute in attributes:
+ full_doc = [
+ f"# Attribute `{attribute}`",
+ "",
+ ]
+ full_doc.extend(rendered_doc)
+
+ return "\n".join(full_doc)
+ return None
+
+
+DEBPUTY_PLUGIN_METADATA = plugin_metadata_for_debputys_own_plugin()
+
+
+def _guess_rule_name(segments: List[Union[str, int]], idx: int) -> str:
+ orig_idx = idx
+ idx -= 1
+ while idx >= 0:
+ segment = segments[idx]
+ if isinstance(segment, str):
+ return segment
+ idx -= 1
+ _warn(f"Unable to derive rule name from {segments} [{orig_idx}]")
+ return "<Bug: unknown rule name>"
+
+
+def _ecsape(v: str) -> str:
+ return '"' + v.replace("\n", "\\n") + '"'
+
+
+def _insert_snippet(lines: List[str], server_position: Position) -> bool:
+ _info(f"Complete at {server_position}")
+ line_no = server_position.line
+ line = lines[line_no]
+ pos_rhs = line[server_position.character :]
+ if pos_rhs and not pos_rhs.isspace():
+ _info(f"No insertion: {_ecsape(line[server_position.character:])}")
+ return False
+ lhs = line[: server_position.character].strip()
+ if not lhs:
+ _info(f"Insertion of key: {_ecsape(line[server_position.character:])}")
+ # Respect the provided indentation
+ new_line = line[: server_position.character] + _COMPLETION_HINT_KEY
+ elif lhs.endswith(":"):
+ new_line = line[: server_position.character] + _COMPLETION_HINT_VALUE
+ else:
+ c = line[server_position.character]
+ _info(f"Not touching line: {_ecsape(line)} -- {_ecsape(c)}")
+ return False
+ _info(f'Evaluating complete on synthetic line: "{new_line}"')
+ lines[line_no] = new_line
+ return True
+
+
+@lsp_completer(_LANGUAGE_IDS)
+def debputy_manifest_completer(
+ ls: "LanguageServer",
+ params: CompletionParams,
+) -> Optional[Union[CompletionList, Sequence[CompletionItem]]]:
+ doc = ls.workspace.get_text_document(params.text_document.uri)
+ if not is_valid_file(doc.path):
+ return None
+ lines = doc.lines
+ server_position = doc.position_codec.position_from_client_units(
+ lines, params.position
+ )
+ attribute_root_path = AttributePath.root_path()
+ added_key = _insert_snippet(lines, server_position)
+ attempts = 1 if added_key else 2
+ content = None
+ while attempts > 0:
+ attempts -= 1
+ try:
+ content = MANIFEST_YAML.load("".join(lines))
+ break
+ except MarkedYAMLError as e:
+ context_line = (
+ e.context_mark.line if e.context_mark else e.problem_mark.line
+ )
+ if (
+ e.problem_mark.line != server_position.line
+ and context_line != server_position.line
+ ):
+ l_data = (
+ lines[e.problem_mark.line].rstrip()
+ if e.problem_mark.line < len(lines)
+ else "N/A (OOB)"
+ )
+ _info(f"Parse error on line: {e.problem_mark.line}: {l_data}")
+ return None
+
+ if attempts > 0:
+ # Try to make it a key and see if that fixes the problem
+ new_line = lines[server_position.line].rstrip() + _COMPLETION_HINT_KEY
+ lines[server_position.line] = new_line
+ except YAMLError:
+ break
+
+ if content is None:
+ context = lines[server_position.line].replace("\n", "\\n")
+ _info(f"Completion failed: parse error: Line in question: {context}")
+ return None
+
+ m = _trace_cursor(content, attribute_root_path, server_position)
+ if m is None:
+ _info("No match")
+ return None
+ matched_key, attr_path, matched, parent = m
+ _info(f"Matched path: {matched} (path: {attr_path.path}) [{matched_key=}]")
+
+ feature_set = lsp_get_plugin_features()
+ root_parser = feature_set.manifest_parser_generator.dispatchable_object_parsers[
+ OPARSER_MANIFEST_ROOT
+ ]
+ segments = list(attr_path.path_segments())
+ if added_key:
+ segments.pop()
+ km = resolve_keyword(
+ root_parser,
+ DEBPUTY_PLUGIN_METADATA,
+ segments,
+ 0,
+ feature_set.manifest_parser_generator,
+ is_completion_attempt=True,
+ )
+ if km is None:
+ return None
+ parser, _, at_depth_idx = km
+ _info(f"Match leaf parser {at_depth_idx} -- {parser.__class__}")
+ items = []
+ if at_depth_idx + 1 >= len(segments):
+ if isinstance(parser, DispatchingParserBase):
+ if matched_key:
+ items = [
+ CompletionItem(f"{k}:")
+ for k in parser.registered_keywords()
+ if k not in parent
+ and not isinstance(
+ parser.parser_for(k).parser,
+ DeclarativeValuelessKeywordInputParser,
+ )
+ ]
+ else:
+ _info("TODO: Match value")
+ elif isinstance(parser, InPackageContextParser):
+ # doc = ls.workspace.get_text_document(params.text_document.uri)
+ _info(f"TODO: Match package - {parent} -- {matched} -- {matched_key=}")
+ elif isinstance(parser, DeclarativeMappingInputParser):
+ print(f"MMM: {matched} - {parent}")
+ if matched_key:
+ _info("Match attributes")
+ locked = set(parent)
+ for mx in parser.mutually_exclusive_attributes:
+ if not mx.isdisjoint(parent.keys()):
+ locked.update(mx)
+ items = [
+ CompletionItem(f"{k}:")
+ for k in parser.manifest_attributes
+ if k not in locked
+ ]
+ else:
+ # Value
+ key = segments[at_depth_idx] if len(segments) > at_depth_idx else None
+ attr = parser.manifest_attributes.get(key)
+ if attr is not None:
+ _info(f"Expand value / key: {key} -- {attr.attribute_type}")
+ items = _completion_from_attr(
+ attr,
+ feature_set.manifest_parser_generator,
+ matched,
+ )
+ else:
+ _info(
+ f"Expand value / key: {key} -- !! {list(parser.manifest_attributes)}"
+ )
+ return items
+
+
+def _completion_from_attr(
+ attr: AttributeDescription,
+ pg: ParserGenerator,
+ matched: Any,
+) -> Optional[Union[CompletionList, Sequence[CompletionItem]]]:
+ orig = get_origin(attr.attribute_type)
+ valid_values: Sequence[Any] = tuple()
+ if orig == Literal:
+ valid_values = get_args(attr.attribute_type)
+ elif orig == bool or attr.attribute_type == bool:
+ valid_values = ("true", "false")
+ elif isinstance(orig, type) and issubclass(orig, DebputyDispatchableType):
+ parser = pg.dispatch_parser_table_for(orig)
+ _info(f"M: {parser}")
+
+ if matched in valid_values:
+ _info(f"Already filled: {matched} is one of {valid_values}")
+ return None
+ if valid_values:
+ return [CompletionItem(x) for x in valid_values]
+ return None
+
+
+@lsp_hover(_LANGUAGE_IDS)
+def debputy_manifest_hover(
+ ls: "LanguageServer",
+ params: HoverParams,
+) -> Optional[Hover]:
+ doc = ls.workspace.get_text_document(params.text_document.uri)
+ if not is_valid_file(doc.path):
+ return None
+ lines = doc.lines
+ position_codec = doc.position_codec
+ attribute_root_path = AttributePath.root_path()
+ server_position = position_codec.position_from_client_units(lines, params.position)
+
+ try:
+ content = MANIFEST_YAML.load("".join(lines))
+ except YAMLError:
+ return None
+ m = _trace_cursor(content, attribute_root_path, server_position)
+ if m is None:
+ _info("No match")
+ return None
+ matched_key, attr_path, matched, _ = m
+ _info(f"Matched path: {matched} (path: {attr_path.path}) [{matched_key=}]")
+
+ feature_set = lsp_get_plugin_features()
+ parser_generator = feature_set.manifest_parser_generator
+ root_parser = parser_generator.dispatchable_object_parsers[OPARSER_MANIFEST_ROOT]
+ segments = list(attr_path.path_segments())
+ km = resolve_keyword(
+ root_parser,
+ DEBPUTY_PLUGIN_METADATA,
+ segments,
+ 0,
+ parser_generator,
+ )
+ if km is None:
+ _info("No keyword match")
+ return
+ parser, plugin_metadata, at_depth_idx = km
+ _info(f"Match leaf parser {at_depth_idx}/{len(segments)} -- {parser.__class__}")
+ hover_doc_text = resolve_hover_text(
+ feature_set,
+ parser,
+ plugin_metadata,
+ segments,
+ at_depth_idx,
+ matched,
+ matched_key,
+ )
+ return _hover_doc(ls, hover_doc_text)
+
+
+def resolve_hover_text_for_value(
+ feature_set: PluginProvidedFeatureSet,
+ parser: DeclarativeMappingInputParser,
+ plugin_metadata: DebputyPluginMetadata,
+ segment: Union[str, int],
+ matched: Any,
+) -> Optional[str]:
+
+ hover_doc_text: Optional[str] = None
+ attr = parser.manifest_attributes.get(segment)
+ attr_type = attr.attribute_type if attr is not None else None
+ if attr_type is None:
+ _info(f"Matched value for {segment} -- No attr or type")
+ return None
+ if isinstance(attr_type, type) and issubclass(attr_type, DebputyDispatchableType):
+ parser_generator = feature_set.manifest_parser_generator
+ parser = parser_generator.dispatch_parser_table_for(attr_type)
+ if parser is None or not isinstance(matched, str):
+ _info(
+ f"Unknown parser for {segment} or matched is not a str -- {attr_type} {type(matched)=}"
+ )
+ return None
+ subparser = parser.parser_for(matched)
+ if subparser is None:
+ _info(f"Unknown parser for {matched} (subparser)")
+ return None
+ hover_doc_text = render_rule(
+ matched,
+ subparser.parser,
+ plugin_metadata,
+ )
+ else:
+ _info(f"Unknown value: {matched} -- {segment}")
+ return hover_doc_text
+
+
+def resolve_hover_text(
+ feature_set: PluginProvidedFeatureSet,
+ parser: Optional[Union[DeclarativeInputParser[Any], DispatchingParserBase]],
+ plugin_metadata: DebputyPluginMetadata,
+ segments: List[Union[str, int]],
+ at_depth_idx: int,
+ matched: Any,
+ matched_key: bool,
+) -> Optional[str]:
+ hover_doc_text: Optional[str] = None
+ if at_depth_idx == len(segments):
+ segment = segments[at_depth_idx - 1]
+ _info(f"Matched {segment} at ==, {matched_key=} ")
+ hover_doc_text = render_rule(
+ segment,
+ parser,
+ plugin_metadata,
+ is_root_rule=False,
+ )
+ elif at_depth_idx + 1 == len(segments) and isinstance(
+ parser, DeclarativeMappingInputParser
+ ):
+ segment = segments[at_depth_idx]
+ _info(f"Matched {segment} at -1, {matched_key=} ")
+ if isinstance(segment, str):
+ if not matched_key:
+ hover_doc_text = resolve_hover_text_for_value(
+ feature_set,
+ parser,
+ plugin_metadata,
+ segment,
+ matched,
+ )
+ if matched_key or hover_doc_text is None:
+ rule_name = _guess_rule_name(segments, at_depth_idx)
+ hover_doc_text = _render_param_doc(
+ rule_name,
+ parser,
+ plugin_metadata,
+ segment,
+ )
+ else:
+ _info(f"No doc: {at_depth_idx=} {len(segments)=}")
+
+ return hover_doc_text
+
+
+def _hover_doc(ls: "LanguageServer", hover_doc_text: Optional[str]) -> Optional[Hover]:
+ if hover_doc_text is None:
+ return None
+ try:
+ supported_formats = ls.client_capabilities.text_document.hover.content_format
+ except AttributeError:
+ supported_formats = []
+ markup_kind = MarkupKind.Markdown
+ if markup_kind not in supported_formats:
+ markup_kind = MarkupKind.PlainText
+ return Hover(
+ contents=MarkupContent(
+ kind=markup_kind,
+ value=hover_doc_text,
+ ),
+ )
diff --git a/src/debputy/lsp/lsp_dispatch.py b/src/debputy/lsp/lsp_dispatch.py
index b7b744c..7a20ae8 100644
--- a/src/debputy/lsp/lsp_dispatch.py
+++ b/src/debputy/lsp/lsp_dispatch.py
@@ -1,14 +1,15 @@
import asyncio
+import os.path
from typing import (
Dict,
Sequence,
Union,
Optional,
- Any,
TypeVar,
Callable,
Mapping,
List,
+ Tuple,
)
from lsprotocol.types import (
@@ -31,7 +32,6 @@ from lsprotocol.types import (
TEXT_DOCUMENT_CODE_ACTION,
Command,
CodeAction,
- TextDocumentCodeActionRequest,
CodeActionParams,
SemanticTokensRegistrationOptions,
)
@@ -51,6 +51,7 @@ _DOCUMENT_VERSION_TABLE: Dict[str, int] = {}
try:
from pygls.server import LanguageServer
+ from pygls.workspace import TextDocument
DEBPUTY_LANGUAGE_SERVER = LanguageServer("debputy", f"v{__version__}")
except ImportError:
@@ -72,6 +73,19 @@ def is_doc_at_version(uri: str, version: int) -> bool:
return dv == version
+def determine_language_id(doc: "TextDocument") -> Tuple[str, str]:
+ lang_id = doc.language_id
+ if lang_id and not lang_id.isspace():
+ return "declared", lang_id
+ path = doc.path
+ try:
+ last_idx = path.rindex("debian/")
+ except ValueError:
+ return "filename", os.path.basename(path)
+ guess_language_id = path[last_idx:]
+ return "filename", guess_language_id
+
+
@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_DID_OPEN)
@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_DID_CHANGE)
async def _open_or_changed_document(
@@ -83,15 +97,15 @@ async def _open_or_changed_document(
doc = ls.workspace.get_text_document(doc_uri)
_DOCUMENT_VERSION_TABLE[doc_uri] = version
-
- handler = DIAGNOSTIC_HANDLERS.get(doc.language_id)
+ id_source, language_id = determine_language_id(doc)
+ handler = DIAGNOSTIC_HANDLERS.get(language_id)
if handler is None:
_info(
- f"Opened/Changed document: {doc.path} ({doc.language_id}) - no diagnostics handler"
+ f"Opened/Changed document: {doc.path} ({language_id}, {id_source}) - no diagnostics handler"
)
return
_info(
- f"Opened/Changed document: {doc.path} ({doc.language_id}) - running diagnostics for doc version {version}"
+ f"Opened/Changed document: {doc.path} ({language_id}, {id_source}) - running diagnostics for doc version {version}"
)
last_publish_count = -1
@@ -198,14 +212,15 @@ def _dispatch_standard_handler(
) -> R:
doc = ls.workspace.get_text_document(doc_uri)
- handler = handler_table.get(doc.language_id)
+ id_source, language_id = determine_language_id(doc)
+ handler = handler_table.get(language_id)
if handler is None:
_info(
- f"{request_type} for document: {doc.path} ({doc.language_id}) - no handler"
+ f"{request_type} for document: {doc.path} ({language_id}, {id_source}) - no handler"
)
return
_info(
- f"{request_type} for document: {doc.path} ({doc.language_id}) - delegating to handler"
+ f"{request_type} for document: {doc.path} ({language_id}, {id_source}) - delegating to handler"
)
return handler(
diff --git a/src/debputy/lsp/lsp_features.py b/src/debputy/lsp/lsp_features.py
index 5b01266..00bed1b 100644
--- a/src/debputy/lsp/lsp_features.py
+++ b/src/debputy/lsp/lsp_features.py
@@ -11,6 +11,8 @@ from lsprotocol.types import (
SemanticTokensLegend,
)
+from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
+
try:
from pygls.server import LanguageServer
except ImportError:
@@ -30,6 +32,7 @@ SEMANTIC_TOKEN_TYPES_IDS = {
t: idx for idx, t in enumerate(SEMANTIC_TOKENS_LEGEND.token_types)
}
+LSP_PLUGIN_FEATURE_SET: Optional[PluginProvidedFeatureSet] = None
DIAGNOSTIC_HANDLERS = {}
COMPLETER_HANDLERS = {}
HOVER_HANDLERS = {}
@@ -173,6 +176,21 @@ def _register_handler(
handler_dict[file_format] = handler
+def lsp_set_plugin_features(feature_set: Optional[PluginProvidedFeatureSet]) -> None:
+ global LSP_PLUGIN_FEATURE_SET
+ LSP_PLUGIN_FEATURE_SET = feature_set
+
+
+def lsp_get_plugin_features() -> PluginProvidedFeatureSet:
+ global LSP_PLUGIN_FEATURE_SET
+ features = LSP_PLUGIN_FEATURE_SET
+ if features is None:
+ raise RuntimeError(
+ "Initialization error: The plugin feature set has not been initialized before it was needed."
+ )
+ return features
+
+
def ensure_lsp_features_are_loaded() -> None:
# FIXME: This import is needed to force loading of the LSP files. But it only works
# for files with a linter (which currently happens to be all of them, but this is
diff --git a/src/debputy/manifest_parser/declarative_parser.py b/src/debputy/manifest_parser/declarative_parser.py
index bb901fc..f18dc1c 100644
--- a/src/debputy/manifest_parser/declarative_parser.py
+++ b/src/debputy/manifest_parser/declarative_parser.py
@@ -25,6 +25,7 @@ from typing import (
Iterable,
Literal,
Sequence,
+ Container,
)
from debputy.manifest_parser.base_types import (
@@ -49,6 +50,12 @@ from debputy.plugin.api.impl_types import (
TD,
_ALL_PACKAGE_TYPES,
resolve_package_type_selectors,
+ ListWrappedDeclarativeInputParser,
+ DispatchingObjectParser,
+ DispatchingTableParser,
+ TTP,
+ TP,
+ InPackageContextParser,
)
from debputy.plugin.api.spec import ParserDocumentation, PackageTypeSelector
from debputy.util import _info, _warn, assume_not_none
@@ -279,8 +286,6 @@ class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF])
_per_attribute_conflicts_cache: Optional[Mapping[str, FrozenSet[str]]] = None
inline_reference_documentation: Optional[ParserDocumentation] = None
path_hint_source_attributes: Sequence[str] = tuple()
- # TODO: List-wrapping should probably be its own parser that delegetes to subparsers
- is_list_wrapped: bool = False
def _parse_alt_form(
self,
@@ -410,64 +415,29 @@ class DeclarativeMappingInputParser(DeclarativeInputParser[TD], Generic[TD, SF])
return f" (Documentation: {doc_url})"
return ""
- def _parse_input(
+ def parse_input(
self,
value: object,
path: AttributePath,
*,
parser_context: Optional["ParserContextData"] = None,
- is_list_wrapped: bool,
) -> TD:
if value is None:
- if is_list_wrapped:
- form_note = " The attribute must be a list of mappings"
- else:
- form_note = " The attribute must be a mapping."
- if self.alt_form_parser is not None:
- form_note = (
- " The attribute can be a mapping or a non-mapping format"
- ' (usually, "non-mapping format" means a string or a list of strings).'
- )
+ form_note = " The attribute must be a mapping."
+ if self.alt_form_parser is not None:
+ form_note = (
+ " The attribute can be a mapping or a non-mapping format"
+ ' (usually, "non-mapping format" means a string or a list of strings).'
+ )
doc_ref = self._doc_url_error_suffix(see_url_version=True)
raise ManifestParseException(
f"The attribute {path.path} was missing a value. {form_note}{doc_ref}"
)
- if is_list_wrapped:
- if not isinstance(value, list) or not value:
- doc_ref = self._doc_url_error_suffix(see_url_version=True)
- raise ManifestParseException(
- f"The attribute {path.path} must be a non-empty list.{doc_ref}"
- )
- result = []
- for idx, element in enumerate(value):
- element_path = path[idx]
- result.append(
- self._parse_input(
- element,
- element_path,
- parser_context=parser_context,
- is_list_wrapped=False,
- )
- )
- return result
+
if not isinstance(value, dict):
return self._parse_alt_form(value, path, parser_context=parser_context)
return self._parse_typed_dict_form(value, path, parser_context=parser_context)
- def parse_input(
- self,
- value: object,
- path: AttributePath,
- *,
- parser_context: Optional["ParserContextData"] = None,
- ) -> TD:
- return self._parse_input(
- value,
- path,
- parser_context=parser_context,
- is_list_wrapped=self.is_list_wrapped,
- )
-
def _per_attribute_conflicts(self) -> Mapping[str, FrozenSet[str]]:
conflicts = self._per_attribute_conflicts_cache
if conflicts is not None:
@@ -496,7 +466,7 @@ class DebputyParseHint:
>>> class TargetType(TypedDict):
... sources: List[str]
>>> pg = ParserGenerator()
- >>> parser = pg.parser_from_typed_dict(TargetType, source_content=SourceType)
+ >>> parser = pg.generate_parser(TargetType, source_content=SourceType)
In this example, the user can provide either `source` or `sources` and the parser will
map them to the `sources` attribute in the `TargetType`. Note this example relies on
@@ -545,7 +515,7 @@ class DebputyParseHint:
... into_dir: NotRequired[str]
... renamed_to: NotRequired[str]
>>> pg = ParserGenerator()
- >>> parser = pg.parser_from_typed_dict(TargetType, source_content=SourceType)
+ >>> parser = pg.generate_parser(TargetType, source_content=SourceType)
In this example, if the user was to provide `renamed_to` with `sources` or `into_dir` the parser would report
an error. However, the parser will allow `renamed_to` with `source` as the conflict is considered only for
@@ -738,6 +708,11 @@ def _is_path_attribute_candidate(
class ParserGenerator:
def __init__(self) -> None:
self._registered_types: Dict[Any, TypeMapping[Any, Any]] = {}
+ self._object_parsers: Dict[str, DispatchingObjectParser] = {}
+ self._table_parsers: Dict[
+ Type[DebputyDispatchableType], DispatchingTableParser[Any]
+ ] = {}
+ self._in_package_context_parser: Dict[str, Any] = {}
def register_mapped_type(self, mapped_type: TypeMapping) -> None:
existing = self._registered_types.get(mapped_type.target_type)
@@ -748,7 +723,49 @@ class ParserGenerator:
def discard_mapped_type(self, mapped_type: Type[T]) -> None:
del self._registered_types[mapped_type]
- def parser_from_typed_dict(
+ def add_table_parser(self, rt: Type[DebputyDispatchableType], path: str) -> None:
+ assert rt not in self._table_parsers
+ self._table_parsers[rt] = DispatchingTableParser(rt, path)
+
+ def add_object_parser(
+ self,
+ path: str,
+ *,
+ parser_documentation: Optional[ParserDocumentation] = None,
+ ) -> None:
+ assert path not in self._in_package_context_parser
+ assert path not in self._object_parsers
+ self._object_parsers[path] = DispatchingObjectParser(
+ path, parser_documentation=parser_documentation
+ )
+
+ def add_in_package_context_parser(
+ self,
+ path: str,
+ delegate: DeclarativeInputParser[Any],
+ ) -> None:
+ assert path not in self._in_package_context_parser
+ assert path not in self._object_parsers
+ self._in_package_context_parser[path] = InPackageContextParser(path, delegate)
+
+ @property
+ def dispatchable_table_parsers(
+ self,
+ ) -> Mapping[Type[DebputyDispatchableType], DispatchingTableParser[Any]]:
+ return self._table_parsers
+
+ @property
+ def dispatchable_object_parsers(self) -> Mapping[str, DispatchingObjectParser]:
+ return self._object_parsers
+
+ def dispatch_parser_table_for(
+ self, rule_type: TTP
+ ) -> Optional[DispatchingTableParser[TP]]:
+ return cast(
+ "Optional[DispatchingTableParser[TP]]", self._table_parsers.get(rule_type)
+ )
+
+ def generate_parser(
self,
parsed_content: Type[TD],
*,
@@ -768,7 +785,7 @@ class ParserGenerator:
... sources: List[str]
... into: List[str]
>>> pg = ParserGenerator()
- >>> simple_parser = pg.parser_from_typed_dict(InstallDocsRule)
+ >>> simple_parser = pg.generate_parser(InstallDocsRule)
This will create a parser that would be able to interpret something like:
@@ -789,7 +806,7 @@ class ParserGenerator:
... sources: NotRequired[List[str]]
... into: Union[str, List[str]]
>>> pg = ParserGenerator()
- >>> flexible_parser = pg.parser_from_typed_dict(
+ >>> flexible_parser = pg.generate_parser(
... InstallDocsRule,
... source_content=InputDocsRuleInputFormat,
... )
@@ -826,7 +843,7 @@ class ParserGenerator:
... List[str],
... ]
>>> pg = ParserGenerator()
- >>> flexible_parser = pg.parser_from_typed_dict(
+ >>> flexible_parser = pg.generate_parser(
... DiscardRule,
... source_content=DiscardRuleInputWithAltFormat,
... )
@@ -894,10 +911,28 @@ class ParserGenerator:
if get_origin(orig_parsed_content) == list:
parsed_content = get_args(orig_parsed_content)[0]
is_list_wrapped = True
+
+ if isinstance(parsed_content, type) and issubclass(
+ parsed_content, DebputyDispatchableType
+ ):
+ parser = self.dispatch_parser_table_for(parsed_content)
+ if parser is None:
+ raise ValueError(
+ f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
+ f" The class {parsed_content.__qualname__} is not a pre-registered type."
+ )
+ # FIXME: Only the list wrapped version has documentation.
+ if is_list_wrapped:
+ parser = ListWrappedDeclarativeInputParser(
+ parser,
+ inline_reference_documentation=inline_reference_documentation,
+ )
+ return parser
+
if not is_typeddict(parsed_content):
raise ValueError(
f"Unsupported parsed_content descriptor: {parsed_content.__qualname__}."
- ' Only "TypedDict"-based types supported.'
+ ' Only "TypedDict"-based types and a subset of "DebputyDispatchableType" are supported.'
)
if is_list_wrapped:
if get_origin(source_content) != list:
@@ -1060,17 +1095,12 @@ class ParserGenerator:
parsed_alt_form is not None,
)
if non_mapping_source_only:
- if is_list_wrapped:
- raise ValueError(
- f"Unsupported case: {non_mapping_source_only=} + {is_list_wrapped=}"
- " (TODO: Look whether it is feasible)"
- )
- return DeclarativeNonMappingInputParser(
+ parser = DeclarativeNonMappingInputParser(
assume_not_none(parsed_alt_form),
inline_reference_documentation=inline_reference_documentation,
)
else:
- return DeclarativeMappingInputParser(
+ parser = DeclarativeMappingInputParser(
_as_attr_names(source_typed_dict.__required_keys__),
_as_attr_names(all_parameters),
manifest_attributes,
@@ -1080,8 +1110,10 @@ class ParserGenerator:
at_least_one_of=at_least_one_of,
inline_reference_documentation=inline_reference_documentation,
path_hint_source_attributes=tuple(path_hint_source_attributes),
- is_list_wrapped=is_list_wrapped,
)
+ if is_list_wrapped:
+ parser = ListWrappedDeclarativeInputParser(parser)
+ return parser
def _as_type_validator(
self,
@@ -1192,7 +1224,7 @@ class ParserGenerator:
mapper=type_normalizer,
).combine_mapper(list_mapper)
if is_typeddict(provided_type):
- subparser = self.parser_from_typed_dict(cast("Type[TD]", provided_type))
+ subparser = self.generate_parser(cast("Type[TD]", provided_type))
return AttributeTypeHandler(
description=f"{provided_type.__name__} (Typed Mapping)",
ensure_type=lambda v, ap: None,
@@ -1430,7 +1462,7 @@ class ParserGenerator:
)
parsed_annotations = DetectedDebputyParseHint.parse_annotations(
anno,
- f" The alternative for source_format.",
+ " The alternative for source_format.",
None,
False,
default_target_attribute=default_target_attribute,
@@ -1578,7 +1610,7 @@ class ParserGenerator:
if target_orig == list and target_args:
mapped = self._registered_types.get(target_args[0])
if mapped is not None:
- # mypy is dense and forgots `mapped` cannot be optional in the comprehensions.
+ # mypy is dense and forgot `mapped` cannot be optional in the comprehensions.
mapped_type: TypeMapping = mapped
if input_type == mapped.source_type:
# Source -> List[Target]
@@ -1824,7 +1856,7 @@ def _dispatch_parse_generator(
):
assert parser_context is not None
dispatching_parser = parser_context.dispatch_parser_table_for(dispatch_type)
- return dispatching_parser.parse(
+ return dispatching_parser.parse_input(
value, attribute_path, parser_context=parser_context
)
diff --git a/src/debputy/manifest_parser/parser_doc.py b/src/debputy/manifest_parser/parser_doc.py
new file mode 100644
index 0000000..7046d7b
--- /dev/null
+++ b/src/debputy/manifest_parser/parser_doc.py
@@ -0,0 +1,273 @@
+import itertools
+from typing import Optional, Iterable, Any, Tuple, Mapping, Sequence, FrozenSet
+
+from debputy import DEBPUTY_DOC_ROOT_DIR
+from debputy.manifest_parser.declarative_parser import (
+ DeclarativeMappingInputParser,
+ DeclarativeNonMappingInputParser,
+ AttributeDescription,
+)
+from debputy.plugin.api.impl_types import (
+ DebputyPluginMetadata,
+ DeclarativeInputParser,
+ DispatchingObjectParser,
+ ListWrappedDeclarativeInputParser,
+ InPackageContextParser,
+)
+from debputy.plugin.api.spec import (
+ ParserDocumentation,
+ reference_documentation,
+ undocumented_attr,
+)
+from debputy.util import assume_not_none
+
+
+def _provide_placeholder_parser_doc(
+ parser_doc: Optional[ParserDocumentation],
+ attributes: Iterable[str],
+) -> ParserDocumentation:
+ if parser_doc is None:
+ parser_doc = reference_documentation()
+ changes = {}
+ if parser_doc.attribute_doc is None:
+ changes["attribute_doc"] = [undocumented_attr(attr) for attr in attributes]
+
+ if changes:
+ return parser_doc.replace(**changes)
+ return parser_doc
+
+
+def doc_args_for_parser_doc(
+ rule_name: str,
+ declarative_parser: DeclarativeInputParser[Any],
+ plugin_metadata: DebputyPluginMetadata,
+) -> Tuple[Mapping[str, str], ParserDocumentation]:
+ attributes: Iterable[str]
+ if isinstance(declarative_parser, DeclarativeMappingInputParser):
+ attributes = declarative_parser.source_attributes.keys()
+ else:
+ attributes = []
+ doc_args = {
+ "RULE_NAME": rule_name,
+ "MANIFEST_FORMAT_DOC": f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md",
+ "PLUGIN_NAME": plugin_metadata.plugin_name,
+ }
+ parser_doc = _provide_placeholder_parser_doc(
+ declarative_parser.inline_reference_documentation,
+ attributes,
+ )
+ return doc_args, parser_doc
+
+
+def render_attribute_doc(
+ parser: Any,
+ attributes: Mapping[str, "AttributeDescription"],
+ required_attributes: FrozenSet[str],
+ conditionally_required_attributes: FrozenSet[FrozenSet[str]],
+ parser_doc: ParserDocumentation,
+ doc_args: Mapping[str, str],
+ *,
+ rule_name: str = "<unset>",
+ is_root_rule: bool = False,
+ is_interactive: bool = False,
+) -> Iterable[Tuple[FrozenSet[str], Sequence[str]]]:
+ provided_attribute_docs = (
+ parser_doc.attribute_doc if parser_doc.attribute_doc is not None else []
+ )
+
+ for attr_doc in assume_not_none(provided_attribute_docs):
+ attr_description = attr_doc.description
+ rendered_doc = []
+
+ for parameter in sorted(attr_doc.attributes):
+ parameter_details = attributes.get(parameter)
+ if parameter_details is not None:
+ source_name = parameter_details.source_attribute_name
+ describe_type = parameter_details.type_validator.describe_type()
+ else:
+ assert isinstance(parser, DispatchingObjectParser)
+ source_name = parameter
+ subparser = parser.parser_for(source_name).parser
+ if isinstance(subparser, InPackageContextParser):
+ if is_interactive:
+ describe_type = "PackageContext"
+ else:
+ rule_prefix = rule_name if not is_root_rule else ""
+ describe_type = f"PackageContext (chains to `{rule_prefix}::{subparser.manifest_attribute_path_template}`)"
+
+ elif isinstance(subparser, DispatchingObjectParser):
+ if is_interactive:
+ describe_type = "Object"
+ else:
+ rule_prefix = rule_name if not is_root_rule else ""
+ describe_type = f"Object (see `{rule_prefix}::{subparser.manifest_attribute_path_template}`)"
+ elif isinstance(subparser, DeclarativeMappingInputParser):
+ describe_type = "<Type definition not implemented yet>" # TODO: Derive from subparser
+ elif isinstance(subparser, DeclarativeNonMappingInputParser):
+ describe_type = (
+ subparser.alt_form_parser.type_validator.describe_type()
+ )
+ else:
+ describe_type = f"<Unknown: Non-introspectable subparser - {subparser.__class__.__name__}>"
+
+ if source_name in required_attributes:
+ req_str = "required"
+ elif any(source_name in s for s in conditionally_required_attributes):
+ req_str = "conditional"
+ else:
+ req_str = "optional"
+ rendered_doc.append(f"`{source_name}` ({req_str}): {describe_type}")
+
+ if attr_description:
+ rendered_doc.append("")
+ rendered_doc.extend(
+ line
+ for line in attr_description.format(**doc_args).splitlines(
+ keepends=False
+ )
+ )
+ rendered_doc.append("")
+ yield attr_doc.attributes, rendered_doc
+
+
+def render_rule(
+ rule_name: str,
+ declarative_parser: DeclarativeInputParser[Any],
+ plugin_metadata: DebputyPluginMetadata,
+ *,
+ is_root_rule: bool = False,
+) -> str:
+ doc_args, parser_doc = doc_args_for_parser_doc(
+ "the manifest root" if is_root_rule else rule_name,
+ declarative_parser,
+ plugin_metadata,
+ )
+ t = assume_not_none(parser_doc.title).format(**doc_args)
+ r = [
+ t,
+ "=" * len(t),
+ "",
+ assume_not_none(parser_doc.description).format(**doc_args).rstrip(),
+ "",
+ ]
+
+ alt_form_parser = getattr(declarative_parser, "alt_form_parser", None)
+ is_list_wrapped = False
+ unwrapped_parser = declarative_parser
+ if isinstance(declarative_parser, ListWrappedDeclarativeInputParser):
+ is_list_wrapped = True
+ unwrapped_parser = declarative_parser.delegate
+
+ if isinstance(
+ unwrapped_parser, (DeclarativeMappingInputParser, DispatchingObjectParser)
+ ):
+
+ if isinstance(unwrapped_parser, DeclarativeMappingInputParser):
+ attributes = unwrapped_parser.source_attributes
+ required = unwrapped_parser.input_time_required_parameters
+ conditionally_required = unwrapped_parser.at_least_one_of
+ mutually_exclusive = unwrapped_parser.mutually_exclusive_attributes
+ else:
+ attributes = {}
+ required = frozenset()
+ conditionally_required = frozenset()
+ mutually_exclusive = frozenset()
+ if is_list_wrapped:
+ r.append("List where each element has the following attributes:")
+ else:
+ r.append("Attributes:")
+
+ rendered_attr_doc = render_attribute_doc(
+ unwrapped_parser,
+ attributes,
+ required,
+ conditionally_required,
+ parser_doc,
+ doc_args,
+ is_root_rule=is_root_rule,
+ rule_name=rule_name,
+ is_interactive=False,
+ )
+ for _, rendered_doc in rendered_attr_doc:
+ prefix = " - "
+ for line in rendered_doc:
+ if line:
+ r.append(f"{prefix}{line}")
+ else:
+ r.append("")
+ prefix = " "
+
+ if (
+ bool(conditionally_required)
+ or bool(mutually_exclusive)
+ or any(pd.conflicting_attributes for pd in attributes.values())
+ ):
+ r.append("")
+ if is_list_wrapped:
+ r.append(
+ "This rule enforces the following restrictions on each element in the list:"
+ )
+ else:
+ r.append("This rule enforces the following restrictions:")
+
+ if conditionally_required or mutually_exclusive:
+ all_groups = set(
+ itertools.chain(conditionally_required, mutually_exclusive)
+ )
+ for g in all_groups:
+ anames = "`, `".join(g)
+ is_mx = g in mutually_exclusive
+ is_cr = g in conditionally_required
+ if is_mx and is_cr:
+ r.append(f" - The rule must use exactly one of: `{anames}`")
+ elif is_cr:
+ r.append(f" - The rule must use at least one of: `{anames}`")
+ else:
+ assert is_mx
+ r.append(
+ f" - The following attributes are mutually exclusive: `{anames}`"
+ )
+
+ if mutually_exclusive or any(
+ pd.conflicting_attributes for pd in attributes.values()
+ ):
+ for parameter, parameter_details in sorted(attributes.items()):
+ source_name = parameter_details.source_attribute_name
+ conflicts = set(parameter_details.conflicting_attributes)
+ for mx in mutually_exclusive:
+ if parameter in mx and mx not in conditionally_required:
+ conflicts |= mx
+ if conflicts:
+ conflicts.discard(parameter)
+ cnames = "`, `".join(
+ attributes[a].source_attribute_name for a in conflicts
+ )
+ r.append(
+ f" - The attribute `{source_name}` cannot be used with any of: `{cnames}`"
+ )
+ r.append("")
+ if alt_form_parser is not None:
+ # FIXME: Mapping[str, Any] ends here, which is ironic given the headline.
+ r.append(
+ f"Non-mapping format: {alt_form_parser.type_validator.describe_type()}"
+ )
+ alt_parser_desc = parser_doc.alt_parser_description
+ if alt_parser_desc:
+ r.extend(
+ f" {line}"
+ for line in alt_parser_desc.format(**doc_args).splitlines(
+ keepends=False
+ )
+ )
+ r.append("")
+
+ if declarative_parser.reference_documentation_url is not None:
+ r.append(
+ f"Reference documentation: {declarative_parser.reference_documentation_url}"
+ )
+ else:
+ r.append(
+ "Reference documentation: No reference documentation link provided by the plugin"
+ )
+
+ return "\n".join(r)
diff --git a/src/debputy/manifest_parser/util.py b/src/debputy/manifest_parser/util.py
index 1600a90..ad214e2 100644
--- a/src/debputy/manifest_parser/util.py
+++ b/src/debputy/manifest_parser/util.py
@@ -13,6 +13,7 @@ from typing import (
Type,
TypeVar,
TYPE_CHECKING,
+ Iterable,
)
if TYPE_CHECKING:
@@ -61,6 +62,11 @@ class AttributePath(object):
p.path_hint = path_hint
return p
+ def path_segments(self) -> Iterable[Union[str, int]]:
+ segments = list(self._iter_path())
+ segments.reverse()
+ yield from (s.name for s in segments)
+
@property
def path(self) -> str:
segments = list(self._iter_path())
diff --git a/src/debputy/plugin/api/feature_set.py b/src/debputy/plugin/api/feature_set.py
index 6552361..a56f37b 100644
--- a/src/debputy/plugin/api/feature_set.py
+++ b/src/debputy/plugin/api/feature_set.py
@@ -1,7 +1,10 @@
import dataclasses
+import textwrap
from typing import Dict, List, Tuple, Sequence, Any
+from debputy import DEBPUTY_DOC_ROOT_DIR
from debputy.manifest_parser.declarative_parser import ParserGenerator
+from debputy.plugin.api import reference_documentation
from debputy.plugin.api.impl_types import (
DebputyPluginMetadata,
PackagerProvidedFileClassSpec,
@@ -18,9 +21,23 @@ from debputy.plugin.api.impl_types import (
ServiceManagerDetails,
PluginProvidedKnownPackagingFile,
PluginProvidedTypeMapping,
+ OPARSER_PACKAGES,
+ OPARSER_PACKAGES_ROOT,
)
+def _initialize_parser_generator() -> ParserGenerator:
+ pg = ParserGenerator()
+
+ for path, ref_doc in SUPPORTED_DISPATCHABLE_OBJECT_PARSERS.items():
+ pg.add_object_parser(path, parser_documentation=ref_doc)
+
+ for rt, path in SUPPORTED_DISPATCHABLE_TABLE_PARSERS.items():
+ pg.add_table_parser(rt, path)
+
+ return pg
+
+
@dataclasses.dataclass(slots=True)
class PluginProvidedFeatureSet:
plugin_data: Dict[str, DebputyPluginMetadata] = dataclasses.field(
@@ -32,22 +49,6 @@ class PluginProvidedFeatureSet:
metadata_maintscript_detectors: Dict[str, List[MetadataOrMaintscriptDetector]] = (
dataclasses.field(default_factory=dict)
)
- dispatchable_table_parsers: Dict[TTP, "DispatchingTableParser[TP]"] = (
- dataclasses.field(
- default_factory=lambda: {
- rt: DispatchingTableParser(rt, path)
- for rt, path in SUPPORTED_DISPATCHABLE_TABLE_PARSERS.items()
- }
- )
- )
- dispatchable_object_parsers: Dict[str, "DispatchingObjectParser"] = (
- dataclasses.field(
- default_factory=lambda: {
- path: DispatchingObjectParser(path, parser_documentation=ref_doc)
- for path, ref_doc in SUPPORTED_DISPATCHABLE_OBJECT_PARSERS.items()
- }
- )
- )
manifest_variables: Dict[str, PluginProvidedManifestVariable] = dataclasses.field(
default_factory=dict
)
@@ -67,7 +68,7 @@ class PluginProvidedFeatureSet:
default_factory=dict
)
manifest_parser_generator: ParserGenerator = dataclasses.field(
- default_factory=ParserGenerator
+ default_factory=_initialize_parser_generator
)
def package_processors_in_order(self) -> Sequence[PluginProvidedPackageProcessor]:
diff --git a/src/debputy/plugin/api/impl.py b/src/debputy/plugin/api/impl.py
index 3c9da60..64a1ca8 100644
--- a/src/debputy/plugin/api/impl.py
+++ b/src/debputy/plugin/api/impl.py
@@ -762,14 +762,15 @@ class DebputyPluginInitializerProvider(DebputyPluginInitializer):
inline_reference_documentation: Optional[ParserDocumentation] = None,
) -> None:
self._restricted_api()
- if rule_type not in self._feature_set.dispatchable_table_parsers:
+ parser_generator = self._feature_set.manifest_parser_generator
+ if rule_type not in parser_generator.dispatchable_table_parsers:
types = ", ".join(
- sorted(x.__name__ for x in self._feature_set.dispatchable_table_parsers)
+ sorted(x.__name__ for x in parser_generator.dispatchable_table_parsers)
)
raise ValueError(
f"The rule_type was not a supported type. It must be one of {types}"
)
- dispatching_parser = self._feature_set.dispatchable_table_parsers[rule_type]
+ dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type]
dispatching_parser.register_keyword(
rule_name,
handler,
@@ -796,11 +797,14 @@ class DebputyPluginInitializerProvider(DebputyPluginInitializer):
None,
]
] = None,
+ nested_in_package_context: bool = False,
) -> None:
self._restricted_api()
if object_parser_key is None:
object_parser_key = rule_name
- dispatchable_object_parsers = self._feature_set.dispatchable_object_parsers
+
+ parser_generator = self._feature_set.manifest_parser_generator
+ dispatchable_object_parsers = parser_generator.dispatchable_object_parsers
if rule_type not in dispatchable_object_parsers:
types = ", ".join(sorted(dispatchable_object_parsers))
raise ValueError(
@@ -818,6 +822,7 @@ class DebputyPluginInitializerProvider(DebputyPluginInitializer):
child_dispatcher,
self._plugin_metadata,
on_end_parse_step=on_end_parse_step,
+ nested_in_package_context=nested_in_package_context,
)
def _unload() -> None:
@@ -839,24 +844,27 @@ class DebputyPluginInitializerProvider(DebputyPluginInitializer):
) -> None:
self._restricted_api()
feature_set = self._feature_set
+ parser_generator = feature_set.manifest_parser_generator
if isinstance(rule_type, str):
- if rule_type not in feature_set.dispatchable_object_parsers:
- types = ", ".join(sorted(feature_set.dispatchable_object_parsers))
+ if rule_type not in parser_generator.dispatchable_object_parsers:
+ types = ", ".join(sorted(parser_generator.dispatchable_object_parsers))
raise ValueError(
f"The rule_type was not a supported type. It must be one of {types}"
)
- dispatching_parser = feature_set.dispatchable_object_parsers[rule_type]
+ dispatching_parser = parser_generator.dispatchable_object_parsers[rule_type]
else:
- if rule_type not in feature_set.dispatchable_table_parsers:
+ if rule_type not in parser_generator.dispatchable_table_parsers:
types = ", ".join(
- sorted(x.__name__ for x in feature_set.dispatchable_table_parsers)
+ sorted(
+ x.__name__ for x in parser_generator.dispatchable_table_parsers
+ )
)
raise ValueError(
f"The rule_type was not a supported type. It must be one of {types}"
)
- dispatching_parser = feature_set.dispatchable_table_parsers[rule_type]
+ dispatching_parser = parser_generator.dispatchable_table_parsers[rule_type]
- parser = feature_set.manifest_parser_generator.parser_from_typed_dict(
+ parser = feature_set.manifest_parser_generator.generate_parser(
parsed_format,
source_content=source_format,
inline_reference_documentation=inline_reference_documentation,
diff --git a/src/debputy/plugin/api/impl_types.py b/src/debputy/plugin/api/impl_types.py
index 76579fb..5aca980 100644
--- a/src/debputy/plugin/api/impl_types.py
+++ b/src/debputy/plugin/api/impl_types.py
@@ -291,6 +291,63 @@ class DeclarativeInputParser(Generic[TD]):
raise NotImplementedError
+class DelegatingDeclarativeInputParser(DeclarativeInputParser[TD]):
+ __slots__ = ("delegate", "_reference_documentation")
+
+ def __init__(
+ self,
+ delegate: DeclarativeInputParser[TD],
+ *,
+ inline_reference_documentation: Optional[ParserDocumentation] = None,
+ ) -> None:
+ self.delegate = delegate
+ self._reference_documentation = inline_reference_documentation
+
+ @property
+ def inline_reference_documentation(self) -> Optional[ParserDocumentation]:
+ doc = self._reference_documentation
+ if doc is None:
+ return self.delegate.inline_reference_documentation
+ return doc
+
+
+class ListWrappedDeclarativeInputParser(DelegatingDeclarativeInputParser[TD]):
+ __slots__ = ()
+
+ def _doc_url_error_suffix(self, *, see_url_version: bool = False) -> str:
+ doc_url = self.reference_documentation_url
+ if doc_url is not None:
+ if see_url_version:
+ return f" Please see {doc_url} for the documentation."
+ return f" (Documentation: {doc_url})"
+ return ""
+
+ def parse_input(
+ self,
+ value: object,
+ path: "AttributePath",
+ *,
+ parser_context: Optional["ParserContextData"] = None,
+ ) -> TD:
+ if not isinstance(value, list):
+ doc_ref = self._doc_url_error_suffix(see_url_version=True)
+ raise ManifestParseException(
+ f"The attribute {path.path} must be a list.{doc_ref}"
+ )
+ result = []
+ delegate = self.delegate
+ for idx, element in enumerate(value):
+ element_path = path[idx]
+ result.append(
+ delegate.parse_input(
+ element,
+ element_path,
+ parser_context=parser_context,
+ )
+ )
+ return result
+
+
class DispatchingParserBase(Generic[TP]):
def __init__(self, manifest_attribute_path_template: str) -> None:
self.manifest_attribute_path_template = manifest_attribute_path_template
@@ -385,7 +442,7 @@ class DispatchingParserBase(Generic[TP]):
def _new_parser(self, keyword: str, ppp: "PluginProvidedParser[PF, TP]") -> None:
self._parsers[keyword] = ppp
- def parse(
+ def parse_input(
self,
orig_value: object,
attribute_path: "AttributePath",
@@ -447,6 +504,7 @@ class DispatchingObjectParser(
None,
]
] = None,
+ nested_in_package_context: bool = False,
) -> None:
def _handler(
name: str,
@@ -457,6 +515,12 @@ class DispatchingObjectParser(
on_end_parse_step(name, value, path, parser_context)
return value
+ if nested_in_package_context:
+ parser = InPackageContextParser(
+ keyword,
+ parser,
+ )
+
p = PluginProvidedParser(
parser,
_handler,
@@ -464,18 +528,8 @@ class DispatchingObjectParser(
)
self._add_parser(keyword, p)
- # FIXME: Agree on naming (parse vs. parse_input)
def parse_input(
self,
- value: object,
- path: "AttributePath",
- *,
- parser_context: Optional["ParserContextData"] = None,
- ) -> TD:
- return self.parse(value, path, parser_context=parser_context)
-
- def parse(
- self,
orig_value: object,
attribute_path: "AttributePath",
*,
@@ -534,12 +588,80 @@ class DispatchingObjectParser(
return result
-class DispatchingTableParser(DispatchingParserBase[TP]):
+@dataclasses.dataclass(slots=True, frozen=True)
+class PackageContextData(Generic[TP]):
+ resolved_package_name: str
+ value: TP
+
+
+class InPackageContextParser(
+ DelegatingDeclarativeInputParser[Mapping[str, PackageContextData[TP]]]
+):
+ def __init__(
+ self,
+ manifest_attribute_path_template: str,
+ delegate: DeclarativeInputParser[TP],
+ *,
+ parser_documentation: Optional[ParserDocumentation] = None,
+ ) -> None:
+ self.manifest_attribute_path_template = manifest_attribute_path_template
+ self._attribute_documentation: List[ParserAttributeDocumentation] = []
+ super().__init__(delegate, inline_reference_documentation=parser_documentation)
+
+ def parse_input(
+ self,
+ orig_value: object,
+ attribute_path: "AttributePath",
+ *,
+ parser_context: Optional["ParserContextData"] = None,
+ ) -> TP:
+ assert parser_context is not None
+ doc_ref = ""
+ if self.reference_documentation_url is not None:
+ doc_ref = (
+ f" Please see {self.reference_documentation_url} for the documentation."
+ )
+ if not isinstance(orig_value, dict) or not orig_value:
+ raise ManifestParseException(
+ f"The attribute {attribute_path.path} must be a non-empty mapping.{doc_ref}"
+ )
+ delegate = self.delegate
+ result = {}
+ for package_name_raw, value in orig_value.items():
+
+ definition_source = attribute_path[package_name_raw]
+ package_name = package_name_raw
+ if "{{" in package_name:
+ package_name = parser_context.substitution.substitute(
+ package_name_raw,
+ definition_source.path,
+ )
+ package_state: PackageTransformationDefinition
+ with parser_context.binary_package_context(package_name) as package_state:
+ if package_state.is_auto_generated_package:
+ # Maybe lift (part) of this restriction.
+ raise ManifestParseException(
+ f'Cannot define rules for package "{package_name}" (at {definition_source.path}). It is an'
+ " auto-generated package."
+ )
+ parsed_value = delegate.parse_input(
+ value, definition_source, parser_context=parser_context
+ )
+ result[package_name_raw] = PackageContextData(
+ package_name, parsed_value
+ )
+ return result
+
+
+class DispatchingTableParser(
+ DispatchingParserBase[TP],
+ DeclarativeInputParser[TP],
+):
def __init__(self, base_type: TTP, manifest_attribute_path_template: str) -> None:
super().__init__(manifest_attribute_path_template)
self.base_type = base_type
- def parse(
+ def parse_input(
self,
orig_value: object,
attribute_path: "AttributePath",
@@ -608,6 +730,7 @@ SUPPORTED_DISPATCHABLE_TABLE_PARSERS = {
}
OPARSER_MANIFEST_ROOT = "<ROOT>"
+OPARSER_PACKAGES_ROOT = "packages"
OPARSER_PACKAGES = "packages.{{PACKAGE}}"
OPARSER_MANIFEST_DEFINITIONS = "definitions"
diff --git a/src/debputy/plugin/api/plugin_parser.py b/src/debputy/plugin/api/plugin_parser.py
index ad2489f..dd5c0d0 100644
--- a/src/debputy/plugin/api/plugin_parser.py
+++ b/src/debputy/plugin/api/plugin_parser.py
@@ -52,15 +52,15 @@ def _initialize_plugin_metadata_parser_generator() -> ParserGenerator:
PLUGIN_METADATA_PARSER_GENERATOR = _initialize_plugin_metadata_parser_generator()
-PLUGIN_METADATA_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.parser_from_typed_dict(
+PLUGIN_METADATA_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.generate_parser(
PluginJsonMetadata
)
-PLUGIN_PPF_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.parser_from_typed_dict(
+PLUGIN_PPF_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.generate_parser(
PackagerProvidedFileJsonDescription
)
-PLUGIN_MANIFEST_VARS_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.parser_from_typed_dict(
+PLUGIN_MANIFEST_VARS_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.generate_parser(
ManifestVariableJsonDescription
)
-PLUGIN_KNOWN_PACKAGING_FILES_PARSER = (
- PLUGIN_METADATA_PARSER_GENERATOR.parser_from_typed_dict(KnownPackagingFileInfo)
+PLUGIN_KNOWN_PACKAGING_FILES_PARSER = PLUGIN_METADATA_PARSER_GENERATOR.generate_parser(
+ KnownPackagingFileInfo
)
diff --git a/src/debputy/plugin/debputy/binary_package_rules.py b/src/debputy/plugin/debputy/binary_package_rules.py
index 4753c79..14d9b91 100644
--- a/src/debputy/plugin/debputy/binary_package_rules.py
+++ b/src/debputy/plugin/debputy/binary_package_rules.py
@@ -109,11 +109,10 @@ def register_binary_package_rules(api: DebputyPluginInitializerProvider) -> None
api.pluggable_manifest_rule(
OPARSER_PACKAGES,
"transformations",
- ListOfTransformationRulesFormat,
+ List[TransformationRule],
_unpack_list,
- source_format=List[TransformationRule],
inline_reference_documentation=reference_documentation(
- title="Transformations (`packages.{{PACKAGE}}.transformations`)",
+ title="Transformations (`transformations`)",
description=textwrap.dedent(
"""\
You can define a `transformations` under the package definition, which is a list a transformation
@@ -140,16 +139,15 @@ def register_binary_package_rules(api: DebputyPluginInitializerProvider) -> None
overlap or conflict.
"""
),
- reference_documentation_url=f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md#transformations-packagespackagetransformations",
+ reference_documentation_url=f"{DEBPUTY_DOC_ROOT_DIR}/MANIFEST-FORMAT.md#transformations-transformations",
),
)
api.pluggable_manifest_rule(
OPARSER_PACKAGES,
"conffile-management",
- ListOfDpkgMaintscriptHelperCommandFormat,
+ List[DpkgMaintscriptHelperCommand],
_unpack_list,
- source_format=List[DpkgMaintscriptHelperCommand],
)
api.pluggable_manifest_rule(
@@ -520,11 +518,11 @@ def _process_service_rules(
def _unpack_list(
_name: str,
- parsed_data: ListParsedFormat,
+ parsed_data: List[Any],
_attribute_path: AttributePath,
_parser_context: ParserContextData,
) -> List[Any]:
- return parsed_data["elements"]
+ return parsed_data
class CleanAfterRemovalRuleSourceFormat(TypedDict):
@@ -544,7 +542,7 @@ class CleanAfterRemovalRule(DebputyParsedContent):
# FIXME: Not optimal that we are doing an initialization of ParserGenerator here. But the rule is not depending on any
# complex types that is registered by plugins, so it will work for now.
-_CLEAN_AFTER_REMOVAL_RULE_PARSER = ParserGenerator().parser_from_typed_dict(
+_CLEAN_AFTER_REMOVAL_RULE_PARSER = ParserGenerator().generate_parser(
CleanAfterRemovalRule,
source_content=Union[CleanAfterRemovalRuleSourceFormat, str, List[str]],
inline_reference_documentation=reference_documentation(
diff --git a/src/debputy/plugin/debputy/manifest_root_rules.py b/src/debputy/plugin/debputy/manifest_root_rules.py
index 86a1c27..ca8cf1e 100644
--- a/src/debputy/plugin/debputy/manifest_root_rules.py
+++ b/src/debputy/plugin/debputy/manifest_root_rules.py
@@ -108,9 +108,8 @@ def register_manifest_root_rules(api: DebputyPluginInitializerProvider) -> None:
api.pluggable_manifest_rule(
OPARSER_MANIFEST_ROOT,
MK_INSTALLATIONS,
- ListOfInstallRulesFormat,
+ List[InstallRule],
_handle_installation_rules,
- source_format=List[InstallRule],
inline_reference_documentation=reference_documentation(
title="Installations",
description=textwrap.dedent(
@@ -156,15 +155,12 @@ def register_manifest_root_rules(api: DebputyPluginInitializerProvider) -> None:
),
),
)
- api.pluggable_manifest_rule(
+ api.pluggable_object_parser(
OPARSER_MANIFEST_ROOT,
MK_PACKAGES,
- DictFormat,
- _handle_opaque_dict,
- source_format=Dict[str, Any],
- inline_reference_documentation=SUPPORTED_DISPATCHABLE_OBJECT_PARSERS[
- OPARSER_PACKAGES
- ],
+ object_parser_key=OPARSER_PACKAGES,
+ on_end_parse_step=lambda _a, _b, _c, mp: mp._ensure_package_states_is_initialized(),
+ nested_in_package_context=True,
)
@@ -238,11 +234,11 @@ def _handle_manifest_variables(
def _handle_installation_rules(
_name: str,
- parsed_data: ListOfInstallRulesFormat,
+ parsed_data: List[InstallRule],
_attribute_path: AttributePath,
_parser_context: ParserContextData,
) -> List[Any]:
- return parsed_data["elements"]
+ return parsed_data
def _handle_opaque_dict(
diff --git a/src/debputy/plugin/debputy/private_api.py b/src/debputy/plugin/debputy/private_api.py
index b9aa043..3b5087b 100644
--- a/src/debputy/plugin/debputy/private_api.py
+++ b/src/debputy/plugin/debputy/private_api.py
@@ -921,7 +921,7 @@ def register_install_rules(api: DebputyPluginInitializerProvider) -> None:
as-is. When a directory is matched, then the directory is installed along
with all the contents that have not already been installed somewhere.
- - **CAVEAT**: Specifying `source: examples` where `examples` resolves to a
+ - **CAVEAT**: Specifying `source: examples` where `examples` resolves to a
directory for `install-examples` will give you an `examples/examples`
directory in the package, which is rarely what you want. Often, you
can solve this by using `examples/*` instead. Similar for `install-docs`
@@ -1023,7 +1023,7 @@ def register_install_rules(api: DebputyPluginInitializerProvider) -> None:
as-is. When a directory is matched, then the directory is installed along
with all the contents that have not already been installed somewhere.
- - **CAVEAT**: Specifying `source: examples` where `examples` resolves to a
+ - **CAVEAT**: Specifying `source: examples` where `examples` resolves to a
directory for `install-examples` will give you an `examples/examples`
directory in the package, which is rarely what you want. Often, you
can solve this by using `examples/*` instead. Similar for `install-docs`
diff --git a/src/debputy/yaml/__init__.py b/src/debputy/yaml/__init__.py
new file mode 100644
index 0000000..325dff5
--- /dev/null
+++ b/src/debputy/yaml/__init__.py
@@ -0,0 +1,9 @@
+from .compat import YAML, YAMLError, MarkedYAMLError
+
+MANIFEST_YAML = YAML()
+
+__all__ = [
+ "MANIFEST_YAML",
+ "YAMLError",
+ "MarkedYAMLError",
+]
diff --git a/src/debputy/yaml/compat.py b/src/debputy/yaml/compat.py
new file mode 100644
index 0000000..f26af02
--- /dev/null
+++ b/src/debputy/yaml/compat.py
@@ -0,0 +1,19 @@
+__all__ = [
+ "YAML",
+ "YAMLError",
+ "MarkedYAMLError",
+ "Node",
+ "LineCol",
+ "CommentedBase",
+ "CommentedMap",
+ "CommentedSeq",
+]
+
+try:
+ from ruyaml import YAMLError, YAML, Node
+ from ruyaml.comments import LineCol, CommentedBase, CommentedMap, CommentedSeq
+ from ruyaml.error import MarkedYAMLError
+except (ImportError, ModuleNotFoundError):
+ from ruamel.yaml import YAMLError, YAML, Node
+ from ruamel.yaml.comments import LineCol, CommentedBase, CommentedMap, CommentedSeq
+ from ruamel.yaml.error import MarkedYAMLError