diff options
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go | 235 |
1 files changed, 235 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go new file mode 100644 index 000000000..d3ff9f333 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/dockerd/docker.go @@ -0,0 +1,235 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package dockerd + +import ( + "context" + "fmt" + "log/slog" + "net" + "strconv" + "strings" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/logger" + "github.com/netdata/netdata/go/go.d.plugin/pkg/web" + + "github.com/docker/docker/api/types" + typesContainer "github.com/docker/docker/api/types/container" + docker "github.com/docker/docker/client" + "github.com/ilyam8/hashstructure" +) + +func NewDiscoverer(cfg Config) (*Discoverer, error) { + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + d := &Discoverer{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", "docker"), + ), + cfgSource: cfg.Source, + newDockerClient: func(addr string) (dockerClient, error) { + return docker.NewClientWithOpts(docker.WithHost(addr)) + }, + addr: docker.DefaultDockerHost, + listInterval: time.Second * 60, + timeout: time.Second * 2, + seenTggSources: make(map[string]bool), + started: make(chan struct{}), + } + + d.Tags().Merge(tags) + + if cfg.Timeout.Duration().Seconds() != 0 { + d.timeout = cfg.Timeout.Duration() + } + if cfg.Address != "" { + d.addr = cfg.Address + } + + return d, nil +} + +type Config struct { + Source string + + Tags string `yaml:"tags"` + Address string `yaml:"address"` + Timeout web.Duration `yaml:"timeout"` +} + +type ( + Discoverer struct { + *logger.Logger + model.Base + + dockerClient dockerClient + newDockerClient func(addr string) (dockerClient, error) + addr string + + cfgSource string + + listInterval time.Duration + timeout time.Duration + seenTggSources map[string]bool // [targetGroup.Source] + + started chan struct{} + } + dockerClient interface { + NegotiateAPIVersion(context.Context) + ContainerList(context.Context, typesContainer.ListOptions) ([]types.Container, error) + Close() error + } +) + +func (d *Discoverer) String() string { + return "sd:docker" +} + +func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer func() { d.cleanup(); d.Info("instance is stopped") }() + + close(d.started) + + if d.dockerClient == nil { + client, err := d.newDockerClient(d.addr) + if err != nil { + d.Errorf("error on creating docker client: %v", err) + return + } + d.dockerClient = client + } + + d.dockerClient.NegotiateAPIVersion(ctx) + + if err := d.listContainers(ctx, in); err != nil { + d.Error(err) + return + } + + tk := time.NewTicker(d.listInterval) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + if err := d.listContainers(ctx, in); err != nil { + d.Warning(err) + } + } + } +} + +func (d *Discoverer) listContainers(ctx context.Context, in chan<- []model.TargetGroup) error { + listCtx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + + containers, err := d.dockerClient.ContainerList(listCtx, typesContainer.ListOptions{}) + if err != nil { + return err + } + + var tggs []model.TargetGroup + seen := make(map[string]bool) + + for _, cntr := range containers { + if tgg := d.buildTargetGroup(cntr); tgg != nil { + tggs = append(tggs, tgg) + seen[tgg.Source()] = true + } + } + + for src := range d.seenTggSources { + if !seen[src] { + tggs = append(tggs, &targetGroup{source: src}) + } + } + d.seenTggSources = seen + + select { + case <-ctx.Done(): + case in <- tggs: + } + + return nil +} + +func (d *Discoverer) buildTargetGroup(cntr types.Container) model.TargetGroup { + if len(cntr.Names) == 0 || cntr.NetworkSettings == nil || len(cntr.NetworkSettings.Networks) == 0 { + return nil + } + + tgg := &targetGroup{ + source: cntrSource(cntr), + } + if d.cfgSource != "" { + tgg.source += fmt.Sprintf(",%s", d.cfgSource) + } + + for netDriver, network := range cntr.NetworkSettings.Networks { + // container with network mode host will be discovered by local-listeners + for _, port := range cntr.Ports { + tgt := &target{ + ID: cntr.ID, + Name: strings.TrimPrefix(cntr.Names[0], "/"), + Image: cntr.Image, + Command: cntr.Command, + Labels: mapAny(cntr.Labels), + PrivatePort: strconv.Itoa(int(port.PrivatePort)), + PublicPort: strconv.Itoa(int(port.PublicPort)), + PublicPortIP: port.IP, + PortProtocol: port.Type, + NetworkMode: cntr.HostConfig.NetworkMode, + NetworkDriver: netDriver, + IPAddress: network.IPAddress, + } + tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.PrivatePort) + + hash, err := calcHash(tgt) + if err != nil { + continue + } + + tgt.hash = hash + tgt.Tags().Merge(d.Tags()) + + tgg.targets = append(tgg.targets, tgt) + } + } + + return tgg +} + +func (d *Discoverer) cleanup() { + if d.dockerClient != nil { + _ = d.dockerClient.Close() + } +} + +func cntrSource(cntr types.Container) string { + name := strings.TrimPrefix(cntr.Names[0], "/") + return fmt.Sprintf("discoverer=docker,container=%s,image=%s", name, cntr.Image) +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} + +func mapAny(src map[string]string) map[string]any { + if src == nil { + return nil + } + m := make(map[string]any, len(src)) + for k, v := range src { + m[k] = v + } + return m +} |