diff options
Diffstat (limited to 'src/go/plugin/go.d/agent/functions/manager.go')
-rw-r--r-- | src/go/plugin/go.d/agent/functions/manager.go | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/functions/manager.go b/src/go/plugin/go.d/agent/functions/manager.go new file mode 100644 index 000000000..b7cdecd6a --- /dev/null +++ b/src/go/plugin/go.d/agent/functions/manager.go @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strconv" + "strings" + "sync" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter" +) + +func NewManager() *Manager { + return &Manager{ + Logger: logger.New().With( + slog.String("component", "functions manager"), + ), + api: netdataapi.New(safewriter.Stdout), + input: stdinInput, + mux: &sync.Mutex{}, + FunctionRegistry: make(map[string]func(Function)), + } +} + +type Manager struct { + *logger.Logger + + api *netdataapi.API + + input input + + mux *sync.Mutex + FunctionRegistry map[string]func(Function) +} + +func (m *Manager) Run(ctx context.Context) { + m.Info("instance is started") + defer func() { m.Info("instance is stopped") }() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { defer wg.Done(); m.run(ctx) }() + + wg.Wait() + + <-ctx.Done() +} + +func (m *Manager) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case line, ok := <-m.input.lines(): + if !ok { + return + } + + var fn *Function + var err error + + // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears, + // we need to discard the current one and switch to the new one + switch { + case strings.HasPrefix(line, "FUNCTION "): + fn, err = parseFunction(line) + case strings.HasPrefix(line, "FUNCTION_PAYLOAD "): + fn, err = parseFunctionWithPayload(ctx, line, m.input) + case line == "": + continue + default: + m.Warningf("unexpected line: '%s'", line) + continue + } + + if err != nil { + m.Warningf("parse function: %v ('%s')", err, line) + continue + } + if fn == nil { + continue + } + + function, ok := m.lookupFunction(fn.Name) + if !ok { + m.Infof("skipping execution of '%s': unregistered function", fn.Name) + m.respf(fn, 501, "unregistered function: %s", fn.Name) + continue + } + if function == nil { + m.Warningf("skipping execution of '%s': nil function registered", fn.Name) + m.respf(fn, 501, "nil function: %s", fn.Name) + continue + } + + function(*fn) + } + } +} + +func (m *Manager) lookupFunction(name string) (func(Function), bool) { + m.mux.Lock() + defer m.mux.Unlock() + + f, ok := m.FunctionRegistry[name] + return f, ok +} + +func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) { + bs, _ := json.Marshal(struct { + Status int `json:"status"` + Message string `json:"message"` + }{ + Status: code, + Message: fmt.Sprintf(msgf, a...), + }) + ts := strconv.FormatInt(time.Now().Unix(), 10) + m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts) +} |