diff options
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/functions')
4 files changed, 553 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/functions/ext.go b/src/go/collectors/go.d.plugin/agent/functions/ext.go new file mode 100644 index 000000000..28c717d88 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/functions/ext.go @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +func (m *Manager) Register(name string, fn func(Function)) { + if fn == nil { + m.Warningf("not registering '%s': nil function", name) + return + } + + m.mux.Lock() + defer m.mux.Unlock() + + if _, ok := m.FunctionRegistry[name]; !ok { + m.Debugf("registering function '%s'", name) + } else { + m.Warningf("re-registering function '%s'", name) + } + m.FunctionRegistry[name] = fn +} + +func (m *Manager) Unregister(name string) { + m.mux.Lock() + defer m.mux.Unlock() + + if _, ok := m.FunctionRegistry[name]; !ok { + delete(m.FunctionRegistry, name) + m.Debugf("unregistering function '%s'", name) + } +} diff --git a/src/go/collectors/go.d.plugin/agent/functions/function.go b/src/go/collectors/go.d.plugin/agent/functions/function.go new file mode 100644 index 000000000..b2fd42932 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/functions/function.go @@ -0,0 +1,88 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "bufio" + "bytes" + "encoding/csv" + "fmt" + "strconv" + "strings" + "time" +) + +type Function struct { + key string + UID string + Timeout time.Duration + Name string + Args []string + Payload []byte + Permissions string + Source string + ContentType string +} + +func (f *Function) String() string { + return fmt.Sprintf("key: '%s', uid: '%s', timeout: '%s', function: '%s', args: '%v', permissions: '%s', source: '%s', contentType: '%s', payload: '%s'", + f.key, f.UID, f.Timeout, f.Name, f.Args, f.Permissions, f.Source, f.ContentType, string(f.Payload)) +} + +func parseFunction(s string) (*Function, error) { + r := csv.NewReader(strings.NewReader(s)) + r.Comma = ' ' + + parts, err := r.Read() + if err != nil { + return nil, err + } + + // FUNCTION UID Timeout "Name ...Parameters" 0xPermissions "SourceType" [ContentType] + if n := len(parts); n != 6 && n != 7 { + return nil, fmt.Errorf("unexpected number of words: want 6 or 7, got %d (%v)", n, parts) + } + + timeout, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + return nil, err + } + + cmd := strings.Split(parts[3], " ") + + fn := &Function{ + key: parts[0], + UID: parts[1], + Timeout: time.Duration(timeout) * time.Second, + Name: cmd[0], + Args: cmd[1:], + Permissions: parts[4], + Source: parts[5], + } + + if len(parts) == 7 { + fn.ContentType = parts[6] + } + + return fn, nil +} + +func parseFunctionWithPayload(s string, sc *bufio.Scanner) (*Function, error) { + fn, err := parseFunction(s) + if err != nil { + return nil, err + } + + var n int + var buf bytes.Buffer + for sc.Scan() && sc.Text() != "FUNCTION_PAYLOAD_END" { + if n++; n > 1 { + buf.WriteString("\n") + } + buf.WriteString(sc.Text()) + } + + fn.Payload = append(fn.Payload, buf.Bytes()...) + + return fn, nil +} diff --git a/src/go/collectors/go.d.plugin/agent/functions/manager.go b/src/go/collectors/go.d.plugin/agent/functions/manager.go new file mode 100644 index 000000000..365d0670b --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/functions/manager.go @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi" + "github.com/netdata/netdata/go/go.d.plugin/agent/safewriter" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "github.com/mattn/go-isatty" + "github.com/muesli/cancelreader" +) + +var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd()) + +func NewManager() *Manager { + return &Manager{ + Logger: logger.New().With( + slog.String("component", "functions manager"), + ), + Input: os.Stdin, + api: netdataapi.New(safewriter.Stdout), + mux: &sync.Mutex{}, + FunctionRegistry: make(map[string]func(Function)), + } +} + +type Manager struct { + *logger.Logger + + Input io.Reader + api *netdataapi.API + mux *sync.Mutex + FunctionRegistry map[string]func(Function) +} + +func (m *Manager) Run(ctx context.Context) { + m.Info("instance is started") + defer func() { m.Info("instance is stopped") }() + + if !isTerminal { + r, err := cancelreader.NewReader(m.Input) + if err != nil { + m.Errorf("fail to create cancel reader: %v", err) + return + } + + go func() { <-ctx.Done(); r.Cancel() }() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { defer wg.Done(); m.run(r) }() + + wg.Wait() + _ = r.Close() + } + + <-ctx.Done() +} + +func (m *Manager) run(r io.Reader) { + sc := bufio.NewScanner(r) + + for sc.Scan() { + text := sc.Text() + + var fn *Function + var err error + + // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears, + // we need to discard the current one and switch to the new one + switch { + case strings.HasPrefix(text, "FUNCTION "): + fn, err = parseFunction(text) + case strings.HasPrefix(text, "FUNCTION_PAYLOAD "): + fn, err = parseFunctionWithPayload(text, sc) + case text == "": + continue + default: + m.Warningf("unexpected line: '%s'", text) + continue + } + + if err != nil { + m.Warningf("parse function: %v ('%s')", err, text) + continue + } + + function, ok := m.lookupFunction(fn.Name) + if !ok { + m.Infof("skipping execution of '%s': unregistered function", fn.Name) + m.respf(fn, 501, "unregistered function: %s", fn.Name) + continue + } + if function == nil { + m.Warningf("skipping execution of '%s': nil function registered", fn.Name) + m.respf(fn, 501, "nil function: %s", fn.Name) + continue + } + + function(*fn) + } +} + +func (m *Manager) lookupFunction(name string) (func(Function), bool) { + m.mux.Lock() + defer m.mux.Unlock() + + f, ok := m.FunctionRegistry[name] + return f, ok +} + +func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) { + bs, _ := json.Marshal(struct { + Status int `json:"status"` + Message string `json:"message"` + }{ + Status: code, + Message: fmt.Sprintf(msgf, a...), + }) + ts := strconv.FormatInt(time.Now().Unix(), 10) + m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts) +} diff --git a/src/go/collectors/go.d.plugin/agent/functions/manager_test.go b/src/go/collectors/go.d.plugin/agent/functions/manager_test.go new file mode 100644 index 000000000..26a8cdd0c --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/functions/manager_test.go @@ -0,0 +1,299 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "context" + "sort" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewManager(t *testing.T) { + mgr := NewManager() + + assert.NotNilf(t, mgr.Input, "Input") + assert.NotNilf(t, mgr.FunctionRegistry, "FunctionRegistry") +} + +func TestManager_Register(t *testing.T) { + type testInputFn struct { + name string + invalid bool + } + tests := map[string]struct { + input []testInputFn + expected []string + }{ + "valid registration": { + input: []testInputFn{ + {name: "fn1"}, + {name: "fn2"}, + }, + expected: []string{"fn1", "fn2"}, + }, + "registration with duplicates": { + input: []testInputFn{ + {name: "fn1"}, + {name: "fn2"}, + {name: "fn1"}, + }, + expected: []string{"fn1", "fn2"}, + }, + "registration with nil functions": { + input: []testInputFn{ + {name: "fn1"}, + {name: "fn2", invalid: true}, + }, + expected: []string{"fn1"}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mgr := NewManager() + + for _, v := range test.input { + if v.invalid { + mgr.Register(v.name, nil) + } else { + mgr.Register(v.name, func(Function) {}) + } + } + + var got []string + for name := range mgr.FunctionRegistry { + got = append(got, name) + } + sort.Strings(got) + sort.Strings(test.expected) + + assert.Equal(t, test.expected, got) + }) + } +} + +func TestManager_Run(t *testing.T) { + tests := map[string]struct { + register []string + input string + expected []Function + }{ + "valid function: single": { + register: []string{"fn1"}, + input: ` +FUNCTION UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" +`, + expected: []Function{ + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + }, + }, + "valid function: multiple": { + register: []string{"fn1", "fn2"}, + input: ` +FUNCTION UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" +FUNCTION UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test" +`, + expected: []Function{ + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn2", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + }, + }, + "valid function: single with payload": { + register: []string{"fn1", "fn2"}, + input: ` +FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line1 +payload line2 +FUNCTION_PAYLOAD_END +`, + expected: []Function{ + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line1\npayload line2"), + }, + }, + }, + "valid function: multiple with payload": { + register: []string{"fn1", "fn2"}, + input: ` +FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line1 +payload line2 +FUNCTION_PAYLOAD_END + +FUNCTION_PAYLOAD UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line3 +payload line4 +FUNCTION_PAYLOAD_END +`, + expected: []Function{ + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line1\npayload line2"), + }, + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn2", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line3\npayload line4"), + }, + }, + }, + "valid function: multiple with and without payload": { + register: []string{"fn1", "fn2", "fn3", "fn4"}, + input: ` +FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line1 +payload line2 +FUNCTION_PAYLOAD_END + +FUNCTION UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test" +FUNCTION UID 1 "fn3 arg1 arg2" 0xFFFF "method=api,role=test" + +FUNCTION_PAYLOAD UID 1 "fn4 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line3 +payload line4 +FUNCTION_PAYLOAD_END +`, + expected: []Function{ + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line1\npayload line2"), + }, + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn2", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn3", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn4", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line3\npayload line4"), + }, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mgr := NewManager() + + mgr.Input = strings.NewReader(test.input) + + mock := &mockFunctionExecutor{} + for _, v := range test.register { + mgr.Register(v, mock.execute) + } + + testTime := time.Second * 5 + ctx, cancel := context.WithTimeout(context.Background(), testTime) + defer cancel() + + done := make(chan struct{}) + + go func() { defer close(done); mgr.Run(ctx) }() + + timeout := testTime + time.Second*2 + tk := time.NewTimer(timeout) + defer tk.Stop() + + select { + case <-done: + assert.Equal(t, test.expected, mock.executed) + case <-tk.C: + t.Errorf("timed out after %s", timeout) + } + }) + } +} + +type mockFunctionExecutor struct { + executed []Function +} + +func (m *mockFunctionExecutor) execute(fn Function) { + m.executed = append(m.executed, fn) +} |