summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go236
1 files changed, 236 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go
new file mode 100644
index 000000000..f69501c39
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go
@@ -0,0 +1,236 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package pipeline
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/dockerd"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/kubernetes"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/netlisteners"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/hostinfo"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+)
+
+func New(cfg Config) (*Pipeline, error) {
+ if err := validateConfig(cfg); err != nil {
+ return nil, err
+ }
+
+ clr, err := newTargetClassificator(cfg.Classify)
+ if err != nil {
+ return nil, fmt.Errorf("classify rules: %v", err)
+ }
+
+ cmr, err := newConfigComposer(cfg.Compose)
+ if err != nil {
+ return nil, fmt.Errorf("compose rules: %v", err)
+ }
+
+ p := &Pipeline{
+ Logger: logger.New().With(
+ slog.String("component", "service discovery"),
+ slog.String("pipeline", cfg.Name),
+ ),
+ configDefaults: cfg.ConfigDefaults,
+ clr: clr,
+ cmr: cmr,
+ accum: newAccumulator(),
+ discoverers: make([]model.Discoverer, 0),
+ configs: make(map[string]map[uint64][]confgroup.Config),
+ }
+ p.accum.Logger = p.Logger
+
+ if err := p.registerDiscoverers(cfg); err != nil {
+ return nil, err
+ }
+
+ return p, nil
+}
+
+type (
+ Pipeline struct {
+ *logger.Logger
+
+ configDefaults confgroup.Registry
+ discoverers []model.Discoverer
+ accum *accumulator
+ clr classificator
+ cmr composer
+ configs map[string]map[uint64][]confgroup.Config // [targetSource][targetHash]
+ }
+ classificator interface {
+ classify(model.Target) model.Tags
+ }
+ composer interface {
+ compose(model.Target) []confgroup.Config
+ }
+)
+
+func (p *Pipeline) registerDiscoverers(conf Config) error {
+ for _, cfg := range conf.Discover {
+ switch cfg.Discoverer {
+ case "net_listeners":
+ cfg.NetListeners.Source = conf.Source
+ td, err := netlisteners.NewDiscoverer(cfg.NetListeners)
+ if err != nil {
+ return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err)
+ }
+ p.discoverers = append(p.discoverers, td)
+ case "docker":
+ if hostinfo.IsInsideK8sCluster() {
+ p.Infof("not registering '%s' discoverer: disabled in k8s environment", cfg.Discoverer)
+ continue
+ }
+ cfg.Docker.Source = conf.Source
+ td, err := dockerd.NewDiscoverer(cfg.Docker)
+ if err != nil {
+ return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err)
+ }
+ p.discoverers = append(p.discoverers, td)
+ case "k8s":
+ for _, k8sCfg := range cfg.K8s {
+ td, err := kubernetes.NewKubeDiscoverer(k8sCfg)
+ if err != nil {
+ return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err)
+ }
+ p.discoverers = append(p.discoverers, td)
+ }
+ default:
+ return fmt.Errorf("unknown discoverer: '%s'", cfg.Discoverer)
+ }
+ }
+
+ if len(p.discoverers) == 0 {
+ return errors.New("no discoverers registered")
+ }
+
+ return nil
+}
+
+func (p *Pipeline) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+ p.Info("instance is started")
+ defer p.Info("instance is stopped")
+
+ p.accum.discoverers = p.discoverers
+
+ updates := make(chan []model.TargetGroup)
+ done := make(chan struct{})
+
+ go func() { defer close(done); p.accum.run(ctx, updates) }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ select {
+ case <-done:
+ case <-time.After(time.Second * 4):
+ }
+ return
+ case <-done:
+ return
+ case tggs := <-updates:
+ p.Debugf("received %d target groups", len(tggs))
+ if cfggs := p.processGroups(tggs); len(cfggs) > 0 {
+ select {
+ case <-ctx.Done():
+ case in <- cfggs: // FIXME: potentially stale configs if upstream cannot receive (blocking)
+ }
+ }
+ }
+ }
+}
+
+func (p *Pipeline) processGroups(tggs []model.TargetGroup) []*confgroup.Group {
+ var groups []*confgroup.Group
+ // updates come from the accumulator, this ensures that all groups have different sources
+ for _, tgg := range tggs {
+ p.Debugf("processing group '%s' with %d target(s)", tgg.Source(), len(tgg.Targets()))
+ if v := p.processGroup(tgg); v != nil {
+ groups = append(groups, v)
+ }
+ }
+ return groups
+}
+
+func (p *Pipeline) processGroup(tgg model.TargetGroup) *confgroup.Group {
+ if len(tgg.Targets()) == 0 {
+ if _, ok := p.configs[tgg.Source()]; !ok {
+ return nil
+ }
+ delete(p.configs, tgg.Source())
+
+ return &confgroup.Group{Source: tgg.Source()}
+ }
+
+ targetsCache, ok := p.configs[tgg.Source()]
+ if !ok {
+ targetsCache = make(map[uint64][]confgroup.Config)
+ p.configs[tgg.Source()] = targetsCache
+ }
+
+ var changed bool
+ seen := make(map[uint64]bool)
+
+ for _, tgt := range tgg.Targets() {
+ if tgt == nil {
+ continue
+ }
+
+ hash := tgt.Hash()
+ seen[hash] = true
+
+ if _, ok := targetsCache[hash]; ok {
+ continue
+ }
+
+ targetsCache[hash] = nil
+
+ if tags := p.clr.classify(tgt); len(tags) > 0 {
+ tgt.Tags().Merge(tags)
+
+ if cfgs := p.cmr.compose(tgt); len(cfgs) > 0 {
+ targetsCache[hash] = cfgs
+ changed = true
+
+ for _, cfg := range cfgs {
+ cfg.SetProvider(tgg.Provider())
+ cfg.SetSource(tgg.Source())
+ cfg.SetSourceType(confgroup.TypeDiscovered)
+ if def, ok := p.configDefaults.Lookup(cfg.Module()); ok {
+ cfg.ApplyDefaults(def)
+ }
+ }
+ }
+ }
+ }
+
+ for hash := range targetsCache {
+ if seen[hash] {
+ continue
+ }
+ if cfgs := targetsCache[hash]; len(cfgs) > 0 {
+ changed = true
+ }
+ delete(targetsCache, hash)
+ }
+
+ if !changed {
+ return nil
+ }
+
+ // TODO: deepcopy?
+ cfgGroup := &confgroup.Group{Source: tgg.Source()}
+
+ for _, cfgs := range targetsCache {
+ cfgGroup.Configs = append(cfgGroup.Configs, cfgs...)
+ }
+
+ return cfgGroup
+}