diff options
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go new file mode 100644 index 000000000..4cfdd62f1 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go @@ -0,0 +1,209 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type serviceTargetGroup struct { + targets []model.Target + source string +} + +func (s serviceTargetGroup) Provider() string { return "sd:k8s:service" } +func (s serviceTargetGroup) Source() string { return s.source } +func (s serviceTargetGroup) Targets() []model.Target { return s.targets } + +type ServiceTarget struct { + model.Base `hash:"ignore"` + + hash uint64 + tuid string + + Address string + Namespace string + Name string + Annotations map[string]any + Labels map[string]any + Port string + PortName string + PortProtocol string + ClusterIP string + ExternalName string + Type string +} + +func (s ServiceTarget) Hash() uint64 { return s.hash } +func (s ServiceTarget) TUID() string { return s.tuid } + +type serviceDiscoverer struct { + *logger.Logger + model.Base + + informer cache.SharedInformer + queue *workqueue.Type +} + +func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer { + if inf == nil { + panic("nil service informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "service"}) + _, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj any) { enqueue(queue, obj) }, + DeleteFunc: func(obj any) { enqueue(queue, obj) }, + }) + + return &serviceDiscoverer{ + Logger: log, + informer: inf, + queue: queue, + } +} + +func (s *serviceDiscoverer) String() string { + return "k8s service" +} + +func (s *serviceDiscoverer) Discover(ctx context.Context, ch chan<- []model.TargetGroup) { + s.Info("instance is started") + defer s.Info("instance is stopped") + defer s.queue.ShutDown() + + go s.informer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) { + s.Error("failed to sync caches") + return + } + + go s.run(ctx, ch) + + <-ctx.Done() +} + +func (s *serviceDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) { + for { + item, shutdown := s.queue.Get() + if shutdown { + return + } + + s.handleQueueItem(ctx, in, item) + } +} + +func (s *serviceDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) { + defer s.queue.Done(item) + + key := item.(string) + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + obj, exists, err := s.informer.GetStore().GetByKey(key) + if err != nil { + return + } + + if !exists { + tgg := &serviceTargetGroup{source: serviceSourceFromNsName(namespace, name)} + send(ctx, in, tgg) + return + } + + svc, err := toService(obj) + if err != nil { + return + } + + tgg := s.buildTargetGroup(svc) + + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(s.Tags()) + } + + send(ctx, in, tgg) +} + +func (s *serviceDiscoverer) buildTargetGroup(svc *corev1.Service) model.TargetGroup { + // TODO: headless service? + if svc.Spec.ClusterIP == "" || len(svc.Spec.Ports) == 0 { + return &serviceTargetGroup{ + source: serviceSource(svc), + } + } + return &serviceTargetGroup{ + source: serviceSource(svc), + targets: s.buildTargets(svc), + } +} + +func (s *serviceDiscoverer) buildTargets(svc *corev1.Service) (targets []model.Target) { + for _, port := range svc.Spec.Ports { + portNum := strconv.FormatInt(int64(port.Port), 10) + tgt := &ServiceTarget{ + tuid: serviceTUID(svc, port), + Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum), + Namespace: svc.Namespace, + Name: svc.Name, + Annotations: mapAny(svc.Annotations), + Labels: mapAny(svc.Labels), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + ClusterIP: svc.Spec.ClusterIP, + ExternalName: svc.Spec.ExternalName, + Type: string(svc.Spec.Type), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } + + return targets +} + +func serviceTUID(svc *corev1.Service, port corev1.ServicePort) string { + return fmt.Sprintf("%s_%s_%s_%s", + svc.Namespace, + svc.Name, + strings.ToLower(string(port.Protocol)), + strconv.FormatInt(int64(port.Port), 10), + ) +} + +func serviceSourceFromNsName(namespace, name string) string { + return fmt.Sprintf("discoverer=k8s,kind=service,namespace=%s,service_name=%s", namespace, name) +} + +func serviceSource(svc *corev1.Service) string { + return serviceSourceFromNsName(svc.Namespace, svc.Name) +} + +func toService(obj any) (*corev1.Service, error) { + svc, ok := obj.(*corev1.Service) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return svc, nil +} |