summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go235
1 files changed, 235 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go
new file mode 100644
index 000000000..d3ff9f333
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go
@@ -0,0 +1,235 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package dockerd
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "net"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/web"
+
+ "github.com/docker/docker/api/types"
+ typesContainer "github.com/docker/docker/api/types/container"
+ docker "github.com/docker/docker/client"
+ "github.com/ilyam8/hashstructure"
+)
+
+func NewDiscoverer(cfg Config) (*Discoverer, error) {
+ tags, err := model.ParseTags(cfg.Tags)
+ if err != nil {
+ return nil, fmt.Errorf("parse tags: %v", err)
+ }
+
+ d := &Discoverer{
+ Logger: logger.New().With(
+ slog.String("component", "service discovery"),
+ slog.String("discoverer", "docker"),
+ ),
+ cfgSource: cfg.Source,
+ newDockerClient: func(addr string) (dockerClient, error) {
+ return docker.NewClientWithOpts(docker.WithHost(addr))
+ },
+ addr: docker.DefaultDockerHost,
+ listInterval: time.Second * 60,
+ timeout: time.Second * 2,
+ seenTggSources: make(map[string]bool),
+ started: make(chan struct{}),
+ }
+
+ d.Tags().Merge(tags)
+
+ if cfg.Timeout.Duration().Seconds() != 0 {
+ d.timeout = cfg.Timeout.Duration()
+ }
+ if cfg.Address != "" {
+ d.addr = cfg.Address
+ }
+
+ return d, nil
+}
+
+type Config struct {
+ Source string
+
+ Tags string `yaml:"tags"`
+ Address string `yaml:"address"`
+ Timeout web.Duration `yaml:"timeout"`
+}
+
+type (
+ Discoverer struct {
+ *logger.Logger
+ model.Base
+
+ dockerClient dockerClient
+ newDockerClient func(addr string) (dockerClient, error)
+ addr string
+
+ cfgSource string
+
+ listInterval time.Duration
+ timeout time.Duration
+ seenTggSources map[string]bool // [targetGroup.Source]
+
+ started chan struct{}
+ }
+ dockerClient interface {
+ NegotiateAPIVersion(context.Context)
+ ContainerList(context.Context, typesContainer.ListOptions) ([]types.Container, error)
+ Close() error
+ }
+)
+
+func (d *Discoverer) String() string {
+ return "sd:docker"
+}
+
+func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
+ d.Info("instance is started")
+ defer func() { d.cleanup(); d.Info("instance is stopped") }()
+
+ close(d.started)
+
+ if d.dockerClient == nil {
+ client, err := d.newDockerClient(d.addr)
+ if err != nil {
+ d.Errorf("error on creating docker client: %v", err)
+ return
+ }
+ d.dockerClient = client
+ }
+
+ d.dockerClient.NegotiateAPIVersion(ctx)
+
+ if err := d.listContainers(ctx, in); err != nil {
+ d.Error(err)
+ return
+ }
+
+ tk := time.NewTicker(d.listInterval)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ if err := d.listContainers(ctx, in); err != nil {
+ d.Warning(err)
+ }
+ }
+ }
+}
+
+func (d *Discoverer) listContainers(ctx context.Context, in chan<- []model.TargetGroup) error {
+ listCtx, cancel := context.WithTimeout(ctx, d.timeout)
+ defer cancel()
+
+ containers, err := d.dockerClient.ContainerList(listCtx, typesContainer.ListOptions{})
+ if err != nil {
+ return err
+ }
+
+ var tggs []model.TargetGroup
+ seen := make(map[string]bool)
+
+ for _, cntr := range containers {
+ if tgg := d.buildTargetGroup(cntr); tgg != nil {
+ tggs = append(tggs, tgg)
+ seen[tgg.Source()] = true
+ }
+ }
+
+ for src := range d.seenTggSources {
+ if !seen[src] {
+ tggs = append(tggs, &targetGroup{source: src})
+ }
+ }
+ d.seenTggSources = seen
+
+ select {
+ case <-ctx.Done():
+ case in <- tggs:
+ }
+
+ return nil
+}
+
+func (d *Discoverer) buildTargetGroup(cntr types.Container) model.TargetGroup {
+ if len(cntr.Names) == 0 || cntr.NetworkSettings == nil || len(cntr.NetworkSettings.Networks) == 0 {
+ return nil
+ }
+
+ tgg := &targetGroup{
+ source: cntrSource(cntr),
+ }
+ if d.cfgSource != "" {
+ tgg.source += fmt.Sprintf(",%s", d.cfgSource)
+ }
+
+ for netDriver, network := range cntr.NetworkSettings.Networks {
+ // container with network mode host will be discovered by local-listeners
+ for _, port := range cntr.Ports {
+ tgt := &target{
+ ID: cntr.ID,
+ Name: strings.TrimPrefix(cntr.Names[0], "/"),
+ Image: cntr.Image,
+ Command: cntr.Command,
+ Labels: mapAny(cntr.Labels),
+ PrivatePort: strconv.Itoa(int(port.PrivatePort)),
+ PublicPort: strconv.Itoa(int(port.PublicPort)),
+ PublicPortIP: port.IP,
+ PortProtocol: port.Type,
+ NetworkMode: cntr.HostConfig.NetworkMode,
+ NetworkDriver: netDriver,
+ IPAddress: network.IPAddress,
+ }
+ tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.PrivatePort)
+
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+
+ tgt.hash = hash
+ tgt.Tags().Merge(d.Tags())
+
+ tgg.targets = append(tgg.targets, tgt)
+ }
+ }
+
+ return tgg
+}
+
+func (d *Discoverer) cleanup() {
+ if d.dockerClient != nil {
+ _ = d.dockerClient.Close()
+ }
+}
+
+func cntrSource(cntr types.Container) string {
+ name := strings.TrimPrefix(cntr.Names[0], "/")
+ return fmt.Sprintf("discoverer=docker,container=%s,image=%s", name, cntr.Image)
+}
+
+func calcHash(obj any) (uint64, error) {
+ return hashstructure.Hash(obj, nil)
+}
+
+func mapAny(src map[string]string) map[string]any {
+ if src == nil {
+ return nil
+ }
+ m := make(map[string]any, len(src))
+ for k, v := range src {
+ m[k] = v
+ }
+ return m
+}