summaryrefslogtreecommitdiffstats
path: root/src/testdir/test_channel_lsp.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/testdir/test_channel_lsp.py')
-rw-r--r--src/testdir/test_channel_lsp.py333
1 files changed, 333 insertions, 0 deletions
diff --git a/src/testdir/test_channel_lsp.py b/src/testdir/test_channel_lsp.py
new file mode 100644
index 0000000..10b4fb4
--- /dev/null
+++ b/src/testdir/test_channel_lsp.py
@@ -0,0 +1,333 @@
+#!/usr/bin/env python
+#
+# Server that will accept connections from a Vim channel.
+# Used by test_channel.vim to test LSP functionality.
+#
+# This requires Python 2.6 or later.
+
+from __future__ import print_function
+import json
+import socket
+import sys
+import time
+import threading
+
+try:
+ # Python 3
+ import socketserver
+except ImportError:
+ # Python 2
+ import SocketServer as socketserver
+
+class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
+
+ def setup(self):
+ self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+ def debuglog(self, msg):
+ if self.debug:
+ with open("Xlspserver.log", "a") as myfile:
+ myfile.write(msg)
+
+ def send_lsp_msg(self, msgid, resp_dict):
+ v = {'jsonrpc': '2.0', 'result': resp_dict}
+ if msgid != -1:
+ v['id'] = msgid
+ s = json.dumps(v)
+ resp = "Content-Length: " + str(len(s)) + "\r\n"
+ resp += "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
+ resp += "\r\n"
+ resp += s
+ if self.debug:
+ self.debuglog("SEND: ({0} bytes) '{1}'\n".format(len(resp), resp))
+ self.request.sendall(resp.encode('utf-8'))
+
+ def send_wrong_payload(self):
+ v = 'wrong-payload'
+ s = json.dumps(v)
+ resp = "Content-Length: " + str(len(s)) + "\r\n"
+ resp += "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
+ resp += "\r\n"
+ resp += s
+ self.request.sendall(resp.encode('utf-8'))
+
+ def send_empty_header(self, msgid, resp_dict):
+ v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
+ s = json.dumps(v)
+ resp = "\r\n"
+ resp += s
+ self.request.sendall(resp.encode('utf-8'))
+
+ def send_empty_payload(self):
+ resp = "Content-Length: 0\r\n"
+ resp += "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
+ resp += "\r\n"
+ self.request.sendall(resp.encode('utf-8'))
+
+ def send_extra_hdr_fields(self, msgid, resp_dict):
+ # test for sending extra fields in the http header
+ v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
+ s = json.dumps(v)
+ resp = "Host: abc.vim.org\r\n"
+ resp += "User-Agent: Python\r\n"
+ resp += "Accept-Language: en-US,en\r\n"
+ resp += "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
+ resp += "Content-Length: " + str(len(s)) + "\r\n"
+ resp += "\r\n"
+ resp += s
+ self.request.sendall(resp.encode('utf-8'))
+
+ def send_delayed_payload(self, msgid, resp_dict):
+ # test for sending the hdr first and then after some delay, send the
+ # payload
+ v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
+ s = json.dumps(v)
+ resp = "Content-Length: " + str(len(s)) + "\r\n"
+ resp += "\r\n"
+ self.request.sendall(resp.encode('utf-8'))
+ time.sleep(0.05)
+ resp = s
+ self.request.sendall(resp.encode('utf-8'))
+
+ def send_hdr_without_len(self, msgid, resp_dict):
+ # test for sending the http header without length
+ v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
+ s = json.dumps(v)
+ resp = "Content-Type: application/vim-jsonrpc; charset=utf-8\r\n"
+ resp += "\r\n"
+ resp += s
+ self.request.sendall(resp.encode('utf-8'))
+
+ def send_hdr_with_wrong_len(self, msgid, resp_dict):
+ # test for sending the http header with wrong length
+ v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
+ s = json.dumps(v)
+ resp = "Content-Length: 1000\r\n"
+ resp += "\r\n"
+ resp += s
+ self.request.sendall(resp.encode('utf-8'))
+
+ def send_hdr_with_negative_len(self, msgid, resp_dict):
+ # test for sending the http header with negative length
+ v = {'jsonrpc': '2.0', 'id': msgid, 'result': resp_dict}
+ s = json.dumps(v)
+ resp = "Content-Length: -1\r\n"
+ resp += "\r\n"
+ resp += s
+ self.request.sendall(resp.encode('utf-8'))
+
+ def do_ping(self, payload):
+ time.sleep(0.2)
+ self.send_lsp_msg(payload['id'], 'alive')
+
+ def do_echo(self, payload):
+ self.send_lsp_msg(-1, payload)
+
+ def do_simple_rpc(self, payload):
+ # test for a simple RPC request
+ self.send_lsp_msg(payload['id'], 'simple-rpc')
+
+ def do_rpc_with_notif(self, payload):
+ # test for sending a notification before replying to a request message
+ self.send_lsp_msg(-1, 'rpc-with-notif-notif')
+ # sleep for some time to make sure the notification is delivered
+ time.sleep(0.2)
+ self.send_lsp_msg(payload['id'], 'rpc-with-notif-resp')
+
+ def do_wrong_payload(self, payload):
+ # test for sending a non dict payload
+ self.send_wrong_payload()
+ time.sleep(0.2)
+ self.send_lsp_msg(-1, 'wrong-payload')
+
+ def do_large_payload(self, payload):
+ # test for sending a large (> 64K) payload
+ self.send_lsp_msg(payload['id'], payload)
+
+ def do_rpc_resp_incorrect_id(self, payload):
+ self.send_lsp_msg(-1, 'rpc-resp-incorrect-id-1')
+ self.send_lsp_msg(-1, 'rpc-resp-incorrect-id-2')
+ self.send_lsp_msg(1, 'rpc-resp-incorrect-id-3')
+ time.sleep(0.2)
+ self.send_lsp_msg(payload['id'], 'rpc-resp-incorrect-id-4')
+
+ def do_simple_notif(self, payload):
+ # notification message test
+ self.send_lsp_msg(-1, 'simple-notif')
+
+ def do_multi_notif(self, payload):
+ # send multiple notifications
+ self.send_lsp_msg(-1, 'multi-notif1')
+ self.send_lsp_msg(-1, 'multi-notif2')
+
+ def do_msg_with_id(self, payload):
+ self.send_lsp_msg(payload['id'], 'msg-with-id')
+
+ def do_msg_specific_cb(self, payload):
+ self.send_lsp_msg(payload['id'], 'msg-specific-cb')
+
+ def do_server_req(self, payload):
+ self.send_lsp_msg(201, {'method': 'checkhealth', 'params': {'a': 20}})
+
+ def do_extra_hdr_fields(self, payload):
+ self.send_extra_hdr_fields(payload['id'], 'extra-hdr-fields')
+
+ def do_delayad_payload(self, payload):
+ self.send_delayed_payload(payload['id'], 'delayed-payload')
+
+ def do_hdr_without_len(self, payload):
+ self.send_hdr_without_len(payload['id'], 'hdr-without-len')
+
+ def do_hdr_with_wrong_len(self, payload):
+ self.send_hdr_with_wrong_len(payload['id'], 'hdr-with-wrong-len')
+
+ def do_hdr_with_negative_len(self, payload):
+ self.send_hdr_with_negative_len(payload['id'], 'hdr-with-negative-len')
+
+ def do_empty_header(self, payload):
+ self.send_empty_header(payload['id'], 'empty-header')
+
+ def do_empty_payload(self, payload):
+ self.send_empty_payload()
+
+ def process_msg(self, msg):
+ try:
+ decoded = json.loads(msg)
+ if 'method' in decoded:
+ test_map = {
+ 'ping': self.do_ping,
+ 'echo': self.do_echo,
+ 'simple-rpc': self.do_simple_rpc,
+ 'rpc-with-notif': self.do_rpc_with_notif,
+ 'wrong-payload': self.do_wrong_payload,
+ 'large-payload': self.do_large_payload,
+ 'rpc-resp-incorrect-id': self.do_rpc_resp_incorrect_id,
+ 'simple-notif': self.do_simple_notif,
+ 'multi-notif': self.do_multi_notif,
+ 'msg-with-id': self.do_msg_with_id,
+ 'msg-specific-cb': self.do_msg_specific_cb,
+ 'server-req': self.do_server_req,
+ 'extra-hdr-fields': self.do_extra_hdr_fields,
+ 'delayed-payload': self.do_delayad_payload,
+ 'hdr-without-len': self.do_hdr_without_len,
+ 'hdr-with-wrong-len': self.do_hdr_with_wrong_len,
+ 'hdr-with-negative-len': self.do_hdr_with_negative_len,
+ 'empty-header': self.do_empty_header,
+ 'empty-payload': self.do_empty_payload
+ }
+ if decoded['method'] in test_map:
+ test_map[decoded['method']](decoded)
+ else:
+ self.debuglog("Error: Unsupported method - " + decoded['method'] + "\n")
+ else:
+ self.debuglog("Error: 'method' field is not found\n")
+
+ except ValueError:
+ self.debuglog("Error: json decoding failed\n")
+
+ def process_msgs(self, msgbuf):
+ while True:
+ sidx = msgbuf.find('Content-Length: ')
+ if sidx == -1:
+ # partial message received
+ return msgbuf
+ sidx += 16
+ eidx = msgbuf.find('\r\n')
+ if eidx == -1:
+ # partial message received
+ return msgbuf
+ msglen = int(msgbuf[sidx:eidx])
+
+ hdrend = msgbuf.find('\r\n\r\n')
+ if hdrend == -1:
+ # partial message received
+ return msgbuf
+
+ if msglen > len(msgbuf[hdrend + 4:]):
+ if self.debug:
+ self.debuglog("Partial message ({0} bytes)\n".format(len(msgbuf)))
+ # partial message received
+ return msgbuf
+
+ if self.debug:
+ self.debuglog("Complete message ({0} bytes) received\n".format(msglen))
+
+ # Remove the header
+ msgbuf = msgbuf[hdrend + 4:]
+ payload = msgbuf[:msglen]
+
+ self.process_msg(payload)
+
+ # Remove the processed message
+ msgbuf = msgbuf[msglen:]
+
+ def handle(self):
+ self.debug = False
+ self.debuglog("=== socket opened ===\n")
+ msgbuf = ''
+ while True:
+ try:
+ received = self.request.recv(4096).decode('utf-8')
+ except socket.error:
+ self.debuglog("=== socket error ===\n")
+ break
+ except IOError:
+ self.debuglog("=== socket closed ===\n")
+ break
+ if received == '':
+ self.debuglog("=== socket closed ===\n")
+ break
+
+ # Write the received lines into the file for debugging
+ if self.debug:
+ self.debuglog("RECV: ({0} bytes) '{1}'\n".format(len(received), received))
+
+ # Can receive more than one line in a response or a partial line.
+ # Accumulate all the received characters and process one line at
+ # a time.
+ msgbuf += received
+ msgbuf = self.process_msgs(msgbuf)
+
+class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
+ pass
+
+def writePortInFile(port):
+ # Write the port number in Xportnr, so that the test knows it.
+ f = open("Xportnr", "w")
+ f.write("{0}".format(port))
+ f.close()
+
+def main(host, port, server_class=ThreadedTCPServer):
+ # Wait half a second before opening the port to test waittime in ch_open().
+ # We do want to get the port number, get that first. We cannot open the
+ # socket, guess a port is free.
+ if len(sys.argv) >= 2 and sys.argv[1] == 'delay':
+ port = 13684
+ writePortInFile(port)
+ time.sleep(0.5)
+
+ addrs = socket.getaddrinfo(host, port, 0, 0, socket.IPPROTO_TCP)
+ # Each addr is a (family, type, proto, canonname, sockaddr) tuple
+ sockaddr = addrs[0][4]
+ server_class.address_family = addrs[0][0]
+
+ server = server_class(sockaddr[0:2], ThreadedTCPRequestHandler)
+ ip, port = server.server_address[0:2]
+
+ # Start a thread with the server. That thread will then start a new thread
+ # for each connection.
+ server_thread = threading.Thread(target=server.serve_forever)
+ server_thread.start()
+
+ writePortInFile(port)
+
+ # Main thread terminates, but the server continues running
+ # until server.shutdown() is called.
+ try:
+ while server_thread.is_alive():
+ server_thread.join(1)
+ except (KeyboardInterrupt, SystemExit):
+ server.shutdown()
+
+if __name__ == "__main__":
+ main("localhost", 0)