diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline.go | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline.go new file mode 100644 index 000000000..4d391d41e --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline.go @@ -0,0 +1,236 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/discoverer/dockerd" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/discoverer/kubernetes" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/discoverer/netlisteners" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/hostinfo" +) + +func New(cfg Config) (*Pipeline, error) { + if err := validateConfig(cfg); err != nil { + return nil, err + } + + clr, err := newTargetClassificator(cfg.Classify) + if err != nil { + return nil, fmt.Errorf("classify rules: %v", err) + } + + cmr, err := newConfigComposer(cfg.Compose) + if err != nil { + return nil, fmt.Errorf("compose rules: %v", err) + } + + p := &Pipeline{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("pipeline", cfg.Name), + ), + configDefaults: cfg.ConfigDefaults, + clr: clr, + cmr: cmr, + accum: newAccumulator(), + discoverers: make([]model.Discoverer, 0), + configs: make(map[string]map[uint64][]confgroup.Config), + } + p.accum.Logger = p.Logger + + if err := p.registerDiscoverers(cfg); err != nil { + return nil, err + } + + return p, nil +} + +type ( + Pipeline struct { + *logger.Logger + + configDefaults confgroup.Registry + discoverers []model.Discoverer + accum *accumulator + clr classificator + cmr composer + configs map[string]map[uint64][]confgroup.Config // [targetSource][targetHash] + } + classificator interface { + classify(model.Target) model.Tags + } + composer interface { + compose(model.Target) []confgroup.Config + } +) + +func (p *Pipeline) registerDiscoverers(conf Config) error { + for _, cfg := range conf.Discover { + switch cfg.Discoverer { + case "net_listeners": + cfg.NetListeners.Source = conf.Source + td, err := netlisteners.NewDiscoverer(cfg.NetListeners) + if err != nil { + return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err) + } + p.discoverers = append(p.discoverers, td) + case "docker": + if hostinfo.IsInsideK8sCluster() { + p.Infof("not registering '%s' discoverer: disabled in k8s environment", cfg.Discoverer) + continue + } + cfg.Docker.Source = conf.Source + td, err := dockerd.NewDiscoverer(cfg.Docker) + if err != nil { + return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err) + } + p.discoverers = append(p.discoverers, td) + case "k8s": + for _, k8sCfg := range cfg.K8s { + td, err := kubernetes.NewKubeDiscoverer(k8sCfg) + if err != nil { + return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err) + } + p.discoverers = append(p.discoverers, td) + } + default: + return fmt.Errorf("unknown discoverer: '%s'", cfg.Discoverer) + } + } + + if len(p.discoverers) == 0 { + return errors.New("no discoverers registered") + } + + return nil +} + +func (p *Pipeline) Run(ctx context.Context, in chan<- []*confgroup.Group) { + p.Info("instance is started") + defer p.Info("instance is stopped") + + p.accum.discoverers = p.discoverers + + updates := make(chan []model.TargetGroup) + done := make(chan struct{}) + + go func() { defer close(done); p.accum.run(ctx, updates) }() + + for { + select { + case <-ctx.Done(): + select { + case <-done: + case <-time.After(time.Second * 4): + } + return + case <-done: + return + case tggs := <-updates: + p.Debugf("received %d target groups", len(tggs)) + if cfggs := p.processGroups(tggs); len(cfggs) > 0 { + select { + case <-ctx.Done(): + case in <- cfggs: // FIXME: potentially stale configs if upstream cannot receive (blocking) + } + } + } + } +} + +func (p *Pipeline) processGroups(tggs []model.TargetGroup) []*confgroup.Group { + var groups []*confgroup.Group + // updates come from the accumulator, this ensures that all groups have different sources + for _, tgg := range tggs { + p.Debugf("processing group '%s' with %d target(s)", tgg.Source(), len(tgg.Targets())) + if v := p.processGroup(tgg); v != nil { + groups = append(groups, v) + } + } + return groups +} + +func (p *Pipeline) processGroup(tgg model.TargetGroup) *confgroup.Group { + if len(tgg.Targets()) == 0 { + if _, ok := p.configs[tgg.Source()]; !ok { + return nil + } + delete(p.configs, tgg.Source()) + + return &confgroup.Group{Source: tgg.Source()} + } + + targetsCache, ok := p.configs[tgg.Source()] + if !ok { + targetsCache = make(map[uint64][]confgroup.Config) + p.configs[tgg.Source()] = targetsCache + } + + var changed bool + seen := make(map[uint64]bool) + + for _, tgt := range tgg.Targets() { + if tgt == nil { + continue + } + + hash := tgt.Hash() + seen[hash] = true + + if _, ok := targetsCache[hash]; ok { + continue + } + + targetsCache[hash] = nil + + if tags := p.clr.classify(tgt); len(tags) > 0 { + tgt.Tags().Merge(tags) + + if cfgs := p.cmr.compose(tgt); len(cfgs) > 0 { + targetsCache[hash] = cfgs + changed = true + + for _, cfg := range cfgs { + cfg.SetProvider(tgg.Provider()) + cfg.SetSource(tgg.Source()) + cfg.SetSourceType(confgroup.TypeDiscovered) + if def, ok := p.configDefaults.Lookup(cfg.Module()); ok { + cfg.ApplyDefaults(def) + } + } + } + } + } + + for hash := range targetsCache { + if seen[hash] { + continue + } + if cfgs := targetsCache[hash]; len(cfgs) > 0 { + changed = true + } + delete(targetsCache, hash) + } + + if !changed { + return nil + } + + // TODO: deepcopy? + cfgGroup := &confgroup.Group{Source: tgg.Source()} + + for _, cfgs := range targetsCache { + cfgGroup.Configs = append(cfgGroup.Configs, cfgs...) + } + + return cfgGroup +} |