summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/agent.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/agent.go253
1 files changed, 253 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/agent.go b/src/go/collectors/go.d.plugin/agent/agent.go
new file mode 100644
index 000000000..caf260dc2
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/agent.go
@@ -0,0 +1,253 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package agent
+
+import (
+ "context"
+ "io"
+ "log/slog"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/discovery"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/filelock"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/filestatus"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/functions"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/jobmgr"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/safewriter"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/vnodes"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/multipath"
+
+ "github.com/mattn/go-isatty"
+)
+
+var isTerminal = isatty.IsTerminal(os.Stdout.Fd())
+
+// Config is an Agent configuration.
+type Config struct {
+ Name string
+ ConfDir []string
+ ModulesConfDir []string
+ ModulesConfSDDir []string
+ ModulesConfWatchPath []string
+ VnodesConfDir []string
+ StateFile string
+ LockDir string
+ ModuleRegistry module.Registry
+ RunModule string
+ MinUpdateEvery int
+}
+
+// Agent represents orchestrator.
+type Agent struct {
+ *logger.Logger
+
+ Name string
+ ConfDir multipath.MultiPath
+ ModulesConfDir multipath.MultiPath
+ ModulesConfSDDir multipath.MultiPath
+ ModulesSDConfPath []string
+ VnodesConfDir multipath.MultiPath
+ StateFile string
+ LockDir string
+ RunModule string
+ MinUpdateEvery int
+ ModuleRegistry module.Registry
+ Out io.Writer
+
+ api *netdataapi.API
+}
+
+// New creates a new Agent.
+func New(cfg Config) *Agent {
+ return &Agent{
+ Logger: logger.New().With(
+ slog.String("component", "agent"),
+ ),
+ Name: cfg.Name,
+ ConfDir: cfg.ConfDir,
+ ModulesConfDir: cfg.ModulesConfDir,
+ ModulesConfSDDir: cfg.ModulesConfSDDir,
+ ModulesSDConfPath: cfg.ModulesConfWatchPath,
+ VnodesConfDir: cfg.VnodesConfDir,
+ StateFile: cfg.StateFile,
+ LockDir: cfg.LockDir,
+ RunModule: cfg.RunModule,
+ MinUpdateEvery: cfg.MinUpdateEvery,
+ ModuleRegistry: module.DefaultRegistry,
+ Out: safewriter.Stdout,
+ api: netdataapi.New(safewriter.Stdout),
+ }
+}
+
+// Run starts the Agent.
+func (a *Agent) Run() {
+ go a.keepAlive()
+ serve(a)
+}
+
+func serve(a *Agent) {
+ ch := make(chan os.Signal, 1)
+ signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
+ var wg sync.WaitGroup
+
+ var exit bool
+
+ for {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ wg.Add(1)
+ go func() { defer wg.Done(); a.run(ctx) }()
+
+ switch sig := <-ch; sig {
+ case syscall.SIGHUP:
+ a.Infof("received %s signal (%d). Restarting running instance", sig, sig)
+ default:
+ a.Infof("received %s signal (%d). Terminating...", sig, sig)
+ module.DontObsoleteCharts()
+ exit = true
+ }
+
+ cancel()
+
+ func() {
+ timeout := time.Second * 10
+ t := time.NewTimer(timeout)
+ defer t.Stop()
+ done := make(chan struct{})
+
+ go func() { wg.Wait(); close(done) }()
+
+ select {
+ case <-t.C:
+ a.Errorf("stopping all goroutines timed out after %s. Exiting...", timeout)
+ os.Exit(0)
+ case <-done:
+ }
+ }()
+
+ if exit {
+ os.Exit(0)
+ }
+
+ time.Sleep(time.Second)
+ }
+}
+
+func (a *Agent) run(ctx context.Context) {
+ a.Info("instance is started")
+ defer func() { a.Info("instance is stopped") }()
+
+ cfg := a.loadPluginConfig()
+ a.Infof("using config: %s", cfg.String())
+
+ if !cfg.Enabled {
+ a.Info("plugin is disabled in the configuration file, exiting...")
+ if isTerminal {
+ os.Exit(0)
+ }
+ _ = a.api.DISABLE()
+ return
+ }
+
+ enabledModules := a.loadEnabledModules(cfg)
+ if len(enabledModules) == 0 {
+ a.Info("no modules to run")
+ if isTerminal {
+ os.Exit(0)
+ }
+ _ = a.api.DISABLE()
+ return
+ }
+
+ discCfg := a.buildDiscoveryConf(enabledModules)
+
+ discMgr, err := discovery.NewManager(discCfg)
+ if err != nil {
+ a.Error(err)
+ if isTerminal {
+ os.Exit(0)
+ }
+ return
+ }
+
+ fnMgr := functions.NewManager()
+
+ jobMgr := jobmgr.New()
+ jobMgr.PluginName = a.Name
+ jobMgr.Out = a.Out
+ jobMgr.Modules = enabledModules
+ jobMgr.ConfigDefaults = discCfg.Registry
+ jobMgr.FnReg = fnMgr
+
+ if reg := a.setupVnodeRegistry(); reg == nil || reg.Len() == 0 {
+ vnodes.Disabled = true
+ } else {
+ jobMgr.Vnodes = reg
+ }
+
+ if a.LockDir != "" {
+ jobMgr.FileLock = filelock.New(a.LockDir)
+ }
+
+ var fsMgr *filestatus.Manager
+ if !isTerminal && a.StateFile != "" {
+ fsMgr = filestatus.NewManager(a.StateFile)
+ jobMgr.FileStatus = fsMgr
+ if store, err := filestatus.LoadStore(a.StateFile); err != nil {
+ a.Warningf("couldn't load state file: %v", err)
+ } else {
+ jobMgr.FileStatusStore = store
+ }
+ }
+
+ in := make(chan []*confgroup.Group)
+ var wg sync.WaitGroup
+
+ wg.Add(1)
+ go func() { defer wg.Done(); fnMgr.Run(ctx) }()
+
+ wg.Add(1)
+ go func() { defer wg.Done(); jobMgr.Run(ctx, in) }()
+
+ wg.Add(1)
+ go func() { defer wg.Done(); discMgr.Run(ctx, in) }()
+
+ if fsMgr != nil {
+ wg.Add(1)
+ go func() { defer wg.Done(); fsMgr.Run(ctx) }()
+ }
+
+ wg.Wait()
+ <-ctx.Done()
+}
+
+func (a *Agent) keepAlive() {
+ if isTerminal {
+ return
+ }
+
+ tk := time.NewTicker(time.Second)
+ defer tk.Stop()
+
+ var n int
+ for range tk.C {
+ if err := a.api.EMPTYLINE(); err != nil {
+ a.Infof("keepAlive: %v", err)
+ n++
+ } else {
+ n = 0
+ }
+ if n == 3 {
+ a.Info("too many keepAlive errors. Terminating...")
+ os.Exit(0)
+ }
+ }
+}