diff options
Diffstat (limited to '')
4 files changed, 613 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go new file mode 100644 index 000000000..d3ff9f333 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go @@ -0,0 +1,235 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "context" + "fmt" + "log/slog" + "net" + "strconv" + "strings" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + "github.com/netdata/netdata/go/go.d.plugin/pkg/web" + + "github.com/docker/docker/api/types" + typesContainer "github.com/docker/docker/api/types/container" + docker "github.com/docker/docker/client" + "github.com/ilyam8/hashstructure" +) + +func NewDiscoverer(cfg Config) (*Discoverer, error) { + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + d := &Discoverer{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", "docker"), + ), + cfgSource: cfg.Source, + newDockerClient: func(addr string) (dockerClient, error) { + return docker.NewClientWithOpts(docker.WithHost(addr)) + }, + addr: docker.DefaultDockerHost, + listInterval: time.Second * 60, + timeout: time.Second * 2, + seenTggSources: make(map[string]bool), + started: make(chan struct{}), + } + + d.Tags().Merge(tags) + + if cfg.Timeout.Duration().Seconds() != 0 { + d.timeout = cfg.Timeout.Duration() + } + if cfg.Address != "" { + d.addr = cfg.Address + } + + return d, nil +} + +type Config struct { + Source string + + Tags string `yaml:"tags"` + Address string `yaml:"address"` + Timeout web.Duration `yaml:"timeout"` +} + +type ( + Discoverer struct { + *logger.Logger + model.Base + + dockerClient dockerClient + newDockerClient func(addr string) (dockerClient, error) + addr string + + cfgSource string + + listInterval time.Duration + timeout time.Duration + seenTggSources map[string]bool // [targetGroup.Source] + + started chan struct{} + } + dockerClient interface { + NegotiateAPIVersion(context.Context) + ContainerList(context.Context, typesContainer.ListOptions) ([]types.Container, error) + Close() error + } +) + +func (d *Discoverer) String() string { + return "sd:docker" +} + +func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer func() { d.cleanup(); d.Info("instance is stopped") }() + + close(d.started) + + if d.dockerClient == nil { + client, err := d.newDockerClient(d.addr) + if err != nil { + d.Errorf("error on creating docker client: %v", err) + return + } + d.dockerClient = client + } + + d.dockerClient.NegotiateAPIVersion(ctx) + + if err := d.listContainers(ctx, in); err != nil { + d.Error(err) + return + } + + tk := time.NewTicker(d.listInterval) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + if err := d.listContainers(ctx, in); err != nil { + d.Warning(err) + } + } + } +} + +func (d *Discoverer) listContainers(ctx context.Context, in chan<- []model.TargetGroup) error { + listCtx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + + containers, err := d.dockerClient.ContainerList(listCtx, typesContainer.ListOptions{}) + if err != nil { + return err + } + + var tggs []model.TargetGroup + seen := make(map[string]bool) + + for _, cntr := range containers { + if tgg := d.buildTargetGroup(cntr); tgg != nil { + tggs = append(tggs, tgg) + seen[tgg.Source()] = true + } + } + + for src := range d.seenTggSources { + if !seen[src] { + tggs = append(tggs, &targetGroup{source: src}) + } + } + d.seenTggSources = seen + + select { + case <-ctx.Done(): + case in <- tggs: + } + + return nil +} + +func (d *Discoverer) buildTargetGroup(cntr types.Container) model.TargetGroup { + if len(cntr.Names) == 0 || cntr.NetworkSettings == nil || len(cntr.NetworkSettings.Networks) == 0 { + return nil + } + + tgg := &targetGroup{ + source: cntrSource(cntr), + } + if d.cfgSource != "" { + tgg.source += fmt.Sprintf(",%s", d.cfgSource) + } + + for netDriver, network := range cntr.NetworkSettings.Networks { + // container with network mode host will be discovered by local-listeners + for _, port := range cntr.Ports { + tgt := &target{ + ID: cntr.ID, + Name: strings.TrimPrefix(cntr.Names[0], "/"), + Image: cntr.Image, + Command: cntr.Command, + Labels: mapAny(cntr.Labels), + PrivatePort: strconv.Itoa(int(port.PrivatePort)), + PublicPort: strconv.Itoa(int(port.PublicPort)), + PublicPortIP: port.IP, + PortProtocol: port.Type, + NetworkMode: cntr.HostConfig.NetworkMode, + NetworkDriver: netDriver, + IPAddress: network.IPAddress, + } + tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.PrivatePort) + + hash, err := calcHash(tgt) + if err != nil { + continue + } + + tgt.hash = hash + tgt.Tags().Merge(d.Tags()) + + tgg.targets = append(tgg.targets, tgt) + } + } + + return tgg +} + +func (d *Discoverer) cleanup() { + if d.dockerClient != nil { + _ = d.dockerClient.Close() + } +} + +func cntrSource(cntr types.Container) string { + name := strings.TrimPrefix(cntr.Names[0], "/") + return fmt.Sprintf("discoverer=docker,container=%s,image=%s", name, cntr.Image) +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} + +func mapAny(src map[string]string) map[string]any { + if src == nil { + return nil + } + m := make(map[string]any, len(src)) + for k, v := range src { + m[k] = v + } + return m +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go new file mode 100644 index 000000000..14ad5f920 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/dockerd_test.go @@ -0,0 +1,161 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/docker/docker/api/types" + typesNetwork "github.com/docker/docker/api/types/network" +) + +func TestDiscoverer_Discover(t *testing.T) { + tests := map[string]struct { + createSim func() *discoverySim + }{ + "add containers": { + createSim: func() *discoverySim { + nginx1 := prepareNginxContainer("nginx1") + nginx2 := prepareNginxContainer("nginx2") + + sim := &discoverySim{ + dockerCli: func(cli dockerCli, _ time.Duration) { + cli.addContainer(nginx1) + cli.addContainer(nginx2) + }, + wantGroups: []model.TargetGroup{ + &targetGroup{ + source: cntrSource(nginx1), + targets: []model.Target{ + withHash(&target{ + ID: nginx1.ID, + Name: nginx1.Names[0][1:], + Image: nginx1.Image, + Command: nginx1.Command, + Labels: mapAny(nginx1.Labels), + PrivatePort: "80", + PublicPort: "8080", + PublicPortIP: "0.0.0.0", + PortProtocol: "tcp", + NetworkMode: "default", + NetworkDriver: "bridge", + IPAddress: "192.0.2.0", + Address: "192.0.2.0:80", + }), + }, + }, + &targetGroup{ + source: cntrSource(nginx2), + targets: []model.Target{ + withHash(&target{ + ID: nginx2.ID, + Name: nginx2.Names[0][1:], + Image: nginx2.Image, + Command: nginx2.Command, + Labels: mapAny(nginx2.Labels), + PrivatePort: "80", + PublicPort: "8080", + PublicPortIP: "0.0.0.0", + PortProtocol: "tcp", + NetworkMode: "default", + NetworkDriver: "bridge", + IPAddress: "192.0.2.0", + Address: "192.0.2.0:80", + }), + }, + }, + }, + } + return sim + }, + }, + "remove containers": { + createSim: func() *discoverySim { + nginx1 := prepareNginxContainer("nginx1") + nginx2 := prepareNginxContainer("nginx2") + + sim := &discoverySim{ + dockerCli: func(cli dockerCli, interval time.Duration) { + cli.addContainer(nginx1) + cli.addContainer(nginx2) + time.Sleep(interval * 2) + cli.removeContainer(nginx1.ID) + }, + wantGroups: []model.TargetGroup{ + &targetGroup{ + source: cntrSource(nginx1), + targets: nil, + }, + &targetGroup{ + source: cntrSource(nginx2), + targets: []model.Target{ + withHash(&target{ + ID: nginx2.ID, + Name: nginx2.Names[0][1:], + Image: nginx2.Image, + Command: nginx2.Command, + Labels: mapAny(nginx2.Labels), + PrivatePort: "80", + PublicPort: "8080", + PublicPortIP: "0.0.0.0", + PortProtocol: "tcp", + NetworkMode: "default", + NetworkDriver: "bridge", + IPAddress: "192.0.2.0", + Address: "192.0.2.0:80", + }), + }, + }, + }, + } + return sim + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func prepareNginxContainer(name string) types.Container { + return types.Container{ + ID: "id-" + name, + Names: []string{"/" + name}, + Image: "nginx-image", + ImageID: "nginx-image-id", + Command: "nginx-command", + Ports: []types.Port{ + { + IP: "0.0.0.0", + PrivatePort: 80, + PublicPort: 8080, + Type: "tcp", + }, + }, + Labels: map[string]string{"key1": "value1"}, + HostConfig: struct { + NetworkMode string `json:",omitempty"` + }{ + NetworkMode: "default", + }, + NetworkSettings: &types.SummaryNetworkSettings{ + Networks: map[string]*typesNetwork.EndpointSettings{ + "bridge": {IPAddress: "192.0.2.0"}, + }, + }, + } +} + +func withHash(tgt *target) *target { + tgt.hash, _ = calcHash(tgt) + tags, _ := model.ParseTags("docker") + tgt.Tags().Merge(tags) + return tgt +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go new file mode 100644 index 000000000..7b0b76aba --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/sim_test.go @@ -0,0 +1,162 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "context" + "sort" + "sync" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + + "github.com/docker/docker/api/types" + typesContainer "github.com/docker/docker/api/types/container" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type dockerCli interface { + addContainer(cntr types.Container) + removeContainer(id string) +} + +type discoverySim struct { + dockerCli func(cli dockerCli, interval time.Duration) + wantGroups []model.TargetGroup +} + +func (sim *discoverySim) run(t *testing.T) { + d, err := NewDiscoverer(Config{ + Source: "", + Tags: "docker", + }) + require.NoError(t, err) + + mock := newMockDockerd() + + d.newDockerClient = func(addr string) (dockerClient, error) { + return mock, nil + } + d.listInterval = time.Millisecond * 100 + + seen := make(map[string]model.TargetGroup) + ctx, cancel := context.WithCancel(context.Background()) + in := make(chan []model.TargetGroup) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + d.Discover(ctx, in) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case tggs := <-in: + for _, tgg := range tggs { + seen[tgg.Source()] = tgg + } + } + } + }() + + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-d.started: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery failed to start") + } + + sim.dockerCli(mock, d.listInterval) + time.Sleep(time.Second) + + cancel() + + select { + case <-done: + case <-time.After(time.Second * 3): + require.Fail(t, "discovery hasn't finished after cancel") + } + + var tggs []model.TargetGroup + for _, tgg := range seen { + tggs = append(tggs, tgg) + } + + sortTargetGroups(tggs) + sortTargetGroups(sim.wantGroups) + + wantLen, gotLen := len(sim.wantGroups), len(tggs) + assert.Equalf(t, wantLen, gotLen, "different len (want %d got %d)", wantLen, gotLen) + assert.Equal(t, sim.wantGroups, tggs) + + assert.True(t, mock.negApiVerCalled, "NegotiateAPIVersion called") + assert.True(t, mock.closeCalled, "Close called") +} + +func newMockDockerd() *mockDockerd { + return &mockDockerd{ + containers: make(map[string]types.Container), + } +} + +type mockDockerd struct { + negApiVerCalled bool + closeCalled bool + mux sync.Mutex + containers map[string]types.Container +} + +func (m *mockDockerd) addContainer(cntr types.Container) { + m.mux.Lock() + defer m.mux.Unlock() + + m.containers[cntr.ID] = cntr +} + +func (m *mockDockerd) removeContainer(id string) { + m.mux.Lock() + defer m.mux.Unlock() + + delete(m.containers, id) +} + +func (m *mockDockerd) ContainerList(_ context.Context, _ typesContainer.ListOptions) ([]types.Container, error) { + m.mux.Lock() + defer m.mux.Unlock() + + var cntrs []types.Container + for _, cntr := range m.containers { + cntrs = append(cntrs, cntr) + } + + return cntrs, nil +} + +func (m *mockDockerd) NegotiateAPIVersion(_ context.Context) { + m.negApiVerCalled = true +} + +func (m *mockDockerd) Close() error { + m.closeCalled = true + return nil +} + +func sortTargetGroups(tggs []model.TargetGroup) { + if len(tggs) == 0 { + return + } + sort.Slice(tggs, func(i, j int) bool { return tggs[i].Source() < tggs[j].Source() }) +} diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go new file mode 100644 index 000000000..2422bc98e --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/target.go @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "fmt" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" +) + +type targetGroup struct { + source string + targets []model.Target +} + +func (g *targetGroup) Provider() string { return "sd:docker" } +func (g *targetGroup) Source() string { return g.source } +func (g *targetGroup) Targets() []model.Target { return g.targets } + +type target struct { + model.Base + + hash uint64 + + ID string + Name string + Image string + Command string + Labels map[string]any + PrivatePort string // Port on the container + PublicPort string // Port exposed on the host + PublicPortIP string // Host IP address that the container's port is mapped to + PortProtocol string + NetworkMode string + NetworkDriver string + IPAddress string + + Address string // "IPAddress:PrivatePort" +} + +func (t *target) TUID() string { + if t.PublicPort != "" { + return fmt.Sprintf("%s_%s_%s_%s_%s_%s", + t.Name, t.IPAddress, t.PublicPortIP, t.PortProtocol, t.PublicPort, t.PrivatePort) + } + if t.PrivatePort != "" { + return fmt.Sprintf("%s_%s_%s_%s", + t.Name, t.IPAddress, t.PortProtocol, t.PrivatePort) + } + return fmt.Sprintf("%s_%s", t.Name, t.IPAddress) +} + +func (t *target) Hash() uint64 { + return t.hash +} |