summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/pulsar/collect.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/collect.go138
1 files changed, 138 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/pulsar/collect.go b/src/go/collectors/go.d.plugin/modules/pulsar/collect.go
new file mode 100644
index 000000000..f28e6cb2c
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/pulsar/collect.go
@@ -0,0 +1,138 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pulsar
+
+import (
+ "errors"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/stm"
+)
+
+func isValidPulsarMetrics(pms prometheus.Series) bool {
+ return pms.FindByName(metricPulsarTopicsCount).Len() > 0
+}
+
+func (p *Pulsar) resetCurCache() {
+ for ns := range p.curCache.namespaces {
+ delete(p.curCache.namespaces, ns)
+ }
+ for top := range p.curCache.topics {
+ delete(p.curCache.topics, top)
+ }
+}
+
+func (p *Pulsar) collect() (map[string]int64, error) {
+ pms, err := p.prom.ScrapeSeries()
+ if err != nil {
+ return nil, err
+ }
+
+ if !isValidPulsarMetrics(pms) {
+ return nil, errors.New("returned metrics aren't Apache Pulsar metrics")
+ }
+
+ p.once.Do(func() {
+ p.adjustCharts(pms)
+ })
+
+ mx := p.collectMetrics(pms)
+ p.updateCharts()
+ p.resetCurCache()
+
+ return stm.ToMap(mx), nil
+}
+
+func (p *Pulsar) collectMetrics(pms prometheus.Series) map[string]float64 {
+ mx := make(map[string]float64)
+ p.collectBroker(mx, pms)
+ return mx
+}
+
+func (p *Pulsar) collectBroker(mx map[string]float64, pms prometheus.Series) {
+ pms = findPulsarMetrics(pms)
+ for _, pm := range pms {
+ ns, top := newNamespace(pm), newTopic(pm)
+ if ns.name == "" {
+ continue
+ }
+
+ p.curCache.namespaces[ns] = true
+
+ value := pm.Value * precision(pm.Name())
+ mx[pm.Name()] += value
+ mx[pm.Name()+"_"+ns.name] += value
+
+ if top.name == "" || !p.topicFilter.MatchString(top.name) {
+ continue
+ }
+
+ p.curCache.topics[top] = true
+ mx[pm.Name()+"_"+top.name] += value
+ }
+ mx["pulsar_namespaces_count"] = float64(len(p.curCache.namespaces))
+}
+
+func newNamespace(pm prometheus.SeriesSample) namespace {
+ return namespace{
+ name: pm.Labels.Get("namespace"),
+ }
+}
+
+func newTopic(pm prometheus.SeriesSample) topic {
+ return topic{
+ namespace: pm.Labels.Get("namespace"),
+ name: pm.Labels.Get("topic"),
+ }
+}
+
+func findPulsarMetrics(pms prometheus.Series) prometheus.Series {
+ var ms prometheus.Series
+ for _, pm := range pms {
+ if isPulsarHistogram(pm) {
+ ms = append(ms, pm)
+ }
+ }
+ pms = pms.FindByNames(
+ metricPulsarTopicsCount,
+ metricPulsarSubscriptionDelayed,
+ metricPulsarSubscriptionsCount,
+ metricPulsarProducersCount,
+ metricPulsarConsumersCount,
+ metricPulsarRateIn,
+ metricPulsarRateOut,
+ metricPulsarThroughputIn,
+ metricPulsarThroughputOut,
+ metricPulsarStorageSize,
+ metricPulsarStorageWriteRate,
+ metricPulsarStorageReadRate,
+ metricPulsarMsgBacklog,
+ metricPulsarSubscriptionMsgRateRedeliver,
+ metricPulsarSubscriptionBlockedOnUnackedMessages,
+ )
+ return append(ms, pms...)
+}
+
+func isPulsarHistogram(pm prometheus.SeriesSample) bool {
+ s := pm.Name()
+ return strings.HasPrefix(s, "pulsar_storage_write_latency") || strings.HasPrefix(s, "pulsar_entry_size")
+}
+
+func precision(metric string) float64 {
+ switch metric {
+ case metricPulsarRateIn,
+ metricPulsarRateOut,
+ metricPulsarThroughputIn,
+ metricPulsarThroughputOut,
+ metricPulsarStorageWriteRate,
+ metricPulsarStorageReadRate,
+ metricPulsarSubscriptionMsgRateRedeliver,
+ metricPulsarReplicationRateIn,
+ metricPulsarReplicationRateOut,
+ metricPulsarReplicationThroughputIn,
+ metricPulsarReplicationThroughputOut:
+ return 1000
+ }
+ return 1
+}