Coverage for src/debputy/lsp/lsp_dispatch.py: 43%

82 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-04-07 12:14 +0200

import asyncio
import os.path
from typing import (
    Dict,
    Sequence,
    Union,
    Optional,
    TypeVar,
    Callable,
    Mapping,
    List,
    Tuple,
)

from lsprotocol.types import (
    DidOpenTextDocumentParams,
    DidChangeTextDocumentParams,
    TEXT_DOCUMENT_DID_CHANGE,
    TEXT_DOCUMENT_DID_OPEN,
    TEXT_DOCUMENT_COMPLETION,
    CompletionList,
    CompletionItem,
    CompletionParams,
    TEXT_DOCUMENT_HOVER,
    TEXT_DOCUMENT_FOLDING_RANGE,
    FoldingRange,
    FoldingRangeParams,
    TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL,
    SemanticTokensParams,
    SemanticTokens,
    Hover,
    HoverParams,
    TEXT_DOCUMENT_CODE_ACTION,
    Command,
    CodeAction,
    CodeActionParams,
    SemanticTokensRegistrationOptions,
)

from debputy import __version__
from debputy.lsp.lsp_features import (
    DIAGNOSTIC_HANDLERS,
    COMPLETER_HANDLERS,
    HOVER_HANDLERS,
    SEMANTIC_TOKENS_FULL_HANDLERS,
    CODE_ACTION_HANDLERS,
    SEMANTIC_TOKENS_LEGEND,
)
from debputy.util import _info

49 

50_DOCUMENT_VERSION_TABLE: Dict[str, int] = {} 

51 

52try: 

53 from pygls.server import LanguageServer 

54 from pygls.workspace import TextDocument 

55 from debputy.lsp.debputy_ls import DebputyLanguageServer 

56 

57 DEBPUTY_LANGUAGE_SERVER = DebputyLanguageServer("debputy", f"v{__version__}") 

58except ImportError as e: 

59 

60 class Mock: 

61 

62 def feature(self, *args, **kwargs): 

63 return lambda x: x 

64 

65 DEBPUTY_LANGUAGE_SERVER = Mock() 

66 

67 

68P = TypeVar("P") 

69R = TypeVar("R") 

70L = TypeVar("L", "LanguageServer", "DebputyLanguageServer") 

71 

72 

def is_doc_at_version(uri: str, version: int) -> bool:
    """Tell whether ``version`` is the version currently recorded for ``uri``.

    A URI that has no entry in the version table never matches, because the
    lookup yields ``None`` in that case.
    """
    return _DOCUMENT_VERSION_TABLE.get(uri) == version

76 

77 

def determine_language_id(doc: "TextDocument") -> Tuple[str, str]:
    """Work out which language id should be used for ``doc``.

    :param doc: The document; only its ``language_id`` and ``path``
        attributes are read.
    :return: A ``(source, language_id)`` pair.  ``source`` is ``"declared"``
        when the document carries a non-blank language id of its own, and
        ``"filename"`` when the id had to be derived from the path.

    When deriving from the path, everything from the last ``debian``
    directory component onwards is used (e.g. ``debian/control``).  If the
    path has no such component, the base name of the file is used.
    """
    lang_id = doc.language_id
    if lang_id and not lang_id.isspace():
        return "declared", lang_id

    path = doc.path
    # Anchor the search on a path separator so that only a real "debian"
    # directory matches; a directory such as "python-debian/" must not be
    # mistaken for the packaging directory.
    component_idx = path.rfind("/debian/")
    if component_idx >= 0:
        # Skip the leading "/" so the result starts with "debian/".
        return "filename", path[component_idx + 1 :]
    if path.startswith("debian/"):
        # Relative path that is already rooted in the packaging directory.
        return "filename", path
    return "filename", os.path.basename(path)

89 

90 

@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_DID_OPEN)
@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_DID_CHANGE)
async def _open_or_changed_document(
    ls: "DebputyLanguageServer",
    params: Union[DidOpenTextDocumentParams, DidChangeTextDocumentParams],
) -> None:
    """Record the new document version and (re)run diagnostics for it.

    The handler registered for the document's language id is called with
    ``(ls, params)`` and iterated asynchronously; each batch it yields is a
    candidate for publishing.  The run is abandoned as soon as the recorded
    version of the document no longer equals the version this call started
    with.
    """
    version = params.text_document.version
    doc_uri = params.text_document.uri
    doc = ls.workspace.get_text_document(doc_uri)

    # Record the version first so that any older run still in progress can
    # notice that it has become obsolete.
    _DOCUMENT_VERSION_TABLE[doc_uri] = version
    id_source, language_id = determine_language_id(doc)
    handler = DIAGNOSTIC_HANDLERS.get(language_id)
    if handler is None:
        _info(
            f"Opened/Changed document: {doc.path} ({language_id}, {id_source}) - no diagnostics handler"
        )
        return
    _info(
        f"Opened/Changed document: {doc.path} ({language_id}, {id_source}) - running diagnostics for doc version {version}"
    )

    # Number of diagnostics in the last published batch; -1 means nothing
    # has been published yet.
    last_publish_count = -1

    async for diagnostics in handler(ls, params):
        # Yield to the event loop so other requests (notably a newer change
        # notification for this document) get a chance to run.
        await asyncio.sleep(0)
        if not is_doc_at_version(doc_uri, version):
            # This happens with basically every edit, so let's not notify
            # the client about it.
            _info(
                f"Cancel (obsolete) diagnostics for doc version {version}: document version changed"
            )
            break

        batch_size = 0 if diagnostics is None else len(diagnostics)
        if diagnostics is not None and batch_size == last_publish_count:
            # Same number of diagnostics as the previous publication; skip.
            continue
        last_publish_count = batch_size
        ls.publish_diagnostics(
            doc.uri,
            diagnostics,
        )

130 

131 

@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_COMPLETION)
def _completions(
    ls: "DebputyLanguageServer",
    params: CompletionParams,
) -> Optional[Union[CompletionList, Sequence[CompletionItem]]]:
    """Serve a completion request via the completer for the document's language.

    The lookup in ``COMPLETER_HANDLERS`` and the call itself are done by
    ``_dispatch_standard_handler``; its result is returned unchanged.
    """
    doc_uri = params.text_document.uri
    return _dispatch_standard_handler(
        ls,
        doc_uri,
        params,
        COMPLETER_HANDLERS,
        "Complete request",
    )

144 

145 

@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_HOVER)
def _hover(
    ls: "DebputyLanguageServer",
    params: HoverParams,
) -> Optional[Hover]:
    """Serve a hover request via the hover handler for the document's language.

    :param ls: The language server that received the request.
    :param params: The hover request parameters; ``params.text_document.uri``
        identifies the document.
    :return: Whatever ``_dispatch_standard_handler`` returns for the
        ``HOVER_HANDLERS`` table.
    """
    return _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        HOVER_HANDLERS,
        "Hover doc request",
    )

158 

159 

@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_CODE_ACTION)
def _code_actions(
    ls: "DebputyLanguageServer",
    params: CodeActionParams,
) -> Optional[List[Union[Command, CodeAction]]]:
    """Serve a code action request via the handler for the document's language.

    The lookup in ``CODE_ACTION_HANDLERS`` and the call itself are done by
    ``_dispatch_standard_handler``; its result is returned unchanged.
    """
    doc_uri = params.text_document.uri
    return _dispatch_standard_handler(
        ls,
        doc_uri,
        params,
        CODE_ACTION_HANDLERS,
        "Code action request",
    )

172 

173 

@DEBPUTY_LANGUAGE_SERVER.feature(TEXT_DOCUMENT_FOLDING_RANGE)
def _folding_ranges(
    ls: "DebputyLanguageServer",
    params: FoldingRangeParams,
) -> Optional[Sequence[FoldingRange]]:
    """Serve a folding range request via the handler for the document's language.

    :param ls: The language server that received the request.
    :param params: The folding range request parameters;
        ``params.text_document.uri`` identifies the document.
    :return: Whatever ``_dispatch_standard_handler`` returns for the folding
        range handler table.
    """
    # This previously dispatched through HOVER_HANDLERS, which sent folding
    # requests to the hover handlers.
    # NOTE(review): FOLDING_RANGE_HANDLERS is assumed to be the folding range
    # registry exported by debputy.lsp.lsp_features - confirm the name.  The
    # import is kept function-local so a mismatch cannot break module import.
    from debputy.lsp.lsp_features import FOLDING_RANGE_HANDLERS

    return _dispatch_standard_handler(
        ls,
        params.text_document.uri,
        params,
        FOLDING_RANGE_HANDLERS,
        "Folding range request",
    )

186 

187 

@DEBPUTY_LANGUAGE_SERVER.feature(
    TEXT_DOCUMENT_SEMANTIC_TOKENS_FULL,
    SemanticTokensRegistrationOptions(
        SEMANTIC_TOKENS_LEGEND,
        full=True,
    ),
)
def _semantic_tokens_full(
    ls: "DebputyLanguageServer",
    params: SemanticTokensParams,
) -> Optional[SemanticTokens]:
    """Serve a full semantic tokens request for the document's language.

    The lookup in ``SEMANTIC_TOKENS_FULL_HANDLERS`` and the call itself are
    done by ``_dispatch_standard_handler``; its result is returned unchanged.
    """
    doc_uri = params.text_document.uri
    return _dispatch_standard_handler(
        ls,
        doc_uri,
        params,
        SEMANTIC_TOKENS_FULL_HANDLERS,
        "Semantic tokens request",
    )

206 

207 

def _dispatch_standard_handler(
    ls: "DebputyLanguageServer",
    doc_uri: str,
    params: P,
    handler_table: Mapping[str, Callable[[L, P], R]],
    request_type: str,
) -> Optional[R]:
    """Look up the handler for a document's language id and call it.

    :param ls: The language server; its workspace is used to resolve
        ``doc_uri`` into a document.
    :param doc_uri: URI of the document the request concerns.
    :param params: The request parameters, passed through to the handler.
    :param handler_table: Handlers keyed by language id.
    :param request_type: Human readable request name, used in log messages
        only.
    :return: The handler's return value, or ``None`` when ``handler_table``
        has no entry for the document's language id.
    """
    doc = ls.workspace.get_text_document(doc_uri)

    id_source, language_id = determine_language_id(doc)
    handler = handler_table.get(language_id)
    if handler is None:
        _info(
            f"{request_type} for document: {doc.path} ({language_id}, {id_source}) - no handler"
        )
        # Explicit None: there is nothing to answer with for this language.
        return None
    _info(
        f"{request_type} for document: {doc.path} ({language_id}, {id_source}) - delegating to handler"
    )

    return handler(
        ls,
        params,
    )