diff options
Diffstat (limited to '')
8 files changed, 2346 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go new file mode 100644 index 000000000..15a1e4745 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "errors" + "fmt" +) + +type Config struct { + APIServer string `yaml:"api_server"` // TODO: not used + Role string `yaml:"role"` + Tags string `yaml:"tags"` + Namespaces []string `yaml:"namespaces"` + Selector struct { + Label string `yaml:"label"` + Field string `yaml:"field"` + } `yaml:"selector"` + Pod struct { + LocalMode bool `yaml:"local_mode"` + } `yaml:"pod"` +} + +func validateConfig(cfg Config) error { + switch role(cfg.Role) { + case rolePod, roleService: + default: + return fmt.Errorf("unknown role: '%s'", cfg.Role) + } + if cfg.Tags == "" { + return errors.New("'tags' not set") + } + return nil +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go new file mode 100644 index 000000000..aa153a34a --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go @@ -0,0 +1,268 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "log/slog" + "os" + "strings" + "sync" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + "github.com/netdata/netdata/go/go.d.plugin/pkg/k8sclient" + + "github.com/ilyam8/hashstructure" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type role string + +const ( + rolePod role = "pod" + roleService role = "service" +) + +const ( + envNodeName = "MY_NODE_NAME" +) + +var log = logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", "kubernetes"), +) + +func NewKubeDiscoverer(cfg Config) (*KubeDiscoverer, error) { + if err := validateConfig(cfg); err != nil { + return nil, fmt.Errorf("config validation: %v", err) + } + + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + client, err := k8sclient.New("Netdata/service-td") + if err != nil { + return nil, fmt.Errorf("create clientset: %v", err) + } + + ns := cfg.Namespaces + if len(ns) == 0 { + ns = []string{corev1.NamespaceAll} + } + + selectorField := cfg.Selector.Field + if role(cfg.Role) == rolePod && cfg.Pod.LocalMode { + name := os.Getenv(envNodeName) + if name == "" { + return nil, fmt.Errorf("local_mode is enabled, but env '%s' not set", envNodeName) + } + selectorField = joinSelectors(selectorField, "spec.nodeName="+name) + } + + d := &KubeDiscoverer{ + Logger: log, + client: client, + tags: tags, + role: role(cfg.Role), + namespaces: ns, + selectorLabel: cfg.Selector.Label, + selectorField: selectorField, + discoverers: make([]model.Discoverer, 0, len(ns)), + started: make(chan struct{}), + } + + return d, nil +} + +type KubeDiscoverer struct { + *logger.Logger + + client kubernetes.Interface + + tags model.Tags + role role + namespaces []string + selectorLabel string + selectorField string + discoverers []model.Discoverer + started chan struct{} +} + +func (d *KubeDiscoverer) String() string { + return "sd:k8s" +} + +const resyncPeriod = 10 * time.Minute + +func (d *KubeDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer d.Info("instance is stopped") + + for _, namespace := range d.namespaces { + var dd model.Discoverer + switch d.role { + case rolePod: + dd = d.setupPodDiscoverer(ctx, namespace) + case roleService: + dd = d.setupServiceDiscoverer(ctx, namespace) + default: + d.Errorf("unknown role: '%s'", d.role) + continue + } + d.discoverers = append(d.discoverers, dd) + } + + if len(d.discoverers) == 0 { + d.Error("no discoverers registered") + return + } + + d.Infof("registered: %v", d.discoverers) + + var wg sync.WaitGroup + updates := make(chan []model.TargetGroup) + + for _, disc := range d.discoverers { + wg.Add(1) + go func(disc model.Discoverer) { defer wg.Done(); disc.Discover(ctx, updates) }(disc) + } + + done := make(chan struct{}) + go func() { defer close(done); wg.Wait() }() + + close(d.started) + + for { + select { + case <-ctx.Done(): + select { + case <-done: + d.Info("all discoverers exited") + case <-time.After(time.Second * 5): + d.Warning("not all discoverers exited") + } + return + case <-done: + d.Info("all discoverers exited") + return + case tggs := <-updates: + select { + case <-ctx.Done(): + case in <- tggs: + } + } + } +} + +func (d *KubeDiscoverer) setupPodDiscoverer(ctx context.Context, ns string) *podDiscoverer { + pod := d.client.CoreV1().Pods(ns) + podLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return pod.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return pod.Watch(ctx, opts) + }, + } + + cmap := d.client.CoreV1().ConfigMaps(ns) + cmapLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + return cmap.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + return cmap.Watch(ctx, opts) + }, + } + + secret := d.client.CoreV1().Secrets(ns) + secretLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + return secret.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + return secret.Watch(ctx, opts) + }, + } + + td := newPodDiscoverer( + cache.NewSharedInformer(podLW, &corev1.Pod{}, resyncPeriod), + cache.NewSharedInformer(cmapLW, &corev1.ConfigMap{}, resyncPeriod), + cache.NewSharedInformer(secretLW, &corev1.Secret{}, resyncPeriod), + ) + td.Tags().Merge(d.tags) + + return td +} + +func (d *KubeDiscoverer) setupServiceDiscoverer(ctx context.Context, namespace string) *serviceDiscoverer { + svc := d.client.CoreV1().Services(namespace) + + svcLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return svc.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return svc.Watch(ctx, opts) + }, + } + + inf := cache.NewSharedInformer(svcLW, &corev1.Service{}, resyncPeriod) + + td := newServiceDiscoverer(inf) + td.Tags().Merge(d.tags) + + return td +} + +func enqueue(queue *workqueue.Type, obj any) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + queue.Add(key) +} + +func send(ctx context.Context, in chan<- []model.TargetGroup, tgg model.TargetGroup) { + if tgg == nil { + return + } + select { + case <-ctx.Done(): + case in <- []model.TargetGroup{tgg}: + } +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} + +func joinSelectors(srs ...string) string { + var i int + for _, v := range srs { + if v != "" { + srs[i] = v + i++ + } + } + return strings.Join(srs[:i], ",") +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go new file mode 100644 index 000000000..9743a0af5 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "fmt" + "os" + "testing" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/pkg/k8sclient" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +var discoveryTags, _ = model.ParseTags("k8s") + +func TestMain(m *testing.M) { + _ = os.Setenv(envNodeName, "m01") + _ = os.Setenv(k8sclient.EnvFakeClient, "true") + code := m.Run() + _ = os.Unsetenv(envNodeName) + _ = os.Unsetenv(k8sclient.EnvFakeClient) + os.Exit(code) +} + +func TestNewKubeDiscoverer(t *testing.T) { + tests := map[string]struct { + cfg Config + wantErr bool + }{ + "pod role config": { + wantErr: false, + cfg: Config{Role: string(rolePod), Tags: "k8s"}, + }, + "service role config": { + wantErr: false, + cfg: Config{Role: string(roleService), Tags: "k8s"}, + }, + "empty config": { + wantErr: true, + cfg: Config{}, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + disc, err := NewKubeDiscoverer(test.cfg) + + if test.wantErr { + assert.Error(t, err) + assert.Nil(t, disc) + } else { + assert.NoError(t, err) + assert.NotNil(t, disc) + } + }) + } +} + +func TestKubeDiscoverer_Discover(t *testing.T) { + const prod = "prod" + const dev = "dev" + prodNamespace := newNamespace(prod) + devNamespace := newNamespace(dev) + + tests := map[string]struct { + createSim func() discoverySim + }{ + "multiple namespaces pod td": { + createSim: func() discoverySim { + httpdProd, nginxProd := newHTTPDPod(), newNGINXPod() + httpdProd.Namespace = prod + nginxProd.Namespace = prod + + httpdDev, nginxDev := newHTTPDPod(), newNGINXPod() + httpdDev.Namespace = dev + nginxDev.Namespace = dev + + disc, _ := preparePodDiscoverer( + []string{prod, dev}, + prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev) + + return discoverySim{ + td: disc, + sortBeforeVerify: true, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpdDev), + preparePodTargetGroup(nginxDev), + preparePodTargetGroup(httpdProd), + preparePodTargetGroup(nginxProd), + }, + } + }, + }, + "multiple namespaces ClusterIP service td": { + createSim: func() discoverySim { + httpdProd, nginxProd := newHTTPDClusterIPService(), newNGINXClusterIPService() + httpdProd.Namespace = prod + nginxProd.Namespace = prod + + httpdDev, nginxDev := newHTTPDClusterIPService(), newNGINXClusterIPService() + httpdDev.Namespace = dev + nginxDev.Namespace = dev + + disc, _ := prepareSvcDiscoverer( + []string{prod, dev}, + prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev) + + return discoverySim{ + td: disc, + sortBeforeVerify: true, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpdDev), + prepareSvcTargetGroup(nginxDev), + prepareSvcTargetGroup(httpdProd), + prepareSvcTargetGroup(nginxProd), + }, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareDiscoverer(role role, namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + client := fake.NewSimpleClientset(objects...) + tags, _ := model.ParseTags("k8s") + disc := &KubeDiscoverer{ + tags: tags, + role: role, + namespaces: namespaces, + client: client, + discoverers: nil, + started: make(chan struct{}), + } + return disc, client +} + +func newNamespace(name string) *corev1.Namespace { + return &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}} +} + +func mustCalcHash(obj any) uint64 { + hash, err := calcHash(obj) + if err != nil { + panic(fmt.Sprintf("hash calculation: %v", err)) + } + return hash +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go new file mode 100644 index 000000000..a271e7285 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go @@ -0,0 +1,434 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type podTargetGroup struct { + targets []model.Target + source string +} + +func (p podTargetGroup) Provider() string { return "sd:k8s:pod" } +func (p podTargetGroup) Source() string { return p.source } +func (p podTargetGroup) Targets() []model.Target { return p.targets } + +type PodTarget struct { + model.Base `hash:"ignore"` + + hash uint64 + tuid string + + Address string + Namespace string + Name string + Annotations map[string]any + Labels map[string]any + NodeName string + PodIP string + ControllerName string + ControllerKind string + ContName string + Image string + Env map[string]any + Port string + PortName string + PortProtocol string +} + +func (p PodTarget) Hash() uint64 { return p.hash } +func (p PodTarget) TUID() string { return p.tuid } + +func newPodDiscoverer(pod, cmap, secret cache.SharedInformer) *podDiscoverer { + + if pod == nil || cmap == nil || secret == nil { + panic("nil pod or cmap or secret informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"}) + + _, _ = pod.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj any) { enqueue(queue, obj) }, + DeleteFunc: func(obj any) { enqueue(queue, obj) }, + }) + + return &podDiscoverer{ + Logger: log, + podInformer: pod, + cmapInformer: cmap, + secretInformer: secret, + queue: queue, + } +} + +type podDiscoverer struct { + *logger.Logger + model.Base + + podInformer cache.SharedInformer + cmapInformer cache.SharedInformer + secretInformer cache.SharedInformer + queue *workqueue.Type +} + +func (p *podDiscoverer) String() string { + return "sd:k8s:pod" +} + +func (p *podDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + p.Info("instance is started") + defer p.Info("instance is stopped") + defer p.queue.ShutDown() + + go p.podInformer.Run(ctx.Done()) + go p.cmapInformer.Run(ctx.Done()) + go p.secretInformer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), + p.podInformer.HasSynced, p.cmapInformer.HasSynced, p.secretInformer.HasSynced) { + p.Error("failed to sync caches") + return + } + + go p.run(ctx, in) + + <-ctx.Done() +} + +func (p *podDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) { + for { + item, shutdown := p.queue.Get() + if shutdown { + return + } + p.handleQueueItem(ctx, in, item) + } +} + +func (p *podDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) { + defer p.queue.Done(item) + + key := item.(string) + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + obj, ok, err := p.podInformer.GetStore().GetByKey(key) + if err != nil { + return + } + + if !ok { + tgg := &podTargetGroup{source: podSourceFromNsName(namespace, name)} + send(ctx, in, tgg) + return + } + + pod, err := toPod(obj) + if err != nil { + return + } + + tgg := p.buildTargetGroup(pod) + + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(p.Tags()) + } + + send(ctx, in, tgg) + +} + +func (p *podDiscoverer) buildTargetGroup(pod *corev1.Pod) model.TargetGroup { + if pod.Status.PodIP == "" || len(pod.Spec.Containers) == 0 { + return &podTargetGroup{ + source: podSource(pod), + } + } + return &podTargetGroup{ + source: podSource(pod), + targets: p.buildTargets(pod), + } +} + +func (p *podDiscoverer) buildTargets(pod *corev1.Pod) (targets []model.Target) { + var name, kind string + for _, ref := range pod.OwnerReferences { + if ref.Controller != nil && *ref.Controller { + name = ref.Name + kind = ref.Kind + break + } + } + + for _, container := range pod.Spec.Containers { + env := p.collectEnv(pod.Namespace, container) + + if len(container.Ports) == 0 { + tgt := &PodTarget{ + tuid: podTUID(pod, container), + Address: pod.Status.PodIP, + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: name, + ControllerKind: kind, + ContName: container.Name, + Image: container.Image, + Env: mapAny(env), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } else { + for _, port := range container.Ports { + portNum := strconv.FormatUint(uint64(port.ContainerPort), 10) + tgt := &PodTarget{ + tuid: podTUIDWithPort(pod, container, port), + Address: net.JoinHostPort(pod.Status.PodIP, portNum), + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: name, + ControllerKind: kind, + ContName: container.Name, + Image: container.Image, + Env: mapAny(env), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } + } + } + + return targets +} + +func (p *podDiscoverer) collectEnv(ns string, container corev1.Container) map[string]string { + vars := make(map[string]string) + + // When a key exists in multiple sources, + // the value associated with the last source will take precedence. + // Values defined by an Env with a duplicate key will take precedence. + // + // Order (https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go) + // - envFrom: configMapRef, secretRef + // - env: value || valueFrom: fieldRef, resourceFieldRef, secretRef, configMap + + for _, src := range container.EnvFrom { + switch { + case src.ConfigMapRef != nil: + p.envFromConfigMap(vars, ns, src) + case src.SecretRef != nil: + p.envFromSecret(vars, ns, src) + } + } + + for _, env := range container.Env { + if env.Name == "" || isVar(env.Name) { + continue + } + switch { + case env.Value != "": + vars[env.Name] = env.Value + case env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil: + p.valueFromSecret(vars, ns, env) + case env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil: + p.valueFromConfigMap(vars, ns, env) + } + } + + if len(vars) == 0 { + return nil + } + return vars +} + +func (p *podDiscoverer) valueFromConfigMap(vars map[string]string, ns string, env corev1.EnvVar) { + if env.ValueFrom.ConfigMapKeyRef.Name == "" || env.ValueFrom.ConfigMapKeyRef.Key == "" { + return + } + + sr := env.ValueFrom.ConfigMapKeyRef + key := ns + "/" + sr.Name + + item, exist, err := p.cmapInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + cmap, err := toConfigMap(item) + if err != nil { + return + } + + if v, ok := cmap.Data[sr.Key]; ok { + vars[env.Name] = v + } +} + +func (p *podDiscoverer) valueFromSecret(vars map[string]string, ns string, env corev1.EnvVar) { + if env.ValueFrom.SecretKeyRef.Name == "" || env.ValueFrom.SecretKeyRef.Key == "" { + return + } + + secretKey := env.ValueFrom.SecretKeyRef + key := ns + "/" + secretKey.Name + + item, exist, err := p.secretInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + secret, err := toSecret(item) + if err != nil { + return + } + + if v, ok := secret.Data[secretKey.Key]; ok { + vars[env.Name] = string(v) + } +} + +func (p *podDiscoverer) envFromConfigMap(vars map[string]string, ns string, src corev1.EnvFromSource) { + if src.ConfigMapRef.Name == "" { + return + } + + key := ns + "/" + src.ConfigMapRef.Name + item, exist, err := p.cmapInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + cmap, err := toConfigMap(item) + if err != nil { + return + } + + for k, v := range cmap.Data { + vars[src.Prefix+k] = v + } +} + +func (p *podDiscoverer) envFromSecret(vars map[string]string, ns string, src corev1.EnvFromSource) { + if src.SecretRef.Name == "" { + return + } + + key := ns + "/" + src.SecretRef.Name + item, exist, err := p.secretInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + secret, err := toSecret(item) + if err != nil { + return + } + + for k, v := range secret.Data { + vars[src.Prefix+k] = string(v) + } +} + +func podTUID(pod *corev1.Pod, container corev1.Container) string { + return fmt.Sprintf("%s_%s_%s", + pod.Namespace, + pod.Name, + container.Name, + ) +} + +func podTUIDWithPort(pod *corev1.Pod, container corev1.Container, port corev1.ContainerPort) string { + return fmt.Sprintf("%s_%s_%s_%s_%s", + pod.Namespace, + pod.Name, + container.Name, + strings.ToLower(string(port.Protocol)), + strconv.FormatUint(uint64(port.ContainerPort), 10), + ) +} + +func podSourceFromNsName(namespace, name string) string { + return fmt.Sprintf("discoverer=k8s,kind=pod,namespace=%s,pod_name=%s", namespace, name) +} + +func podSource(pod *corev1.Pod) string { + return podSourceFromNsName(pod.Namespace, pod.Name) +} + +func toPod(obj any) (*corev1.Pod, error) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return pod, nil +} + +func toConfigMap(obj any) (*corev1.ConfigMap, error) { + cmap, ok := obj.(*corev1.ConfigMap) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return cmap, nil +} + +func toSecret(obj any) (*corev1.Secret, error) { + secret, ok := obj.(*corev1.Secret) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return secret, nil +} + +func isVar(name string) bool { + // Variable references $(VAR_NAME) are expanded using the previous defined + // environment variables in the container and any service environment + // variables. + return strings.IndexByte(name, '$') != -1 +} + +func mapAny(src map[string]string) map[string]any { + if src == nil { + return nil + } + m := make(map[string]any, len(src)) + for k, v := range src { + m[k] = v + } + return m +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go new file mode 100644 index 000000000..ebe92d2f6 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go @@ -0,0 +1,648 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "net" + "strconv" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +func TestPodTargetGroup_Provider(t *testing.T) { + var p podTargetGroup + assert.NotEmpty(t, p.Provider()) +} + +func TestPodTargetGroup_Source(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantSources []string + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantSources: []string{ + "discoverer=k8s,kind=pod,namespace=default,pod_name=httpd-dd95c4d68-5bkwl", + "discoverer=k8s,kind=pod,namespace=default,pod_name=nginx-7cfd77469b-q6kxj", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var sources []string + for _, tgg := range sim.run(t) { + sources = append(sources, tgg.Source()) + } + + assert.Equal(t, test.wantSources, sources) + }) + } +} + +func TestPodTargetGroup_Targets(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTargets int + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: discovery, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantTargets: 4, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var targets int + for _, tgg := range sim.run(t) { + targets += len(tgg.Targets()) + } + + assert.Equal(t, test.wantTargets, targets) + }) + } +} + +func TestPodTarget_Hash(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantHashes []uint64 + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: discovery, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantHashes: []uint64{ + 12703169414253998055, + 13351713096133918928, + 8241692333761256175, + 11562466355572729519, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var hashes []uint64 + for _, tgg := range sim.run(t) { + for _, tg := range tgg.Targets() { + hashes = append(hashes, tg.Hash()) + } + } + + assert.Equal(t, test.wantHashes, hashes) + }) + } +} + +func TestPodTarget_TUID(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTUID []string + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: discovery, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantTUID: []string{ + "default_httpd-dd95c4d68-5bkwl_httpd_tcp_80", + "default_httpd-dd95c4d68-5bkwl_httpd_tcp_443", + "default_nginx-7cfd77469b-q6kxj_nginx_tcp_80", + "default_nginx-7cfd77469b-q6kxj_nginx_tcp_443", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var tuid []string + for _, tgg := range sim.run(t) { + for _, tg := range tgg.Targets() { + tuid = append(tuid, tg.TUID()) + } + } + + assert.Equal(t, test.wantTUID, tuid) + }) + } +} + +func TestNewPodDiscoverer(t *testing.T) { + tests := map[string]struct { + podInf cache.SharedInformer + cmapInf cache.SharedInformer + secretInf cache.SharedInformer + wantPanic bool + }{ + "valid informers": { + wantPanic: false, + podInf: cache.NewSharedInformer(nil, &corev1.Pod{}, resyncPeriod), + cmapInf: cache.NewSharedInformer(nil, &corev1.ConfigMap{}, resyncPeriod), + secretInf: cache.NewSharedInformer(nil, &corev1.Secret{}, resyncPeriod), + }, + "nil informers": { + wantPanic: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + f := func() { newPodDiscoverer(test.podInf, test.cmapInf, test.secretInf) } + + if test.wantPanic { + assert.Panics(t, f) + } else { + assert.NotPanics(t, f) + } + }) + } +} + +func TestPodDiscoverer_String(t *testing.T) { + var p podDiscoverer + assert.NotEmpty(t, p.String()) +} + +func TestPodDiscoverer_Discover(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + }{ + "ADD: pods exist before run": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + td, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: td, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + }, + "ADD: pods exist before run and add after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, client := prepareAllNsPodDiscoverer(httpd) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + }, + "DELETE: remove pods after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, client := prepareAllNsPodDiscoverer(httpd, nginx) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _ = podClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + }, + } + }, + }, + "DELETE,ADD: remove and add pods after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, client := prepareAllNsPodDiscoverer(httpd) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + prepareEmptyPodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + }, + "ADD: pods with empty PodIP": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + httpd.Status.PodIP = "" + nginx.Status.PodIP = "" + disc, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + }, + } + }, + }, + "UPDATE: set pods PodIP after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + httpd.Status.PodIP = "" + nginx.Status.PodIP = "" + disc, client := prepareAllNsPodDiscoverer(httpd, nginx) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _, _ = podClient.Update(ctx, newHTTPDPod(), metav1.UpdateOptions{}) + _, _ = podClient.Update(ctx, newNGINXPod(), metav1.UpdateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + preparePodTargetGroup(newHTTPDPod()), + preparePodTargetGroup(newNGINXPod()), + }, + } + }, + }, + "ADD: pods without containers": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + httpd.Spec.Containers = httpd.Spec.Containers[:0] + nginx.Spec.Containers = httpd.Spec.Containers[:0] + disc, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + }, + } + }, + }, + "Env: from value": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.Env = []corev1.EnvVar{ + {Name: "key1", Value: "value1"}, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1"} + + disc, _ := prepareAllNsPodDiscoverer(httpd) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "Env: from Secret": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.Env = []corev1.EnvVar{ + { + Name: "key1", + ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"}, + Key: "key1", + }}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1"} + secret := prepareSecret("my-secret", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, secret) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "Env: from ConfigMap": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.Env = []corev1.EnvVar{ + { + Name: "key1", + ValueFrom: &corev1.EnvVarSource{ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"}, + Key: "key1", + }}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1"} + cmap := prepareConfigMap("my-cmap", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, cmap) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "EnvFrom: from ConfigMap": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.EnvFrom = []corev1.EnvFromSource{ + { + ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"}}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1", "key2": "value2"} + cmap := prepareConfigMap("my-cmap", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, cmap) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "EnvFrom: from Secret": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.EnvFrom = []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"}}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1", "key2": "value2"} + secret := prepareSecret("my-secret", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, secret) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareAllNsPodDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(rolePod, []string{corev1.NamespaceAll}, objects...) +} + +func preparePodDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(rolePod, namespaces, objects...) +} + +func mangleContainers(containers []corev1.Container, mange func(container *corev1.Container)) { + for i := range containers { + mange(&containers[i]) + } +} + +var controllerTrue = true + +func newHTTPDPod() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "httpd-dd95c4d68-5bkwl", + Namespace: "default", + UID: "1cebb6eb-0c1e-495b-8131-8fa3e6668dc8", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "httpd", "tier": "frontend"}, + OwnerReferences: []metav1.OwnerReference{ + {Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue}, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "m01", + Containers: []corev1.Container{ + { + Name: "httpd", + Image: "httpd", + Ports: []corev1.ContainerPort{ + {Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443}, + }, + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "172.17.0.1", + }, + } +} + +func newNGINXPod() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-7cfd77469b-q6kxj", + Namespace: "default", + UID: "09e883f2-d740-4c5f-970d-02cf02876522", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "nginx", "tier": "frontend"}, + OwnerReferences: []metav1.OwnerReference{ + {Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue}, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "m01", + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + Ports: []corev1.ContainerPort{ + {Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443}, + }, + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "172.17.0.2", + }, + } +} + +func prepareConfigMap(name string, data map[string]string) *corev1.ConfigMap { + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + UID: types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8167" + name), + }, + Data: data, + } +} + +func prepareSecret(name string, data map[string]string) *corev1.Secret { + secretData := make(map[string][]byte, len(data)) + for k, v := range data { + secretData[k] = []byte(v) + } + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + UID: types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8161" + name), + }, + Data: secretData, + } +} + +func prepareEmptyPodTargetGroup(pod *corev1.Pod) *podTargetGroup { + return &podTargetGroup{source: podSource(pod)} +} + +func preparePodTargetGroup(pod *corev1.Pod) *podTargetGroup { + tgg := prepareEmptyPodTargetGroup(pod) + + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + portNum := strconv.FormatUint(uint64(port.ContainerPort), 10) + tgt := &PodTarget{ + tuid: podTUIDWithPort(pod, container, port), + Address: net.JoinHostPort(pod.Status.PodIP, portNum), + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: "netdata-test", + ControllerKind: "DaemonSet", + ContName: container.Name, + Image: container.Image, + Env: nil, + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + } + tgt.hash = mustCalcHash(tgt) + tgt.Tags().Merge(discoveryTags) + + tgg.targets = append(tgg.targets, tgt) + } + } + + return tgg +} + +func preparePodTargetGroupWithEnv(pod *corev1.Pod, env map[string]string) *podTargetGroup { + tgg := preparePodTargetGroup(pod) + + for _, tgt := range tgg.Targets() { + tgt.(*PodTarget).Env = mapAny(env) + tgt.(*PodTarget).hash = mustCalcHash(tgt) + } + + return tgg +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go new file mode 100644 index 000000000..4cfdd62f1 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go @@ -0,0 +1,209 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type serviceTargetGroup struct { + targets []model.Target + source string +} + +func (s serviceTargetGroup) Provider() string { return "sd:k8s:service" } +func (s serviceTargetGroup) Source() string { return s.source } +func (s serviceTargetGroup) Targets() []model.Target { return s.targets } + +type ServiceTarget struct { + model.Base `hash:"ignore"` + + hash uint64 + tuid string + + Address string + Namespace string + Name string + Annotations map[string]any + Labels map[string]any + Port string + PortName string + PortProtocol string + ClusterIP string + ExternalName string + Type string +} + +func (s ServiceTarget) Hash() uint64 { return s.hash } +func (s ServiceTarget) TUID() string { return s.tuid } + +type serviceDiscoverer struct { + *logger.Logger + model.Base + + informer cache.SharedInformer + queue *workqueue.Type +} + +func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer { + if inf == nil { + panic("nil service informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "service"}) + _, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj any) { enqueue(queue, obj) }, + DeleteFunc: func(obj any) { enqueue(queue, obj) }, + }) + + return &serviceDiscoverer{ + Logger: log, + informer: inf, + queue: queue, + } +} + +func (s *serviceDiscoverer) String() string { + return "k8s service" +} + +func (s *serviceDiscoverer) Discover(ctx context.Context, ch chan<- []model.TargetGroup) { + s.Info("instance is started") + defer s.Info("instance is stopped") + defer s.queue.ShutDown() + + go s.informer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) { + s.Error("failed to sync caches") + return + } + + go s.run(ctx, ch) + + <-ctx.Done() +} + +func (s *serviceDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) { + for { + item, shutdown := s.queue.Get() + if shutdown { + return + } + + s.handleQueueItem(ctx, in, item) + } +} + +func (s *serviceDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) { + defer s.queue.Done(item) + + key := item.(string) + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + obj, exists, err := s.informer.GetStore().GetByKey(key) + if err != nil { + return + } + + if !exists { + tgg := &serviceTargetGroup{source: serviceSourceFromNsName(namespace, name)} + send(ctx, in, tgg) + return + } + + svc, err := toService(obj) + if err != nil { + return + } + + tgg := s.buildTargetGroup(svc) + + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(s.Tags()) + } + + send(ctx, in, tgg) +} + +func (s *serviceDiscoverer) buildTargetGroup(svc *corev1.Service) model.TargetGroup { + // TODO: headless service? + if svc.Spec.ClusterIP == "" || len(svc.Spec.Ports) == 0 { + return &serviceTargetGroup{ + source: serviceSource(svc), + } + } + return &serviceTargetGroup{ + source: serviceSource(svc), + targets: s.buildTargets(svc), + } +} + +func (s *serviceDiscoverer) buildTargets(svc *corev1.Service) (targets []model.Target) { + for _, port := range svc.Spec.Ports { + portNum := strconv.FormatInt(int64(port.Port), 10) + tgt := &ServiceTarget{ + tuid: serviceTUID(svc, port), + Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum), + Namespace: svc.Namespace, + Name: svc.Name, + Annotations: mapAny(svc.Annotations), + Labels: mapAny(svc.Labels), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + ClusterIP: svc.Spec.ClusterIP, + ExternalName: svc.Spec.ExternalName, + Type: string(svc.Spec.Type), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } + + return targets +} + +func serviceTUID(svc *corev1.Service, port corev1.ServicePort) string { + return fmt.Sprintf("%s_%s_%s_%s", + svc.Namespace, + svc.Name, + strings.ToLower(string(port.Protocol)), + strconv.FormatInt(int64(port.Port), 10), + ) +} + +func serviceSourceFromNsName(namespace, name string) string { + return fmt.Sprintf("discoverer=k8s,kind=service,namespace=%s,service_name=%s", namespace, name) +} + +func serviceSource(svc *corev1.Service) string { + return serviceSourceFromNsName(svc.Namespace, svc.Name) +} + +func toService(obj any) (*corev1.Service, error) { + svc, ok := obj.(*corev1.Service) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return svc, nil +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go new file mode 100644 index 000000000..d2e496015 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go @@ -0,0 +1,456 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "net" + "strconv" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +func TestServiceTargetGroup_Provider(t *testing.T) { + var s serviceTargetGroup + assert.NotEmpty(t, s.Provider()) +} + +func TestServiceTargetGroup_Source(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantSources []string + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantSources: []string{ + "discoverer=k8s,kind=service,namespace=default,service_name=httpd-cluster-ip-service", + "discoverer=k8s,kind=service,namespace=default,service_name=nginx-cluster-ip-service", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var sources []string + for _, tgg := range sim.run(t) { + sources = append(sources, tgg.Source()) + } + + assert.Equal(t, test.wantSources, sources) + }) + } +} + +func TestServiceTargetGroup_Targets(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTargets int + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantTargets: 4, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var targets int + for _, tgg := range sim.run(t) { + targets += len(tgg.Targets()) + } + + assert.Equal(t, test.wantTargets, targets) + }) + } +} + +func TestServiceTarget_Hash(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantHashes []uint64 + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantHashes: []uint64{ + 17611803477081780974, + 6019985892433421258, + 4151907287549842238, + 5757608926096186119, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var hashes []uint64 + for _, tgg := range sim.run(t) { + for _, tgt := range tgg.Targets() { + hashes = append(hashes, tgt.Hash()) + } + } + + assert.Equal(t, test.wantHashes, hashes) + }) + } +} + +func TestServiceTarget_TUID(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTUID []string + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantTUID: []string{ + "default_httpd-cluster-ip-service_tcp_80", + "default_httpd-cluster-ip-service_tcp_443", + "default_nginx-cluster-ip-service_tcp_80", + "default_nginx-cluster-ip-service_tcp_443", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var tuid []string + for _, tgg := range sim.run(t) { + for _, tgt := range tgg.Targets() { + tuid = append(tuid, tgt.TUID()) + } + } + + assert.Equal(t, test.wantTUID, tuid) + }) + } +} + +func TestNewServiceDiscoverer(t *testing.T) { + tests := map[string]struct { + informer cache.SharedInformer + wantPanic bool + }{ + "valid informer": { + wantPanic: false, + informer: cache.NewSharedInformer(nil, &corev1.Service{}, resyncPeriod), + }, + "nil informer": { + wantPanic: true, + informer: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + f := func() { newServiceDiscoverer(test.informer) } + + if test.wantPanic { + assert.Panics(t, f) + } else { + assert.NotPanics(t, f) + } + }) + } +} + +func TestServiceDiscoverer_String(t *testing.T) { + var s serviceDiscoverer + assert.NotEmpty(t, s.String()) +} + +func TestServiceDiscoverer_Discover(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + }{ + "ADD: ClusterIP svc exist before run": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + }, + "ADD: ClusterIP svc exist before run and add after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, client := prepareAllNsSvcDiscoverer(httpd) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + }, + "DELETE: ClusterIP svc remove after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, client := prepareAllNsSvcDiscoverer(httpd, nginx) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _ = svcClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + }, + } + }, + }, + "ADD,DELETE: ClusterIP svc remove and add after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, client := prepareAllNsSvcDiscoverer(httpd) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + }, + "ADD: Headless svc exist before run": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + }, + } + }, + }, + "UPDATE: Headless => ClusterIP svc after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService() + httpdUpd, nginxUpd := *httpd, *nginx + httpdUpd.Spec.ClusterIP = "10.100.0.1" + nginxUpd.Spec.ClusterIP = "10.100.0.2" + disc, client := prepareAllNsSvcDiscoverer(httpd, nginx) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _, _ = svcClient.Update(ctx, &httpdUpd, metav1.UpdateOptions{}) + _, _ = svcClient.Update(ctx, &nginxUpd, metav1.UpdateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + prepareSvcTargetGroup(&httpdUpd), + prepareSvcTargetGroup(&nginxUpd), + }, + } + }, + }, + "ADD: ClusterIP svc with zero exposed ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + httpd.Spec.Ports = httpd.Spec.Ports[:0] + nginx.Spec.Ports = httpd.Spec.Ports[:0] + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + }, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareAllNsSvcDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(roleService, []string{corev1.NamespaceAll}, objects...) +} + +func prepareSvcDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(roleService, namespaces, objects...) +} + +func newHTTPDClusterIPService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "httpd-cluster-ip-service", + Namespace: "default", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "httpd", "tier": "frontend"}, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "http", Protocol: corev1.ProtocolTCP, Port: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, Port: 443}, + }, + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "10.100.0.1", + Selector: map[string]string{"app": "httpd", "tier": "frontend"}, + }, + } +} + +func newNGINXClusterIPService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-cluster-ip-service", + Namespace: "default", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "nginx", "tier": "frontend"}, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "http", Protocol: corev1.ProtocolTCP, Port: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, Port: 443}, + }, + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "10.100.0.2", + Selector: map[string]string{"app": "nginx", "tier": "frontend"}, + }, + } +} + +func newHTTPDHeadlessService() *corev1.Service { + svc := newHTTPDClusterIPService() + svc.Name = "httpd-headless-service" + svc.Spec.ClusterIP = "" + return svc +} + +func newNGINXHeadlessService() *corev1.Service { + svc := newNGINXClusterIPService() + svc.Name = "nginx-headless-service" + svc.Spec.ClusterIP = "" + return svc +} + +func prepareEmptySvcTargetGroup(svc *corev1.Service) *serviceTargetGroup { + return &serviceTargetGroup{source: serviceSource(svc)} +} + +func prepareSvcTargetGroup(svc *corev1.Service) *serviceTargetGroup { + tgg := prepareEmptySvcTargetGroup(svc) + + for _, port := range svc.Spec.Ports { + portNum := strconv.FormatInt(int64(port.Port), 10) + tgt := &ServiceTarget{ + tuid: serviceTUID(svc, port), + Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum), + Namespace: svc.Namespace, + Name: svc.Name, + Annotations: mapAny(svc.Annotations), + Labels: mapAny(svc.Labels), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + ClusterIP: svc.Spec.ClusterIP, + ExternalName: svc.Spec.ExternalName, + Type: string(svc.Spec.Type), + } + tgt.hash = mustCalcHash(tgt) + tgt.Tags().Merge(discoveryTags) + tgg.targets = append(tgg.targets, tgt) + } + + return tgg +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go new file mode 100644 index 000000000..db986b855 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go @@ -0,0 +1,137 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/tools/cache" +) + +const ( + startWaitTimeout = time.Second * 3 + finishWaitTimeout = time.Second * 5 +) + +type discoverySim struct { + td *KubeDiscoverer + runAfterSync func(ctx context.Context) + sortBeforeVerify bool + wantTargetGroups []model.TargetGroup +} + +func (sim discoverySim) run(t *testing.T) []model.TargetGroup { + t.Helper() + require.NotNil(t, sim.td) + require.NotEmpty(t, sim.wantTargetGroups) + + in, out := make(chan []model.TargetGroup), make(chan []model.TargetGroup) + go sim.collectTargetGroups(t, in, out) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + go sim.td.Discover(ctx, in) + + select { + case <-sim.td.started: + case <-time.After(startWaitTimeout): + t.Fatalf("td %s failed to start in %s", sim.td.discoverers, startWaitTimeout) + } + + synced := cache.WaitForCacheSync(ctx.Done(), sim.td.hasSynced) + require.Truef(t, synced, "td %s failed to sync", sim.td.discoverers) + + if sim.runAfterSync != nil { + sim.runAfterSync(ctx) + } + + groups := <-out + + if sim.sortBeforeVerify { + sortTargetGroups(groups) + } + + sim.verifyResult(t, groups) + return groups +} + +func (sim discoverySim) collectTargetGroups(t *testing.T, in, out chan []model.TargetGroup) { + var tggs []model.TargetGroup +loop: + for { + select { + case inGroups := <-in: + if tggs = append(tggs, inGroups...); len(tggs) >= len(sim.wantTargetGroups) { + break loop + } + case <-time.After(finishWaitTimeout): + t.Logf("td %s timed out after %s, got %d groups, expected %d, some events are skipped", + sim.td.discoverers, finishWaitTimeout, len(tggs), len(sim.wantTargetGroups)) + break loop + } + } + out <- tggs +} + +func (sim discoverySim) verifyResult(t *testing.T, result []model.TargetGroup) { + var expected, actual any + + if len(sim.wantTargetGroups) == len(result) { + expected = sim.wantTargetGroups + actual = result + } else { + want := make(map[string]model.TargetGroup) + for _, group := range sim.wantTargetGroups { + want[group.Source()] = group + } + got := make(map[string]model.TargetGroup) + for _, group := range result { + got[group.Source()] = group + } + expected, actual = want, got + } + + assert.Equal(t, expected, actual) +} + +type hasSynced interface { + hasSynced() bool +} + +var ( + _ hasSynced = &KubeDiscoverer{} + _ hasSynced = &podDiscoverer{} + _ hasSynced = &serviceDiscoverer{} +) + +func (d *KubeDiscoverer) hasSynced() bool { + for _, disc := range d.discoverers { + v, ok := disc.(hasSynced) + if !ok || !v.hasSynced() { + return false + } + } + return true +} + +func (p *podDiscoverer) hasSynced() bool { + return p.podInformer.HasSynced() && p.cmapInformer.HasSynced() && p.secretInformer.HasSynced() +} + +func (s *serviceDiscoverer) hasSynced() bool { + return s.informer.HasSynced() +} + +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) +} |