1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
// SPDX-License-Identifier: GPL-3.0-or-later
package sd
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
"github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/pipeline"
"github.com/netdata/netdata/go/go.d.plugin/logger"
"github.com/stretchr/testify/assert"
)
var lock = &sync.Mutex{}
type discoverySim struct {
configs []confFile
wantPipelines []*mockPipeline
}
func (sim *discoverySim) run(t *testing.T) {
fact := &mockFactory{}
mgr := &ServiceDiscovery{
Logger: logger.New(),
newPipeline: func(config pipeline.Config) (sdPipeline, error) {
return fact.create(config)
},
confProv: &mockConfigProvider{
confFiles: sim.configs,
ch: make(chan confFile),
},
pipelines: make(map[string]func()),
}
in := make(chan<- []*confgroup.Group)
done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func() { defer close(done); mgr.Run(ctx, in) }()
time.Sleep(time.Second * 3)
lock.Lock()
assert.Equalf(t, sim.wantPipelines, fact.pipelines, "before stop")
lock.Unlock()
cancel()
timeout := time.Second * 5
select {
case <-done:
lock.Lock()
for _, pl := range fact.pipelines {
assert.Truef(t, pl.stopped, "pipeline '%s' is not stopped after cancel()", pl.name)
}
lock.Unlock()
case <-time.After(timeout):
t.Errorf("sd failed to exit in %s", timeout)
}
}
type mockConfigProvider struct {
confFiles []confFile
ch chan confFile
}
func (m *mockConfigProvider) run(ctx context.Context) {
for _, conf := range m.confFiles {
select {
case <-ctx.Done():
return
case m.ch <- conf:
}
}
<-ctx.Done()
}
func (m *mockConfigProvider) configs() chan confFile {
return m.ch
}
type mockFactory struct {
pipelines []*mockPipeline
}
func (m *mockFactory) create(cfg pipeline.Config) (sdPipeline, error) {
lock.Lock()
defer lock.Unlock()
if cfg.Name == "invalid" {
return nil, errors.New("mock sdPipelineFactory.create() error")
}
pl := mockPipeline{name: cfg.Name}
m.pipelines = append(m.pipelines, &pl)
return &pl, nil
}
type mockPipeline struct {
name string
started bool
stopped bool
}
func (m *mockPipeline) Run(ctx context.Context, _ chan<- []*confgroup.Group) {
lock.Lock()
m.started = true
lock.Unlock()
defer func() { lock.Lock(); m.stopped = true; lock.Unlock() }()
<-ctx.Done()
}
|