diff options
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/functions/manager.go')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/functions/manager.go | 136 |
1 files changed, 0 insertions, 136 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/functions/manager.go b/src/go/collectors/go.d.plugin/agent/functions/manager.go deleted file mode 100644 index 365d0670b..000000000 --- a/src/go/collectors/go.d.plugin/agent/functions/manager.go +++ /dev/null @@ -1,136 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -package functions - -import ( - "bufio" - "context" - "encoding/json" - "fmt" - "io" - "log/slog" - "os" - "strconv" - "strings" - "sync" - "time" - - "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi" - "github.com/netdata/netdata/go/go.d.plugin/agent/safewriter" - "github.com/netdata/netdata/go/go.d.plugin/logger" - - "github.com/mattn/go-isatty" - "github.com/muesli/cancelreader" -) - -var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd()) - -func NewManager() *Manager { - return &Manager{ - Logger: logger.New().With( - slog.String("component", "functions manager"), - ), - Input: os.Stdin, - api: netdataapi.New(safewriter.Stdout), - mux: &sync.Mutex{}, - FunctionRegistry: make(map[string]func(Function)), - } -} - -type Manager struct { - *logger.Logger - - Input io.Reader - api *netdataapi.API - mux *sync.Mutex - FunctionRegistry map[string]func(Function) -} - -func (m *Manager) Run(ctx context.Context) { - m.Info("instance is started") - defer func() { m.Info("instance is stopped") }() - - if !isTerminal { - r, err := cancelreader.NewReader(m.Input) - if err != nil { - m.Errorf("fail to create cancel reader: %v", err) - return - } - - go func() { <-ctx.Done(); r.Cancel() }() - - var wg sync.WaitGroup - - wg.Add(1) - go func() { defer wg.Done(); m.run(r) }() - - wg.Wait() - _ = r.Close() - } - - <-ctx.Done() -} - -func (m *Manager) run(r io.Reader) { - sc := bufio.NewScanner(r) - - for sc.Scan() { - text := sc.Text() - - var fn *Function - var err error - - // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears, - // we need to discard the current one and switch to the new one - switch { - case strings.HasPrefix(text, "FUNCTION "): - fn, err = parseFunction(text) - case strings.HasPrefix(text, "FUNCTION_PAYLOAD "): - fn, err = parseFunctionWithPayload(text, sc) - case text == "": - continue - default: - m.Warningf("unexpected line: '%s'", text) - continue - } - - if err != nil { - m.Warningf("parse function: %v ('%s')", err, text) - continue - } - - function, ok := m.lookupFunction(fn.Name) - if !ok { - m.Infof("skipping execution of '%s': unregistered function", fn.Name) - m.respf(fn, 501, "unregistered function: %s", fn.Name) - continue - } - if function == nil { - m.Warningf("skipping execution of '%s': nil function registered", fn.Name) - m.respf(fn, 501, "nil function: %s", fn.Name) - continue - } - - function(*fn) - } -} - -func (m *Manager) lookupFunction(name string) (func(Function), bool) { - m.mux.Lock() - defer m.mux.Unlock() - - f, ok := m.FunctionRegistry[name] - return f, ok -} - -func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) { - bs, _ := json.Marshal(struct { - Status int `json:"status"` - Message string `json:"message"` - }{ - Status: code, - Message: fmt.Sprintf(msgf, a...), - }) - ts := strconv.FormatInt(time.Now().Unix(), 10) - m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts) -} |