diff options
Diffstat (limited to 'tests/topotests/munet/watchlog.py')
-rw-r--r-- | tests/topotests/munet/watchlog.py | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/tests/topotests/munet/watchlog.py b/tests/topotests/munet/watchlog.py new file mode 100644 index 0000000..27bc325 --- /dev/null +++ b/tests/topotests/munet/watchlog.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 eval: (blacken-mode 1) -*- +# SPDX-License-Identifier: GPL-2.0-or-later +# +# August 21 2023, Christian Hopps <chopps@labn.net> +# +# Copyright (c) 2023, LabN Consulting, L.L.C. +# +"""A module supporting an object for watching a logfile.""" +import asyncio +import logging +import re + +from pathlib import Path + + +class MatchFoundError(Exception): + """An error raised when a match is not found.""" + def __init__(self, watchlog, match): + self.watchlog = watchlog + self.match = match + super().__init__(watchlog, match) + + +class WatchLog: + """An object for watching a logfile.""" + + def __init__(self, path, encoding="utf-8"): + """Watch a logfile. + + Args: + path: that path of the logfile to watch + encoding: the encoding of the logfile + """ + # Immutable + self.path = Path(path) + self.encoding = encoding + + # Mutable + self.content = "" + self.last_snap_mark = 0 + self.last_user_mark = 0 + self.stat = None + + if self.path.exists(): + self.snapshot() + + def _stat_snapshot(self): + ostat = self.stat + + if not self.path.exists(): + self.stat = None + return ostat is not None + + stat = self.path.stat() + self.stat = stat + + if ostat is None: + return True + + return ( + stat.st_mtime_ns != ostat.st_mtime_ns + or stat.st_ctime_ns != ostat.st_ctime_ns + or stat.st_ino != ostat.st_ino + or stat.st_size != ostat.st_size + ) + + def reset(self): + self.content = "" + self.last_user_mark = 0 + self.last_snap_mark = 0 + + def update_content(self): + ostat = self.stat + osize = ostat.st_size if ostat else 0 + oino = ostat.st_ino if ostat else -1 + if not self._stat_snapshot(): + logging.debug("XXX logfile %s no stat change", self.path) + return "" + + nino = self.stat.st_ino + # If the inode changed and we had content previously warn + if oino != -1 and oino != nino and self.content: + logging.warning( + "logfile %s replaced (new inode) resetting content", self.path + ) + self.reset() + osize = 0 + + nsize = self.stat.st_size + if osize > nsize: + logging.warning("logfile %s shrunk resetting content", self.path) + self.reset() + osize = 0 + + if osize == nsize: + logging.debug( + "XXX watchlog: %s no update, osize == nsize == %s", self.path, osize + ) + return "" + + # Read non-blocking + with open(self.path, "r", encoding=self.encoding) as f: + if osize: + f.seek(osize) + logging.debug( + "XXX watchlog: %s reading new content from %s to %s", + self.path, + osize, + nsize, + ) + newcontent = f.read(nsize - osize) + + self.content += newcontent + return newcontent + + def raise_if_match_task(self, match): + """Start an async task that searches for a match. + + This doesn't work well with pytest as the task must be awaited for the exception + to propagate. + """ + + async def scan_for_match(wl, regex): + while True: + logging.debug("watchlog: %s scan for updating content", wl.path) + wl.update_content() + if m := regex.search(wl.content): + logging.error( + "XXX watchlog: %s regexp FOUND raising exception!", wl.path + ) + raise MatchFoundError(wl, m) + await asyncio.sleep(2) + + aw = scan_for_match(self, re.compile(match)) + return asyncio.create_task(aw) + + def from_mark(self, mark=None): + """Return the file content starting from ``mark``. + + If ``mark`` is None then return content since last ``set_mark`` was called. + + Args: + mark: the mark in the content to return file content from. + + Return: + returns the content between ``mark`` and the end of content. + """ + return self.content[mark:] + + def set_mark(self): + """Set a mark for later use.""" + last_mark = self.last_user_mark + self.last_user_mark = len(self.content) + return last_mark + + def snapshot(self): + """Update the file content and return new text. + + Returns any new text added since the last snapshot, + also updates the snapshot mark. + + Return: + Newly added text. + """ + # Update the content which may reset marks + self.update_content() + + last_mark = self.last_snap_mark + self.last_snap_mark = len(self.content) + return self.content[last_mark:] |