summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/prometheus/test_module.py
blob: 0647cb658c75de174a9ced594f5bd9ab5fd794a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from typing import Dict
from unittest import TestCase

from prometheus.module import Metric, LabelValues, Number


class MetricGroupTest(TestCase):
    """Exercise ``Metric.group_by`` on a small disk-occupation style fixture.

    The happy-path test checks the grouped values, the exposition-format
    rendering and the resulting label names.  The remaining tests pin the
    ``AssertionError`` messages raised for invalid arguments.
    """

    # Report complete diffs for every assertion in this class.  Set at class
    # level so it is already in effect for the first comparison of each test.
    maxDiff = None

    # Label names of the source metric.  A tuple is immutable, so it can be
    # shared safely by all tests instead of being rebuilt in ``setUp``.
    DISK_OCCUPATION = (
        "ceph_daemon",
        "device",
        "db_device",
        "wal_device",
        "instance",
    )

    def setUp(self):
        """Build a fresh sample mapping (label values -> number) per test."""
        # osd.5 and osd.6 share the same device and instance, so they are the
        # only two samples that collapse when grouping by those two labels.
        self.metric_value: Dict[LabelValues, Number] = {
            ("osd.0", "/dev/dm-0", "", "", "node1"): 1,
            ("osd.1", "/dev/dm-0", "", "", "node3"): 1,
            ("osd.2", "/dev/dm-0", "", "", "node2"): 1,
            ("osd.3", "/dev/dm-1", "", "", "node1"): 1,
            ("osd.4", "/dev/dm-1", "", "", "node3"): 1,
            ("osd.5", "/dev/dm-1", "", "", "node2"): 1,
            ("osd.6", "/dev/dm-1", "", "", "node2"): 1,
        }

    def test_metric_group_by(self):
        """Group by device/instance and join ``ceph_daemon`` values with '+'."""
        m = Metric("untyped", "disk_occupation", "", self.DISK_OCCUPATION)
        m.value = self.metric_value
        grouped_metric = m.group_by(
            ["device", "instance"],
            {"ceph_daemon": lambda xs: "+".join(xs)},
            name="disk_occupation_display",
        )
        # Labels that are neither grouped by nor joined (db_device,
        # wal_device) are expected to be absent from the resulting keys.
        self.assertEqual(
            grouped_metric.value,
            {
                ("osd.0", "/dev/dm-0", "node1"): 1,
                ("osd.1", "/dev/dm-0", "node3"): 1,
                ("osd.2", "/dev/dm-0", "node2"): 1,
                ("osd.3", "/dev/dm-1", "node1"): 1,
                ("osd.4", "/dev/dm-1", "node3"): 1,
                ("osd.5+osd.6", "/dev/dm-1", "node2"): 1,
            },
        )
        # The HELP line ends with a space because the description is empty;
        # that trailing whitespace is significant, hence the noqa marker.
        self.assertEqual(
            grouped_metric.str_expfmt(),
            """
# HELP ceph_disk_occupation_display 
# TYPE ceph_disk_occupation_display untyped
ceph_disk_occupation_display{ceph_daemon="osd.0",device="/dev/dm-0",instance="node1"} 1.0
ceph_disk_occupation_display{ceph_daemon="osd.1",device="/dev/dm-0",instance="node3"} 1.0
ceph_disk_occupation_display{ceph_daemon="osd.2",device="/dev/dm-0",instance="node2"} 1.0
ceph_disk_occupation_display{ceph_daemon="osd.3",device="/dev/dm-1",instance="node1"} 1.0
ceph_disk_occupation_display{ceph_daemon="osd.4",device="/dev/dm-1",instance="node3"} 1.0
ceph_disk_occupation_display{ceph_daemon="osd.5+osd.6",device="/dev/dm-1",instance="node2"} 1.0""",  # noqa: W291
        )
        self.assertEqual(
            grouped_metric.labelnames, ("ceph_daemon", "device", "instance")
        )

    def test_metric_group_by__no_value(self):
        """Grouping a metric without samples yields no values, only headers."""
        m = Metric("metric_type", "name", "desc", labels=('foo', 'bar'))
        grouped = m.group_by(['foo'], {'bar': lambda bars: ', '.join(bars)})
        self.assertEqual(grouped.value, {})
        self.assertEqual(grouped.str_expfmt(),
                         '\n# HELP ceph_name desc\n# TYPE ceph_name metric_type')

    def test_metric_group_by__no_labels(self):
        """A metric created with ``labels=None`` cannot be grouped."""
        m = Metric("metric_type", "name", "desc", labels=None)
        with self.assertRaises(AssertionError) as cm:
            m.group_by([], {})
        self.assertEqual(str(cm.exception), "cannot match keys without label names")

    def test_metric_group_by__key_not_in_labels(self):
        """Grouping by a key that is not a label name is rejected."""
        m = Metric("metric_type", "name", "desc", labels=("foo", "bar"))
        # NOTE(review): the fixture keys are 5-tuples while this metric only
        # declares two labels; the test relies solely on the unknown-key
        # check firing -- presumably before the values are inspected.
        m.value = self.metric_value
        with self.assertRaises(AssertionError) as cm:
            m.group_by(["baz"], {})
        self.assertEqual(str(cm.exception), "unknown key: baz")

    def test_metric_group_by__empty_joins(self):
        """An empty ``joins`` mapping is rejected."""
        m = Metric("", "", "", ("foo", "bar"))
        with self.assertRaises(AssertionError) as cm:
            m.group_by(["foo"], joins={})
        self.assertEqual(str(cm.exception), "joins must not be empty")

    def test_metric_group_by__joins_not_callable(self):
        """A ``joins`` value that is not callable is rejected."""
        m = Metric("", "", "", ("foo", "bar"))
        m.value = self.metric_value
        with self.assertRaises(AssertionError) as cm:
            m.group_by(["foo"], {"bar": "not callable str"})
        self.assertEqual(str(cm.exception), "joins must be callable")