summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go321
1 files changed, 321 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
new file mode 100644
index 000000000..9897405cd
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/netlisteners/netlisteners.go
@@ -0,0 +1,321 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package netlisteners
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "net"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/executable"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/ilyam8/hashstructure"
+)
+
+var (
+ shortName = "net_listeners"
+ fullName = fmt.Sprintf("sd:%s", shortName)
+)
+
+func NewDiscoverer(cfg Config) (*Discoverer, error) {
+ tags, err := model.ParseTags(cfg.Tags)
+ if err != nil {
+ return nil, fmt.Errorf("parse tags: %v", err)
+ }
+
+ dir := os.Getenv("NETDATA_PLUGINS_DIR")
+ if dir == "" {
+ dir = executable.Directory
+ }
+ if dir == "" {
+ dir, _ = os.Getwd()
+ }
+
+ d := &Discoverer{
+ Logger: logger.New().With(
+ slog.String("component", "service discovery"),
+ slog.String("discoverer", shortName),
+ ),
+ cfgSource: cfg.Source,
+ ll: &localListenersExec{
+ binPath: filepath.Join(dir, "local-listeners"),
+ timeout: time.Second * 5,
+ },
+ interval: time.Minute * 2,
+ expiryTime: time.Minute * 10,
+ cache: make(map[uint64]*cacheItem),
+ started: make(chan struct{}),
+ }
+
+ d.Tags().Merge(tags)
+
+ return d, nil
+}
+
+type Config struct {
+ Source string `yaml:"-"`
+ Tags string `yaml:"tags"`
+}
+
+type (
+ Discoverer struct {
+ *logger.Logger
+ model.Base
+
+ cfgSource string
+
+ interval time.Duration
+ ll localListeners
+
+ expiryTime time.Duration
+ cache map[uint64]*cacheItem // [target.Hash]
+
+ started chan struct{}
+ }
+ cacheItem struct {
+ lastSeenTime time.Time
+ tgt model.Target
+ }
+ localListeners interface {
+ discover(ctx context.Context) ([]byte, error)
+ }
+)
+
+func (d *Discoverer) String() string {
+ return fullName
+}
+
+func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
+ d.Info("instance is started")
+ defer func() { d.Info("instance is stopped") }()
+
+ close(d.started)
+
+ if err := d.discoverLocalListeners(ctx, in); err != nil {
+ d.Error(err)
+ return
+ }
+
+ tk := time.NewTicker(d.interval)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ if err := d.discoverLocalListeners(ctx, in); err != nil {
+ d.Warning(err)
+ return
+ }
+ }
+ }
+}
+
+func (d *Discoverer) discoverLocalListeners(ctx context.Context, in chan<- []model.TargetGroup) error {
+ bs, err := d.ll.discover(ctx)
+ if err != nil {
+ if errors.Is(err, context.Canceled) {
+ return nil
+ }
+ return err
+ }
+
+ tgts, err := d.parseLocalListeners(bs)
+ if err != nil {
+ return err
+ }
+
+ tggs := d.processTargets(tgts)
+
+ select {
+ case <-ctx.Done():
+ case in <- tggs:
+ }
+
+ return nil
+}
+
+func (d *Discoverer) processTargets(tgts []model.Target) []model.TargetGroup {
+ tgg := &targetGroup{
+ provider: fullName,
+ source: fmt.Sprintf("discoverer=%s,host=localhost", shortName),
+ }
+ if d.cfgSource != "" {
+ tgg.source += fmt.Sprintf(",%s", d.cfgSource)
+ }
+
+ if d.expiryTime.Milliseconds() == 0 {
+ tgg.targets = tgts
+ return []model.TargetGroup{tgg}
+ }
+
+ now := time.Now()
+
+ for _, tgt := range tgts {
+ hash := tgt.Hash()
+ if _, ok := d.cache[hash]; !ok {
+ d.cache[hash] = &cacheItem{tgt: tgt}
+ }
+ d.cache[hash].lastSeenTime = now
+ }
+
+ for k, v := range d.cache {
+ if now.Sub(v.lastSeenTime) > d.expiryTime {
+ delete(d.cache, k)
+ continue
+ }
+ tgg.targets = append(tgg.targets, v.tgt)
+ }
+
+ return []model.TargetGroup{tgg}
+}
+
+func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) {
+ const (
+ local4 = "127.0.0.1"
+ local6 = "::1"
+ )
+
+ var targets []target
+ sc := bufio.NewScanner(bytes.NewReader(bs))
+
+ for sc.Scan() {
+ text := strings.TrimSpace(sc.Text())
+ if text == "" {
+ continue
+ }
+
+ // Protocol|IPAddress|Port|Cmdline
+ parts := strings.SplitN(text, "|", 4)
+ if len(parts) != 4 {
+ return nil, fmt.Errorf("unexpected data: '%s'", text)
+ }
+
+ tgt := target{
+ Protocol: parts[0],
+ IPAddress: parts[1],
+ Port: parts[2],
+ Comm: extractComm(parts[3]),
+ Cmdline: parts[3],
+ }
+
+ if tgt.Comm == "docker-proxy" {
+ continue
+ }
+
+ if tgt.IPAddress == "0.0.0.0" || strings.HasPrefix(tgt.IPAddress, "127") {
+ tgt.IPAddress = local4
+ } else if tgt.IPAddress == "::" {
+ tgt.IPAddress = local6
+ }
+
+ tgt.Address = net.JoinHostPort(tgt.IPAddress, tgt.Port)
+
+ hash, err := calcHash(tgt)
+ if err != nil {
+ continue
+ }
+
+ tgt.hash = hash
+ tgt.Tags().Merge(d.Tags())
+
+ targets = append(targets, tgt)
+ }
+
+ // order: TCP, TCP6, UDP, UDP6
+ sort.Slice(targets, func(i, j int) bool {
+ tgt1, tgt2 := targets[i], targets[j]
+ if tgt1.Protocol != tgt2.Protocol {
+ return tgt1.Protocol < tgt2.Protocol
+ }
+
+ p1, _ := strconv.Atoi(targets[i].Port)
+ p2, _ := strconv.Atoi(targets[j].Port)
+ if p1 != p2 {
+ return p1 < p2
+ }
+
+ return tgt1.IPAddress == local4 || tgt1.IPAddress == local6
+ })
+
+ seen := make(map[string]bool)
+ tgts := make([]model.Target, len(targets))
+ var n int
+
+ for _, tgt := range targets {
+ tgt := tgt
+
+ proto := strings.TrimSuffix(tgt.Protocol, "6")
+ key := tgt.Protocol + ":" + tgt.Address
+ keyLocal4 := proto + ":" + net.JoinHostPort(local4, tgt.Port)
+ keyLocal6 := proto + "6:" + net.JoinHostPort(local6, tgt.Port)
+
+ // Filter targets that accept conns on any (0.0.0.0) and additionally on each individual network interface (a.b.c.d).
+ // Create a target only for localhost. Assumption: any address always goes first.
+ if seen[key] || seen[keyLocal4] || seen[keyLocal6] {
+ continue
+ }
+ seen[key] = true
+
+ tgts[n] = &tgt
+ n++
+ }
+
+ return tgts[:n], nil
+}
+
+type localListenersExec struct {
+ binPath string
+ timeout time.Duration
+}
+
+func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) {
+ execCtx, cancel := context.WithTimeout(ctx, e.timeout)
+ defer cancel()
+
+ // TCPv4/6 and UPDv4 sockets in LISTEN state
+ // https://github.com/netdata/netdata/blob/master/src/collectors/plugins.d/local_listeners.c
+ args := []string{
+ "no-udp6",
+ "no-local",
+ "no-inbound",
+ "no-outbound",
+ "no-namespaces",
+ }
+
+ cmd := exec.CommandContext(execCtx, e.binPath, args...)
+
+ bs, err := cmd.Output()
+ if err != nil {
+ return nil, fmt.Errorf("error on executing '%s': %v", cmd, err)
+ }
+
+ return bs, nil
+}
+
+func extractComm(cmdLine string) string {
+ i := strings.IndexByte(cmdLine, ' ')
+ if i <= 0 {
+ return cmdLine
+ }
+ _, comm := filepath.Split(cmdLine[:i])
+ return comm
+}
+
+func calcHash(obj any) (uint64, error) {
+ return hashstructure.Hash(obj, nil)
+}