diff options
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/functions/manager.go')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/functions/manager.go | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/functions/manager.go b/src/go/collectors/go.d.plugin/agent/functions/manager.go new file mode 100644 index 000000000..365d0670b --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/functions/manager.go @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi" + "github.com/netdata/netdata/go/go.d.plugin/agent/safewriter" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "github.com/mattn/go-isatty" + "github.com/muesli/cancelreader" +) + +var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd()) + +func NewManager() *Manager { + return &Manager{ + Logger: logger.New().With( + slog.String("component", "functions manager"), + ), + Input: os.Stdin, + api: netdataapi.New(safewriter.Stdout), + mux: &sync.Mutex{}, + FunctionRegistry: make(map[string]func(Function)), + } +} + +type Manager struct { + *logger.Logger + + Input io.Reader + api *netdataapi.API + mux *sync.Mutex + FunctionRegistry map[string]func(Function) +} + +func (m *Manager) Run(ctx context.Context) { + m.Info("instance is started") + defer func() { m.Info("instance is stopped") }() + + if !isTerminal { + r, err := cancelreader.NewReader(m.Input) + if err != nil { + m.Errorf("fail to create cancel reader: %v", err) + return + } + + go func() { <-ctx.Done(); r.Cancel() }() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { defer wg.Done(); m.run(r) }() + + wg.Wait() + _ = r.Close() + } + + <-ctx.Done() +} + +func (m *Manager) run(r io.Reader) { + sc := bufio.NewScanner(r) + + for sc.Scan() { + text := sc.Text() + + var fn *Function + var err error + + // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears, + // we need to discard the current one and switch to the new one + switch { + case strings.HasPrefix(text, "FUNCTION "): + fn, err = parseFunction(text) + case strings.HasPrefix(text, "FUNCTION_PAYLOAD "): + fn, err = parseFunctionWithPayload(text, sc) + case text == "": + continue + default: + m.Warningf("unexpected line: '%s'", text) + continue + } + + if err != nil { + m.Warningf("parse function: %v ('%s')", err, text) + continue + } + + function, ok := m.lookupFunction(fn.Name) + if !ok { + m.Infof("skipping execution of '%s': unregistered function", fn.Name) + m.respf(fn, 501, "unregistered function: %s", fn.Name) + continue + } + if function == nil { + m.Warningf("skipping execution of '%s': nil function registered", fn.Name) + m.respf(fn, 501, "nil function: %s", fn.Name) + continue + } + + function(*fn) + } +} + +func (m *Manager) lookupFunction(name string) (func(Function), bool) { + m.mux.Lock() + defer m.mux.Unlock() + + f, ok := m.FunctionRegistry[name] + return f, ok +} + +func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) { + bs, _ := json.Marshal(struct { + Status int `json:"status"` + Message string `json:"message"` + }{ + Status: code, + Message: fmt.Sprintf(msgf, a...), + }) + ts := strconv.FormatInt(time.Now().Unix(), 10) + m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts) +} |