summaryrefslogtreecommitdiffstats
path: root/src/go/plugin/go.d/agent/filestatus
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-26 08:15:24 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-26 08:15:35 +0000
commitf09848204fa5283d21ea43e262ee41aa578e1808 (patch)
treec62385d7adf209fa6a798635954d887f718fb3fb /src/go/plugin/go.d/agent/filestatus
parentReleasing debian version 1.46.3-2. (diff)
downloadnetdata-f09848204fa5283d21ea43e262ee41aa578e1808.tar.xz
netdata-f09848204fa5283d21ea43e262ee41aa578e1808.zip
Merging upstream version 1.47.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/go/plugin/go.d/agent/filestatus')
-rw-r--r--src/go/plugin/go.d/agent/filestatus/manager.go98
-rw-r--r--src/go/plugin/go.d/agent/filestatus/manager_test.go122
-rw-r--r--src/go/plugin/go.d/agent/filestatus/store.go90
-rw-r--r--src/go/plugin/go.d/agent/filestatus/store_test.go138
4 files changed, 448 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/filestatus/manager.go b/src/go/plugin/go.d/agent/filestatus/manager.go
new file mode 100644
index 000000000..03e0dd2fc
--- /dev/null
+++ b/src/go/plugin/go.d/agent/filestatus/manager.go
@@ -0,0 +1,98 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package filestatus
+
+import (
+ "context"
+ "log/slog"
+ "os"
+ "time"
+
+ "github.com/netdata/netdata/go/plugins/logger"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+)
+
+func NewManager(path string) *Manager {
+ return &Manager{
+ Logger: logger.New().With(
+ slog.String("component", "filestatus manager"),
+ ),
+ path: path,
+ store: &Store{},
+ flushEvery: time.Second * 5,
+ flushCh: make(chan struct{}, 1),
+ }
+}
+
+type Manager struct {
+ *logger.Logger
+
+ path string
+
+ store *Store
+
+ flushEvery time.Duration
+ flushCh chan struct{}
+}
+
+func (m *Manager) Run(ctx context.Context) {
+ m.Info("instance is started")
+ defer func() { m.Info("instance is stopped") }()
+
+ tk := time.NewTicker(m.flushEvery)
+ defer tk.Stop()
+ defer m.flush()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ m.tryFlush()
+ }
+ }
+}
+
+func (m *Manager) Save(cfg confgroup.Config, status string) {
+ if v, ok := m.store.lookup(cfg); !ok || status != v {
+ m.store.add(cfg, status)
+ m.triggerFlush()
+ }
+}
+
+func (m *Manager) Remove(cfg confgroup.Config) {
+ if _, ok := m.store.lookup(cfg); ok {
+ m.store.remove(cfg)
+ m.triggerFlush()
+ }
+}
+
+func (m *Manager) triggerFlush() {
+ select {
+ case m.flushCh <- struct{}{}:
+ default:
+ }
+}
+
+func (m *Manager) tryFlush() {
+ select {
+ case <-m.flushCh:
+ m.flush()
+ default:
+ }
+}
+
+func (m *Manager) flush() {
+ bs, err := m.store.bytes()
+ if err != nil {
+ return
+ }
+
+ f, err := os.Create(m.path)
+ if err != nil {
+ return
+ }
+ defer func() { _ = f.Close() }()
+
+ _, _ = f.Write(bs)
+}
diff --git a/src/go/plugin/go.d/agent/filestatus/manager_test.go b/src/go/plugin/go.d/agent/filestatus/manager_test.go
new file mode 100644
index 000000000..1c7b32884
--- /dev/null
+++ b/src/go/plugin/go.d/agent/filestatus/manager_test.go
@@ -0,0 +1,122 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package filestatus
+
+import (
+ "context"
+ "os"
+ "path"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestNewManager(t *testing.T) {
+ mgr := NewManager("")
+ assert.NotNil(t, mgr.store)
+}
+
+func TestManager_Run(t *testing.T) {
+ type testAction struct {
+ name string
+ cfg confgroup.Config
+ status string
+ }
+ tests := map[string]struct {
+ actions []testAction
+ wantFile string
+ }{
+ "save": {
+ actions: []testAction{
+ {
+ name: "save", status: "ok",
+ cfg: prepareConfig("module", "module1", "name", "name1"),
+ },
+ {
+ name: "save", status: "ok",
+ cfg: prepareConfig("module", "module2", "name", "name2"),
+ },
+ },
+ wantFile: `
+{
+ "module1": {
+ "name1:5956328514325012774": "ok"
+ },
+ "module2": {
+ "name2:14684454322123948394": "ok"
+ }
+}
+`,
+ },
+ "remove": {
+ actions: []testAction{
+ {
+ name: "save", status: "ok",
+ cfg: prepareConfig("module", "module1", "name", "name1"),
+ },
+ {
+ name: "save", status: "ok",
+ cfg: prepareConfig("module", "module2", "name", "name2"),
+ },
+ {
+ name: "remove",
+ cfg: prepareConfig("module", "module2", "name", "name2"),
+ },
+ },
+ wantFile: `
+{
+ "module1": {
+ "name1:5956328514325012774": "ok"
+ }
+}
+`,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ dir, err := os.MkdirTemp(os.TempDir(), "netdata-go-test-filestatus-run")
+ require.NoError(t, err)
+ defer func() { assert.NoError(t, os.RemoveAll(dir)) }()
+
+ filename := path.Join(dir, "filestatus")
+
+ mgr := NewManager(filename)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ done := make(chan struct{})
+ go func() { defer close(done); mgr.Run(ctx) }()
+
+ for _, v := range test.actions {
+ switch v.name {
+ case "save":
+ mgr.Save(v.cfg, v.status)
+ case "remove":
+ mgr.Remove(v.cfg)
+ }
+ }
+
+ cancel()
+
+ timeout := time.Second * 5
+ tk := time.NewTimer(timeout)
+ defer tk.Stop()
+
+ select {
+ case <-done:
+ case <-tk.C:
+ t.Errorf("timed out after %s", timeout)
+ }
+
+ bs, err := os.ReadFile(filename)
+ require.NoError(t, err)
+
+ assert.Equal(t, strings.TrimSpace(test.wantFile), string(bs))
+ })
+ }
+}
diff --git a/src/go/plugin/go.d/agent/filestatus/store.go b/src/go/plugin/go.d/agent/filestatus/store.go
new file mode 100644
index 000000000..3f500dec6
--- /dev/null
+++ b/src/go/plugin/go.d/agent/filestatus/store.go
@@ -0,0 +1,90 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package filestatus
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "slices"
+ "sync"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+)
+
+func LoadStore(path string) (*Store, error) {
+ var s Store
+
+ f, err := os.Open(path)
+ if err != nil {
+ return nil, err
+ }
+ defer func() { _ = f.Close() }()
+
+ return &s, json.NewDecoder(f).Decode(&s.items)
+}
+
+type Store struct {
+ mux sync.Mutex
+ items map[string]map[string]string // [module][name:hash]status
+}
+
+func (s *Store) Contains(cfg confgroup.Config, statuses ...string) bool {
+ status, ok := s.lookup(cfg)
+ if !ok {
+ return false
+ }
+
+ return slices.Contains(statuses, status)
+}
+
+func (s *Store) lookup(cfg confgroup.Config) (string, bool) {
+ s.mux.Lock()
+ defer s.mux.Unlock()
+
+ jobs, ok := s.items[cfg.Module()]
+ if !ok {
+ return "", false
+ }
+
+ status, ok := jobs[storeJobKey(cfg)]
+
+ return status, ok
+}
+
+func (s *Store) add(cfg confgroup.Config, status string) {
+ s.mux.Lock()
+ defer s.mux.Unlock()
+
+ if s.items == nil {
+ s.items = make(map[string]map[string]string)
+ }
+
+ if s.items[cfg.Module()] == nil {
+ s.items[cfg.Module()] = make(map[string]string)
+ }
+
+ s.items[cfg.Module()][storeJobKey(cfg)] = status
+}
+
+func (s *Store) remove(cfg confgroup.Config) {
+ s.mux.Lock()
+ defer s.mux.Unlock()
+
+ delete(s.items[cfg.Module()], storeJobKey(cfg))
+
+ if len(s.items[cfg.Module()]) == 0 {
+ delete(s.items, cfg.Module())
+ }
+}
+
+func (s *Store) bytes() ([]byte, error) {
+ s.mux.Lock()
+ defer s.mux.Unlock()
+
+ return json.MarshalIndent(s.items, "", " ")
+}
+
+func storeJobKey(cfg confgroup.Config) string {
+ return fmt.Sprintf("%s:%d", cfg.Name(), cfg.Hash())
+}
diff --git a/src/go/plugin/go.d/agent/filestatus/store_test.go b/src/go/plugin/go.d/agent/filestatus/store_test.go
new file mode 100644
index 000000000..d8e18539e
--- /dev/null
+++ b/src/go/plugin/go.d/agent/filestatus/store_test.go
@@ -0,0 +1,138 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package filestatus
+
+import (
+ "testing"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TODO: tech debt
+func TestLoadStore(t *testing.T) {
+
+}
+
+// TODO: tech debt
+func TestStore_Contains(t *testing.T) {
+
+}
+
+func TestStore_add(t *testing.T) {
+ tests := map[string]struct {
+ prepare func() *Store
+ input confgroup.Config
+ wantItemsNum int
+ }{
+ "add cfg to the empty store": {
+ prepare: func() *Store {
+ return &Store{}
+ },
+ input: prepareConfig(
+ "module", "modName",
+ "name", "jobName",
+ ),
+ wantItemsNum: 1,
+ },
+ "add cfg that already in the store": {
+ prepare: func() *Store {
+ return &Store{
+ items: map[string]map[string]string{
+ "modName": {"jobName:18299273693089411682": "state"},
+ },
+ }
+ },
+ input: prepareConfig(
+ "module", "modName",
+ "name", "jobName",
+ ),
+ wantItemsNum: 1,
+ },
+ "add cfg with same module, same name, but specific options": {
+ prepare: func() *Store {
+ return &Store{
+ items: map[string]map[string]string{
+ "modName": {"jobName:18299273693089411682": "state"},
+ },
+ }
+ },
+ input: prepareConfig(
+ "module", "modName",
+ "name", "jobName",
+ "opt", "val",
+ ),
+ wantItemsNum: 2,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ s := test.prepare()
+ s.add(test.input, "state")
+ assert.Equal(t, test.wantItemsNum, calcStoreItems(s))
+ })
+ }
+}
+
+func TestStore_remove(t *testing.T) {
+ tests := map[string]struct {
+ prepare func() *Store
+ input confgroup.Config
+ wantItemsNum int
+ }{
+ "remove cfg from the empty store": {
+ prepare: func() *Store {
+ return &Store{}
+ },
+ input: prepareConfig(
+ "module", "modName",
+ "name", "jobName",
+ ),
+ wantItemsNum: 0,
+ },
+ "remove cfg from the store": {
+ prepare: func() *Store {
+ return &Store{
+ items: map[string]map[string]string{
+ "modName": {
+ "jobName:18299273693089411682": "state",
+ "jobName:18299273693089411683": "state",
+ },
+ },
+ }
+ },
+ input: prepareConfig(
+ "module", "modName",
+ "name", "jobName",
+ ),
+ wantItemsNum: 1,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ s := test.prepare()
+ s.remove(test.input)
+ assert.Equal(t, test.wantItemsNum, calcStoreItems(s))
+ })
+ }
+}
+
+func calcStoreItems(s *Store) (num int) {
+ for _, v := range s.items {
+ for range v {
+ num++
+ }
+ }
+ return num
+}
+
+func prepareConfig(values ...string) confgroup.Config {
+ cfg := confgroup.Config{}
+ for i := 1; i < len(values); i += 2 {
+ cfg[values[i-1]] = values[i]
+ }
+ return cfg
+}