path: root/tools/testing/selftests/net/lib/py
diff options
Diffstat (limited to 'tools/testing/selftests/net/lib/py')
7 files changed, 492 insertions, 0 deletions
diff --git a/tools/testing/selftests/net/lib/py/ b/tools/testing/selftests/net/lib/py/
new file mode 100644
index 0000000000..b6d498d125
--- /dev/null
+++ b/tools/testing/selftests/net/lib/py/
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-2.0
+from .consts import KSRC
+from .ksft import *
+from .netns import NetNS
+from .nsim import *
+from .utils import *
+from .ynl import NlError, YnlFamily, EthtoolFamily, NetdevFamily, RtnlFamily
diff --git a/tools/testing/selftests/net/lib/py/ b/tools/testing/selftests/net/lib/py/
new file mode 100644
index 0000000000..f518ce79d8
--- /dev/null
+++ b/tools/testing/selftests/net/lib/py/
@@ -0,0 +1,9 @@
+# SPDX-License-Identifier: GPL-2.0
+import sys
+from pathlib import Path
+KSFT_DIR = (Path(__file__).parent / "../../..").resolve()
+KSRC = (Path(__file__).parent / "../../../../../..").resolve()
+KSFT_MAIN_NAME = Path(sys.argv[0]).with_suffix("").name
diff --git a/tools/testing/selftests/net/lib/py/ b/tools/testing/selftests/net/lib/py/
new file mode 100644
index 0000000000..4769b4eb1e
--- /dev/null
+++ b/tools/testing/selftests/net/lib/py/
@@ -0,0 +1,159 @@
+# SPDX-License-Identifier: GPL-2.0
+import builtins
+import inspect
+import sys
+import time
+import traceback
+from .consts import KSFT_MAIN_NAME
+class KsftFailEx(Exception):
+ pass
+class KsftSkipEx(Exception):
+ pass
+class KsftXfailEx(Exception):
+ pass
+def ksft_pr(*objs, **kwargs):
+ print("#", *objs, **kwargs)
+def _fail(*args):
+ global KSFT_RESULT
+ frame = inspect.stack()[2]
+ ksft_pr("At " + frame.filename + " line " + str(frame.lineno) + ":")
+ ksft_pr(*args)
+def ksft_eq(a, b, comment=""):
+ global KSFT_RESULT
+ if a != b:
+ _fail("Check failed", a, "!=", b, comment)
+def ksft_true(a, comment=""):
+ if not a:
+ _fail("Check failed", a, "does not eval to True", comment)
+def ksft_in(a, b, comment=""):
+ if a not in b:
+ _fail("Check failed", a, "not in", b, comment)
+def ksft_ge(a, b, comment=""):
+ if a < b:
+ _fail("Check failed", a, "<", b, comment)
+class ksft_raises:
+ def __init__(self, expected_type):
+ self.exception = None
+ self.expected_type = expected_type
+ def __enter__(self):
+ return self
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is None:
+ _fail(f"Expected exception {str(self.expected_type.__name__)}, none raised")
+ elif self.expected_type != exc_type:
+ _fail(f"Expected exception {str(self.expected_type.__name__)}, raised {str(exc_type.__name__)}")
+ self.exception = exc_val
+ # Suppress the exception if its the expected one
+ return self.expected_type == exc_type
+def ksft_busy_wait(cond, sleep=0.005, deadline=1, comment=""):
+ end = time.monotonic() + deadline
+ while True:
+ if cond():
+ return
+ if time.monotonic() > end:
+ _fail("Waiting for condition timed out", comment)
+ return
+ time.sleep(sleep)
+def ktap_result(ok, cnt=1, case="", comment=""):
+ res = ""
+ if not ok:
+ res += "not "
+ res += "ok "
+ res += str(cnt) + " "
+ if case:
+ res += "." + str(case.__name__)
+ if comment:
+ res += " # " + comment
+ print(res)
+def ksft_run(cases=None, globs=None, case_pfx=None, args=()):
+ cases = cases or []
+ if globs and case_pfx:
+ for key, value in globs.items():
+ if not callable(value):
+ continue
+ for prefix in case_pfx:
+ if key.startswith(prefix):
+ cases.append(value)
+ break
+ totals = {"pass": 0, "fail": 0, "skip": 0, "xfail": 0}
+ print("KTAP version 1")
+ print("1.." + str(len(cases)))
+ global KSFT_RESULT
+ cnt = 0
+ for case in cases:
+ cnt += 1
+ try:
+ case(*args)
+ except KsftSkipEx as e:
+ ktap_result(True, cnt, case, comment="SKIP " + str(e))
+ totals['skip'] += 1
+ continue
+ except KsftXfailEx as e:
+ ktap_result(True, cnt, case, comment="XFAIL " + str(e))
+ totals['xfail'] += 1
+ continue
+ except Exception as e:
+ tb = traceback.format_exc()
+ for line in tb.strip().split('\n'):
+ ksft_pr("Exception|", line)
+ ktap_result(False, cnt, case)
+ totals['fail'] += 1
+ continue
+ ktap_result(KSFT_RESULT, cnt, case)
+ totals['pass'] += 1
+ else:
+ totals['fail'] += 1
+ print(
+ f"# Totals: pass:{totals['pass']} fail:{totals['fail']} xfail:{totals['xfail']} xpass:0 skip:{totals['skip']} error:0"
+ )
+def ksft_exit():
+ sys.exit(0 if KSFT_RESULT_ALL else 1)
diff --git a/tools/testing/selftests/net/lib/py/ b/tools/testing/selftests/net/lib/py/
new file mode 100644
index 0000000000..ecff85f907
--- /dev/null
+++ b/tools/testing/selftests/net/lib/py/
@@ -0,0 +1,31 @@
+# SPDX-License-Identifier: GPL-2.0
+from .utils import ip
+import random
+import string
+class NetNS:
+ def __init__(self, name=None):
+ if name:
+ = name
+ else:
+ = ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
+ ip('netns add ' +
+ def __del__(self):
+ if
+ ip('netns del ' +
+ = None
+ def __enter__(self):
+ return self
+ def __exit__(self, ex_type, ex_value, ex_tb):
+ self.__del__()
+ def __str__(self):
+ return
+ def __repr__(self):
+ return f"NetNS({})"
diff --git a/tools/testing/selftests/net/lib/py/ b/tools/testing/selftests/net/lib/py/
new file mode 100644
index 0000000000..f571a8b313
--- /dev/null
+++ b/tools/testing/selftests/net/lib/py/
@@ -0,0 +1,134 @@
+# SPDX-License-Identifier: GPL-2.0
+import json
+import os
+import random
+import re
+import time
+from .utils import cmd, ip
+class NetdevSim:
+ """
+ Class for netdevsim netdevice and its attributes.
+ """
+ def __init__(self, nsimdev, port_index, ifname, ns=None):
+ # In case udev renamed the netdev to according to new schema,
+ # check if the name matches the port_index.
+ nsimnamere = re.compile(r"eni\d+np(\d+)")
+ match = nsimnamere.match(ifname)
+ if match and int(match.groups()[0]) != port_index + 1:
+ raise Exception("netdevice name mismatches the expected one")
+ self.ifname = ifname
+ self.nsimdev = nsimdev
+ self.port_index = port_index
+ self.ns = ns
+ self.dfs_dir = "%s/ports/%u/" % (nsimdev.dfs_dir, port_index)
+ ret = ip("-j link show dev %s" % ifname, ns=ns)
+ = json.loads(ret.stdout)[0]
+ self.ifindex =["ifindex"]
+ def dfs_write(self, path, val):
+ self.nsimdev.dfs_write(f'ports/{self.port_index}/' + path, val)
+class NetdevSimDev:
+ """
+ Class for netdevsim bus device and its attributes.
+ """
+ @staticmethod
+ def ctrl_write(path, val):
+ fullpath = os.path.join("/sys/bus/netdevsim/", path)
+ with open(fullpath, "w") as f:
+ f.write(val)
+ def dfs_write(self, path, val):
+ fullpath = os.path.join(f"/sys/kernel/debug/netdevsim/netdevsim{self.addr}/", path)
+ with open(fullpath, "w") as f:
+ f.write(val)
+ def __init__(self, port_count=1, queue_count=1, ns=None):
+ # nsim will spawn in init_net, we'll set to actual ns once we switch it there
+ self.ns = None
+ if not os.path.exists("/sys/bus/netdevsim"):
+ cmd("modprobe netdevsim")
+ addr = random.randrange(1 << 15)
+ while True:
+ try:
+ self.ctrl_write("new_device", "%u %u %u" % (addr, port_count, queue_count))
+ except OSError as e:
+ if e.errno == errno.ENOSPC:
+ addr = random.randrange(1 << 15)
+ continue
+ raise e
+ break
+ self.addr = addr
+ # As probe of netdevsim device might happen from a workqueue,
+ # so wait here until all netdevs appear.
+ self.wait_for_netdevs(port_count)
+ if ns:
+ cmd(f"devlink dev reload netdevsim/netdevsim{addr} netns {}")
+ self.ns = ns
+ cmd("udevadm settle", ns=self.ns)
+ ifnames = self.get_ifnames()
+ self.dfs_dir = "/sys/kernel/debug/netdevsim/netdevsim%u/" % addr
+ self.nsims = []
+ for port_index in range(port_count):
+ self.nsims.append(self._make_port(port_index, ifnames[port_index]))
+ self.removed = False
+ def __enter__(self):
+ return self
+ def __exit__(self, ex_type, ex_value, ex_tb):
+ """
+ __exit__ gets called at the end of a "with" block.
+ """
+ self.remove()
+ def _make_port(self, port_index, ifname):
+ return NetdevSim(self, port_index, ifname, self.ns)
+ def get_ifnames(self):
+ ifnames = []
+ listdir = cmd(f"ls /sys/bus/netdevsim/devices/netdevsim{self.addr}/net/",
+ ns=self.ns).stdout.split()
+ for ifname in listdir:
+ ifnames.append(ifname)
+ ifnames.sort()
+ return ifnames
+ def wait_for_netdevs(self, port_count):
+ timeout = 5
+ timeout_start = time.time()
+ while True:
+ try:
+ ifnames = self.get_ifnames()
+ except FileNotFoundError as e:
+ ifnames = []
+ if len(ifnames) == port_count:
+ break
+ if time.time() < timeout_start + timeout:
+ continue
+ raise Exception("netdevices did not appear within timeout")
+ def remove(self):
+ if not self.removed:
+ self.ctrl_write("del_device", "%u" % (self.addr, ))
+ self.removed = True
+ def remove_nsim(self, nsim):
+ self.nsims.remove(nsim)
+ self.ctrl_write("devices/netdevsim%u/del_port" % (self.addr, ),
+ "%u" % (nsim.port_index, ))
diff --git a/tools/testing/selftests/net/lib/py/ b/tools/testing/selftests/net/lib/py/
new file mode 100644
index 0000000000..0540ea2492
--- /dev/null
+++ b/tools/testing/selftests/net/lib/py/
@@ -0,0 +1,102 @@
+# SPDX-License-Identifier: GPL-2.0
+import json as _json
+import random
+import re
+import subprocess
+import time
+class cmd:
+ def __init__(self, comm, shell=True, fail=True, ns=None, background=False, host=None, timeout=5):
+ if ns:
+ comm = f'ip netns exec {ns} ' + comm
+ self.stdout = None
+ self.stderr = None
+ self.ret = None
+ self.comm = comm
+ if host:
+ self.proc = host.cmd(comm)
+ else:
+ self.proc = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ if not background:
+ self.process(terminate=False, fail=fail, timeout=timeout)
+ def process(self, terminate=True, fail=None, timeout=5):
+ if fail is None:
+ fail = not terminate
+ if terminate:
+ self.proc.terminate()
+ stdout, stderr = self.proc.communicate(timeout)
+ self.stdout = stdout.decode("utf-8")
+ self.stderr = stderr.decode("utf-8")
+ self.proc.stdout.close()
+ self.proc.stderr.close()
+ self.ret = self.proc.returncode
+ if self.proc.returncode != 0 and fail:
+ if len(stderr) > 0 and stderr[-1] == "\n":
+ stderr = stderr[:-1]
+ raise Exception("Command failed: %s\nSTDOUT: %s\nSTDERR: %s" %
+ (self.proc.args, stdout, stderr))
+class bkg(cmd):
+ def __init__(self, comm, shell=True, fail=None, ns=None, host=None,
+ exit_wait=False):
+ super().__init__(comm, background=True,
+ shell=shell, fail=fail, ns=ns, host=host)
+ self.terminate = not exit_wait
+ self.check_fail = fail
+ def __enter__(self):
+ return self
+ def __exit__(self, ex_type, ex_value, ex_tb):
+ return self.process(terminate=self.terminate, fail=self.check_fail)
+def tool(name, args, json=None, ns=None, host=None):
+ cmd_str = name + ' '
+ if json:
+ cmd_str += '--json '
+ cmd_str += args
+ cmd_obj = cmd(cmd_str, ns=ns, host=host)
+ if json:
+ return _json.loads(cmd_obj.stdout)
+ return cmd_obj
+def ip(args, json=None, ns=None, host=None):
+ if ns:
+ args = f'-netns {ns} ' + args
+ return tool('ip', args, json=json, host=host)
+def rand_port():
+ """
+ Get unprivileged port, for now just random, one day we may decide to check if used.
+ """
+ return random.randint(10000, 65535)
+def wait_port_listen(port, proto="tcp", ns=None, host=None, sleep=0.005, deadline=5):
+ end = time.monotonic() + deadline
+ pattern = f":{port:04X} .* "
+ if proto == "tcp": # for tcp protocol additionally check the socket state
+ pattern += "0A"
+ pattern = re.compile(pattern)
+ while True:
+ data = cmd(f'cat /proc/net/{proto}*', ns=ns, host=host, shell=True).stdout
+ for row in data.split("\n"):
+ if
+ return
+ if time.monotonic() > end:
+ raise Exception("Waiting for port listen timed out")
+ time.sleep(sleep)
diff --git a/tools/testing/selftests/net/lib/py/ b/tools/testing/selftests/net/lib/py/
new file mode 100644
index 0000000000..1ace58370c
--- /dev/null
+++ b/tools/testing/selftests/net/lib/py/
@@ -0,0 +1,49 @@
+# SPDX-License-Identifier: GPL-2.0
+import sys
+from pathlib import Path
+from .consts import KSRC, KSFT_DIR
+from .ksft import ksft_pr, ktap_result
+# Resolve paths
+ if (KSFT_DIR / "kselftest-list.txt").exists():
+ # Running in "installed" selftests
+ tools_full_path = KSFT_DIR
+ SPEC_PATH = KSFT_DIR / "net/lib/specs"
+ sys.path.append(tools_full_path.as_posix())
+ from net.lib.ynl.lib import YnlFamily, NlError
+ else:
+ # Running in tree
+ tools_full_path = KSRC / "tools"
+ SPEC_PATH = KSRC / "Documentation/netlink/specs"
+ sys.path.append(tools_full_path.as_posix())
+ from net.ynl.lib import YnlFamily, NlError
+except ModuleNotFoundError as e:
+ ksft_pr("Failed importing `ynl` library from kernel sources")
+ ksft_pr(str(e))
+ ktap_result(True, comment="SKIP")
+ sys.exit(4)
+# Wrapper classes, loading the right specs
+# Set schema='' to avoid jsonschema validation, it's slow
+class EthtoolFamily(YnlFamily):
+ def __init__(self):
+ super().__init__((SPEC_PATH / Path('ethtool.yaml')).as_posix(),
+ schema='')
+class RtnlFamily(YnlFamily):
+ def __init__(self):
+ super().__init__((SPEC_PATH / Path('rt_link.yaml')).as_posix(),
+ schema='')
+class NetdevFamily(YnlFamily):
+ def __init__(self):
+ super().__init__((SPEC_PATH / Path('netdev.yaml')).as_posix(),
+ schema='')