1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
"""TLS-specific tests"""
import itertools
import os
from socket import AF_INET, AF_INET6
import ssl
import sys
import pytest
from kresd import make_kresd
import utils
def test_tls_no_cert(kresd, sock_family):
"""Use TLS without certificates."""
sock, dest = kresd.stream_socket(sock_family, tls=True)
ctx = utils.make_ssl_context(insecure=True)
ssock = ctx.wrap_socket(sock)
ssock.connect(dest)
utils.ping_alive(ssock)
def test_tls_selfsigned_cert(kresd_tt, sock_family):
"""Use TLS with a self signed certificate."""
sock, dest = kresd_tt.stream_socket(sock_family, tls=True)
ctx = utils.make_ssl_context(verify_location=kresd_tt.tls_cert_path)
ssock = ctx.wrap_socket(sock, server_hostname='transport-test-server.com')
ssock.connect(dest)
utils.ping_alive(ssock)
def test_tls_cert_hostname_mismatch(kresd_tt, sock_family):
"""Attempt to use self signed certificate and incorrect hostname."""
sock, dest = kresd_tt.stream_socket(sock_family, tls=True)
ctx = utils.make_ssl_context(verify_location=kresd_tt.tls_cert_path)
ssock = ctx.wrap_socket(sock, server_hostname='wrong-host-name')
with pytest.raises(ssl.CertificateError):
ssock.connect(dest)
@pytest.mark.skipif(sys.version_info < (3, 6),
reason="requires python3.6 or higher")
@pytest.mark.parametrize('sf1, sf2, sf3', itertools.product(
[AF_INET, AF_INET6], [AF_INET, AF_INET6], [AF_INET, AF_INET6]))
def test_tls_session_resumption(tmpdir, sf1, sf2, sf3):
"""Attempt TLS session resumption against the same kresd instance and a different one."""
# TODO ensure that session can't be resumed after session ticket key regeneration
# at the first kresd instance
def connect(kresd, ctx, sf, session=None):
sock, dest = kresd.stream_socket(sf, tls=True)
ssock = ctx.wrap_socket(
sock, server_hostname='transport-test-server.com', session=session)
ssock.connect(dest)
new_session = ssock.session
assert new_session.has_ticket
assert ssock.session_reused == (session is not None)
utils.ping_alive(ssock)
ssock.close()
return new_session
workdir = os.path.join(str(tmpdir), 'kresd')
os.makedirs(workdir)
with make_kresd(workdir, 'tt') as kresd:
ctx = utils.make_ssl_context(verify_location=kresd.tls_cert_path)
session = connect(kresd, ctx, sf1) # initial conn
connect(kresd, ctx, sf2, session) # resume session on the same instance
workdir2 = os.path.join(str(tmpdir), 'kresd2')
os.makedirs(workdir2)
with make_kresd(workdir2, 'tt') as kresd2:
connect(kresd2, ctx, sf3, session) # resume session on a different instance
|