summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/file
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/config.go25
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go104
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go25
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/parse.go142
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go431
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/read.go98
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go116
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go130
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/watch.go220
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go378
10 files changed, 1669 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/config.go b/src/go/collectors/go.d.plugin/agent/discovery/file/config.go
new file mode 100644
index 000000000..cc19ee445
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/config.go
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "errors"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+)
+
+type Config struct {
+ Registry confgroup.Registry
+ Read []string
+ Watch []string
+}
+
+func validateConfig(cfg Config) error {
+ if len(cfg.Registry) == 0 {
+ return errors.New("empty config registry")
+ }
+ if len(cfg.Read)+len(cfg.Watch) == 0 {
+ return errors.New("discoverers not set")
+ }
+ return nil
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go
new file mode 100644
index 000000000..97b437fc3
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go
@@ -0,0 +1,104 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "sync"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+var log = logger.New().With(
+ slog.String("component", "discovery"),
+ slog.String("discoverer", "file"),
+)
+
+func NewDiscovery(cfg Config) (*Discovery, error) {
+ if err := validateConfig(cfg); err != nil {
+ return nil, fmt.Errorf("file discovery config validation: %v", err)
+ }
+
+ d := Discovery{
+ Logger: log,
+ }
+
+ if err := d.registerDiscoverers(cfg); err != nil {
+ return nil, fmt.Errorf("file discovery initialization: %v", err)
+ }
+
+ return &d, nil
+}
+
+type (
+ Discovery struct {
+ *logger.Logger
+ discoverers []discoverer
+ }
+ discoverer interface {
+ Run(ctx context.Context, in chan<- []*confgroup.Group)
+ }
+)
+
+func (d *Discovery) String() string {
+ return d.Name()
+}
+
+func (d *Discovery) Name() string {
+ return fmt.Sprintf("file discovery: %v", d.discoverers)
+}
+
+func (d *Discovery) registerDiscoverers(cfg Config) error {
+ if len(cfg.Read) != 0 {
+ d.discoverers = append(d.discoverers, NewReader(cfg.Registry, cfg.Read))
+ }
+ if len(cfg.Watch) != 0 {
+ d.discoverers = append(d.discoverers, NewWatcher(cfg.Registry, cfg.Watch))
+ }
+ if len(d.discoverers) == 0 {
+ return errors.New("zero registered discoverers")
+ }
+ return nil
+}
+
+func (d *Discovery) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+ d.Info("instance is started")
+ defer func() { d.Info("instance is stopped") }()
+
+ var wg sync.WaitGroup
+
+ for _, dd := range d.discoverers {
+ wg.Add(1)
+ go func(dd discoverer) {
+ defer wg.Done()
+ d.runDiscoverer(ctx, dd, in)
+ }(dd)
+ }
+
+ wg.Wait()
+ <-ctx.Done()
+}
+
+func (d *Discovery) runDiscoverer(ctx context.Context, dd discoverer, in chan<- []*confgroup.Group) {
+ updates := make(chan []*confgroup.Group)
+ go dd.Run(ctx, updates)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case groups, ok := <-updates:
+ if !ok {
+ return
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case in <- groups:
+ }
+ }
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go
new file mode 100644
index 000000000..2bdb669eb
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+// TODO: tech dept
+func TestNewDiscovery(t *testing.T) {
+
+}
+
+// TODO: tech dept
+func TestDiscovery_Run(t *testing.T) {
+
+}
+
+func prepareDiscovery(t *testing.T, cfg Config) *Discovery {
+ d, err := NewDiscovery(cfg)
+ require.NoError(t, err)
+ return d
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/parse.go b/src/go/collectors/go.d.plugin/agent/discovery/file/parse.go
new file mode 100644
index 000000000..412d2b73e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/parse.go
@@ -0,0 +1,142 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+
+ "gopkg.in/yaml.v2"
+)
+
+type format int
+
+const (
+ unknownFormat format = iota
+ unknownEmptyFormat
+ staticFormat
+ sdFormat
+)
+
+func parse(req confgroup.Registry, path string) (*confgroup.Group, error) {
+ bs, err := os.ReadFile(path)
+ if err != nil {
+ return nil, err
+ }
+ if len(bs) == 0 {
+ return nil, nil
+ }
+
+ switch cfgFormat(bs) {
+ case staticFormat:
+ return parseStaticFormat(req, path, bs)
+ case sdFormat:
+ return parseSDFormat(req, path, bs)
+ case unknownEmptyFormat:
+ return nil, nil
+ default:
+ return nil, fmt.Errorf("unknown file format: '%s'", path)
+ }
+}
+
+func parseStaticFormat(reg confgroup.Registry, path string, bs []byte) (*confgroup.Group, error) {
+ name := fileName(path)
+ // TODO: properly handle module renaming
+ // See agent/setup.go buildDiscoveryConf() for details
+ if name == "wmi" {
+ name = "windows"
+ }
+ modDef, ok := reg.Lookup(name)
+ if !ok {
+ return nil, nil
+ }
+
+ var modCfg staticConfig
+ if err := yaml.Unmarshal(bs, &modCfg); err != nil {
+ return nil, err
+ }
+
+ for _, cfg := range modCfg.Jobs {
+ cfg.SetModule(name)
+ def := mergeDef(modCfg.Default, modDef)
+ cfg.ApplyDefaults(def)
+ }
+
+ group := &confgroup.Group{
+ Configs: modCfg.Jobs,
+ Source: path,
+ }
+
+ return group, nil
+}
+
+func parseSDFormat(reg confgroup.Registry, path string, bs []byte) (*confgroup.Group, error) {
+ var cfgs sdConfig
+ if err := yaml.Unmarshal(bs, &cfgs); err != nil {
+ return nil, err
+ }
+
+ var i int
+ for _, cfg := range cfgs {
+ if def, ok := reg.Lookup(cfg.Module()); ok && cfg.Module() != "" {
+ cfg.ApplyDefaults(def)
+ cfgs[i] = cfg
+ i++
+ }
+ }
+
+ group := &confgroup.Group{
+ Configs: cfgs[:i],
+ Source: path,
+ }
+
+ return group, nil
+}
+
+func cfgFormat(bs []byte) format {
+ var data interface{}
+ if err := yaml.Unmarshal(bs, &data); err != nil {
+ return unknownFormat
+ }
+ if data == nil {
+ return unknownEmptyFormat
+ }
+
+ type (
+ static = map[any]any
+ sd = []any
+ )
+ switch data.(type) {
+ case static:
+ return staticFormat
+ case sd:
+ return sdFormat
+ default:
+ return unknownFormat
+ }
+}
+
+func mergeDef(a, b confgroup.Default) confgroup.Default {
+ return confgroup.Default{
+ MinUpdateEvery: firstPositive(a.MinUpdateEvery, b.MinUpdateEvery),
+ UpdateEvery: firstPositive(a.UpdateEvery, b.UpdateEvery),
+ AutoDetectionRetry: firstPositive(a.AutoDetectionRetry, b.AutoDetectionRetry),
+ Priority: firstPositive(a.Priority, b.Priority),
+ }
+}
+
+func firstPositive(value int, others ...int) int {
+ if value > 0 || len(others) == 0 {
+ return value
+ }
+ return firstPositive(others[0], others[1:]...)
+}
+
+func fileName(path string) string {
+ _, file := filepath.Split(path)
+ ext := filepath.Ext(path)
+ return file[:len(file)-len(ext)]
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go
new file mode 100644
index 000000000..8b20210ff
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go
@@ -0,0 +1,431 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestParse(t *testing.T) {
+ const (
+ jobDef = 11
+ cfgDef = 22
+ modDef = 33
+ )
+ tests := map[string]struct {
+ test func(t *testing.T, tmp *tmpDir)
+ }{
+ "static, default: +job +conf +module": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "module": {
+ UpdateEvery: modDef,
+ AutoDetectionRetry: modDef,
+ Priority: modDef,
+ },
+ }
+ cfg := staticConfig{
+ Default: confgroup.Default{
+ UpdateEvery: cfgDef,
+ AutoDetectionRetry: cfgDef,
+ Priority: cfgDef,
+ },
+ Jobs: []confgroup.Config{
+ {
+ "name": "name",
+ "update_every": jobDef,
+ "autodetection_retry": jobDef,
+ "priority": jobDef,
+ },
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": jobDef,
+ "autodetection_retry": jobDef,
+ "priority": jobDef,
+ },
+ },
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "static, default: +job +conf +module (merge all)": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "module": {
+ Priority: modDef,
+ },
+ }
+ cfg := staticConfig{
+ Default: confgroup.Default{
+ AutoDetectionRetry: cfgDef,
+ },
+ Jobs: []confgroup.Config{
+ {
+ "name": "name",
+ "update_every": jobDef,
+ },
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": jobDef,
+ "autodetection_retry": cfgDef,
+ "priority": modDef,
+ },
+ },
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "static, default: -job +conf +module": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "module": {
+ UpdateEvery: modDef,
+ AutoDetectionRetry: modDef,
+ Priority: modDef,
+ },
+ }
+ cfg := staticConfig{
+ Default: confgroup.Default{
+ UpdateEvery: cfgDef,
+ AutoDetectionRetry: cfgDef,
+ Priority: cfgDef,
+ },
+ Jobs: []confgroup.Config{
+ {
+ "name": "name",
+ },
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": cfgDef,
+ "autodetection_retry": cfgDef,
+ "priority": cfgDef,
+ },
+ },
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "static, default: -job -conf +module": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "module": {
+ UpdateEvery: modDef,
+ AutoDetectionRetry: modDef,
+ Priority: modDef,
+ },
+ }
+ cfg := staticConfig{
+ Jobs: []confgroup.Config{
+ {
+ "name": "name",
+ },
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "autodetection_retry": modDef,
+ "priority": modDef,
+ "update_every": modDef,
+ },
+ },
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "static, default: -job -conf -module (+global)": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfg := staticConfig{
+ Jobs: []confgroup.Config{
+ {
+ "name": "name",
+ },
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "update_every": module.UpdateEvery,
+ },
+ },
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "sd, default: +job +module": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "sd_module": {
+ UpdateEvery: modDef,
+ AutoDetectionRetry: modDef,
+ Priority: modDef,
+ },
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "sd_module",
+ "update_every": jobDef,
+ "autodetection_retry": jobDef,
+ "priority": jobDef,
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "module": "sd_module",
+ "name": "name",
+ "update_every": jobDef,
+ "autodetection_retry": jobDef,
+ "priority": jobDef,
+ },
+ },
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "sd, default: -job +module": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "sd_module": {
+ UpdateEvery: modDef,
+ AutoDetectionRetry: modDef,
+ Priority: modDef,
+ },
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "sd_module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "sd_module",
+ "update_every": modDef,
+ "autodetection_retry": modDef,
+ "priority": modDef,
+ },
+ },
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "sd, default: -job -module (+global)": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "sd_module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "sd_module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "sd_module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ },
+ },
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "sd, job has no 'module' or 'module' is empty": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "sd_module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{},
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "conf registry has no module": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "sd_module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ tmp.writeYAML(filename, cfg)
+
+ expected := &confgroup.Group{
+ Source: filename,
+ Configs: []confgroup.Config{},
+ }
+
+ group, err := parse(reg, filename)
+
+ require.NoError(t, err)
+ assert.Equal(t, expected, group)
+ },
+ },
+ "empty file": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+
+ filename := tmp.createFile("empty-*")
+ group, err := parse(reg, filename)
+
+ assert.Nil(t, group)
+ require.NoError(t, err)
+ },
+ },
+ "only comments, unknown empty format": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{}
+
+ filename := tmp.createFile("unknown-empty-format-*")
+ tmp.writeString(filename, "# a comment")
+ group, err := parse(reg, filename)
+
+ assert.Nil(t, group)
+ assert.NoError(t, err)
+ },
+ },
+ "unknown format": {
+ test: func(t *testing.T, tmp *tmpDir) {
+ reg := confgroup.Registry{}
+
+ filename := tmp.createFile("unknown-format-*")
+ tmp.writeYAML(filename, "unknown")
+ group, err := parse(reg, filename)
+
+ assert.Nil(t, group)
+ assert.Error(t, err)
+ },
+ },
+ }
+
+ for name, scenario := range tests {
+ t.Run(name, func(t *testing.T) {
+ tmp := newTmpDir(t, "parse-file-*")
+ defer tmp.cleanup()
+
+ scenario.test(t, tmp)
+ })
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/read.go b/src/go/collectors/go.d.plugin/agent/discovery/file/read.go
new file mode 100644
index 000000000..1b45b3767
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/read.go
@@ -0,0 +1,98 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+type (
+ staticConfig struct {
+ confgroup.Default `yaml:",inline"`
+ Jobs []confgroup.Config `yaml:"jobs"`
+ }
+ sdConfig []confgroup.Config
+)
+
+func NewReader(reg confgroup.Registry, paths []string) *Reader {
+ return &Reader{
+ Logger: log,
+ reg: reg,
+ paths: paths,
+ }
+}
+
+type Reader struct {
+ *logger.Logger
+
+ reg confgroup.Registry
+ paths []string
+}
+
+func (r *Reader) String() string {
+ return r.Name()
+}
+
+func (r *Reader) Name() string {
+ return "file reader"
+}
+
+func (r *Reader) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+ r.Info("instance is started")
+ defer func() { r.Info("instance is stopped") }()
+
+ select {
+ case <-ctx.Done():
+ case in <- r.groups():
+ }
+
+ close(in)
+}
+
+func (r *Reader) groups() (groups []*confgroup.Group) {
+ for _, pattern := range r.paths {
+ matches, err := filepath.Glob(pattern)
+ if err != nil {
+ continue
+ }
+
+ for _, path := range matches {
+ if fi, err := os.Stat(path); err != nil || !fi.Mode().IsRegular() {
+ continue
+ }
+
+ group, err := parse(r.reg, path)
+ if err != nil {
+ r.Warningf("parse '%s': %v", path, err)
+ continue
+ }
+
+ if group == nil {
+ group = &confgroup.Group{Source: path}
+ } else {
+ for _, cfg := range group.Configs {
+ cfg.SetProvider("file reader")
+ cfg.SetSourceType(configSourceType(path))
+ cfg.SetSource(fmt.Sprintf("discoverer=file_reader,file=%s", path))
+ }
+ }
+ groups = append(groups, group)
+ }
+ }
+
+ return groups
+}
+
+func configSourceType(path string) string {
+ if strings.Contains(path, "/etc/netdata") {
+ return "user"
+ }
+ return "stock"
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go
new file mode 100644
index 000000000..d2404d54e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go
@@ -0,0 +1,116 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestReader_String(t *testing.T) {
+ assert.NotEmpty(t, NewReader(confgroup.Registry{}, nil))
+}
+
+func TestNewReader(t *testing.T) {
+ tests := map[string]struct {
+ reg confgroup.Registry
+ paths []string
+ }{
+ "empty inputs": {
+ reg: confgroup.Registry{},
+ paths: []string{},
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ assert.NotNil(t, NewReader(test.reg, test.paths))
+ })
+ }
+}
+
+func TestReader_Run(t *testing.T) {
+ tests := map[string]struct {
+ createSim func(tmp *tmpDir) discoverySim
+ }{
+ "read multiple files": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ module1 := tmp.join("module1.conf")
+ module2 := tmp.join("module2.conf")
+ module3 := tmp.join("module3.conf")
+
+ tmp.writeYAML(module1, staticConfig{
+ Jobs: []confgroup.Config{{"name": "name"}},
+ })
+ tmp.writeYAML(module2, staticConfig{
+ Jobs: []confgroup.Config{{"name": "name"}},
+ })
+ tmp.writeString(module3, "# a comment")
+
+ reg := confgroup.Registry{
+ "module1": {},
+ "module2": {},
+ "module3": {},
+ }
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Read: []string{module1, module2, module3},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: module1,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module1",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file reader",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_reader,file=%s", module1),
+ },
+ },
+ },
+ {
+ Source: module2,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module2",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file reader",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_reader,file=%s", module2),
+ },
+ },
+ },
+ {
+ Source: module3,
+ },
+ }
+
+ return discoverySim{
+ discovery: discovery,
+ expectedGroups: expected,
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ tmp := newTmpDir(t, "reader-run-*")
+ defer tmp.cleanup()
+
+ test.createSim(tmp).run(t)
+ })
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go
new file mode 100644
index 000000000..cd9fa05ac
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go
@@ -0,0 +1,130 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "os"
+ "path/filepath"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gopkg.in/yaml.v2"
+)
+
+type (
+ discoverySim struct {
+ discovery *Discovery
+ beforeRun func()
+ afterRun func()
+ expectedGroups []*confgroup.Group
+ }
+)
+
+func (sim discoverySim) run(t *testing.T) {
+ t.Helper()
+ require.NotNil(t, sim.discovery)
+
+ if sim.beforeRun != nil {
+ sim.beforeRun()
+ }
+
+ in, out := make(chan []*confgroup.Group), make(chan []*confgroup.Group)
+ go sim.collectGroups(t, in, out)
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+ defer cancel()
+ go sim.discovery.Run(ctx, in)
+ time.Sleep(time.Millisecond * 250)
+
+ if sim.afterRun != nil {
+ sim.afterRun()
+ }
+
+ actual := <-out
+
+ sortGroups(actual)
+ sortGroups(sim.expectedGroups)
+
+ assert.Equal(t, sim.expectedGroups, actual)
+}
+
+func (sim discoverySim) collectGroups(t *testing.T, in, out chan []*confgroup.Group) {
+ timeout := time.Second * 5
+ var groups []*confgroup.Group
+loop:
+ for {
+ select {
+ case updates := <-in:
+ if groups = append(groups, updates...); len(groups) >= len(sim.expectedGroups) {
+ break loop
+ }
+ case <-time.After(timeout):
+ t.Logf("discovery %s timed out after %s, got %d groups, expected %d, some events are skipped",
+ sim.discovery.discoverers, timeout, len(groups), len(sim.expectedGroups))
+ break loop
+ }
+ }
+ out <- groups
+}
+
+type tmpDir struct {
+ dir string
+ t *testing.T
+}
+
+func newTmpDir(t *testing.T, pattern string) *tmpDir {
+ pattern = "netdata-go-test-discovery-file-" + pattern
+ dir, err := os.MkdirTemp(os.TempDir(), pattern)
+ require.NoError(t, err)
+ return &tmpDir{dir: dir, t: t}
+}
+
+func (d *tmpDir) cleanup() {
+ assert.NoError(d.t, os.RemoveAll(d.dir))
+}
+
+func (d *tmpDir) join(filename string) string {
+ return filepath.Join(d.dir, filename)
+}
+
+func (d *tmpDir) createFile(pattern string) string {
+ f, err := os.CreateTemp(d.dir, pattern)
+ require.NoError(d.t, err)
+ _ = f.Close()
+ return f.Name()
+}
+
+func (d *tmpDir) removeFile(filename string) {
+ err := os.Remove(filename)
+ require.NoError(d.t, err)
+}
+
+func (d *tmpDir) renameFile(origFilename, newFilename string) {
+ err := os.Rename(origFilename, newFilename)
+ require.NoError(d.t, err)
+}
+
+func (d *tmpDir) writeYAML(filename string, in interface{}) {
+ bs, err := yaml.Marshal(in)
+ require.NoError(d.t, err)
+ err = os.WriteFile(filename, bs, 0644)
+ require.NoError(d.t, err)
+}
+
+func (d *tmpDir) writeString(filename, data string) {
+ err := os.WriteFile(filename, []byte(data), 0644)
+ require.NoError(d.t, err)
+}
+
+func sortGroups(groups []*confgroup.Group) {
+ if len(groups) == 0 {
+ return
+ }
+ sort.Slice(groups, func(i, j int) bool { return groups[i].Source < groups[j].Source })
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go
new file mode 100644
index 000000000..a723b706e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go
@@ -0,0 +1,220 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/fsnotify/fsnotify"
+)
+
+type (
+ Watcher struct {
+ *logger.Logger
+
+ paths []string
+ reg confgroup.Registry
+ watcher *fsnotify.Watcher
+ cache cache
+ refreshEvery time.Duration
+ }
+ cache map[string]time.Time
+)
+
+func (c cache) lookup(path string) (time.Time, bool) { v, ok := c[path]; return v, ok }
+func (c cache) has(path string) bool { _, ok := c.lookup(path); return ok }
+func (c cache) remove(path string) { delete(c, path) }
+func (c cache) put(path string, modTime time.Time) { c[path] = modTime }
+
+func NewWatcher(reg confgroup.Registry, paths []string) *Watcher {
+ d := &Watcher{
+ Logger: log,
+ paths: paths,
+ reg: reg,
+ watcher: nil,
+ cache: make(cache),
+ refreshEvery: time.Minute,
+ }
+ return d
+}
+
+func (w *Watcher) String() string {
+ return w.Name()
+}
+
+func (w *Watcher) Name() string {
+ return "file watcher"
+}
+
+func (w *Watcher) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+ w.Info("instance is started")
+ defer func() { w.Info("instance is stopped") }()
+
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ w.Errorf("fsnotify watcher initialization: %v", err)
+ return
+ }
+
+ w.watcher = watcher
+ defer w.stop()
+ w.refresh(ctx, in)
+
+ tk := time.NewTicker(w.refreshEvery)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ w.refresh(ctx, in)
+ case event := <-w.watcher.Events:
+ // TODO: check if event.Has will do
+ if event.Name == "" || isChmodOnly(event) || !w.fileMatches(event.Name) {
+ break
+ }
+ if event.Has(fsnotify.Create) && w.cache.has(event.Name) {
+ // vim "backupcopy=no" case, already collected after Rename event.
+ break
+ }
+ if event.Has(fsnotify.Rename) {
+ // It is common to modify files using vim.
+ // When writing to a file a backup is made. "backupcopy" option tells how it's done.
+ // Default is "no": rename the file and write a new one.
+ // This is cheap attempt to not send empty group for the old file.
+ time.Sleep(time.Millisecond * 100)
+ }
+ w.refresh(ctx, in)
+ case err := <-w.watcher.Errors:
+ if err != nil {
+ w.Warningf("watch: %v", err)
+ }
+ }
+ }
+}
+
+func (w *Watcher) fileMatches(file string) bool {
+ for _, pattern := range w.paths {
+ if ok, _ := filepath.Match(pattern, file); ok {
+ return true
+ }
+ }
+ return false
+}
+
+func (w *Watcher) listFiles() (files []string) {
+ for _, pattern := range w.paths {
+ if matches, err := filepath.Glob(pattern); err == nil {
+ files = append(files, matches...)
+ }
+ }
+ return files
+}
+
+func (w *Watcher) refresh(ctx context.Context, in chan<- []*confgroup.Group) {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ var groups []*confgroup.Group
+ seen := make(map[string]bool)
+
+ for _, file := range w.listFiles() {
+ fi, err := os.Lstat(file)
+ if err != nil {
+ w.Warningf("lstat '%s': %v", file, err)
+ continue
+ }
+
+ if !fi.Mode().IsRegular() {
+ continue
+ }
+
+ seen[file] = true
+ if v, ok := w.cache.lookup(file); ok && v.Equal(fi.ModTime()) {
+ continue
+ }
+ w.cache.put(file, fi.ModTime())
+
+ if group, err := parse(w.reg, file); err != nil {
+ w.Warningf("parse '%s': %v", file, err)
+ } else if group == nil {
+ groups = append(groups, &confgroup.Group{Source: file})
+ } else {
+ for _, cfg := range group.Configs {
+ cfg.SetProvider("file watcher")
+ cfg.SetSourceType(configSourceType(file))
+ cfg.SetSource(fmt.Sprintf("discoverer=file_watcher,file=%s", file))
+ }
+ groups = append(groups, group)
+ }
+ }
+
+ for name := range w.cache {
+ if seen[name] {
+ continue
+ }
+ w.cache.remove(name)
+ groups = append(groups, &confgroup.Group{Source: name})
+ }
+
+ send(ctx, in, groups)
+
+ w.watchDirs()
+}
+
+func (w *Watcher) watchDirs() {
+ for _, path := range w.paths {
+ if idx := strings.LastIndex(path, "/"); idx > -1 {
+ path = path[:idx]
+ } else {
+ path = "./"
+ }
+ if err := w.watcher.Add(path); err != nil {
+ w.Errorf("start watching '%s': %v", path, err)
+ }
+ }
+}
+
+func (w *Watcher) stop() {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // closing the watcher deadlocks unless all events and errors are drained.
+ go func() {
+ for {
+ select {
+ case <-w.watcher.Errors:
+ case <-w.watcher.Events:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ _ = w.watcher.Close()
+}
+
+func isChmodOnly(event fsnotify.Event) bool {
+ return event.Op^fsnotify.Chmod == 0
+}
+
+func send(ctx context.Context, in chan<- []*confgroup.Group, groups []*confgroup.Group) {
+ if len(groups) == 0 {
+ return
+ }
+ select {
+ case <-ctx.Done():
+ case in <- groups:
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go
new file mode 100644
index 000000000..20e21e65e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go
@@ -0,0 +1,378 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestWatcher_String(t *testing.T) {
+ assert.NotEmpty(t, NewWatcher(confgroup.Registry{}, nil))
+}
+
+func TestNewWatcher(t *testing.T) {
+ tests := map[string]struct {
+ reg confgroup.Registry
+ paths []string
+ }{
+ "empty inputs": {
+ reg: confgroup.Registry{},
+ paths: []string{},
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ assert.NotNil(t, NewWatcher(test.reg, test.paths))
+ })
+ }
+}
+
+func TestWatcher_Run(t *testing.T) {
+ tests := map[string]struct {
+ createSim func(tmp *tmpDir) discoverySim
+ }{
+ "file exists before start": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeYAML(filename, cfg)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "empty file": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeString(filename, "")
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "only comments, no data": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeString(filename, "# a comment")
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "add file": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ afterRun: func() {
+ tmp.writeYAML(filename, cfg)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "remove file": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ {
+ Source: filename,
+ Configs: nil,
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeYAML(filename, cfg)
+ },
+ afterRun: func() {
+ tmp.removeFile(filename)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "change file": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfgOrig := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ cfgChanged := sdConfig{
+ {
+ "name": "name_changed",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name_changed",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeYAML(filename, cfgOrig)
+ },
+ afterRun: func() {
+ tmp.writeYAML(filename, cfgChanged)
+ time.Sleep(time.Millisecond * 500)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "vim 'backupcopy=no' (writing to a file and backup)": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": "stock",
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeYAML(filename, cfg)
+ },
+ afterRun: func() {
+ newFilename := filename + ".swp"
+ tmp.renameFile(filename, newFilename)
+ tmp.writeYAML(filename, cfg)
+ tmp.removeFile(newFilename)
+ time.Sleep(time.Millisecond * 500)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ tmp := newTmpDir(t, "watch-run-*")
+ defer tmp.cleanup()
+
+ test.createSim(tmp).run(t)
+ })
+ }
+}