summaryrefslogtreecommitdiffstats
path: root/test/test_resolve.py
blob: fbaa033750160b63a653e37a6405f9479827571e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import socket
import pytest


# pylint: disable=attribute-defined-outside-init,protected-access
class TestResolve:
    @pytest.fixture(autouse=True)
    def init(self, ssh_audit):
        self.AuditConf = ssh_audit.AuditConf
        self.audit = ssh_audit.audit
        self.OutputBuffer = ssh_audit.OutputBuffer
        self.ssh_socket = ssh_audit.SSH_Socket

    def _conf(self):
        conf = self.AuditConf('localhost', 22)
        conf.colors = False
        conf.batch = True
        return conf

    def test_resolve_error(self, output_spy, virtual_socket):
        vsocket = virtual_socket
        vsocket.gsock.addrinfodata['localhost#22'] = socket.gaierror(8, 'hostname nor servname provided, or not known')
        conf = self._conf()
        s = self.ssh_socket(self.OutputBuffer(), 'localhost', 22, conf.ip_version_preference)
        output_spy.begin()
        with pytest.raises(SystemExit):
            list(s._resolve())
        lines = output_spy.flush()
        assert len(lines) == 1
        assert 'hostname nor servname provided' in lines[-1]

    def test_resolve_hostname_without_records(self, output_spy, virtual_socket):
        vsocket = virtual_socket
        vsocket.gsock.addrinfodata['localhost#22'] = []
        conf = self._conf()
        s = self.ssh_socket(self.OutputBuffer(), 'localhost', 22, conf.ip_version_preference)
        output_spy.begin()
        r = list(s._resolve())
        assert len(r) == 0

    def test_resolve_ipv4(self, virtual_socket):
        conf = self._conf()
        conf.ipv4 = True
        s = self.ssh_socket(self.OutputBuffer(), 'localhost', 22, conf.ip_version_preference)
        r = list(s._resolve())
        assert len(r) == 1
        assert r[0] == (socket.AF_INET, ('127.0.0.1', 22))

    def test_resolve_ipv6(self, virtual_socket):
        conf = self._conf()
        conf.ipv6 = True
        s = self.ssh_socket(self.OutputBuffer(), 'localhost', 22, conf.ip_version_preference)
        r = list(s._resolve())
        assert len(r) == 1
        assert r[0] == (socket.AF_INET6, ('::1', 22))

    def test_resolve_ipv46_both(self, virtual_socket):
        conf = self._conf()
        s = self.ssh_socket(self.OutputBuffer(), 'localhost', 22, conf.ip_version_preference)
        r = list(s._resolve())
        assert len(r) == 2
        assert r[0] == (socket.AF_INET, ('127.0.0.1', 22))
        assert r[1] == (socket.AF_INET6, ('::1', 22))

    def test_resolve_ipv46_order(self, virtual_socket):
        conf = self._conf()
        conf.ipv4 = True
        conf.ipv6 = True
        s = self.ssh_socket(self.OutputBuffer(), 'localhost', 22, conf.ip_version_preference)
        r = list(s._resolve())
        assert len(r) == 2
        assert r[0] == (socket.AF_INET, ('127.0.0.1', 22))
        assert r[1] == (socket.AF_INET6, ('::1', 22))
        conf = self._conf()
        conf.ipv6 = True
        conf.ipv4 = True
        s = self.ssh_socket(self.OutputBuffer(), 'localhost', 22, conf.ip_version_preference)
        r = list(s._resolve())
        assert len(r) == 2
        assert r[0] == (socket.AF_INET6, ('::1', 22))
        assert r[1] == (socket.AF_INET, ('127.0.0.1', 22))