Diffstat:
 src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go          |  34 +
 src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go      | 268 +
 src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go | 160 +
 src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go             | 434 +
 src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go        | 648 +
 src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go         | 209 +
 src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go    | 456 +
 src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go        | 137 +
 8 files changed, 2346 insertions(+), 0 deletions(-)
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go
new file mode 100644
index 000000000..15a1e4745
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go
@@ -0,0 +1,34 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "errors"
+ "fmt"
+)
+
+type Config struct {
+ APIServer string `yaml:"api_server"` // TODO: not used
+ Role string `yaml:"role"`
+ Tags string `yaml:"tags"`
+ Namespaces []string `yaml:"namespaces"`
+ Selector struct {
+ Label string `yaml:"label"`
+ Field string `yaml:"field"`
+ } `yaml:"selector"`
+ Pod struct {
+ LocalMode bool `yaml:"local_mode"`
+ } `yaml:"pod"`
+}
+
+func validateConfig(cfg Config) error {
+ switch role(cfg.Role) {
+ case rolePod, roleService:
+ default:
+ return fmt.Errorf("unknown role: '%s'", cfg.Role)
+ }
+ if cfg.Tags == "" {
+ return errors.New("'tags' not set")
+ }
+ return nil
+}
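
Note: the struct above maps one-to-one onto the discoverer's YAML job config. A minimal in-package sketch (a hypothetical test; it assumes the repo's gopkg.in/yaml.v2 dependency) of a config that unmarshals into Config and passes validateConfig, which only requires a known role and a non-empty tags:

package kubernetes

import (
	"testing"

	"gopkg.in/yaml.v2"
)

func TestConfigYAMLSketch(t *testing.T) {
	src := `
role: pod
tags: "k8s"
namespaces: [default]
selector:
  label: app=nginx
  field: spec.nodeName=m01
pod:
  local_mode: yes
`
	var cfg Config
	if err := yaml.Unmarshal([]byte(src), &cfg); err != nil {
		t.Fatal(err)
	}
	// validateConfig only enforces a known role and non-empty tags.
	if err := validateConfig(cfg); err != nil {
		t.Fatal(err)
	}
}
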
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go
new file mode 100644
index 000000000..aa153a34a
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go
@@ -0,0 +1,268 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "os"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/k8sclient"
+
+ "github.com/ilyam8/hashstructure"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+)
+
+type role string
+
+const (
+ rolePod role = "pod"
+ roleService role = "service"
+)
+
+const (
+ envNodeName = "MY_NODE_NAME"
+)
+
+var log = logger.New().With(
+ slog.String("component", "service discovery"),
+ slog.String("discoverer", "kubernetes"),
+)
+
+func NewKubeDiscoverer(cfg Config) (*KubeDiscoverer, error) {
+ if err := validateConfig(cfg); err != nil {
+ return nil, fmt.Errorf("config validation: %v", err)
+ }
+
+ tags, err := model.ParseTags(cfg.Tags)
+ if err != nil {
+ return nil, fmt.Errorf("parse tags: %v", err)
+ }
+
+ client, err := k8sclient.New("Netdata/service-td")
+ if err != nil {
+ return nil, fmt.Errorf("create clientset: %v", err)
+ }
+
+ ns := cfg.Namespaces
+ if len(ns) == 0 {
+ ns = []string{corev1.NamespaceAll}
+ }
+
+ selectorField := cfg.Selector.Field
+ if role(cfg.Role) == rolePod && cfg.Pod.LocalMode {
+ name := os.Getenv(envNodeName)
+ if name == "" {
+ return nil, fmt.Errorf("local_mode is enabled, but env '%s' not set", envNodeName)
+ }
+ selectorField = joinSelectors(selectorField, "spec.nodeName="+name)
+ }
+
+ d := &KubeDiscoverer{
+ Logger: log,
+ client: client,
+ tags: tags,
+ role: role(cfg.Role),
+ namespaces: ns,
+ selectorLabel: cfg.Selector.Label,
+ selectorField: selectorField,
+ discoverers: make([]model.Discoverer, 0, len(ns)),
+ started: make(chan struct{}),
+ }
+
+ return d, nil
+}
+
+type KubeDiscoverer struct {
+ *logger.Logger
+
+ client kubernetes.Interface
+
+ tags model.Tags
+ role role
+ namespaces []string
+ selectorLabel string
+ selectorField string
+ discoverers []model.Discoverer
+ started chan struct{}
+}
+
+func (d *KubeDiscoverer) String() string {
+ return "sd:k8s"
+}
+
+const resyncPeriod = 10 * time.Minute
+
+func (d *KubeDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
+ d.Info("instance is started")
+ defer d.Info("instance is stopped")
+
+ for _, namespace := range d.namespaces {
+ var dd model.Discoverer
+ switch d.role {
+ case rolePod:
+ dd = d.setupPodDiscoverer(ctx, namespace)
+ case roleService:
+ dd = d.setupServiceDiscoverer(ctx, namespace)
+ default:
+ d.Errorf("unknown role: '%s'", d.role)
+ continue
+ }
+ d.discoverers = append(d.discoverers, dd)
+ }
+
+ if len(d.discoverers) == 0 {
+ d.Error("no discoverers registered")
+ return
+ }
+
+ d.Infof("registered: %v", d.discoverers)
+
+ var wg sync.WaitGroup
+ updates := make(chan []model.TargetGroup)
+
+ for _, disc := range d.discoverers {
+ wg.Add(1)
+ go func(disc model.Discoverer) { defer wg.Done(); disc.Discover(ctx, updates) }(disc)
+ }
+
+ done := make(chan struct{})
+ go func() { defer close(done); wg.Wait() }()
+
+ close(d.started)
+
+ for {
+ select {
+ case <-ctx.Done():
+ select {
+ case <-done:
+ d.Info("all discoverers exited")
+ case <-time.After(time.Second * 5):
+ d.Warning("not all discoverers exited")
+ }
+ return
+ case <-done:
+ d.Info("all discoverers exited")
+ return
+ case tggs := <-updates:
+ select {
+ case <-ctx.Done():
+ case in <- tggs:
+ }
+ }
+ }
+}
+
+func (d *KubeDiscoverer) setupPodDiscoverer(ctx context.Context, ns string) *podDiscoverer {
+ pod := d.client.CoreV1().Pods(ns)
+ podLW := &cache.ListWatch{
+ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+ opts.FieldSelector = d.selectorField
+ opts.LabelSelector = d.selectorLabel
+ return pod.List(ctx, opts)
+ },
+ WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+ opts.FieldSelector = d.selectorField
+ opts.LabelSelector = d.selectorLabel
+ return pod.Watch(ctx, opts)
+ },
+ }
+
+ cmap := d.client.CoreV1().ConfigMaps(ns)
+ cmapLW := &cache.ListWatch{
+ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+ return cmap.List(ctx, opts)
+ },
+ WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+ return cmap.Watch(ctx, opts)
+ },
+ }
+
+ secret := d.client.CoreV1().Secrets(ns)
+ secretLW := &cache.ListWatch{
+ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+ return secret.List(ctx, opts)
+ },
+ WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+ return secret.Watch(ctx, opts)
+ },
+ }
+
+ td := newPodDiscoverer(
+ cache.NewSharedInformer(podLW, &corev1.Pod{}, resyncPeriod),
+ cache.NewSharedInformer(cmapLW, &corev1.ConfigMap{}, resyncPeriod),
+ cache.NewSharedInformer(secretLW, &corev1.Secret{}, resyncPeriod),
+ )
+ td.Tags().Merge(d.tags)
+
+ return td
+}
+
+func (d *KubeDiscoverer) setupServiceDiscoverer(ctx context.Context, namespace string) *serviceDiscoverer {
+ svc := d.client.CoreV1().Services(namespace)
+
+ svcLW := &cache.ListWatch{
+ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+ opts.FieldSelector = d.selectorField
+ opts.LabelSelector = d.selectorLabel
+ return svc.List(ctx, opts)
+ },
+ WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+ opts.FieldSelector = d.selectorField
+ opts.LabelSelector = d.selectorLabel
+ return svc.Watch(ctx, opts)
+ },
+ }
+
+ inf := cache.NewSharedInformer(svcLW, &corev1.Service{}, resyncPeriod)
+
+ td := newServiceDiscoverer(inf)
+ td.Tags().Merge(d.tags)
+
+ return td
+}
+
+func enqueue(queue *workqueue.Type, obj any) {
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+ if err != nil {
+ return
+ }
+ queue.Add(key)
+}
+
+func send(ctx context.Context, in chan<- []model.TargetGroup, tgg model.TargetGroup) {
+ if tgg == nil {
+ return
+ }
+ select {
+ case <-ctx.Done():
+ case in <- []model.TargetGroup{tgg}:
+ }
+}
+
+func calcHash(obj any) (uint64, error) {
+ return hashstructure.Hash(obj, nil)
+}
+
+func joinSelectors(srs ...string) string {
+ var i int
+ for _, v := range srs {
+ if v != "" {
+ srs[i] = v
+ i++
+ }
+ }
+ return strings.Join(srs[:i], ",")
+}
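
For orientation, a usage sketch of the exported surface defined above. It is illustrative only: it assumes the package import path mirrors its siblings under agent/discovery/sd, and that pkg/k8sclient can resolve in-cluster or kubeconfig credentials at runtime.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/discoverer/kubernetes"
	"github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
)

func main() {
	// Role and tags are the only required fields; enabling pod.local_mode
	// would additionally require the MY_NODE_NAME env var.
	disc, err := kubernetes.NewKubeDiscoverer(kubernetes.Config{Role: "pod", Tags: "k8s"})
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	in := make(chan []model.TargetGroup)
	go disc.Discover(ctx, in) // returns when ctx is done

	for {
		select {
		case <-ctx.Done():
			return
		case tggs := <-in:
			for _, tgg := range tggs {
				fmt.Printf("%s: %d targets\n", tgg.Source(), len(tgg.Targets()))
			}
		}
	}
}
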
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go
new file mode 100644
index 000000000..9743a0af5
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go
@@ -0,0 +1,160 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "fmt"
+ "os"
+ "testing"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/k8sclient"
+
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/fake"
+)
+
+var discoveryTags, _ = model.ParseTags("k8s")
+
+func TestMain(m *testing.M) {
+ _ = os.Setenv(envNodeName, "m01")
+ _ = os.Setenv(k8sclient.EnvFakeClient, "true")
+ code := m.Run()
+ _ = os.Unsetenv(envNodeName)
+ _ = os.Unsetenv(k8sclient.EnvFakeClient)
+ os.Exit(code)
+}
+
+func TestNewKubeDiscoverer(t *testing.T) {
+ tests := map[string]struct {
+ cfg Config
+ wantErr bool
+ }{
+ "pod role config": {
+ wantErr: false,
+ cfg: Config{Role: string(rolePod), Tags: "k8s"},
+ },
+ "service role config": {
+ wantErr: false,
+ cfg: Config{Role: string(roleService), Tags: "k8s"},
+ },
+ "empty config": {
+ wantErr: true,
+ cfg: Config{},
+ },
+ }
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ disc, err := NewKubeDiscoverer(test.cfg)
+
+ if test.wantErr {
+ assert.Error(t, err)
+ assert.Nil(t, disc)
+ } else {
+ assert.NoError(t, err)
+ assert.NotNil(t, disc)
+ }
+ })
+ }
+}
+
+func TestKubeDiscoverer_Discover(t *testing.T) {
+ const prod = "prod"
+ const dev = "dev"
+ prodNamespace := newNamespace(prod)
+ devNamespace := newNamespace(dev)
+
+ tests := map[string]struct {
+ createSim func() discoverySim
+ }{
+ "multiple namespaces pod td": {
+ createSim: func() discoverySim {
+ httpdProd, nginxProd := newHTTPDPod(), newNGINXPod()
+ httpdProd.Namespace = prod
+ nginxProd.Namespace = prod
+
+ httpdDev, nginxDev := newHTTPDPod(), newNGINXPod()
+ httpdDev.Namespace = dev
+ nginxDev.Namespace = dev
+
+ disc, _ := preparePodDiscoverer(
+ []string{prod, dev},
+ prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev)
+
+ return discoverySim{
+ td: disc,
+ sortBeforeVerify: true,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpdDev),
+ preparePodTargetGroup(nginxDev),
+ preparePodTargetGroup(httpdProd),
+ preparePodTargetGroup(nginxProd),
+ },
+ }
+ },
+ },
+ "multiple namespaces ClusterIP service td": {
+ createSim: func() discoverySim {
+ httpdProd, nginxProd := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ httpdProd.Namespace = prod
+ nginxProd.Namespace = prod
+
+ httpdDev, nginxDev := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ httpdDev.Namespace = dev
+ nginxDev.Namespace = dev
+
+ disc, _ := prepareSvcDiscoverer(
+ []string{prod, dev},
+ prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev)
+
+ return discoverySim{
+ td: disc,
+ sortBeforeVerify: true,
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpdDev),
+ prepareSvcTargetGroup(nginxDev),
+ prepareSvcTargetGroup(httpdProd),
+ prepareSvcTargetGroup(nginxProd),
+ },
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func prepareDiscoverer(role role, namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
+ client := fake.NewSimpleClientset(objects...)
+ tags, _ := model.ParseTags("k8s")
+ disc := &KubeDiscoverer{
+ tags: tags,
+ role: role,
+ namespaces: namespaces,
+ client: client,
+ discoverers: nil,
+ started: make(chan struct{}),
+ }
+ return disc, client
+}
+
+func newNamespace(name string) *corev1.Namespace {
+ return &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}
+}
+
+func mustCalcHash(obj any) uint64 {
+ hash, err := calcHash(obj)
+ if err != nil {
+ panic(fmt.Sprintf("hash calculation: %v", err))
+ }
+ return hash
+}
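
A note on target identity that the golden hash values in the tests below depend on: model.Base carries the hash:"ignore" struct tag (see pod.go and service.go in this change), so a target's hash covers its data fields but not its tags. A short sketch (a hypothetical test, using PodTarget from pod.go below) of the convention:

package kubernetes

import "testing"

func TestTargetHashSketch(t *testing.T) {
	// Two structurally equal targets must produce the same hash.
	a := &PodTarget{Namespace: "default", Name: "web", Port: "80"}
	b := &PodTarget{Namespace: "default", Name: "web", Port: "80"}

	if mustCalcHash(a) != mustCalcHash(b) {
		t.Fatal("equal targets must hash equally")
	}

	// model.Base is tagged `hash:"ignore"`, so merging tags does not
	// change a target's computed identity.
	b.Tags().Merge(discoveryTags)
	if mustCalcHash(a) != mustCalcHash(b) {
		t.Fatal("tags must not affect the hash")
	}
}
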
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go
new file mode 100644
index 000000000..a271e7285
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go
@@ -0,0 +1,434 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+)
+
+type podTargetGroup struct {
+ targets []model.Target
+ source string
+}
+
+func (p podTargetGroup) Provider() string { return "sd:k8s:pod" }
+func (p podTargetGroup) Source() string { return p.source }
+func (p podTargetGroup) Targets() []model.Target { return p.targets }
+
+type PodTarget struct {
+ model.Base `hash:"ignore"`
+
+ hash uint64
+ tuid string
+
+ Address string
+ Namespace string
+ Name string
+ Annotations map[string]any
+ Labels map[string]any
+ NodeName string
+ PodIP string
+ ControllerName string
+ ControllerKind string
+ ContName string
+ Image string
+ Env map[string]any
+ Port string
+ PortName string
+ PortProtocol string
+}
+
+func (p PodTarget) Hash() uint64 { return p.hash }
+func (p PodTarget) TUID() string { return p.tuid }
+
+func newPodDiscoverer(pod, cmap, secret cache.SharedInformer) *podDiscoverer {
+
+ if pod == nil || cmap == nil || secret == nil {
+ panic("nil pod or cmap or secret informer")
+ }
+
+ queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"})
+
+ _, _ = pod.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj any) { enqueue(queue, obj) },
+ UpdateFunc: func(_, obj any) { enqueue(queue, obj) },
+ DeleteFunc: func(obj any) { enqueue(queue, obj) },
+ })
+
+ return &podDiscoverer{
+ Logger: log,
+ podInformer: pod,
+ cmapInformer: cmap,
+ secretInformer: secret,
+ queue: queue,
+ }
+}
+
+type podDiscoverer struct {
+ *logger.Logger
+ model.Base
+
+ podInformer cache.SharedInformer
+ cmapInformer cache.SharedInformer
+ secretInformer cache.SharedInformer
+ queue *workqueue.Type
+}
+
+func (p *podDiscoverer) String() string {
+ return "sd:k8s:pod"
+}
+
+func (p *podDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
+ p.Info("instance is started")
+ defer p.Info("instance is stopped")
+ defer p.queue.ShutDown()
+
+ go p.podInformer.Run(ctx.Done())
+ go p.cmapInformer.Run(ctx.Done())
+ go p.secretInformer.Run(ctx.Done())
+
+ if !cache.WaitForCacheSync(ctx.Done(),
+ p.podInformer.HasSynced, p.cmapInformer.HasSynced, p.secretInformer.HasSynced) {
+ p.Error("failed to sync caches")
+ return
+ }
+
+ go p.run(ctx, in)
+
+ <-ctx.Done()
+}
+
+func (p *podDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) {
+ for {
+ item, shutdown := p.queue.Get()
+ if shutdown {
+ return
+ }
+ p.handleQueueItem(ctx, in, item)
+ }
+}
+
+func (p *podDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) {
+ defer p.queue.Done(item)
+
+ key := item.(string)
+ namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+ return
+ }
+
+ obj, ok, err := p.podInformer.GetStore().GetByKey(key)
+ if err != nil {
+ return
+ }
+
+ if !ok {
+ tgg := &podTargetGroup{source: podSourceFromNsName(namespace, name)}
+ send(ctx, in, tgg)
+ return
+ }
+
+ pod, err := toPod(obj)
+ if err != nil {
+ return
+ }
+
+ tgg := p.buildTargetGroup(pod)
+
+ for _, tgt := range tgg.Targets() {
+ tgt.Tags().Merge(p.Tags())
+ }
+
+ send(ctx, in, tgg)
+
+}
+
+func (p *podDiscoverer) buildTargetGroup(pod *corev1.Pod) model.TargetGroup {
+ if pod.Status.PodIP == "" || len(pod.Spec.Containers) == 0 {
+ return &podTargetGroup{
+ source: podSource(pod),
+ }
+ }
+ return &podTargetGroup{
+ source: podSource(pod),
+ targets: p.buildTargets(pod),
+ }
+}
+
+func (p *podDiscoverer) buildTargets(pod *corev1.Pod) (targets []model.Target) {
+ var name, kind string
+ for _, ref := range pod.OwnerReferences {
+ if ref.Controller != nil && *ref.Controller {
+ name = ref.Name
+ kind = ref.Kind
+ break
+ }
+ }
+
+ for _, container := range pod.Spec.Containers {
+ env := p.collectEnv(pod.Namespace, container)
+
+ if len(container.Ports) == 0 {
+ tgt := &PodTarget{
+ tuid: podTUID(pod, container),
+ Address: pod.Status.PodIP,
+ Namespace: pod.Namespace,
+ Name: pod.Name,
+ Annotations: mapAny(pod.Annotations),
+ Labels: mapAny(pod.Labels),
+ NodeName: pod.Spec.NodeName,
+ PodIP: pod.Status.PodIP,
+ ControllerName: name,
+ ControllerKind: kind,
+ ContName: container.Name,
+ Image: container.Image,
+ Env: mapAny(env),
+ }
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+ tgt.hash = hash
+
+ targets = append(targets, tgt)
+ } else {
+ for _, port := range container.Ports {
+ portNum := strconv.FormatUint(uint64(port.ContainerPort), 10)
+ tgt := &PodTarget{
+ tuid: podTUIDWithPort(pod, container, port),
+ Address: net.JoinHostPort(pod.Status.PodIP, portNum),
+ Namespace: pod.Namespace,
+ Name: pod.Name,
+ Annotations: mapAny(pod.Annotations),
+ Labels: mapAny(pod.Labels),
+ NodeName: pod.Spec.NodeName,
+ PodIP: pod.Status.PodIP,
+ ControllerName: name,
+ ControllerKind: kind,
+ ContName: container.Name,
+ Image: container.Image,
+ Env: mapAny(env),
+ Port: portNum,
+ PortName: port.Name,
+ PortProtocol: string(port.Protocol),
+ }
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+ tgt.hash = hash
+
+ targets = append(targets, tgt)
+ }
+ }
+ }
+
+ return targets
+}
+
+func (p *podDiscoverer) collectEnv(ns string, container corev1.Container) map[string]string {
+ vars := make(map[string]string)
+
+	// When a key exists in multiple sources,
+	// the value from the source processed last takes precedence:
+	// values defined in 'env' override those collected from 'envFrom'.
+ //
+ // Order (https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go)
+ // - envFrom: configMapRef, secretRef
+ // - env: value || valueFrom: fieldRef, resourceFieldRef, secretRef, configMap
+
+ for _, src := range container.EnvFrom {
+ switch {
+ case src.ConfigMapRef != nil:
+ p.envFromConfigMap(vars, ns, src)
+ case src.SecretRef != nil:
+ p.envFromSecret(vars, ns, src)
+ }
+ }
+
+ for _, env := range container.Env {
+ if env.Name == "" || isVar(env.Name) {
+ continue
+ }
+ switch {
+ case env.Value != "":
+ vars[env.Name] = env.Value
+ case env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil:
+ p.valueFromSecret(vars, ns, env)
+ case env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil:
+ p.valueFromConfigMap(vars, ns, env)
+ }
+ }
+
+ if len(vars) == 0 {
+ return nil
+ }
+ return vars
+}
+
+func (p *podDiscoverer) valueFromConfigMap(vars map[string]string, ns string, env corev1.EnvVar) {
+ if env.ValueFrom.ConfigMapKeyRef.Name == "" || env.ValueFrom.ConfigMapKeyRef.Key == "" {
+ return
+ }
+
+ sr := env.ValueFrom.ConfigMapKeyRef
+ key := ns + "/" + sr.Name
+
+ item, exist, err := p.cmapInformer.GetStore().GetByKey(key)
+ if err != nil || !exist {
+ return
+ }
+
+ cmap, err := toConfigMap(item)
+ if err != nil {
+ return
+ }
+
+ if v, ok := cmap.Data[sr.Key]; ok {
+ vars[env.Name] = v
+ }
+}
+
+func (p *podDiscoverer) valueFromSecret(vars map[string]string, ns string, env corev1.EnvVar) {
+ if env.ValueFrom.SecretKeyRef.Name == "" || env.ValueFrom.SecretKeyRef.Key == "" {
+ return
+ }
+
+ secretKey := env.ValueFrom.SecretKeyRef
+ key := ns + "/" + secretKey.Name
+
+ item, exist, err := p.secretInformer.GetStore().GetByKey(key)
+ if err != nil || !exist {
+ return
+ }
+
+ secret, err := toSecret(item)
+ if err != nil {
+ return
+ }
+
+ if v, ok := secret.Data[secretKey.Key]; ok {
+ vars[env.Name] = string(v)
+ }
+}
+
+func (p *podDiscoverer) envFromConfigMap(vars map[string]string, ns string, src corev1.EnvFromSource) {
+ if src.ConfigMapRef.Name == "" {
+ return
+ }
+
+ key := ns + "/" + src.ConfigMapRef.Name
+ item, exist, err := p.cmapInformer.GetStore().GetByKey(key)
+ if err != nil || !exist {
+ return
+ }
+
+ cmap, err := toConfigMap(item)
+ if err != nil {
+ return
+ }
+
+ for k, v := range cmap.Data {
+ vars[src.Prefix+k] = v
+ }
+}
+
+func (p *podDiscoverer) envFromSecret(vars map[string]string, ns string, src corev1.EnvFromSource) {
+ if src.SecretRef.Name == "" {
+ return
+ }
+
+ key := ns + "/" + src.SecretRef.Name
+ item, exist, err := p.secretInformer.GetStore().GetByKey(key)
+ if err != nil || !exist {
+ return
+ }
+
+ secret, err := toSecret(item)
+ if err != nil {
+ return
+ }
+
+ for k, v := range secret.Data {
+ vars[src.Prefix+k] = string(v)
+ }
+}
+
+func podTUID(pod *corev1.Pod, container corev1.Container) string {
+ return fmt.Sprintf("%s_%s_%s",
+ pod.Namespace,
+ pod.Name,
+ container.Name,
+ )
+}
+
+func podTUIDWithPort(pod *corev1.Pod, container corev1.Container, port corev1.ContainerPort) string {
+ return fmt.Sprintf("%s_%s_%s_%s_%s",
+ pod.Namespace,
+ pod.Name,
+ container.Name,
+ strings.ToLower(string(port.Protocol)),
+ strconv.FormatUint(uint64(port.ContainerPort), 10),
+ )
+}
+
+func podSourceFromNsName(namespace, name string) string {
+ return fmt.Sprintf("discoverer=k8s,kind=pod,namespace=%s,pod_name=%s", namespace, name)
+}
+
+func podSource(pod *corev1.Pod) string {
+ return podSourceFromNsName(pod.Namespace, pod.Name)
+}
+
+func toPod(obj any) (*corev1.Pod, error) {
+ pod, ok := obj.(*corev1.Pod)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return pod, nil
+}
+
+func toConfigMap(obj any) (*corev1.ConfigMap, error) {
+ cmap, ok := obj.(*corev1.ConfigMap)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return cmap, nil
+}
+
+func toSecret(obj any) (*corev1.Secret, error) {
+ secret, ok := obj.(*corev1.Secret)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return secret, nil
+}
+
+func isVar(name string) bool {
+ // Variable references $(VAR_NAME) are expanded using the previous defined
+ // environment variables in the container and any service environment
+ // variables.
+ return strings.IndexByte(name, '$') != -1
+}
+
+func mapAny(src map[string]string) map[string]any {
+ if src == nil {
+ return nil
+ }
+ m := make(map[string]any, len(src))
+ for k, v := range src {
+ m[k] = v
+ }
+ return m
+}
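
A small sketch (a hypothetical in-package test) pinning down the identifier formats produced above: the per-port TUID is namespace_pod_container_protocol_port, and the group source is a comma-separated key=value string.

package kubernetes

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPodIdentifiersSketch(t *testing.T) {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web-0"}}
	cntr := corev1.Container{Name: "nginx"}
	port := corev1.ContainerPort{Protocol: corev1.ProtocolTCP, ContainerPort: 80}

	if got := podTUIDWithPort(pod, cntr, port); got != "default_web-0_nginx_tcp_80" {
		t.Fatalf("unexpected TUID: %s", got)
	}
	if got := podSource(pod); got != "discoverer=k8s,kind=pod,namespace=default,pod_name=web-0" {
		t.Fatalf("unexpected source: %s", got)
	}
}
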
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go
new file mode 100644
index 000000000..ebe92d2f6
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go
@@ -0,0 +1,648 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "net"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+)
+
+func TestPodTargetGroup_Provider(t *testing.T) {
+ var p podTargetGroup
+ assert.NotEmpty(t, p.Provider())
+}
+
+func TestPodTargetGroup_Source(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ wantSources []string
+ }{
+ "pods with multiple ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ disc, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ wantSources: []string{
+ "discoverer=k8s,kind=pod,namespace=default,pod_name=httpd-dd95c4d68-5bkwl",
+ "discoverer=k8s,kind=pod,namespace=default,pod_name=nginx-7cfd77469b-q6kxj",
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+
+ var sources []string
+ for _, tgg := range sim.run(t) {
+ sources = append(sources, tgg.Source())
+ }
+
+ assert.Equal(t, test.wantSources, sources)
+ })
+ }
+}
+
+func TestPodTargetGroup_Targets(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ wantTargets int
+ }{
+ "pods with multiple ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: discovery,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ wantTargets: 4,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+
+ var targets int
+ for _, tgg := range sim.run(t) {
+ targets += len(tgg.Targets())
+ }
+
+ assert.Equal(t, test.wantTargets, targets)
+ })
+ }
+}
+
+func TestPodTarget_Hash(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ wantHashes []uint64
+ }{
+ "pods with multiple ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: discovery,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ wantHashes: []uint64{
+ 12703169414253998055,
+ 13351713096133918928,
+ 8241692333761256175,
+ 11562466355572729519,
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+
+ var hashes []uint64
+ for _, tgg := range sim.run(t) {
+ for _, tg := range tgg.Targets() {
+ hashes = append(hashes, tg.Hash())
+ }
+ }
+
+ assert.Equal(t, test.wantHashes, hashes)
+ })
+ }
+}
+
+func TestPodTarget_TUID(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ wantTUID []string
+ }{
+ "pods with multiple ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: discovery,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ wantTUID: []string{
+ "default_httpd-dd95c4d68-5bkwl_httpd_tcp_80",
+ "default_httpd-dd95c4d68-5bkwl_httpd_tcp_443",
+ "default_nginx-7cfd77469b-q6kxj_nginx_tcp_80",
+ "default_nginx-7cfd77469b-q6kxj_nginx_tcp_443",
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+
+ var tuid []string
+ for _, tgg := range sim.run(t) {
+ for _, tg := range tgg.Targets() {
+ tuid = append(tuid, tg.TUID())
+ }
+ }
+
+ assert.Equal(t, test.wantTUID, tuid)
+ })
+ }
+}
+
+func TestNewPodDiscoverer(t *testing.T) {
+ tests := map[string]struct {
+ podInf cache.SharedInformer
+ cmapInf cache.SharedInformer
+ secretInf cache.SharedInformer
+ wantPanic bool
+ }{
+ "valid informers": {
+ wantPanic: false,
+ podInf: cache.NewSharedInformer(nil, &corev1.Pod{}, resyncPeriod),
+ cmapInf: cache.NewSharedInformer(nil, &corev1.ConfigMap{}, resyncPeriod),
+ secretInf: cache.NewSharedInformer(nil, &corev1.Secret{}, resyncPeriod),
+ },
+ "nil informers": {
+ wantPanic: true,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ f := func() { newPodDiscoverer(test.podInf, test.cmapInf, test.secretInf) }
+
+ if test.wantPanic {
+ assert.Panics(t, f)
+ } else {
+ assert.NotPanics(t, f)
+ }
+ })
+ }
+}
+
+func TestPodDiscoverer_String(t *testing.T) {
+ var p podDiscoverer
+ assert.NotEmpty(t, p.String())
+}
+
+func TestPodDiscoverer_Discover(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ }{
+ "ADD: pods exist before run": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ td, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: td,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD: pods exist before run and add after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ disc, client := prepareAllNsPodDiscoverer(httpd)
+ podClient := client.CoreV1().Pods("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "DELETE: remove pods after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ disc, client := prepareAllNsPodDiscoverer(httpd, nginx)
+ podClient := client.CoreV1().Pods("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{})
+ _ = podClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ prepareEmptyPodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "DELETE,ADD: remove and add pods after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ disc, client := prepareAllNsPodDiscoverer(httpd)
+ podClient := client.CoreV1().Pods("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{})
+ _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(httpd),
+ preparePodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD: pods with empty PodIP": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ httpd.Status.PodIP = ""
+ nginx.Status.PodIP = ""
+ disc, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptyPodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "UPDATE: set pods PodIP after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ httpd.Status.PodIP = ""
+ nginx.Status.PodIP = ""
+ disc, client := prepareAllNsPodDiscoverer(httpd, nginx)
+ podClient := client.CoreV1().Pods("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _, _ = podClient.Update(ctx, newHTTPDPod(), metav1.UpdateOptions{})
+ _, _ = podClient.Update(ctx, newNGINXPod(), metav1.UpdateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptyPodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(nginx),
+ preparePodTargetGroup(newHTTPDPod()),
+ preparePodTargetGroup(newNGINXPod()),
+ },
+ }
+ },
+ },
+ "ADD: pods without containers": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDPod(), newNGINXPod()
+ httpd.Spec.Containers = httpd.Spec.Containers[:0]
+				nginx.Spec.Containers = nginx.Spec.Containers[:0]
+ disc, _ := prepareAllNsPodDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptyPodTargetGroup(httpd),
+ prepareEmptyPodTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "Env: from value": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.Env = []corev1.EnvVar{
+ {Name: "key1", Value: "value1"},
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1"}
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ "Env: from Secret": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.Env = []corev1.EnvVar{
+ {
+ Name: "key1",
+ ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{
+ LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"},
+ Key: "key1",
+ }},
+ },
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1"}
+ secret := prepareSecret("my-secret", data)
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd, secret)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ "Env: from ConfigMap": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.Env = []corev1.EnvVar{
+ {
+ Name: "key1",
+ ValueFrom: &corev1.EnvVarSource{ConfigMapKeyRef: &corev1.ConfigMapKeySelector{
+ LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"},
+ Key: "key1",
+ }},
+ },
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1"}
+ cmap := prepareConfigMap("my-cmap", data)
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd, cmap)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ "EnvFrom: from ConfigMap": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.EnvFrom = []corev1.EnvFromSource{
+ {
+ ConfigMapRef: &corev1.ConfigMapEnvSource{
+ LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"}},
+ },
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1", "key2": "value2"}
+ cmap := prepareConfigMap("my-cmap", data)
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd, cmap)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ "EnvFrom: from Secret": {
+ createSim: func() discoverySim {
+ httpd := newHTTPDPod()
+ mangle := func(c *corev1.Container) {
+ c.EnvFrom = []corev1.EnvFromSource{
+ {
+ SecretRef: &corev1.SecretEnvSource{
+ LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"}},
+ },
+ }
+ }
+ mangleContainers(httpd.Spec.Containers, mangle)
+ data := map[string]string{"key1": "value1", "key2": "value2"}
+ secret := prepareSecret("my-secret", data)
+
+ disc, _ := prepareAllNsPodDiscoverer(httpd, secret)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ preparePodTargetGroupWithEnv(httpd, data),
+ },
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func prepareAllNsPodDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
+ return prepareDiscoverer(rolePod, []string{corev1.NamespaceAll}, objects...)
+}
+
+func preparePodDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
+ return prepareDiscoverer(rolePod, namespaces, objects...)
+}
+
+func mangleContainers(containers []corev1.Container, mangle func(container *corev1.Container)) {
+	for i := range containers {
+		mangle(&containers[i])
+ }
+}
+
+var controllerTrue = true
+
+func newHTTPDPod() *corev1.Pod {
+ return &corev1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "httpd-dd95c4d68-5bkwl",
+ Namespace: "default",
+ UID: "1cebb6eb-0c1e-495b-8131-8fa3e6668dc8",
+ Annotations: map[string]string{"phase": "prod"},
+ Labels: map[string]string{"app": "httpd", "tier": "frontend"},
+ OwnerReferences: []metav1.OwnerReference{
+ {Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue},
+ },
+ },
+ Spec: corev1.PodSpec{
+ NodeName: "m01",
+ Containers: []corev1.Container{
+ {
+ Name: "httpd",
+ Image: "httpd",
+ Ports: []corev1.ContainerPort{
+ {Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80},
+ {Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443},
+ },
+ },
+ },
+ },
+ Status: corev1.PodStatus{
+ PodIP: "172.17.0.1",
+ },
+ }
+}
+
+func newNGINXPod() *corev1.Pod {
+ return &corev1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "nginx-7cfd77469b-q6kxj",
+ Namespace: "default",
+ UID: "09e883f2-d740-4c5f-970d-02cf02876522",
+ Annotations: map[string]string{"phase": "prod"},
+ Labels: map[string]string{"app": "nginx", "tier": "frontend"},
+ OwnerReferences: []metav1.OwnerReference{
+ {Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue},
+ },
+ },
+ Spec: corev1.PodSpec{
+ NodeName: "m01",
+ Containers: []corev1.Container{
+ {
+ Name: "nginx",
+ Image: "nginx",
+ Ports: []corev1.ContainerPort{
+ {Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80},
+ {Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443},
+ },
+ },
+ },
+ },
+ Status: corev1.PodStatus{
+ PodIP: "172.17.0.2",
+ },
+ }
+}
+
+func prepareConfigMap(name string, data map[string]string) *corev1.ConfigMap {
+ return &corev1.ConfigMap{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: "default",
+ UID: types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8167" + name),
+ },
+ Data: data,
+ }
+}
+
+func prepareSecret(name string, data map[string]string) *corev1.Secret {
+ secretData := make(map[string][]byte, len(data))
+ for k, v := range data {
+ secretData[k] = []byte(v)
+ }
+ return &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: "default",
+ UID: types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8161" + name),
+ },
+ Data: secretData,
+ }
+}
+
+func prepareEmptyPodTargetGroup(pod *corev1.Pod) *podTargetGroup {
+ return &podTargetGroup{source: podSource(pod)}
+}
+
+func preparePodTargetGroup(pod *corev1.Pod) *podTargetGroup {
+ tgg := prepareEmptyPodTargetGroup(pod)
+
+ for _, container := range pod.Spec.Containers {
+ for _, port := range container.Ports {
+ portNum := strconv.FormatUint(uint64(port.ContainerPort), 10)
+ tgt := &PodTarget{
+ tuid: podTUIDWithPort(pod, container, port),
+ Address: net.JoinHostPort(pod.Status.PodIP, portNum),
+ Namespace: pod.Namespace,
+ Name: pod.Name,
+ Annotations: mapAny(pod.Annotations),
+ Labels: mapAny(pod.Labels),
+ NodeName: pod.Spec.NodeName,
+ PodIP: pod.Status.PodIP,
+ ControllerName: "netdata-test",
+ ControllerKind: "DaemonSet",
+ ContName: container.Name,
+ Image: container.Image,
+ Env: nil,
+ Port: portNum,
+ PortName: port.Name,
+ PortProtocol: string(port.Protocol),
+ }
+ tgt.hash = mustCalcHash(tgt)
+ tgt.Tags().Merge(discoveryTags)
+
+ tgg.targets = append(tgg.targets, tgt)
+ }
+ }
+
+ return tgg
+}
+
+func preparePodTargetGroupWithEnv(pod *corev1.Pod, env map[string]string) *podTargetGroup {
+ tgg := preparePodTargetGroup(pod)
+
+ for _, tgt := range tgg.Targets() {
+ tgt.(*PodTarget).Env = mapAny(env)
+ tgt.(*PodTarget).hash = mustCalcHash(tgt)
+ }
+
+ return tgg
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go
new file mode 100644
index 000000000..4cfdd62f1
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go
@@ -0,0 +1,209 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+)
+
+type serviceTargetGroup struct {
+ targets []model.Target
+ source string
+}
+
+func (s serviceTargetGroup) Provider() string { return "sd:k8s:service" }
+func (s serviceTargetGroup) Source() string { return s.source }
+func (s serviceTargetGroup) Targets() []model.Target { return s.targets }
+
+type ServiceTarget struct {
+ model.Base `hash:"ignore"`
+
+ hash uint64
+ tuid string
+
+ Address string
+ Namespace string
+ Name string
+ Annotations map[string]any
+ Labels map[string]any
+ Port string
+ PortName string
+ PortProtocol string
+ ClusterIP string
+ ExternalName string
+ Type string
+}
+
+func (s ServiceTarget) Hash() uint64 { return s.hash }
+func (s ServiceTarget) TUID() string { return s.tuid }
+
+type serviceDiscoverer struct {
+ *logger.Logger
+ model.Base
+
+ informer cache.SharedInformer
+ queue *workqueue.Type
+}
+
+func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer {
+ if inf == nil {
+ panic("nil service informer")
+ }
+
+ queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "service"})
+ _, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj any) { enqueue(queue, obj) },
+ UpdateFunc: func(_, obj any) { enqueue(queue, obj) },
+ DeleteFunc: func(obj any) { enqueue(queue, obj) },
+ })
+
+ return &serviceDiscoverer{
+ Logger: log,
+ informer: inf,
+ queue: queue,
+ }
+}
+
+func (s *serviceDiscoverer) String() string {
+ return "k8s service"
+}
+
+func (s *serviceDiscoverer) Discover(ctx context.Context, ch chan<- []model.TargetGroup) {
+ s.Info("instance is started")
+ defer s.Info("instance is stopped")
+ defer s.queue.ShutDown()
+
+ go s.informer.Run(ctx.Done())
+
+ if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) {
+ s.Error("failed to sync caches")
+ return
+ }
+
+ go s.run(ctx, ch)
+
+ <-ctx.Done()
+}
+
+func (s *serviceDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) {
+ for {
+ item, shutdown := s.queue.Get()
+ if shutdown {
+ return
+ }
+
+ s.handleQueueItem(ctx, in, item)
+ }
+}
+
+func (s *serviceDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) {
+ defer s.queue.Done(item)
+
+ key := item.(string)
+ namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+ return
+ }
+
+ obj, exists, err := s.informer.GetStore().GetByKey(key)
+ if err != nil {
+ return
+ }
+
+ if !exists {
+ tgg := &serviceTargetGroup{source: serviceSourceFromNsName(namespace, name)}
+ send(ctx, in, tgg)
+ return
+ }
+
+ svc, err := toService(obj)
+ if err != nil {
+ return
+ }
+
+ tgg := s.buildTargetGroup(svc)
+
+ for _, tgt := range tgg.Targets() {
+ tgt.Tags().Merge(s.Tags())
+ }
+
+ send(ctx, in, tgg)
+}
+
+func (s *serviceDiscoverer) buildTargetGroup(svc *corev1.Service) model.TargetGroup {
+ // TODO: headless service?
+ if svc.Spec.ClusterIP == "" || len(svc.Spec.Ports) == 0 {
+ return &serviceTargetGroup{
+ source: serviceSource(svc),
+ }
+ }
+ return &serviceTargetGroup{
+ source: serviceSource(svc),
+ targets: s.buildTargets(svc),
+ }
+}
+
+func (s *serviceDiscoverer) buildTargets(svc *corev1.Service) (targets []model.Target) {
+ for _, port := range svc.Spec.Ports {
+ portNum := strconv.FormatInt(int64(port.Port), 10)
+ tgt := &ServiceTarget{
+ tuid: serviceTUID(svc, port),
+ Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum),
+ Namespace: svc.Namespace,
+ Name: svc.Name,
+ Annotations: mapAny(svc.Annotations),
+ Labels: mapAny(svc.Labels),
+ Port: portNum,
+ PortName: port.Name,
+ PortProtocol: string(port.Protocol),
+ ClusterIP: svc.Spec.ClusterIP,
+ ExternalName: svc.Spec.ExternalName,
+ Type: string(svc.Spec.Type),
+ }
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+ tgt.hash = hash
+
+ targets = append(targets, tgt)
+ }
+
+ return targets
+}
+
+func serviceTUID(svc *corev1.Service, port corev1.ServicePort) string {
+ return fmt.Sprintf("%s_%s_%s_%s",
+ svc.Namespace,
+ svc.Name,
+ strings.ToLower(string(port.Protocol)),
+ strconv.FormatInt(int64(port.Port), 10),
+ )
+}
+
+func serviceSourceFromNsName(namespace, name string) string {
+ return fmt.Sprintf("discoverer=k8s,kind=service,namespace=%s,service_name=%s", namespace, name)
+}
+
+func serviceSource(svc *corev1.Service) string {
+ return serviceSourceFromNsName(svc.Namespace, svc.Name)
+}
+
+func toService(obj any) (*corev1.Service, error) {
+ svc, ok := obj.(*corev1.Service)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return svc, nil
+}
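
A companion sketch (a hypothetical in-package test) for the service role: the TUID is namespace_service_protocol_port, and targets are addressed by the cluster-internal DNS name name.namespace.svc rather than the ClusterIP.

package kubernetes

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestServiceIdentifiersSketch(t *testing.T) {
	svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web"}}
	port := corev1.ServicePort{Protocol: corev1.ProtocolTCP, Port: 80}

	if got := serviceTUID(svc, port); got != "default_web_tcp_80" {
		t.Fatalf("unexpected TUID: %s", got)
	}
	// buildTargets addresses this target as "web.default.svc:80",
	// the cluster-internal DNS name rather than the ClusterIP.
}
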
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go
new file mode 100644
index 000000000..d2e496015
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go
@@ -0,0 +1,456 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "net"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+)
+
+func TestServiceTargetGroup_Provider(t *testing.T) {
+ var s serviceTargetGroup
+ assert.NotEmpty(t, s.Provider())
+}
+
+func TestServiceTargetGroup_Source(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ wantSources []string
+ }{
+ "ClusterIP svc with multiple ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ wantSources: []string{
+ "discoverer=k8s,kind=service,namespace=default,service_name=httpd-cluster-ip-service",
+ "discoverer=k8s,kind=service,namespace=default,service_name=nginx-cluster-ip-service",
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+
+ var sources []string
+ for _, tgg := range sim.run(t) {
+ sources = append(sources, tgg.Source())
+ }
+
+ assert.Equal(t, test.wantSources, sources)
+ })
+ }
+}
+
+func TestServiceTargetGroup_Targets(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ wantTargets int
+ }{
+ "ClusterIP svc with multiple ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ wantTargets: 4,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+
+ var targets int
+ for _, tgg := range sim.run(t) {
+ targets += len(tgg.Targets())
+ }
+
+ assert.Equal(t, test.wantTargets, targets)
+ })
+ }
+}
+
+func TestServiceTarget_Hash(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ wantHashes []uint64
+ }{
+ "ClusterIP svc with multiple ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ wantHashes: []uint64{
+ 17611803477081780974,
+ 6019985892433421258,
+ 4151907287549842238,
+ 5757608926096186119,
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+
+ var hashes []uint64
+ for _, tgg := range sim.run(t) {
+ for _, tgt := range tgg.Targets() {
+ hashes = append(hashes, tgt.Hash())
+ }
+ }
+
+ assert.Equal(t, test.wantHashes, hashes)
+ })
+ }
+}
+
+func TestServiceTarget_TUID(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ wantTUID []string
+ }{
+ "ClusterIP svc with multiple ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ wantTUID: []string{
+ "default_httpd-cluster-ip-service_tcp_80",
+ "default_httpd-cluster-ip-service_tcp_443",
+ "default_nginx-cluster-ip-service_tcp_80",
+ "default_nginx-cluster-ip-service_tcp_443",
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+
+ var tuid []string
+ for _, tgg := range sim.run(t) {
+ for _, tgt := range tgg.Targets() {
+ tuid = append(tuid, tgt.TUID())
+ }
+ }
+
+ assert.Equal(t, test.wantTUID, tuid)
+ })
+ }
+}
+
+func TestNewServiceDiscoverer(t *testing.T) {
+ tests := map[string]struct {
+ informer cache.SharedInformer
+ wantPanic bool
+ }{
+ "valid informer": {
+ wantPanic: false,
+ informer: cache.NewSharedInformer(nil, &corev1.Service{}, resyncPeriod),
+ },
+ "nil informer": {
+ wantPanic: true,
+ informer: nil,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ f := func() { newServiceDiscoverer(test.informer) }
+
+ if test.wantPanic {
+ assert.Panics(t, f)
+ } else {
+ assert.NotPanics(t, f)
+ }
+ })
+ }
+}
+
+func TestServiceDiscoverer_String(t *testing.T) {
+ var s serviceDiscoverer
+ assert.NotEmpty(t, s.String())
+}
+
+func TestServiceDiscoverer_Discover(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() discoverySim
+ }{
+ "ADD: ClusterIP svc exist before run": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD: ClusterIP svc exist before run and add after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, client := prepareAllNsSvcDiscoverer(httpd)
+ svcClient := client.CoreV1().Services("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "DELETE: ClusterIP svc remove after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, client := prepareAllNsSvcDiscoverer(httpd, nginx)
+ svcClient := client.CoreV1().Services("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{})
+ _ = svcClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ prepareEmptySvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD,DELETE: ClusterIP svc remove and add after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ disc, client := prepareAllNsSvcDiscoverer(httpd)
+ svcClient := client.CoreV1().Services("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{})
+ _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareSvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(httpd),
+ prepareSvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "ADD: Headless svc exist before run": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService()
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptySvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ "UPDATE: Headless => ClusterIP svc after sync": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService()
+ httpdUpd, nginxUpd := *httpd, *nginx
+ httpdUpd.Spec.ClusterIP = "10.100.0.1"
+ nginxUpd.Spec.ClusterIP = "10.100.0.2"
+ disc, client := prepareAllNsSvcDiscoverer(httpd, nginx)
+ svcClient := client.CoreV1().Services("default")
+
+ return discoverySim{
+ td: disc,
+ runAfterSync: func(ctx context.Context) {
+ time.Sleep(time.Millisecond * 50)
+ _, _ = svcClient.Update(ctx, &httpdUpd, metav1.UpdateOptions{})
+ _, _ = svcClient.Update(ctx, &nginxUpd, metav1.UpdateOptions{})
+ },
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptySvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(nginx),
+ prepareSvcTargetGroup(&httpdUpd),
+ prepareSvcTargetGroup(&nginxUpd),
+ },
+ }
+ },
+ },
+ "ADD: ClusterIP svc with zero exposed ports": {
+ createSim: func() discoverySim {
+ httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService()
+ httpd.Spec.Ports = httpd.Spec.Ports[:0]
+				nginx.Spec.Ports = nginx.Spec.Ports[:0]
+ disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx)
+
+ return discoverySim{
+ td: disc,
+ wantTargetGroups: []model.TargetGroup{
+ prepareEmptySvcTargetGroup(httpd),
+ prepareEmptySvcTargetGroup(nginx),
+ },
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func prepareAllNsSvcDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
+ return prepareDiscoverer(roleService, []string{corev1.NamespaceAll}, objects...)
+}
+
+func prepareSvcDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) {
+ return prepareDiscoverer(roleService, namespaces, objects...)
+}
+
+func newHTTPDClusterIPService() *corev1.Service {
+ return &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "httpd-cluster-ip-service",
+ Namespace: "default",
+ Annotations: map[string]string{"phase": "prod"},
+ Labels: map[string]string{"app": "httpd", "tier": "frontend"},
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {Name: "http", Protocol: corev1.ProtocolTCP, Port: 80},
+ {Name: "https", Protocol: corev1.ProtocolTCP, Port: 443},
+ },
+ Type: corev1.ServiceTypeClusterIP,
+ ClusterIP: "10.100.0.1",
+ Selector: map[string]string{"app": "httpd", "tier": "frontend"},
+ },
+ }
+}
+
+func newNGINXClusterIPService() *corev1.Service {
+ return &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "nginx-cluster-ip-service",
+ Namespace: "default",
+ Annotations: map[string]string{"phase": "prod"},
+ Labels: map[string]string{"app": "nginx", "tier": "frontend"},
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {Name: "http", Protocol: corev1.ProtocolTCP, Port: 80},
+ {Name: "https", Protocol: corev1.ProtocolTCP, Port: 443},
+ },
+ Type: corev1.ServiceTypeClusterIP,
+ ClusterIP: "10.100.0.2",
+ Selector: map[string]string{"app": "nginx", "tier": "frontend"},
+ },
+ }
+}
+
+func newHTTPDHeadlessService() *corev1.Service {
+ svc := newHTTPDClusterIPService()
+ svc.Name = "httpd-headless-service"
+ svc.Spec.ClusterIP = ""
+ return svc
+}
+
+func newNGINXHeadlessService() *corev1.Service {
+ svc := newNGINXClusterIPService()
+ svc.Name = "nginx-headless-service"
+ svc.Spec.ClusterIP = ""
+ return svc
+}
+
+func prepareEmptySvcTargetGroup(svc *corev1.Service) *serviceTargetGroup {
+ return &serviceTargetGroup{source: serviceSource(svc)}
+}
+
+func prepareSvcTargetGroup(svc *corev1.Service) *serviceTargetGroup {
+ tgg := prepareEmptySvcTargetGroup(svc)
+
+ for _, port := range svc.Spec.Ports {
+ portNum := strconv.FormatInt(int64(port.Port), 10)
+ tgt := &ServiceTarget{
+ tuid: serviceTUID(svc, port),
+ Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum),
+ Namespace: svc.Namespace,
+ Name: svc.Name,
+ Annotations: mapAny(svc.Annotations),
+ Labels: mapAny(svc.Labels),
+ Port: portNum,
+ PortName: port.Name,
+ PortProtocol: string(port.Protocol),
+ ClusterIP: svc.Spec.ClusterIP,
+ ExternalName: svc.Spec.ExternalName,
+ Type: string(svc.Spec.Type),
+ }
+ tgt.hash = mustCalcHash(tgt)
+ tgt.Tags().Merge(discoveryTags)
+ tgg.targets = append(tgg.targets, tgt)
+ }
+
+ return tgg
+}
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go
new file mode 100644
index 000000000..db986b855
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go
@@ -0,0 +1,137 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "k8s.io/client-go/tools/cache"
+)
+
+const (
+ startWaitTimeout = time.Second * 3
+ finishWaitTimeout = time.Second * 5
+)
+
+type discoverySim struct {
+ td *KubeDiscoverer
+ runAfterSync func(ctx context.Context)
+ sortBeforeVerify bool
+ wantTargetGroups []model.TargetGroup
+}
+
+func (sim discoverySim) run(t *testing.T) []model.TargetGroup {
+ t.Helper()
+ require.NotNil(t, sim.td)
+ require.NotEmpty(t, sim.wantTargetGroups)
+
+ in, out := make(chan []model.TargetGroup), make(chan []model.TargetGroup)
+ go sim.collectTargetGroups(t, in, out)
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+ defer cancel()
+ go sim.td.Discover(ctx, in)
+
+ select {
+ case <-sim.td.started:
+ case <-time.After(startWaitTimeout):
+ t.Fatalf("td %s failed to start in %s", sim.td.discoverers, startWaitTimeout)
+ }
+
+ synced := cache.WaitForCacheSync(ctx.Done(), sim.td.hasSynced)
+ require.Truef(t, synced, "td %s failed to sync", sim.td.discoverers)
+
+ if sim.runAfterSync != nil {
+ sim.runAfterSync(ctx)
+ }
+
+ groups := <-out
+
+ if sim.sortBeforeVerify {
+ sortTargetGroups(groups)
+ }
+
+ sim.verifyResult(t, groups)
+ return groups
+}
+
+func (sim discoverySim) collectTargetGroups(t *testing.T, in, out chan []model.TargetGroup) {
+ var tggs []model.TargetGroup
+loop:
+ for {
+ select {
+ case inGroups := <-in:
+ if tggs = append(tggs, inGroups...); len(tggs) >= len(sim.wantTargetGroups) {
+ break loop
+ }
+ case <-time.After(finishWaitTimeout):
+ t.Logf("td %s timed out after %s, got %d groups, expected %d, some events are skipped",
+ sim.td.discoverers, finishWaitTimeout, len(tggs), len(sim.wantTargetGroups))
+ break loop
+ }
+ }
+ out <- tggs
+}
+
+func (sim discoverySim) verifyResult(t *testing.T, result []model.TargetGroup) {
+ var expected, actual any
+
+ if len(sim.wantTargetGroups) == len(result) {
+ expected = sim.wantTargetGroups
+ actual = result
+ } else {
+ want := make(map[string]model.TargetGroup)
+ for _, group := range sim.wantTargetGroups {
+ want[group.Source()] = group
+ }
+ got := make(map[string]model.TargetGroup)
+ for _, group := range result {
+ got[group.Source()] = group
+ }
+ expected, actual = want, got
+ }
+
+ assert.Equal(t, expected, actual)
+}
+
+type hasSynced interface {
+ hasSynced() bool
+}
+
+var (
+ _ hasSynced = &KubeDiscoverer{}
+ _ hasSynced = &podDiscoverer{}
+ _ hasSynced = &serviceDiscoverer{}
+)
+
+func (d *KubeDiscoverer) hasSynced() bool {
+ for _, disc := range d.discoverers {
+ v, ok := disc.(hasSynced)
+ if !ok || !v.hasSynced() {
+ return false
+ }
+ }
+ return true
+}
+
+func (p *podDiscoverer) hasSynced() bool {
+ return p.podInformer.HasSynced() && p.cmapInformer.HasSynced() && p.secretInformer.HasSynced()
+}
+
+func (s *serviceDiscoverer) hasSynced() bool {
+ return s.informer.HasSynced()
+}
+
+func sortTargetGroups(tggs []model.TargetGroup) {
+ if len(tggs) == 0 {
+ return
+ }
+ sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() })
+}