summaryrefslogtreecommitdiffstats
path: root/src/debputy/lsp/lsp_features.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/debputy/lsp/lsp_features.py')
-rw-r--r--src/debputy/lsp/lsp_features.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/src/debputy/lsp/lsp_features.py b/src/debputy/lsp/lsp_features.py
new file mode 100644
index 0000000..b417dd3
--- /dev/null
+++ b/src/debputy/lsp/lsp_features.py
@@ -0,0 +1,196 @@
+import collections
+import inspect
+from typing import Callable, TypeVar, Sequence, Union, Dict, List, Optional
+
+from lsprotocol.types import (
+ TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL,
+ TEXT_DOCUMENT_CODE_ACTION,
+ DidChangeTextDocumentParams,
+ Diagnostic,
+ DidOpenTextDocumentParams,
+)
+
+try:
+ from pygls.server import LanguageServer
+except ImportError:
+ pass
+
+from debputy.linting.lint_util import LinterImpl
+from debputy.lsp.quickfixes import provide_standard_quickfixes_from_diagnostics
+from debputy.lsp.text_util import on_save_trim_end_of_line_whitespace
+
+C = TypeVar("C", bound=Callable)
+
+
+DIAGNOSTIC_HANDLERS = {}
+COMPLETER_HANDLERS = {}
+HOVER_HANDLERS = {}
+CODE_ACTION_HANDLERS = {}
+WILL_SAVE_WAIT_UNTIL_HANDLERS = {}
+_ALIAS_OF = {}
+
+_STANDARD_HANDLERS = {
+ TEXT_DOCUMENT_CODE_ACTION: (
+ CODE_ACTION_HANDLERS,
+ provide_standard_quickfixes_from_diagnostics,
+ ),
+ TEXT_DOCUMENT_WILL_SAVE_WAIT_UNTIL: (
+ WILL_SAVE_WAIT_UNTIL_HANDLERS,
+ on_save_trim_end_of_line_whitespace,
+ ),
+}
+
+
+def lint_diagnostics(
+ file_formats: Union[str, Sequence[str]]
+) -> Callable[[LinterImpl], LinterImpl]:
+
+ def _wrapper(func: C) -> C:
+ if not inspect.iscoroutinefunction(func):
+
+ async def _lint_wrapper(
+ ls: "LanguageServer",
+ params: Union[
+ DidOpenTextDocumentParams,
+ DidChangeTextDocumentParams,
+ ],
+ ) -> Optional[List[Diagnostic]]:
+ doc = ls.workspace.get_text_document(params.text_document.uri)
+ yield func(
+ doc.uri,
+ doc.path,
+ doc.lines,
+ doc.position_codec,
+ )
+
+ else:
+ raise ValueError("Linters are all non-async at the moment")
+
+ for file_format in file_formats:
+ if file_format in DIAGNOSTIC_HANDLERS:
+ raise AssertionError(
+ "There is already a diagnostics handler for " + file_format
+ )
+ DIAGNOSTIC_HANDLERS[file_format] = _lint_wrapper
+
+ return func
+
+ return _wrapper
+
+
+def lsp_diagnostics(file_formats: Union[str, Sequence[str]]) -> Callable[[C], C]:
+
+ def _wrapper(func: C) -> C:
+
+ if not inspect.iscoroutinefunction(func):
+
+ async def _linter(*args, **kwargs) -> None:
+ res = func(*args, **kwargs)
+ if inspect.isgenerator(res):
+ for r in res:
+ yield r
+ else:
+ yield res
+
+ else:
+
+ _linter = func
+
+ _register_handler(file_formats, DIAGNOSTIC_HANDLERS, _linter)
+
+ return func
+
+ return _wrapper
+
+
+def lsp_completer(file_formats: Union[str, Sequence[str]]) -> Callable[[C], C]:
+ return _registering_wrapper(file_formats, COMPLETER_HANDLERS)
+
+
+def lsp_hover(file_formats: Union[str, Sequence[str]]) -> Callable[[C], C]:
+ return _registering_wrapper(file_formats, HOVER_HANDLERS)
+
+
+def lsp_standard_handler(file_formats: Union[str, Sequence[str]], topic: str) -> None:
+ res = _STANDARD_HANDLERS.get(topic)
+ if res is None:
+ raise ValueError(f"No standard handler for {topic}")
+
+ table, handler = res
+
+ _register_handler(file_formats, table, handler)
+
+
+def _registering_wrapper(
+ file_formats: Union[str, Sequence[str]], handler_dict: Dict[str, C]
+) -> Callable[[C], C]:
+ def _wrapper(func: C) -> C:
+ _register_handler(file_formats, handler_dict, func)
+ return func
+
+ return _wrapper
+
+
+def _register_handler(
+ file_formats: Union[str, Sequence[str]],
+ handler_dict: Dict[str, C],
+ handler: C,
+) -> None:
+ if isinstance(file_formats, str):
+ file_formats = [file_formats]
+ else:
+ if not file_formats:
+ raise ValueError("At least one language ID (file format) must be provided")
+ main = file_formats[0]
+ for alias in file_formats[1:]:
+ if alias not in _ALIAS_OF:
+ _ALIAS_OF[alias] = main
+
+ for file_format in file_formats:
+ if file_format in handler_dict:
+ raise AssertionError(f"There is already a handler for {file_format}")
+
+ handler_dict[file_format] = handler
+
+
+def ensure_lsp_features_are_loaded() -> None:
+ # FIXME: This import is needed to force loading of the LSP files. But it only works
+ # for files with a linter (which currently happens to be all of them, but this is
+ # a bit fragile).
+ from debputy.linting.lint_impl import LINTER_FORMATS
+
+ assert LINTER_FORMATS
+
+
+def describe_lsp_features() -> None:
+
+ ensure_lsp_features_are_loaded()
+
+ feature_list = [
+ ("diagnostics (lint)", DIAGNOSTIC_HANDLERS),
+ ("code actions/quickfixes", CODE_ACTION_HANDLERS),
+ ("completion suggestions", COMPLETER_HANDLERS),
+ ("hover docs", HOVER_HANDLERS),
+ ("on-save handler", WILL_SAVE_WAIT_UNTIL_HANDLERS),
+ ]
+ print("LSP language IDs and their features:")
+ all_ids = sorted(set(lid for _, t in feature_list for lid in t))
+ for lang_id in all_ids:
+ if lang_id in _ALIAS_OF:
+ continue
+ features = [n for n, t in feature_list if lang_id in t]
+ print(f" * {lang_id}:")
+ for feature in features:
+ print(f" - {feature}")
+
+ aliases = collections.defaultdict(list)
+ for lang_id in all_ids:
+ main_lang = _ALIAS_OF.get(lang_id)
+ if main_lang is None:
+ continue
+ aliases[main_lang].append(lang_id)
+
+ print()
+ print("Aliases:")
+ for main_id, aliases in aliases.items():
+ print(f" * {main_id}: {', '.join(aliases)}")