From f09848204fa5283d21ea43e262ee41aa578e1808 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 26 Aug 2024 10:15:24 +0200 Subject: Merging upstream version 1.47.0. Signed-off-by: Daniel Baumann --- src/go/plugin/go.d/agent/filestatus/manager.go | 98 +++++++++++++++ .../plugin/go.d/agent/filestatus/manager_test.go | 122 ++++++++++++++++++ src/go/plugin/go.d/agent/filestatus/store.go | 90 ++++++++++++++ src/go/plugin/go.d/agent/filestatus/store_test.go | 138 +++++++++++++++++++++ 4 files changed, 448 insertions(+) create mode 100644 src/go/plugin/go.d/agent/filestatus/manager.go create mode 100644 src/go/plugin/go.d/agent/filestatus/manager_test.go create mode 100644 src/go/plugin/go.d/agent/filestatus/store.go create mode 100644 src/go/plugin/go.d/agent/filestatus/store_test.go (limited to 'src/go/plugin/go.d/agent/filestatus') diff --git a/src/go/plugin/go.d/agent/filestatus/manager.go b/src/go/plugin/go.d/agent/filestatus/manager.go new file mode 100644 index 000000000..03e0dd2fc --- /dev/null +++ b/src/go/plugin/go.d/agent/filestatus/manager.go @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package filestatus + +import ( + "context" + "log/slog" + "os" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" +) + +func NewManager(path string) *Manager { + return &Manager{ + Logger: logger.New().With( + slog.String("component", "filestatus manager"), + ), + path: path, + store: &Store{}, + flushEvery: time.Second * 5, + flushCh: make(chan struct{}, 1), + } +} + +type Manager struct { + *logger.Logger + + path string + + store *Store + + flushEvery time.Duration + flushCh chan struct{} +} + +func (m *Manager) Run(ctx context.Context) { + m.Info("instance is started") + defer func() { m.Info("instance is stopped") }() + + tk := time.NewTicker(m.flushEvery) + defer tk.Stop() + defer m.flush() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + m.tryFlush() + } + } +} + +func (m *Manager) Save(cfg confgroup.Config, status string) { + if v, ok := m.store.lookup(cfg); !ok || status != v { + m.store.add(cfg, status) + m.triggerFlush() + } +} + +func (m *Manager) Remove(cfg confgroup.Config) { + if _, ok := m.store.lookup(cfg); ok { + m.store.remove(cfg) + m.triggerFlush() + } +} + +func (m *Manager) triggerFlush() { + select { + case m.flushCh <- struct{}{}: + default: + } +} + +func (m *Manager) tryFlush() { + select { + case <-m.flushCh: + m.flush() + default: + } +} + +func (m *Manager) flush() { + bs, err := m.store.bytes() + if err != nil { + return + } + + f, err := os.Create(m.path) + if err != nil { + return + } + defer func() { _ = f.Close() }() + + _, _ = f.Write(bs) +} diff --git a/src/go/plugin/go.d/agent/filestatus/manager_test.go b/src/go/plugin/go.d/agent/filestatus/manager_test.go new file mode 100644 index 000000000..1c7b32884 --- /dev/null +++ b/src/go/plugin/go.d/agent/filestatus/manager_test.go @@ -0,0 +1,122 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package filestatus + +import ( + "context" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewManager(t *testing.T) { + mgr := NewManager("") + assert.NotNil(t, mgr.store) +} + +func TestManager_Run(t *testing.T) { + type testAction struct { + name string + cfg confgroup.Config + status string + } + tests := map[string]struct { + actions []testAction + wantFile string + }{ + "save": { + actions: []testAction{ + { + name: "save", status: "ok", + cfg: prepareConfig("module", "module1", "name", "name1"), + }, + { + name: "save", status: "ok", + cfg: prepareConfig("module", "module2", "name", "name2"), + }, + }, + wantFile: ` +{ + "module1": { + "name1:5956328514325012774": "ok" + }, + "module2": { + "name2:14684454322123948394": "ok" + } +} +`, + }, + "remove": { + actions: []testAction{ + { + name: "save", status: "ok", + cfg: prepareConfig("module", "module1", "name", "name1"), + }, + { + name: "save", status: "ok", + cfg: prepareConfig("module", "module2", "name", "name2"), + }, + { + name: "remove", + cfg: prepareConfig("module", "module2", "name", "name2"), + }, + }, + wantFile: ` +{ + "module1": { + "name1:5956328514325012774": "ok" + } +} +`, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + dir, err := os.MkdirTemp(os.TempDir(), "netdata-go-test-filestatus-run") + require.NoError(t, err) + defer func() { assert.NoError(t, os.RemoveAll(dir)) }() + + filename := path.Join(dir, "filestatus") + + mgr := NewManager(filename) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { defer close(done); mgr.Run(ctx) }() + + for _, v := range test.actions { + switch v.name { + case "save": + mgr.Save(v.cfg, v.status) + case "remove": + mgr.Remove(v.cfg) + } + } + + cancel() + + timeout := time.Second * 5 + tk := time.NewTimer(timeout) + defer tk.Stop() + + select { + case <-done: + case <-tk.C: + t.Errorf("timed out after %s", timeout) + } + + bs, err := os.ReadFile(filename) + require.NoError(t, err) + + assert.Equal(t, strings.TrimSpace(test.wantFile), string(bs)) + }) + } +} diff --git a/src/go/plugin/go.d/agent/filestatus/store.go b/src/go/plugin/go.d/agent/filestatus/store.go new file mode 100644 index 000000000..3f500dec6 --- /dev/null +++ b/src/go/plugin/go.d/agent/filestatus/store.go @@ -0,0 +1,90 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package filestatus + +import ( + "encoding/json" + "fmt" + "os" + "slices" + "sync" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" +) + +func LoadStore(path string) (*Store, error) { + var s Store + + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer func() { _ = f.Close() }() + + return &s, json.NewDecoder(f).Decode(&s.items) +} + +type Store struct { + mux sync.Mutex + items map[string]map[string]string // [module][name:hash]status +} + +func (s *Store) Contains(cfg confgroup.Config, statuses ...string) bool { + status, ok := s.lookup(cfg) + if !ok { + return false + } + + return slices.Contains(statuses, status) +} + +func (s *Store) lookup(cfg confgroup.Config) (string, bool) { + s.mux.Lock() + defer s.mux.Unlock() + + jobs, ok := s.items[cfg.Module()] + if !ok { + return "", false + } + + status, ok := jobs[storeJobKey(cfg)] + + return status, ok +} + +func (s *Store) add(cfg confgroup.Config, status string) { + s.mux.Lock() + defer s.mux.Unlock() + + if s.items == nil { + s.items = make(map[string]map[string]string) + } + + if s.items[cfg.Module()] == nil { + s.items[cfg.Module()] = make(map[string]string) + } + + s.items[cfg.Module()][storeJobKey(cfg)] = status +} + +func (s *Store) remove(cfg confgroup.Config) { + s.mux.Lock() + defer s.mux.Unlock() + + delete(s.items[cfg.Module()], storeJobKey(cfg)) + + if len(s.items[cfg.Module()]) == 0 { + delete(s.items, cfg.Module()) + } +} + +func (s *Store) bytes() ([]byte, error) { + s.mux.Lock() + defer s.mux.Unlock() + + return json.MarshalIndent(s.items, "", " ") +} + +func storeJobKey(cfg confgroup.Config) string { + return fmt.Sprintf("%s:%d", cfg.Name(), cfg.Hash()) +} diff --git a/src/go/plugin/go.d/agent/filestatus/store_test.go b/src/go/plugin/go.d/agent/filestatus/store_test.go new file mode 100644 index 000000000..d8e18539e --- /dev/null +++ b/src/go/plugin/go.d/agent/filestatus/store_test.go @@ -0,0 +1,138 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package filestatus + +import ( + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + + "github.com/stretchr/testify/assert" +) + +// TODO: tech debt +func TestLoadStore(t *testing.T) { + +} + +// TODO: tech debt +func TestStore_Contains(t *testing.T) { + +} + +func TestStore_add(t *testing.T) { + tests := map[string]struct { + prepare func() *Store + input confgroup.Config + wantItemsNum int + }{ + "add cfg to the empty store": { + prepare: func() *Store { + return &Store{} + }, + input: prepareConfig( + "module", "modName", + "name", "jobName", + ), + wantItemsNum: 1, + }, + "add cfg that already in the store": { + prepare: func() *Store { + return &Store{ + items: map[string]map[string]string{ + "modName": {"jobName:18299273693089411682": "state"}, + }, + } + }, + input: prepareConfig( + "module", "modName", + "name", "jobName", + ), + wantItemsNum: 1, + }, + "add cfg with same module, same name, but specific options": { + prepare: func() *Store { + return &Store{ + items: map[string]map[string]string{ + "modName": {"jobName:18299273693089411682": "state"}, + }, + } + }, + input: prepareConfig( + "module", "modName", + "name", "jobName", + "opt", "val", + ), + wantItemsNum: 2, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + s := test.prepare() + s.add(test.input, "state") + assert.Equal(t, test.wantItemsNum, calcStoreItems(s)) + }) + } +} + +func TestStore_remove(t *testing.T) { + tests := map[string]struct { + prepare func() *Store + input confgroup.Config + wantItemsNum int + }{ + "remove cfg from the empty store": { + prepare: func() *Store { + return &Store{} + }, + input: prepareConfig( + "module", "modName", + "name", "jobName", + ), + wantItemsNum: 0, + }, + "remove cfg from the store": { + prepare: func() *Store { + return &Store{ + items: map[string]map[string]string{ + "modName": { + "jobName:18299273693089411682": "state", + "jobName:18299273693089411683": "state", + }, + }, + } + }, + input: prepareConfig( + "module", "modName", + "name", "jobName", + ), + wantItemsNum: 1, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + s := test.prepare() + s.remove(test.input) + assert.Equal(t, test.wantItemsNum, calcStoreItems(s)) + }) + } +} + +func calcStoreItems(s *Store) (num int) { + for _, v := range s.items { + for range v { + num++ + } + } + return num +} + +func prepareConfig(values ...string) confgroup.Config { + cfg := confgroup.Config{} + for i := 1; i < len(values); i += 2 { + cfg[values[i-1]] = values[i] + } + return cfg +} -- cgit v1.2.3