summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/sd/sim_test.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/sim_test.go118
1 files changed, 118 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/sim_test.go
new file mode 100644
index 000000000..7741221d1
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/sim_test.go
@@ -0,0 +1,118 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package sd
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/pipeline"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/stretchr/testify/assert"
+)
+
+var lock = &sync.Mutex{}
+
+type discoverySim struct {
+ configs []confFile
+ wantPipelines []*mockPipeline
+}
+
+func (sim *discoverySim) run(t *testing.T) {
+ fact := &mockFactory{}
+ mgr := &ServiceDiscovery{
+ Logger: logger.New(),
+ newPipeline: func(config pipeline.Config) (sdPipeline, error) {
+ return fact.create(config)
+ },
+ confProv: &mockConfigProvider{
+ confFiles: sim.configs,
+ ch: make(chan confFile),
+ },
+ pipelines: make(map[string]func()),
+ }
+
+ in := make(chan<- []*confgroup.Group)
+ done := make(chan struct{})
+ ctx, cancel := context.WithCancel(context.Background())
+
+ go func() { defer close(done); mgr.Run(ctx, in) }()
+
+ time.Sleep(time.Second * 3)
+
+ lock.Lock()
+ assert.Equalf(t, sim.wantPipelines, fact.pipelines, "before stop")
+ lock.Unlock()
+
+ cancel()
+
+ timeout := time.Second * 5
+
+ select {
+ case <-done:
+ lock.Lock()
+ for _, pl := range fact.pipelines {
+ assert.Truef(t, pl.stopped, "pipeline '%s' is not stopped after cancel()", pl.name)
+ }
+ lock.Unlock()
+ case <-time.After(timeout):
+ t.Errorf("sd failed to exit in %s", timeout)
+ }
+}
+
+type mockConfigProvider struct {
+ confFiles []confFile
+ ch chan confFile
+}
+
+func (m *mockConfigProvider) run(ctx context.Context) {
+ for _, conf := range m.confFiles {
+ select {
+ case <-ctx.Done():
+ return
+ case m.ch <- conf:
+ }
+ }
+ <-ctx.Done()
+}
+
+func (m *mockConfigProvider) configs() chan confFile {
+ return m.ch
+}
+
+type mockFactory struct {
+ pipelines []*mockPipeline
+}
+
+func (m *mockFactory) create(cfg pipeline.Config) (sdPipeline, error) {
+ lock.Lock()
+ defer lock.Unlock()
+
+ if cfg.Name == "invalid" {
+ return nil, errors.New("mock sdPipelineFactory.create() error")
+ }
+
+ pl := mockPipeline{name: cfg.Name}
+ m.pipelines = append(m.pipelines, &pl)
+
+ return &pl, nil
+}
+
+type mockPipeline struct {
+ name string
+ started bool
+ stopped bool
+}
+
+func (m *mockPipeline) Run(ctx context.Context, _ chan<- []*confgroup.Group) {
+ lock.Lock()
+ m.started = true
+ lock.Unlock()
+ defer func() { lock.Lock(); m.stopped = true; lock.Unlock() }()
+ <-ctx.Done()
+}