diff options
Diffstat (limited to 'test/test_errors.py')
-rw-r--r-- | test/test_errors.py | 172 |
1 files changed, 172 insertions, 0 deletions
diff --git a/test/test_errors.py b/test/test_errors.py new file mode 100644 index 0000000..90cfd1c --- /dev/null +++ b/test/test_errors.py @@ -0,0 +1,172 @@ +import socket +import errno +import pytest + +from ssh_audit.outputbuffer import OutputBuffer + + +# pylint: disable=attribute-defined-outside-init +class TestErrors: + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.AuditConf = ssh_audit.AuditConf + self.OutputBuffer = ssh_audit.OutputBuffer + self.audit = ssh_audit.audit + + def _conf(self): + conf = self.AuditConf('localhost', 22) + conf.colors = False + conf.batch = True + return conf + + def _audit(self, spy, conf=None, exit_expected=False): + if conf is None: + conf = self._conf() + spy.begin() + + out = OutputBuffer() + if exit_expected: + with pytest.raises(SystemExit): + self.audit(out, conf) + else: + ret = self.audit(out, conf) + assert ret != 0 + + out.write() + lines = spy.flush() + + # If the last line is empty, delete it. + if len(lines) > 1 and lines[-1] == '': + del lines[-1] + + return lines + + def test_connection_unresolved(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.gsock.addrinfodata['localhost#22'] = [] + lines = self._audit(output_spy, exit_expected=True) + assert len(lines) == 1 + assert 'has no DNS records' in lines[-1] + + def test_connection_refused(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.errors['connect'] = socket.error(errno.ECONNREFUSED, 'Connection refused') + lines = self._audit(output_spy, exit_expected=True) + assert len(lines) == 1 + assert 'Connection refused' in lines[-1] + + def test_connection_timeout(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.errors['connect'] = socket.timeout('timed out') + lines = self._audit(output_spy, exit_expected=True) + assert len(lines) == 1 + assert 'timed out' in lines[-1] + + def test_recv_empty(self, output_spy, virtual_socket): + lines = self._audit(output_spy) + assert len(lines) == 1 + assert 'did not receive banner' in lines[-1] + + def test_recv_timeout(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(socket.timeout('timed out')) + lines = self._audit(output_spy) + assert len(lines) == 1 + assert 'did not receive banner' in lines[-1] + assert 'timed out' in lines[-1] + + def test_recv_retry_till_timeout(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable')) + vsocket.rdata.append(socket.error(errno.EWOULDBLOCK, 'Resource temporarily unavailable')) + vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable')) + vsocket.rdata.append(socket.timeout('timed out')) + lines = self._audit(output_spy) + assert len(lines) == 1 + assert 'did not receive banner' in lines[-1] + assert 'timed out' in lines[-1] + + def test_recv_retry_till_reset(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable')) + vsocket.rdata.append(socket.error(errno.EWOULDBLOCK, 'Resource temporarily unavailable')) + vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable')) + vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer')) + lines = self._audit(output_spy) + assert len(lines) == 1 + assert 'did not receive banner' in lines[-1] + assert 'reset by peer' in lines[-1] + + def test_connection_closed_before_banner(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer')) + lines = self._audit(output_spy) + assert len(lines) == 1 + assert 'did not receive banner' in lines[-1] + assert 'reset by peer' in lines[-1] + + def test_connection_closed_after_header(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(b'header line 1\n') + vsocket.rdata.append(b'\n') + vsocket.rdata.append(b'header line 2\n') + vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer')) + lines = self._audit(output_spy) + assert len(lines) == 3 + assert 'did not receive banner' in lines[-1] + assert 'reset by peer' in lines[-1] + + def test_connection_closed_after_banner(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') + vsocket.rdata.append(socket.error(54, 'Connection reset by peer')) + lines = self._audit(output_spy) + assert len(lines) == 2 + assert 'error reading packet' in lines[-1] + assert 'reset by peer' in lines[-1] + + def test_empty_data_after_banner(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') + lines = self._audit(output_spy) + assert len(lines) == 2 + assert 'error reading packet' in lines[-1] + assert 'empty' in lines[-1] + + def test_wrong_data_after_banner(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') + vsocket.rdata.append(b'xxx\n') + lines = self._audit(output_spy) + assert len(lines) == 2 + assert 'error reading packet' in lines[-1] + assert 'xxx' in lines[-1] + + def test_non_ascii_banner(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\xc3\xbc\r\n') + lines = self._audit(output_spy) + assert len(lines) == 3 + assert 'error reading packet' in lines[-1] + assert 'ASCII' in lines[-2] + assert lines[-3].endswith('SSH-2.0-ssh-audit-test?') + + def test_nonutf8_data_after_banner(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n') + vsocket.rdata.append(b'\x81\xff\n') + lines = self._audit(output_spy) + assert len(lines) == 2 + assert 'error reading packet' in lines[-1] + assert '\\x81\\xff' in lines[-1] + + def test_protocol_mismatch_by_conf(self, output_spy, virtual_socket): + vsocket = virtual_socket + vsocket.rdata.append(b'SSH-1.3-ssh-audit-test\r\n') + vsocket.rdata.append(b'Protocol major versions differ.\n') + conf = self._conf() + conf.ssh1, conf.ssh2 = True, False + lines = self._audit(output_spy, conf) + assert len(lines) == 4 + assert 'error reading packet' in lines[-1] + assert 'major versions differ' in lines[-1] |