diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:35 +0000 |
commit | f09848204fa5283d21ea43e262ee41aa578e1808 (patch) | |
tree | c62385d7adf209fa6a798635954d887f718fb3fb /src/go/plugin/go.d/agent/agent.go | |
parent | Releasing debian version 1.46.3-2. (diff) | |
download | netdata-f09848204fa5283d21ea43e262ee41aa578e1808.tar.xz netdata-f09848204fa5283d21ea43e262ee41aa578e1808.zip |
Merging upstream version 1.47.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/go/plugin/go.d/agent/agent.go')
-rw-r--r-- | src/go/plugin/go.d/agent/agent.go | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/agent.go b/src/go/plugin/go.d/agent/agent.go new file mode 100644 index 000000000..2423e84e0 --- /dev/null +++ b/src/go/plugin/go.d/agent/agent.go @@ -0,0 +1,253 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package agent + +import ( + "context" + "io" + "log/slog" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/filelock" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/filestatus" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/jobmgr" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/vnodes" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/multipath" + + "github.com/mattn/go-isatty" +) + +var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) + +// Config is an Agent configuration. +type Config struct { + Name string + ConfDir []string + ModulesConfDir []string + ModulesConfSDDir []string + ModulesConfWatchPath []string + VnodesConfDir []string + StateFile string + LockDir string + ModuleRegistry module.Registry + RunModule string + MinUpdateEvery int +} + +// Agent represents orchestrator. +type Agent struct { + *logger.Logger + + Name string + ConfDir multipath.MultiPath + ModulesConfDir multipath.MultiPath + ModulesConfSDDir multipath.MultiPath + ModulesSDConfPath []string + VnodesConfDir multipath.MultiPath + StateFile string + LockDir string + RunModule string + MinUpdateEvery int + ModuleRegistry module.Registry + Out io.Writer + + api *netdataapi.API +} + +// New creates a new Agent. +func New(cfg Config) *Agent { + return &Agent{ + Logger: logger.New().With( + slog.String("component", "agent"), + ), + Name: cfg.Name, + ConfDir: cfg.ConfDir, + ModulesConfDir: cfg.ModulesConfDir, + ModulesConfSDDir: cfg.ModulesConfSDDir, + ModulesSDConfPath: cfg.ModulesConfWatchPath, + VnodesConfDir: cfg.VnodesConfDir, + StateFile: cfg.StateFile, + LockDir: cfg.LockDir, + RunModule: cfg.RunModule, + MinUpdateEvery: cfg.MinUpdateEvery, + ModuleRegistry: module.DefaultRegistry, + Out: safewriter.Stdout, + api: netdataapi.New(safewriter.Stdout), + } +} + +// Run starts the Agent. +func (a *Agent) Run() { + go a.keepAlive() + serve(a) +} + +func serve(a *Agent) { + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + var wg sync.WaitGroup + + var exit bool + + for { + ctx, cancel := context.WithCancel(context.Background()) + + wg.Add(1) + go func() { defer wg.Done(); a.run(ctx) }() + + switch sig := <-ch; sig { + case syscall.SIGHUP: + a.Infof("received %s signal (%d). Restarting running instance", sig, sig) + default: + a.Infof("received %s signal (%d). Terminating...", sig, sig) + module.DontObsoleteCharts() + exit = true + } + + cancel() + + func() { + timeout := time.Second * 10 + t := time.NewTimer(timeout) + defer t.Stop() + done := make(chan struct{}) + + go func() { wg.Wait(); close(done) }() + + select { + case <-t.C: + a.Errorf("stopping all goroutines timed out after %s. Exiting...", timeout) + os.Exit(0) + case <-done: + } + }() + + if exit { + os.Exit(0) + } + + time.Sleep(time.Second) + } +} + +func (a *Agent) run(ctx context.Context) { + a.Info("instance is started") + defer func() { a.Info("instance is stopped") }() + + cfg := a.loadPluginConfig() + a.Infof("using config: %s", cfg.String()) + + if !cfg.Enabled { + a.Info("plugin is disabled in the configuration file, exiting...") + if isTerminal { + os.Exit(0) + } + _ = a.api.DISABLE() + return + } + + enabledModules := a.loadEnabledModules(cfg) + if len(enabledModules) == 0 { + a.Info("no modules to run") + if isTerminal { + os.Exit(0) + } + _ = a.api.DISABLE() + return + } + + discCfg := a.buildDiscoveryConf(enabledModules) + + discMgr, err := discovery.NewManager(discCfg) + if err != nil { + a.Error(err) + if isTerminal { + os.Exit(0) + } + return + } + + fnMgr := functions.NewManager() + + jobMgr := jobmgr.New() + jobMgr.PluginName = a.Name + jobMgr.Out = a.Out + jobMgr.Modules = enabledModules + jobMgr.ConfigDefaults = discCfg.Registry + jobMgr.FnReg = fnMgr + + if reg := a.setupVnodeRegistry(); reg == nil || reg.Len() == 0 { + vnodes.Disabled = true + } else { + jobMgr.Vnodes = reg + } + + if a.LockDir != "" { + jobMgr.FileLock = filelock.New(a.LockDir) + } + + var fsMgr *filestatus.Manager + if !isTerminal && a.StateFile != "" { + fsMgr = filestatus.NewManager(a.StateFile) + jobMgr.FileStatus = fsMgr + if store, err := filestatus.LoadStore(a.StateFile); err != nil { + a.Warningf("couldn't load state file: %v", err) + } else { + jobMgr.FileStatusStore = store + } + } + + in := make(chan []*confgroup.Group) + var wg sync.WaitGroup + + wg.Add(1) + go func() { defer wg.Done(); fnMgr.Run(ctx) }() + + wg.Add(1) + go func() { defer wg.Done(); jobMgr.Run(ctx, in) }() + + wg.Add(1) + go func() { defer wg.Done(); discMgr.Run(ctx, in) }() + + if fsMgr != nil { + wg.Add(1) + go func() { defer wg.Done(); fsMgr.Run(ctx) }() + } + + wg.Wait() + <-ctx.Done() +} + +func (a *Agent) keepAlive() { + if isTerminal { + return + } + + tk := time.NewTicker(time.Second) + defer tk.Stop() + + var n int + for range tk.C { + if err := a.api.EMPTYLINE(); err != nil { + a.Infof("keepAlive: %v", err) + n++ + } else { + n = 0 + } + if n == 3 { + a.Info("too many keepAlive errors. Terminating...") + os.Exit(0) + } + } +} |