summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/manager.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/manager.go199
1 files changed, 199 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/manager.go b/src/go/collectors/go.d.plugin/agent/discovery/manager.go
new file mode 100644
index 000000000..ac9ee2211
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/manager.go
@@ -0,0 +1,199 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package discovery
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/dummy"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/file"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+func NewManager(cfg Config) (*Manager, error) {
+ if err := validateConfig(cfg); err != nil {
+ return nil, fmt.Errorf("discovery manager config validation: %v", err)
+ }
+
+ mgr := &Manager{
+ Logger: logger.New().With(
+ slog.String("component", "discovery manager"),
+ ),
+ send: make(chan struct{}, 1),
+ sendEvery: time.Second * 2, // timeout to aggregate changes
+ discoverers: make([]discoverer, 0),
+ mux: &sync.RWMutex{},
+ cache: newCache(),
+ }
+
+ if err := mgr.registerDiscoverers(cfg); err != nil {
+ return nil, fmt.Errorf("discovery manager initializaion: %v", err)
+ }
+
+ return mgr, nil
+}
+
+type discoverer interface {
+ Run(ctx context.Context, in chan<- []*confgroup.Group)
+}
+
+type Manager struct {
+ *logger.Logger
+ discoverers []discoverer
+ send chan struct{}
+ sendEvery time.Duration
+ mux *sync.RWMutex
+ cache *cache
+}
+
+func (m *Manager) String() string {
+ return fmt.Sprintf("discovery manager: %v", m.discoverers)
+}
+
+func (m *Manager) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+ m.Info("instance is started")
+ defer func() { m.Info("instance is stopped") }()
+
+ var wg sync.WaitGroup
+
+ for _, d := range m.discoverers {
+ wg.Add(1)
+ go func(d discoverer) {
+ defer wg.Done()
+ m.runDiscoverer(ctx, d)
+ }(d)
+ }
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ m.sendLoop(ctx, in)
+ }()
+
+ wg.Wait()
+ <-ctx.Done()
+}
+
+func (m *Manager) registerDiscoverers(cfg Config) error {
+ if len(cfg.File.Read) > 0 || len(cfg.File.Watch) > 0 {
+ cfg.File.Registry = cfg.Registry
+ d, err := file.NewDiscovery(cfg.File)
+ if err != nil {
+ return err
+ }
+ m.discoverers = append(m.discoverers, d)
+ }
+
+ if len(cfg.Dummy.Names) > 0 {
+ cfg.Dummy.Registry = cfg.Registry
+ d, err := dummy.NewDiscovery(cfg.Dummy)
+ if err != nil {
+ return err
+ }
+ m.discoverers = append(m.discoverers, d)
+ }
+
+ if len(cfg.SD.ConfDir) != 0 {
+ cfg.SD.ConfigDefaults = cfg.Registry
+ d, err := sd.NewServiceDiscovery(cfg.SD)
+ if err != nil {
+ return err
+ }
+ m.discoverers = append(m.discoverers, d)
+ }
+
+ if len(m.discoverers) == 0 {
+ return errors.New("zero registered discoverers")
+ }
+
+ m.Infof("registered discoverers: %v", m.discoverers)
+
+ return nil
+}
+
+func (m *Manager) runDiscoverer(ctx context.Context, d discoverer) {
+ updates := make(chan []*confgroup.Group)
+ go d.Run(ctx, updates)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case groups, ok := <-updates:
+ if !ok {
+ return
+ }
+ func() {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ m.cache.update(groups)
+ m.triggerSend()
+ }()
+ }
+ }
+}
+
+func (m *Manager) sendLoop(ctx context.Context, in chan<- []*confgroup.Group) {
+ m.mustSend(ctx, in)
+
+ tk := time.NewTicker(m.sendEvery)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ select {
+ case <-m.send:
+ m.trySend(in)
+ default:
+ }
+ }
+ }
+}
+
+func (m *Manager) mustSend(ctx context.Context, in chan<- []*confgroup.Group) {
+ select {
+ case <-ctx.Done():
+ return
+ case <-m.send:
+ m.mux.Lock()
+ groups := m.cache.groups()
+ m.cache.reset()
+ m.mux.Unlock()
+
+ select {
+ case <-ctx.Done():
+ case in <- groups:
+ }
+ return
+ }
+}
+
+func (m *Manager) trySend(in chan<- []*confgroup.Group) {
+ m.mux.Lock()
+ defer m.mux.Unlock()
+
+ select {
+ case in <- m.cache.groups():
+ m.cache.reset()
+ default:
+ m.triggerSend()
+ }
+}
+
+func (m *Manager) triggerSend() {
+ select {
+ case m.send <- struct{}{}:
+ default:
+ }
+}