diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py')
-rwxr-xr-x | src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py b/src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py new file mode 100755 index 000000000..785b2fb88 --- /dev/null +++ b/src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: BSD-3-Clause + +import fcntl +import pkg_resources +import socket +import struct +import sys +import unittest + + +if sys.version_info < (3, 0): + print("Python3 is required to run this script") + sys.exit(1) + + +try: + from scapy.all import Ether +except ImportError: + print("Scapy module is required") + sys.exit(1) + + +PKTTEST_REQ = [ + "scapy>=2.4.3", +] + + +def assert_requirements(req): + """ + assert requirement is met + req can hold a string or a list of strings + """ + try: + pkg_resources.require(req) + except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e: + print("Requirement assertion: " + str(e)) + sys.exit(1) + + +TAP_UNPROTECTED = "dtap1" +TAP_PROTECTED = "dtap0" + + +class Interface(object): + ETH_P_ALL = 3 + MAX_PACKET_SIZE = 1280 + IOCTL_GET_INFO = 0x8927 + SOCKET_TIMEOUT = 0.5 + def __init__(self, ifname): + self.name = ifname + + # create and bind socket to specified interface + self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL)) + self.s.settimeout(Interface.SOCKET_TIMEOUT) + self.s.bind((self.name, 0, socket.PACKET_OTHERHOST)) + + # get interface MAC address + info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, struct.pack('256s', bytes(ifname[:15], encoding='ascii'))) + self.mac = ':'.join(['%02x' % i for i in info[18:24]]) + + def __del__(self): + self.s.close() + + def send_l3packet(self, pkt, mac): + e = Ether(src=self.mac, dst=mac) + self.send_packet(e/pkt) + + def send_packet(self, pkt): + self.send_bytes(bytes(pkt)) + + def send_bytes(self, bytedata): + self.s.send(bytedata) + + def recv_packet(self): + return Ether(self.recv_bytes()) + + def recv_bytes(self): + return self.s.recv(Interface.MAX_PACKET_SIZE) + + def get_mac(self): + return self.mac + + +class PacketXfer(object): + def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED): + self.protected_port = Interface(protected_iface) + self.unprotected_port = Interface(unprotected_iface) + + def send_to_protected_port(self, pkt, remote_mac=None): + if remote_mac is None: + remote_mac = self.unprotected_port.get_mac() + self.protected_port.send_l3packet(pkt, remote_mac) + + def send_to_unprotected_port(self, pkt, remote_mac=None): + if remote_mac is None: + remote_mac = self.protected_port.get_mac() + self.unprotected_port.send_l3packet(pkt, remote_mac) + + def xfer_unprotected(self, pkt): + self.send_to_unprotected_port(pkt) + return self.protected_port.recv_packet() + + def xfer_protected(self, pkt): + self.send_to_protected_port(pkt) + return self.unprotected_port.recv_packet() + + +def pkttest(): + if len(sys.argv) == 1: + sys.exit(unittest.main(verbosity=2)) + elif len(sys.argv) == 2: + if sys.argv[1] == "config": + module = __import__('__main__') + try: + print(module.config()) + except AttributeError: + sys.stderr.write("Cannot find \"config()\" in a test") + sys.exit(1) + else: + sys.exit(1) + + +if __name__ == "__main__": + if len(sys.argv) == 2 and sys.argv[1] == "check_reqs": + assert_requirements(PKTTEST_REQ) + else: + print("Usage: " + sys.argv[0] + " check_reqs") |