diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 11:19:16 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:53:24 +0000 |
commit | b5f8ee61a7f7e9bd291dd26b0585d03eb686c941 (patch) | |
tree | d4d31289c39fc00da064a825df13a0b98ce95b10 /src/go/collectors/go.d.plugin/modules/pulsar/charts.go | |
parent | Adding upstream version 1.44.3. (diff) | |
download | netdata-b5f8ee61a7f7e9bd291dd26b0585d03eb686c941.tar.xz netdata-b5f8ee61a7f7e9bd291dd26b0585d03eb686c941.zip |
Adding upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/modules/pulsar/charts.go | 664 |
1 files changed, 664 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/pulsar/charts.go b/src/go/collectors/go.d.plugin/modules/pulsar/charts.go new file mode 100644 index 000000000..3ddff66f6 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/pulsar/charts.go @@ -0,0 +1,664 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pulsar + +import ( + "fmt" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus" + + "github.com/netdata/netdata/go/go.d.plugin/agent/module" +) + +type ( + Charts = module.Charts + Chart = module.Chart + Dims = module.Dims + Dim = module.Dim + Opts = module.Opts +) + +var summaryCharts = Charts{ + sumBrokerComponentsChart.Copy(), + + sumMessagesRateChart.Copy(), + sumThroughputRateChart.Copy(), + + sumStorageSizeChart.Copy(), + sumStorageOperationsRateChart.Copy(), // optional + sumMsgBacklogSizeChart.Copy(), + sumStorageWriteLatencyChart.Copy(), + sumEntrySizeChart.Copy(), + + sumSubsDelayedChart.Copy(), + sumSubsMsgRateRedeliverChart.Copy(), // optional + sumSubsBlockedOnUnackedMsgChart.Copy(), // optional + + sumReplicationRateChart.Copy(), // optional + sumReplicationThroughputRateChart.Copy(), // optional + sumReplicationBacklogChart.Copy(), // optional +} + +var ( + sumBrokerComponentsChart = Chart{ + ID: "broker_components", + Title: "Broker Components", + Units: "components", + Fam: "ns summary", + Ctx: "pulsar.broker_components", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: "pulsar_namespaces_count", Name: "namespaces"}, + {ID: metricPulsarTopicsCount, Name: "topics"}, + {ID: metricPulsarSubscriptionsCount, Name: "subscriptions"}, + {ID: metricPulsarProducersCount, Name: "producers"}, + {ID: metricPulsarConsumersCount, Name: "consumers"}, + }, + } + sumMessagesRateChart = Chart{ + ID: "messages_rate", + Title: "Messages Rate", + Units: "messages/s", + Fam: "ns summary", + Ctx: "pulsar.messages_rate", + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarRateIn, Name: "publish", Div: 1000}, + {ID: metricPulsarRateOut, Name: "dispatch", Mul: -1, Div: 1000}, + }, + } + sumThroughputRateChart = Chart{ + ID: "throughput_rate", + Title: "Throughput Rate", + Units: "KiB/s", + Fam: "ns summary", + Ctx: "pulsar.throughput_rate", + Type: module.Area, + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarThroughputIn, Name: "publish", Div: 1024 * 1000}, + {ID: metricPulsarThroughputOut, Name: "dispatch", Mul: -1, Div: 1024 * 1000}, + }, + } + sumStorageSizeChart = Chart{ + ID: "storage_size", + Title: "Storage Size", + Units: "KiB", + Fam: "ns summary", + Ctx: "pulsar.storage_size", + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarStorageSize, Name: "used", Div: 1024}, + }, + } + sumStorageOperationsRateChart = Chart{ + ID: "storage_operations_rate", + Title: "Storage Read/Write Operations Rate", + Units: "message batches/s", + Fam: "ns summary", + Ctx: "pulsar.storage_operations_rate", + Type: module.Area, + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarStorageReadRate, Name: "read", Div: 1000}, + {ID: metricPulsarStorageWriteRate, Name: "write", Mul: -1, Div: 1000}, + }, + } + sumMsgBacklogSizeChart = Chart{ + ID: "msg_backlog", + Title: "Messages Backlog Size", + Units: "messages", + Fam: "ns summary", + Ctx: "pulsar.msg_backlog", + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarMsgBacklog, Name: "backlog"}, + }, + } + sumStorageWriteLatencyChart = Chart{ + ID: "storage_write_latency", + Title: "Storage Write Latency", + Units: "entries/s", + Fam: "ns summary", + Ctx: "pulsar.storage_write_latency", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: "pulsar_storage_write_latency_le_0_5", Name: "<=0.5ms", Div: 60}, + {ID: "pulsar_storage_write_latency_le_1", Name: "<=1ms", Div: 60}, + {ID: "pulsar_storage_write_latency_le_5", Name: "<=5ms", Div: 60}, + {ID: "pulsar_storage_write_latency_le_10", Name: "<=10ms", Div: 60}, + {ID: "pulsar_storage_write_latency_le_20", Name: "<=20ms", Div: 60}, + {ID: "pulsar_storage_write_latency_le_50", Name: "<=50ms", Div: 60}, + {ID: "pulsar_storage_write_latency_le_100", Name: "<=100ms", Div: 60}, + {ID: "pulsar_storage_write_latency_le_200", Name: "<=200ms", Div: 60}, + {ID: "pulsar_storage_write_latency_le_1000", Name: "<=1s", Div: 60}, + {ID: "pulsar_storage_write_latency_overflow", Name: ">1s", Div: 60}, + }, + } + sumEntrySizeChart = Chart{ + ID: "entry_size", + Title: "Entry Size", + Units: "entries/s", + Fam: "ns summary", + Ctx: "pulsar.entry_size", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: "pulsar_entry_size_le_128", Name: "<=128B", Div: 60}, + {ID: "pulsar_entry_size_le_512", Name: "<=512B", Div: 60}, + {ID: "pulsar_entry_size_le_1_kb", Name: "<=1KB", Div: 60}, + {ID: "pulsar_entry_size_le_2_kb", Name: "<=2KB", Div: 60}, + {ID: "pulsar_entry_size_le_4_kb", Name: "<=4KB", Div: 60}, + {ID: "pulsar_entry_size_le_16_kb", Name: "<=16KB", Div: 60}, + {ID: "pulsar_entry_size_le_100_kb", Name: "<=100KB", Div: 60}, + {ID: "pulsar_entry_size_le_1_mb", Name: "<=1MB", Div: 60}, + {ID: "pulsar_entry_size_le_overflow", Name: ">1MB", Div: 60}, + }, + } + sumSubsDelayedChart = Chart{ + ID: "subscription_delayed", + Title: "Subscriptions Delayed for Dispatching", + Units: "message batches", + Fam: "ns summary", + Ctx: "pulsar.subscription_delayed", + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarSubscriptionDelayed, Name: "delayed"}, + }, + } + sumSubsMsgRateRedeliverChart = Chart{ + ID: "subscription_msg_rate_redeliver", + Title: "Subscriptions Redelivered Message Rate", + Units: "messages/s", + Fam: "ns summary", + Ctx: "pulsar.subscription_msg_rate_redeliver", + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarSubscriptionMsgRateRedeliver, Name: "redelivered", Div: 1000}, + }, + } + sumSubsBlockedOnUnackedMsgChart = Chart{ + ID: "subscription_blocked_on_unacked_messages", + Title: "Subscriptions Blocked On Unacked Messages", + Units: "subscriptions", + Fam: "ns summary", + Ctx: "pulsar.subscription_blocked_on_unacked_messages", + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarSubscriptionBlockedOnUnackedMessages, Name: "blocked"}, + }, + } + sumReplicationRateChart = Chart{ + ID: "replication_rate", + Title: "Replication Rate", + Units: "messages/s", + Fam: "ns summary", + Ctx: "pulsar.replication_rate", + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarReplicationRateIn, Name: "in", Div: 1000}, + {ID: metricPulsarReplicationRateOut, Name: "out", Mul: -1, Div: 1000}, + }, + } + sumReplicationThroughputRateChart = Chart{ + ID: "replication_throughput_rate", + Title: "Replication Throughput Rate", + Units: "KiB/s", + Fam: "ns summary", + Ctx: "pulsar.replication_throughput_rate", + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarReplicationThroughputIn, Name: "in", Div: 1024 * 1000}, + {ID: metricPulsarReplicationThroughputOut, Name: "out", Mul: -1, Div: 1024 * 1000}, + }, + } + sumReplicationBacklogChart = Chart{ + ID: "replication_backlog", + Title: "Replication Backlog", + Units: "messages", + Fam: "ns summary", + Ctx: "pulsar.replication_backlog", + Opts: Opts{StoreFirst: true}, + Dims: Dims{ + {ID: metricPulsarReplicationBacklog, Name: "backlog"}, + }, + } +) + +var namespaceCharts = Charts{ + nsBrokerComponentsChart.Copy(), + topicProducersChart.Copy(), + topicSubscriptionsChart.Copy(), + topicConsumersChart.Copy(), + + nsMessagesRateChart.Copy(), + topicMessagesRateInChart.Copy(), + topicMessagesRateOutChart.Copy(), + nsThroughputRateCharts.Copy(), + topicThroughputRateInChart.Copy(), + topicThroughputRateOutChart.Copy(), + + nsStorageSizeChart.Copy(), + topicStorageSizeChart.Copy(), + nsStorageOperationsChart.Copy(), // optional + topicStorageReadRateChart.Copy(), // optional + topicStorageWriteRateChart.Copy(), // optional + nsMsgBacklogSizeChart.Copy(), + topicMsgBacklogSizeChart.Copy(), + nsStorageWriteLatencyChart.Copy(), + nsEntrySizeChart.Copy(), + + nsSubsDelayedChart.Copy(), + topicSubsDelayedChart.Copy(), + nsSubsMsgRateRedeliverChart.Copy(), // optional + topicSubsMsgRateRedeliverChart.Copy(), // optional + nsSubsBlockedOnUnackedMsgChart.Copy(), // optional + topicSubsBlockedOnUnackedMsgChart.Copy(), // optional + + nsReplicationRateChart.Copy(), // optional + topicReplicationRateInChart.Copy(), // optional + topicReplicationRateOutChart.Copy(), // optional + nsReplicationThroughputChart.Copy(), // optional + topicReplicationThroughputRateInChart.Copy(), // optional + topicReplicationThroughputRateOutChart.Copy(), // optional + nsReplicationBacklogChart.Copy(), // optional + topicReplicationBacklogChart.Copy(), // optional +} + +func toNamespaceChart(chart Chart) Chart { + chart = *chart.Copy() + if chart.ID == sumBrokerComponentsChart.ID { + _ = chart.RemoveDim("pulsar_namespaces_count") + } + chart.ID += "_namespace_%s" + chart.Fam = "ns %s" + if idx := strings.IndexByte(chart.Ctx, '.'); idx > 0 { + // pulsar.messages_rate => pulsar.namespace_messages_rate + chart.Ctx = chart.Ctx[:idx+1] + "namespace_" + chart.Ctx[idx+1:] + } + for _, dim := range chart.Dims { + dim.ID += "_%s" + } + return chart +} + +var ( + nsBrokerComponentsChart = toNamespaceChart(sumBrokerComponentsChart) + nsMessagesRateChart = toNamespaceChart(sumMessagesRateChart) + nsThroughputRateCharts = toNamespaceChart(sumThroughputRateChart) + nsStorageSizeChart = toNamespaceChart(sumStorageSizeChart) + nsStorageOperationsChart = toNamespaceChart(sumStorageOperationsRateChart) + nsMsgBacklogSizeChart = toNamespaceChart(sumMsgBacklogSizeChart) + nsStorageWriteLatencyChart = toNamespaceChart(sumStorageWriteLatencyChart) + nsEntrySizeChart = toNamespaceChart(sumEntrySizeChart) + nsSubsDelayedChart = toNamespaceChart(sumSubsDelayedChart) + nsSubsMsgRateRedeliverChart = toNamespaceChart(sumSubsMsgRateRedeliverChart) + nsSubsBlockedOnUnackedMsgChart = toNamespaceChart(sumSubsBlockedOnUnackedMsgChart) + nsReplicationRateChart = toNamespaceChart(sumReplicationRateChart) + nsReplicationThroughputChart = toNamespaceChart(sumReplicationThroughputRateChart) + nsReplicationBacklogChart = toNamespaceChart(sumReplicationBacklogChart) + + topicProducersChart = Chart{ + ID: "topic_producers_namespace_%s", + Title: "Topic Producers", + Units: "producers", + Fam: "ns %s", + Ctx: "pulsar.topic_producers", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicSubscriptionsChart = Chart{ + ID: "topic_subscriptions_namespace_%s", + Title: "Topic Subscriptions", + Units: "subscriptions", + Fam: "ns %s", + Ctx: "pulsar.topic_subscriptions", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicConsumersChart = Chart{ + ID: "topic_consumers_namespace_%s", + Title: "Topic Consumers", + Units: "consumers", + Fam: "ns %s", + Ctx: "pulsar.topic_consumers", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicMessagesRateInChart = Chart{ + ID: "topic_messages_rate_in_namespace_%s", + Title: "Topic Publish Messages Rate", + Units: "publishes/s", + Fam: "ns %s", + Ctx: "pulsar.topic_messages_rate_in", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicMessagesRateOutChart = Chart{ + ID: "topic_messages_rate_out_namespace_%s", + Title: "Topic Dispatch Messages Rate", + Units: "dispatches/s", + Fam: "ns %s", + Ctx: "pulsar.topic_messages_rate_out", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicThroughputRateInChart = Chart{ + ID: "topic_throughput_rate_in_namespace_%s", + Title: "Topic Publish Throughput Rate", + Units: "KiB/s", + Fam: "ns %s", + Ctx: "pulsar.topic_throughput_rate_in", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicThroughputRateOutChart = Chart{ + ID: "topic_throughput_rate_out_namespace_%s", + Title: "Topic Dispatch Throughput Rate", + Units: "KiB/s", + Fam: "ns %s", + Ctx: "pulsar.topic_throughput_rate_out", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicStorageSizeChart = Chart{ + ID: "topic_storage_size_namespace_%s", + Title: "Topic Storage Size", + Units: "KiB", + Fam: "ns %s", + Ctx: "pulsar.topic_storage_size", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicStorageReadRateChart = Chart{ + ID: "topic_storage_read_rate_namespace_%s", + Title: "Topic Storage Read Rate", + Units: "message batches/s", + Fam: "ns %s", + Ctx: "pulsar.topic_storage_read_rate", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicStorageWriteRateChart = Chart{ + ID: "topic_storage_write_rate_namespace_%s", + Title: "Topic Storage Write Rate", + Units: "message batches/s", + Fam: "ns %s", + Ctx: "pulsar.topic_storage_write_rate", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicMsgBacklogSizeChart = Chart{ + ID: "topic_msg_backlog_namespace_%s", + Title: "Topic Messages Backlog Size", + Units: "messages", + Fam: "ns %s", + Ctx: "pulsar.topic_msg_backlog", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicSubsDelayedChart = Chart{ + ID: "topic_subscription_delayed_namespace_%s", + Title: "Topic Subscriptions Delayed for Dispatching", + Units: "message batches", + Fam: "ns %s", + Ctx: "pulsar.topic_subscription_delayed", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicSubsMsgRateRedeliverChart = Chart{ + ID: "topic_subscription_msg_rate_redeliver_namespace_%s", + Title: "Topic Subscriptions Redelivered Message Rate", + Units: "messages/s", + Fam: "ns %s", + Ctx: "pulsar.topic_subscription_msg_rate_redeliver", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicSubsBlockedOnUnackedMsgChart = Chart{ + ID: "topic_subscription_blocked_on_unacked_messages_namespace_%s", + Title: "Topic Subscriptions Blocked On Unacked Messages", + Units: "blocked subscriptions", + Fam: "ns %s", + Ctx: "pulsar.topic_subscription_blocked_on_unacked_messages", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicReplicationRateInChart = Chart{ + ID: "topic_replication_rate_in_namespace_%s", + Title: "Topic Replication Rate From Remote Cluster", + Units: "messages/s", + Fam: "ns %s", + Ctx: "pulsar.topic_replication_rate_in", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicReplicationRateOutChart = Chart{ + ID: "replication_rate_out_namespace_%s", + Title: "Topic Replication Rate To Remote Cluster", + Units: "messages/s", + Fam: "ns %s", + Ctx: "pulsar.topic_replication_rate_out", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicReplicationThroughputRateInChart = Chart{ + ID: "topic_replication_throughput_rate_in_namespace_%s", + Title: "Topic Replication Throughput Rate From Remote Cluster", + Units: "KiB/s", + Fam: "ns %s", + Ctx: "pulsar.topic_replication_throughput_rate_in", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicReplicationThroughputRateOutChart = Chart{ + ID: "topic_replication_throughput_rate_out_namespace_%s", + Title: "Topic Replication Throughput Rate To Remote Cluster", + Units: "KiB/s", + Fam: "ns %s", + Ctx: "pulsar.topic_replication_throughput_rate_out", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } + topicReplicationBacklogChart = Chart{ + ID: "topic_replication_backlog_namespace_%s", + Title: "Topic Replication Backlog", + Units: "messages", + Fam: "ns %s", + Ctx: "pulsar.topic_replication_backlog", + Type: module.Stacked, + Opts: Opts{StoreFirst: true}, + } +) + +func (p *Pulsar) adjustCharts(pms prometheus.Series) { + if pms := pms.FindByName(metricPulsarStorageReadRate); pms.Len() == 0 || pms[0].Labels.Get("namespace") == "" { + p.removeSummaryChart(sumStorageOperationsRateChart.ID) + p.removeNamespaceChart(nsStorageOperationsChart.ID) + p.removeNamespaceChart(topicStorageReadRateChart.ID) + p.removeNamespaceChart(topicStorageWriteRateChart.ID) + delete(p.topicChartsMapping, topicStorageReadRateChart.ID) + delete(p.topicChartsMapping, topicStorageWriteRateChart.ID) + } + if pms.FindByName(metricPulsarSubscriptionMsgRateRedeliver).Len() == 0 { + p.removeSummaryChart(sumSubsMsgRateRedeliverChart.ID) + p.removeSummaryChart(sumSubsBlockedOnUnackedMsgChart.ID) + p.removeNamespaceChart(nsSubsMsgRateRedeliverChart.ID) + p.removeNamespaceChart(nsSubsBlockedOnUnackedMsgChart.ID) + p.removeNamespaceChart(topicSubsMsgRateRedeliverChart.ID) + p.removeNamespaceChart(topicSubsBlockedOnUnackedMsgChart.ID) + delete(p.topicChartsMapping, topicSubsMsgRateRedeliverChart.ID) + delete(p.topicChartsMapping, topicSubsBlockedOnUnackedMsgChart.ID) + } + if pms.FindByName(metricPulsarReplicationBacklog).Len() == 0 { + p.removeSummaryChart(sumReplicationRateChart.ID) + p.removeSummaryChart(sumReplicationThroughputRateChart.ID) + p.removeSummaryChart(sumReplicationBacklogChart.ID) + p.removeNamespaceChart(nsReplicationRateChart.ID) + p.removeNamespaceChart(nsReplicationThroughputChart.ID) + p.removeNamespaceChart(nsReplicationBacklogChart.ID) + p.removeNamespaceChart(topicReplicationRateInChart.ID) + p.removeNamespaceChart(topicReplicationRateOutChart.ID) + p.removeNamespaceChart(topicReplicationThroughputRateInChart.ID) + p.removeNamespaceChart(topicReplicationThroughputRateOutChart.ID) + p.removeNamespaceChart(topicReplicationBacklogChart.ID) + delete(p.topicChartsMapping, topicReplicationRateInChart.ID) + delete(p.topicChartsMapping, topicReplicationRateOutChart.ID) + delete(p.topicChartsMapping, topicReplicationThroughputRateInChart.ID) + delete(p.topicChartsMapping, topicReplicationThroughputRateOutChart.ID) + delete(p.topicChartsMapping, topicReplicationBacklogChart.ID) + } +} + +func (p *Pulsar) removeSummaryChart(chartID string) { + if err := p.Charts().Remove(chartID); err != nil { + p.Warning(err) + } +} + +func (p *Pulsar) removeNamespaceChart(chartID string) { + if err := p.nsCharts.Remove(chartID); err != nil { + p.Warning(err) + } +} + +func (p *Pulsar) updateCharts() { + // NOTE: order is important + for ns := range p.curCache.namespaces { + if !p.cache.namespaces[ns] { + p.cache.namespaces[ns] = true + p.addNamespaceCharts(ns) + } + } + for top := range p.curCache.topics { + if !p.cache.topics[top] { + p.cache.topics[top] = true + p.addTopicToCharts(top) + } + } + for top := range p.cache.topics { + if p.curCache.topics[top] { + continue + } + delete(p.cache.topics, top) + p.removeTopicFromCharts(top) + } + for ns := range p.cache.namespaces { + if p.curCache.namespaces[ns] { + continue + } + delete(p.cache.namespaces, ns) + p.removeNamespaceFromCharts(ns) + } +} + +func (p *Pulsar) addNamespaceCharts(ns namespace) { + charts := p.nsCharts.Copy() + for _, chart := range *charts { + chart.ID = fmt.Sprintf(chart.ID, ns.name) + chart.Fam = fmt.Sprintf(chart.Fam, ns.name) + for _, dim := range chart.Dims { + dim.ID = fmt.Sprintf(dim.ID, ns.name) + } + } + if err := p.Charts().Add(*charts...); err != nil { + p.Warning(err) + } +} + +func (p *Pulsar) removeNamespaceFromCharts(ns namespace) { + for _, chart := range *p.nsCharts { + id := fmt.Sprintf(chart.ID, ns.name) + if chart = p.Charts().Get(id); chart != nil { + chart.MarkRemove() + } else { + p.Warningf("could not remove namespace chart '%s'", id) + } + } +} + +func (p *Pulsar) addTopicToCharts(top topic) { + for id, metric := range p.topicChartsMapping { + id = fmt.Sprintf(id, top.namespace) + chart := p.Charts().Get(id) + if chart == nil { + p.Warningf("could not add topic '%s' to chart '%s': chart not found", top.name, id) + continue + } + + dim := Dim{ID: metric + "_" + top.name, Name: extractTopicName(top)} + switch metric { + case metricPulsarThroughputIn, + metricPulsarThroughputOut, + metricPulsarReplicationThroughputIn, + metricPulsarReplicationThroughputOut: + dim.Div = 1024 * 1000 + case metricPulsarRateIn, + metricPulsarRateOut, + metricPulsarStorageWriteRate, + metricPulsarStorageReadRate, + metricPulsarSubscriptionMsgRateRedeliver, + metricPulsarReplicationRateIn, + metricPulsarReplicationRateOut: + dim.Div = 1000 + case metricPulsarStorageSize: + dim.Div = 1024 + } + + if err := chart.AddDim(&dim); err != nil { + p.Warning(err) + } + chart.MarkNotCreated() + } +} + +func (p *Pulsar) removeTopicFromCharts(top topic) { + for id, metric := range p.topicChartsMapping { + id = fmt.Sprintf(id, top.namespace) + chart := p.Charts().Get(id) + if chart == nil { + p.Warningf("could not remove topic '%s' from chart '%s': chart not found", top.name, id) + continue + } + + if err := chart.MarkDimRemove(metric+"_"+top.name, true); err != nil { + p.Warning(err) + } + chart.MarkNotCreated() + } +} + +func topicChartsMapping() map[string]string { + return map[string]string{ + topicSubscriptionsChart.ID: metricPulsarSubscriptionsCount, + topicProducersChart.ID: metricPulsarProducersCount, + topicConsumersChart.ID: metricPulsarConsumersCount, + topicMessagesRateInChart.ID: metricPulsarRateIn, + topicMessagesRateOutChart.ID: metricPulsarRateOut, + topicThroughputRateInChart.ID: metricPulsarThroughputIn, + topicThroughputRateOutChart.ID: metricPulsarThroughputOut, + topicStorageSizeChart.ID: metricPulsarStorageSize, + topicStorageReadRateChart.ID: metricPulsarStorageReadRate, + topicStorageWriteRateChart.ID: metricPulsarStorageWriteRate, + topicMsgBacklogSizeChart.ID: metricPulsarMsgBacklog, + topicSubsDelayedChart.ID: metricPulsarSubscriptionDelayed, + topicSubsMsgRateRedeliverChart.ID: metricPulsarSubscriptionMsgRateRedeliver, + topicSubsBlockedOnUnackedMsgChart.ID: metricPulsarSubscriptionBlockedOnUnackedMessages, + topicReplicationRateInChart.ID: metricPulsarReplicationRateIn, + topicReplicationRateOutChart.ID: metricPulsarReplicationRateOut, + topicReplicationThroughputRateInChart.ID: metricPulsarReplicationThroughputIn, + topicReplicationThroughputRateOutChart.ID: metricPulsarReplicationThroughputOut, + topicReplicationBacklogChart.ID: metricPulsarReplicationBacklog, + } +} + +func extractTopicName(top topic) string { + // persistent://sample/ns1/demo-1 => p:demo-1 + if idx := strings.LastIndexByte(top.name, '/'); idx > 0 { + return top.name[:1] + ":" + top.name[idx+1:] + } + return top.name +} |