diff options
Diffstat (limited to 'src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go')
-rw-r--r-- | src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go | 434 |
1 files changed, 434 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go new file mode 100644 index 000000000..617081742 --- /dev/null +++ b/src/go/plugin/go.d/agent/discovery/sd/discoverer/kubernetes/pod.go @@ -0,0 +1,434 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type podTargetGroup struct { + targets []model.Target + source string +} + +func (p podTargetGroup) Provider() string { return "sd:k8s:pod" } +func (p podTargetGroup) Source() string { return p.source } +func (p podTargetGroup) Targets() []model.Target { return p.targets } + +type PodTarget struct { + model.Base `hash:"ignore"` + + hash uint64 + tuid string + + Address string + Namespace string + Name string + Annotations map[string]any + Labels map[string]any + NodeName string + PodIP string + ControllerName string + ControllerKind string + ContName string + Image string + Env map[string]any + Port string + PortName string + PortProtocol string +} + +func (p PodTarget) Hash() uint64 { return p.hash } +func (p PodTarget) TUID() string { return p.tuid } + +func newPodDiscoverer(pod, cmap, secret cache.SharedInformer) *podDiscoverer { + + if pod == nil || cmap == nil || secret == nil { + panic("nil pod or cmap or secret informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"}) + + _, _ = pod.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj any) { enqueue(queue, obj) }, + DeleteFunc: func(obj any) { enqueue(queue, obj) }, + }) + + return &podDiscoverer{ + Logger: log, + podInformer: pod, + cmapInformer: cmap, + secretInformer: secret, + queue: queue, + } +} + +type podDiscoverer struct { + *logger.Logger + model.Base + + podInformer cache.SharedInformer + cmapInformer cache.SharedInformer + secretInformer cache.SharedInformer + queue *workqueue.Type +} + +func (p *podDiscoverer) String() string { + return "sd:k8s:pod" +} + +func (p *podDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + p.Info("instance is started") + defer p.Info("instance is stopped") + defer p.queue.ShutDown() + + go p.podInformer.Run(ctx.Done()) + go p.cmapInformer.Run(ctx.Done()) + go p.secretInformer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), + p.podInformer.HasSynced, p.cmapInformer.HasSynced, p.secretInformer.HasSynced) { + p.Error("failed to sync caches") + return + } + + go p.run(ctx, in) + + <-ctx.Done() +} + +func (p *podDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) { + for { + item, shutdown := p.queue.Get() + if shutdown { + return + } + p.handleQueueItem(ctx, in, item) + } +} + +func (p *podDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) { + defer p.queue.Done(item) + + key := item.(string) + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + obj, ok, err := p.podInformer.GetStore().GetByKey(key) + if err != nil { + return + } + + if !ok { + tgg := &podTargetGroup{source: podSourceFromNsName(namespace, name)} + send(ctx, in, tgg) + return + } + + pod, err := toPod(obj) + if err != nil { + return + } + + tgg := p.buildTargetGroup(pod) + + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(p.Tags()) + } + + send(ctx, in, tgg) + +} + +func (p *podDiscoverer) buildTargetGroup(pod *corev1.Pod) model.TargetGroup { + if pod.Status.PodIP == "" || len(pod.Spec.Containers) == 0 { + return &podTargetGroup{ + source: podSource(pod), + } + } + return &podTargetGroup{ + source: podSource(pod), + targets: p.buildTargets(pod), + } +} + +func (p *podDiscoverer) buildTargets(pod *corev1.Pod) (targets []model.Target) { + var name, kind string + for _, ref := range pod.OwnerReferences { + if ref.Controller != nil && *ref.Controller { + name = ref.Name + kind = ref.Kind + break + } + } + + for _, container := range pod.Spec.Containers { + env := p.collectEnv(pod.Namespace, container) + + if len(container.Ports) == 0 { + tgt := &PodTarget{ + tuid: podTUID(pod, container), + Address: pod.Status.PodIP, + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: name, + ControllerKind: kind, + ContName: container.Name, + Image: container.Image, + Env: mapAny(env), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } else { + for _, port := range container.Ports { + portNum := strconv.FormatUint(uint64(port.ContainerPort), 10) + tgt := &PodTarget{ + tuid: podTUIDWithPort(pod, container, port), + Address: net.JoinHostPort(pod.Status.PodIP, portNum), + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: name, + ControllerKind: kind, + ContName: container.Name, + Image: container.Image, + Env: mapAny(env), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } + } + } + + return targets +} + +func (p *podDiscoverer) collectEnv(ns string, container corev1.Container) map[string]string { + vars := make(map[string]string) + + // When a key exists in multiple sources, + // the value associated with the last source will take precedence. + // Values defined by an Env with a duplicate key will take precedence. + // + // Order (https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go) + // - envFrom: configMapRef, secretRef + // - env: value || valueFrom: fieldRef, resourceFieldRef, secretRef, configMap + + for _, src := range container.EnvFrom { + switch { + case src.ConfigMapRef != nil: + p.envFromConfigMap(vars, ns, src) + case src.SecretRef != nil: + p.envFromSecret(vars, ns, src) + } + } + + for _, env := range container.Env { + if env.Name == "" || isVar(env.Name) { + continue + } + switch { + case env.Value != "": + vars[env.Name] = env.Value + case env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil: + p.valueFromSecret(vars, ns, env) + case env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil: + p.valueFromConfigMap(vars, ns, env) + } + } + + if len(vars) == 0 { + return nil + } + return vars +} + +func (p *podDiscoverer) valueFromConfigMap(vars map[string]string, ns string, env corev1.EnvVar) { + if env.ValueFrom.ConfigMapKeyRef.Name == "" || env.ValueFrom.ConfigMapKeyRef.Key == "" { + return + } + + sr := env.ValueFrom.ConfigMapKeyRef + key := ns + "/" + sr.Name + + item, exist, err := p.cmapInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + cmap, err := toConfigMap(item) + if err != nil { + return + } + + if v, ok := cmap.Data[sr.Key]; ok { + vars[env.Name] = v + } +} + +func (p *podDiscoverer) valueFromSecret(vars map[string]string, ns string, env corev1.EnvVar) { + if env.ValueFrom.SecretKeyRef.Name == "" || env.ValueFrom.SecretKeyRef.Key == "" { + return + } + + secretKey := env.ValueFrom.SecretKeyRef + key := ns + "/" + secretKey.Name + + item, exist, err := p.secretInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + secret, err := toSecret(item) + if err != nil { + return + } + + if v, ok := secret.Data[secretKey.Key]; ok { + vars[env.Name] = string(v) + } +} + +func (p *podDiscoverer) envFromConfigMap(vars map[string]string, ns string, src corev1.EnvFromSource) { + if src.ConfigMapRef.Name == "" { + return + } + + key := ns + "/" + src.ConfigMapRef.Name + item, exist, err := p.cmapInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + cmap, err := toConfigMap(item) + if err != nil { + return + } + + for k, v := range cmap.Data { + vars[src.Prefix+k] = v + } +} + +func (p *podDiscoverer) envFromSecret(vars map[string]string, ns string, src corev1.EnvFromSource) { + if src.SecretRef.Name == "" { + return + } + + key := ns + "/" + src.SecretRef.Name + item, exist, err := p.secretInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + secret, err := toSecret(item) + if err != nil { + return + } + + for k, v := range secret.Data { + vars[src.Prefix+k] = string(v) + } +} + +func podTUID(pod *corev1.Pod, container corev1.Container) string { + return fmt.Sprintf("%s_%s_%s", + pod.Namespace, + pod.Name, + container.Name, + ) +} + +func podTUIDWithPort(pod *corev1.Pod, container corev1.Container, port corev1.ContainerPort) string { + return fmt.Sprintf("%s_%s_%s_%s_%s", + pod.Namespace, + pod.Name, + container.Name, + strings.ToLower(string(port.Protocol)), + strconv.FormatUint(uint64(port.ContainerPort), 10), + ) +} + +func podSourceFromNsName(namespace, name string) string { + return fmt.Sprintf("discoverer=k8s,kind=pod,namespace=%s,pod_name=%s", namespace, name) +} + +func podSource(pod *corev1.Pod) string { + return podSourceFromNsName(pod.Namespace, pod.Name) +} + +func toPod(obj any) (*corev1.Pod, error) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return pod, nil +} + +func toConfigMap(obj any) (*corev1.ConfigMap, error) { + cmap, ok := obj.(*corev1.ConfigMap) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return cmap, nil +} + +func toSecret(obj any) (*corev1.Secret, error) { + secret, ok := obj.(*corev1.Secret) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return secret, nil +} + +func isVar(name string) bool { + // Variable references $(VAR_NAME) are expanded using the previous defined + // environment variables in the container and any service environment + // variables. + return strings.IndexByte(name, '$') != -1 +} + +func mapAny(src map[string]string) map[string]any { + if src == nil { + return nil + } + m := make(map[string]any, len(src)) + for k, v := range src { + m[k] = v + } + return m +} |