diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/modules/logstash/collect.go | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/logstash/collect.go b/src/go/collectors/go.d.plugin/modules/logstash/collect.go new file mode 100644 index 000000000..3eceb9bf6 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/logstash/collect.go @@ -0,0 +1,89 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package logstash + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/netdata/netdata/go/go.d.plugin/pkg/stm" + "github.com/netdata/netdata/go/go.d.plugin/pkg/web" +) + +const urlPathNodeStatsAPI = "/_node/stats" + +func (l *Logstash) collect() (map[string]int64, error) { + stats, err := l.queryNodeStats() + if err != nil { + return nil, err + } + + l.updateCharts(stats.Pipelines) + + return stm.ToMap(stats), nil +} + +func (l *Logstash) updateCharts(pipelines map[string]pipelineStats) { + seen := make(map[string]bool) + + for id := range pipelines { + seen[id] = true + if !l.pipelines[id] { + l.pipelines[id] = true + l.addPipelineCharts(id) + } + } + + for id := range l.pipelines { + if !seen[id] { + delete(l.pipelines, id) + l.removePipelineCharts(id) + } + } +} + +func (l *Logstash) queryNodeStats() (*nodeStats, error) { + req, _ := web.NewHTTPRequest(l.Request.Copy()) + req.URL.Path = urlPathNodeStatsAPI + + var stats nodeStats + + if err := l.doWithDecode(&stats, req); err != nil { + return nil, err + } + + return &stats, nil +} + +func (l *Logstash) doWithDecode(dst interface{}, req *http.Request) error { + l.Debugf("executing %s '%s'", req.Method, req.URL) + resp, err := l.httpClient.Do(req) + if err != nil { + return err + } + defer closeBody(resp) + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned %d status code (%s)", req.URL, resp.StatusCode, resp.Status) + } + + content, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error on reading response from %s : %v", req.URL, err) + } + + if err := json.Unmarshal(content, dst); err != nil { + return fmt.Errorf("error on parsing response from %s : %v", req.URL, err) + } + + return nil +} + +func closeBody(resp *http.Response) { + if resp != nil && resp.Body != nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } +} |