summaryrefslogtreecommitdiffstats
path: root/test/test_errors.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_errors.py')
-rw-r--r--test/test_errors.py172
1 files changed, 172 insertions, 0 deletions
diff --git a/test/test_errors.py b/test/test_errors.py
new file mode 100644
index 0000000..90cfd1c
--- /dev/null
+++ b/test/test_errors.py
@@ -0,0 +1,172 @@
+import socket
+import errno
+import pytest
+
+from ssh_audit.outputbuffer import OutputBuffer
+
+
+# pylint: disable=attribute-defined-outside-init
+class TestErrors:
+ @pytest.fixture(autouse=True)
+ def init(self, ssh_audit):
+ self.AuditConf = ssh_audit.AuditConf
+ self.OutputBuffer = ssh_audit.OutputBuffer
+ self.audit = ssh_audit.audit
+
+ def _conf(self):
+ conf = self.AuditConf('localhost', 22)
+ conf.colors = False
+ conf.batch = True
+ return conf
+
+ def _audit(self, spy, conf=None, exit_expected=False):
+ if conf is None:
+ conf = self._conf()
+ spy.begin()
+
+ out = OutputBuffer()
+ if exit_expected:
+ with pytest.raises(SystemExit):
+ self.audit(out, conf)
+ else:
+ ret = self.audit(out, conf)
+ assert ret != 0
+
+ out.write()
+ lines = spy.flush()
+
+ # If the last line is empty, delete it.
+ if len(lines) > 1 and lines[-1] == '':
+ del lines[-1]
+
+ return lines
+
+ def test_connection_unresolved(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.gsock.addrinfodata['localhost#22'] = []
+ lines = self._audit(output_spy, exit_expected=True)
+ assert len(lines) == 1
+ assert 'has no DNS records' in lines[-1]
+
+ def test_connection_refused(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.errors['connect'] = socket.error(errno.ECONNREFUSED, 'Connection refused')
+ lines = self._audit(output_spy, exit_expected=True)
+ assert len(lines) == 1
+ assert 'Connection refused' in lines[-1]
+
+ def test_connection_timeout(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.errors['connect'] = socket.timeout('timed out')
+ lines = self._audit(output_spy, exit_expected=True)
+ assert len(lines) == 1
+ assert 'timed out' in lines[-1]
+
+ def test_recv_empty(self, output_spy, virtual_socket):
+ lines = self._audit(output_spy)
+ assert len(lines) == 1
+ assert 'did not receive banner' in lines[-1]
+
+ def test_recv_timeout(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(socket.timeout('timed out'))
+ lines = self._audit(output_spy)
+ assert len(lines) == 1
+ assert 'did not receive banner' in lines[-1]
+ assert 'timed out' in lines[-1]
+
+ def test_recv_retry_till_timeout(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
+ vsocket.rdata.append(socket.error(errno.EWOULDBLOCK, 'Resource temporarily unavailable'))
+ vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
+ vsocket.rdata.append(socket.timeout('timed out'))
+ lines = self._audit(output_spy)
+ assert len(lines) == 1
+ assert 'did not receive banner' in lines[-1]
+ assert 'timed out' in lines[-1]
+
+ def test_recv_retry_till_reset(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
+ vsocket.rdata.append(socket.error(errno.EWOULDBLOCK, 'Resource temporarily unavailable'))
+ vsocket.rdata.append(socket.error(errno.EAGAIN, 'Resource temporarily unavailable'))
+ vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer'))
+ lines = self._audit(output_spy)
+ assert len(lines) == 1
+ assert 'did not receive banner' in lines[-1]
+ assert 'reset by peer' in lines[-1]
+
+ def test_connection_closed_before_banner(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer'))
+ lines = self._audit(output_spy)
+ assert len(lines) == 1
+ assert 'did not receive banner' in lines[-1]
+ assert 'reset by peer' in lines[-1]
+
+ def test_connection_closed_after_header(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(b'header line 1\n')
+ vsocket.rdata.append(b'\n')
+ vsocket.rdata.append(b'header line 2\n')
+ vsocket.rdata.append(socket.error(errno.ECONNRESET, 'Connection reset by peer'))
+ lines = self._audit(output_spy)
+ assert len(lines) == 3
+ assert 'did not receive banner' in lines[-1]
+ assert 'reset by peer' in lines[-1]
+
+ def test_connection_closed_after_banner(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
+ vsocket.rdata.append(socket.error(54, 'Connection reset by peer'))
+ lines = self._audit(output_spy)
+ assert len(lines) == 2
+ assert 'error reading packet' in lines[-1]
+ assert 'reset by peer' in lines[-1]
+
+ def test_empty_data_after_banner(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
+ lines = self._audit(output_spy)
+ assert len(lines) == 2
+ assert 'error reading packet' in lines[-1]
+ assert 'empty' in lines[-1]
+
+ def test_wrong_data_after_banner(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
+ vsocket.rdata.append(b'xxx\n')
+ lines = self._audit(output_spy)
+ assert len(lines) == 2
+ assert 'error reading packet' in lines[-1]
+ assert 'xxx' in lines[-1]
+
+ def test_non_ascii_banner(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\xc3\xbc\r\n')
+ lines = self._audit(output_spy)
+ assert len(lines) == 3
+ assert 'error reading packet' in lines[-1]
+ assert 'ASCII' in lines[-2]
+ assert lines[-3].endswith('SSH-2.0-ssh-audit-test?')
+
+ def test_nonutf8_data_after_banner(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(b'SSH-2.0-ssh-audit-test\r\n')
+ vsocket.rdata.append(b'\x81\xff\n')
+ lines = self._audit(output_spy)
+ assert len(lines) == 2
+ assert 'error reading packet' in lines[-1]
+ assert '\\x81\\xff' in lines[-1]
+
+ def test_protocol_mismatch_by_conf(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ vsocket.rdata.append(b'SSH-1.3-ssh-audit-test\r\n')
+ vsocket.rdata.append(b'Protocol major versions differ.\n')
+ conf = self._conf()
+ conf.ssh1, conf.ssh2 = True, False
+ lines = self._audit(output_spy, conf)
+ assert len(lines) == 4
+ assert 'error reading packet' in lines[-1]
+ assert 'major versions differ' in lines[-1]