summaryrefslogtreecommitdiffstats
path: root/src/go/plugin/go.d/agent/functions
diff options
context:
space:
mode:
Diffstat (limited to 'src/go/plugin/go.d/agent/functions')
-rw-r--r--src/go/plugin/go.d/agent/functions/ext.go30
-rw-r--r--src/go/plugin/go.d/agent/functions/function.go96
-rw-r--r--src/go/plugin/go.d/agent/functions/input.go35
-rw-r--r--src/go/plugin/go.d/agent/functions/manager.go127
-rw-r--r--src/go/plugin/go.d/agent/functions/manager_test.go320
5 files changed, 608 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/functions/ext.go b/src/go/plugin/go.d/agent/functions/ext.go
new file mode 100644
index 00000000..28c717d8
--- /dev/null
+++ b/src/go/plugin/go.d/agent/functions/ext.go
@@ -0,0 +1,30 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+func (m *Manager) Register(name string, fn func(Function)) {
+ if fn == nil {
+ m.Warningf("not registering '%s': nil function", name)
+ return
+ }
+
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ if _, ok := m.FunctionRegistry[name]; !ok {
+ m.Debugf("registering function '%s'", name)
+ } else {
+ m.Warningf("re-registering function '%s'", name)
+ }
+ m.FunctionRegistry[name] = fn
+}
+
+func (m *Manager) Unregister(name string) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ if _, ok := m.FunctionRegistry[name]; !ok {
+ delete(m.FunctionRegistry, name)
+ m.Debugf("unregistering function '%s'", name)
+ }
+}
diff --git a/src/go/plugin/go.d/agent/functions/function.go b/src/go/plugin/go.d/agent/functions/function.go
new file mode 100644
index 00000000..b65d3d71
--- /dev/null
+++ b/src/go/plugin/go.d/agent/functions/function.go
@@ -0,0 +1,96 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+import (
+ "bytes"
+ "context"
+ "encoding/csv"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+)
+
+type Function struct {
+ key string
+ UID string
+ Timeout time.Duration
+ Name string
+ Args []string
+ Payload []byte
+ Permissions string
+ Source string
+ ContentType string
+}
+
+func (f *Function) String() string {
+ return fmt.Sprintf("key: '%s', uid: '%s', timeout: '%s', function: '%s', args: '%v', permissions: '%s', source: '%s', contentType: '%s', payload: '%s'",
+ f.key, f.UID, f.Timeout, f.Name, f.Args, f.Permissions, f.Source, f.ContentType, string(f.Payload))
+}
+
+func parseFunction(s string) (*Function, error) {
+ r := csv.NewReader(strings.NewReader(s))
+ r.Comma = ' '
+
+ parts, err := r.Read()
+ if err != nil {
+ return nil, err
+ }
+
+ // FUNCTION UID Timeout "Name ...Parameters" 0xPermissions "SourceType" [ContentType]
+ if n := len(parts); n != 6 && n != 7 {
+ return nil, fmt.Errorf("unexpected number of words: want 6 or 7, got %d (%v)", n, parts)
+ }
+
+ timeout, err := strconv.ParseInt(parts[2], 10, 64)
+ if err != nil {
+ return nil, err
+ }
+
+ cmd := strings.Split(parts[3], " ")
+
+ fn := &Function{
+ key: parts[0],
+ UID: parts[1],
+ Timeout: time.Duration(timeout) * time.Second,
+ Name: cmd[0],
+ Args: cmd[1:],
+ Permissions: parts[4],
+ Source: parts[5],
+ }
+
+ if len(parts) == 7 {
+ fn.ContentType = parts[6]
+ }
+
+ return fn, nil
+}
+
+func parseFunctionWithPayload(ctx context.Context, s string, in input) (*Function, error) {
+ fn, err := parseFunction(s)
+ if err != nil {
+ return nil, err
+ }
+
+ var buf bytes.Buffer
+
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, nil
+ case line, ok := <-in.lines():
+ if !ok {
+ return nil, nil
+ }
+ if line == "FUNCTION_PAYLOAD_END" {
+ fn.Payload = append(fn.Payload, buf.Bytes()...)
+ return fn, nil
+ }
+ if buf.Len() > 0 {
+ buf.WriteString("\n")
+ }
+ buf.WriteString(line)
+ }
+ }
+}
diff --git a/src/go/plugin/go.d/agent/functions/input.go b/src/go/plugin/go.d/agent/functions/input.go
new file mode 100644
index 00000000..cb50c54d
--- /dev/null
+++ b/src/go/plugin/go.d/agent/functions/input.go
@@ -0,0 +1,35 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+import (
+ "bufio"
+ "os"
+)
+
+type input interface {
+ lines() chan string
+}
+
+var stdinInput = func() input {
+ r := &stdinReader{chLines: make(chan string)}
+ go r.run()
+ return r
+}()
+
+type stdinReader struct {
+ chLines chan string
+}
+
+func (in *stdinReader) run() {
+ sc := bufio.NewScanner(bufio.NewReader(os.Stdin))
+
+ for sc.Scan() {
+ text := sc.Text()
+ in.chLines <- text
+ }
+}
+
+func (in *stdinReader) lines() chan string {
+ return in.chLines
+}
diff --git a/src/go/plugin/go.d/agent/functions/manager.go b/src/go/plugin/go.d/agent/functions/manager.go
new file mode 100644
index 00000000..b7cdecd6
--- /dev/null
+++ b/src/go/plugin/go.d/agent/functions/manager.go
@@ -0,0 +1,127 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/plugins/logger"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter"
+)
+
+func NewManager() *Manager {
+ return &Manager{
+ Logger: logger.New().With(
+ slog.String("component", "functions manager"),
+ ),
+ api: netdataapi.New(safewriter.Stdout),
+ input: stdinInput,
+ mux: &sync.Mutex{},
+ FunctionRegistry: make(map[string]func(Function)),
+ }
+}
+
+type Manager struct {
+ *logger.Logger
+
+ api *netdataapi.API
+
+ input input
+
+ mux *sync.Mutex
+ FunctionRegistry map[string]func(Function)
+}
+
+func (m *Manager) Run(ctx context.Context) {
+ m.Info("instance is started")
+ defer func() { m.Info("instance is stopped") }()
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() { defer wg.Done(); m.run(ctx) }()
+
+ wg.Wait()
+
+ <-ctx.Done()
+}
+
+func (m *Manager) run(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case line, ok := <-m.input.lines():
+ if !ok {
+ return
+ }
+
+ var fn *Function
+ var err error
+
+ // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears,
+ // we need to discard the current one and switch to the new one
+ switch {
+ case strings.HasPrefix(line, "FUNCTION "):
+ fn, err = parseFunction(line)
+ case strings.HasPrefix(line, "FUNCTION_PAYLOAD "):
+ fn, err = parseFunctionWithPayload(ctx, line, m.input)
+ case line == "":
+ continue
+ default:
+ m.Warningf("unexpected line: '%s'", line)
+ continue
+ }
+
+ if err != nil {
+ m.Warningf("parse function: %v ('%s')", err, line)
+ continue
+ }
+ if fn == nil {
+ continue
+ }
+
+ function, ok := m.lookupFunction(fn.Name)
+ if !ok {
+ m.Infof("skipping execution of '%s': unregistered function", fn.Name)
+ m.respf(fn, 501, "unregistered function: %s", fn.Name)
+ continue
+ }
+ if function == nil {
+ m.Warningf("skipping execution of '%s': nil function registered", fn.Name)
+ m.respf(fn, 501, "nil function: %s", fn.Name)
+ continue
+ }
+
+ function(*fn)
+ }
+ }
+}
+
+func (m *Manager) lookupFunction(name string) (func(Function), bool) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ f, ok := m.FunctionRegistry[name]
+ return f, ok
+}
+
+func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) {
+ bs, _ := json.Marshal(struct {
+ Status int `json:"status"`
+ Message string `json:"message"`
+ }{
+ Status: code,
+ Message: fmt.Sprintf(msgf, a...),
+ })
+ ts := strconv.FormatInt(time.Now().Unix(), 10)
+ m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts)
+}
diff --git a/src/go/plugin/go.d/agent/functions/manager_test.go b/src/go/plugin/go.d/agent/functions/manager_test.go
new file mode 100644
index 00000000..c19519bc
--- /dev/null
+++ b/src/go/plugin/go.d/agent/functions/manager_test.go
@@ -0,0 +1,320 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+import (
+ "bufio"
+ "context"
+ "sort"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewManager(t *testing.T) {
+ mgr := NewManager()
+
+ assert.NotNilf(t, mgr.input, "Input")
+ assert.NotNilf(t, mgr.FunctionRegistry, "FunctionRegistry")
+}
+
+func TestManager_Register(t *testing.T) {
+ type testInputFn struct {
+ name string
+ invalid bool
+ }
+ tests := map[string]struct {
+ input []testInputFn
+ expected []string
+ }{
+ "valid registration": {
+ input: []testInputFn{
+ {name: "fn1"},
+ {name: "fn2"},
+ },
+ expected: []string{"fn1", "fn2"},
+ },
+ "registration with duplicates": {
+ input: []testInputFn{
+ {name: "fn1"},
+ {name: "fn2"},
+ {name: "fn1"},
+ },
+ expected: []string{"fn1", "fn2"},
+ },
+ "registration with nil functions": {
+ input: []testInputFn{
+ {name: "fn1"},
+ {name: "fn2", invalid: true},
+ },
+ expected: []string{"fn1"},
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ mgr := NewManager()
+
+ for _, v := range test.input {
+ if v.invalid {
+ mgr.Register(v.name, nil)
+ } else {
+ mgr.Register(v.name, func(Function) {})
+ }
+ }
+
+ var got []string
+ for name := range mgr.FunctionRegistry {
+ got = append(got, name)
+ }
+ sort.Strings(got)
+ sort.Strings(test.expected)
+
+ assert.Equal(t, test.expected, got)
+ })
+ }
+}
+
+func TestManager_Run(t *testing.T) {
+ tests := map[string]struct {
+ register []string
+ input string
+ expected []Function
+ }{
+ "valid function: single": {
+ register: []string{"fn1"},
+ input: `
+FUNCTION UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test"
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ },
+ },
+ "valid function: multiple": {
+ register: []string{"fn1", "fn2"},
+ input: `
+FUNCTION UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test"
+FUNCTION UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test"
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn2",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ },
+ },
+ "valid function: single with payload": {
+ register: []string{"fn1", "fn2"},
+ input: `
+FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line1
+payload line2
+FUNCTION_PAYLOAD_END
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line1\npayload line2"),
+ },
+ },
+ },
+ "valid function: multiple with payload": {
+ register: []string{"fn1", "fn2"},
+ input: `
+FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line1
+payload line2
+FUNCTION_PAYLOAD_END
+
+FUNCTION_PAYLOAD UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line3
+payload line4
+FUNCTION_PAYLOAD_END
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line1\npayload line2"),
+ },
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn2",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line3\npayload line4"),
+ },
+ },
+ },
+ "valid function: multiple with and without payload": {
+ register: []string{"fn1", "fn2", "fn3", "fn4"},
+ input: `
+FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line1
+payload line2
+FUNCTION_PAYLOAD_END
+
+FUNCTION UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test"
+FUNCTION UID 1 "fn3 arg1 arg2" 0xFFFF "method=api,role=test"
+
+FUNCTION_PAYLOAD UID 1 "fn4 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line3
+payload line4
+FUNCTION_PAYLOAD_END
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line1\npayload line2"),
+ },
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn2",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn3",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn4",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line3\npayload line4"),
+ },
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ mgr := NewManager()
+
+ mgr.input = newMockInput(test.input)
+
+ mock := &mockFunctionExecutor{}
+ for _, v := range test.register {
+ mgr.Register(v, mock.execute)
+ }
+
+ testTime := time.Second * 5
+ ctx, cancel := context.WithTimeout(context.Background(), testTime)
+ defer cancel()
+
+ done := make(chan struct{})
+
+ go func() { defer close(done); mgr.Run(ctx) }()
+
+ timeout := testTime + time.Second*2
+ tk := time.NewTimer(timeout)
+ defer tk.Stop()
+
+ select {
+ case <-done:
+ assert.Equal(t, test.expected, mock.executed)
+ case <-tk.C:
+ t.Errorf("timed out after %s", timeout)
+ }
+ })
+ }
+}
+
+type mockFunctionExecutor struct {
+ executed []Function
+}
+
+func (m *mockFunctionExecutor) execute(fn Function) {
+ m.executed = append(m.executed, fn)
+}
+
+func newMockInput(data string) *mockInput {
+ m := &mockInput{chLines: make(chan string)}
+ sc := bufio.NewScanner(strings.NewReader(data))
+ go func() {
+ for sc.Scan() {
+ m.chLines <- sc.Text()
+ }
+ close(m.chLines)
+ }()
+ return m
+}
+
+type mockInput struct {
+ chLines chan string
+}
+
+func (m *mockInput) lines() chan string {
+ return m.chLines
+}