diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:35 +0000 |
commit | f09848204fa5283d21ea43e262ee41aa578e1808 (patch) | |
tree | c62385d7adf209fa6a798635954d887f718fb3fb /src/go/plugin/go.d/agent/functions | |
parent | Releasing debian version 1.46.3-2. (diff) | |
download | netdata-f09848204fa5283d21ea43e262ee41aa578e1808.tar.xz netdata-f09848204fa5283d21ea43e262ee41aa578e1808.zip |
Merging upstream version 1.47.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/go/plugin/go.d/agent/functions')
-rw-r--r-- | src/go/plugin/go.d/agent/functions/ext.go | 30 | ||||
-rw-r--r-- | src/go/plugin/go.d/agent/functions/function.go | 96 | ||||
-rw-r--r-- | src/go/plugin/go.d/agent/functions/input.go | 35 | ||||
-rw-r--r-- | src/go/plugin/go.d/agent/functions/manager.go | 127 | ||||
-rw-r--r-- | src/go/plugin/go.d/agent/functions/manager_test.go | 320 |
5 files changed, 608 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/functions/ext.go b/src/go/plugin/go.d/agent/functions/ext.go new file mode 100644 index 000000000..28c717d88 --- /dev/null +++ b/src/go/plugin/go.d/agent/functions/ext.go @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +func (m *Manager) Register(name string, fn func(Function)) { + if fn == nil { + m.Warningf("not registering '%s': nil function", name) + return + } + + m.mux.Lock() + defer m.mux.Unlock() + + if _, ok := m.FunctionRegistry[name]; !ok { + m.Debugf("registering function '%s'", name) + } else { + m.Warningf("re-registering function '%s'", name) + } + m.FunctionRegistry[name] = fn +} + +func (m *Manager) Unregister(name string) { + m.mux.Lock() + defer m.mux.Unlock() + + if _, ok := m.FunctionRegistry[name]; !ok { + delete(m.FunctionRegistry, name) + m.Debugf("unregistering function '%s'", name) + } +} diff --git a/src/go/plugin/go.d/agent/functions/function.go b/src/go/plugin/go.d/agent/functions/function.go new file mode 100644 index 000000000..b65d3d713 --- /dev/null +++ b/src/go/plugin/go.d/agent/functions/function.go @@ -0,0 +1,96 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "bytes" + "context" + "encoding/csv" + "fmt" + "strconv" + "strings" + "time" +) + +type Function struct { + key string + UID string + Timeout time.Duration + Name string + Args []string + Payload []byte + Permissions string + Source string + ContentType string +} + +func (f *Function) String() string { + return fmt.Sprintf("key: '%s', uid: '%s', timeout: '%s', function: '%s', args: '%v', permissions: '%s', source: '%s', contentType: '%s', payload: '%s'", + f.key, f.UID, f.Timeout, f.Name, f.Args, f.Permissions, f.Source, f.ContentType, string(f.Payload)) +} + +func parseFunction(s string) (*Function, error) { + r := csv.NewReader(strings.NewReader(s)) + r.Comma = ' ' + + parts, err := r.Read() + if err != nil { + return nil, err + } + + // FUNCTION UID Timeout "Name ...Parameters" 0xPermissions "SourceType" [ContentType] + if n := len(parts); n != 6 && n != 7 { + return nil, fmt.Errorf("unexpected number of words: want 6 or 7, got %d (%v)", n, parts) + } + + timeout, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + return nil, err + } + + cmd := strings.Split(parts[3], " ") + + fn := &Function{ + key: parts[0], + UID: parts[1], + Timeout: time.Duration(timeout) * time.Second, + Name: cmd[0], + Args: cmd[1:], + Permissions: parts[4], + Source: parts[5], + } + + if len(parts) == 7 { + fn.ContentType = parts[6] + } + + return fn, nil +} + +func parseFunctionWithPayload(ctx context.Context, s string, in input) (*Function, error) { + fn, err := parseFunction(s) + if err != nil { + return nil, err + } + + var buf bytes.Buffer + + for { + select { + case <-ctx.Done(): + return nil, nil + case line, ok := <-in.lines(): + if !ok { + return nil, nil + } + if line == "FUNCTION_PAYLOAD_END" { + fn.Payload = append(fn.Payload, buf.Bytes()...) + return fn, nil + } + if buf.Len() > 0 { + buf.WriteString("\n") + } + buf.WriteString(line) + } + } +} diff --git a/src/go/plugin/go.d/agent/functions/input.go b/src/go/plugin/go.d/agent/functions/input.go new file mode 100644 index 000000000..cb50c54d0 --- /dev/null +++ b/src/go/plugin/go.d/agent/functions/input.go @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "bufio" + "os" +) + +type input interface { + lines() chan string +} + +var stdinInput = func() input { + r := &stdinReader{chLines: make(chan string)} + go r.run() + return r +}() + +type stdinReader struct { + chLines chan string +} + +func (in *stdinReader) run() { + sc := bufio.NewScanner(bufio.NewReader(os.Stdin)) + + for sc.Scan() { + text := sc.Text() + in.chLines <- text + } +} + +func (in *stdinReader) lines() chan string { + return in.chLines +} diff --git a/src/go/plugin/go.d/agent/functions/manager.go b/src/go/plugin/go.d/agent/functions/manager.go new file mode 100644 index 000000000..b7cdecd6a --- /dev/null +++ b/src/go/plugin/go.d/agent/functions/manager.go @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strconv" + "strings" + "sync" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter" +) + +func NewManager() *Manager { + return &Manager{ + Logger: logger.New().With( + slog.String("component", "functions manager"), + ), + api: netdataapi.New(safewriter.Stdout), + input: stdinInput, + mux: &sync.Mutex{}, + FunctionRegistry: make(map[string]func(Function)), + } +} + +type Manager struct { + *logger.Logger + + api *netdataapi.API + + input input + + mux *sync.Mutex + FunctionRegistry map[string]func(Function) +} + +func (m *Manager) Run(ctx context.Context) { + m.Info("instance is started") + defer func() { m.Info("instance is stopped") }() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { defer wg.Done(); m.run(ctx) }() + + wg.Wait() + + <-ctx.Done() +} + +func (m *Manager) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case line, ok := <-m.input.lines(): + if !ok { + return + } + + var fn *Function + var err error + + // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears, + // we need to discard the current one and switch to the new one + switch { + case strings.HasPrefix(line, "FUNCTION "): + fn, err = parseFunction(line) + case strings.HasPrefix(line, "FUNCTION_PAYLOAD "): + fn, err = parseFunctionWithPayload(ctx, line, m.input) + case line == "": + continue + default: + m.Warningf("unexpected line: '%s'", line) + continue + } + + if err != nil { + m.Warningf("parse function: %v ('%s')", err, line) + continue + } + if fn == nil { + continue + } + + function, ok := m.lookupFunction(fn.Name) + if !ok { + m.Infof("skipping execution of '%s': unregistered function", fn.Name) + m.respf(fn, 501, "unregistered function: %s", fn.Name) + continue + } + if function == nil { + m.Warningf("skipping execution of '%s': nil function registered", fn.Name) + m.respf(fn, 501, "nil function: %s", fn.Name) + continue + } + + function(*fn) + } + } +} + +func (m *Manager) lookupFunction(name string) (func(Function), bool) { + m.mux.Lock() + defer m.mux.Unlock() + + f, ok := m.FunctionRegistry[name] + return f, ok +} + +func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) { + bs, _ := json.Marshal(struct { + Status int `json:"status"` + Message string `json:"message"` + }{ + Status: code, + Message: fmt.Sprintf(msgf, a...), + }) + ts := strconv.FormatInt(time.Now().Unix(), 10) + m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts) +} diff --git a/src/go/plugin/go.d/agent/functions/manager_test.go b/src/go/plugin/go.d/agent/functions/manager_test.go new file mode 100644 index 000000000..c19519bc1 --- /dev/null +++ b/src/go/plugin/go.d/agent/functions/manager_test.go @@ -0,0 +1,320 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package functions + +import ( + "bufio" + "context" + "sort" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewManager(t *testing.T) { + mgr := NewManager() + + assert.NotNilf(t, mgr.input, "Input") + assert.NotNilf(t, mgr.FunctionRegistry, "FunctionRegistry") +} + +func TestManager_Register(t *testing.T) { + type testInputFn struct { + name string + invalid bool + } + tests := map[string]struct { + input []testInputFn + expected []string + }{ + "valid registration": { + input: []testInputFn{ + {name: "fn1"}, + {name: "fn2"}, + }, + expected: []string{"fn1", "fn2"}, + }, + "registration with duplicates": { + input: []testInputFn{ + {name: "fn1"}, + {name: "fn2"}, + {name: "fn1"}, + }, + expected: []string{"fn1", "fn2"}, + }, + "registration with nil functions": { + input: []testInputFn{ + {name: "fn1"}, + {name: "fn2", invalid: true}, + }, + expected: []string{"fn1"}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mgr := NewManager() + + for _, v := range test.input { + if v.invalid { + mgr.Register(v.name, nil) + } else { + mgr.Register(v.name, func(Function) {}) + } + } + + var got []string + for name := range mgr.FunctionRegistry { + got = append(got, name) + } + sort.Strings(got) + sort.Strings(test.expected) + + assert.Equal(t, test.expected, got) + }) + } +} + +func TestManager_Run(t *testing.T) { + tests := map[string]struct { + register []string + input string + expected []Function + }{ + "valid function: single": { + register: []string{"fn1"}, + input: ` +FUNCTION UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" +`, + expected: []Function{ + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + }, + }, + "valid function: multiple": { + register: []string{"fn1", "fn2"}, + input: ` +FUNCTION UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" +FUNCTION UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test" +`, + expected: []Function{ + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn2", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + }, + }, + "valid function: single with payload": { + register: []string{"fn1", "fn2"}, + input: ` +FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line1 +payload line2 +FUNCTION_PAYLOAD_END +`, + expected: []Function{ + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line1\npayload line2"), + }, + }, + }, + "valid function: multiple with payload": { + register: []string{"fn1", "fn2"}, + input: ` +FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line1 +payload line2 +FUNCTION_PAYLOAD_END + +FUNCTION_PAYLOAD UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line3 +payload line4 +FUNCTION_PAYLOAD_END +`, + expected: []Function{ + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line1\npayload line2"), + }, + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn2", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line3\npayload line4"), + }, + }, + }, + "valid function: multiple with and without payload": { + register: []string{"fn1", "fn2", "fn3", "fn4"}, + input: ` +FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line1 +payload line2 +FUNCTION_PAYLOAD_END + +FUNCTION UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test" +FUNCTION UID 1 "fn3 arg1 arg2" 0xFFFF "method=api,role=test" + +FUNCTION_PAYLOAD UID 1 "fn4 arg1 arg2" 0xFFFF "method=api,role=test" application/json +payload line3 +payload line4 +FUNCTION_PAYLOAD_END +`, + expected: []Function{ + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn1", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line1\npayload line2"), + }, + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn2", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + { + key: "FUNCTION", + UID: "UID", + Timeout: time.Second, + Name: "fn3", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "", + Payload: nil, + }, + { + key: "FUNCTION_PAYLOAD", + UID: "UID", + Timeout: time.Second, + Name: "fn4", + Args: []string{"arg1", "arg2"}, + Permissions: "0xFFFF", + Source: "method=api,role=test", + ContentType: "application/json", + Payload: []byte("payload line3\npayload line4"), + }, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mgr := NewManager() + + mgr.input = newMockInput(test.input) + + mock := &mockFunctionExecutor{} + for _, v := range test.register { + mgr.Register(v, mock.execute) + } + + testTime := time.Second * 5 + ctx, cancel := context.WithTimeout(context.Background(), testTime) + defer cancel() + + done := make(chan struct{}) + + go func() { defer close(done); mgr.Run(ctx) }() + + timeout := testTime + time.Second*2 + tk := time.NewTimer(timeout) + defer tk.Stop() + + select { + case <-done: + assert.Equal(t, test.expected, mock.executed) + case <-tk.C: + t.Errorf("timed out after %s", timeout) + } + }) + } +} + +type mockFunctionExecutor struct { + executed []Function +} + +func (m *mockFunctionExecutor) execute(fn Function) { + m.executed = append(m.executed, fn) +} + +func newMockInput(data string) *mockInput { + m := &mockInput{chLines: make(chan string)} + sc := bufio.NewScanner(strings.NewReader(data)) + go func() { + for sc.Scan() { + m.chLines <- sc.Text() + } + close(m.chLines) + }() + return m +} + +type mockInput struct { + chLines chan string +} + +func (m *mockInput) lines() chan string { + return m.chLines +} |