summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go104
1 files changed, 104 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go
new file mode 100644
index 000000000..97b437fc3
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/discovery.go
@@ -0,0 +1,104 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "sync"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+var log = logger.New().With(
+ slog.String("component", "discovery"),
+ slog.String("discoverer", "file"),
+)
+
+func NewDiscovery(cfg Config) (*Discovery, error) {
+ if err := validateConfig(cfg); err != nil {
+ return nil, fmt.Errorf("file discovery config validation: %v", err)
+ }
+
+ d := Discovery{
+ Logger: log,
+ }
+
+ if err := d.registerDiscoverers(cfg); err != nil {
+ return nil, fmt.Errorf("file discovery initialization: %v", err)
+ }
+
+ return &d, nil
+}
+
+type (
+ Discovery struct {
+ *logger.Logger
+ discoverers []discoverer
+ }
+ discoverer interface {
+ Run(ctx context.Context, in chan<- []*confgroup.Group)
+ }
+)
+
+func (d *Discovery) String() string {
+ return d.Name()
+}
+
+func (d *Discovery) Name() string {
+ return fmt.Sprintf("file discovery: %v", d.discoverers)
+}
+
+func (d *Discovery) registerDiscoverers(cfg Config) error {
+ if len(cfg.Read) != 0 {
+ d.discoverers = append(d.discoverers, NewReader(cfg.Registry, cfg.Read))
+ }
+ if len(cfg.Watch) != 0 {
+ d.discoverers = append(d.discoverers, NewWatcher(cfg.Registry, cfg.Watch))
+ }
+ if len(d.discoverers) == 0 {
+ return errors.New("zero registered discoverers")
+ }
+ return nil
+}
+
+func (d *Discovery) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+ d.Info("instance is started")
+ defer func() { d.Info("instance is stopped") }()
+
+ var wg sync.WaitGroup
+
+ for _, dd := range d.discoverers {
+ wg.Add(1)
+ go func(dd discoverer) {
+ defer wg.Done()
+ d.runDiscoverer(ctx, dd, in)
+ }(dd)
+ }
+
+ wg.Wait()
+ <-ctx.Done()
+}
+
+func (d *Discovery) runDiscoverer(ctx context.Context, dd discoverer, in chan<- []*confgroup.Group) {
+ updates := make(chan []*confgroup.Group)
+ go dd.Run(ctx, updates)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case groups, ok := <-updates:
+ if !ok {
+ return
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case in <- groups:
+ }
+ }
+ }
+}