summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go434
1 files changed, 434 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go
new file mode 100644
index 000000000..a271e7285
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go
@@ -0,0 +1,434 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+)
+
+type podTargetGroup struct {
+ targets []model.Target
+ source string
+}
+
+func (p podTargetGroup) Provider() string { return "sd:k8s:pod" }
+func (p podTargetGroup) Source() string { return p.source }
+func (p podTargetGroup) Targets() []model.Target { return p.targets }
+
+type PodTarget struct {
+ model.Base `hash:"ignore"`
+
+ hash uint64
+ tuid string
+
+ Address string
+ Namespace string
+ Name string
+ Annotations map[string]any
+ Labels map[string]any
+ NodeName string
+ PodIP string
+ ControllerName string
+ ControllerKind string
+ ContName string
+ Image string
+ Env map[string]any
+ Port string
+ PortName string
+ PortProtocol string
+}
+
+func (p PodTarget) Hash() uint64 { return p.hash }
+func (p PodTarget) TUID() string { return p.tuid }
+
+func newPodDiscoverer(pod, cmap, secret cache.SharedInformer) *podDiscoverer {
+
+ if pod == nil || cmap == nil || secret == nil {
+ panic("nil pod or cmap or secret informer")
+ }
+
+ queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"})
+
+ _, _ = pod.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj any) { enqueue(queue, obj) },
+ UpdateFunc: func(_, obj any) { enqueue(queue, obj) },
+ DeleteFunc: func(obj any) { enqueue(queue, obj) },
+ })
+
+ return &podDiscoverer{
+ Logger: log,
+ podInformer: pod,
+ cmapInformer: cmap,
+ secretInformer: secret,
+ queue: queue,
+ }
+}
+
+type podDiscoverer struct {
+ *logger.Logger
+ model.Base
+
+ podInformer cache.SharedInformer
+ cmapInformer cache.SharedInformer
+ secretInformer cache.SharedInformer
+ queue *workqueue.Type
+}
+
+func (p *podDiscoverer) String() string {
+ return "sd:k8s:pod"
+}
+
+func (p *podDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
+ p.Info("instance is started")
+ defer p.Info("instance is stopped")
+ defer p.queue.ShutDown()
+
+ go p.podInformer.Run(ctx.Done())
+ go p.cmapInformer.Run(ctx.Done())
+ go p.secretInformer.Run(ctx.Done())
+
+ if !cache.WaitForCacheSync(ctx.Done(),
+ p.podInformer.HasSynced, p.cmapInformer.HasSynced, p.secretInformer.HasSynced) {
+ p.Error("failed to sync caches")
+ return
+ }
+
+ go p.run(ctx, in)
+
+ <-ctx.Done()
+}
+
+func (p *podDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) {
+ for {
+ item, shutdown := p.queue.Get()
+ if shutdown {
+ return
+ }
+ p.handleQueueItem(ctx, in, item)
+ }
+}
+
+func (p *podDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) {
+ defer p.queue.Done(item)
+
+ key := item.(string)
+ namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+ return
+ }
+
+ obj, ok, err := p.podInformer.GetStore().GetByKey(key)
+ if err != nil {
+ return
+ }
+
+ if !ok {
+ tgg := &podTargetGroup{source: podSourceFromNsName(namespace, name)}
+ send(ctx, in, tgg)
+ return
+ }
+
+ pod, err := toPod(obj)
+ if err != nil {
+ return
+ }
+
+ tgg := p.buildTargetGroup(pod)
+
+ for _, tgt := range tgg.Targets() {
+ tgt.Tags().Merge(p.Tags())
+ }
+
+ send(ctx, in, tgg)
+
+}
+
+func (p *podDiscoverer) buildTargetGroup(pod *corev1.Pod) model.TargetGroup {
+ if pod.Status.PodIP == "" || len(pod.Spec.Containers) == 0 {
+ return &podTargetGroup{
+ source: podSource(pod),
+ }
+ }
+ return &podTargetGroup{
+ source: podSource(pod),
+ targets: p.buildTargets(pod),
+ }
+}
+
+func (p *podDiscoverer) buildTargets(pod *corev1.Pod) (targets []model.Target) {
+ var name, kind string
+ for _, ref := range pod.OwnerReferences {
+ if ref.Controller != nil && *ref.Controller {
+ name = ref.Name
+ kind = ref.Kind
+ break
+ }
+ }
+
+ for _, container := range pod.Spec.Containers {
+ env := p.collectEnv(pod.Namespace, container)
+
+ if len(container.Ports) == 0 {
+ tgt := &PodTarget{
+ tuid: podTUID(pod, container),
+ Address: pod.Status.PodIP,
+ Namespace: pod.Namespace,
+ Name: pod.Name,
+ Annotations: mapAny(pod.Annotations),
+ Labels: mapAny(pod.Labels),
+ NodeName: pod.Spec.NodeName,
+ PodIP: pod.Status.PodIP,
+ ControllerName: name,
+ ControllerKind: kind,
+ ContName: container.Name,
+ Image: container.Image,
+ Env: mapAny(env),
+ }
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+ tgt.hash = hash
+
+ targets = append(targets, tgt)
+ } else {
+ for _, port := range container.Ports {
+ portNum := strconv.FormatUint(uint64(port.ContainerPort), 10)
+ tgt := &PodTarget{
+ tuid: podTUIDWithPort(pod, container, port),
+ Address: net.JoinHostPort(pod.Status.PodIP, portNum),
+ Namespace: pod.Namespace,
+ Name: pod.Name,
+ Annotations: mapAny(pod.Annotations),
+ Labels: mapAny(pod.Labels),
+ NodeName: pod.Spec.NodeName,
+ PodIP: pod.Status.PodIP,
+ ControllerName: name,
+ ControllerKind: kind,
+ ContName: container.Name,
+ Image: container.Image,
+ Env: mapAny(env),
+ Port: portNum,
+ PortName: port.Name,
+ PortProtocol: string(port.Protocol),
+ }
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+ tgt.hash = hash
+
+ targets = append(targets, tgt)
+ }
+ }
+ }
+
+ return targets
+}
+
+func (p *podDiscoverer) collectEnv(ns string, container corev1.Container) map[string]string {
+ vars := make(map[string]string)
+
+ // When a key exists in multiple sources,
+ // the value associated with the last source will take precedence.
+ // Values defined by an Env with a duplicate key will take precedence.
+ //
+ // Order (https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go)
+ // - envFrom: configMapRef, secretRef
+ // - env: value || valueFrom: fieldRef, resourceFieldRef, secretRef, configMap
+
+ for _, src := range container.EnvFrom {
+ switch {
+ case src.ConfigMapRef != nil:
+ p.envFromConfigMap(vars, ns, src)
+ case src.SecretRef != nil:
+ p.envFromSecret(vars, ns, src)
+ }
+ }
+
+ for _, env := range container.Env {
+ if env.Name == "" || isVar(env.Name) {
+ continue
+ }
+ switch {
+ case env.Value != "":
+ vars[env.Name] = env.Value
+ case env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil:
+ p.valueFromSecret(vars, ns, env)
+ case env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil:
+ p.valueFromConfigMap(vars, ns, env)
+ }
+ }
+
+ if len(vars) == 0 {
+ return nil
+ }
+ return vars
+}
+
+func (p *podDiscoverer) valueFromConfigMap(vars map[string]string, ns string, env corev1.EnvVar) {
+ if env.ValueFrom.ConfigMapKeyRef.Name == "" || env.ValueFrom.ConfigMapKeyRef.Key == "" {
+ return
+ }
+
+ sr := env.ValueFrom.ConfigMapKeyRef
+ key := ns + "/" + sr.Name
+
+ item, exist, err := p.cmapInformer.GetStore().GetByKey(key)
+ if err != nil || !exist {
+ return
+ }
+
+ cmap, err := toConfigMap(item)
+ if err != nil {
+ return
+ }
+
+ if v, ok := cmap.Data[sr.Key]; ok {
+ vars[env.Name] = v
+ }
+}
+
+func (p *podDiscoverer) valueFromSecret(vars map[string]string, ns string, env corev1.EnvVar) {
+ if env.ValueFrom.SecretKeyRef.Name == "" || env.ValueFrom.SecretKeyRef.Key == "" {
+ return
+ }
+
+ secretKey := env.ValueFrom.SecretKeyRef
+ key := ns + "/" + secretKey.Name
+
+ item, exist, err := p.secretInformer.GetStore().GetByKey(key)
+ if err != nil || !exist {
+ return
+ }
+
+ secret, err := toSecret(item)
+ if err != nil {
+ return
+ }
+
+ if v, ok := secret.Data[secretKey.Key]; ok {
+ vars[env.Name] = string(v)
+ }
+}
+
+func (p *podDiscoverer) envFromConfigMap(vars map[string]string, ns string, src corev1.EnvFromSource) {
+ if src.ConfigMapRef.Name == "" {
+ return
+ }
+
+ key := ns + "/" + src.ConfigMapRef.Name
+ item, exist, err := p.cmapInformer.GetStore().GetByKey(key)
+ if err != nil || !exist {
+ return
+ }
+
+ cmap, err := toConfigMap(item)
+ if err != nil {
+ return
+ }
+
+ for k, v := range cmap.Data {
+ vars[src.Prefix+k] = v
+ }
+}
+
+func (p *podDiscoverer) envFromSecret(vars map[string]string, ns string, src corev1.EnvFromSource) {
+ if src.SecretRef.Name == "" {
+ return
+ }
+
+ key := ns + "/" + src.SecretRef.Name
+ item, exist, err := p.secretInformer.GetStore().GetByKey(key)
+ if err != nil || !exist {
+ return
+ }
+
+ secret, err := toSecret(item)
+ if err != nil {
+ return
+ }
+
+ for k, v := range secret.Data {
+ vars[src.Prefix+k] = string(v)
+ }
+}
+
+func podTUID(pod *corev1.Pod, container corev1.Container) string {
+ return fmt.Sprintf("%s_%s_%s",
+ pod.Namespace,
+ pod.Name,
+ container.Name,
+ )
+}
+
+func podTUIDWithPort(pod *corev1.Pod, container corev1.Container, port corev1.ContainerPort) string {
+ return fmt.Sprintf("%s_%s_%s_%s_%s",
+ pod.Namespace,
+ pod.Name,
+ container.Name,
+ strings.ToLower(string(port.Protocol)),
+ strconv.FormatUint(uint64(port.ContainerPort), 10),
+ )
+}
+
+func podSourceFromNsName(namespace, name string) string {
+ return fmt.Sprintf("discoverer=k8s,kind=pod,namespace=%s,pod_name=%s", namespace, name)
+}
+
+func podSource(pod *corev1.Pod) string {
+ return podSourceFromNsName(pod.Namespace, pod.Name)
+}
+
+func toPod(obj any) (*corev1.Pod, error) {
+ pod, ok := obj.(*corev1.Pod)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return pod, nil
+}
+
+func toConfigMap(obj any) (*corev1.ConfigMap, error) {
+ cmap, ok := obj.(*corev1.ConfigMap)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return cmap, nil
+}
+
+func toSecret(obj any) (*corev1.Secret, error) {
+ secret, ok := obj.(*corev1.Secret)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return secret, nil
+}
+
+func isVar(name string) bool {
+ // Variable references $(VAR_NAME) are expanded using the previous defined
+ // environment variables in the container and any service environment
+ // variables.
+ return strings.IndexByte(name, '$') != -1
+}
+
+func mapAny(src map[string]string) map[string]any {
+ if src == nil {
+ return nil
+ }
+ m := make(map[string]any, len(src))
+ for k, v := range src {
+ m[k] = v
+ }
+ return m
+}