diff options
Diffstat (limited to '')
10 files changed, 1669 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/config.go b/src/go/collectors/go.d.plugin/agent/discovery/file/config.go new file mode 100644 index 000000000..cc19ee445 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/config.go @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "errors" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" +) + +type Config struct { + Registry confgroup.Registry + Read []string + Watch []string +} + +func validateConfig(cfg Config) error { + if len(cfg.Registry) == 0 { + return errors.New("empty config registry") + } + if len(cfg.Read)+len(cfg.Watch) == 0 { + return errors.New("discoverers not set") + } + return nil +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go new file mode 100644 index 000000000..97b437fc3 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/logger" +) + +var log = logger.New().With( + slog.String("component", "discovery"), + slog.String("discoverer", "file"), +) + +func NewDiscovery(cfg Config) (*Discovery, error) { + if err := validateConfig(cfg); err != nil { + return nil, fmt.Errorf("file discovery config validation: %v", err) + } + + d := Discovery{ + Logger: log, + } + + if err := d.registerDiscoverers(cfg); err != nil { + return nil, fmt.Errorf("file discovery initialization: %v", err) + } + + return &d, nil +} + +type ( + Discovery struct { + *logger.Logger + discoverers []discoverer + } + discoverer interface { + Run(ctx context.Context, in chan<- []*confgroup.Group) + } +) + +func (d *Discovery) String() string { + return d.Name() +} + +func (d *Discovery) Name() string { + return fmt.Sprintf("file discovery: %v", d.discoverers) +} + +func (d *Discovery) registerDiscoverers(cfg Config) error { + if len(cfg.Read) != 0 { + d.discoverers = append(d.discoverers, NewReader(cfg.Registry, cfg.Read)) + } + if len(cfg.Watch) != 0 { + d.discoverers = append(d.discoverers, NewWatcher(cfg.Registry, cfg.Watch)) + } + if len(d.discoverers) == 0 { + return errors.New("zero registered discoverers") + } + return nil +} + +func (d *Discovery) Run(ctx context.Context, in chan<- []*confgroup.Group) { + d.Info("instance is started") + defer func() { d.Info("instance is stopped") }() + + var wg sync.WaitGroup + + for _, dd := range d.discoverers { + wg.Add(1) + go func(dd discoverer) { + defer wg.Done() + d.runDiscoverer(ctx, dd, in) + }(dd) + } + + wg.Wait() + <-ctx.Done() +} + +func (d *Discovery) runDiscoverer(ctx context.Context, dd discoverer, in chan<- []*confgroup.Group) { + updates := make(chan []*confgroup.Group) + go dd.Run(ctx, updates) + for { + select { + case <-ctx.Done(): + return + case groups, ok := <-updates: + if !ok { + return + } + select { + case <-ctx.Done(): + return + case in <- groups: + } + } + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go new file mode 100644 index 000000000..2bdb669eb --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TODO: tech dept +func TestNewDiscovery(t *testing.T) { + +} + +// TODO: tech dept +func TestDiscovery_Run(t *testing.T) { + +} + +func prepareDiscovery(t *testing.T, cfg Config) *Discovery { + d, err := NewDiscovery(cfg) + require.NoError(t, err) + return d +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/parse.go b/src/go/collectors/go.d.plugin/agent/discovery/file/parse.go new file mode 100644 index 000000000..412d2b73e --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/parse.go @@ -0,0 +1,142 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + + "gopkg.in/yaml.v2" +) + +type format int + +const ( + unknownFormat format = iota + unknownEmptyFormat + staticFormat + sdFormat +) + +func parse(req confgroup.Registry, path string) (*confgroup.Group, error) { + bs, err := os.ReadFile(path) + if err != nil { + return nil, err + } + if len(bs) == 0 { + return nil, nil + } + + switch cfgFormat(bs) { + case staticFormat: + return parseStaticFormat(req, path, bs) + case sdFormat: + return parseSDFormat(req, path, bs) + case unknownEmptyFormat: + return nil, nil + default: + return nil, fmt.Errorf("unknown file format: '%s'", path) + } +} + +func parseStaticFormat(reg confgroup.Registry, path string, bs []byte) (*confgroup.Group, error) { + name := fileName(path) + // TODO: properly handle module renaming + // See agent/setup.go buildDiscoveryConf() for details + if name == "wmi" { + name = "windows" + } + modDef, ok := reg.Lookup(name) + if !ok { + return nil, nil + } + + var modCfg staticConfig + if err := yaml.Unmarshal(bs, &modCfg); err != nil { + return nil, err + } + + for _, cfg := range modCfg.Jobs { + cfg.SetModule(name) + def := mergeDef(modCfg.Default, modDef) + cfg.ApplyDefaults(def) + } + + group := &confgroup.Group{ + Configs: modCfg.Jobs, + Source: path, + } + + return group, nil +} + +func parseSDFormat(reg confgroup.Registry, path string, bs []byte) (*confgroup.Group, error) { + var cfgs sdConfig + if err := yaml.Unmarshal(bs, &cfgs); err != nil { + return nil, err + } + + var i int + for _, cfg := range cfgs { + if def, ok := reg.Lookup(cfg.Module()); ok && cfg.Module() != "" { + cfg.ApplyDefaults(def) + cfgs[i] = cfg + i++ + } + } + + group := &confgroup.Group{ + Configs: cfgs[:i], + Source: path, + } + + return group, nil +} + +func cfgFormat(bs []byte) format { + var data interface{} + if err := yaml.Unmarshal(bs, &data); err != nil { + return unknownFormat + } + if data == nil { + return unknownEmptyFormat + } + + type ( + static = map[any]any + sd = []any + ) + switch data.(type) { + case static: + return staticFormat + case sd: + return sdFormat + default: + return unknownFormat + } +} + +func mergeDef(a, b confgroup.Default) confgroup.Default { + return confgroup.Default{ + MinUpdateEvery: firstPositive(a.MinUpdateEvery, b.MinUpdateEvery), + UpdateEvery: firstPositive(a.UpdateEvery, b.UpdateEvery), + AutoDetectionRetry: firstPositive(a.AutoDetectionRetry, b.AutoDetectionRetry), + Priority: firstPositive(a.Priority, b.Priority), + } +} + +func firstPositive(value int, others ...int) int { + if value > 0 || len(others) == 0 { + return value + } + return firstPositive(others[0], others[1:]...) +} + +func fileName(path string) string { + _, file := filepath.Split(path) + ext := filepath.Ext(path) + return file[:len(file)-len(ext)] +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go new file mode 100644 index 000000000..8b20210ff --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go @@ -0,0 +1,431 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "testing" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/module" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParse(t *testing.T) { + const ( + jobDef = 11 + cfgDef = 22 + modDef = 33 + ) + tests := map[string]struct { + test func(t *testing.T, tmp *tmpDir) + }{ + "static, default: +job +conf +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := staticConfig{ + Default: confgroup.Default{ + UpdateEvery: cfgDef, + AutoDetectionRetry: cfgDef, + Priority: cfgDef, + }, + Jobs: []confgroup.Config{ + { + "name": "name", + "update_every": jobDef, + "autodetection_retry": jobDef, + "priority": jobDef, + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": jobDef, + "autodetection_retry": jobDef, + "priority": jobDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "static, default: +job +conf +module (merge all)": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": { + Priority: modDef, + }, + } + cfg := staticConfig{ + Default: confgroup.Default{ + AutoDetectionRetry: cfgDef, + }, + Jobs: []confgroup.Config{ + { + "name": "name", + "update_every": jobDef, + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": jobDef, + "autodetection_retry": cfgDef, + "priority": modDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "static, default: -job +conf +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := staticConfig{ + Default: confgroup.Default{ + UpdateEvery: cfgDef, + AutoDetectionRetry: cfgDef, + Priority: cfgDef, + }, + Jobs: []confgroup.Config{ + { + "name": "name", + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": cfgDef, + "autodetection_retry": cfgDef, + "priority": cfgDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "static, default: -job -conf +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := staticConfig{ + Jobs: []confgroup.Config{ + { + "name": "name", + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "autodetection_retry": modDef, + "priority": modDef, + "update_every": modDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "static, default: -job -conf -module (+global)": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": {}, + } + cfg := staticConfig{ + Jobs: []confgroup.Config{ + { + "name": "name", + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "update_every": module.UpdateEvery, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "sd, default: +job +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := sdConfig{ + { + "name": "name", + "module": "sd_module", + "update_every": jobDef, + "autodetection_retry": jobDef, + "priority": jobDef, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "module": "sd_module", + "name": "name", + "update_every": jobDef, + "autodetection_retry": jobDef, + "priority": jobDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "sd, default: -job +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := sdConfig{ + { + "name": "name", + "module": "sd_module", + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "sd_module", + "update_every": modDef, + "autodetection_retry": modDef, + "priority": modDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "sd, default: -job -module (+global)": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "sd_module", + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "sd_module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "sd, job has no 'module' or 'module' is empty": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": {}, + } + cfg := sdConfig{ + { + "name": "name", + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{}, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "conf registry has no module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{}, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "empty file": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": {}, + } + + filename := tmp.createFile("empty-*") + group, err := parse(reg, filename) + + assert.Nil(t, group) + require.NoError(t, err) + }, + }, + "only comments, unknown empty format": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{} + + filename := tmp.createFile("unknown-empty-format-*") + tmp.writeString(filename, "# a comment") + group, err := parse(reg, filename) + + assert.Nil(t, group) + assert.NoError(t, err) + }, + }, + "unknown format": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{} + + filename := tmp.createFile("unknown-format-*") + tmp.writeYAML(filename, "unknown") + group, err := parse(reg, filename) + + assert.Nil(t, group) + assert.Error(t, err) + }, + }, + } + + for name, scenario := range tests { + t.Run(name, func(t *testing.T) { + tmp := newTmpDir(t, "parse-file-*") + defer tmp.cleanup() + + scenario.test(t, tmp) + }) + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/read.go b/src/go/collectors/go.d.plugin/agent/discovery/file/read.go new file mode 100644 index 000000000..1b45b3767 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/read.go @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/logger" +) + +type ( + staticConfig struct { + confgroup.Default `yaml:",inline"` + Jobs []confgroup.Config `yaml:"jobs"` + } + sdConfig []confgroup.Config +) + +func NewReader(reg confgroup.Registry, paths []string) *Reader { + return &Reader{ + Logger: log, + reg: reg, + paths: paths, + } +} + +type Reader struct { + *logger.Logger + + reg confgroup.Registry + paths []string +} + +func (r *Reader) String() string { + return r.Name() +} + +func (r *Reader) Name() string { + return "file reader" +} + +func (r *Reader) Run(ctx context.Context, in chan<- []*confgroup.Group) { + r.Info("instance is started") + defer func() { r.Info("instance is stopped") }() + + select { + case <-ctx.Done(): + case in <- r.groups(): + } + + close(in) +} + +func (r *Reader) groups() (groups []*confgroup.Group) { + for _, pattern := range r.paths { + matches, err := filepath.Glob(pattern) + if err != nil { + continue + } + + for _, path := range matches { + if fi, err := os.Stat(path); err != nil || !fi.Mode().IsRegular() { + continue + } + + group, err := parse(r.reg, path) + if err != nil { + r.Warningf("parse '%s': %v", path, err) + continue + } + + if group == nil { + group = &confgroup.Group{Source: path} + } else { + for _, cfg := range group.Configs { + cfg.SetProvider("file reader") + cfg.SetSourceType(configSourceType(path)) + cfg.SetSource(fmt.Sprintf("discoverer=file_reader,file=%s", path)) + } + } + groups = append(groups, group) + } + } + + return groups +} + +func configSourceType(path string) string { + if strings.Contains(path, "/etc/netdata") { + return "user" + } + return "stock" +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go new file mode 100644 index 000000000..d2404d54e --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go @@ -0,0 +1,116 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "fmt" + "testing" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/module" + + "github.com/stretchr/testify/assert" +) + +func TestReader_String(t *testing.T) { + assert.NotEmpty(t, NewReader(confgroup.Registry{}, nil)) +} + +func TestNewReader(t *testing.T) { + tests := map[string]struct { + reg confgroup.Registry + paths []string + }{ + "empty inputs": { + reg: confgroup.Registry{}, + paths: []string{}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.NotNil(t, NewReader(test.reg, test.paths)) + }) + } +} + +func TestReader_Run(t *testing.T) { + tests := map[string]struct { + createSim func(tmp *tmpDir) discoverySim + }{ + "read multiple files": { + createSim: func(tmp *tmpDir) discoverySim { + module1 := tmp.join("module1.conf") + module2 := tmp.join("module2.conf") + module3 := tmp.join("module3.conf") + + tmp.writeYAML(module1, staticConfig{ + Jobs: []confgroup.Config{{"name": "name"}}, + }) + tmp.writeYAML(module2, staticConfig{ + Jobs: []confgroup.Config{{"name": "name"}}, + }) + tmp.writeString(module3, "# a comment") + + reg := confgroup.Registry{ + "module1": {}, + "module2": {}, + "module3": {}, + } + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Read: []string{module1, module2, module3}, + }) + expected := []*confgroup.Group{ + { + Source: module1, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module1", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file reader", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_reader,file=%s", module1), + }, + }, + }, + { + Source: module2, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module2", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file reader", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_reader,file=%s", module2), + }, + }, + }, + { + Source: module3, + }, + } + + return discoverySim{ + discovery: discovery, + expectedGroups: expected, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + tmp := newTmpDir(t, "reader-run-*") + defer tmp.cleanup() + + test.createSim(tmp).run(t) + }) + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go new file mode 100644 index 000000000..cd9fa05ac --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go @@ -0,0 +1,130 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "context" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +type ( + discoverySim struct { + discovery *Discovery + beforeRun func() + afterRun func() + expectedGroups []*confgroup.Group + } +) + +func (sim discoverySim) run(t *testing.T) { + t.Helper() + require.NotNil(t, sim.discovery) + + if sim.beforeRun != nil { + sim.beforeRun() + } + + in, out := make(chan []*confgroup.Group), make(chan []*confgroup.Group) + go sim.collectGroups(t, in, out) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + go sim.discovery.Run(ctx, in) + time.Sleep(time.Millisecond * 250) + + if sim.afterRun != nil { + sim.afterRun() + } + + actual := <-out + + sortGroups(actual) + sortGroups(sim.expectedGroups) + + assert.Equal(t, sim.expectedGroups, actual) +} + +func (sim discoverySim) collectGroups(t *testing.T, in, out chan []*confgroup.Group) { + timeout := time.Second * 5 + var groups []*confgroup.Group +loop: + for { + select { + case updates := <-in: + if groups = append(groups, updates...); len(groups) >= len(sim.expectedGroups) { + break loop + } + case <-time.After(timeout): + t.Logf("discovery %s timed out after %s, got %d groups, expected %d, some events are skipped", + sim.discovery.discoverers, timeout, len(groups), len(sim.expectedGroups)) + break loop + } + } + out <- groups +} + +type tmpDir struct { + dir string + t *testing.T +} + +func newTmpDir(t *testing.T, pattern string) *tmpDir { + pattern = "netdata-go-test-discovery-file-" + pattern + dir, err := os.MkdirTemp(os.TempDir(), pattern) + require.NoError(t, err) + return &tmpDir{dir: dir, t: t} +} + +func (d *tmpDir) cleanup() { + assert.NoError(d.t, os.RemoveAll(d.dir)) +} + +func (d *tmpDir) join(filename string) string { + return filepath.Join(d.dir, filename) +} + +func (d *tmpDir) createFile(pattern string) string { + f, err := os.CreateTemp(d.dir, pattern) + require.NoError(d.t, err) + _ = f.Close() + return f.Name() +} + +func (d *tmpDir) removeFile(filename string) { + err := os.Remove(filename) + require.NoError(d.t, err) +} + +func (d *tmpDir) renameFile(origFilename, newFilename string) { + err := os.Rename(origFilename, newFilename) + require.NoError(d.t, err) +} + +func (d *tmpDir) writeYAML(filename string, in interface{}) { + bs, err := yaml.Marshal(in) + require.NoError(d.t, err) + err = os.WriteFile(filename, bs, 0644) + require.NoError(d.t, err) +} + +func (d *tmpDir) writeString(filename, data string) { + err := os.WriteFile(filename, []byte(data), 0644) + require.NoError(d.t, err) +} + +func sortGroups(groups []*confgroup.Group) { + if len(groups) == 0 { + return + } + sort.Slice(groups, func(i, j int) bool { return groups[i].Source < groups[j].Source }) +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go new file mode 100644 index 000000000..a723b706e --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go @@ -0,0 +1,220 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "github.com/fsnotify/fsnotify" +) + +type ( + Watcher struct { + *logger.Logger + + paths []string + reg confgroup.Registry + watcher *fsnotify.Watcher + cache cache + refreshEvery time.Duration + } + cache map[string]time.Time +) + +func (c cache) lookup(path string) (time.Time, bool) { v, ok := c[path]; return v, ok } +func (c cache) has(path string) bool { _, ok := c.lookup(path); return ok } +func (c cache) remove(path string) { delete(c, path) } +func (c cache) put(path string, modTime time.Time) { c[path] = modTime } + +func NewWatcher(reg confgroup.Registry, paths []string) *Watcher { + d := &Watcher{ + Logger: log, + paths: paths, + reg: reg, + watcher: nil, + cache: make(cache), + refreshEvery: time.Minute, + } + return d +} + +func (w *Watcher) String() string { + return w.Name() +} + +func (w *Watcher) Name() string { + return "file watcher" +} + +func (w *Watcher) Run(ctx context.Context, in chan<- []*confgroup.Group) { + w.Info("instance is started") + defer func() { w.Info("instance is stopped") }() + + watcher, err := fsnotify.NewWatcher() + if err != nil { + w.Errorf("fsnotify watcher initialization: %v", err) + return + } + + w.watcher = watcher + defer w.stop() + w.refresh(ctx, in) + + tk := time.NewTicker(w.refreshEvery) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + w.refresh(ctx, in) + case event := <-w.watcher.Events: + // TODO: check if event.Has will do + if event.Name == "" || isChmodOnly(event) || !w.fileMatches(event.Name) { + break + } + if event.Has(fsnotify.Create) && w.cache.has(event.Name) { + // vim "backupcopy=no" case, already collected after Rename event. + break + } + if event.Has(fsnotify.Rename) { + // It is common to modify files using vim. + // When writing to a file a backup is made. "backupcopy" option tells how it's done. + // Default is "no": rename the file and write a new one. + // This is cheap attempt to not send empty group for the old file. + time.Sleep(time.Millisecond * 100) + } + w.refresh(ctx, in) + case err := <-w.watcher.Errors: + if err != nil { + w.Warningf("watch: %v", err) + } + } + } +} + +func (w *Watcher) fileMatches(file string) bool { + for _, pattern := range w.paths { + if ok, _ := filepath.Match(pattern, file); ok { + return true + } + } + return false +} + +func (w *Watcher) listFiles() (files []string) { + for _, pattern := range w.paths { + if matches, err := filepath.Glob(pattern); err == nil { + files = append(files, matches...) + } + } + return files +} + +func (w *Watcher) refresh(ctx context.Context, in chan<- []*confgroup.Group) { + select { + case <-ctx.Done(): + return + default: + } + var groups []*confgroup.Group + seen := make(map[string]bool) + + for _, file := range w.listFiles() { + fi, err := os.Lstat(file) + if err != nil { + w.Warningf("lstat '%s': %v", file, err) + continue + } + + if !fi.Mode().IsRegular() { + continue + } + + seen[file] = true + if v, ok := w.cache.lookup(file); ok && v.Equal(fi.ModTime()) { + continue + } + w.cache.put(file, fi.ModTime()) + + if group, err := parse(w.reg, file); err != nil { + w.Warningf("parse '%s': %v", file, err) + } else if group == nil { + groups = append(groups, &confgroup.Group{Source: file}) + } else { + for _, cfg := range group.Configs { + cfg.SetProvider("file watcher") + cfg.SetSourceType(configSourceType(file)) + cfg.SetSource(fmt.Sprintf("discoverer=file_watcher,file=%s", file)) + } + groups = append(groups, group) + } + } + + for name := range w.cache { + if seen[name] { + continue + } + w.cache.remove(name) + groups = append(groups, &confgroup.Group{Source: name}) + } + + send(ctx, in, groups) + + w.watchDirs() +} + +func (w *Watcher) watchDirs() { + for _, path := range w.paths { + if idx := strings.LastIndex(path, "/"); idx > -1 { + path = path[:idx] + } else { + path = "./" + } + if err := w.watcher.Add(path); err != nil { + w.Errorf("start watching '%s': %v", path, err) + } + } +} + +func (w *Watcher) stop() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // closing the watcher deadlocks unless all events and errors are drained. + go func() { + for { + select { + case <-w.watcher.Errors: + case <-w.watcher.Events: + case <-ctx.Done(): + return + } + } + }() + + _ = w.watcher.Close() +} + +func isChmodOnly(event fsnotify.Event) bool { + return event.Op^fsnotify.Chmod == 0 +} + +func send(ctx context.Context, in chan<- []*confgroup.Group, groups []*confgroup.Group) { + if len(groups) == 0 { + return + } + select { + case <-ctx.Done(): + case in <- groups: + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go new file mode 100644 index 000000000..20e21e65e --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go @@ -0,0 +1,378 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "fmt" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/module" + + "github.com/stretchr/testify/assert" +) + +func TestWatcher_String(t *testing.T) { + assert.NotEmpty(t, NewWatcher(confgroup.Registry{}, nil)) +} + +func TestNewWatcher(t *testing.T) { + tests := map[string]struct { + reg confgroup.Registry + paths []string + }{ + "empty inputs": { + reg: confgroup.Registry{}, + paths: []string{}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.NotNil(t, NewWatcher(test.reg, test.paths)) + }) + } +} + +func TestWatcher_Run(t *testing.T) { + tests := map[string]struct { + createSim func(tmp *tmpDir) discoverySim + }{ + "file exists before start": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeYAML(filename, cfg) + }, + expectedGroups: expected, + } + return sim + }, + }, + "empty file": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeString(filename, "") + }, + expectedGroups: expected, + } + return sim + }, + }, + "only comments, no data": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeString(filename, "# a comment") + }, + expectedGroups: expected, + } + return sim + }, + }, + "add file": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + } + + sim := discoverySim{ + discovery: discovery, + afterRun: func() { + tmp.writeYAML(filename, cfg) + }, + expectedGroups: expected, + } + return sim + }, + }, + "remove file": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + { + Source: filename, + Configs: nil, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeYAML(filename, cfg) + }, + afterRun: func() { + tmp.removeFile(filename) + }, + expectedGroups: expected, + } + return sim + }, + }, + "change file": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfgOrig := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + cfgChanged := sdConfig{ + { + "name": "name_changed", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name_changed", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeYAML(filename, cfgOrig) + }, + afterRun: func() { + tmp.writeYAML(filename, cfgChanged) + time.Sleep(time.Millisecond * 500) + }, + expectedGroups: expected, + } + return sim + }, + }, + "vim 'backupcopy=no' (writing to a file and backup)": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": "stock", + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeYAML(filename, cfg) + }, + afterRun: func() { + newFilename := filename + ".swp" + tmp.renameFile(filename, newFilename) + tmp.writeYAML(filename, cfg) + tmp.removeFile(newFilename) + time.Sleep(time.Millisecond * 500) + }, + expectedGroups: expected, + } + return sim + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + tmp := newTmpDir(t, "watch-run-*") + defer tmp.cleanup() + + test.createSim(tmp).run(t) + }) + } +} |