summaryrefslogtreecommitdiffstats
path: root/tests/topotests/munet/watchlog.py
blob: 27bc3251a679f04df9de743dfa6775cef7dd4f16 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# -*- coding: utf-8 eval: (blacken-mode 1) -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# August 21 2023, Christian Hopps <chopps@labn.net>
#
# Copyright (c) 2023, LabN Consulting, L.L.C.
#
"""A module supporting an object for watching a logfile."""
import asyncio
import logging
import re

from pathlib import Path


class MatchFoundError(Exception):
    """An error raised when a match is found in a watched logfile.

    Attributes:
        watchlog: the watchlog object whose content produced the match.
        match: the match that was found (the value passed by the raiser).
    """

    def __init__(self, watchlog, match):
        self.watchlog = watchlog
        self.match = match
        # Forward both values so they also appear in ``args`` / ``str()``.
        super().__init__(watchlog, match)


class WatchLog:
    """An object for watching a logfile.

    The text read from the file so far is accumulated in ``content``.  Two
    independent offsets into ``content`` are tracked: ``last_snap_mark``
    (advanced by ``snapshot``) and ``last_user_mark`` (advanced by
    ``set_mark``).
    """

    def __init__(self, path, encoding="utf-8"):
        """Watch a logfile.

        If the file already exists an initial ``snapshot`` is taken so that
        ``content`` starts out holding the current file text.

        Args:
            path: the path of the logfile to watch
            encoding: the encoding of the logfile
        """
        # Immutable
        self.path = Path(path)
        self.encoding = encoding

        # Mutable
        self.content = ""
        self.last_snap_mark = 0
        self.last_user_mark = 0
        self.stat = None

        if self.path.exists():
            self.snapshot()

    def _stat_snapshot(self):
        """Refresh ``self.stat`` and report whether the file changed.

        ``self.stat`` is set to the new stat result, or to None when the file
        does not exist.

        Return:
            True if the file appeared, disappeared, or differs from the
            previous stat in mtime, ctime, inode or size; otherwise False.
        """
        ostat = self.stat

        try:
            stat = self.path.stat() if self.path.exists() else None
        except FileNotFoundError:
            # The file was removed between the exists() check and stat().
            stat = None

        self.stat = stat

        if stat is None:
            # Only a change if we previously knew about the file.
            return ostat is not None

        if ostat is None:
            return True

        return (
            stat.st_mtime_ns != ostat.st_mtime_ns
            or stat.st_ctime_ns != ostat.st_ctime_ns
            or stat.st_ino != ostat.st_ino
            or stat.st_size != ostat.st_size
        )

    def reset(self):
        """Forget all accumulated content and rewind both marks to zero."""
        self.content = ""
        self.last_user_mark = 0
        self.last_snap_mark = 0

    def update_content(self):
        """Read any text appended to the file and add it to ``content``.

        The content and marks are reset when the file is removed, replaced
        (new inode) or has shrunk.

        Return:
            The newly read text, or "" if there was nothing new to read.
        """
        ostat = self.stat
        osize = ostat.st_size if ostat is not None else 0
        oino = ostat.st_ino if ostat is not None else -1
        if not self._stat_snapshot():
            logging.debug("XXX logfile %s no stat change", self.path)
            return ""

        if self.stat is None:
            # The file we were watching is gone; there is nothing to stat or
            # read.  Treat it like a replacement so that a file re-created
            # later is read from its beginning into fresh content.
            logging.warning("logfile %s removed resetting content", self.path)
            self.reset()
            return ""

        nino = self.stat.st_ino
        # If the inode changed and we had content previously warn
        if oino != -1 and oino != nino and self.content:
            logging.warning(
                "logfile %s replaced (new inode) resetting content", self.path
            )
            self.reset()
            osize = 0

        nsize = self.stat.st_size
        if osize > nsize:
            logging.warning("logfile %s shrunk resetting content", self.path)
            self.reset()
            osize = 0

        if osize == nsize:
            logging.debug(
                "XXX watchlog: %s no update, osize == nsize == %s", self.path, osize
            )
            return ""

        # Read only what was added after the previously recorded size.
        # NOTE(review): osize/nsize are byte counts from stat() while a
        # text-mode seek()/read() works with opaque offsets/characters; the
        # two agree for single-byte text (e.g. ASCII logs) -- confirm this is
        # acceptable for multi-byte encodings.
        with open(self.path, "r", encoding=self.encoding) as f:
            if osize:
                f.seek(osize)
            logging.debug(
                "XXX watchlog: %s reading new content from %s to %s",
                self.path,
                osize,
                nsize,
            )
            newcontent = f.read(nsize - osize)

        self.content += newcontent
        return newcontent

    def raise_if_match_task(self, match):
        """Start an async task that searches for a match.

        The task updates the content every 2 seconds and raises
        ``MatchFoundError`` as soon as the regular expression ``match`` is
        found anywhere in the content.

        This doesn't work well with pytest as the task must be awaited for the exception
        to propagate.

        Args:
            match: a regular expression pattern to search the content for.

        Return:
            The created asyncio task.
        """

        async def scan_for_match(wl, regex):
            while True:
                logging.debug("watchlog: %s scan for updating content", wl.path)
                wl.update_content()
                if m := regex.search(wl.content):
                    logging.error(
                        "XXX watchlog: %s regexp FOUND raising exception!", wl.path
                    )
                    raise MatchFoundError(wl, m)
                await asyncio.sleep(2)

        aw = scan_for_match(self, re.compile(match))
        return asyncio.create_task(aw)

    def from_mark(self, mark=None):
        """Return the file content starting from ``mark``.

        If ``mark`` is None then return content since last ``set_mark`` was called.

        Args:
            mark: the mark in the content to return file content from.

        Return:
            returns the content between ``mark`` and the end of content.
        """
        if mark is None:
            # Slicing with None would return everything; honor the documented
            # default of "since the last user mark" instead.
            mark = self.last_user_mark
        return self.content[mark:]

    def set_mark(self):
        """Set a mark at the current end of content for later use.

        Return:
            The previous user mark.
        """
        last_mark = self.last_user_mark
        self.last_user_mark = len(self.content)
        return last_mark

    def snapshot(self):
        """Update the file content and return new text.

        Returns any new text added since the last snapshot,
        also updates the snapshot mark.

        Return:
            Newly added text.
        """
        # Update the content which may reset marks
        self.update_content()

        last_mark = self.last_snap_mark
        self.last_snap_mark = len(self.content)
        return self.content[last_mark:]