diff options
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go | 147 |
1 files changed, 147 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go new file mode 100644 index 000000000..ab84c979e --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/sd.go @@ -0,0 +1,147 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package sd + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/pipeline" + "github.com/netdata/netdata/go/go.d.plugin/logger" + "github.com/netdata/netdata/go/go.d.plugin/pkg/multipath" + + "gopkg.in/yaml.v2" +) + +type Config struct { + ConfigDefaults confgroup.Registry + ConfDir multipath.MultiPath +} + +func NewServiceDiscovery(cfg Config) (*ServiceDiscovery, error) { + log := logger.New().With( + slog.String("component", "service discovery"), + ) + + d := &ServiceDiscovery{ + Logger: log, + confProv: newConfFileReader(log, cfg.ConfDir), + configDefaults: cfg.ConfigDefaults, + newPipeline: func(config pipeline.Config) (sdPipeline, error) { + return pipeline.New(config) + }, + pipelines: make(map[string]func()), + } + + return d, nil +} + +type ( + ServiceDiscovery struct { + *logger.Logger + + confProv confFileProvider + + configDefaults confgroup.Registry + newPipeline func(config pipeline.Config) (sdPipeline, error) + pipelines map[string]func() + } + sdPipeline interface { + Run(ctx context.Context, in chan<- []*confgroup.Group) + } + confFileProvider interface { + run(ctx context.Context) + configs() chan confFile + } +) + +func (d *ServiceDiscovery) String() string { + return "service discovery" +} + +func (d *ServiceDiscovery) Run(ctx context.Context, in chan<- []*confgroup.Group) { + d.Info("instance is started") + defer func() { d.cleanup(); d.Info("instance is stopped") }() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { defer wg.Done(); d.confProv.run(ctx) }() + + wg.Add(1) + go func() { defer wg.Done(); d.run(ctx, in) }() + + wg.Wait() + <-ctx.Done() +} + +func (d *ServiceDiscovery) run(ctx context.Context, in chan<- []*confgroup.Group) { + for { + select { + case <-ctx.Done(): + return + case cfg := <-d.confProv.configs(): + if cfg.source == "" { + continue + } + if len(cfg.content) == 0 { + d.removePipeline(cfg) + } else { + d.addPipeline(ctx, cfg, in) + } + } + } +} + +func (d *ServiceDiscovery) removePipeline(conf confFile) { + if stop, ok := d.pipelines[conf.source]; ok { + d.Infof("received an empty config, stopping the pipeline ('%s')", conf.source) + delete(d.pipelines, conf.source) + stop() + } +} + +func (d *ServiceDiscovery) addPipeline(ctx context.Context, conf confFile, in chan<- []*confgroup.Group) { + var cfg pipeline.Config + + if err := yaml.Unmarshal(conf.content, &cfg); err != nil { + d.Error(err) + return + } + + if cfg.Disabled { + d.Infof("pipeline config is disabled '%s' (%s)", cfg.Name, cfg.Source) + return + } + + cfg.Source = fmt.Sprintf("file=%s", conf.source) + cfg.ConfigDefaults = d.configDefaults + + pl, err := d.newPipeline(cfg) + if err != nil { + d.Error(err) + return + } + + if stop, ok := d.pipelines[conf.source]; ok { + stop() + } + + var wg sync.WaitGroup + plCtx, cancel := context.WithCancel(ctx) + + wg.Add(1) + go func() { defer wg.Done(); pl.Run(plCtx, in) }() + + stop := func() { cancel(); wg.Wait() } + d.pipelines[conf.source] = stop +} + +func (d *ServiceDiscovery) cleanup() { + for _, stop := range d.pipelines { + stop() + } +} |