summaryrefslogtreecommitdiffstats
path: root/testing/mozbase/mozlog/mozlog/handlers/bufferhandler.py
blob: 5fcdcf967a2082f3af757eef8b8de41ba17d5727 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from .base import BaseHandler


class BufferHandler(BaseHandler):
    """Handler that maintains a circular buffer of messages based on the
    size and actions specified by a user.

    :param inner: The underlying handler used to emit messages.
    :param message_limit: The maximum number of messages to retain for
                          context. If None, the buffer will grow without limit.
    :param buffered_actions: The set of actions to include in the buffer
                             rather than log directly.
    """

    def __init__(self, inner, message_limit=100, buffered_actions=None):
        BaseHandler.__init__(self, inner)
        self.inner = inner
        self.message_limit = message_limit
        if buffered_actions is None:
            buffered_actions = ["log", "test_status"]
        self.buffered_actions = set(buffered_actions)
        self._buffering = True

        if self.message_limit is not None:
            self._buffer = [None] * self.message_limit
            self._buffer_pos = 0
        else:
            self._buffer = []

        self.message_handler.register_message_handlers(
            "buffer",
            {
                "on": self._enable_buffering,
                "off": self._disable_buffering,
                "flush": self._flush_buffered,
                "clear": self._clear_buffer,
            },
        )

    def __call__(self, data):
        action = data["action"]
        if "bypass_mozlog_buffer" in data:
            data.pop("bypass_mozlog_buffer")
            self.inner(data)
            return
        if not self._buffering or action not in self.buffered_actions:
            self.inner(data)
            return

        self._add_message(data)

    def _add_message(self, data):
        if self.message_limit is None:
            self._buffer.append(data)
        else:
            self._buffer[self._buffer_pos] = data
            self._buffer_pos = (self._buffer_pos + 1) % self.message_limit

    def _enable_buffering(self):
        self._buffering = True

    def _disable_buffering(self):
        self._buffering = False

    def _clear_buffer(self):
        """Clear the buffer of unwanted messages."""
        current_size = len([m for m in self._buffer if m is not None])
        if self.message_limit is not None:
            self._buffer = [None] * self.message_limit
        else:
            self._buffer = []
        return current_size

    def _flush_buffered(self):
        """Logs the contents of the current buffer"""
        for msg in self._buffer[self._buffer_pos :]:
            if msg is not None:
                self.inner(msg)
        for msg in self._buffer[: self._buffer_pos]:
            if msg is not None:
                self.inner(msg)
        return self._clear_buffer()