From afce081b90c1e2c50c3507758c7558a0dfa1f33e Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 15:18:03 +0200 Subject: Adding upstream version 2:8.2.2434. Signed-off-by: Daniel Baumann --- src/testdir/test_channel.py | 273 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 273 insertions(+) create mode 100644 src/testdir/test_channel.py (limited to 'src/testdir/test_channel.py') diff --git a/src/testdir/test_channel.py b/src/testdir/test_channel.py new file mode 100644 index 0000000..8dba3ba --- /dev/null +++ b/src/testdir/test_channel.py @@ -0,0 +1,273 @@ +#!/usr/bin/env python +# +# Server that will accept connections from a Vim channel. +# Used by test_channel.vim. +# +# This requires Python 2.6 or later. + +from __future__ import print_function +import json +import socket +import sys +import time +import threading + +try: + # Python 3 + import socketserver +except ImportError: + # Python 2 + import SocketServer as socketserver + +class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): + + def setup(self): + self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + + def handle(self): + print("=== socket opened ===") + while True: + try: + received = self.request.recv(4096).decode('utf-8') + except socket.error: + print("=== socket error ===") + break + except IOError: + print("=== socket closed ===") + break + if received == '': + print("=== socket closed ===") + break + print("received: {0}".format(received)) + + # We may receive two messages at once. Take the part up to the + # newline, which should be after the matching "]". + todo = received + while todo != '': + splitidx = todo.find('\n') + if splitidx < 0: + used = todo + todo = '' + else: + used = todo[:splitidx] + todo = todo[splitidx + 1:] + if used != received: + print("using: {0}".format(used)) + + try: + decoded = json.loads(used) + except ValueError: + print("json decoding failed") + decoded = [-1, ''] + + # Send a response if the sequence number is positive. + if decoded[0] >= 0: + if decoded[1] == 'hello!': + # simply send back a string + response = "got it" + elif decoded[1] == 'malformed1': + cmd = '["ex",":"]wrong!["ex","smi"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + # Need to wait for Vim to give up, otherwise it + # sometimes fails on OS X. + time.sleep(0.2) + elif decoded[1] == 'malformed2': + cmd = '"unterminated string' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + # Need to wait for Vim to give up, otherwise the double + # quote in the "ok" response terminates the string. + time.sleep(0.2) + elif decoded[1] == 'malformed3': + cmd = '["ex","missing ]"' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + # Need to wait for Vim to give up, otherwise the ] + # in the "ok" response terminates the list. + time.sleep(0.2) + elif decoded[1] == 'split': + cmd = '["ex","let ' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + time.sleep(0.01) + cmd = 'g:split = 123"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1].startswith("echo "): + # send back the argument + response = decoded[1][5:] + elif decoded[1] == 'make change': + # Send two ex commands at the same time, before + # replying to the request. + cmd = '["ex","call append(\\"$\\",\\"added1\\")"]' + cmd += '["ex","call append(\\"$\\",\\"added2\\")"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'bad command': + cmd = '["ex","foo bar"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'do normal': + # Send a normal command. + cmd = '["normal","G$s more\u001b"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'eval-works': + # Send an eval request. We ignore the response. + cmd = '["expr","\\"foo\\" . 123", -1]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'eval-special': + # Send an eval request. We ignore the response. + cmd = '["expr","\\"foo\x7f\x10\x01bar\\"", -2]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'eval-getline': + # Send an eval request. We ignore the response. + cmd = '["expr","getline(3)", -3]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'eval-fails': + # Send an eval request that will fail. + cmd = '["expr","xxx", -4]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'eval-error': + # Send an eval request that works but the result can't + # be encoded. + cmd = '["expr","function(\\"tr\\")", -5]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'eval-bad': + # Send an eval request missing the third argument. + cmd = '["expr","xxx"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'an expr': + # Send an expr request. + cmd = '["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'call-func': + cmd = '["call","MyFunction",[1,2,3], 0]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'redraw': + cmd = '["redraw",""]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'redraw!': + cmd = '["redraw","force"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'empty-request': + cmd = '[]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'eval-result': + # Send back the last received eval result. + response = last_eval + elif decoded[1] == 'call me': + cmd = '[0,"we called you"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == 'call me again': + cmd = '[0,"we did call you"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "" + elif decoded[1] == 'send zero': + cmd = '[0,"zero index"]' + print("sending: {0}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "sent zero" + elif decoded[1] == 'close me': + print("closing") + self.request.close() + response = "" + elif decoded[1] == 'wait a bit': + time.sleep(0.2) + response = "waited" + elif decoded[1] == '!quit!': + # we're done + self.server.shutdown() + return + elif decoded[1] == '!crash!': + # Crash! + 42 / 0 + else: + response = "what?" + + if response == "": + print("no response") + else: + encoded = json.dumps([decoded[0], response]) + print("sending: {0}".format(encoded)) + self.request.sendall(encoded.encode('utf-8')) + + # Negative numbers are used for "eval" responses. + elif decoded[0] < 0: + last_eval = decoded + +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + pass + +def writePortInFile(port): + # Write the port number in Xportnr, so that the test knows it. + f = open("Xportnr", "w") + f.write("{0}".format(port)) + f.close() + +def main(host, port, server_class=ThreadedTCPServer): + # Wait half a second before opening the port to test waittime in ch_open(). + # We do want to get the port number, get that first. We cannot open the + # socket, guess a port is free. + if len(sys.argv) >= 2 and sys.argv[1] == 'delay': + port = 13684 + writePortInFile(port) + + print("Wait for it...") + time.sleep(0.5) + + server = server_class((host, port), ThreadedTCPRequestHandler) + ip, port = server.server_address[0:2] + + # Start a thread with the server. That thread will then start a new thread + # for each connection. + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() + + writePortInFile(port) + + print("Listening on port {0}".format(port)) + + # Main thread terminates, but the server continues running + # until server.shutdown() is called. + try: + while server_thread.is_alive(): + server_thread.join(1) + except (KeyboardInterrupt, SystemExit): + server.shutdown() + +if __name__ == "__main__": + main("localhost", 0) -- cgit v1.2.3