diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go new file mode 100644 index 000000000..a84212734 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go @@ -0,0 +1,152 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "sync" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" +) + +func newAccumulator() *accumulator { + return &accumulator{ + send: make(chan struct{}, 1), + sendEvery: time.Second * 2, + mux: &sync.Mutex{}, + tggs: make(map[string]model.TargetGroup), + } +} + +type accumulator struct { + *logger.Logger + discoverers []model.Discoverer + send chan struct{} + sendEvery time.Duration + mux *sync.Mutex + tggs map[string]model.TargetGroup +} + +func (a *accumulator) run(ctx context.Context, in chan []model.TargetGroup) { + updates := make(chan []model.TargetGroup) + + var wg sync.WaitGroup + for _, d := range a.discoverers { + wg.Add(1) + d := d + go func() { defer wg.Done(); a.runDiscoverer(ctx, d, updates) }() + } + + done := make(chan struct{}) + go func() { defer close(done); wg.Wait() }() + + tk := time.NewTicker(a.sendEvery) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + select { + case <-done: + a.Info("all discoverers exited") + case <-time.After(time.Second * 3): + a.Warning("not all discoverers exited") + } + a.trySend(in) + return + case <-done: + if !isDone(ctx) { + a.Info("all discoverers exited before ctx done") + } else { + a.Info("all discoverers exited") + } + a.trySend(in) + return + case <-tk.C: + select { + case <-a.send: + a.trySend(in) + default: + } + } + } +} + +func (a *accumulator) runDiscoverer(ctx context.Context, d model.Discoverer, updates chan []model.TargetGroup) { + done := make(chan struct{}) + go func() { defer close(done); d.Discover(ctx, updates) }() + + for { + select { + case <-ctx.Done(): + select { + case <-done: + case <-time.After(time.Second * 2): + a.Warningf("discoverer '%v' didn't exit on ctx done", d) + } + return + case <-done: + if !isDone(ctx) { + a.Infof("discoverer '%v' exited before ctx done", d) + } + return + case tggs := <-updates: + a.mux.Lock() + a.groupsUpdate(tggs) + a.mux.Unlock() + a.triggerSend() + } + } +} + +func (a *accumulator) trySend(in chan<- []model.TargetGroup) { + a.mux.Lock() + defer a.mux.Unlock() + + select { + case in <- a.groupsList(): + a.groupsReset() + default: + a.triggerSend() + } +} + +func (a *accumulator) triggerSend() { + select { + case a.send <- struct{}{}: + default: + } +} + +func (a *accumulator) groupsUpdate(tggs []model.TargetGroup) { + for _, tgg := range tggs { + a.tggs[tgg.Source()] = tgg + } +} + +func (a *accumulator) groupsReset() { + for key := range a.tggs { + delete(a.tggs, key) + } +} + +func (a *accumulator) groupsList() []model.TargetGroup { + tggs := make([]model.TargetGroup, 0, len(a.tggs)) + for _, tgg := range a.tggs { + if tgg != nil { + tggs = append(tggs, tgg) + } + } + return tggs +} + +func isDone(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} |