diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/modules/elasticsearch/collect.go | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/elasticsearch/collect.go b/src/go/collectors/go.d.plugin/modules/elasticsearch/collect.go new file mode 100644 index 000000000..eb3ffb351 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/elasticsearch/collect.go @@ -0,0 +1,308 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package elasticsearch + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "math" + "net/http" + "strconv" + "strings" + "sync" + + "github.com/netdata/netdata/go/go.d.plugin/pkg/stm" + "github.com/netdata/netdata/go/go.d.plugin/pkg/web" +) + +const ( + urlPathLocalNodeStats = "/_nodes/_local/stats" + urlPathNodesStats = "/_nodes/stats" + urlPathIndicesStats = "/_cat/indices" + urlPathClusterHealth = "/_cluster/health" + urlPathClusterStats = "/_cluster/stats" +) + +func (es *Elasticsearch) collect() (map[string]int64, error) { + if es.clusterName == "" { + name, err := es.getClusterName() + if err != nil { + return nil, err + } + es.clusterName = name + } + + ms := es.scrapeElasticsearch() + if ms.empty() { + return nil, nil + } + + mx := make(map[string]int64) + + es.collectNodesStats(mx, ms) + es.collectClusterHealth(mx, ms) + es.collectClusterStats(mx, ms) + es.collectLocalIndicesStats(mx, ms) + + return mx, nil +} + +func (es *Elasticsearch) collectNodesStats(mx map[string]int64, ms *esMetrics) { + if !ms.hasNodesStats() { + return + } + + seen := make(map[string]bool) + + for nodeID, node := range ms.NodesStats.Nodes { + seen[nodeID] = true + + if !es.nodes[nodeID] { + es.nodes[nodeID] = true + es.addNodeCharts(nodeID, node) + } + + merge(mx, stm.ToMap(node), "node_"+nodeID) + } + + for nodeID := range es.nodes { + if !seen[nodeID] { + delete(es.nodes, nodeID) + es.removeNodeCharts(nodeID) + } + } +} + +func (es *Elasticsearch) collectClusterHealth(mx map[string]int64, ms *esMetrics) { + if !ms.hasClusterHealth() { + return + } + + es.addClusterHealthChartsOnce.Do(es.addClusterHealthCharts) + + merge(mx, stm.ToMap(ms.ClusterHealth), "cluster") + + mx["cluster_status_green"] = boolToInt(ms.ClusterHealth.Status == "green") + mx["cluster_status_yellow"] = boolToInt(ms.ClusterHealth.Status == "yellow") + mx["cluster_status_red"] = boolToInt(ms.ClusterHealth.Status == "red") +} + +func (es *Elasticsearch) collectClusterStats(mx map[string]int64, ms *esMetrics) { + if !ms.hasClusterStats() { + return + } + + es.addClusterStatsChartsOnce.Do(es.addClusterStatsCharts) + + merge(mx, stm.ToMap(ms.ClusterStats), "cluster") +} + +func (es *Elasticsearch) collectLocalIndicesStats(mx map[string]int64, ms *esMetrics) { + if !ms.hasLocalIndicesStats() { + return + } + + seen := make(map[string]bool) + + for _, v := range ms.LocalIndicesStats { + seen[v.Index] = true + + if !es.indices[v.Index] { + es.indices[v.Index] = true + es.addIndexCharts(v.Index) + } + + px := fmt.Sprintf("node_index_%s_stats_", v.Index) + + mx[px+"health_green"] = boolToInt(v.Health == "green") + mx[px+"health_yellow"] = boolToInt(v.Health == "yellow") + mx[px+"health_red"] = boolToInt(v.Health == "red") + mx[px+"shards_count"] = strToInt(v.Rep) + mx[px+"docs_count"] = strToInt(v.DocsCount) + mx[px+"store_size_in_bytes"] = convertIndexStoreSizeToBytes(v.StoreSize) + } + + for index := range es.indices { + if !seen[index] { + delete(es.indices, index) + es.removeIndexCharts(index) + } + } +} + +func (es *Elasticsearch) scrapeElasticsearch() *esMetrics { + ms := &esMetrics{} + wg := &sync.WaitGroup{} + + if es.DoNodeStats { + wg.Add(1) + go func() { defer wg.Done(); es.scrapeNodesStats(ms) }() + } + if es.DoClusterHealth { + wg.Add(1) + go func() { defer wg.Done(); es.scrapeClusterHealth(ms) }() + } + if es.DoClusterStats { + wg.Add(1) + go func() { defer wg.Done(); es.scrapeClusterStats(ms) }() + } + if !es.ClusterMode && es.DoIndicesStats { + wg.Add(1) + go func() { defer wg.Done(); es.scrapeLocalIndicesStats(ms) }() + } + wg.Wait() + + return ms +} + +func (es *Elasticsearch) scrapeNodesStats(ms *esMetrics) { + req, _ := web.NewHTTPRequest(es.Request) + if es.ClusterMode { + req.URL.Path = urlPathNodesStats + } else { + req.URL.Path = urlPathLocalNodeStats + } + + var stats esNodesStats + if err := es.doOKDecode(req, &stats); err != nil { + es.Warning(err) + return + } + + ms.NodesStats = &stats +} + +func (es *Elasticsearch) scrapeClusterHealth(ms *esMetrics) { + req, _ := web.NewHTTPRequest(es.Request) + req.URL.Path = urlPathClusterHealth + + var health esClusterHealth + if err := es.doOKDecode(req, &health); err != nil { + es.Warning(err) + return + } + + ms.ClusterHealth = &health +} + +func (es *Elasticsearch) scrapeClusterStats(ms *esMetrics) { + req, _ := web.NewHTTPRequest(es.Request) + req.URL.Path = urlPathClusterStats + + var stats esClusterStats + if err := es.doOKDecode(req, &stats); err != nil { + es.Warning(err) + return + } + + ms.ClusterStats = &stats +} + +func (es *Elasticsearch) scrapeLocalIndicesStats(ms *esMetrics) { + req, _ := web.NewHTTPRequest(es.Request) + req.URL.Path = urlPathIndicesStats + req.URL.RawQuery = "local=true&format=json" + + var stats []esIndexStats + if err := es.doOKDecode(req, &stats); err != nil { + es.Warning(err) + return + } + + ms.LocalIndicesStats = removeSystemIndices(stats) +} + +func (es *Elasticsearch) getClusterName() (string, error) { + req, _ := web.NewHTTPRequest(es.Request) + + var info struct { + ClusterName string `json:"cluster_name"` + } + + if err := es.doOKDecode(req, &info); err != nil { + return "", err + } + + if info.ClusterName == "" { + return "", errors.New("empty cluster name") + } + + return info.ClusterName, nil +} + +func (es *Elasticsearch) doOKDecode(req *http.Request, in interface{}) error { + resp, err := es.httpClient.Do(req) + if err != nil { + return fmt.Errorf("error on HTTP request '%s': %v", req.URL, err) + } + defer closeBody(resp) + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("'%s' returned HTTP status code: %d", req.URL, resp.StatusCode) + } + + if err := json.NewDecoder(resp.Body).Decode(in); err != nil { + return fmt.Errorf("error on decoding response from '%s': %v", req.URL, err) + } + return nil +} + +func closeBody(resp *http.Response) { + if resp != nil && resp.Body != nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } +} + +func convertIndexStoreSizeToBytes(size string) int64 { + var num float64 + switch { + case strings.HasSuffix(size, "kb"): + num, _ = strconv.ParseFloat(size[:len(size)-2], 64) + num *= math.Pow(1024, 1) + case strings.HasSuffix(size, "mb"): + num, _ = strconv.ParseFloat(size[:len(size)-2], 64) + num *= math.Pow(1024, 2) + case strings.HasSuffix(size, "gb"): + num, _ = strconv.ParseFloat(size[:len(size)-2], 64) + num *= math.Pow(1024, 3) + case strings.HasSuffix(size, "tb"): + num, _ = strconv.ParseFloat(size[:len(size)-2], 64) + num *= math.Pow(1024, 4) + case strings.HasSuffix(size, "b"): + num, _ = strconv.ParseFloat(size[:len(size)-1], 64) + } + return int64(num) +} + +func strToInt(s string) int64 { + v, _ := strconv.Atoi(s) + return int64(v) +} + +func boolToInt(v bool) int64 { + if v { + return 1 + } + return 0 +} + +func removeSystemIndices(indices []esIndexStats) []esIndexStats { + var i int + for _, index := range indices { + if strings.HasPrefix(index.Index, ".") { + continue + } + indices[i] = index + i++ + } + return indices[:i] +} + +func merge(dst, src map[string]int64, prefix string) { + for k, v := range src { + dst[prefix+"_"+k] = v + } +} |