Coverage for src/debputy/lsp/lsp_dispatch.py: 43%
82 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2024-04-07 12:14 +0200
1import asyncio
2import os.path
3from typing import (
4 Dict,
5 Sequence,
6 Union,
7 Optional,
8 TypeVar,
9 Callable,
10 Mapping,
11 List,
12 Tuple,
13)
15from lsprotocol.types import (
16 DidOpenTextDocumentParams,
17 DidChangeTextDocumentParams,
18 TEXT_DOCUMENT_DID_CHANGE,
19 TEXT_DOCUMENT_DID_OPEN,
20 TEXT_DOCUMENT_COMPLETION,
21 CompletionList,
22 CompletionItem,
23 CompletionParams,
24 TEXT_DOCUMENT_HOVER,
25 TEXT_DOCUMENT_FOLDING_RANGE,
26 FoldingRange,
27 FoldingRangeParams,
28 TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL,
29 SemanticTokensParams,
30 SemanticTokens,
31 Hover,
32 TEXT_DOCUMENT_CODE_ACTION,
33 Command,
34 CodeAction,
35 CodeActionParams,
36 SemanticTokensRegistrationOptions,
37)
39from debputy import __version__
40from debputy.lsp.lsp_features import (
41 DIAGNOSTIC_HANDLERS,
42 COMPLETER_HANDLERS,
43 HOVER_HANDLERS,
44 SEMANTIC_TOKENS_FULL_HANDLERS,
45 CODE_ACTION_HANDLERS,
46 SEMANTIC_TOKENS_LEGEND,
47)
48from debputy.util import _info
50_DOCUMENT_VERSION_TABLE: Dict[str, int] = {}
52try:
53 from pygls.server import LanguageServer
54 from pygls.workspace import TextDocument
55 from debputy.lsp.debputy_ls import DebputyLanguageServer
57 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}")
58except ImportError as e:
60 class Mock:
62 def feature(self, *args, **kwargs):
63 return lambda x: x
65 DEBPUTY_LANGUAGE_SERVER = Mock()
68P = TypeVar("P")
69R = TypeVar("R")
70L = TypeVar("L", "LanguageServer", "DebputyLanguageServer")
73def is_doc_at_version(uri: str, version: int) -> bool:
74 dv = _DOCUMENT_VERSION_TABLE.get(uri)
75 return dv == version
78def determine_language_id(doc: "TextDocument") -> Tuple[str, str]:
79 lang_id = doc.language_id
80 if lang_id and not lang_id.isspace():
81 return "declared", lang_id
82 path = doc.path
83 try:
84 last_idx = path.rindex("debian/")
85 except ValueError:
86 return "filename", os.path.basename(path)
87 guess_language_id = path[last_idx:]
88 return "filename", guess_language_id
91@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_DID_OPEN)
92@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_DID_CHANGE)
93async def _open_or_changed_document(
94 ls: "DebputyLanguageServer",
95 params: Union[DidOpenTextDocumentParams, DidChangeTextDocumentParams],
96) -> None:
97 version = params.text_document.version
98 doc_uri = params.text_document.uri
99 doc = ls.workspace.get_text_document(doc_uri)
101 _DOCUMENT_VERSION_TABLE[doc_uri] = version
102 id_source, language_id = determine_language_id(doc)
103 handler = DIAGNOSTIC_HANDLERS.get(language_id)
104 if handler is None:
105 _info(
106 f"Opened/Changed document: {doc.path} ({language_id}, {id_source}) - no diagnostics handler"
107 )
108 return
109 _info(
110 f"Opened/Changed document: {doc.path} ({language_id}, {id_source}) - running diagnostics for doc version {version}"
111 )
112 last_publish_count = -1
114 diagnostics_scanner = handler(ls, params)
115 async for diagnostics in diagnostics_scanner:
116 await asyncio.sleep(0)
117 if not is_doc_at_version(doc_uri, version):
118 # This basically happens with very edit, so lets not notify the client
119 # for that.
120 _info(
121 f"Cancel (obsolete) diagnostics for doc version {version}: document version changed"
122 )
123 break
124 if diagnostics is None or last_publish_count != len(diagnostics):
125 last_publish_count = len(diagnostics) if diagnostics is not None else 0
126 ls.publish_diagnostics(
127 doc.uri,
128 diagnostics,
129 )
132@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_COMPLETION)
133def _completions(
134 ls: "DebputyLanguageServer",
135 params: CompletionParams,
136) -> Optional[Union[CompletionList, Sequence[CompletionItem]]]:
137 return _dispatch_standard_handler(
138 ls,
139 params.text_document.uri,
140 params,
141 COMPLETER_HANDLERS,
142 "Complete request",
143 )
146@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_HOVER)
147def _hover(
148 ls: "DebputyLanguageServer",
149 params: CompletionParams,
150) -> Optional[Hover]:
151 return _dispatch_standard_handler(
152 ls,
153 params.text_document.uri,
154 params,
155 HOVER_HANDLERS,
156 "Hover doc request",
157 )
160@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_CODE_ACTION)
161def _code_actions(
162 ls: "DebputyLanguageServer",
163 params: CodeActionParams,
164) -> Optional[List[Union[Command, CodeAction]]]:
165 return _dispatch_standard_handler(
166 ls,
167 params.text_document.uri,
168 params,
169 CODE_ACTION_HANDLERS,
170 "Code action request",
171 )
174@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_FOLDING_RANGE)
175def _folding_ranges(
176 ls: "DebputyLanguageServer",
177 params: FoldingRangeParams,
178) -> Optional[Sequence[FoldingRange]]:
179 return _dispatch_standard_handler(
180 ls,
181 params.text_document.uri,
182 params,
183 HOVER_HANDLERS,
184 "Folding range request",
185 )
188@DEBPUTY_LANGUAGE_SERVER.feature(
189 TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL,
190 SemanticTokensRegistrationOptions(
191 SEMANTIC_TOKENS_LEGEND,
192 full=True,
193 ),
194)
195def _semantic_tokens_full(
196 ls: "DebputyLanguageServer",
197 params: SemanticTokensParams,
198) -> Optional[SemanticTokens]:
199 return _dispatch_standard_handler(
200 ls,
201 params.text_document.uri,
202 params,
203 SEMANTIC_TOKENS_FULL_HANDLERS,
204 "Semantic tokens request",
205 )
208def _dispatch_standard_handler(
209 ls: "DebputyLanguageServer",
210 doc_uri: str,
211 params: P,
212 handler_table: Mapping[str, Callable[[L, P], R]],
213 request_type: str,
214) -> R:
215 doc = ls.workspace.get_text_document(doc_uri)
217 id_source, language_id = determine_language_id(doc)
218 handler = handler_table.get(language_id)
219 if handler is None:
220 _info(
221 f"{request_type} for document: {doc.path} ({language_id}, {id_source}) - no handler"
222 )
223 return
224 _info(
225 f"{request_type} for document: {doc.path} ({language_id}, {id_source}) - delegating to handler"
226 )
228 return handler(
229 ls,
230 params,
231 )