diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:42:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 17:42:34 +0000 |
commit | 271164583de5842c9093ef4e2866196b24dc6109 (patch) | |
tree | c6d060dd47ad362d3e6d3f5311b4398d0b894f4f /test/test_ssh2.py | |
parent | Initial commit. (diff) | |
download | ssh-audit-271164583de5842c9093ef4e2866196b24dc6109.tar.xz ssh-audit-271164583de5842c9093ef4e2866196b24dc6109.zip |
Adding upstream version 2.5.0.upstream/2.5.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/test_ssh2.py')
-rw-r--r-- | test/test_ssh2.py | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/test/test_ssh2.py b/test/test_ssh2.py new file mode 100644 index 0000000..fc4c0fa --- /dev/null +++ b/test/test_ssh2.py @@ -0,0 +1,165 @@ +import os +import struct +import pytest + +from ssh_audit.auditconf import AuditConf +from ssh_audit.outputbuffer import OutputBuffer +from ssh_audit.protocol import Protocol +from ssh_audit.readbuf import ReadBuf +from ssh_audit.ssh2_kex import SSH2_Kex +from ssh_audit.ssh2_kexparty import SSH2_KexParty +from ssh_audit.ssh_audit import audit +from ssh_audit.writebuf import WriteBuf + + +# pylint: disable=line-too-long,attribute-defined-outside-init +class TestSSH2: + @pytest.fixture(autouse=True) + def init(self, ssh_audit): + self.OutputBuffer = OutputBuffer + self.protocol = Protocol + self.ssh2_kex = SSH2_Kex + self.ssh2_kexparty = SSH2_KexParty + self.rbuf = ReadBuf + self.wbuf = WriteBuf + self.audit = audit + self.AuditConf = AuditConf + + def _conf(self): + conf = self.AuditConf('localhost', 22) + conf.colors = False + conf.batch = True + conf.verbose = True + conf.ssh1 = False + conf.ssh2 = True + return conf + + @classmethod + def _create_ssh2_packet(cls, payload): + padding = -(len(payload) + 5) % 8 + if padding < 4: + padding += 8 + plen = len(payload) + padding + 1 + pad_bytes = b'\x00' * padding + data = struct.pack('>Ib', plen, padding) + payload + pad_bytes + return data + + def _kex_payload(self): + w = self.wbuf() + w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff') + w.write_list(['bogus_kex1', 'bogus_kex2']) # We use a bogus kex, otherwise the host key tests will kick off and fail. + w.write_list(['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ssh-ed25519']) + w.write_list(['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc']) + w.write_list(['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc']) + w.write_list(['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1']) + w.write_list(['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1']) + w.write_list(['none', 'zlib@openssh.com']) + w.write_list(['none', 'zlib@openssh.com']) + w.write_list(['']) + w.write_list(['']) + w.write_byte(False) + w.write_int(0) + return w.write_flush() + + def test_kex_read(self): + kex = self.ssh2_kex.parse(self._kex_payload()) + assert kex is not None + assert kex.cookie == b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff' + assert kex.kex_algorithms == ['bogus_kex1', 'bogus_kex2'] + assert kex.key_algorithms == ['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ssh-ed25519'] + assert kex.client is not None + assert kex.server is not None + assert kex.client.encryption == ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc'] + assert kex.server.encryption == ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc'] + assert kex.client.mac == ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] + assert kex.server.mac == ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'] + assert kex.client.compression == ['none', 'zlib@openssh.com'] + assert kex.server.compression == ['none', 'zlib@openssh.com'] + assert kex.client.languages == [''] + assert kex.server.languages == [''] + assert kex.follows is False + assert kex.unused == 0 + + def _get_empty_kex(self, cookie=None): + kex_algs, key_algs = [], [] + enc, mac, compression, languages = [], [], ['none'], [] + cli = self.ssh2_kexparty(enc, mac, compression, languages) + enc, mac, compression, languages = [], [], ['none'], [] + srv = self.ssh2_kexparty(enc, mac, compression, languages) + if cookie is None: + cookie = os.urandom(16) + kex = self.ssh2_kex(cookie, kex_algs, key_algs, cli, srv, 0) + return kex + + def _get_kex_variat1(self): + cookie = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff' + kex = self._get_empty_kex(cookie) + kex.kex_algorithms.append('bogus_kex1') + kex.kex_algorithms.append('bogus_kex2') + kex.key_algorithms.append('ssh-rsa') + kex.key_algorithms.append('rsa-sha2-512') + kex.key_algorithms.append('rsa-sha2-256') + kex.key_algorithms.append('ssh-ed25519') + kex.server.encryption.append('chacha20-poly1305@openssh.com') + kex.server.encryption.append('aes128-ctr') + kex.server.encryption.append('aes192-ctr') + kex.server.encryption.append('aes256-ctr') + kex.server.encryption.append('aes128-gcm@openssh.com') + kex.server.encryption.append('aes256-gcm@openssh.com') + kex.server.encryption.append('aes128-cbc') + kex.server.encryption.append('aes192-cbc') + kex.server.encryption.append('aes256-cbc') + kex.server.mac.append('umac-64-etm@openssh.com') + kex.server.mac.append('umac-128-etm@openssh.com') + kex.server.mac.append('hmac-sha2-256-etm@openssh.com') + kex.server.mac.append('hmac-sha2-512-etm@openssh.com') + kex.server.mac.append('hmac-sha1-etm@openssh.com') + kex.server.mac.append('umac-64@openssh.com') + kex.server.mac.append('umac-128@openssh.com') + kex.server.mac.append('hmac-sha2-256') + kex.server.mac.append('hmac-sha2-512') + kex.server.mac.append('hmac-sha1') + kex.server.compression.append('zlib@openssh.com') + for a in kex.server.encryption: + kex.client.encryption.append(a) + for a in kex.server.mac: + kex.client.mac.append(a) + for a in kex.server.compression: + if a == 'none': + continue + kex.client.compression.append(a) + return kex + + def test_key_payload(self): + kex1 = self._get_kex_variat1() + kex2 = self.ssh2_kex.parse(self._kex_payload()) + assert kex1.payload == kex2.payload + + def test_ssh2_server_simple(self, output_spy, virtual_socket): + vsocket = virtual_socket + w = self.wbuf() + w.write_byte(self.protocol.MSG_KEXINIT) + w.write(self._kex_payload()) + vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n') + vsocket.rdata.append(self._create_ssh2_packet(w.write_flush())) + output_spy.begin() + out = self.OutputBuffer() + self.audit(out, self._conf()) + out.write() + lines = output_spy.flush() + assert len(lines) == 70 + + def test_ssh2_server_invalid_first_packet(self, output_spy, virtual_socket): + vsocket = virtual_socket + w = self.wbuf() + w.write_byte(self.protocol.MSG_KEXINIT + 1) + vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n') + vsocket.rdata.append(self._create_ssh2_packet(w.write_flush())) + output_spy.begin() + out = self.OutputBuffer() + ret = self.audit(out, self._conf()) + out.write() + assert ret != 0 + lines = output_spy.flush() + assert len(lines) == 5 + assert 'unknown message' in lines[-1] |