summaryrefslogtreecommitdiffstats
path: root/src/go/plugin/go.d/agent/functions/manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/go/plugin/go.d/agent/functions/manager.go')
-rw-r--r--src/go/plugin/go.d/agent/functions/manager.go127
1 files changed, 127 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/functions/manager.go b/src/go/plugin/go.d/agent/functions/manager.go
new file mode 100644
index 000000000..b7cdecd6a
--- /dev/null
+++ b/src/go/plugin/go.d/agent/functions/manager.go
@@ -0,0 +1,127 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/plugins/logger"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter"
+)
+
+func NewManager() *Manager {
+ return &Manager{
+ Logger: logger.New().With(
+ slog.String("component", "functions manager"),
+ ),
+ api: netdataapi.New(safewriter.Stdout),
+ input: stdinInput,
+ mux: &sync.Mutex{},
+ FunctionRegistry: make(map[string]func(Function)),
+ }
+}
+
+type Manager struct {
+ *logger.Logger
+
+ api *netdataapi.API
+
+ input input
+
+ mux *sync.Mutex
+ FunctionRegistry map[string]func(Function)
+}
+
+func (m *Manager) Run(ctx context.Context) {
+ m.Info("instance is started")
+ defer func() { m.Info("instance is stopped") }()
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() { defer wg.Done(); m.run(ctx) }()
+
+ wg.Wait()
+
+ <-ctx.Done()
+}
+
+func (m *Manager) run(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case line, ok := <-m.input.lines():
+ if !ok {
+ return
+ }
+
+ var fn *Function
+ var err error
+
+ // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears,
+ // we need to discard the current one and switch to the new one
+ switch {
+ case strings.HasPrefix(line, "FUNCTION "):
+ fn, err = parseFunction(line)
+ case strings.HasPrefix(line, "FUNCTION_PAYLOAD "):
+ fn, err = parseFunctionWithPayload(ctx, line, m.input)
+ case line == "":
+ continue
+ default:
+ m.Warningf("unexpected line: '%s'", line)
+ continue
+ }
+
+ if err != nil {
+ m.Warningf("parse function: %v ('%s')", err, line)
+ continue
+ }
+ if fn == nil {
+ continue
+ }
+
+ function, ok := m.lookupFunction(fn.Name)
+ if !ok {
+ m.Infof("skipping execution of '%s': unregistered function", fn.Name)
+ m.respf(fn, 501, "unregistered function: %s", fn.Name)
+ continue
+ }
+ if function == nil {
+ m.Warningf("skipping execution of '%s': nil function registered", fn.Name)
+ m.respf(fn, 501, "nil function: %s", fn.Name)
+ continue
+ }
+
+ function(*fn)
+ }
+ }
+}
+
+func (m *Manager) lookupFunction(name string) (func(Function), bool) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ f, ok := m.FunctionRegistry[name]
+ return f, ok
+}
+
+func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) {
+ bs, _ := json.Marshal(struct {
+ Status int `json:"status"`
+ Message string `json:"message"`
+ }{
+ Status: code,
+ Message: fmt.Sprintf(msgf, a...),
+ })
+ ts := strconv.FormatInt(time.Now().Unix(), 10)
+ m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts)
+}