diff options
Diffstat (limited to 'src/go/plugin/go.d/agent/discovery')
15 files changed, 111 insertions, 75 deletions
diff --git a/src/go/plugin/go.d/agent/discovery/file/parse.go b/src/go/plugin/go.d/agent/discovery/file/parse.go index 5fd31f32a..ae48fcb8f 100644 --- a/src/go/plugin/go.d/agent/discovery/file/parse.go +++ b/src/go/plugin/go.d/agent/discovery/file/parse.go @@ -97,7 +97,7 @@ func parseSDFormat(reg confgroup.Registry, path string, bs []byte) (*confgroup.G } func cfgFormat(bs []byte) format { - var data interface{} + var data any if err := yaml.Unmarshal(bs, &data); err != nil { return unknownFormat } diff --git a/src/go/plugin/go.d/agent/discovery/file/sim_test.go b/src/go/plugin/go.d/agent/discovery/file/sim_test.go index 3219c6892..a0a8c4425 100644 --- a/src/go/plugin/go.d/agent/discovery/file/sim_test.go +++ b/src/go/plugin/go.d/agent/discovery/file/sim_test.go @@ -110,7 +110,7 @@ func (d *tmpDir) renameFile(origFilename, newFilename string) { require.NoError(d.t, err) } -func (d *tmpDir) writeYAML(filename string, in interface{}) { +func (d *tmpDir) writeYAML(filename string, in any) { bs, err := yaml.Marshal(in) require.NoError(d.t, err) err = os.WriteFile(filename, bs, 0644) diff --git a/src/go/plugin/go.d/agent/discovery/sd/conffile.go b/src/go/plugin/go.d/agent/discovery/sd/conffile.go index e08a4021b..60a7208d2 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/conffile.go +++ b/src/go/plugin/go.d/agent/discovery/sd/conffile.go @@ -7,7 +7,7 @@ import ( "os" "github.com/netdata/netdata/go/plugins/logger" - "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/multipath" + "github.com/netdata/netdata/go/plugins/pkg/multipath" ) type confFile struct { diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/docker.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/docker.go index 1cea014a9..e79d8b562 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/docker.go +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/dockerd/docker.go @@ -13,8 +13,8 @@ import ( "github.com/netdata/netdata/go/plugins/logger" "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/confopt" "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/dockerhost" - "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web" "github.com/docker/docker/api/types" typesContainer "github.com/docker/docker/api/types/container" @@ -64,9 +64,9 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) { type Config struct { Source string - Tags string `yaml:"tags"` - Address string `yaml:"address"` - Timeout web.Duration `yaml:"timeout"` + Tags string `yaml:"tags"` + Address string `yaml:"address"` + Timeout confopt.Duration `yaml:"timeout"` } type ( diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes.go index 439e2b695..26f8a1bd2 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes.go +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes.go @@ -234,7 +234,7 @@ func (d *KubeDiscoverer) setupServiceDiscoverer(ctx context.Context, namespace s return td } -func enqueue(queue *workqueue.Type, obj any) { +func enqueue(queue *workqueue.Typed[any], obj any) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { return diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go index ba60a47b4..908a2a192 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go @@ -134,7 +134,7 @@ func TestKubeDiscoverer_Discover(t *testing.T) { } func prepareDiscoverer(role role, namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { - client := fake.NewSimpleClientset(objects...) + client := fake.NewClientset(objects...) tags, _ := model.ParseTags("k8s") disc := &KubeDiscoverer{ tags: tags, diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go index 617081742..f5918ca35 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go @@ -58,7 +58,7 @@ func newPodDiscoverer(pod, cmap, secret cache.SharedInformer) *podDiscoverer { panic("nil pod or cmap or secret informer") } - queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"}) + queue := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[any]{Name: "pod"}) _, _ = pod.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { enqueue(queue, obj) }, @@ -82,7 +82,7 @@ type podDiscoverer struct { podInformer cache.SharedInformer cmapInformer cache.SharedInformer secretInformer cache.SharedInformer - queue *workqueue.Type + queue *workqueue.Typed[any] } func (p *podDiscoverer) String() string { diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service.go index 1d5ae7cd5..ebcfe31cc 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service.go +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/service.go @@ -53,7 +53,7 @@ type serviceDiscoverer struct { model.Base informer cache.SharedInformer - queue *workqueue.Type + queue *workqueue.Typed[any] } func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer { @@ -61,7 +61,8 @@ func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer { panic("nil service informer") } - queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "service"}) + queue := workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[any]{Name: "service"}) + _, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { enqueue(queue, obj) }, UpdateFunc: func(_, obj any) { enqueue(queue, obj) }, diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/ll.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/ll.go new file mode 100644 index 000000000..fdb70f5a3 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/ll.go @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "time" + + "github.com/netdata/netdata/go/plugins/pkg/executable" +) + +type localListeners interface { + discover(ctx context.Context) ([]byte, error) +} + +func newLocalListeners(timeout time.Duration) localListeners { + dir := os.Getenv("NETDATA_PLUGINS_DIR") + if dir == "" { + dir = executable.Directory + } + if dir == "" { + dir, _ = os.Getwd() + } + + return &localListenersExec{ + binPath: filepath.Join(dir, "local-listeners"), + timeout: timeout, + } +} + +type localListenersExec struct { + binPath string + timeout time.Duration +} + +func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) { + execCtx, cancel := context.WithTimeout(ctx, e.timeout) + defer cancel() + + // TCPv4/6 and UPDv4 sockets in LISTEN state + // https://github.com/netdata/netdata/blob/master/src/collectors/utils/local_listeners.c + args := []string{ + "no-udp6", + "no-local", + "no-inbound", + "no-outbound", + "no-namespaces", + } + + cmd := exec.CommandContext(execCtx, e.binPath, args...) + + bs, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("error on executing '%s': %v", cmd, err) + } + + return bs, nil +} diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners.go index 60dd92cb4..b38d0506f 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners.go +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners.go @@ -10,8 +10,6 @@ import ( "fmt" "log/slog" "net" - "os" - "os/exec" "path/filepath" "sort" "strconv" @@ -19,8 +17,8 @@ import ( "time" "github.com/netdata/netdata/go/plugins/logger" - "github.com/netdata/netdata/go/plugins/pkg/executable" "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/confopt" "github.com/ilyam8/hashstructure" ) @@ -30,18 +28,27 @@ var ( fullName = fmt.Sprintf("sd:%s", shortName) ) +type Config struct { + Source string `yaml:"-"` + Tags string `yaml:"tags"` + + Interval *confopt.Duration `yaml:"interval"` + Timeout confopt.Duration `yaml:"timeout"` +} + func NewDiscoverer(cfg Config) (*Discoverer, error) { tags, err := model.ParseTags(cfg.Tags) if err != nil { return nil, fmt.Errorf("parse tags: %v", err) } - dir := os.Getenv("NETDATA_PLUGINS_DIR") - if dir == "" { - dir = executable.Directory + interval := time.Minute * 2 + if cfg.Interval != nil { + interval = cfg.Interval.Duration() } - if dir == "" { - dir, _ = os.Getwd() + timeout := time.Second * 5 + if cfg.Timeout.Duration() != 0 { + timeout = cfg.Timeout.Duration() } d := &Discoverer{ @@ -49,12 +56,10 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) { slog.String("component", "service discovery"), slog.String("discoverer", shortName), ), - cfgSource: cfg.Source, - ll: &localListenersExec{ - binPath: filepath.Join(dir, "local-listeners"), - timeout: time.Second * 5, - }, - interval: time.Minute * 2, + cfgSource: cfg.Source, + ll: newLocalListeners(timeout), + interval: interval, + timeout: timeout, expiryTime: time.Minute * 10, cache: make(map[uint64]*cacheItem), started: make(chan struct{}), @@ -65,11 +70,6 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) { return d, nil } -type Config struct { - Source string `yaml:"-"` - Tags string `yaml:"tags"` -} - type ( Discoverer struct { *logger.Logger @@ -78,6 +78,7 @@ type ( cfgSource string interval time.Duration + timeout time.Duration ll localListeners expiryTime time.Duration @@ -92,9 +93,6 @@ type ( lastSeenTime time.Time tgt model.Target } - localListeners interface { - discover(ctx context.Context) ([]byte, error) - } ) func (d *Discoverer) String() string { @@ -103,6 +101,7 @@ func (d *Discoverer) String() string { func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { d.Info("instance is started") + d.Debugf("used config: interval: %s, timeout: %s, cache expiration time: %s", d.interval, d.timeout, d.expiryTime) defer func() { d.Info("instance is stopped") }() close(d.started) @@ -112,6 +111,10 @@ func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup return } + if d.interval == 0 { + return + } + tk := time.NewTicker(d.interval) defer tk.Stop() @@ -295,35 +298,6 @@ func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) { return tgts[:n], nil } -type localListenersExec struct { - binPath string - timeout time.Duration -} - -func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) { - execCtx, cancel := context.WithTimeout(ctx, e.timeout) - defer cancel() - - // TCPv4/6 and UPDv4 sockets in LISTEN state - // https://github.com/netdata/netdata/blob/master/src/collectors/plugins.d/local_listeners.c - args := []string{ - "no-udp6", - "no-local", - "no-inbound", - "no-outbound", - "no-namespaces", - } - - cmd := exec.CommandContext(execCtx, e.binPath, args...) - - bs, err := cmd.Output() - if err != nil { - return nil, fmt.Errorf("error on executing '%s': %v", cmd, err) - } - - return bs, nil -} - func extractComm(cmdLine string) string { if i := strings.IndexByte(cmdLine, ' '); i != -1 { cmdLine = cmdLine[:i] diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap.go index 5ed188a54..378e03c25 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap.go +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/funcmap.go @@ -7,14 +7,16 @@ import ( "strconv" "text/template" - "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/matcher" + "github.com/netdata/netdata/go/plugins/pkg/matcher" "github.com/Masterminds/sprig/v3" "github.com/bmatcuk/doublestar/v4" ) func newFuncMap() template.FuncMap { - custom := map[string]interface{}{ + fm := sprig.TxtFuncMap() + + extra := map[string]any{ "match": funcMatchAny, "glob": func(value, pattern string, patterns ...string) bool { return funcMatchAny("glob", value, pattern, patterns...) @@ -25,9 +27,7 @@ func newFuncMap() template.FuncMap { }, } - fm := sprig.HermeticTxtFuncMap() - - for name, fn := range custom { + for name, fn := range extra { fm[name] = fn } diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline_test.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline_test.go index e67b6d7ce..4f2e11199 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline_test.go +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/pipeline_test.go @@ -35,13 +35,13 @@ func Test_defaultConfigs(t *testing.T) { require.NoError(t, err, "abs path") bs, err := os.ReadFile(file) - require.NoError(t, err, "read config file") + require.NoErrorf(t, err, "read config file '%s'", file) var cfg Config - require.NoError(t, yaml.Unmarshal(bs, &cfg), "unmarshal") + require.NoErrorf(t, yaml.Unmarshal(bs, &cfg), "unmarshal '%s'", e.Name()) _, err = New(cfg) - require.NoError(t, err, "create pipeline") + require.NoErrorf(t, err, "create pipeline '%s'", e.Name()) } } diff --git a/src/go/plugin/go.d/agent/discovery/sd/pipeline/promport.go b/src/go/plugin/go.d/agent/discovery/sd/pipeline/promport.go index 646e1abb1..7edf227c7 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/pipeline/promport.go +++ b/src/go/plugin/go.d/agent/discovery/sd/pipeline/promport.go @@ -46,7 +46,6 @@ var prometheusPortAllocations = map[int]string{ 9125: "statsd_exporter", 9126: "new_relic_exporter", 9127: "pgbouncer_exporter", - 9128: "ceph_exporter", 9129: "haproxy_log_exporter", 9130: "unifi_poller", 9131: "varnish_exporter", @@ -193,7 +192,6 @@ var prometheusPortAllocations = map[int]string{ 9280: "citrix_netscaler_exporter", 9281: "fastd_exporter", 9282: "freeswitch_exporter", - 9283: "ceph_ceph-mgr_prometheus_plugin", 9284: "gobetween", 9285: "database_exporter", 9286: "vdo_compression_and_deduplication_exporter", diff --git a/src/go/plugin/go.d/agent/discovery/sd/sd.go b/src/go/plugin/go.d/agent/discovery/sd/sd.go index 687ebfba8..90207219d 100644 --- a/src/go/plugin/go.d/agent/discovery/sd/sd.go +++ b/src/go/plugin/go.d/agent/discovery/sd/sd.go @@ -9,9 +9,9 @@ import ( "sync" "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/pkg/multipath" "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/pipeline" - "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/multipath" "gopkg.in/yaml.v2" ) diff --git a/src/go/plugin/go.d/agent/discovery/sim_test.go b/src/go/plugin/go.d/agent/discovery/sim_test.go index b20344c3c..134ec29f9 100644 --- a/src/go/plugin/go.d/agent/discovery/sim_test.go +++ b/src/go/plugin/go.d/agent/discovery/sim_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) |