summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/vernemq/collect.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/modules/vernemq/collect.go288
1 files changed, 288 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/vernemq/collect.go b/src/go/collectors/go.d.plugin/modules/vernemq/collect.go
new file mode 100644
index 000000000..4ec6a1bf2
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/vernemq/collect.go
@@ -0,0 +1,288 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package vernemq
+
+import (
+ "errors"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/prometheus"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/stm"
+)
+
+func isValidVerneMQMetrics(pms prometheus.Series) bool {
+ return pms.FindByName(metricPUBLISHError).Len() > 0 && pms.FindByName(metricRouterSubscriptions).Len() > 0
+}
+
+func (v *VerneMQ) collect() (map[string]int64, error) {
+ pms, err := v.prom.ScrapeSeries()
+ if err != nil {
+ return nil, err
+ }
+
+ if !isValidVerneMQMetrics(pms) {
+ return nil, errors.New("returned metrics aren't VerneMQ metrics")
+ }
+
+ mx := v.collectVerneMQ(pms)
+
+ return stm.ToMap(mx), nil
+}
+
+func (v *VerneMQ) collectVerneMQ(pms prometheus.Series) map[string]float64 {
+ mx := make(map[string]float64)
+ collectSockets(mx, pms)
+ collectQueues(mx, pms)
+ collectSubscriptions(mx, pms)
+ v.collectErlangVM(mx, pms)
+ collectBandwidth(mx, pms)
+ collectRetain(mx, pms)
+ collectCluster(mx, pms)
+ collectUptime(mx, pms)
+
+ v.collectAUTH(mx, pms)
+ v.collectCONNECT(mx, pms)
+ v.collectDISCONNECT(mx, pms)
+ v.collectSUBSCRIBE(mx, pms)
+ v.collectUNSUBSCRIBE(mx, pms)
+ v.collectPUBLISH(mx, pms)
+ v.collectPING(mx, pms)
+ v.collectMQTTInvalidMsgSize(mx, pms)
+ return mx
+}
+
+func (v *VerneMQ) collectCONNECT(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricCONNECTReceived,
+ metricCONNACKSent,
+ )
+ v.collectMQTT(mx, pms)
+}
+
+func (v *VerneMQ) collectDISCONNECT(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricDISCONNECTReceived,
+ metricDISCONNECTSent,
+ )
+ v.collectMQTT(mx, pms)
+}
+
+func (v *VerneMQ) collectPUBLISH(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricPUBACKReceived,
+ metricPUBACKSent,
+ metricPUBACKInvalid,
+
+ metricPUBCOMPReceived,
+ metricPUBCOMPSent,
+ metricPUNCOMPInvalid,
+
+ metricPUBSLISHReceived,
+ metricPUBSLIHSent,
+ metricPUBLISHError,
+ metricPUBLISHAuthError,
+
+ metricPUBRECReceived,
+ metricPUBRECSent,
+ metricPUBRECInvalid,
+
+ metricPUBRELReceived,
+ metricPUBRELSent,
+ )
+ v.collectMQTT(mx, pms)
+}
+
+func (v *VerneMQ) collectSUBSCRIBE(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricSUBSCRIBEReceived,
+ metricSUBACKSent,
+ metricSUBSCRIBEError,
+ metricSUBSCRIBEAuthError,
+ )
+ v.collectMQTT(mx, pms)
+}
+
+func (v *VerneMQ) collectUNSUBSCRIBE(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricUNSUBSCRIBEReceived,
+ metricUNSUBACKSent,
+ metricUNSUBSCRIBEError,
+ )
+ v.collectMQTT(mx, pms)
+}
+
+func (v *VerneMQ) collectPING(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricPINGREQReceived,
+ metricPINGRESPSent,
+ )
+ v.collectMQTT(mx, pms)
+}
+
+func (v *VerneMQ) collectAUTH(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricAUTHReceived,
+ metricAUTHSent,
+ )
+ v.collectMQTT(mx, pms)
+}
+
+func (v *VerneMQ) collectMQTTInvalidMsgSize(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByName(metricMQTTInvalidMsgSizeError)
+ v.collectMQTT(mx, pms)
+}
+
+func collectSockets(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricSocketClose,
+ metricSocketCloseTimeout,
+ metricSocketError,
+ metricSocketOpen,
+ metricClientKeepaliveExpired,
+ )
+ collectNonMQTT(mx, pms)
+ mx["open_sockets"] = mx[metricSocketOpen] - mx[metricSocketClose]
+}
+
+func collectQueues(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricQueueInitializedFromStorage,
+ metricQueueMessageDrop,
+ metricQueueMessageExpired,
+ metricQueueMessageIn,
+ metricQueueMessageOut,
+ metricQueueMessageUnhandled,
+ metricQueueProcesses,
+ metricQueueSetup,
+ metricQueueTeardown,
+ )
+ collectNonMQTT(mx, pms)
+}
+
+func collectSubscriptions(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricRouterMatchesLocal,
+ metricRouterMatchesRemote,
+ metricRouterMemory,
+ metricRouterSubscriptions,
+ )
+ collectNonMQTT(mx, pms)
+}
+
+func (v *VerneMQ) collectErlangVM(mx map[string]float64, pms prometheus.Series) {
+ v.collectSchedulersUtilization(mx, pms)
+ pms = pms.FindByNames(
+ metricSystemContextSwitches,
+ metricSystemGCCount,
+ metricSystemIOIn,
+ metricSystemIOOut,
+ metricSystemProcessCount,
+ metricSystemReductions,
+ metricSystemRunQueue,
+ metricSystemUtilization,
+ metricSystemWordsReclaimedByGC,
+ metricVMMemoryProcesses,
+ metricVMMemorySystem,
+ )
+ collectNonMQTT(mx, pms)
+}
+
+func (v *VerneMQ) collectSchedulersUtilization(mx map[string]float64, pms prometheus.Series) {
+ for _, pm := range pms {
+ if isSchedulerUtilizationMetric(pm) {
+ mx[pm.Name()] += pm.Value
+ v.notifyNewScheduler(pm.Name())
+ }
+ }
+}
+
+func collectBandwidth(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricBytesReceived,
+ metricBytesSent,
+ )
+ collectNonMQTT(mx, pms)
+}
+
+func collectRetain(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricRetainMemory,
+ metricRetainMessages,
+ )
+ collectNonMQTT(mx, pms)
+}
+
+func collectCluster(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByNames(
+ metricClusterBytesDropped,
+ metricClusterBytesReceived,
+ metricClusterBytesSent,
+ metricNetSplitDetected,
+ metricNetSplitResolved,
+ )
+ collectNonMQTT(mx, pms)
+ mx["netsplit_unresolved"] = mx[metricNetSplitDetected] - mx[metricNetSplitResolved]
+}
+
+func collectUptime(mx map[string]float64, pms prometheus.Series) {
+ pms = pms.FindByName(metricSystemWallClock)
+ collectNonMQTT(mx, pms)
+}
+
+func collectNonMQTT(mx map[string]float64, pms prometheus.Series) {
+ for _, pm := range pms {
+ mx[pm.Name()] += pm.Value
+ }
+}
+
+func (v *VerneMQ) collectMQTT(mx map[string]float64, pms prometheus.Series) {
+ for _, pm := range pms {
+ if !isMQTTMetric(pm) {
+ continue
+ }
+ version := versionLabelValue(pm)
+ if version == "" {
+ continue
+ }
+
+ mx[pm.Name()] += pm.Value
+ mx[join(pm.Name(), "v", version)] += pm.Value
+
+ if reason := reasonCodeLabelValue(pm); reason != "" {
+ mx[join(pm.Name(), reason)] += pm.Value
+ mx[join(pm.Name(), "v", version, reason)] += pm.Value
+
+ v.notifyNewReason(pm.Name(), reason)
+ }
+ }
+}
+
+func isMQTTMetric(pm prometheus.SeriesSample) bool {
+ return strings.HasPrefix(pm.Name(), "mqtt_")
+}
+
+func isSchedulerUtilizationMetric(pm prometheus.SeriesSample) bool {
+ return strings.HasPrefix(pm.Name(), "system_utilization_scheduler_")
+}
+
+func reasonCodeLabelValue(pm prometheus.SeriesSample) string {
+ if v := pm.Labels.Get("reason_code"); v != "" {
+ return v
+ }
+ // "mqtt_connack_sent" v4 has return_code
+ return pm.Labels.Get("return_code")
+}
+
+func versionLabelValue(pm prometheus.SeriesSample) string {
+ return pm.Labels.Get("mqtt_version")
+}
+
+func join(a, b string, rest ...string) string {
+ v := a + "_" + b
+ switch len(rest) {
+ case 0:
+ return v
+ default:
+ return join(v, rest[0], rest[1:]...)
+ }
+}