summary | refs | log | tree | commit | diff | stats
path: root/src/go/collectors/go.d.plugin/agent/discovery
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/cache.go38
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/config.go29
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/dummy/config.go24
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/dummy/discovery.go76
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/dummy/discovery_test.go109
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/config.go25
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go104
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go25
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/parse.go142
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go431
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/read.go98
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go116
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go130
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/watch.go220
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go378
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/manager.go199
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/manager_test.go177
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/conffile.go69
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go241
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go162
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go162
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go55
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go34
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go268
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go160
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go434
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go648
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go209
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go456
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go137
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go326
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go169
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go167
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/target.go41
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/model/discoverer.go11
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/model/tags.go87
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/model/tags_test.go3
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/model/target.go15
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go152
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify.go132
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify_test.go83
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose.go157
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose_test.go92
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/config.go136
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap.go63
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap_test.go81
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go236
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline_test.go303
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/promport.go662
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector.go154
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector_test.go248
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/sim_test.go130
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go147
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/sd_test.go106
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/sim_test.go118
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sim_test.go67
56 files changed, 9242 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/cache.go b/src/go/collectors/go.d.plugin/agent/discovery/cache.go
new file mode 100644
index 000000000..31802aa91
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/cache.go
@@ -0,0 +1,38 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package discovery
+
+import (
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+)
+
+type cache map[string]*confgroup.Group // [Source]
+
+func newCache() *cache {
+ return &cache{}
+}
+
+func (c cache) update(groups []*confgroup.Group) {
+ if len(groups) == 0 {
+ return
+ }
+ for _, group := range groups {
+ if group != nil {
+ c[group.Source] = group
+ }
+ }
+}
+
+func (c cache) reset() {
+ for key := range c {
+ delete(c, key)
+ }
+}
+
+func (c cache) groups() []*confgroup.Group {
+ groups := make([]*confgroup.Group, 0, len(c))
+ for _, group := range c {
+ groups = append(groups, group)
+ }
+ return groups
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/config.go b/src/go/collectors/go.d.plugin/agent/discovery/config.go
new file mode 100644
index 000000000..6cbd2db1e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/config.go
@@ -0,0 +1,29 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package discovery
+
+import (
+ "errors"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/dummy"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/file"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd"
+)
+
// Config is the discovery manager configuration.
type Config struct {
	Registry confgroup.Registry // module name -> job defaults
	File     file.Config        // file-based discovery (read/watch paths)
	Dummy    dummy.Config       // fixed-list discovery for a set of module names
	SD       sd.Config          // service discovery configuration
}
+
+func validateConfig(cfg Config) error {
+ if len(cfg.Registry) == 0 {
+ return errors.New("empty config registry")
+ }
+ if len(cfg.File.Read)+len(cfg.File.Watch) == 0 && len(cfg.Dummy.Names) == 0 {
+ return errors.New("discoverers not set")
+ }
+ return nil
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/dummy/config.go b/src/go/collectors/go.d.plugin/agent/discovery/dummy/config.go
new file mode 100644
index 000000000..4da80a8dc
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/dummy/config.go
@@ -0,0 +1,24 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dummy
+
+import (
+ "errors"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+)
+
// Config is the dummy discovery configuration.
type Config struct {
	Registry confgroup.Registry // module name -> job defaults
	Names    []string           // module names to generate configs for
}
+
+func validateConfig(cfg Config) error {
+ if len(cfg.Registry) == 0 {
+ return errors.New("empty config registry")
+ }
+ if len(cfg.Names) == 0 {
+ return errors.New("names not set")
+ }
+ return nil
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/dummy/discovery.go b/src/go/collectors/go.d.plugin/agent/discovery/dummy/discovery.go
new file mode 100644
index 000000000..fed257b2f
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/dummy/discovery.go
@@ -0,0 +1,76 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dummy
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+func NewDiscovery(cfg Config) (*Discovery, error) {
+ if err := validateConfig(cfg); err != nil {
+ return nil, fmt.Errorf("config validation: %v", err)
+ }
+ d := &Discovery{
+ Logger: logger.New().With(
+ slog.String("component", "discovery"),
+ slog.String("discoverer", "dummy"),
+ ),
+ reg: cfg.Registry,
+ names: cfg.Names,
+ }
+ return d, nil
+}
+
// Discovery generates config groups for a fixed list of module names,
// applying each module's registry defaults.
type Discovery struct {
	*logger.Logger

	reg   confgroup.Registry // module name -> job defaults
	names []string           // module names to generate configs for
}

// String implements fmt.Stringer.
func (d *Discovery) String() string {
	return d.Name()
}

// Name returns the discoverer's human-readable identifier.
func (d *Discovery) Name() string {
	return "dummy discovery"
}
+
+func (d *Discovery) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+ d.Info("instance is started")
+ defer func() { d.Info("instance is stopped") }()
+
+ select {
+ case <-ctx.Done():
+ case in <- d.groups():
+ }
+
+ close(in)
+}
+
+func (d *Discovery) groups() []*confgroup.Group {
+ group := &confgroup.Group{Source: "internal"}
+
+ for _, name := range d.names {
+ def, ok := d.reg.Lookup(name)
+ if !ok {
+ continue
+ }
+ src := "internal"
+ cfg := confgroup.Config{}
+ cfg.SetModule(name)
+ cfg.SetProvider("dummy")
+ cfg.SetSourceType(confgroup.TypeStock)
+ cfg.SetSource(src)
+ cfg.ApplyDefaults(def)
+
+ group.Configs = append(group.Configs, cfg)
+ }
+
+ return []*confgroup.Group{group}
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/dummy/discovery_test.go b/src/go/collectors/go.d.plugin/agent/discovery/dummy/discovery_test.go
new file mode 100644
index 000000000..e42ee2041
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/dummy/discovery_test.go
@@ -0,0 +1,109 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dummy
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestNewDiscovery(t *testing.T) {
+ tests := map[string]struct {
+ cfg Config
+ wantErr bool
+ }{
+ "valid config": {
+ cfg: Config{
+ Registry: confgroup.Registry{"module1": confgroup.Default{}},
+ Names: []string{"module1", "module2"},
+ },
+ },
+ "invalid config, registry not set": {
+ cfg: Config{
+ Names: []string{"module1", "module2"},
+ },
+ wantErr: true,
+ },
+ "invalid config, names not set": {
+ cfg: Config{
+ Names: []string{"module1", "module2"},
+ },
+ wantErr: true,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ d, err := NewDiscovery(test.cfg)
+
+ if test.wantErr {
+ assert.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ assert.NotNil(t, d)
+ }
+ })
+ }
+}
+
// TestDiscovery_Run verifies that a dummy discovery emits a single
// "internal" group with one config per registered name, carrying the
// registry defaults and the provenance fields set by the discoverer.
func TestDiscovery_Run(t *testing.T) {
	expected := []*confgroup.Group{
		{
			Source: "internal",
			Configs: []confgroup.Config{
				{
					"name":                "module1",
					"module":              "module1",
					"update_every":        module.UpdateEvery,
					"autodetection_retry": module.AutoDetectionRetry,
					"priority":            module.Priority,
					"__provider__":        "dummy",
					"__source_type__":     confgroup.TypeStock,
					"__source__":          "internal",
				},
				{
					"name":                "module2",
					"module":              "module2",
					"update_every":        module.UpdateEvery,
					"autodetection_retry": module.AutoDetectionRetry,
					"priority":            module.Priority,
					"__provider__":        "dummy",
					"__source_type__":     confgroup.TypeStock,
					"__source__":          "internal",
				},
			},
		},
	}

	reg := confgroup.Registry{
		"module1": {},
		"module2": {},
	}
	cfg := Config{
		Registry: reg,
		Names:    []string{"module1", "module2"},
	}

	discovery, err := NewDiscovery(cfg)
	require.NoError(t, err)

	in := make(chan []*confgroup.Group)
	timeout := time.Second * 2

	go discovery.Run(context.Background(), in)

	// The dummy discovery sends exactly one batch; receive it or give up
	// after the timeout (the final Equal will then fail on nil actual).
	var actual []*confgroup.Group
	select {
	case actual = <-in:
	case <-time.After(timeout):
		t.Logf("discovery timed out after %s", timeout)
	}
	assert.Equal(t, expected, actual)
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/config.go b/src/go/collectors/go.d.plugin/agent/discovery/file/config.go
new file mode 100644
index 000000000..cc19ee445
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/config.go
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "errors"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+)
+
// Config is the file discovery configuration.
type Config struct {
	Registry confgroup.Registry // module name -> job defaults
	Read     []string           // glob patterns parsed once by the reader
	Watch    []string           // glob patterns handled by the watcher
}
+
+func validateConfig(cfg Config) error {
+ if len(cfg.Registry) == 0 {
+ return errors.New("empty config registry")
+ }
+ if len(cfg.Read)+len(cfg.Watch) == 0 {
+ return errors.New("discoverers not set")
+ }
+ return nil
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go
new file mode 100644
index 000000000..97b437fc3
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go
@@ -0,0 +1,104 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "sync"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
// log is the logger shared by the file discovery and its reader/watcher
// sub-discoverers.
var log = logger.New().With(
	slog.String("component", "discovery"),
	slog.String("discoverer", "file"),
)
+
+func NewDiscovery(cfg Config) (*Discovery, error) {
+ if err := validateConfig(cfg); err != nil {
+ return nil, fmt.Errorf("file discovery config validation: %v", err)
+ }
+
+ d := Discovery{
+ Logger: log,
+ }
+
+ if err := d.registerDiscoverers(cfg); err != nil {
+ return nil, fmt.Errorf("file discovery initialization: %v", err)
+ }
+
+ return &d, nil
+}
+
type (
	// Discovery runs a set of file discoverers and forwards their config
	// group updates to a single output channel.
	Discovery struct {
		*logger.Logger
		discoverers []discoverer
	}
	// discoverer is the common interface of the reader and watcher.
	discoverer interface {
		Run(ctx context.Context, in chan<- []*confgroup.Group)
	}
)

// String implements fmt.Stringer.
func (d *Discovery) String() string {
	return d.Name()
}

// Name returns the discoverer's identifier, listing its registered
// sub-discoverers.
func (d *Discovery) Name() string {
	return fmt.Sprintf("file discovery: %v", d.discoverers)
}
+
+func (d *Discovery) registerDiscoverers(cfg Config) error {
+ if len(cfg.Read) != 0 {
+ d.discoverers = append(d.discoverers, NewReader(cfg.Registry, cfg.Read))
+ }
+ if len(cfg.Watch) != 0 {
+ d.discoverers = append(d.discoverers, NewWatcher(cfg.Registry, cfg.Watch))
+ }
+ if len(d.discoverers) == 0 {
+ return errors.New("zero registered discoverers")
+ }
+ return nil
+}
+
// Run starts every registered discoverer in its own goroutine and blocks
// until the context is cancelled.
func (d *Discovery) Run(ctx context.Context, in chan<- []*confgroup.Group) {
	d.Info("instance is started")
	defer func() { d.Info("instance is stopped") }()

	var wg sync.WaitGroup

	for _, dd := range d.discoverers {
		wg.Add(1)
		go func(dd discoverer) {
			defer wg.Done()
			d.runDiscoverer(ctx, dd, in)
		}(dd)
	}

	wg.Wait()
	// Block until cancellation even after all discoverers finish (e.g. a
	// one-shot reader that closes its channel) — presumably so this
	// instance's lifetime matches the manager's; confirm in manager.go.
	<-ctx.Done()
}
+
+func (d *Discovery) runDiscoverer(ctx context.Context, dd discoverer, in chan<- []*confgroup.Group) {
+ updates := make(chan []*confgroup.Group)
+ go dd.Run(ctx, updates)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case groups, ok := <-updates:
+ if !ok {
+ return
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case in <- groups:
+ }
+ }
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go
new file mode 100644
index 000000000..2bdb669eb
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery_test.go
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
// TODO: tech debt
func TestNewDiscovery(t *testing.T) {

}
+
// TODO: tech debt
func TestDiscovery_Run(t *testing.T) {

}
+
+func prepareDiscovery(t *testing.T, cfg Config) *Discovery {
+ d, err := NewDiscovery(cfg)
+ require.NoError(t, err)
+ return d
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/parse.go b/src/go/collectors/go.d.plugin/agent/discovery/file/parse.go
new file mode 100644
index 000000000..412d2b73e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/parse.go
@@ -0,0 +1,142 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+
+ "gopkg.in/yaml.v2"
+)
+
// format identifies the layout of a config file.
type format int

const (
	unknownFormat      format = iota // not valid YAML, or an unsupported top-level type
	unknownEmptyFormat               // valid YAML that decodes to nil (e.g. comments only)
	staticFormat                     // a mapping: single-module config with "jobs"
	sdFormat                         // a sequence: list of job configs with "module" keys
)
+
+func parse(req confgroup.Registry, path string) (*confgroup.Group, error) {
+ bs, err := os.ReadFile(path)
+ if err != nil {
+ return nil, err
+ }
+ if len(bs) == 0 {
+ return nil, nil
+ }
+
+ switch cfgFormat(bs) {
+ case staticFormat:
+ return parseStaticFormat(req, path, bs)
+ case sdFormat:
+ return parseSDFormat(req, path, bs)
+ case unknownEmptyFormat:
+ return nil, nil
+ default:
+ return nil, fmt.Errorf("unknown file format: '%s'", path)
+ }
+}
+
+func parseStaticFormat(reg confgroup.Registry, path string, bs []byte) (*confgroup.Group, error) {
+ name := fileName(path)
+ // TODO: properly handle module renaming
+ // See agent/setup.go buildDiscoveryConf() for details
+ if name == "wmi" {
+ name = "windows"
+ }
+ modDef, ok := reg.Lookup(name)
+ if !ok {
+ return nil, nil
+ }
+
+ var modCfg staticConfig
+ if err := yaml.Unmarshal(bs, &modCfg); err != nil {
+ return nil, err
+ }
+
+ for _, cfg := range modCfg.Jobs {
+ cfg.SetModule(name)
+ def := mergeDef(modCfg.Default, modDef)
+ cfg.ApplyDefaults(def)
+ }
+
+ group := &confgroup.Group{
+ Configs: modCfg.Jobs,
+ Source: path,
+ }
+
+ return group, nil
+}
+
+func parseSDFormat(reg confgroup.Registry, path string, bs []byte) (*confgroup.Group, error) {
+ var cfgs sdConfig
+ if err := yaml.Unmarshal(bs, &cfgs); err != nil {
+ return nil, err
+ }
+
+ var i int
+ for _, cfg := range cfgs {
+ if def, ok := reg.Lookup(cfg.Module()); ok && cfg.Module() != "" {
+ cfg.ApplyDefaults(def)
+ cfgs[i] = cfg
+ i++
+ }
+ }
+
+ group := &confgroup.Group{
+ Configs: cfgs[:i],
+ Source: path,
+ }
+
+ return group, nil
+}
+
+func cfgFormat(bs []byte) format {
+ var data interface{}
+ if err := yaml.Unmarshal(bs, &data); err != nil {
+ return unknownFormat
+ }
+ if data == nil {
+ return unknownEmptyFormat
+ }
+
+ type (
+ static = map[any]any
+ sd = []any
+ )
+ switch data.(type) {
+ case static:
+ return staticFormat
+ case sd:
+ return sdFormat
+ default:
+ return unknownFormat
+ }
+}
+
// mergeDef combines two default sets field by field, preferring a's value
// when it is positive and falling back to b's otherwise.
func mergeDef(a, b confgroup.Default) confgroup.Default {
	return confgroup.Default{
		MinUpdateEvery:     firstPositive(a.MinUpdateEvery, b.MinUpdateEvery),
		UpdateEvery:        firstPositive(a.UpdateEvery, b.UpdateEvery),
		AutoDetectionRetry: firstPositive(a.AutoDetectionRetry, b.AutoDetectionRetry),
		Priority:           firstPositive(a.Priority, b.Priority),
	}
}
+
// firstPositive returns the first argument greater than zero, checking
// value and then others in order; when none is positive the last argument
// is returned as-is.
func firstPositive(value int, others ...int) int {
	for {
		if value > 0 || len(others) == 0 {
			return value
		}
		value, others = others[0], others[1:]
	}
}
+
// fileName returns path's final element with its extension stripped
// (e.g. "/etc/netdata/go.d/nginx.conf" -> "nginx").
func fileName(path string) string {
	_, file := filepath.Split(path)
	if ext := filepath.Ext(file); ext != "" {
		file = file[:len(file)-len(ext)]
	}
	return file
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go
new file mode 100644
index 000000000..8b20210ff
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/parse_test.go
@@ -0,0 +1,431 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
// TestParse exercises parse() against both config formats (static and
// service discovery), verifying the default-precedence chain
// job > file-level conf > module registry > global module defaults,
// plus the empty/comment-only/unknown-format edge cases.
func TestParse(t *testing.T) {
	// Distinct sentinel values so the winning default layer is identifiable.
	const (
		jobDef = 11
		cfgDef = 22
		modDef = 33
	)
	tests := map[string]struct {
		test func(t *testing.T, tmp *tmpDir)
	}{
		"static, default: +job +conf +module": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"module": {
						UpdateEvery:        modDef,
						AutoDetectionRetry: modDef,
						Priority:           modDef,
					},
				}
				cfg := staticConfig{
					Default: confgroup.Default{
						UpdateEvery:        cfgDef,
						AutoDetectionRetry: cfgDef,
						Priority:           cfgDef,
					},
					Jobs: []confgroup.Config{
						{
							"name":                "name",
							"update_every":        jobDef,
							"autodetection_retry": jobDef,
							"priority":            jobDef,
						},
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				// Job-level values win over conf and module defaults.
				expected := &confgroup.Group{
					Source: filename,
					Configs: []confgroup.Config{
						{
							"name":                "name",
							"module":              "module",
							"update_every":        jobDef,
							"autodetection_retry": jobDef,
							"priority":            jobDef,
						},
					},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"static, default: +job +conf +module (merge all)": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"module": {
						Priority: modDef,
					},
				}
				cfg := staticConfig{
					Default: confgroup.Default{
						AutoDetectionRetry: cfgDef,
					},
					Jobs: []confgroup.Config{
						{
							"name":         "name",
							"update_every": jobDef,
						},
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				// Each field comes from the highest layer that sets it.
				expected := &confgroup.Group{
					Source: filename,
					Configs: []confgroup.Config{
						{
							"name":                "name",
							"module":              "module",
							"update_every":        jobDef,
							"autodetection_retry": cfgDef,
							"priority":            modDef,
						},
					},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"static, default: -job +conf +module": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"module": {
						UpdateEvery:        modDef,
						AutoDetectionRetry: modDef,
						Priority:           modDef,
					},
				}
				cfg := staticConfig{
					Default: confgroup.Default{
						UpdateEvery:        cfgDef,
						AutoDetectionRetry: cfgDef,
						Priority:           cfgDef,
					},
					Jobs: []confgroup.Config{
						{
							"name": "name",
						},
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				expected := &confgroup.Group{
					Source: filename,
					Configs: []confgroup.Config{
						{
							"name":                "name",
							"module":              "module",
							"update_every":        cfgDef,
							"autodetection_retry": cfgDef,
							"priority":            cfgDef,
						},
					},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"static, default: -job -conf +module": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"module": {
						UpdateEvery:        modDef,
						AutoDetectionRetry: modDef,
						Priority:           modDef,
					},
				}
				cfg := staticConfig{
					Jobs: []confgroup.Config{
						{
							"name": "name",
						},
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				expected := &confgroup.Group{
					Source: filename,
					Configs: []confgroup.Config{
						{
							"name":                "name",
							"module":              "module",
							"autodetection_retry": modDef,
							"priority":            modDef,
							"update_every":        modDef,
						},
					},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"static, default: -job -conf -module (+global)": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"module": {},
				}
				cfg := staticConfig{
					Jobs: []confgroup.Config{
						{
							"name": "name",
						},
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				// With every layer empty, the package-level module
				// defaults apply.
				expected := &confgroup.Group{
					Source: filename,
					Configs: []confgroup.Config{
						{
							"name":                "name",
							"module":              "module",
							"autodetection_retry": module.AutoDetectionRetry,
							"priority":            module.Priority,
							"update_every":        module.UpdateEvery,
						},
					},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"sd, default: +job +module": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"sd_module": {
						UpdateEvery:        modDef,
						AutoDetectionRetry: modDef,
						Priority:           modDef,
					},
				}
				cfg := sdConfig{
					{
						"name":                "name",
						"module":              "sd_module",
						"update_every":        jobDef,
						"autodetection_retry": jobDef,
						"priority":            jobDef,
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				expected := &confgroup.Group{
					Source: filename,
					Configs: []confgroup.Config{
						{
							"module":              "sd_module",
							"name":                "name",
							"update_every":        jobDef,
							"autodetection_retry": jobDef,
							"priority":            jobDef,
						},
					},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"sd, default: -job +module": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"sd_module": {
						UpdateEvery:        modDef,
						AutoDetectionRetry: modDef,
						Priority:           modDef,
					},
				}
				cfg := sdConfig{
					{
						"name":   "name",
						"module": "sd_module",
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				expected := &confgroup.Group{
					Source: filename,
					Configs: []confgroup.Config{
						{
							"name":                "name",
							"module":              "sd_module",
							"update_every":        modDef,
							"autodetection_retry": modDef,
							"priority":            modDef,
						},
					},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"sd, default: -job -module (+global)": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"sd_module": {},
				}
				cfg := sdConfig{
					{
						"name":   "name",
						"module": "sd_module",
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				expected := &confgroup.Group{
					Source: filename,
					Configs: []confgroup.Config{
						{
							"name":                "name",
							"module":              "sd_module",
							"update_every":        module.UpdateEvery,
							"autodetection_retry": module.AutoDetectionRetry,
							"priority":            module.Priority,
						},
					},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"sd, job has no 'module' or 'module' is empty": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"sd_module": {},
				}
				cfg := sdConfig{
					{
						"name": "name",
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				// Module-less SD jobs are dropped: empty config list.
				expected := &confgroup.Group{
					Source:  filename,
					Configs: []confgroup.Config{},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"conf registry has no module": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"sd_module": {},
				}
				cfg := sdConfig{
					{
						"name":   "name",
						"module": "module",
					},
				}
				filename := tmp.join("module.conf")
				tmp.writeYAML(filename, cfg)

				// Unregistered modules are dropped as well.
				expected := &confgroup.Group{
					Source:  filename,
					Configs: []confgroup.Config{},
				}

				group, err := parse(reg, filename)

				require.NoError(t, err)
				assert.Equal(t, expected, group)
			},
		},
		"empty file": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{
					"module": {},
				}

				// A zero-byte file yields (nil, nil).
				filename := tmp.createFile("empty-*")
				group, err := parse(reg, filename)

				assert.Nil(t, group)
				require.NoError(t, err)
			},
		},
		"only comments, unknown empty format": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{}

				// Comment-only YAML decodes to nil and yields (nil, nil).
				filename := tmp.createFile("unknown-empty-format-*")
				tmp.writeString(filename, "# a comment")
				group, err := parse(reg, filename)

				assert.Nil(t, group)
				assert.NoError(t, err)
			},
		},
		"unknown format": {
			test: func(t *testing.T, tmp *tmpDir) {
				reg := confgroup.Registry{}

				// A top-level scalar is neither static nor SD: error.
				filename := tmp.createFile("unknown-format-*")
				tmp.writeYAML(filename, "unknown")
				group, err := parse(reg, filename)

				assert.Nil(t, group)
				assert.Error(t, err)
			},
		},
	}

	for name, scenario := range tests {
		t.Run(name, func(t *testing.T) {
			tmp := newTmpDir(t, "parse-file-*")
			defer tmp.cleanup()

			scenario.test(t, tmp)
		})
	}
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/read.go b/src/go/collectors/go.d.plugin/agent/discovery/file/read.go
new file mode 100644
index 000000000..1b45b3767
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/read.go
@@ -0,0 +1,98 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
type (
	// staticConfig is the on-disk shape of a single-module config file:
	// file-level defaults (inlined) plus a list of job configs.
	staticConfig struct {
		confgroup.Default `yaml:",inline"`
		Jobs              []confgroup.Config `yaml:"jobs"`
	}
	// sdConfig is the on-disk shape of a service-discovery config file:
	// a bare list of job configs, each carrying its own "module".
	sdConfig []confgroup.Config
)
+
// NewReader returns a Reader that performs a one-shot parse of the config
// files matching the given glob paths, using reg for module defaults.
func NewReader(reg confgroup.Registry, paths []string) *Reader {
	return &Reader{
		Logger: log,
		reg:    reg,
		paths:  paths,
	}
}

// Reader is a one-shot file discoverer: it reads and parses the
// configured paths once and emits the resulting config groups.
type Reader struct {
	*logger.Logger

	reg   confgroup.Registry // module name -> job defaults
	paths []string           // glob patterns of config files to read
}
+
// String implements fmt.Stringer.
func (r *Reader) String() string {
	return r.Name()
}

// Name returns the discoverer's human-readable identifier.
func (r *Reader) Name() string {
	return "file reader"
}
+
+func (r *Reader) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+ r.Info("instance is started")
+ defer func() { r.Info("instance is stopped") }()
+
+ select {
+ case <-ctx.Done():
+ case in <- r.groups():
+ }
+
+ close(in)
+}
+
+func (r *Reader) groups() (groups []*confgroup.Group) {
+ for _, pattern := range r.paths {
+ matches, err := filepath.Glob(pattern)
+ if err != nil {
+ continue
+ }
+
+ for _, path := range matches {
+ if fi, err := os.Stat(path); err != nil || !fi.Mode().IsRegular() {
+ continue
+ }
+
+ group, err := parse(r.reg, path)
+ if err != nil {
+ r.Warningf("parse '%s': %v", path, err)
+ continue
+ }
+
+ if group == nil {
+ group = &confgroup.Group{Source: path}
+ } else {
+ for _, cfg := range group.Configs {
+ cfg.SetProvider("file reader")
+ cfg.SetSourceType(configSourceType(path))
+ cfg.SetSource(fmt.Sprintf("discoverer=file_reader,file=%s", path))
+ }
+ }
+ groups = append(groups, group)
+ }
+ }
+
+ return groups
+}
+
// configSourceType classifies a config file path as "user" (lives under
// /etc/netdata) or "stock" (anything else).
func configSourceType(path string) string {
	if !strings.Contains(path, "/etc/netdata") {
		return "stock"
	}
	return "user"
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go
new file mode 100644
index 000000000..d2404d54e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/read_test.go
@@ -0,0 +1,116 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestReader_String(t *testing.T) {
+ assert.NotEmpty(t, NewReader(confgroup.Registry{}, nil))
+}
+
+func TestNewReader(t *testing.T) {
+ tests := map[string]struct {
+ reg confgroup.Registry
+ paths []string
+ }{
+ "empty inputs": {
+ reg: confgroup.Registry{},
+ paths: []string{},
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ assert.NotNil(t, NewReader(test.reg, test.paths))
+ })
+ }
+}
+
// TestReader_Run verifies a one-shot read over several files: parsed
// static configs get defaults plus provenance fields, while a
// comment-only file still yields an (empty) group for its source.
func TestReader_Run(t *testing.T) {
	tests := map[string]struct {
		createSim func(tmp *tmpDir) discoverySim
	}{
		"read multiple files": {
			createSim: func(tmp *tmpDir) discoverySim {
				module1 := tmp.join("module1.conf")
				module2 := tmp.join("module2.conf")
				module3 := tmp.join("module3.conf")

				tmp.writeYAML(module1, staticConfig{
					Jobs: []confgroup.Config{{"name": "name"}},
				})
				tmp.writeYAML(module2, staticConfig{
					Jobs: []confgroup.Config{{"name": "name"}},
				})
				// Comment-only file: parses to nil, expected below as a
				// group with no configs.
				tmp.writeString(module3, "# a comment")

				reg := confgroup.Registry{
					"module1": {},
					"module2": {},
					"module3": {},
				}
				discovery := prepareDiscovery(t, Config{
					Registry: reg,
					Read:     []string{module1, module2, module3},
				})
				expected := []*confgroup.Group{
					{
						Source: module1,
						Configs: []confgroup.Config{
							{
								"name":                "name",
								"module":              "module1",
								"update_every":        module.UpdateEvery,
								"autodetection_retry": module.AutoDetectionRetry,
								"priority":            module.Priority,
								"__provider__":        "file reader",
								"__source_type__":     confgroup.TypeStock,
								"__source__":          fmt.Sprintf("discoverer=file_reader,file=%s", module1),
							},
						},
					},
					{
						Source: module2,
						Configs: []confgroup.Config{
							{
								"name":                "name",
								"module":              "module2",
								"update_every":        module.UpdateEvery,
								"autodetection_retry": module.AutoDetectionRetry,
								"priority":            module.Priority,
								"__provider__":        "file reader",
								"__source_type__":     confgroup.TypeStock,
								"__source__":          fmt.Sprintf("discoverer=file_reader,file=%s", module2),
							},
						},
					},
					{
						Source: module3,
					},
				}

				return discoverySim{
					discovery:      discovery,
					expectedGroups: expected,
				}
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			tmp := newTmpDir(t, "reader-run-*")
			defer tmp.cleanup()

			test.createSim(tmp).run(t)
		})
	}
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go
new file mode 100644
index 000000000..cd9fa05ac
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/sim_test.go
@@ -0,0 +1,130 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "os"
+ "path/filepath"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gopkg.in/yaml.v2"
+)
+
type (
	// discoverySim describes one discovery simulation: an optional
	// setup/teardown hook pair around a Discovery run and the groups
	// the run is expected to produce.
	discoverySim struct {
		discovery      *Discovery         // discovery instance under test
		beforeRun      func()             // executed before Discovery.Run starts
		afterRun       func()             // executed after Run has started (e.g. to mutate files)
		expectedGroups []*confgroup.Group // groups the discovery is expected to emit
	}
)
+
// run executes the simulation: it starts the discovery, applies the
// before/after hooks, collects the produced groups, and compares them
// (order-insensitive by Source) with the expected ones.
func (sim discoverySim) run(t *testing.T) {
	t.Helper()
	require.NotNil(t, sim.discovery)

	if sim.beforeRun != nil {
		sim.beforeRun()
	}

	in, out := make(chan []*confgroup.Group), make(chan []*confgroup.Group)
	go sim.collectGroups(t, in, out)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	go sim.discovery.Run(ctx, in)
	// give the discovery time to pick up the initial state before afterRun mutates it
	time.Sleep(time.Millisecond * 250)

	if sim.afterRun != nil {
		sim.afterRun()
	}

	actual := <-out

	// sort both sides so the comparison does not depend on event order
	sortGroups(actual)
	sortGroups(sim.expectedGroups)

	assert.Equal(t, sim.expectedGroups, actual)
}
+
+func (sim discoverySim) collectGroups(t *testing.T, in, out chan []*confgroup.Group) {
+ timeout := time.Second * 5
+ var groups []*confgroup.Group
+loop:
+ for {
+ select {
+ case updates := <-in:
+ if groups = append(groups, updates...); len(groups) >= len(sim.expectedGroups) {
+ break loop
+ }
+ case <-time.After(timeout):
+ t.Logf("discovery %s timed out after %s, got %d groups, expected %d, some events are skipped",
+ sim.discovery.discoverers, timeout, len(groups), len(sim.expectedGroups))
+ break loop
+ }
+ }
+ out <- groups
+}
+
// tmpDir is a test helper wrapping a temporary directory with
// convenience file operations that fail the test on error.
type tmpDir struct {
	dir string     // absolute path of the temporary directory
	t   *testing.T // test to fail on file-operation errors
}

// newTmpDir creates a temporary directory using the given name pattern.
func newTmpDir(t *testing.T, pattern string) *tmpDir {
	pattern = "netdata-go-test-discovery-file-" + pattern
	dir, err := os.MkdirTemp(os.TempDir(), pattern)
	require.NoError(t, err)
	return &tmpDir{dir: dir, t: t}
}
+
+func (d *tmpDir) cleanup() {
+ assert.NoError(d.t, os.RemoveAll(d.dir))
+}
+
+func (d *tmpDir) join(filename string) string {
+ return filepath.Join(d.dir, filename)
+}
+
+func (d *tmpDir) createFile(pattern string) string {
+ f, err := os.CreateTemp(d.dir, pattern)
+ require.NoError(d.t, err)
+ _ = f.Close()
+ return f.Name()
+}
+
+func (d *tmpDir) removeFile(filename string) {
+ err := os.Remove(filename)
+ require.NoError(d.t, err)
+}
+
+func (d *tmpDir) renameFile(origFilename, newFilename string) {
+ err := os.Rename(origFilename, newFilename)
+ require.NoError(d.t, err)
+}
+
+func (d *tmpDir) writeYAML(filename string, in interface{}) {
+ bs, err := yaml.Marshal(in)
+ require.NoError(d.t, err)
+ err = os.WriteFile(filename, bs, 0644)
+ require.NoError(d.t, err)
+}
+
+func (d *tmpDir) writeString(filename, data string) {
+ err := os.WriteFile(filename, []byte(data), 0644)
+ require.NoError(d.t, err)
+}
+
+func sortGroups(groups []*confgroup.Group) {
+ if len(groups) == 0 {
+ return
+ }
+ sort.Slice(groups, func(i, j int) bool { return groups[i].Source < groups[j].Source })
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go
new file mode 100644
index 000000000..a723b706e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go
@@ -0,0 +1,220 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/fsnotify/fsnotify"
+)
+
type (
	// Watcher is a file discoverer that watches the directories of the
	// given glob patterns via fsnotify and re-reads matching files on
	// change, emitting config groups for additions, updates and removals.
	Watcher struct {
		*logger.Logger

		paths        []string           // glob patterns of files to watch
		reg          confgroup.Registry // known modules and their defaults
		watcher      *fsnotify.Watcher  // created in Run
		cache        cache              // last seen mod time per file
		refreshEvery time.Duration      // interval of the periodic full refresh
	}
	// cache maps a file path to its last seen modification time.
	cache map[string]time.Time
)

func (c cache) lookup(path string) (time.Time, bool) { v, ok := c[path]; return v, ok }
func (c cache) has(path string) bool                 { _, ok := c.lookup(path); return ok }
func (c cache) remove(path string)                   { delete(c, path) }
func (c cache) put(path string, modTime time.Time)   { c[path] = modTime }
+
+func NewWatcher(reg confgroup.Registry, paths []string) *Watcher {
+ d := &Watcher{
+ Logger: log,
+ paths: paths,
+ reg: reg,
+ watcher: nil,
+ cache: make(cache),
+ refreshEvery: time.Minute,
+ }
+ return d
+}
+
+func (w *Watcher) String() string {
+ return w.Name()
+}
+
+func (w *Watcher) Name() string {
+ return "file watcher"
+}
+
// Run implements the discoverer interface: it creates the fsnotify
// watcher, performs an initial refresh, and then re-reads the watched
// files on fs events and on a periodic timer until ctx is canceled.
func (w *Watcher) Run(ctx context.Context, in chan<- []*confgroup.Group) {
	w.Info("instance is started")
	defer func() { w.Info("instance is stopped") }()

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		w.Errorf("fsnotify watcher initialization: %v", err)
		return
	}

	w.watcher = watcher
	defer w.stop()
	w.refresh(ctx, in) // initial scan; also registers the watched dirs

	// the periodic refresh is a safety net for events fsnotify may miss
	tk := time.NewTicker(w.refreshEvery)
	defer tk.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tk.C:
			w.refresh(ctx, in)
		case event := <-w.watcher.Events:
			// TODO: check if event.Has will do
			if event.Name == "" || isChmodOnly(event) || !w.fileMatches(event.Name) {
				break
			}
			if event.Has(fsnotify.Create) && w.cache.has(event.Name) {
				// vim "backupcopy=no" case, already collected after Rename event.
				break
			}
			if event.Has(fsnotify.Rename) {
				// It is common to modify files using vim.
				// When writing to a file a backup is made. "backupcopy" option tells how it's done.
				// Default is "no": rename the file and write a new one.
				// This is cheap attempt to not send empty group for the old file.
				time.Sleep(time.Millisecond * 100)
			}
			w.refresh(ctx, in)
		case err := <-w.watcher.Errors:
			if err != nil {
				w.Warningf("watch: %v", err)
			}
		}
	}
}
+
+func (w *Watcher) fileMatches(file string) bool {
+ for _, pattern := range w.paths {
+ if ok, _ := filepath.Match(pattern, file); ok {
+ return true
+ }
+ }
+ return false
+}
+
+func (w *Watcher) listFiles() (files []string) {
+ for _, pattern := range w.paths {
+ if matches, err := filepath.Glob(pattern); err == nil {
+ files = append(files, matches...)
+ }
+ }
+ return files
+}
+
// refresh rescans all matching files, emits groups for new/changed files
// and empty groups for files that disappeared, then (re-)registers the
// watched directories.
func (w *Watcher) refresh(ctx context.Context, in chan<- []*confgroup.Group) {
	select {
	case <-ctx.Done():
		return
	default:
	}
	var groups []*confgroup.Group
	seen := make(map[string]bool)

	for _, file := range w.listFiles() {
		fi, err := os.Lstat(file)
		if err != nil {
			w.Warningf("lstat '%s': %v", file, err)
			continue
		}

		// symlinks and other non-regular files are ignored
		if !fi.Mode().IsRegular() {
			continue
		}

		seen[file] = true
		// skip files whose mod time has not changed since the last scan
		if v, ok := w.cache.lookup(file); ok && v.Equal(fi.ModTime()) {
			continue
		}
		w.cache.put(file, fi.ModTime())

		if group, err := parse(w.reg, file); err != nil {
			w.Warningf("parse '%s': %v", file, err)
		} else if group == nil {
			// file holds no configs: emit an empty group so stale jobs are dropped
			groups = append(groups, &confgroup.Group{Source: file})
		} else {
			for _, cfg := range group.Configs {
				cfg.SetProvider("file watcher")
				cfg.SetSourceType(configSourceType(file))
				cfg.SetSource(fmt.Sprintf("discoverer=file_watcher,file=%s", file))
			}
			groups = append(groups, group)
		}
	}

	// files present in the cache but no longer on disk were removed
	for name := range w.cache {
		if seen[name] {
			continue
		}
		w.cache.remove(name)
		groups = append(groups, &confgroup.Group{Source: name})
	}

	send(ctx, in, groups)

	w.watchDirs()
}
+
// watchDirs registers the parent directory of every glob pattern with the
// fsnotify watcher (fsnotify watches directories, not glob patterns).
// NOTE(review): the "/" split assumes slash-separated paths; filepath.Dir
// would be the portable choice — confirm Windows is out of scope.
func (w *Watcher) watchDirs() {
	for _, path := range w.paths {
		if idx := strings.LastIndex(path, "/"); idx > -1 {
			path = path[:idx]
		} else {
			// pattern without a directory component: watch the CWD
			path = "./"
		}
		if err := w.watcher.Add(path); err != nil {
			w.Errorf("start watching '%s': %v", path, err)
		}
	}
}
+
// stop closes the fsnotify watcher. Events and errors are drained in a
// helper goroutine while closing, because Close may block until pending
// notifications are consumed; the goroutine exits once Close returns and
// the deferred cancel fires.
func (w *Watcher) stop() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// closing the watcher deadlocks unless all events and errors are drained.
	go func() {
		for {
			select {
			case <-w.watcher.Errors:
			case <-w.watcher.Events:
			case <-ctx.Done():
				return
			}
		}
	}()

	_ = w.watcher.Close()
}
+
+func isChmodOnly(event fsnotify.Event) bool {
+ return event.Op^fsnotify.Chmod == 0
+}
+
+func send(ctx context.Context, in chan<- []*confgroup.Group, groups []*confgroup.Group) {
+ if len(groups) == 0 {
+ return
+ }
+ select {
+ case <-ctx.Done():
+ case in <- groups:
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go b/src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go
new file mode 100644
index 000000000..20e21e65e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/watch_test.go
@@ -0,0 +1,378 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestWatcher_String(t *testing.T) {
+ assert.NotEmpty(t, NewWatcher(confgroup.Registry{}, nil))
+}
+
+func TestNewWatcher(t *testing.T) {
+ tests := map[string]struct {
+ reg confgroup.Registry
+ paths []string
+ }{
+ "empty inputs": {
+ reg: confgroup.Registry{},
+ paths: []string{},
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ assert.NotNil(t, NewWatcher(test.reg, test.paths))
+ })
+ }
+}
+
+func TestWatcher_Run(t *testing.T) {
+ tests := map[string]struct {
+ createSim func(tmp *tmpDir) discoverySim
+ }{
+ "file exists before start": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeYAML(filename, cfg)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "empty file": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeString(filename, "")
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "only comments, no data": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeString(filename, "# a comment")
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "add file": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ afterRun: func() {
+ tmp.writeYAML(filename, cfg)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "remove file": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ {
+ Source: filename,
+ Configs: nil,
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeYAML(filename, cfg)
+ },
+ afterRun: func() {
+ tmp.removeFile(filename)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "change file": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfgOrig := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ cfgChanged := sdConfig{
+ {
+ "name": "name_changed",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name_changed",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeYAML(filename, cfgOrig)
+ },
+ afterRun: func() {
+ tmp.writeYAML(filename, cfgChanged)
+ time.Sleep(time.Millisecond * 500)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ "vim 'backupcopy=no' (writing to a file and backup)": {
+ createSim: func(tmp *tmpDir) discoverySim {
+ reg := confgroup.Registry{
+ "module": {},
+ }
+ cfg := sdConfig{
+ {
+ "name": "name",
+ "module": "module",
+ },
+ }
+ filename := tmp.join("module.conf")
+ discovery := prepareDiscovery(t, Config{
+ Registry: reg,
+ Watch: []string{tmp.join("*.conf")},
+ })
+ expected := []*confgroup.Group{
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": confgroup.TypeStock,
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ {
+ Source: filename,
+ Configs: []confgroup.Config{
+ {
+ "name": "name",
+ "module": "module",
+ "update_every": module.UpdateEvery,
+ "autodetection_retry": module.AutoDetectionRetry,
+ "priority": module.Priority,
+ "__provider__": "file watcher",
+ "__source_type__": "stock",
+ "__source__": fmt.Sprintf("discoverer=file_watcher,file=%s", filename),
+ },
+ },
+ },
+ }
+
+ sim := discoverySim{
+ discovery: discovery,
+ beforeRun: func() {
+ tmp.writeYAML(filename, cfg)
+ },
+ afterRun: func() {
+ newFilename := filename + ".swp"
+ tmp.renameFile(filename, newFilename)
+ tmp.writeYAML(filename, cfg)
+ tmp.removeFile(newFilename)
+ time.Sleep(time.Millisecond * 500)
+ },
+ expectedGroups: expected,
+ }
+ return sim
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ tmp := newTmpDir(t, "watch-run-*")
+ defer tmp.cleanup()
+
+ test.createSim(tmp).run(t)
+ })
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/manager.go b/src/go/collectors/go.d.plugin/agent/discovery/manager.go
new file mode 100644
index 000000000..ac9ee2211
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/manager.go
@@ -0,0 +1,199 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package discovery
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/dummy"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/file"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+func NewManager(cfg Config) (*Manager, error) {
+ if err := validateConfig(cfg); err != nil {
+ return nil, fmt.Errorf("discovery manager config validation: %v", err)
+ }
+
+ mgr := &Manager{
+ Logger: logger.New().With(
+ slog.String("component", "discovery manager"),
+ ),
+ send: make(chan struct{}, 1),
+ sendEvery: time.Second * 2, // timeout to aggregate changes
+ discoverers: make([]discoverer, 0),
+ mux: &sync.RWMutex{},
+ cache: newCache(),
+ }
+
+ if err := mgr.registerDiscoverers(cfg); err != nil {
+ return nil, fmt.Errorf("discovery manager initializaion: %v", err)
+ }
+
+ return mgr, nil
+}
+
// discoverer is anything that can send config group updates to a channel.
type discoverer interface {
	Run(ctx context.Context, in chan<- []*confgroup.Group)
}

// Manager aggregates groups from multiple discoverers into a shared
// cache and periodically flushes the accumulated changes to a single
// output channel.
type Manager struct {
	*logger.Logger
	discoverers []discoverer
	send        chan struct{} // capacity 1; signals that the cache has pending changes
	sendEvery   time.Duration // how often pending changes are flushed
	mux         *sync.RWMutex // guards cache
	cache       *cache
}
+
+func (m *Manager) String() string {
+ return fmt.Sprintf("discovery manager: %v", m.discoverers)
+}
+
// Run starts every registered discoverer plus the send loop and blocks
// until all of them have finished and ctx is canceled.
func (m *Manager) Run(ctx context.Context, in chan<- []*confgroup.Group) {
	m.Info("instance is started")
	defer func() { m.Info("instance is stopped") }()

	var wg sync.WaitGroup

	for _, d := range m.discoverers {
		wg.Add(1)
		go func(d discoverer) {
			defer wg.Done()
			m.runDiscoverer(ctx, d)
		}(d)
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		m.sendLoop(ctx, in)
	}()

	wg.Wait()
	<-ctx.Done()
}
+
// registerDiscoverers instantiates the discoverers enabled in cfg
// (file read/watch, dummy, service discovery). At least one must be
// configured, otherwise an error is returned.
func (m *Manager) registerDiscoverers(cfg Config) error {
	if len(cfg.File.Read) > 0 || len(cfg.File.Watch) > 0 {
		cfg.File.Registry = cfg.Registry
		d, err := file.NewDiscovery(cfg.File)
		if err != nil {
			return err
		}
		m.discoverers = append(m.discoverers, d)
	}

	if len(cfg.Dummy.Names) > 0 {
		cfg.Dummy.Registry = cfg.Registry
		d, err := dummy.NewDiscovery(cfg.Dummy)
		if err != nil {
			return err
		}
		m.discoverers = append(m.discoverers, d)
	}

	if len(cfg.SD.ConfDir) != 0 {
		cfg.SD.ConfigDefaults = cfg.Registry
		d, err := sd.NewServiceDiscovery(cfg.SD)
		if err != nil {
			return err
		}
		m.discoverers = append(m.discoverers, d)
	}

	if len(m.discoverers) == 0 {
		return errors.New("zero registered discoverers")
	}

	m.Infof("registered discoverers: %v", m.discoverers)

	return nil
}
+
// runDiscoverer consumes group updates from a single discoverer, merges
// them into the shared cache and signals the send loop, until ctx is
// canceled or the discoverer closes its channel.
func (m *Manager) runDiscoverer(ctx context.Context, d discoverer) {
	updates := make(chan []*confgroup.Group)
	go d.Run(ctx, updates)

	for {
		select {
		case <-ctx.Done():
			return
		case groups, ok := <-updates:
			if !ok {
				return
			}
			// closure scopes the lock to the cache update only
			func() {
				m.mux.Lock()
				defer m.mux.Unlock()

				m.cache.update(groups)
				m.triggerSend()
			}()
		}
	}
}
+
// sendLoop waits for the first batch of groups and sends it immediately,
// then flushes pending changes at most once per sendEvery tick.
func (m *Manager) sendLoop(ctx context.Context, in chan<- []*confgroup.Group) {
	// the first send is not rate-limited
	m.mustSend(ctx, in)

	tk := time.NewTicker(m.sendEvery)
	defer tk.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tk.C:
			// flush only when a discoverer signaled pending changes
			select {
			case <-m.send:
				m.trySend(in)
			default:
			}
		}
	}
}
+
// mustSend blocks until the send signal fires, then delivers the cached
// groups to in (or gives up if ctx is canceled).
func (m *Manager) mustSend(ctx context.Context, in chan<- []*confgroup.Group) {
	select {
	case <-ctx.Done():
		return
	case <-m.send:
		// snapshot and reset the cache under the lock, send outside it
		m.mux.Lock()
		groups := m.cache.groups()
		m.cache.reset()
		m.mux.Unlock()

		select {
		case <-ctx.Done():
		case in <- groups:
		}
		return
	}
}
+
// trySend attempts a non-blocking delivery of the cached groups; if the
// receiver is not ready, it re-arms the send signal so the next tick
// retries.
func (m *Manager) trySend(in chan<- []*confgroup.Group) {
	m.mux.Lock()
	defer m.mux.Unlock()

	select {
	case in <- m.cache.groups():
		m.cache.reset()
	default:
		m.triggerSend()
	}
}

// triggerSend marks the cache as having pending changes; it never blocks
// because the send channel has capacity 1.
func (m *Manager) triggerSend() {
	select {
	case m.send <- struct{}{}:
	default:
	}
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/manager_test.go b/src/go/collectors/go.d.plugin/agent/discovery/manager_test.go
new file mode 100644
index 000000000..665fe5611
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/manager_test.go
@@ -0,0 +1,177 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package discovery
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/file"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestNewManager(t *testing.T) {
+ tests := map[string]struct {
+ cfg Config
+ wantErr bool
+ }{
+ "valid config": {
+ cfg: Config{
+ Registry: confgroup.Registry{"module1": confgroup.Default{}},
+ File: file.Config{Read: []string{"path"}},
+ },
+ },
+ "invalid config, registry not set": {
+ cfg: Config{
+ File: file.Config{Read: []string{"path"}},
+ },
+ wantErr: true,
+ },
+ "invalid config, discoverers not set": {
+ cfg: Config{
+ Registry: confgroup.Registry{"module1": confgroup.Default{}},
+ },
+ wantErr: true,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ mgr, err := NewManager(test.cfg)
+
+ if test.wantErr {
+ assert.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ assert.NotNil(t, mgr)
+ }
+ })
+ }
+}
+
// TestManager_Run verifies that the manager aggregates and forwards the
// groups produced by its discoverers, deduplicating identical sources
// and coping with empty/nil group sets.
func TestManager_Run(t *testing.T) {
	tests := map[string]func() discoverySim{
		"several discoverers, unique groups with delayed collect": func() discoverySim {
			const numGroups, numCfgs = 2, 2
			d1 := prepareMockDiscoverer("test1", numGroups, numCfgs)
			d2 := prepareMockDiscoverer("test2", numGroups, numCfgs)
			mgr := prepareManager(d1, d2)
			expected := combineGroups(d1.groups, d2.groups)

			sim := discoverySim{
				mgr: mgr,
				// collecting after sendEvery exercises the ticker-driven flush
				collectDelay:   mgr.sendEvery + time.Second,
				expectedGroups: expected,
			}
			return sim
		},
		"several discoverers, unique groups": func() discoverySim {
			const numGroups, numCfgs = 2, 2
			d1 := prepareMockDiscoverer("test1", numGroups, numCfgs)
			d2 := prepareMockDiscoverer("test2", numGroups, numCfgs)
			mgr := prepareManager(d1, d2)
			expected := combineGroups(d1.groups, d2.groups)
			sim := discoverySim{
				mgr:            mgr,
				expectedGroups: expected,
			}
			return sim
		},
		"several discoverers, same groups": func() discoverySim {
			const numGroups, numTargets = 2, 2
			d1 := prepareMockDiscoverer("test1", numGroups, numTargets)
			// the same discoverer twice: duplicated sources must be merged,
			// so the expectation contains d1's groups only once
			mgr := prepareManager(d1, d1)
			expected := combineGroups(d1.groups)

			sim := discoverySim{
				mgr:            mgr,
				expectedGroups: expected,
			}
			return sim
		},
		"several discoverers, empty groups": func() discoverySim {
			const numGroups, numCfgs = 1, 0
			d1 := prepareMockDiscoverer("test1", numGroups, numCfgs)
			d2 := prepareMockDiscoverer("test2", numGroups, numCfgs)
			mgr := prepareManager(d1, d2)
			expected := combineGroups(d1.groups, d2.groups)

			sim := discoverySim{
				mgr:            mgr,
				expectedGroups: expected,
			}
			return sim
		},
		"several discoverers, nil groups": func() discoverySim {
			const numGroups, numCfgs = 0, 0
			d1 := prepareMockDiscoverer("test1", numGroups, numCfgs)
			d2 := prepareMockDiscoverer("test2", numGroups, numCfgs)
			mgr := prepareManager(d1, d2)

			sim := discoverySim{
				mgr:            mgr,
				expectedGroups: nil,
			}
			return sim
		},
	}

	for name, sim := range tests {
		t.Run(name, func(t *testing.T) { sim().run(t) })
	}
}
+
+func prepareMockDiscoverer(source string, groups, configs int) mockDiscoverer {
+ d := mockDiscoverer{}
+
+ for i := 0; i < groups; i++ {
+ group := confgroup.Group{
+ Source: fmt.Sprintf("%s_group_%d", source, i+1),
+ }
+ for j := 0; j < configs; j++ {
+ group.Configs = append(group.Configs,
+ confgroup.Config{"name": fmt.Sprintf("%s_group_%d_target_%d", source, i+1, j+1)})
+ }
+ d.groups = append(d.groups, &group)
+ }
+ return d
+}
+
+func prepareManager(discoverers ...discoverer) *Manager {
+ mgr := &Manager{
+ send: make(chan struct{}, 1),
+ sendEvery: 2 * time.Second,
+ discoverers: discoverers,
+ cache: newCache(),
+ mux: &sync.RWMutex{},
+ }
+ return mgr
+}
+
+type mockDiscoverer struct {
+ groups []*confgroup.Group
+}
+
+func (md mockDiscoverer) Run(ctx context.Context, out chan<- []*confgroup.Group) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case out <- md.groups:
+ return
+ }
+ }
+}
+
+func combineGroups(groups ...[]*confgroup.Group) (combined []*confgroup.Group) {
+ for _, set := range groups {
+ combined = append(combined, set...)
+ }
+ return combined
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/conffile.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/conffile.go
new file mode 100644
index 000000000..73aef1737
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/conffile.go
@@ -0,0 +1,69 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package sd
+
+import (
+ "context"
+ "os"
+
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/multipath"
+)
+
// confFile is a read service discovery config file: its path and raw
// contents.
type confFile struct {
	source  string
	content []byte
}

// newConfFileReader creates a confFileReader that reads .conf files from
// the given search path.
func newConfFileReader(log *logger.Logger, dir multipath.MultiPath) *confFileReader {
	return &confFileReader{
		Logger:   log,
		confDir:  dir,
		confChan: make(chan confFile),
	}
}

// confFileReader finds service discovery config files in a search path
// and streams their contents over a channel.
type confFileReader struct {
	*logger.Logger

	confDir  multipath.MultiPath
	confChan chan confFile
}
+
+func (c *confFileReader) run(ctx context.Context) {
+ files, err := c.confDir.FindFiles(".conf")
+ if err != nil {
+ c.Error(err)
+ return
+ }
+
+ if len(files) == 0 {
+ return
+ }
+
+ var confFiles []confFile
+
+ for _, file := range files {
+ bs, err := os.ReadFile(file)
+ if err != nil {
+ c.Error(err)
+ continue
+ }
+ confFiles = append(confFiles, confFile{
+ source: file,
+ content: bs,
+ })
+ }
+
+ for _, conf := range confFiles {
+ select {
+ case <-ctx.Done():
+ case c.confChan <- conf:
+ }
+ }
+
+}
+
+func (c *confFileReader) configs() chan confFile {
+ return c.confChan
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go
new file mode 100644
index 000000000..cca6b658e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go
@@ -0,0 +1,241 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dockerd
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "net"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/dockerhost"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/web"
+
+ "github.com/docker/docker/api/types"
+ typesContainer "github.com/docker/docker/api/types/container"
+ docker "github.com/docker/docker/client"
+ "github.com/ilyam8/hashstructure"
+)
+
// NewDiscoverer creates a docker container discoverer from cfg.
// Address resolution order: docker default host, then environment
// override, then the explicit cfg.Address (highest priority).
func NewDiscoverer(cfg Config) (*Discoverer, error) {
	tags, err := model.ParseTags(cfg.Tags)
	if err != nil {
		return nil, fmt.Errorf("parse tags: %v", err)
	}

	d := &Discoverer{
		Logger: logger.New().With(
			slog.String("component", "service discovery"),
			slog.String("discoverer", "docker"),
		),
		cfgSource: cfg.Source,
		newDockerClient: func(addr string) (dockerClient, error) {
			return docker.NewClientWithOpts(docker.WithHost(addr))
		},
		addr:           docker.DefaultDockerHost,
		listInterval:   time.Second * 60,
		timeout:        time.Second * 2,
		seenTggSources: make(map[string]bool),
		started:        make(chan struct{}),
	}

	// the d.addr == DefaultDockerHost check is always true at this point
	// (set just above); kept as a guard in case the default ever changes
	if addr := dockerhost.FromEnv(); addr != "" && d.addr == docker.DefaultDockerHost {
		d.Infof("using docker host from environment: %s ", addr)
		d.addr = addr
	}

	d.Tags().Merge(tags)

	if cfg.Timeout.Duration().Seconds() != 0 {
		d.timeout = cfg.Timeout.Duration()
	}
	if cfg.Address != "" {
		d.addr = cfg.Address
	}

	return d, nil
}
+
// Config is the docker discoverer configuration.
type Config struct {
	Source string // config source description, appended to target group sources

	Tags    string       `yaml:"tags"`    // tags merged into every discovered target
	Address string       `yaml:"address"` // docker daemon address (empty: default or env)
	Timeout web.Duration `yaml:"timeout"` // docker API call timeout
}

type (
	// Discoverer periodically lists docker containers and emits a target
	// group per container.
	Discoverer struct {
		*logger.Logger
		model.Base

		dockerClient    dockerClient                             // created lazily in Discover
		newDockerClient func(addr string) (dockerClient, error)  // factory, replaceable in tests
		addr            string                                   // docker daemon address

		cfgSource string

		listInterval   time.Duration
		timeout        time.Duration
		seenTggSources map[string]bool // [targetGroup.Source]

		started chan struct{} // closed once Discover begins running
	}
	// dockerClient is the subset of the docker API client used here,
	// narrowed so it can be mocked.
	dockerClient interface {
		NegotiateAPIVersion(context.Context)
		ContainerList(context.Context, typesContainer.ListOptions) ([]types.Container, error)
		Close() error
	}
)
+
// String implements fmt.Stringer.
func (d *Discoverer) String() string {
	return "sd:docker"
}
+
// Discover implements the discoverer interface: it lists containers
// immediately and then on every listInterval tick, publishing target
// groups to in until ctx is canceled.
func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
	d.Info("instance is started")
	defer func() { d.cleanup(); d.Info("instance is stopped") }()

	close(d.started) // signal that the discoverer is running

	if d.dockerClient == nil {
		client, err := d.newDockerClient(d.addr)
		if err != nil {
			d.Errorf("error on creating docker client: %v", err)
			return
		}
		d.dockerClient = client
	}

	d.dockerClient.NegotiateAPIVersion(ctx)

	// the first listing failure is fatal; later failures are only logged
	if err := d.listContainers(ctx, in); err != nil {
		d.Error(err)
		return
	}

	tk := time.NewTicker(d.listInterval)
	defer tk.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tk.C:
			if err := d.listContainers(ctx, in); err != nil {
				d.Warning(err)
			}
		}
	}
}
+
// listContainers queries the docker daemon for containers, builds a
// target group per container, and emits empty groups for containers
// that disappeared since the previous listing.
func (d *Discoverer) listContainers(ctx context.Context, in chan<- []model.TargetGroup) error {
	listCtx, cancel := context.WithTimeout(ctx, d.timeout)
	defer cancel()

	containers, err := d.dockerClient.ContainerList(listCtx, typesContainer.ListOptions{})
	if err != nil {
		return err
	}

	var tggs []model.TargetGroup
	seen := make(map[string]bool)

	for _, cntr := range containers {
		if tgg := d.buildTargetGroup(cntr); tgg != nil {
			tggs = append(tggs, tgg)
			seen[tgg.Source()] = true
		}
	}

	// sources seen last listing but not now: empty group removes them
	for src := range d.seenTggSources {
		if !seen[src] {
			tggs = append(tggs, &targetGroup{source: src})
		}
	}
	d.seenTggSources = seen

	select {
	case <-ctx.Done():
	case in <- tggs:
	}

	return nil
}
+
// buildTargetGroup converts a container into a target group with one
// target per (network, port) pair. It returns nil for containers without
// names or network settings.
// NOTE(review): targets are the cross product of networks and ports, and
// map iteration makes their order nondeterministic — confirm downstream
// consumers do not depend on target order.
func (d *Discoverer) buildTargetGroup(cntr types.Container) model.TargetGroup {
	if len(cntr.Names) == 0 || cntr.NetworkSettings == nil || len(cntr.NetworkSettings.Networks) == 0 {
		return nil
	}

	tgg := &targetGroup{
		source: cntrSource(cntr),
	}
	if d.cfgSource != "" {
		tgg.source += fmt.Sprintf(",%s", d.cfgSource)
	}

	for netDriver, network := range cntr.NetworkSettings.Networks {
		// container with network mode host will be discovered by local-listeners
		for _, port := range cntr.Ports {
			tgt := &target{
				ID:            cntr.ID,
				Name:          strings.TrimPrefix(cntr.Names[0], "/"),
				Image:         cntr.Image,
				Command:       cntr.Command,
				Labels:        mapAny(cntr.Labels),
				PrivatePort:   strconv.Itoa(int(port.PrivatePort)),
				PublicPort:    strconv.Itoa(int(port.PublicPort)),
				PublicPortIP:  port.IP,
				PortProtocol:  port.Type,
				NetworkMode:   cntr.HostConfig.NetworkMode,
				NetworkDriver: netDriver,
				IPAddress:     network.IPAddress,
			}
			tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.PrivatePort)

			// targets that cannot be hashed are silently skipped
			hash, err := calcHash(tgt)
			if err != nil {
				continue
			}

			tgt.hash = hash
			tgt.Tags().Merge(d.Tags())

			tgg.targets = append(tgg.targets, tgt)
		}
	}

	return tgg
}
+
+func (d *Discoverer) cleanup() {
+ if d.dockerClient != nil {
+ _ = d.dockerClient.Close()
+ }
+}
+
+func cntrSource(cntr types.Container) string {
+ name := strings.TrimPrefix(cntr.Names[0], "/")
+ return fmt.Sprintf("discoverer=docker,container=%s,image=%s", name, cntr.Image)
+}
+
+func calcHash(obj any) (uint64, error) {
+ return hashstructure.Hash(obj, nil)
+}
+
// mapAny copies a string map into a map with any-typed values; a nil
// input yields nil.
func mapAny(src map[string]string) map[string]any {
	if src == nil {
		return nil
	}
	dst := make(map[string]any, len(src))
	for key, value := range src {
		dst[key] = value
	}
	return dst
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go
new file mode 100644
index 000000000..d325f99dd
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go
@@ -0,0 +1,162 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dockerd
+
+import (
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/docker/docker/api/types"
+ typesNetwork "github.com/docker/docker/api/types/network"
+)
+
// TestDiscoverer_Discover runs discovery simulations against a mock docker
// daemon and verifies the resulting target groups.
func TestDiscoverer_Discover(t *testing.T) {
	tests := map[string]struct {
		createSim func() *discoverySim
	}{
		"add containers": {
			createSim: func() *discoverySim {
				nginx1 := prepareNginxContainer("nginx1")
				nginx2 := prepareNginxContainer("nginx2")

				sim := &discoverySim{
					dockerCli: func(cli dockerCli, _ time.Duration) {
						cli.addContainer(nginx1)
						cli.addContainer(nginx2)
					},
					wantGroups: []model.TargetGroup{
						&targetGroup{
							source: cntrSource(nginx1),
							targets: []model.Target{
								withHash(&target{
									ID:            nginx1.ID,
									Name:          nginx1.Names[0][1:], // name without the leading '/'
									Image:         nginx1.Image,
									Command:       nginx1.Command,
									Labels:        mapAny(nginx1.Labels),
									PrivatePort:   "80",
									PublicPort:    "8080",
									PublicPortIP:  "0.0.0.0",
									PortProtocol:  "tcp",
									NetworkMode:   "default",
									NetworkDriver: "bridge",
									IPAddress:     "192.0.2.0",
									Address:       "192.0.2.0:80",
								}),
							},
						},
						&targetGroup{
							source: cntrSource(nginx2),
							targets: []model.Target{
								withHash(&target{
									ID:            nginx2.ID,
									Name:          nginx2.Names[0][1:],
									Image:         nginx2.Image,
									Command:       nginx2.Command,
									Labels:        mapAny(nginx2.Labels),
									PrivatePort:   "80",
									PublicPort:    "8080",
									PublicPortIP:  "0.0.0.0",
									PortProtocol:  "tcp",
									NetworkMode:   "default",
									NetworkDriver: "bridge",
									IPAddress:     "192.0.2.0",
									Address:       "192.0.2.0:80",
								}),
							},
						},
					},
				}
				return sim
			},
		},
		"remove containers": {
			createSim: func() *discoverySim {
				nginx1 := prepareNginxContainer("nginx1")
				nginx2 := prepareNginxContainer("nginx2")

				sim := &discoverySim{
					dockerCli: func(cli dockerCli, interval time.Duration) {
						cli.addContainer(nginx1)
						cli.addContainer(nginx2)
						// Wait for at least one list cycle so the discoverer
						// sees both containers before one is removed.
						time.Sleep(interval * 2)
						cli.removeContainer(nginx1.ID)
					},
					wantGroups: []model.TargetGroup{
						// A removed container is expected to surface as a
						// group with no targets (cleanup signal).
						&targetGroup{
							source:  cntrSource(nginx1),
							targets: nil,
						},
						&targetGroup{
							source: cntrSource(nginx2),
							targets: []model.Target{
								withHash(&target{
									ID:            nginx2.ID,
									Name:          nginx2.Names[0][1:],
									Image:         nginx2.Image,
									Command:       nginx2.Command,
									Labels:        mapAny(nginx2.Labels),
									PrivatePort:   "80",
									PublicPort:    "8080",
									PublicPortIP:  "0.0.0.0",
									PortProtocol:  "tcp",
									NetworkMode:   "default",
									NetworkDriver: "bridge",
									IPAddress:     "192.0.2.0",
									Address:       "192.0.2.0:80",
								}),
							},
						},
					},
				}
				return sim
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()
			sim.run(t)
		})
	}
}
+
// prepareNginxContainer returns a docker container summary with one exposed
// TCP port and a single bridge network, shaped like the docker API reports it.
// Note: HostConfig is an anonymous struct inside types.Container, so it has
// to be re-declared field-for-field to initialize it here.
func prepareNginxContainer(name string) types.Container {
	return types.Container{
		ID:      "id-" + name,
		Names:   []string{"/" + name}, // docker prefixes names with "/"
		Image:   "nginx-image",
		ImageID: "nginx-image-id",
		Command: "nginx-command",
		Ports: []types.Port{
			{
				IP:          "0.0.0.0",
				PrivatePort: 80,
				PublicPort:  8080,
				Type:        "tcp",
			},
		},
		Labels: map[string]string{"key1": "value1"},
		HostConfig: struct {
			NetworkMode string            `json:",omitempty"`
			Annotations map[string]string `json:",omitempty"`
		}{
			NetworkMode: "default",
		},
		NetworkSettings: &types.SummaryNetworkSettings{
			Networks: map[string]*typesNetwork.EndpointSettings{
				"bridge": {IPAddress: "192.0.2.0"}, // TEST-NET-1 address
			},
		},
	}
}
+
// withHash mirrors what the discoverer does to a freshly built target:
// it calculates the identity hash first and merges the "docker" tag after,
// so expected targets compare equal to the produced ones.
func withHash(tgt *target) *target {
	tgt.hash, _ = calcHash(tgt)
	tags, _ := model.ParseTags("docker")
	tgt.Tags().Merge(tags)
	return tgt
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go
new file mode 100644
index 000000000..7b0b76aba
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go
@@ -0,0 +1,162 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dockerd
+
+import (
+ "context"
+ "sort"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/docker/docker/api/types"
+ typesContainer "github.com/docker/docker/api/types/container"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
// dockerCli is the subset of mock-daemon operations a simulation uses to
// mutate the fake docker state while discovery runs.
type dockerCli interface {
	addContainer(cntr types.Container)
	removeContainer(id string)
}

// discoverySim drives a Discoverer against a mock docker daemon:
// dockerCli mutates the daemon during the run, wantGroups is the expected
// final set of target groups (last group seen per source).
type discoverySim struct {
	dockerCli  func(cli dockerCli, interval time.Duration)
	wantGroups []model.TargetGroup
}
+
// run executes the simulation: it starts the Discoverer with an injected
// mock docker client, lets sim.dockerCli mutate the daemon state, collects
// the emitted target groups (keeping the most recent group per source), and
// after cancellation compares them with sim.wantGroups. It also checks that
// the discoverer negotiated the API version and closed the client.
func (sim *discoverySim) run(t *testing.T) {
	d, err := NewDiscoverer(Config{
		Source: "",
		Tags:   "docker",
	})
	require.NoError(t, err)

	mock := newMockDockerd()

	// Inject the mock client and shorten the list interval for the test.
	d.newDockerClient = func(addr string) (dockerClient, error) {
		return mock, nil
	}
	d.listInterval = time.Millisecond * 100

	seen := make(map[string]model.TargetGroup)
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan []model.TargetGroup)
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		d.Discover(ctx, in)
	}()

	// Collector goroutine: remembers the latest group per source. It is
	// joined via wg before `seen` is read, so there is no data race.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case tggs := <-in:
				for _, tgg := range tggs {
					seen[tgg.Source()] = tgg
				}
			}
		}
	}()

	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	// Wait for discovery to signal readiness before mutating the daemon.
	select {
	case <-d.started:
	case <-time.After(time.Second * 3):
		require.Fail(t, "discovery failed to start")
	}

	sim.dockerCli(mock, d.listInterval)
	time.Sleep(time.Second) // give the discoverer time to observe the final state

	cancel()

	select {
	case <-done:
	case <-time.After(time.Second * 3):
		require.Fail(t, "discovery hasn't finished after cancel")
	}

	var tggs []model.TargetGroup
	for _, tgg := range seen {
		tggs = append(tggs, tgg)
	}

	// Sort both sides for a deterministic comparison.
	sortTargetGroups(tggs)
	sortTargetGroups(sim.wantGroups)

	wantLen, gotLen := len(sim.wantGroups), len(tggs)
	assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen)
	assert.Equal(t, sim.wantGroups, tggs)

	assert.True(t, mock.negApiVerCalled, "NegotiateAPIVersion called")
	assert.True(t, mock.closeCalled, "Close called")
}
+
// newMockDockerd returns an empty mock docker daemon.
func newMockDockerd() *mockDockerd {
	return &mockDockerd{
		containers: make(map[string]types.Container),
	}
}

// mockDockerd is a minimal, concurrency-safe stand-in for the docker client:
// it serves ContainerList from an in-memory map and records which lifecycle
// methods were invoked.
type mockDockerd struct {
	negApiVerCalled bool
	closeCalled     bool
	mux             sync.Mutex
	containers      map[string]types.Container
}

// addContainer registers a container with the fake daemon.
func (m *mockDockerd) addContainer(cntr types.Container) {
	m.mux.Lock()
	defer m.mux.Unlock()

	m.containers[cntr.ID] = cntr
}

// removeContainer deletes a container by id; unknown ids are a no-op.
func (m *mockDockerd) removeContainer(id string) {
	m.mux.Lock()
	defer m.mux.Unlock()

	delete(m.containers, id)
}

// ContainerList returns the current containers in unspecified (map) order.
func (m *mockDockerd) ContainerList(_ context.Context, _ typesContainer.ListOptions) ([]types.Container, error) {
	m.mux.Lock()
	defer m.mux.Unlock()

	var cntrs []types.Container
	for _, cntr := range m.containers {
		cntrs = append(cntrs, cntr)
	}

	return cntrs, nil
}

// NegotiateAPIVersion only records that it was called.
func (m *mockDockerd) NegotiateAPIVersion(_ context.Context) {
	m.negApiVerCalled = true
}

// Close only records that it was called.
func (m *mockDockerd) Close() error {
	m.closeCalled = true
	return nil
}
+
+func sortTargetGroups(tggs []model.TargetGroup) {
+ if len(tggs) == 0 {
+ return
+ }
+ sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() })
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go
new file mode 100644
index 000000000..2422bc98e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go
@@ -0,0 +1,55 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dockerd
+
+import (
+ "fmt"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+)
+
// targetGroup is the docker implementation of model.TargetGroup: the set of
// targets produced from a single container.
type targetGroup struct {
	source  string
	targets []model.Target
}

func (g *targetGroup) Provider() string        { return "sd:docker" }
func (g *targetGroup) Source() string          { return g.source }
func (g *targetGroup) Targets() []model.Target { return g.targets }

// target describes one (container, network, port) combination produced by
// the docker discoverer. The struct is passed to calcHash to compute its
// identity hash.
type target struct {
	model.Base

	hash uint64 // identity hash, set by the discoverer after construction

	ID            string
	Name          string
	Image         string
	Command       string
	Labels        map[string]any
	PrivatePort   string // Port on the container
	PublicPort    string // Port exposed on the host
	PublicPortIP  string // Host IP address that the container's port is mapped to
	PortProtocol  string
	NetworkMode   string
	NetworkDriver string
	IPAddress     string

	Address string // "IPAddress:PrivatePort"
}

// TUID returns a unique-within-the-group id, built from the most specific
// port information available (more fields when a public port is present).
func (t *target) TUID() string {
	if t.PublicPort != "" {
		return fmt.Sprintf("%s_%s_%s_%s_%s_%s",
			t.Name, t.IPAddress, t.PublicPortIP, t.PortProtocol, t.PublicPort, t.PrivatePort)
	}
	if t.PrivatePort != "" {
		return fmt.Sprintf("%s_%s_%s_%s",
			t.Name, t.IPAddress, t.PortProtocol, t.PrivatePort)
	}
	return fmt.Sprintf("%s_%s", t.Name, t.IPAddress)
}

// Hash returns the precomputed identity hash.
func (t *target) Hash() uint64 {
	return t.hash
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go
new file mode 100644
index 000000000..15a1e4745
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go
@@ -0,0 +1,34 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "errors"
+ "fmt"
+)
+
// Config is the kubernetes service discovery configuration.
type Config struct {
	APIServer string `yaml:"api_server"` // TODO: not used
	// Role selects what to watch: "pod" or "service" (see validateConfig).
	Role string `yaml:"role"`
	// Tags are attached to every discovered target; required.
	Tags string `yaml:"tags"`
	// Namespaces to watch; empty means all namespaces.
	Namespaces []string `yaml:"namespaces"`
	// Selector narrows the list/watch requests sent to the API server.
	Selector struct {
		Label string `yaml:"label"`
		Field string `yaml:"field"`
	} `yaml:"selector"`
	Pod struct {
		// LocalMode limits pod discovery to pods scheduled on the local
		// node (requires the MY_NODE_NAME env var to be set).
		LocalMode bool `yaml:"local_mode"`
	} `yaml:"pod"`
}
+
+func validateConfig(cfg Config) error {
+ switch role(cfg.Role) {
+ case rolePod, roleService:
+ default:
+ return fmt.Errorf("unknown role: '%s'", cfg.Role)
+ }
+ if cfg.Tags == "" {
+ return errors.New("'tags' not set")
+ }
+ return nil
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go
new file mode 100644
index 000000000..aa153a34a
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go
@@ -0,0 +1,268 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/k8sclient"
+
+ "github.com/ilyam8/hashstructure"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+)
+
// role determines which kind of kubernetes object a discoverer watches.
type role string

const (
	rolePod     role = "pod"
	roleService role = "service"
)

const (
	// envNodeName is expected to hold the local node's name; it is read in
	// pod local mode to restrict discovery to this node (presumably set via
	// the downward API in the agent's deployment — confirm).
	envNodeName = "MY_NODE_NAME"
)

// log is the package-level logger shared by all kubernetes discoverers.
var log = logger.New().With(
	slog.String("component", "service discovery"),
	slog.String("discoverer", "kubernetes"),
)
+
// NewKubeDiscoverer validates cfg and creates a discoverer backed by a
// kubernetes clientset. In pod local mode, the field selector is extended so
// that only pods scheduled on the local node (from MY_NODE_NAME) match.
func NewKubeDiscoverer(cfg Config) (*KubeDiscoverer, error) {
	if err := validateConfig(cfg); err != nil {
		return nil, fmt.Errorf("config validation: %v", err)
	}

	tags, err := model.ParseTags(cfg.Tags)
	if err != nil {
		return nil, fmt.Errorf("parse tags: %v", err)
	}

	client, err := k8sclient.New("Netdata/service-td")
	if err != nil {
		return nil, fmt.Errorf("create clientset: %v", err)
	}

	// No explicit namespaces means "watch all namespaces".
	ns := cfg.Namespaces
	if len(ns) == 0 {
		ns = []string{corev1.NamespaceAll}
	}

	selectorField := cfg.Selector.Field
	if role(cfg.Role) == rolePod && cfg.Pod.LocalMode {
		name := os.Getenv(envNodeName)
		if name == "" {
			return nil, fmt.Errorf("local_mode is enabled, but env '%s' not set", envNodeName)
		}
		selectorField = joinSelectors(selectorField, "spec.nodeName="+name)
	}

	d := &KubeDiscoverer{
		Logger:        log,
		client:        client,
		tags:          tags,
		role:          role(cfg.Role),
		namespaces:    ns,
		selectorLabel: cfg.Selector.Label,
		selectorField: selectorField,
		discoverers:   make([]model.Discoverer, 0, len(ns)),
		started:       make(chan struct{}),
	}

	return d, nil
}
+
// KubeDiscoverer fans out to one per-namespace discoverer for the configured
// role and aggregates their target group updates.
type KubeDiscoverer struct {
	*logger.Logger

	client kubernetes.Interface

	tags          model.Tags // merged into every sub-discoverer
	role          role
	namespaces    []string
	selectorLabel string
	selectorField string
	discoverers   []model.Discoverer // populated in Discover
	started       chan struct{}      // closed once all sub-discoverers run
}

// String implements fmt.Stringer; used in log messages.
func (d *KubeDiscoverer) String() string {
	return "sd:k8s"
}
+
// resyncPeriod is the resync interval passed to the shared informers.
const resyncPeriod = 10 * time.Minute

// Discover creates one discoverer per configured namespace for the
// configured role, runs them all, and forwards their target group updates
// to in until ctx is cancelled.
func (d *KubeDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
	d.Info("instance is started")
	defer d.Info("instance is stopped")

	for _, namespace := range d.namespaces {
		var dd model.Discoverer
		switch d.role {
		case rolePod:
			dd = d.setupPodDiscoverer(ctx, namespace)
		case roleService:
			dd = d.setupServiceDiscoverer(ctx, namespace)
		default:
			// Should be unreachable: the role is validated at construction.
			d.Errorf("unknown role: '%s'", d.role)
			continue
		}
		d.discoverers = append(d.discoverers, dd)
	}

	if len(d.discoverers) == 0 {
		d.Error("no discoverers registered")
		return
	}

	d.Infof("registered: %v", d.discoverers)

	var wg sync.WaitGroup
	updates := make(chan []model.TargetGroup)

	for _, disc := range d.discoverers {
		wg.Add(1)
		go func(disc model.Discoverer) { defer wg.Done(); disc.Discover(ctx, updates) }(disc)
	}

	done := make(chan struct{})
	go func() { defer close(done); wg.Wait() }()

	// Signal (mainly to tests) that all sub-discoverers are running.
	close(d.started)

	for {
		select {
		case <-ctx.Done():
			// Grace period for sub-discoverers to exit after cancellation.
			select {
			case <-done:
				d.Info("all discoverers exited")
			case <-time.After(time.Second * 5):
				d.Warning("not all discoverers exited")
			}
			return
		case <-done:
			d.Info("all discoverers exited")
			return
		case tggs := <-updates:
			// Forward the update, but never block past cancellation.
			select {
			case <-ctx.Done():
			case in <- tggs:
			}
		}
	}
}
+
// setupPodDiscoverer creates a pod discoverer for one namespace. Pods are
// listed/watched with the configured field and label selectors; configmaps
// and secrets are watched unfiltered because they are only used to resolve
// env var references from pod containers.
func (d *KubeDiscoverer) setupPodDiscoverer(ctx context.Context, ns string) *podDiscoverer {
	pod := d.client.CoreV1().Pods(ns)
	podLW := &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			opts.FieldSelector = d.selectorField
			opts.LabelSelector = d.selectorLabel
			return pod.List(ctx, opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			opts.FieldSelector = d.selectorField
			opts.LabelSelector = d.selectorLabel
			return pod.Watch(ctx, opts)
		},
	}

	cmap := d.client.CoreV1().ConfigMaps(ns)
	cmapLW := &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			return cmap.List(ctx, opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			return cmap.Watch(ctx, opts)
		},
	}

	secret := d.client.CoreV1().Secrets(ns)
	secretLW := &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			return secret.List(ctx, opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			return secret.Watch(ctx, opts)
		},
	}

	td := newPodDiscoverer(
		cache.NewSharedInformer(podLW, &corev1.Pod{}, resyncPeriod),
		cache.NewSharedInformer(cmapLW, &corev1.ConfigMap{}, resyncPeriod),
		cache.NewSharedInformer(secretLW, &corev1.Secret{}, resyncPeriod),
	)
	td.Tags().Merge(d.tags)

	return td
}
+
// setupServiceDiscoverer creates a service discoverer for one namespace,
// applying the configured field and label selectors to its list/watch calls.
func (d *KubeDiscoverer) setupServiceDiscoverer(ctx context.Context, namespace string) *serviceDiscoverer {
	svc := d.client.CoreV1().Services(namespace)

	svcLW := &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			opts.FieldSelector = d.selectorField
			opts.LabelSelector = d.selectorLabel
			return svc.List(ctx, opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			opts.FieldSelector = d.selectorField
			opts.LabelSelector = d.selectorLabel
			return svc.Watch(ctx, opts)
		},
	}

	inf := cache.NewSharedInformer(svcLW, &corev1.Service{}, resyncPeriod)

	td := newServiceDiscoverer(inf)
	td.Tags().Merge(d.tags)

	return td
}
+
+func enqueue(queue *workqueue.Type, obj any) {
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+ if err != nil {
+ return
+ }
+ queue.Add(key)
+}
+
+func send(ctx context.Context, in chan<- []model.TargetGroup, tgg model.TargetGroup) {
+ if tgg == nil {
+ return
+ }
+ select {
+ case <-ctx.Done():
+ case in <- []model.TargetGroup{tgg}:
+ }
+}
+
// calcHash returns a stable structural hash of obj; it is used to compute
// target identity hashes.
func calcHash(obj any) (uint64, error) {
	return hashstructure.Hash(obj, nil)
}
+
// joinSelectors combines non-empty selector expressions into a single
// comma-separated kubernetes selector string.
func joinSelectors(srs ...string) string {
	nonEmpty := make([]string, 0, len(srs))
	for _, s := range srs {
		if s != "" {
			nonEmpty = append(nonEmpty, s)
		}
	}
	return strings.Join(nonEmpty, ",")
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go
new file mode 100644
index 000000000..9743a0af5
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go
@@ -0,0 +1,160 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "fmt"
+ "os"
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/k8sclient"
+
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/fake"
+)
+
// discoveryTags is the tag set the simulated discoverers attach to targets.
var discoveryTags, _ = model.ParseTags("k8s")

// TestMain forces the fake in-cluster client and a fixed node name for every
// test in the package, and restores the environment afterwards.
func TestMain(m *testing.M) {
	_ = os.Setenv(envNodeName, "m01")
	_ = os.Setenv(k8sclient.EnvFakeClient, "true")
	code := m.Run()
	_ = os.Unsetenv(envNodeName)
	_ = os.Unsetenv(k8sclient.EnvFakeClient)
	os.Exit(code)
}
+
+func TestNewKubeDiscoverer(t *testing.T) {
+ tests := map[string]struct {
+ cfg Config
+ wantErr bool
+ }{
+ "pod role config": {
+ wantErr: false,
+ cfg: Config{Role: string(rolePod), Tags: "k8s"},
+ },
+ "service role config": {
+ wantErr: false,
+ cfg: Config{Role: string(roleService), Tags: "k8s"},
+ },
+ "empty config": {
+ wantErr: true,
+ cfg: Config{},
+ },
+ }
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ disc, err := NewKubeDiscoverer(test.cfg)
+
+ if test.wantErr {
+ assert.Error(t, err)
+ assert.Nil(t, disc)
+ } else {
+ assert.NoError(t, err)
+ assert.NotNil(t, disc)
+ }
+ })
+ }
+}
+
// TestKubeDiscoverer_Discover verifies pod and ClusterIP service discovery
// across multiple namespaces using the fake clientset.
func TestKubeDiscoverer_Discover(t *testing.T) {
	const prod = "prod"
	const dev = "dev"
	prodNamespace := newNamespace(prod)
	devNamespace := newNamespace(dev)

	tests := map[string]struct {
		createSim func() discoverySim
	}{
		"multiple namespaces pod td": {
			createSim: func() discoverySim {
				httpdProd, nginxProd := newHTTPDPod(), newNGINXPod()
				httpdProd.Namespace = prod
				nginxProd.Namespace = prod

				httpdDev, nginxDev := newHTTPDPod(), newNGINXPod()
				httpdDev.Namespace = dev
				nginxDev.Namespace = dev

				disc, _ := preparePodDiscoverer(
					[]string{prod, dev},
					prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev)

				return discoverySim{
					td: disc,
					// emitted group order is not guaranteed, so sort first
					sortBeforeVerify: true,
					wantTargetGroups: []model.TargetGroup{
						preparePodTargetGroup(httpdDev),
						preparePodTargetGroup(nginxDev),
						preparePodTargetGroup(httpdProd),
						preparePodTargetGroup(nginxProd),
					},
				}
			},
		},
		"multiple namespaces ClusterIP service td": {
			createSim: func() discoverySim {
				httpdProd, nginxProd := newHTTPDClusterIPService(), newNGINXClusterIPService()
				httpdProd.Namespace = prod
				nginxProd.Namespace = prod

				httpdDev, nginxDev := newHTTPDClusterIPService(), newNGINXClusterIPService()
				httpdDev.Namespace = dev
				nginxDev.Namespace = dev

				disc, _ := prepareSvcDiscoverer(
					[]string{prod, dev},
					prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev)

				return discoverySim{
					td:               disc,
					sortBeforeVerify: true,
					wantTargetGroups: []model.TargetGroup{
						prepareSvcTargetGroup(httpdDev),
						prepareSvcTargetGroup(nginxDev),
						prepareSvcTargetGroup(httpdProd),
						prepareSvcTargetGroup(nginxProd),
					},
				}
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()
			sim.run(t)
		})
	}
}
+
// prepareDiscoverer builds a KubeDiscoverer backed by a fake clientset
// pre-populated with the given objects; the clientset is returned so tests
// can mutate cluster state afterwards.
// NOTE(review): Logger is left nil here, unlike NewKubeDiscoverer — this
// relies on the logger methods being nil-safe; confirm.
func prepareDiscoverer(role role, namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
	client := fake.NewSimpleClientset(objects...)
	tags, _ := model.ParseTags("k8s")
	disc := &KubeDiscoverer{
		tags:        tags,
		role:        role,
		namespaces:  namespaces,
		client:      client,
		discoverers: nil,
		started:     make(chan struct{}),
	}
	return disc, client
}
+
+func newNamespace(name string) *corev1.Namespace {
+ return &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
+}
+
+func mustCalcHash(obj any) uint64 {
+ hash, err := calcHash(obj)
+ if err != nil {
+ panic(fmt.Sprintf("hash calculation: %v", err))
+ }
+ return hash
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go
new file mode 100644
index 000000000..a271e7285
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go
@@ -0,0 +1,434 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+)
+
// podTargetGroup is the pod implementation of model.TargetGroup: the set of
// targets produced from a single pod.
type podTargetGroup struct {
	targets []model.Target
	source  string
}

func (p podTargetGroup) Provider() string        { return "sd:k8s:pod" }
func (p podTargetGroup) Source() string          { return p.source }
func (p podTargetGroup) Targets() []model.Target { return p.targets }
+
// PodTarget is one discovered (pod container, port) endpoint. The struct is
// passed to calcHash to compute its identity; the embedded Base is excluded
// from that hash via the hash:"ignore" tag.
type PodTarget struct {
	model.Base `hash:"ignore"`

	hash uint64 // identity hash, computed once when the target is built
	tuid string // unique id within the group, see podTUID/podTUIDWithPort

	Address        string // "PodIP" or "PodIP:Port" when the container exposes ports
	Namespace      string
	Name           string
	Annotations    map[string]any
	Labels         map[string]any
	NodeName       string
	PodIP          string
	ControllerName string
	ControllerKind string
	ContName       string
	Image          string
	Env            map[string]any
	Port           string
	PortName       string
	PortProtocol   string
}

// Hash returns the precomputed identity hash.
func (p PodTarget) Hash() uint64 { return p.hash }

// TUID returns the target's unique id within its group.
func (p PodTarget) TUID() string { return p.tuid }
+
// newPodDiscoverer wires a pod discoverer to the given shared informers.
// The configmap and secret informers are required to resolve env var
// references from pod containers. Every pod add/update/delete event
// enqueues the pod's key for processing. Nil informers are a programming
// error and cause a panic.
func newPodDiscoverer(pod, cmap, secret cache.SharedInformer) *podDiscoverer {
	if pod == nil || cmap == nil || secret == nil {
		panic("nil pod or cmap or secret informer")
	}

	queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"})

	_, _ = pod.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj any) { enqueue(queue, obj) },
		UpdateFunc: func(_, obj any) { enqueue(queue, obj) },
		DeleteFunc: func(obj any) { enqueue(queue, obj) },
	})

	return &podDiscoverer{
		Logger:         log,
		podInformer:    pod,
		cmapInformer:   cmap,
		secretInformer: secret,
		queue:          queue,
	}
}
+
// podDiscoverer watches pods in one namespace and publishes a target group
// per pod. The configmap and secret informers back env var resolution.
type podDiscoverer struct {
	*logger.Logger
	model.Base

	podInformer    cache.SharedInformer
	cmapInformer   cache.SharedInformer
	secretInformer cache.SharedInformer
	queue          *workqueue.Type // pod keys pending processing
}

// String implements fmt.Stringer; used in log messages.
func (p *podDiscoverer) String() string {
	return "sd:k8s:pod"
}
+
// Discover runs the informers, waits for their caches to sync, then
// processes queued pod keys until ctx is cancelled.
func (p *podDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
	p.Info("instance is started")
	defer p.Info("instance is stopped")
	// Shutting down the queue terminates the run() goroutine.
	defer p.queue.ShutDown()

	go p.podInformer.Run(ctx.Done())
	go p.cmapInformer.Run(ctx.Done())
	go p.secretInformer.Run(ctx.Done())

	if !cache.WaitForCacheSync(ctx.Done(),
		p.podInformer.HasSynced, p.cmapInformer.HasSynced, p.secretInformer.HasSynced) {
		p.Error("failed to sync caches")
		return
	}

	go p.run(ctx, in)

	<-ctx.Done()
}
+
+func (p *podDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) {
+ for {
+ item, shutdown := p.queue.Get()
+ if shutdown {
+ return
+ }
+ p.handleQueueItem(ctx, in, item)
+ }
+}
+
+func (p *podDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) {
+ defer p.queue.Done(item)
+
+ key := item.(string)
+ namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+ return
+ }
+
+ obj, ok, err := p.podInformer.GetStore().GetByKey(key)
+ if err != nil {
+ return
+ }
+
+ if !ok {
+ tgg := &podTargetGroup{source: podSourceFromNsName(namespace, name)}
+ send(ctx, in, tgg)
+ return
+ }
+
+ pod, err := toPod(obj)
+ if err != nil {
+ return
+ }
+
+ tgg := p.buildTargetGroup(pod)
+
+ for _, tgt := range tgg.Targets() {
+ tgt.Tags().Merge(p.Tags())
+ }
+
+ send(ctx, in, tgg)
+
+}
+
+func (p *podDiscoverer) buildTargetGroup(pod *corev1.Pod) model.TargetGroup {
+ if pod.Status.PodIP == "" || len(pod.Spec.Containers) == 0 {
+ return &podTargetGroup{
+ source: podSource(pod),
+ }
+ }
+ return &podTargetGroup{
+ source: podSource(pod),
+ targets: p.buildTargets(pod),
+ }
+}
+
+func (p *podDiscoverer) buildTargets(pod *corev1.Pod) (targets []model.Target) {
+ var name, kind string
+ for _, ref := range pod.OwnerReferences {
+ if ref.Controller != nil && *ref.Controller {
+ name = ref.Name
+ kind = ref.Kind
+ break
+ }
+ }
+
+ for _, container := range pod.Spec.Containers {
+ env := p.collectEnv(pod.Namespace, container)
+
+ if len(container.Ports) == 0 {
+ tgt := &PodTarget{
+ tuid: podTUID(pod, container),
+ Address: pod.Status.PodIP,
+ Namespace: pod.Namespace,
+ Name: pod.Name,
+ Annotations: mapAny(pod.Annotations),
+ Labels: mapAny(pod.Labels),
+ NodeName: pod.Spec.NodeName,
+ PodIP: pod.Status.PodIP,
+ ControllerName: name,
+ ControllerKind: kind,
+ ContName: container.Name,
+ Image: container.Image,
+ Env: mapAny(env),
+ }
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+ tgt.hash = hash
+
+ targets = append(targets, tgt)
+ } else {
+ for _, port := range container.Ports {
+ portNum := strconv.FormatUint(uint64(port.ContainerPort), 10)
+ tgt := &PodTarget{
+ tuid: podTUIDWithPort(pod, container, port),
+ Address: net.JoinHostPort(pod.Status.PodIP, portNum),
+ Namespace: pod.Namespace,
+ Name: pod.Name,
+ Annotations: mapAny(pod.Annotations),
+ Labels: mapAny(pod.Labels),
+ NodeName: pod.Spec.NodeName,
+ PodIP: pod.Status.PodIP,
+ ControllerName: name,
+ ControllerKind: kind,
+ ContName: container.Name,
+ Image: container.Image,
+ Env: mapAny(env),
+ Port: portNum,
+ PortName: port.Name,
+ PortProtocol: string(port.Protocol),
+ }
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+ tgt.hash = hash
+
+ targets = append(targets, tgt)
+ }
+ }
+ }
+
+ return targets
+}
+
// collectEnv resolves the container's environment variables, following
// configMap and secret references through the shared informer caches.
// It returns nil when nothing resolves.
func (p *podDiscoverer) collectEnv(ns string, container corev1.Container) map[string]string {
	vars := make(map[string]string)

	// When a key exists in multiple sources,
	// the value associated with the last source will take precedence.
	// Values defined by an Env with a duplicate key will take precedence.
	//
	// Order (https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go)
	// - envFrom: configMapRef, secretRef
	// - env: value || valueFrom: fieldRef, resourceFieldRef, secretRef, configMap

	for _, src := range container.EnvFrom {
		switch {
		case src.ConfigMapRef != nil:
			p.envFromConfigMap(vars, ns, src)
		case src.SecretRef != nil:
			p.envFromSecret(vars, ns, src)
		}
	}

	for _, env := range container.Env {
		// Skip unnamed vars and vars whose name contains a $(VAR) reference.
		if env.Name == "" || isVar(env.Name) {
			continue
		}
		switch {
		case env.Value != "":
			vars[env.Name] = env.Value
		case env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil:
			p.valueFromSecret(vars, ns, env)
		case env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil:
			p.valueFromConfigMap(vars, ns, env)
		}
	}

	if len(vars) == 0 {
		return nil
	}
	return vars
}
+
// valueFromConfigMap resolves env.ValueFrom.ConfigMapKeyRef through the
// configmap informer cache into vars. Best effort: missing configmaps/keys
// and cache errors are silently skipped.
func (p *podDiscoverer) valueFromConfigMap(vars map[string]string, ns string, env corev1.EnvVar) {
	if env.ValueFrom.ConfigMapKeyRef.Name == "" || env.ValueFrom.ConfigMapKeyRef.Key == "" {
		return
	}

	sr := env.ValueFrom.ConfigMapKeyRef
	key := ns + "/" + sr.Name // informer store key: "namespace/name"

	item, exist, err := p.cmapInformer.GetStore().GetByKey(key)
	if err != nil || !exist {
		return
	}

	cmap, err := toConfigMap(item)
	if err != nil {
		return
	}

	if v, ok := cmap.Data[sr.Key]; ok {
		vars[env.Name] = v
	}
}
+
// valueFromSecret resolves env.ValueFrom.SecretKeyRef through the secret
// informer cache into vars. Best effort: missing secrets/keys and cache
// errors are silently skipped.
func (p *podDiscoverer) valueFromSecret(vars map[string]string, ns string, env corev1.EnvVar) {
	if env.ValueFrom.SecretKeyRef.Name == "" || env.ValueFrom.SecretKeyRef.Key == "" {
		return
	}

	secretKey := env.ValueFrom.SecretKeyRef
	key := ns + "/" + secretKey.Name // informer store key: "namespace/name"

	item, exist, err := p.secretInformer.GetStore().GetByKey(key)
	if err != nil || !exist {
		return
	}

	secret, err := toSecret(item)
	if err != nil {
		return
	}

	// Secret data is []byte; expose it as string like kubernetes env would.
	if v, ok := secret.Data[secretKey.Key]; ok {
		vars[env.Name] = string(v)
	}
}
+
// envFromConfigMap copies every key of the referenced configmap into vars,
// applying the optional prefix. Best effort: lookup failures are skipped.
func (p *podDiscoverer) envFromConfigMap(vars map[string]string, ns string, src corev1.EnvFromSource) {
	if src.ConfigMapRef.Name == "" {
		return
	}

	key := ns + "/" + src.ConfigMapRef.Name // informer store key
	item, exist, err := p.cmapInformer.GetStore().GetByKey(key)
	if err != nil || !exist {
		return
	}

	cmap, err := toConfigMap(item)
	if err != nil {
		return
	}

	for k, v := range cmap.Data {
		vars[src.Prefix+k] = v
	}
}
+
// envFromSecret copies every key of the referenced secret into vars,
// applying the optional prefix. Best effort: lookup failures are skipped.
func (p *podDiscoverer) envFromSecret(vars map[string]string, ns string, src corev1.EnvFromSource) {
	if src.SecretRef.Name == "" {
		return
	}

	key := ns + "/" + src.SecretRef.Name // informer store key
	item, exist, err := p.secretInformer.GetStore().GetByKey(key)
	if err != nil || !exist {
		return
	}

	secret, err := toSecret(item)
	if err != nil {
		return
	}

	for k, v := range secret.Data {
		vars[src.Prefix+k] = string(v)
	}
}
+
// podTUID builds the unique id for a container without declared ports:
// "namespace_pod_container".
func podTUID(pod *corev1.Pod, container corev1.Container) string {
	return fmt.Sprintf("%s_%s_%s",
		pod.Namespace,
		pod.Name,
		container.Name,
	)
}
+
// podTUIDWithPort builds the unique id for one container port:
// "namespace_pod_container_protocol_port" (protocol lowercased).
func podTUIDWithPort(pod *corev1.Pod, container corev1.Container, port corev1.ContainerPort) string {
	return fmt.Sprintf("%s_%s_%s_%s_%s",
		pod.Namespace,
		pod.Name,
		container.Name,
		strings.ToLower(string(port.Protocol)),
		strconv.FormatUint(uint64(port.ContainerPort), 10),
	)
}
+
// podSourceFromNsName formats the target-group source for a pod identified
// by namespace and name.
func podSourceFromNsName(namespace, name string) string {
	return "discoverer=k8s,kind=pod,namespace=" + namespace + ",pod_name=" + name
}
+
+func podSource(pod *corev1.Pod) string {
+ return podSourceFromNsName(pod.Namespace, pod.Name)
+}
+
+func toPod(obj any) (*corev1.Pod, error) {
+ pod, ok := obj.(*corev1.Pod)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return pod, nil
+}
+
+func toConfigMap(obj any) (*corev1.ConfigMap, error) {
+ cmap, ok := obj.(*corev1.ConfigMap)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return cmap, nil
+}
+
+func toSecret(obj any) (*corev1.Secret, error) {
+ secret, ok := obj.(*corev1.Secret)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return secret, nil
+}
+
// isVar reports whether name contains a '$', i.e. looks like part of a
// variable reference of the form $(VAR_NAME). Such references are expanded
// by kubernetes from previously defined environment variables, so callers
// skip them.
func isVar(name string) bool {
	return strings.ContainsRune(name, '$')
}
+
// mapAny widens a string-valued map into an any-valued one; a nil input
// yields a nil output.
func mapAny(src map[string]string) map[string]any {
	if src == nil {
		return nil
	}
	out := make(map[string]any, len(src))
	for key, val := range src {
		out[key] = val
	}
	return out
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go
new file mode 100644
index 000000000..ebe92d2f6
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go
@@ -0,0 +1,648 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "net"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+)
+
+func TestPodTargetGroup_Provider(t *testing.T) {
+ var p podTargetGroup
+ assert.NotEmpty(t, p.Provider())
+}
+
// TestPodTargetGroup_Source checks that discovered pod target groups carry
// the expected "discoverer=k8s,kind=pod,..." source strings, in discovery order.
func TestPodTargetGroup_Source(t *testing.T) {
	tests := map[string]struct {
		createSim   func() discoverySim
		wantSources []string
	}{
		"pods with multiple ports": {
			createSim: func() discoverySim {
				httpd, nginx := newHTTPDPod(), newNGINXPod()
				disc, _ := prepareAllNsPodDiscoverer(httpd, nginx)

				return discoverySim{
					td: disc,
					wantTargetGroups: []model.TargetGroup{
						preparePodTargetGroup(httpd),
						preparePodTargetGroup(nginx),
					},
				}
			},
			// One source per pod, matching the fixture pod names above.
			wantSources: []string{
				"discoverer=k8s,kind=pod,namespace=default,pod_name=httpd-dd95c4d68-5bkwl",
				"discoverer=k8s,kind=pod,namespace=default,pod_name=nginx-7cfd77469b-q6kxj",
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()

			// Collect the Source() of every produced target group.
			var sources []string
			for _, tgg := range sim.run(t) {
				sources = append(sources, tgg.Source())
			}

			assert.Equal(t, test.wantSources, sources)
		})
	}
}
+
// TestPodTargetGroup_Targets checks the total number of targets produced:
// each fixture pod exposes 2 ports, so 2 pods yield 4 targets.
func TestPodTargetGroup_Targets(t *testing.T) {
	tests := map[string]struct {
		createSim   func() discoverySim
		wantTargets int
	}{
		"pods with multiple ports": {
			createSim: func() discoverySim {
				httpd, nginx := newHTTPDPod(), newNGINXPod()
				discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx)

				return discoverySim{
					td: discovery,
					wantTargetGroups: []model.TargetGroup{
						preparePodTargetGroup(httpd),
						preparePodTargetGroup(nginx),
					},
				}
			},
			wantTargets: 4,
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()

			// Sum targets across all produced target groups.
			var targets int
			for _, tgg := range sim.run(t) {
				targets += len(tgg.Targets())
			}

			assert.Equal(t, test.wantTargets, targets)
		})
	}
}
+
// TestPodTarget_Hash checks target hashes against golden values.
// NOTE(review): these are hard-coded outputs of the hashing of the fixture
// targets; any change to the PodTarget fields or the hash function will
// require regenerating them.
func TestPodTarget_Hash(t *testing.T) {
	tests := map[string]struct {
		createSim  func() discoverySim
		wantHashes []uint64
	}{
		"pods with multiple ports": {
			createSim: func() discoverySim {
				httpd, nginx := newHTTPDPod(), newNGINXPod()
				discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx)

				return discoverySim{
					td: discovery,
					wantTargetGroups: []model.TargetGroup{
						preparePodTargetGroup(httpd),
						preparePodTargetGroup(nginx),
					},
				}
			},
			// Golden hashes: httpd:80, httpd:443, nginx:80, nginx:443.
			wantHashes: []uint64{
				12703169414253998055,
				13351713096133918928,
				8241692333761256175,
				11562466355572729519,
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()

			// Flatten the hashes of all targets, preserving order.
			var hashes []uint64
			for _, tgg := range sim.run(t) {
				for _, tg := range tgg.Targets() {
					hashes = append(hashes, tg.Hash())
				}
			}

			assert.Equal(t, test.wantHashes, hashes)
		})
	}
}
+
// TestPodTarget_TUID checks that every target's unique id follows the
// "<namespace>_<pod>_<container>_<protocol>_<port>" format produced by
// podTUIDWithPort.
func TestPodTarget_TUID(t *testing.T) {
	tests := map[string]struct {
		createSim func() discoverySim
		wantTUID  []string
	}{
		"pods with multiple ports": {
			createSim: func() discoverySim {
				httpd, nginx := newHTTPDPod(), newNGINXPod()
				discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx)

				return discoverySim{
					td: discovery,
					wantTargetGroups: []model.TargetGroup{
						preparePodTargetGroup(httpd),
						preparePodTargetGroup(nginx),
					},
				}
			},
			wantTUID: []string{
				"default_httpd-dd95c4d68-5bkwl_httpd_tcp_80",
				"default_httpd-dd95c4d68-5bkwl_httpd_tcp_443",
				"default_nginx-7cfd77469b-q6kxj_nginx_tcp_80",
				"default_nginx-7cfd77469b-q6kxj_nginx_tcp_443",
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()

			// Flatten the TUIDs of all targets, preserving order.
			var tuid []string
			for _, tgg := range sim.run(t) {
				for _, tg := range tgg.Targets() {
					tuid = append(tuid, tg.TUID())
				}
			}

			assert.Equal(t, test.wantTUID, tuid)
		})
	}
}
+
// TestNewPodDiscoverer checks that the constructor panics when any of the
// required informers (pod, configmap, secret) is nil and succeeds otherwise.
func TestNewPodDiscoverer(t *testing.T) {
	tests := map[string]struct {
		podInf    cache.SharedInformer
		cmapInf   cache.SharedInformer
		secretInf cache.SharedInformer
		wantPanic bool
	}{
		"valid informers": {
			wantPanic: false,
			podInf:    cache.NewSharedInformer(nil, &corev1.Pod{}, resyncPeriod),
			cmapInf:   cache.NewSharedInformer(nil, &corev1.ConfigMap{}, resyncPeriod),
			secretInf: cache.NewSharedInformer(nil, &corev1.Secret{}, resyncPeriod),
		},
		// All three informers left nil.
		"nil informers": {
			wantPanic: true,
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			f := func() { newPodDiscoverer(test.podInf, test.cmapInf, test.secretInf) }

			if test.wantPanic {
				assert.Panics(t, f)
			} else {
				assert.NotPanics(t, f)
			}
		})
	}
}
+
+func TestPodDiscoverer_String(t *testing.T) {
+ var p podDiscoverer
+ assert.NotEmpty(t, p.String())
+}
+
+func TestPodDiscoverer_Discover(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ }{
+ "ADD: pods exist before run": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ td, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: td,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD: pods exist before run and add after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ disc, client := prepareAllNsPodDiscoverer(httpd)
+ podClient := client.CoreV1().Pods("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "DELETE: remove pods after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ disc, client := prepareAllNsPodDiscoverer(httpd, nginx)
+ podClient := client.CoreV1().Pods("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{})
+ _ = podClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ prepareEmptyPodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "DELETE,ADD: remove and add pods after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ disc, client := prepareAllNsPodDiscoverer(httpd)
+ podClient := client.CoreV1().Pods("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{})
+ _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD: pods with empty PodIP": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ httpd.Status.PodIP = ""
+ nginx.Status.PodIP = ""
+ disc, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptyPodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "UPDATE: set pods PodIP after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ httpd.Status.PodIP = ""
+ nginx.Status.PodIP = ""
+ disc, client := prepareAllNsPodDiscoverer(httpd, nginx)
+ podClient := client.CoreV1().Pods("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _, _ = podClient.Update(ctx, newHTTPDPod(), metav1.UpdateOptions{})
+ _, _ = podClient.Update(ctx, newNGINXPod(), metav1.UpdateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptyPodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(nginx),
+ preparePodTargetGroup(newHTTPDPod()),
+ preparePodTargetGroup(newNGINXPod()),
+ },
+ }
+ },
+ },
+ "ADD: pods without containers": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ httpd.Spec.Containers = httpd.Spec.Containers[:0]
+ nginx.Spec.Containers = httpd.Spec.Containers[:0]
+ disc, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptyPodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "Env: from value": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.Env = []corev1.EnvVar{
+ {Name: "key1", Value: "value1"},
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1"}
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ "Env: from Secret": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.Env = []corev1.EnvVar{
+ {
+ Name: "key1",
+ ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{
+ LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"},
+ Key: "key1",
+ }},
+ },
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1"}
+ secret := prepareSecret("my-secret", data)
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd, secret)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ "Env: from ConfigMap": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.Env = []corev1.EnvVar{
+ {
+ Name: "key1",
+ ValueFrom: &corev1.EnvVarSource{ConfigMapKeyRef: &corev1.ConfigMapKeySelector{
+ LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"},
+ Key: "key1",
+ }},
+ },
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1"}
+ cmap := prepareConfigMap("my-cmap", data)
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd, cmap)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ "EnvFrom: from ConfigMap": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.EnvFrom = []corev1.EnvFromSource{
+ {
+ ConfigMapRef: &corev1.ConfigMapEnvSource{
+ LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"}},
+ },
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1", "key2": "value2"}
+ cmap := prepareConfigMap("my-cmap", data)
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd, cmap)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ "EnvFrom: from Secret": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.EnvFrom = []corev1.EnvFromSource{
+ {
+ SecretRef: &corev1.SecretEnvSource{
+ LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"}},
+ },
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1", "key2": "value2"}
+ secret := prepareSecret("my-secret", data)
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd, secret)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
// prepareAllNsPodDiscoverer creates a pod discoverer (plus its fake client)
// that watches all namespaces and is pre-loaded with the given objects.
func prepareAllNsPodDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
	return prepareDiscoverer(rolePod, []string{corev1.NamespaceAll}, objects...)
}
+
// preparePodDiscoverer creates a pod discoverer restricted to the given
// namespaces. NOTE(review): currently unused in this file; kept as a helper
// for namespace-scoped tests.
func preparePodDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
	return prepareDiscoverer(rolePod, namespaces, objects...)
}
+
+func mangleContainers(containers []corev1.Container, mange func(container *corev1.Container)) {
+ for i := range containers {
+ mange(&containers[i])
+ }
+}
+
+var controllerTrue = true
+
// newHTTPDPod returns a fixture pod in the default namespace with one
// "httpd" container exposing TCP ports 80 and 443, owned by a DaemonSet
// controller, with PodIP set.
func newHTTPDPod() *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "httpd-dd95c4d68-5bkwl",
			Namespace:   "default",
			UID:         "1cebb6eb-0c1e-495b-8131-8fa3e6668dc8",
			Annotations: map[string]string{"phase": "prod"},
			Labels:      map[string]string{"app": "httpd", "tier": "frontend"},
			OwnerReferences: []metav1.OwnerReference{
				{Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue},
			},
		},
		Spec: corev1.PodSpec{
			NodeName: "m01",
			Containers: []corev1.Container{
				{
					Name:  "httpd",
					Image: "httpd",
					Ports: []corev1.ContainerPort{
						{Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80},
						{Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443},
					},
				},
			},
		},
		Status: corev1.PodStatus{
			PodIP: "172.17.0.1",
		},
	}
}
+
// newNGINXPod returns a fixture pod analogous to newHTTPDPod but running an
// "nginx" container with a different name, UID and PodIP.
func newNGINXPod() *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "nginx-7cfd77469b-q6kxj",
			Namespace:   "default",
			UID:         "09e883f2-d740-4c5f-970d-02cf02876522",
			Annotations: map[string]string{"phase": "prod"},
			Labels:      map[string]string{"app": "nginx", "tier": "frontend"},
			OwnerReferences: []metav1.OwnerReference{
				{Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue},
			},
		},
		Spec: corev1.PodSpec{
			NodeName: "m01",
			Containers: []corev1.Container{
				{
					Name:  "nginx",
					Image: "nginx",
					Ports: []corev1.ContainerPort{
						{Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80},
						{Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443},
					},
				},
			},
		},
		Status: corev1.PodStatus{
			PodIP: "172.17.0.2",
		},
	}
}
+
// prepareConfigMap returns a ConfigMap fixture in the default namespace.
// The UID is made unique per name by suffixing a fixed UUID with the name.
func prepareConfigMap(name string, data map[string]string) *corev1.ConfigMap {
	return &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: "default",
			UID:       types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8167" + name),
		},
		Data: data,
	}
}
+
+func prepareSecret(name string, data map[string]string) *corev1.Secret {
+ secretData := make(map[string][]byte, len(data))
+ for k, v := range data {
+ secretData[k] = []byte(v)
+ }
+ return &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: "default",
+ UID: types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8161" + name),
+ },
+ Data: secretData,
+ }
+}
+
// prepareEmptyPodTargetGroup returns the expected target group for a pod
// that produces no targets (deleted, no IP, or no containers).
func prepareEmptyPodTargetGroup(pod *corev1.Pod) *podTargetGroup {
	return &podTargetGroup{source: podSource(pod)}
}
+
// preparePodTargetGroup builds the expected target group for a pod: one
// PodTarget per container port, mirroring what the pod discoverer produces
// (same TUID, address, metadata, hash and discovery tags).
func preparePodTargetGroup(pod *corev1.Pod) *podTargetGroup {
	tgg := prepareEmptyPodTargetGroup(pod)

	for _, container := range pod.Spec.Containers {
		for _, port := range container.Ports {
			portNum := strconv.FormatUint(uint64(port.ContainerPort), 10)
			tgt := &PodTarget{
				tuid:           podTUIDWithPort(pod, container, port),
				Address:        net.JoinHostPort(pod.Status.PodIP, portNum),
				Namespace:      pod.Namespace,
				Name:           pod.Name,
				Annotations:    mapAny(pod.Annotations),
				Labels:         mapAny(pod.Labels),
				NodeName:       pod.Spec.NodeName,
				PodIP:          pod.Status.PodIP,
				ControllerName: "netdata-test",
				ControllerKind: "DaemonSet",
				ContName:       container.Name,
				Image:          container.Image,
				Env:            nil,
				Port:           portNum,
				PortName:       port.Name,
				PortProtocol:   string(port.Protocol),
			}
			// Hash is computed over the populated target fields.
			tgt.hash = mustCalcHash(tgt)
			tgt.Tags().Merge(discoveryTags)

			tgg.targets = append(tgg.targets, tgt)
		}
	}

	return tgg
}
+
// preparePodTargetGroupWithEnv is preparePodTargetGroup with each target's
// Env set to the given map; hashes are recomputed because Env participates
// in the hash.
func preparePodTargetGroupWithEnv(pod *corev1.Pod, env map[string]string) *podTargetGroup {
	tgg := preparePodTargetGroup(pod)

	for _, tgt := range tgg.Targets() {
		tgt.(*PodTarget).Env = mapAny(env)
		tgt.(*PodTarget).hash = mustCalcHash(tgt)
	}

	return tgg
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go
new file mode 100644
index 000000000..4cfdd62f1
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go
@@ -0,0 +1,209 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+)
+
// serviceTargetGroup is a model.TargetGroup backed by a single k8s Service.
type serviceTargetGroup struct {
	targets []model.Target
	source  string // e.g. "discoverer=k8s,kind=service,namespace=...,service_name=..."
}

func (s serviceTargetGroup) Provider() string        { return "sd:k8s:service" }
func (s serviceTargetGroup) Source() string          { return s.source }
func (s serviceTargetGroup) Targets() []model.Target { return s.targets }
+
// ServiceTarget is a discovery target built from a k8s Service port.
// All exported fields participate in the hash; Base is excluded via the
// `hash:"ignore"` tag.
type ServiceTarget struct {
	model.Base `hash:"ignore"`

	hash uint64 // precomputed hash of the exported fields
	tuid string // "<namespace>_<name>_<protocol>_<port>"

	Address      string // "<name>.<namespace>.svc:<port>"
	Namespace    string
	Name         string
	Annotations  map[string]any
	Labels       map[string]any
	Port         string
	PortName     string
	PortProtocol string
	ClusterIP    string
	ExternalName string
	Type         string
}

func (s ServiceTarget) Hash() uint64 { return s.hash }
func (s ServiceTarget) TUID() string { return s.tuid }
+
// serviceDiscoverer watches k8s Services via a shared informer and emits
// target groups for each add/update/delete event.
type serviceDiscoverer struct {
	*logger.Logger
	model.Base

	informer cache.SharedInformer
	queue    *workqueue.Type // receives object keys from informer event handlers
}
+
+func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer {
+ if inf == nil {
+ panic("nil service informer")
+ }
+
+ queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "service"})
+ _, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj any) { enqueue(queue, obj) },
+ UpdateFunc: func(_, obj any) { enqueue(queue, obj) },
+ DeleteFunc: func(obj any) { enqueue(queue, obj) },
+ })
+
+ return &serviceDiscoverer{
+ Logger: log,
+ informer: inf,
+ queue: queue,
+ }
+}
+
// String returns the human-readable name of this discoverer.
func (s *serviceDiscoverer) String() string {
	return "k8s service"
}
+
// Discover runs the informer, waits for its cache to sync, then processes
// queued events until ctx is cancelled. Target groups are sent to ch.
func (s *serviceDiscoverer) Discover(ctx context.Context, ch chan<- []model.TargetGroup) {
	s.Info("instance is started")
	defer s.Info("instance is stopped")
	// Shutting down the queue terminates the run goroutine below.
	defer s.queue.ShutDown()

	go s.informer.Run(ctx.Done())

	if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) {
		s.Error("failed to sync caches")
		return
	}

	go s.run(ctx, ch)

	<-ctx.Done()
}
+
// run drains the work queue, handling one item at a time until the queue is
// shut down (by Discover's deferred ShutDown).
func (s *serviceDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) {
	for {
		item, shutdown := s.queue.Get()
		if shutdown {
			return
		}

		s.handleQueueItem(ctx, in, item)
	}
}
+
// handleQueueItem resolves a queued "<namespace>/<name>" key against the
// informer store and emits the corresponding target group. A key that no
// longer exists in the store (deleted service) yields an empty group so
// downstream consumers drop its targets.
//
// NOTE(review): key-split, store-lookup and type-assertion errors are
// silently dropped here — consider logging them.
func (s *serviceDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) {
	defer s.queue.Done(item)

	key := item.(string)
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return
	}

	obj, exists, err := s.informer.GetStore().GetByKey(key)
	if err != nil {
		return
	}

	if !exists {
		// Service was deleted: emit an empty group with the same source.
		tgg := &serviceTargetGroup{source: serviceSourceFromNsName(namespace, name)}
		send(ctx, in, tgg)
		return
	}

	svc, err := toService(obj)
	if err != nil {
		return
	}

	tgg := s.buildTargetGroup(svc)

	// Propagate the discoverer's tags to every target.
	for _, tgt := range tgg.Targets() {
		tgt.Tags().Merge(s.Tags())
	}

	send(ctx, in, tgg)
}
+
+func (s *serviceDiscoverer) buildTargetGroup(svc *corev1.Service) model.TargetGroup {
+ // TODO: headless service?
+ if svc.Spec.ClusterIP == "" || len(svc.Spec.Ports) == 0 {
+ return &serviceTargetGroup{
+ source: serviceSource(svc),
+ }
+ }
+ return &serviceTargetGroup{
+ source: serviceSource(svc),
+ targets: s.buildTargets(svc),
+ }
+}
+
+func (s *serviceDiscoverer) buildTargets(svc *corev1.Service) (targets []model.Target) {
+ for _, port := range svc.Spec.Ports {
+ portNum := strconv.FormatInt(int64(port.Port), 10)
+ tgt := &ServiceTarget{
+ tuid: serviceTUID(svc, port),
+ Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum),
+ Namespace: svc.Namespace,
+ Name: svc.Name,
+ Annotations: mapAny(svc.Annotations),
+ Labels: mapAny(svc.Labels),
+ Port: portNum,
+ PortName: port.Name,
+ PortProtocol: string(port.Protocol),
+ ClusterIP: svc.Spec.ClusterIP,
+ ExternalName: svc.Spec.ExternalName,
+ Type: string(svc.Spec.Type),
+ }
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+ tgt.hash = hash
+
+ targets = append(targets, tgt)
+ }
+
+ return targets
+}
+
+func serviceTUID(svc *corev1.Service, port corev1.ServicePort) string {
+ return fmt.Sprintf("%s_%s_%s_%s",
+ svc.Namespace,
+ svc.Name,
+ strings.ToLower(string(port.Protocol)),
+ strconv.FormatInt(int64(port.Port), 10),
+ )
+}
+
// serviceSourceFromNsName builds the discovery source string for a service
// identified by its namespace and name.
func serviceSourceFromNsName(namespace, name string) string {
	return "discoverer=k8s,kind=service,namespace=" + namespace + ",service_name=" + name
}
+
// serviceSource builds the discovery source string for the given service.
func serviceSource(svc *corev1.Service) string {
	return serviceSourceFromNsName(svc.Namespace, svc.Name)
}
+
+func toService(obj any) (*corev1.Service, error) {
+ svc, ok := obj.(*corev1.Service)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return svc, nil
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go
new file mode 100644
index 000000000..d2e496015
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go
@@ -0,0 +1,456 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "net"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+)
+
+func TestServiceTargetGroup_Provider(t *testing.T) {
+ var s serviceTargetGroup
+ assert.NotEmpty(t, s.Provider())
+}
+
// TestServiceTargetGroup_Source checks that discovered service target groups
// carry the expected "discoverer=k8s,kind=service,..." source strings.
func TestServiceTargetGroup_Source(t *testing.T) {
	tests := map[string]struct {
		createSim   func() discoverySim
		wantSources []string
	}{
		"ClusterIP svc with multiple ports": {
			createSim: func() discoverySim {
				httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
				disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)

				return discoverySim{
					td: disc,
					wantTargetGroups: []model.TargetGroup{
						prepareSvcTargetGroup(httpd),
						prepareSvcTargetGroup(nginx),
					},
				}
			},
			// One source per service, matching the fixture service names.
			wantSources: []string{
				"discoverer=k8s,kind=service,namespace=default,service_name=httpd-cluster-ip-service",
				"discoverer=k8s,kind=service,namespace=default,service_name=nginx-cluster-ip-service",
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()

			// Collect the Source() of every produced target group.
			var sources []string
			for _, tgg := range sim.run(t) {
				sources = append(sources, tgg.Source())
			}

			assert.Equal(t, test.wantSources, sources)
		})
	}
}
+
// TestServiceTargetGroup_Targets checks the total number of targets:
// each fixture service exposes 2 ports, so 2 services yield 4 targets.
func TestServiceTargetGroup_Targets(t *testing.T) {
	tests := map[string]struct {
		createSim   func() discoverySim
		wantTargets int
	}{
		"ClusterIP svc with multiple ports": {
			createSim: func() discoverySim {
				httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
				disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)

				return discoverySim{
					td: disc,
					wantTargetGroups: []model.TargetGroup{
						prepareSvcTargetGroup(httpd),
						prepareSvcTargetGroup(nginx),
					},
				}
			},
			wantTargets: 4,
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()

			// Sum targets across all produced target groups.
			var targets int
			for _, tgg := range sim.run(t) {
				targets += len(tgg.Targets())
			}

			assert.Equal(t, test.wantTargets, targets)
		})
	}
}
+
// TestServiceTarget_Hash checks service target hashes against golden values.
// NOTE(review): these are hard-coded outputs of hashing the fixture targets;
// any change to ServiceTarget fields or the hash function requires
// regenerating them.
func TestServiceTarget_Hash(t *testing.T) {
	tests := map[string]struct {
		createSim  func() discoverySim
		wantHashes []uint64
	}{
		"ClusterIP svc with multiple ports": {
			createSim: func() discoverySim {
				httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
				disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)

				return discoverySim{
					td: disc,
					wantTargetGroups: []model.TargetGroup{
						prepareSvcTargetGroup(httpd),
						prepareSvcTargetGroup(nginx),
					},
				}
			},
			// Golden hashes: httpd:80, httpd:443, nginx:80, nginx:443.
			wantHashes: []uint64{
				17611803477081780974,
				6019985892433421258,
				4151907287549842238,
				5757608926096186119,
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()

			// Flatten the hashes of all targets, preserving order.
			var hashes []uint64
			for _, tgg := range sim.run(t) {
				for _, tgt := range tgg.Targets() {
					hashes = append(hashes, tgt.Hash())
				}
			}

			assert.Equal(t, test.wantHashes, hashes)
		})
	}
}
+
// TestServiceTarget_TUID checks that every service target's unique id
// follows the "<namespace>_<name>_<protocol>_<port>" format produced by
// serviceTUID.
func TestServiceTarget_TUID(t *testing.T) {
	tests := map[string]struct {
		createSim func() discoverySim
		wantTUID  []string
	}{
		"ClusterIP svc with multiple ports": {
			createSim: func() discoverySim {
				httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
				disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)

				return discoverySim{
					td: disc,
					wantTargetGroups: []model.TargetGroup{
						prepareSvcTargetGroup(httpd),
						prepareSvcTargetGroup(nginx),
					},
				}
			},
			wantTUID: []string{
				"default_httpd-cluster-ip-service_tcp_80",
				"default_httpd-cluster-ip-service_tcp_443",
				"default_nginx-cluster-ip-service_tcp_80",
				"default_nginx-cluster-ip-service_tcp_443",
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()

			// Flatten the TUIDs of all targets, preserving order.
			var tuid []string
			for _, tgg := range sim.run(t) {
				for _, tgt := range tgg.Targets() {
					tuid = append(tuid, tgt.TUID())
				}
			}

			assert.Equal(t, test.wantTUID, tuid)
		})
	}
}
+
// TestNewServiceDiscoverer checks that the constructor panics on a nil
// informer and succeeds with a valid one.
func TestNewServiceDiscoverer(t *testing.T) {
	tests := map[string]struct {
		informer  cache.SharedInformer
		wantPanic bool
	}{
		"valid informer": {
			wantPanic: false,
			informer:  cache.NewSharedInformer(nil, &corev1.Service{}, resyncPeriod),
		},
		"nil informer": {
			wantPanic: true,
			informer:  nil,
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			f := func() { newServiceDiscoverer(test.informer) }

			if test.wantPanic {
				assert.Panics(t, f)
			} else {
				assert.NotPanics(t, f)
			}
		})
	}
}
+
+func TestServiceDiscoverer_String(t *testing.T) {
+ var s serviceDiscoverer
+ assert.NotEmpty(t, s.String())
+}
+
+func TestServiceDiscoverer_Discover(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ }{
+ "ADD: ClusterIP svc exist before run": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD: ClusterIP svc exist before run and add after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, client := prepareAllNsSvcDiscoverer(httpd)
+ svcClient := client.CoreV1().Services("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "DELETE: ClusterIP svc remove after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, client := prepareAllNsSvcDiscoverer(httpd, nginx)
+ svcClient := client.CoreV1().Services("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{})
+ _ = svcClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ prepareEmptySvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD,DELETE: ClusterIP svc remove and add after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, client := prepareAllNsSvcDiscoverer(httpd)
+ svcClient := client.CoreV1().Services("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{})
+ _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD: Headless svc exist before run": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService()
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptySvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "UPDATE: Headless => ClusterIP svc after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService()
+ httpdUpd, nginxUpd := *httpd, *nginx
+ httpdUpd.Spec.ClusterIP = "10.100.0.1"
+ nginxUpd.Spec.ClusterIP = "10.100.0.2"
+ disc, client := prepareAllNsSvcDiscoverer(httpd, nginx)
+ svcClient := client.CoreV1().Services("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _, _ = svcClient.Update(ctx, &httpdUpd, metav1.UpdateOptions{})
+ _, _ = svcClient.Update(ctx, &nginxUpd, metav1.UpdateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptySvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(nginx),
+ prepareSvcTargetGroup(&httpdUpd),
+ prepareSvcTargetGroup(&nginxUpd),
+ },
+ }
+ },
+ },
+ "ADD: ClusterIP svc with zero exposed ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ httpd.Spec.Ports = httpd.Spec.Ports[:0]
+ nginx.Spec.Ports = httpd.Spec.Ports[:0]
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptySvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
// prepareAllNsSvcDiscoverer creates a service discoverer (plus its fake
// client) that watches all namespaces and is pre-loaded with the given objects.
func prepareAllNsSvcDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
	return prepareDiscoverer(roleService, []string{corev1.NamespaceAll}, objects...)
}
+
// prepareSvcDiscoverer creates a service discoverer restricted to the given
// namespaces. NOTE(review): currently unused in this file; kept as a helper
// for namespace-scoped tests.
func prepareSvcDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
	return prepareDiscoverer(roleService, namespaces, objects...)
}
+
// newHTTPDClusterIPService returns a ClusterIP Service fixture in the
// default namespace exposing TCP ports 80 and 443 and selecting the httpd
// pod fixture's labels.
func newHTTPDClusterIPService() *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "httpd-cluster-ip-service",
			Namespace:   "default",
			Annotations: map[string]string{"phase": "prod"},
			Labels:      map[string]string{"app": "httpd", "tier": "frontend"},
		},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{
				{Name: "http", Protocol: corev1.ProtocolTCP, Port: 80},
				{Name: "https", Protocol: corev1.ProtocolTCP, Port: 443},
			},
			Type:      corev1.ServiceTypeClusterIP,
			ClusterIP: "10.100.0.1",
			Selector:  map[string]string{"app": "httpd", "tier": "frontend"},
		},
	}
}
+
// newNGINXClusterIPService returns a ClusterIP Service fixture analogous to
// newHTTPDClusterIPService but for the nginx pod fixture, with a different
// name and ClusterIP.
func newNGINXClusterIPService() *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "nginx-cluster-ip-service",
			Namespace:   "default",
			Annotations: map[string]string{"phase": "prod"},
			Labels:      map[string]string{"app": "nginx", "tier": "frontend"},
		},
		Spec: corev1.ServiceSpec{
			Ports: []corev1.ServicePort{
				{Name: "http", Protocol: corev1.ProtocolTCP, Port: 80},
				{Name: "https", Protocol: corev1.ProtocolTCP, Port: 443},
			},
			Type:      corev1.ServiceTypeClusterIP,
			ClusterIP: "10.100.0.2",
			Selector:  map[string]string{"app": "nginx", "tier": "frontend"},
		},
	}
}
+
+func newHTTPDHeadlessService() *corev1.Service {
+ svc := newHTTPDClusterIPService()
+ svc.Name = "httpd-headless-service"
+ svc.Spec.ClusterIP = ""
+ return svc
+}
+
+func newNGINXHeadlessService() *corev1.Service {
+ svc := newNGINXClusterIPService()
+ svc.Name = "nginx-headless-service"
+ svc.Spec.ClusterIP = ""
+ return svc
+}
+
// prepareEmptySvcTargetGroup returns the expected target group for a service
// that produces no targets (deleted, headless, or no ports).
func prepareEmptySvcTargetGroup(svc *corev1.Service) *serviceTargetGroup {
	return &serviceTargetGroup{source: serviceSource(svc)}
}
+
// prepareSvcTargetGroup builds the expected target group for a service: one
// ServiceTarget per port, mirroring what the service discoverer produces
// (same TUID, address, metadata, hash and discovery tags).
func prepareSvcTargetGroup(svc *corev1.Service) *serviceTargetGroup {
	tgg := prepareEmptySvcTargetGroup(svc)

	for _, port := range svc.Spec.Ports {
		portNum := strconv.FormatInt(int64(port.Port), 10)
		tgt := &ServiceTarget{
			tuid:         serviceTUID(svc, port),
			Address:      net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum),
			Namespace:    svc.Namespace,
			Name:         svc.Name,
			Annotations:  mapAny(svc.Annotations),
			Labels:       mapAny(svc.Labels),
			Port:         portNum,
			PortName:     port.Name,
			PortProtocol: string(port.Protocol),
			ClusterIP:    svc.Spec.ClusterIP,
			ExternalName: svc.Spec.ExternalName,
			Type:         string(svc.Spec.Type),
		}
		// Hash is computed over the populated target fields.
		tgt.hash = mustCalcHash(tgt)
		tgt.Tags().Merge(discoveryTags)
		tgg.targets = append(tgg.targets, tgt)
	}

	return tgg
}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go
new file mode 100644
index 000000000..db986b855
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go
@@ -0,0 +1,137 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "k8s.io/client-go/tools/cache"
+)
+
+const (
+ startWaitTimeout = time.Second * 3
+ finishWaitTimeout = time.Second * 5
+)
+
+type discoverySim struct {
+ td *KubeDiscoverer
+ runAfterSync func(ctx context.Context)
+ sortBeforeVerify bool
+ wantTargetGroups []model.TargetGroup
+}
+
+// run starts the discoverer, waits for it to start and for its informers to
+// sync, optionally runs a post-sync callback, then collects and verifies the
+// produced target groups. It returns the collected groups.
+func (sim discoverySim) run(t *testing.T) []model.TargetGroup {
+ t.Helper()
+ require.NotNil(t, sim.td)
+ require.NotEmpty(t, sim.wantTargetGroups)
+
+ in, out := make(chan []model.TargetGroup), make(chan []model.TargetGroup)
+ go sim.collectTargetGroups(t, in, out)
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+ defer cancel()
+ go sim.td.Discover(ctx, in)
+
+ // Fail fast if the discoverer never starts.
+ select {
+ case <-sim.td.started:
+ case <-time.After(startWaitTimeout):
+ t.Fatalf("td %s failed to start in %s", sim.td.discoverers, startWaitTimeout)
+ }
+
+ synced := cache.WaitForCacheSync(ctx.Done(), sim.td.hasSynced)
+ require.Truef(t, synced, "td %s failed to sync", sim.td.discoverers)
+
+ if sim.runAfterSync != nil {
+ sim.runAfterSync(ctx)
+ }
+
+ groups := <-out
+
+ if sim.sortBeforeVerify {
+ sortTargetGroups(groups)
+ }
+
+ sim.verifyResult(t, groups)
+ return groups
+}
+
+// collectTargetGroups accumulates groups from in until at least the expected
+// number have arrived or finishWaitTimeout elapses, then sends the result to
+// out (the timeout case only logs — verification reports the mismatch).
+func (sim discoverySim) collectTargetGroups(t *testing.T, in, out chan []model.TargetGroup) {
+ var tggs []model.TargetGroup
+loop:
+ for {
+ select {
+ case inGroups := <-in:
+ if tggs = append(tggs, inGroups...); len(tggs) >= len(sim.wantTargetGroups) {
+ break loop
+ }
+ case <-time.After(finishWaitTimeout):
+ t.Logf("td %s timed out after %s, got %d groups, expected %d, some events are skipped",
+ sim.td.discoverers, finishWaitTimeout, len(tggs), len(sim.wantTargetGroups))
+ break loop
+ }
+ }
+ out <- tggs
+}
+
+// verifyResult asserts that result matches wantTargetGroups. When the counts
+// differ, both sides are compared as source-keyed maps so the failure diff
+// shows which groups are missing or extra instead of a positional mismatch.
+func (sim discoverySim) verifyResult(t *testing.T, result []model.TargetGroup) {
+ var expected, actual any
+
+ if len(sim.wantTargetGroups) == len(result) {
+ expected = sim.wantTargetGroups
+ actual = result
+ } else {
+ want := make(map[string]model.TargetGroup)
+ for _, group := range sim.wantTargetGroups {
+ want[group.Source()] = group
+ }
+ got := make(map[string]model.TargetGroup)
+ for _, group := range result {
+ got[group.Source()] = group
+ }
+ expected, actual = want, got
+ }
+
+ assert.Equal(t, expected, actual)
+}
+
+type hasSynced interface {
+ hasSynced() bool
+}
+
+var (
+ _ hasSynced = &KubeDiscoverer{}
+ _ hasSynced = &podDiscoverer{}
+ _ hasSynced = &serviceDiscoverer{}
+)
+
+func (d *KubeDiscoverer) hasSynced() bool {
+ for _, disc := range d.discoverers {
+ v, ok := disc.(hasSynced)
+ if !ok || !v.hasSynced() {
+ return false
+ }
+ }
+ return true
+}
+
+func (p *podDiscoverer) hasSynced() bool {
+ return p.podInformer.HasSynced() && p.cmapInformer.HasSynced() && p.secretInformer.HasSynced()
+}
+
+func (s *serviceDiscoverer) hasSynced() bool {
+ return s.informer.HasSynced()
+}
+
+func sortTargetGroups(tggs []model.TargetGroup) {
+ if len(tggs) == 0 {
+ return
+ }
+ sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() })
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
new file mode 100644
index 000000000..bfd7a99b8
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
@@ -0,0 +1,326 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package netlisteners
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "net"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/executable"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/ilyam8/hashstructure"
+)
+
+var (
+ shortName = "net_listeners"
+ fullName = fmt.Sprintf("sd:%s", shortName)
+)
+
+// NewDiscoverer creates a net_listeners discoverer from cfg. The path of the
+// local-listeners helper binary is resolved from NETDATA_PLUGINS_DIR, falling
+// back to the executable's directory, then the current working directory.
+func NewDiscoverer(cfg Config) (*Discoverer, error) {
+ tags, err := model.ParseTags(cfg.Tags)
+ if err != nil {
+ return nil, fmt.Errorf("parse tags: %v", err)
+ }
+
+ dir := os.Getenv("NETDATA_PLUGINS_DIR")
+ if dir == "" {
+ dir = executable.Directory
+ }
+ if dir == "" {
+ dir, _ = os.Getwd()
+ }
+
+ d := &Discoverer{
+ Logger: logger.New().With(
+ slog.String("component", "service discovery"),
+ slog.String("discoverer", shortName),
+ ),
+ cfgSource: cfg.Source,
+ ll: &localListenersExec{
+ binPath: filepath.Join(dir, "local-listeners"),
+ timeout: time.Second * 5,
+ },
+ interval: time.Minute * 2,
+ expiryTime: time.Minute * 10,
+ cache: make(map[uint64]*cacheItem),
+ started: make(chan struct{}),
+ }
+
+ d.Tags().Merge(tags)
+
+ return d, nil
+}
+
+type Config struct {
+ Source string `yaml:"-"`
+ Tags string `yaml:"tags"`
+}
+
+type (
+ Discoverer struct {
+ *logger.Logger
+ model.Base
+
+ cfgSource string
+
+ interval time.Duration
+ ll localListeners
+
+ expiryTime time.Duration
+ cache map[uint64]*cacheItem // [target.Hash]
+
+ started chan struct{}
+ }
+ cacheItem struct {
+ lastSeenTime time.Time
+ tgt model.Target
+ }
+ localListeners interface {
+ discover(ctx context.Context) ([]byte, error)
+ }
+)
+
+func (d *Discoverer) String() string {
+ return fullName
+}
+
+// Discover runs an initial discovery immediately, then re-discovers every
+// interval until ctx is canceled, sending target groups to in.
+func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
+ d.Info("instance is started")
+ defer func() { d.Info("instance is stopped") }()
+
+ // Signal (to tests and callers) that the discoverer is running.
+ close(d.started)
+
+ if err := d.discoverLocalListeners(ctx, in); err != nil {
+ d.Error(err)
+ return
+ }
+
+ tk := time.NewTicker(d.interval)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ if err := d.discoverLocalListeners(ctx, in); err != nil {
+ d.Warning(err)
+ return
+ }
+ }
+ }
+}
+
+// discoverLocalListeners executes the local-listeners helper once, parses its
+// output into targets, and sends the resulting group(s) to in.
+// Context cancellation during the exec is treated as a clean stop, not an error.
+func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error {
+ bs, err := d.ll.discover(ctx)
+ if err != nil {
+ if errors.Is(err, context.Canceled) {
+ return nil
+ }
+ return err
+ }
+
+ tgts, err := d.parseLocalListeners(bs)
+ if err != nil {
+ return err
+ }
+
+ tggs := d.processTargets(tgts)
+
+ // Don't block on the send if the consumer is gone.
+ select {
+ case <-ctx.Done():
+ case in <- tggs:
+ }
+
+ return nil
+}
+
+// processTargets wraps tgts into a single target group whose source identifies
+// this discoverer (plus the config source, when set). With a non-zero
+// expiryTime, targets are cached by hash and kept in the group until not seen
+// for expiryTime, smoothing over short-lived listener disappearances.
+func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup {
+ tgg := &targetGroup{
+ provider: fullName,
+ source: fmt.Sprintf("discoverer=%s,host=localhost", shortName),
+ }
+ if d.cfgSource != "" {
+ tgg.source += fmt.Sprintf(",%s", d.cfgSource)
+ }
+
+ // Expiry disabled: pass the fresh targets through as-is.
+ if d.expiryTime.Milliseconds() == 0 {
+ tgg.targets = tgts
+ return []model.TargetGroup{tgg}
+ }
+
+ now := time.Now()
+
+ for _, tgt := range tgts {
+ hash := tgt.Hash()
+ if _, ok := d.cache[hash]; !ok {
+ d.cache[hash] = &cacheItem{tgt: tgt}
+ }
+ d.cache[hash].lastSeenTime = now
+ }
+
+ // Emit everything still fresh; drop cache entries not seen within expiryTime.
+ for k, v := range d.cache {
+ if now.Sub(v.lastSeenTime) > d.expiryTime {
+ delete(d.cache, k)
+ continue
+ }
+ tgg.targets = append(tgg.targets, v.tgt)
+ }
+
+ return []model.TargetGroup{tgg}
+}
+
+// parseLocalListeners parses the output of the local-listeners helper, one
+// socket per line in "Protocol|IPAddress|Port|Cmdline" form, into targets.
+// Any/loopback addresses are normalized to 127.0.0.1 (or ::1), docker-proxy
+// listeners are skipped, and duplicates per (protocol, port) are collapsed so
+// that only the localhost target survives.
+func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) {
+ const (
+ local4 = "127.0.0.1"
+ local6 = "::1"
+ )
+
+ var targets []target
+ sc := bufio.NewScanner(bytes.NewReader(bs))
+
+ for sc.Scan() {
+ text := strings.TrimSpace(sc.Text())
+ if text == "" {
+ continue
+ }
+
+ // Protocol|IPAddress|Port|Cmdline
+ parts := strings.SplitN(text, "|", 4)
+ if len(parts) != 4 {
+ return nil, fmt.Errorf("unexpected data: '%s'", text)
+ }
+
+ tgt := target{
+ Protocol: parts[0],
+ IPAddress: parts[1],
+ Port: parts[2],
+ Comm: extractComm(parts[3]),
+ Cmdline: parts[3],
+ }
+
+ // docker-proxy only forwards into containers; not a useful target.
+ if tgt.Comm == "docker-proxy" {
+ continue
+ }
+
+ if tgt.IPAddress == "0.0.0.0" || strings.HasPrefix(tgt.IPAddress, "127") {
+ tgt.IPAddress = local4
+ } else if tgt.IPAddress == "::" {
+ tgt.IPAddress = local6
+ }
+
+ // quick support for https://github.com/netdata/netdata/pull/17866
+ // TODO: create both ipv4 and ipv6 targets?
+ if tgt.IPAddress == "*" {
+ tgt.IPAddress = local4
+ }
+
+ tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.Port)
+
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+
+ tgt.hash = hash
+ tgt.Tags().Merge(d.Tags())
+
+ targets = append(targets, tgt)
+ }
+
+ // order: TCP, TCP6, UDP, UDP6; within a protocol by port; localhost first.
+ sort.Slice(targets, func(i, j int) bool {
+ tgt1, tgt2 := targets[i], targets[j]
+ if tgt1.Protocol != tgt2.Protocol {
+ return tgt1.Protocol < tgt2.Protocol
+ }
+
+ p1, _ := strconv.Atoi(tgt1.Port)
+ p2, _ := strconv.Atoi(tgt2.Port)
+ if p1 != p2 {
+ return p1 < p2
+ }
+
+ // The less func must be a strict weak ordering: report true only when
+ // tgt1 is localhost and tgt2 is not. The previous version returned
+ // true whenever tgt1 was localhost (ignoring tgt2), which is not
+ // asymmetric and makes sort.Slice behavior undefined.
+ isLocal := func(ip string) bool { return ip == local4 || ip == local6 }
+ return isLocal(tgt1.IPAddress) && !isLocal(tgt2.IPAddress)
+ })
+
+ seen := make(map[string]bool)
+ tgts := make([]model.Target, len(targets))
+ var n int
+
+ for _, tgt := range targets {
+ tgt := tgt
+
+ proto := strings.TrimSuffix(tgt.Protocol, "6")
+ key := tgt.Protocol + ":" + tgt.Address
+ keyLocal4 := proto + ":" + net.JoinHostPort(local4, tgt.Port)
+ keyLocal6 := proto + "6:" + net.JoinHostPort(local6, tgt.Port)
+
+ // Filter targets that accept conns on any (0.0.0.0) and additionally on each individual network interface (a.b.c.d).
+ // Create a target only for localhost. Assumption: any address always goes first.
+ if seen[key] || seen[keyLocal4] || seen[keyLocal6] {
+ continue
+ }
+ seen[key] = true
+
+ tgts[n] = &tgt
+ n++
+ }
+
+ return tgts[:n], nil
+}
+
+type localListenersExec struct {
+ binPath string
+ timeout time.Duration
+}
+
+// discover runs the local-listeners helper binary with a timeout and returns
+// its raw stdout for parsing.
+func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) {
+ execCtx, cancel := context.WithTimeout(ctx, e.timeout)
+ defer cancel()
+
+ // TCPv4/6 and UDPv4 sockets in LISTEN state
+ // https://github.com/netdata/netdata/blob/master/src/collectors/plugins.d/local_listeners.c
+ args := []string{
+ "no-udp6",
+ "no-local",
+ "no-inbound",
+ "no-outbound",
+ "no-namespaces",
+ }
+
+ cmd := exec.CommandContext(execCtx, e.binPath, args...)
+
+ bs, err := cmd.Output()
+ if err != nil {
+ return nil, fmt.Errorf("error on executing '%s': %v", cmd, err)
+ }
+
+ return bs, nil
+}
+
+// extractComm returns the base name of the first whitespace-separated token of
+// cmdLine (the executable path); a trailing ':' is trimmed.
+func extractComm(cmdLine string) string {
+ if i := strings.IndexByte(cmdLine, ' '); i != -1 {
+ cmdLine = cmdLine[:i]
+ }
+ _, comm := filepath.Split(cmdLine)
+ return strings.TrimSuffix(comm, ":")
+}
+
+func calcHash(obj any) (uint64, error) {
+ return hashstructure.Hash(obj, nil)
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go
new file mode 100644
index 000000000..a94879f09
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go
@@ -0,0 +1,169 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package netlisteners
+
+import (
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+)
+
+func TestDiscoverer_Discover(t *testing.T) {
+ tests := map[string]discoverySim{
+ "add listeners": {
+ listenersCli: func(cli listenersCli, interval, expiry time.Duration) {
+ cli.addListener("UDP|127.0.0.1|323|/usr/sbin/chronyd")
+ cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP6|::|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP6|2001:DB8::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP|0.0.0.0|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP|192.0.2.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ cli.addListener("TCP46|*|80|/usr/sbin/httpd -k start")
+ cli.addListener("TCP6|::|80|/usr/sbin/apache2 -k start")
+ cli.addListener("TCP|0.0.0.0|80|/usr/sbin/apache2 -k start")
+ cli.addListener("TCP|0.0.0.0|8080|/usr/sbin/docker-proxy -proto tcp -host-ip 0.0.0.0 -host-port 8080 -container-ip 172.17.0.4 -container-port 80")
+ cli.addListener("TCP6|::|8080|/usr/sbin/docker-proxy -proto tcp -host-ip :: -host-port 8080 -container-ip 172.17.0.4 -container-port 80")
+ time.Sleep(interval * 2)
+ },
+ wantGroups: []model.TargetGroup{&targetGroup{
+ provider: "sd:net_listeners",
+ source: "discoverer=net_listeners,host=localhost",
+ targets: []model.Target{
+ withHash(&target{
+ Protocol: "UDP",
+ IPAddress: "127.0.0.1",
+ Port: "323",
+ Address: "127.0.0.1:323",
+ Comm: "chronyd",
+ Cmdline: "/usr/sbin/chronyd",
+ }),
+ withHash(&target{
+ Protocol: "TCP46",
+ IPAddress: "127.0.0.1",
+ Port: "80",
+ Address: "127.0.0.1:80",
+ Comm: "httpd",
+ Cmdline: "/usr/sbin/httpd -k start",
+ }),
+ withHash(&target{
+ Protocol: "TCP",
+ IPAddress: "127.0.0.1",
+ Port: "80",
+ Address: "127.0.0.1:80",
+ Comm: "apache2",
+ Cmdline: "/usr/sbin/apache2 -k start",
+ }),
+ withHash(&target{
+ Protocol: "TCP",
+ IPAddress: "127.0.0.1",
+ Port: "8125",
+ Address: "127.0.0.1:8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ withHash(&target{
+ Protocol: "UDP",
+ IPAddress: "127.0.0.1",
+ Port: "53768",
+ Address: "127.0.0.1:53768",
+ Comm: "go.d.plugin",
+ Cmdline: "/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1",
+ }),
+ withHash(&target{
+ Protocol: "UDP6",
+ IPAddress: "::1",
+ Port: "8125",
+ Address: "[::1]:8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ },
+ }},
+ },
+ "remove listeners; not expired": {
+ listenersCli: func(cli listenersCli, interval, expiry time.Duration) {
+ cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ time.Sleep(interval * 2)
+ cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ time.Sleep(interval * 2)
+ },
+ wantGroups: []model.TargetGroup{&targetGroup{
+ provider: "sd:net_listeners",
+ source: "discoverer=net_listeners,host=localhost",
+ targets: []model.Target{
+ withHash(&target{
+ Protocol: "UDP6",
+ IPAddress: "::1",
+ Port: "8125",
+ Address: "[::1]:8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ withHash(&target{
+ Protocol: "TCP",
+ IPAddress: "127.0.0.1",
+ Port: "8125",
+ Address: "127.0.0.1:8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ withHash(&target{
+ Protocol: "UDP",
+ IPAddress: "127.0.0.1",
+ Port: "53768",
+ Address: "127.0.0.1:53768",
+ Comm: "go.d.plugin",
+ Cmdline: "/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1",
+ }),
+ },
+ }},
+ },
+ "remove listeners; expired": {
+ listenersCli: func(cli listenersCli, interval, expiry time.Duration) {
+ cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ time.Sleep(interval * 2)
+ cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D")
+ cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1")
+ time.Sleep(expiry * 2)
+ },
+ wantGroups: []model.TargetGroup{&targetGroup{
+ provider: "sd:net_listeners",
+ source: "discoverer=net_listeners,host=localhost",
+ targets: []model.Target{
+ withHash(&target{
+ Protocol: "TCP",
+ IPAddress: "127.0.0.1",
+ Port: "8125",
+ Address: "127.0.0.1:8125",
+ Comm: "netdata",
+ Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D",
+ }),
+ },
+ }},
+ },
+ }
+
+ for name, sim := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim.run(t)
+ })
+ }
+}
+
+func withHash(l *target) *target {
+ l.hash, _ = calcHash(l)
+ tags, _ := model.ParseTags("netlisteners")
+ l.Tags().Merge(tags)
+ return l
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go
new file mode 100644
index 000000000..ad90f8278
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go
@@ -0,0 +1,167 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package netlisteners
+
+import (
+ "context"
+ "errors"
+ "slices"
+ "sort"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+type listenersCli interface {
+ addListener(s string)
+ removeListener(s string)
+}
+
+type discoverySim struct {
+ listenersCli func(cli listenersCli, interval, expiry time.Duration)
+ wantGroups []model.TargetGroup
+}
+
+func (sim *discoverySim) run(t *testing.T) {
+ d, err := NewDiscoverer(Config{
+ Source: "",
+ Tags: "netlisteners",
+ })
+ require.NoError(t, err)
+
+ mock := newMockLocalListenersExec()
+
+ d.ll = mock
+
+ d.interval = time.Millisecond * 100
+ d.expiryTime = time.Second * 1
+
+ seen := make(map[string]model.TargetGroup)
+ ctx, cancel := context.WithCancel(context.Background())
+ in := make(chan []model.TargetGroup)
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ d.Discover(ctx, in)
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case tggs := <-in:
+ for _, tgg := range tggs {
+ seen[tgg.Source()] = tgg
+ }
+ }
+ }
+ }()
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ wg.Wait()
+ }()
+
+ select {
+ case <-d.started:
+ case <-time.After(time.Second * 3):
+ require.Fail(t, "discovery failed to start")
+ }
+
+ sim.listenersCli(mock, d.interval, d.expiryTime)
+
+ cancel()
+
+ select {
+ case <-done:
+ case <-time.After(time.Second * 3):
+ require.Fail(t, "discovery hasn't finished after cancel")
+ }
+
+ var tggs []model.TargetGroup
+ for _, tgg := range seen {
+ tggs = append(tggs, tgg)
+ }
+
+ sortTargetGroups(tggs)
+ sortTargetGroups(sim.wantGroups)
+
+ wantLen, gotLen := calcTargets(sim.wantGroups), calcTargets(tggs)
+ assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen)
+ assert.Equal(t, sim.wantGroups, tggs)
+}
+
+func newMockLocalListenersExec() *mockLocalListenersExec {
+ return &mockLocalListenersExec{}
+}
+
+type mockLocalListenersExec struct {
+ errResponse bool
+ mux sync.Mutex
+ listeners []string
+}
+
+func (m *mockLocalListenersExec) addListener(s string) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ m.listeners = append(m.listeners, s)
+}
+
+func (m *mockLocalListenersExec) removeListener(s string) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ if i := slices.Index(m.listeners, s); i != -1 {
+ m.listeners = append(m.listeners[:i], m.listeners[i+1:]...)
+ }
+}
+
+func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) {
+ if m.errResponse {
+ return nil, errors.New("mock discover() error")
+ }
+
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ var buf strings.Builder
+ for _, s := range m.listeners {
+ buf.WriteString(s)
+ buf.WriteByte('\n')
+ }
+
+ return []byte(buf.String()), nil
+}
+
+func calcTargets(tggs []model.TargetGroup) int {
+ var n int
+ for _, tgg := range tggs {
+ n += len(tgg.Targets())
+ }
+ return n
+}
+
+func sortTargetGroups(tggs []model.TargetGroup) {
+ if len(tggs) == 0 {
+ return
+ }
+ sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() })
+
+ for idx := range tggs {
+ tgts := tggs[idx].Targets()
+ sort.Slice(tgts, func(i, j int) bool { return tgts[i].Hash() < tgts[j].Hash() })
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/target.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/target.go
new file mode 100644
index 000000000..a36620f32
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/target.go
@@ -0,0 +1,41 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package netlisteners
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+)
+
+type targetGroup struct {
+ provider string
+ source string
+ targets []model.Target
+}
+
+func (g *targetGroup) Provider() string { return g.provider }
+func (g *targetGroup) Source() string { return g.source }
+func (g *targetGroup) Targets() []model.Target { return g.targets }
+
+type target struct {
+ model.Base
+
+ hash uint64
+
+ Protocol string
+ IPAddress string
+ Port string
+ Comm string
+ Cmdline string
+
+ Address string // "IPAddress:Port"
+}
+
+func (t *target) TUID() string { return tuid(t) }
+func (t *target) Hash() uint64 { return t.hash }
+
+func tuid(tgt *target) string {
+ return fmt.Sprintf("%s_%s_%d", strings.ToLower(tgt.Protocol), tgt.Port, tgt.hash)
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/model/discoverer.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/model/discoverer.go
new file mode 100644
index 000000000..301322d32
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/model/discoverer.go
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package model
+
+import (
+ "context"
+)
+
+type Discoverer interface {
+ Discover(ctx context.Context, ch chan<- []TargetGroup)
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/model/tags.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/model/tags.go
new file mode 100644
index 000000000..22517d77e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/model/tags.go
@@ -0,0 +1,87 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package model
+
+import (
+ "fmt"
+ "sort"
+ "strings"
+)
+
+type Base struct {
+ tags Tags
+}
+
+func (b *Base) Tags() Tags {
+ if b.tags == nil {
+ b.tags = NewTags()
+ }
+ return b.tags
+}
+
+type Tags map[string]struct{}
+
+func NewTags() Tags {
+ return Tags{}
+}
+
+// Merge adds tags into t. A tag prefixed with '-' removes that tag from t
+// instead of adding it.
+func (t Tags) Merge(tags Tags) {
+ for tag := range tags {
+ if strings.HasPrefix(tag, "-") {
+ delete(t, tag[1:])
+ } else {
+ t[tag] = struct{}{}
+ }
+ }
+}
+
+func (t Tags) Clone() Tags {
+ ts := NewTags()
+ ts.Merge(t)
+ return ts
+}
+
+func (t Tags) String() string {
+ ts := make([]string, 0, len(t))
+ for key := range t {
+ ts = append(ts, key)
+ }
+ sort.Strings(ts)
+ return fmt.Sprintf("{%s}", strings.Join(ts, ", "))
+}
+
+// ParseTags parses a whitespace-separated list of tag words into a Tags set.
+// It returns an error if any word contains a forbidden symbol
+// (see isTagWordValid).
+func ParseTags(line string) (Tags, error) {
+ words := strings.Fields(line)
+ if len(words) == 0 {
+ return NewTags(), nil
+ }
+
+ tags := NewTags()
+ for _, tag := range words {
+ if !isTagWordValid(tag) {
+ return nil, fmt.Errorf("tags '%s' contains tag '%s' with forbidden symbol", line, tag)
+ }
+ tags[tag] = struct{}{}
+ }
+ return tags, nil
+}
+
+// isTagWordValid reports whether word is a valid tag: an optional '-' prefix
+// (removal marker, see Tags.Merge) followed by a letter, then letters, digits,
+// '=', '_' or '.'.
+func isTagWordValid(word string) bool {
+ // valid:
+ // ^[a-zA-Z][a-zA-Z0-9=_.]*$
+ word = strings.TrimPrefix(word, "-")
+ if len(word) == 0 {
+ return false
+ }
+ for i, b := range word {
+ switch {
+ default:
+ return false
+ case b >= 'a' && b <= 'z':
+ case b >= 'A' && b <= 'Z':
+ case b >= '0' && b <= '9' && i > 0:
+ case (b == '=' || b == '_' || b == '.') && i > 0:
+ }
+ }
+ return true
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/model/tags_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/model/tags_test.go
new file mode 100644
index 000000000..4f07bcbf6
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/model/tags_test.go
@@ -0,0 +1,3 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package model
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/model/target.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/model/target.go
new file mode 100644
index 000000000..eb2bd9d51
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/model/target.go
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package model
+
+type Target interface {
+ Hash() uint64
+ Tags() Tags
+ TUID() string
+}
+
+type TargetGroup interface {
+ Targets() []Target
+ Provider() string
+ Source() string
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go
new file mode 100644
index 000000000..a84212734
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go
@@ -0,0 +1,152 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+func newAccumulator() *accumulator {
+ return &accumulator{
+ send: make(chan struct{}, 1),
+ sendEvery: time.Second * 2,
+ mux: &sync.Mutex{},
+ tggs: make(map[string]model.TargetGroup),
+ }
+}
+
+type accumulator struct {
+ *logger.Logger
+ discoverers []model.Discoverer
+ send chan struct{}
+ sendEvery time.Duration
+ mux *sync.Mutex
+ tggs map[string]model.TargetGroup
+}
+
+// run starts every configured discoverer and periodically (sendEvery) flushes
+// the accumulated target groups to in. It returns when ctx is done or when all
+// discoverers have exited, flushing the cache one final time on the way out.
+func (a *accumulator) run(ctx context.Context, in chan []model.TargetGroup) {
+ updates := make(chan []model.TargetGroup)
+
+ var wg sync.WaitGroup
+ for _, d := range a.discoverers {
+ wg.Add(1)
+ d := d
+ go func() { defer wg.Done(); a.runDiscoverer(ctx, d, updates) }()
+ }
+
+ done := make(chan struct{})
+ go func() { defer close(done); wg.Wait() }()
+
+ tk := time.NewTicker(a.sendEvery)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ select {
+ case <-done:
+ a.Info("all discoverers exited")
+ case <-time.After(time.Second * 3):
+ a.Warning("not all discoverers exited")
+ }
+ a.trySend(in)
+ return
+ case <-done:
+ if !isDone(ctx) {
+ a.Info("all discoverers exited before ctx done")
+ } else {
+ a.Info("all discoverers exited")
+ }
+ a.trySend(in)
+ return
+ case <-tk.C:
+ // Flush only if at least one update arrived since the last flush.
+ select {
+ case <-a.send:
+ a.trySend(in)
+ default:
+ }
+ }
+ }
+}
+
+// runDiscoverer runs a single discoverer, folding each batch of its updates
+// into the shared group cache and signalling the sender loop. It returns when
+// the discoverer exits or ctx is done (allowing 2s for a graceful exit).
+func (a *accumulator) runDiscoverer(ctx context.Context, d model.Discoverer, updates chan []model.TargetGroup) {
+ done := make(chan struct{})
+ go func() { defer close(done); d.Discover(ctx, updates) }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ select {
+ case <-done:
+ case <-time.After(time.Second * 2):
+ a.Warningf("discoverer '%v' didn't exit on ctx done", d)
+ }
+ return
+ case <-done:
+ if !isDone(ctx) {
+ a.Infof("discoverer '%v' exited before ctx done", d)
+ }
+ return
+ case tggs := <-updates:
+ a.mux.Lock()
+ a.groupsUpdate(tggs)
+ a.mux.Unlock()
+ a.triggerSend()
+ }
+ }
+}
+
+// trySend performs a non-blocking send of the accumulated groups to in.
+// On success the cache is reset; otherwise the send is re-armed so a later
+// tick retries.
+func (a *accumulator) trySend(in chan<- []model.TargetGroup) {
+ a.mux.Lock()
+ defer a.mux.Unlock()
+
+ select {
+ case in <- a.groupsList():
+ a.groupsReset()
+ default:
+ a.triggerSend()
+ }
+}
+
+func (a *accumulator) triggerSend() {
+ select {
+ case a.send <- struct{}{}:
+ default:
+ }
+}
+
+func (a *accumulator) groupsUpdate(tggs []model.TargetGroup) {
+ for _, tgg := range tggs {
+ a.tggs[tgg.Source()] = tgg
+ }
+}
+
+func (a *accumulator) groupsReset() {
+ for key := range a.tggs {
+ delete(a.tggs, key)
+ }
+}
+
+func (a *accumulator) groupsList() []model.TargetGroup {
+ tggs := make([]model.TargetGroup, 0, len(a.tggs))
+ for _, tgg := range a.tggs {
+ if tgg != nil {
+ tggs = append(tggs, tgg)
+ }
+ }
+ return tggs
+}
+
+func isDone(ctx context.Context) bool {
+ select {
+ case <-ctx.Done():
+ return true
+ default:
+ return false
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify.go
new file mode 100644
index 000000000..bd686b306
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify.go
@@ -0,0 +1,132 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+ "text/template"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+func newTargetClassificator(cfg []ClassifyRuleConfig) (*targetClassificator, error) {
+ rules, err := newClassifyRules(cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ c := &targetClassificator{
+ rules: rules,
+ buf: bytes.Buffer{},
+ }
+
+ return c, nil
+}
+
+type (
+ targetClassificator struct {
+ *logger.Logger
+ rules []*classifyRule
+ buf bytes.Buffer
+ }
+
+ classifyRule struct {
+ name string
+ sr selector
+ tags model.Tags
+ match []*classifyRuleMatch
+ }
+ classifyRuleMatch struct {
+ tags model.Tags
+ expr *template.Template
+ }
+)
+
+// classify runs the classification rules against tgt and returns the union of
+// tags from every rule whose selector matches and whose match expression
+// evaluates to "true". It returns nil when nothing matched. Tags accumulated
+// by earlier rules are merged into the working tag set, so they are visible to
+// later rules' selectors.
+func (c *targetClassificator) classify(tgt model.Target) model.Tags {
+ tgtTags := tgt.Tags().Clone()
+ var tags model.Tags
+
+ for i, rule := range c.rules {
+ if !rule.sr.matches(tgtTags) {
+ continue
+ }
+
+ for j, match := range rule.match {
+ c.buf.Reset()
+
+ // Log the template error too — previously it was dropped, leaving
+ // no hint as to why the expression failed.
+ if err := match.expr.Execute(&c.buf, tgt); err != nil {
+ c.Warningf("failed to execute classify rule[%d]->match[%d]->expr on target '%s': %v", i+1, j+1, tgt.TUID(), err)
+ continue
+ }
+ if strings.TrimSpace(c.buf.String()) != "true" {
+ continue
+ }
+
+ if tags == nil {
+ tags = model.NewTags()
+ }
+
+ tags.Merge(rule.tags)
+ tags.Merge(match.tags)
+ tgtTags.Merge(tags)
+ }
+ }
+
+ return tags
+}
+
+// newClassifyRules compiles classify rule configs into runtime rules: each
+// selector is parsed, the tags validated, and every match expression compiled
+// as a text/template using the shared function map.
+func newClassifyRules(cfg []ClassifyRuleConfig) ([]*classifyRule, error) {
+ var rules []*classifyRule
+
+ fmap := newFuncMap()
+
+ for i, ruleCfg := range cfg {
+ i++ // 1-based rule index for error messages
+ rule := classifyRule{name: ruleCfg.Name}
+
+ sr, err := parseSelector(ruleCfg.Selector)
+ if err != nil {
+ return nil, fmt.Errorf("rule '%d': %v", i, err)
+ }
+ rule.sr = sr
+
+ tags, err := model.ParseTags(ruleCfg.Tags)
+ if err != nil {
+ return nil, fmt.Errorf("rule '%d': %v", i, err)
+ }
+ rule.tags = tags
+
+ for j, matchCfg := range ruleCfg.Match {
+ j++ // 1-based match index for error messages
+ var match classifyRuleMatch
+
+ tags, err := model.ParseTags(matchCfg.Tags)
+ if err != nil {
+ return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err)
+ }
+ match.tags = tags
+
+ tmpl, err := parseTemplate(matchCfg.Expr, fmap)
+ if err != nil {
+ return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err)
+ }
+ match.expr = tmpl
+
+ rule.match = append(rule.match, &match)
+ }
+
+ rules = append(rules, &rule)
+ }
+
+ return rules, nil
+}
+
+func parseTemplate(s string, fmap template.FuncMap) (*template.Template, error) {
+ return template.New("root").
+ Option("missingkey=error").
+ Funcs(fmap).
+ Parse(s)
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify_test.go
new file mode 100644
index 000000000..214c96cf7
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify_test.go
@@ -0,0 +1,83 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gopkg.in/yaml.v2"
+)
+
+func TestTargetClassificator_classify(t *testing.T) { // table-driven test against the fixed rule set below
+	config := `
+- selector: "rule0"
+  tags: "skip"
+  match:
+    - tags: "skip"
+      expr: '{{ glob .Name "*" }}'
+- selector: "!skip rule1"
+  tags: "foo1"
+  match:
+    - tags: "bar1"
+      expr: '{{ glob .Name "mock*1*" }}'
+    - tags: "bar2"
+      expr: '{{ glob .Name "mock*2*" }}'
+- selector: "!skip rule2"
+  tags: "foo2"
+  match:
+    - tags: "bar3"
+      expr: '{{ glob .Name "mock*3*" }}'
+    - tags: "bar4"
+      expr: '{{ glob .Name "mock*4*" }}'
+- selector: "rule3"
+  tags: "foo3"
+  match:
+    - tags: "bar5"
+      expr: '{{ glob .Name "mock*5*" }}'
+    - tags: "bar6"
+      expr: '{{ glob .Name "mock*6*" }}'
+`
+	tests := map[string]struct {
+		target   model.Target
+		wantTags model.Tags
+	}{
+		"no rules match": {
+			target:   newMockTarget("mock1"),
+			wantTags: nil,
+		},
+		"one rule one match": {
+			target:   newMockTarget("mock4", "rule2"),
+			wantTags: mustParseTags("foo2 bar4"),
+		},
+		"one rule two match": {
+			target:   newMockTarget("mock56", "rule3"),
+			wantTags: mustParseTags("foo3 bar5 bar6"),
+		},
+		"all rules all matches": {
+			target:   newMockTarget("mock123456", "rule1 rule2 rule3"),
+			wantTags: mustParseTags("foo1 foo2 foo3 bar1 bar2 bar3 bar4 bar5 bar6"),
+		},
+		"applying labels after every rule": { // "skip" added by rule0 deselects rule1/rule2 ("!skip") but not rule3
+			target:   newMockTarget("mock123456", "rule0 rule1 rule2 rule3"),
+			wantTags: mustParseTags("skip foo3 bar5 bar6"),
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			var cfg []ClassifyRuleConfig
+
+			err := yaml.Unmarshal([]byte(config), &cfg)
+			require.NoError(t, err, "yaml unmarshalling of config")
+
+			clr, err := newTargetClassificator(cfg)
+			require.NoError(t, err, "targetClassificator creation")
+
+			assert.Equal(t, test.wantTags, clr.classify(test.target))
+		})
+	}
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose.go
new file mode 100644
index 000000000..de2ed21b8
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose.go
@@ -0,0 +1,157 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "text/template"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "gopkg.in/yaml.v2"
+)
+
+func newConfigComposer(cfg []ComposeRuleConfig) (*configComposer, error) { // compile compose rule configs into a composer
+	rules, err := newComposeRules(cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	c := &configComposer{
+		rules: rules,
+		buf:   bytes.Buffer{}, // scratch buffer reused for every template execution
+	}
+
+	return c, nil
+}
+
+type (
+	configComposer struct {
+		*logger.Logger
+		rules []*composeRule
+		buf   bytes.Buffer // scratch space for rendered templates; reset before each use
+	}
+
+	composeRule struct {
+		name string   // optional, from config; not used when composing
+		sr   selector // matched against the target's tags
+		conf []*composeRuleConf
+	}
+	composeRuleConf struct {
+		sr   selector           // matched against the target's tags
+		tmpl *template.Template // renders config YAML for a matching target
+	}
+)
+
+func (c *configComposer) compose(tgt model.Target) []confgroup.Config { // render configs for all rules/confs matching the target's tags
+	var configs []confgroup.Config
+
+	for i, rule := range c.rules {
+		if !rule.sr.matches(tgt.Tags()) {
+			continue
+		}
+
+		for j, conf := range rule.conf {
+			if !conf.sr.matches(tgt.Tags()) {
+				continue
+			}
+
+			c.buf.Reset()
+
+			if err := conf.tmpl.Execute(&c.buf, tgt); err != nil {
+				c.Warningf("failed to execute rule[%d]->config[%d]->template on target '%s': %v",
+					i+1, j+1, tgt.TUID(), err)
+				continue
+			}
+			if c.buf.Len() == 0 { // template rendered nothing -> no config for this target
+				continue
+			}
+
+			cfgs, err := c.parseTemplateData(c.buf.Bytes())
+			if err != nil {
+				c.Warningf("failed to parse template data: %v", err)
+				continue
+			}
+
+			configs = append(configs, cfgs...)
+		}
+	}
+
+	if len(configs) > 0 {
+		c.Debugf("created %d config(s) for target '%s'", len(configs), tgt.TUID())
+	}
+	return configs
+}
+
+func (c *configComposer) parseTemplateData(bs []byte) ([]confgroup.Config, error) { // accepts a single YAML mapping or a list of mappings
+	var data any
+	if err := yaml.Unmarshal(bs, &data); err != nil {
+		return nil, err
+	}
+
+	type (
+		single = map[any]any
+		multi  = []any
+	)
+
+	switch data.(type) {
+	case single: // one config object
+		var cfg confgroup.Config
+		if err := yaml.Unmarshal(bs, &cfg); err != nil {
+			return nil, err
+		}
+		return []confgroup.Config{cfg}, nil
+	case multi: // a list of config objects
+		var cfgs []confgroup.Config
+		if err := yaml.Unmarshal(bs, &cfgs); err != nil {
+			return nil, err
+		}
+		return cfgs, nil
+	default:
+		return nil, errors.New("unknown config format")
+	}
+}
+
+func newComposeRules(cfg []ComposeRuleConfig) ([]*composeRule, error) { // compile rule configs into executable compose rules
+	var rules []*composeRule
+
+	fmap := newFuncMap() // template funcs shared by all config templates
+
+	for i, ruleCfg := range cfg {
+		i++ // 1-based index for error messages
+		rule := composeRule{name: ruleCfg.Name}
+
+		sr, err := parseSelector(ruleCfg.Selector)
+		if err != nil {
+			return nil, fmt.Errorf("rule '%d': %v", i, err)
+		}
+		rule.sr = sr
+
+		for j, confCfg := range ruleCfg.Config {
+			j++ // 1-based index for error messages
+			var conf composeRuleConf
+
+			sr, err := parseSelector(confCfg.Selector)
+			if err != nil {
+				return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err)
+			}
+			conf.sr = sr
+
+			tmpl, err := parseTemplate(confCfg.Template, fmap)
+			if err != nil {
+				return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err)
+			}
+			conf.tmpl = tmpl
+
+			rule.conf = append(rule.conf, &conf)
+		}
+
+		rules = append(rules, &rule)
+	}
+
+	return rules, nil
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose_test.go
new file mode 100644
index 000000000..fa758bcd3
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose_test.go
@@ -0,0 +1,92 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gopkg.in/yaml.v2"
+)
+
+func TestConfigComposer_compose(t *testing.T) { // table-driven test against the fixed compose rules below
+	config := `
+- selector: "rule1"
+  config:
+    - selector: "bar1"
+      template: |
+        name: {{ .Name }}-1
+    - selector: "bar2"
+      template: |
+        name: {{ .Name }}-2
+- selector: "rule2"
+  config:
+    - selector: "bar3"
+      template: |
+        name: {{ .Name }}-3
+    - selector: "bar4"
+      template: |
+        name: {{ .Name }}-4
+- selector: "rule3"
+  config:
+    - selector: "bar5"
+      template: |
+        name: {{ .Name }}-5
+    - selector: "bar6"
+      template: |
+        - name: {{ .Name }}-6
+        - name: {{ .Name }}-7
+`
+	tests := map[string]struct {
+		target      model.Target
+		wantConfigs []confgroup.Config
+	}{
+		"no rules matches": {
+			target:      newMockTarget("mock"),
+			wantConfigs: nil,
+		},
+		"one rule one config": {
+			target: newMockTarget("mock", "rule1 bar1"),
+			wantConfigs: []confgroup.Config{
+				{"name": "mock-1"},
+			},
+		},
+		"one rule two config": {
+			target: newMockTarget("mock", "rule2 bar3 bar4"),
+			wantConfigs: []confgroup.Config{
+				{"name": "mock-3"},
+				{"name": "mock-4"},
+			},
+		},
+		"all rules all configs": { // bar6's template yields a YAML list -> two configs from one template
+			target: newMockTarget("mock", "rule1 bar1 bar2 rule2 bar3 bar4 rule3 bar5 bar6"),
+			wantConfigs: []confgroup.Config{
+				{"name": "mock-1"},
+				{"name": "mock-2"},
+				{"name": "mock-3"},
+				{"name": "mock-4"},
+				{"name": "mock-5"},
+				{"name": "mock-6"},
+				{"name": "mock-7"},
+			},
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			var cfg []ComposeRuleConfig
+
+			err := yaml.Unmarshal([]byte(config), &cfg)
+			require.NoErrorf(t, err, "yaml unmarshalling of config")
+
+			cmr, err := newConfigComposer(cfg)
+			require.NoErrorf(t, err, "configComposer creation")
+
+			assert.Equal(t, test.wantConfigs, cmr.compose(test.target))
+		})
+	}
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/config.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/config.go
new file mode 100644
index 000000000..4dac63f0f
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/config.go
@@ -0,0 +1,136 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/dockerd"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/kubernetes"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/netlisteners"
+)
+
+type Config struct {
+	Source         string             `yaml:"-"` // set programmatically, not read from the config file
+	ConfigDefaults confgroup.Registry `yaml:"-"` // set programmatically, not read from the config file
+
+	Disabled bool                 `yaml:"disabled"`
+	Name     string               `yaml:"name"`
+	Discover []DiscoveryConfig    `yaml:"discover"`
+	Classify []ClassifyRuleConfig `yaml:"classify"`
+	Compose  []ComposeRuleConfig  `yaml:"compose"`
+}
+
+type DiscoveryConfig struct {
+	Discoverer   string              `yaml:"discoverer"` // one of: net_listeners, docker, k8s
+	NetListeners netlisteners.Config `yaml:"net_listeners"`
+	Docker       dockerd.Config      `yaml:"docker"`
+	K8s          []kubernetes.Config `yaml:"k8s"`
+}
+
+type ClassifyRuleConfig struct {
+	Name     string `yaml:"name"`
+	Selector string `yaml:"selector"` // mandatory
+	Tags     string `yaml:"tags"`     // mandatory
+	Match    []struct {
+		Tags string `yaml:"tags"` // mandatory
+		Expr string `yaml:"expr"` // mandatory
+	} `yaml:"match"` // mandatory, at least 1
+}
+
+type ComposeRuleConfig struct {
+	Name     string `yaml:"name"` // optional
+	Selector string `yaml:"selector"` // mandatory
+	Config   []struct {
+		Selector string `yaml:"selector"` // mandatory
+		Template string `yaml:"template"` // mandatory
+	} `yaml:"config"` // mandatory, at least 1
+}
+
+func validateConfig(cfg Config) error { // sanity-check a pipeline config before building the pipeline
+	if cfg.Name == "" {
+		return errors.New("'name' not set")
+	}
+	if err := validateDiscoveryConfig(cfg.Discover); err != nil {
+		return fmt.Errorf("discover config: %v", err)
+	}
+	if err := validateClassifyConfig(cfg.Classify); err != nil {
+		return fmt.Errorf("classify rules: %v", err)
+	}
+	if err := validateComposeConfig(cfg.Compose); err != nil {
+		return fmt.Errorf("compose rules: %v", err)
+	}
+	return nil
+}
+
+func validateDiscoveryConfig(config []DiscoveryConfig) error { // require at least one discoverer with a known name
+	if len(config) == 0 {
+		return errors.New("no discoverers, must be at least one")
+	}
+	for _, cfg := range config {
+		switch cfg.Discoverer {
+		case "net_listeners", "docker", "k8s": // known discoverers; config details validated on creation
+		default:
+			return fmt.Errorf("unknown discoverer: '%s'", cfg.Discoverer)
+		}
+	}
+	return nil
+}
+
+func validateClassifyConfig(rules []ClassifyRuleConfig) error { // require selector, tags and >=1 complete match per rule
+	if len(rules) == 0 {
+		return errors.New("empty config, need least 1 rule")
+	}
+	for i, rule := range rules {
+		i++ // 1-based index for error messages
+		if rule.Selector == "" {
+			return fmt.Errorf("'rule[%s][%d]->selector' not set", rule.Name, i)
+		}
+		if rule.Tags == "" {
+			return fmt.Errorf("'rule[%s][%d]->tags' not set", rule.Name, i)
+		}
+		if len(rule.Match) == 0 {
+			return fmt.Errorf("'rule[%s][%d]->match' not set, need at least 1 rule match", rule.Name, i)
+		}
+
+		for j, match := range rule.Match {
+			j++ // 1-based index for error messages
+			if match.Tags == "" {
+				return fmt.Errorf("'rule[%s][%d]->match[%d]->tags' not set", rule.Name, i, j)
+			}
+			if match.Expr == "" {
+				return fmt.Errorf("'rule[%s][%d]->match[%d]->expr' not set", rule.Name, i, j)
+			}
+		}
+	}
+	return nil
+}
+
+func validateComposeConfig(rules []ComposeRuleConfig) error { // require selector and >=1 complete config per rule
+	if len(rules) == 0 {
+		return errors.New("empty config, need least 1 rule")
+	}
+	for i, rule := range rules {
+		i++ // 1-based index for error messages
+		if rule.Selector == "" {
+			return fmt.Errorf("'rule[%s][%d]->selector' not set", rule.Name, i)
+		}
+
+		if len(rule.Config) == 0 {
+			return fmt.Errorf("'rule[%s][%d]->config' not set", rule.Name, i)
+		}
+
+		for j, conf := range rule.Config {
+			j++ // 1-based index for error messages
+			if conf.Selector == "" {
+				return fmt.Errorf("'rule[%s][%d]->config[%d]->selector' not set", rule.Name, i, j)
+			}
+			if conf.Template == "" {
+				return fmt.Errorf("'rule[%s][%d]->config[%d]->template' not set", rule.Name, i, j)
+			}
+		}
+	}
+	return nil
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap.go
new file mode 100644
index 000000000..8a9698b65
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap.go
@@ -0,0 +1,63 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "regexp"
+ "strconv"
+ "text/template"
+
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/matcher"
+
+ "github.com/Masterminds/sprig/v3"
+ "github.com/bmatcuk/doublestar/v4"
+)
+
+func newFuncMap() template.FuncMap { // sprig's hermetic funcs plus pipeline-specific matchers
+	custom := map[string]interface{}{
+		"match": funcMatchAny,
+		"glob": func(value, pattern string, patterns ...string) bool { // shorthand for match "glob"
+			return funcMatchAny("glob", value, pattern, patterns...)
+		},
+		"promPort": func(port string) string { // port number -> prometheus exporter name ("" if unknown)
+			v, _ := strconv.Atoi(port)
+			return prometheusPortAllocations[v]
+		},
+	}
+
+	fm := sprig.HermeticTxtFuncMap()
+
+	for name, fn := range custom {
+		fm[name] = fn // custom funcs override sprig's on a name clash
+	}
+
+	return fm
+}
+
+func funcMatchAny(typ, value, pattern string, patterns ...string) bool { // true if value matches any of the given patterns
+	switch len(patterns) {
+	case 0:
+		return funcMatch(typ, value, pattern)
+	default:
+		return funcMatch(typ, value, pattern) || funcMatchAny(typ, value, patterns[0], patterns[1:]...) // recursion short-circuits on first match
+	}
+}
+
+func funcMatch(typ string, value, pattern string) bool { // match value against pattern; invalid patterns match nothing
+	switch typ {
+	case "glob", "": // glob is the default pattern type
+		m, err := matcher.NewGlobMatcher(pattern)
+		return err == nil && m.MatchString(value)
+	case "sp": // simple patterns
+		m, err := matcher.NewSimplePatternsMatcher(pattern)
+		return err == nil && m.MatchString(value)
+	case "re": // regular expression
+		ok, err := regexp.MatchString(pattern, value)
+		return err == nil && ok
+	case "dstar": // doublestar glob (supports **)
+		ok, err := doublestar.Match(pattern, value)
+		return err == nil && ok
+	default:
+		return false
+	}
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap_test.go
new file mode 100644
index 000000000..3de71ef70
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap_test.go
@@ -0,0 +1,81 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_funcMatchAny(t *testing.T) { // covers "dstar" and "re" pattern types, single and multiple patterns
+	tests := map[string]struct {
+		typ       string
+		patterns  []string
+		value     string
+		wantMatch bool
+	}{
+		"dstar: one param, matches": {
+			wantMatch: true,
+			typ:       "dstar",
+			patterns:  []string{"*"},
+			value:     "value",
+		},
+		"dstar: one param, matches with *": {
+			wantMatch: true,
+			typ:       "dstar",
+			patterns:  []string{"**/value"},
+			value:     "/one/two/three/value",
+		},
+		"dstar: one param, not matches": {
+			wantMatch: false,
+			typ:       "dstar",
+			patterns:  []string{"Value"},
+			value:     "value",
+		},
+		"dstar: several params, last one matches": {
+			wantMatch: true,
+			typ:       "dstar",
+			patterns:  []string{"not", "matches", "*"},
+			value:     "value",
+		},
+		"dstar: several params, no matches": {
+			wantMatch: false,
+			typ:       "dstar",
+			patterns:  []string{"not", "matches", "really"},
+			value:     "value",
+		},
+		"re: one param, matches": {
+			wantMatch: true,
+			typ:       "re",
+			patterns:  []string{"^value$"},
+			value:     "value",
+		},
+		"re: one param, not matches": {
+			wantMatch: false,
+			typ:       "re",
+			patterns:  []string{"^Value$"},
+			value:     "value",
+		},
+		"re: several params, last one matches": {
+			wantMatch: true,
+			typ:       "re",
+			patterns:  []string{"not", "matches", "va[lue]{3}"},
+			value:     "value",
+		},
+		"re: several params, no matches": {
+			wantMatch: false,
+			typ:       "re",
+			patterns:  []string{"not", "matches", "val[^l]ue"},
+			value:     "value",
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			ok := funcMatchAny(test.typ, test.value, test.patterns[0], test.patterns[1:]...)
+
+			assert.Equal(t, test.wantMatch, ok)
+		})
+	}
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go
new file mode 100644
index 000000000..f69501c39
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go
@@ -0,0 +1,236 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/dockerd"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/kubernetes"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/netlisteners"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/hostinfo"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+func New(cfg Config) (*Pipeline, error) { // validate cfg, compile classify/compose rules and register discoverers
+	if err := validateConfig(cfg); err != nil {
+		return nil, err
+	}
+
+	clr, err := newTargetClassificator(cfg.Classify)
+	if err != nil {
+		return nil, fmt.Errorf("classify rules: %v", err)
+	}
+
+	cmr, err := newConfigComposer(cfg.Compose)
+	if err != nil {
+		return nil, fmt.Errorf("compose rules: %v", err)
+	}
+
+	p := &Pipeline{
+		Logger: logger.New().With(
+			slog.String("component", "service discovery"),
+			slog.String("pipeline", cfg.Name),
+		),
+		configDefaults: cfg.ConfigDefaults,
+		clr:            clr,
+		cmr:            cmr,
+		accum:          newAccumulator(),
+		discoverers:    make([]model.Discoverer, 0),
+		configs:        make(map[string]map[uint64][]confgroup.Config),
+	}
+	p.accum.Logger = p.Logger // accumulator logs via the pipeline's logger
+
+	if err := p.registerDiscoverers(cfg); err != nil {
+		return nil, err
+	}
+
+	return p, nil
+}
+
+type (
+	Pipeline struct {
+		*logger.Logger
+
+		configDefaults confgroup.Registry
+		discoverers    []model.Discoverer
+		accum          *accumulator
+		clr            classificator
+		cmr            composer
+		configs        map[string]map[uint64][]confgroup.Config // [targetSource][targetHash]
+	}
+	classificator interface { // produces tags for a discovered target
+		classify(model.Target) model.Tags
+	}
+	composer interface { // produces job configs for a classified target
+		compose(model.Target) []confgroup.Config
+	}
+)
+
+func (p *Pipeline) registerDiscoverers(conf Config) error { // instantiate one discoverer per 'discover' config entry
+	for _, cfg := range conf.Discover {
+		switch cfg.Discoverer {
+		case "net_listeners":
+			cfg.NetListeners.Source = conf.Source
+			td, err := netlisteners.NewDiscoverer(cfg.NetListeners)
+			if err != nil {
+				return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err)
+			}
+			p.discoverers = append(p.discoverers, td)
+		case "docker":
+			if hostinfo.IsInsideK8sCluster() { // docker discovery is skipped (not an error) inside k8s
+				p.Infof("not registering '%s' discoverer: disabled in k8s environment", cfg.Discoverer)
+				continue
+			}
+			cfg.Docker.Source = conf.Source
+			td, err := dockerd.NewDiscoverer(cfg.Docker)
+			if err != nil {
+				return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err)
+			}
+			p.discoverers = append(p.discoverers, td)
+		case "k8s":
+			for _, k8sCfg := range cfg.K8s { // one discoverer per k8s config entry
+				td, err := kubernetes.NewKubeDiscoverer(k8sCfg)
+				if err != nil {
+					return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err)
+				}
+				p.discoverers = append(p.discoverers, td)
+			}
+		default:
+			return fmt.Errorf("unknown discoverer: '%s'", cfg.Discoverer)
+		}
+	}
+
+	if len(p.discoverers) == 0 {
+		return errors.New("no discoverers registered")
+	}
+
+	return nil
+}
+
+func (p *Pipeline) Run(ctx context.Context, in chan<- []*confgroup.Group) { // blocks until ctx is cancelled or the accumulator stops
+	p.Info("instance is started")
+	defer p.Info("instance is stopped")
+
+	p.accum.discoverers = p.discoverers
+
+	updates := make(chan []model.TargetGroup)
+	done := make(chan struct{})
+
+	go func() { defer close(done); p.accum.run(ctx, updates) }()
+
+	for {
+		select {
+		case <-ctx.Done():
+			select { // give the accumulator up to 4s to finish before returning
+			case <-done:
+			case <-time.After(time.Second * 4):
+			}
+			return
+		case <-done:
+			return
+		case tggs := <-updates:
+			p.Debugf("received %d target groups", len(tggs))
+			if cfggs := p.processGroups(tggs); len(cfggs) > 0 {
+				select {
+				case <-ctx.Done():
+				case in <- cfggs: // FIXME: potentially stale configs if upstream cannot receive (blocking)
+				}
+			}
+		}
+	}
+}
+
+func (p *Pipeline) processGroups(tggs []model.TargetGroup) []*confgroup.Group { // returns only groups whose configs changed
+	var groups []*confgroup.Group
+	// updates come from the accumulator, this ensures that all groups have different sources
+	for _, tgg := range tggs {
+		p.Debugf("processing group '%s' with %d target(s)", tgg.Source(), len(tgg.Targets()))
+		if v := p.processGroup(tgg); v != nil {
+			groups = append(groups, v)
+		}
+	}
+	return groups
+}
+
+func (p *Pipeline) processGroup(tgg model.TargetGroup) *confgroup.Group { // classify/compose new targets, drop vanished ones; nil if nothing changed
+	if len(tgg.Targets()) == 0 {
+		if _, ok := p.configs[tgg.Source()]; !ok {
+			return nil
+		}
+		delete(p.configs, tgg.Source())
+
+		return &confgroup.Group{Source: tgg.Source()} // empty group signals removal of this source's configs
+	}
+
+	targetsCache, ok := p.configs[tgg.Source()]
+	if !ok {
+		targetsCache = make(map[uint64][]confgroup.Config)
+		p.configs[tgg.Source()] = targetsCache
+	}
+
+	var changed bool
+	seen := make(map[uint64]bool)
+
+	for _, tgt := range tgg.Targets() {
+		if tgt == nil {
+			continue
+		}
+
+		hash := tgt.Hash()
+		seen[hash] = true
+
+		if _, ok := targetsCache[hash]; ok { // target already processed in a previous update
+			continue
+		}
+
+		targetsCache[hash] = nil // remember the target even if it yields no configs
+
+		if tags := p.clr.classify(tgt); len(tags) > 0 {
+			tgt.Tags().Merge(tags)
+
+			if cfgs := p.cmr.compose(tgt); len(cfgs) > 0 {
+				targetsCache[hash] = cfgs
+				changed = true
+
+				for _, cfg := range cfgs {
+					cfg.SetProvider(tgg.Provider())
+					cfg.SetSource(tgg.Source())
+					cfg.SetSourceType(confgroup.TypeDiscovered)
+					if def, ok := p.configDefaults.Lookup(cfg.Module()); ok {
+						cfg.ApplyDefaults(def)
+					}
+				}
+			}
+		}
+	}
+
+	for hash := range targetsCache { // evict targets that disappeared from the group
+		if seen[hash] {
+			continue
+		}
+		if cfgs := targetsCache[hash]; len(cfgs) > 0 {
+			changed = true
+		}
+		delete(targetsCache, hash)
+	}
+
+	if !changed {
+		return nil
+	}
+
+	// TODO: deepcopy?
+	cfgGroup := &confgroup.Group{Source: tgg.Source()}
+
+	for _, cfgs := range targetsCache {
+		cfgGroup.Configs = append(cfgGroup.Configs, cfgs...)
+	}
+
+	return cfgGroup
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline_test.go
new file mode 100644
index 000000000..2dd53cf10
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline_test.go
@@ -0,0 +1,303 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/ilyam8/hashstructure"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gopkg.in/yaml.v2"
+)
+
+func Test_defaultConfigs(t *testing.T) { // every shipped sd config (except prometheus) must produce a valid pipeline
+	dir := "../../../../config/go.d/sd/"
+	entries, err := os.ReadDir(dir)
+	require.NoError(t, err)
+
+	require.NotEmpty(t, entries)
+
+	for _, e := range entries {
+		if strings.Contains(e.Name(), "prometheus") {
+			continue
+		}
+		file, err := filepath.Abs(filepath.Join(dir, e.Name()))
+		require.NoError(t, err, "abs path")
+
+		bs, err := os.ReadFile(file)
+		require.NoError(t, err, "read config file")
+
+		var cfg Config
+		require.NoError(t, yaml.Unmarshal(bs, &cfg), "unmarshal")
+
+		_, err = New(cfg)
+		require.NoError(t, err, "create pipeline")
+	}
+}
+
+func TestNew(t *testing.T) { // pipeline construction must fail on an empty config
+	tests := map[string]struct {
+		config  string
+		wantErr bool
+	}{
+		"fails when config unset": {
+			wantErr: true,
+			config:  "",
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+
+			var cfg Config
+			err := yaml.Unmarshal([]byte(test.config), &cfg)
+			require.Nilf(t, err, "cfg unmarshal")
+
+			_, err = New(cfg)
+
+			if test.wantErr {
+				assert.Error(t, err)
+			} else {
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
+
+func TestPipeline_Run(t *testing.T) { // sim-based: feeds mock discoverers in and checks the produced conf groups
+	const config = `
+classify:
+  - selector: "rule1"
+    tags: "foo1"
+    match:
+      - tags: "bar1"
+        expr: '{{ glob .Name "mock*1*" }}'
+      - tags: "bar2"
+        expr: '{{ glob .Name "mock*2*" }}'
+compose:
+  - selector: "foo1"
+    config:
+      - selector: "bar1"
+        template: |
+          name: {{ .Name }}-foobar1
+      - selector: "bar2"
+        template: |
+          name: {{ .Name }}-foobar2
+`
+	tests := map[string]discoverySim{
+		"new group with no targets": {
+			config: config,
+			discoverers: []model.Discoverer{
+				newMockDiscoverer("",
+					newMockTargetGroup("test"),
+				),
+			},
+			wantClassifyCalls: 0,
+			wantComposeCalls:  0,
+			wantConfGroups:    nil,
+		},
+		"new group with targets": {
+			config: config,
+			discoverers: []model.Discoverer{
+				newMockDiscoverer("rule1",
+					newMockTargetGroup("test", "mock1", "mock2"),
+				),
+			},
+			wantClassifyCalls: 2,
+			wantComposeCalls:  2,
+			wantConfGroups: []*confgroup.Group{
+				prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"),
+			},
+		},
+		"existing group with same targets": { // second (delayed) update is a no-op: targets are cached by hash
+			config: config,
+			discoverers: []model.Discoverer{
+				newMockDiscoverer("rule1",
+					newMockTargetGroup("test", "mock1", "mock2"),
+				),
+				newDelayedMockDiscoverer("rule1", 5,
+					newMockTargetGroup("test", "mock1", "mock2"),
+				),
+			},
+			wantClassifyCalls: 2,
+			wantComposeCalls:  2,
+			wantConfGroups: []*confgroup.Group{
+				prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"),
+			},
+		},
+		"existing group that previously had targets with no targets": {
+			config: config,
+			discoverers: []model.Discoverer{
+				newMockDiscoverer("rule1",
+					newMockTargetGroup("test", "mock1", "mock2"),
+				),
+				newDelayedMockDiscoverer("rule1", 5,
+					newMockTargetGroup("test"),
+				),
+			},
+			wantClassifyCalls: 2,
+			wantComposeCalls:  2,
+			wantConfGroups: []*confgroup.Group{
+				prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"),
+				prepareDiscoveredGroup(), // empty group -> removal of the previous configs
+			},
+		},
+		"existing group with old and new targets": {
+			config: config,
+			discoverers: []model.Discoverer{
+				newMockDiscoverer("rule1",
+					newMockTargetGroup("test", "mock1", "mock2"),
+				),
+				newDelayedMockDiscoverer("rule1", 5,
+					newMockTargetGroup("test", "mock1", "mock2", "mock11", "mock22"),
+				),
+			},
+			wantClassifyCalls: 4,
+			wantComposeCalls:  4,
+			wantConfGroups: []*confgroup.Group{
+				prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"),
+				prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2", "mock11-foobar1", "mock22-foobar2"),
+			},
+		},
+		"existing group with new targets only": {
+			config: config,
+			discoverers: []model.Discoverer{
+				newMockDiscoverer("rule1",
+					newMockTargetGroup("test", "mock1", "mock2"),
+				),
+				newDelayedMockDiscoverer("rule1", 5,
+					newMockTargetGroup("test", "mock11", "mock22"),
+				),
+			},
+			wantClassifyCalls: 4,
+			wantComposeCalls:  4,
+			wantConfGroups: []*confgroup.Group{
+				prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"),
+				prepareDiscoveredGroup("mock11-foobar1", "mock22-foobar2"),
+			},
+		},
+	}
+
+	for name, sim := range tests {
+		t.Run(name, func(t *testing.T) {
+			sim.run(t)
+		})
+	}
+}
+
+func prepareDiscoveredGroup(configNames ...string) *confgroup.Group { // expected group for source "test" with one config per name
+	var configs []confgroup.Config
+
+	for _, name := range configNames {
+		configs = append(configs, confgroup.Config{}.
+			SetProvider("mock").
+			SetSourceType(confgroup.TypeDiscovered).
+			SetSource("test").
+			SetName(name))
+	}
+
+	return &confgroup.Group{
+		Source:  "test",
+		Configs: configs,
+	}
+}
+
+func newMockDiscoverer(tags string, tggs ...model.TargetGroup) *mockDiscoverer {
+	return &mockDiscoverer{
+		tags: mustParseTags(tags),
+		tggs: tggs,
+	}
+}
+
+func newDelayedMockDiscoverer(tags string, delay int, tggs ...model.TargetGroup) *mockDiscoverer { // delay is in seconds
+	return &mockDiscoverer{
+		tags:  mustParseTags(tags),
+		tggs:  tggs,
+		delay: time.Duration(delay) * time.Second,
+	}
+}
+
+type mockDiscoverer struct {
+	tggs  []model.TargetGroup
+	tags  model.Tags    // merged into every discovered target's tags
+	delay time.Duration // wait before publishing the groups
+}
+
+func (md mockDiscoverer) String() string {
+	return "mock discoverer"
+}
+
+func (md mockDiscoverer) Discover(ctx context.Context, out chan<- []model.TargetGroup) { // publishes tggs once after delay, honoring ctx
+	for _, tgg := range md.tggs {
+		for _, tgt := range tgg.Targets() {
+			tgt.Tags().Merge(md.tags)
+		}
+	}
+
+	select {
+	case <-ctx.Done():
+	case <-time.After(md.delay):
+		select {
+		case <-ctx.Done():
+		case out <- md.tggs:
+		}
+	}
+}
+
+func newMockTargetGroup(source string, targets ...string) *mockTargetGroup {
+	m := &mockTargetGroup{source: source}
+	for _, name := range targets {
+		m.targets = append(m.targets, &mockTarget{Name: name})
+	}
+	return m
+}
+
+type mockTargetGroup struct {
+	targets []model.Target
+	source  string
+}
+
+func (mg mockTargetGroup) Targets() []model.Target { return mg.targets }
+func (mg mockTargetGroup) Source() string          { return mg.source }
+func (mg mockTargetGroup) Provider() string        { return "mock" }
+
+func newMockTarget(name string, tags ...string) *mockTarget {
+	m := &mockTarget{Name: name}
+	v, _ := model.ParseTags(strings.Join(tags, " ")) // best effort: a parse error leaves the target untagged
+	m.Tags().Merge(v)
+	return m
+}
+
+type mockTarget struct {
+	model.Base
+	Name string
+}
+
+func (mt mockTarget) TUID() string { return mt.Name }
+func (mt mockTarget) Hash() uint64 { return mustCalcHash(mt.Name) }
+
+func mustParseTags(line string) model.Tags { // test helper: panics on an invalid tag line
+	v, err := model.ParseTags(line)
+	if err != nil {
+		panic(fmt.Sprintf("mustParseTags: %v", err))
+	}
+	return v
+}
+
+func mustCalcHash(obj any) uint64 { // test helper: panics on a hash error
+	hash, err := hashstructure.Hash(obj, nil)
+	if err != nil {
+		panic(fmt.Sprintf("hash calculation: %v", err))
+	}
+	return hash
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/promport.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/promport.go
new file mode 100644
index 000000000..646e1abb1
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/promport.go
@@ -0,0 +1,662 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+// https://github.com/prometheus/prometheus/wiki/Default-port-allocations
+var prometheusPortAllocations = map[int]string{
+ 2019: "caddy",
+ 3000: "grafana",
+ 3100: "loki",
+ 5555: "prometheus-jdbc-exporter",
+ 6060: "crowdsec",
+ 7300: "midonet_agent",
+ 8001: "netbox",
+ 8088: "fawkes",
+ 8089: "prom2teams",
+ 8292: "phabricator_webhook_for_alertmanager",
+ 8404: "ha_proxy_v2_plus",
+ 9042: "rds_exporter",
+ 9087: "telegram_bot_for_alertmanager",
+ 9091: "pushgateway",
+ 9097: "jiralert",
+ 9101: "haproxy_exporter",
+ 9102: "statsd_exporter",
+ 9103: "collectd_exporter",
+ 9104: "mysqld_exporter",
+ 9105: "mesos_exporter",
+ 9106: "cloudwatch_exporter",
+ 9107: "consul_exporter",
+ 9108: "graphite_exporter",
+ 9109: "graphite_exporter",
+ 9110: "blackbox_exporter",
+ 9111: "expvar_exporter",
+ 9112: "promacct_pcap-based_network_traffic_accounting",
+ 9113: "nginx_exporter",
+ 9114: "elasticsearch_exporter",
+ 9115: "blackbox_exporter",
+ 9116: "snmp_exporter",
+ 9117: "apache_exporter",
+ 9118: "jenkins_exporter",
+ 9119: "bind_exporter",
+ 9120: "powerdns_exporter",
+ 9121: "redis_exporter",
+ 9122: "influxdb_exporter",
+ 9123: "rethinkdb_exporter",
+ 9124: "freebsd_sysctl_exporter",
+ 9125: "statsd_exporter",
+ 9126: "new_relic_exporter",
+ 9127: "pgbouncer_exporter",
+ 9128: "ceph_exporter",
+ 9129: "haproxy_log_exporter",
+ 9130: "unifi_poller",
+ 9131: "varnish_exporter",
+ 9132: "airflow_exporter",
+ 9133: "fritz_box_exporter",
+ 9134: "zfs_exporter",
+ 9135: "rtorrent_exporter",
+ 9136: "collins_exporter",
+ 9137: "silicondust_hdhomerun_exporter",
+ 9138: "heka_exporter",
+ 9139: "azure_sql_exporter",
+ 9140: "mirth_exporter",
+ 9141: "zookeeper_exporter",
+ 9142: "big-ip_exporter",
+ 9143: "cloudmonitor_exporter",
+ 9145: "aerospike_exporter",
+ 9146: "icecast_exporter",
+ 9147: "nginx_request_exporter",
+ 9148: "nats_exporter",
+ 9149: "passenger_exporter",
+ 9150: "memcached_exporter",
+ 9151: "varnish_request_exporter",
+ 9152: "command_runner_exporter",
+ 9154: "postfix_exporter",
+ 9155: "vsphere_graphite",
+ 9156: "webdriver_exporter",
+ 9157: "ibm_mq_exporter",
+ 9158: "pingdom_exporter",
+ 9160: "apache_flink_exporter",
+ 9161: "oracle_db_exporter",
+ 9162: "apcupsd_exporter",
+ 9163: "zgres_exporter",
+ 9164: "s6_exporter",
+ 9165: "keepalived_exporter",
+ 9166: "dovecot_exporter",
+ 9167: "unbound_exporter",
+ 9168: "gitlab-monitor",
+ 9169: "lustre_exporter",
+ 9170: "docker_hub_exporter",
+ 9171: "github_exporter",
+ 9172: "script_exporter",
+ 9173: "rancher_exporter",
+ 9174: "docker-cloud_exporter",
+ 9175: "saltstack_exporter",
+ 9176: "openvpn_exporter",
+ 9177: "libvirt_exporter",
+ 9178: "stream_exporter",
+ 9179: "shield_exporter",
+ 9180: "scylladb_exporter",
+ 9181: "openstack_ceilometer_exporter",
+ 9183: "openstack_exporter",
+ 9184: "twitch_exporter",
+ 9185: "kafka_topic_exporter",
+ 9186: "cloud_foundry_firehose_exporter",
+ 9187: "postgresql_exporter",
+ 9188: "crypto_exporter",
+ 9189: "hetzner_cloud_csi_driver_nodes",
+ 9190: "bosh_exporter",
+ 9191: "netflow_exporter",
+ 9192: "ceph_exporter",
+ 9193: "cloud_foundry_exporter",
+ 9194: "bosh_tsdb_exporter",
+ 9195: "maxscale_exporter",
+ 9196: "upnp_internet_gateway_device_exporter",
+ 9198: "logstash_exporter",
+ 9199: "cloudflare_exporter",
+ 9202: "pacemaker_exporter",
+ 9203: "domain_exporter",
+ 9204: "pcsensor_temper_exporter",
+ 9205: "nextcloud_exporter",
+ 9206: "elasticsearch_exporter",
+ 9207: "mysql_exporter",
+ 9208: "kafka_consumer_group_exporter",
+ 9209: "fastnetmon_advanced_exporter",
+ 9210: "netatmo_exporter",
+ 9211: "dnsbl-exporter",
+ 9212: "digitalocean_exporter",
+ 9213: "custom_exporter",
+ 9214: "mqtt_blackbox_exporter",
+ 9215: "prometheus_graphite_bridge",
+ 9216: "mongodb_exporter",
+ 9217: "consul_agent_exporter",
+ 9218: "promql-guard",
+ 9219: "ssl_certificate_exporter",
+ 9220: "netapp_trident_exporter",
+ 9221: "proxmox_ve_exporter",
+ 9222: "aws_ecs_exporter",
+ 9223: "bladepsgi_exporter",
+ 9224: "fluentd_exporter",
+ 9225: "mailexporter",
+ 9226: "allas",
+ 9227: "proc_exporter",
+ 9228: "flussonic_exporter",
+ 9229: "gitlab-workhorse",
+ 9230: "network_ups_tools_exporter",
+ 9231: "solr_exporter",
+ 9232: "osquery_exporter",
+ 9233: "mgmt_exporter",
+ 9234: "mosquitto_exporter",
+ 9235: "gitlab-pages_exporter",
+ 9236: "gitlab_gitaly_exporter",
+ 9237: "sql_exporter",
+ 9238: "uwsgi_expoter",
+ 9239: "surfboard_exporter",
+ 9240: "tinyproxy_exporter",
+ 9241: "arangodb_exporter",
+ 9242: "ceph_radosgw_usage_exporter",
+ 9243: "chef_compliance_exporter",
+ 9244: "moby_container_exporter",
+ 9245: "naemon_nagios_exporter",
+ 9246: "smartpi",
+ 9247: "sphinx_exporter",
+ 9248: "freebsd_gstat_exporter",
+ 9249: "apache_flink_metrics_reporter",
+ 9250: "opentsdb_exporter",
+ 9251: "sensu_exporter",
+ 9252: "gitlab_runner_exporter",
+ 9253: "php-fpm_exporter",
+ 9254: "kafka_burrow_exporter",
+ 9255: "google_stackdriver_exporter",
+ 9256: "td-agent_exporter",
+ 9257: "smart_exporter",
+ 9258: "hello_sense_exporter",
+ 9259: "azure_resources_exporter",
+ 9260: "buildkite_exporter",
+ 9261: "grafana_exporter",
+ 9262: "bloomsky_exporter",
+ 9263: "vmware_guest_exporter",
+ 9264: "nest_exporter",
+ 9265: "weather_exporter",
+ 9266: "openhab_exporter",
+ 9267: "nagios_livestatus_exporter",
+ 9268: "cratedb_remote_remote_read_write_adapter",
+ 9269: "fluent-agent-lite_exporter",
+ 9270: "jmeter_exporter",
+ 9271: "pagespeed_exporter",
+ 9272: "vmware_exporter",
+ 9274: "kubernetes_persistentvolume_disk_usage_exporter",
+ 9275: "nrpe_exporter",
+ 9276: "azure_monitor_exporter",
+ 9277: "mongo_collection_exporter",
+ 9278: "crypto_miner_exporter",
+ 9279: "instaclustr_exporter",
+ 9280: "citrix_netscaler_exporter",
+ 9281: "fastd_exporter",
+ 9282: "freeswitch_exporter",
+ 9283: "ceph_ceph-mgr_prometheus_plugin",
+ 9284: "gobetween",
+ 9285: "database_exporter",
+ 9286: "vdo_compression_and_deduplication_exporter",
+ 9287: "ceph_iscsi_gateway_statistics",
+ 9288: "consrv",
+ 9289: "lovoos_ipmi_exporter",
+ 9290: "soundclouds_ipmi_exporter",
+ 9291: "ibm_z_hmc_exporter",
+ 9292: "netapp_ontap_api_exporter",
+ 9293: "connection_status_exporter",
+ 9294: "miflora_flower_care_exporter",
+ 9295: "freifunk_exporter",
+ 9296: "odbc_exporter",
+ 9297: "machbase_exporter",
+ 9298: "generic_exporter",
+ 9299: "exporter_aggregator",
+ 9301: "squid_exporter",
+ 9302: "faucet_sdn_faucet_exporter",
+ 9303: "faucet_sdn_gauge_exporter",
+ 9304: "logstash_exporter",
+ 9305: "go-ethereum_exporter",
+ 9306: "kyototycoon_exporter",
+ 9307: "audisto_exporter",
+ 9308: "kafka_exporter",
+ 9309: "fluentd_exporter",
+ 9310: "open_vswitch_exporter",
+ 9311: "iota_exporter",
+ 9313: "cloudprober_exporter",
+ 9314: "eris_exporter",
+ 9315: "centrifugo_exporter",
+ 9316: "tado_exporter",
+ 9317: "tellstick_local_exporter",
+ 9318: "conntrack_exporter",
+ 9319: "flexlm_exporter",
+ 9320: "consul_telemetry_exporter",
+ 9321: "spring_boot_actuator_exporter",
+ 9322: "haproxy_abuser_exporter",
+ 9323: "docker_prometheus_metrics",
+ 9324: "bird_routing_daemon_exporter",
+ 9325: "ovirt_exporter",
+ 9326: "junos_exporter",
+ 9327: "s3_exporter",
+ 9328: "openldap_syncrepl_exporter",
+ 9329: "cups_exporter",
+ 9330: "openldap_metrics_exporter",
+ 9331: "influx-spout_prometheus_metrics",
+ 9332: "network_exporter",
+ 9333: "vault_pki_exporter",
+ 9334: "ejabberd_exporter",
+ 9335: "nexsan_exporter",
+ 9336: "mediacom_internet_usage_exporter",
+ 9337: "mqttgateway",
+ 9339: "aws_s3_exporter",
+ 9340: "financial_quotes_exporter",
+ 9341: "slurm_exporter",
+ 9342: "frr_exporter",
+ 9343: "gridserver_exporter",
+ 9344: "mqtt_exporter",
+ 9345: "ruckus_smartzone_exporter",
+ 9346: "ping_exporter",
+ 9347: "junos_exporter",
+ 9348: "bigquery_exporter",
+ 9349: "configurable_elasticsearch_query_exporter",
+ 9350: "thousandeyes_exporter",
+ 9351: "wal-e_wal-g_exporter",
+ 9352: "nature_remo_exporter",
+ 9353: "ceph_exporter",
+ 9354: "deluge_exporter",
+ 9355: "nightwatchjs_exporter",
+ 9356: "pacemaker_exporter",
+ 9357: "p1_exporter",
+ 9358: "performance_counters_exporter",
+ 9359: "sidekiq_prometheus",
+ 9360: "powershell_exporter",
+ 9361: "scaleway_sd_exporter",
+ 9362: "cisco_exporter",
+ // Netdata has clickhouse collector.
+ // CH itself exposes messy Prometheus metrics: camelCase names, appends instances to names instead of labels.
+ //9363: "clickhouse",
+ 9364: "continent8_exporter",
+ 9365: "cumulus_linux_exporter",
+ 9366: "haproxy_stick_table_exporter",
+ 9367: "teamspeak3_exporter",
+ 9368: "ethereum_client_exporter",
+ 9369: "prometheus_pushprox",
+ 9370: "u-bmc",
+ 9371: "conntrack-stats-exporter",
+ 9372: "appmetrics_prometheus",
+ 9373: "gcp_service_discovery",
+ 9374: "smokeping_prober",
+ 9375: "particle_exporter",
+ 9376: "falco",
+ 9377: "cisco_aci_exporter",
+ 9378: "etcd_grpc_proxy_exporter",
+ 9379: "etcd_exporter",
+ 9380: "mythtv_exporter",
+ 9381: "kafka_zookeeper_exporter",
+ 9382: "frrouting_exporter",
+ 9383: "aws_health_exporter",
+ 9384: "aws_sqs_exporter",
+ 9385: "apcupsdexporter",
+ 9386: "tankerkönig_api_exporter",
+ 9387: "sabnzbd_exporter",
+ 9388: "linode_exporter",
+ 9389: "scylla-cluster-tests_exporter",
+ 9390: "kannel_exporter",
+ 9391: "concourse_prometheus_metrics",
+ 9392: "generic_command_line_output_exporter",
+ 9393: "alertmanager_github_webhook_receiver",
+ 9394: "ruby_prometheus_exporter",
+ 9395: "ldap_exporter",
+ 9396: "monerod_exporter",
+ 9397: "comap",
+ 9398: "open_hardware_monitor_exporter",
+ 9399: "prometheus_sql_exporter",
+ 9400: "ripe_atlas_exporter",
+ 9401: "1-wire_exporter",
+ 9402: "google_cloud_platform_exporter",
+ 9403: "zerto_exporter",
+ 9404: "jmx_exporter",
+ 9405: "discourse_exporter",
+ 9406: "hhvm_exporter",
+ 9407: "obs_studio_exporter",
+ 9408: "rds_enhanced_monitoring_exporter",
+ 9409: "ovn-kubernetes_master_exporter",
+ 9410: "ovn-kubernetes_node_exporter",
+ 9411: "softether_exporter",
+ 9412: "sentry_exporter",
+ 9413: "mogilefs_exporter",
+ 9414: "homey_exporter",
+ 9415: "cloudwatch_read_adapter",
+ 9416: "hp_ilo_metrics_exporter",
+ 9417: "ethtool_exporter",
+ 9418: "gearman_exporter",
+ 9419: "rabbitmq_exporter",
+ 9420: "couchbase_exporter",
+ 9421: "apicast",
+ 9422: "jolokia_exporter",
+ 9423: "hp_raid_exporter",
+ 9424: "influxdb_stats_exporter",
+ 9425: "pachyderm_exporter",
+ 9426: "vespa_engine_exporter",
+ 9427: "ping_exporter",
+ 9428: "ssh_exporter",
+ 9429: "uptimerobot_exporter",
+ 9430: "corerad",
+ 9431: "hpfeeds_broker_exporter",
+ 9432: "windows_perflib_exporter",
+ 9433: "knot_exporter",
+ 9434: "opensips_exporter",
+ 9435: "ebpf_exporter",
+ 9436: "mikrotik-exporter",
+ 9437: "dell_emc_isilon_exporter",
+ 9438: "dell_emc_ecs_exporter",
+ 9439: "bitcoind_exporter",
+ 9440: "ravendb_exporter",
+ 9441: "nomad_exporter",
+ 9442: "mcrouter_exporter",
+ 9444: "foundationdb_exporter",
+ 9445: "nvidia_gpu_exporter",
+ 9446: "orange_livebox_dsl_modem_exporter",
+ 9447: "resque_exporter",
+ 9448: "eventstore_exporter",
+ 9449: "omeroserver_exporter",
+ 9450: "habitat_exporter",
+ 9451: "reindexer_exporter",
+ 9452: "freebsd_jail_exporter",
+ 9453: "midonet-kubernetes",
+ 9454: "nvidia_smi_exporter",
+ 9455: "iptables_exporter",
+ 9456: "aws_lambda_exporter",
+ 9457: "files_content_exporter",
+ 9458: "rocketchat_exporter",
+ 9459: "yarn_exporter",
+ 9460: "hana_exporter",
+ 9461: "aws_lambda_read_adapter",
+ 9462: "php_opcache_exporter",
+ 9463: "virgin_media_liberty_global_hub3_exporter",
+ 9464: "opencensus-nodejs_prometheus_exporter",
+ 9465: "hetzner_cloud_k8s_cloud_controller_manager",
+ 9466: "mqtt_push_gateway",
+ 9467: "nginx-prometheus-shiny-exporter",
+ 9468: "nasa-swpc-exporter",
+ 9469: "script_exporter",
+ 9470: "cachet_exporter",
+ 9471: "lxc-exporter",
+ 9472: "hetzner_cloud_csi_driver_controller",
+ 9473: "stellar-core-exporter",
+ 9474: "libvirtd_exporter",
+ 9475: "wgipamd",
+ 9476: "ovn_metrics_exporter",
+ 9477: "csp_violation_report_exporter",
+ 9478: "sentinel_exporter",
+ 9479: "elasticbeat_exporter",
+ 9480: "brigade_exporter",
+ 9481: "drbd9_exporter",
+ 9482: "vector_packet_process_vpp_exporter",
+ 9483: "ibm_app_connect_enterprise_exporter",
+ 9484: "kubedex-exporter",
+ 9485: "emarsys_exporter",
+ 9486: "domoticz_exporter",
+ 9487: "docker_stats_exporter",
+ 9488: "bmw_connected_drive_exporter",
+ 9489: "tezos_node_metrics_exporter",
+ 9490: "exporter_for_docker_libnetwork_plugin_for_ovn",
+ 9491: "docker_container_stats_exporter_docker_ps",
+ 9492: "azure_exporter_monitor_and_usage",
+ 9493: "prosafe_exporter",
+ 9494: "kamailio_exporter",
+ 9495: "ingestor_exporter",
+ 9496: "389ds_ipa_exporter",
+ 9497: "immudb_exporter",
+ 9498: "tp-link_hs110_exporter",
+ 9499: "smartthings_exporter",
+ 9500: "cassandra_exporter",
+ 9501: "hetznercloud_exporter",
+ 9502: "hetzner_exporter",
+ 9503: "scaleway_exporter",
+ 9504: "github_exporter",
+ 9505: "dockerhub_exporter",
+ 9506: "jenkins_exporter",
+ 9507: "owncloud_exporter",
+ 9508: "ccache_exporter",
+ 9509: "hetzner_storagebox_exporter",
+ 9510: "dummy_exporter",
+ 9512: "cloudera_exporter",
+ 9513: "openconfig_streaming_telemetry_exporter",
+ 9514: "app_stores_exporter",
+ 9515: "swarm-exporter",
+ 9516: "prometheus_speedtest_exporter",
+ 9517: "matroschka_prober",
+ 9518: "crypto_stock_exchanges_funds_exporter",
+ 9519: "acurite_exporter",
+ 9520: "swift_health_exporter",
+ 9521: "ruuvi_exporter",
+ 9522: "tftp_exporter",
+ 9523: "3cx_exporter",
+ 9524: "loki_exporter",
+ 9525: "alibaba_cloud_exporter",
+ 9526: "kafka_lag_exporter",
+ 9527: "netgear_cable_modem_exporter",
+ 9528: "total_connect_comfort_exporter",
+ 9529: "octoprint_exporter",
+ 9530: "custom_prometheus_exporter",
+ 9531: "jfrog_artifactory_exporter",
+ 9532: "snyk_exporter",
+ 9533: "network_exporter_for_cisco_api",
+ 9534: "humio_exporter",
+ 9535: "cron_exporter",
+ 9536: "ipsec_exporter",
+ 9537: "cri-o",
+ 9538: "bull_queue",
+ 9539: "modemmanager_exporter",
+ 9540: "emq_exporter",
+ 9541: "smartmon_exporter",
+ 9542: "sakuracloud_exporter",
+ 9543: "kube2iam_exporter",
+ 9544: "pgio_exporter",
+ 9545: "hp_ilo4_exporter",
+ 9546: "pwrstat-exporter",
+ 9547: "patroni_exporter",
+ 9548: "trafficserver_exporter",
+ 9549: "raspberry_exporter",
+ 9550: "rtl_433_exporter",
+ 9551: "hostapd_exporter",
+ 9552: "aws_elastic_beanstalk_exporter",
+ 9553: "apt_exporter",
+ 9554: "acc_server_manager_exporter",
+ 9555: "sona_exporter",
+ 9556: "routinator_exporter",
+ 9557: "mysql_count_exporter",
+ 9558: "systemd_exporter",
+ 9559: "ntp_exporter",
+ 9560: "sql_queries_exporter",
+ 9561: "qbittorrent_exporter",
+ 9562: "ptv_xserver_exporter",
+ 9563: "kibana_exporter",
+ 9564: "purpleair_exporter",
+ 9565: "bminer_exporter",
+ 9566: "rabbitmq_cli_consumer",
+ 9567: "alertsnitch",
+ 9568: "dell_poweredge_ipmi_exporter",
+ 9569: "hvpa_controller",
+ 9570: "vpa_exporter",
+ 9571: "helm_exporter",
+ 9572: "ctld_exporter",
+ 9573: "jkstatus_exporter",
+ 9574: "opentracker_exporter",
+ 9575: "poweradmin_server_monitor_exporter",
+ 9576: "exabgp_exporter",
+ 9578: "aria2_exporter",
+ 9579: "iperf3_exporter",
+ 9580: "azure_service_bus_exporter",
+ 9581: "codenotary_vcn_exporter",
+ 9583: "signatory_a_remote_operation_signer_for_tezos",
+ 9584: "bunnycdn_exporter",
+ 9585: "opvizor_performance_analyzer_process_exporter",
+ 9586: "wireguard_exporter",
+ 9587: "nfs-ganesha_exporter",
+ 9588: "ltsv-tailer_exporter",
+ 9589: "goflow_exporter",
+ 9590: "flow_exporter",
+ 9591: "srcds_exporter",
+ 9592: "gcp_quota_exporter",
+ 9593: "lighthouse_exporter",
+ 9594: "plex_exporter",
+ 9595: "netio_exporter",
+ 9596: "azure_elastic_sql_exporter",
+ 9597: "github_vulnerability_alerts_exporter",
+ 9599: "pirograph_exporter",
+ 9600: "circleci_exporter",
+ 9601: "messagebird_exporter",
+ 9602: "modbus_exporter",
+ 9603: "xen_exporter_using_xenlight",
+ 9604: "xmpp_blackbox_exporter",
+ 9605: "fping-exporter",
+ 9606: "ecr-exporter",
+ 9607: "raspberry_pi_sense_hat_exporter",
+ 9608: "ironic_prometheus_exporter",
+ 9609: "netapp_exporter",
+ 9610: "kubernetes_exporter",
+ 9611: "speedport_exporter",
+ 9612: "opflex-agent_exporter",
+ 9613: "azure_health_exporter",
+ 9614: "nut_upsc_exporter",
+ 9615: "mellanox_mlx5_exporter",
+ 9616: "mailgun_exporter",
+ 9617: "pi-hole_exporter",
+ 9618: "stellar-account-exporter",
+ 9619: "stellar-horizon-exporter",
+ 9620: "rundeck_exporter",
+ 9621: "opennebula_exporter",
+ 9622: "bmc_exporter",
+ 9623: "tc4400_exporter",
+ 9624: "pact_broker_exporter",
+ 9625: "bareos_exporter",
+ 9626: "hockeypuck",
+ 9627: "artifactory_exporter",
+ 9628: "solace_pubsub_plus_exporter",
+ 9629: "prometheus_gitlab_notifier",
+ 9630: "nftables_exporter",
+ 9631: "a_op5_monitor_exporter",
+ 9632: "opflex-server_exporter",
+ 9633: "smartctl_exporter",
+ 9634: "aerospike_ttl_exporter",
+ 9635: "fail2ban_exporter",
+ 9636: "exim4_exporter",
+ 9637: "kubeversion_exporter",
+ 9638: "a_icinga2_exporter",
+ 9639: "scriptable_jmx_exporter",
+ 9640: "logstash_output_exporter",
+ 9641: "coturn_exporter",
+ 9642: "bugsnag_exporter",
+ 9644: "exporter_for_grouped_process",
+ 9645: "burp_exporter",
+ 9646: "locust_exporter",
+ 9647: "docker_exporter",
+ 9648: "ntpmon_exporter",
+ 9649: "logstash_exporter",
+ 9650: "keepalived_exporter",
+ 9651: "storj_exporter",
+ 9652: "praefect_exporter",
+ 9653: "jira_issues_exporter",
+ 9654: "ansible_galaxy_exporter",
+ 9655: "kube-netc_exporter",
+ 9656: "matrix",
+ 9657: "krill_exporter",
+ 9658: "sap_hana_sql_exporter",
+ 9660: "kaiterra_laser_egg_exporter",
+ 9661: "hashpipe_exporter",
+ 9662: "pms5003_particulate_matter_sensor_exporter",
+ 9663: "sap_nwrfc_exporter",
+ 9664: "linux_ha_clusterlabs_exporter",
+ 9665: "senderscore_exporter",
+ 9666: "alertmanager_silences_exporter",
+ 9667: "smtpd_exporter",
+ 9668: "suses_sap_hana_exporter",
+ 9669: "panopticon_native_metrics",
+ 9670: "flare_native_metrics",
+ 9671: "aws_ec2_spot_exporter",
+ 9672: "aircontrol_co2_exporter",
+ 9673: "co2_monitor_exporter",
+ 9674: "google_analytics_exporter",
+ 9675: "docker_swarm_exporter",
+ 9676: "hetzner_traffic_exporter",
+ 9677: "aws_ecs_exporter",
+ 9678: "ircd_user_exporter",
+ 9679: "aws_health_exporter",
+ 9680: "suses_sap_host_exporter",
+ 9681: "myfitnesspal_exporter",
+ 9682: "powder_monkey",
+ 9683: "infiniband_exporter",
+ 9684: "kibana_standalone_exporter",
+ 9685: "eideticom",
+ 9686: "aws_ec2_exporter",
+ 9687: "gitaly_blackbox_exporter",
+ 9689: "lan_server_modbus_exporter",
+ 9690: "tcp_longterm_connection_exporter",
+ 9691: "celery_redis_exporter",
+ 9692: "gcp_gce_exporter",
+ 9693: "sigma_air_manager_exporter",
+ 9694: "per-user_usage_exporter_for_cisco_xe_lnss",
+ 9695: "cifs_exporter",
+ 9696: "jitsi_videobridge_exporter",
+ 9697: "tendermint_blockchain_exporter",
+ 9698: "integrated_dell_remote_access_controller_idrac_exporter",
+ 9699: "pyncette_exporter",
+ 9700: "jitsi_meet_exporter",
+ 9701: "workbook_exporter",
+ 9702: "homeplug_plc_exporter",
+ 9703: "vircadia",
+ 9704: "linux_tc_exporter",
+ 9705: "upc_connect_box_exporter",
+ 9706: "postfix_exporter",
+ 9707: "radarr_exporter",
+ 9708: "sonarr_exporter",
+ 9709: "hadoop_hdfs_fsimage_exporter",
+ 9710: "nut-exporter",
+ 9711: "cloudflare_flan_scan_report_exporter",
+ 9712: "siemens_s7_plc_exporter",
+ 9713: "glusterfs_exporter",
+ 9714: "fritzbox_exporter",
+ 9715: "twincat_ads_web_service_exporter",
+ 9716: "signald_webhook_receiver",
+ 9717: "tplink_easysmart_switch_exporter",
+ 9718: "warp10_exporter",
+ 9719: "pgpool-ii_exporter",
+ 9720: "moodle_db_exporter",
+ 9721: "gtp_exporter",
+ 9722: "miele_exporter",
+ 9724: "freeswitch_exporter",
+ 9725: "sunnyboy_exporter",
+ 9726: "python_rq_exporter",
+ 9727: "ctdb_exporter",
+ 9728: "nginx-rtmp_exporter",
+ 9729: "libvirtd_exporter",
+ 9730: "lynis_exporter",
+ 9731: "nebula_mam_exporter",
+ 9732: "nftables_exporter",
+ 9733: "honeypot_exporter",
+ 9734: "a10-networks_prometheus_exporter",
+ 9735: "webweaver",
+ 9736: "mongodb_query_exporter",
+ 9737: "folding_home_exporter",
+ 9738: "processor_counter_monitor_exporter",
+ 9739: "kafka_consumer_lag_monitoring",
+ 9740: "flightdeck",
+ 9741: "ibm_spectrum_exporter",
+ 9742: "transmission-exporter",
+ 9743: "sma-exporter",
+ 9803: "site24x7_exporter",
+ 9901: "envoy_proxy",
+ 9913: "nginx_vts_exporter",
+ 9943: "filestat_exporter",
+ 9980: "login_exporter",
+ 9983: "sia_exporter",
+ 9984: "couchdb_exporter",
+ 9987: "netapp_solidfire_exporter",
+ 9990: "wildfly_exporter",
+ 16995: "storidge_exporter",
+ 19091: "transmission_exporter",
+ 24231: "fluent_plugin_for_prometheus",
+ 42004: "proxysql_exporter",
+ 44323: "pcp_exporter",
+ 61091: "dcos_exporter",
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector.go
new file mode 100644
index 000000000..8bb5fb061
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector.go
@@ -0,0 +1,154 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+)
+
+type selector interface {
+ matches(model.Tags) bool
+}
+
+type (
+ exactSelector string
+ trueSelector struct{}
+ negSelector struct{ selector }
+ orSelector struct{ lhs, rhs selector }
+ andSelector struct{ lhs, rhs selector }
+)
+
+func (s exactSelector) matches(tags model.Tags) bool { _, ok := tags[string(s)]; return ok }
+func (s trueSelector) matches(model.Tags) bool { return true }
+func (s negSelector) matches(tags model.Tags) bool { return !s.selector.matches(tags) }
+func (s orSelector) matches(tags model.Tags) bool { return s.lhs.matches(tags) || s.rhs.matches(tags) }
+func (s andSelector) matches(tags model.Tags) bool { return s.lhs.matches(tags) && s.rhs.matches(tags) }
+
+func (s exactSelector) String() string { return "{" + string(s) + "}" }
+func (s negSelector) String() string { return "{!" + stringify(s.selector) + "}" }
+func (s trueSelector) String() string { return "{*}" }
+func (s orSelector) String() string { return "{" + stringify(s.lhs) + "|" + stringify(s.rhs) + "}" }
+func (s andSelector) String() string { return "{" + stringify(s.lhs) + ", " + stringify(s.rhs) + "}" }
+func stringify(sr selector) string { return strings.Trim(fmt.Sprintf("%s", sr), "{}") }
+
+func parseSelector(line string) (sr selector, err error) {
+ words := strings.Fields(line)
+ if len(words) == 0 {
+ return trueSelector{}, nil
+ }
+
+ var srs []selector
+ for _, word := range words {
+ if idx := strings.IndexByte(word, '|'); idx > 0 {
+ sr, err = parseOrSelectorWord(word)
+ } else {
+ sr, err = parseSingleSelectorWord(word)
+ }
+ if err != nil {
+ return nil, fmt.Errorf("selector '%s' contains selector '%s' with forbidden symbol", line, word)
+ }
+ srs = append(srs, sr)
+ }
+
+ switch len(srs) {
+ case 0:
+ return trueSelector{}, nil
+ case 1:
+ return srs[0], nil
+ default:
+ return newAndSelector(srs[0], srs[1], srs[2:]...), nil
+ }
+}
+
+func parseOrSelectorWord(orWord string) (sr selector, err error) {
+ var srs []selector
+ for _, word := range strings.Split(orWord, "|") {
+ if sr, err = parseSingleSelectorWord(word); err != nil {
+ return nil, err
+ }
+ srs = append(srs, sr)
+ }
+ switch len(srs) {
+ case 0:
+ return trueSelector{}, nil
+ case 1:
+ return srs[0], nil
+ default:
+ return newOrSelector(srs[0], srs[1], srs[2:]...), nil
+ }
+}
+
+func parseSingleSelectorWord(word string) (selector, error) {
+ if len(word) == 0 {
+ return nil, errors.New("empty word")
+ }
+ neg := word[0] == '!'
+ if neg {
+ word = word[1:]
+ }
+ if len(word) == 0 {
+ return nil, errors.New("empty word")
+ }
+ if word != "*" && !isSelectorWordValid(word) {
+ return nil, errors.New("forbidden symbol")
+ }
+
+ var sr selector
+ switch word {
+ case "*":
+ sr = trueSelector{}
+ default:
+ sr = exactSelector(word)
+ }
+ if neg {
+ return negSelector{sr}, nil
+ }
+ return sr, nil
+}
+
+func newAndSelector(lhs, rhs selector, others ...selector) selector {
+ m := andSelector{lhs: lhs, rhs: rhs}
+ switch len(others) {
+ case 0:
+ return m
+ default:
+ return newAndSelector(m, others[0], others[1:]...)
+ }
+}
+
+func newOrSelector(lhs, rhs selector, others ...selector) selector {
+ m := orSelector{lhs: lhs, rhs: rhs}
+ switch len(others) {
+ case 0:
+ return m
+ default:
+ return newOrSelector(m, others[0], others[1:]...)
+ }
+}
+
+func isSelectorWordValid(word string) bool {
+ // valid:
+ // *
+ // ^[a-zA-Z][a-zA-Z0-9=_.]*$
+ if len(word) == 0 {
+ return false
+ }
+ if word == "*" {
+ return true
+ }
+ for i, b := range word {
+ switch {
+ case b >= 'a' && b <= 'z':
+ case b >= 'A' && b <= 'Z':
+ case b >= '0' && b <= '9' && i > 0:
+ case (b == '=' || b == '_' || b == '.') && i > 0:
+ default:
+ return false
+ }
+ }
+ return true
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector_test.go
new file mode 100644
index 000000000..a4fcf3041
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector_test.go
@@ -0,0 +1,248 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "regexp"
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+)
+
+var reSrString = regexp.MustCompile(`^{[^{}]+}$`)
+
+func TestTrueSelector_String(t *testing.T) {
+ var sr trueSelector
+ assert.Equal(t, "{*}", sr.String())
+}
+
+func TestExactSelector_String(t *testing.T) {
+ sr := exactSelector("selector")
+
+ assert.True(t, reSrString.MatchString(sr.String()))
+}
+
+func TestNegSelector_String(t *testing.T) {
+ srs := []selector{
+ exactSelector("selector"),
+ negSelector{exactSelector("selector")},
+ orSelector{
+ lhs: exactSelector("selector"),
+ rhs: exactSelector("selector")},
+ orSelector{
+ lhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}},
+ rhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}},
+ },
+ andSelector{
+ lhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}},
+ rhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}},
+ },
+ }
+
+ for i, sr := range srs {
+ neg := negSelector{sr}
+ assert.True(t, reSrString.MatchString(neg.String()), "selector num %d", i+1)
+ }
+}
+
+func TestOrSelector_String(t *testing.T) {
+ sr := orSelector{
+ lhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}},
+ rhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}},
+ }
+
+ assert.True(t, reSrString.MatchString(sr.String()))
+}
+
+func TestAndSelector_String(t *testing.T) {
+ sr := andSelector{
+ lhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}},
+ rhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}},
+ }
+
+ assert.True(t, reSrString.MatchString(sr.String()))
+}
+
+func TestExactSelector_Matches(t *testing.T) {
+ matchTests := struct {
+ tags model.Tags
+ srs []exactSelector
+ }{
+ tags: model.Tags{"a": {}, "b": {}},
+ srs: []exactSelector{
+ "a",
+ "b",
+ },
+ }
+ notMatchTests := struct {
+ tags model.Tags
+ srs []exactSelector
+ }{
+ tags: model.Tags{"a": {}, "b": {}},
+ srs: []exactSelector{
+ "c",
+ "d",
+ },
+ }
+
+ for i, sr := range matchTests.srs {
+ assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1)
+ }
+ for i, sr := range notMatchTests.srs {
+ assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1)
+ }
+}
+
+func TestNegSelector_Matches(t *testing.T) {
+ matchTests := struct {
+ tags model.Tags
+ srs []negSelector
+ }{
+ tags: model.Tags{"a": {}, "b": {}},
+ srs: []negSelector{
+ {exactSelector("c")},
+ {exactSelector("d")},
+ },
+ }
+ notMatchTests := struct {
+ tags model.Tags
+ srs []negSelector
+ }{
+ tags: model.Tags{"a": {}, "b": {}},
+ srs: []negSelector{
+ {exactSelector("a")},
+ {exactSelector("b")},
+ },
+ }
+
+ for i, sr := range matchTests.srs {
+ assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1)
+ }
+ for i, sr := range notMatchTests.srs {
+ assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1)
+ }
+}
+
+func TestOrSelector_Matches(t *testing.T) {
+ matchTests := struct {
+ tags model.Tags
+ srs []orSelector
+ }{
+ tags: model.Tags{"a": {}, "b": {}},
+ srs: []orSelector{
+ {
+ lhs: orSelector{lhs: exactSelector("c"), rhs: exactSelector("d")},
+ rhs: orSelector{lhs: exactSelector("e"), rhs: exactSelector("b")},
+ },
+ },
+ }
+ notMatchTests := struct {
+ tags model.Tags
+ srs []orSelector
+ }{
+ tags: model.Tags{"a": {}, "b": {}},
+ srs: []orSelector{
+ {
+ lhs: orSelector{lhs: exactSelector("c"), rhs: exactSelector("d")},
+ rhs: orSelector{lhs: exactSelector("e"), rhs: exactSelector("f")},
+ },
+ },
+ }
+
+ for i, sr := range matchTests.srs {
+ assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1)
+ }
+ for i, sr := range notMatchTests.srs {
+ assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1)
+ }
+}
+
+func TestAndSelector_Matches(t *testing.T) {
+ matchTests := struct {
+ tags model.Tags
+ srs []andSelector
+ }{
+ tags: model.Tags{"a": {}, "b": {}, "c": {}, "d": {}},
+ srs: []andSelector{
+ {
+ lhs: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")},
+ rhs: andSelector{lhs: exactSelector("c"), rhs: exactSelector("d")},
+ },
+ },
+ }
+ notMatchTests := struct {
+ tags model.Tags
+ srs []andSelector
+ }{
+ tags: model.Tags{"a": {}, "b": {}, "c": {}, "d": {}},
+ srs: []andSelector{
+ {
+ lhs: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")},
+ rhs: andSelector{lhs: exactSelector("c"), rhs: exactSelector("z")},
+ },
+ },
+ }
+
+ for i, sr := range matchTests.srs {
+ assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1)
+ }
+ for i, sr := range notMatchTests.srs {
+ assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1)
+ }
+}
+
+func TestParseSelector(t *testing.T) {
+ tests := map[string]struct {
+ wantSelector selector
+ wantErr bool
+ }{
+ "": {wantSelector: trueSelector{}},
+ "a": {wantSelector: exactSelector("a")},
+ "Z": {wantSelector: exactSelector("Z")},
+ "a_b": {wantSelector: exactSelector("a_b")},
+ "a=b": {wantSelector: exactSelector("a=b")},
+ "!a": {wantSelector: negSelector{exactSelector("a")}},
+ "a b": {wantSelector: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}},
+ "a|b": {wantSelector: orSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}},
+ "*": {wantSelector: trueSelector{}},
+ "!*": {wantSelector: negSelector{trueSelector{}}},
+ "a b !c d|e f": {
+ wantSelector: andSelector{
+ lhs: andSelector{
+ lhs: andSelector{
+ lhs: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")},
+ rhs: negSelector{exactSelector("c")},
+ },
+ rhs: orSelector{
+ lhs: exactSelector("d"),
+ rhs: exactSelector("e"),
+ },
+ },
+ rhs: exactSelector("f"),
+ },
+ },
+ "!": {wantErr: true},
+ "a !": {wantErr: true},
+ "a!b": {wantErr: true},
+ "0a": {wantErr: true},
+ "a b c*": {wantErr: true},
+ "__": {wantErr: true},
+ "a|b|c*": {wantErr: true},
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sr, err := parseSelector(name)
+
+ if test.wantErr {
+ assert.Nil(t, sr)
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, test.wantSelector, sr)
+ }
+ })
+ }
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/sim_test.go
new file mode 100644
index 000000000..23a120751
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/sim_test.go
@@ -0,0 +1,130 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "context"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gopkg.in/yaml.v2"
+)
+
+// discoverySim describes one pipeline discovery simulation: the pipeline
+// YAML config, the discoverers that feed it, the expected number of
+// classify/compose invocations, and the config groups it should emit.
+type discoverySim struct {
+	config            string
+	discoverers       []model.Discoverer
+	wantClassifyCalls int
+	wantComposeCalls  int
+	wantConfGroups    []*confgroup.Group
+}
+
+// run executes the simulation: it builds a Pipeline whose classificator
+// and composer are wrapped in call-counting mocks, collects every config
+// group the pipeline emits, and compares groups and call counts against
+// the expectations.
+func (sim discoverySim) run(t *testing.T) {
+	t.Helper()
+
+	var cfg Config
+	err := yaml.Unmarshal([]byte(sim.config), &cfg)
+	require.Nilf(t, err, "cfg unmarshal")
+
+	clr, err := newTargetClassificator(cfg.Classify)
+	require.Nil(t, err, "newTargetClassificator")
+
+	cmr, err := newConfigComposer(cfg.Compose)
+	require.Nil(t, err, "newConfigComposer")
+
+	// Wrap the real classificator/composer so calls can be counted.
+	mockClr := &mockClassificator{clr: clr}
+	mockCmr := &mockComposer{cmr: cmr}
+
+	accum := newAccumulator()
+	accum.sendEvery = time.Second * 2
+
+	pl := &Pipeline{
+		Logger:      logger.New(),
+		discoverers: sim.discoverers,
+		accum:       accum,
+		clr:         mockClr,
+		cmr:         mockCmr,
+		configs:     make(map[string]map[uint64][]confgroup.Config),
+	}
+
+	// Share one logger across the pipeline components.
+	pl.accum.Logger = pl.Logger
+	clr.Logger = pl.Logger
+	cmr.Logger = pl.Logger
+
+	groups := sim.collectGroups(t, pl)
+
+	// Sort both sides so the comparison is order-insensitive.
+	sortConfigGroups(groups)
+	sortConfigGroups(sim.wantConfGroups)
+
+	assert.Equal(t, sim.wantConfGroups, groups)
+	assert.Equalf(t, sim.wantClassifyCalls, mockClr.calls, "classify calls")
+	assert.Equalf(t, sim.wantComposeCalls, mockCmr.calls, "compose calls")
+}
+
+// collectGroups runs the pipeline and gathers every config group it
+// emits until the pipeline stops or an overall timeout expires.
+func (sim discoverySim) collectGroups(t *testing.T, pl *Pipeline) []*confgroup.Group {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	in := make(chan []*confgroup.Group)
+	done := make(chan struct{})
+
+	go func() { defer close(done); pl.Run(ctx, in) }()
+
+	timeout := time.Second * 10
+	// Use a single timer for the whole collection. The previous
+	// time.After inside the select loop created a fresh timer on every
+	// received event, so the deadline reset each time and the total wait
+	// was unbounded.
+	tmr := time.NewTimer(timeout)
+	defer tmr.Stop()
+
+	var groups []*confgroup.Group
+
+	func() {
+		for {
+			select {
+			case inGroups := <-in:
+				groups = append(groups, inGroups...)
+			case <-done:
+				return
+			case <-tmr.C:
+				t.Logf("discovery timed out after %s, got %d groups, expected %d, some events are skipped",
+					timeout, len(groups), len(sim.wantConfGroups))
+				return
+			}
+		}
+	}()
+
+	return groups
+}
+
+// mockClassificator wraps a real targetClassificator and counts how many
+// times classify is invoked.
+type mockClassificator struct {
+	calls int
+	clr   *targetClassificator
+}
+
+// classify records the call and delegates to the wrapped classificator.
+func (m *mockClassificator) classify(tgt model.Target) model.Tags {
+	m.calls++
+	return m.clr.classify(tgt)
+}
+
+// mockComposer wraps a real configComposer and counts how many times
+// compose is invoked.
+type mockComposer struct {
+	calls int
+	cmr   *configComposer
+}
+
+// compose records the call and delegates to the wrapped composer.
+func (m *mockComposer) compose(tgt model.Target) []confgroup.Config {
+	m.calls++
+	return m.cmr.compose(tgt)
+}
+
+// sortConfigGroups orders groups by Source and each group's configs by
+// Name so simulations can compare results deterministically.
+func sortConfigGroups(groups []*confgroup.Group) {
+	bySource := func(i, j int) bool { return groups[i].Source < groups[j].Source }
+	sort.Slice(groups, bySource)
+
+	for _, grp := range groups {
+		cfgs := grp.Configs
+		byName := func(i, j int) bool { return cfgs[i].Name() < cfgs[j].Name() }
+		sort.Slice(cfgs, byName)
+	}
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go
new file mode 100644
index 000000000..ab84c979e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go
@@ -0,0 +1,147 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package sd
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "sync"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/pipeline"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/multipath"
+
+ "gopkg.in/yaml.v2"
+)
+
+// Config holds the inputs for NewServiceDiscovery: per-module config
+// defaults applied to every pipeline, and the directories searched for
+// pipeline config files.
+type Config struct {
+	ConfigDefaults confgroup.Registry
+	ConfDir        multipath.MultiPath
+}
+
+// NewServiceDiscovery creates a ServiceDiscovery that reads pipeline
+// config files from cfg.ConfDir and applies cfg.ConfigDefaults to every
+// pipeline it starts. The returned error is currently always nil; the
+// signature leaves room for future validation.
+func NewServiceDiscovery(cfg Config) (*ServiceDiscovery, error) {
+	log := logger.New().With(
+		slog.String("component", "service discovery"),
+	)
+
+	d := &ServiceDiscovery{
+		Logger:         log,
+		confProv:       newConfFileReader(log, cfg.ConfDir),
+		configDefaults: cfg.ConfigDefaults,
+		// Indirection over pipeline.New so tests can inject mocks.
+		newPipeline: func(config pipeline.Config) (sdPipeline, error) {
+			return pipeline.New(config)
+		},
+		pipelines: make(map[string]func()),
+	}
+
+	return d, nil
+}
+
+type (
+	// ServiceDiscovery consumes pipeline config files and manages the
+	// lifecycle of one pipeline per config source.
+	ServiceDiscovery struct {
+		*logger.Logger
+
+		// confProv streams config files (source + content) to Run.
+		confProv confFileProvider
+
+		// configDefaults is copied into every pipeline config.
+		configDefaults confgroup.Registry
+		// newPipeline builds a pipeline from a config (mockable in tests).
+		newPipeline func(config pipeline.Config) (sdPipeline, error)
+		// pipelines maps a config source to the stop function of the
+		// pipeline currently running for that source.
+		pipelines map[string]func()
+	}
+	// sdPipeline runs until ctx is canceled, sending config groups to in.
+	sdPipeline interface {
+		Run(ctx context.Context, in chan<- []*confgroup.Group)
+	}
+	// confFileProvider delivers config files over the configs() channel.
+	confFileProvider interface {
+		run(ctx context.Context)
+		configs() chan confFile
+	}
+)
+
+// String returns a human-readable identifier used in log messages.
+func (d *ServiceDiscovery) String() string {
+	return "service discovery"
+}
+
+// Run starts the config file provider and the processing loop, blocks
+// until both finish (they exit on ctx cancellation), and stops all
+// running pipelines on the way out.
+func (d *ServiceDiscovery) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+	d.Info("instance is started")
+	defer func() { d.cleanup(); d.Info("instance is stopped") }()
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() { defer wg.Done(); d.confProv.run(ctx) }()
+
+	wg.Add(1)
+	go func() { defer wg.Done(); d.run(ctx, in) }()
+
+	wg.Wait()
+	// NOTE(review): both goroutines return only when ctx is done, so
+	// this receive should already be satisfied — confirm it is needed.
+	<-ctx.Done()
+}
+
+// run processes incoming config files until ctx is canceled: empty
+// content removes the source's pipeline, non-empty content (re)starts it.
+func (d *ServiceDiscovery) run(ctx context.Context, in chan<- []*confgroup.Group) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case cfg := <-d.confProv.configs():
+			// A config without a source cannot be tracked in the
+			// pipelines map; skip it.
+			if cfg.source == "" {
+				continue
+			}
+			if len(cfg.content) == 0 {
+				d.removePipeline(cfg)
+			} else {
+				d.addPipeline(ctx, cfg, in)
+			}
+		}
+	}
+}
+
+// removePipeline stops and forgets the pipeline registered for the given
+// config source, if one exists; otherwise it does nothing.
+func (d *ServiceDiscovery) removePipeline(conf confFile) {
+	stop, ok := d.pipelines[conf.source]
+	if !ok {
+		return
+	}
+	d.Infof("received an empty config, stopping the pipeline ('%s')", conf.source)
+	delete(d.pipelines, conf.source)
+	stop()
+}
+
+// addPipeline parses the config file into a pipeline config and starts a
+// new pipeline for it, stopping and replacing any pipeline already
+// running for the same source. Disabled or invalid configs are logged
+// and ignored.
+func (d *ServiceDiscovery) addPipeline(ctx context.Context, conf confFile, in chan<- []*confgroup.Group) {
+	var cfg pipeline.Config
+
+	if err := yaml.Unmarshal(conf.content, &cfg); err != nil {
+		d.Error(err)
+		return
+	}
+
+	// Set the source before any logging so every message identifies the
+	// originating file (previously the "disabled" log below printed a
+	// Source that had not been set yet).
+	cfg.Source = fmt.Sprintf("file=%s", conf.source)
+	cfg.ConfigDefaults = d.configDefaults
+
+	if cfg.Disabled {
+		d.Infof("pipeline config is disabled '%s' (%s)", cfg.Name, cfg.Source)
+		return
+	}
+
+	pl, err := d.newPipeline(cfg)
+	if err != nil {
+		d.Error(err)
+		return
+	}
+
+	// Stop a previously running pipeline for this source before the
+	// replacement starts.
+	if stop, ok := d.pipelines[conf.source]; ok {
+		stop()
+	}
+
+	var wg sync.WaitGroup
+	plCtx, cancel := context.WithCancel(ctx)
+
+	wg.Add(1)
+	go func() { defer wg.Done(); pl.Run(plCtx, in) }()
+
+	// The stop function cancels the pipeline and waits for it to exit.
+	stop := func() { cancel(); wg.Wait() }
+	d.pipelines[conf.source] = stop
+}
+
+// cleanup stops every running pipeline; called when Run exits.
+func (d *ServiceDiscovery) cleanup() {
+	for _, stop := range d.pipelines {
+		stop()
+	}
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/sd_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/sd_test.go
new file mode 100644
index 000000000..376c9f7e7
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/sd_test.go
@@ -0,0 +1,106 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package sd
+
+import (
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/pipeline"
+
+ "gopkg.in/yaml.v2"
+)
+
+// TestServiceDiscovery_Run drives the service discovery through
+// sequences of config files and asserts which mock pipelines were
+// created, started, and stopped as a result.
+func TestServiceDiscovery_Run(t *testing.T) {
+	tests := map[string]discoverySim{
+		"add pipeline": {
+			configs: []confFile{
+				prepareConfigFile("source", "name"),
+			},
+			wantPipelines: []*mockPipeline{
+				{name: "name", started: true, stopped: false},
+			},
+		},
+		"add disabled pipeline": {
+			configs: []confFile{
+				prepareDisabledConfigFile("source", "name"),
+			},
+			wantPipelines: nil,
+		},
+		// Empty content for a known source stops its pipeline.
+		"remove pipeline": {
+			configs: []confFile{
+				prepareConfigFile("source", "name"),
+				prepareEmptyConfigFile("source"),
+			},
+			wantPipelines: []*mockPipeline{
+				{name: "name", started: true, stopped: true},
+			},
+		},
+		// Each re-add replaces the previous pipeline; only the last
+		// instance stays running.
+		"re-add pipeline multiple times": {
+			configs: []confFile{
+				prepareConfigFile("source", "name"),
+				prepareConfigFile("source", "name"),
+				prepareConfigFile("source", "name"),
+			},
+			wantPipelines: []*mockPipeline{
+				{name: "name", started: true, stopped: true},
+				{name: "name", started: true, stopped: true},
+				{name: "name", started: true, stopped: false},
+			},
+		},
+		"restart pipeline": {
+			configs: []confFile{
+				prepareConfigFile("source", "name1"),
+				prepareConfigFile("source", "name2"),
+			},
+			wantPipelines: []*mockPipeline{
+				{name: "name1", started: true, stopped: true},
+				{name: "name2", started: true, stopped: false},
+			},
+		},
+		// The mock factory rejects configs named "invalid".
+		"invalid pipeline config": {
+			configs: []confFile{
+				prepareConfigFile("source", "invalid"),
+			},
+			wantPipelines: nil,
+		},
+		// A failed replacement must not stop the running pipeline.
+		"invalid config for running pipeline": {
+			configs: []confFile{
+				prepareConfigFile("source", "name"),
+				prepareConfigFile("source", "invalid"),
+			},
+			wantPipelines: []*mockPipeline{
+				{name: "name", started: true, stopped: false},
+			},
+		},
+	}
+
+	for name, sim := range tests {
+		t.Run(name, func(t *testing.T) {
+			sim.run(t)
+		})
+	}
+}
+
+// prepareConfigFile returns a confFile whose content is a marshaled
+// pipeline.Config carrying the given name.
+func prepareConfigFile(source, name string) confFile {
+	content, _ := yaml.Marshal(pipeline.Config{Name: name})
+	return confFile{source: source, content: content}
+}
+
+func prepareEmptyConfigFile(source string) confFile {
+ return confFile{
+ source: source,
+ }
+}
+
+// prepareDisabledConfigFile returns a confFile whose content is a
+// marshaled pipeline.Config with Disabled set, so SD must skip it.
+func prepareDisabledConfigFile(source, name string) confFile {
+	content, _ := yaml.Marshal(pipeline.Config{Name: name, Disabled: true})
+	return confFile{source: source, content: content}
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/sim_test.go
new file mode 100644
index 000000000..7741221d1
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/sim_test.go
@@ -0,0 +1,118 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package sd
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/pipeline"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// lock guards the mock state (factory pipelines and per-pipeline flags)
+// shared between the simulation goroutines and the test assertions.
+var lock = &sync.Mutex{}
+
+// discoverySim describes one sd simulation: the config files to feed in
+// and the mock pipelines (with started/stopped flags) expected afterwards.
+type discoverySim struct {
+	configs       []confFile
+	wantPipelines []*mockPipeline
+}
+
+// run executes the simulation: it feeds sim.configs to a ServiceDiscovery
+// wired to a mock pipeline factory, checks which pipelines exist after a
+// settling period, then cancels and verifies every pipeline stopped.
+func (sim *discoverySim) run(t *testing.T) {
+	fact := &mockFactory{}
+	mgr := &ServiceDiscovery{
+		Logger: logger.New(),
+		newPipeline: func(config pipeline.Config) (sdPipeline, error) {
+			return fact.create(config)
+		},
+		confProv: &mockConfigProvider{
+			confFiles: sim.configs,
+			ch:        make(chan confFile),
+		},
+		pipelines: make(map[string]func()),
+	}
+
+	// Send-only channel with no receiver: mockPipeline.Run never sends
+	// groups, so nothing ever blocks on it.
+	in := make(chan<- []*confgroup.Group)
+	done := make(chan struct{})
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go func() { defer close(done); mgr.Run(ctx, in) }()
+
+	// Give the manager time to consume all config files.
+	time.Sleep(time.Second * 3)
+
+	lock.Lock()
+	assert.Equalf(t, sim.wantPipelines, fact.pipelines, "before stop")
+	lock.Unlock()
+
+	cancel()
+
+	timeout := time.Second * 5
+
+	select {
+	case <-done:
+		lock.Lock()
+		for _, pl := range fact.pipelines {
+			assert.Truef(t, pl.stopped, "pipeline '%s' is not stopped after cancel()", pl.name)
+		}
+		lock.Unlock()
+	case <-time.After(timeout):
+		t.Errorf("sd failed to exit in %s", timeout)
+	}
+}
+
+// mockConfigProvider replays a fixed list of config files over a channel,
+// standing in for the real conf file reader.
+type mockConfigProvider struct {
+	confFiles []confFile
+	ch        chan confFile
+}
+
+// run sends each configured confFile to the channel in order, then
+// blocks until ctx is canceled, mirroring a real provider's lifetime.
+func (m *mockConfigProvider) run(ctx context.Context) {
+	for _, conf := range m.confFiles {
+		select {
+		case <-ctx.Done():
+			return
+		case m.ch <- conf:
+		}
+	}
+	<-ctx.Done()
+}
+
+// configs returns the channel on which run delivers config files.
+func (m *mockConfigProvider) configs() chan confFile {
+	return m.ch
+}
+
+// mockFactory records every pipeline it creates so tests can later
+// inspect their started/stopped flags.
+type mockFactory struct {
+	pipelines []*mockPipeline
+}
+
+// create returns a new mockPipeline for cfg and records it for later
+// assertions; a config named "invalid" yields an error instead.
+func (m *mockFactory) create(cfg pipeline.Config) (sdPipeline, error) {
+	lock.Lock()
+	defer lock.Unlock()
+
+	if cfg.Name == "invalid" {
+		return nil, errors.New("mock sdPipelineFactory.create() error")
+	}
+
+	pipe := &mockPipeline{name: cfg.Name}
+	m.pipelines = append(m.pipelines, pipe)
+
+	return pipe, nil
+}
+
+// mockPipeline records whether Run was entered (started) and whether it
+// returned after cancellation (stopped).
+type mockPipeline struct {
+	name    string
+	started bool
+	stopped bool
+}
+
+// Run marks the pipeline started, blocks until ctx is canceled, and
+// marks it stopped on exit; it never sends on the groups channel.
+func (m *mockPipeline) Run(ctx context.Context, _ chan<- []*confgroup.Group) {
+	lock.Lock()
+	m.started = true
+	lock.Unlock()
+	defer func() { lock.Lock(); m.stopped = true; lock.Unlock() }()
+	<-ctx.Done()
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sim_test.go
new file mode 100644
index 000000000..0b777f392
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sim_test.go
@@ -0,0 +1,67 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package discovery
+
+import (
+ "context"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// discoverySim describes one discovery Manager simulation: the manager
+// under test, an optional delay before collecting, and the expected
+// config groups.
+type discoverySim struct {
+	mgr            *Manager
+	collectDelay   time.Duration
+	expectedGroups []*confgroup.Group
+}
+
+// run starts the manager, gathers the groups it emits, and compares them
+// (sorted by source for order-insensitivity) with the expected groups.
+func (sim discoverySim) run(t *testing.T) {
+	t.Helper()
+	require.NotNil(t, sim.mgr)
+
+	in, out := make(chan []*confgroup.Group), make(chan []*confgroup.Group)
+	go sim.collectGroups(t, in, out)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go sim.mgr.Run(ctx, in)
+
+	actualGroups := <-out
+
+	sortGroups(sim.expectedGroups)
+	sortGroups(actualGroups)
+
+	assert.Equal(t, sim.expectedGroups, actualGroups)
+}
+
+// collectGroups drains 'in' until the expected number of groups arrives
+// or an overall timeout expires, then delivers the result on 'out'.
+func (sim discoverySim) collectGroups(t *testing.T, in, out chan []*confgroup.Group) {
+	time.Sleep(sim.collectDelay)
+
+	timeout := sim.mgr.sendEvery + time.Second*2
+	// Use a single timer for the whole collection. The previous
+	// time.After inside the select loop created a fresh timer per
+	// iteration, resetting the deadline on every received event and
+	// making the total wait unbounded.
+	tmr := time.NewTimer(timeout)
+	defer tmr.Stop()
+
+	var groups []*confgroup.Group
+loop:
+	for {
+		select {
+		case inGroups := <-in:
+			if groups = append(groups, inGroups...); len(groups) >= len(sim.expectedGroups) {
+				break loop
+			}
+		case <-tmr.C:
+			t.Logf("discovery %s timed out after %s, got %d groups, expected %d, some events are skipped",
+				sim.mgr.discoverers, timeout, len(groups), len(sim.expectedGroups))
+			break loop
+		}
+	}
+	out <- groups
+}
+
+// sortGroups orders groups by Source for deterministic comparison; a nil
+// or empty slice is left untouched.
+func sortGroups(groups []*confgroup.Group) {
+	if len(groups) == 0 {
+		return
+	}
+	bySource := func(i, j int) bool { return groups[i].Source < groups[j].Source }
+	sort.Slice(groups, bySource)
+}