summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/controllers/logs.py
blob: 133c33477b585464eb206efe0071ef2420182bfd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# -*- coding: utf-8 -*-

import collections

from ..security import Scope
from ..services.ceph_service import CephService
from ..tools import NotificationQueue
from . import APIDoc, APIRouter, BaseController, Endpoint, EndpointDoc, ReadPermission

# Capacity of each in-memory log deque, and the number of lines requested
# from the monitor when a buffer is first filled.
LOG_BUFFER_SIZE = 30

# The response schema is assembled bottom-up from its nested pieces.
# NOTE(review): each value looks like a (type, description) pair as consumed
# by EndpointDoc -- confirm against the EndpointDoc implementation.

# One entry of an "addrvec" list.
_ADDRVEC_ENTRY_SCHEMA = {
    "type": (str, ""),
    "addr": (str, "IP Address"),
    "nonce": (int, ""),
}

# The "addrs" object of an audit log entry.
_ADDRS_SCHEMA = {
    "addrvec": ([_ADDRVEC_ENTRY_SCHEMA], ""),
}

# One entry of the "audit_log" list.
_AUDIT_ENTRY_SCHEMA = {
    "name": (str, ""),
    "rank": (str, ""),
    "addrs": (_ADDRS_SCHEMA, ""),
    "stamp": (str, ""),
    "seq": (int, ""),
    "channel": (str, ""),
    "priority": (str, ""),
    "message": (str, ""),
}

# Documented 200-response shape of the ``/logs/all`` endpoint.
LOGS_SCHEMA = {
    "clog": ([str], ""),
    "audit_log": ([_AUDIT_ENTRY_SCHEMA], "Audit log"),
}


@APIRouter('/logs', Scope.LOG)
@APIDoc("Logs Management API", "Logs")
class Logs(BaseController):
    """Controller exposing recent log entries under ``/logs``.

    Entries are kept in two bounded deques of at most ``LOG_BUFFER_SIZE``
    items each: ``audit_buffer`` for the ``audit`` channel and ``log_buffer``
    for everything else.  Entries are always prepended, so the most recently
    added entry comes first.  The buffers are filled lazily on the first
    call to :meth:`all`.
    """

    def __init__(self):
        super().__init__()
        # True once the buffers were loaded and the 'clog' listener registered.
        self._log_initialized = False
        self.log_buffer = collections.deque(maxlen=LOG_BUFFER_SIZE)
        self.audit_buffer = collections.deque(maxlen=LOG_BUFFER_SIZE)

    def append_log(self, log_struct):
        """Prepend one log entry to the buffer matching its channel.

        Registered as the ``'clog'`` listener in :meth:`initialize_buffers`.

        :param log_struct: mapping with at least a ``'channel'`` key; entries
            whose channel is ``'audit'`` go to ``audit_buffer``, all others
            to ``log_buffer``.
        """
        if log_struct['channel'] == 'audit':
            self.audit_buffer.appendleft(log_struct)
        else:
            self.log_buffer.appendleft(log_struct)

    def load_buffer(self, buf, channel_name):
        """Fill ``buf`` with the result of the monitor's ``log last`` command.

        :param buf: deque that receives the entries.
        :param channel_name: log channel to query (e.g. ``'cluster'``).
        """
        lines = CephService.send_command(
            'mon', 'log last', channel=channel_name, num=LOG_BUFFER_SIZE, level='debug')
        # extendleft() performs successive left-appends, so the last line
        # returned ends up at the front of the deque.
        buf.extendleft(lines)

    def initialize_buffers(self):
        """Load both buffers and subscribe to ``'clog'`` notifications once.

        Subsequent calls are no-ops after a successful initialization.  If
        loading or registering fails, the buffers are emptied and the
        initialized flag is reset before the exception is re-raised, so that
        the next call retries from a clean state instead of serving empty or
        half-filled buffers forever.
        """
        if self._log_initialized:
            return

        # Mark as initialized before doing the work so that a second call
        # arriving meanwhile does not start loading again.
        self._log_initialized = True
        try:
            self.load_buffer(self.log_buffer, 'cluster')
            self.load_buffer(self.audit_buffer, 'audit')

            NotificationQueue.register(self.append_log, 'clog')
        except Exception:
            # Roll back: drop partially loaded entries (a retry would
            # otherwise duplicate them) and allow another attempt.
            self.log_buffer.clear()
            self.audit_buffer.clear()
            self._log_initialized = False
            raise

    @Endpoint()
    @ReadPermission
    @EndpointDoc("Display Logs Configuration",
                 responses={200: LOGS_SCHEMA})
    def all(self):
        """Return snapshots of both buffers.

        :return: dict with ``clog`` (contents of ``log_buffer``) and
            ``audit_log`` (contents of ``audit_buffer``), each as a list.
        """
        self.initialize_buffers()
        return {
            'clog': list(self.log_buffer),
            'audit_log': list(self.audit_buffer),
        }