102 lines
3 KiB
Python
102 lines
3 KiB
Python
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
"""TCP Connection Management tests - prefix length
|
|
|
|
RFC 1035
|
|
4.2.2. TCP usage
|
|
The message is prefixed with a two byte length field which gives the message
|
|
length, excluding the two byte length field.
|
|
|
|
The following test suite focuses on edge cases for the prefix - when it
|
|
is either too short or too long, instead of matching the length of the DNS
|
|
message exactly.
|
|
"""
|
|
|
|
import pytest
|
|
|
|
import utils
|
|
|
|
|
|
@pytest.fixture(params=[
    'no_query_before',
    'query_before',
    'query_before_in_single_buffer',
])
def send_query(request):
    """Provide a function that sends a buffer, optionally after a valid query.

    The fixture is parametrized over three sending strategies:

    * ``no_query_before`` - only the provided buffer is sent.
    * ``query_before`` - a valid query is sent in its own buffer, followed
      by the provided buffer.
    * ``query_before_in_single_buffer`` - a valid query and the provided
      buffer are concatenated and sent with a single ``sendall()`` call.

    Whenever a valid query is sent first, its answer is read afterwards and
    the answer's message id is asserted to match the query.

    Returns:
        A callable ``(sock, buff)`` implementing the strategy selected by
        ``request.param``.
    """

    def no_query_before(sock, buff):
        """Send only the provided buffer."""
        sock.sendall(buff)

    def query_before(sock, buff, single_buffer=False):
        """Send an initial query and expect a response."""
        msg_buff, msgid = utils.get_msgbuff()

        if single_buffer:
            sock.sendall(msg_buff + buff)
        else:
            sock.sendall(msg_buff)
            sock.sendall(buff)

        # The answer is read only after ``buff`` has been sent as well.
        answer = utils.receive_parse_answer(sock)
        assert answer.id == msgid

    def query_before_in_single_buffer(sock, buff):
        """Send the initial query and the provided buffer in one write."""
        return query_before(sock, buff, single_buffer=True)

    # Explicit dispatch table instead of a ``locals()`` lookup: only the
    # intended senders are reachable and no lint suppressions are required.
    senders = {
        'no_query_before': no_query_before,
        'query_before': query_before,
        'query_before_in_single_buffer': query_before_in_single_buffer,
    }
    return senders[request.param]
|
|
|
|
|
|
@pytest.mark.parametrize('datalen', [
    1,   # just one byte of the DNS header
    11,  # DNS header size minus 1
    14,  # DNS header size plus 2
])
def test_prefix_cuts_message(kresd_sock, datalen, send_query):
    """Length prefix is shorter than the DNS message it precedes."""
    message, _msgid = utils.prepare_wire()
    # Sanity check: each parametrized length must be below the message size.
    assert datalen < len(message)

    # Such a buffer is expected to break parsing of the TCP stream.
    truncated_buff = utils.prepare_buffer(message, datalen)
    send_query(kresd_sock, truncated_buff)

    # A follow-up query must find the connection closed by kresd.
    with utils.expect_kresd_close():
        utils.ping_alive(kresd_sock)
|
|
|
|
|
|
def test_prefix_greater_than_message(kresd_sock, send_query):
    """Length prefix is greater than the size of the whole DNS message."""
    message, _msgid = utils.prepare_wire()
    # Use a length 16 bytes larger than the message itself.
    oversized_buff = utils.prepare_buffer(message, len(message) + 16)

    send_query(kresd_sock, oversized_buff)

    # The query is ignored due to trailing garbage and the stream parsing
    # is broken by the oversized buffer, so a valid query is never answered.
    with utils.expect_kresd_close():
        utils.ping_alive(kresd_sock)
|
|
|
|
|
|
@pytest.mark.parametrize('glength', [1, 8, 1024, 4096, 20000])
def test_prefix_trailing_garbage(kresd_sock, glength, query_before):
    """Send messages with trailing garbage (its length included in prefix)."""
    # NOTE(review): ``query_before`` is a fixture defined outside this file;
    # when truthy, a regular query is exchanged before the garbage is sent.
    if query_before:
        utils.ping_alive(kresd_sock)

    message, _msgid = utils.prepare_wire()
    padded = message + utils.get_garbage(glength)
    kresd_sock.sendall(utils.prepare_buffer(padded))

    # The padded query is ignored because of the trailing garbage, yet the
    # connection must keep serving subsequent queries.
    utils.ping_alive(kresd_sock)
|