summaryrefslogtreecommitdiffstats
path: root/test/test_buffer.py
blob: 499dfd96d013f3a67334ed5f0ef5f13c2e0ce6af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import re
import pytest

from ssh_audit.readbuf import ReadBuf
from ssh_audit.writebuf import WriteBuf


# pylint: disable=attribute-defined-outside-init,bad-whitespace
class TestBuffer:
    """Round-trip tests for the ``ReadBuf`` / ``WriteBuf`` wire encoders.

    Most tests follow the same pattern: a table of ``(value, hex dump)``
    pairs is written with ``WriteBuf`` and compared against the expected
    bytes, then the same bytes are parsed with ``ReadBuf`` and compared
    against the original value.
    """

    @pytest.fixture(autouse=True)
    def init(self, ssh_audit):
        """Bind the buffer classes and shared constants on every test.

        ``ssh_audit`` is not used in the body; it is requested only so that
        fixture runs first (presumably defined in conftest -- TODO confirm).
        """
        self.rbuf = ReadBuf
        self.wbuf = WriteBuf
        # UTF-8 encoding of U+FFFD, the Unicode replacement character.
        self.utf8rchar = b'\xef\xbf\xbd'

    @classmethod
    def _b(cls, v):
        """Convert a hex dump such as ``'00 0d 12 34'`` into ``bytes``.

        All whitespace in ``v`` is ignored.

        Raises:
            ValueError: if ``v`` contains a non-hex character or an odd
                number of hex digits, so a mistyped test vector fails
                loudly instead of being silently truncated.
        """
        return bytes.fromhex(re.sub(r'\s', '', v))

    def test_unread(self):
        """``unread_len`` shrinks as values are consumed from the buffer."""
        payload = self.wbuf().write_byte(1).write_int(2).write_flush()
        reader = self.rbuf(payload)
        assert reader.unread_len == 5
        reader.read_byte()
        assert reader.unread_len == 4
        reader.read_int()
        assert reader.unread_len == 0

    def test_byte(self):
        """Single bytes survive a write_byte / read_byte round trip."""
        def encode(value):
            return self.wbuf().write_byte(value).write_flush()

        def decode(raw):
            return self.rbuf(raw).read_byte()

        vectors = [(0x00, '00'),
                   (0x01, '01'),
                   (0x10, '10'),
                   (0xff, 'ff')]
        for value, hexdump in vectors:
            wire = self._b(hexdump)
            assert encode(value) == wire
            assert decode(wire) == value

    def test_bool(self):
        """Booleans are written as one byte, 01 for True and 00 for False."""
        def encode(value):
            return self.wbuf().write_bool(value).write_flush()

        def decode(raw):
            return self.rbuf(raw).read_bool()

        vectors = [(True,  '01'),
                   (False, '00')]
        for value, hexdump in vectors:
            wire = self._b(hexdump)
            assert encode(value) == wire
            assert decode(wire) == value

    def test_int(self):
        """Integers are written as four bytes, most significant first."""
        def encode(value):
            return self.wbuf().write_int(value).write_flush()

        def decode(raw):
            return self.rbuf(raw).read_int()

        vectors = [(0x00,       '00 00 00 00'),
                   (0x01,       '00 00 00 01'),
                   (0xabcd,     '00 00 ab cd'),
                   (0xffffffff, 'ff ff ff ff')]
        for value, hexdump in vectors:
            wire = self._b(hexdump)
            assert encode(value) == wire
            assert decode(wire) == value

    def test_string(self):
        """Strings get a four-byte length prefix; reads return ``bytes``."""
        def encode(value):
            return self.wbuf().write_string(value).write_flush()

        def decode(raw):
            return self.rbuf(raw).read_string()

        # write_string is exercised with both str and bytes input.
        vectors = [('abc1',  '00 00 00 04 61 62 63 31'),
                   (b'abc2', '00 00 00 04 61 62 63 32')]
        for value, hexdump in vectors:
            wire = self._b(hexdump)
            assert encode(value) == wire
            # read_string is compared against bytes, so encode str inputs.
            expected = value if isinstance(value, bytes) else value.encode('utf-8')
            assert decode(wire) == expected

    def test_list(self):
        """Lists are written as one length-prefixed, comma-joined string."""
        def encode(value):
            return self.wbuf().write_list(value).write_flush()

        def decode(raw):
            return self.rbuf(raw).read_list()

        # Payload bytes spell "d,ef,ault" (9 bytes).
        vectors = [(['d', 'ef', 'ault'], '00 00 00 09 64 2c 65 66 2c 61 75 6c 74')]
        for value, hexdump in vectors:
            wire = self._b(hexdump)
            assert encode(value) == wire
            assert decode(wire) == value

    def test_list_nonutf8(self):
        """Invalid UTF-8 in a list payload decodes to replacement characters."""
        wire = self._b('00 00 00 04 de ad be ef')
        # The first two payload bytes decode normally; each of the last two
        # is expected to come back as U+FFFD.
        expected = [(b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8')]
        assert self.rbuf(wire).read_list() == expected

    def test_line(self):
        """Lines are written with a trailing CR LF, which read_line drops."""
        def encode(value):
            return self.wbuf().write_line(value).write_flush()

        def decode(raw):
            return self.rbuf(raw).read_line()

        vectors = [('example line', '65 78 61 6d 70 6c 65 20 6c 69 6e 65 0d 0a')]
        for value, hexdump in vectors:
            wire = self._b(hexdump)
            assert encode(value) == wire
            assert decode(wire) == value

    def test_line_nonutf8(self):
        """Invalid UTF-8 in a line decodes to replacement characters."""
        wire = self._b('de ad be af')
        # The first two bytes decode normally; each of the last two is
        # expected to come back as U+FFFD.
        expected = (b'\xde\xad' + self.utf8rchar + self.utf8rchar).decode('utf-8')
        assert self.rbuf(wire).read_line() == expected

    def test_bitlen(self):
        """``_bitlength`` works even when ``int.bit_length`` is unavailable."""
        # pylint: disable=protected-access
        class Py26Int(int):
            """Int whose bit_length raises AttributeError.

            Presumably emulates an interpreter without ``int.bit_length``
            (the name suggests Python 2.6) -- TODO confirm.
            """

            def bit_length(self):
                raise AttributeError

        assert self.wbuf._bitlength(42) == 6
        assert self.wbuf._bitlength(Py26Int(42)) == 6

    def test_mpint1(self):
        """mpint1 values carry a two-byte bit-count prefix."""
        def encode(value):
            return self.wbuf().write_mpint1(value).write_flush()

        def decode(raw):
            return self.rbuf(raw).read_mpint1()

        vectors = [(0x0,        '00 00'),
                   (0x1234,     '00 0d 12 34'),
                   (0x12345,    '00 11 01 23 45'),
                   (0xdeadbeef, '00 20 de ad be ef')]
        for value, hexdump in vectors:
            wire = self._b(hexdump)
            assert encode(value) == wire
            assert decode(wire) == value

    def test_mpint2(self):
        """mpint2 values carry a four-byte length prefix and may be negative."""
        def encode(value):
            return self.wbuf().write_mpint2(value).write_flush()

        def decode(raw):
            return self.rbuf(raw).read_mpint2()

        vectors = [(0x0,               '00 00 00 00'),
                   (0x80,              '00 00 00 02 00 80'),
                   (0x9a378f9b2e332a7, '00 00 00 08 09 a3 78 f9 b2 e3 32 a7'),
                   (-0x1234,           '00 00 00 02 ed cc'),
                   (-0xdeadbeef,       '00 00 00 05 ff 21 52 41 11'),
                   (-0x8000,           '00 00 00 02 80 00'),
                   (-0x80,             '00 00 00 01 80')]
        for value, hexdump in vectors:
            wire = self._b(hexdump)
            assert encode(value) == wire
            assert decode(wire) == value
        # A longer, sign-extended encoding of -0x80 must decode to the same
        # value even though the writer emits the one-byte form above.
        assert decode(self._b('00 00 00 02 ff 80')) == -0x80

    def test_reset(self):
        """``reset`` discards everything written since the last flush."""
        writer = self.wbuf()
        writer.write_int(7)
        writer.write_int(13)
        assert len(writer.write_flush()) == 8

        writer.write_int(7)
        writer.write_int(13)
        writer.reset()
        assert len(writer.write_flush()) == 0