diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/modules/couchdb/collect.go | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/couchdb/collect.go b/src/go/collectors/go.d.plugin/modules/couchdb/collect.go new file mode 100644 index 000000000..5c722fd0c --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/couchdb/collect.go @@ -0,0 +1,244 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package couchdb + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "math" + "net/http" + "strings" + "sync" + + "github.com/netdata/netdata/go/go.d.plugin/pkg/stm" + "github.com/netdata/netdata/go/go.d.plugin/pkg/web" +) + +const ( + urlPathActiveTasks = "/_active_tasks" + urlPathOverviewStats = "/_node/%s/_stats" + urlPathSystemStats = "/_node/%s/_system" + urlPathDatabases = "/_dbs_info" + + httpStatusCodePrefix = "couchdb_httpd_status_codes_" + httpStatusCodePrefixLen = len(httpStatusCodePrefix) +) + +func (cdb *CouchDB) collect() (map[string]int64, error) { + ms := cdb.scrapeCouchDB() + if ms.empty() { + return nil, nil + } + + collected := make(map[string]int64) + cdb.collectNodeStats(collected, ms) + cdb.collectSystemStats(collected, ms) + cdb.collectActiveTasks(collected, ms) + cdb.collectDBStats(collected, ms) + + return collected, nil +} + +func (cdb *CouchDB) collectNodeStats(collected map[string]int64, ms *cdbMetrics) { + if !ms.hasNodeStats() { + return + } + + for metric, value := range stm.ToMap(ms.NodeStats) { + collected[metric] = value + if strings.HasPrefix(metric, httpStatusCodePrefix) { + code := metric[httpStatusCodePrefixLen:] + collected["couchdb_httpd_status_codes_"+string(code[0])+"xx"] += value + } + } +} + +func (cdb *CouchDB) collectSystemStats(collected map[string]int64, ms *cdbMetrics) { + if !ms.hasNodeSystem() { + return + } + + for metric, value := range stm.ToMap(ms.NodeSystem) { + collected[metric] = value + } + + collected["peak_msg_queue"] = findMaxMQSize(ms.NodeSystem.MessageQueues) +} + +func (cdb *CouchDB) collectActiveTasks(collected map[string]int64, ms *cdbMetrics) { + collected["active_tasks_indexer"] = 0 + collected["active_tasks_database_compaction"] = 0 + collected["active_tasks_replication"] = 0 + collected["active_tasks_view_compaction"] = 0 + + if !ms.hasActiveTasks() { + return + } + + for _, task := range ms.ActiveTasks { + collected["active_tasks_"+task.Type]++ + } +} + +func (cdb *CouchDB) collectDBStats(collected map[string]int64, ms *cdbMetrics) { + if !ms.hasDBStats() { + return + } + + for _, dbStats := range ms.DBStats { + if dbStats.Error != "" { + cdb.Warning("database '", dbStats.Key, "' doesn't exist") + continue + } + merge(collected, stm.ToMap(dbStats.Info), "db_"+dbStats.Key) + } +} + +func (cdb *CouchDB) scrapeCouchDB() *cdbMetrics { + ms := &cdbMetrics{} + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { defer wg.Done(); cdb.scrapeNodeStats(ms) }() + + wg.Add(1) + go func() { defer wg.Done(); cdb.scrapeSystemStats(ms) }() + + wg.Add(1) + go func() { defer wg.Done(); cdb.scrapeActiveTasks(ms) }() + + if len(cdb.databases) > 0 { + wg.Add(1) + go func() { defer wg.Done(); cdb.scrapeDBStats(ms) }() + } + + wg.Wait() + return ms +} + +func (cdb *CouchDB) scrapeNodeStats(ms *cdbMetrics) { + req, _ := web.NewHTTPRequest(cdb.Request) + req.URL.Path = fmt.Sprintf(urlPathOverviewStats, cdb.Config.Node) + + var stats cdbNodeStats + if err := cdb.doOKDecode(req, &stats); err != nil { + cdb.Warning(err) + return + } + ms.NodeStats = &stats +} + +func (cdb *CouchDB) scrapeSystemStats(ms *cdbMetrics) { + req, _ := web.NewHTTPRequest(cdb.Request) + req.URL.Path = fmt.Sprintf(urlPathSystemStats, cdb.Config.Node) + + var stats cdbNodeSystem + if err := cdb.doOKDecode(req, &stats); err != nil { + cdb.Warning(err) + return + } + ms.NodeSystem = &stats +} + +func (cdb *CouchDB) scrapeActiveTasks(ms *cdbMetrics) { + req, _ := web.NewHTTPRequest(cdb.Request) + req.URL.Path = urlPathActiveTasks + + var stats []cdbActiveTask + if err := cdb.doOKDecode(req, &stats); err != nil { + cdb.Warning(err) + return + } + ms.ActiveTasks = stats +} + +func (cdb *CouchDB) scrapeDBStats(ms *cdbMetrics) { + req, _ := web.NewHTTPRequest(cdb.Request) + req.URL.Path = urlPathDatabases + req.Method = http.MethodPost + req.Header.Add("Accept", "application/json") + req.Header.Add("Content-Type", "application/json") + + var q struct { + Keys []string `json:"keys"` + } + q.Keys = cdb.databases + body, err := json.Marshal(q) + if err != nil { + cdb.Error(err) + return + } + req.Body = io.NopCloser(bytes.NewReader(body)) + + var stats []cdbDBStats + if err := cdb.doOKDecode(req, &stats); err != nil { + cdb.Warning(err) + return + } + ms.DBStats = stats +} + +func findMaxMQSize(MessageQueues map[string]interface{}) int64 { + var max float64 + for _, mq := range MessageQueues { + switch mqSize := mq.(type) { + case float64: + max = math.Max(max, mqSize) + case map[string]interface{}: + if v, ok := mqSize["count"].(float64); ok { + max = math.Max(max, v) + } + } + } + return int64(max) +} + +func (cdb *CouchDB) pingCouchDB() error { + req, _ := web.NewHTTPRequest(cdb.Request) + + var info struct{ Couchdb string } + if err := cdb.doOKDecode(req, &info); err != nil { + return err + } + + if info.Couchdb != "Welcome" { + return errors.New("not a CouchDB endpoint") + } + + return nil +} + +func (cdb *CouchDB) doOKDecode(req *http.Request, in interface{}) error { + resp, err := cdb.httpClient.Do(req) + if err != nil { + return fmt.Errorf("error on HTTP request '%s': %v", req.URL, err) + } + defer closeBody(resp) + + // TODO: read resp body, it contains reason + // ex.: {"error":"bad_request","reason":"`keys` member must exist."} (400) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("'%s' returned HTTP status code: %d", req.URL, resp.StatusCode) + } + + if err := json.NewDecoder(resp.Body).Decode(in); err != nil { + return fmt.Errorf("error on decoding response from '%s': %v", req.URL, err) + } + return nil +} + +func closeBody(resp *http.Response) { + if resp != nil && resp.Body != nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } +} + +func merge(dst, src map[string]int64, prefix string) { + for k, v := range src { + dst[prefix+"_"+k] = v + } +} |