diff options
Diffstat (limited to '')
16 files changed, 3639 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go new file mode 100644 index 000000000..d3ff9f333 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go @@ -0,0 +1,235 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "context" + "fmt" + "log/slog" + "net" + "strconv" + "strings" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + "github.com/netdata/netdata/go/go.d.plugin/pkg/web" + + "github.com/docker/docker/api/types" + typesContainer "github.com/docker/docker/api/types/container" + docker "github.com/docker/docker/client" + "github.com/ilyam8/hashstructure" +) + +func NewDiscoverer(cfg Config) (*Discoverer, error) { + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + d := &Discoverer{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", "docker"), + ), + cfgSource: cfg.Source, + newDockerClient: func(addr string) (dockerClient, error) { + return docker.NewClientWithOpts(docker.WithHost(addr)) + }, + addr: docker.DefaultDockerHost, + listInterval: time.Second * 60, + timeout: time.Second * 2, + seenTggSources: make(map[string]bool), + started: make(chan struct{}), + } + + d.Tags().Merge(tags) + + if cfg.Timeout.Duration().Seconds() != 0 { + d.timeout = cfg.Timeout.Duration() + } + if cfg.Address != "" { + d.addr = cfg.Address + } + + return d, nil +} + +type Config struct { + Source string + + Tags string `yaml:"tags"` + Address string `yaml:"address"` + Timeout web.Duration `yaml:"timeout"` +} + +type ( + Discoverer struct { + *logger.Logger + model.Base + + dockerClient dockerClient + newDockerClient func(addr string) (dockerClient, error) + addr string + + cfgSource string + + listInterval time.Duration + timeout time.Duration + seenTggSources map[string]bool // [targetGroup.Source] + + started chan struct{} + } + dockerClient interface { + NegotiateAPIVersion(context.Context) + ContainerList(context.Context, typesContainer.ListOptions) ([]types.Container, error) + Close() error + } +) + +func (d *Discoverer) String() string { + return "sd:docker" +} + +func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer func() { d.cleanup(); d.Info("instance is stopped") }() + + close(d.started) + + if d.dockerClient == nil { + client, err := d.newDockerClient(d.addr) + if err != nil { + d.Errorf("error on creating docker client: %v", err) + return + } + d.dockerClient = client + } + + d.dockerClient.NegotiateAPIVersion(ctx) + + if err := d.listContainers(ctx, in); err != nil { + d.Error(err) + return + } + + tk := time.NewTicker(d.listInterval) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + if err := d.listContainers(ctx, in); err != nil { + d.Warning(err) + } + } + } +} + +func (d *Discoverer) listContainers(ctx context.Context, in chan<- []model.TargetGroup) error { + listCtx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + + containers, err := d.dockerClient.ContainerList(listCtx, typesContainer.ListOptions{}) + if err != nil { + return err + } + + var tggs []model.TargetGroup + seen := make(map[string]bool) + + for _, cntr := range containers { + if tgg := d.buildTargetGroup(cntr); tgg != nil { + tggs = append(tggs, tgg) + seen[tgg.Source()] = true + } + } + + for src := range d.seenTggSources { + if !seen[src] { + tggs = append(tggs, &targetGroup{source: src}) + } + } + d.seenTggSources = seen + + select { + case <-ctx.Done(): + case in <- tggs: + } + + return nil +} + +func (d *Discoverer) buildTargetGroup(cntr types.Container) model.TargetGroup { + if len(cntr.Names) == 0 || cntr.NetworkSettings == nil || len(cntr.NetworkSettings.Networks) == 0 { + return nil + } + + tgg := &targetGroup{ + source: cntrSource(cntr), + } + if d.cfgSource != "" { + tgg.source += fmt.Sprintf(",%s", d.cfgSource) + } + + for netDriver, network := range cntr.NetworkSettings.Networks { + // container with network mode host will be discovered by local-listeners + for _, port := range cntr.Ports { + tgt := &target{ + ID: cntr.ID, + Name: strings.TrimPrefix(cntr.Names[0], "/"), + Image: cntr.Image, + Command: cntr.Command, + Labels: mapAny(cntr.Labels), + PrivatePort: strconv.Itoa(int(port.PrivatePort)), + PublicPort: strconv.Itoa(int(port.PublicPort)), + PublicPortIP: port.IP, + PortProtocol: port.Type, + NetworkMode: cntr.HostConfig.NetworkMode, + NetworkDriver: netDriver, + IPAddress: network.IPAddress, + } + tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.PrivatePort) + + hash, err := calcHash(tgt) + if err != nil { + continue + } + + tgt.hash = hash + tgt.Tags().Merge(d.Tags()) + + tgg.targets = append(tgg.targets, tgt) + } + } + + return tgg +} + +func (d *Discoverer) cleanup() { + if d.dockerClient != nil { + _ = d.dockerClient.Close() + } +} + +func cntrSource(cntr types.Container) string { + name := strings.TrimPrefix(cntr.Names[0], "/") + return fmt.Sprintf("discoverer=docker,container=%s,image=%s", name, cntr.Image) +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} + +func mapAny(src map[string]string) map[string]any { + if src == nil { + return nil + } + m := make(map[string]any, len(src)) + for k, v := range src { + m[k] = v + } + return m +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go new file mode 100644 index 000000000..14ad5f920 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go @@ -0,0 +1,161 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/docker/docker/api/types" + typesNetwork "github.com/docker/docker/api/types/network" +) + +func TestDiscoverer_Discover(t *testing.T) { + tests := map[string]struct { + createSim func() *discoverySim + }{ + "add containers": { + createSim: func() *discoverySim { + nginx1 := prepareNginxContainer("nginx1") + nginx2 := prepareNginxContainer("nginx2") + + sim := &discoverySim{ + dockerCli: func(cli dockerCli, _ time.Duration) { + cli.addContainer(nginx1) + cli.addContainer(nginx2) + }, + wantGroups: []model.TargetGroup{ + &targetGroup{ + source: cntrSource(nginx1), + targets: []model.Target{ + withHash(&target{ + ID: nginx1.ID, + Name: nginx1.Names[0][1:], + Image: nginx1.Image, + Command: nginx1.Command, + Labels: mapAny(nginx1.Labels), + PrivatePort: "80", + PublicPort: "8080", + PublicPortIP: "0.0.0.0", + PortProtocol: "tcp", + NetworkMode: "default", + NetworkDriver: "bridge", + IPAddress: "192.0.2.0", + Address: "192.0.2.0:80", + }), + }, + }, + &targetGroup{ + source: cntrSource(nginx2), + targets: []model.Target{ + withHash(&target{ + ID: nginx2.ID, + Name: nginx2.Names[0][1:], + Image: nginx2.Image, + Command: nginx2.Command, + Labels: mapAny(nginx2.Labels), + PrivatePort: "80", + PublicPort: "8080", + PublicPortIP: "0.0.0.0", + PortProtocol: "tcp", + NetworkMode: "default", + NetworkDriver: "bridge", + IPAddress: "192.0.2.0", + Address: "192.0.2.0:80", + }), + }, + }, + }, + } + return sim + }, + }, + "remove containers": { + createSim: func() *discoverySim { + nginx1 := prepareNginxContainer("nginx1") + nginx2 := prepareNginxContainer("nginx2") + + sim := &discoverySim{ + dockerCli: func(cli dockerCli, interval time.Duration) { + cli.addContainer(nginx1) + cli.addContainer(nginx2) + time.Sleep(interval * 2) + cli.removeContainer(nginx1.ID) + }, + wantGroups: []model.TargetGroup{ + &targetGroup{ + source: cntrSource(nginx1), + targets: nil, + }, + &targetGroup{ + source: cntrSource(nginx2), + targets: []model.Target{ + withHash(&target{ + ID: nginx2.ID, + Name: nginx2.Names[0][1:], + Image: nginx2.Image, + Command: nginx2.Command, + Labels: mapAny(nginx2.Labels), + PrivatePort: "80", + PublicPort: "8080", + PublicPortIP: "0.0.0.0", + PortProtocol: "tcp", + NetworkMode: "default", + NetworkDriver: "bridge", + IPAddress: "192.0.2.0", + Address: "192.0.2.0:80", + }), + }, + }, + }, + } + return sim + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareNginxContainer(name string) types.Container { + return types.Container{ + ID: "id-" + name, + Names: []string{"/" + name}, + Image: "nginx-image", + ImageID: "nginx-image-id", + Command: "nginx-command", + Ports: []types.Port{ + { + IP: "0.0.0.0", + PrivatePort: 80, + PublicPort: 8080, + Type: "tcp", + }, + }, + Labels: map[string]string{"key1": "value1"}, + HostConfig: struct { + NetworkMode string `json:",omitempty"` + }{ + NetworkMode: "default", + }, + NetworkSettings: &types.SummaryNetworkSettings{ + Networks: map[string]*typesNetwork.EndpointSettings{ + "bridge": {IPAddress: "192.0.2.0"}, + }, + }, + } +} + +func withHash(tgt *target) *target { + tgt.hash, _ = calcHash(tgt) + tags, _ := model.ParseTags("docker") + tgt.Tags().Merge(tags) + return tgt +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go new file mode 100644 index 000000000..7b0b76aba --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go @@ -0,0 +1,162 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "context" + "sort" + "sync" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/docker/docker/api/types" + typesContainer "github.com/docker/docker/api/types/container" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type dockerCli interface { + addContainer(cntr types.Container) + removeContainer(id string) +} + +type discoverySim struct { + dockerCli func(cli dockerCli, interval time.Duration) + wantGroups []model.TargetGroup +} + +func (sim *discoverySim) run(t *testing.T) { + d, err := NewDiscoverer(Config{ + Source: "", + Tags: "docker", + }) + require.NoError(t, err) + + mock := newMockDockerd() + + d.newDockerClient = func(addr string) (dockerClient, error) { + return mock, nil + } + d.listInterval = time.Millisecond * 100 + + seen := make(map[string]model.TargetGroup) + ctx, cancel := context.WithCancel(context.Background()) + in := make(chan []model.TargetGroup) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + d.Discover(ctx, in) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case tggs := <-in: + for _, tgg := range tggs { + seen[tgg.Source()] = tgg + } + } + } + }() + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-d.started: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery failed to start") + } + + sim.dockerCli(mock, d.listInterval) + time.Sleep(time.Second) + + cancel() + + select { + case <-done: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery hasn't finished after cancel") + } + + var tggs []model.TargetGroup + for _, tgg := range seen { + tggs = append(tggs, tgg) + } + + sortTargetGroups(tggs) + sortTargetGroups(sim.wantGroups) + + wantLen, gotLen := len(sim.wantGroups), len(tggs) + assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen) + assert.Equal(t, sim.wantGroups, tggs) + + assert.True(t, mock.negApiVerCalled, "NegotiateAPIVersion called") + assert.True(t, mock.closeCalled, "Close called") +} + +func newMockDockerd() *mockDockerd { + return &mockDockerd{ + containers: make(map[string]types.Container), + } +} + +type mockDockerd struct { + negApiVerCalled bool + closeCalled bool + mux sync.Mutex + containers map[string]types.Container +} + +func (m *mockDockerd) addContainer(cntr types.Container) { + m.mux.Lock() + defer m.mux.Unlock() + + m.containers[cntr.ID] = cntr +} + +func (m *mockDockerd) removeContainer(id string) { + m.mux.Lock() + defer m.mux.Unlock() + + delete(m.containers, id) +} + +func (m *mockDockerd) ContainerList(_ context.Context, _ typesContainer.ListOptions) ([]types.Container, error) { + m.mux.Lock() + defer m.mux.Unlock() + + var cntrs []types.Container + for _, cntr := range m.containers { + cntrs = append(cntrs, cntr) + } + + return cntrs, nil +} + +func (m *mockDockerd) NegotiateAPIVersion(_ context.Context) { + m.negApiVerCalled = true +} + +func (m *mockDockerd) Close() error { + m.closeCalled = true + return nil +} + +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go new file mode 100644 index 000000000..2422bc98e --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "fmt" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" +) + +type targetGroup struct { + source string + targets []model.Target +} + +func (g *targetGroup) Provider() string { return "sd:docker" } +func (g *targetGroup) Source() string { return g.source } +func (g *targetGroup) Targets() []model.Target { return g.targets } + +type target struct { + model.Base + + hash uint64 + + ID string + Name string + Image string + Command string + Labels map[string]any + PrivatePort string // Port on the container + PublicPort string // Port exposed on the host + PublicPortIP string // Host IP address that the container's port is mapped to + PortProtocol string + NetworkMode string + NetworkDriver string + IPAddress string + + Address string // "IPAddress:PrivatePort" +} + +func (t *target) TUID() string { + if t.PublicPort != "" { + return fmt.Sprintf("%s_%s_%s_%s_%s_%s", + t.Name, t.IPAddress, t.PublicPortIP, t.PortProtocol, t.PublicPort, t.PrivatePort) + } + if t.PrivatePort != "" { + return fmt.Sprintf("%s_%s_%s_%s", + t.Name, t.IPAddress, t.PortProtocol, t.PrivatePort) + } + return fmt.Sprintf("%s_%s", t.Name, t.IPAddress) +} + +func (t *target) Hash() uint64 { + return t.hash +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go new file mode 100644 index 000000000..15a1e4745 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/config.go @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "errors" + "fmt" +) + +type Config struct { + APIServer string `yaml:"api_server"` // TODO: not used + Role string `yaml:"role"` + Tags string `yaml:"tags"` + Namespaces []string `yaml:"namespaces"` + Selector struct { + Label string `yaml:"label"` + Field string `yaml:"field"` + } `yaml:"selector"` + Pod struct { + LocalMode bool `yaml:"local_mode"` + } `yaml:"pod"` +} + +func validateConfig(cfg Config) error { + switch role(cfg.Role) { + case rolePod, roleService: + default: + return fmt.Errorf("unknown role: '%s'", cfg.Role) + } + if cfg.Tags == "" { + return errors.New("'tags' not set") + } + return nil +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go new file mode 100644 index 000000000..aa153a34a --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes.go @@ -0,0 +1,268 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "log/slog" + "os" + "strings" + "sync" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + "github.com/netdata/netdata/go/go.d.plugin/pkg/k8sclient" + + "github.com/ilyam8/hashstructure" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type role string + +const ( + rolePod role = "pod" + roleService role = "service" +) + +const ( + envNodeName = "MY_NODE_NAME" +) + +var log = logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", "kubernetes"), +) + +func NewKubeDiscoverer(cfg Config) (*KubeDiscoverer, error) { + if err := validateConfig(cfg); err != nil { + return nil, fmt.Errorf("config validation: %v", err) + } + + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + client, err := k8sclient.New("Netdata/service-td") + if err != nil { + return nil, fmt.Errorf("create clientset: %v", err) + } + + ns := cfg.Namespaces + if len(ns) == 0 { + ns = []string{corev1.NamespaceAll} + } + + selectorField := cfg.Selector.Field + if role(cfg.Role) == rolePod && cfg.Pod.LocalMode { + name := os.Getenv(envNodeName) + if name == "" { + return nil, fmt.Errorf("local_mode is enabled, but env '%s' not set", envNodeName) + } + selectorField = joinSelectors(selectorField, "spec.nodeName="+name) + } + + d := &KubeDiscoverer{ + Logger: log, + client: client, + tags: tags, + role: role(cfg.Role), + namespaces: ns, + selectorLabel: cfg.Selector.Label, + selectorField: selectorField, + discoverers: make([]model.Discoverer, 0, len(ns)), + started: make(chan struct{}), + } + + return d, nil +} + +type KubeDiscoverer struct { + *logger.Logger + + client kubernetes.Interface + + tags model.Tags + role role + namespaces []string + selectorLabel string + selectorField string + discoverers []model.Discoverer + started chan struct{} +} + +func (d *KubeDiscoverer) String() string { + return "sd:k8s" +} + +const resyncPeriod = 10 * time.Minute + +func (d *KubeDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer d.Info("instance is stopped") + + for _, namespace := range d.namespaces { + var dd model.Discoverer + switch d.role { + case rolePod: + dd = d.setupPodDiscoverer(ctx, namespace) + case roleService: + dd = d.setupServiceDiscoverer(ctx, namespace) + default: + d.Errorf("unknown role: '%s'", d.role) + continue + } + d.discoverers = append(d.discoverers, dd) + } + + if len(d.discoverers) == 0 { + d.Error("no discoverers registered") + return + } + + d.Infof("registered: %v", d.discoverers) + + var wg sync.WaitGroup + updates := make(chan []model.TargetGroup) + + for _, disc := range d.discoverers { + wg.Add(1) + go func(disc model.Discoverer) { defer wg.Done(); disc.Discover(ctx, updates) }(disc) + } + + done := make(chan struct{}) + go func() { defer close(done); wg.Wait() }() + + close(d.started) + + for { + select { + case <-ctx.Done(): + select { + case <-done: + d.Info("all discoverers exited") + case <-time.After(time.Second * 5): + d.Warning("not all discoverers exited") + } + return + case <-done: + d.Info("all discoverers exited") + return + case tggs := <-updates: + select { + case <-ctx.Done(): + case in <- tggs: + } + } + } +} + +func (d *KubeDiscoverer) setupPodDiscoverer(ctx context.Context, ns string) *podDiscoverer { + pod := d.client.CoreV1().Pods(ns) + podLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return pod.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return pod.Watch(ctx, opts) + }, + } + + cmap := d.client.CoreV1().ConfigMaps(ns) + cmapLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + return cmap.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + return cmap.Watch(ctx, opts) + }, + } + + secret := d.client.CoreV1().Secrets(ns) + secretLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + return secret.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + return secret.Watch(ctx, opts) + }, + } + + td := newPodDiscoverer( + cache.NewSharedInformer(podLW, &corev1.Pod{}, resyncPeriod), + cache.NewSharedInformer(cmapLW, &corev1.ConfigMap{}, resyncPeriod), + cache.NewSharedInformer(secretLW, &corev1.Secret{}, resyncPeriod), + ) + td.Tags().Merge(d.tags) + + return td +} + +func (d *KubeDiscoverer) setupServiceDiscoverer(ctx context.Context, namespace string) *serviceDiscoverer { + svc := d.client.CoreV1().Services(namespace) + + svcLW := &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return svc.List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.FieldSelector = d.selectorField + opts.LabelSelector = d.selectorLabel + return svc.Watch(ctx, opts) + }, + } + + inf := cache.NewSharedInformer(svcLW, &corev1.Service{}, resyncPeriod) + + td := newServiceDiscoverer(inf) + td.Tags().Merge(d.tags) + + return td +} + +func enqueue(queue *workqueue.Type, obj any) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + queue.Add(key) +} + +func send(ctx context.Context, in chan<- []model.TargetGroup, tgg model.TargetGroup) { + if tgg == nil { + return + } + select { + case <-ctx.Done(): + case in <- []model.TargetGroup{tgg}: + } +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} + +func joinSelectors(srs ...string) string { + var i int + for _, v := range srs { + if v != "" { + srs[i] = v + i++ + } + } + return strings.Join(srs[:i], ",") +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go new file mode 100644 index 000000000..9743a0af5 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/kubernetes_test.go @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "fmt" + "os" + "testing" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/pkg/k8sclient" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +var discoveryTags, _ = model.ParseTags("k8s") + +func TestMain(m *testing.M) { + _ = os.Setenv(envNodeName, "m01") + _ = os.Setenv(k8sclient.EnvFakeClient, "true") + code := m.Run() + _ = os.Unsetenv(envNodeName) + _ = os.Unsetenv(k8sclient.EnvFakeClient) + os.Exit(code) +} + +func TestNewKubeDiscoverer(t *testing.T) { + tests := map[string]struct { + cfg Config + wantErr bool + }{ + "pod role config": { + wantErr: false, + cfg: Config{Role: string(rolePod), Tags: "k8s"}, + }, + "service role config": { + wantErr: false, + cfg: Config{Role: string(roleService), Tags: "k8s"}, + }, + "empty config": { + wantErr: true, + cfg: Config{}, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + disc, err := NewKubeDiscoverer(test.cfg) + + if test.wantErr { + assert.Error(t, err) + assert.Nil(t, disc) + } else { + assert.NoError(t, err) + assert.NotNil(t, disc) + } + }) + } +} + +func TestKubeDiscoverer_Discover(t *testing.T) { + const prod = "prod" + const dev = "dev" + prodNamespace := newNamespace(prod) + devNamespace := newNamespace(dev) + + tests := map[string]struct { + createSim func() discoverySim + }{ + "multiple namespaces pod td": { + createSim: func() discoverySim { + httpdProd, nginxProd := newHTTPDPod(), newNGINXPod() + httpdProd.Namespace = prod + nginxProd.Namespace = prod + + httpdDev, nginxDev := newHTTPDPod(), newNGINXPod() + httpdDev.Namespace = dev + nginxDev.Namespace = dev + + disc, _ := preparePodDiscoverer( + []string{prod, dev}, + prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev) + + return discoverySim{ + td: disc, + sortBeforeVerify: true, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpdDev), + preparePodTargetGroup(nginxDev), + preparePodTargetGroup(httpdProd), + preparePodTargetGroup(nginxProd), + }, + } + }, + }, + "multiple namespaces ClusterIP service td": { + createSim: func() discoverySim { + httpdProd, nginxProd := newHTTPDClusterIPService(), newNGINXClusterIPService() + httpdProd.Namespace = prod + nginxProd.Namespace = prod + + httpdDev, nginxDev := newHTTPDClusterIPService(), newNGINXClusterIPService() + httpdDev.Namespace = dev + nginxDev.Namespace = dev + + disc, _ := prepareSvcDiscoverer( + []string{prod, dev}, + prodNamespace, devNamespace, httpdProd, nginxProd, httpdDev, nginxDev) + + return discoverySim{ + td: disc, + sortBeforeVerify: true, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpdDev), + prepareSvcTargetGroup(nginxDev), + prepareSvcTargetGroup(httpdProd), + prepareSvcTargetGroup(nginxProd), + }, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareDiscoverer(role role, namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + client := fake.NewSimpleClientset(objects...) + tags, _ := model.ParseTags("k8s") + disc := &KubeDiscoverer{ + tags: tags, + role: role, + namespaces: namespaces, + client: client, + discoverers: nil, + started: make(chan struct{}), + } + return disc, client +} + +func newNamespace(name string) *corev1.Namespace { + return &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}} +} + +func mustCalcHash(obj any) uint64 { + hash, err := calcHash(obj) + if err != nil { + panic(fmt.Sprintf("hash calculation: %v", err)) + } + return hash +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go new file mode 100644 index 000000000..a271e7285 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod.go @@ -0,0 +1,434 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type podTargetGroup struct { + targets []model.Target + source string +} + +func (p podTargetGroup) Provider() string { return "sd:k8s:pod" } +func (p podTargetGroup) Source() string { return p.source } +func (p podTargetGroup) Targets() []model.Target { return p.targets } + +type PodTarget struct { + model.Base `hash:"ignore"` + + hash uint64 + tuid string + + Address string + Namespace string + Name string + Annotations map[string]any + Labels map[string]any + NodeName string + PodIP string + ControllerName string + ControllerKind string + ContName string + Image string + Env map[string]any + Port string + PortName string + PortProtocol string +} + +func (p PodTarget) Hash() uint64 { return p.hash } +func (p PodTarget) TUID() string { return p.tuid } + +func newPodDiscoverer(pod, cmap, secret cache.SharedInformer) *podDiscoverer { + + if pod == nil || cmap == nil || secret == nil { + panic("nil pod or cmap or secret informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"}) + + _, _ = pod.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj any) { enqueue(queue, obj) }, + DeleteFunc: func(obj any) { enqueue(queue, obj) }, + }) + + return &podDiscoverer{ + Logger: log, + podInformer: pod, + cmapInformer: cmap, + secretInformer: secret, + queue: queue, + } +} + +type podDiscoverer struct { + *logger.Logger + model.Base + + podInformer cache.SharedInformer + cmapInformer cache.SharedInformer + secretInformer cache.SharedInformer + queue *workqueue.Type +} + +func (p *podDiscoverer) String() string { + return "sd:k8s:pod" +} + +func (p *podDiscoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + p.Info("instance is started") + defer p.Info("instance is stopped") + defer p.queue.ShutDown() + + go p.podInformer.Run(ctx.Done()) + go p.cmapInformer.Run(ctx.Done()) + go p.secretInformer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), + p.podInformer.HasSynced, p.cmapInformer.HasSynced, p.secretInformer.HasSynced) { + p.Error("failed to sync caches") + return + } + + go p.run(ctx, in) + + <-ctx.Done() +} + +func (p *podDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) { + for { + item, shutdown := p.queue.Get() + if shutdown { + return + } + p.handleQueueItem(ctx, in, item) + } +} + +func (p *podDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) { + defer p.queue.Done(item) + + key := item.(string) + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + obj, ok, err := p.podInformer.GetStore().GetByKey(key) + if err != nil { + return + } + + if !ok { + tgg := &podTargetGroup{source: podSourceFromNsName(namespace, name)} + send(ctx, in, tgg) + return + } + + pod, err := toPod(obj) + if err != nil { + return + } + + tgg := p.buildTargetGroup(pod) + + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(p.Tags()) + } + + send(ctx, in, tgg) + +} + +func (p *podDiscoverer) buildTargetGroup(pod *corev1.Pod) model.TargetGroup { + if pod.Status.PodIP == "" || len(pod.Spec.Containers) == 0 { + return &podTargetGroup{ + source: podSource(pod), + } + } + return &podTargetGroup{ + source: podSource(pod), + targets: p.buildTargets(pod), + } +} + +func (p *podDiscoverer) buildTargets(pod *corev1.Pod) (targets []model.Target) { + var name, kind string + for _, ref := range pod.OwnerReferences { + if ref.Controller != nil && *ref.Controller { + name = ref.Name + kind = ref.Kind + break + } + } + + for _, container := range pod.Spec.Containers { + env := p.collectEnv(pod.Namespace, container) + + if len(container.Ports) == 0 { + tgt := &PodTarget{ + tuid: podTUID(pod, container), + Address: pod.Status.PodIP, + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: name, + ControllerKind: kind, + ContName: container.Name, + Image: container.Image, + Env: mapAny(env), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } else { + for _, port := range container.Ports { + portNum := strconv.FormatUint(uint64(port.ContainerPort), 10) + tgt := &PodTarget{ + tuid: podTUIDWithPort(pod, container, port), + Address: net.JoinHostPort(pod.Status.PodIP, portNum), + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: name, + ControllerKind: kind, + ContName: container.Name, + Image: container.Image, + Env: mapAny(env), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } + } + } + + return targets +} + +func (p *podDiscoverer) collectEnv(ns string, container corev1.Container) map[string]string { + vars := make(map[string]string) + + // When a key exists in multiple sources, + // the value associated with the last source will take precedence. + // Values defined by an Env with a duplicate key will take precedence. + // + // Order (https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go) + // - envFrom: configMapRef, secretRef + // - env: value || valueFrom: fieldRef, resourceFieldRef, secretRef, configMap + + for _, src := range container.EnvFrom { + switch { + case src.ConfigMapRef != nil: + p.envFromConfigMap(vars, ns, src) + case src.SecretRef != nil: + p.envFromSecret(vars, ns, src) + } + } + + for _, env := range container.Env { + if env.Name == "" || isVar(env.Name) { + continue + } + switch { + case env.Value != "": + vars[env.Name] = env.Value + case env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil: + p.valueFromSecret(vars, ns, env) + case env.ValueFrom != nil && env.ValueFrom.ConfigMapKeyRef != nil: + p.valueFromConfigMap(vars, ns, env) + } + } + + if len(vars) == 0 { + return nil + } + return vars +} + +func (p *podDiscoverer) valueFromConfigMap(vars map[string]string, ns string, env corev1.EnvVar) { + if env.ValueFrom.ConfigMapKeyRef.Name == "" || env.ValueFrom.ConfigMapKeyRef.Key == "" { + return + } + + sr := env.ValueFrom.ConfigMapKeyRef + key := ns + "/" + sr.Name + + item, exist, err := p.cmapInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + cmap, err := toConfigMap(item) + if err != nil { + return + } + + if v, ok := cmap.Data[sr.Key]; ok { + vars[env.Name] = v + } +} + +func (p *podDiscoverer) valueFromSecret(vars map[string]string, ns string, env corev1.EnvVar) { + if env.ValueFrom.SecretKeyRef.Name == "" || env.ValueFrom.SecretKeyRef.Key == "" { + return + } + + secretKey := env.ValueFrom.SecretKeyRef + key := ns + "/" + secretKey.Name + + item, exist, err := p.secretInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + secret, err := toSecret(item) + if err != nil { + return + } + + if v, ok := secret.Data[secretKey.Key]; ok { + vars[env.Name] = string(v) + } +} + +func (p *podDiscoverer) envFromConfigMap(vars map[string]string, ns string, src corev1.EnvFromSource) { + if src.ConfigMapRef.Name == "" { + return + } + + key := ns + "/" + src.ConfigMapRef.Name + item, exist, err := p.cmapInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + cmap, err := toConfigMap(item) + if err != nil { + return + } + + for k, v := range cmap.Data { + vars[src.Prefix+k] = v + } +} + +func (p *podDiscoverer) envFromSecret(vars map[string]string, ns string, src corev1.EnvFromSource) { + if src.SecretRef.Name == "" { + return + } + + key := ns + "/" + src.SecretRef.Name + item, exist, err := p.secretInformer.GetStore().GetByKey(key) + if err != nil || !exist { + return + } + + secret, err := toSecret(item) + if err != nil { + return + } + + for k, v := range secret.Data { + vars[src.Prefix+k] = string(v) + } +} + +func podTUID(pod *corev1.Pod, container corev1.Container) string { + return fmt.Sprintf("%s_%s_%s", + pod.Namespace, + pod.Name, + container.Name, + ) +} + +func podTUIDWithPort(pod *corev1.Pod, container corev1.Container, port corev1.ContainerPort) string { + return fmt.Sprintf("%s_%s_%s_%s_%s", + pod.Namespace, + pod.Name, + container.Name, + strings.ToLower(string(port.Protocol)), + strconv.FormatUint(uint64(port.ContainerPort), 10), + ) +} + +func podSourceFromNsName(namespace, name string) string { + return fmt.Sprintf("discoverer=k8s,kind=pod,namespace=%s,pod_name=%s", namespace, name) +} + +func podSource(pod *corev1.Pod) string { + return podSourceFromNsName(pod.Namespace, pod.Name) +} + +func toPod(obj any) (*corev1.Pod, error) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return pod, nil +} + +func toConfigMap(obj any) (*corev1.ConfigMap, error) { + cmap, ok := obj.(*corev1.ConfigMap) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return cmap, nil +} + +func toSecret(obj any) (*corev1.Secret, error) { + secret, ok := obj.(*corev1.Secret) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return secret, nil +} + +func isVar(name string) bool { + // Variable references $(VAR_NAME) are expanded using the previous defined + // environment variables in the container and any service environment + // variables. + return strings.IndexByte(name, '$') != -1 +} + +func mapAny(src map[string]string) map[string]any { + if src == nil { + return nil + } + m := make(map[string]any, len(src)) + for k, v := range src { + m[k] = v + } + return m +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go new file mode 100644 index 000000000..ebe92d2f6 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/pod_test.go @@ -0,0 +1,648 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "net" + "strconv" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +func TestPodTargetGroup_Provider(t *testing.T) { + var p podTargetGroup + assert.NotEmpty(t, p.Provider()) +} + +func TestPodTargetGroup_Source(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantSources []string + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantSources: []string{ + "discoverer=k8s,kind=pod,namespace=default,pod_name=httpd-dd95c4d68-5bkwl", + "discoverer=k8s,kind=pod,namespace=default,pod_name=nginx-7cfd77469b-q6kxj", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var sources []string + for _, tgg := range sim.run(t) { + sources = append(sources, tgg.Source()) + } + + assert.Equal(t, test.wantSources, sources) + }) + } +} + +func TestPodTargetGroup_Targets(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTargets int + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: discovery, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantTargets: 4, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var targets int + for _, tgg := range sim.run(t) { + targets += len(tgg.Targets()) + } + + assert.Equal(t, test.wantTargets, targets) + }) + } +} + +func TestPodTarget_Hash(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantHashes []uint64 + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: discovery, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantHashes: []uint64{ + 12703169414253998055, + 13351713096133918928, + 8241692333761256175, + 11562466355572729519, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var hashes []uint64 + for _, tgg := range sim.run(t) { + for _, tg := range tgg.Targets() { + hashes = append(hashes, tg.Hash()) + } + } + + assert.Equal(t, test.wantHashes, hashes) + }) + } +} + +func TestPodTarget_TUID(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTUID []string + }{ + "pods with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + discovery, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: discovery, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + wantTUID: []string{ + "default_httpd-dd95c4d68-5bkwl_httpd_tcp_80", + "default_httpd-dd95c4d68-5bkwl_httpd_tcp_443", + "default_nginx-7cfd77469b-q6kxj_nginx_tcp_80", + "default_nginx-7cfd77469b-q6kxj_nginx_tcp_443", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var tuid []string + for _, tgg := range sim.run(t) { + for _, tg := range tgg.Targets() { + tuid = append(tuid, tg.TUID()) + } + } + + assert.Equal(t, test.wantTUID, tuid) + }) + } +} + +func TestNewPodDiscoverer(t *testing.T) { + tests := map[string]struct { + podInf cache.SharedInformer + cmapInf cache.SharedInformer + secretInf cache.SharedInformer + wantPanic bool + }{ + "valid informers": { + wantPanic: false, + podInf: cache.NewSharedInformer(nil, &corev1.Pod{}, resyncPeriod), + cmapInf: cache.NewSharedInformer(nil, &corev1.ConfigMap{}, resyncPeriod), + secretInf: cache.NewSharedInformer(nil, &corev1.Secret{}, resyncPeriod), + }, + "nil informers": { + wantPanic: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + f := func() { newPodDiscoverer(test.podInf, test.cmapInf, test.secretInf) } + + if test.wantPanic { + assert.Panics(t, f) + } else { + assert.NotPanics(t, f) + } + }) + } +} + +func TestPodDiscoverer_String(t *testing.T) { + var p podDiscoverer + assert.NotEmpty(t, p.String()) +} + +func TestPodDiscoverer_Discover(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + }{ + "ADD: pods exist before run": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + td, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: td, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + }, + "ADD: pods exist before run and add after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, client := prepareAllNsPodDiscoverer(httpd) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + }, + "DELETE: remove pods after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, client := prepareAllNsPodDiscoverer(httpd, nginx) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _ = podClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + preparePodTargetGroup(nginx), + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + }, + } + }, + }, + "DELETE,ADD: remove and add pods after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + disc, client := prepareAllNsPodDiscoverer(httpd) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = podClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _, _ = podClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroup(httpd), + prepareEmptyPodTargetGroup(httpd), + preparePodTargetGroup(nginx), + }, + } + }, + }, + "ADD: pods with empty PodIP": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + httpd.Status.PodIP = "" + nginx.Status.PodIP = "" + disc, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + }, + } + }, + }, + "UPDATE: set pods PodIP after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + httpd.Status.PodIP = "" + nginx.Status.PodIP = "" + disc, client := prepareAllNsPodDiscoverer(httpd, nginx) + podClient := client.CoreV1().Pods("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _, _ = podClient.Update(ctx, newHTTPDPod(), metav1.UpdateOptions{}) + _, _ = podClient.Update(ctx, newNGINXPod(), metav1.UpdateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + preparePodTargetGroup(newHTTPDPod()), + preparePodTargetGroup(newNGINXPod()), + }, + } + }, + }, + "ADD: pods without containers": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDPod(), newNGINXPod() + httpd.Spec.Containers = httpd.Spec.Containers[:0] + nginx.Spec.Containers = httpd.Spec.Containers[:0] + disc, _ := prepareAllNsPodDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptyPodTargetGroup(httpd), + prepareEmptyPodTargetGroup(nginx), + }, + } + }, + }, + "Env: from value": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.Env = []corev1.EnvVar{ + {Name: "key1", Value: "value1"}, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1"} + + disc, _ := prepareAllNsPodDiscoverer(httpd) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "Env: from Secret": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.Env = []corev1.EnvVar{ + { + Name: "key1", + ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"}, + Key: "key1", + }}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1"} + secret := prepareSecret("my-secret", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, secret) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "Env: from ConfigMap": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.Env = []corev1.EnvVar{ + { + Name: "key1", + ValueFrom: &corev1.EnvVarSource{ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"}, + Key: "key1", + }}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1"} + cmap := prepareConfigMap("my-cmap", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, cmap) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "EnvFrom: from ConfigMap": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.EnvFrom = []corev1.EnvFromSource{ + { + ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-cmap"}}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1", "key2": "value2"} + cmap := prepareConfigMap("my-cmap", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, cmap) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + "EnvFrom: from Secret": { + createSim: func() discoverySim { + httpd := newHTTPDPod() + mangle := func(c *corev1.Container) { + c.EnvFrom = []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: "my-secret"}}, + }, + } + } + mangleContainers(httpd.Spec.Containers, mangle) + data := map[string]string{"key1": "value1", "key2": "value2"} + secret := prepareSecret("my-secret", data) + + disc, _ := prepareAllNsPodDiscoverer(httpd, secret) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + preparePodTargetGroupWithEnv(httpd, data), + }, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareAllNsPodDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(rolePod, []string{corev1.NamespaceAll}, objects...) +} + +func preparePodDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(rolePod, namespaces, objects...) +} + +func mangleContainers(containers []corev1.Container, mange func(container *corev1.Container)) { + for i := range containers { + mange(&containers[i]) + } +} + +var controllerTrue = true + +func newHTTPDPod() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "httpd-dd95c4d68-5bkwl", + Namespace: "default", + UID: "1cebb6eb-0c1e-495b-8131-8fa3e6668dc8", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "httpd", "tier": "frontend"}, + OwnerReferences: []metav1.OwnerReference{ + {Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue}, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "m01", + Containers: []corev1.Container{ + { + Name: "httpd", + Image: "httpd", + Ports: []corev1.ContainerPort{ + {Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443}, + }, + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "172.17.0.1", + }, + } +} + +func newNGINXPod() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-7cfd77469b-q6kxj", + Namespace: "default", + UID: "09e883f2-d740-4c5f-970d-02cf02876522", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "nginx", "tier": "frontend"}, + OwnerReferences: []metav1.OwnerReference{ + {Name: "netdata-test", Kind: "DaemonSet", Controller: &controllerTrue}, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "m01", + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + Ports: []corev1.ContainerPort{ + {Name: "http", Protocol: corev1.ProtocolTCP, ContainerPort: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, ContainerPort: 443}, + }, + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "172.17.0.2", + }, + } +} + +func prepareConfigMap(name string, data map[string]string) *corev1.ConfigMap { + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + UID: types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8167" + name), + }, + Data: data, + } +} + +func prepareSecret(name string, data map[string]string) *corev1.Secret { + secretData := make(map[string][]byte, len(data)) + for k, v := range data { + secretData[k] = []byte(v) + } + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + UID: types.UID("a03b8dc6-dc40-46dc-b571-5030e69d8161" + name), + }, + Data: secretData, + } +} + +func prepareEmptyPodTargetGroup(pod *corev1.Pod) *podTargetGroup { + return &podTargetGroup{source: podSource(pod)} +} + +func preparePodTargetGroup(pod *corev1.Pod) *podTargetGroup { + tgg := prepareEmptyPodTargetGroup(pod) + + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + portNum := strconv.FormatUint(uint64(port.ContainerPort), 10) + tgt := &PodTarget{ + tuid: podTUIDWithPort(pod, container, port), + Address: net.JoinHostPort(pod.Status.PodIP, portNum), + Namespace: pod.Namespace, + Name: pod.Name, + Annotations: mapAny(pod.Annotations), + Labels: mapAny(pod.Labels), + NodeName: pod.Spec.NodeName, + PodIP: pod.Status.PodIP, + ControllerName: "netdata-test", + ControllerKind: "DaemonSet", + ContName: container.Name, + Image: container.Image, + Env: nil, + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + } + tgt.hash = mustCalcHash(tgt) + tgt.Tags().Merge(discoveryTags) + + tgg.targets = append(tgg.targets, tgt) + } + } + + return tgg +} + +func preparePodTargetGroupWithEnv(pod *corev1.Pod, env map[string]string) *podTargetGroup { + tgg := preparePodTargetGroup(pod) + + for _, tgt := range tgg.Targets() { + tgt.(*PodTarget).Env = mapAny(env) + tgt.(*PodTarget).hash = mustCalcHash(tgt) + } + + return tgg +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go new file mode 100644 index 000000000..4cfdd62f1 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go @@ -0,0 +1,209 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type serviceTargetGroup struct { + targets []model.Target + source string +} + +func (s serviceTargetGroup) Provider() string { return "sd:k8s:service" } +func (s serviceTargetGroup) Source() string { return s.source } +func (s serviceTargetGroup) Targets() []model.Target { return s.targets } + +type ServiceTarget struct { + model.Base `hash:"ignore"` + + hash uint64 + tuid string + + Address string + Namespace string + Name string + Annotations map[string]any + Labels map[string]any + Port string + PortName string + PortProtocol string + ClusterIP string + ExternalName string + Type string +} + +func (s ServiceTarget) Hash() uint64 { return s.hash } +func (s ServiceTarget) TUID() string { return s.tuid } + +type serviceDiscoverer struct { + *logger.Logger + model.Base + + informer cache.SharedInformer + queue *workqueue.Type +} + +func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer { + if inf == nil { + panic("nil service informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "service"}) + _, _ = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj any) { enqueue(queue, obj) }, + DeleteFunc: func(obj any) { enqueue(queue, obj) }, + }) + + return &serviceDiscoverer{ + Logger: log, + informer: inf, + queue: queue, + } +} + +func (s *serviceDiscoverer) String() string { + return "k8s service" +} + +func (s *serviceDiscoverer) Discover(ctx context.Context, ch chan<- []model.TargetGroup) { + s.Info("instance is started") + defer s.Info("instance is stopped") + defer s.queue.ShutDown() + + go s.informer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), s.informer.HasSynced) { + s.Error("failed to sync caches") + return + } + + go s.run(ctx, ch) + + <-ctx.Done() +} + +func (s *serviceDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) { + for { + item, shutdown := s.queue.Get() + if shutdown { + return + } + + s.handleQueueItem(ctx, in, item) + } +} + +func (s *serviceDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) { + defer s.queue.Done(item) + + key := item.(string) + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + obj, exists, err := s.informer.GetStore().GetByKey(key) + if err != nil { + return + } + + if !exists { + tgg := &serviceTargetGroup{source: serviceSourceFromNsName(namespace, name)} + send(ctx, in, tgg) + return + } + + svc, err := toService(obj) + if err != nil { + return + } + + tgg := s.buildTargetGroup(svc) + + for _, tgt := range tgg.Targets() { + tgt.Tags().Merge(s.Tags()) + } + + send(ctx, in, tgg) +} + +func (s *serviceDiscoverer) buildTargetGroup(svc *corev1.Service) model.TargetGroup { + // TODO: headless service? + if svc.Spec.ClusterIP == "" || len(svc.Spec.Ports) == 0 { + return &serviceTargetGroup{ + source: serviceSource(svc), + } + } + return &serviceTargetGroup{ + source: serviceSource(svc), + targets: s.buildTargets(svc), + } +} + +func (s *serviceDiscoverer) buildTargets(svc *corev1.Service) (targets []model.Target) { + for _, port := range svc.Spec.Ports { + portNum := strconv.FormatInt(int64(port.Port), 10) + tgt := &ServiceTarget{ + tuid: serviceTUID(svc, port), + Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum), + Namespace: svc.Namespace, + Name: svc.Name, + Annotations: mapAny(svc.Annotations), + Labels: mapAny(svc.Labels), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + ClusterIP: svc.Spec.ClusterIP, + ExternalName: svc.Spec.ExternalName, + Type: string(svc.Spec.Type), + } + hash, err := calcHash(tgt) + if err != nil { + continue + } + tgt.hash = hash + + targets = append(targets, tgt) + } + + return targets +} + +func serviceTUID(svc *corev1.Service, port corev1.ServicePort) string { + return fmt.Sprintf("%s_%s_%s_%s", + svc.Namespace, + svc.Name, + strings.ToLower(string(port.Protocol)), + strconv.FormatInt(int64(port.Port), 10), + ) +} + +func serviceSourceFromNsName(namespace, name string) string { + return fmt.Sprintf("discoverer=k8s,kind=service,namespace=%s,service_name=%s", namespace, name) +} + +func serviceSource(svc *corev1.Service) string { + return serviceSourceFromNsName(svc.Namespace, svc.Name) +} + +func toService(obj any) (*corev1.Service, error) { + svc, ok := obj.(*corev1.Service) + if !ok { + return nil, fmt.Errorf("received unexpected object type: %T", obj) + } + return svc, nil +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go new file mode 100644 index 000000000..d2e496015 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service_test.go @@ -0,0 +1,456 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "net" + "strconv" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +func TestServiceTargetGroup_Provider(t *testing.T) { + var s serviceTargetGroup + assert.NotEmpty(t, s.Provider()) +} + +func TestServiceTargetGroup_Source(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantSources []string + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantSources: []string{ + "discoverer=k8s,kind=service,namespace=default,service_name=httpd-cluster-ip-service", + "discoverer=k8s,kind=service,namespace=default,service_name=nginx-cluster-ip-service", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var sources []string + for _, tgg := range sim.run(t) { + sources = append(sources, tgg.Source()) + } + + assert.Equal(t, test.wantSources, sources) + }) + } +} + +func TestServiceTargetGroup_Targets(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTargets int + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantTargets: 4, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var targets int + for _, tgg := range sim.run(t) { + targets += len(tgg.Targets()) + } + + assert.Equal(t, test.wantTargets, targets) + }) + } +} + +func TestServiceTarget_Hash(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantHashes []uint64 + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantHashes: []uint64{ + 17611803477081780974, + 6019985892433421258, + 4151907287549842238, + 5757608926096186119, + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var hashes []uint64 + for _, tgg := range sim.run(t) { + for _, tgt := range tgg.Targets() { + hashes = append(hashes, tgt.Hash()) + } + } + + assert.Equal(t, test.wantHashes, hashes) + }) + } +} + +func TestServiceTarget_TUID(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + wantTUID []string + }{ + "ClusterIP svc with multiple ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + wantTUID: []string{ + "default_httpd-cluster-ip-service_tcp_80", + "default_httpd-cluster-ip-service_tcp_443", + "default_nginx-cluster-ip-service_tcp_80", + "default_nginx-cluster-ip-service_tcp_443", + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + + var tuid []string + for _, tgg := range sim.run(t) { + for _, tgt := range tgg.Targets() { + tuid = append(tuid, tgt.TUID()) + } + } + + assert.Equal(t, test.wantTUID, tuid) + }) + } +} + +func TestNewServiceDiscoverer(t *testing.T) { + tests := map[string]struct { + informer cache.SharedInformer + wantPanic bool + }{ + "valid informer": { + wantPanic: false, + informer: cache.NewSharedInformer(nil, &corev1.Service{}, resyncPeriod), + }, + "nil informer": { + wantPanic: true, + informer: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + f := func() { newServiceDiscoverer(test.informer) } + + if test.wantPanic { + assert.Panics(t, f) + } else { + assert.NotPanics(t, f) + } + }) + } +} + +func TestServiceDiscoverer_String(t *testing.T) { + var s serviceDiscoverer + assert.NotEmpty(t, s.String()) +} + +func TestServiceDiscoverer_Discover(t *testing.T) { + tests := map[string]struct { + createSim func() discoverySim + }{ + "ADD: ClusterIP svc exist before run": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + }, + "ADD: ClusterIP svc exist before run and add after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, client := prepareAllNsSvcDiscoverer(httpd) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + }, + "DELETE: ClusterIP svc remove after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, client := prepareAllNsSvcDiscoverer(httpd, nginx) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _ = svcClient.Delete(ctx, nginx.Name, metav1.DeleteOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + }, + } + }, + }, + "ADD,DELETE: ClusterIP svc remove and add after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + disc, client := prepareAllNsSvcDiscoverer(httpd) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _ = svcClient.Delete(ctx, httpd.Name, metav1.DeleteOptions{}) + _, _ = svcClient.Create(ctx, nginx, metav1.CreateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareSvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(httpd), + prepareSvcTargetGroup(nginx), + }, + } + }, + }, + "ADD: Headless svc exist before run": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService() + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + }, + } + }, + }, + "UPDATE: Headless => ClusterIP svc after sync": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDHeadlessService(), newNGINXHeadlessService() + httpdUpd, nginxUpd := *httpd, *nginx + httpdUpd.Spec.ClusterIP = "10.100.0.1" + nginxUpd.Spec.ClusterIP = "10.100.0.2" + disc, client := prepareAllNsSvcDiscoverer(httpd, nginx) + svcClient := client.CoreV1().Services("default") + + return discoverySim{ + td: disc, + runAfterSync: func(ctx context.Context) { + time.Sleep(time.Millisecond * 50) + _, _ = svcClient.Update(ctx, &httpdUpd, metav1.UpdateOptions{}) + _, _ = svcClient.Update(ctx, &nginxUpd, metav1.UpdateOptions{}) + }, + wantTargetGroups: []model.TargetGroup{ + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + prepareSvcTargetGroup(&httpdUpd), + prepareSvcTargetGroup(&nginxUpd), + }, + } + }, + }, + "ADD: ClusterIP svc with zero exposed ports": { + createSim: func() discoverySim { + httpd, nginx := newHTTPDClusterIPService(), newNGINXClusterIPService() + httpd.Spec.Ports = httpd.Spec.Ports[:0] + nginx.Spec.Ports = httpd.Spec.Ports[:0] + disc, _ := prepareAllNsSvcDiscoverer(httpd, nginx) + + return discoverySim{ + td: disc, + wantTargetGroups: []model.TargetGroup{ + prepareEmptySvcTargetGroup(httpd), + prepareEmptySvcTargetGroup(nginx), + }, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareAllNsSvcDiscoverer(objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(roleService, []string{corev1.NamespaceAll}, objects...) +} + +func prepareSvcDiscoverer(namespaces []string, objects ...runtime.Object) (*KubeDiscoverer, kubernetes.Interface) { + return prepareDiscoverer(roleService, namespaces, objects...) +} + +func newHTTPDClusterIPService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "httpd-cluster-ip-service", + Namespace: "default", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "httpd", "tier": "frontend"}, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "http", Protocol: corev1.ProtocolTCP, Port: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, Port: 443}, + }, + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "10.100.0.1", + Selector: map[string]string{"app": "httpd", "tier": "frontend"}, + }, + } +} + +func newNGINXClusterIPService() *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-cluster-ip-service", + Namespace: "default", + Annotations: map[string]string{"phase": "prod"}, + Labels: map[string]string{"app": "nginx", "tier": "frontend"}, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "http", Protocol: corev1.ProtocolTCP, Port: 80}, + {Name: "https", Protocol: corev1.ProtocolTCP, Port: 443}, + }, + Type: corev1.ServiceTypeClusterIP, + ClusterIP: "10.100.0.2", + Selector: map[string]string{"app": "nginx", "tier": "frontend"}, + }, + } +} + +func newHTTPDHeadlessService() *corev1.Service { + svc := newHTTPDClusterIPService() + svc.Name = "httpd-headless-service" + svc.Spec.ClusterIP = "" + return svc +} + +func newNGINXHeadlessService() *corev1.Service { + svc := newNGINXClusterIPService() + svc.Name = "nginx-headless-service" + svc.Spec.ClusterIP = "" + return svc +} + +func prepareEmptySvcTargetGroup(svc *corev1.Service) *serviceTargetGroup { + return &serviceTargetGroup{source: serviceSource(svc)} +} + +func prepareSvcTargetGroup(svc *corev1.Service) *serviceTargetGroup { + tgg := prepareEmptySvcTargetGroup(svc) + + for _, port := range svc.Spec.Ports { + portNum := strconv.FormatInt(int64(port.Port), 10) + tgt := &ServiceTarget{ + tuid: serviceTUID(svc, port), + Address: net.JoinHostPort(svc.Name+"."+svc.Namespace+".svc", portNum), + Namespace: svc.Namespace, + Name: svc.Name, + Annotations: mapAny(svc.Annotations), + Labels: mapAny(svc.Labels), + Port: portNum, + PortName: port.Name, + PortProtocol: string(port.Protocol), + ClusterIP: svc.Spec.ClusterIP, + ExternalName: svc.Spec.ExternalName, + Type: string(svc.Spec.Type), + } + tgt.hash = mustCalcHash(tgt) + tgt.Tags().Merge(discoveryTags) + tgg.targets = append(tgg.targets, tgt) + } + + return tgg +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go new file mode 100644 index 000000000..db986b855 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/sim_test.go @@ -0,0 +1,137 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package kubernetes + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/tools/cache" +) + +const ( + startWaitTimeout = time.Second * 3 + finishWaitTimeout = time.Second * 5 +) + +type discoverySim struct { + td *KubeDiscoverer + runAfterSync func(ctx context.Context) + sortBeforeVerify bool + wantTargetGroups []model.TargetGroup +} + +func (sim discoverySim) run(t *testing.T) []model.TargetGroup { + t.Helper() + require.NotNil(t, sim.td) + require.NotEmpty(t, sim.wantTargetGroups) + + in, out := make(chan []model.TargetGroup), make(chan []model.TargetGroup) + go sim.collectTargetGroups(t, in, out) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + go sim.td.Discover(ctx, in) + + select { + case <-sim.td.started: + case <-time.After(startWaitTimeout): + t.Fatalf("td %s failed to start in %s", sim.td.discoverers, startWaitTimeout) + } + + synced := cache.WaitForCacheSync(ctx.Done(), sim.td.hasSynced) + require.Truef(t, synced, "td %s failed to sync", sim.td.discoverers) + + if sim.runAfterSync != nil { + sim.runAfterSync(ctx) + } + + groups := <-out + + if sim.sortBeforeVerify { + sortTargetGroups(groups) + } + + sim.verifyResult(t, groups) + return groups +} + +func (sim discoverySim) collectTargetGroups(t *testing.T, in, out chan []model.TargetGroup) { + var tggs []model.TargetGroup +loop: + for { + select { + case inGroups := <-in: + if tggs = append(tggs, inGroups...); len(tggs) >= len(sim.wantTargetGroups) { + break loop + } + case <-time.After(finishWaitTimeout): + t.Logf("td %s timed out after %s, got %d groups, expected %d, some events are skipped", + sim.td.discoverers, finishWaitTimeout, len(tggs), len(sim.wantTargetGroups)) + break loop + } + } + out <- tggs +} + +func (sim discoverySim) verifyResult(t *testing.T, result []model.TargetGroup) { + var expected, actual any + + if len(sim.wantTargetGroups) == len(result) { + expected = sim.wantTargetGroups + actual = result + } else { + want := make(map[string]model.TargetGroup) + for _, group := range sim.wantTargetGroups { + want[group.Source()] = group + } + got := make(map[string]model.TargetGroup) + for _, group := range result { + got[group.Source()] = group + } + expected, actual = want, got + } + + assert.Equal(t, expected, actual) +} + +type hasSynced interface { + hasSynced() bool +} + +var ( + _ hasSynced = &KubeDiscoverer{} + _ hasSynced = &podDiscoverer{} + _ hasSynced = &serviceDiscoverer{} +) + +func (d *KubeDiscoverer) hasSynced() bool { + for _, disc := range d.discoverers { + v, ok := disc.(hasSynced) + if !ok || !v.hasSynced() { + return false + } + } + return true +} + +func (p *podDiscoverer) hasSynced() bool { + return p.podInformer.HasSynced() && p.cmapInformer.HasSynced() && p.secretInformer.HasSynced() +} + +func (s *serviceDiscoverer) hasSynced() bool { + return s.informer.HasSynced() +} + +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go new file mode 100644 index 000000000..9897405cd --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go @@ -0,0 +1,321 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "log/slog" + "net" + "os" + "os/exec" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/agent/executable" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "github.com/ilyam8/hashstructure" +) + +var ( + shortName = "net_listeners" + fullName = fmt.Sprintf("sd:%s", shortName) +) + +func NewDiscoverer(cfg Config) (*Discoverer, error) { + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + dir := os.Getenv("NETDATA_PLUGINS_DIR") + if dir == "" { + dir = executable.Directory + } + if dir == "" { + dir, _ = os.Getwd() + } + + d := &Discoverer{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", shortName), + ), + cfgSource: cfg.Source, + ll: &localListenersExec{ + binPath: filepath.Join(dir, "local-listeners"), + timeout: time.Second * 5, + }, + interval: time.Minute * 2, + expiryTime: time.Minute * 10, + cache: make(map[uint64]*cacheItem), + started: make(chan struct{}), + } + + d.Tags().Merge(tags) + + return d, nil +} + +type Config struct { + Source string `yaml:"-"` + Tags string `yaml:"tags"` +} + +type ( + Discoverer struct { + *logger.Logger + model.Base + + cfgSource string + + interval time.Duration + ll localListeners + + expiryTime time.Duration + cache map[uint64]*cacheItem // [target.Hash] + + started chan struct{} + } + cacheItem struct { + lastSeenTime time.Time + tgt model.Target + } + localListeners interface { + discover(ctx context.Context) ([]byte, error) + } +) + +func (d *Discoverer) String() string { + return fullName +} + +func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer func() { d.Info("instance is stopped") }() + + close(d.started) + + if err := d.discoverLocalListeners(ctx, in); err != nil { + d.Error(err) + return + } + + tk := time.NewTicker(d.interval) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + if err := d.discoverLocalListeners(ctx, in); err != nil { + d.Warning(err) + return + } + } + } +} + +func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error { + bs, err := d.ll.discover(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return err + } + + tgts, err := d.parseLocalListeners(bs) + if err != nil { + return err + } + + tggs := d.processTargets(tgts) + + select { + case <-ctx.Done(): + case in <- tggs: + } + + return nil +} + +func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup { + tgg := &targetGroup{ + provider: fullName, + source: fmt.Sprintf("discoverer=%s,host=localhost", shortName), + } + if d.cfgSource != "" { + tgg.source += fmt.Sprintf(",%s", d.cfgSource) + } + + if d.expiryTime.Milliseconds() == 0 { + tgg.targets = tgts + return []model.TargetGroup{tgg} + } + + now := time.Now() + + for _, tgt := range tgts { + hash := tgt.Hash() + if _, ok := d.cache[hash]; !ok { + d.cache[hash] = &cacheItem{tgt: tgt} + } + d.cache[hash].lastSeenTime = now + } + + for k, v := range d.cache { + if now.Sub(v.lastSeenTime) > d.expiryTime { + delete(d.cache, k) + continue + } + tgg.targets = append(tgg.targets, v.tgt) + } + + return []model.TargetGroup{tgg} +} + +func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) { + const ( + local4 = "127.0.0.1" + local6 = "::1" + ) + + var targets []target + sc := bufio.NewScanner(bytes.NewReader(bs)) + + for sc.Scan() { + text := strings.TrimSpace(sc.Text()) + if text == "" { + continue + } + + // Protocol|IPAddress|Port|Cmdline + parts := strings.SplitN(text, "|", 4) + if len(parts) != 4 { + return nil, fmt.Errorf("unexpected data: '%s'", text) + } + + tgt := target{ + Protocol: parts[0], + IPAddress: parts[1], + Port: parts[2], + Comm: extractComm(parts[3]), + Cmdline: parts[3], + } + + if tgt.Comm == "docker-proxy" { + continue + } + + if tgt.IPAddress == "0.0.0.0" || strings.HasPrefix(tgt.IPAddress, "127") { + tgt.IPAddress = local4 + } else if tgt.IPAddress == "::" { + tgt.IPAddress = local6 + } + + tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.Port) + + hash, err := calcHash(tgt) + if err != nil { + continue + } + + tgt.hash = hash + tgt.Tags().Merge(d.Tags()) + + targets = append(targets, tgt) + } + + // order: TCP, TCP6, UDP, UDP6 + sort.Slice(targets, func(i, j int) bool { + tgt1, tgt2 := targets[i], targets[j] + if tgt1.Protocol != tgt2.Protocol { + return tgt1.Protocol < tgt2.Protocol + } + + p1, _ := strconv.Atoi(targets[i].Port) + p2, _ := strconv.Atoi(targets[j].Port) + if p1 != p2 { + return p1 < p2 + } + + return tgt1.IPAddress == local4 || tgt1.IPAddress == local6 + }) + + seen := make(map[string]bool) + tgts := make([]model.Target, len(targets)) + var n int + + for _, tgt := range targets { + tgt := tgt + + proto := strings.TrimSuffix(tgt.Protocol, "6") + key := tgt.Protocol + ":" + tgt.Address + keyLocal4 := proto + ":" + net.JoinHostPort(local4, tgt.Port) + keyLocal6 := proto + "6:" + net.JoinHostPort(local6, tgt.Port) + + // Filter targets that accept conns on any (0.0.0.0) and additionally on each individual network interface (a.b.c.d). + // Create a target only for localhost. Assumption: any address always goes first. + if seen[key] || seen[keyLocal4] || seen[keyLocal6] { + continue + } + seen[key] = true + + tgts[n] = &tgt + n++ + } + + return tgts[:n], nil +} + +type localListenersExec struct { + binPath string + timeout time.Duration +} + +func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) { + execCtx, cancel := context.WithTimeout(ctx, e.timeout) + defer cancel() + + // TCPv4/6 and UPDv4 sockets in LISTEN state + // https://github.com/netdata/netdata/blob/master/src/collectors/plugins.d/local_listeners.c + args := []string{ + "no-udp6", + "no-local", + "no-inbound", + "no-outbound", + "no-namespaces", + } + + cmd := exec.CommandContext(execCtx, e.binPath, args...) + + bs, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("error on executing '%s': %v", cmd, err) + } + + return bs, nil +} + +func extractComm(cmdLine string) string { + i := strings.IndexByte(cmdLine, ' ') + if i <= 0 { + return cmdLine + } + _, comm := filepath.Split(cmdLine[:i]) + return comm +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go new file mode 100644 index 000000000..c3e3dcc69 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners_test.go @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" +) + +func TestDiscoverer_Discover(t *testing.T) { + tests := map[string]discoverySim{ + "add listeners": { + listenersCli: func(cli listenersCli, interval, expiry time.Duration) { + cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|2001:DB8::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|0.0.0.0|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|192.0.2.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + cli.addListener("TCP6|::|80|/usr/sbin/apache2 -k start") + cli.addListener("TCP|0.0.0.0|80|/usr/sbin/apache2 -k start") + cli.addListener("TCP|0.0.0.0|8080|/usr/sbin/docker-proxy -proto tcp -host-ip 0.0.0.0 -host-port 8080 -container-ip 172.17.0.4 -container-port 80") + cli.addListener("TCP6|::|8080|/usr/sbin/docker-proxy -proto tcp -host-ip :: -host-port 8080 -container-ip 172.17.0.4 -container-port 80") + time.Sleep(interval * 2) + }, + wantGroups: []model.TargetGroup{&targetGroup{ + provider: "sd:net_listeners", + source: "discoverer=net_listeners,host=localhost", + targets: []model.Target{ + withHash(&target{ + Protocol: "TCP", + IPAddress: "127.0.0.1", + Port: "80", + Address: "127.0.0.1:80", + Comm: "apache2", + Cmdline: "/usr/sbin/apache2 -k start", + }), + withHash(&target{ + Protocol: "TCP", + IPAddress: "127.0.0.1", + Port: "8125", + Address: "127.0.0.1:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "UDP", + IPAddress: "127.0.0.1", + Port: "53768", + Address: "127.0.0.1:53768", + Comm: "go.d.plugin", + Cmdline: "/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1", + }), + withHash(&target{ + Protocol: "UDP6", + IPAddress: "::1", + Port: "8125", + Address: "[::1]:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + }, + }}, + }, + "remove listeners; not expired": { + listenersCli: func(cli listenersCli, interval, expiry time.Duration) { + cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + }, + wantGroups: []model.TargetGroup{&targetGroup{ + provider: "sd:net_listeners", + source: "discoverer=net_listeners,host=localhost", + targets: []model.Target{ + withHash(&target{ + Protocol: "UDP6", + IPAddress: "::1", + Port: "8125", + Address: "[::1]:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "TCP", + IPAddress: "127.0.0.1", + Port: "8125", + Address: "127.0.0.1:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + withHash(&target{ + Protocol: "UDP", + IPAddress: "127.0.0.1", + Port: "53768", + Address: "127.0.0.1:53768", + Comm: "go.d.plugin", + Cmdline: "/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1", + }), + }, + }}, + }, + "remove listeners; expired": { + listenersCli: func(cli listenersCli, interval, expiry time.Duration) { + cli.addListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("TCP|127.0.0.1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.addListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(interval * 2) + cli.removeListener("UDP6|::1|8125|/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D") + cli.removeListener("UDP|127.0.0.1|53768|/opt/netdata/usr/libexec/netdata/plugins.d/go.d.plugin 1") + time.Sleep(expiry * 2) + }, + wantGroups: []model.TargetGroup{&targetGroup{ + provider: "sd:net_listeners", + source: "discoverer=net_listeners,host=localhost", + targets: []model.Target{ + withHash(&target{ + Protocol: "TCP", + IPAddress: "127.0.0.1", + Port: "8125", + Address: "127.0.0.1:8125", + Comm: "netdata", + Cmdline: "/opt/netdata/usr/sbin/netdata -P /run/netdata/netdata.pid -D", + }), + }, + }}, + }, + } + + for name, sim := range tests { + t.Run(name, func(t *testing.T) { + sim.run(t) + }) + } +} + +func withHash(l *target) *target { + l.hash, _ = calcHash(l) + tags, _ := model.ParseTags("netlisteners") + l.Tags().Merge(tags) + return l +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go new file mode 100644 index 000000000..ad90f8278 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/sim_test.go @@ -0,0 +1,167 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "context" + "errors" + "slices" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type listenersCli interface { + addListener(s string) + removeListener(s string) +} + +type discoverySim struct { + listenersCli func(cli listenersCli, interval, expiry time.Duration) + wantGroups []model.TargetGroup +} + +func (sim *discoverySim) run(t *testing.T) { + d, err := NewDiscoverer(Config{ + Source: "", + Tags: "netlisteners", + }) + require.NoError(t, err) + + mock := newMockLocalListenersExec() + + d.ll = mock + + d.interval = time.Millisecond * 100 + d.expiryTime = time.Second * 1 + + seen := make(map[string]model.TargetGroup) + ctx, cancel := context.WithCancel(context.Background()) + in := make(chan []model.TargetGroup) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + d.Discover(ctx, in) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case tggs := <-in: + for _, tgg := range tggs { + seen[tgg.Source()] = tgg + } + } + } + }() + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-d.started: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery failed to start") + } + + sim.listenersCli(mock, d.interval, d.expiryTime) + + cancel() + + select { + case <-done: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery hasn't finished after cancel") + } + + var tggs []model.TargetGroup + for _, tgg := range seen { + tggs = append(tggs, tgg) + } + + sortTargetGroups(tggs) + sortTargetGroups(sim.wantGroups) + + wantLen, gotLen := calcTargets(sim.wantGroups), calcTargets(tggs) + assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen) + assert.Equal(t, sim.wantGroups, tggs) +} + +func newMockLocalListenersExec() *mockLocalListenersExec { + return &mockLocalListenersExec{} +} + +type mockLocalListenersExec struct { + errResponse bool + mux sync.Mutex + listeners []string +} + +func (m *mockLocalListenersExec) addListener(s string) { + m.mux.Lock() + defer m.mux.Unlock() + + m.listeners = append(m.listeners, s) +} + +func (m *mockLocalListenersExec) removeListener(s string) { + m.mux.Lock() + defer m.mux.Unlock() + + if i := slices.Index(m.listeners, s); i != -1 { + m.listeners = append(m.listeners[:i], m.listeners[i+1:]...) + } +} + +func (m *mockLocalListenersExec) discover(context.Context) ([]byte, error) { + if m.errResponse { + return nil, errors.New("mock discover() error") + } + + m.mux.Lock() + defer m.mux.Unlock() + + var buf strings.Builder + for _, s := range m.listeners { + buf.WriteString(s) + buf.WriteByte('\n') + } + + return []byte(buf.String()), nil +} + +func calcTargets(tggs []model.TargetGroup) int { + var n int + for _, tgg := range tggs { + n += len(tgg.Targets()) + } + return n +} + +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) + + for idx := range tggs { + tgts := tggs[idx].Targets() + sort.Slice(tgts, func(i, j int) bool { return tgts[i].Hash() < tgts[j].Hash() }) + } +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/target.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/target.go new file mode 100644 index 000000000..a36620f32 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/target.go @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "fmt" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" +) + +type targetGroup struct { + provider string + source string + targets []model.Target +} + +func (g *targetGroup) Provider() string { return g.provider } +func (g *targetGroup) Source() string { return g.source } +func (g *targetGroup) Targets() []model.Target { return g.targets } + +type target struct { + model.Base + + hash uint64 + + Protocol string + IPAddress string + Port string + Comm string + Cmdline string + + Address string // "IPAddress:Port" +} + +func (t *target) TUID() string { return tuid(t) } +func (t *target) Hash() uint64 { return t.hash } + +func tuid(tgt *target) string { + return fmt.Sprintf("%s_%s_%d", strings.ToLower(tgt.Protocol), tgt.Port, tgt.hash) +} |