summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/functions/manager.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/functions/manager.go136
1 files changed, 136 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/functions/manager.go b/src/go/collectors/go.d.plugin/agent/functions/manager.go
new file mode 100644
index 000000000..365d0670b
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/functions/manager.go
@@ -0,0 +1,136 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+import (
+ "bufio"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/safewriter"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/mattn/go-isatty"
+ "github.com/muesli/cancelreader"
+)
+
+var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd())
+
+func NewManager() *Manager {
+ return &Manager{
+ Logger: logger.New().With(
+ slog.String("component", "functions manager"),
+ ),
+ Input: os.Stdin,
+ api: netdataapi.New(safewriter.Stdout),
+ mux: &sync.Mutex{},
+ FunctionRegistry: make(map[string]func(Function)),
+ }
+}
+
+type Manager struct {
+ *logger.Logger
+
+ Input io.Reader
+ api *netdataapi.API
+ mux *sync.Mutex
+ FunctionRegistry map[string]func(Function)
+}
+
+func (m *Manager) Run(ctx context.Context) {
+ m.Info("instance is started")
+ defer func() { m.Info("instance is stopped") }()
+
+ if !isTerminal {
+ r, err := cancelreader.NewReader(m.Input)
+ if err != nil {
+ m.Errorf("fail to create cancel reader: %v", err)
+ return
+ }
+
+ go func() { <-ctx.Done(); r.Cancel() }()
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() { defer wg.Done(); m.run(r) }()
+
+ wg.Wait()
+ _ = r.Close()
+ }
+
+ <-ctx.Done()
+}
+
+func (m *Manager) run(r io.Reader) {
+ sc := bufio.NewScanner(r)
+
+ for sc.Scan() {
+ text := sc.Text()
+
+ var fn *Function
+ var err error
+
+ // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears,
+ // we need to discard the current one and switch to the new one
+ switch {
+ case strings.HasPrefix(text, "FUNCTION "):
+ fn, err = parseFunction(text)
+ case strings.HasPrefix(text, "FUNCTION_PAYLOAD "):
+ fn, err = parseFunctionWithPayload(text, sc)
+ case text == "":
+ continue
+ default:
+ m.Warningf("unexpected line: '%s'", text)
+ continue
+ }
+
+ if err != nil {
+ m.Warningf("parse function: %v ('%s')", err, text)
+ continue
+ }
+
+ function, ok := m.lookupFunction(fn.Name)
+ if !ok {
+ m.Infof("skipping execution of '%s': unregistered function", fn.Name)
+ m.respf(fn, 501, "unregistered function: %s", fn.Name)
+ continue
+ }
+ if function == nil {
+ m.Warningf("skipping execution of '%s': nil function registered", fn.Name)
+ m.respf(fn, 501, "nil function: %s", fn.Name)
+ continue
+ }
+
+ function(*fn)
+ }
+}
+
+func (m *Manager) lookupFunction(name string) (func(Function), bool) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ f, ok := m.FunctionRegistry[name]
+ return f, ok
+}
+
+func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) {
+ bs, _ := json.Marshal(struct {
+ Status int `json:"status"`
+ Message string `json:"message"`
+ }{
+ Status: code,
+ Message: fmt.Sprintf(msgf, a...),
+ })
+ ts := strconv.FormatInt(time.Now().Unix(), 10)
+ m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts)
+}