summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/pulsar/collect.go
blob: f28e6cb2c40dd42eace5c74b615232583d426bf5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// SPDX-License-Identifier: GPL-3.0-or-later

package pulsar

import (
	"errors"
	"strings"

	"github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus"
	"github.com/netdata/netdata/go/go.d.plugin/pkg/stm"
)

// isValidPulsarMetrics reports whether the scraped series contain at least
// one sample named metricPulsarTopicsCount. collect uses this as a sanity
// check that the endpoint really serves Apache Pulsar metrics.
func isValidPulsarMetrics(pms prometheus.Series) bool {
	topicsCount := pms.FindByName(metricPulsarTopicsCount)
	return topicsCount.Len() > 0
}

// resetCurCache empties the per-collection namespace and topic sets in
// p.curCache. The maps themselves are kept and only their entries are
// removed, so they can be refilled on the next collection.
func (p *Pulsar) resetCurCache() {
	namespaces, topics := p.curCache.namespaces, p.curCache.topics

	for key := range namespaces {
		delete(namespaces, key)
	}
	for key := range topics {
		delete(topics, key)
	}
}

// collect performs one collection cycle: it scrapes the Prometheus endpoint,
// verifies that the response looks like Pulsar metrics, adjusts the charts
// once (guarded by p.once), gathers the metric values, updates the charts and
// finally clears the current-cycle cache.
//
// It returns the collected values converted by stm.ToMap, or a nil map and an
// error when the scrape fails or the response does not contain Pulsar metrics.
func (p *Pulsar) collect() (map[string]int64, error) {
	series, err := p.prom.ScrapeSeries()
	if err != nil {
		return nil, err
	}

	if valid := isValidPulsarMetrics(series); !valid {
		return nil, errors.New("returned metrics aren't Apache Pulsar metrics")
	}

	// Chart adjustment needs a real scrape result, so it is deferred until the
	// first successful collection and executed only once.
	p.once.Do(func() { p.adjustCharts(series) })

	collected := p.collectMetrics(series)

	// updateCharts runs before the cache reset, while the namespaces and
	// topics recorded during this cycle are still in p.curCache.
	p.updateCharts()
	p.resetCurCache()

	return stm.ToMap(collected), nil
}

// collectMetrics allocates a fresh result map, fills it with the broker
// metrics found in pms and returns it.
func (p *Pulsar) collectMetrics(pms prometheus.Series) map[string]float64 {
	result := map[string]float64{}
	p.collectBroker(result, pms)
	return result
}

// collectBroker aggregates the Pulsar samples selected by findPulsarMetrics
// into mx.
//
// Samples without a "namespace" label are ignored. For every remaining sample
// the scaled value (sample value multiplied by precision for that metric) is
// added to:
//   - mx[<metric>]                  (sum over all namespaces),
//   - mx[<metric>_<namespace>]      (sum per namespace),
//   - mx[<metric>_<topic>]          (sum per topic; only when the sample has a
//     "topic" label that matches p.topicFilter).
//
// Seen namespaces and topics are recorded in p.curCache, and
// mx["pulsar_namespaces_count"] is set to the number of cached namespaces.
func (p *Pulsar) collectBroker(mx map[string]float64, pms prometheus.Series) {
	for _, sample := range findPulsarMetrics(pms) {
		ns := newNamespace(sample)
		if ns.name == "" {
			continue
		}
		p.curCache.namespaces[ns] = true

		metric := sample.Name()
		scaled := sample.Value * precision(metric)

		mx[metric] += scaled
		mx[metric+"_"+ns.name] += scaled

		// Per-topic values are only kept for topics selected by the filter.
		top := newTopic(sample)
		if top.name != "" && p.topicFilter.MatchString(top.name) {
			p.curCache.topics[top] = true
			mx[metric+"_"+top.name] += scaled
		}
	}

	mx["pulsar_namespaces_count"] = float64(len(p.curCache.namespaces))
}

// newNamespace builds a namespace value whose name is the sample's
// "namespace" label value.
func newNamespace(pm prometheus.SeriesSample) namespace {
	var ns namespace
	ns.name = pm.Labels.Get("namespace")
	return ns
}

// newTopic builds a topic value from the sample's "namespace" and "topic"
// label values.
func newTopic(pm prometheus.SeriesSample) topic {
	labels := pm.Labels
	return topic{
		namespace: labels.Get("namespace"),
		name:      labels.Get("topic"),
	}
}

// findPulsarMetrics returns the subset of pms that the collector cares about:
// first every sample recognised by isPulsarHistogram (in their original
// order), followed by the samples matching the fixed list of metric names
// passed to FindByNames.
func findPulsarMetrics(pms prometheus.Series) prometheus.Series {
	var selected prometheus.Series
	for i := range pms {
		if isPulsarHistogram(pms[i]) {
			selected = append(selected, pms[i])
		}
	}

	named := pms.FindByNames(
		metricPulsarTopicsCount,
		metricPulsarSubscriptionDelayed,
		metricPulsarSubscriptionsCount,
		metricPulsarProducersCount,
		metricPulsarConsumersCount,
		metricPulsarRateIn,
		metricPulsarRateOut,
		metricPulsarThroughputIn,
		metricPulsarThroughputOut,
		metricPulsarStorageSize,
		metricPulsarStorageWriteRate,
		metricPulsarStorageReadRate,
		metricPulsarMsgBacklog,
		metricPulsarSubscriptionMsgRateRedeliver,
		metricPulsarSubscriptionBlockedOnUnackedMessages,
	)

	return append(selected, named...)
}

// isPulsarHistogram reports whether the sample's metric name starts with
// "pulsar_storage_write_latency" or "pulsar_entry_size".
func isPulsarHistogram(pm prometheus.SeriesSample) bool {
	name := pm.Name()
	if strings.HasPrefix(name, "pulsar_storage_write_latency") {
		return true
	}
	return strings.HasPrefix(name, "pulsar_entry_size")
}

// precision returns the multiplier applied to a metric's value before it is
// stored: 1000 for the rate and throughput metrics listed below, 1 for every
// other metric.
//
// NOTE(review): the 1000x factor presumably preserves three fractional digits
// when values are later converted to integers — confirm against the chart
// definitions.
func precision(metric string) float64 {
	const (
		scaled   = 1000
		unscaled = 1
	)

	switch metric {
	case metricPulsarRateIn,
		metricPulsarRateOut,
		metricPulsarThroughputIn,
		metricPulsarThroughputOut,
		metricPulsarStorageWriteRate,
		metricPulsarStorageReadRate,
		metricPulsarSubscriptionMsgRateRedeliver,
		metricPulsarReplicationRateIn,
		metricPulsarReplicationRateOut,
		metricPulsarReplicationThroughputIn,
		metricPulsarReplicationThroughputOut:
		return scaled
	default:
		return unscaled
	}
}