summaryrefslogtreecommitdiffstats
path: root/test/test_ssh2.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 17:42:34 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 17:42:34 +0000
commit271164583de5842c9093ef4e2866196b24dc6109 (patch)
treec6d060dd47ad362d3e6d3f5311b4398d0b894f4f /test/test_ssh2.py
parentInitial commit. (diff)
downloadssh-audit-271164583de5842c9093ef4e2866196b24dc6109.tar.xz
ssh-audit-271164583de5842c9093ef4e2866196b24dc6109.zip
Adding upstream version 2.5.0.upstream/2.5.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/test_ssh2.py')
-rw-r--r--test/test_ssh2.py165
1 files changed, 165 insertions, 0 deletions
diff --git a/test/test_ssh2.py b/test/test_ssh2.py
new file mode 100644
index 0000000..fc4c0fa
--- /dev/null
+++ b/test/test_ssh2.py
@@ -0,0 +1,165 @@
+import os
+import struct
+import pytest
+
+from ssh_audit.auditconf import AuditConf
+from ssh_audit.outputbuffer import OutputBuffer
+from ssh_audit.protocol import Protocol
+from ssh_audit.readbuf import ReadBuf
+from ssh_audit.ssh2_kex import SSH2_Kex
+from ssh_audit.ssh2_kexparty import SSH2_KexParty
+from ssh_audit.ssh_audit import audit
+from ssh_audit.writebuf import WriteBuf
+
+
+# pylint: disable=line-too-long,attribute-defined-outside-init
+class TestSSH2:
+ @pytest.fixture(autouse=True)
+ def init(self, ssh_audit):
+ self.OutputBuffer = OutputBuffer
+ self.protocol = Protocol
+ self.ssh2_kex = SSH2_Kex
+ self.ssh2_kexparty = SSH2_KexParty
+ self.rbuf = ReadBuf
+ self.wbuf = WriteBuf
+ self.audit = audit
+ self.AuditConf = AuditConf
+
+ def _conf(self):
+ conf = self.AuditConf('localhost', 22)
+ conf.colors = False
+ conf.batch = True
+ conf.verbose = True
+ conf.ssh1 = False
+ conf.ssh2 = True
+ return conf
+
+ @classmethod
+ def _create_ssh2_packet(cls, payload):
+ padding = -(len(payload) + 5) % 8
+ if padding < 4:
+ padding += 8
+ plen = len(payload) + padding + 1
+ pad_bytes = b'\x00' * padding
+ data = struct.pack('>Ib', plen, padding) + payload + pad_bytes
+ return data
+
+ def _kex_payload(self):
+ w = self.wbuf()
+ w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
+ w.write_list(['bogus_kex1', 'bogus_kex2']) # We use a bogus kex, otherwise the host key tests will kick off and fail.
+ w.write_list(['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ssh-ed25519'])
+ w.write_list(['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc'])
+ w.write_list(['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc'])
+ w.write_list(['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'])
+ w.write_list(['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'])
+ w.write_list(['none', 'zlib@openssh.com'])
+ w.write_list(['none', 'zlib@openssh.com'])
+ w.write_list([''])
+ w.write_list([''])
+ w.write_byte(False)
+ w.write_int(0)
+ return w.write_flush()
+
+ def test_kex_read(self):
+ kex = self.ssh2_kex.parse(self._kex_payload())
+ assert kex is not None
+ assert kex.cookie == b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
+ assert kex.kex_algorithms == ['bogus_kex1', 'bogus_kex2']
+ assert kex.key_algorithms == ['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ssh-ed25519']
+ assert kex.client is not None
+ assert kex.server is not None
+ assert kex.client.encryption == ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc']
+ assert kex.server.encryption == ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc']
+ assert kex.client.mac == ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1']
+ assert kex.server.mac == ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1']
+ assert kex.client.compression == ['none', 'zlib@openssh.com']
+ assert kex.server.compression == ['none', 'zlib@openssh.com']
+ assert kex.client.languages == ['']
+ assert kex.server.languages == ['']
+ assert kex.follows is False
+ assert kex.unused == 0
+
+ def _get_empty_kex(self, cookie=None):
+ kex_algs, key_algs = [], []
+ enc, mac, compression, languages = [], [], ['none'], []
+ cli = self.ssh2_kexparty(enc, mac, compression, languages)
+ enc, mac, compression, languages = [], [], ['none'], []
+ srv = self.ssh2_kexparty(enc, mac, compression, languages)
+ if cookie is None:
+ cookie = os.urandom(16)
+ kex = self.ssh2_kex(cookie, kex_algs, key_algs, cli, srv, 0)
+ return kex
+
+ def _get_kex_variat1(self):
+ cookie = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
+ kex = self._get_empty_kex(cookie)
+ kex.kex_algorithms.append('bogus_kex1')
+ kex.kex_algorithms.append('bogus_kex2')
+ kex.key_algorithms.append('ssh-rsa')
+ kex.key_algorithms.append('rsa-sha2-512')
+ kex.key_algorithms.append('rsa-sha2-256')
+ kex.key_algorithms.append('ssh-ed25519')
+ kex.server.encryption.append('chacha20-poly1305@openssh.com')
+ kex.server.encryption.append('aes128-ctr')
+ kex.server.encryption.append('aes192-ctr')
+ kex.server.encryption.append('aes256-ctr')
+ kex.server.encryption.append('aes128-gcm@openssh.com')
+ kex.server.encryption.append('aes256-gcm@openssh.com')
+ kex.server.encryption.append('aes128-cbc')
+ kex.server.encryption.append('aes192-cbc')
+ kex.server.encryption.append('aes256-cbc')
+ kex.server.mac.append('umac-64-etm@openssh.com')
+ kex.server.mac.append('umac-128-etm@openssh.com')
+ kex.server.mac.append('hmac-sha2-256-etm@openssh.com')
+ kex.server.mac.append('hmac-sha2-512-etm@openssh.com')
+ kex.server.mac.append('hmac-sha1-etm@openssh.com')
+ kex.server.mac.append('umac-64@openssh.com')
+ kex.server.mac.append('umac-128@openssh.com')
+ kex.server.mac.append('hmac-sha2-256')
+ kex.server.mac.append('hmac-sha2-512')
+ kex.server.mac.append('hmac-sha1')
+ kex.server.compression.append('zlib@openssh.com')
+ for a in kex.server.encryption:
+ kex.client.encryption.append(a)
+ for a in kex.server.mac:
+ kex.client.mac.append(a)
+ for a in kex.server.compression:
+ if a == 'none':
+ continue
+ kex.client.compression.append(a)
+ return kex
+
+ def test_key_payload(self):
+ kex1 = self._get_kex_variat1()
+ kex2 = self.ssh2_kex.parse(self._kex_payload())
+ assert kex1.payload == kex2.payload
+
+ def test_ssh2_server_simple(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ w = self.wbuf()
+ w.write_byte(self.protocol.MSG_KEXINIT)
+ w.write(self._kex_payload())
+ vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
+ vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
+ output_spy.begin()
+ out = self.OutputBuffer()
+ self.audit(out, self._conf())
+ out.write()
+ lines = output_spy.flush()
+ assert len(lines) == 70
+
+ def test_ssh2_server_invalid_first_packet(self, output_spy, virtual_socket):
+ vsocket = virtual_socket
+ w = self.wbuf()
+ w.write_byte(self.protocol.MSG_KEXINIT + 1)
+ vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
+ vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
+ output_spy.begin()
+ out = self.OutputBuffer()
+ ret = self.audit(out, self._conf())
+ out.write()
+ assert ret != 0
+ lines = output_spy.flush()
+ assert len(lines) == 5
+ assert 'unknown message' in lines[-1]