1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# August 21 2023, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2023, LabN Consulting, L.L.C.
#
"""A module supporting an object for watching a logfile."""
import asyncio
import logging
import re
from pathlib import Path
class MatchFoundError(Exception):
    """An error raised when a searched-for match is found in a watched logfile.

    Attributes:
        watchlog: the ``WatchLog`` object whose content produced the match.
        match: the match that was found (as returned by the regex search).
    """

    def __init__(self, watchlog, match):
        """Record the watchlog and the match that triggered the error.

        Args:
            watchlog: the ``WatchLog`` object whose content matched.
            match: the match result that was found.
        """
        self.watchlog = watchlog
        self.match = match
        # Pass both along so they also show up in ``args`` / ``str(exc)``
        super().__init__(watchlog, match)
class WatchLog:
    """An object for watching a logfile."""

    def __init__(self, path, encoding="utf-8"):
        """Watch a logfile.

        Args:
            path: the path of the logfile to watch
            encoding: the encoding of the logfile
        """
        # Immutable
        self.path = Path(path)
        self.encoding = encoding

        # Mutable
        self.content = ""
        self.last_snap_mark = 0
        self.last_user_mark = 0
        self.stat = None

        if self.path.exists():
            self.snapshot()

    def _stat_snapshot(self):
        """Refresh ``self.stat`` and report whether the file changed.

        Return:
            True if the file appeared, disappeared, or its mtime, ctime, inode
            or size differ from the previously recorded stat; otherwise False.
        """
        ostat = self.stat

        # stat() directly rather than exists()-then-stat() so the file cannot
        # vanish between the two calls.
        try:
            stat = self.path.stat()
        except (FileNotFoundError, NotADirectoryError):
            self.stat = None
            return ostat is not None

        self.stat = stat
        if ostat is None:
            return True

        return (
            stat.st_mtime_ns != ostat.st_mtime_ns
            or stat.st_ctime_ns != ostat.st_ctime_ns
            or stat.st_ino != ostat.st_ino
            or stat.st_size != ostat.st_size
        )

    def reset(self):
        """Forget all cached content and move both marks back to the start."""
        self.content = ""
        self.last_user_mark = 0
        self.last_snap_mark = 0

    def update_content(self):
        """Read any newly appended file data into ``self.content``.

        If the file was removed, replaced (new inode) or has shrunk, the cached
        content and both marks are reset before any new data is read.

        Return:
            The newly read text, or "" if nothing new was read.
        """
        ostat = self.stat
        osize = ostat.st_size if ostat else 0
        oino = ostat.st_ino if ostat else -1
        if not self._stat_snapshot():
            logging.debug("XXX logfile %s no stat change", self.path)
            return ""

        if self.stat is None:
            # The file existed at the last stat but is gone now; there is
            # nothing to read, and the cached content no longer mirrors a file.
            if self.content:
                logging.warning("logfile %s removed resetting content", self.path)
            self.reset()
            return ""

        nino = self.stat.st_ino
        if oino != -1 and oino != nino:
            # Only warn if we had content previously, but always restart from
            # offset 0 since the old size says nothing about the new file.
            if self.content:
                logging.warning(
                    "logfile %s replaced (new inode) resetting content", self.path
                )
            self.reset()
            osize = 0

        nsize = self.stat.st_size
        if osize > nsize:
            logging.warning("logfile %s shrunk resetting content", self.path)
            self.reset()
            osize = 0

        if osize == nsize:
            logging.debug(
                "XXX watchlog: %s no update, osize == nsize == %s", self.path, osize
            )
            return ""

        # The sizes from stat() are byte counts, so seek and read in binary
        # mode and decode afterwards. Seeking a text-mode file to an arbitrary
        # offset is undefined, and a text-mode read() counts characters.
        with open(self.path, "rb") as f:
            if osize:
                f.seek(osize)
            logging.debug(
                "XXX watchlog: %s reading new content from %s to %s",
                self.path,
                osize,
                nsize,
            )
            data = f.read(nsize - osize)

        # Apply the same newline translation a text-mode read would have done
        newcontent = data.decode(self.encoding)
        newcontent = newcontent.replace("\r\n", "\n").replace("\r", "\n")

        self.content += newcontent
        return newcontent

    def raise_if_match_task(self, match):
        """Start an async task that searches for a match.

        The task re-reads the logfile every 2 seconds and raises
        ``MatchFoundError`` as soon as the content matches.

        This doesn't work well with pytest as the task must be awaited for the
        exception to propagate.

        Args:
            match: a regular expression pattern to search the content for.

        Return:
            The created ``asyncio`` task.
        """

        async def scan_for_match(wl, regex):
            while True:
                logging.debug("watchlog: %s scan for updating content", wl.path)
                wl.update_content()
                if m := regex.search(wl.content):
                    logging.error(
                        "XXX watchlog: %s regexp FOUND raising exception!", wl.path
                    )
                    raise MatchFoundError(wl, m)
                await asyncio.sleep(2)

        aw = scan_for_match(self, re.compile(match))
        return asyncio.create_task(aw)

    def from_mark(self, mark=None):
        """Return the file content starting from ``mark``.

        If ``mark`` is None then return content since last ``set_mark`` was called.

        Args:
            mark: the mark in the content to return file content from.

        Return:
            returns the content between ``mark`` and the end of content.
        """
        if mark is None:
            mark = self.last_user_mark
        return self.content[mark:]

    def set_mark(self):
        """Set a mark at the current end of content for later use.

        Return:
            The previous user mark (0 if no mark had been set).
        """
        last_mark = self.last_user_mark
        self.last_user_mark = len(self.content)
        return last_mark

    def snapshot(self):
        """Update the file content and return new text.

        Returns any new text added since the last snapshot,
        also updates the snapshot mark.

        Return:
            Newly added text.
        """
        # Update the content which may reset marks
        self.update_content()

        last_mark = self.last_snap_mark
        self.last_snap_mark = len(self.content)
        return self.content[last_mark:]
|