diff options
Diffstat (limited to '')
-rw-r--r-- | bin/tests/system/pipelined/ans5/ans.py | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/bin/tests/system/pipelined/ans5/ans.py b/bin/tests/system/pipelined/ans5/ans.py new file mode 100644 index 0000000..bac5ed3 --- /dev/null +++ b/bin/tests/system/pipelined/ans5/ans.py @@ -0,0 +1,212 @@ +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +############################################################################ +# +# This tool acts as a TCP/UDP proxy and delays all incoming packets by 500 +# milliseconds. +# +# We use it to check pipelining - a client sents 8 questions over a +# pipelined connection - that require asking a normal (examplea) and a +# slow-responding (exampleb) servers: +# a.examplea +# a.exampleb +# b.examplea +# b.exampleb +# c.examplea +# c.exampleb +# d.examplea +# d.exampleb +# +# If pipelining works properly the answers will be returned out of order +# with all answers from examplea returned first, and then all answers +# from exampleb. +# +############################################################################ + +from __future__ import print_function + +import datetime +import os +import select +import signal +import socket +import sys +import time +import threading +import struct + +DELAY = 0.5 +THREADS = [] + + +def log(msg): + print(datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f ") + msg) + + +def sigterm(*_): + log("SIGTERM received, shutting down") + for thread in THREADS: + thread.close() + thread.join() + os.remove("ans.pid") + sys.exit(0) + + +class TCPDelayer(threading.Thread): + """For a given TCP connection conn we open a connection to (ip, port), + and then we delay each incoming packet by DELAY by putting it in a + queue. + In the pipelined test TCP should not be used, but it's here for + completnes. + """ + + def __init__(self, conn, ip, port): + threading.Thread.__init__(self) + self.conn = conn + self.cconn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.cconn.connect((ip, port)) + self.queue = [] + self.running = True + + def close(self): + self.running = False + + def run(self): + while self.running: + curr_timeout = 0.5 + try: + curr_timeout = self.queue[0][0] - time.time() + except StopIteration: + pass + if curr_timeout > 0: + if curr_timeout == 0: + curr_timeout = 0.5 + rfds, _, _ = select.select( + [self.conn, self.cconn], [], [], curr_timeout + ) + if self.conn in rfds: + data = self.conn.recv(65535) + if not data: + return + self.queue.append((time.time() + DELAY, data)) + if self.cconn in rfds: + data = self.cconn.recv(65535) + if not data == 0: + return + self.conn.send(data) + try: + while self.queue[0][0] - time.time() < 0: + _, data = self.queue.pop(0) + self.cconn.send(data) + except StopIteration: + pass + + +class UDPDelayer(threading.Thread): + """Every incoming UDP packet is put in a queue for DELAY time, then + it's sent to (ip, port). We remember the query id to send the + response we get to a proper source, responses are not delayed. + """ + + def __init__(self, usock, ip, port): + threading.Thread.__init__(self) + self.sock = usock + self.csock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.dst = (ip, port) + self.queue = [] + self.qid_mapping = {} + self.running = True + + def close(self): + self.running = False + + def run(self): + while self.running: + curr_timeout = 0.5 + if self.queue: + curr_timeout = self.queue[0][0] - time.time() + if curr_timeout >= 0: + if curr_timeout == 0: + curr_timeout = 0.5 + rfds, _, _ = select.select( + [self.sock, self.csock], [], [], curr_timeout + ) + if self.sock in rfds: + data, addr = self.sock.recvfrom(65535) + if not data: + return + self.queue.append((time.time() + DELAY, data)) + qid = struct.unpack(">H", data[:2])[0] + log("Received a query from %s, queryid %d" % (str(addr), qid)) + self.qid_mapping[qid] = addr + if self.csock in rfds: + data, addr = self.csock.recvfrom(65535) + if not data: + return + qid = struct.unpack(">H", data[:2])[0] + dst = self.qid_mapping.get(qid) + if dst is not None: + self.sock.sendto(data, dst) + log( + "Received a response from %s, queryid %d, sending to %s" + % (str(addr), qid, str(dst)) + ) + while self.queue and self.queue[0][0] - time.time() < 0: + _, data = self.queue.pop(0) + qid = struct.unpack(">H", data[:2])[0] + log("Sending a query to %s, queryid %d" % (str(self.dst), qid)) + self.csock.sendto(data, self.dst) + + +def main(): + signal.signal(signal.SIGTERM, sigterm) + signal.signal(signal.SIGINT, sigterm) + + with open("ans.pid", "w") as pidfile: + print(os.getpid(), file=pidfile) + + listenip = "10.53.0.5" + serverip = "10.53.0.2" + + try: + port = int(os.environ["PORT"]) + except KeyError: + port = 5300 + + log("Listening on %s:%d" % (listenip, port)) + + usock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + usock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + usock.bind((listenip, port)) + thread = UDPDelayer(usock, serverip, port) + thread.start() + THREADS.append(thread) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((listenip, port)) + sock.listen(1) + sock.settimeout(1) + + while True: + try: + (clientsock, _) = sock.accept() + log("Accepted connection from %s" % clientsock) + thread = TCPDelayer(clientsock, serverip, port) + thread.start() + THREADS.append(thread) + except socket.timeout: + pass + + +if __name__ == "__main__": + main() |