diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go | 321 |
1 files changed, 321 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go new file mode 100644 index 000000000..9897405cd --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go @@ -0,0 +1,321 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package netlisteners + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "log/slog" + "net" + "os" + "os/exec" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" + "github.com/netdata/netdata/go/go.d.plugin/agent/executable" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "github.com/ilyam8/hashstructure" +) + +var ( + shortName = "net_listeners" + fullName = fmt.Sprintf("sd:%s", shortName) +) + +func NewDiscoverer(cfg Config) (*Discoverer, error) { + tags, err := model.ParseTags(cfg.Tags) + if err != nil { + return nil, fmt.Errorf("parse tags: %v", err) + } + + dir := os.Getenv("NETDATA_PLUGINS_DIR") + if dir == "" { + dir = executable.Directory + } + if dir == "" { + dir, _ = os.Getwd() + } + + d := &Discoverer{ + Logger: logger.New().With( + slog.String("component", "service discovery"), + slog.String("discoverer", shortName), + ), + cfgSource: cfg.Source, + ll: &localListenersExec{ + binPath: filepath.Join(dir, "local-listeners"), + timeout: time.Second * 5, + }, + interval: time.Minute * 2, + expiryTime: time.Minute * 10, + cache: make(map[uint64]*cacheItem), + started: make(chan struct{}), + } + + d.Tags().Merge(tags) + + return d, nil +} + +type Config struct { + Source string `yaml:"-"` + Tags string `yaml:"tags"` +} + +type ( + Discoverer struct { + *logger.Logger + model.Base + + cfgSource string + + interval time.Duration + ll localListeners + + expiryTime time.Duration + cache map[uint64]*cacheItem // [target.Hash] + + started chan struct{} + } + cacheItem struct { + lastSeenTime time.Time + tgt model.Target + } + localListeners interface { + discover(ctx context.Context) ([]byte, error) + } +) + +func (d *Discoverer) String() string { + return fullName +} + +func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { + d.Info("instance is started") + defer func() { d.Info("instance is stopped") }() + + close(d.started) + + if err := d.discoverLocalListeners(ctx, in); err != nil { + d.Error(err) + return + } + + tk := time.NewTicker(d.interval) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + if err := d.discoverLocalListeners(ctx, in); err != nil { + d.Warning(err) + return + } + } + } +} + +func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error { + bs, err := d.ll.discover(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return err + } + + tgts, err := d.parseLocalListeners(bs) + if err != nil { + return err + } + + tggs := d.processTargets(tgts) + + select { + case <-ctx.Done(): + case in <- tggs: + } + + return nil +} + +func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup { + tgg := &targetGroup{ + provider: fullName, + source: fmt.Sprintf("discoverer=%s,host=localhost", shortName), + } + if d.cfgSource != "" { + tgg.source += fmt.Sprintf(",%s", d.cfgSource) + } + + if d.expiryTime.Milliseconds() == 0 { + tgg.targets = tgts + return []model.TargetGroup{tgg} + } + + now := time.Now() + + for _, tgt := range tgts { + hash := tgt.Hash() + if _, ok := d.cache[hash]; !ok { + d.cache[hash] = &cacheItem{tgt: tgt} + } + d.cache[hash].lastSeenTime = now + } + + for k, v := range d.cache { + if now.Sub(v.lastSeenTime) > d.expiryTime { + delete(d.cache, k) + continue + } + tgg.targets = append(tgg.targets, v.tgt) + } + + return []model.TargetGroup{tgg} +} + +func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) { + const ( + local4 = "127.0.0.1" + local6 = "::1" + ) + + var targets []target + sc := bufio.NewScanner(bytes.NewReader(bs)) + + for sc.Scan() { + text := strings.TrimSpace(sc.Text()) + if text == "" { + continue + } + + // Protocol|IPAddress|Port|Cmdline + parts := strings.SplitN(text, "|", 4) + if len(parts) != 4 { + return nil, fmt.Errorf("unexpected data: '%s'", text) + } + + tgt := target{ + Protocol: parts[0], + IPAddress: parts[1], + Port: parts[2], + Comm: extractComm(parts[3]), + Cmdline: parts[3], + } + + if tgt.Comm == "docker-proxy" { + continue + } + + if tgt.IPAddress == "0.0.0.0" || strings.HasPrefix(tgt.IPAddress, "127") { + tgt.IPAddress = local4 + } else if tgt.IPAddress == "::" { + tgt.IPAddress = local6 + } + + tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.Port) + + hash, err := calcHash(tgt) + if err != nil { + continue + } + + tgt.hash = hash + tgt.Tags().Merge(d.Tags()) + + targets = append(targets, tgt) + } + + // order: TCP, TCP6, UDP, UDP6 + sort.Slice(targets, func(i, j int) bool { + tgt1, tgt2 := targets[i], targets[j] + if tgt1.Protocol != tgt2.Protocol { + return tgt1.Protocol < tgt2.Protocol + } + + p1, _ := strconv.Atoi(targets[i].Port) + p2, _ := strconv.Atoi(targets[j].Port) + if p1 != p2 { + return p1 < p2 + } + + return tgt1.IPAddress == local4 || tgt1.IPAddress == local6 + }) + + seen := make(map[string]bool) + tgts := make([]model.Target, len(targets)) + var n int + + for _, tgt := range targets { + tgt := tgt + + proto := strings.TrimSuffix(tgt.Protocol, "6") + key := tgt.Protocol + ":" + tgt.Address + keyLocal4 := proto + ":" + net.JoinHostPort(local4, tgt.Port) + keyLocal6 := proto + "6:" + net.JoinHostPort(local6, tgt.Port) + + // Filter targets that accept conns on any (0.0.0.0) and additionally on each individual network interface (a.b.c.d). + // Create a target only for localhost. Assumption: any address always goes first. + if seen[key] || seen[keyLocal4] || seen[keyLocal6] { + continue + } + seen[key] = true + + tgts[n] = &tgt + n++ + } + + return tgts[:n], nil +} + +type localListenersExec struct { + binPath string + timeout time.Duration +} + +func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) { + execCtx, cancel := context.WithTimeout(ctx, e.timeout) + defer cancel() + + // TCPv4/6 and UPDv4 sockets in LISTEN state + // https://github.com/netdata/netdata/blob/master/src/collectors/plugins.d/local_listeners.c + args := []string{ + "no-udp6", + "no-local", + "no-inbound", + "no-outbound", + "no-namespaces", + } + + cmd := exec.CommandContext(execCtx, e.binPath, args...) + + bs, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("error on executing '%s': %v", cmd, err) + } + + return bs, nil +} + +func extractComm(cmdLine string) string { + i := strings.IndexByte(cmdLine, ' ') + if i <= 0 { + return cmdLine + } + _, comm := filepath.Split(cmdLine[:i]) + return comm +} + +func calcHash(obj any) (uint64, error) { + return hashstructure.Hash(obj, nil) +} |