summaryrefslogtreecommitdiffstats
path: root/bin/tests/system/pipelined/ans5/ans.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/tests/system/pipelined/ans5/ans.py')
-rw-r--r--bin/tests/system/pipelined/ans5/ans.py212
1 files changed, 212 insertions, 0 deletions
diff --git a/bin/tests/system/pipelined/ans5/ans.py b/bin/tests/system/pipelined/ans5/ans.py
new file mode 100644
index 0000000..bac5ed3
--- /dev/null
+++ b/bin/tests/system/pipelined/ans5/ans.py
@@ -0,0 +1,212 @@
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+############################################################################
+#
+# This tool acts as a TCP/UDP proxy and delays all incoming packets by 500
+# milliseconds.
+#
+# We use it to check pipelining - a client sents 8 questions over a
+# pipelined connection - that require asking a normal (examplea) and a
+# slow-responding (exampleb) servers:
+# a.examplea
+# a.exampleb
+# b.examplea
+# b.exampleb
+# c.examplea
+# c.exampleb
+# d.examplea
+# d.exampleb
+#
+# If pipelining works properly the answers will be returned out of order
+# with all answers from examplea returned first, and then all answers
+# from exampleb.
+#
+############################################################################
+
+from __future__ import print_function
+
+import datetime
+import os
+import select
+import signal
+import socket
+import sys
+import time
+import threading
+import struct
+
+DELAY = 0.5
+THREADS = []
+
+
+def log(msg):
+ print(datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f ") + msg)
+
+
+def sigterm(*_):
+ log("SIGTERM received, shutting down")
+ for thread in THREADS:
+ thread.close()
+ thread.join()
+ os.remove("ans.pid")
+ sys.exit(0)
+
+
+class TCPDelayer(threading.Thread):
+ """For a given TCP connection conn we open a connection to (ip, port),
+ and then we delay each incoming packet by DELAY by putting it in a
+ queue.
+ In the pipelined test TCP should not be used, but it's here for
+ completnes.
+ """
+
+ def __init__(self, conn, ip, port):
+ threading.Thread.__init__(self)
+ self.conn = conn
+ self.cconn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.cconn.connect((ip, port))
+ self.queue = []
+ self.running = True
+
+ def close(self):
+ self.running = False
+
+ def run(self):
+ while self.running:
+ curr_timeout = 0.5
+ try:
+ curr_timeout = self.queue[0][0] - time.time()
+ except StopIteration:
+ pass
+ if curr_timeout > 0:
+ if curr_timeout == 0:
+ curr_timeout = 0.5
+ rfds, _, _ = select.select(
+ [self.conn, self.cconn], [], [], curr_timeout
+ )
+ if self.conn in rfds:
+ data = self.conn.recv(65535)
+ if not data:
+ return
+ self.queue.append((time.time() + DELAY, data))
+ if self.cconn in rfds:
+ data = self.cconn.recv(65535)
+ if not data == 0:
+ return
+ self.conn.send(data)
+ try:
+ while self.queue[0][0] - time.time() < 0:
+ _, data = self.queue.pop(0)
+ self.cconn.send(data)
+ except StopIteration:
+ pass
+
+
+class UDPDelayer(threading.Thread):
+ """Every incoming UDP packet is put in a queue for DELAY time, then
+ it's sent to (ip, port). We remember the query id to send the
+ response we get to a proper source, responses are not delayed.
+ """
+
+ def __init__(self, usock, ip, port):
+ threading.Thread.__init__(self)
+ self.sock = usock
+ self.csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.dst = (ip, port)
+ self.queue = []
+ self.qid_mapping = {}
+ self.running = True
+
+ def close(self):
+ self.running = False
+
+ def run(self):
+ while self.running:
+ curr_timeout = 0.5
+ if self.queue:
+ curr_timeout = self.queue[0][0] - time.time()
+ if curr_timeout >= 0:
+ if curr_timeout == 0:
+ curr_timeout = 0.5
+ rfds, _, _ = select.select(
+ [self.sock, self.csock], [], [], curr_timeout
+ )
+ if self.sock in rfds:
+ data, addr = self.sock.recvfrom(65535)
+ if not data:
+ return
+ self.queue.append((time.time() + DELAY, data))
+ qid = struct.unpack(">H", data[:2])[0]
+ log("Received a query from %s, queryid %d" % (str(addr), qid))
+ self.qid_mapping[qid] = addr
+ if self.csock in rfds:
+ data, addr = self.csock.recvfrom(65535)
+ if not data:
+ return
+ qid = struct.unpack(">H", data[:2])[0]
+ dst = self.qid_mapping.get(qid)
+ if dst is not None:
+ self.sock.sendto(data, dst)
+ log(
+ "Received a response from %s, queryid %d, sending to %s"
+ % (str(addr), qid, str(dst))
+ )
+ while self.queue and self.queue[0][0] - time.time() < 0:
+ _, data = self.queue.pop(0)
+ qid = struct.unpack(">H", data[:2])[0]
+ log("Sending a query to %s, queryid %d" % (str(self.dst), qid))
+ self.csock.sendto(data, self.dst)
+
+
+def main():
+ signal.signal(signal.SIGTERM, sigterm)
+ signal.signal(signal.SIGINT, sigterm)
+
+ with open("ans.pid", "w") as pidfile:
+ print(os.getpid(), file=pidfile)
+
+ listenip = "10.53.0.5"
+ serverip = "10.53.0.2"
+
+ try:
+ port = int(os.environ["PORT"])
+ except KeyError:
+ port = 5300
+
+ log("Listening on %s:%d" % (listenip, port))
+
+ usock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ usock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ usock.bind((listenip, port))
+ thread = UDPDelayer(usock, serverip, port)
+ thread.start()
+ THREADS.append(thread)
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((listenip, port))
+ sock.listen(1)
+ sock.settimeout(1)
+
+ while True:
+ try:
+ (clientsock, _) = sock.accept()
+ log("Accepted connection from %s" % clientsock)
+ thread = TCPDelayer(clientsock, serverip, port)
+ thread.start()
+ THREADS.append(thread)
+ except socket.timeout:
+ pass
+
+
+if __name__ == "__main__":
+ main()