1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
#!/usr/bin/env python3
#
# mcast-tx.py
#
# Copyright (c) 2018 Cumulus Networks, Inc.
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND Cumulus Networks DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#
import argparse
import logging
import socket
import struct
import time
import sys
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s %(levelname)5s: %(message)s"
)
# Color the errors and warnings in red
logging.addLevelName(
logging.ERROR, "\033[91m %s\033[0m" % logging.getLevelName(logging.ERROR)
)
logging.addLevelName(
logging.WARNING, "\033[91m%s\033[0m" % logging.getLevelName(logging.WARNING)
)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser(description="Multicast packet generator")
parser.add_argument("group", help="Multicast IP")
parser.add_argument("ifname", help="Interface name")
parser.add_argument("--port", type=int, help="UDP port number", default=1000)
parser.add_argument("--ttl", type=int, help="time-to-live", default=20)
parser.add_argument("--count", type=int, help="Packets to send", default=1)
parser.add_argument("--interval", type=int, help="ms between packets", default=100)
args = parser.parse_args()
# Create the datagram socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# IN.SO_BINDTODEVICE is not defined in some releases of python but it is 25
# https://github.com/sivel/bonding/issues/10
#
# Bind our socket to ifname
#
# Note ugly python version incompatibility
#
if sys.version_info[0] > 2:
sock.setsockopt(
socket.SOL_SOCKET,
25,
struct.pack("%ds" % len(args.ifname), args.ifname.encode("utf-8")),
)
else:
sock.setsockopt(
socket.SOL_SOCKET, 25, struct.pack("%ds" % len(args.ifname), args.ifname)
)
# We need to make sure our sendto() finishes before we close the socket
sock.setblocking(1)
# Set the time-to-live
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, struct.pack("b", args.ttl))
ms = args.interval / 1000.0
# Send data to the multicast group
for x in range(args.count):
log.info(
"TX multicast UDP packet to %s:%d on %s" % (args.group, args.port, args.ifname)
)
#
# Note ugly python version incompatibility
#
if sys.version_info[0] > 2:
sent = sock.sendto(b"foobar %d" % x, (args.group, args.port))
else:
sent = sock.sendto("foobar %d" % x, (args.group, args.port))
if args.count > 1 and ms:
time.sleep(ms)
sock.close()
|