summaryrefslogtreecommitdiffstats
path: root/src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py')
-rwxr-xr-xsrc/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py128
1 files changed, 128 insertions, 0 deletions
diff --git a/src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py b/src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py
new file mode 100755
index 000000000..785b2fb88
--- /dev/null
+++ b/src/spdk/dpdk/examples/ipsec-secgw/test/pkttest.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: BSD-3-Clause
+
+import fcntl
+import pkg_resources
+import socket
+import struct
+import sys
+import unittest
+
+
+if sys.version_info < (3, 0):
+ print("Python3 is required to run this script")
+ sys.exit(1)
+
+
+try:
+ from scapy.all import Ether
+except ImportError:
+ print("Scapy module is required")
+ sys.exit(1)
+
+
+PKTTEST_REQ = [
+ "scapy>=2.4.3",
+]
+
+
+def assert_requirements(req):
+ """
+ assert requirement is met
+ req can hold a string or a list of strings
+ """
+ try:
+ pkg_resources.require(req)
+ except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e:
+ print("Requirement assertion: " + str(e))
+ sys.exit(1)
+
+
+TAP_UNPROTECTED = "dtap1"
+TAP_PROTECTED = "dtap0"
+
+
+class Interface(object):
+ ETH_P_ALL = 3
+ MAX_PACKET_SIZE = 1280
+ IOCTL_GET_INFO = 0x8927
+ SOCKET_TIMEOUT = 0.5
+ def __init__(self, ifname):
+ self.name = ifname
+
+ # create and bind socket to specified interface
+ self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL))
+ self.s.settimeout(Interface.SOCKET_TIMEOUT)
+ self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))
+
+ # get interface MAC address
+ info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, struct.pack('256s', bytes(ifname[:15], encoding='ascii')))
+ self.mac = ':'.join(['%02x' % i for i in info[18:24]])
+
+ def __del__(self):
+ self.s.close()
+
+ def send_l3packet(self, pkt, mac):
+ e = Ether(src=self.mac, dst=mac)
+ self.send_packet(e/pkt)
+
+ def send_packet(self, pkt):
+ self.send_bytes(bytes(pkt))
+
+ def send_bytes(self, bytedata):
+ self.s.send(bytedata)
+
+ def recv_packet(self):
+ return Ether(self.recv_bytes())
+
+ def recv_bytes(self):
+ return self.s.recv(Interface.MAX_PACKET_SIZE)
+
+ def get_mac(self):
+ return self.mac
+
+
+class PacketXfer(object):
+ def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
+ self.protected_port = Interface(protected_iface)
+ self.unprotected_port = Interface(unprotected_iface)
+
+ def send_to_protected_port(self, pkt, remote_mac=None):
+ if remote_mac is None:
+ remote_mac = self.unprotected_port.get_mac()
+ self.protected_port.send_l3packet(pkt, remote_mac)
+
+ def send_to_unprotected_port(self, pkt, remote_mac=None):
+ if remote_mac is None:
+ remote_mac = self.protected_port.get_mac()
+ self.unprotected_port.send_l3packet(pkt, remote_mac)
+
+ def xfer_unprotected(self, pkt):
+ self.send_to_unprotected_port(pkt)
+ return self.protected_port.recv_packet()
+
+ def xfer_protected(self, pkt):
+ self.send_to_protected_port(pkt)
+ return self.unprotected_port.recv_packet()
+
+
+def pkttest():
+ if len(sys.argv) == 1:
+ sys.exit(unittest.main(verbosity=2))
+ elif len(sys.argv) == 2:
+ if sys.argv[1] == "config":
+ module = __import__('__main__')
+ try:
+ print(module.config())
+ except AttributeError:
+ sys.stderr.write("Cannot find \"config()\" in a test")
+ sys.exit(1)
+ else:
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ if len(sys.argv) == 2 and sys.argv[1] == "check_reqs":
+ assert_requirements(PKTTEST_REQ)
+ else:
+ print("Usage: " + sys.argv[0] + " check_reqs")