1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
import re
import pytest
from ssh_audit.readbuf import ReadBuf
from ssh_audit.writebuf import WriteBuf
# pylint: disable=attribute-defined-outside-init,bad-whitespace
class TestBuffer:
    """Round-trip tests for the ``ReadBuf`` and ``WriteBuf`` classes.

    Most tests pair a Python value with the hex dump of its expected wire
    form, then check both directions: writing the value must produce the
    bytes, and reading the bytes must give the value back.
    """

    @pytest.fixture(autouse=True)
    def init(self, ssh_audit):
        """Expose the buffer classes and the UTF-8 replacement bytes on ``self``.

        The ``ssh_audit`` fixture is requested but not used in this body;
        presumably it is supplied by the project's conftest -- confirm.
        """
        self.utf8rchar = b'\xef\xbf\xbd'
        self.rbuf = ReadBuf
        self.wbuf = WriteBuf

    @classmethod
    def _b(cls, v):
        """Convert a hex dump such as ``'00 0d 12 34'`` into ``bytes``.

        All whitespace is removed first, then each pair of characters is
        parsed as one base-16 byte. A trailing unpaired character is
        silently ignored.
        """
        compact = re.sub(r'\s', '', v)
        octets = bytearray()
        # Stop before a dangling last character so only full pairs are read.
        for start in range(0, len(compact) - 1, 2):
            octets.append(int(compact[start:start + 2], 16))
        return bytes(octets)

    def test_unread(self):
        """``unread_len`` shrinks as a byte and then an int are consumed."""
        payload = self.wbuf().write_byte(1).write_int(2).write_flush()
        reader = self.rbuf(payload)
        assert reader.unread_len == 5
        reader.read_byte()
        assert reader.unread_len == 4
        reader.read_int()
        assert reader.unread_len == 0

    def test_byte(self):
        """Single bytes round-trip through ``write_byte`` / ``read_byte``."""
        cases = [
            (0x00, '00'),
            (0x01, '01'),
            (0x10, '10'),
            (0xff, 'ff'),
        ]
        for value, encoded in cases:
            assert self.wbuf().write_byte(value).write_flush() == self._b(encoded)
            assert self.rbuf(self._b(encoded)).read_byte() == value

    def test_bool(self):
        """Booleans round-trip as a single ``01`` / ``00`` byte."""
        cases = [
            (True, '01'),
            (False, '00'),
        ]
        for value, encoded in cases:
            assert self.wbuf().write_bool(value).write_flush() == self._b(encoded)
            assert self.rbuf(self._b(encoded)).read_bool() == value

    def test_int(self):
        """Integers round-trip as four bytes, most significant byte first."""
        cases = [
            (0x00, '00 00 00 00'),
            (0x01, '00 00 00 01'),
            (0xabcd, '00 00 ab cd'),
            (0xffffffff, 'ff ff ff ff'),
        ]
        for value, encoded in cases:
            assert self.wbuf().write_int(value).write_flush() == self._b(encoded)
            assert self.rbuf(self._b(encoded)).read_int() == value

    def test_string(self):
        """``write_string`` accepts str or bytes; ``read_string`` yields bytes."""
        cases = [
            ('abc1', '00 00 00 04 61 62 63 31'),
            (b'abc2', '00 00 00 04 61 62 63 32'),
        ]
        for value, encoded in cases:
            assert self.wbuf().write_string(value).write_flush() == self._b(encoded)
            # Reading is compared against bytes, so encode str inputs first.
            expected = value if isinstance(value, bytes) else value.encode('utf-8')
            assert self.rbuf(self._b(encoded)).read_string() == expected

    def test_list(self):
        """A list of names round-trips as one comma-joined, length-prefixed field."""
        cases = [
            (['d', 'ef', 'ault'], '00 00 00 09 64 2c 65 66 2c 61 75 6c 74'),
        ]
        for value, encoded in cases:
            assert self.wbuf().write_list(value).write_flush() == self._b(encoded)
            assert self.rbuf(self._b(encoded)).read_list() == value

    def test_list_nonutf8(self):
        """Undecodable bytes in a list come back as replacement characters."""
        raw = self._b('00 00 00 04 de ad be ef')
        expected = [(b'\xde\xad' + self.utf8rchar * 2).decode('utf-8')]
        assert self.rbuf(raw).read_list() == expected

    def test_line(self):
        """``write_line`` appends CR LF; ``read_line`` returns the bare text."""
        cases = [
            ('example line', '65 78 61 6d 70 6c 65 20 6c 69 6e 65 0d 0a'),
        ]
        for value, encoded in cases:
            assert self.wbuf().write_line(value).write_flush() == self._b(encoded)
            assert self.rbuf(self._b(encoded)).read_line() == value

    def test_line_nonutf8(self):
        """Undecodable bytes in a line come back as replacement characters."""
        raw = self._b('de ad be af')
        expected = (b'\xde\xad' + self.utf8rchar * 2).decode('utf-8')
        assert self.rbuf(raw).read_line() == expected

    def test_bitlen(self):
        """``_bitlength`` returns 6 for 42, with or without a working ``bit_length``."""
        # pylint: disable=protected-access
        class Py26Int(int):
            """Int whose ``bit_length`` raises ``AttributeError``."""

            def bit_length(self):
                raise AttributeError

        assert self.wbuf._bitlength(42) == 6
        # Presumably exercises a fallback inside _bitlength -- confirm there.
        assert self.wbuf._bitlength(Py26Int(42)) == 6

    def test_mpint1(self):
        """mpint1 values round-trip; the two-byte prefix equals the bit length."""
        cases = [
            (0x0, '00 00'),
            (0x1234, '00 0d 12 34'),
            (0x12345, '00 11 01 23 45'),
            (0xdeadbeef, '00 20 de ad be ef'),
        ]
        for value, encoded in cases:
            assert self.wbuf().write_mpint1(value).write_flush() == self._b(encoded)
            assert self.rbuf(self._b(encoded)).read_mpint1() == value

    def test_mpint2(self):
        """mpint2 values, including negative ones, round-trip."""
        cases = [
            (0x0, '00 00 00 00'),
            (0x80, '00 00 00 02 00 80'),
            (0x9a378f9b2e332a7, '00 00 00 08 09 a3 78 f9 b2 e3 32 a7'),
            (-0x1234, '00 00 00 02 ed cc'),
            (-0xdeadbeef, '00 00 00 05 ff 21 52 41 11'),
            (-0x8000, '00 00 00 02 80 00'),
            (-0x80, '00 00 00 01 80'),
        ]
        for value, encoded in cases:
            assert self.wbuf().write_mpint2(value).write_flush() == self._b(encoded)
            assert self.rbuf(self._b(encoded)).read_mpint2() == value
        # Read-only case: a two-byte form of -0x80 (the writer emits '80').
        padded = self._b('00 00 00 02 ff 80')
        assert self.rbuf(padded).read_mpint2() == -0x80

    def test_reset(self):
        """``reset`` discards pending writes so the next flush is empty."""
        writer = self.wbuf()
        writer.write_int(7)
        writer.write_int(13)
        assert len(writer.write_flush()) == 8
        writer.write_int(7)
        writer.write_int(13)
        writer.reset()
        assert len(writer.write_flush()) == 0
|