summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/cassandra/collect.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/modules/cassandra/collect.go403
1 files changed, 403 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/cassandra/collect.go b/src/go/collectors/go.d.plugin/modules/cassandra/collect.go
new file mode 100644
index 000000000..511aac642
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/cassandra/collect.go
@@ -0,0 +1,403 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package cassandra
+
+import (
+ "errors"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus"
+ "strings"
+)
+
+// Metric-name suffixes the JMX exporter appends when flattening JMX
+// attributes into Prometheus series: counters get "_count", plain
+// values/gauges get "_value".
+const (
+ suffixCount = "_count"
+ suffixValue = "_value"
+)
+
+// collect scrapes the Prometheus endpoint, verifies (once) that the series
+// actually come from Cassandra, and converts them into the flat metric map
+// the plugin reports.
+func (c *Cassandra) collect() (map[string]int64, error) {
+	series, err := c.prom.ScrapeSeries()
+	if err != nil {
+		return nil, err
+	}
+
+	// Validation runs only until the first successful check.
+	if c.validateMetrics && !isCassandraMetrics(series) {
+		return nil, errors.New("collected metrics aren't Cassandra metrics")
+	}
+	c.validateMetrics = false
+
+	c.resetMetrics()
+	c.collectMetrics(series)
+
+	mx := make(map[string]int64)
+	c.processMetric(mx)
+	return mx, nil
+}
+
+// resetMetrics swaps in a fresh metrics holder for the next cycle while
+// carrying over per-thread-pool identity (name and whether charts were
+// already created) so chart bookkeeping survives between scrapes.
+func (c *Cassandra) resetMetrics() {
+	fresh := newCassandraMetrics()
+	for id, pool := range c.mx.threadPools {
+		fresh.threadPools[id] = &threadPoolMetrics{
+			name:      pool.name,
+			hasCharts: pool.hasCharts,
+		}
+	}
+	c.mx = fresh
+}
+
+// processMetric converts the values accumulated in c.mx into the flat
+// key -> int64 map netdata consumes. It also creates charts for thread
+// pools seen for the first time (one-time side effect via hasCharts).
+func (c *Cassandra) processMetric(mx map[string]int64) {
+ // Client request counters, split by Read/Write scope.
+ c.mx.clientReqTotalLatencyReads.write(mx, "client_request_total_latency_reads")
+ c.mx.clientReqTotalLatencyWrites.write(mx, "client_request_total_latency_writes")
+ c.mx.clientReqLatencyReads.write(mx, "client_request_latency_reads")
+ c.mx.clientReqLatencyWrites.write(mx, "client_request_latency_writes")
+ c.mx.clientReqTimeoutsReads.write(mx, "client_request_timeouts_reads")
+ c.mx.clientReqTimeoutsWrites.write(mx, "client_request_timeouts_writes")
+ c.mx.clientReqUnavailablesReads.write(mx, "client_request_unavailables_reads")
+ c.mx.clientReqUnavailablesWrites.write(mx, "client_request_unavailables_writes")
+ c.mx.clientReqFailuresReads.write(mx, "client_request_failures_reads")
+ c.mx.clientReqFailuresWrites.write(mx, "client_request_failures_writes")
+
+ // Client request latency percentiles (p50..p999) for reads and writes.
+ c.mx.clientReqReadLatencyP50.write(mx, "client_request_read_latency_p50")
+ c.mx.clientReqReadLatencyP75.write(mx, "client_request_read_latency_p75")
+ c.mx.clientReqReadLatencyP95.write(mx, "client_request_read_latency_p95")
+ c.mx.clientReqReadLatencyP98.write(mx, "client_request_read_latency_p98")
+ c.mx.clientReqReadLatencyP99.write(mx, "client_request_read_latency_p99")
+ c.mx.clientReqReadLatencyP999.write(mx, "client_request_read_latency_p999")
+ c.mx.clientReqWriteLatencyP50.write(mx, "client_request_write_latency_p50")
+ c.mx.clientReqWriteLatencyP75.write(mx, "client_request_write_latency_p75")
+ c.mx.clientReqWriteLatencyP95.write(mx, "client_request_write_latency_p95")
+ c.mx.clientReqWriteLatencyP98.write(mx, "client_request_write_latency_p98")
+ c.mx.clientReqWriteLatencyP99.write(mx, "client_request_write_latency_p99")
+ c.mx.clientReqWriteLatencyP999.write(mx, "client_request_write_latency_p999")
+
+ // Row cache: raw counters plus derived hit ratio and utilization.
+ // Derived percentages are only emitted when both inputs were collected
+ // this cycle; the *1000 factor presumably keeps 3 decimal places as an
+ // integer (netdata chart divisor) — confirm against the charts definition.
+ c.mx.rowCacheHits.write(mx, "row_cache_hits")
+ c.mx.rowCacheMisses.write(mx, "row_cache_misses")
+ c.mx.rowCacheSize.write(mx, "row_cache_size")
+ if c.mx.rowCacheHits.isSet && c.mx.rowCacheMisses.isSet {
+ if s := c.mx.rowCacheHits.value + c.mx.rowCacheMisses.value; s > 0 {
+ mx["row_cache_hit_ratio"] = int64((c.mx.rowCacheHits.value * 100 / s) * 1000)
+ } else {
+ mx["row_cache_hit_ratio"] = 0
+ }
+ }
+ if c.mx.rowCacheCapacity.isSet && c.mx.rowCacheSize.isSet {
+ if s := c.mx.rowCacheCapacity.value; s > 0 {
+ mx["row_cache_utilization"] = int64((c.mx.rowCacheSize.value * 100 / s) * 1000)
+ } else {
+ mx["row_cache_utilization"] = 0
+ }
+ }
+
+ // Key cache: same shape as the row cache section above.
+ c.mx.keyCacheHits.write(mx, "key_cache_hits")
+ c.mx.keyCacheMisses.write(mx, "key_cache_misses")
+ c.mx.keyCacheSize.write(mx, "key_cache_size")
+ if c.mx.keyCacheHits.isSet && c.mx.keyCacheMisses.isSet {
+ if s := c.mx.keyCacheHits.value + c.mx.keyCacheMisses.value; s > 0 {
+ mx["key_cache_hit_ratio"] = int64((c.mx.keyCacheHits.value * 100 / s) * 1000)
+ } else {
+ mx["key_cache_hit_ratio"] = 0
+ }
+ }
+ if c.mx.keyCacheCapacity.isSet && c.mx.keyCacheSize.isSet {
+ if s := c.mx.keyCacheCapacity.value; s > 0 {
+ mx["key_cache_utilization"] = int64((c.mx.keyCacheSize.value * 100 / s) * 1000)
+ } else {
+ mx["key_cache_utilization"] = 0
+ }
+ }
+
+ // write1k scales by 1000 — presumably for sub-unit precision; verify.
+ c.mx.droppedMessages.write1k(mx, "dropped_messages")
+
+ c.mx.storageLoad.write(mx, "storage_load")
+ c.mx.storageExceptions.write(mx, "storage_exceptions")
+
+ c.mx.compactionBytesCompacted.write(mx, "compaction_bytes_compacted")
+ c.mx.compactionPendingTasks.write(mx, "compaction_pending_tasks")
+ c.mx.compactionCompletedTasks.write(mx, "compaction_completed_tasks")
+
+ // JVM memory and GC metrics (GC times via write1k, as above).
+ c.mx.jvmMemoryHeapUsed.write(mx, "jvm_memory_heap_used")
+ c.mx.jvmMemoryNonHeapUsed.write(mx, "jvm_memory_nonheap_used")
+ c.mx.jvmGCParNewCount.write(mx, "jvm_gc_parnew_count")
+ c.mx.jvmGCParNewTime.write1k(mx, "jvm_gc_parnew_time")
+ c.mx.jvmGCCMSCount.write(mx, "jvm_gc_cms_count")
+ c.mx.jvmGCCMSTime.write1k(mx, "jvm_gc_cms_time")
+
+ // Per-thread-pool tasks; charts are created lazily on first sight.
+ for _, p := range c.mx.threadPools {
+ if !p.hasCharts {
+ p.hasCharts = true
+ c.addThreadPoolCharts(p)
+ }
+
+ px := "thread_pool_" + p.name + "_"
+ p.activeTasks.write(mx, px+"active_tasks")
+ p.pendingTasks.write(mx, px+"pending_tasks")
+ p.blockedTasks.write(mx, px+"blocked_tasks")
+ p.totalBlockedTasks.write(mx, px+"total_blocked_tasks")
+ }
+}
+
+// collectMetrics dispatches the scraped series to every per-subsystem
+// collector, in a fixed order.
+func (c *Cassandra) collectMetrics(pms prometheus.Series) {
+	for _, collectFn := range []func(prometheus.Series){
+		c.collectClientRequestMetrics,
+		c.collectDroppedMessagesMetrics,
+		c.collectThreadPoolsMetrics,
+		c.collectStorageMetrics,
+		c.collectCacheMetrics,
+		c.collectJVMMetrics,
+		c.collectCompactionMetrics,
+	} {
+		collectFn(pms)
+	}
+}
+
+// collectClientRequestMetrics gathers client request counters (latency
+// totals, timeouts, unavailables, failures) and latency percentiles,
+// split by the "scope" label into Read and Write.
+func (c *Cassandra) collectClientRequestMetrics(pms prometheus.Series) {
+	const metric = "org_apache_cassandra_metrics_clientrequest"
+
+	for _, pm := range pms.FindByName(metric + suffixCount) {
+		var read, write *metricValue
+
+		switch pm.Labels.Get("name") {
+		case "TotalLatency":
+			read, write = &c.mx.clientReqTotalLatencyReads, &c.mx.clientReqTotalLatencyWrites
+		case "Latency":
+			read, write = &c.mx.clientReqLatencyReads, &c.mx.clientReqLatencyWrites
+		case "Timeouts":
+			read, write = &c.mx.clientReqTimeoutsReads, &c.mx.clientReqTimeoutsWrites
+		case "Unavailables":
+			read, write = &c.mx.clientReqUnavailablesReads, &c.mx.clientReqUnavailablesWrites
+		case "Failures":
+			read, write = &c.mx.clientReqFailuresReads, &c.mx.clientReqFailuresWrites
+		default:
+			continue
+		}
+
+		switch pm.Labels.Get("scope") {
+		case "Read":
+			read.add(pm.Value)
+		case "Write":
+			write.add(pm.Value)
+		}
+	}
+
+	// Percentiles are exposed as separate series, one per quantile,
+	// distinguished by the metric-name suffix.
+	for _, pm := range pms.FindByNames(
+		metric+"_50thpercentile",
+		metric+"_75thpercentile",
+		metric+"_95thpercentile",
+		metric+"_98thpercentile",
+		metric+"_99thpercentile",
+		metric+"_999thpercentile",
+	) {
+		if pm.Labels.Get("name") != "Latency" {
+			continue
+		}
+
+		var read, write *metricValue
+
+		switch {
+		case strings.HasSuffix(pm.Name(), "_50thpercentile"):
+			read, write = &c.mx.clientReqReadLatencyP50, &c.mx.clientReqWriteLatencyP50
+		case strings.HasSuffix(pm.Name(), "_75thpercentile"):
+			read, write = &c.mx.clientReqReadLatencyP75, &c.mx.clientReqWriteLatencyP75
+		case strings.HasSuffix(pm.Name(), "_95thpercentile"):
+			read, write = &c.mx.clientReqReadLatencyP95, &c.mx.clientReqWriteLatencyP95
+		case strings.HasSuffix(pm.Name(), "_98thpercentile"):
+			read, write = &c.mx.clientReqReadLatencyP98, &c.mx.clientReqWriteLatencyP98
+		case strings.HasSuffix(pm.Name(), "_99thpercentile"):
+			read, write = &c.mx.clientReqReadLatencyP99, &c.mx.clientReqWriteLatencyP99
+		case strings.HasSuffix(pm.Name(), "_999thpercentile"):
+			read, write = &c.mx.clientReqReadLatencyP999, &c.mx.clientReqWriteLatencyP999
+		default:
+			continue
+		}
+
+		switch pm.Labels.Get("scope") {
+		case "Read":
+			read.add(pm.Value)
+		case "Write":
+			write.add(pm.Value)
+		}
+	}
+}
+
+// collectCacheMetrics gathers hit/miss counters and capacity/size gauges
+// for the KeyCache and RowCache scopes.
+func (c *Cassandra) collectCacheMetrics(pms prometheus.Series) {
+	const metric = "org_apache_cassandra_metrics_cache"
+
+	for _, pm := range pms.FindByName(metric + suffixCount) {
+		var hits, misses *metricValue
+
+		switch pm.Labels.Get("scope") {
+		case "KeyCache":
+			hits, misses = &c.mx.keyCacheHits, &c.mx.keyCacheMisses
+		case "RowCache":
+			hits, misses = &c.mx.rowCacheHits, &c.mx.rowCacheMisses
+		default:
+			continue
+		}
+
+		switch pm.Labels.Get("name") {
+		case "Hits":
+			hits.add(pm.Value)
+		case "Misses":
+			misses.add(pm.Value)
+		}
+	}
+
+	for _, pm := range pms.FindByName(metric + suffixValue) {
+		var capacity, size *metricValue
+
+		switch pm.Labels.Get("scope") {
+		case "KeyCache":
+			capacity, size = &c.mx.keyCacheCapacity, &c.mx.keyCacheSize
+		case "RowCache":
+			capacity, size = &c.mx.rowCacheCapacity, &c.mx.rowCacheSize
+		default:
+			continue
+		}
+
+		switch pm.Labels.Get("name") {
+		case "Capacity":
+			capacity.add(pm.Value)
+		case "Size":
+			size.add(pm.Value)
+		}
+	}
+}
+
+// collectThreadPoolsMetrics gathers per-thread-pool task gauges and
+// counters; a pool's metrics holder is created on first sight via
+// getThreadPoolMetrics.
+func (c *Cassandra) collectThreadPoolsMetrics(pms prometheus.Series) {
+	const metric = "org_apache_cassandra_metrics_threadpools"
+
+	// Gauges: tasks currently active or queued.
+	for _, pm := range pms.FindByName(metric + suffixValue) {
+		pool := c.getThreadPoolMetrics(pm.Labels.Get("scope"))
+
+		switch pm.Labels.Get("name") {
+		case "ActiveTasks":
+			pool.activeTasks.add(pm.Value)
+		case "PendingTasks":
+			pool.pendingTasks.add(pm.Value)
+		}
+	}
+	// Counters: blocked tasks. "CompletedTasks" is deliberately ignored:
+	// it was previously added into totalBlockedTasks, which inflated that
+	// counter with an unrelated metric, and processMetric exposes no
+	// per-pool completed-tasks key.
+	for _, pm := range pms.FindByName(metric + suffixCount) {
+		pool := c.getThreadPoolMetrics(pm.Labels.Get("scope"))
+
+		switch pm.Labels.Get("name") {
+		case "TotalBlockedTasks":
+			pool.totalBlockedTasks.add(pm.Value)
+		case "CurrentlyBlockedTasks":
+			pool.blockedTasks.add(pm.Value)
+		}
+	}
+}
+
+// collectStorageMetrics gathers node storage counters: data load and the
+// number of storage exceptions.
+func (c *Cassandra) collectStorageMetrics(pms prometheus.Series) {
+	const metric = "org_apache_cassandra_metrics_storage"
+
+	for _, pm := range pms.FindByName(metric + suffixCount) {
+		switch pm.Labels.Get("name") {
+		case "Load":
+			c.mx.storageLoad.add(pm.Value)
+		case "Exceptions":
+			c.mx.storageExceptions.add(pm.Value)
+		}
+	}
+}
+
+// collectDroppedMessagesMetrics sums every dropped-message counter series
+// into a single total (no per-message-type breakdown).
+func (c *Cassandra) collectDroppedMessagesMetrics(pms prometheus.Series) {
+	const metric = "org_apache_cassandra_metrics_droppedmessage"
+
+	for _, s := range pms.FindByName(metric + suffixCount) {
+		c.mx.droppedMessages.add(s.Value)
+	}
+}
+
+// collectJVMMetrics gathers JVM memory usage (heap and non-heap) and
+// garbage collector counts and cumulative times for the ParNew and
+// ConcurrentMarkSweep collectors.
+func (c *Cassandra) collectJVMMetrics(pms prometheus.Series) {
+	const (
+		metricMemUsed = "jvm_memory_bytes_used"
+		metricGC      = "jvm_gc_collection_seconds"
+	)
+
+	for _, pm := range pms.FindByName(metricMemUsed) {
+		switch pm.Labels.Get("area") {
+		case "heap":
+			c.mx.jvmMemoryHeapUsed.add(pm.Value)
+		case "nonheap":
+			c.mx.jvmMemoryNonHeapUsed.add(pm.Value)
+		}
+	}
+
+	// GC invocation counts.
+	for _, pm := range pms.FindByName(metricGC + suffixCount) {
+		switch pm.Labels.Get("gc") {
+		case "ParNew":
+			c.mx.jvmGCParNewCount.add(pm.Value)
+		case "ConcurrentMarkSweep":
+			c.mx.jvmGCCMSCount.add(pm.Value)
+		}
+	}
+
+	// Cumulative GC time (the summary's _sum series).
+	for _, pm := range pms.FindByName(metricGC + "_sum") {
+		switch pm.Labels.Get("gc") {
+		case "ParNew":
+			c.mx.jvmGCParNewTime.add(pm.Value)
+		case "ConcurrentMarkSweep":
+			c.mx.jvmGCCMSTime.add(pm.Value)
+		}
+	}
+}
+
+// collectCompactionMetrics gathers compaction task gauges (completed and
+// pending) and the bytes-compacted counter.
+func (c *Cassandra) collectCompactionMetrics(pms prometheus.Series) {
+	const metric = "org_apache_cassandra_metrics_compaction"
+
+	for _, pm := range pms.FindByName(metric + suffixValue) {
+		switch pm.Labels.Get("name") {
+		case "CompletedTasks":
+			c.mx.compactionCompletedTasks.add(pm.Value)
+		case "PendingTasks":
+			c.mx.compactionPendingTasks.add(pm.Value)
+		}
+	}
+	for _, pm := range pms.FindByName(metric + suffixCount) {
+		if pm.Labels.Get("name") == "BytesCompacted" {
+			c.mx.compactionBytesCompacted.add(pm.Value)
+		}
+	}
+}
+
+// getThreadPoolMetrics returns the metrics holder for the named thread
+// pool, creating and registering a new one on first use.
+func (c *Cassandra) getThreadPoolMetrics(name string) *threadPoolMetrics {
+	if pool, ok := c.mx.threadPools[name]; ok {
+		return pool
+	}
+	pool := &threadPoolMetrics{name: name}
+	c.mx.threadPools[name] = pool
+	return pool
+}
+
+func isCassandraMetrics(pms prometheus.Series) bool {
+ for _, pm := range pms {
+ if strings.HasPrefix(pm.Name(), "org_apache_cassandra_metrics") {
+ return true
+ }
+ }
+ return false
+}