summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/functions
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/functions/ext.go30
-rw-r--r--src/go/collectors/go.d.plugin/agent/functions/function.go88
-rw-r--r--src/go/collectors/go.d.plugin/agent/functions/manager.go136
-rw-r--r--src/go/collectors/go.d.plugin/agent/functions/manager_test.go299
4 files changed, 553 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/functions/ext.go b/src/go/collectors/go.d.plugin/agent/functions/ext.go
new file mode 100644
index 000000000..28c717d88
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/functions/ext.go
@@ -0,0 +1,30 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+func (m *Manager) Register(name string, fn func(Function)) {
+ if fn == nil {
+ m.Warningf("not registering '%s': nil function", name)
+ return
+ }
+
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ if _, ok := m.FunctionRegistry[name]; !ok {
+ m.Debugf("registering function '%s'", name)
+ } else {
+ m.Warningf("re-registering function '%s'", name)
+ }
+ m.FunctionRegistry[name] = fn
+}
+
+func (m *Manager) Unregister(name string) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ if _, ok := m.FunctionRegistry[name]; !ok {
+ delete(m.FunctionRegistry, name)
+ m.Debugf("unregistering function '%s'", name)
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/functions/function.go b/src/go/collectors/go.d.plugin/agent/functions/function.go
new file mode 100644
index 000000000..b2fd42932
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/functions/function.go
@@ -0,0 +1,88 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/csv"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+)
+
+type Function struct {
+ key string
+ UID string
+ Timeout time.Duration
+ Name string
+ Args []string
+ Payload []byte
+ Permissions string
+ Source string
+ ContentType string
+}
+
+func (f *Function) String() string {
+ return fmt.Sprintf("key: '%s', uid: '%s', timeout: '%s', function: '%s', args: '%v', permissions: '%s', source: '%s', contentType: '%s', payload: '%s'",
+ f.key, f.UID, f.Timeout, f.Name, f.Args, f.Permissions, f.Source, f.ContentType, string(f.Payload))
+}
+
+func parseFunction(s string) (*Function, error) {
+ r := csv.NewReader(strings.NewReader(s))
+ r.Comma = ' '
+
+ parts, err := r.Read()
+ if err != nil {
+ return nil, err
+ }
+
+ // FUNCTION UID Timeout "Name ...Parameters" 0xPermissions "SourceType" [ContentType]
+ if n := len(parts); n != 6 && n != 7 {
+ return nil, fmt.Errorf("unexpected number of words: want 6 or 7, got %d (%v)", n, parts)
+ }
+
+ timeout, err := strconv.ParseInt(parts[2], 10, 64)
+ if err != nil {
+ return nil, err
+ }
+
+ cmd := strings.Split(parts[3], " ")
+
+ fn := &Function{
+ key: parts[0],
+ UID: parts[1],
+ Timeout: time.Duration(timeout) * time.Second,
+ Name: cmd[0],
+ Args: cmd[1:],
+ Permissions: parts[4],
+ Source: parts[5],
+ }
+
+ if len(parts) == 7 {
+ fn.ContentType = parts[6]
+ }
+
+ return fn, nil
+}
+
+func parseFunctionWithPayload(s string, sc *bufio.Scanner) (*Function, error) {
+ fn, err := parseFunction(s)
+ if err != nil {
+ return nil, err
+ }
+
+ var n int
+ var buf bytes.Buffer
+ for sc.Scan() && sc.Text() != "FUNCTION_PAYLOAD_END" {
+ if n++; n > 1 {
+ buf.WriteString("\n")
+ }
+ buf.WriteString(sc.Text())
+ }
+
+ fn.Payload = append(fn.Payload, buf.Bytes()...)
+
+ return fn, nil
+}
diff --git a/src/go/collectors/go.d.plugin/agent/functions/manager.go b/src/go/collectors/go.d.plugin/agent/functions/manager.go
new file mode 100644
index 000000000..365d0670b
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/functions/manager.go
@@ -0,0 +1,136 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+import (
+ "bufio"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/safewriter"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/mattn/go-isatty"
+ "github.com/muesli/cancelreader"
+)
+
+var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd())
+
+func NewManager() *Manager {
+ return &Manager{
+ Logger: logger.New().With(
+ slog.String("component", "functions manager"),
+ ),
+ Input: os.Stdin,
+ api: netdataapi.New(safewriter.Stdout),
+ mux: &sync.Mutex{},
+ FunctionRegistry: make(map[string]func(Function)),
+ }
+}
+
+type Manager struct {
+ *logger.Logger
+
+ Input io.Reader
+ api *netdataapi.API
+ mux *sync.Mutex
+ FunctionRegistry map[string]func(Function)
+}
+
+func (m *Manager) Run(ctx context.Context) {
+ m.Info("instance is started")
+ defer func() { m.Info("instance is stopped") }()
+
+ if !isTerminal {
+ r, err := cancelreader.NewReader(m.Input)
+ if err != nil {
+ m.Errorf("fail to create cancel reader: %v", err)
+ return
+ }
+
+ go func() { <-ctx.Done(); r.Cancel() }()
+
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() { defer wg.Done(); m.run(r) }()
+
+ wg.Wait()
+ _ = r.Close()
+ }
+
+ <-ctx.Done()
+}
+
+func (m *Manager) run(r io.Reader) {
+ sc := bufio.NewScanner(r)
+
+ for sc.Scan() {
+ text := sc.Text()
+
+ var fn *Function
+ var err error
+
+ // FIXME: if we are waiting for FUNCTION_PAYLOAD_END and a new FUNCTION* appears,
+ // we need to discard the current one and switch to the new one
+ switch {
+ case strings.HasPrefix(text, "FUNCTION "):
+ fn, err = parseFunction(text)
+ case strings.HasPrefix(text, "FUNCTION_PAYLOAD "):
+ fn, err = parseFunctionWithPayload(text, sc)
+ case text == "":
+ continue
+ default:
+ m.Warningf("unexpected line: '%s'", text)
+ continue
+ }
+
+ if err != nil {
+ m.Warningf("parse function: %v ('%s')", err, text)
+ continue
+ }
+
+ function, ok := m.lookupFunction(fn.Name)
+ if !ok {
+ m.Infof("skipping execution of '%s': unregistered function", fn.Name)
+ m.respf(fn, 501, "unregistered function: %s", fn.Name)
+ continue
+ }
+ if function == nil {
+ m.Warningf("skipping execution of '%s': nil function registered", fn.Name)
+ m.respf(fn, 501, "nil function: %s", fn.Name)
+ continue
+ }
+
+ function(*fn)
+ }
+}
+
+func (m *Manager) lookupFunction(name string) (func(Function), bool) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ f, ok := m.FunctionRegistry[name]
+ return f, ok
+}
+
+func (m *Manager) respf(fn *Function, code int, msgf string, a ...any) {
+ bs, _ := json.Marshal(struct {
+ Status int `json:"status"`
+ Message string `json:"message"`
+ }{
+ Status: code,
+ Message: fmt.Sprintf(msgf, a...),
+ })
+ ts := strconv.FormatInt(time.Now().Unix(), 10)
+ m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts)
+}
diff --git a/src/go/collectors/go.d.plugin/agent/functions/manager_test.go b/src/go/collectors/go.d.plugin/agent/functions/manager_test.go
new file mode 100644
index 000000000..26a8cdd0c
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/functions/manager_test.go
@@ -0,0 +1,299 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package functions
+
+import (
+ "context"
+ "sort"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewManager(t *testing.T) {
+ mgr := NewManager()
+
+ assert.NotNilf(t, mgr.Input, "Input")
+ assert.NotNilf(t, mgr.FunctionRegistry, "FunctionRegistry")
+}
+
+func TestManager_Register(t *testing.T) {
+ type testInputFn struct {
+ name string
+ invalid bool
+ }
+ tests := map[string]struct {
+ input []testInputFn
+ expected []string
+ }{
+ "valid registration": {
+ input: []testInputFn{
+ {name: "fn1"},
+ {name: "fn2"},
+ },
+ expected: []string{"fn1", "fn2"},
+ },
+ "registration with duplicates": {
+ input: []testInputFn{
+ {name: "fn1"},
+ {name: "fn2"},
+ {name: "fn1"},
+ },
+ expected: []string{"fn1", "fn2"},
+ },
+ "registration with nil functions": {
+ input: []testInputFn{
+ {name: "fn1"},
+ {name: "fn2", invalid: true},
+ },
+ expected: []string{"fn1"},
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ mgr := NewManager()
+
+ for _, v := range test.input {
+ if v.invalid {
+ mgr.Register(v.name, nil)
+ } else {
+ mgr.Register(v.name, func(Function) {})
+ }
+ }
+
+ var got []string
+ for name := range mgr.FunctionRegistry {
+ got = append(got, name)
+ }
+ sort.Strings(got)
+ sort.Strings(test.expected)
+
+ assert.Equal(t, test.expected, got)
+ })
+ }
+}
+
+func TestManager_Run(t *testing.T) {
+ tests := map[string]struct {
+ register []string
+ input string
+ expected []Function
+ }{
+ "valid function: single": {
+ register: []string{"fn1"},
+ input: `
+FUNCTION UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test"
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ },
+ },
+ "valid function: multiple": {
+ register: []string{"fn1", "fn2"},
+ input: `
+FUNCTION UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test"
+FUNCTION UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test"
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn2",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ },
+ },
+ "valid function: single with payload": {
+ register: []string{"fn1", "fn2"},
+ input: `
+FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line1
+payload line2
+FUNCTION_PAYLOAD_END
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line1\npayload line2"),
+ },
+ },
+ },
+ "valid function: multiple with payload": {
+ register: []string{"fn1", "fn2"},
+ input: `
+FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line1
+payload line2
+FUNCTION_PAYLOAD_END
+
+FUNCTION_PAYLOAD UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line3
+payload line4
+FUNCTION_PAYLOAD_END
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line1\npayload line2"),
+ },
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn2",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line3\npayload line4"),
+ },
+ },
+ },
+ "valid function: multiple with and without payload": {
+ register: []string{"fn1", "fn2", "fn3", "fn4"},
+ input: `
+FUNCTION_PAYLOAD UID 1 "fn1 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line1
+payload line2
+FUNCTION_PAYLOAD_END
+
+FUNCTION UID 1 "fn2 arg1 arg2" 0xFFFF "method=api,role=test"
+FUNCTION UID 1 "fn3 arg1 arg2" 0xFFFF "method=api,role=test"
+
+FUNCTION_PAYLOAD UID 1 "fn4 arg1 arg2" 0xFFFF "method=api,role=test" application/json
+payload line3
+payload line4
+FUNCTION_PAYLOAD_END
+`,
+ expected: []Function{
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn1",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line1\npayload line2"),
+ },
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn2",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ {
+ key: "FUNCTION",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn3",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "",
+ Payload: nil,
+ },
+ {
+ key: "FUNCTION_PAYLOAD",
+ UID: "UID",
+ Timeout: time.Second,
+ Name: "fn4",
+ Args: []string{"arg1", "arg2"},
+ Permissions: "0xFFFF",
+ Source: "method=api,role=test",
+ ContentType: "application/json",
+ Payload: []byte("payload line3\npayload line4"),
+ },
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ mgr := NewManager()
+
+ mgr.Input = strings.NewReader(test.input)
+
+ mock := &mockFunctionExecutor{}
+ for _, v := range test.register {
+ mgr.Register(v, mock.execute)
+ }
+
+ testTime := time.Second * 5
+ ctx, cancel := context.WithTimeout(context.Background(), testTime)
+ defer cancel()
+
+ done := make(chan struct{})
+
+ go func() { defer close(done); mgr.Run(ctx) }()
+
+ timeout := testTime + time.Second*2
+ tk := time.NewTimer(timeout)
+ defer tk.Stop()
+
+ select {
+ case <-done:
+ assert.Equal(t, test.expected, mock.executed)
+ case <-tk.C:
+ t.Errorf("timed out after %s", timeout)
+ }
+ })
+ }
+}
+
+type mockFunctionExecutor struct {
+ executed []Function
+}
+
+func (m *mockFunctionExecutor) execute(fn Function) {
+ m.executed = append(m.executed, fn)
+}