summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/rabbitmq/collect.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/modules/rabbitmq/collect.go179
1 files changed, 179 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/rabbitmq/collect.go b/src/go/collectors/go.d.plugin/modules/rabbitmq/collect.go
new file mode 100644
index 000000000..665dfdfc8
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/rabbitmq/collect.go
@@ -0,0 +1,179 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package rabbitmq
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "path/filepath"
+
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/stm"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/web"
+)
+
+const (
+ urlPathAPIOverview = "/api/overview"
+ urlPathAPINodes = "/api/nodes/"
+ urlPathAPIVhosts = "/api/vhosts"
+ urlPathAPIQueues = "/api/queues"
+)
+
+// TODO: there is built-in prometheus collector since v3.8.0 (https://www.rabbitmq.com/prometheus.html).
+// Should use it (in addition?), it is the recommended option according to the docs.
+func (r *RabbitMQ) collect() (map[string]int64, error) {
+ mx := make(map[string]int64)
+
+ if err := r.collectOverviewStats(mx); err != nil {
+ return nil, err
+ }
+ if err := r.collectNodeStats(mx); err != nil {
+ return mx, err
+ }
+ if err := r.collectVhostsStats(mx); err != nil {
+ return mx, err
+ }
+ if r.CollectQueues {
+ if err := r.collectQueuesStats(mx); err != nil {
+ return mx, err
+ }
+ }
+
+ return mx, nil
+}
+
+func (r *RabbitMQ) collectOverviewStats(mx map[string]int64) error {
+ var stats overviewStats
+ if err := r.doOKDecode(urlPathAPIOverview, &stats); err != nil {
+ return err
+ }
+
+ if r.nodeName == "" {
+ r.nodeName = stats.Node
+ }
+
+ for k, v := range stm.ToMap(stats) {
+ mx[k] = v
+ }
+
+ return nil
+}
+
+func (r *RabbitMQ) collectNodeStats(mx map[string]int64) error {
+ if r.nodeName == "" {
+ return nil
+ }
+
+ var stats nodeStats
+ if err := r.doOKDecode(filepath.Join(urlPathAPINodes, r.nodeName), &stats); err != nil {
+ return err
+ }
+
+ for k, v := range stm.ToMap(stats) {
+ mx[k] = v
+ }
+ mx["proc_available"] = int64(stats.ProcTotal - stats.ProcUsed)
+
+ return nil
+}
+
+func (r *RabbitMQ) collectVhostsStats(mx map[string]int64) error {
+ var stats []vhostStats
+ if err := r.doOKDecode(urlPathAPIVhosts, &stats); err != nil {
+ return err
+ }
+
+ seen := make(map[string]bool)
+
+ for _, vhost := range stats {
+ seen[vhost.Name] = true
+ for k, v := range stm.ToMap(vhost) {
+ mx[fmt.Sprintf("vhost_%s_%s", vhost.Name, k)] = v
+ }
+ }
+
+ for name := range seen {
+ if !r.vhosts[name] {
+ r.vhosts[name] = true
+ r.Debugf("new vhost name='%s': creating charts", name)
+ r.addVhostCharts(name)
+ }
+ }
+ for name := range r.vhosts {
+ if !seen[name] {
+ delete(r.vhosts, name)
+ r.Debugf("stale vhost name='%s': removing charts", name)
+ r.removeVhostCharts(name)
+ }
+ }
+
+ return nil
+}
+
+func (r *RabbitMQ) collectQueuesStats(mx map[string]int64) error {
+ var stats []queueStats
+ if err := r.doOKDecode(urlPathAPIQueues, &stats); err != nil {
+ return err
+ }
+
+ seen := make(map[string]queueCache)
+
+ for _, queue := range stats {
+ seen[queue.Name+"|"+queue.Vhost] = queueCache{name: queue.Name, vhost: queue.Vhost}
+ for k, v := range stm.ToMap(queue) {
+ mx[fmt.Sprintf("queue_%s_vhost_%s_%s", queue.Name, queue.Vhost, k)] = v
+ }
+ }
+
+ for key, queue := range seen {
+ if _, ok := r.queues[key]; !ok {
+ r.queues[key] = queue
+ r.Debugf("new queue name='%s', vhost='%s': creating charts", queue.name, queue.vhost)
+ r.addQueueCharts(queue.name, queue.vhost)
+ }
+ }
+ for key, queue := range r.queues {
+ if _, ok := seen[key]; !ok {
+ delete(r.queues, key)
+ r.Debugf("stale queue name='%s', vhost='%s': removing charts", queue.name, queue.vhost)
+ r.removeQueueCharts(queue.name, queue.vhost)
+ }
+ }
+
+ return nil
+}
+
+func (r *RabbitMQ) doOKDecode(urlPath string, in interface{}) error {
+ req, err := web.NewHTTPRequest(r.Request.Copy())
+ if err != nil {
+ return fmt.Errorf("error on creating request: %v", err)
+ }
+
+ req.URL.Path = urlPath
+
+ r.Debugf("doing HTTP %s to '%s'", req.Method, req.URL)
+ resp, err := r.httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("error on request to %s: %v", req.URL, err)
+ }
+
+ defer closeBody(resp)
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("%s returned HTTP status %d (%s)", req.URL, resp.StatusCode, resp.Status)
+ }
+
+ if err = json.NewDecoder(resp.Body).Decode(&in); err != nil {
+ return fmt.Errorf("error on decoding response from %s: %v", req.URL, err)
+ }
+
+ return nil
+}
+
+func closeBody(resp *http.Response) {
+ if resp != nil && resp.Body != nil {
+ _, _ = io.Copy(io.Discard, resp.Body)
+ _ = resp.Body.Close()
+ }
+}