summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/elasticsearch/collect.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/modules/elasticsearch/collect.go308
1 files changed, 308 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/elasticsearch/collect.go b/src/go/collectors/go.d.plugin/modules/elasticsearch/collect.go
new file mode 100644
index 000000000..eb3ffb351
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/elasticsearch/collect.go
@@ -0,0 +1,308 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package elasticsearch
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/stm"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/web"
+)
+
+const (
+ urlPathLocalNodeStats = "/_nodes/_local/stats"
+ urlPathNodesStats = "/_nodes/stats"
+ urlPathIndicesStats = "/_cat/indices"
+ urlPathClusterHealth = "/_cluster/health"
+ urlPathClusterStats = "/_cluster/stats"
+)
+
+func (es *Elasticsearch) collect() (map[string]int64, error) {
+ if es.clusterName == "" {
+ name, err := es.getClusterName()
+ if err != nil {
+ return nil, err
+ }
+ es.clusterName = name
+ }
+
+ ms := es.scrapeElasticsearch()
+ if ms.empty() {
+ return nil, nil
+ }
+
+ mx := make(map[string]int64)
+
+ es.collectNodesStats(mx, ms)
+ es.collectClusterHealth(mx, ms)
+ es.collectClusterStats(mx, ms)
+ es.collectLocalIndicesStats(mx, ms)
+
+ return mx, nil
+}
+
+func (es *Elasticsearch) collectNodesStats(mx map[string]int64, ms *esMetrics) {
+ if !ms.hasNodesStats() {
+ return
+ }
+
+ seen := make(map[string]bool)
+
+ for nodeID, node := range ms.NodesStats.Nodes {
+ seen[nodeID] = true
+
+ if !es.nodes[nodeID] {
+ es.nodes[nodeID] = true
+ es.addNodeCharts(nodeID, node)
+ }
+
+ merge(mx, stm.ToMap(node), "node_"+nodeID)
+ }
+
+ for nodeID := range es.nodes {
+ if !seen[nodeID] {
+ delete(es.nodes, nodeID)
+ es.removeNodeCharts(nodeID)
+ }
+ }
+}
+
+func (es *Elasticsearch) collectClusterHealth(mx map[string]int64, ms *esMetrics) {
+ if !ms.hasClusterHealth() {
+ return
+ }
+
+ es.addClusterHealthChartsOnce.Do(es.addClusterHealthCharts)
+
+ merge(mx, stm.ToMap(ms.ClusterHealth), "cluster")
+
+ mx["cluster_status_green"] = boolToInt(ms.ClusterHealth.Status == "green")
+ mx["cluster_status_yellow"] = boolToInt(ms.ClusterHealth.Status == "yellow")
+ mx["cluster_status_red"] = boolToInt(ms.ClusterHealth.Status == "red")
+}
+
+func (es *Elasticsearch) collectClusterStats(mx map[string]int64, ms *esMetrics) {
+ if !ms.hasClusterStats() {
+ return
+ }
+
+ es.addClusterStatsChartsOnce.Do(es.addClusterStatsCharts)
+
+ merge(mx, stm.ToMap(ms.ClusterStats), "cluster")
+}
+
+func (es *Elasticsearch) collectLocalIndicesStats(mx map[string]int64, ms *esMetrics) {
+ if !ms.hasLocalIndicesStats() {
+ return
+ }
+
+ seen := make(map[string]bool)
+
+ for _, v := range ms.LocalIndicesStats {
+ seen[v.Index] = true
+
+ if !es.indices[v.Index] {
+ es.indices[v.Index] = true
+ es.addIndexCharts(v.Index)
+ }
+
+ px := fmt.Sprintf("node_index_%s_stats_", v.Index)
+
+ mx[px+"health_green"] = boolToInt(v.Health == "green")
+ mx[px+"health_yellow"] = boolToInt(v.Health == "yellow")
+ mx[px+"health_red"] = boolToInt(v.Health == "red")
+ mx[px+"shards_count"] = strToInt(v.Rep)
+ mx[px+"docs_count"] = strToInt(v.DocsCount)
+ mx[px+"store_size_in_bytes"] = convertIndexStoreSizeToBytes(v.StoreSize)
+ }
+
+ for index := range es.indices {
+ if !seen[index] {
+ delete(es.indices, index)
+ es.removeIndexCharts(index)
+ }
+ }
+}
+
+func (es *Elasticsearch) scrapeElasticsearch() *esMetrics {
+ ms := &esMetrics{}
+ wg := &sync.WaitGroup{}
+
+ if es.DoNodeStats {
+ wg.Add(1)
+ go func() { defer wg.Done(); es.scrapeNodesStats(ms) }()
+ }
+ if es.DoClusterHealth {
+ wg.Add(1)
+ go func() { defer wg.Done(); es.scrapeClusterHealth(ms) }()
+ }
+ if es.DoClusterStats {
+ wg.Add(1)
+ go func() { defer wg.Done(); es.scrapeClusterStats(ms) }()
+ }
+ if !es.ClusterMode && es.DoIndicesStats {
+ wg.Add(1)
+ go func() { defer wg.Done(); es.scrapeLocalIndicesStats(ms) }()
+ }
+ wg.Wait()
+
+ return ms
+}
+
+func (es *Elasticsearch) scrapeNodesStats(ms *esMetrics) {
+ req, _ := web.NewHTTPRequest(es.Request)
+ if es.ClusterMode {
+ req.URL.Path = urlPathNodesStats
+ } else {
+ req.URL.Path = urlPathLocalNodeStats
+ }
+
+ var stats esNodesStats
+ if err := es.doOKDecode(req, &stats); err != nil {
+ es.Warning(err)
+ return
+ }
+
+ ms.NodesStats = &stats
+}
+
+func (es *Elasticsearch) scrapeClusterHealth(ms *esMetrics) {
+ req, _ := web.NewHTTPRequest(es.Request)
+ req.URL.Path = urlPathClusterHealth
+
+ var health esClusterHealth
+ if err := es.doOKDecode(req, &health); err != nil {
+ es.Warning(err)
+ return
+ }
+
+ ms.ClusterHealth = &health
+}
+
+func (es *Elasticsearch) scrapeClusterStats(ms *esMetrics) {
+ req, _ := web.NewHTTPRequest(es.Request)
+ req.URL.Path = urlPathClusterStats
+
+ var stats esClusterStats
+ if err := es.doOKDecode(req, &stats); err != nil {
+ es.Warning(err)
+ return
+ }
+
+ ms.ClusterStats = &stats
+}
+
+func (es *Elasticsearch) scrapeLocalIndicesStats(ms *esMetrics) {
+ req, _ := web.NewHTTPRequest(es.Request)
+ req.URL.Path = urlPathIndicesStats
+ req.URL.RawQuery = "local=true&format=json"
+
+ var stats []esIndexStats
+ if err := es.doOKDecode(req, &stats); err != nil {
+ es.Warning(err)
+ return
+ }
+
+ ms.LocalIndicesStats = removeSystemIndices(stats)
+}
+
+func (es *Elasticsearch) getClusterName() (string, error) {
+ req, _ := web.NewHTTPRequest(es.Request)
+
+ var info struct {
+ ClusterName string `json:"cluster_name"`
+ }
+
+ if err := es.doOKDecode(req, &info); err != nil {
+ return "", err
+ }
+
+ if info.ClusterName == "" {
+ return "", errors.New("empty cluster name")
+ }
+
+ return info.ClusterName, nil
+}
+
+func (es *Elasticsearch) doOKDecode(req *http.Request, in interface{}) error {
+ resp, err := es.httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("error on HTTP request '%s': %v", req.URL, err)
+ }
+ defer closeBody(resp)
+
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("'%s' returned HTTP status code: %d", req.URL, resp.StatusCode)
+ }
+
+ if err := json.NewDecoder(resp.Body).Decode(in); err != nil {
+ return fmt.Errorf("error on decoding response from '%s': %v", req.URL, err)
+ }
+ return nil
+}
+
+func closeBody(resp *http.Response) {
+ if resp != nil && resp.Body != nil {
+ _, _ = io.Copy(io.Discard, resp.Body)
+ _ = resp.Body.Close()
+ }
+}
+
+func convertIndexStoreSizeToBytes(size string) int64 {
+ var num float64
+ switch {
+ case strings.HasSuffix(size, "kb"):
+ num, _ = strconv.ParseFloat(size[:len(size)-2], 64)
+ num *= math.Pow(1024, 1)
+ case strings.HasSuffix(size, "mb"):
+ num, _ = strconv.ParseFloat(size[:len(size)-2], 64)
+ num *= math.Pow(1024, 2)
+ case strings.HasSuffix(size, "gb"):
+ num, _ = strconv.ParseFloat(size[:len(size)-2], 64)
+ num *= math.Pow(1024, 3)
+ case strings.HasSuffix(size, "tb"):
+ num, _ = strconv.ParseFloat(size[:len(size)-2], 64)
+ num *= math.Pow(1024, 4)
+ case strings.HasSuffix(size, "b"):
+ num, _ = strconv.ParseFloat(size[:len(size)-1], 64)
+ }
+ return int64(num)
+}
+
+func strToInt(s string) int64 {
+ v, _ := strconv.Atoi(s)
+ return int64(v)
+}
+
+func boolToInt(v bool) int64 {
+ if v {
+ return 1
+ }
+ return 0
+}
+
+func removeSystemIndices(indices []esIndexStats) []esIndexStats {
+ var i int
+ for _, index := range indices {
+ if strings.HasPrefix(index.Index, ".") {
+ continue
+ }
+ indices[i] = index
+ i++
+ }
+ return indices[:i]
+}
+
+func merge(dst, src map[string]int64, prefix string) {
+ for k, v := range src {
+ dst[prefix+"_"+k] = v
+ }
+}