diff options
Diffstat (limited to 'test/suite_capture.py')
-rw-r--r-- | test/suite_capture.py | 599 |
1 files changed, 599 insertions, 0 deletions
diff --git a/test/suite_capture.py b/test/suite_capture.py new file mode 100644 index 00000000..ffcd6cd5 --- /dev/null +++ b/test/suite_capture.py @@ -0,0 +1,599 @@ +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Capture tests''' + +import glob +import hashlib +import os +import socket +import subprocess +import subprocesstest +from subprocesstest import cat_dhcp_command, cat_cap_file_command, count_output, grep_output, check_packet_count +import sys +import threading +import time +import uuid +import sysconfig +import pytest + +capture_duration = 5 + +testout_pcap = 'testout.pcap' +testout_pcapng = 'testout.pcapng' +snapshot_len = 96 + +class UdpTrafficGenerator(threading.Thread): + def __init__(self): + super().__init__(daemon=True) + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.stopped = False + + def run(self): + while not self.stopped: + time.sleep(.05) + self.sock.sendto(b'Wireshark test\n', ('127.0.0.1', 9)) + + def stop(self): + if not self.stopped: + self.stopped = True + self.join() + + +@pytest.fixture +def traffic_generator(): + ''' + Traffic generator factory. Invoking it returns a tuple (start_func, cfilter) + where cfilter is a capture filter to match the generated traffic. + start_func can be invoked to start generating traffic and returns a function + which can be used to stop traffic generation early. + Currently generates a bunch of UDP traffic to localhost. + ''' + threads = [] + def start_processes(): + thread = UdpTrafficGenerator() + thread.start() + threads.append(thread) + return thread.stop + try: + yield start_processes, 'udp port 9' + finally: + for thread in threads: + thread.stop() + + +@pytest.fixture(scope='session') +def wireshark_k(wireshark_command): + return tuple(list(wireshark_command) + ['-k']) + + +def capture_command(*args, shell=False): + cmd_args = list(args) + if type(cmd_args[0]) != str: + # Assume something like ['wireshark', '-k'] + cmd_args = list(cmd_args[0]) + list(cmd_args)[1:] + if shell: + cmd_args = ' '.join(cmd_args) + return cmd_args + + +@pytest.fixture +def check_capture_10_packets(capture_interface, cmd_capinfos, traffic_generator, result_file): + start_traffic, cfilter = traffic_generator + def check_capture_10_packets_real(self, cmd=None, to_stdout=False, env=None): + assert cmd is not None + testout_file = result_file(testout_pcap) + stop_traffic = start_traffic() + if to_stdout: + subprocesstest.check_run(capture_command(cmd, + '-i', '"{}"'.format(capture_interface), + '-p', + '-w', '-', + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', '"{}"'.format(cfilter), + '>', testout_file, + shell=True + ), + shell=True, env=env) + else: + subprocesstest.check_run(capture_command(cmd, + '-i', capture_interface, + '-p', + '-w', testout_file, + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', cfilter, + ), env=env) + stop_traffic() + check_packet_count(cmd_capinfos, 10, testout_file) + return check_capture_10_packets_real + + +@pytest.fixture +def check_capture_fifo(cmd_capinfos, result_file): + if sys.platform == 'win32': + pytest.skip('Test requires OS fifo support.') + + def check_capture_fifo_real(self, cmd=None, env=None): + assert cmd is not None + testout_file = result_file(testout_pcap) + fifo_file = result_file('testout.fifo') + try: + # If a previous test left its fifo laying around, e.g. from a failure, remove it. + os.unlink(fifo_file) + except Exception: + pass + os.mkfifo(fifo_file) + slow_dhcp_cmd = cat_dhcp_command('slow') + fifo_proc = subprocess.Popen( + ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), + shell=True) + subprocesstest.check_run(capture_command(cmd, + '-i', fifo_file, + '-p', + '-w', testout_file, + '-a', 'duration:{}'.format(capture_duration), + ), env=env) + fifo_proc.kill() + assert os.path.isfile(testout_file) + check_packet_count(cmd_capinfos, 8, testout_file) + return check_capture_fifo_real + + +@pytest.fixture +def check_capture_stdin(cmd_capinfos, result_file): + # Capturing always requires dumpcap, hence the dependency on it. + def check_capture_stdin_real(self, cmd=None, env=None): + # Similar to suite_io.check_io_4_packets. + assert cmd is not None + testout_file = result_file(testout_pcap) + slow_dhcp_cmd = cat_dhcp_command('slow') + capture_cmd = capture_command(cmd, + '-i', '-', + '-w', testout_file, + '-a', 'duration:{}'.format(capture_duration), + shell=True + ) + is_gui = type(cmd) != str and '-k' in cmd[0] + if is_gui: + capture_cmd += ' --log-level=info' + if sysconfig.get_platform().startswith('mingw'): + pytest.skip('FIXME Pipes are broken with the MSYS2 shell') + pipe_proc = subprocesstest.check_run(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True, capture_output=True, env=env) + if is_gui: + # Wireshark uses stdout and not stderr for diagnostic messages + # XXX: Confirm this + assert grep_output(pipe_proc.stdout, 'Wireshark is up and ready to go'), 'No startup message.' + assert grep_output(pipe_proc.stdout, 'Capture started'), 'No capture start message.' + assert grep_output(pipe_proc.stdout, 'Capture stopped'), 'No capture stop message.' + assert os.path.isfile(testout_file) + check_packet_count(cmd_capinfos, 8, testout_file) + return check_capture_stdin_real + + +@pytest.fixture +def check_capture_read_filter(capture_interface, traffic_generator, cmd_capinfos, result_file): + start_traffic, cfilter = traffic_generator + def check_capture_read_filter_real(self, cmd=None, env=None): + assert cmd is not None + testout_file = result_file(testout_pcap) + stop_traffic = start_traffic() + subprocesstest.check_run(capture_command(cmd, + '-i', capture_interface, + '-p', + '-w', testout_file, + '-2', + '-R', 'dcerpc.cn_call_id==123456', # Something unlikely. + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', cfilter, + ), env=env) + stop_traffic() + check_packet_count(cmd_capinfos, 0, testout_file) + return check_capture_read_filter_real + +@pytest.fixture +def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator, cmd_capinfos, result_file): + start_traffic, cfilter = traffic_generator + def check_capture_snapshot_len_real(self, cmd=None, env=None): + assert cmd is not None + stop_traffic = start_traffic() + testout_file = result_file(testout_pcap) + subprocesstest.check_run(capture_command(cmd, + '-i', capture_interface, + '-p', + '-w', testout_file, + '-s', str(snapshot_len), + '-a', 'duration:{}'.format(capture_duration), + '-f', cfilter, + ), env=env) + stop_traffic() + assert os.path.isfile(testout_file) + + # Use tshark to filter out all packets larger than 68 bytes. + testout2_file = result_file('testout2.pcap') + + subprocesstest.check_run((cmd_tshark, + '-r', testout_file, + '-w', testout2_file, + '-Y', 'frame.cap_len>{}'.format(snapshot_len), + ), env=env) + check_packet_count(cmd_capinfos, 0, testout2_file) + return check_capture_snapshot_len_real + + +@pytest.fixture +def check_dumpcap_autostop_stdin(cmd_dumpcap, cmd_capinfos, result_file): + def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None, env=None): + # Similar to check_capture_stdin. + testout_file = result_file(testout_pcap) + cat100_dhcp_cmd = cat_dhcp_command('cat100') + condition='oops:invalid' + + if packets is not None: + condition = 'packets:{}'.format(packets) + elif filesize is not None: + condition = 'filesize:{}'.format(filesize) + else: + raise AssertionError('Need one of packets or filesize') + + cmd_ = '"{}"'.format(cmd_dumpcap) + capture_cmd = ' '.join((cmd_, + '-i', '-', + '-w', testout_file, + '-a', condition, + )) + if sysconfig.get_platform().startswith('mingw'): + pytest.skip('FIXME Pipes are broken with the MSYS2 shell') + subprocesstest.check_run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True, env=env) + assert os.path.isfile(testout_file) + + if packets is not None: + check_packet_count(cmd_capinfos, packets, testout_file) + elif filesize is not None: + capturekb = os.path.getsize(testout_file) / 1000 + assert capturekb >= filesize + return check_dumpcap_autostop_stdin_real + + +@pytest.fixture +def check_dumpcap_ringbuffer_stdin(cmd_dumpcap, cmd_capinfos, result_file): + def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None, env=None): + # Similar to check_capture_stdin. + rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID + testout_file = result_file('testout.{}.pcapng'.format(rb_unique)) + testout_glob = result_file('testout.{}_*.pcapng'.format(rb_unique)) + cat100_dhcp_cmd = cat_dhcp_command('cat100') + condition='oops:invalid' + + if packets is not None: + condition = 'packets:{}'.format(packets) + elif filesize is not None: + condition = 'filesize:{}'.format(filesize) + else: + raise AssertionError('Need one of packets or filesize') + + cmd_ = '"{}"'.format(cmd_dumpcap) + capture_cmd = ' '.join((cmd_, + '-i', '-', + '-w', testout_file, + '-a', 'files:2', + '-b', condition, + )) + if sysconfig.get_platform().startswith('mingw'): + pytest.skip('FIXME Pipes are broken with the MSYS2 shell') + subprocesstest.check_run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True, env=env) + + rb_files = glob.glob(testout_glob) + assert len(rb_files) == 2 + + for rbf in rb_files: + assert os.path.isfile(rbf) + if packets is not None: + check_packet_count(cmd_capinfos, packets, rbf) + elif filesize is not None: + capturekb = os.path.getsize(rbf) / 1000 + assert capturekb >= filesize + return check_dumpcap_ringbuffer_stdin_real + + +@pytest.fixture +def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, cmd_capinfos, capture_file, result_file): + if sys.platform == 'win32': + pytest.skip('Test requires OS fifo support.') + def check_dumpcap_pcapng_sections_real(self, multi_input=False, multi_output=False, env=None): + # Make sure we always test multiple SHBs in an input. + in_files_l = [ [ + capture_file('many_interfaces.pcapng.1'), + capture_file('many_interfaces.pcapng.2') + ] ] + if multi_input: + in_files_l.append([ capture_file('many_interfaces.pcapng.3') ]) + fifo_files = [] + fifo_procs = [] + # Default values for our validity tests + check_val_d = { + 'filename': None, + 'packet_count': 0, + 'idb_count': 0, + 'ua_pt1_count': 0, + 'ua_pt2_count': 0, + 'ua_pt3_count': 0, + 'ua_dc_count': 0, + } + check_vals = [ check_val_d ] + + for in_files in in_files_l: + fifo_file = result_file('dumpcap_pcapng_sections_{}.fifo'.format(len(fifo_files) + 1)) + fifo_files.append(fifo_file) + # If a previous test left its fifo laying around, e.g. from a failure, remove it. + try: + os.unlink(fifo_file) + except Exception: pass + os.mkfifo(fifo_file) + cat_cmd = cat_cap_file_command(in_files) + fifo_procs.append(subprocess.Popen(('{0} > {1}'.format(cat_cmd, fifo_file)), shell=True)) + + if multi_output: + rb_unique = 'sections_rb_' + uuid.uuid4().hex[:6] # Random ID + testout_file = result_file('testout.{}.pcapng'.format(rb_unique)) + testout_glob = result_file('testout.{}_*.pcapng'.format(rb_unique)) + check_vals.append(check_val_d.copy()) + # check_vals[]['filename'] will be filled in below + else: + testout_file = result_file(testout_pcapng) + check_vals[0]['filename'] = testout_file + + # Capture commands + if not multi_input and not multi_output: + # Passthrough SHBs, single output file + capture_cmd_args = ( + '-i', fifo_files[0], + '-w', testout_file + ) + check_vals[0]['packet_count'] = 79 + check_vals[0]['idb_count'] = 22 + check_vals[0]['ua_pt1_count'] = 1 + check_vals[0]['ua_pt2_count'] = 1 + elif not multi_input and multi_output: + # Passthrough SHBs, multiple output files + capture_cmd_args = ( + '-i', fifo_files[0], + '-w', testout_file, + '-a', 'files:2', + '-b', 'packets:53' + ) + check_vals[0]['packet_count'] = 53 + check_vals[0]['idb_count'] = 11 + check_vals[0]['ua_pt1_count'] = 1 + check_vals[1]['packet_count'] = 26 + check_vals[1]['idb_count'] = 22 + check_vals[1]['ua_pt1_count'] = 1 + check_vals[1]['ua_pt2_count'] = 1 + elif multi_input and not multi_output: + # Dumpcap SHBs, single output file + capture_cmd_args = ( + '-i', fifo_files[0], + '-i', fifo_files[1], + '-w', testout_file + ) + check_vals[0]['packet_count'] = 88 + check_vals[0]['idb_count'] = 33 + check_vals[0]['ua_dc_count'] = 1 + else: + # Dumpcap SHBs, multiple output files + capture_cmd_args = ( + '-i', fifo_files[0], + '-i', fifo_files[1], + '-w', testout_file, + '-a', 'files:2', + '-b', 'packets:53' + ) + check_vals[0]['packet_count'] = 53 + check_vals[0]['idb_count'] = 11 + check_vals[0]['ua_dc_count'] = 1 + check_vals[1]['packet_count'] = 35 + check_vals[1]['idb_count'] = 33 + check_vals[1]['ua_dc_count'] = 1 + + capture_cmd = capture_command(cmd_dumpcap, *capture_cmd_args) + + subprocesstest.check_run(capture_cmd, env=env) + for fifo_proc in fifo_procs: fifo_proc.kill() + + rb_files = [] + if multi_output: + rb_files = sorted(glob.glob(testout_glob)) + assert len(rb_files) == 2 + check_vals[0]['filename'] = rb_files[0] + check_vals[1]['filename'] = rb_files[1] + + for rbf in rb_files: + assert os.path.isfile(rbf) + + # Output tests + + if not multi_input and not multi_output: + # Check strict bit-for-bit passthrough. + in_hash = hashlib.sha256() + out_hash = hashlib.sha256() + for in_file in in_files_l[0]: + in_cap_file = capture_file(in_file) + with open(in_cap_file, 'rb') as f: + in_hash.update(f.read()) + with open(testout_file, 'rb') as f: + out_hash.update(f.read()) + assert in_hash.hexdigest() == out_hash.hexdigest() + + # many_interfaces.pcapng.1 : 64 packets written by "Passthrough test #1" + # many_interfaces.pcapng.2 : 15 packets written by "Passthrough test #2" + # many_interfaces.pcapng.3 : 9 packets written by "Passthrough test #3" + # Each has 11 interfaces. + idb_compare_eq = True + if multi_input and multi_output: + # Having multiple inputs forces the use of threads. In our + # case this means that non-packet block counts in the first + # file in is nondeterministic. + idb_compare_eq = False + for check_val in check_vals: + check_packet_count(cmd_capinfos, check_val['packet_count'], check_val['filename']) + + tshark_proc = subprocesstest.check_run(capture_command(cmd_tshark, + '-r', check_val['filename'], + '-V', + '-X', 'read_format:MIME Files Format' + ), capture_output=True, env=env) + # XXX Are there any other sanity checks we should run? + if idb_compare_eq: + assert count_output(tshark_proc.stdout, r'Block \d+: Interface Description Block \d+') \ + == check_val['idb_count'] + else: + assert count_output(tshark_proc.stdout, r'Block \d+: Interface Description Block \d+') \ + >= check_val['idb_count'] + idb_compare_eq = True + assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #1') \ + == check_val['ua_pt1_count'] + assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #2') \ + == check_val['ua_pt2_count'] + assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #3') \ + == check_val['ua_pt3_count'] + assert count_output(tshark_proc.stdout, r'Option: User Application = Dumpcap \(Wireshark\)') \ + == check_val['ua_dc_count'] + return check_dumpcap_pcapng_sections_real + + +class TestWiresharkCapture: + def test_wireshark_capture_10_packets_to_file(self, request, wireshark_k, check_capture_10_packets, make_screenshot_on_error, test_env): + '''Capture 10 packets from the network to a file using Wireshark''' + disabled = request.config.getoption('--disable-gui', default=False) + if disabled: + pytest.skip('GUI tests are disabled via --disable-gui') + with make_screenshot_on_error(): + check_capture_10_packets(self, cmd=wireshark_k, env=test_env) + + # Wireshark doesn't currently support writing to stdout while capturing. + # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets): + # '''Capture 10 packets from the network to stdout using Wireshark''' + # check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True) + + def test_wireshark_capture_from_fifo(self, request, wireshark_k, check_capture_fifo, make_screenshot_on_error, test_env): + '''Capture from a fifo using Wireshark''' + disabled = request.config.getoption('--disable-gui', default=False) + if disabled: + pytest.skip('GUI tests are disabled via --disable-gui') + with make_screenshot_on_error(): + check_capture_fifo(self, cmd=wireshark_k, env=test_env) + + def test_wireshark_capture_from_stdin(self, request, wireshark_k, check_capture_stdin, make_screenshot_on_error, test_env): + '''Capture from stdin using Wireshark''' + disabled = request.config.getoption('--disable-gui', default=False) + if disabled: + pytest.skip('GUI tests are disabled via --disable-gui') + with make_screenshot_on_error(): + check_capture_stdin(self, cmd=wireshark_k, env=test_env) + + def test_wireshark_capture_snapshot_len(self, request, wireshark_k, check_capture_snapshot_len, make_screenshot_on_error, test_env): + '''Capture truncated packets using Wireshark''' + disabled = request.config.getoption('--disable-gui', default=False) + if disabled: + pytest.skip('GUI tests are disabled via --disable-gui') + with make_screenshot_on_error(): + check_capture_snapshot_len(self, cmd=wireshark_k, env=test_env) + + +class TestTsharkCapture: + def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets, test_env): + '''Capture 10 packets from the network to a file using TShark''' + check_capture_10_packets(self, cmd=cmd_tshark, env=test_env) + + def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets, test_env): + '''Capture 10 packets from the network to stdout using TShark''' + check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True, env=test_env) + + def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo, test_env): + '''Capture from a fifo using TShark''' + check_capture_fifo(self, cmd=cmd_tshark, env=test_env) + + def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin, test_env): + '''Capture from stdin using TShark''' + check_capture_stdin(self, cmd=cmd_tshark, env=test_env) + + def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len, test_env): + '''Capture truncated packets using TShark''' + check_capture_snapshot_len(self, cmd=cmd_tshark, env=test_env) + + +class TestDumpcapCapture: + def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets, base_env): + '''Capture 10 packets from the network to a file using Dumpcap''' + check_capture_10_packets(self, cmd=cmd_dumpcap, env=base_env) + + def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets, base_env): + '''Capture 10 packets from the network to stdout using Dumpcap''' + check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True, env=base_env) + + def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo, base_env): + '''Capture from a fifo using Dumpcap''' + check_capture_fifo(self, cmd=cmd_dumpcap, env=base_env) + + def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin, base_env): + '''Capture from stdin using Dumpcap''' + check_capture_stdin(self, cmd=cmd_dumpcap, env=base_env) + + def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap, base_env): + '''Capture truncated packets using Dumpcap''' + check_capture_snapshot_len(self, cmd=cmd_dumpcap, env=base_env) + + +class TestDumpcapAutostop: + # duration, filesize, packets, files + def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin, base_env): + '''Capture from stdin using Dumpcap until we reach a file size limit''' + check_dumpcap_autostop_stdin(self, filesize=15, env=base_env) + + def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin, base_env): + '''Capture from stdin using Dumpcap until we reach a packet limit''' + check_dumpcap_autostop_stdin(self, packets=97, env=base_env) # Last prime before 100. Arbitrary. + + +class TestDumpcapRingbuffer: + # duration, interval, filesize, packets, files + def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin, base_env): + '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit''' + check_dumpcap_ringbuffer_stdin(self, filesize=15, env=base_env) + + def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin, base_env): + '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit''' + check_dumpcap_ringbuffer_stdin(self, packets=47, env=base_env) # Last prime before 50. Arbitrary. + + +class TestDumpcapPcapngSections: + def test_dumpcap_pcapng_single_in_single_out(self, check_dumpcap_pcapng_sections, base_env): + '''Capture from a single pcapng source using Dumpcap and write a single file''' + if sys.byteorder == 'big': + pytest.skip('this test is supported on little endian only') + check_dumpcap_pcapng_sections(self, env=base_env) + + def test_dumpcap_pcapng_single_in_multi_out(self, check_dumpcap_pcapng_sections, base_env): + '''Capture from a single pcapng source using Dumpcap and write two files''' + if sys.byteorder == 'big': + pytest.skip('this test is supported on little endian only') + check_dumpcap_pcapng_sections(self, multi_output=True, env=base_env) + + def test_dumpcap_pcapng_multi_in_single_out(self, check_dumpcap_pcapng_sections, base_env): + '''Capture from two pcapng sources using Dumpcap and write a single file''' + if sys.byteorder == 'big': + pytest.skip('this test is supported on little endian only') + check_dumpcap_pcapng_sections(self, multi_input=True, env=base_env) + + def test_dumpcap_pcapng_multi_in_multi_out(self, check_dumpcap_pcapng_sections, base_env): + '''Capture from two pcapng sources using Dumpcap and write two files''' + if sys.byteorder == 'big': + pytest.skip('this test is supported on little endian only') + check_dumpcap_pcapng_sections(self, multi_input=True, multi_output=True, env=base_env) |