1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
#!/bin/env python3
# SPDX-License-Identifier: GPL-2.0
"""
This script helps generate fragmented UDP packets.
While it is technically possible to dynamically generate
fragmented packets in C, it is much harder to read and write
said code. `scapy` is relatively industry standard and really
easy to read / write.
So we choose to write this script that generates a valid C
header. Rerun script and commit generated file after any
modifications.
"""
import argparse
import os
from scapy.all import *
# These constants must stay in sync with `ip_check_defrag.c`
# IPv4 destination address of the generated packet (see main())
VETH1_ADDR = "172.16.1.200"
# IPv6 source address of the generated packet (see main())
VETH0_ADDR6 = "fc00::100"
# IPv6 destination address of the generated packet (see main())
VETH1_ADDR6 = "fc00::200"
# UDP source port used for both the IPv4 and IPv6 packets
CLIENT_PORT = 48878
# UDP destination port used for both the IPv4 and IPv6 packets
SERVER_PORT = 48879
# UDP payload carried by the packets; encoded to bytes in main()
MAGIC_MESSAGE = "THIS IS THE ORIGINAL MESSAGE, PLEASE REASSEMBLE ME"
def print_header(f):
    """Write the opening boilerplate of the generated C header to `f`.

    Emits, in order: the SPDX license line, the do-not-edit notice, the
    opening include guard and the `<stdint.h>` include, with blank
    separator lines in between.
    """
    preamble = (
        "// SPDX-License-Identifier: GPL-2.0\n",
        "/* DO NOT EDIT -- this file is generated */\n",
        "\n",
        "#ifndef _IP_CHECK_DEFRAG_FRAGS_H\n",
        "#define _IP_CHECK_DEFRAG_FRAGS_H\n",
        "\n",
        "#include <stdint.h>\n",
        "\n",
    )
    for line in preamble:
        f.write(line)
def print_frags(f, frags, v6):
    """Write every fragment in `frags` to `f` as a C `uint8_t` array.

    Arrays are named `frag_<idx>`, or `frag6_<idx>` when `v6` is truthy,
    where `<idx>` is the fragment's position in `frags`. Each element of a
    fragment is rendered with `hex()`.

    Fragments are presumably `bytes` objects (main() passes the results of
    `build()`); any sliceable sequence of ints works.
    """
    family = "6" if v6 else ""
    for idx, frag in enumerate(frags):
        # 10 bytes per line to keep width in check
        rows = [
            ", ".join(hex(byte) for byte in frag[start : start + 10])
            for start in range(0, len(frag), 10)
        ]
        f.write(f"static uint8_t frag{family}_{idx}[] = {{\n")
        for row in rows:
            f.write(f"\t{row},\n")
        f.write("};\n")
def print_trailer(f):
    """Write the closing boilerplate of the generated C header to `f`.

    Emits a blank line followed by the `#endif` that closes the include
    guard.
    """
    for line in ("\n", "#endif /* _IP_CHECK_DEFRAG_FRAGS_H */\n"):
        f.write(line)
def main(f):
    """Generate the fragmented test packets and write the C header to `f`.

    Builds one IPv4 and one IPv6 UDP packet carrying MAGIC_MESSAGE,
    fragments each of them, and emits the built fragments as C arrays
    between the header and trailer boilerplate.
    """
    message = MAGIC_MESSAGE.encode()

    # srcip of 0 is filled in by IP_HDRINCL
    # Disable UDPv4 checksums to keep code simpler
    pkt = (
        IP(src="0.0.0.0", dst=VETH1_ADDR)
        / UDP(sport=CLIENT_PORT, dport=SERVER_PORT, chksum=0)
        / Raw(load=message)
    )

    # UDPv6 requires a checksum
    # Also pin the ipv6 fragment header ID, otherwise it's a random value
    pkt6 = (
        IPv6(src=VETH0_ADDR6, dst=VETH1_ADDR6)
        / IPv6ExtHdrFragment(id=0xBEEF)
        / UDP(sport=CLIENT_PORT, dport=SERVER_PORT)
        / Raw(load=message)
    )

    frags = [piece.build() for piece in pkt.fragment(24)]
    frags6 = [piece.build() for piece in fragment6(pkt6, 72)]

    print_header(f)
    for fragments, is_v6 in ((frags, False), (frags6, True)):
        print_frags(f, fragments, is_v6)
    print_trailer(f)
if __name__ == "__main__":
    # Write the generated header next to this script. realpath() resolves
    # symlinks, so the output location does not depend on the current
    # working directory or on how the script was invoked.
    # (Named `script_dir` rather than `dir` to avoid shadowing the builtin.)
    script_dir = os.path.dirname(os.path.realpath(__file__))
    header = os.path.join(script_dir, "ip_check_defrag_frags.h")
    with open(header, "w") as f:
        main(f)
|