summaryrefslogtreecommitdiffstats
path: root/examples/tests/ngtcp2test/log.py
blob: 9e8f39945ddb6eaae0bd9442bf6157041d2629cc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import binascii
import os
import re
import sys
import time
from datetime import timedelta, datetime
from io import SEEK_END
from typing import List


class LogFile:
    """Incremental reader for a text log file that grows over time.

    The instance remembers a read position so callers can fetch or scan
    only the lines located after that position.
    """

    def __init__(self, path: str):
        """Remember `path`; reading starts at the beginning of the file."""
        self._path = path
        self._start_pos = 0
        self._last_pos = self._start_pos

    @property
    def path(self) -> str:
        """Path of the log file as passed to the constructor."""
        return self._path

    def reset(self) -> None:
        """Rewind the read position to the beginning of the file."""
        self._start_pos = 0
        self._last_pos = self._start_pos

    def advance(self) -> None:
        """Move the read position to the current end of the file.

        Lines already present are skipped by subsequent `get_recent()` and
        `scan_recent()` calls. Does nothing when the file does not exist.
        """
        if os.path.isfile(self._path):
            with open(self._path) as fd:
                self._start_pos = fd.seek(0, SEEK_END)
            # Readers consult _last_pos, so it has to move as well; keeping
            # both in sync mirrors what __init__ and reset() do.
            self._last_pos = self._start_pos

    def get_recent(self, advance=True) -> List[str]:
        """Return the lines located after the current read position.

        :param advance: when true, move the read position behind the
            returned lines so the next call only yields newer ones.
        :return: the lines including their line endings; an empty list
            when the file does not exist.
        """
        if not os.path.isfile(self._path):
            return []
        with open(self._path) as fd:
            fd.seek(self._last_pos, os.SEEK_SET)
            lines = fd.readlines()
            if advance:
                self._last_pos = fd.tell()
        return lines

    def scan_recent(self, pattern: re.Pattern, timeout=10) -> bool:
        """Wait until a line after the read position matches `pattern`.

        The lines are re-scanned every 0.1 seconds. The read position is
        not modified.

        :param pattern: compiled regex, applied with `match()`, i.e.
            anchored at the start of each line.
        :param timeout: seconds to keep polling before giving up.
        :return: True once a line matches, False when the file does not
            exist.
        :raises TimeoutError: when no line matched within `timeout`.
        """
        if not os.path.isfile(self.path):
            return False
        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + timeout
        with open(self.path) as fd:
            while True:
                fd.seek(self._last_pos, os.SEEK_SET)
                for line in fd:
                    if pattern.match(line):
                        return True
                if time.monotonic() > deadline:
                    raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
                time.sleep(.1)


class HexDumpScanner:
    """Extract hex dumps from an iterable of text lines.

    A dump line consists of a hexadecimal offset, an optional ` -`
    separator, 1 to 16 two-digit hex bytes and trailing text. A dump
    starts on a line whose offset is zero and continues while each
    following line carries either the expected byte offset (previous
    offset + 16) or the expected line index. Iterating the scanner
    yields the bytes of every dump found, one `bytes` object per dump.
    """

    # First line of a dump: the offset consists of zeros only.
    _START_RE = re.compile(
        r'^\s*0+(\s+-)?((\s+[0-9a-f]{2}){1,16})(\s+.*)$', re.IGNORECASE)
    # Any later line of a dump: arbitrary hexadecimal offset.
    _CONT_RE = re.compile(
        r'^\s*([0-9a-f]+)(\s+-)?((\s+[0-9a-f]{2}){1,16})(\s+.*)$',
        re.IGNORECASE)

    def __init__(self, source, leading_regex=None):
        """Create a scanner over `source`.

        :param source: iterable of text lines to scan.
        :param leading_regex: optional compiled regex. When given, a dump
            is only recognized on the line directly following a line
            that matches this regex; otherwise any line may start one.
        """
        self._source = source
        self._leading_regex = leading_regex

    @staticmethod
    def _decode(hex_bytes: str) -> bytes:
        """Convert whitespace separated hex byte pairs into bytes."""
        return binascii.unhexlify(''.join(hex_bytes.split()))

    def __iter__(self):
        """Yield the collected bytes of each hex dump found in the source."""
        data = b''
        has_leader = self._leading_regex is not None
        # offset states: -1 = waiting for a leading line, 0 = the current
        # line may start a dump, > 0 = byte offset expected on the next
        # continuation line.
        offset = -1 if has_leader else 0
        idx = 0
        for line in self._source:
            if offset > 0:
                # possible continuation of a hexdump
                m = self._CONT_RE.match(line)
                if m:
                    line_offset = int(m.group(1), 16)
                    if line_offset in (offset, idx):
                        data += self._decode(m.group(3))
                        offset += 16
                        idx += 1
                        continue
                    # Offset 0 without a leading regex is the start of the
                    # next dump (handled below), not a malformed line.
                    if has_leader or line_offset != 0:
                        sys.stderr.write(f'wrong offset {line_offset}, expected {offset} or {idx}\n')
                # the dump ended, produce the collected data
                if data:
                    yield data
                    data = b''
                # Without a leading regex this very line may open a new
                # dump, so fall through to the start check instead of
                # dropping it.
                offset = -1 if has_leader else 0
            if offset == 0:
                # possible start of a hex dump
                m = self._START_RE.match(line)
                if m:
                    data = self._decode(m.group(2))
                    offset = 16
                    idx = 1
                    continue
            # not a hexdump line: decide whether the next line may start one
            offset = 0 if not has_leader or self._leading_regex.match(line) \
                else -1
        if data:
            yield data