diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/modules/pulsar/collect.go | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/pulsar/collect.go b/src/go/collectors/go.d.plugin/modules/pulsar/collect.go new file mode 100644 index 000000000..f28e6cb2c --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/pulsar/collect.go @@ -0,0 +1,138 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pulsar + +import ( + "errors" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus" + "github.com/netdata/netdata/go/go.d.plugin/pkg/stm" +) + +func isValidPulsarMetrics(pms prometheus.Series) bool { + return pms.FindByName(metricPulsarTopicsCount).Len() > 0 +} + +func (p *Pulsar) resetCurCache() { + for ns := range p.curCache.namespaces { + delete(p.curCache.namespaces, ns) + } + for top := range p.curCache.topics { + delete(p.curCache.topics, top) + } +} + +func (p *Pulsar) collect() (map[string]int64, error) { + pms, err := p.prom.ScrapeSeries() + if err != nil { + return nil, err + } + + if !isValidPulsarMetrics(pms) { + return nil, errors.New("returned metrics aren't Apache Pulsar metrics") + } + + p.once.Do(func() { + p.adjustCharts(pms) + }) + + mx := p.collectMetrics(pms) + p.updateCharts() + p.resetCurCache() + + return stm.ToMap(mx), nil +} + +func (p *Pulsar) collectMetrics(pms prometheus.Series) map[string]float64 { + mx := make(map[string]float64) + p.collectBroker(mx, pms) + return mx +} + +func (p *Pulsar) collectBroker(mx map[string]float64, pms prometheus.Series) { + pms = findPulsarMetrics(pms) + for _, pm := range pms { + ns, top := newNamespace(pm), newTopic(pm) + if ns.name == "" { + continue + } + + p.curCache.namespaces[ns] = true + + value := pm.Value * precision(pm.Name()) + mx[pm.Name()] += value + mx[pm.Name()+"_"+ns.name] += value + + if top.name == "" || !p.topicFilter.MatchString(top.name) { + continue + } + + p.curCache.topics[top] = true + mx[pm.Name()+"_"+top.name] += value + } + mx["pulsar_namespaces_count"] = float64(len(p.curCache.namespaces)) +} + +func newNamespace(pm prometheus.SeriesSample) namespace { + return namespace{ + name: pm.Labels.Get("namespace"), + } +} + +func newTopic(pm prometheus.SeriesSample) topic { + return topic{ + namespace: pm.Labels.Get("namespace"), + name: pm.Labels.Get("topic"), + } +} + +func findPulsarMetrics(pms prometheus.Series) prometheus.Series { + var ms prometheus.Series + for _, pm := range pms { + if isPulsarHistogram(pm) { + ms = append(ms, pm) + } + } + pms = pms.FindByNames( + metricPulsarTopicsCount, + metricPulsarSubscriptionDelayed, + metricPulsarSubscriptionsCount, + metricPulsarProducersCount, + metricPulsarConsumersCount, + metricPulsarRateIn, + metricPulsarRateOut, + metricPulsarThroughputIn, + metricPulsarThroughputOut, + metricPulsarStorageSize, + metricPulsarStorageWriteRate, + metricPulsarStorageReadRate, + metricPulsarMsgBacklog, + metricPulsarSubscriptionMsgRateRedeliver, + metricPulsarSubscriptionBlockedOnUnackedMessages, + ) + return append(ms, pms...) +} + +func isPulsarHistogram(pm prometheus.SeriesSample) bool { + s := pm.Name() + return strings.HasPrefix(s, "pulsar_storage_write_latency") || strings.HasPrefix(s, "pulsar_entry_size") +} + +func precision(metric string) float64 { + switch metric { + case metricPulsarRateIn, + metricPulsarRateOut, + metricPulsarThroughputIn, + metricPulsarThroughputOut, + metricPulsarStorageWriteRate, + metricPulsarStorageReadRate, + metricPulsarSubscriptionMsgRateRedeliver, + metricPulsarReplicationRateIn, + metricPulsarReplicationRateOut, + metricPulsarReplicationThroughputIn, + metricPulsarReplicationThroughputOut: + return 1000 + } + return 1 +} |