summaryrefslogtreecommitdiffstats
path: root/src/testdir/test_channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/testdir/test_channel.py')
-rw-r--r--src/testdir/test_channel.py288
1 files changed, 288 insertions, 0 deletions
diff --git a/src/testdir/test_channel.py b/src/testdir/test_channel.py
new file mode 100644
index 0000000..35c34fc
--- /dev/null
+++ b/src/testdir/test_channel.py
@@ -0,0 +1,288 @@
+#!/usr/bin/env python
+#
+# Server that will accept connections from a Vim channel.
+# Used by test_channel.vim.
+#
+# This requires Python 2.6 or later.
+
+from __future__ import print_function
+import json
+import socket
+import sys
+import time
+import threading
+
+try:
+ # Python 3
+ import socketserver
+except ImportError:
+ # Python 2
+ import SocketServer as socketserver
+
+class TestingRequestHandler(socketserver.BaseRequestHandler):
+ def handle(self):
+ print("=== socket opened ===")
+ while True:
+ try:
+ received = self.request.recv(4096).decode('utf-8')
+ except socket.error:
+ print("=== socket error ===")
+ break
+ except IOError:
+ print("=== socket closed ===")
+ break
+ if received == '':
+ print("=== socket closed ===")
+ break
+ print("received: {0}".format(received))
+
+ # We may receive two messages at once. Take the part up to the
+ # newline, which should be after the matching "]".
+ todo = received
+ while todo != '':
+ splitidx = todo.find('\n')
+ if splitidx < 0:
+ used = todo
+ todo = ''
+ else:
+ used = todo[:splitidx]
+ todo = todo[splitidx + 1:]
+ if used != received:
+ print("using: {0}".format(used))
+
+ try:
+ decoded = json.loads(used)
+ except ValueError:
+ print("json decoding failed")
+ decoded = [-1, '']
+
+ # Send a response if the sequence number is positive.
+ if decoded[0] >= 0:
+ if decoded[1] == 'hello!':
+ # simply send back a string
+ response = "got it"
+ elif decoded[1] == 'malformed1':
+ cmd = '["ex",":"]wrong!["ex","smi"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ # Need to wait for Vim to give up, otherwise it
+ # sometimes fails on OS X.
+ time.sleep(0.2)
+ elif decoded[1] == 'malformed2':
+ cmd = '"unterminated string'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ # Need to wait for Vim to give up, otherwise the double
+ # quote in the "ok" response terminates the string.
+ time.sleep(0.2)
+ elif decoded[1] == 'malformed3':
+ cmd = '["ex","missing ]"'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ # Need to wait for Vim to give up, otherwise the ]
+ # in the "ok" response terminates the list.
+ time.sleep(0.2)
+ elif decoded[1] == 'split':
+ cmd = '["ex","let '
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ time.sleep(0.01)
+ cmd = 'g:split = 123"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1].startswith("echo "):
+ # send back the argument
+ response = decoded[1][5:]
+ elif decoded[1] == 'make change':
+ # Send two ex commands at the same time, before
+ # replying to the request.
+ cmd = '["ex","call append(\\"$\\",\\"added1\\")"]'
+ cmd += '["ex","call append(\\"$\\",\\"added2\\")"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'echoerr':
+ cmd = '["ex","echoerr \\\"this is an error\\\""]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ # Wait a bit, so that the "ex" command is handled
+ # before the "ch_evalexpr() returns. Otherwise we are
+ # outside the try/catch when the "ex" command is
+ # handled.
+ time.sleep(0.02)
+ elif decoded[1] == 'bad command':
+ cmd = '["ex","foo bar"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'do normal':
+ # Send a normal command.
+ cmd = '["normal","G$s more\u001b"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'eval-works':
+ # Send an eval request. We ignore the response.
+ cmd = '["expr","\\"foo\\" . 123", -1]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'eval-special':
+ # Send an eval request. We ignore the response.
+ cmd = '["expr","\\"foo\x7f\x10\x01bar\\"", -2]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'eval-getline':
+ # Send an eval request. We ignore the response.
+ cmd = '["expr","getline(3)", -3]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'eval-fails':
+ # Send an eval request that will fail.
+ cmd = '["expr","xxx", -4]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'eval-error':
+ # Send an eval request that works but the result can't
+ # be encoded.
+ cmd = '["expr","function(\\"tr\\")", -5]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'eval-bad':
+ # Send an eval request missing the third argument.
+ cmd = '["expr","xxx"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'an expr':
+ # Send an expr request.
+ cmd = '["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'call-func':
+ cmd = '["call","MyFunction",[1,2,3], 0]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'redraw':
+ cmd = '["redraw",""]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'redraw!':
+ cmd = '["redraw","force"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'empty-request':
+ cmd = '[]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'eval-result':
+ # Send back the last received eval result.
+ response = last_eval
+ elif decoded[1] == 'call me':
+ cmd = '[0,"we called you"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "ok"
+ elif decoded[1] == 'call me again':
+ cmd = '[0,"we did call you"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = ""
+ elif decoded[1] == 'send zero':
+ cmd = '[0,"zero index"]'
+ print("sending: {0}".format(cmd))
+ self.request.sendall(cmd.encode('utf-8'))
+ response = "sent zero"
+ elif decoded[1] == 'close me':
+ print("closing")
+ self.request.close()
+ response = ""
+ elif decoded[1] == 'wait a bit':
+ time.sleep(0.2)
+ response = "waited"
+ elif decoded[1] == '!quit!':
+ # we're done
+ self.server.shutdown()
+ return
+ elif decoded[1] == '!crash!':
+ # Crash!
+ 42 / 0
+ else:
+ response = "what?"
+
+ if response == "":
+ print("no response")
+ else:
+ encoded = json.dumps([decoded[0], response])
+ print("sending: {0}".format(encoded))
+ self.request.sendall(encoded.encode('utf-8'))
+
+ # Negative numbers are used for "eval" responses.
+ elif decoded[0] < 0:
+ last_eval = decoded
+
+class ThreadedTCPRequestHandler(TestingRequestHandler):
+ def setup(self):
+ self.request.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
+ pass
+
+def writePortInFile(port):
+ # Write the port number in Xportnr, so that the test knows it.
+ f = open("Xportnr", "w")
+ f.write("{0}".format(port))
+ f.close()
+
+def main(host, port, server_class=ThreadedTCPServer):
+ # Wait half a second before opening the port to test waittime in ch_open().
+ # We do want to get the port number, get that first. We cannot open the
+ # socket, guess a port is free.
+ if len(sys.argv) >= 2 and sys.argv[1] == 'delay':
+ port = 13684
+ writePortInFile(port)
+
+ print("Wait for it...")
+ time.sleep(0.5)
+
+ addrs = socket.getaddrinfo(host, port, 0, 0, socket.IPPROTO_TCP)
+ # Each addr is a (family, type, proto, canonname, sockaddr) tuple
+ sockaddr = addrs[0][4]
+ server_class.address_family = addrs[0][0]
+
+ server = server_class(sockaddr[0:2], ThreadedTCPRequestHandler)
+ ip, port = server.server_address[0:2]
+
+ # Start a thread with the server. That thread will then start a new thread
+ # for each connection.
+ server_thread = threading.Thread(target=server.serve_forever)
+ server_thread.start()
+
+ writePortInFile(port)
+
+ print("Listening on port {0}".format(port))
+
+ # Main thread terminates, but the server continues running
+ # until server.shutdown() is called.
+ try:
+ while server_thread.is_alive():
+ server_thread.join(1)
+ except (KeyboardInterrupt, SystemExit):
+ server.shutdown()
+
+if __name__ == "__main__":
+ main("localhost", 0)