summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go209
1 files changed, 209 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go
new file mode 100644
index 000000000..4cfdd62f1
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go
@@ -0,0 +1,209 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+)
+
+type serviceTargetGroup struct {
+ targets []model.Target
+ source string
+}
+
+func (s serviceTargetGroup) Provider() string { return "sd:k8s:service" }
+func (s serviceTargetGroup) Source() string { return s.source }
+func (s serviceTargetGroup) Targets() []model.Target { return s.targets }
+
+type ServiceTarget struct {
+ model.Base `hash:"ignore"`
+
+ hash uint64
+ tuid string
+
+ Address string
+ Namespace string
+ Name string
+ Annotations map[string]any
+ Labels map[string]any
+ Port string
+ PortName string
+ PortProtocol string
+ ClusterIP string
+ ExternalName string
+ Type string
+}
+
+func (s ServiceTarget) Hash() uint64 { return s.hash }
+func (s ServiceTarget) TUID() string { return s.tuid }
+
+type serviceDiscoverer struct {
+ *logger.Logger
+ model.Base
+
+ informer cache.SharedInformer
+ queue *workqueue.Type
+}
+
+func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer {
+ if inf == nil {
+ panic("nil service informer")
+ }
+
+ queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "service"})
+ _, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj any) { enqueue(queue, obj) },
+ UpdateFunc: func(_, obj any) { enqueue(queue, obj) },
+ DeleteFunc: func(obj any) { enqueue(queue, obj) },
+ })
+
+ return &serviceDiscoverer{
+ Logger: log,
+ informer: inf,
+ queue: queue,
+ }
+}
+
+func (s *serviceDiscoverer) String() string {
+ return "k8s service"
+}
+
+func (s *serviceDiscoverer) Discover(ctx context.Context, ch chan<- []model.TargetGroup) {
+ s.Info("instance is started")
+ defer s.Info("instance is stopped")
+ defer s.queue.ShutDown()
+
+ go s.informer.Run(ctx.Done())
+
+ if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) {
+ s.Error("failed to sync caches")
+ return
+ }
+
+ go s.run(ctx, ch)
+
+ <-ctx.Done()
+}
+
+func (s *serviceDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) {
+ for {
+ item, shutdown := s.queue.Get()
+ if shutdown {
+ return
+ }
+
+ s.handleQueueItem(ctx, in, item)
+ }
+}
+
+func (s *serviceDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) {
+ defer s.queue.Done(item)
+
+ key := item.(string)
+ namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+ return
+ }
+
+ obj, exists, err := s.informer.GetStore().GetByKey(key)
+ if err != nil {
+ return
+ }
+
+ if !exists {
+ tgg := &serviceTargetGroup{source: serviceSourceFromNsName(namespace, name)}
+ send(ctx, in, tgg)
+ return
+ }
+
+ svc, err := toService(obj)
+ if err != nil {
+ return
+ }
+
+ tgg := s.buildTargetGroup(svc)
+
+ for _, tgt := range tgg.Targets() {
+ tgt.Tags().Merge(s.Tags())
+ }
+
+ send(ctx, in, tgg)
+}
+
+func (s *serviceDiscoverer) buildTargetGroup(svc *corev1.Service) model.TargetGroup {
+ // TODO: headless service?
+ if svc.Spec.ClusterIP == "" || len(svc.Spec.Ports) == 0 {
+ return &serviceTargetGroup{
+ source: serviceSource(svc),
+ }
+ }
+ return &serviceTargetGroup{
+ source: serviceSource(svc),
+ targets: s.buildTargets(svc),
+ }
+}
+
+func (s *serviceDiscoverer) buildTargets(svc *corev1.Service) (targets []model.Target) {
+ for _, port := range svc.Spec.Ports {
+ portNum := strconv.FormatInt(int64(port.Port), 10)
+ tgt := &ServiceTarget{
+ tuid: serviceTUID(svc, port),
+ Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum),
+ Namespace: svc.Namespace,
+ Name: svc.Name,
+ Annotations: mapAny(svc.Annotations),
+ Labels: mapAny(svc.Labels),
+ Port: portNum,
+ PortName: port.Name,
+ PortProtocol: string(port.Protocol),
+ ClusterIP: svc.Spec.ClusterIP,
+ ExternalName: svc.Spec.ExternalName,
+ Type: string(svc.Spec.Type),
+ }
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+ tgt.hash = hash
+
+ targets = append(targets, tgt)
+ }
+
+ return targets
+}
+
+func serviceTUID(svc *corev1.Service, port corev1.ServicePort) string {
+ return fmt.Sprintf("%s_%s_%s_%s",
+ svc.Namespace,
+ svc.Name,
+ strings.ToLower(string(port.Protocol)),
+ strconv.FormatInt(int64(port.Port), 10),
+ )
+}
+
+func serviceSourceFromNsName(namespace, name string) string {
+ return fmt.Sprintf("discoverer=k8s,kind=service,namespace=%s,service_name=%s", namespace, name)
+}
+
+func serviceSource(svc *corev1.Service) string {
+ return serviceSourceFromNsName(svc.Namespace, svc.Name)
+}
+
+func toService(obj any) (*corev1.Service, error) {
+ svc, ok := obj.(*corev1.Service)
+ if !ok {
+ return nil, fmt.Errorf("received unexpected object type: %T", obj)
+ }
+ return svc, nil
+}