summaryrefslogtreecommitdiffstats
path: root/test/suite_capture.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/suite_capture.py')
-rw-r--r--test/suite_capture.py599
1 files changed, 599 insertions, 0 deletions
diff --git a/test/suite_capture.py b/test/suite_capture.py
new file mode 100644
index 00000000..ffcd6cd5
--- /dev/null
+++ b/test/suite_capture.py
@@ -0,0 +1,599 @@
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Capture tests'''
+
+import glob
+import hashlib
+import os
+import socket
+import subprocess
+import subprocesstest
+from subprocesstest import cat_dhcp_command, cat_cap_file_command, count_output, grep_output, check_packet_count
+import sys
+import threading
+import time
+import uuid
+import sysconfig
+import pytest
+
+capture_duration = 5
+
+testout_pcap = 'testout.pcap'
+testout_pcapng = 'testout.pcapng'
+snapshot_len = 96
+
+class UdpTrafficGenerator(threading.Thread):
+ def __init__(self):
+ super().__init__(daemon=True)
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.stopped = False
+
+ def run(self):
+ while not self.stopped:
+ time.sleep(.05)
+ self.sock.sendto(b'Wireshark test\n', ('127.0.0.1', 9))
+
+ def stop(self):
+ if not self.stopped:
+ self.stopped = True
+ self.join()
+
+
+@pytest.fixture
+def traffic_generator():
+ '''
+ Traffic generator factory. Invoking it returns a tuple (start_func, cfilter)
+ where cfilter is a capture filter to match the generated traffic.
+ start_func can be invoked to start generating traffic and returns a function
+ which can be used to stop traffic generation early.
+ Currently generates a bunch of UDP traffic to localhost.
+ '''
+ threads = []
+ def start_processes():
+ thread = UdpTrafficGenerator()
+ thread.start()
+ threads.append(thread)
+ return thread.stop
+ try:
+ yield start_processes, 'udp port 9'
+ finally:
+ for thread in threads:
+ thread.stop()
+
+
+@pytest.fixture(scope='session')
+def wireshark_k(wireshark_command):
+ return tuple(list(wireshark_command) + ['-k'])
+
+
+def capture_command(*args, shell=False):
+ cmd_args = list(args)
+ if type(cmd_args[0]) != str:
+ # Assume something like ['wireshark', '-k']
+ cmd_args = list(cmd_args[0]) + list(cmd_args)[1:]
+ if shell:
+ cmd_args = ' '.join(cmd_args)
+ return cmd_args
+
+
+@pytest.fixture
+def check_capture_10_packets(capture_interface, cmd_capinfos, traffic_generator, result_file):
+ start_traffic, cfilter = traffic_generator
+ def check_capture_10_packets_real(self, cmd=None, to_stdout=False, env=None):
+ assert cmd is not None
+ testout_file = result_file(testout_pcap)
+ stop_traffic = start_traffic()
+ if to_stdout:
+ subprocesstest.check_run(capture_command(cmd,
+ '-i', '"{}"'.format(capture_interface),
+ '-p',
+ '-w', '-',
+ '-c', '10',
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', '"{}"'.format(cfilter),
+ '>', testout_file,
+ shell=True
+ ),
+ shell=True, env=env)
+ else:
+ subprocesstest.check_run(capture_command(cmd,
+ '-i', capture_interface,
+ '-p',
+ '-w', testout_file,
+ '-c', '10',
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', cfilter,
+ ), env=env)
+ stop_traffic()
+ check_packet_count(cmd_capinfos, 10, testout_file)
+ return check_capture_10_packets_real
+
+
+@pytest.fixture
+def check_capture_fifo(cmd_capinfos, result_file):
+ if sys.platform == 'win32':
+ pytest.skip('Test requires OS fifo support.')
+
+ def check_capture_fifo_real(self, cmd=None, env=None):
+ assert cmd is not None
+ testout_file = result_file(testout_pcap)
+ fifo_file = result_file('testout.fifo')
+ try:
+ # If a previous test left its fifo laying around, e.g. from a failure, remove it.
+ os.unlink(fifo_file)
+ except Exception:
+ pass
+ os.mkfifo(fifo_file)
+ slow_dhcp_cmd = cat_dhcp_command('slow')
+ fifo_proc = subprocess.Popen(
+ ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
+ shell=True)
+ subprocesstest.check_run(capture_command(cmd,
+ '-i', fifo_file,
+ '-p',
+ '-w', testout_file,
+ '-a', 'duration:{}'.format(capture_duration),
+ ), env=env)
+ fifo_proc.kill()
+ assert os.path.isfile(testout_file)
+ check_packet_count(cmd_capinfos, 8, testout_file)
+ return check_capture_fifo_real
+
+
+@pytest.fixture
+def check_capture_stdin(cmd_capinfos, result_file):
+ # Capturing always requires dumpcap, hence the dependency on it.
+ def check_capture_stdin_real(self, cmd=None, env=None):
+ # Similar to suite_io.check_io_4_packets.
+ assert cmd is not None
+ testout_file = result_file(testout_pcap)
+ slow_dhcp_cmd = cat_dhcp_command('slow')
+ capture_cmd = capture_command(cmd,
+ '-i', '-',
+ '-w', testout_file,
+ '-a', 'duration:{}'.format(capture_duration),
+ shell=True
+ )
+ is_gui = type(cmd) != str and '-k' in cmd[0]
+ if is_gui:
+ capture_cmd += ' --log-level=info'
+ if sysconfig.get_platform().startswith('mingw'):
+ pytest.skip('FIXME Pipes are broken with the MSYS2 shell')
+ pipe_proc = subprocesstest.check_run(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True, capture_output=True, env=env)
+ if is_gui:
+ # Wireshark uses stdout and not stderr for diagnostic messages
+ # XXX: Confirm this
+ assert grep_output(pipe_proc.stdout, 'Wireshark is up and ready to go'), 'No startup message.'
+ assert grep_output(pipe_proc.stdout, 'Capture started'), 'No capture start message.'
+ assert grep_output(pipe_proc.stdout, 'Capture stopped'), 'No capture stop message.'
+ assert os.path.isfile(testout_file)
+ check_packet_count(cmd_capinfos, 8, testout_file)
+ return check_capture_stdin_real
+
+
+@pytest.fixture
+def check_capture_read_filter(capture_interface, traffic_generator, cmd_capinfos, result_file):
+ start_traffic, cfilter = traffic_generator
+ def check_capture_read_filter_real(self, cmd=None, env=None):
+ assert cmd is not None
+ testout_file = result_file(testout_pcap)
+ stop_traffic = start_traffic()
+ subprocesstest.check_run(capture_command(cmd,
+ '-i', capture_interface,
+ '-p',
+ '-w', testout_file,
+ '-2',
+ '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
+ '-c', '10',
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', cfilter,
+ ), env=env)
+ stop_traffic()
+ check_packet_count(cmd_capinfos, 0, testout_file)
+ return check_capture_read_filter_real
+
+@pytest.fixture
+def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator, cmd_capinfos, result_file):
+ start_traffic, cfilter = traffic_generator
+ def check_capture_snapshot_len_real(self, cmd=None, env=None):
+ assert cmd is not None
+ stop_traffic = start_traffic()
+ testout_file = result_file(testout_pcap)
+ subprocesstest.check_run(capture_command(cmd,
+ '-i', capture_interface,
+ '-p',
+ '-w', testout_file,
+ '-s', str(snapshot_len),
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', cfilter,
+ ), env=env)
+ stop_traffic()
+ assert os.path.isfile(testout_file)
+
+ # Use tshark to filter out all packets larger than 68 bytes.
+ testout2_file = result_file('testout2.pcap')
+
+ subprocesstest.check_run((cmd_tshark,
+ '-r', testout_file,
+ '-w', testout2_file,
+ '-Y', 'frame.cap_len>{}'.format(snapshot_len),
+ ), env=env)
+ check_packet_count(cmd_capinfos, 0, testout2_file)
+ return check_capture_snapshot_len_real
+
+
+@pytest.fixture
+def check_dumpcap_autostop_stdin(cmd_dumpcap, cmd_capinfos, result_file):
+ def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None, env=None):
+ # Similar to check_capture_stdin.
+ testout_file = result_file(testout_pcap)
+ cat100_dhcp_cmd = cat_dhcp_command('cat100')
+ condition='oops:invalid'
+
+ if packets is not None:
+ condition = 'packets:{}'.format(packets)
+ elif filesize is not None:
+ condition = 'filesize:{}'.format(filesize)
+ else:
+ raise AssertionError('Need one of packets or filesize')
+
+ cmd_ = '"{}"'.format(cmd_dumpcap)
+ capture_cmd = ' '.join((cmd_,
+ '-i', '-',
+ '-w', testout_file,
+ '-a', condition,
+ ))
+ if sysconfig.get_platform().startswith('mingw'):
+ pytest.skip('FIXME Pipes are broken with the MSYS2 shell')
+ subprocesstest.check_run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True, env=env)
+ assert os.path.isfile(testout_file)
+
+ if packets is not None:
+ check_packet_count(cmd_capinfos, packets, testout_file)
+ elif filesize is not None:
+ capturekb = os.path.getsize(testout_file) / 1000
+ assert capturekb >= filesize
+ return check_dumpcap_autostop_stdin_real
+
+
+@pytest.fixture
+def check_dumpcap_ringbuffer_stdin(cmd_dumpcap, cmd_capinfos, result_file):
+ def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None, env=None):
+ # Similar to check_capture_stdin.
+ rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
+ testout_file = result_file('testout.{}.pcapng'.format(rb_unique))
+ testout_glob = result_file('testout.{}_*.pcapng'.format(rb_unique))
+ cat100_dhcp_cmd = cat_dhcp_command('cat100')
+ condition='oops:invalid'
+
+ if packets is not None:
+ condition = 'packets:{}'.format(packets)
+ elif filesize is not None:
+ condition = 'filesize:{}'.format(filesize)
+ else:
+ raise AssertionError('Need one of packets or filesize')
+
+ cmd_ = '"{}"'.format(cmd_dumpcap)
+ capture_cmd = ' '.join((cmd_,
+ '-i', '-',
+ '-w', testout_file,
+ '-a', 'files:2',
+ '-b', condition,
+ ))
+ if sysconfig.get_platform().startswith('mingw'):
+ pytest.skip('FIXME Pipes are broken with the MSYS2 shell')
+ subprocesstest.check_run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True, env=env)
+
+ rb_files = glob.glob(testout_glob)
+ assert len(rb_files) == 2
+
+ for rbf in rb_files:
+ assert os.path.isfile(rbf)
+ if packets is not None:
+ check_packet_count(cmd_capinfos, packets, rbf)
+ elif filesize is not None:
+ capturekb = os.path.getsize(rbf) / 1000
+ assert capturekb >= filesize
+ return check_dumpcap_ringbuffer_stdin_real
+
+
+@pytest.fixture
+def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, cmd_capinfos, capture_file, result_file):
+ if sys.platform == 'win32':
+ pytest.skip('Test requires OS fifo support.')
+ def check_dumpcap_pcapng_sections_real(self, multi_input=False, multi_output=False, env=None):
+ # Make sure we always test multiple SHBs in an input.
+ in_files_l = [ [
+ capture_file('many_interfaces.pcapng.1'),
+ capture_file('many_interfaces.pcapng.2')
+ ] ]
+ if multi_input:
+ in_files_l.append([ capture_file('many_interfaces.pcapng.3') ])
+ fifo_files = []
+ fifo_procs = []
+ # Default values for our validity tests
+ check_val_d = {
+ 'filename': None,
+ 'packet_count': 0,
+ 'idb_count': 0,
+ 'ua_pt1_count': 0,
+ 'ua_pt2_count': 0,
+ 'ua_pt3_count': 0,
+ 'ua_dc_count': 0,
+ }
+ check_vals = [ check_val_d ]
+
+ for in_files in in_files_l:
+ fifo_file = result_file('dumpcap_pcapng_sections_{}.fifo'.format(len(fifo_files) + 1))
+ fifo_files.append(fifo_file)
+ # If a previous test left its fifo laying around, e.g. from a failure, remove it.
+ try:
+ os.unlink(fifo_file)
+ except Exception: pass
+ os.mkfifo(fifo_file)
+ cat_cmd = cat_cap_file_command(in_files)
+ fifo_procs.append(subprocess.Popen(('{0} > {1}'.format(cat_cmd, fifo_file)), shell=True))
+
+ if multi_output:
+ rb_unique = 'sections_rb_' + uuid.uuid4().hex[:6] # Random ID
+ testout_file = result_file('testout.{}.pcapng'.format(rb_unique))
+ testout_glob = result_file('testout.{}_*.pcapng'.format(rb_unique))
+ check_vals.append(check_val_d.copy())
+ # check_vals[]['filename'] will be filled in below
+ else:
+ testout_file = result_file(testout_pcapng)
+ check_vals[0]['filename'] = testout_file
+
+ # Capture commands
+ if not multi_input and not multi_output:
+ # Passthrough SHBs, single output file
+ capture_cmd_args = (
+ '-i', fifo_files[0],
+ '-w', testout_file
+ )
+ check_vals[0]['packet_count'] = 79
+ check_vals[0]['idb_count'] = 22
+ check_vals[0]['ua_pt1_count'] = 1
+ check_vals[0]['ua_pt2_count'] = 1
+ elif not multi_input and multi_output:
+ # Passthrough SHBs, multiple output files
+ capture_cmd_args = (
+ '-i', fifo_files[0],
+ '-w', testout_file,
+ '-a', 'files:2',
+ '-b', 'packets:53'
+ )
+ check_vals[0]['packet_count'] = 53
+ check_vals[0]['idb_count'] = 11
+ check_vals[0]['ua_pt1_count'] = 1
+ check_vals[1]['packet_count'] = 26
+ check_vals[1]['idb_count'] = 22
+ check_vals[1]['ua_pt1_count'] = 1
+ check_vals[1]['ua_pt2_count'] = 1
+ elif multi_input and not multi_output:
+ # Dumpcap SHBs, single output file
+ capture_cmd_args = (
+ '-i', fifo_files[0],
+ '-i', fifo_files[1],
+ '-w', testout_file
+ )
+ check_vals[0]['packet_count'] = 88
+ check_vals[0]['idb_count'] = 33
+ check_vals[0]['ua_dc_count'] = 1
+ else:
+ # Dumpcap SHBs, multiple output files
+ capture_cmd_args = (
+ '-i', fifo_files[0],
+ '-i', fifo_files[1],
+ '-w', testout_file,
+ '-a', 'files:2',
+ '-b', 'packets:53'
+ )
+ check_vals[0]['packet_count'] = 53
+ check_vals[0]['idb_count'] = 11
+ check_vals[0]['ua_dc_count'] = 1
+ check_vals[1]['packet_count'] = 35
+ check_vals[1]['idb_count'] = 33
+ check_vals[1]['ua_dc_count'] = 1
+
+ capture_cmd = capture_command(cmd_dumpcap, *capture_cmd_args)
+
+ subprocesstest.check_run(capture_cmd, env=env)
+ for fifo_proc in fifo_procs: fifo_proc.kill()
+
+ rb_files = []
+ if multi_output:
+ rb_files = sorted(glob.glob(testout_glob))
+ assert len(rb_files) == 2
+ check_vals[0]['filename'] = rb_files[0]
+ check_vals[1]['filename'] = rb_files[1]
+
+ for rbf in rb_files:
+ assert os.path.isfile(rbf)
+
+ # Output tests
+
+ if not multi_input and not multi_output:
+ # Check strict bit-for-bit passthrough.
+ in_hash = hashlib.sha256()
+ out_hash = hashlib.sha256()
+ for in_file in in_files_l[0]:
+ in_cap_file = capture_file(in_file)
+ with open(in_cap_file, 'rb') as f:
+ in_hash.update(f.read())
+ with open(testout_file, 'rb') as f:
+ out_hash.update(f.read())
+ assert in_hash.hexdigest() == out_hash.hexdigest()
+
+ # many_interfaces.pcapng.1 : 64 packets written by "Passthrough test #1"
+ # many_interfaces.pcapng.2 : 15 packets written by "Passthrough test #2"
+ # many_interfaces.pcapng.3 : 9 packets written by "Passthrough test #3"
+ # Each has 11 interfaces.
+ idb_compare_eq = True
+ if multi_input and multi_output:
+ # Having multiple inputs forces the use of threads. In our
+ # case this means that non-packet block counts in the first
+ # file in is nondeterministic.
+ idb_compare_eq = False
+ for check_val in check_vals:
+ check_packet_count(cmd_capinfos, check_val['packet_count'], check_val['filename'])
+
+ tshark_proc = subprocesstest.check_run(capture_command(cmd_tshark,
+ '-r', check_val['filename'],
+ '-V',
+ '-X', 'read_format:MIME Files Format'
+ ), capture_output=True, env=env)
+ # XXX Are there any other sanity checks we should run?
+ if idb_compare_eq:
+ assert count_output(tshark_proc.stdout, r'Block \d+: Interface Description Block \d+') \
+ == check_val['idb_count']
+ else:
+ assert count_output(tshark_proc.stdout, r'Block \d+: Interface Description Block \d+') \
+ >= check_val['idb_count']
+ idb_compare_eq = True
+ assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #1') \
+ == check_val['ua_pt1_count']
+ assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #2') \
+ == check_val['ua_pt2_count']
+ assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #3') \
+ == check_val['ua_pt3_count']
+ assert count_output(tshark_proc.stdout, r'Option: User Application = Dumpcap \(Wireshark\)') \
+ == check_val['ua_dc_count']
+ return check_dumpcap_pcapng_sections_real
+
+
+class TestWiresharkCapture:
+ def test_wireshark_capture_10_packets_to_file(self, request, wireshark_k, check_capture_10_packets, make_screenshot_on_error, test_env):
+ '''Capture 10 packets from the network to a file using Wireshark'''
+ disabled = request.config.getoption('--disable-gui', default=False)
+ if disabled:
+ pytest.skip('GUI tests are disabled via --disable-gui')
+ with make_screenshot_on_error():
+ check_capture_10_packets(self, cmd=wireshark_k, env=test_env)
+
+ # Wireshark doesn't currently support writing to stdout while capturing.
+ # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
+ # '''Capture 10 packets from the network to stdout using Wireshark'''
+ # check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)
+
+ def test_wireshark_capture_from_fifo(self, request, wireshark_k, check_capture_fifo, make_screenshot_on_error, test_env):
+ '''Capture from a fifo using Wireshark'''
+ disabled = request.config.getoption('--disable-gui', default=False)
+ if disabled:
+ pytest.skip('GUI tests are disabled via --disable-gui')
+ with make_screenshot_on_error():
+ check_capture_fifo(self, cmd=wireshark_k, env=test_env)
+
+ def test_wireshark_capture_from_stdin(self, request, wireshark_k, check_capture_stdin, make_screenshot_on_error, test_env):
+ '''Capture from stdin using Wireshark'''
+ disabled = request.config.getoption('--disable-gui', default=False)
+ if disabled:
+ pytest.skip('GUI tests are disabled via --disable-gui')
+ with make_screenshot_on_error():
+ check_capture_stdin(self, cmd=wireshark_k, env=test_env)
+
+ def test_wireshark_capture_snapshot_len(self, request, wireshark_k, check_capture_snapshot_len, make_screenshot_on_error, test_env):
+ '''Capture truncated packets using Wireshark'''
+ disabled = request.config.getoption('--disable-gui', default=False)
+ if disabled:
+ pytest.skip('GUI tests are disabled via --disable-gui')
+ with make_screenshot_on_error():
+ check_capture_snapshot_len(self, cmd=wireshark_k, env=test_env)
+
+
+class TestTsharkCapture:
+ def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets, test_env):
+ '''Capture 10 packets from the network to a file using TShark'''
+ check_capture_10_packets(self, cmd=cmd_tshark, env=test_env)
+
+ def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets, test_env):
+ '''Capture 10 packets from the network to stdout using TShark'''
+ check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True, env=test_env)
+
+ def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo, test_env):
+ '''Capture from a fifo using TShark'''
+ check_capture_fifo(self, cmd=cmd_tshark, env=test_env)
+
+ def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin, test_env):
+ '''Capture from stdin using TShark'''
+ check_capture_stdin(self, cmd=cmd_tshark, env=test_env)
+
+ def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len, test_env):
+ '''Capture truncated packets using TShark'''
+ check_capture_snapshot_len(self, cmd=cmd_tshark, env=test_env)
+
+
+class TestDumpcapCapture:
+ def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets, base_env):
+ '''Capture 10 packets from the network to a file using Dumpcap'''
+ check_capture_10_packets(self, cmd=cmd_dumpcap, env=base_env)
+
+ def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets, base_env):
+ '''Capture 10 packets from the network to stdout using Dumpcap'''
+ check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True, env=base_env)
+
+ def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo, base_env):
+ '''Capture from a fifo using Dumpcap'''
+ check_capture_fifo(self, cmd=cmd_dumpcap, env=base_env)
+
+ def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin, base_env):
+ '''Capture from stdin using Dumpcap'''
+ check_capture_stdin(self, cmd=cmd_dumpcap, env=base_env)
+
+ def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap, base_env):
+ '''Capture truncated packets using Dumpcap'''
+ check_capture_snapshot_len(self, cmd=cmd_dumpcap, env=base_env)
+
+
+class TestDumpcapAutostop:
+ # duration, filesize, packets, files
+ def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin, base_env):
+ '''Capture from stdin using Dumpcap until we reach a file size limit'''
+ check_dumpcap_autostop_stdin(self, filesize=15, env=base_env)
+
+ def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin, base_env):
+ '''Capture from stdin using Dumpcap until we reach a packet limit'''
+ check_dumpcap_autostop_stdin(self, packets=97, env=base_env) # Last prime before 100. Arbitrary.
+
+
+class TestDumpcapRingbuffer:
+ # duration, interval, filesize, packets, files
+ def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin, base_env):
+ '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
+ check_dumpcap_ringbuffer_stdin(self, filesize=15, env=base_env)
+
+ def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin, base_env):
+ '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
+ check_dumpcap_ringbuffer_stdin(self, packets=47, env=base_env) # Last prime before 50. Arbitrary.
+
+
+class TestDumpcapPcapngSections:
+ def test_dumpcap_pcapng_single_in_single_out(self, check_dumpcap_pcapng_sections, base_env):
+ '''Capture from a single pcapng source using Dumpcap and write a single file'''
+ if sys.byteorder == 'big':
+ pytest.skip('this test is supported on little endian only')
+ check_dumpcap_pcapng_sections(self, env=base_env)
+
+ def test_dumpcap_pcapng_single_in_multi_out(self, check_dumpcap_pcapng_sections, base_env):
+ '''Capture from a single pcapng source using Dumpcap and write two files'''
+ if sys.byteorder == 'big':
+ pytest.skip('this test is supported on little endian only')
+ check_dumpcap_pcapng_sections(self, multi_output=True, env=base_env)
+
+ def test_dumpcap_pcapng_multi_in_single_out(self, check_dumpcap_pcapng_sections, base_env):
+ '''Capture from two pcapng sources using Dumpcap and write a single file'''
+ if sys.byteorder == 'big':
+ pytest.skip('this test is supported on little endian only')
+ check_dumpcap_pcapng_sections(self, multi_input=True, env=base_env)
+
+ def test_dumpcap_pcapng_multi_in_multi_out(self, check_dumpcap_pcapng_sections, base_env):
+ '''Capture from two pcapng sources using Dumpcap and write two files'''
+ if sys.byteorder == 'big':
+ pytest.skip('this test is supported on little endian only')
+ check_dumpcap_pcapng_sections(self, multi_input=True, multi_output=True, env=base_env)