diff options
Diffstat (limited to 'src/go/plugin/go.d/agent/discovery')
56 files changed, 9242 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/discovery/cache.go b/src/go/plugin/go.d/agent/discovery/cache.go new file mode 100644 index 000000000..032ccca38 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/cache.go @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package discovery + +import ( + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" +) + +type cache map[string]*confgroup.Group // [Source] + +func newCache() *cache { + return &cache{} +} + +func (c cache) update(groups []*confgroup.Group) { + if len(groups) == 0 { + return + } + for _, group := range groups { + if group != nil { + c[group.Source] = group + } + } +} + +func (c cache) reset() { + for key := range c { + delete(c, key) + } +} + +func (c cache) groups() []*confgroup.Group { + groups := make([]*confgroup.Group, 0, len(c)) + for _, group := range c { + groups = append(groups, group) + } + return groups +} diff --git a/src/go/plugin/go.d/agent/discovery/config.go b/src/go/plugin/go.d/agent/discovery/config.go new file mode 100644 index 000000000..258d1b830 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/config.go @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package discovery + +import ( + "errors" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/dummy" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/file" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd" +) + +type Config struct { + Registry confgroup.Registry + File file.Config + Dummy dummy.Config + SD sd.Config +} + +func validateConfig(cfg Config) error { + if len(cfg.Registry) == 0 { + return errors.New("empty config registry") + } + if len(cfg.File.Read)+len(cfg.File.Watch) == 0 && len(cfg.Dummy.Names) == 0 { + return errors.New("discoverers not set") + } + return nil +} diff --git a/src/go/plugin/go.d/agent/discovery/dummy/config.go b/src/go/plugin/go.d/agent/discovery/dummy/config.go new file mode 100644 index 000000000..1e8e8f333 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/dummy/config.go @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dummy + +import ( + "errors" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" +) + +type Config struct { + Registry confgroup.Registry + Names []string +} + +func validateConfig(cfg Config) error { + if len(cfg.Registry) == 0 { + return errors.New("empty config registry") + } + if len(cfg.Names) == 0 { + return errors.New("names not set") + } + return nil +} diff --git a/src/go/plugin/go.d/agent/discovery/dummy/discovery.go b/src/go/plugin/go.d/agent/discovery/dummy/discovery.go new file mode 100644 index 000000000..6fad0f059 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/dummy/discovery.go @@ -0,0 +1,76 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dummy + +import ( + "context" + "fmt" + "log/slog" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" +) + +func NewDiscovery(cfg Config) (*Discovery, error) { + if err := validateConfig(cfg); err != nil { + return nil, fmt.Errorf("config validation: %v", err) + } + d := &Discovery{ + Logger: logger.New().With( + slog.String("component", "discovery"), + slog.String("discoverer", "dummy"), + ), + reg: cfg.Registry, + names: cfg.Names, + } + return d, nil +} + +type Discovery struct { + *logger.Logger + + reg confgroup.Registry + names []string +} + +func (d *Discovery) String() string { + return d.Name() +} + +func (d *Discovery) Name() string { + return "dummy discovery" +} + +func (d *Discovery) Run(ctx context.Context, in chan<- []*confgroup.Group) { + d.Info("instance is started") + defer func() { d.Info("instance is stopped") }() + + select { + case <-ctx.Done(): + case in <- d.groups(): + } + + close(in) +} + +func (d *Discovery) groups() []*confgroup.Group { + group := &confgroup.Group{Source: "internal"} + + for _, name := range d.names { + def, ok := d.reg.Lookup(name) + if !ok { + continue + } + src := "internal" + cfg := confgroup.Config{} + cfg.SetModule(name) + cfg.SetProvider("dummy") + cfg.SetSourceType(confgroup.TypeStock) + cfg.SetSource(src) + cfg.ApplyDefaults(def) + + group.Configs = append(group.Configs, cfg) + } + + return []*confgroup.Group{group} +} diff --git a/src/go/plugin/go.d/agent/discovery/dummy/discovery_test.go b/src/go/plugin/go.d/agent/discovery/dummy/discovery_test.go new file mode 100644 index 000000000..2c908eb66 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/dummy/discovery_test.go @@ -0,0 +1,109 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dummy + +import ( + "context" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewDiscovery(t *testing.T) { + tests := map[string]struct { + cfg Config + wantErr bool + }{ + "valid config": { + cfg: Config{ + Registry: confgroup.Registry{"module1": confgroup.Default{}}, + Names: []string{"module1", "module2"}, + }, + }, + "invalid config, registry not set": { + cfg: Config{ + Names: []string{"module1", "module2"}, + }, + wantErr: true, + }, + "invalid config, names not set": { + cfg: Config{ + Names: []string{"module1", "module2"}, + }, + wantErr: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + d, err := NewDiscovery(test.cfg) + + if test.wantErr { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.NotNil(t, d) + } + }) + } +} + +func TestDiscovery_Run(t *testing.T) { + expected := []*confgroup.Group{ + { + Source: "internal", + Configs: []confgroup.Config{ + { + "name": "module1", + "module": "module1", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "dummy", + "__source_type__": confgroup.TypeStock, + "__source__": "internal", + }, + { + "name": "module2", + "module": "module2", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "dummy", + "__source_type__": confgroup.TypeStock, + "__source__": "internal", + }, + }, + }, + } + + reg := confgroup.Registry{ + "module1": {}, + "module2": {}, + } + cfg := Config{ + Registry: reg, + Names: []string{"module1", "module2"}, + } + + discovery, err := NewDiscovery(cfg) + require.NoError(t, err) + + in := make(chan []*confgroup.Group) + timeout := time.Second * 2 + + go discovery.Run(context.Background(), in) + + var actual []*confgroup.Group + select { + case actual = <-in: + case <-time.After(timeout): + t.Logf("discovery timed out after %s", timeout) + } + assert.Equal(t, expected, actual) +} diff --git a/src/go/plugin/go.d/agent/discovery/file/config.go b/src/go/plugin/go.d/agent/discovery/file/config.go new file mode 100644 index 000000000..3836d201a --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/config.go @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "errors" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" +) + +type Config struct { + Registry confgroup.Registry + Read []string + Watch []string +} + +func validateConfig(cfg Config) error { + if len(cfg.Registry) == 0 { + return errors.New("empty config registry") + } + if len(cfg.Read)+len(cfg.Watch) == 0 { + return errors.New("discoverers not set") + } + return nil +} diff --git a/src/go/plugin/go.d/agent/discovery/file/discovery.go b/src/go/plugin/go.d/agent/discovery/file/discovery.go new file mode 100644 index 000000000..527b1cbbc --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/discovery.go @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" +) + +var log = logger.New().With( + slog.String("component", "discovery"), + slog.String("discoverer", "file"), +) + +func NewDiscovery(cfg Config) (*Discovery, error) { + if err := validateConfig(cfg); err != nil { + return nil, fmt.Errorf("file discovery config validation: %v", err) + } + + d := Discovery{ + Logger: log, + } + + if err := d.registerDiscoverers(cfg); err != nil { + return nil, fmt.Errorf("file discovery initialization: %v", err) + } + + return &d, nil +} + +type ( + Discovery struct { + *logger.Logger + discoverers []discoverer + } + discoverer interface { + Run(ctx context.Context, in chan<- []*confgroup.Group) + } +) + +func (d *Discovery) String() string { + return d.Name() +} + +func (d *Discovery) Name() string { + return fmt.Sprintf("file discovery: %v", d.discoverers) +} + +func (d *Discovery) registerDiscoverers(cfg Config) error { + if len(cfg.Read) != 0 { + d.discoverers = append(d.discoverers, NewReader(cfg.Registry, cfg.Read)) + } + if len(cfg.Watch) != 0 { + d.discoverers = append(d.discoverers, NewWatcher(cfg.Registry, cfg.Watch)) + } + if len(d.discoverers) == 0 { + return errors.New("zero registered discoverers") + } + return nil +} + +func (d *Discovery) Run(ctx context.Context, in chan<- []*confgroup.Group) { + d.Info("instance is started") + defer func() { d.Info("instance is stopped") }() + + var wg sync.WaitGroup + + for _, dd := range d.discoverers { + wg.Add(1) + go func(dd discoverer) { + defer wg.Done() + d.runDiscoverer(ctx, dd, in) + }(dd) + } + + wg.Wait() + <-ctx.Done() +} + +func (d *Discovery) runDiscoverer(ctx context.Context, dd discoverer, in chan<- []*confgroup.Group) { + updates := make(chan []*confgroup.Group) + go dd.Run(ctx, updates) + for { + select { + case <-ctx.Done(): + return + case groups, ok := <-updates: + if !ok { + return + } + select { + case <-ctx.Done(): + return + case in <- groups: + } + } + } +} diff --git a/src/go/plugin/go.d/agent/discovery/file/discovery_test.go b/src/go/plugin/go.d/agent/discovery/file/discovery_test.go new file mode 100644 index 000000000..2bdb669eb --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/discovery_test.go @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TODO: tech dept +func TestNewDiscovery(t *testing.T) { + +} + +// TODO: tech dept +func TestDiscovery_Run(t *testing.T) { + +} + +func prepareDiscovery(t *testing.T, cfg Config) *Discovery { + d, err := NewDiscovery(cfg) + require.NoError(t, err) + return d +} diff --git a/src/go/plugin/go.d/agent/discovery/file/parse.go b/src/go/plugin/go.d/agent/discovery/file/parse.go new file mode 100644 index 000000000..5fd31f32a --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/parse.go @@ -0,0 +1,142 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + + "gopkg.in/yaml.v2" +) + +type format int + +const ( + unknownFormat format = iota + unknownEmptyFormat + staticFormat + sdFormat +) + +func parse(req confgroup.Registry, path string) (*confgroup.Group, error) { + bs, err := os.ReadFile(path) + if err != nil { + return nil, err + } + if len(bs) == 0 { + return nil, nil + } + + switch cfgFormat(bs) { + case staticFormat: + return parseStaticFormat(req, path, bs) + case sdFormat: + return parseSDFormat(req, path, bs) + case unknownEmptyFormat: + return nil, nil + default: + return nil, fmt.Errorf("unknown file format: '%s'", path) + } +} + +func parseStaticFormat(reg confgroup.Registry, path string, bs []byte) (*confgroup.Group, error) { + name := fileName(path) + // TODO: properly handle module renaming + // See agent/setup.go buildDiscoveryConf() for details + if name == "wmi" { + name = "windows" + } + modDef, ok := reg.Lookup(name) + if !ok { + return nil, nil + } + + var modCfg staticConfig + if err := yaml.Unmarshal(bs, &modCfg); err != nil { + return nil, err + } + + for _, cfg := range modCfg.Jobs { + cfg.SetModule(name) + def := mergeDef(modCfg.Default, modDef) + cfg.ApplyDefaults(def) + } + + group := &confgroup.Group{ + Configs: modCfg.Jobs, + Source: path, + } + + return group, nil +} + +func parseSDFormat(reg confgroup.Registry, path string, bs []byte) (*confgroup.Group, error) { + var cfgs sdConfig + if err := yaml.Unmarshal(bs, &cfgs); err != nil { + return nil, err + } + + var i int + for _, cfg := range cfgs { + if def, ok := reg.Lookup(cfg.Module()); ok && cfg.Module() != "" { + cfg.ApplyDefaults(def) + cfgs[i] = cfg + i++ + } + } + + group := &confgroup.Group{ + Configs: cfgs[:i], + Source: path, + } + + return group, nil +} + +func cfgFormat(bs []byte) format { + var data interface{} + if err := yaml.Unmarshal(bs, &data); err != nil { + return unknownFormat + } + if data == nil { + return unknownEmptyFormat + } + + type ( + static = map[any]any + sd = []any + ) + switch data.(type) { + case static: + return staticFormat + case sd: + return sdFormat + default: + return unknownFormat + } +} + +func mergeDef(a, b confgroup.Default) confgroup.Default { + return confgroup.Default{ + MinUpdateEvery: firstPositive(a.MinUpdateEvery, b.MinUpdateEvery), + UpdateEvery: firstPositive(a.UpdateEvery, b.UpdateEvery), + AutoDetectionRetry: firstPositive(a.AutoDetectionRetry, b.AutoDetectionRetry), + Priority: firstPositive(a.Priority, b.Priority), + } +} + +func firstPositive(value int, others ...int) int { + if value > 0 || len(others) == 0 { + return value + } + return firstPositive(others[0], others[1:]...) +} + +func fileName(path string) string { + _, file := filepath.Split(path) + ext := filepath.Ext(path) + return file[:len(file)-len(ext)] +} diff --git a/src/go/plugin/go.d/agent/discovery/file/parse_test.go b/src/go/plugin/go.d/agent/discovery/file/parse_test.go new file mode 100644 index 000000000..5790f5650 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/parse_test.go @@ -0,0 +1,431 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParse(t *testing.T) { + const ( + jobDef = 11 + cfgDef = 22 + modDef = 33 + ) + tests := map[string]struct { + test func(t *testing.T, tmp *tmpDir) + }{ + "static, default: +job +conf +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := staticConfig{ + Default: confgroup.Default{ + UpdateEvery: cfgDef, + AutoDetectionRetry: cfgDef, + Priority: cfgDef, + }, + Jobs: []confgroup.Config{ + { + "name": "name", + "update_every": jobDef, + "autodetection_retry": jobDef, + "priority": jobDef, + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": jobDef, + "autodetection_retry": jobDef, + "priority": jobDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "static, default: +job +conf +module (merge all)": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": { + Priority: modDef, + }, + } + cfg := staticConfig{ + Default: confgroup.Default{ + AutoDetectionRetry: cfgDef, + }, + Jobs: []confgroup.Config{ + { + "name": "name", + "update_every": jobDef, + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": jobDef, + "autodetection_retry": cfgDef, + "priority": modDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "static, default: -job +conf +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := staticConfig{ + Default: confgroup.Default{ + UpdateEvery: cfgDef, + AutoDetectionRetry: cfgDef, + Priority: cfgDef, + }, + Jobs: []confgroup.Config{ + { + "name": "name", + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": cfgDef, + "autodetection_retry": cfgDef, + "priority": cfgDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "static, default: -job -conf +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := staticConfig{ + Jobs: []confgroup.Config{ + { + "name": "name", + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "autodetection_retry": modDef, + "priority": modDef, + "update_every": modDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "static, default: -job -conf -module (+global)": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": {}, + } + cfg := staticConfig{ + Jobs: []confgroup.Config{ + { + "name": "name", + }, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "update_every": module.UpdateEvery, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "sd, default: +job +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := sdConfig{ + { + "name": "name", + "module": "sd_module", + "update_every": jobDef, + "autodetection_retry": jobDef, + "priority": jobDef, + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "module": "sd_module", + "name": "name", + "update_every": jobDef, + "autodetection_retry": jobDef, + "priority": jobDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "sd, default: -job +module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": { + UpdateEvery: modDef, + AutoDetectionRetry: modDef, + Priority: modDef, + }, + } + cfg := sdConfig{ + { + "name": "name", + "module": "sd_module", + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "sd_module", + "update_every": modDef, + "autodetection_retry": modDef, + "priority": modDef, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "sd, default: -job -module (+global)": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "sd_module", + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "sd_module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + }, + }, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "sd, job has no 'module' or 'module' is empty": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": {}, + } + cfg := sdConfig{ + { + "name": "name", + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{}, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "conf registry has no module": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "sd_module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + tmp.writeYAML(filename, cfg) + + expected := &confgroup.Group{ + Source: filename, + Configs: []confgroup.Config{}, + } + + group, err := parse(reg, filename) + + require.NoError(t, err) + assert.Equal(t, expected, group) + }, + }, + "empty file": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{ + "module": {}, + } + + filename := tmp.createFile("empty-*") + group, err := parse(reg, filename) + + assert.Nil(t, group) + require.NoError(t, err) + }, + }, + "only comments, unknown empty format": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{} + + filename := tmp.createFile("unknown-empty-format-*") + tmp.writeString(filename, "# a comment") + group, err := parse(reg, filename) + + assert.Nil(t, group) + assert.NoError(t, err) + }, + }, + "unknown format": { + test: func(t *testing.T, tmp *tmpDir) { + reg := confgroup.Registry{} + + filename := tmp.createFile("unknown-format-*") + tmp.writeYAML(filename, "unknown") + group, err := parse(reg, filename) + + assert.Nil(t, group) + assert.Error(t, err) + }, + }, + } + + for name, scenario := range tests { + t.Run(name, func(t *testing.T) { + tmp := newTmpDir(t, "parse-file-*") + defer tmp.cleanup() + + scenario.test(t, tmp) + }) + } +} diff --git a/src/go/plugin/go.d/agent/discovery/file/read.go b/src/go/plugin/go.d/agent/discovery/file/read.go new file mode 100644 index 000000000..3e7869ba7 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/read.go @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" +) + +type ( + staticConfig struct { + confgroup.Default `yaml:",inline"` + Jobs []confgroup.Config `yaml:"jobs"` + } + sdConfig []confgroup.Config +) + +func NewReader(reg confgroup.Registry, paths []string) *Reader { + return &Reader{ + Logger: log, + reg: reg, + paths: paths, + } +} + +type Reader struct { + *logger.Logger + + reg confgroup.Registry + paths []string +} + +func (r *Reader) String() string { + return r.Name() +} + +func (r *Reader) Name() string { + return "file reader" +} + +func (r *Reader) Run(ctx context.Context, in chan<- []*confgroup.Group) { + r.Info("instance is started") + defer func() { r.Info("instance is stopped") }() + + select { + case <-ctx.Done(): + case in <- r.groups(): + } + + close(in) +} + +func (r *Reader) groups() (groups []*confgroup.Group) { + for _, pattern := range r.paths { + matches, err := filepath.Glob(pattern) + if err != nil { + continue + } + + for _, path := range matches { + if fi, err := os.Stat(path); err != nil || !fi.Mode().IsRegular() { + continue + } + + group, err := parse(r.reg, path) + if err != nil { + r.Warningf("parse '%s': %v", path, err) + continue + } + + if group == nil { + group = &confgroup.Group{Source: path} + } else { + for _, cfg := range group.Configs { + cfg.SetProvider("file reader") + cfg.SetSourceType(configSourceType(path)) + cfg.SetSource(fmt.Sprintf("discoverer=file_reader,file=%s", path)) + } + } + groups = append(groups, group) + } + } + + return groups +} + +func configSourceType(path string) string { + if strings.Contains(path, "/etc/netdata") { + return "user" + } + return "stock" +} diff --git a/src/go/plugin/go.d/agent/discovery/file/read_test.go b/src/go/plugin/go.d/agent/discovery/file/read_test.go new file mode 100644 index 000000000..1bde06c5e --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/read_test.go @@ -0,0 +1,116 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "fmt" + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + + "github.com/stretchr/testify/assert" +) + +func TestReader_String(t *testing.T) { + assert.NotEmpty(t, NewReader(confgroup.Registry{}, nil)) +} + +func TestNewReader(t *testing.T) { + tests := map[string]struct { + reg confgroup.Registry + paths []string + }{ + "empty inputs": { + reg: confgroup.Registry{}, + paths: []string{}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.NotNil(t, NewReader(test.reg, test.paths)) + }) + } +} + +func TestReader_Run(t *testing.T) { + tests := map[string]struct { + createSim func(tmp *tmpDir) discoverySim + }{ + "read multiple files": { + createSim: func(tmp *tmpDir) discoverySim { + module1 := tmp.join("module1.conf") + module2 := tmp.join("module2.conf") + module3 := tmp.join("module3.conf") + + tmp.writeYAML(module1, staticConfig{ + Jobs: []confgroup.Config{{"name": "name"}}, + }) + tmp.writeYAML(module2, staticConfig{ + Jobs: []confgroup.Config{{"name": "name"}}, + }) + tmp.writeString(module3, "# a comment") + + reg := confgroup.Registry{ + "module1": {}, + "module2": {}, + "module3": {}, + } + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Read: []string{module1, module2, module3}, + }) + expected := []*confgroup.Group{ + { + Source: module1, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module1", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file reader", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_reader,file=%s", module1), + }, + }, + }, + { + Source: module2, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module2", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file reader", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_reader,file=%s", module2), + }, + }, + }, + { + Source: module3, + }, + } + + return discoverySim{ + discovery: discovery, + expectedGroups: expected, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + tmp := newTmpDir(t, "reader-run-*") + defer tmp.cleanup() + + test.createSim(tmp).run(t) + }) + } +} diff --git a/src/go/plugin/go.d/agent/discovery/file/sim_test.go b/src/go/plugin/go.d/agent/discovery/file/sim_test.go new file mode 100644 index 000000000..3219c6892 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/sim_test.go @@ -0,0 +1,130 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "context" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +type ( + discoverySim struct { + discovery *Discovery + beforeRun func() + afterRun func() + expectedGroups []*confgroup.Group + } +) + +func (sim discoverySim) run(t *testing.T) { + t.Helper() + require.NotNil(t, sim.discovery) + + if sim.beforeRun != nil { + sim.beforeRun() + } + + in, out := make(chan []*confgroup.Group), make(chan []*confgroup.Group) + go sim.collectGroups(t, in, out) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + go sim.discovery.Run(ctx, in) + time.Sleep(time.Millisecond * 250) + + if sim.afterRun != nil { + sim.afterRun() + } + + actual := <-out + + sortGroups(actual) + sortGroups(sim.expectedGroups) + + assert.Equal(t, sim.expectedGroups, actual) +} + +func (sim discoverySim) collectGroups(t *testing.T, in, out chan []*confgroup.Group) { + timeout := time.Second * 5 + var groups []*confgroup.Group +loop: + for { + select { + case updates := <-in: + if groups = append(groups, updates...); len(groups) >= len(sim.expectedGroups) { + break loop + } + case <-time.After(timeout): + t.Logf("discovery %s timed out after %s, got %d groups, expected %d, some events are skipped", + sim.discovery.discoverers, timeout, len(groups), len(sim.expectedGroups)) + break loop + } + } + out <- groups +} + +type tmpDir struct { + dir string + t *testing.T +} + +func newTmpDir(t *testing.T, pattern string) *tmpDir { + pattern = "netdata-go-test-discovery-file-" + pattern + dir, err := os.MkdirTemp(os.TempDir(), pattern) + require.NoError(t, err) + return &tmpDir{dir: dir, t: t} +} + +func (d *tmpDir) cleanup() { + assert.NoError(d.t, os.RemoveAll(d.dir)) +} + +func (d *tmpDir) join(filename string) string { + return filepath.Join(d.dir, filename) +} + +func (d *tmpDir) createFile(pattern string) string { + f, err := os.CreateTemp(d.dir, pattern) + require.NoError(d.t, err) + _ = f.Close() + return f.Name() +} + +func (d *tmpDir) removeFile(filename string) { + err := os.Remove(filename) + require.NoError(d.t, err) +} + +func (d *tmpDir) renameFile(origFilename, newFilename string) { + err := os.Rename(origFilename, newFilename) + require.NoError(d.t, err) +} + +func (d *tmpDir) writeYAML(filename string, in interface{}) { + bs, err := yaml.Marshal(in) + require.NoError(d.t, err) + err = os.WriteFile(filename, bs, 0644) + require.NoError(d.t, err) +} + +func (d *tmpDir) writeString(filename, data string) { + err := os.WriteFile(filename, []byte(data), 0644) + require.NoError(d.t, err) +} + +func sortGroups(groups []*confgroup.Group) { + if len(groups) == 0 { + return + } + sort.Slice(groups, func(i, j int) bool { return groups[i].Source < groups[j].Source }) +} diff --git a/src/go/plugin/go.d/agent/discovery/file/watch.go b/src/go/plugin/go.d/agent/discovery/file/watch.go new file mode 100644 index 000000000..7adefd261 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/watch.go @@ -0,0 +1,220 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + + "github.com/fsnotify/fsnotify" +) + +type ( + Watcher struct { + *logger.Logger + + paths []string + reg confgroup.Registry + watcher *fsnotify.Watcher + cache cache + refreshEvery time.Duration + } + cache map[string]time.Time +) + +func (c cache) lookup(path string) (time.Time, bool) { v, ok := c[path]; return v, ok } +func (c cache) has(path string) bool { _, ok := c.lookup(path); return ok } +func (c cache) remove(path string) { delete(c, path) } +func (c cache) put(path string, modTime time.Time) { c[path] = modTime } + +func NewWatcher(reg confgroup.Registry, paths []string) *Watcher { + d := &Watcher{ + Logger: log, + paths: paths, + reg: reg, + watcher: nil, + cache: make(cache), + refreshEvery: time.Minute, + } + return d +} + +func (w *Watcher) String() string { + return w.Name() +} + +func (w *Watcher) Name() string { + return "file watcher" +} + +func (w *Watcher) Run(ctx context.Context, in chan<- []*confgroup.Group) { + w.Info("instance is started") + defer func() { w.Info("instance is stopped") }() + + watcher, err := fsnotify.NewWatcher() + if err != nil { + w.Errorf("fsnotify watcher initialization: %v", err) + return + } + + w.watcher = watcher + defer w.stop() + w.refresh(ctx, in) + + tk := time.NewTicker(w.refreshEvery) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + w.refresh(ctx, in) + case event := <-w.watcher.Events: + // TODO: check if event.Has will do + if event.Name == "" || isChmodOnly(event) || !w.fileMatches(event.Name) { + break + } + if event.Has(fsnotify.Create) && w.cache.has(event.Name) { + // vim "backupcopy=no" case, already collected after Rename event. + break + } + if event.Has(fsnotify.Rename) { + // It is common to modify files using vim. + // When writing to a file a backup is made. "backupcopy" option tells how it's done. + // Default is "no": rename the file and write a new one. + // This is cheap attempt to not send empty group for the old file. + time.Sleep(time.Millisecond * 100) + } + w.refresh(ctx, in) + case err := <-w.watcher.Errors: + if err != nil { + w.Warningf("watch: %v", err) + } + } + } +} + +func (w *Watcher) fileMatches(file string) bool { + for _, pattern := range w.paths { + if ok, _ := filepath.Match(pattern, file); ok { + return true + } + } + return false +} + +func (w *Watcher) listFiles() (files []string) { + for _, pattern := range w.paths { + if matches, err := filepath.Glob(pattern); err == nil { + files = append(files, matches...) + } + } + return files +} + +func (w *Watcher) refresh(ctx context.Context, in chan<- []*confgroup.Group) { + select { + case <-ctx.Done(): + return + default: + } + var groups []*confgroup.Group + seen := make(map[string]bool) + + for _, file := range w.listFiles() { + fi, err := os.Lstat(file) + if err != nil { + w.Warningf("lstat '%s': %v", file, err) + continue + } + + if !fi.Mode().IsRegular() { + continue + } + + seen[file] = true + if v, ok := w.cache.lookup(file); ok && v.Equal(fi.ModTime()) { + continue + } + w.cache.put(file, fi.ModTime()) + + if group, err := parse(w.reg, file); err != nil { + w.Warningf("parse '%s': %v", file, err) + } else if group == nil { + groups = append(groups, &confgroup.Group{Source: file}) + } else { + for _, cfg := range group.Configs { + cfg.SetProvider("file watcher") + cfg.SetSourceType(configSourceType(file)) + cfg.SetSource(fmt.Sprintf("discoverer=file_watcher,file=%s", file)) + } + groups = append(groups, group) + } + } + + for name := range w.cache { + if seen[name] { + continue + } + w.cache.remove(name) + groups = append(groups, &confgroup.Group{Source: name}) + } + + send(ctx, in, groups) + + w.watchDirs() +} + +func (w *Watcher) watchDirs() { + for _, path := range w.paths { + if idx := strings.LastIndex(path, "/"); idx > -1 { + path = path[:idx] + } else { + path = "./" + } + if err := w.watcher.Add(path); err != nil { + w.Errorf("start watching '%s': %v", path, err) + } + } +} + +func (w *Watcher) stop() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // closing the watcher deadlocks unless all events and errors are drained. + go func() { + for { + select { + case <-w.watcher.Errors: + case <-w.watcher.Events: + case <-ctx.Done(): + return + } + } + }() + + _ = w.watcher.Close() +} + +func isChmodOnly(event fsnotify.Event) bool { + return event.Op^fsnotify.Chmod == 0 +} + +func send(ctx context.Context, in chan<- []*confgroup.Group, groups []*confgroup.Group) { + if len(groups) == 0 { + return + } + select { + case <-ctx.Done(): + case in <- groups: + } +} diff --git a/src/go/plugin/go.d/agent/discovery/file/watch_test.go b/src/go/plugin/go.d/agent/discovery/file/watch_test.go new file mode 100644 index 000000000..f29b5d579 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/file/watch_test.go @@ -0,0 +1,378 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "fmt" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + + "github.com/stretchr/testify/assert" +) + +func TestWatcher_String(t *testing.T) { + assert.NotEmpty(t, NewWatcher(confgroup.Registry{}, nil)) +} + +func TestNewWatcher(t *testing.T) { + tests := map[string]struct { + reg confgroup.Registry + paths []string + }{ + "empty inputs": { + reg: confgroup.Registry{}, + paths: []string{}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.NotNil(t, NewWatcher(test.reg, test.paths)) + }) + } +} + +func TestWatcher_Run(t *testing.T) { + tests := map[string]struct { + createSim func(tmp *tmpDir) discoverySim + }{ + "file exists before start": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeYAML(filename, cfg) + }, + expectedGroups: expected, + } + return sim + }, + }, + "empty file": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeString(filename, "") + }, + expectedGroups: expected, + } + return sim + }, + }, + "only comments, no data": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeString(filename, "# a comment") + }, + expectedGroups: expected, + } + return sim + }, + }, + "add file": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + } + + sim := discoverySim{ + discovery: discovery, + afterRun: func() { + tmp.writeYAML(filename, cfg) + }, + expectedGroups: expected, + } + return sim + }, + }, + "remove file": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + { + Source: filename, + Configs: nil, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeYAML(filename, cfg) + }, + afterRun: func() { + tmp.removeFile(filename) + }, + expectedGroups: expected, + } + return sim + }, + }, + "change file": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfgOrig := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + cfgChanged := sdConfig{ + { + "name": "name_changed", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name_changed", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeYAML(filename, cfgOrig) + }, + afterRun: func() { + tmp.writeYAML(filename, cfgChanged) + time.Sleep(time.Millisecond * 500) + }, + expectedGroups: expected, + } + return sim + }, + }, + "vim 'backupcopy=no' (writing to a file and backup)": { + createSim: func(tmp *tmpDir) discoverySim { + reg := confgroup.Registry{ + "module": {}, + } + cfg := sdConfig{ + { + "name": "name", + "module": "module", + }, + } + filename := tmp.join("module.conf") + discovery := prepareDiscovery(t, Config{ + Registry: reg, + Watch: []string{tmp.join("*.conf")}, + }) + expected := []*confgroup.Group{ + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": confgroup.TypeStock, + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + { + Source: filename, + Configs: []confgroup.Config{ + { + "name": "name", + "module": "module", + "update_every": module.UpdateEvery, + "autodetection_retry": module.AutoDetectionRetry, + "priority": module.Priority, + "__provider__": "file watcher", + "__source_type__": "stock", + "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename), + }, + }, + }, + } + + sim := discoverySim{ + discovery: discovery, + beforeRun: func() { + tmp.writeYAML(filename, cfg) + }, + afterRun: func() { + newFilename := filename + ".swp" + tmp.renameFile(filename, newFilename) + tmp.writeYAML(filename, cfg) + tmp.removeFile(newFilename) + time.Sleep(time.Millisecond * 500) + }, + expectedGroups: expected, + } + return sim + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + tmp := newTmpDir(t, "watch-run-*") + defer tmp.cleanup() + + test.createSim(tmp).run(t) + }) + } +} diff --git a/src/go/plugin/go.d/agent/discovery/manager.go b/src/go/plugin/go.d/agent/discovery/manager.go new file mode 100644 index 000000000..646616023 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/manager.go @@ -0,0 +1,199 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package discovery + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/dummy" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/file" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd" +) + +func NewManager(cfg Config) (*Manager, error) { + if err := validateConfig(cfg); err != nil { + return nil, fmt.Errorf("discovery manager config validation: %v", err) + } + + mgr := &Manager{ + Logger: logger.New().With( + slog.String("component", "discovery manager"), + ), + send: make(chan struct{}, 1), + sendEvery: time.Second * 2, // timeout to aggregate changes + discoverers: make([]discoverer, 0), + mux: &sync.RWMutex{}, + cache: newCache(), + } + + if err := mgr.registerDiscoverers(cfg); err != nil { + return nil, fmt.Errorf("discovery manager initializaion: %v", err) + } + + return mgr, nil +} + +type discoverer interface { + Run(ctx context.Context, in chan<- []*confgroup.Group) +} + +type Manager struct { + *logger.Logger + discoverers []discoverer + send chan struct{} + sendEvery time.Duration + mux *sync.RWMutex + cache *cache +} + +func (m *Manager) String() string { + return fmt.Sprintf("discovery manager: %v", m.discoverers) +} + +func (m *Manager) Run(ctx context.Context, in chan<- []*confgroup.Group) { + m.Info("instance is started") + defer func() { m.Info("instance is stopped") }() + + var wg sync.WaitGroup + + for _, d := range m.discoverers { + wg.Add(1) + go func(d discoverer) { + defer wg.Done() + m.runDiscoverer(ctx, d) + }(d) + } + + wg.Add(1) + go func() { + defer wg.Done() + m.sendLoop(ctx, in) + }() + + wg.Wait() + <-ctx.Done() +} + +func (m *Manager) registerDiscoverers(cfg Config) error { + if len(cfg.File.Read) > 0 || len(cfg.File.Watch) > 0 { + cfg.File.Registry = cfg.Registry + d, err := file.NewDiscovery(cfg.File) + if err != nil { + return err + } + m.discoverers = append(m.discoverers, d) + } + + if len(cfg.Dummy.Names) > 0 { + cfg.Dummy.Registry = cfg.Registry + d, err := dummy.NewDiscovery(cfg.Dummy) + if err != nil { + return err + } + m.discoverers = append(m.discoverers, d) + } + + if len(cfg.SD.ConfDir) != 0 { + cfg.SD.ConfigDefaults = cfg.Registry + d, err := sd.NewServiceDiscovery(cfg.SD) + if err != nil { + return err + } + m.discoverers = append(m.discoverers, d) + } + + if len(m.discoverers) == 0 { + return errors.New("zero registered discoverers") + } + + m.Infof("registered discoverers: %v", m.discoverers) + + return nil +} + +func (m *Manager) runDiscoverer(ctx context.Context, d discoverer) { + updates := make(chan []*confgroup.Group) + go d.Run(ctx, updates) + + for { + select { + case <-ctx.Done(): + return + case groups, ok := <-updates: + if !ok { + return + } + func() { + m.mux.Lock() + defer m.mux.Unlock() + + m.cache.update(groups) + m.triggerSend() + }() + } + } +} + +func (m *Manager) sendLoop(ctx context.Context, in chan<- []*confgroup.Group) { + m.mustSend(ctx, in) + + tk := time.NewTicker(m.sendEvery) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + select { + case <-m.send: + m.trySend(in) + default: + } + } + } +} + +func (m *Manager) mustSend(ctx context.Context, in chan<- []*confgroup.Group) { + select { + case <-ctx.Done(): + return + case <-m.send: + m.mux.Lock() + groups := m.cache.groups() + m.cache.reset() + m.mux.Unlock() + + select { + case <-ctx.Done(): + case in <- groups: + } + return + } +} + +func (m *Manager) trySend(in chan<- []*confgroup.Group) { + m.mux.Lock() + defer m.mux.Unlock() + + select { + case in <- m.cache.groups(): + m.cache.reset() + default: + m.triggerSend() + } +} + +func (m *Manager) triggerSend() { + select { + case m.send <- struct{}{}: + default: + } +} diff --git a/src/go/plugin/go.d/agent/discovery/manager_test.go b/src/go/plugin/go.d/agent/discovery/manager_test.go new file mode 100644 index 000000000..5861b0902 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/manager_test.go @@ -0,0 +1,177 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package discovery + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/file" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewManager(t *testing.T) { + tests := map[string]struct { + cfg Config + wantErr bool + }{ + "valid config": { + cfg: Config{ + Registry: confgroup.Registry{"module1": confgroup.Default{}}, + File: file.Config{Read: []string{"path"}}, + }, + }, + "invalid config, registry not set": { + cfg: Config{ + File: file.Config{Read: []string{"path"}}, + }, + wantErr: true, + }, + "invalid config, discoverers not set": { + cfg: Config{ + Registry: confgroup.Registry{"module1": confgroup.Default{}}, + }, + wantErr: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + mgr, err := NewManager(test.cfg) + + if test.wantErr { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.NotNil(t, mgr) + } + }) + } +} + +func TestManager_Run(t *testing.T) { + tests := map[string]func() discoverySim{ + "several discoverers, unique groups with delayed collect": func() discoverySim { + const numGroups, numCfgs = 2, 2 + d1 := prepareMockDiscoverer("test1", numGroups, numCfgs) + d2 := prepareMockDiscoverer("test2", numGroups, numCfgs) + mgr := prepareManager(d1, d2) + expected := combineGroups(d1.groups, d2.groups) + + sim := discoverySim{ + mgr: mgr, + collectDelay: mgr.sendEvery + time.Second, + expectedGroups: expected, + } + return sim + }, + "several discoverers, unique groups": func() discoverySim { + const numGroups, numCfgs = 2, 2 + d1 := prepareMockDiscoverer("test1", numGroups, numCfgs) + d2 := prepareMockDiscoverer("test2", numGroups, numCfgs) + mgr := prepareManager(d1, d2) + expected := combineGroups(d1.groups, d2.groups) + sim := discoverySim{ + mgr: mgr, + expectedGroups: expected, + } + return sim + }, + "several discoverers, same groups": func() discoverySim { + const numGroups, numTargets = 2, 2 + d1 := prepareMockDiscoverer("test1", numGroups, numTargets) + mgr := prepareManager(d1, d1) + expected := combineGroups(d1.groups) + + sim := discoverySim{ + mgr: mgr, + expectedGroups: expected, + } + return sim + }, + "several discoverers, empty groups": func() discoverySim { + const numGroups, numCfgs = 1, 0 + d1 := prepareMockDiscoverer("test1", numGroups, numCfgs) + d2 := prepareMockDiscoverer("test2", numGroups, numCfgs) + mgr := prepareManager(d1, d2) + expected := combineGroups(d1.groups, d2.groups) + + sim := discoverySim{ + mgr: mgr, + expectedGroups: expected, + } + return sim + }, + "several discoverers, nil groups": func() discoverySim { + const numGroups, numCfgs = 0, 0 + d1 := prepareMockDiscoverer("test1", numGroups, numCfgs) + d2 := prepareMockDiscoverer("test2", numGroups, numCfgs) + mgr := prepareManager(d1, d2) + + sim := discoverySim{ + mgr: mgr, + expectedGroups: nil, + } + return sim + }, + } + + for name, sim := range tests { + t.Run(name, func(t *testing.T) { sim().run(t) }) + } +} + +func prepareMockDiscoverer(source string, groups, configs int) mockDiscoverer { + d := mockDiscoverer{} + + for i := 0; i < groups; i++ { + group := confgroup.Group{ + Source: fmt.Sprintf("%s_group_%d", source, i+1), + } + for j := 0; j < configs; j++ { + group.Configs = append(group.Configs, + confgroup.Config{"name": fmt.Sprintf("%s_group_%d_target_%d", source, i+1, j+1)}) + } + d.groups = append(d.groups, &group) + } + return d +} + +func prepareManager(discoverers ...discoverer) *Manager { + mgr := &Manager{ + send: make(chan struct{}, 1), + sendEvery: 2 * time.Second, + discoverers: discoverers, + cache: newCache(), + mux: &sync.RWMutex{}, + } + return mgr +} + +type mockDiscoverer struct { + groups []*confgroup.Group +} + +func (md mockDiscoverer) Run(ctx context.Context, out chan<- []*confgroup.Group) { + for { + select { + case <-ctx.Done(): + return + case out <- md.groups: + return + } + } +} + +func combineGroups(groups ...[]*confgroup.Group) (combined []*confgroup.Group) { + for _, set := range groups { + combined = append(combined, set...) + } + return combined +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/conffile.go b/src/go/plugin/go.d/agent/discovery/sd/conffile.go new file mode 100644 index 000000000..e08a4021b --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/conffile.go @@ -0,0 +1,69 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package sd + +import ( + "context" + "os" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/multipath" +) + +type confFile struct { + source string + content []byte +} + +func newConfFileReader(log *logger.Logger, dir multipath.MultiPath) *confFileReader { + return &confFileReader{ + Logger: log, + confDir: dir, + confChan: make(chan confFile), + } +} + +type confFileReader struct { + *logger.Logger + + confDir multipath.MultiPath + confChan chan confFile +} + +func (c *confFileReader) run(ctx context.Context) { + files, err := c.confDir.FindFiles(".conf") + if err != nil { + c.Error(err) + return + } + + if len(files) == 0 { + return + } + + var confFiles []confFile + + for _, file := range files { + bs, err := os.ReadFile(file) + if err != nil { + c.Error(err) + continue + } + confFiles = append(confFiles, confFile{ + source: file, + content: bs, + }) + } + + for _, conf := range confFiles { + select { + case <-ctx.Done(): + case c.confChan <- conf: + } + } + +} + +func (c *confFileReader) configs() chan confFile { + return c.confChan +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/docker.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/docker.go new file mode 100644 index 000000000..1cea014a9 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/docker.go @@ -0,0 +1,241 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "context" + "fmt" + "log/slog" + "net" + "strconv" + "strings" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/dockerhost" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web" + + "github.com/docker/docker/api/types" + typesContainer "github.com/docker/docker/api/types/container" + docker "github.com/docker/docker/client" + "github.com/ilyam8/hashstructure" +) + +func NewDiscoverer(cfg Config) (*Discoverer, error) { + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + d := &Discoverer{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", "docker"), + ), + cfgSource: cfg.Source, + newDockerClient: func(addr string) (dockerClient, error) { + return docker.NewClientWithOpts(docker.WithHost(addr)) + }, + addr: docker.DefaultDockerHost, + listInterval: time.Second * 60, + timeout: time.Second * 2, + seenTggSources: make(map[string]bool), + started: make(chan struct{}), + } + + if addr := dockerhost.FromEnv(); addr != "" && d.addr == docker.DefaultDockerHost { + d.Infof("using docker host from environment: %s ", addr) + d.addr = addr + } + + d.Tags().Merge(tags) + + if cfg.Timeout.Duration().Seconds() != 0 { + d.timeout = cfg.Timeout.Duration() + } + if cfg.Address != "" { + d.addr = cfg.Address + } + + return d, nil +} + +type Config struct { + Source string + + Tags string `yaml:"tags"` + Address string `yaml:"address"` + Timeout web.Duration `yaml:"timeout"` +} + +type ( + Discoverer struct { + *logger.Logger + model.Base + + dockerClient dockerClient + newDockerClient func(addr string) (dockerClient, error) + addr string + + cfgSource string + + listInterval time.Duration + timeout time.Duration + seenTggSources map[string]bool // [targetGroup.Source] + + started chan struct{} + } + dockerClient interface { + NegotiateAPIVersion(context.Context) + ContainerList(context.Context, typesContainer.ListOptions) ([]types.Container, error) + Close() error + } +) + +func (d *Discoverer) String() string { + return "sd:docker" +} + +func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer func() { d.cleanup(); d.Info("instance is stopped") }() + + close(d.started) + + if d.dockerClient == nil { + client, err := d.newDockerClient(d.addr) + if err != nil { + d.Errorf("error on creating docker client: %v", err) + return + } + d.dockerClient = client + } + + d.dockerClient.NegotiateAPIVersion(ctx) + + if err := d.listContainers(ctx, in); err != nil { + d.Error(err) + return + } + + tk := time.NewTicker(d.listInterval) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + if err := d.listContainers(ctx, in); err != nil { + d.Warning(err) + } + } + } +} + +func (d *Discoverer) listContainers(ctx context.Context, in chan<- []model.TargetGroup) error { + listCtx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + + containers, err := d.dockerClient.ContainerList(listCtx, typesContainer.ListOptions{}) + if err != nil { + return err + } + + var tggs []model.TargetGroup + seen := make(map[string]bool) + + for _, cntr := range containers { + if tgg := d.buildTargetGroup(cntr); tgg != nil { + tggs = append(tggs, tgg) + seen[tgg.Source()] = true + } + } + + for src := range d.seenTggSources { + if !seen[src] { + tggs = append(tggs, &targetGroup{source: src}) + } + } + d.seenTggSources = seen + + select { + case <-ctx.Done(): + case in <- tggs: + } + + return nil +} + +func (d *Discoverer) buildTargetGroup(cntr types.Container) model.TargetGroup { + if len(cntr.Names) == 0 || cntr.NetworkSettings == nil || len(cntr.NetworkSettings.Networks) == 0 { + return nil + } + + tgg := &targetGroup{ + source: cntrSource(cntr), + } + if d.cfgSource != "" { + tgg.source += fmt.Sprintf(",%s", d.cfgSource) + } + + for netDriver, network := range cntr.NetworkSettings.Networks { + // container with network mode host will be discovered by local-listeners + for _, port := range cntr.Ports { + tgt := &target{ + ID: cntr.ID, + Name: strings.TrimPrefix(cntr.Names[0], "/"), + Image: cntr.Image, + Command: cntr.Command, + Labels: mapAny(cntr.Labels), + PrivatePort: strconv.Itoa(int(port.PrivatePort)), + PublicPort: strconv.Itoa(int(port.PublicPort)), + PublicPortIP: port.IP, + PortProtocol: port.Type, + NetworkMode: cntr.HostConfig.NetworkMode, + NetworkDriver: netDriver, + IPAddress: network.IPAddress, + } + tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.PrivatePort) + + hash, err := calcHash(tgt) + if err != nil { + continue + } + + tgt.hash = hash + tgt.Tags().Merge(d.Tags()) + + tgg.targets = append(tgg.targets, tgt) + } + } + + return tgg +} + +func (d *Discoverer) cleanup() { + if d.dockerClient != nil { + _ = d.dockerClient.Close() + } +} + +func cntrSource(cntr types.Container) string { + name := strings.TrimPrefix(cntr.Names[0], "/") + return fmt.Sprintf("discoverer=docker,container=%s,image=%s", name, cntr.Image) +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} + +func mapAny(src map[string]string) map[string]any { + if src == nil { + return nil + } + m := make(map[string]any, len(src)) + for k, v := range src { + m[k] = v + } + return m +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/dockerd_test.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/dockerd_test.go new file mode 100644 index 000000000..630afb0f5 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/dockerd_test.go @@ -0,0 +1,162 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/docker/docker/api/types" + typesNetwork "github.com/docker/docker/api/types/network" +) + +func TestDiscoverer_Discover(t *testing.T) { + tests := map[string]struct { + createSim func() *discoverySim + }{ + "add containers": { + createSim: func() *discoverySim { + nginx1 := prepareNginxContainer("nginx1") + nginx2 := prepareNginxContainer("nginx2") + + sim := &discoverySim{ + dockerCli: func(cli dockerCli, _ time.Duration) { + cli.addContainer(nginx1) + cli.addContainer(nginx2) + }, + wantGroups: []model.TargetGroup{ + &targetGroup{ + source: cntrSource(nginx1), + targets: []model.Target{ + withHash(&target{ + ID: nginx1.ID, + Name: nginx1.Names[0][1:], + Image: nginx1.Image, + Command: nginx1.Command, + Labels: mapAny(nginx1.Labels), + PrivatePort: "80", + PublicPort: "8080", + PublicPortIP: "0.0.0.0", + PortProtocol: "tcp", + NetworkMode: "default", + NetworkDriver: "bridge", + IPAddress: "192.0.2.0", + Address: "192.0.2.0:80", + }), + }, + }, + &targetGroup{ + source: cntrSource(nginx2), + targets: []model.Target{ + withHash(&target{ + ID: nginx2.ID, + Name: nginx2.Names[0][1:], + Image: nginx2.Image, + Command: nginx2.Command, + Labels: mapAny(nginx2.Labels), + PrivatePort: "80", + PublicPort: "8080", + PublicPortIP: "0.0.0.0", + PortProtocol: "tcp", + NetworkMode: "default", + NetworkDriver: "bridge", + IPAddress: "192.0.2.0", + Address: "192.0.2.0:80", + }), + }, + }, + }, + } + return sim + }, + }, + "remove containers": { + createSim: func() *discoverySim { + nginx1 := prepareNginxContainer("nginx1") + nginx2 := prepareNginxContainer("nginx2") + + sim := &discoverySim{ + dockerCli: func(cli dockerCli, interval time.Duration) { + cli.addContainer(nginx1) + cli.addContainer(nginx2) + time.Sleep(interval * 2) + cli.removeContainer(nginx1.ID) + }, + wantGroups: []model.TargetGroup{ + &targetGroup{ + source: cntrSource(nginx1), + targets: nil, + }, + &targetGroup{ + source: cntrSource(nginx2), + targets: []model.Target{ + withHash(&target{ + ID: nginx2.ID, + Name: nginx2.Names[0][1:], + Image: nginx2.Image, + Command: nginx2.Command, + Labels: mapAny(nginx2.Labels), + PrivatePort: "80", + PublicPort: "8080", + PublicPortIP: "0.0.0.0", + PortProtocol: "tcp", + NetworkMode: "default", + NetworkDriver: "bridge", + IPAddress: "192.0.2.0", + Address: "192.0.2.0:80", + }), + }, + }, + }, + } + return sim + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareNginxContainer(name string) types.Container { + return types.Container{ + ID: "id-" + name, + Names: []string{"/" + name}, + Image: "nginx-image", + ImageID: "nginx-image-id", + Command: "nginx-command", + Ports: []types.Port{ + { + IP: "0.0.0.0", + PrivatePort: 80, + PublicPort: 8080, + Type: "tcp", + }, + }, + Labels: map[string]string{"key1": "value1"}, + HostConfig: struct { + NetworkMode string `json:",omitempty"` + Annotations map[string]string `json:",omitempty"` + }{ + NetworkMode: "default", + }, + NetworkSettings: &types.SummaryNetworkSettings{ + Networks: map[string]*typesNetwork.EndpointSettings{ + "bridge": {IPAddress: "192.0.2.0"}, + }, + }, + } +} + +func withHash(tgt *target) *target { + tgt.hash, _ = calcHash(tgt) + tags, _ := model.ParseTags("docker") + tgt.Tags().Merge(tags) + return tgt +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/sim_test.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/sim_test.go new file mode 100644 index 000000000..fcdbeb894 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/sim_test.go @@ -0,0 +1,162 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "context" + "sort" + "sync" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/docker/docker/api/types" + typesContainer "github.com/docker/docker/api/types/container" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type dockerCli interface { + addContainer(cntr types.Container) + removeContainer(id string) +} + +type discoverySim struct { + dockerCli func(cli dockerCli, interval time.Duration) + wantGroups []model.TargetGroup +} + +func (sim *discoverySim) run(t *testing.T) { + d, err := NewDiscoverer(Config{ + Source: "", + Tags: "docker", + }) + require.NoError(t, err) + + mock := newMockDockerd() + + d.newDockerClient = func(addr string) (dockerClient, error) { + return mock, nil + } + d.listInterval = time.Millisecond * 100 + + seen := make(map[string]model.TargetGroup) + ctx, cancel := context.WithCancel(context.Background()) + in := make(chan []model.TargetGroup) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + d.Discover(ctx, in) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case tggs := <-in: + for _, tgg := range tggs { + seen[tgg.Source()] = tgg + } + } + } + }() + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-d.started: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery failed to start") + } + + sim.dockerCli(mock, d.listInterval) + time.Sleep(time.Second) + + cancel() + + select { + case <-done: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery hasn't finished after cancel") + } + + var tggs []model.TargetGroup + for _, tgg := range seen { + tggs = append(tggs, tgg) + } + + sortTargetGroups(tggs) + sortTargetGroups(sim.wantGroups) + + wantLen, gotLen := len(sim.wantGroups), len(tggs) + assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen) + assert.Equal(t, sim.wantGroups, tggs) + + assert.True(t, mock.negApiVerCalled, "NegotiateAPIVersion called") + assert.True(t, mock.closeCalled, "Close called") +} + +func newMockDockerd() *mockDockerd { + return &mockDockerd{ + containers: make(map[string]types.Container), + } +} + +type mockDockerd struct { + negApiVerCalled bool + closeCalled bool + mux sync.Mutex + containers map[string]types.Container +} + +func (m *mockDockerd) addContainer(cntr types.Container) { + m.mux.Lock() + defer m.mux.Unlock() + + m.containers[cntr.ID] = cntr +} + +func (m *mockDockerd) removeContainer(id string) { + m.mux.Lock() + defer m.mux.Unlock() + + delete(m.containers, id) +} + +func (m *mockDockerd) ContainerList(_ context.Context, _ typesContainer.ListOptions) ([]types.Container, error) { + m.mux.Lock() + defer m.mux.Unlock() + + var cntrs []types.Container + for _, cntr := range m.containers { + cntrs = append(cntrs, cntr) + } + + return cntrs, nil +} + +func (m *mockDockerd) NegotiateAPIVersion(_ context.Context) { + m.negApiVerCalled = true +} + +func (m *mockDockerd) Close() error { + m.closeCalled = true + return nil +} + +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/target.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/target.go new file mode 100644 index 000000000..2cf0575b5 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/target.go @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "fmt" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" +) + +type targetGroup struct { + source string + targets []model.Target +} + +func (g *targetGroup) Provider() string { return "sd:docker" } +func (g *targetGroup) Source() string { return g.source } +func (g *targetGroup) Targets() []model.Target { return g.targets } + +type target struct { + model.Base + + hash uint64 + + ID string + Name string + Image string + Command string + Labels map[string]any + PrivatePort string // Port on the container + PublicPort string // Port exposed on the host + PublicPortIP string // Host IP address that the container's port is mapped to + PortProtocol string + NetworkMode string + NetworkDriver string + IPAddress string + + Address string // "IPAddress:PrivatePort" +} + +func (t *target) TUID() string { + if t.PublicPort != "" { + return fmt.Sprintf("%s_%s_%s_%s_%s_%s", + t.Name, t.IPAddress, t.PublicPortIP, t.PortProtocol, t.PublicPort, t.PrivatePort) + } + if t.PrivatePort != "" { + return fmt.Sprintf("%s_%s_%s_%s", + t.Name, t.IPAddress, t.PortProtocol, t.PrivatePort) + } + return fmt.Sprintf("%s_%s", t.Name, t.IPAddress) +} + +func (t *target) Hash() uint64 { + return t.hash +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/config.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/config.go new file mode 100644 index 000000000..15a1e4745 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/config.go @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "errors" + "fmt" +) + +type Config struct { + APIServer string `yaml:"api_server"` // TODO: not used + Role string `yaml:"role"` + Tags string `yaml:"tags"` + Namespaces []string `yaml:"namespaces"` + Selector struct { + Label string `yaml:"label"` + Field string `yaml:"field"` + } `yaml:"selector"` + Pod struct { + LocalMode bool `yaml:"local_mode"` + } `yaml:"pod"` +} + +func validateConfig(cfg Config) error { + switch role(cfg.Role) { + case rolePod, roleService: + default: + return fmt.Errorf("unknown role: '%s'", cfg.Role) + } + if cfg.Tags == "" { + return errors.New("'tags' not set") + } + return nil +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes.go new file mode 100644 index 000000000..439e2b695 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes.go @@ -0,0 +1,268 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "log/slog" + "os" + "strings" + "sync" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/k8sclient" + + "github.com/ilyam8/hashstructure" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type role string + +const ( + rolePod role = "pod" + roleService role = "service" +) + +const ( + envNodeName = "MY_NODE_NAME" +) + +var log = logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", "kubernetes"), +) + +func NewKubeDiscoverer(cfg Config) (*KubeDiscoverer, error) { + if err := validateConfig(cfg); err != nil { + return nil, fmt.Errorf("config validation: %v", err) + } + + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + client, err := k8sclient.New("Netdata/service-td") + if err != nil { + return nil, fmt.Errorf("create clientset: %v", err) + } + + ns := cfg.Namespaces + if len(ns) == 0 { + ns = []string{corev1.NamespaceAll} + } + + selectorField := cfg.Selector.Field + if role(cfg.Role) == rolePod && cfg.Pod.LocalMode { + name := os.Getenv(envNodeName) + if name == "" { + return nil, fmt.Errorf("local_mode is enabled, but env '%s' not set", envNodeName) + } + selectorField = joinSelectors(selectorField, "spec.nodeName="+name) + } + + d := &KubeDiscoverer{ + Logger: log, + client: client, + tags: tags, + role: role(cfg.Role), + namespaces: ns, + selectorLabel: cfg.Selector.Label, + selectorField: selectorField, + discoverers: make([]model.Discoverer, 0, len(ns)), + started: make(chan struct{}), + } + + return d, nil +} + +type KubeDiscoverer struct { + *logger.Logger + + client kubernetes.Interface + + tags model.Tags + role role + namespaces []string + selectorLabel string + selectorField string + discoverers []model.Discoverer + started chan struct{} +} + +func (d *KubeDiscoverer) String() string { + return "sd:k8s" +} + +const resyncPeriod = 10 * time.Minute + +func (d *KubeDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer d.Info("instance is stopped") + + for _, namespace := range d.namespaces { + var dd model.Discoverer + switch d.role { + case rolePod: + dd = d.setupPodDiscoverer(ctx, namespace) + case roleService: + dd = d.setupServiceDiscoverer(ctx, namespace) + default: + d.Errorf("unknown role: '%s'", d.role) + continue + } + d.discoverers = append(d.discoverers, dd) + } + + if len(d.discoverers) == 0 { + d.Error("no discoverers registered") + return + } + + d.Infof("registered: %v", d.discoverers) + + var wg sync.WaitGroup + updates := make(chan []model.TargetGroup) + + for _, disc := range d.discoverers { + wg.Add(1) + go func(disc model.Discoverer) { defer wg.Done(); disc.Discover(ctx, updates) }(disc) + } + + done := make(chan struct{}) + go func() { defer close(done); wg.Wait() }() + + close(d.started) + + for { + select { + case <-ctx.Done(): + select { + case <-done: + d.Info("all discoverers exited") + case <-time.After(time.Second * 5): + d.Warning("not all discoverers exited") + } + return + case <-done: + d.Info("all discoverers exited") + return + case tggs := <-updates: + select { + case <-ctx.Done(): + case in <- tggs: + } + } + } +} + +func (d *KubeDiscoverer) setupPodDiscoverer(ctx context.Context, ns string) *podDiscoverer { + pod := d.client.CoreV1().Pods(ns) + podLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return pod.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return pod.Watch(ctx, opts) + }, + } + + cmap := d.client.CoreV1().ConfigMaps(ns) + cmapLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + return cmap.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + return cmap.Watch(ctx, opts) + }, + } + + secret := d.client.CoreV1().Secrets(ns) + secretLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + return secret.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + return secret.Watch(ctx, opts) + }, + } + + td := newPodDiscoverer( + cache.NewSharedInformer(podLW, &corev1.Pod{}, resyncPeriod), + cache.NewSharedInformer(cmapLW, &corev1.ConfigMap{}, resyncPeriod), + cache.NewSharedInformer(secretLW, &corev1.Secret{}, resyncPeriod), + ) + td.Tags().Merge(d.tags) + + return td +} + +func (d *KubeDiscoverer) setupServiceDiscoverer(ctx context.Context, namespace string) *serviceDiscoverer { + svc := d.client.CoreV1().Services(namespace) + + svcLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return svc.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return svc.Watch(ctx, opts) + }, + } + + inf := cache.NewSharedInformer(svcLW, &corev1.Service{}, resyncPeriod) + + td := newServiceDiscoverer(inf) + td.Tags().Merge(d.tags) + + return td +} + +func enqueue(queue *workqueue.Type, obj any) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + queue.Add(key) +} + +func send(ctx context.Context, in chan<- []model.TargetGroup, tgg model.TargetGroup) { + if tgg == nil { + return + } + select { + case <-ctx.Done(): + case in <- []model.TargetGroup{tgg}: + } +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} + +func joinSelectors(srs ...string) string { + var i int + for _, v := range srs { + if v != "" { + srs[i] = v + i++ + } + } + return strings.Join(srs[:i], ",") +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go new file mode 100644 index 000000000..ba60a47b4 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "fmt" + "os" + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/k8sclient" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +var discoveryTags, _ = model.ParseTags("k8s") + +func TestMain(m *testing.M) { + _ = os.Setenv(envNodeName, "m01") + _ = os.Setenv(k8sclient.EnvFakeClient, "true") + code := m.Run() + _ = os.Unsetenv(envNodeName) + _ = os.Unsetenv(k8sclient.EnvFakeClient) + os.Exit(code) +} + +func TestNewKubeDiscoverer(t *testing.T) { + tests := map[string]struct { + cfg Config + wantErr bool + }{ + "pod role config": { + wantErr: false, + cfg: Config{Role: string(rolePod), Tags: "k8s"}, + }, + "service role config": { + wantErr: false, + cfg: Config{Role: string(roleService), Tags: "k8s"}, + }, + "empty config": { + wantErr: true, + cfg: Config{}, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + disc, err := NewKubeDiscoverer(test.cfg) + + if test.wantErr { + assert.Error(t, err) + assert.Nil(t, disc) + } else { + assert.NoError(t, err) + assert.NotNil(t, disc) + } + }) + } +} + +func TestKubeDiscoverer_Discover(t *testing.T) { + const prod = "prod" + const dev = "dev" + prodNamespace := newNamespace(prod) + devNamespace := newNamespace(dev) + + tests := map[string]struct { + createSim func() discoverySim + }{ + "multiple namespaces pod td": { + createSim: func() discoverySim { + httpdProd, nginxProd := newHTTPDPod(), newNGINXPod() + httpdProd.Namespace = prod + nginxProd.Namespace = prod + + httpdDev, nginxDev := newHTTPDPod(), newNGINXPod() + httpdDev.Namespace = dev + nginxDev.Namespace = dev + + disc, _ := preparePodDiscoverer( + []string{prod, dev}, + prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev) + + return discoverySim{ + td: disc, + sortBeforeVerify: true, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpdDev), + preparePodTargetGroup(nginxDev), + preparePodTargetGroup(httpdProd), + preparePodTargetGroup(nginxProd), + }, + } + }, + }, + "multiple namespaces ClusterIP service td": { + createSim: func() discoverySim { + httpdProd, nginxProd := newHTTPDClusterIPService(), newNGINXClusterIPService() + httpdProd.Namespace = prod + nginxProd.Namespace = prod + + httpdDev, nginxDev := newHTTPDClusterIPService(), newNGINXClusterIPService() + httpdDev.Namespace = dev + nginxDev.Namespace = dev + + disc, _ := prepareSvcDiscoverer( + []string{prod, dev}, + prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev) + + return discoverySim{ + td: disc, + sortBeforeVerify: true, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpdDev), + prepareSvcTargetGroup(nginxDev), + prepareSvcTargetGroup(httpdProd), + prepareSvcTargetGroup(nginxProd), + }, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareDiscoverer(role role, namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + client := fake.NewSimpleClientset(objects...) + tags, _ := model.ParseTags("k8s") + disc := &KubeDiscoverer{ + tags: tags, + role: role, + namespaces: namespaces, + client: client, + discoverers: nil, + started: make(chan struct{}), + } + return disc, client +} + +func newNamespace(name string) *corev1.Namespace { + return &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}} +} + +func mustCalcHash(obj any) uint64 { + hash, err := calcHash(obj) + if err != nil { + panic(fmt.Sprintf("hash calculation: %v", err)) + } + return hash +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go new file mode 100644 index 000000000..617081742 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go @@ -0,0 +1,434 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type podTargetGroup struct { + targets []model.Target + source string +} + +func (p podTargetGroup) Provider() string { return "sd:k8s:pod" } +func (p podTargetGroup) Source() string { return p.source } +func (p podTargetGroup) Targets() []model.Target { return p.targets } + +type PodTarget struct { + model.Base `hash:"ignore"` + + hash uint64 + tuid string + + Address string + Namespace string + Name string + Annotations map[string]any + Labels map[string]any + NodeName string + PodIP string + ControllerName string + ControllerKind string + ContName string + Image string + Env map[string]any + Port string + PortName string + PortProtocol string +} + +func (p PodTarget) Hash() uint64 { return p.hash } +func (p PodTarget) TUID() string { return p.tuid } + +func newPodDiscoverer(pod, cmap, secret cache.SharedInformer) *podDiscoverer { + + if pod == nil || cmap == nil || secret == nil { + panic("nil pod or cmap or secret informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"}) + + _, _ = pod.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj any) { enqueue(queue, obj) }, + DeleteFunc: func(obj any) { enqueue(queue, obj) }, + }) + + return &podDiscoverer{ + Logger: log, + podInformer: pod, + cmapInformer: cmap, + secretInformer: secret, + queue: queue, + } +} + +type podDiscoverer struct { + *logger.Logger + model.Base + + podInformer cache.SharedInformer + cmapInformer cache.SharedInformer + secretInformer cache.SharedInformer + queue *workqueue.Type +} + +func (p *podDiscoverer) String() string { + return "sd:k8s:pod" +} + +func (p *podDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + p.Info("instance is started") + defer p.Info("instance is stopped") + defer p.queue.ShutDown() + + go p.podInformer.Run(ctx.Done()) + go p.cmapInformer.Run(ctx.Done()) + go p.secretInformer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), + p.podInformer.HasSynced, p.cmapInformer.HasSynced, p.secretInformer.HasSynced) { + p.Error("failed to sync caches") + return + } + + go p.run(ctx, in) + + <-ctx.Done() +} + +func (p *podDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) { + for { + item, shutdown := p.queue.Get() + if shutdown { + return + } + p.handleQueueItem(ctx, in, item) + } +} + +func (p *podDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) { + defer p.queue.Done(item) + + key := item.(string) + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + obj, ok, err := p.podInformer.GetStore().GetByKey(key) + if err != nil { + return + } + + if !ok { + tgg := &podTargetGroup{source: podSourceFromNsName(namespace, name)} + send(ctx, in, tgg) + return + } + + pod, err := toPod(obj) + if err != nil { + return + } + + tgg := p.buildTargetGroup(pod) + + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(p.Tags()) + } + + send(ctx, in, tgg) + +} + +func (p *podDiscoverer) buildTargetGroup(pod *corev1.Pod) model.TargetGroup { + if pod.Status.PodIP == "" || len(pod.Spec.Containers) == 0 { + return &podTargetGroup{ + source: podSource(pod), + } + } + return &podTargetGroup{ + source: podSource(pod), + targets: p.buildTargets(pod), + } +} + +func (p *podDiscoverer) buildTargets(pod *corev1.Pod) (targets []model.Target) { + var name, kind string + for _, ref := range pod.OwnerReferences { + if ref.Controller != nil && *ref.Controller { + name = ref.Name + kind = ref.Kind + break + } + } + + for _, container := range pod.Spec.Containers { + env := p.collectEnv(pod.Namespace, container) + + if len(container.Ports) == 0 { + tgt := &PodTarget{ + tuid: podTUID(pod, container), + Address: pod.Status.PodIP, + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: name, + ControllerKind: kind, + ContName: container.Name, + Image: container.Image, + Env: mapAny(env), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } else { + for _, port := range container.Ports { + portNum := strconv.FormatUint(uint64(port.ContainerPort), 10) + tgt := &PodTarget{ + tuid: podTUIDWithPort(pod, container, port), + Address: net.JoinHostPort(pod.Status.PodIP, portNum), + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: name, + ControllerKind: kind, + ContName: container.Name, + Image: container.Image, + Env: mapAny(env), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } + } + } + + return targets +} + +func (p *podDiscoverer) collectEnv(ns string, container corev1.Container) map[string]string { + vars := make(map[string]string) + + // When a key exists in multiple sources, + // the value associated with the last source will take precedence. + // Values defined by an Env with a duplicate key will take precedence. + // + // Order (https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go) + // - envFrom: configMapRef, secretRef + // - env: value || valueFrom: fieldRef, resourceFieldRef, secretRef, configMap + + for _, src := range container.EnvFrom { + switch { + case src.ConfigMapRef != nil: + p.envFromConfigMap(vars, ns, src) + case src.SecretRef != nil: + p.envFromSecret(vars, ns, src) + } + } + + for _, env := range container.Env { + if env.Name == "" || isVar(env.Name) { + continue + } + switch { + case env.Value != "": + vars[env.Name] = env.Value + case env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil: + p.valueFromSecret(vars, ns, env) + case env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil: + p.valueFromConfigMap(vars, ns, env) + } + } + + if len(vars) == 0 { + return nil + } + return vars +} + +func (p *podDiscoverer) valueFromConfigMap(vars map[string]string, ns string, env corev1.EnvVar) { + if env.ValueFrom.ConfigMapKeyRef.Name == "" || env.ValueFrom.ConfigMapKeyRef.Key == "" { + return + } + + sr := env.ValueFrom.ConfigMapKeyRef + key := ns + "/" + sr.Name + + item, exist, err := p.cmapInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + cmap, err := toConfigMap(item) + if err != nil { + return + } + + if v, ok := cmap.Data[sr.Key]; ok { + vars[env.Name] = v + } +} + +func (p *podDiscoverer) valueFromSecret(vars map[string]string, ns string, env corev1.EnvVar) { + if env.ValueFrom.SecretKeyRef.Name == "" || env.ValueFrom.SecretKeyRef.Key == "" { + return + } + + secretKey := env.ValueFrom.SecretKeyRef + key := ns + "/" + secretKey.Name + + item, exist, err := p.secretInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + secret, err := toSecret(item) + if err != nil { + return + } + + if v, ok := secret.Data[secretKey.Key]; ok { + vars[env.Name] = string(v) + } +} + +func (p *podDiscoverer) envFromConfigMap(vars map[string]string, ns string, src corev1.EnvFromSource) { + if src.ConfigMapRef.Name == "" { + return + } + + key := ns + "/" + src.ConfigMapRef.Name + item, exist, err := p.cmapInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + cmap, err := toConfigMap(item) + if err != nil { + return + } + + for k, v := range cmap.Data { + vars[src.Prefix+k] = v + } +} + +func (p *podDiscoverer) envFromSecret(vars map[string]string, ns string, src corev1.EnvFromSource) { + if src.SecretRef.Name == "" { + return + } + + key := ns + "/" + src.SecretRef.Name + item, exist, err := p.secretInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + secret, err := toSecret(item) + if err != nil { + return + } + + for k, v := range secret.Data { + vars[src.Prefix+k] = string(v) + } +} + +func podTUID(pod *corev1.Pod, container corev1.Container) string { + return fmt.Sprintf("%s_%s_%s", + pod.Namespace, + pod.Name, + container.Name, + ) +} + +func podTUIDWithPort(pod *corev1.Pod, container corev1.Container, port corev1.ContainerPort) string { + return fmt.Sprintf("%s_%s_%s_%s_%s", + pod.Namespace, + pod.Name, + container.Name, + strings.ToLower(string(port.Protocol)), + strconv.FormatUint(uint64(port.ContainerPort), 10), + ) +} + +func podSourceFromNsName(namespace, name string) string { + return fmt.Sprintf("discoverer=k8s,kind=pod,namespace=%s,pod_name=%s", namespace, name) +} + +func podSource(pod *corev1.Pod) string { + return podSourceFromNsName(pod.Namespace, pod.Name) +} + +func toPod(obj any) (*corev1.Pod, error) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return pod, nil +} + +func toConfigMap(obj any) (*corev1.ConfigMap, error) { + cmap, ok := obj.(*corev1.ConfigMap) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return cmap, nil +} + +func toSecret(obj any) (*corev1.Secret, error) { + secret, ok := obj.(*corev1.Secret) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return secret, nil +} + +func isVar(name string) bool { + // Variable references $(VAR_NAME) are expanded using the previous defined + // environment variables in the container and any service environment + // variables. + return strings.IndexByte(name, '$') != -1 +} + +func mapAny(src map[string]string) map[string]any { + if src == nil { + return nil + } + m := make(map[string]any, len(src)) + for k, v := range src { + m[k] = v + } + return m +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod_test.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod_test.go new file mode 100644 index 000000000..838c2413f --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod_test.go @@ -0,0 +1,648 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "net" + "strconv" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +func TestPodTargetGroup_Provider(t *testing.T) { + var p podTargetGroup + assert.NotEmpty(t, p.Provider()) +} + +func TestPodTargetGroup_Source(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantSources []string + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantSources: []string{ + "discoverer=k8s,kind=pod,namespace=default,pod_name=httpd-dd95c4d68-5bkwl", + "discoverer=k8s,kind=pod,namespace=default,pod_name=nginx-7cfd77469b-q6kxj", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var sources []string + for _, tgg := range sim.run(t) { + sources = append(sources, tgg.Source()) + } + + assert.Equal(t, test.wantSources, sources) + }) + } +} + +func TestPodTargetGroup_Targets(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTargets int + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: discovery, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantTargets: 4, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var targets int + for _, tgg := range sim.run(t) { + targets += len(tgg.Targets()) + } + + assert.Equal(t, test.wantTargets, targets) + }) + } +} + +func TestPodTarget_Hash(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantHashes []uint64 + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: discovery, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantHashes: []uint64{ + 12703169414253998055, + 13351713096133918928, + 8241692333761256175, + 11562466355572729519, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var hashes []uint64 + for _, tgg := range sim.run(t) { + for _, tg := range tgg.Targets() { + hashes = append(hashes, tg.Hash()) + } + } + + assert.Equal(t, test.wantHashes, hashes) + }) + } +} + +func TestPodTarget_TUID(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTUID []string + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: discovery, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantTUID: []string{ + "default_httpd-dd95c4d68-5bkwl_httpd_tcp_80", + "default_httpd-dd95c4d68-5bkwl_httpd_tcp_443", + "default_nginx-7cfd77469b-q6kxj_nginx_tcp_80", + "default_nginx-7cfd77469b-q6kxj_nginx_tcp_443", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var tuid []string + for _, tgg := range sim.run(t) { + for _, tg := range tgg.Targets() { + tuid = append(tuid, tg.TUID()) + } + } + + assert.Equal(t, test.wantTUID, tuid) + }) + } +} + +func TestNewPodDiscoverer(t *testing.T) { + tests := map[string]struct { + podInf cache.SharedInformer + cmapInf cache.SharedInformer + secretInf cache.SharedInformer + wantPanic bool + }{ + "valid informers": { + wantPanic: false, + podInf: cache.NewSharedInformer(nil, &corev1.Pod{}, resyncPeriod), + cmapInf: cache.NewSharedInformer(nil, &corev1.ConfigMap{}, resyncPeriod), + secretInf: cache.NewSharedInformer(nil, &corev1.Secret{}, resyncPeriod), + }, + "nil informers": { + wantPanic: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + f := func() { newPodDiscoverer(test.podInf, test.cmapInf, test.secretInf) } + + if test.wantPanic { + assert.Panics(t, f) + } else { + assert.NotPanics(t, f) + } + }) + } +} + +func TestPodDiscoverer_String(t *testing.T) { + var p podDiscoverer + assert.NotEmpty(t, p.String()) +} + +func TestPodDiscoverer_Discover(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + }{ + "ADD: pods exist before run": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + td, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: td, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + }, + "ADD: pods exist before run and add after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, client := prepareAllNsPodDiscoverer(httpd) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + }, + "DELETE: remove pods after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, client := prepareAllNsPodDiscoverer(httpd, nginx) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _ = podClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + }, + } + }, + }, + "DELETE,ADD: remove and add pods after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, client := prepareAllNsPodDiscoverer(httpd) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + prepareEmptyPodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + }, + "ADD: pods with empty PodIP": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + httpd.Status.PodIP = "" + nginx.Status.PodIP = "" + disc, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + }, + } + }, + }, + "UPDATE: set pods PodIP after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + httpd.Status.PodIP = "" + nginx.Status.PodIP = "" + disc, client := prepareAllNsPodDiscoverer(httpd, nginx) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _, _ = podClient.Update(ctx, newHTTPDPod(), metav1.UpdateOptions{}) + _, _ = podClient.Update(ctx, newNGINXPod(), metav1.UpdateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + preparePodTargetGroup(newHTTPDPod()), + preparePodTargetGroup(newNGINXPod()), + }, + } + }, + }, + "ADD: pods without containers": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + httpd.Spec.Containers = httpd.Spec.Containers[:0] + nginx.Spec.Containers = httpd.Spec.Containers[:0] + disc, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + }, + } + }, + }, + "Env: from value": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.Env = []corev1.EnvVar{ + {Name: "key1", Value: "value1"}, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1"} + + disc, _ := prepareAllNsPodDiscoverer(httpd) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "Env: from Secret": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.Env = []corev1.EnvVar{ + { + Name: "key1", + ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"}, + Key: "key1", + }}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1"} + secret := prepareSecret("my-secret", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, secret) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "Env: from ConfigMap": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.Env = []corev1.EnvVar{ + { + Name: "key1", + ValueFrom: &corev1.EnvVarSource{ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"}, + Key: "key1", + }}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1"} + cmap := prepareConfigMap("my-cmap", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, cmap) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "EnvFrom: from ConfigMap": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.EnvFrom = []corev1.EnvFromSource{ + { + ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"}}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1", "key2": "value2"} + cmap := prepareConfigMap("my-cmap", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, cmap) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "EnvFrom: from Secret": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.EnvFrom = []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"}}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1", "key2": "value2"} + secret := prepareSecret("my-secret", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, secret) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareAllNsPodDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(rolePod, []string{corev1.NamespaceAll}, objects...) +} + +func preparePodDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(rolePod, namespaces, objects...) +} + +func mangleContainers(containers []corev1.Container, mange func(container *corev1.Container)) { + for i := range containers { + mange(&containers[i]) + } +} + +var controllerTrue = true + +func newHTTPDPod() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "httpd-dd95c4d68-5bkwl", + Namespace: "default", + UID: "1cebb6eb-0c1e-495b-8131-8fa3e6668dc8", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "httpd", "tier": "frontend"}, + OwnerReferences: []metav1.OwnerReference{ + {Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue}, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "m01", + Containers: []corev1.Container{ + { + Name: "httpd", + Image: "httpd", + Ports: []corev1.ContainerPort{ + {Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443}, + }, + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "172.17.0.1", + }, + } +} + +func newNGINXPod() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-7cfd77469b-q6kxj", + Namespace: "default", + UID: "09e883f2-d740-4c5f-970d-02cf02876522", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "nginx", "tier": "frontend"}, + OwnerReferences: []metav1.OwnerReference{ + {Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue}, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "m01", + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + Ports: []corev1.ContainerPort{ + {Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443}, + }, + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "172.17.0.2", + }, + } +} + +func prepareConfigMap(name string, data map[string]string) *corev1.ConfigMap { + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + UID: types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8167" + name), + }, + Data: data, + } +} + +func prepareSecret(name string, data map[string]string) *corev1.Secret { + secretData := make(map[string][]byte, len(data)) + for k, v := range data { + secretData[k] = []byte(v) + } + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + UID: types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8161" + name), + }, + Data: secretData, + } +} + +func prepareEmptyPodTargetGroup(pod *corev1.Pod) *podTargetGroup { + return &podTargetGroup{source: podSource(pod)} +} + +func preparePodTargetGroup(pod *corev1.Pod) *podTargetGroup { + tgg := prepareEmptyPodTargetGroup(pod) + + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + portNum := strconv.FormatUint(uint64(port.ContainerPort), 10) + tgt := &PodTarget{ + tuid: podTUIDWithPort(pod, container, port), + Address: net.JoinHostPort(pod.Status.PodIP, portNum), + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: "netdata-test", + ControllerKind: "DaemonSet", + ContName: container.Name, + Image: container.Image, + Env: nil, + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + } + tgt.hash = mustCalcHash(tgt) + tgt.Tags().Merge(discoveryTags) + + tgg.targets = append(tgg.targets, tgt) + } + } + + return tgg +} + +func preparePodTargetGroupWithEnv(pod *corev1.Pod, env map[string]string) *podTargetGroup { + tgg := preparePodTargetGroup(pod) + + for _, tgt := range tgg.Targets() { + tgt.(*PodTarget).Env = mapAny(env) + tgt.(*PodTarget).hash = mustCalcHash(tgt) + } + + return tgg +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service.go new file mode 100644 index 000000000..1d5ae7cd5 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service.go @@ -0,0 +1,209 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type serviceTargetGroup struct { + targets []model.Target + source string +} + +func (s serviceTargetGroup) Provider() string { return "sd:k8s:service" } +func (s serviceTargetGroup) Source() string { return s.source } +func (s serviceTargetGroup) Targets() []model.Target { return s.targets } + +type ServiceTarget struct { + model.Base `hash:"ignore"` + + hash uint64 + tuid string + + Address string + Namespace string + Name string + Annotations map[string]any + Labels map[string]any + Port string + PortName string + PortProtocol string + ClusterIP string + ExternalName string + Type string +} + +func (s ServiceTarget) Hash() uint64 { return s.hash } +func (s ServiceTarget) TUID() string { return s.tuid } + +type serviceDiscoverer struct { + *logger.Logger + model.Base + + informer cache.SharedInformer + queue *workqueue.Type +} + +func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer { + if inf == nil { + panic("nil service informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "service"}) + _, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj any) { enqueue(queue, obj) }, + DeleteFunc: func(obj any) { enqueue(queue, obj) }, + }) + + return &serviceDiscoverer{ + Logger: log, + informer: inf, + queue: queue, + } +} + +func (s *serviceDiscoverer) String() string { + return "k8s service" +} + +func (s *serviceDiscoverer) Discover(ctx context.Context, ch chan<- []model.TargetGroup) { + s.Info("instance is started") + defer s.Info("instance is stopped") + defer s.queue.ShutDown() + + go s.informer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) { + s.Error("failed to sync caches") + return + } + + go s.run(ctx, ch) + + <-ctx.Done() +} + +func (s *serviceDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) { + for { + item, shutdown := s.queue.Get() + if shutdown { + return + } + + s.handleQueueItem(ctx, in, item) + } +} + +func (s *serviceDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) { + defer s.queue.Done(item) + + key := item.(string) + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + obj, exists, err := s.informer.GetStore().GetByKey(key) + if err != nil { + return + } + + if !exists { + tgg := &serviceTargetGroup{source: serviceSourceFromNsName(namespace, name)} + send(ctx, in, tgg) + return + } + + svc, err := toService(obj) + if err != nil { + return + } + + tgg := s.buildTargetGroup(svc) + + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(s.Tags()) + } + + send(ctx, in, tgg) +} + +func (s *serviceDiscoverer) buildTargetGroup(svc *corev1.Service) model.TargetGroup { + // TODO: headless service? + if svc.Spec.ClusterIP == "" || len(svc.Spec.Ports) == 0 { + return &serviceTargetGroup{ + source: serviceSource(svc), + } + } + return &serviceTargetGroup{ + source: serviceSource(svc), + targets: s.buildTargets(svc), + } +} + +func (s *serviceDiscoverer) buildTargets(svc *corev1.Service) (targets []model.Target) { + for _, port := range svc.Spec.Ports { + portNum := strconv.FormatInt(int64(port.Port), 10) + tgt := &ServiceTarget{ + tuid: serviceTUID(svc, port), + Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum), + Namespace: svc.Namespace, + Name: svc.Name, + Annotations: mapAny(svc.Annotations), + Labels: mapAny(svc.Labels), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + ClusterIP: svc.Spec.ClusterIP, + ExternalName: svc.Spec.ExternalName, + Type: string(svc.Spec.Type), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } + + return targets +} + +func serviceTUID(svc *corev1.Service, port corev1.ServicePort) string { + return fmt.Sprintf("%s_%s_%s_%s", + svc.Namespace, + svc.Name, + strings.ToLower(string(port.Protocol)), + strconv.FormatInt(int64(port.Port), 10), + ) +} + +func serviceSourceFromNsName(namespace, name string) string { + return fmt.Sprintf("discoverer=k8s,kind=service,namespace=%s,service_name=%s", namespace, name) +} + +func serviceSource(svc *corev1.Service) string { + return serviceSourceFromNsName(svc.Namespace, svc.Name) +} + +func toService(obj any) (*corev1.Service, error) { + svc, ok := obj.(*corev1.Service) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return svc, nil +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service_test.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service_test.go new file mode 100644 index 000000000..c3e83e202 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service_test.go @@ -0,0 +1,456 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "net" + "strconv" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +func TestServiceTargetGroup_Provider(t *testing.T) { + var s serviceTargetGroup + assert.NotEmpty(t, s.Provider()) +} + +func TestServiceTargetGroup_Source(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantSources []string + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantSources: []string{ + "discoverer=k8s,kind=service,namespace=default,service_name=httpd-cluster-ip-service", + "discoverer=k8s,kind=service,namespace=default,service_name=nginx-cluster-ip-service", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var sources []string + for _, tgg := range sim.run(t) { + sources = append(sources, tgg.Source()) + } + + assert.Equal(t, test.wantSources, sources) + }) + } +} + +func TestServiceTargetGroup_Targets(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTargets int + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantTargets: 4, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var targets int + for _, tgg := range sim.run(t) { + targets += len(tgg.Targets()) + } + + assert.Equal(t, test.wantTargets, targets) + }) + } +} + +func TestServiceTarget_Hash(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantHashes []uint64 + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantHashes: []uint64{ + 17611803477081780974, + 6019985892433421258, + 4151907287549842238, + 5757608926096186119, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var hashes []uint64 + for _, tgg := range sim.run(t) { + for _, tgt := range tgg.Targets() { + hashes = append(hashes, tgt.Hash()) + } + } + + assert.Equal(t, test.wantHashes, hashes) + }) + } +} + +func TestServiceTarget_TUID(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTUID []string + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantTUID: []string{ + "default_httpd-cluster-ip-service_tcp_80", + "default_httpd-cluster-ip-service_tcp_443", + "default_nginx-cluster-ip-service_tcp_80", + "default_nginx-cluster-ip-service_tcp_443", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var tuid []string + for _, tgg := range sim.run(t) { + for _, tgt := range tgg.Targets() { + tuid = append(tuid, tgt.TUID()) + } + } + + assert.Equal(t, test.wantTUID, tuid) + }) + } +} + +func TestNewServiceDiscoverer(t *testing.T) { + tests := map[string]struct { + informer cache.SharedInformer + wantPanic bool + }{ + "valid informer": { + wantPanic: false, + informer: cache.NewSharedInformer(nil, &corev1.Service{}, resyncPeriod), + }, + "nil informer": { + wantPanic: true, + informer: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + f := func() { newServiceDiscoverer(test.informer) } + + if test.wantPanic { + assert.Panics(t, f) + } else { + assert.NotPanics(t, f) + } + }) + } +} + +func TestServiceDiscoverer_String(t *testing.T) { + var s serviceDiscoverer + assert.NotEmpty(t, s.String()) +} + +func TestServiceDiscoverer_Discover(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + }{ + "ADD: ClusterIP svc exist before run": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + }, + "ADD: ClusterIP svc exist before run and add after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, client := prepareAllNsSvcDiscoverer(httpd) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + }, + "DELETE: ClusterIP svc remove after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, client := prepareAllNsSvcDiscoverer(httpd, nginx) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _ = svcClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + }, + } + }, + }, + "ADD,DELETE: ClusterIP svc remove and add after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, client := prepareAllNsSvcDiscoverer(httpd) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + }, + "ADD: Headless svc exist before run": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + }, + } + }, + }, + "UPDATE: Headless => ClusterIP svc after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService() + httpdUpd, nginxUpd := *httpd, *nginx + httpdUpd.Spec.ClusterIP = "10.100.0.1" + nginxUpd.Spec.ClusterIP = "10.100.0.2" + disc, client := prepareAllNsSvcDiscoverer(httpd, nginx) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _, _ = svcClient.Update(ctx, &httpdUpd, metav1.UpdateOptions{}) + _, _ = svcClient.Update(ctx, &nginxUpd, metav1.UpdateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + prepareSvcTargetGroup(&httpdUpd), + prepareSvcTargetGroup(&nginxUpd), + }, + } + }, + }, + "ADD: ClusterIP svc with zero exposed ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + httpd.Spec.Ports = httpd.Spec.Ports[:0] + nginx.Spec.Ports = httpd.Spec.Ports[:0] + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + }, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareAllNsSvcDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(roleService, []string{corev1.NamespaceAll}, objects...) +} + +func prepareSvcDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(roleService, namespaces, objects...) +} + +func newHTTPDClusterIPService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "httpd-cluster-ip-service", + Namespace: "default", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "httpd", "tier": "frontend"}, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "http", Protocol: corev1.ProtocolTCP, Port: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, Port: 443}, + }, + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "10.100.0.1", + Selector: map[string]string{"app": "httpd", "tier": "frontend"}, + }, + } +} + +func newNGINXClusterIPService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-cluster-ip-service", + Namespace: "default", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "nginx", "tier": "frontend"}, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "http", Protocol: corev1.ProtocolTCP, Port: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, Port: 443}, + }, + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "10.100.0.2", + Selector: map[string]string{"app": "nginx", "tier": "frontend"}, + }, + } +} + +func newHTTPDHeadlessService() *corev1.Service { + svc := newHTTPDClusterIPService() + svc.Name = "httpd-headless-service" + svc.Spec.ClusterIP = "" + return svc +} + +func newNGINXHeadlessService() *corev1.Service { + svc := newNGINXClusterIPService() + svc.Name = "nginx-headless-service" + svc.Spec.ClusterIP = "" + return svc +} + +func prepareEmptySvcTargetGroup(svc *corev1.Service) *serviceTargetGroup { + return &serviceTargetGroup{source: serviceSource(svc)} +} + +func prepareSvcTargetGroup(svc *corev1.Service) *serviceTargetGroup { + tgg := prepareEmptySvcTargetGroup(svc) + + for _, port := range svc.Spec.Ports { + portNum := strconv.FormatInt(int64(port.Port), 10) + tgt := &ServiceTarget{ + tuid: serviceTUID(svc, port), + Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum), + Namespace: svc.Namespace, + Name: svc.Name, + Annotations: mapAny(svc.Annotations), + Labels: mapAny(svc.Labels), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + ClusterIP: svc.Spec.ClusterIP, + ExternalName: svc.Spec.ExternalName, + Type: string(svc.Spec.Type), + } + tgt.hash = mustCalcHash(tgt) + tgt.Tags().Merge(discoveryTags) + tgg.targets = append(tgg.targets, tgt) + } + + return tgg +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/sim_test.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/sim_test.go new file mode 100644 index 000000000..99bdfae54 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/sim_test.go @@ -0,0 +1,137 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/tools/cache" +) + +const ( + startWaitTimeout = time.Second * 3 + finishWaitTimeout = time.Second * 5 +) + +type discoverySim struct { + td *KubeDiscoverer + runAfterSync func(ctx context.Context) + sortBeforeVerify bool + wantTargetGroups []model.TargetGroup +} + +func (sim discoverySim) run(t *testing.T) []model.TargetGroup { + t.Helper() + require.NotNil(t, sim.td) + require.NotEmpty(t, sim.wantTargetGroups) + + in, out := make(chan []model.TargetGroup), make(chan []model.TargetGroup) + go sim.collectTargetGroups(t, in, out) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + go sim.td.Discover(ctx, in) + + select { + case <-sim.td.started: + case <-time.After(startWaitTimeout): + t.Fatalf("td %s failed to start in %s", sim.td.discoverers, startWaitTimeout) + } + + synced := cache.WaitForCacheSync(ctx.Done(), sim.td.hasSynced) + require.Truef(t, synced, "td %s failed to sync", sim.td.discoverers) + + if sim.runAfterSync != nil { + sim.runAfterSync(ctx) + } + + groups := <-out + + if sim.sortBeforeVerify { + sortTargetGroups(groups) + } + + sim.verifyResult(t, groups) + return groups +} + +func (sim discoverySim) collectTargetGroups(t *testing.T, in, out chan []model.TargetGroup) { + var tggs []model.TargetGroup +loop: + for { + select { + case inGroups := <-in: + if tggs = append(tggs, inGroups...); len(tggs) >= len(sim.wantTargetGroups) { + break loop + } + case <-time.After(finishWaitTimeout): + t.Logf("td %s timed out after %s, got %d groups, expected %d, some events are skipped", + sim.td.discoverers, finishWaitTimeout, len(tggs), len(sim.wantTargetGroups)) + break loop + } + } + out <- tggs +} + +func (sim discoverySim) verifyResult(t *testing.T, result []model.TargetGroup) { + var expected, actual any + + if len(sim.wantTargetGroups) == len(result) { + expected = sim.wantTargetGroups + actual = result + } else { + want := make(map[string]model.TargetGroup) + for _, group := range sim.wantTargetGroups { + want[group.Source()] = group + } + got := make(map[string]model.TargetGroup) + for _, group := range result { + got[group.Source()] = group + } + expected, actual = want, got + } + + assert.Equal(t, expected, actual) +} + +type hasSynced interface { + hasSynced() bool +} + +var ( + _ hasSynced = &KubeDiscoverer{} + _ hasSynced = &podDiscoverer{} + _ hasSynced = &serviceDiscoverer{} +) + +func (d *KubeDiscoverer) hasSynced() bool { + for _, disc := range d.discoverers { + v, ok := disc.(hasSynced) + if !ok || !v.hasSynced() { + return false + } + } + return true +} + +func (p *podDiscoverer) hasSynced() bool { + return p.podInformer.HasSynced() && p.cmapInformer.HasSynced() && p.secretInformer.HasSynced() +} + +func (s *serviceDiscoverer) hasSynced() bool { + return s.informer.HasSynced() +} + +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners.go new file mode 100644 index 000000000..6f536c49e --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners.go @@ -0,0 +1,326 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "log/slog" + "net" + "os" + "os/exec" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/pkg/executable" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/ilyam8/hashstructure" +) + +var ( + shortName = "net_listeners" + fullName = fmt.Sprintf("sd:%s", shortName) +) + +func NewDiscoverer(cfg Config) (*Discoverer, error) { + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + dir := os.Getenv("NETDATA_PLUGINS_DIR") + if dir == "" { + dir = executable.Directory + } + if dir == "" { + dir, _ = os.Getwd() + } + + d := &Discoverer{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", shortName), + ), + cfgSource: cfg.Source, + ll: &localListenersExec{ + binPath: filepath.Join(dir, "local-listeners"), + timeout: time.Second * 5, + }, + interval: time.Minute * 2, + expiryTime: time.Minute * 10, + cache: make(map[uint64]*cacheItem), + started: make(chan struct{}), + } + + d.Tags().Merge(tags) + + return d, nil +} + +type Config struct { + Source string `yaml:"-"` + Tags string `yaml:"tags"` +} + +type ( + Discoverer struct { + *logger.Logger + model.Base + + cfgSource string + + interval time.Duration + ll localListeners + + expiryTime time.Duration + cache map[uint64]*cacheItem // [target.Hash] + + started chan struct{} + } + cacheItem struct { + lastSeenTime time.Time + tgt model.Target + } + localListeners interface { + discover(ctx context.Context) ([]byte, error) + } +) + +func (d *Discoverer) String() string { + return fullName +} + +func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer func() { d.Info("instance is stopped") }() + + close(d.started) + + if err := d.discoverLocalListeners(ctx, in); err != nil { + d.Error(err) + return + } + + tk := time.NewTicker(d.interval) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + if err := d.discoverLocalListeners(ctx, in); err != nil { + d.Warning(err) + return + } + } + } +} + +func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error { + bs, err := d.ll.discover(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return err + } + + tgts, err := d.parseLocalListeners(bs) + if err != nil { + return err + } + + tggs := d.processTargets(tgts) + + select { + case <-ctx.Done(): + case in <- tggs: + } + + return nil +} + +func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup { + tgg := &targetGroup{ + provider: fullName, + source: fmt.Sprintf("discoverer=%s,host=localhost", shortName), + } + if d.cfgSource != "" { + tgg.source += fmt.Sprintf(",%s", d.cfgSource) + } + + if d.expiryTime.Milliseconds() == 0 { + tgg.targets = tgts + return []model.TargetGroup{tgg} + } + + now := time.Now() + + for _, tgt := range tgts { + hash := tgt.Hash() + if _, ok := d.cache[hash]; !ok { + d.cache[hash] = &cacheItem{tgt: tgt} + } + d.cache[hash].lastSeenTime = now + } + + for k, v := range d.cache { + if now.Sub(v.lastSeenTime) > d.expiryTime { + delete(d.cache, k) + continue + } + tgg.targets = append(tgg.targets, v.tgt) + } + + return []model.TargetGroup{tgg} +} + +func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) { + const ( + local4 = "127.0.0.1" + local6 = "::1" + ) + + var targets []target + sc := bufio.NewScanner(bytes.NewReader(bs)) + + for sc.Scan() { + text := strings.TrimSpace(sc.Text()) + if text == "" { + continue + } + + // Protocol|IPAddress|Port|Cmdline + parts := strings.SplitN(text, "|", 4) + if len(parts) != 4 { + return nil, fmt.Errorf("unexpected data: '%s'", text) + } + + tgt := target{ + Protocol: parts[0], + IPAddress: parts[1], + Port: parts[2], + Comm: extractComm(parts[3]), + Cmdline: parts[3], + } + + if tgt.Comm == "docker-proxy" { + continue + } + + if tgt.IPAddress == "0.0.0.0" || strings.HasPrefix(tgt.IPAddress, "127") { + tgt.IPAddress = local4 + } else if tgt.IPAddress == "::" { + tgt.IPAddress = local6 + } + + // quick support for https://github.com/netdata/netdata/pull/17866 + // TODO: create both ipv4 and ipv6 targets? + if tgt.IPAddress == "*" { + tgt.IPAddress = local4 + } + + tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.Port) + + hash, err := calcHash(tgt) + if err != nil { + continue + } + + tgt.hash = hash + tgt.Tags().Merge(d.Tags()) + + targets = append(targets, tgt) + } + + // order: TCP, TCP6, UDP, UDP6 + sort.Slice(targets, func(i, j int) bool { + tgt1, tgt2 := targets[i], targets[j] + if tgt1.Protocol != tgt2.Protocol { + return tgt1.Protocol < tgt2.Protocol + } + + p1, _ := strconv.Atoi(targets[i].Port) + p2, _ := strconv.Atoi(targets[j].Port) + if p1 != p2 { + return p1 < p2 + } + + return tgt1.IPAddress == local4 || tgt1.IPAddress == local6 + }) + + seen := make(map[string]bool) + tgts := make([]model.Target, len(targets)) + var n int + + for _, tgt := range targets { + tgt := tgt + + proto := strings.TrimSuffix(tgt.Protocol, "6") + key := tgt.Protocol + ":" + tgt.Address + keyLocal4 := proto + ":" + net.JoinHostPort(local4, tgt.Port) + keyLocal6 := proto + "6:" + net.JoinHostPort(local6, tgt.Port) + + // Filter targets that accept conns on any (0.0.0.0) and additionally on each individual network interface (a.b.c.d). + // Create a target only for localhost. Assumption: any address always goes first. + if seen[key] || seen[keyLocal4] || seen[keyLocal6] { + continue + } + seen[key] = true + + tgts[n] = &tgt + n++ + } + + return tgts[:n], nil +} + +type localListenersExec struct { + binPath string + timeout time.Duration +} + +func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) { + execCtx, cancel := context.WithTimeout(ctx, e.timeout) + defer cancel() + + // TCPv4/6 and UPDv4 sockets in LISTEN state + // https://github.com/netdata/netdata/blob/master/src/collectors/plugins.d/local_listeners.c + args := []string{ + "no-udp6", + "no-local", + "no-inbound", + "no-outbound", + "no-namespaces", + } + + cmd := exec.CommandContext(execCtx, e.binPath, args...) + + bs, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("error on executing '%s': %v", cmd, err) + } + + return bs, nil +} + +func extractComm(cmdLine string) string { + if i := strings.IndexByte(cmdLine, ' '); i != -1 { + cmdLine = cmdLine[:i] + } + _, comm := filepath.Split(cmdLine) + return strings.TrimSuffix(comm, ":") +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go new file mode 100644 index 000000000..9b3cae801 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go @@ -0,0 +1,169 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" +) + +func TestDiscoverer_Discover(t *testing.T) { + tests := map[string]discoverySim{ + "add listeners": { + listenersCli: func(cli listenersCli, interval, expiry time.Duration) { + cli.addListener("UDP|127.0.0.1|323|/usr/sbin/chronyd") + cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|2001:DB8::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|0.0.0.0|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|192.0.2.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + cli.addListener("TCP46|*|80|/usr/sbin/httpd -k start") + cli.addListener("TCP6|::|80|/usr/sbin/apache2 -k start") + cli.addListener("TCP|0.0.0.0|80|/usr/sbin/apache2 -k start") + cli.addListener("TCP|0.0.0.0|8080|/usr/sbin/docker-proxy -proto tcp -host-ip 0.0.0.0 -host-port 8080 -container-ip 172.17.0.4 -container-port 80") + cli.addListener("TCP6|::|8080|/usr/sbin/docker-proxy -proto tcp -host-ip :: -host-port 8080 -container-ip 172.17.0.4 -container-port 80") + time.Sleep(interval * 2) + }, + wantGroups: []model.TargetGroup{&targetGroup{ + provider: "sd:net_listeners", + source: "discoverer=net_listeners,host=localhost", + targets: []model.Target{ + withHash(&target{ + Protocol: "UDP", + IPAddress: "127.0.0.1", + Port: "323", + Address: "127.0.0.1:323", + Comm: "chronyd", + Cmdline: "/usr/sbin/chronyd", + }), + withHash(&target{ + Protocol: "TCP46", + IPAddress: "127.0.0.1", + Port: "80", + Address: "127.0.0.1:80", + Comm: "httpd", + Cmdline: "/usr/sbin/httpd -k start", + }), + withHash(&target{ + Protocol: "TCP", + IPAddress: "127.0.0.1", + Port: "80", + Address: "127.0.0.1:80", + Comm: "apache2", + Cmdline: "/usr/sbin/apache2 -k start", + }), + withHash(&target{ + Protocol: "TCP", + IPAddress: "127.0.0.1", + Port: "8125", + Address: "127.0.0.1:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "UDP", + IPAddress: "127.0.0.1", + Port: "53768", + Address: "127.0.0.1:53768", + Comm: "go.d.plugin", + Cmdline: "/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1", + }), + withHash(&target{ + Protocol: "UDP6", + IPAddress: "::1", + Port: "8125", + Address: "[::1]:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + }, + }}, + }, + "remove listeners; not expired": { + listenersCli: func(cli listenersCli, interval, expiry time.Duration) { + cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + }, + wantGroups: []model.TargetGroup{&targetGroup{ + provider: "sd:net_listeners", + source: "discoverer=net_listeners,host=localhost", + targets: []model.Target{ + withHash(&target{ + Protocol: "UDP6", + IPAddress: "::1", + Port: "8125", + Address: "[::1]:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "TCP", + IPAddress: "127.0.0.1", + Port: "8125", + Address: "127.0.0.1:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "UDP", + IPAddress: "127.0.0.1", + Port: "53768", + Address: "127.0.0.1:53768", + Comm: "go.d.plugin", + Cmdline: "/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1", + }), + }, + }}, + }, + "remove listeners; expired": { + listenersCli: func(cli listenersCli, interval, expiry time.Duration) { + cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(expiry * 2) + }, + wantGroups: []model.TargetGroup{&targetGroup{ + provider: "sd:net_listeners", + source: "discoverer=net_listeners,host=localhost", + targets: []model.Target{ + withHash(&target{ + Protocol: "TCP", + IPAddress: "127.0.0.1", + Port: "8125", + Address: "127.0.0.1:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + }, + }}, + }, + } + + for name, sim := range tests { + t.Run(name, func(t *testing.T) { + sim.run(t) + }) + } +} + +func withHash(l *target) *target { + l.hash, _ = calcHash(l) + tags, _ := model.ParseTags("netlisteners") + l.Tags().Merge(tags) + return l +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/sim_test.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/sim_test.go new file mode 100644 index 000000000..4cb65832d --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/sim_test.go @@ -0,0 +1,167 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "context" + "errors" + "slices" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type listenersCli interface { + addListener(s string) + removeListener(s string) +} + +type discoverySim struct { + listenersCli func(cli listenersCli, interval, expiry time.Duration) + wantGroups []model.TargetGroup +} + +func (sim *discoverySim) run(t *testing.T) { + d, err := NewDiscoverer(Config{ + Source: "", + Tags: "netlisteners", + }) + require.NoError(t, err) + + mock := newMockLocalListenersExec() + + d.ll = mock + + d.interval = time.Millisecond * 100 + d.expiryTime = time.Second * 1 + + seen := make(map[string]model.TargetGroup) + ctx, cancel := context.WithCancel(context.Background()) + in := make(chan []model.TargetGroup) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + d.Discover(ctx, in) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case tggs := <-in: + for _, tgg := range tggs { + seen[tgg.Source()] = tgg + } + } + } + }() + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-d.started: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery failed to start") + } + + sim.listenersCli(mock, d.interval, d.expiryTime) + + cancel() + + select { + case <-done: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery hasn't finished after cancel") + } + + var tggs []model.TargetGroup + for _, tgg := range seen { + tggs = append(tggs, tgg) + } + + sortTargetGroups(tggs) + sortTargetGroups(sim.wantGroups) + + wantLen, gotLen := calcTargets(sim.wantGroups), calcTargets(tggs) + assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen) + assert.Equal(t, sim.wantGroups, tggs) +} + +func newMockLocalListenersExec() *mockLocalListenersExec { + return &mockLocalListenersExec{} +} + +type mockLocalListenersExec struct { + errResponse bool + mux sync.Mutex + listeners []string +} + +func (m *mockLocalListenersExec) addListener(s string) { + m.mux.Lock() + defer m.mux.Unlock() + + m.listeners = append(m.listeners, s) +} + +func (m *mockLocalListenersExec) removeListener(s string) { + m.mux.Lock() + defer m.mux.Unlock() + + if i := slices.Index(m.listeners, s); i != -1 { + m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) + } +} + +func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) { + if m.errResponse { + return nil, errors.New("mock discover() error") + } + + m.mux.Lock() + defer m.mux.Unlock() + + var buf strings.Builder + for _, s := range m.listeners { + buf.WriteString(s) + buf.WriteByte('\n') + } + + return []byte(buf.String()), nil +} + +func calcTargets(tggs []model.TargetGroup) int { + var n int + for _, tgg := range tggs { + n += len(tgg.Targets()) + } + return n +} + +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) + + for idx := range tggs { + tgts := tggs[idx].Targets() + sort.Slice(tgts, func(i, j int) bool { return tgts[i].Hash() < tgts[j].Hash() }) + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/target.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/target.go new file mode 100644 index 000000000..9d57d3cc7 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/target.go @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "fmt" + "strings" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" +) + +type targetGroup struct { + provider string + source string + targets []model.Target +} + +func (g *targetGroup) Provider() string { return g.provider } +func (g *targetGroup) Source() string { return g.source } +func (g *targetGroup) Targets() []model.Target { return g.targets } + +type target struct { + model.Base + + hash uint64 + + Protocol string + IPAddress string + Port string + Comm string + Cmdline string + + Address string // "IPAddress:Port" +} + +func (t *target) TUID() string { return tuid(t) } +func (t *target) Hash() uint64 { return t.hash } + +func tuid(tgt *target) string { + return fmt.Sprintf("%s_%s_%d", strings.ToLower(tgt.Protocol), tgt.Port, tgt.hash) +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/model/discoverer.go b/src/go/plugin/go.d/agent/discovery/sd/model/discoverer.go new file mode 100644 index 000000000..301322d32 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/model/discoverer.go @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package model + +import ( + "context" +) + +type Discoverer interface { + Discover(ctx context.Context, ch chan<- []TargetGroup) +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/model/tags.go b/src/go/plugin/go.d/agent/discovery/sd/model/tags.go new file mode 100644 index 000000000..22517d77e --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/model/tags.go @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package model + +import ( + "fmt" + "sort" + "strings" +) + +type Base struct { + tags Tags +} + +func (b *Base) Tags() Tags { + if b.tags == nil { + b.tags = NewTags() + } + return b.tags +} + +type Tags map[string]struct{} + +func NewTags() Tags { + return Tags{} +} + +func (t Tags) Merge(tags Tags) { + for tag := range tags { + if strings.HasPrefix(tag, "-") { + delete(t, tag[1:]) + } else { + t[tag] = struct{}{} + } + } +} + +func (t Tags) Clone() Tags { + ts := NewTags() + ts.Merge(t) + return ts +} + +func (t Tags) String() string { + ts := make([]string, 0, len(t)) + for key := range t { + ts = append(ts, key) + } + sort.Strings(ts) + return fmt.Sprintf("{%s}", strings.Join(ts, ", ")) +} + +func ParseTags(line string) (Tags, error) { + words := strings.Fields(line) + if len(words) == 0 { + return NewTags(), nil + } + + tags := NewTags() + for _, tag := range words { + if !isTagWordValid(tag) { + return nil, fmt.Errorf("tags '%s' contains tag '%s' with forbidden symbol", line, tag) + } + tags[tag] = struct{}{} + } + return tags, nil +} + +func isTagWordValid(word string) bool { + // valid: + // ^[a-zA-Z][a-zA-Z0-9=_.]*$ + word = strings.TrimPrefix(word, "-") + if len(word) == 0 { + return false + } + for i, b := range word { + switch { + default: + return false + case b >= 'a' && b <= 'z': + case b >= 'A' && b <= 'Z': + case b >= '0' && b <= '9' && i > 0: + case (b == '=' || b == '_' || b == '.') && i > 0: + } + } + return true +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/model/tags_test.go b/src/go/plugin/go.d/agent/discovery/sd/model/tags_test.go new file mode 100644 index 000000000..4f07bcbf6 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/model/tags_test.go @@ -0,0 +1,3 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package model diff --git a/src/go/plugin/go.d/agent/discovery/sd/model/target.go b/src/go/plugin/go.d/agent/discovery/sd/model/target.go new file mode 100644 index 000000000..eb2bd9d51 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/model/target.go @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package model + +type Target interface { + Hash() uint64 + Tags() Tags + TUID() string +} + +type TargetGroup interface { + Targets() []Target + Provider() string + Source() string +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/accumulator.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/accumulator.go new file mode 100644 index 000000000..60c901492 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/accumulator.go @@ -0,0 +1,152 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "sync" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" +) + +func newAccumulator() *accumulator { + return &accumulator{ + send: make(chan struct{}, 1), + sendEvery: time.Second * 2, + mux: &sync.Mutex{}, + tggs: make(map[string]model.TargetGroup), + } +} + +type accumulator struct { + *logger.Logger + discoverers []model.Discoverer + send chan struct{} + sendEvery time.Duration + mux *sync.Mutex + tggs map[string]model.TargetGroup +} + +func (a *accumulator) run(ctx context.Context, in chan []model.TargetGroup) { + updates := make(chan []model.TargetGroup) + + var wg sync.WaitGroup + for _, d := range a.discoverers { + wg.Add(1) + d := d + go func() { defer wg.Done(); a.runDiscoverer(ctx, d, updates) }() + } + + done := make(chan struct{}) + go func() { defer close(done); wg.Wait() }() + + tk := time.NewTicker(a.sendEvery) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + select { + case <-done: + a.Info("all discoverers exited") + case <-time.After(time.Second * 3): + a.Warning("not all discoverers exited") + } + a.trySend(in) + return + case <-done: + if !isDone(ctx) { + a.Info("all discoverers exited before ctx done") + } else { + a.Info("all discoverers exited") + } + a.trySend(in) + return + case <-tk.C: + select { + case <-a.send: + a.trySend(in) + default: + } + } + } +} + +func (a *accumulator) runDiscoverer(ctx context.Context, d model.Discoverer, updates chan []model.TargetGroup) { + done := make(chan struct{}) + go func() { defer close(done); d.Discover(ctx, updates) }() + + for { + select { + case <-ctx.Done(): + select { + case <-done: + case <-time.After(time.Second * 2): + a.Warningf("discoverer '%v' didn't exit on ctx done", d) + } + return + case <-done: + if !isDone(ctx) { + a.Infof("discoverer '%v' exited before ctx done", d) + } + return + case tggs := <-updates: + a.mux.Lock() + a.groupsUpdate(tggs) + a.mux.Unlock() + a.triggerSend() + } + } +} + +func (a *accumulator) trySend(in chan<- []model.TargetGroup) { + a.mux.Lock() + defer a.mux.Unlock() + + select { + case in <- a.groupsList(): + a.groupsReset() + default: + a.triggerSend() + } +} + +func (a *accumulator) triggerSend() { + select { + case a.send <- struct{}{}: + default: + } +} + +func (a *accumulator) groupsUpdate(tggs []model.TargetGroup) { + for _, tgg := range tggs { + a.tggs[tgg.Source()] = tgg + } +} + +func (a *accumulator) groupsReset() { + for key := range a.tggs { + delete(a.tggs, key) + } +} + +func (a *accumulator) groupsList() []model.TargetGroup { + tggs := make([]model.TargetGroup, 0, len(a.tggs)) + for _, tgg := range a.tggs { + if tgg != nil { + tggs = append(tggs, tgg) + } + } + return tggs +} + +func isDone(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/classify.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/classify.go new file mode 100644 index 000000000..a7490d2e0 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/classify.go @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "bytes" + "fmt" + "strings" + "text/template" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" +) + +func newTargetClassificator(cfg []ClassifyRuleConfig) (*targetClassificator, error) { + rules, err := newClassifyRules(cfg) + if err != nil { + return nil, err + } + + c := &targetClassificator{ + rules: rules, + buf: bytes.Buffer{}, + } + + return c, nil +} + +type ( + targetClassificator struct { + *logger.Logger + rules []*classifyRule + buf bytes.Buffer + } + + classifyRule struct { + name string + sr selector + tags model.Tags + match []*classifyRuleMatch + } + classifyRuleMatch struct { + tags model.Tags + expr *template.Template + } +) + +func (c *targetClassificator) classify(tgt model.Target) model.Tags { + tgtTags := tgt.Tags().Clone() + var tags model.Tags + + for i, rule := range c.rules { + if !rule.sr.matches(tgtTags) { + continue + } + + for j, match := range rule.match { + c.buf.Reset() + + if err := match.expr.Execute(&c.buf, tgt); err != nil { + c.Warningf("failed to execute classify rule[%d]->match[%d]->expr on target '%s'", i+1, j+1, tgt.TUID()) + continue + } + if strings.TrimSpace(c.buf.String()) != "true" { + continue + } + + if tags == nil { + tags = model.NewTags() + } + + tags.Merge(rule.tags) + tags.Merge(match.tags) + tgtTags.Merge(tags) + } + } + + return tags +} + +func newClassifyRules(cfg []ClassifyRuleConfig) ([]*classifyRule, error) { + var rules []*classifyRule + + fmap := newFuncMap() + + for i, ruleCfg := range cfg { + i++ + rule := classifyRule{name: ruleCfg.Name} + + sr, err := parseSelector(ruleCfg.Selector) + if err != nil { + return nil, fmt.Errorf("rule '%d': %v", i, err) + } + rule.sr = sr + + tags, err := model.ParseTags(ruleCfg.Tags) + if err != nil { + return nil, fmt.Errorf("rule '%d': %v", i, err) + } + rule.tags = tags + + for j, matchCfg := range ruleCfg.Match { + j++ + var match classifyRuleMatch + + tags, err := model.ParseTags(matchCfg.Tags) + if err != nil { + return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err) + } + match.tags = tags + + tmpl, err := parseTemplate(matchCfg.Expr, fmap) + if err != nil { + return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err) + } + match.expr = tmpl + + rule.match = append(rule.match, &match) + } + + rules = append(rules, &rule) + } + + return rules, nil +} + +func parseTemplate(s string, fmap template.FuncMap) (*template.Template, error) { + return template.New("root"). + Option("missingkey=error"). + Funcs(fmap). + Parse(s) +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/classify_test.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/classify_test.go new file mode 100644 index 000000000..606e3411c --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/classify_test.go @@ -0,0 +1,83 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestTargetClassificator_classify(t *testing.T) { + config := ` +- selector: "rule0" + tags: "skip" + match: + - tags: "skip" + expr: '{{ glob .Name "*" }}' +- selector: "!skip rule1" + tags: "foo1" + match: + - tags: "bar1" + expr: '{{ glob .Name "mock*1*" }}' + - tags: "bar2" + expr: '{{ glob .Name "mock*2*" }}' +- selector: "!skip rule2" + tags: "foo2" + match: + - tags: "bar3" + expr: '{{ glob .Name "mock*3*" }}' + - tags: "bar4" + expr: '{{ glob .Name "mock*4*" }}' +- selector: "rule3" + tags: "foo3" + match: + - tags: "bar5" + expr: '{{ glob .Name "mock*5*" }}' + - tags: "bar6" + expr: '{{ glob .Name "mock*6*" }}' +` + tests := map[string]struct { + target model.Target + wantTags model.Tags + }{ + "no rules match": { + target: newMockTarget("mock1"), + wantTags: nil, + }, + "one rule one match": { + target: newMockTarget("mock4", "rule2"), + wantTags: mustParseTags("foo2 bar4"), + }, + "one rule two match": { + target: newMockTarget("mock56", "rule3"), + wantTags: mustParseTags("foo3 bar5 bar6"), + }, + "all rules all matches": { + target: newMockTarget("mock123456", "rule1 rule2 rule3"), + wantTags: mustParseTags("foo1 foo2 foo3 bar1 bar2 bar3 bar4 bar5 bar6"), + }, + "applying labels after every rule": { + target: newMockTarget("mock123456", "rule0 rule1 rule2 rule3"), + wantTags: mustParseTags("skip foo3 bar5 bar6"), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + var cfg []ClassifyRuleConfig + + err := yaml.Unmarshal([]byte(config), &cfg) + require.NoError(t, err, "yaml unmarshalling of config") + + clr, err := newTargetClassificator(cfg) + require.NoError(t, err, "targetClassificator creation") + + assert.Equal(t, test.wantTags, clr.classify(test.target)) + }) + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/compose.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/compose.go new file mode 100644 index 000000000..80830fd6d --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/compose.go @@ -0,0 +1,157 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "bytes" + "errors" + "fmt" + "text/template" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "gopkg.in/yaml.v2" +) + +func newConfigComposer(cfg []ComposeRuleConfig) (*configComposer, error) { + rules, err := newComposeRules(cfg) + if err != nil { + return nil, err + } + + c := &configComposer{ + rules: rules, + buf: bytes.Buffer{}, + } + + return c, nil +} + +type ( + configComposer struct { + *logger.Logger + rules []*composeRule + buf bytes.Buffer + } + + composeRule struct { + name string + sr selector + conf []*composeRuleConf + } + composeRuleConf struct { + sr selector + tmpl *template.Template + } +) + +func (c *configComposer) compose(tgt model.Target) []confgroup.Config { + var configs []confgroup.Config + + for i, rule := range c.rules { + if !rule.sr.matches(tgt.Tags()) { + continue + } + + for j, conf := range rule.conf { + if !conf.sr.matches(tgt.Tags()) { + continue + } + + c.buf.Reset() + + if err := conf.tmpl.Execute(&c.buf, tgt); err != nil { + c.Warningf("failed to execute rule[%d]->config[%d]->template on target '%s': %v", + i+1, j+1, tgt.TUID(), err) + continue + } + if c.buf.Len() == 0 { + continue + } + + cfgs, err := c.parseTemplateData(c.buf.Bytes()) + if err != nil { + c.Warningf("failed to parse template data: %v", err) + continue + } + + configs = append(configs, cfgs...) + } + } + + if len(configs) > 0 { + c.Debugf("created %d config(s) for target '%s'", len(configs), tgt.TUID()) + } + return configs +} + +func (c *configComposer) parseTemplateData(bs []byte) ([]confgroup.Config, error) { + var data any + if err := yaml.Unmarshal(bs, &data); err != nil { + return nil, err + } + + type ( + single = map[any]any + multi = []any + ) + + switch data.(type) { + case single: + var cfg confgroup.Config + if err := yaml.Unmarshal(bs, &cfg); err != nil { + return nil, err + } + return []confgroup.Config{cfg}, nil + case multi: + var cfgs []confgroup.Config + if err := yaml.Unmarshal(bs, &cfgs); err != nil { + return nil, err + } + return cfgs, nil + default: + return nil, errors.New("unknown config format") + } +} + +func newComposeRules(cfg []ComposeRuleConfig) ([]*composeRule, error) { + var rules []*composeRule + + fmap := newFuncMap() + + for i, ruleCfg := range cfg { + i++ + rule := composeRule{name: ruleCfg.Name} + + sr, err := parseSelector(ruleCfg.Selector) + if err != nil { + return nil, fmt.Errorf("rule '%d': %v", i, err) + } + rule.sr = sr + + for j, confCfg := range ruleCfg.Config { + j++ + var conf composeRuleConf + + sr, err := parseSelector(confCfg.Selector) + if err != nil { + return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err) + } + conf.sr = sr + + tmpl, err := parseTemplate(confCfg.Template, fmap) + if err != nil { + return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err) + } + conf.tmpl = tmpl + + rule.conf = append(rule.conf, &conf) + } + + rules = append(rules, &rule) + } + + return rules, nil +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/compose_test.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/compose_test.go new file mode 100644 index 000000000..1c56bf086 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/compose_test.go @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestConfigComposer_compose(t *testing.T) { + config := ` +- selector: "rule1" + config: + - selector: "bar1" + template: | + name: {{ .Name }}-1 + - selector: "bar2" + template: | + name: {{ .Name }}-2 +- selector: "rule2" + config: + - selector: "bar3" + template: | + name: {{ .Name }}-3 + - selector: "bar4" + template: | + name: {{ .Name }}-4 +- selector: "rule3" + config: + - selector: "bar5" + template: | + name: {{ .Name }}-5 + - selector: "bar6" + template: | + - name: {{ .Name }}-6 + - name: {{ .Name }}-7 +` + tests := map[string]struct { + target model.Target + wantConfigs []confgroup.Config + }{ + "no rules matches": { + target: newMockTarget("mock"), + wantConfigs: nil, + }, + "one rule one config": { + target: newMockTarget("mock", "rule1 bar1"), + wantConfigs: []confgroup.Config{ + {"name": "mock-1"}, + }, + }, + "one rule two config": { + target: newMockTarget("mock", "rule2 bar3 bar4"), + wantConfigs: []confgroup.Config{ + {"name": "mock-3"}, + {"name": "mock-4"}, + }, + }, + "all rules all configs": { + target: newMockTarget("mock", "rule1 bar1 bar2 rule2 bar3 bar4 rule3 bar5 bar6"), + wantConfigs: []confgroup.Config{ + {"name": "mock-1"}, + {"name": "mock-2"}, + {"name": "mock-3"}, + {"name": "mock-4"}, + {"name": "mock-5"}, + {"name": "mock-6"}, + {"name": "mock-7"}, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + var cfg []ComposeRuleConfig + + err := yaml.Unmarshal([]byte(config), &cfg) + require.NoErrorf(t, err, "yaml unmarshalling of config") + + cmr, err := newConfigComposer(cfg) + require.NoErrorf(t, err, "configComposer creation") + + assert.Equal(t, test.wantConfigs, cmr.compose(test.target)) + }) + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/config.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/config.go new file mode 100644 index 000000000..9df7ec59d --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/config.go @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "errors" + "fmt" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/discoverer/dockerd" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/discoverer/kubernetes" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/discoverer/netlisteners" +) + +type Config struct { + Source string `yaml:"-"` + ConfigDefaults confgroup.Registry `yaml:"-"` + + Disabled bool `yaml:"disabled"` + Name string `yaml:"name"` + Discover []DiscoveryConfig `yaml:"discover"` + Classify []ClassifyRuleConfig `yaml:"classify"` + Compose []ComposeRuleConfig `yaml:"compose"` +} + +type DiscoveryConfig struct { + Discoverer string `yaml:"discoverer"` + NetListeners netlisteners.Config `yaml:"net_listeners"` + Docker dockerd.Config `yaml:"docker"` + K8s []kubernetes.Config `yaml:"k8s"` +} + +type ClassifyRuleConfig struct { + Name string `yaml:"name"` + Selector string `yaml:"selector"` // mandatory + Tags string `yaml:"tags"` // mandatory + Match []struct { + Tags string `yaml:"tags"` // mandatory + Expr string `yaml:"expr"` // mandatory + } `yaml:"match"` // mandatory, at least 1 +} + +type ComposeRuleConfig struct { + Name string `yaml:"name"` // optional + Selector string `yaml:"selector"` // mandatory + Config []struct { + Selector string `yaml:"selector"` // mandatory + Template string `yaml:"template"` // mandatory + } `yaml:"config"` // mandatory, at least 1 +} + +func validateConfig(cfg Config) error { + if cfg.Name == "" { + return errors.New("'name' not set") + } + if err := validateDiscoveryConfig(cfg.Discover); err != nil { + return fmt.Errorf("discover config: %v", err) + } + if err := validateClassifyConfig(cfg.Classify); err != nil { + return fmt.Errorf("classify rules: %v", err) + } + if err := validateComposeConfig(cfg.Compose); err != nil { + return fmt.Errorf("compose rules: %v", err) + } + return nil +} + +func validateDiscoveryConfig(config []DiscoveryConfig) error { + if len(config) == 0 { + return errors.New("no discoverers, must be at least one") + } + for _, cfg := range config { + switch cfg.Discoverer { + case "net_listeners", "docker", "k8s": + default: + return fmt.Errorf("unknown discoverer: '%s'", cfg.Discoverer) + } + } + return nil +} + +func validateClassifyConfig(rules []ClassifyRuleConfig) error { + if len(rules) == 0 { + return errors.New("empty config, need least 1 rule") + } + for i, rule := range rules { + i++ + if rule.Selector == "" { + return fmt.Errorf("'rule[%s][%d]->selector' not set", rule.Name, i) + } + if rule.Tags == "" { + return fmt.Errorf("'rule[%s][%d]->tags' not set", rule.Name, i) + } + if len(rule.Match) == 0 { + return fmt.Errorf("'rule[%s][%d]->match' not set, need at least 1 rule match", rule.Name, i) + } + + for j, match := range rule.Match { + j++ + if match.Tags == "" { + return fmt.Errorf("'rule[%s][%d]->match[%d]->tags' not set", rule.Name, i, j) + } + if match.Expr == "" { + return fmt.Errorf("'rule[%s][%d]->match[%d]->expr' not set", rule.Name, i, j) + } + } + } + return nil +} + +func validateComposeConfig(rules []ComposeRuleConfig) error { + if len(rules) == 0 { + return errors.New("empty config, need least 1 rule") + } + for i, rule := range rules { + i++ + if rule.Selector == "" { + return fmt.Errorf("'rule[%s][%d]->selector' not set", rule.Name, i) + } + + if len(rule.Config) == 0 { + return fmt.Errorf("'rule[%s][%d]->config' not set", rule.Name, i) + } + + for j, conf := range rule.Config { + j++ + if conf.Selector == "" { + return fmt.Errorf("'rule[%s][%d]->config[%d]->selector' not set", rule.Name, i, j) + } + if conf.Template == "" { + return fmt.Errorf("'rule[%s][%d]->config[%d]->template' not set", rule.Name, i, j) + } + } + } + return nil +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap.go new file mode 100644 index 000000000..5ed188a54 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap.go @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "regexp" + "strconv" + "text/template" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/matcher" + + "github.com/Masterminds/sprig/v3" + "github.com/bmatcuk/doublestar/v4" +) + +func newFuncMap() template.FuncMap { + custom := map[string]interface{}{ + "match": funcMatchAny, + "glob": func(value, pattern string, patterns ...string) bool { + return funcMatchAny("glob", value, pattern, patterns...) + }, + "promPort": func(port string) string { + v, _ := strconv.Atoi(port) + return prometheusPortAllocations[v] + }, + } + + fm := sprig.HermeticTxtFuncMap() + + for name, fn := range custom { + fm[name] = fn + } + + return fm +} + +func funcMatchAny(typ, value, pattern string, patterns ...string) bool { + switch len(patterns) { + case 0: + return funcMatch(typ, value, pattern) + default: + return funcMatch(typ, value, pattern) || funcMatchAny(typ, value, patterns[0], patterns[1:]...) + } +} + +func funcMatch(typ string, value, pattern string) bool { + switch typ { + case "glob", "": + m, err := matcher.NewGlobMatcher(pattern) + return err == nil && m.MatchString(value) + case "sp": + m, err := matcher.NewSimplePatternsMatcher(pattern) + return err == nil && m.MatchString(value) + case "re": + ok, err := regexp.MatchString(pattern, value) + return err == nil && ok + case "dstar": + ok, err := doublestar.Match(pattern, value) + return err == nil && ok + default: + return false + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap_test.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap_test.go new file mode 100644 index 000000000..3de71ef70 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap_test.go @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_funcMatchAny(t *testing.T) { + tests := map[string]struct { + typ string + patterns []string + value string + wantMatch bool + }{ + "dstar: one param, matches": { + wantMatch: true, + typ: "dstar", + patterns: []string{"*"}, + value: "value", + }, + "dstar: one param, matches with *": { + wantMatch: true, + typ: "dstar", + patterns: []string{"**/value"}, + value: "/one/two/three/value", + }, + "dstar: one param, not matches": { + wantMatch: false, + typ: "dstar", + patterns: []string{"Value"}, + value: "value", + }, + "dstar: several params, last one matches": { + wantMatch: true, + typ: "dstar", + patterns: []string{"not", "matches", "*"}, + value: "value", + }, + "dstar: several params, no matches": { + wantMatch: false, + typ: "dstar", + patterns: []string{"not", "matches", "really"}, + value: "value", + }, + "re: one param, matches": { + wantMatch: true, + typ: "re", + patterns: []string{"^value$"}, + value: "value", + }, + "re: one param, not matches": { + wantMatch: false, + typ: "re", + patterns: []string{"^Value$"}, + value: "value", + }, + "re: several params, last one matches": { + wantMatch: true, + typ: "re", + patterns: []string{"not", "matches", "va[lue]{3}"}, + value: "value", + }, + "re: several params, no matches": { + wantMatch: false, + typ: "re", + patterns: []string{"not", "matches", "val[^l]ue"}, + value: "value", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ok := funcMatchAny(test.typ, test.value, test.patterns[0], test.patterns[1:]...) + + assert.Equal(t, test.wantMatch, ok) + }) + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline.go new file mode 100644 index 000000000..4d391d41e --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline.go @@ -0,0 +1,236 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/discoverer/dockerd" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/discoverer/kubernetes" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/discoverer/netlisteners" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/hostinfo" +) + +func New(cfg Config) (*Pipeline, error) { + if err := validateConfig(cfg); err != nil { + return nil, err + } + + clr, err := newTargetClassificator(cfg.Classify) + if err != nil { + return nil, fmt.Errorf("classify rules: %v", err) + } + + cmr, err := newConfigComposer(cfg.Compose) + if err != nil { + return nil, fmt.Errorf("compose rules: %v", err) + } + + p := &Pipeline{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("pipeline", cfg.Name), + ), + configDefaults: cfg.ConfigDefaults, + clr: clr, + cmr: cmr, + accum: newAccumulator(), + discoverers: make([]model.Discoverer, 0), + configs: make(map[string]map[uint64][]confgroup.Config), + } + p.accum.Logger = p.Logger + + if err := p.registerDiscoverers(cfg); err != nil { + return nil, err + } + + return p, nil +} + +type ( + Pipeline struct { + *logger.Logger + + configDefaults confgroup.Registry + discoverers []model.Discoverer + accum *accumulator + clr classificator + cmr composer + configs map[string]map[uint64][]confgroup.Config // [targetSource][targetHash] + } + classificator interface { + classify(model.Target) model.Tags + } + composer interface { + compose(model.Target) []confgroup.Config + } +) + +func (p *Pipeline) registerDiscoverers(conf Config) error { + for _, cfg := range conf.Discover { + switch cfg.Discoverer { + case "net_listeners": + cfg.NetListeners.Source = conf.Source + td, err := netlisteners.NewDiscoverer(cfg.NetListeners) + if err != nil { + return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err) + } + p.discoverers = append(p.discoverers, td) + case "docker": + if hostinfo.IsInsideK8sCluster() { + p.Infof("not registering '%s' discoverer: disabled in k8s environment", cfg.Discoverer) + continue + } + cfg.Docker.Source = conf.Source + td, err := dockerd.NewDiscoverer(cfg.Docker) + if err != nil { + return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err) + } + p.discoverers = append(p.discoverers, td) + case "k8s": + for _, k8sCfg := range cfg.K8s { + td, err := kubernetes.NewKubeDiscoverer(k8sCfg) + if err != nil { + return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err) + } + p.discoverers = append(p.discoverers, td) + } + default: + return fmt.Errorf("unknown discoverer: '%s'", cfg.Discoverer) + } + } + + if len(p.discoverers) == 0 { + return errors.New("no discoverers registered") + } + + return nil +} + +func (p *Pipeline) Run(ctx context.Context, in chan<- []*confgroup.Group) { + p.Info("instance is started") + defer p.Info("instance is stopped") + + p.accum.discoverers = p.discoverers + + updates := make(chan []model.TargetGroup) + done := make(chan struct{}) + + go func() { defer close(done); p.accum.run(ctx, updates) }() + + for { + select { + case <-ctx.Done(): + select { + case <-done: + case <-time.After(time.Second * 4): + } + return + case <-done: + return + case tggs := <-updates: + p.Debugf("received %d target groups", len(tggs)) + if cfggs := p.processGroups(tggs); len(cfggs) > 0 { + select { + case <-ctx.Done(): + case in <- cfggs: // FIXME: potentially stale configs if upstream cannot receive (blocking) + } + } + } + } +} + +func (p *Pipeline) processGroups(tggs []model.TargetGroup) []*confgroup.Group { + var groups []*confgroup.Group + // updates come from the accumulator, this ensures that all groups have different sources + for _, tgg := range tggs { + p.Debugf("processing group '%s' with %d target(s)", tgg.Source(), len(tgg.Targets())) + if v := p.processGroup(tgg); v != nil { + groups = append(groups, v) + } + } + return groups +} + +func (p *Pipeline) processGroup(tgg model.TargetGroup) *confgroup.Group { + if len(tgg.Targets()) == 0 { + if _, ok := p.configs[tgg.Source()]; !ok { + return nil + } + delete(p.configs, tgg.Source()) + + return &confgroup.Group{Source: tgg.Source()} + } + + targetsCache, ok := p.configs[tgg.Source()] + if !ok { + targetsCache = make(map[uint64][]confgroup.Config) + p.configs[tgg.Source()] = targetsCache + } + + var changed bool + seen := make(map[uint64]bool) + + for _, tgt := range tgg.Targets() { + if tgt == nil { + continue + } + + hash := tgt.Hash() + seen[hash] = true + + if _, ok := targetsCache[hash]; ok { + continue + } + + targetsCache[hash] = nil + + if tags := p.clr.classify(tgt); len(tags) > 0 { + tgt.Tags().Merge(tags) + + if cfgs := p.cmr.compose(tgt); len(cfgs) > 0 { + targetsCache[hash] = cfgs + changed = true + + for _, cfg := range cfgs { + cfg.SetProvider(tgg.Provider()) + cfg.SetSource(tgg.Source()) + cfg.SetSourceType(confgroup.TypeDiscovered) + if def, ok := p.configDefaults.Lookup(cfg.Module()); ok { + cfg.ApplyDefaults(def) + } + } + } + } + } + + for hash := range targetsCache { + if seen[hash] { + continue + } + if cfgs := targetsCache[hash]; len(cfgs) > 0 { + changed = true + } + delete(targetsCache, hash) + } + + if !changed { + return nil + } + + // TODO: deepcopy? + cfgGroup := &confgroup.Group{Source: tgg.Source()} + + for _, cfgs := range targetsCache { + cfgGroup.Configs = append(cfgGroup.Configs, cfgs...) + } + + return cfgGroup +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline_test.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline_test.go new file mode 100644 index 000000000..e67b6d7ce --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline_test.go @@ -0,0 +1,303 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/ilyam8/hashstructure" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func Test_defaultConfigs(t *testing.T) { + dir := "../../../../config/go.d/sd/" + entries, err := os.ReadDir(dir) + require.NoError(t, err) + + require.NotEmpty(t, entries) + + for _, e := range entries { + if strings.Contains(e.Name(), "prometheus") { + continue + } + file, err := filepath.Abs(filepath.Join(dir, e.Name())) + require.NoError(t, err, "abs path") + + bs, err := os.ReadFile(file) + require.NoError(t, err, "read config file") + + var cfg Config + require.NoError(t, yaml.Unmarshal(bs, &cfg), "unmarshal") + + _, err = New(cfg) + require.NoError(t, err, "create pipeline") + } +} + +func TestNew(t *testing.T) { + tests := map[string]struct { + config string + wantErr bool + }{ + "fails when config unset": { + wantErr: true, + config: "", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + + var cfg Config + err := yaml.Unmarshal([]byte(test.config), &cfg) + require.Nilf(t, err, "cfg unmarshal") + + _, err = New(cfg) + + if test.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestPipeline_Run(t *testing.T) { + const config = ` +classify: + - selector: "rule1" + tags: "foo1" + match: + - tags: "bar1" + expr: '{{ glob .Name "mock*1*" }}' + - tags: "bar2" + expr: '{{ glob .Name "mock*2*" }}' +compose: + - selector: "foo1" + config: + - selector: "bar1" + template: | + name: {{ .Name }}-foobar1 + - selector: "bar2" + template: | + name: {{ .Name }}-foobar2 +` + tests := map[string]discoverySim{ + "new group with no targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("", + newMockTargetGroup("test"), + ), + }, + wantClassifyCalls: 0, + wantComposeCalls: 0, + wantConfGroups: nil, + }, + "new group with targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + }, + wantClassifyCalls: 2, + wantComposeCalls: 2, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + }, + }, + "existing group with same targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + newDelayedMockDiscoverer("rule1", 5, + newMockTargetGroup("test", "mock1", "mock2"), + ), + }, + wantClassifyCalls: 2, + wantComposeCalls: 2, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + }, + }, + "existing group that previously had targets with no targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + newDelayedMockDiscoverer("rule1", 5, + newMockTargetGroup("test"), + ), + }, + wantClassifyCalls: 2, + wantComposeCalls: 2, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + prepareDiscoveredGroup(), + }, + }, + "existing group with old and new targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + newDelayedMockDiscoverer("rule1", 5, + newMockTargetGroup("test", "mock1", "mock2", "mock11", "mock22"), + ), + }, + wantClassifyCalls: 4, + wantComposeCalls: 4, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2", "mock11-foobar1", "mock22-foobar2"), + }, + }, + "existing group with new targets only": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + newDelayedMockDiscoverer("rule1", 5, + newMockTargetGroup("test", "mock11", "mock22"), + ), + }, + wantClassifyCalls: 4, + wantComposeCalls: 4, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + prepareDiscoveredGroup("mock11-foobar1", "mock22-foobar2"), + }, + }, + } + + for name, sim := range tests { + t.Run(name, func(t *testing.T) { + sim.run(t) + }) + } +} + +func prepareDiscoveredGroup(configNames ...string) *confgroup.Group { + var configs []confgroup.Config + + for _, name := range configNames { + configs = append(configs, confgroup.Config{}. + SetProvider("mock"). + SetSourceType(confgroup.TypeDiscovered). + SetSource("test"). + SetName(name)) + } + + return &confgroup.Group{ + Source: "test", + Configs: configs, + } +} + +func newMockDiscoverer(tags string, tggs ...model.TargetGroup) *mockDiscoverer { + return &mockDiscoverer{ + tags: mustParseTags(tags), + tggs: tggs, + } +} + +func newDelayedMockDiscoverer(tags string, delay int, tggs ...model.TargetGroup) *mockDiscoverer { + return &mockDiscoverer{ + tags: mustParseTags(tags), + tggs: tggs, + delay: time.Duration(delay) * time.Second, + } +} + +type mockDiscoverer struct { + tggs []model.TargetGroup + tags model.Tags + delay time.Duration +} + +func (md mockDiscoverer) String() string { + return "mock discoverer" +} + +func (md mockDiscoverer) Discover(ctx context.Context, out chan<- []model.TargetGroup) { + for _, tgg := range md.tggs { + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(md.tags) + } + } + + select { + case <-ctx.Done(): + case <-time.After(md.delay): + select { + case <-ctx.Done(): + case out <- md.tggs: + } + } +} + +func newMockTargetGroup(source string, targets ...string) *mockTargetGroup { + m := &mockTargetGroup{source: source} + for _, name := range targets { + m.targets = append(m.targets, &mockTarget{Name: name}) + } + return m +} + +type mockTargetGroup struct { + targets []model.Target + source string +} + +func (mg mockTargetGroup) Targets() []model.Target { return mg.targets } +func (mg mockTargetGroup) Source() string { return mg.source } +func (mg mockTargetGroup) Provider() string { return "mock" } + +func newMockTarget(name string, tags ...string) *mockTarget { + m := &mockTarget{Name: name} + v, _ := model.ParseTags(strings.Join(tags, " ")) + m.Tags().Merge(v) + return m +} + +type mockTarget struct { + model.Base + Name string +} + +func (mt mockTarget) TUID() string { return mt.Name } +func (mt mockTarget) Hash() uint64 { return mustCalcHash(mt.Name) } + +func mustParseTags(line string) model.Tags { + v, err := model.ParseTags(line) + if err != nil { + panic(fmt.Sprintf("mustParseTags: %v", err)) + } + return v +} + +func mustCalcHash(obj any) uint64 { + hash, err := hashstructure.Hash(obj, nil) + if err != nil { + panic(fmt.Sprintf("hash calculation: %v", err)) + } + return hash +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/promport.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/promport.go new file mode 100644 index 000000000..646e1abb1 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/promport.go @@ -0,0 +1,662 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +// https://github.com/prometheus/prometheus/wiki/Default-port-allocations +var prometheusPortAllocations = map[int]string{ + 2019: "caddy", + 3000: "grafana", + 3100: "loki", + 5555: "prometheus-jdbc-exporter", + 6060: "crowdsec", + 7300: "midonet_agent", + 8001: "netbox", + 8088: "fawkes", + 8089: "prom2teams", + 8292: "phabricator_webhook_for_alertmanager", + 8404: "ha_proxy_v2_plus", + 9042: "rds_exporter", + 9087: "telegram_bot_for_alertmanager", + 9091: "pushgateway", + 9097: "jiralert", + 9101: "haproxy_exporter", + 9102: "statsd_exporter", + 9103: "collectd_exporter", + 9104: "mysqld_exporter", + 9105: "mesos_exporter", + 9106: "cloudwatch_exporter", + 9107: "consul_exporter", + 9108: "graphite_exporter", + 9109: "graphite_exporter", + 9110: "blackbox_exporter", + 9111: "expvar_exporter", + 9112: "promacct_pcap-based_network_traffic_accounting", + 9113: "nginx_exporter", + 9114: "elasticsearch_exporter", + 9115: "blackbox_exporter", + 9116: "snmp_exporter", + 9117: "apache_exporter", + 9118: "jenkins_exporter", + 9119: "bind_exporter", + 9120: "powerdns_exporter", + 9121: "redis_exporter", + 9122: "influxdb_exporter", + 9123: "rethinkdb_exporter", + 9124: "freebsd_sysctl_exporter", + 9125: "statsd_exporter", + 9126: "new_relic_exporter", + 9127: "pgbouncer_exporter", + 9128: "ceph_exporter", + 9129: "haproxy_log_exporter", + 9130: "unifi_poller", + 9131: "varnish_exporter", + 9132: "airflow_exporter", + 9133: "fritz_box_exporter", + 9134: "zfs_exporter", + 9135: "rtorrent_exporter", + 9136: "collins_exporter", + 9137: "silicondust_hdhomerun_exporter", + 9138: "heka_exporter", + 9139: "azure_sql_exporter", + 9140: "mirth_exporter", + 9141: "zookeeper_exporter", + 9142: "big-ip_exporter", + 9143: "cloudmonitor_exporter", + 9145: "aerospike_exporter", + 9146: "icecast_exporter", + 9147: "nginx_request_exporter", + 9148: "nats_exporter", + 9149: "passenger_exporter", + 9150: "memcached_exporter", + 9151: "varnish_request_exporter", + 9152: "command_runner_exporter", + 9154: "postfix_exporter", + 9155: "vsphere_graphite", + 9156: "webdriver_exporter", + 9157: "ibm_mq_exporter", + 9158: "pingdom_exporter", + 9160: "apache_flink_exporter", + 9161: "oracle_db_exporter", + 9162: "apcupsd_exporter", + 9163: "zgres_exporter", + 9164: "s6_exporter", + 9165: "keepalived_exporter", + 9166: "dovecot_exporter", + 9167: "unbound_exporter", + 9168: "gitlab-monitor", + 9169: "lustre_exporter", + 9170: "docker_hub_exporter", + 9171: "github_exporter", + 9172: "script_exporter", + 9173: "rancher_exporter", + 9174: "docker-cloud_exporter", + 9175: "saltstack_exporter", + 9176: "openvpn_exporter", + 9177: "libvirt_exporter", + 9178: "stream_exporter", + 9179: "shield_exporter", + 9180: "scylladb_exporter", + 9181: "openstack_ceilometer_exporter", + 9183: "openstack_exporter", + 9184: "twitch_exporter", + 9185: "kafka_topic_exporter", + 9186: "cloud_foundry_firehose_exporter", + 9187: "postgresql_exporter", + 9188: "crypto_exporter", + 9189: "hetzner_cloud_csi_driver_nodes", + 9190: "bosh_exporter", + 9191: "netflow_exporter", + 9192: "ceph_exporter", + 9193: "cloud_foundry_exporter", + 9194: "bosh_tsdb_exporter", + 9195: "maxscale_exporter", + 9196: "upnp_internet_gateway_device_exporter", + 9198: "logstash_exporter", + 9199: "cloudflare_exporter", + 9202: "pacemaker_exporter", + 9203: "domain_exporter", + 9204: "pcsensor_temper_exporter", + 9205: "nextcloud_exporter", + 9206: "elasticsearch_exporter", + 9207: "mysql_exporter", + 9208: "kafka_consumer_group_exporter", + 9209: "fastnetmon_advanced_exporter", + 9210: "netatmo_exporter", + 9211: "dnsbl-exporter", + 9212: "digitalocean_exporter", + 9213: "custom_exporter", + 9214: "mqtt_blackbox_exporter", + 9215: "prometheus_graphite_bridge", + 9216: "mongodb_exporter", + 9217: "consul_agent_exporter", + 9218: "promql-guard", + 9219: "ssl_certificate_exporter", + 9220: "netapp_trident_exporter", + 9221: "proxmox_ve_exporter", + 9222: "aws_ecs_exporter", + 9223: "bladepsgi_exporter", + 9224: "fluentd_exporter", + 9225: "mailexporter", + 9226: "allas", + 9227: "proc_exporter", + 9228: "flussonic_exporter", + 9229: "gitlab-workhorse", + 9230: "network_ups_tools_exporter", + 9231: "solr_exporter", + 9232: "osquery_exporter", + 9233: "mgmt_exporter", + 9234: "mosquitto_exporter", + 9235: "gitlab-pages_exporter", + 9236: "gitlab_gitaly_exporter", + 9237: "sql_exporter", + 9238: "uwsgi_expoter", + 9239: "surfboard_exporter", + 9240: "tinyproxy_exporter", + 9241: "arangodb_exporter", + 9242: "ceph_radosgw_usage_exporter", + 9243: "chef_compliance_exporter", + 9244: "moby_container_exporter", + 9245: "naemon_nagios_exporter", + 9246: "smartpi", + 9247: "sphinx_exporter", + 9248: "freebsd_gstat_exporter", + 9249: "apache_flink_metrics_reporter", + 9250: "opentsdb_exporter", + 9251: "sensu_exporter", + 9252: "gitlab_runner_exporter", + 9253: "php-fpm_exporter", + 9254: "kafka_burrow_exporter", + 9255: "google_stackdriver_exporter", + 9256: "td-agent_exporter", + 9257: "smart_exporter", + 9258: "hello_sense_exporter", + 9259: "azure_resources_exporter", + 9260: "buildkite_exporter", + 9261: "grafana_exporter", + 9262: "bloomsky_exporter", + 9263: "vmware_guest_exporter", + 9264: "nest_exporter", + 9265: "weather_exporter", + 9266: "openhab_exporter", + 9267: "nagios_livestatus_exporter", + 9268: "cratedb_remote_remote_read_write_adapter", + 9269: "fluent-agent-lite_exporter", + 9270: "jmeter_exporter", + 9271: "pagespeed_exporter", + 9272: "vmware_exporter", + 9274: "kubernetes_persistentvolume_disk_usage_exporter", + 9275: "nrpe_exporter", + 9276: "azure_monitor_exporter", + 9277: "mongo_collection_exporter", + 9278: "crypto_miner_exporter", + 9279: "instaclustr_exporter", + 9280: "citrix_netscaler_exporter", + 9281: "fastd_exporter", + 9282: "freeswitch_exporter", + 9283: "ceph_ceph-mgr_prometheus_plugin", + 9284: "gobetween", + 9285: "database_exporter", + 9286: "vdo_compression_and_deduplication_exporter", + 9287: "ceph_iscsi_gateway_statistics", + 9288: "consrv", + 9289: "lovoos_ipmi_exporter", + 9290: "soundclouds_ipmi_exporter", + 9291: "ibm_z_hmc_exporter", + 9292: "netapp_ontap_api_exporter", + 9293: "connection_status_exporter", + 9294: "miflora_flower_care_exporter", + 9295: "freifunk_exporter", + 9296: "odbc_exporter", + 9297: "machbase_exporter", + 9298: "generic_exporter", + 9299: "exporter_aggregator", + 9301: "squid_exporter", + 9302: "faucet_sdn_faucet_exporter", + 9303: "faucet_sdn_gauge_exporter", + 9304: "logstash_exporter", + 9305: "go-ethereum_exporter", + 9306: "kyototycoon_exporter", + 9307: "audisto_exporter", + 9308: "kafka_exporter", + 9309: "fluentd_exporter", + 9310: "open_vswitch_exporter", + 9311: "iota_exporter", + 9313: "cloudprober_exporter", + 9314: "eris_exporter", + 9315: "centrifugo_exporter", + 9316: "tado_exporter", + 9317: "tellstick_local_exporter", + 9318: "conntrack_exporter", + 9319: "flexlm_exporter", + 9320: "consul_telemetry_exporter", + 9321: "spring_boot_actuator_exporter", + 9322: "haproxy_abuser_exporter", + 9323: "docker_prometheus_metrics", + 9324: "bird_routing_daemon_exporter", + 9325: "ovirt_exporter", + 9326: "junos_exporter", + 9327: "s3_exporter", + 9328: "openldap_syncrepl_exporter", + 9329: "cups_exporter", + 9330: "openldap_metrics_exporter", + 9331: "influx-spout_prometheus_metrics", + 9332: "network_exporter", + 9333: "vault_pki_exporter", + 9334: "ejabberd_exporter", + 9335: "nexsan_exporter", + 9336: "mediacom_internet_usage_exporter", + 9337: "mqttgateway", + 9339: "aws_s3_exporter", + 9340: "financial_quotes_exporter", + 9341: "slurm_exporter", + 9342: "frr_exporter", + 9343: "gridserver_exporter", + 9344: "mqtt_exporter", + 9345: "ruckus_smartzone_exporter", + 9346: "ping_exporter", + 9347: "junos_exporter", + 9348: "bigquery_exporter", + 9349: "configurable_elasticsearch_query_exporter", + 9350: "thousandeyes_exporter", + 9351: "wal-e_wal-g_exporter", + 9352: "nature_remo_exporter", + 9353: "ceph_exporter", + 9354: "deluge_exporter", + 9355: "nightwatchjs_exporter", + 9356: "pacemaker_exporter", + 9357: "p1_exporter", + 9358: "performance_counters_exporter", + 9359: "sidekiq_prometheus", + 9360: "powershell_exporter", + 9361: "scaleway_sd_exporter", + 9362: "cisco_exporter", + // Netdata has clickhouse collector. + // CH itself exposes messy Prometheus metrics: camelCase names, appends instances to names instead of labels. + //9363: "clickhouse", + 9364: "continent8_exporter", + 9365: "cumulus_linux_exporter", + 9366: "haproxy_stick_table_exporter", + 9367: "teamspeak3_exporter", + 9368: "ethereum_client_exporter", + 9369: "prometheus_pushprox", + 9370: "u-bmc", + 9371: "conntrack-stats-exporter", + 9372: "appmetrics_prometheus", + 9373: "gcp_service_discovery", + 9374: "smokeping_prober", + 9375: "particle_exporter", + 9376: "falco", + 9377: "cisco_aci_exporter", + 9378: "etcd_grpc_proxy_exporter", + 9379: "etcd_exporter", + 9380: "mythtv_exporter", + 9381: "kafka_zookeeper_exporter", + 9382: "frrouting_exporter", + 9383: "aws_health_exporter", + 9384: "aws_sqs_exporter", + 9385: "apcupsdexporter", + 9386: "tankerkönig_api_exporter", + 9387: "sabnzbd_exporter", + 9388: "linode_exporter", + 9389: "scylla-cluster-tests_exporter", + 9390: "kannel_exporter", + 9391: "concourse_prometheus_metrics", + 9392: "generic_command_line_output_exporter", + 9393: "alertmanager_github_webhook_receiver", + 9394: "ruby_prometheus_exporter", + 9395: "ldap_exporter", + 9396: "monerod_exporter", + 9397: "comap", + 9398: "open_hardware_monitor_exporter", + 9399: "prometheus_sql_exporter", + 9400: "ripe_atlas_exporter", + 9401: "1-wire_exporter", + 9402: "google_cloud_platform_exporter", + 9403: "zerto_exporter", + 9404: "jmx_exporter", + 9405: "discourse_exporter", + 9406: "hhvm_exporter", + 9407: "obs_studio_exporter", + 9408: "rds_enhanced_monitoring_exporter", + 9409: "ovn-kubernetes_master_exporter", + 9410: "ovn-kubernetes_node_exporter", + 9411: "softether_exporter", + 9412: "sentry_exporter", + 9413: "mogilefs_exporter", + 9414: "homey_exporter", + 9415: "cloudwatch_read_adapter", + 9416: "hp_ilo_metrics_exporter", + 9417: "ethtool_exporter", + 9418: "gearman_exporter", + 9419: "rabbitmq_exporter", + 9420: "couchbase_exporter", + 9421: "apicast", + 9422: "jolokia_exporter", + 9423: "hp_raid_exporter", + 9424: "influxdb_stats_exporter", + 9425: "pachyderm_exporter", + 9426: "vespa_engine_exporter", + 9427: "ping_exporter", + 9428: "ssh_exporter", + 9429: "uptimerobot_exporter", + 9430: "corerad", + 9431: "hpfeeds_broker_exporter", + 9432: "windows_perflib_exporter", + 9433: "knot_exporter", + 9434: "opensips_exporter", + 9435: "ebpf_exporter", + 9436: "mikrotik-exporter", + 9437: "dell_emc_isilon_exporter", + 9438: "dell_emc_ecs_exporter", + 9439: "bitcoind_exporter", + 9440: "ravendb_exporter", + 9441: "nomad_exporter", + 9442: "mcrouter_exporter", + 9444: "foundationdb_exporter", + 9445: "nvidia_gpu_exporter", + 9446: "orange_livebox_dsl_modem_exporter", + 9447: "resque_exporter", + 9448: "eventstore_exporter", + 9449: "omeroserver_exporter", + 9450: "habitat_exporter", + 9451: "reindexer_exporter", + 9452: "freebsd_jail_exporter", + 9453: "midonet-kubernetes", + 9454: "nvidia_smi_exporter", + 9455: "iptables_exporter", + 9456: "aws_lambda_exporter", + 9457: "files_content_exporter", + 9458: "rocketchat_exporter", + 9459: "yarn_exporter", + 9460: "hana_exporter", + 9461: "aws_lambda_read_adapter", + 9462: "php_opcache_exporter", + 9463: "virgin_media_liberty_global_hub3_exporter", + 9464: "opencensus-nodejs_prometheus_exporter", + 9465: "hetzner_cloud_k8s_cloud_controller_manager", + 9466: "mqtt_push_gateway", + 9467: "nginx-prometheus-shiny-exporter", + 9468: "nasa-swpc-exporter", + 9469: "script_exporter", + 9470: "cachet_exporter", + 9471: "lxc-exporter", + 9472: "hetzner_cloud_csi_driver_controller", + 9473: "stellar-core-exporter", + 9474: "libvirtd_exporter", + 9475: "wgipamd", + 9476: "ovn_metrics_exporter", + 9477: "csp_violation_report_exporter", + 9478: "sentinel_exporter", + 9479: "elasticbeat_exporter", + 9480: "brigade_exporter", + 9481: "drbd9_exporter", + 9482: "vector_packet_process_vpp_exporter", + 9483: "ibm_app_connect_enterprise_exporter", + 9484: "kubedex-exporter", + 9485: "emarsys_exporter", + 9486: "domoticz_exporter", + 9487: "docker_stats_exporter", + 9488: "bmw_connected_drive_exporter", + 9489: "tezos_node_metrics_exporter", + 9490: "exporter_for_docker_libnetwork_plugin_for_ovn", + 9491: "docker_container_stats_exporter_docker_ps", + 9492: "azure_exporter_monitor_and_usage", + 9493: "prosafe_exporter", + 9494: "kamailio_exporter", + 9495: "ingestor_exporter", + 9496: "389ds_ipa_exporter", + 9497: "immudb_exporter", + 9498: "tp-link_hs110_exporter", + 9499: "smartthings_exporter", + 9500: "cassandra_exporter", + 9501: "hetznercloud_exporter", + 9502: "hetzner_exporter", + 9503: "scaleway_exporter", + 9504: "github_exporter", + 9505: "dockerhub_exporter", + 9506: "jenkins_exporter", + 9507: "owncloud_exporter", + 9508: "ccache_exporter", + 9509: "hetzner_storagebox_exporter", + 9510: "dummy_exporter", + 9512: "cloudera_exporter", + 9513: "openconfig_streaming_telemetry_exporter", + 9514: "app_stores_exporter", + 9515: "swarm-exporter", + 9516: "prometheus_speedtest_exporter", + 9517: "matroschka_prober", + 9518: "crypto_stock_exchanges_funds_exporter", + 9519: "acurite_exporter", + 9520: "swift_health_exporter", + 9521: "ruuvi_exporter", + 9522: "tftp_exporter", + 9523: "3cx_exporter", + 9524: "loki_exporter", + 9525: "alibaba_cloud_exporter", + 9526: "kafka_lag_exporter", + 9527: "netgear_cable_modem_exporter", + 9528: "total_connect_comfort_exporter", + 9529: "octoprint_exporter", + 9530: "custom_prometheus_exporter", + 9531: "jfrog_artifactory_exporter", + 9532: "snyk_exporter", + 9533: "network_exporter_for_cisco_api", + 9534: "humio_exporter", + 9535: "cron_exporter", + 9536: "ipsec_exporter", + 9537: "cri-o", + 9538: "bull_queue", + 9539: "modemmanager_exporter", + 9540: "emq_exporter", + 9541: "smartmon_exporter", + 9542: "sakuracloud_exporter", + 9543: "kube2iam_exporter", + 9544: "pgio_exporter", + 9545: "hp_ilo4_exporter", + 9546: "pwrstat-exporter", + 9547: "patroni_exporter", + 9548: "trafficserver_exporter", + 9549: "raspberry_exporter", + 9550: "rtl_433_exporter", + 9551: "hostapd_exporter", + 9552: "aws_elastic_beanstalk_exporter", + 9553: "apt_exporter", + 9554: "acc_server_manager_exporter", + 9555: "sona_exporter", + 9556: "routinator_exporter", + 9557: "mysql_count_exporter", + 9558: "systemd_exporter", + 9559: "ntp_exporter", + 9560: "sql_queries_exporter", + 9561: "qbittorrent_exporter", + 9562: "ptv_xserver_exporter", + 9563: "kibana_exporter", + 9564: "purpleair_exporter", + 9565: "bminer_exporter", + 9566: "rabbitmq_cli_consumer", + 9567: "alertsnitch", + 9568: "dell_poweredge_ipmi_exporter", + 9569: "hvpa_controller", + 9570: "vpa_exporter", + 9571: "helm_exporter", + 9572: "ctld_exporter", + 9573: "jkstatus_exporter", + 9574: "opentracker_exporter", + 9575: "poweradmin_server_monitor_exporter", + 9576: "exabgp_exporter", + 9578: "aria2_exporter", + 9579: "iperf3_exporter", + 9580: "azure_service_bus_exporter", + 9581: "codenotary_vcn_exporter", + 9583: "signatory_a_remote_operation_signer_for_tezos", + 9584: "bunnycdn_exporter", + 9585: "opvizor_performance_analyzer_process_exporter", + 9586: "wireguard_exporter", + 9587: "nfs-ganesha_exporter", + 9588: "ltsv-tailer_exporter", + 9589: "goflow_exporter", + 9590: "flow_exporter", + 9591: "srcds_exporter", + 9592: "gcp_quota_exporter", + 9593: "lighthouse_exporter", + 9594: "plex_exporter", + 9595: "netio_exporter", + 9596: "azure_elastic_sql_exporter", + 9597: "github_vulnerability_alerts_exporter", + 9599: "pirograph_exporter", + 9600: "circleci_exporter", + 9601: "messagebird_exporter", + 9602: "modbus_exporter", + 9603: "xen_exporter_using_xenlight", + 9604: "xmpp_blackbox_exporter", + 9605: "fping-exporter", + 9606: "ecr-exporter", + 9607: "raspberry_pi_sense_hat_exporter", + 9608: "ironic_prometheus_exporter", + 9609: "netapp_exporter", + 9610: "kubernetes_exporter", + 9611: "speedport_exporter", + 9612: "opflex-agent_exporter", + 9613: "azure_health_exporter", + 9614: "nut_upsc_exporter", + 9615: "mellanox_mlx5_exporter", + 9616: "mailgun_exporter", + 9617: "pi-hole_exporter", + 9618: "stellar-account-exporter", + 9619: "stellar-horizon-exporter", + 9620: "rundeck_exporter", + 9621: "opennebula_exporter", + 9622: "bmc_exporter", + 9623: "tc4400_exporter", + 9624: "pact_broker_exporter", + 9625: "bareos_exporter", + 9626: "hockeypuck", + 9627: "artifactory_exporter", + 9628: "solace_pubsub_plus_exporter", + 9629: "prometheus_gitlab_notifier", + 9630: "nftables_exporter", + 9631: "a_op5_monitor_exporter", + 9632: "opflex-server_exporter", + 9633: "smartctl_exporter", + 9634: "aerospike_ttl_exporter", + 9635: "fail2ban_exporter", + 9636: "exim4_exporter", + 9637: "kubeversion_exporter", + 9638: "a_icinga2_exporter", + 9639: "scriptable_jmx_exporter", + 9640: "logstash_output_exporter", + 9641: "coturn_exporter", + 9642: "bugsnag_exporter", + 9644: "exporter_for_grouped_process", + 9645: "burp_exporter", + 9646: "locust_exporter", + 9647: "docker_exporter", + 9648: "ntpmon_exporter", + 9649: "logstash_exporter", + 9650: "keepalived_exporter", + 9651: "storj_exporter", + 9652: "praefect_exporter", + 9653: "jira_issues_exporter", + 9654: "ansible_galaxy_exporter", + 9655: "kube-netc_exporter", + 9656: "matrix", + 9657: "krill_exporter", + 9658: "sap_hana_sql_exporter", + 9660: "kaiterra_laser_egg_exporter", + 9661: "hashpipe_exporter", + 9662: "pms5003_particulate_matter_sensor_exporter", + 9663: "sap_nwrfc_exporter", + 9664: "linux_ha_clusterlabs_exporter", + 9665: "senderscore_exporter", + 9666: "alertmanager_silences_exporter", + 9667: "smtpd_exporter", + 9668: "suses_sap_hana_exporter", + 9669: "panopticon_native_metrics", + 9670: "flare_native_metrics", + 9671: "aws_ec2_spot_exporter", + 9672: "aircontrol_co2_exporter", + 9673: "co2_monitor_exporter", + 9674: "google_analytics_exporter", + 9675: "docker_swarm_exporter", + 9676: "hetzner_traffic_exporter", + 9677: "aws_ecs_exporter", + 9678: "ircd_user_exporter", + 9679: "aws_health_exporter", + 9680: "suses_sap_host_exporter", + 9681: "myfitnesspal_exporter", + 9682: "powder_monkey", + 9683: "infiniband_exporter", + 9684: "kibana_standalone_exporter", + 9685: "eideticom", + 9686: "aws_ec2_exporter", + 9687: "gitaly_blackbox_exporter", + 9689: "lan_server_modbus_exporter", + 9690: "tcp_longterm_connection_exporter", + 9691: "celery_redis_exporter", + 9692: "gcp_gce_exporter", + 9693: "sigma_air_manager_exporter", + 9694: "per-user_usage_exporter_for_cisco_xe_lnss", + 9695: "cifs_exporter", + 9696: "jitsi_videobridge_exporter", + 9697: "tendermint_blockchain_exporter", + 9698: "integrated_dell_remote_access_controller_idrac_exporter", + 9699: "pyncette_exporter", + 9700: "jitsi_meet_exporter", + 9701: "workbook_exporter", + 9702: "homeplug_plc_exporter", + 9703: "vircadia", + 9704: "linux_tc_exporter", + 9705: "upc_connect_box_exporter", + 9706: "postfix_exporter", + 9707: "radarr_exporter", + 9708: "sonarr_exporter", + 9709: "hadoop_hdfs_fsimage_exporter", + 9710: "nut-exporter", + 9711: "cloudflare_flan_scan_report_exporter", + 9712: "siemens_s7_plc_exporter", + 9713: "glusterfs_exporter", + 9714: "fritzbox_exporter", + 9715: "twincat_ads_web_service_exporter", + 9716: "signald_webhook_receiver", + 9717: "tplink_easysmart_switch_exporter", + 9718: "warp10_exporter", + 9719: "pgpool-ii_exporter", + 9720: "moodle_db_exporter", + 9721: "gtp_exporter", + 9722: "miele_exporter", + 9724: "freeswitch_exporter", + 9725: "sunnyboy_exporter", + 9726: "python_rq_exporter", + 9727: "ctdb_exporter", + 9728: "nginx-rtmp_exporter", + 9729: "libvirtd_exporter", + 9730: "lynis_exporter", + 9731: "nebula_mam_exporter", + 9732: "nftables_exporter", + 9733: "honeypot_exporter", + 9734: "a10-networks_prometheus_exporter", + 9735: "webweaver", + 9736: "mongodb_query_exporter", + 9737: "folding_home_exporter", + 9738: "processor_counter_monitor_exporter", + 9739: "kafka_consumer_lag_monitoring", + 9740: "flightdeck", + 9741: "ibm_spectrum_exporter", + 9742: "transmission-exporter", + 9743: "sma-exporter", + 9803: "site24x7_exporter", + 9901: "envoy_proxy", + 9913: "nginx_vts_exporter", + 9943: "filestat_exporter", + 9980: "login_exporter", + 9983: "sia_exporter", + 9984: "couchdb_exporter", + 9987: "netapp_solidfire_exporter", + 9990: "wildfly_exporter", + 16995: "storidge_exporter", + 19091: "transmission_exporter", + 24231: "fluent_plugin_for_prometheus", + 42004: "proxysql_exporter", + 44323: "pcp_exporter", + 61091: "dcos_exporter", +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/selector.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/selector.go new file mode 100644 index 000000000..cdd2cf000 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/selector.go @@ -0,0 +1,154 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "errors" + "fmt" + "strings" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" +) + +type selector interface { + matches(model.Tags) bool +} + +type ( + exactSelector string + trueSelector struct{} + negSelector struct{ selector } + orSelector struct{ lhs, rhs selector } + andSelector struct{ lhs, rhs selector } +) + +func (s exactSelector) matches(tags model.Tags) bool { _, ok := tags[string(s)]; return ok } +func (s trueSelector) matches(model.Tags) bool { return true } +func (s negSelector) matches(tags model.Tags) bool { return !s.selector.matches(tags) } +func (s orSelector) matches(tags model.Tags) bool { return s.lhs.matches(tags) || s.rhs.matches(tags) } +func (s andSelector) matches(tags model.Tags) bool { return s.lhs.matches(tags) && s.rhs.matches(tags) } + +func (s exactSelector) String() string { return "{" + string(s) + "}" } +func (s negSelector) String() string { return "{!" + stringify(s.selector) + "}" } +func (s trueSelector) String() string { return "{*}" } +func (s orSelector) String() string { return "{" + stringify(s.lhs) + "|" + stringify(s.rhs) + "}" } +func (s andSelector) String() string { return "{" + stringify(s.lhs) + ", " + stringify(s.rhs) + "}" } +func stringify(sr selector) string { return strings.Trim(fmt.Sprintf("%s", sr), "{}") } + +func parseSelector(line string) (sr selector, err error) { + words := strings.Fields(line) + if len(words) == 0 { + return trueSelector{}, nil + } + + var srs []selector + for _, word := range words { + if idx := strings.IndexByte(word, '|'); idx > 0 { + sr, err = parseOrSelectorWord(word) + } else { + sr, err = parseSingleSelectorWord(word) + } + if err != nil { + return nil, fmt.Errorf("selector '%s' contains selector '%s' with forbidden symbol", line, word) + } + srs = append(srs, sr) + } + + switch len(srs) { + case 0: + return trueSelector{}, nil + case 1: + return srs[0], nil + default: + return newAndSelector(srs[0], srs[1], srs[2:]...), nil + } +} + +func parseOrSelectorWord(orWord string) (sr selector, err error) { + var srs []selector + for _, word := range strings.Split(orWord, "|") { + if sr, err = parseSingleSelectorWord(word); err != nil { + return nil, err + } + srs = append(srs, sr) + } + switch len(srs) { + case 0: + return trueSelector{}, nil + case 1: + return srs[0], nil + default: + return newOrSelector(srs[0], srs[1], srs[2:]...), nil + } +} + +func parseSingleSelectorWord(word string) (selector, error) { + if len(word) == 0 { + return nil, errors.New("empty word") + } + neg := word[0] == '!' + if neg { + word = word[1:] + } + if len(word) == 0 { + return nil, errors.New("empty word") + } + if word != "*" && !isSelectorWordValid(word) { + return nil, errors.New("forbidden symbol") + } + + var sr selector + switch word { + case "*": + sr = trueSelector{} + default: + sr = exactSelector(word) + } + if neg { + return negSelector{sr}, nil + } + return sr, nil +} + +func newAndSelector(lhs, rhs selector, others ...selector) selector { + m := andSelector{lhs: lhs, rhs: rhs} + switch len(others) { + case 0: + return m + default: + return newAndSelector(m, others[0], others[1:]...) + } +} + +func newOrSelector(lhs, rhs selector, others ...selector) selector { + m := orSelector{lhs: lhs, rhs: rhs} + switch len(others) { + case 0: + return m + default: + return newOrSelector(m, others[0], others[1:]...) + } +} + +func isSelectorWordValid(word string) bool { + // valid: + // * + // ^[a-zA-Z][a-zA-Z0-9=_.]*$ + if len(word) == 0 { + return false + } + if word == "*" { + return true + } + for i, b := range word { + switch { + case b >= 'a' && b <= 'z': + case b >= 'A' && b <= 'Z': + case b >= '0' && b <= '9' && i > 0: + case (b == '=' || b == '_' || b == '.') && i > 0: + default: + return false + } + } + return true +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/selector_test.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/selector_test.go new file mode 100644 index 000000000..bed2150e2 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/selector_test.go @@ -0,0 +1,248 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "regexp" + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" +) + +var reSrString = regexp.MustCompile(`^{[^{}]+}$`) + +func TestTrueSelector_String(t *testing.T) { + var sr trueSelector + assert.Equal(t, "{*}", sr.String()) +} + +func TestExactSelector_String(t *testing.T) { + sr := exactSelector("selector") + + assert.True(t, reSrString.MatchString(sr.String())) +} + +func TestNegSelector_String(t *testing.T) { + srs := []selector{ + exactSelector("selector"), + negSelector{exactSelector("selector")}, + orSelector{ + lhs: exactSelector("selector"), + rhs: exactSelector("selector")}, + orSelector{ + lhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + rhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + }, + andSelector{ + lhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + rhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + }, + } + + for i, sr := range srs { + neg := negSelector{sr} + assert.True(t, reSrString.MatchString(neg.String()), "selector num %d", i+1) + } +} + +func TestOrSelector_String(t *testing.T) { + sr := orSelector{ + lhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + rhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + } + + assert.True(t, reSrString.MatchString(sr.String())) +} + +func TestAndSelector_String(t *testing.T) { + sr := andSelector{ + lhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + rhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + } + + assert.True(t, reSrString.MatchString(sr.String())) +} + +func TestExactSelector_Matches(t *testing.T) { + matchTests := struct { + tags model.Tags + srs []exactSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []exactSelector{ + "a", + "b", + }, + } + notMatchTests := struct { + tags model.Tags + srs []exactSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []exactSelector{ + "c", + "d", + }, + } + + for i, sr := range matchTests.srs { + assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1) + } + for i, sr := range notMatchTests.srs { + assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1) + } +} + +func TestNegSelector_Matches(t *testing.T) { + matchTests := struct { + tags model.Tags + srs []negSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []negSelector{ + {exactSelector("c")}, + {exactSelector("d")}, + }, + } + notMatchTests := struct { + tags model.Tags + srs []negSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []negSelector{ + {exactSelector("a")}, + {exactSelector("b")}, + }, + } + + for i, sr := range matchTests.srs { + assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1) + } + for i, sr := range notMatchTests.srs { + assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1) + } +} + +func TestOrSelector_Matches(t *testing.T) { + matchTests := struct { + tags model.Tags + srs []orSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []orSelector{ + { + lhs: orSelector{lhs: exactSelector("c"), rhs: exactSelector("d")}, + rhs: orSelector{lhs: exactSelector("e"), rhs: exactSelector("b")}, + }, + }, + } + notMatchTests := struct { + tags model.Tags + srs []orSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []orSelector{ + { + lhs: orSelector{lhs: exactSelector("c"), rhs: exactSelector("d")}, + rhs: orSelector{lhs: exactSelector("e"), rhs: exactSelector("f")}, + }, + }, + } + + for i, sr := range matchTests.srs { + assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1) + } + for i, sr := range notMatchTests.srs { + assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1) + } +} + +func TestAndSelector_Matches(t *testing.T) { + matchTests := struct { + tags model.Tags + srs []andSelector + }{ + tags: model.Tags{"a": {}, "b": {}, "c": {}, "d": {}}, + srs: []andSelector{ + { + lhs: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}, + rhs: andSelector{lhs: exactSelector("c"), rhs: exactSelector("d")}, + }, + }, + } + notMatchTests := struct { + tags model.Tags + srs []andSelector + }{ + tags: model.Tags{"a": {}, "b": {}, "c": {}, "d": {}}, + srs: []andSelector{ + { + lhs: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}, + rhs: andSelector{lhs: exactSelector("c"), rhs: exactSelector("z")}, + }, + }, + } + + for i, sr := range matchTests.srs { + assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1) + } + for i, sr := range notMatchTests.srs { + assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1) + } +} + +func TestParseSelector(t *testing.T) { + tests := map[string]struct { + wantSelector selector + wantErr bool + }{ + "": {wantSelector: trueSelector{}}, + "a": {wantSelector: exactSelector("a")}, + "Z": {wantSelector: exactSelector("Z")}, + "a_b": {wantSelector: exactSelector("a_b")}, + "a=b": {wantSelector: exactSelector("a=b")}, + "!a": {wantSelector: negSelector{exactSelector("a")}}, + "a b": {wantSelector: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}}, + "a|b": {wantSelector: orSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}}, + "*": {wantSelector: trueSelector{}}, + "!*": {wantSelector: negSelector{trueSelector{}}}, + "a b !c d|e f": { + wantSelector: andSelector{ + lhs: andSelector{ + lhs: andSelector{ + lhs: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}, + rhs: negSelector{exactSelector("c")}, + }, + rhs: orSelector{ + lhs: exactSelector("d"), + rhs: exactSelector("e"), + }, + }, + rhs: exactSelector("f"), + }, + }, + "!": {wantErr: true}, + "a !": {wantErr: true}, + "a!b": {wantErr: true}, + "0a": {wantErr: true}, + "a b c*": {wantErr: true}, + "__": {wantErr: true}, + "a|b|c*": {wantErr: true}, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sr, err := parseSelector(name) + + if test.wantErr { + assert.Nil(t, sr) + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.wantSelector, sr) + } + }) + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/sim_test.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/sim_test.go new file mode 100644 index 000000000..657009478 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/sim_test.go @@ -0,0 +1,130 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +type discoverySim struct { + config string + discoverers []model.Discoverer + wantClassifyCalls int + wantComposeCalls int + wantConfGroups []*confgroup.Group +} + +func (sim discoverySim) run(t *testing.T) { + t.Helper() + + var cfg Config + err := yaml.Unmarshal([]byte(sim.config), &cfg) + require.Nilf(t, err, "cfg unmarshal") + + clr, err := newTargetClassificator(cfg.Classify) + require.Nil(t, err, "newTargetClassificator") + + cmr, err := newConfigComposer(cfg.Compose) + require.Nil(t, err, "newConfigComposer") + + mockClr := &mockClassificator{clr: clr} + mockCmr := &mockComposer{cmr: cmr} + + accum := newAccumulator() + accum.sendEvery = time.Second * 2 + + pl := &Pipeline{ + Logger: logger.New(), + discoverers: sim.discoverers, + accum: accum, + clr: mockClr, + cmr: mockCmr, + configs: make(map[string]map[uint64][]confgroup.Config), + } + + pl.accum.Logger = pl.Logger + clr.Logger = pl.Logger + cmr.Logger = pl.Logger + + groups := sim.collectGroups(t, pl) + + sortConfigGroups(groups) + sortConfigGroups(sim.wantConfGroups) + + assert.Equal(t, sim.wantConfGroups, groups) + assert.Equalf(t, sim.wantClassifyCalls, mockClr.calls, "classify calls") + assert.Equalf(t, sim.wantComposeCalls, mockCmr.calls, "compose calls") +} + +func (sim discoverySim) collectGroups(t *testing.T, pl *Pipeline) []*confgroup.Group { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + in := make(chan []*confgroup.Group) + done := make(chan struct{}) + + go func() { defer close(done); pl.Run(ctx, in) }() + + timeout := time.Second * 10 + var groups []*confgroup.Group + + func() { + for { + select { + case inGroups := <-in: + groups = append(groups, inGroups...) + case <-done: + return + case <-time.After(timeout): + t.Logf("discovery timed out after %s, got %d groups, expected %d, some events are skipped", + timeout, len(groups), len(sim.wantConfGroups)) + return + } + } + }() + + return groups +} + +type mockClassificator struct { + calls int + clr *targetClassificator +} + +func (m *mockClassificator) classify(tgt model.Target) model.Tags { + m.calls++ + return m.clr.classify(tgt) +} + +type mockComposer struct { + calls int + cmr *configComposer +} + +func (m *mockComposer) compose(tgt model.Target) []confgroup.Config { + m.calls++ + return m.cmr.compose(tgt) +} + +func sortConfigGroups(groups []*confgroup.Group) { + sort.Slice(groups, func(i, j int) bool { + return groups[i].Source < groups[j].Source + }) + + for _, g := range groups { + sort.Slice(g.Configs, func(i, j int) bool { + return g.Configs[i].Name() < g.Configs[j].Name() + }) + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/sd.go b/src/go/plugin/go.d/agent/discovery/sd/sd.go new file mode 100644 index 000000000..687ebfba8 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/sd.go @@ -0,0 +1,147 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package sd + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/pipeline" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/multipath" + + "gopkg.in/yaml.v2" +) + +type Config struct { + ConfigDefaults confgroup.Registry + ConfDir multipath.MultiPath +} + +func NewServiceDiscovery(cfg Config) (*ServiceDiscovery, error) { + log := logger.New().With( + slog.String("component", "service discovery"), + ) + + d := &ServiceDiscovery{ + Logger: log, + confProv: newConfFileReader(log, cfg.ConfDir), + configDefaults: cfg.ConfigDefaults, + newPipeline: func(config pipeline.Config) (sdPipeline, error) { + return pipeline.New(config) + }, + pipelines: make(map[string]func()), + } + + return d, nil +} + +type ( + ServiceDiscovery struct { + *logger.Logger + + confProv confFileProvider + + configDefaults confgroup.Registry + newPipeline func(config pipeline.Config) (sdPipeline, error) + pipelines map[string]func() + } + sdPipeline interface { + Run(ctx context.Context, in chan<- []*confgroup.Group) + } + confFileProvider interface { + run(ctx context.Context) + configs() chan confFile + } +) + +func (d *ServiceDiscovery) String() string { + return "service discovery" +} + +func (d *ServiceDiscovery) Run(ctx context.Context, in chan<- []*confgroup.Group) { + d.Info("instance is started") + defer func() { d.cleanup(); d.Info("instance is stopped") }() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { defer wg.Done(); d.confProv.run(ctx) }() + + wg.Add(1) + go func() { defer wg.Done(); d.run(ctx, in) }() + + wg.Wait() + <-ctx.Done() +} + +func (d *ServiceDiscovery) run(ctx context.Context, in chan<- []*confgroup.Group) { + for { + select { + case <-ctx.Done(): + return + case cfg := <-d.confProv.configs(): + if cfg.source == "" { + continue + } + if len(cfg.content) == 0 { + d.removePipeline(cfg) + } else { + d.addPipeline(ctx, cfg, in) + } + } + } +} + +func (d *ServiceDiscovery) removePipeline(conf confFile) { + if stop, ok := d.pipelines[conf.source]; ok { + d.Infof("received an empty config, stopping the pipeline ('%s')", conf.source) + delete(d.pipelines, conf.source) + stop() + } +} + +func (d *ServiceDiscovery) addPipeline(ctx context.Context, conf confFile, in chan<- []*confgroup.Group) { + var cfg pipeline.Config + + if err := yaml.Unmarshal(conf.content, &cfg); err != nil { + d.Error(err) + return + } + + if cfg.Disabled { + d.Infof("pipeline config is disabled '%s' (%s)", cfg.Name, cfg.Source) + return + } + + cfg.Source = fmt.Sprintf("file=%s", conf.source) + cfg.ConfigDefaults = d.configDefaults + + pl, err := d.newPipeline(cfg) + if err != nil { + d.Error(err) + return + } + + if stop, ok := d.pipelines[conf.source]; ok { + stop() + } + + var wg sync.WaitGroup + plCtx, cancel := context.WithCancel(ctx) + + wg.Add(1) + go func() { defer wg.Done(); pl.Run(plCtx, in) }() + + stop := func() { cancel(); wg.Wait() } + d.pipelines[conf.source] = stop +} + +func (d *ServiceDiscovery) cleanup() { + for _, stop := range d.pipelines { + stop() + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/sd_test.go b/src/go/plugin/go.d/agent/discovery/sd/sd_test.go new file mode 100644 index 000000000..4269bfd3a --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/sd_test.go @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package sd + +import ( + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/pipeline" + + "gopkg.in/yaml.v2" +) + +func TestServiceDiscovery_Run(t *testing.T) { + tests := map[string]discoverySim{ + "add pipeline": { + configs: []confFile{ + prepareConfigFile("source", "name"), + }, + wantPipelines: []*mockPipeline{ + {name: "name", started: true, stopped: false}, + }, + }, + "add disabled pipeline": { + configs: []confFile{ + prepareDisabledConfigFile("source", "name"), + }, + wantPipelines: nil, + }, + "remove pipeline": { + configs: []confFile{ + prepareConfigFile("source", "name"), + prepareEmptyConfigFile("source"), + }, + wantPipelines: []*mockPipeline{ + {name: "name", started: true, stopped: true}, + }, + }, + "re-add pipeline multiple times": { + configs: []confFile{ + prepareConfigFile("source", "name"), + prepareConfigFile("source", "name"), + prepareConfigFile("source", "name"), + }, + wantPipelines: []*mockPipeline{ + {name: "name", started: true, stopped: true}, + {name: "name", started: true, stopped: true}, + {name: "name", started: true, stopped: false}, + }, + }, + "restart pipeline": { + configs: []confFile{ + prepareConfigFile("source", "name1"), + prepareConfigFile("source", "name2"), + }, + wantPipelines: []*mockPipeline{ + {name: "name1", started: true, stopped: true}, + {name: "name2", started: true, stopped: false}, + }, + }, + "invalid pipeline config": { + configs: []confFile{ + prepareConfigFile("source", "invalid"), + }, + wantPipelines: nil, + }, + "invalid config for running pipeline": { + configs: []confFile{ + prepareConfigFile("source", "name"), + prepareConfigFile("source", "invalid"), + }, + wantPipelines: []*mockPipeline{ + {name: "name", started: true, stopped: false}, + }, + }, + } + + for name, sim := range tests { + t.Run(name, func(t *testing.T) { + sim.run(t) + }) + } +} + +func prepareConfigFile(source, name string) confFile { + bs, _ := yaml.Marshal(pipeline.Config{Name: name}) + + return confFile{ + source: source, + content: bs, + } +} + +func prepareEmptyConfigFile(source string) confFile { + return confFile{ + source: source, + } +} + +func prepareDisabledConfigFile(source, name string) confFile { + bs, _ := yaml.Marshal(pipeline.Config{Name: name, Disabled: true}) + + return confFile{ + source: source, + content: bs, + } +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/sim_test.go b/src/go/plugin/go.d/agent/discovery/sd/sim_test.go new file mode 100644 index 000000000..930c40125 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/sim_test.go @@ -0,0 +1,118 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package sd + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/pipeline" + + "github.com/stretchr/testify/assert" +) + +var lock = &sync.Mutex{} + +type discoverySim struct { + configs []confFile + wantPipelines []*mockPipeline +} + +func (sim *discoverySim) run(t *testing.T) { + fact := &mockFactory{} + mgr := &ServiceDiscovery{ + Logger: logger.New(), + newPipeline: func(config pipeline.Config) (sdPipeline, error) { + return fact.create(config) + }, + confProv: &mockConfigProvider{ + confFiles: sim.configs, + ch: make(chan confFile), + }, + pipelines: make(map[string]func()), + } + + in := make(chan<- []*confgroup.Group) + done := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + + go func() { defer close(done); mgr.Run(ctx, in) }() + + time.Sleep(time.Second * 3) + + lock.Lock() + assert.Equalf(t, sim.wantPipelines, fact.pipelines, "before stop") + lock.Unlock() + + cancel() + + timeout := time.Second * 5 + + select { + case <-done: + lock.Lock() + for _, pl := range fact.pipelines { + assert.Truef(t, pl.stopped, "pipeline '%s' is not stopped after cancel()", pl.name) + } + lock.Unlock() + case <-time.After(timeout): + t.Errorf("sd failed to exit in %s", timeout) + } +} + +type mockConfigProvider struct { + confFiles []confFile + ch chan confFile +} + +func (m *mockConfigProvider) run(ctx context.Context) { + for _, conf := range m.confFiles { + select { + case <-ctx.Done(): + return + case m.ch <- conf: + } + } + <-ctx.Done() +} + +func (m *mockConfigProvider) configs() chan confFile { + return m.ch +} + +type mockFactory struct { + pipelines []*mockPipeline +} + +func (m *mockFactory) create(cfg pipeline.Config) (sdPipeline, error) { + lock.Lock() + defer lock.Unlock() + + if cfg.Name == "invalid" { + return nil, errors.New("mock sdPipelineFactory.create() error") + } + + pl := mockPipeline{name: cfg.Name} + m.pipelines = append(m.pipelines, &pl) + + return &pl, nil +} + +type mockPipeline struct { + name string + started bool + stopped bool +} + +func (m *mockPipeline) Run(ctx context.Context, _ chan<- []*confgroup.Group) { + lock.Lock() + m.started = true + lock.Unlock() + defer func() { lock.Lock(); m.stopped = true; lock.Unlock() }() + <-ctx.Done() +} diff --git a/src/go/plugin/go.d/agent/discovery/sim_test.go b/src/go/plugin/go.d/agent/discovery/sim_test.go new file mode 100644 index 000000000..b20344c3c --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sim_test.go @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package discovery + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type discoverySim struct { + mgr *Manager + collectDelay time.Duration + expectedGroups []*confgroup.Group +} + +func (sim discoverySim) run(t *testing.T) { + t.Helper() + require.NotNil(t, sim.mgr) + + in, out := make(chan []*confgroup.Group), make(chan []*confgroup.Group) + go sim.collectGroups(t, in, out) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go sim.mgr.Run(ctx, in) + + actualGroups := <-out + + sortGroups(sim.expectedGroups) + sortGroups(actualGroups) + + assert.Equal(t, sim.expectedGroups, actualGroups) +} + +func (sim discoverySim) collectGroups(t *testing.T, in, out chan []*confgroup.Group) { + time.Sleep(sim.collectDelay) + + timeout := sim.mgr.sendEvery + time.Second*2 + var groups []*confgroup.Group +loop: + for { + select { + case inGroups := <-in: + if groups = append(groups, inGroups...); len(groups) >= len(sim.expectedGroups) { + break loop + } + case <-time.After(timeout): + t.Logf("discovery %s timed out after %s, got %d groups, expected %d, some events are skipped", + sim.mgr.discoverers, timeout, len(groups), len(sim.expectedGroups)) + break loop + } + } + out <- groups +} + +func sortGroups(groups []*confgroup.Group) { + if len(groups) == 0 { + return + } + sort.Slice(groups, func(i, j int) bool { return groups[i].Source < groups[j].Source }) +} |