1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
import binascii
import os
import re
import sys
import time
from datetime import timedelta, datetime
from io import SEEK_END
from typing import List
class LogFile:
def __init__(self, path: str):
self._path = path
self._start_pos = 0
self._last_pos = self._start_pos
@property
def path(self) -> str:
return self._path
def reset(self):
self._start_pos = 0
self._last_pos = self._start_pos
def advance(self) -> None:
if os.path.isfile(self._path):
with open(self._path) as fd:
self._start_pos = fd.seek(0, SEEK_END)
def get_recent(self, advance=True) -> List[str]:
lines = []
if os.path.isfile(self._path):
with open(self._path) as fd:
fd.seek(self._last_pos, os.SEEK_SET)
for line in fd:
lines.append(line)
if advance:
self._last_pos = fd.tell()
return lines
def scan_recent(self, pattern: re, timeout=10) -> bool:
if not os.path.isfile(self.path):
return False
with open(self.path) as fd:
end = datetime.now() + timedelta(seconds=timeout)
while True:
fd.seek(self._last_pos, os.SEEK_SET)
for line in fd:
if pattern.match(line):
return True
if datetime.now() > end:
raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
time.sleep(.1)
return False
class HexDumpScanner:
def __init__(self, source, leading_regex=None):
self._source = source
self._leading_regex = leading_regex
def __iter__(self):
data = b''
offset = 0 if self._leading_regex is None else -1
idx = 0
for l in self._source:
if offset == -1:
pass
elif offset == 0:
# possible start of a hex dump
m = re.match(r'^\s*0+(\s+-)?((\s+[0-9a-f]{2}){1,16})(\s+.*)$',
l, re.IGNORECASE)
if m:
data = binascii.unhexlify(re.sub(r'\s+', '', m.group(2)))
offset = 16
idx = 1
continue
else:
# possible continuation of a hexdump
m = re.match(r'^\s*([0-9a-f]+)(\s+-)?((\s+[0-9a-f]{2}){1,16})'
r'(\s+.*)$', l, re.IGNORECASE)
if m:
loffset = int(m.group(1), 16)
if loffset == offset or loffset == idx:
data += binascii.unhexlify(re.sub(r'\s+', '',
m.group(3)))
offset += 16
idx += 1
continue
else:
sys.stderr.write(f'wrong offset {loffset}, expected {offset} or {idx}\n')
# not a hexdump line, produce any collected data
if len(data) > 0:
yield data
data = b''
offset = 0 if self._leading_regex is None \
or self._leading_regex.match(l) else -1
if len(data) > 0:
yield data
|