diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/modules/rabbitmq/collect.go | 179 |
1 files changed, 179 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/rabbitmq/collect.go b/src/go/collectors/go.d.plugin/modules/rabbitmq/collect.go new file mode 100644 index 000000000..665dfdfc8 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/rabbitmq/collect.go @@ -0,0 +1,179 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package rabbitmq + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "path/filepath" + + "github.com/netdata/netdata/go/go.d.plugin/pkg/stm" + "github.com/netdata/netdata/go/go.d.plugin/pkg/web" +) + +const ( + urlPathAPIOverview = "/api/overview" + urlPathAPINodes = "/api/nodes/" + urlPathAPIVhosts = "/api/vhosts" + urlPathAPIQueues = "/api/queues" +) + +// TODO: there is built-in prometheus collector since v3.8.0 (https://www.rabbitmq.com/prometheus.html). +// Should use it (in addition?), it is the recommended option according to the docs. +func (r *RabbitMQ) collect() (map[string]int64, error) { + mx := make(map[string]int64) + + if err := r.collectOverviewStats(mx); err != nil { + return nil, err + } + if err := r.collectNodeStats(mx); err != nil { + return mx, err + } + if err := r.collectVhostsStats(mx); err != nil { + return mx, err + } + if r.CollectQueues { + if err := r.collectQueuesStats(mx); err != nil { + return mx, err + } + } + + return mx, nil +} + +func (r *RabbitMQ) collectOverviewStats(mx map[string]int64) error { + var stats overviewStats + if err := r.doOKDecode(urlPathAPIOverview, &stats); err != nil { + return err + } + + if r.nodeName == "" { + r.nodeName = stats.Node + } + + for k, v := range stm.ToMap(stats) { + mx[k] = v + } + + return nil +} + +func (r *RabbitMQ) collectNodeStats(mx map[string]int64) error { + if r.nodeName == "" { + return nil + } + + var stats nodeStats + if err := r.doOKDecode(filepath.Join(urlPathAPINodes, r.nodeName), &stats); err != nil { + return err + } + + for k, v := range stm.ToMap(stats) { + mx[k] = v + } + mx["proc_available"] = int64(stats.ProcTotal - stats.ProcUsed) + + return nil +} + +func (r *RabbitMQ) collectVhostsStats(mx map[string]int64) error { + var stats []vhostStats + if err := r.doOKDecode(urlPathAPIVhosts, &stats); err != nil { + return err + } + + seen := make(map[string]bool) + + for _, vhost := range stats { + seen[vhost.Name] = true + for k, v := range stm.ToMap(vhost) { + mx[fmt.Sprintf("vhost_%s_%s", vhost.Name, k)] = v + } + } + + for name := range seen { + if !r.vhosts[name] { + r.vhosts[name] = true + r.Debugf("new vhost name='%s': creating charts", name) + r.addVhostCharts(name) + } + } + for name := range r.vhosts { + if !seen[name] { + delete(r.vhosts, name) + r.Debugf("stale vhost name='%s': removing charts", name) + r.removeVhostCharts(name) + } + } + + return nil +} + +func (r *RabbitMQ) collectQueuesStats(mx map[string]int64) error { + var stats []queueStats + if err := r.doOKDecode(urlPathAPIQueues, &stats); err != nil { + return err + } + + seen := make(map[string]queueCache) + + for _, queue := range stats { + seen[queue.Name+"|"+queue.Vhost] = queueCache{name: queue.Name, vhost: queue.Vhost} + for k, v := range stm.ToMap(queue) { + mx[fmt.Sprintf("queue_%s_vhost_%s_%s", queue.Name, queue.Vhost, k)] = v + } + } + + for key, queue := range seen { + if _, ok := r.queues[key]; !ok { + r.queues[key] = queue + r.Debugf("new queue name='%s', vhost='%s': creating charts", queue.name, queue.vhost) + r.addQueueCharts(queue.name, queue.vhost) + } + } + for key, queue := range r.queues { + if _, ok := seen[key]; !ok { + delete(r.queues, key) + r.Debugf("stale queue name='%s', vhost='%s': removing charts", queue.name, queue.vhost) + r.removeQueueCharts(queue.name, queue.vhost) + } + } + + return nil +} + +func (r *RabbitMQ) doOKDecode(urlPath string, in interface{}) error { + req, err := web.NewHTTPRequest(r.Request.Copy()) + if err != nil { + return fmt.Errorf("error on creating request: %v", err) + } + + req.URL.Path = urlPath + + r.Debugf("doing HTTP %s to '%s'", req.Method, req.URL) + resp, err := r.httpClient.Do(req) + if err != nil { + return fmt.Errorf("error on request to %s: %v", req.URL, err) + } + + defer closeBody(resp) + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %d (%s)", req.URL, resp.StatusCode, resp.Status) + } + + if err = json.NewDecoder(resp.Body).Decode(&in); err != nil { + return fmt.Errorf("error on decoding response from %s: %v", req.URL, err) + } + + return nil +} + +func closeBody(resp *http.Response) { + if resp != nil && resp.Body != nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } +} |