summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/pulsar/charts.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/modules/pulsar/charts.go664
1 files changed, 664 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/pulsar/charts.go b/src/go/collectors/go.d.plugin/modules/pulsar/charts.go
new file mode 100644
index 000000000..3ddff66f6
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/pulsar/charts.go
@@ -0,0 +1,664 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pulsar
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+)
+
+type (
+ Charts = module.Charts
+ Chart = module.Chart
+ Dims = module.Dims
+ Dim = module.Dim
+ Opts = module.Opts
+)
+
+var summaryCharts = Charts{
+ sumBrokerComponentsChart.Copy(),
+
+ sumMessagesRateChart.Copy(),
+ sumThroughputRateChart.Copy(),
+
+ sumStorageSizeChart.Copy(),
+ sumStorageOperationsRateChart.Copy(), // optional
+ sumMsgBacklogSizeChart.Copy(),
+ sumStorageWriteLatencyChart.Copy(),
+ sumEntrySizeChart.Copy(),
+
+ sumSubsDelayedChart.Copy(),
+ sumSubsMsgRateRedeliverChart.Copy(), // optional
+ sumSubsBlockedOnUnackedMsgChart.Copy(), // optional
+
+ sumReplicationRateChart.Copy(), // optional
+ sumReplicationThroughputRateChart.Copy(), // optional
+ sumReplicationBacklogChart.Copy(), // optional
+}
+
+var (
+ sumBrokerComponentsChart = Chart{
+ ID: "broker_components",
+ Title: "Broker Components",
+ Units: "components",
+ Fam: "ns summary",
+ Ctx: "pulsar.broker_components",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: "pulsar_namespaces_count", Name: "namespaces"},
+ {ID: metricPulsarTopicsCount, Name: "topics"},
+ {ID: metricPulsarSubscriptionsCount, Name: "subscriptions"},
+ {ID: metricPulsarProducersCount, Name: "producers"},
+ {ID: metricPulsarConsumersCount, Name: "consumers"},
+ },
+ }
+ sumMessagesRateChart = Chart{
+ ID: "messages_rate",
+ Title: "Messages Rate",
+ Units: "messages/s",
+ Fam: "ns summary",
+ Ctx: "pulsar.messages_rate",
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarRateIn, Name: "publish", Div: 1000},
+ {ID: metricPulsarRateOut, Name: "dispatch", Mul: -1, Div: 1000},
+ },
+ }
+ sumThroughputRateChart = Chart{
+ ID: "throughput_rate",
+ Title: "Throughput Rate",
+ Units: "KiB/s",
+ Fam: "ns summary",
+ Ctx: "pulsar.throughput_rate",
+ Type: module.Area,
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarThroughputIn, Name: "publish", Div: 1024 * 1000},
+ {ID: metricPulsarThroughputOut, Name: "dispatch", Mul: -1, Div: 1024 * 1000},
+ },
+ }
+ sumStorageSizeChart = Chart{
+ ID: "storage_size",
+ Title: "Storage Size",
+ Units: "KiB",
+ Fam: "ns summary",
+ Ctx: "pulsar.storage_size",
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarStorageSize, Name: "used", Div: 1024},
+ },
+ }
+ sumStorageOperationsRateChart = Chart{
+ ID: "storage_operations_rate",
+ Title: "Storage Read/Write Operations Rate",
+ Units: "message batches/s",
+ Fam: "ns summary",
+ Ctx: "pulsar.storage_operations_rate",
+ Type: module.Area,
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarStorageReadRate, Name: "read", Div: 1000},
+ {ID: metricPulsarStorageWriteRate, Name: "write", Mul: -1, Div: 1000},
+ },
+ }
+ sumMsgBacklogSizeChart = Chart{
+ ID: "msg_backlog",
+ Title: "Messages Backlog Size",
+ Units: "messages",
+ Fam: "ns summary",
+ Ctx: "pulsar.msg_backlog",
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarMsgBacklog, Name: "backlog"},
+ },
+ }
+ sumStorageWriteLatencyChart = Chart{
+ ID: "storage_write_latency",
+ Title: "Storage Write Latency",
+ Units: "entries/s",
+ Fam: "ns summary",
+ Ctx: "pulsar.storage_write_latency",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: "pulsar_storage_write_latency_le_0_5", Name: "<=0.5ms", Div: 60},
+ {ID: "pulsar_storage_write_latency_le_1", Name: "<=1ms", Div: 60},
+ {ID: "pulsar_storage_write_latency_le_5", Name: "<=5ms", Div: 60},
+ {ID: "pulsar_storage_write_latency_le_10", Name: "<=10ms", Div: 60},
+ {ID: "pulsar_storage_write_latency_le_20", Name: "<=20ms", Div: 60},
+ {ID: "pulsar_storage_write_latency_le_50", Name: "<=50ms", Div: 60},
+ {ID: "pulsar_storage_write_latency_le_100", Name: "<=100ms", Div: 60},
+ {ID: "pulsar_storage_write_latency_le_200", Name: "<=200ms", Div: 60},
+ {ID: "pulsar_storage_write_latency_le_1000", Name: "<=1s", Div: 60},
+ {ID: "pulsar_storage_write_latency_overflow", Name: ">1s", Div: 60},
+ },
+ }
+ sumEntrySizeChart = Chart{
+ ID: "entry_size",
+ Title: "Entry Size",
+ Units: "entries/s",
+ Fam: "ns summary",
+ Ctx: "pulsar.entry_size",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: "pulsar_entry_size_le_128", Name: "<=128B", Div: 60},
+ {ID: "pulsar_entry_size_le_512", Name: "<=512B", Div: 60},
+ {ID: "pulsar_entry_size_le_1_kb", Name: "<=1KB", Div: 60},
+ {ID: "pulsar_entry_size_le_2_kb", Name: "<=2KB", Div: 60},
+ {ID: "pulsar_entry_size_le_4_kb", Name: "<=4KB", Div: 60},
+ {ID: "pulsar_entry_size_le_16_kb", Name: "<=16KB", Div: 60},
+ {ID: "pulsar_entry_size_le_100_kb", Name: "<=100KB", Div: 60},
+ {ID: "pulsar_entry_size_le_1_mb", Name: "<=1MB", Div: 60},
+ {ID: "pulsar_entry_size_le_overflow", Name: ">1MB", Div: 60},
+ },
+ }
+ sumSubsDelayedChart = Chart{
+ ID: "subscription_delayed",
+ Title: "Subscriptions Delayed for Dispatching",
+ Units: "message batches",
+ Fam: "ns summary",
+ Ctx: "pulsar.subscription_delayed",
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarSubscriptionDelayed, Name: "delayed"},
+ },
+ }
+ sumSubsMsgRateRedeliverChart = Chart{
+ ID: "subscription_msg_rate_redeliver",
+ Title: "Subscriptions Redelivered Message Rate",
+ Units: "messages/s",
+ Fam: "ns summary",
+ Ctx: "pulsar.subscription_msg_rate_redeliver",
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarSubscriptionMsgRateRedeliver, Name: "redelivered", Div: 1000},
+ },
+ }
+ sumSubsBlockedOnUnackedMsgChart = Chart{
+ ID: "subscription_blocked_on_unacked_messages",
+ Title: "Subscriptions Blocked On Unacked Messages",
+ Units: "subscriptions",
+ Fam: "ns summary",
+ Ctx: "pulsar.subscription_blocked_on_unacked_messages",
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarSubscriptionBlockedOnUnackedMessages, Name: "blocked"},
+ },
+ }
+ sumReplicationRateChart = Chart{
+ ID: "replication_rate",
+ Title: "Replication Rate",
+ Units: "messages/s",
+ Fam: "ns summary",
+ Ctx: "pulsar.replication_rate",
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarReplicationRateIn, Name: "in", Div: 1000},
+ {ID: metricPulsarReplicationRateOut, Name: "out", Mul: -1, Div: 1000},
+ },
+ }
+ sumReplicationThroughputRateChart = Chart{
+ ID: "replication_throughput_rate",
+ Title: "Replication Throughput Rate",
+ Units: "KiB/s",
+ Fam: "ns summary",
+ Ctx: "pulsar.replication_throughput_rate",
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarReplicationThroughputIn, Name: "in", Div: 1024 * 1000},
+ {ID: metricPulsarReplicationThroughputOut, Name: "out", Mul: -1, Div: 1024 * 1000},
+ },
+ }
+ sumReplicationBacklogChart = Chart{
+ ID: "replication_backlog",
+ Title: "Replication Backlog",
+ Units: "messages",
+ Fam: "ns summary",
+ Ctx: "pulsar.replication_backlog",
+ Opts: Opts{StoreFirst: true},
+ Dims: Dims{
+ {ID: metricPulsarReplicationBacklog, Name: "backlog"},
+ },
+ }
+)
+
+var namespaceCharts = Charts{
+ nsBrokerComponentsChart.Copy(),
+ topicProducersChart.Copy(),
+ topicSubscriptionsChart.Copy(),
+ topicConsumersChart.Copy(),
+
+ nsMessagesRateChart.Copy(),
+ topicMessagesRateInChart.Copy(),
+ topicMessagesRateOutChart.Copy(),
+ nsThroughputRateCharts.Copy(),
+ topicThroughputRateInChart.Copy(),
+ topicThroughputRateOutChart.Copy(),
+
+ nsStorageSizeChart.Copy(),
+ topicStorageSizeChart.Copy(),
+ nsStorageOperationsChart.Copy(), // optional
+ topicStorageReadRateChart.Copy(), // optional
+ topicStorageWriteRateChart.Copy(), // optional
+ nsMsgBacklogSizeChart.Copy(),
+ topicMsgBacklogSizeChart.Copy(),
+ nsStorageWriteLatencyChart.Copy(),
+ nsEntrySizeChart.Copy(),
+
+ nsSubsDelayedChart.Copy(),
+ topicSubsDelayedChart.Copy(),
+ nsSubsMsgRateRedeliverChart.Copy(), // optional
+ topicSubsMsgRateRedeliverChart.Copy(), // optional
+ nsSubsBlockedOnUnackedMsgChart.Copy(), // optional
+ topicSubsBlockedOnUnackedMsgChart.Copy(), // optional
+
+ nsReplicationRateChart.Copy(), // optional
+ topicReplicationRateInChart.Copy(), // optional
+ topicReplicationRateOutChart.Copy(), // optional
+ nsReplicationThroughputChart.Copy(), // optional
+ topicReplicationThroughputRateInChart.Copy(), // optional
+ topicReplicationThroughputRateOutChart.Copy(), // optional
+ nsReplicationBacklogChart.Copy(), // optional
+ topicReplicationBacklogChart.Copy(), // optional
+}
+
+func toNamespaceChart(chart Chart) Chart {
+ chart = *chart.Copy()
+ if chart.ID == sumBrokerComponentsChart.ID {
+ _ = chart.RemoveDim("pulsar_namespaces_count")
+ }
+ chart.ID += "_namespace_%s"
+ chart.Fam = "ns %s"
+ if idx := strings.IndexByte(chart.Ctx, '.'); idx > 0 {
+ // pulsar.messages_rate => pulsar.namespace_messages_rate
+ chart.Ctx = chart.Ctx[:idx+1] + "namespace_" + chart.Ctx[idx+1:]
+ }
+ for _, dim := range chart.Dims {
+ dim.ID += "_%s"
+ }
+ return chart
+}
+
+var (
+ nsBrokerComponentsChart = toNamespaceChart(sumBrokerComponentsChart)
+ nsMessagesRateChart = toNamespaceChart(sumMessagesRateChart)
+ nsThroughputRateCharts = toNamespaceChart(sumThroughputRateChart)
+ nsStorageSizeChart = toNamespaceChart(sumStorageSizeChart)
+ nsStorageOperationsChart = toNamespaceChart(sumStorageOperationsRateChart)
+ nsMsgBacklogSizeChart = toNamespaceChart(sumMsgBacklogSizeChart)
+ nsStorageWriteLatencyChart = toNamespaceChart(sumStorageWriteLatencyChart)
+ nsEntrySizeChart = toNamespaceChart(sumEntrySizeChart)
+ nsSubsDelayedChart = toNamespaceChart(sumSubsDelayedChart)
+ nsSubsMsgRateRedeliverChart = toNamespaceChart(sumSubsMsgRateRedeliverChart)
+ nsSubsBlockedOnUnackedMsgChart = toNamespaceChart(sumSubsBlockedOnUnackedMsgChart)
+ nsReplicationRateChart = toNamespaceChart(sumReplicationRateChart)
+ nsReplicationThroughputChart = toNamespaceChart(sumReplicationThroughputRateChart)
+ nsReplicationBacklogChart = toNamespaceChart(sumReplicationBacklogChart)
+
+ topicProducersChart = Chart{
+ ID: "topic_producers_namespace_%s",
+ Title: "Topic Producers",
+ Units: "producers",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_producers",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicSubscriptionsChart = Chart{
+ ID: "topic_subscriptions_namespace_%s",
+ Title: "Topic Subscriptions",
+ Units: "subscriptions",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_subscriptions",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicConsumersChart = Chart{
+ ID: "topic_consumers_namespace_%s",
+ Title: "Topic Consumers",
+ Units: "consumers",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_consumers",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicMessagesRateInChart = Chart{
+ ID: "topic_messages_rate_in_namespace_%s",
+ Title: "Topic Publish Messages Rate",
+ Units: "publishes/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_messages_rate_in",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicMessagesRateOutChart = Chart{
+ ID: "topic_messages_rate_out_namespace_%s",
+ Title: "Topic Dispatch Messages Rate",
+ Units: "dispatches/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_messages_rate_out",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicThroughputRateInChart = Chart{
+ ID: "topic_throughput_rate_in_namespace_%s",
+ Title: "Topic Publish Throughput Rate",
+ Units: "KiB/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_throughput_rate_in",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicThroughputRateOutChart = Chart{
+ ID: "topic_throughput_rate_out_namespace_%s",
+ Title: "Topic Dispatch Throughput Rate",
+ Units: "KiB/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_throughput_rate_out",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicStorageSizeChart = Chart{
+ ID: "topic_storage_size_namespace_%s",
+ Title: "Topic Storage Size",
+ Units: "KiB",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_storage_size",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicStorageReadRateChart = Chart{
+ ID: "topic_storage_read_rate_namespace_%s",
+ Title: "Topic Storage Read Rate",
+ Units: "message batches/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_storage_read_rate",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicStorageWriteRateChart = Chart{
+ ID: "topic_storage_write_rate_namespace_%s",
+ Title: "Topic Storage Write Rate",
+ Units: "message batches/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_storage_write_rate",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicMsgBacklogSizeChart = Chart{
+ ID: "topic_msg_backlog_namespace_%s",
+ Title: "Topic Messages Backlog Size",
+ Units: "messages",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_msg_backlog",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicSubsDelayedChart = Chart{
+ ID: "topic_subscription_delayed_namespace_%s",
+ Title: "Topic Subscriptions Delayed for Dispatching",
+ Units: "message batches",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_subscription_delayed",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicSubsMsgRateRedeliverChart = Chart{
+ ID: "topic_subscription_msg_rate_redeliver_namespace_%s",
+ Title: "Topic Subscriptions Redelivered Message Rate",
+ Units: "messages/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_subscription_msg_rate_redeliver",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicSubsBlockedOnUnackedMsgChart = Chart{
+ ID: "topic_subscription_blocked_on_unacked_messages_namespace_%s",
+ Title: "Topic Subscriptions Blocked On Unacked Messages",
+ Units: "blocked subscriptions",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_subscription_blocked_on_unacked_messages",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicReplicationRateInChart = Chart{
+ ID: "topic_replication_rate_in_namespace_%s",
+ Title: "Topic Replication Rate From Remote Cluster",
+ Units: "messages/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_replication_rate_in",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicReplicationRateOutChart = Chart{
+ ID: "replication_rate_out_namespace_%s",
+ Title: "Topic Replication Rate To Remote Cluster",
+ Units: "messages/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_replication_rate_out",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicReplicationThroughputRateInChart = Chart{
+ ID: "topic_replication_throughput_rate_in_namespace_%s",
+ Title: "Topic Replication Throughput Rate From Remote Cluster",
+ Units: "KiB/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_replication_throughput_rate_in",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicReplicationThroughputRateOutChart = Chart{
+ ID: "topic_replication_throughput_rate_out_namespace_%s",
+ Title: "Topic Replication Throughput Rate To Remote Cluster",
+ Units: "KiB/s",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_replication_throughput_rate_out",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+ topicReplicationBacklogChart = Chart{
+ ID: "topic_replication_backlog_namespace_%s",
+ Title: "Topic Replication Backlog",
+ Units: "messages",
+ Fam: "ns %s",
+ Ctx: "pulsar.topic_replication_backlog",
+ Type: module.Stacked,
+ Opts: Opts{StoreFirst: true},
+ }
+)
+
+func (p *Pulsar) adjustCharts(pms prometheus.Series) {
+ if pms := pms.FindByName(metricPulsarStorageReadRate); pms.Len() == 0 || pms[0].Labels.Get("namespace") == "" {
+ p.removeSummaryChart(sumStorageOperationsRateChart.ID)
+ p.removeNamespaceChart(nsStorageOperationsChart.ID)
+ p.removeNamespaceChart(topicStorageReadRateChart.ID)
+ p.removeNamespaceChart(topicStorageWriteRateChart.ID)
+ delete(p.topicChartsMapping, topicStorageReadRateChart.ID)
+ delete(p.topicChartsMapping, topicStorageWriteRateChart.ID)
+ }
+ if pms.FindByName(metricPulsarSubscriptionMsgRateRedeliver).Len() == 0 {
+ p.removeSummaryChart(sumSubsMsgRateRedeliverChart.ID)
+ p.removeSummaryChart(sumSubsBlockedOnUnackedMsgChart.ID)
+ p.removeNamespaceChart(nsSubsMsgRateRedeliverChart.ID)
+ p.removeNamespaceChart(nsSubsBlockedOnUnackedMsgChart.ID)
+ p.removeNamespaceChart(topicSubsMsgRateRedeliverChart.ID)
+ p.removeNamespaceChart(topicSubsBlockedOnUnackedMsgChart.ID)
+ delete(p.topicChartsMapping, topicSubsMsgRateRedeliverChart.ID)
+ delete(p.topicChartsMapping, topicSubsBlockedOnUnackedMsgChart.ID)
+ }
+ if pms.FindByName(metricPulsarReplicationBacklog).Len() == 0 {
+ p.removeSummaryChart(sumReplicationRateChart.ID)
+ p.removeSummaryChart(sumReplicationThroughputRateChart.ID)
+ p.removeSummaryChart(sumReplicationBacklogChart.ID)
+ p.removeNamespaceChart(nsReplicationRateChart.ID)
+ p.removeNamespaceChart(nsReplicationThroughputChart.ID)
+ p.removeNamespaceChart(nsReplicationBacklogChart.ID)
+ p.removeNamespaceChart(topicReplicationRateInChart.ID)
+ p.removeNamespaceChart(topicReplicationRateOutChart.ID)
+ p.removeNamespaceChart(topicReplicationThroughputRateInChart.ID)
+ p.removeNamespaceChart(topicReplicationThroughputRateOutChart.ID)
+ p.removeNamespaceChart(topicReplicationBacklogChart.ID)
+ delete(p.topicChartsMapping, topicReplicationRateInChart.ID)
+ delete(p.topicChartsMapping, topicReplicationRateOutChart.ID)
+ delete(p.topicChartsMapping, topicReplicationThroughputRateInChart.ID)
+ delete(p.topicChartsMapping, topicReplicationThroughputRateOutChart.ID)
+ delete(p.topicChartsMapping, topicReplicationBacklogChart.ID)
+ }
+}
+
+func (p *Pulsar) removeSummaryChart(chartID string) {
+ if err := p.Charts().Remove(chartID); err != nil {
+ p.Warning(err)
+ }
+}
+
+func (p *Pulsar) removeNamespaceChart(chartID string) {
+ if err := p.nsCharts.Remove(chartID); err != nil {
+ p.Warning(err)
+ }
+}
+
+func (p *Pulsar) updateCharts() {
+ // NOTE: order is important
+ for ns := range p.curCache.namespaces {
+ if !p.cache.namespaces[ns] {
+ p.cache.namespaces[ns] = true
+ p.addNamespaceCharts(ns)
+ }
+ }
+ for top := range p.curCache.topics {
+ if !p.cache.topics[top] {
+ p.cache.topics[top] = true
+ p.addTopicToCharts(top)
+ }
+ }
+ for top := range p.cache.topics {
+ if p.curCache.topics[top] {
+ continue
+ }
+ delete(p.cache.topics, top)
+ p.removeTopicFromCharts(top)
+ }
+ for ns := range p.cache.namespaces {
+ if p.curCache.namespaces[ns] {
+ continue
+ }
+ delete(p.cache.namespaces, ns)
+ p.removeNamespaceFromCharts(ns)
+ }
+}
+
+func (p *Pulsar) addNamespaceCharts(ns namespace) {
+ charts := p.nsCharts.Copy()
+ for _, chart := range *charts {
+ chart.ID = fmt.Sprintf(chart.ID, ns.name)
+ chart.Fam = fmt.Sprintf(chart.Fam, ns.name)
+ for _, dim := range chart.Dims {
+ dim.ID = fmt.Sprintf(dim.ID, ns.name)
+ }
+ }
+ if err := p.Charts().Add(*charts...); err != nil {
+ p.Warning(err)
+ }
+}
+
+func (p *Pulsar) removeNamespaceFromCharts(ns namespace) {
+ for _, chart := range *p.nsCharts {
+ id := fmt.Sprintf(chart.ID, ns.name)
+ if chart = p.Charts().Get(id); chart != nil {
+ chart.MarkRemove()
+ } else {
+ p.Warningf("could not remove namespace chart '%s'", id)
+ }
+ }
+}
+
+func (p *Pulsar) addTopicToCharts(top topic) {
+ for id, metric := range p.topicChartsMapping {
+ id = fmt.Sprintf(id, top.namespace)
+ chart := p.Charts().Get(id)
+ if chart == nil {
+ p.Warningf("could not add topic '%s' to chart '%s': chart not found", top.name, id)
+ continue
+ }
+
+ dim := Dim{ID: metric + "_" + top.name, Name: extractTopicName(top)}
+ switch metric {
+ case metricPulsarThroughputIn,
+ metricPulsarThroughputOut,
+ metricPulsarReplicationThroughputIn,
+ metricPulsarReplicationThroughputOut:
+ dim.Div = 1024 * 1000
+ case metricPulsarRateIn,
+ metricPulsarRateOut,
+ metricPulsarStorageWriteRate,
+ metricPulsarStorageReadRate,
+ metricPulsarSubscriptionMsgRateRedeliver,
+ metricPulsarReplicationRateIn,
+ metricPulsarReplicationRateOut:
+ dim.Div = 1000
+ case metricPulsarStorageSize:
+ dim.Div = 1024
+ }
+
+ if err := chart.AddDim(&dim); err != nil {
+ p.Warning(err)
+ }
+ chart.MarkNotCreated()
+ }
+}
+
+func (p *Pulsar) removeTopicFromCharts(top topic) {
+ for id, metric := range p.topicChartsMapping {
+ id = fmt.Sprintf(id, top.namespace)
+ chart := p.Charts().Get(id)
+ if chart == nil {
+ p.Warningf("could not remove topic '%s' from chart '%s': chart not found", top.name, id)
+ continue
+ }
+
+ if err := chart.MarkDimRemove(metric+"_"+top.name, true); err != nil {
+ p.Warning(err)
+ }
+ chart.MarkNotCreated()
+ }
+}
+
+func topicChartsMapping() map[string]string {
+ return map[string]string{
+ topicSubscriptionsChart.ID: metricPulsarSubscriptionsCount,
+ topicProducersChart.ID: metricPulsarProducersCount,
+ topicConsumersChart.ID: metricPulsarConsumersCount,
+ topicMessagesRateInChart.ID: metricPulsarRateIn,
+ topicMessagesRateOutChart.ID: metricPulsarRateOut,
+ topicThroughputRateInChart.ID: metricPulsarThroughputIn,
+ topicThroughputRateOutChart.ID: metricPulsarThroughputOut,
+ topicStorageSizeChart.ID: metricPulsarStorageSize,
+ topicStorageReadRateChart.ID: metricPulsarStorageReadRate,
+ topicStorageWriteRateChart.ID: metricPulsarStorageWriteRate,
+ topicMsgBacklogSizeChart.ID: metricPulsarMsgBacklog,
+ topicSubsDelayedChart.ID: metricPulsarSubscriptionDelayed,
+ topicSubsMsgRateRedeliverChart.ID: metricPulsarSubscriptionMsgRateRedeliver,
+ topicSubsBlockedOnUnackedMsgChart.ID: metricPulsarSubscriptionBlockedOnUnackedMessages,
+ topicReplicationRateInChart.ID: metricPulsarReplicationRateIn,
+ topicReplicationRateOutChart.ID: metricPulsarReplicationRateOut,
+ topicReplicationThroughputRateInChart.ID: metricPulsarReplicationThroughputIn,
+ topicReplicationThroughputRateOutChart.ID: metricPulsarReplicationThroughputOut,
+ topicReplicationBacklogChart.ID: metricPulsarReplicationBacklog,
+ }
+}
+
+func extractTopicName(top topic) string {
+ // persistent://sample/ns1/demo-1 => p:demo-1
+ if idx := strings.LastIndexByte(top.name, '/'); idx > 0 {
+ return top.name[:1] + ":" + top.name[idx+1:]
+ }
+ return top.name
+}