diff options
Diffstat (limited to '')
14 files changed, 2629 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go new file mode 100644 index 000000000..a84212734 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/accumulator.go @@ -0,0 +1,152 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "sync" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" +) + +func newAccumulator() *accumulator { + return &accumulator{ + send: make(chan struct{}, 1), + sendEvery: time.Second * 2, + mux: &sync.Mutex{}, + tggs: make(map[string]model.TargetGroup), + } +} + +type accumulator struct { + *logger.Logger + discoverers []model.Discoverer + send chan struct{} + sendEvery time.Duration + mux *sync.Mutex + tggs map[string]model.TargetGroup +} + +func (a *accumulator) run(ctx context.Context, in chan []model.TargetGroup) { + updates := make(chan []model.TargetGroup) + + var wg sync.WaitGroup + for _, d := range a.discoverers { + wg.Add(1) + d := d + go func() { defer wg.Done(); a.runDiscoverer(ctx, d, updates) }() + } + + done := make(chan struct{}) + go func() { defer close(done); wg.Wait() }() + + tk := time.NewTicker(a.sendEvery) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + select { + case <-done: + a.Info("all discoverers exited") + case <-time.After(time.Second * 3): + a.Warning("not all discoverers exited") + } + a.trySend(in) + return + case <-done: + if !isDone(ctx) { + a.Info("all discoverers exited before ctx done") + } else { + a.Info("all discoverers exited") + } + a.trySend(in) + return + case <-tk.C: + select { + case <-a.send: + a.trySend(in) + default: + } + } + } +} + +func (a *accumulator) runDiscoverer(ctx context.Context, d model.Discoverer, updates chan []model.TargetGroup) { + done := make(chan struct{}) + go func() { defer close(done); d.Discover(ctx, updates) }() + + for { + select { + case <-ctx.Done(): + select { + case <-done: + case <-time.After(time.Second * 2): + a.Warningf("discoverer '%v' didn't exit on ctx done", d) + } + return + case <-done: + if !isDone(ctx) { + a.Infof("discoverer '%v' exited before ctx done", d) + } + return + case tggs := <-updates: + a.mux.Lock() + a.groupsUpdate(tggs) + a.mux.Unlock() + a.triggerSend() + } + } +} + +func (a *accumulator) trySend(in chan<- []model.TargetGroup) { + a.mux.Lock() + defer a.mux.Unlock() + + select { + case in <- a.groupsList(): + a.groupsReset() + default: + a.triggerSend() + } +} + +func (a *accumulator) triggerSend() { + select { + case a.send <- struct{}{}: + default: + } +} + +func (a *accumulator) groupsUpdate(tggs []model.TargetGroup) { + for _, tgg := range tggs { + a.tggs[tgg.Source()] = tgg + } +} + +func (a *accumulator) groupsReset() { + for key := range a.tggs { + delete(a.tggs, key) + } +} + +func (a *accumulator) groupsList() []model.TargetGroup { + tggs := make([]model.TargetGroup, 0, len(a.tggs)) + for _, tgg := range a.tggs { + if tgg != nil { + tggs = append(tggs, tgg) + } + } + return tggs +} + +func isDone(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify.go new file mode 100644 index 000000000..bd686b306 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify.go @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "bytes" + "fmt" + "strings" + "text/template" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" +) + +func newTargetClassificator(cfg []ClassifyRuleConfig) (*targetClassificator, error) { + rules, err := newClassifyRules(cfg) + if err != nil { + return nil, err + } + + c := &targetClassificator{ + rules: rules, + buf: bytes.Buffer{}, + } + + return c, nil +} + +type ( + targetClassificator struct { + *logger.Logger + rules []*classifyRule + buf bytes.Buffer + } + + classifyRule struct { + name string + sr selector + tags model.Tags + match []*classifyRuleMatch + } + classifyRuleMatch struct { + tags model.Tags + expr *template.Template + } +) + +func (c *targetClassificator) classify(tgt model.Target) model.Tags { + tgtTags := tgt.Tags().Clone() + var tags model.Tags + + for i, rule := range c.rules { + if !rule.sr.matches(tgtTags) { + continue + } + + for j, match := range rule.match { + c.buf.Reset() + + if err := match.expr.Execute(&c.buf, tgt); err != nil { + c.Warningf("failed to execute classify rule[%d]->match[%d]->expr on target '%s'", i+1, j+1, tgt.TUID()) + continue + } + if strings.TrimSpace(c.buf.String()) != "true" { + continue + } + + if tags == nil { + tags = model.NewTags() + } + + tags.Merge(rule.tags) + tags.Merge(match.tags) + tgtTags.Merge(tags) + } + } + + return tags +} + +func newClassifyRules(cfg []ClassifyRuleConfig) ([]*classifyRule, error) { + var rules []*classifyRule + + fmap := newFuncMap() + + for i, ruleCfg := range cfg { + i++ + rule := classifyRule{name: ruleCfg.Name} + + sr, err := parseSelector(ruleCfg.Selector) + if err != nil { + return nil, fmt.Errorf("rule '%d': %v", i, err) + } + rule.sr = sr + + tags, err := model.ParseTags(ruleCfg.Tags) + if err != nil { + return nil, fmt.Errorf("rule '%d': %v", i, err) + } + rule.tags = tags + + for j, matchCfg := range ruleCfg.Match { + j++ + var match classifyRuleMatch + + tags, err := model.ParseTags(matchCfg.Tags) + if err != nil { + return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err) + } + match.tags = tags + + tmpl, err := parseTemplate(matchCfg.Expr, fmap) + if err != nil { + return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err) + } + match.expr = tmpl + + rule.match = append(rule.match, &match) + } + + rules = append(rules, &rule) + } + + return rules, nil +} + +func parseTemplate(s string, fmap template.FuncMap) (*template.Template, error) { + return template.New("root"). + Option("missingkey=error"). + Funcs(fmap). + Parse(s) +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify_test.go new file mode 100644 index 000000000..214c96cf7 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/classify_test.go @@ -0,0 +1,83 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "testing" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestTargetClassificator_classify(t *testing.T) { + config := ` +- selector: "rule0" + tags: "skip" + match: + - tags: "skip" + expr: '{{ glob .Name "*" }}' +- selector: "!skip rule1" + tags: "foo1" + match: + - tags: "bar1" + expr: '{{ glob .Name "mock*1*" }}' + - tags: "bar2" + expr: '{{ glob .Name "mock*2*" }}' +- selector: "!skip rule2" + tags: "foo2" + match: + - tags: "bar3" + expr: '{{ glob .Name "mock*3*" }}' + - tags: "bar4" + expr: '{{ glob .Name "mock*4*" }}' +- selector: "rule3" + tags: "foo3" + match: + - tags: "bar5" + expr: '{{ glob .Name "mock*5*" }}' + - tags: "bar6" + expr: '{{ glob .Name "mock*6*" }}' +` + tests := map[string]struct { + target model.Target + wantTags model.Tags + }{ + "no rules match": { + target: newMockTarget("mock1"), + wantTags: nil, + }, + "one rule one match": { + target: newMockTarget("mock4", "rule2"), + wantTags: mustParseTags("foo2 bar4"), + }, + "one rule two match": { + target: newMockTarget("mock56", "rule3"), + wantTags: mustParseTags("foo3 bar5 bar6"), + }, + "all rules all matches": { + target: newMockTarget("mock123456", "rule1 rule2 rule3"), + wantTags: mustParseTags("foo1 foo2 foo3 bar1 bar2 bar3 bar4 bar5 bar6"), + }, + "applying labels after every rule": { + target: newMockTarget("mock123456", "rule0 rule1 rule2 rule3"), + wantTags: mustParseTags("skip foo3 bar5 bar6"), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + var cfg []ClassifyRuleConfig + + err := yaml.Unmarshal([]byte(config), &cfg) + require.NoError(t, err, "yaml unmarshalling of config") + + clr, err := newTargetClassificator(cfg) + require.NoError(t, err, "targetClassificator creation") + + assert.Equal(t, test.wantTags, clr.classify(test.target)) + }) + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose.go new file mode 100644 index 000000000..de2ed21b8 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose.go @@ -0,0 +1,157 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "bytes" + "errors" + "fmt" + "text/template" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "gopkg.in/yaml.v2" +) + +func newConfigComposer(cfg []ComposeRuleConfig) (*configComposer, error) { + rules, err := newComposeRules(cfg) + if err != nil { + return nil, err + } + + c := &configComposer{ + rules: rules, + buf: bytes.Buffer{}, + } + + return c, nil +} + +type ( + configComposer struct { + *logger.Logger + rules []*composeRule + buf bytes.Buffer + } + + composeRule struct { + name string + sr selector + conf []*composeRuleConf + } + composeRuleConf struct { + sr selector + tmpl *template.Template + } +) + +func (c *configComposer) compose(tgt model.Target) []confgroup.Config { + var configs []confgroup.Config + + for i, rule := range c.rules { + if !rule.sr.matches(tgt.Tags()) { + continue + } + + for j, conf := range rule.conf { + if !conf.sr.matches(tgt.Tags()) { + continue + } + + c.buf.Reset() + + if err := conf.tmpl.Execute(&c.buf, tgt); err != nil { + c.Warningf("failed to execute rule[%d]->config[%d]->template on target '%s': %v", + i+1, j+1, tgt.TUID(), err) + continue + } + if c.buf.Len() == 0 { + continue + } + + cfgs, err := c.parseTemplateData(c.buf.Bytes()) + if err != nil { + c.Warningf("failed to parse template data: %v", err) + continue + } + + configs = append(configs, cfgs...) + } + } + + if len(configs) > 0 { + c.Debugf("created %d config(s) for target '%s'", len(configs), tgt.TUID()) + } + return configs +} + +func (c *configComposer) parseTemplateData(bs []byte) ([]confgroup.Config, error) { + var data any + if err := yaml.Unmarshal(bs, &data); err != nil { + return nil, err + } + + type ( + single = map[any]any + multi = []any + ) + + switch data.(type) { + case single: + var cfg confgroup.Config + if err := yaml.Unmarshal(bs, &cfg); err != nil { + return nil, err + } + return []confgroup.Config{cfg}, nil + case multi: + var cfgs []confgroup.Config + if err := yaml.Unmarshal(bs, &cfgs); err != nil { + return nil, err + } + return cfgs, nil + default: + return nil, errors.New("unknown config format") + } +} + +func newComposeRules(cfg []ComposeRuleConfig) ([]*composeRule, error) { + var rules []*composeRule + + fmap := newFuncMap() + + for i, ruleCfg := range cfg { + i++ + rule := composeRule{name: ruleCfg.Name} + + sr, err := parseSelector(ruleCfg.Selector) + if err != nil { + return nil, fmt.Errorf("rule '%d': %v", i, err) + } + rule.sr = sr + + for j, confCfg := range ruleCfg.Config { + j++ + var conf composeRuleConf + + sr, err := parseSelector(confCfg.Selector) + if err != nil { + return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err) + } + conf.sr = sr + + tmpl, err := parseTemplate(confCfg.Template, fmap) + if err != nil { + return nil, fmt.Errorf("rule '%d/%d': %v", i, j, err) + } + conf.tmpl = tmpl + + rule.conf = append(rule.conf, &conf) + } + + rules = append(rules, &rule) + } + + return rules, nil +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose_test.go new file mode 100644 index 000000000..fa758bcd3 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/compose_test.go @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "testing" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestConfigComposer_compose(t *testing.T) { + config := ` +- selector: "rule1" + config: + - selector: "bar1" + template: | + name: {{ .Name }}-1 + - selector: "bar2" + template: | + name: {{ .Name }}-2 +- selector: "rule2" + config: + - selector: "bar3" + template: | + name: {{ .Name }}-3 + - selector: "bar4" + template: | + name: {{ .Name }}-4 +- selector: "rule3" + config: + - selector: "bar5" + template: | + name: {{ .Name }}-5 + - selector: "bar6" + template: | + - name: {{ .Name }}-6 + - name: {{ .Name }}-7 +` + tests := map[string]struct { + target model.Target + wantConfigs []confgroup.Config + }{ + "no rules matches": { + target: newMockTarget("mock"), + wantConfigs: nil, + }, + "one rule one config": { + target: newMockTarget("mock", "rule1 bar1"), + wantConfigs: []confgroup.Config{ + {"name": "mock-1"}, + }, + }, + "one rule two config": { + target: newMockTarget("mock", "rule2 bar3 bar4"), + wantConfigs: []confgroup.Config{ + {"name": "mock-3"}, + {"name": "mock-4"}, + }, + }, + "all rules all configs": { + target: newMockTarget("mock", "rule1 bar1 bar2 rule2 bar3 bar4 rule3 bar5 bar6"), + wantConfigs: []confgroup.Config{ + {"name": "mock-1"}, + {"name": "mock-2"}, + {"name": "mock-3"}, + {"name": "mock-4"}, + {"name": "mock-5"}, + {"name": "mock-6"}, + {"name": "mock-7"}, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + var cfg []ComposeRuleConfig + + err := yaml.Unmarshal([]byte(config), &cfg) + require.NoErrorf(t, err, "yaml unmarshalling of config") + + cmr, err := newConfigComposer(cfg) + require.NoErrorf(t, err, "configComposer creation") + + assert.Equal(t, test.wantConfigs, cmr.compose(test.target)) + }) + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/config.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/config.go new file mode 100644 index 000000000..4dac63f0f --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/config.go @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "errors" + "fmt" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/dockerd" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/kubernetes" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/netlisteners" +) + +type Config struct { + Source string `yaml:"-"` + ConfigDefaults confgroup.Registry `yaml:"-"` + + Disabled bool `yaml:"disabled"` + Name string `yaml:"name"` + Discover []DiscoveryConfig `yaml:"discover"` + Classify []ClassifyRuleConfig `yaml:"classify"` + Compose []ComposeRuleConfig `yaml:"compose"` +} + +type DiscoveryConfig struct { + Discoverer string `yaml:"discoverer"` + NetListeners netlisteners.Config `yaml:"net_listeners"` + Docker dockerd.Config `yaml:"docker"` + K8s []kubernetes.Config `yaml:"k8s"` +} + +type ClassifyRuleConfig struct { + Name string `yaml:"name"` + Selector string `yaml:"selector"` // mandatory + Tags string `yaml:"tags"` // mandatory + Match []struct { + Tags string `yaml:"tags"` // mandatory + Expr string `yaml:"expr"` // mandatory + } `yaml:"match"` // mandatory, at least 1 +} + +type ComposeRuleConfig struct { + Name string `yaml:"name"` // optional + Selector string `yaml:"selector"` // mandatory + Config []struct { + Selector string `yaml:"selector"` // mandatory + Template string `yaml:"template"` // mandatory + } `yaml:"config"` // mandatory, at least 1 +} + +func validateConfig(cfg Config) error { + if cfg.Name == "" { + return errors.New("'name' not set") + } + if err := validateDiscoveryConfig(cfg.Discover); err != nil { + return fmt.Errorf("discover config: %v", err) + } + if err := validateClassifyConfig(cfg.Classify); err != nil { + return fmt.Errorf("classify rules: %v", err) + } + if err := validateComposeConfig(cfg.Compose); err != nil { + return fmt.Errorf("compose rules: %v", err) + } + return nil +} + +func validateDiscoveryConfig(config []DiscoveryConfig) error { + if len(config) == 0 { + return errors.New("no discoverers, must be at least one") + } + for _, cfg := range config { + switch cfg.Discoverer { + case "net_listeners", "docker", "k8s": + default: + return fmt.Errorf("unknown discoverer: '%s'", cfg.Discoverer) + } + } + return nil +} + +func validateClassifyConfig(rules []ClassifyRuleConfig) error { + if len(rules) == 0 { + return errors.New("empty config, need least 1 rule") + } + for i, rule := range rules { + i++ + if rule.Selector == "" { + return fmt.Errorf("'rule[%s][%d]->selector' not set", rule.Name, i) + } + if rule.Tags == "" { + return fmt.Errorf("'rule[%s][%d]->tags' not set", rule.Name, i) + } + if len(rule.Match) == 0 { + return fmt.Errorf("'rule[%s][%d]->match' not set, need at least 1 rule match", rule.Name, i) + } + + for j, match := range rule.Match { + j++ + if match.Tags == "" { + return fmt.Errorf("'rule[%s][%d]->match[%d]->tags' not set", rule.Name, i, j) + } + if match.Expr == "" { + return fmt.Errorf("'rule[%s][%d]->match[%d]->expr' not set", rule.Name, i, j) + } + } + } + return nil +} + +func validateComposeConfig(rules []ComposeRuleConfig) error { + if len(rules) == 0 { + return errors.New("empty config, need least 1 rule") + } + for i, rule := range rules { + i++ + if rule.Selector == "" { + return fmt.Errorf("'rule[%s][%d]->selector' not set", rule.Name, i) + } + + if len(rule.Config) == 0 { + return fmt.Errorf("'rule[%s][%d]->config' not set", rule.Name, i) + } + + for j, conf := range rule.Config { + j++ + if conf.Selector == "" { + return fmt.Errorf("'rule[%s][%d]->config[%d]->selector' not set", rule.Name, i, j) + } + if conf.Template == "" { + return fmt.Errorf("'rule[%s][%d]->config[%d]->template' not set", rule.Name, i, j) + } + } + } + return nil +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap.go new file mode 100644 index 000000000..8a9698b65 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap.go @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "regexp" + "strconv" + "text/template" + + "github.com/netdata/netdata/go/go.d.plugin/pkg/matcher" + + "github.com/Masterminds/sprig/v3" + "github.com/bmatcuk/doublestar/v4" +) + +func newFuncMap() template.FuncMap { + custom := map[string]interface{}{ + "match": funcMatchAny, + "glob": func(value, pattern string, patterns ...string) bool { + return funcMatchAny("glob", value, pattern, patterns...) + }, + "promPort": func(port string) string { + v, _ := strconv.Atoi(port) + return prometheusPortAllocations[v] + }, + } + + fm := sprig.HermeticTxtFuncMap() + + for name, fn := range custom { + fm[name] = fn + } + + return fm +} + +func funcMatchAny(typ, value, pattern string, patterns ...string) bool { + switch len(patterns) { + case 0: + return funcMatch(typ, value, pattern) + default: + return funcMatch(typ, value, pattern) || funcMatchAny(typ, value, patterns[0], patterns[1:]...) + } +} + +func funcMatch(typ string, value, pattern string) bool { + switch typ { + case "glob", "": + m, err := matcher.NewGlobMatcher(pattern) + return err == nil && m.MatchString(value) + case "sp": + m, err := matcher.NewSimplePatternsMatcher(pattern) + return err == nil && m.MatchString(value) + case "re": + ok, err := regexp.MatchString(pattern, value) + return err == nil && ok + case "dstar": + ok, err := doublestar.Match(pattern, value) + return err == nil && ok + default: + return false + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap_test.go new file mode 100644 index 000000000..3de71ef70 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/funcmap_test.go @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_funcMatchAny(t *testing.T) { + tests := map[string]struct { + typ string + patterns []string + value string + wantMatch bool + }{ + "dstar: one param, matches": { + wantMatch: true, + typ: "dstar", + patterns: []string{"*"}, + value: "value", + }, + "dstar: one param, matches with *": { + wantMatch: true, + typ: "dstar", + patterns: []string{"**/value"}, + value: "/one/two/three/value", + }, + "dstar: one param, not matches": { + wantMatch: false, + typ: "dstar", + patterns: []string{"Value"}, + value: "value", + }, + "dstar: several params, last one matches": { + wantMatch: true, + typ: "dstar", + patterns: []string{"not", "matches", "*"}, + value: "value", + }, + "dstar: several params, no matches": { + wantMatch: false, + typ: "dstar", + patterns: []string{"not", "matches", "really"}, + value: "value", + }, + "re: one param, matches": { + wantMatch: true, + typ: "re", + patterns: []string{"^value$"}, + value: "value", + }, + "re: one param, not matches": { + wantMatch: false, + typ: "re", + patterns: []string{"^Value$"}, + value: "value", + }, + "re: several params, last one matches": { + wantMatch: true, + typ: "re", + patterns: []string{"not", "matches", "va[lue]{3}"}, + value: "value", + }, + "re: several params, no matches": { + wantMatch: false, + typ: "re", + patterns: []string{"not", "matches", "val[^l]ue"}, + value: "value", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ok := funcMatchAny(test.typ, test.value, test.patterns[0], test.patterns[1:]...) + + assert.Equal(t, test.wantMatch, ok) + }) + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go new file mode 100644 index 000000000..f69501c39 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline.go @@ -0,0 +1,236 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/dockerd" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/kubernetes" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/netlisteners" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/agent/hostinfo" + "github.com/netdata/netdata/go/go.d.plugin/logger" +) + +func New(cfg Config) (*Pipeline, error) { + if err := validateConfig(cfg); err != nil { + return nil, err + } + + clr, err := newTargetClassificator(cfg.Classify) + if err != nil { + return nil, fmt.Errorf("classify rules: %v", err) + } + + cmr, err := newConfigComposer(cfg.Compose) + if err != nil { + return nil, fmt.Errorf("compose rules: %v", err) + } + + p := &Pipeline{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("pipeline", cfg.Name), + ), + configDefaults: cfg.ConfigDefaults, + clr: clr, + cmr: cmr, + accum: newAccumulator(), + discoverers: make([]model.Discoverer, 0), + configs: make(map[string]map[uint64][]confgroup.Config), + } + p.accum.Logger = p.Logger + + if err := p.registerDiscoverers(cfg); err != nil { + return nil, err + } + + return p, nil +} + +type ( + Pipeline struct { + *logger.Logger + + configDefaults confgroup.Registry + discoverers []model.Discoverer + accum *accumulator + clr classificator + cmr composer + configs map[string]map[uint64][]confgroup.Config // [targetSource][targetHash] + } + classificator interface { + classify(model.Target) model.Tags + } + composer interface { + compose(model.Target) []confgroup.Config + } +) + +func (p *Pipeline) registerDiscoverers(conf Config) error { + for _, cfg := range conf.Discover { + switch cfg.Discoverer { + case "net_listeners": + cfg.NetListeners.Source = conf.Source + td, err := netlisteners.NewDiscoverer(cfg.NetListeners) + if err != nil { + return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err) + } + p.discoverers = append(p.discoverers, td) + case "docker": + if hostinfo.IsInsideK8sCluster() { + p.Infof("not registering '%s' discoverer: disabled in k8s environment", cfg.Discoverer) + continue + } + cfg.Docker.Source = conf.Source + td, err := dockerd.NewDiscoverer(cfg.Docker) + if err != nil { + return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err) + } + p.discoverers = append(p.discoverers, td) + case "k8s": + for _, k8sCfg := range cfg.K8s { + td, err := kubernetes.NewKubeDiscoverer(k8sCfg) + if err != nil { + return fmt.Errorf("failed to create '%s' discoverer: %v", cfg.Discoverer, err) + } + p.discoverers = append(p.discoverers, td) + } + default: + return fmt.Errorf("unknown discoverer: '%s'", cfg.Discoverer) + } + } + + if len(p.discoverers) == 0 { + return errors.New("no discoverers registered") + } + + return nil +} + +func (p *Pipeline) Run(ctx context.Context, in chan<- []*confgroup.Group) { + p.Info("instance is started") + defer p.Info("instance is stopped") + + p.accum.discoverers = p.discoverers + + updates := make(chan []model.TargetGroup) + done := make(chan struct{}) + + go func() { defer close(done); p.accum.run(ctx, updates) }() + + for { + select { + case <-ctx.Done(): + select { + case <-done: + case <-time.After(time.Second * 4): + } + return + case <-done: + return + case tggs := <-updates: + p.Debugf("received %d target groups", len(tggs)) + if cfggs := p.processGroups(tggs); len(cfggs) > 0 { + select { + case <-ctx.Done(): + case in <- cfggs: // FIXME: potentially stale configs if upstream cannot receive (blocking) + } + } + } + } +} + +func (p *Pipeline) processGroups(tggs []model.TargetGroup) []*confgroup.Group { + var groups []*confgroup.Group + // updates come from the accumulator, this ensures that all groups have different sources + for _, tgg := range tggs { + p.Debugf("processing group '%s' with %d target(s)", tgg.Source(), len(tgg.Targets())) + if v := p.processGroup(tgg); v != nil { + groups = append(groups, v) + } + } + return groups +} + +func (p *Pipeline) processGroup(tgg model.TargetGroup) *confgroup.Group { + if len(tgg.Targets()) == 0 { + if _, ok := p.configs[tgg.Source()]; !ok { + return nil + } + delete(p.configs, tgg.Source()) + + return &confgroup.Group{Source: tgg.Source()} + } + + targetsCache, ok := p.configs[tgg.Source()] + if !ok { + targetsCache = make(map[uint64][]confgroup.Config) + p.configs[tgg.Source()] = targetsCache + } + + var changed bool + seen := make(map[uint64]bool) + + for _, tgt := range tgg.Targets() { + if tgt == nil { + continue + } + + hash := tgt.Hash() + seen[hash] = true + + if _, ok := targetsCache[hash]; ok { + continue + } + + targetsCache[hash] = nil + + if tags := p.clr.classify(tgt); len(tags) > 0 { + tgt.Tags().Merge(tags) + + if cfgs := p.cmr.compose(tgt); len(cfgs) > 0 { + targetsCache[hash] = cfgs + changed = true + + for _, cfg := range cfgs { + cfg.SetProvider(tgg.Provider()) + cfg.SetSource(tgg.Source()) + cfg.SetSourceType(confgroup.TypeDiscovered) + if def, ok := p.configDefaults.Lookup(cfg.Module()); ok { + cfg.ApplyDefaults(def) + } + } + } + } + } + + for hash := range targetsCache { + if seen[hash] { + continue + } + if cfgs := targetsCache[hash]; len(cfgs) > 0 { + changed = true + } + delete(targetsCache, hash) + } + + if !changed { + return nil + } + + // TODO: deepcopy? + cfgGroup := &confgroup.Group{Source: tgg.Source()} + + for _, cfgs := range targetsCache { + cfgGroup.Configs = append(cfgGroup.Configs, cfgs...) + } + + return cfgGroup +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline_test.go new file mode 100644 index 000000000..2dd53cf10 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/pipeline_test.go @@ -0,0 +1,303 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/ilyam8/hashstructure" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func Test_defaultConfigs(t *testing.T) { + dir := "../../../../config/go.d/sd/" + entries, err := os.ReadDir(dir) + require.NoError(t, err) + + require.NotEmpty(t, entries) + + for _, e := range entries { + if strings.Contains(e.Name(), "prometheus") { + continue + } + file, err := filepath.Abs(filepath.Join(dir, e.Name())) + require.NoError(t, err, "abs path") + + bs, err := os.ReadFile(file) + require.NoError(t, err, "read config file") + + var cfg Config + require.NoError(t, yaml.Unmarshal(bs, &cfg), "unmarshal") + + _, err = New(cfg) + require.NoError(t, err, "create pipeline") + } +} + +func TestNew(t *testing.T) { + tests := map[string]struct { + config string + wantErr bool + }{ + "fails when config unset": { + wantErr: true, + config: "", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + + var cfg Config + err := yaml.Unmarshal([]byte(test.config), &cfg) + require.Nilf(t, err, "cfg unmarshal") + + _, err = New(cfg) + + if test.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestPipeline_Run(t *testing.T) { + const config = ` +classify: + - selector: "rule1" + tags: "foo1" + match: + - tags: "bar1" + expr: '{{ glob .Name "mock*1*" }}' + - tags: "bar2" + expr: '{{ glob .Name "mock*2*" }}' +compose: + - selector: "foo1" + config: + - selector: "bar1" + template: | + name: {{ .Name }}-foobar1 + - selector: "bar2" + template: | + name: {{ .Name }}-foobar2 +` + tests := map[string]discoverySim{ + "new group with no targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("", + newMockTargetGroup("test"), + ), + }, + wantClassifyCalls: 0, + wantComposeCalls: 0, + wantConfGroups: nil, + }, + "new group with targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + }, + wantClassifyCalls: 2, + wantComposeCalls: 2, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + }, + }, + "existing group with same targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + newDelayedMockDiscoverer("rule1", 5, + newMockTargetGroup("test", "mock1", "mock2"), + ), + }, + wantClassifyCalls: 2, + wantComposeCalls: 2, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + }, + }, + "existing group that previously had targets with no targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + newDelayedMockDiscoverer("rule1", 5, + newMockTargetGroup("test"), + ), + }, + wantClassifyCalls: 2, + wantComposeCalls: 2, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + prepareDiscoveredGroup(), + }, + }, + "existing group with old and new targets": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + newDelayedMockDiscoverer("rule1", 5, + newMockTargetGroup("test", "mock1", "mock2", "mock11", "mock22"), + ), + }, + wantClassifyCalls: 4, + wantComposeCalls: 4, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2", "mock11-foobar1", "mock22-foobar2"), + }, + }, + "existing group with new targets only": { + config: config, + discoverers: []model.Discoverer{ + newMockDiscoverer("rule1", + newMockTargetGroup("test", "mock1", "mock2"), + ), + newDelayedMockDiscoverer("rule1", 5, + newMockTargetGroup("test", "mock11", "mock22"), + ), + }, + wantClassifyCalls: 4, + wantComposeCalls: 4, + wantConfGroups: []*confgroup.Group{ + prepareDiscoveredGroup("mock1-foobar1", "mock2-foobar2"), + prepareDiscoveredGroup("mock11-foobar1", "mock22-foobar2"), + }, + }, + } + + for name, sim := range tests { + t.Run(name, func(t *testing.T) { + sim.run(t) + }) + } +} + +func prepareDiscoveredGroup(configNames ...string) *confgroup.Group { + var configs []confgroup.Config + + for _, name := range configNames { + configs = append(configs, confgroup.Config{}. + SetProvider("mock"). + SetSourceType(confgroup.TypeDiscovered). + SetSource("test"). + SetName(name)) + } + + return &confgroup.Group{ + Source: "test", + Configs: configs, + } +} + +func newMockDiscoverer(tags string, tggs ...model.TargetGroup) *mockDiscoverer { + return &mockDiscoverer{ + tags: mustParseTags(tags), + tggs: tggs, + } +} + +func newDelayedMockDiscoverer(tags string, delay int, tggs ...model.TargetGroup) *mockDiscoverer { + return &mockDiscoverer{ + tags: mustParseTags(tags), + tggs: tggs, + delay: time.Duration(delay) * time.Second, + } +} + +type mockDiscoverer struct { + tggs []model.TargetGroup + tags model.Tags + delay time.Duration +} + +func (md mockDiscoverer) String() string { + return "mock discoverer" +} + +func (md mockDiscoverer) Discover(ctx context.Context, out chan<- []model.TargetGroup) { + for _, tgg := range md.tggs { + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(md.tags) + } + } + + select { + case <-ctx.Done(): + case <-time.After(md.delay): + select { + case <-ctx.Done(): + case out <- md.tggs: + } + } +} + +func newMockTargetGroup(source string, targets ...string) *mockTargetGroup { + m := &mockTargetGroup{source: source} + for _, name := range targets { + m.targets = append(m.targets, &mockTarget{Name: name}) + } + return m +} + +type mockTargetGroup struct { + targets []model.Target + source string +} + +func (mg mockTargetGroup) Targets() []model.Target { return mg.targets } +func (mg mockTargetGroup) Source() string { return mg.source } +func (mg mockTargetGroup) Provider() string { return "mock" } + +func newMockTarget(name string, tags ...string) *mockTarget { + m := &mockTarget{Name: name} + v, _ := model.ParseTags(strings.Join(tags, " ")) + m.Tags().Merge(v) + return m +} + +type mockTarget struct { + model.Base + Name string +} + +func (mt mockTarget) TUID() string { return mt.Name } +func (mt mockTarget) Hash() uint64 { return mustCalcHash(mt.Name) } + +func mustParseTags(line string) model.Tags { + v, err := model.ParseTags(line) + if err != nil { + panic(fmt.Sprintf("mustParseTags: %v", err)) + } + return v +} + +func mustCalcHash(obj any) uint64 { + hash, err := hashstructure.Hash(obj, nil) + if err != nil { + panic(fmt.Sprintf("hash calculation: %v", err)) + } + return hash +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/promport.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/promport.go new file mode 100644 index 000000000..646e1abb1 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/promport.go @@ -0,0 +1,662 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +// https://github.com/prometheus/prometheus/wiki/Default-port-allocations +var prometheusPortAllocations = map[int]string{ + 2019: "caddy", + 3000: "grafana", + 3100: "loki", + 5555: "prometheus-jdbc-exporter", + 6060: "crowdsec", + 7300: "midonet_agent", + 8001: "netbox", + 8088: "fawkes", + 8089: "prom2teams", + 8292: "phabricator_webhook_for_alertmanager", + 8404: "ha_proxy_v2_plus", + 9042: "rds_exporter", + 9087: "telegram_bot_for_alertmanager", + 9091: "pushgateway", + 9097: "jiralert", + 9101: "haproxy_exporter", + 9102: "statsd_exporter", + 9103: "collectd_exporter", + 9104: "mysqld_exporter", + 9105: "mesos_exporter", + 9106: "cloudwatch_exporter", + 9107: "consul_exporter", + 9108: "graphite_exporter", + 9109: "graphite_exporter", + 9110: "blackbox_exporter", + 9111: "expvar_exporter", + 9112: "promacct_pcap-based_network_traffic_accounting", + 9113: "nginx_exporter", + 9114: "elasticsearch_exporter", + 9115: "blackbox_exporter", + 9116: "snmp_exporter", + 9117: "apache_exporter", + 9118: "jenkins_exporter", + 9119: "bind_exporter", + 9120: "powerdns_exporter", + 9121: "redis_exporter", + 9122: "influxdb_exporter", + 9123: "rethinkdb_exporter", + 9124: "freebsd_sysctl_exporter", + 9125: "statsd_exporter", + 9126: "new_relic_exporter", + 9127: "pgbouncer_exporter", + 9128: "ceph_exporter", + 9129: "haproxy_log_exporter", + 9130: "unifi_poller", + 9131: "varnish_exporter", + 9132: "airflow_exporter", + 9133: "fritz_box_exporter", + 9134: "zfs_exporter", + 9135: "rtorrent_exporter", + 9136: "collins_exporter", + 9137: "silicondust_hdhomerun_exporter", + 9138: "heka_exporter", + 9139: "azure_sql_exporter", + 9140: "mirth_exporter", + 9141: "zookeeper_exporter", + 9142: "big-ip_exporter", + 9143: "cloudmonitor_exporter", + 9145: "aerospike_exporter", + 9146: "icecast_exporter", + 9147: "nginx_request_exporter", + 9148: "nats_exporter", + 9149: "passenger_exporter", + 9150: "memcached_exporter", + 9151: "varnish_request_exporter", + 9152: "command_runner_exporter", + 9154: "postfix_exporter", + 9155: "vsphere_graphite", + 9156: "webdriver_exporter", + 9157: "ibm_mq_exporter", + 9158: "pingdom_exporter", + 9160: "apache_flink_exporter", + 9161: "oracle_db_exporter", + 9162: "apcupsd_exporter", + 9163: "zgres_exporter", + 9164: "s6_exporter", + 9165: "keepalived_exporter", + 9166: "dovecot_exporter", + 9167: "unbound_exporter", + 9168: "gitlab-monitor", + 9169: "lustre_exporter", + 9170: "docker_hub_exporter", + 9171: "github_exporter", + 9172: "script_exporter", + 9173: "rancher_exporter", + 9174: "docker-cloud_exporter", + 9175: "saltstack_exporter", + 9176: "openvpn_exporter", + 9177: "libvirt_exporter", + 9178: "stream_exporter", + 9179: "shield_exporter", + 9180: "scylladb_exporter", + 9181: "openstack_ceilometer_exporter", + 9183: "openstack_exporter", + 9184: "twitch_exporter", + 9185: "kafka_topic_exporter", + 9186: "cloud_foundry_firehose_exporter", + 9187: "postgresql_exporter", + 9188: "crypto_exporter", + 9189: "hetzner_cloud_csi_driver_nodes", + 9190: "bosh_exporter", + 9191: "netflow_exporter", + 9192: "ceph_exporter", + 9193: "cloud_foundry_exporter", + 9194: "bosh_tsdb_exporter", + 9195: "maxscale_exporter", + 9196: "upnp_internet_gateway_device_exporter", + 9198: "logstash_exporter", + 9199: "cloudflare_exporter", + 9202: "pacemaker_exporter", + 9203: "domain_exporter", + 9204: "pcsensor_temper_exporter", + 9205: "nextcloud_exporter", + 9206: "elasticsearch_exporter", + 9207: "mysql_exporter", + 9208: "kafka_consumer_group_exporter", + 9209: "fastnetmon_advanced_exporter", + 9210: "netatmo_exporter", + 9211: "dnsbl-exporter", + 9212: "digitalocean_exporter", + 9213: "custom_exporter", + 9214: "mqtt_blackbox_exporter", + 9215: "prometheus_graphite_bridge", + 9216: "mongodb_exporter", + 9217: "consul_agent_exporter", + 9218: "promql-guard", + 9219: "ssl_certificate_exporter", + 9220: "netapp_trident_exporter", + 9221: "proxmox_ve_exporter", + 9222: "aws_ecs_exporter", + 9223: "bladepsgi_exporter", + 9224: "fluentd_exporter", + 9225: "mailexporter", + 9226: "allas", + 9227: "proc_exporter", + 9228: "flussonic_exporter", + 9229: "gitlab-workhorse", + 9230: "network_ups_tools_exporter", + 9231: "solr_exporter", + 9232: "osquery_exporter", + 9233: "mgmt_exporter", + 9234: "mosquitto_exporter", + 9235: "gitlab-pages_exporter", + 9236: "gitlab_gitaly_exporter", + 9237: "sql_exporter", + 9238: "uwsgi_expoter", + 9239: "surfboard_exporter", + 9240: "tinyproxy_exporter", + 9241: "arangodb_exporter", + 9242: "ceph_radosgw_usage_exporter", + 9243: "chef_compliance_exporter", + 9244: "moby_container_exporter", + 9245: "naemon_nagios_exporter", + 9246: "smartpi", + 9247: "sphinx_exporter", + 9248: "freebsd_gstat_exporter", + 9249: "apache_flink_metrics_reporter", + 9250: "opentsdb_exporter", + 9251: "sensu_exporter", + 9252: "gitlab_runner_exporter", + 9253: "php-fpm_exporter", + 9254: "kafka_burrow_exporter", + 9255: "google_stackdriver_exporter", + 9256: "td-agent_exporter", + 9257: "smart_exporter", + 9258: "hello_sense_exporter", + 9259: "azure_resources_exporter", + 9260: "buildkite_exporter", + 9261: "grafana_exporter", + 9262: "bloomsky_exporter", + 9263: "vmware_guest_exporter", + 9264: "nest_exporter", + 9265: "weather_exporter", + 9266: "openhab_exporter", + 9267: "nagios_livestatus_exporter", + 9268: "cratedb_remote_remote_read_write_adapter", + 9269: "fluent-agent-lite_exporter", + 9270: "jmeter_exporter", + 9271: "pagespeed_exporter", + 9272: "vmware_exporter", + 9274: "kubernetes_persistentvolume_disk_usage_exporter", + 9275: "nrpe_exporter", + 9276: "azure_monitor_exporter", + 9277: "mongo_collection_exporter", + 9278: "crypto_miner_exporter", + 9279: "instaclustr_exporter", + 9280: "citrix_netscaler_exporter", + 9281: "fastd_exporter", + 9282: "freeswitch_exporter", + 9283: "ceph_ceph-mgr_prometheus_plugin", + 9284: "gobetween", + 9285: "database_exporter", + 9286: "vdo_compression_and_deduplication_exporter", + 9287: "ceph_iscsi_gateway_statistics", + 9288: "consrv", + 9289: "lovoos_ipmi_exporter", + 9290: "soundclouds_ipmi_exporter", + 9291: "ibm_z_hmc_exporter", + 9292: "netapp_ontap_api_exporter", + 9293: "connection_status_exporter", + 9294: "miflora_flower_care_exporter", + 9295: "freifunk_exporter", + 9296: "odbc_exporter", + 9297: "machbase_exporter", + 9298: "generic_exporter", + 9299: "exporter_aggregator", + 9301: "squid_exporter", + 9302: "faucet_sdn_faucet_exporter", + 9303: "faucet_sdn_gauge_exporter", + 9304: "logstash_exporter", + 9305: "go-ethereum_exporter", + 9306: "kyototycoon_exporter", + 9307: "audisto_exporter", + 9308: "kafka_exporter", + 9309: "fluentd_exporter", + 9310: "open_vswitch_exporter", + 9311: "iota_exporter", + 9313: "cloudprober_exporter", + 9314: "eris_exporter", + 9315: "centrifugo_exporter", + 9316: "tado_exporter", + 9317: "tellstick_local_exporter", + 9318: "conntrack_exporter", + 9319: "flexlm_exporter", + 9320: "consul_telemetry_exporter", + 9321: "spring_boot_actuator_exporter", + 9322: "haproxy_abuser_exporter", + 9323: "docker_prometheus_metrics", + 9324: "bird_routing_daemon_exporter", + 9325: "ovirt_exporter", + 9326: "junos_exporter", + 9327: "s3_exporter", + 9328: "openldap_syncrepl_exporter", + 9329: "cups_exporter", + 9330: "openldap_metrics_exporter", + 9331: "influx-spout_prometheus_metrics", + 9332: "network_exporter", + 9333: "vault_pki_exporter", + 9334: "ejabberd_exporter", + 9335: "nexsan_exporter", + 9336: "mediacom_internet_usage_exporter", + 9337: "mqttgateway", + 9339: "aws_s3_exporter", + 9340: "financial_quotes_exporter", + 9341: "slurm_exporter", + 9342: "frr_exporter", + 9343: "gridserver_exporter", + 9344: "mqtt_exporter", + 9345: "ruckus_smartzone_exporter", + 9346: "ping_exporter", + 9347: "junos_exporter", + 9348: "bigquery_exporter", + 9349: "configurable_elasticsearch_query_exporter", + 9350: "thousandeyes_exporter", + 9351: "wal-e_wal-g_exporter", + 9352: "nature_remo_exporter", + 9353: "ceph_exporter", + 9354: "deluge_exporter", + 9355: "nightwatchjs_exporter", + 9356: "pacemaker_exporter", + 9357: "p1_exporter", + 9358: "performance_counters_exporter", + 9359: "sidekiq_prometheus", + 9360: "powershell_exporter", + 9361: "scaleway_sd_exporter", + 9362: "cisco_exporter", + // Netdata has clickhouse collector. + // CH itself exposes messy Prometheus metrics: camelCase names, appends instances to names instead of labels. + //9363: "clickhouse", + 9364: "continent8_exporter", + 9365: "cumulus_linux_exporter", + 9366: "haproxy_stick_table_exporter", + 9367: "teamspeak3_exporter", + 9368: "ethereum_client_exporter", + 9369: "prometheus_pushprox", + 9370: "u-bmc", + 9371: "conntrack-stats-exporter", + 9372: "appmetrics_prometheus", + 9373: "gcp_service_discovery", + 9374: "smokeping_prober", + 9375: "particle_exporter", + 9376: "falco", + 9377: "cisco_aci_exporter", + 9378: "etcd_grpc_proxy_exporter", + 9379: "etcd_exporter", + 9380: "mythtv_exporter", + 9381: "kafka_zookeeper_exporter", + 9382: "frrouting_exporter", + 9383: "aws_health_exporter", + 9384: "aws_sqs_exporter", + 9385: "apcupsdexporter", + 9386: "tankerkönig_api_exporter", + 9387: "sabnzbd_exporter", + 9388: "linode_exporter", + 9389: "scylla-cluster-tests_exporter", + 9390: "kannel_exporter", + 9391: "concourse_prometheus_metrics", + 9392: "generic_command_line_output_exporter", + 9393: "alertmanager_github_webhook_receiver", + 9394: "ruby_prometheus_exporter", + 9395: "ldap_exporter", + 9396: "monerod_exporter", + 9397: "comap", + 9398: "open_hardware_monitor_exporter", + 9399: "prometheus_sql_exporter", + 9400: "ripe_atlas_exporter", + 9401: "1-wire_exporter", + 9402: "google_cloud_platform_exporter", + 9403: "zerto_exporter", + 9404: "jmx_exporter", + 9405: "discourse_exporter", + 9406: "hhvm_exporter", + 9407: "obs_studio_exporter", + 9408: "rds_enhanced_monitoring_exporter", + 9409: "ovn-kubernetes_master_exporter", + 9410: "ovn-kubernetes_node_exporter", + 9411: "softether_exporter", + 9412: "sentry_exporter", + 9413: "mogilefs_exporter", + 9414: "homey_exporter", + 9415: "cloudwatch_read_adapter", + 9416: "hp_ilo_metrics_exporter", + 9417: "ethtool_exporter", + 9418: "gearman_exporter", + 9419: "rabbitmq_exporter", + 9420: "couchbase_exporter", + 9421: "apicast", + 9422: "jolokia_exporter", + 9423: "hp_raid_exporter", + 9424: "influxdb_stats_exporter", + 9425: "pachyderm_exporter", + 9426: "vespa_engine_exporter", + 9427: "ping_exporter", + 9428: "ssh_exporter", + 9429: "uptimerobot_exporter", + 9430: "corerad", + 9431: "hpfeeds_broker_exporter", + 9432: "windows_perflib_exporter", + 9433: "knot_exporter", + 9434: "opensips_exporter", + 9435: "ebpf_exporter", + 9436: "mikrotik-exporter", + 9437: "dell_emc_isilon_exporter", + 9438: "dell_emc_ecs_exporter", + 9439: "bitcoind_exporter", + 9440: "ravendb_exporter", + 9441: "nomad_exporter", + 9442: "mcrouter_exporter", + 9444: "foundationdb_exporter", + 9445: "nvidia_gpu_exporter", + 9446: "orange_livebox_dsl_modem_exporter", + 9447: "resque_exporter", + 9448: "eventstore_exporter", + 9449: "omeroserver_exporter", + 9450: "habitat_exporter", + 9451: "reindexer_exporter", + 9452: "freebsd_jail_exporter", + 9453: "midonet-kubernetes", + 9454: "nvidia_smi_exporter", + 9455: "iptables_exporter", + 9456: "aws_lambda_exporter", + 9457: "files_content_exporter", + 9458: "rocketchat_exporter", + 9459: "yarn_exporter", + 9460: "hana_exporter", + 9461: "aws_lambda_read_adapter", + 9462: "php_opcache_exporter", + 9463: "virgin_media_liberty_global_hub3_exporter", + 9464: "opencensus-nodejs_prometheus_exporter", + 9465: "hetzner_cloud_k8s_cloud_controller_manager", + 9466: "mqtt_push_gateway", + 9467: "nginx-prometheus-shiny-exporter", + 9468: "nasa-swpc-exporter", + 9469: "script_exporter", + 9470: "cachet_exporter", + 9471: "lxc-exporter", + 9472: "hetzner_cloud_csi_driver_controller", + 9473: "stellar-core-exporter", + 9474: "libvirtd_exporter", + 9475: "wgipamd", + 9476: "ovn_metrics_exporter", + 9477: "csp_violation_report_exporter", + 9478: "sentinel_exporter", + 9479: "elasticbeat_exporter", + 9480: "brigade_exporter", + 9481: "drbd9_exporter", + 9482: "vector_packet_process_vpp_exporter", + 9483: "ibm_app_connect_enterprise_exporter", + 9484: "kubedex-exporter", + 9485: "emarsys_exporter", + 9486: "domoticz_exporter", + 9487: "docker_stats_exporter", + 9488: "bmw_connected_drive_exporter", + 9489: "tezos_node_metrics_exporter", + 9490: "exporter_for_docker_libnetwork_plugin_for_ovn", + 9491: "docker_container_stats_exporter_docker_ps", + 9492: "azure_exporter_monitor_and_usage", + 9493: "prosafe_exporter", + 9494: "kamailio_exporter", + 9495: "ingestor_exporter", + 9496: "389ds_ipa_exporter", + 9497: "immudb_exporter", + 9498: "tp-link_hs110_exporter", + 9499: "smartthings_exporter", + 9500: "cassandra_exporter", + 9501: "hetznercloud_exporter", + 9502: "hetzner_exporter", + 9503: "scaleway_exporter", + 9504: "github_exporter", + 9505: "dockerhub_exporter", + 9506: "jenkins_exporter", + 9507: "owncloud_exporter", + 9508: "ccache_exporter", + 9509: "hetzner_storagebox_exporter", + 9510: "dummy_exporter", + 9512: "cloudera_exporter", + 9513: "openconfig_streaming_telemetry_exporter", + 9514: "app_stores_exporter", + 9515: "swarm-exporter", + 9516: "prometheus_speedtest_exporter", + 9517: "matroschka_prober", + 9518: "crypto_stock_exchanges_funds_exporter", + 9519: "acurite_exporter", + 9520: "swift_health_exporter", + 9521: "ruuvi_exporter", + 9522: "tftp_exporter", + 9523: "3cx_exporter", + 9524: "loki_exporter", + 9525: "alibaba_cloud_exporter", + 9526: "kafka_lag_exporter", + 9527: "netgear_cable_modem_exporter", + 9528: "total_connect_comfort_exporter", + 9529: "octoprint_exporter", + 9530: "custom_prometheus_exporter", + 9531: "jfrog_artifactory_exporter", + 9532: "snyk_exporter", + 9533: "network_exporter_for_cisco_api", + 9534: "humio_exporter", + 9535: "cron_exporter", + 9536: "ipsec_exporter", + 9537: "cri-o", + 9538: "bull_queue", + 9539: "modemmanager_exporter", + 9540: "emq_exporter", + 9541: "smartmon_exporter", + 9542: "sakuracloud_exporter", + 9543: "kube2iam_exporter", + 9544: "pgio_exporter", + 9545: "hp_ilo4_exporter", + 9546: "pwrstat-exporter", + 9547: "patroni_exporter", + 9548: "trafficserver_exporter", + 9549: "raspberry_exporter", + 9550: "rtl_433_exporter", + 9551: "hostapd_exporter", + 9552: "aws_elastic_beanstalk_exporter", + 9553: "apt_exporter", + 9554: "acc_server_manager_exporter", + 9555: "sona_exporter", + 9556: "routinator_exporter", + 9557: "mysql_count_exporter", + 9558: "systemd_exporter", + 9559: "ntp_exporter", + 9560: "sql_queries_exporter", + 9561: "qbittorrent_exporter", + 9562: "ptv_xserver_exporter", + 9563: "kibana_exporter", + 9564: "purpleair_exporter", + 9565: "bminer_exporter", + 9566: "rabbitmq_cli_consumer", + 9567: "alertsnitch", + 9568: "dell_poweredge_ipmi_exporter", + 9569: "hvpa_controller", + 9570: "vpa_exporter", + 9571: "helm_exporter", + 9572: "ctld_exporter", + 9573: "jkstatus_exporter", + 9574: "opentracker_exporter", + 9575: "poweradmin_server_monitor_exporter", + 9576: "exabgp_exporter", + 9578: "aria2_exporter", + 9579: "iperf3_exporter", + 9580: "azure_service_bus_exporter", + 9581: "codenotary_vcn_exporter", + 9583: "signatory_a_remote_operation_signer_for_tezos", + 9584: "bunnycdn_exporter", + 9585: "opvizor_performance_analyzer_process_exporter", + 9586: "wireguard_exporter", + 9587: "nfs-ganesha_exporter", + 9588: "ltsv-tailer_exporter", + 9589: "goflow_exporter", + 9590: "flow_exporter", + 9591: "srcds_exporter", + 9592: "gcp_quota_exporter", + 9593: "lighthouse_exporter", + 9594: "plex_exporter", + 9595: "netio_exporter", + 9596: "azure_elastic_sql_exporter", + 9597: "github_vulnerability_alerts_exporter", + 9599: "pirograph_exporter", + 9600: "circleci_exporter", + 9601: "messagebird_exporter", + 9602: "modbus_exporter", + 9603: "xen_exporter_using_xenlight", + 9604: "xmpp_blackbox_exporter", + 9605: "fping-exporter", + 9606: "ecr-exporter", + 9607: "raspberry_pi_sense_hat_exporter", + 9608: "ironic_prometheus_exporter", + 9609: "netapp_exporter", + 9610: "kubernetes_exporter", + 9611: "speedport_exporter", + 9612: "opflex-agent_exporter", + 9613: "azure_health_exporter", + 9614: "nut_upsc_exporter", + 9615: "mellanox_mlx5_exporter", + 9616: "mailgun_exporter", + 9617: "pi-hole_exporter", + 9618: "stellar-account-exporter", + 9619: "stellar-horizon-exporter", + 9620: "rundeck_exporter", + 9621: "opennebula_exporter", + 9622: "bmc_exporter", + 9623: "tc4400_exporter", + 9624: "pact_broker_exporter", + 9625: "bareos_exporter", + 9626: "hockeypuck", + 9627: "artifactory_exporter", + 9628: "solace_pubsub_plus_exporter", + 9629: "prometheus_gitlab_notifier", + 9630: "nftables_exporter", + 9631: "a_op5_monitor_exporter", + 9632: "opflex-server_exporter", + 9633: "smartctl_exporter", + 9634: "aerospike_ttl_exporter", + 9635: "fail2ban_exporter", + 9636: "exim4_exporter", + 9637: "kubeversion_exporter", + 9638: "a_icinga2_exporter", + 9639: "scriptable_jmx_exporter", + 9640: "logstash_output_exporter", + 9641: "coturn_exporter", + 9642: "bugsnag_exporter", + 9644: "exporter_for_grouped_process", + 9645: "burp_exporter", + 9646: "locust_exporter", + 9647: "docker_exporter", + 9648: "ntpmon_exporter", + 9649: "logstash_exporter", + 9650: "keepalived_exporter", + 9651: "storj_exporter", + 9652: "praefect_exporter", + 9653: "jira_issues_exporter", + 9654: "ansible_galaxy_exporter", + 9655: "kube-netc_exporter", + 9656: "matrix", + 9657: "krill_exporter", + 9658: "sap_hana_sql_exporter", + 9660: "kaiterra_laser_egg_exporter", + 9661: "hashpipe_exporter", + 9662: "pms5003_particulate_matter_sensor_exporter", + 9663: "sap_nwrfc_exporter", + 9664: "linux_ha_clusterlabs_exporter", + 9665: "senderscore_exporter", + 9666: "alertmanager_silences_exporter", + 9667: "smtpd_exporter", + 9668: "suses_sap_hana_exporter", + 9669: "panopticon_native_metrics", + 9670: "flare_native_metrics", + 9671: "aws_ec2_spot_exporter", + 9672: "aircontrol_co2_exporter", + 9673: "co2_monitor_exporter", + 9674: "google_analytics_exporter", + 9675: "docker_swarm_exporter", + 9676: "hetzner_traffic_exporter", + 9677: "aws_ecs_exporter", + 9678: "ircd_user_exporter", + 9679: "aws_health_exporter", + 9680: "suses_sap_host_exporter", + 9681: "myfitnesspal_exporter", + 9682: "powder_monkey", + 9683: "infiniband_exporter", + 9684: "kibana_standalone_exporter", + 9685: "eideticom", + 9686: "aws_ec2_exporter", + 9687: "gitaly_blackbox_exporter", + 9689: "lan_server_modbus_exporter", + 9690: "tcp_longterm_connection_exporter", + 9691: "celery_redis_exporter", + 9692: "gcp_gce_exporter", + 9693: "sigma_air_manager_exporter", + 9694: "per-user_usage_exporter_for_cisco_xe_lnss", + 9695: "cifs_exporter", + 9696: "jitsi_videobridge_exporter", + 9697: "tendermint_blockchain_exporter", + 9698: "integrated_dell_remote_access_controller_idrac_exporter", + 9699: "pyncette_exporter", + 9700: "jitsi_meet_exporter", + 9701: "workbook_exporter", + 9702: "homeplug_plc_exporter", + 9703: "vircadia", + 9704: "linux_tc_exporter", + 9705: "upc_connect_box_exporter", + 9706: "postfix_exporter", + 9707: "radarr_exporter", + 9708: "sonarr_exporter", + 9709: "hadoop_hdfs_fsimage_exporter", + 9710: "nut-exporter", + 9711: "cloudflare_flan_scan_report_exporter", + 9712: "siemens_s7_plc_exporter", + 9713: "glusterfs_exporter", + 9714: "fritzbox_exporter", + 9715: "twincat_ads_web_service_exporter", + 9716: "signald_webhook_receiver", + 9717: "tplink_easysmart_switch_exporter", + 9718: "warp10_exporter", + 9719: "pgpool-ii_exporter", + 9720: "moodle_db_exporter", + 9721: "gtp_exporter", + 9722: "miele_exporter", + 9724: "freeswitch_exporter", + 9725: "sunnyboy_exporter", + 9726: "python_rq_exporter", + 9727: "ctdb_exporter", + 9728: "nginx-rtmp_exporter", + 9729: "libvirtd_exporter", + 9730: "lynis_exporter", + 9731: "nebula_mam_exporter", + 9732: "nftables_exporter", + 9733: "honeypot_exporter", + 9734: "a10-networks_prometheus_exporter", + 9735: "webweaver", + 9736: "mongodb_query_exporter", + 9737: "folding_home_exporter", + 9738: "processor_counter_monitor_exporter", + 9739: "kafka_consumer_lag_monitoring", + 9740: "flightdeck", + 9741: "ibm_spectrum_exporter", + 9742: "transmission-exporter", + 9743: "sma-exporter", + 9803: "site24x7_exporter", + 9901: "envoy_proxy", + 9913: "nginx_vts_exporter", + 9943: "filestat_exporter", + 9980: "login_exporter", + 9983: "sia_exporter", + 9984: "couchdb_exporter", + 9987: "netapp_solidfire_exporter", + 9990: "wildfly_exporter", + 16995: "storidge_exporter", + 19091: "transmission_exporter", + 24231: "fluent_plugin_for_prometheus", + 42004: "proxysql_exporter", + 44323: "pcp_exporter", + 61091: "dcos_exporter", +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector.go new file mode 100644 index 000000000..8bb5fb061 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector.go @@ -0,0 +1,154 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "errors" + "fmt" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" +) + +type selector interface { + matches(model.Tags) bool +} + +type ( + exactSelector string + trueSelector struct{} + negSelector struct{ selector } + orSelector struct{ lhs, rhs selector } + andSelector struct{ lhs, rhs selector } +) + +func (s exactSelector) matches(tags model.Tags) bool { _, ok := tags[string(s)]; return ok } +func (s trueSelector) matches(model.Tags) bool { return true } +func (s negSelector) matches(tags model.Tags) bool { return !s.selector.matches(tags) } +func (s orSelector) matches(tags model.Tags) bool { return s.lhs.matches(tags) || s.rhs.matches(tags) } +func (s andSelector) matches(tags model.Tags) bool { return s.lhs.matches(tags) && s.rhs.matches(tags) } + +func (s exactSelector) String() string { return "{" + string(s) + "}" } +func (s negSelector) String() string { return "{!" + stringify(s.selector) + "}" } +func (s trueSelector) String() string { return "{*}" } +func (s orSelector) String() string { return "{" + stringify(s.lhs) + "|" + stringify(s.rhs) + "}" } +func (s andSelector) String() string { return "{" + stringify(s.lhs) + ", " + stringify(s.rhs) + "}" } +func stringify(sr selector) string { return strings.Trim(fmt.Sprintf("%s", sr), "{}") } + +func parseSelector(line string) (sr selector, err error) { + words := strings.Fields(line) + if len(words) == 0 { + return trueSelector{}, nil + } + + var srs []selector + for _, word := range words { + if idx := strings.IndexByte(word, '|'); idx > 0 { + sr, err = parseOrSelectorWord(word) + } else { + sr, err = parseSingleSelectorWord(word) + } + if err != nil { + return nil, fmt.Errorf("selector '%s' contains selector '%s' with forbidden symbol", line, word) + } + srs = append(srs, sr) + } + + switch len(srs) { + case 0: + return trueSelector{}, nil + case 1: + return srs[0], nil + default: + return newAndSelector(srs[0], srs[1], srs[2:]...), nil + } +} + +func parseOrSelectorWord(orWord string) (sr selector, err error) { + var srs []selector + for _, word := range strings.Split(orWord, "|") { + if sr, err = parseSingleSelectorWord(word); err != nil { + return nil, err + } + srs = append(srs, sr) + } + switch len(srs) { + case 0: + return trueSelector{}, nil + case 1: + return srs[0], nil + default: + return newOrSelector(srs[0], srs[1], srs[2:]...), nil + } +} + +func parseSingleSelectorWord(word string) (selector, error) { + if len(word) == 0 { + return nil, errors.New("empty word") + } + neg := word[0] == '!' + if neg { + word = word[1:] + } + if len(word) == 0 { + return nil, errors.New("empty word") + } + if word != "*" && !isSelectorWordValid(word) { + return nil, errors.New("forbidden symbol") + } + + var sr selector + switch word { + case "*": + sr = trueSelector{} + default: + sr = exactSelector(word) + } + if neg { + return negSelector{sr}, nil + } + return sr, nil +} + +func newAndSelector(lhs, rhs selector, others ...selector) selector { + m := andSelector{lhs: lhs, rhs: rhs} + switch len(others) { + case 0: + return m + default: + return newAndSelector(m, others[0], others[1:]...) + } +} + +func newOrSelector(lhs, rhs selector, others ...selector) selector { + m := orSelector{lhs: lhs, rhs: rhs} + switch len(others) { + case 0: + return m + default: + return newOrSelector(m, others[0], others[1:]...) + } +} + +func isSelectorWordValid(word string) bool { + // valid: + // * + // ^[a-zA-Z][a-zA-Z0-9=_.]*$ + if len(word) == 0 { + return false + } + if word == "*" { + return true + } + for i, b := range word { + switch { + case b >= 'a' && b <= 'z': + case b >= 'A' && b <= 'Z': + case b >= '0' && b <= '9' && i > 0: + case (b == '=' || b == '_' || b == '.') && i > 0: + default: + return false + } + } + return true +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector_test.go new file mode 100644 index 000000000..a4fcf3041 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/selector_test.go @@ -0,0 +1,248 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "regexp" + "testing" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" +) + +var reSrString = regexp.MustCompile(`^{[^{}]+}$`) + +func TestTrueSelector_String(t *testing.T) { + var sr trueSelector + assert.Equal(t, "{*}", sr.String()) +} + +func TestExactSelector_String(t *testing.T) { + sr := exactSelector("selector") + + assert.True(t, reSrString.MatchString(sr.String())) +} + +func TestNegSelector_String(t *testing.T) { + srs := []selector{ + exactSelector("selector"), + negSelector{exactSelector("selector")}, + orSelector{ + lhs: exactSelector("selector"), + rhs: exactSelector("selector")}, + orSelector{ + lhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + rhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + }, + andSelector{ + lhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + rhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + }, + } + + for i, sr := range srs { + neg := negSelector{sr} + assert.True(t, reSrString.MatchString(neg.String()), "selector num %d", i+1) + } +} + +func TestOrSelector_String(t *testing.T) { + sr := orSelector{ + lhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + rhs: orSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + } + + assert.True(t, reSrString.MatchString(sr.String())) +} + +func TestAndSelector_String(t *testing.T) { + sr := andSelector{ + lhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + rhs: andSelector{lhs: exactSelector("selector"), rhs: negSelector{exactSelector("selector")}}, + } + + assert.True(t, reSrString.MatchString(sr.String())) +} + +func TestExactSelector_Matches(t *testing.T) { + matchTests := struct { + tags model.Tags + srs []exactSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []exactSelector{ + "a", + "b", + }, + } + notMatchTests := struct { + tags model.Tags + srs []exactSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []exactSelector{ + "c", + "d", + }, + } + + for i, sr := range matchTests.srs { + assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1) + } + for i, sr := range notMatchTests.srs { + assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1) + } +} + +func TestNegSelector_Matches(t *testing.T) { + matchTests := struct { + tags model.Tags + srs []negSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []negSelector{ + {exactSelector("c")}, + {exactSelector("d")}, + }, + } + notMatchTests := struct { + tags model.Tags + srs []negSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []negSelector{ + {exactSelector("a")}, + {exactSelector("b")}, + }, + } + + for i, sr := range matchTests.srs { + assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1) + } + for i, sr := range notMatchTests.srs { + assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1) + } +} + +func TestOrSelector_Matches(t *testing.T) { + matchTests := struct { + tags model.Tags + srs []orSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []orSelector{ + { + lhs: orSelector{lhs: exactSelector("c"), rhs: exactSelector("d")}, + rhs: orSelector{lhs: exactSelector("e"), rhs: exactSelector("b")}, + }, + }, + } + notMatchTests := struct { + tags model.Tags + srs []orSelector + }{ + tags: model.Tags{"a": {}, "b": {}}, + srs: []orSelector{ + { + lhs: orSelector{lhs: exactSelector("c"), rhs: exactSelector("d")}, + rhs: orSelector{lhs: exactSelector("e"), rhs: exactSelector("f")}, + }, + }, + } + + for i, sr := range matchTests.srs { + assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1) + } + for i, sr := range notMatchTests.srs { + assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1) + } +} + +func TestAndSelector_Matches(t *testing.T) { + matchTests := struct { + tags model.Tags + srs []andSelector + }{ + tags: model.Tags{"a": {}, "b": {}, "c": {}, "d": {}}, + srs: []andSelector{ + { + lhs: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}, + rhs: andSelector{lhs: exactSelector("c"), rhs: exactSelector("d")}, + }, + }, + } + notMatchTests := struct { + tags model.Tags + srs []andSelector + }{ + tags: model.Tags{"a": {}, "b": {}, "c": {}, "d": {}}, + srs: []andSelector{ + { + lhs: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}, + rhs: andSelector{lhs: exactSelector("c"), rhs: exactSelector("z")}, + }, + }, + } + + for i, sr := range matchTests.srs { + assert.Truef(t, sr.matches(matchTests.tags), "match selector num %d", i+1) + } + for i, sr := range notMatchTests.srs { + assert.Falsef(t, sr.matches(notMatchTests.tags), "not match selector num %d", i+1) + } +} + +func TestParseSelector(t *testing.T) { + tests := map[string]struct { + wantSelector selector + wantErr bool + }{ + "": {wantSelector: trueSelector{}}, + "a": {wantSelector: exactSelector("a")}, + "Z": {wantSelector: exactSelector("Z")}, + "a_b": {wantSelector: exactSelector("a_b")}, + "a=b": {wantSelector: exactSelector("a=b")}, + "!a": {wantSelector: negSelector{exactSelector("a")}}, + "a b": {wantSelector: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}}, + "a|b": {wantSelector: orSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}}, + "*": {wantSelector: trueSelector{}}, + "!*": {wantSelector: negSelector{trueSelector{}}}, + "a b !c d|e f": { + wantSelector: andSelector{ + lhs: andSelector{ + lhs: andSelector{ + lhs: andSelector{lhs: exactSelector("a"), rhs: exactSelector("b")}, + rhs: negSelector{exactSelector("c")}, + }, + rhs: orSelector{ + lhs: exactSelector("d"), + rhs: exactSelector("e"), + }, + }, + rhs: exactSelector("f"), + }, + }, + "!": {wantErr: true}, + "a !": {wantErr: true}, + "a!b": {wantErr: true}, + "0a": {wantErr: true}, + "a b c*": {wantErr: true}, + "__": {wantErr: true}, + "a|b|c*": {wantErr: true}, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sr, err := parseSelector(name) + + if test.wantErr { + assert.Nil(t, sr) + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.wantSelector, sr) + } + }) + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/sim_test.go new file mode 100644 index 000000000..23a120751 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/pipeline/sim_test.go @@ -0,0 +1,130 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package pipeline + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +type discoverySim struct { + config string + discoverers []model.Discoverer + wantClassifyCalls int + wantComposeCalls int + wantConfGroups []*confgroup.Group +} + +func (sim discoverySim) run(t *testing.T) { + t.Helper() + + var cfg Config + err := yaml.Unmarshal([]byte(sim.config), &cfg) + require.Nilf(t, err, "cfg unmarshal") + + clr, err := newTargetClassificator(cfg.Classify) + require.Nil(t, err, "newTargetClassificator") + + cmr, err := newConfigComposer(cfg.Compose) + require.Nil(t, err, "newConfigComposer") + + mockClr := &mockClassificator{clr: clr} + mockCmr := &mockComposer{cmr: cmr} + + accum := newAccumulator() + accum.sendEvery = time.Second * 2 + + pl := &Pipeline{ + Logger: logger.New(), + discoverers: sim.discoverers, + accum: accum, + clr: mockClr, + cmr: mockCmr, + configs: make(map[string]map[uint64][]confgroup.Config), + } + + pl.accum.Logger = pl.Logger + clr.Logger = pl.Logger + cmr.Logger = pl.Logger + + groups := sim.collectGroups(t, pl) + + sortConfigGroups(groups) + sortConfigGroups(sim.wantConfGroups) + + assert.Equal(t, sim.wantConfGroups, groups) + assert.Equalf(t, sim.wantClassifyCalls, mockClr.calls, "classify calls") + assert.Equalf(t, sim.wantComposeCalls, mockCmr.calls, "compose calls") +} + +func (sim discoverySim) collectGroups(t *testing.T, pl *Pipeline) []*confgroup.Group { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + in := make(chan []*confgroup.Group) + done := make(chan struct{}) + + go func() { defer close(done); pl.Run(ctx, in) }() + + timeout := time.Second * 10 + var groups []*confgroup.Group + + func() { + for { + select { + case inGroups := <-in: + groups = append(groups, inGroups...) + case <-done: + return + case <-time.After(timeout): + t.Logf("discovery timed out after %s, got %d groups, expected %d, some events are skipped", + timeout, len(groups), len(sim.wantConfGroups)) + return + } + } + }() + + return groups +} + +type mockClassificator struct { + calls int + clr *targetClassificator +} + +func (m *mockClassificator) classify(tgt model.Target) model.Tags { + m.calls++ + return m.clr.classify(tgt) +} + +type mockComposer struct { + calls int + cmr *configComposer +} + +func (m *mockComposer) compose(tgt model.Target) []confgroup.Config { + m.calls++ + return m.cmr.compose(tgt) +} + +func sortConfigGroups(groups []*confgroup.Group) { + sort.Slice(groups, func(i, j int) bool { + return groups[i].Source < groups[j].Source + }) + + for _, g := range groups { + sort.Slice(g.Configs, func(i, j int) bool { + return g.Configs[i].Name() < g.Configs[j].Name() + }) + } +} |