From 87d772a7d708fec12f48cd8adc0dedff6e1025da Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 26 Aug 2024 10:15:20 +0200 Subject: Adding upstream version 1.47.0. Signed-off-by: Daniel Baumann --- .../sd/discoverer/netlisteners/netlisteners.go | 326 --------------------- 1 file changed, 326 deletions(-) delete mode 100644 src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go (limited to 'src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go') diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go deleted file mode 100644 index bfd7a99b8..000000000 --- a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go +++ /dev/null @@ -1,326 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -package netlisteners - -import ( - "bufio" - "bytes" - "context" - "errors" - "fmt" - "log/slog" - "net" - "os" - "os/exec" - "path/filepath" - "sort" - "strconv" - "strings" - "time" - - "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model" - "github.com/netdata/netdata/go/go.d.plugin/agent/executable" - "github.com/netdata/netdata/go/go.d.plugin/logger" - - "github.com/ilyam8/hashstructure" -) - -var ( - shortName = "net_listeners" - fullName = fmt.Sprintf("sd:%s", shortName) -) - -func NewDiscoverer(cfg Config) (*Discoverer, error) { - tags, err := model.ParseTags(cfg.Tags) - if err != nil { - return nil, fmt.Errorf("parse tags: %v", err) - } - - dir := os.Getenv("NETDATA_PLUGINS_DIR") - if dir == "" { - dir = executable.Directory - } - if dir == "" { - dir, _ = os.Getwd() - } - - d := &Discoverer{ - Logger: logger.New().With( - slog.String("component", "service discovery"), - slog.String("discoverer", shortName), - ), - cfgSource: cfg.Source, - ll: &localListenersExec{ - binPath: filepath.Join(dir, "local-listeners"), - timeout: time.Second * 5, - }, - interval: time.Minute * 2, - expiryTime: time.Minute * 10, - cache: make(map[uint64]*cacheItem), - started: make(chan struct{}), - } - - d.Tags().Merge(tags) - - return d, nil -} - -type Config struct { - Source string `yaml:"-"` - Tags string `yaml:"tags"` -} - -type ( - Discoverer struct { - *logger.Logger - model.Base - - cfgSource string - - interval time.Duration - ll localListeners - - expiryTime time.Duration - cache map[uint64]*cacheItem // [target.Hash] - - started chan struct{} - } - cacheItem struct { - lastSeenTime time.Time - tgt model.Target - } - localListeners interface { - discover(ctx context.Context) ([]byte, error) - } -) - -func (d *Discoverer) String() string { - return fullName -} - -func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) { - d.Info("instance is started") - defer func() { d.Info("instance is stopped") }() - - close(d.started) - - if err := d.discoverLocalListeners(ctx, in); err != nil { - d.Error(err) - return - } - - tk := time.NewTicker(d.interval) - defer tk.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-tk.C: - if err := d.discoverLocalListeners(ctx, in); err != nil { - d.Warning(err) - return - } - } - } -} - -func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error { - bs, err := d.ll.discover(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return nil - } - return err - } - - tgts, err := d.parseLocalListeners(bs) - if err != nil { - return err - } - - tggs := d.processTargets(tgts) - - select { - case <-ctx.Done(): - case in <- tggs: - } - - return nil -} - -func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup { - tgg := &targetGroup{ - provider: fullName, - source: fmt.Sprintf("discoverer=%s,host=localhost", shortName), - } - if d.cfgSource != "" { - tgg.source += fmt.Sprintf(",%s", d.cfgSource) - } - - if d.expiryTime.Milliseconds() == 0 { - tgg.targets = tgts - return []model.TargetGroup{tgg} - } - - now := time.Now() - - for _, tgt := range tgts { - hash := tgt.Hash() - if _, ok := d.cache[hash]; !ok { - d.cache[hash] = &cacheItem{tgt: tgt} - } - d.cache[hash].lastSeenTime = now - } - - for k, v := range d.cache { - if now.Sub(v.lastSeenTime) > d.expiryTime { - delete(d.cache, k) - continue - } - tgg.targets = append(tgg.targets, v.tgt) - } - - return []model.TargetGroup{tgg} -} - -func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) { - const ( - local4 = "127.0.0.1" - local6 = "::1" - ) - - var targets []target - sc := bufio.NewScanner(bytes.NewReader(bs)) - - for sc.Scan() { - text := strings.TrimSpace(sc.Text()) - if text == "" { - continue - } - - // Protocol|IPAddress|Port|Cmdline - parts := strings.SplitN(text, "|", 4) - if len(parts) != 4 { - return nil, fmt.Errorf("unexpected data: '%s'", text) - } - - tgt := target{ - Protocol: parts[0], - IPAddress: parts[1], - Port: parts[2], - Comm: extractComm(parts[3]), - Cmdline: parts[3], - } - - if tgt.Comm == "docker-proxy" { - continue - } - - if tgt.IPAddress == "0.0.0.0" || strings.HasPrefix(tgt.IPAddress, "127") { - tgt.IPAddress = local4 - } else if tgt.IPAddress == "::" { - tgt.IPAddress = local6 - } - - // quick support for https://github.com/netdata/netdata/pull/17866 - // TODO: create both ipv4 and ipv6 targets? - if tgt.IPAddress == "*" { - tgt.IPAddress = local4 - } - - tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.Port) - - hash, err := calcHash(tgt) - if err != nil { - continue - } - - tgt.hash = hash - tgt.Tags().Merge(d.Tags()) - - targets = append(targets, tgt) - } - - // order: TCP, TCP6, UDP, UDP6 - sort.Slice(targets, func(i, j int) bool { - tgt1, tgt2 := targets[i], targets[j] - if tgt1.Protocol != tgt2.Protocol { - return tgt1.Protocol < tgt2.Protocol - } - - p1, _ := strconv.Atoi(targets[i].Port) - p2, _ := strconv.Atoi(targets[j].Port) - if p1 != p2 { - return p1 < p2 - } - - return tgt1.IPAddress == local4 || tgt1.IPAddress == local6 - }) - - seen := make(map[string]bool) - tgts := make([]model.Target, len(targets)) - var n int - - for _, tgt := range targets { - tgt := tgt - - proto := strings.TrimSuffix(tgt.Protocol, "6") - key := tgt.Protocol + ":" + tgt.Address - keyLocal4 := proto + ":" + net.JoinHostPort(local4, tgt.Port) - keyLocal6 := proto + "6:" + net.JoinHostPort(local6, tgt.Port) - - // Filter targets that accept conns on any (0.0.0.0) and additionally on each individual network interface (a.b.c.d). - // Create a target only for localhost. Assumption: any address always goes first. - if seen[key] || seen[keyLocal4] || seen[keyLocal6] { - continue - } - seen[key] = true - - tgts[n] = &tgt - n++ - } - - return tgts[:n], nil -} - -type localListenersExec struct { - binPath string - timeout time.Duration -} - -func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) { - execCtx, cancel := context.WithTimeout(ctx, e.timeout) - defer cancel() - - // TCPv4/6 and UPDv4 sockets in LISTEN state - // https://github.com/netdata/netdata/blob/master/src/collectors/plugins.d/local_listeners.c - args := []string{ - "no-udp6", - "no-local", - "no-inbound", - "no-outbound", - "no-namespaces", - } - - cmd := exec.CommandContext(execCtx, e.binPath, args...) - - bs, err := cmd.Output() - if err != nil { - return nil, fmt.Errorf("error on executing '%s': %v", cmd, err) - } - - return bs, nil -} - -func extractComm(cmdLine string) string { - if i := strings.IndexByte(cmdLine, ' '); i != -1 { - cmdLine = cmdLine[:i] - } - _, comm := filepath.Split(cmdLine) - return strings.TrimSuffix(comm, ":") -} - -func calcHash(obj any) (uint64, error) { - return hashstructure.Hash(obj, nil) -} -- cgit v1.2.3