Diffstat
-rw-r--r--  src/go/collectors/go.d.plugin/agent/jobmgr/manager.go  | 370
1 file changed, 370 insertions(+), 0 deletions(-)
diff --git a/src/go/collectors/go.d.plugin/agent/jobmgr/manager.go b/src/go/collectors/go.d.plugin/agent/jobmgr/manager.go
new file mode 100644
index 000000000..f4d55fcf6
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/jobmgr/manager.go
@@ -0,0 +1,370 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package jobmgr
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/functions"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/module"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/safewriter"
+ "github.com/netdata/netdata/go/go.d.plugin/agent/ticker"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/mattn/go-isatty"
+ "gopkg.in/yaml.v2"
+)
+
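+// isTerminal reports whether the plugin is attached to a terminal, i.e. it was
+// started by hand rather than by the Netdata agent; in that mode discovered
+// configs are enabled immediately instead of waiting for a dyncfg command.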
+var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd())
+
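+// New returns a Manager with every external dependency stubbed out with a
+// no-op implementation; callers override the exported fields before Run.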
+func New() *Manager {
+	mgr := &Manager{
+		Logger: logger.New().With(
+			slog.String("component", "job manager"),
+		),
+		Out:             io.Discard,
+		FileLock:        noop{},
+		FileStatus:      noop{},
+		FileStatusStore: noop{},
+		Vnodes:          noop{},
+		FnReg:           noop{},
+
+		discoveredConfigs: newDiscoveredConfigsCache(),
+		seenConfigs:       newSeenConfigCache(),
+		exposedConfigs:    newExposedConfigCache(),
+		runningJobs:       newRunningJobsCache(),
+		retryingTasks:     newRetryingTasksCache(),
+
+		started:  make(chan struct{}),
+		api:      netdataapi.New(safewriter.Stdout),
+		addCh:    make(chan confgroup.Config),
+		rmCh:     make(chan confgroup.Config),
+		dyncfgCh: make(chan functions.Function),
+	}
+
+ return mgr
+}
+
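+// Manager owns the collector job lifecycle: it tracks discovered, seen and
+// exposed configs, registers them with Netdata via dyncfg, and starts and
+// stops the corresponding collector jobs.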
+type Manager struct {
+	*logger.Logger
+
+	PluginName     string
+	Out            io.Writer
+	Modules        module.Registry
+	ConfigDefaults confgroup.Registry
+
+	FileLock        FileLocker
+	FileStatus      FileStatus
+	FileStatusStore FileStatusStore
+	Vnodes          Vnodes
+	FnReg           FunctionRegistry
+
+	discoveredConfigs *discoveredConfigs
+	seenConfigs       *seenConfigs
+	exposedConfigs    *exposedConfigs
+	retryingTasks     *retryingTasks
+	runningJobs       *runningJobs
+
+	ctx      context.Context
+	started  chan struct{}
+	api      dyncfgAPI
+	addCh    chan confgroup.Config
+	rmCh     chan confgroup.Config
+	dyncfgCh chan functions.Function
+
+	waitCfgOnOff string // block processing of discovered configs until "enable"/"disable" is received from Netdata
+}
+
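+// Run registers the dyncfg "config" function, announces the known modules,
+// starts the worker goroutines and blocks until ctx is canceled.
+//
+// A minimal wiring sketch (the channel name is illustrative; the real
+// discovery side lives elsewhere in the agent):
+//
+//	mgr := jobmgr.New()
+//	mgr.PluginName = "go.d"
+//	mgr.Modules = module.DefaultRegistry
+//	groupsCh := make(chan []*confgroup.Group)
+//	go mgr.Run(ctx, groupsCh)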
+func (m *Manager) Run(ctx context.Context, in chan []*confgroup.Group) {
+	m.Info("instance is started")
+	defer func() { m.cleanup(); m.Info("instance is stopped") }()
+	m.ctx = ctx
+
+	m.FnReg.Register("config", m.dyncfgConfig)
+
+	for name := range m.Modules {
+		m.dyncfgModuleCreate(name)
+	}
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() { defer wg.Done(); m.runProcessConfGroups(in) }()
+
+	wg.Add(1)
+	go func() { defer wg.Done(); m.run() }()
+
+	wg.Add(1)
+	go func() { defer wg.Done(); m.runNotifyRunningJobs() }()
+
+	close(m.started)
+
+	wg.Wait()
+	<-m.ctx.Done()
+}
+
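+// runProcessConfGroups diffs each incoming config group against the cache of
+// discovered configs and forwards the resulting added and removed configs to
+// the add/remove channels.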
+func (m *Manager) runProcessConfGroups(in chan []*confgroup.Group) {
+	for {
+		select {
+		case <-m.ctx.Done():
+			return
+		case groups := <-in:
+			for _, gr := range groups {
+				a, r := m.discoveredConfigs.add(gr)
+				m.Debugf("received configs: %d/+%d/-%d ('%s')", len(gr.Configs), len(a), len(r), gr.Source)
+				sendConfigs(m.ctx, m.rmCh, r...)
+				sendConfigs(m.ctx, m.addCh, a...)
+			}
+		}
+	}
+}
+
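+// run is the main event loop. While waitCfgOnOff is set, only dyncfg commands
+// are processed, so newly discovered configs queue up until Netdata answers
+// with "enable" or "disable".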
+func (m *Manager) run() {
+	for {
+		if m.waitCfgOnOff != "" {
+			select {
+			case <-m.ctx.Done():
+				return
+			case fn := <-m.dyncfgCh:
+				m.dyncfgConfigExec(fn)
+			}
+		} else {
+			select {
+			case <-m.ctx.Done():
+				return
+			case cfg := <-m.addCh:
+				m.addConfig(cfg)
+			case cfg := <-m.rmCh:
+				m.removeConfig(cfg)
+			case fn := <-m.dyncfgCh:
+				m.dyncfgConfigExec(fn)
+			}
+		}
+	}
+}
+
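+// addConfig registers a discovered config. If a config with the same name is
+// already exposed, the one with the higher source-type priority wins; on a
+// tie, an already-running config is kept.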
+func (m *Manager) addConfig(cfg confgroup.Config) {
+	if _, ok := m.Modules.Lookup(cfg.Module()); !ok {
+		return
+	}
+
+	m.retryingTasks.remove(cfg)
+
+	scfg, ok := m.seenConfigs.lookup(cfg)
+	if !ok {
+		scfg = &seenConfig{cfg: cfg}
+		m.seenConfigs.add(scfg)
+	}
+
+	ecfg, ok := m.exposedConfigs.lookup(cfg)
+	if !ok {
+		scfg.status = dyncfgAccepted
+		ecfg = scfg
+		m.exposedConfigs.add(ecfg)
+	} else {
+		sp, ep := scfg.cfg.SourceTypePriority(), ecfg.cfg.SourceTypePriority()
+		if ep > sp || (ep == sp && ecfg.status == dyncfgRunning) {
+			m.retryingTasks.remove(cfg)
+			return
+		}
+		if ecfg.status == dyncfgRunning {
+			m.stopRunningJob(ecfg.cfg.FullName())
+			m.FileLock.Unlock(ecfg.cfg.FullName())
+			m.FileStatus.Remove(ecfg.cfg)
+		}
+		scfg.status = dyncfgAccepted
+		m.exposedConfigs.add(scfg) // replace existing exposed
+		ecfg = scfg
+	}
+
+	m.dyncfgJobCreate(ecfg.cfg, ecfg.status)
+
+	if isTerminal || m.PluginName == "nodyncfg" { // FIXME: quick fix of TestAgent_Run (agent_test.go)
+		m.dyncfgConfigEnable(functions.Function{Args: []string{dyncfgJobID(ecfg.cfg), "enable"}})
+	} else {
+		m.waitCfgOnOff = ecfg.cfg.FullName()
+	}
+}
+
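+// removeConfig handles a config that disappeared from discovery: the seen
+// entry is always dropped, while the exposed entry and its running job are
+// removed only if they originate from the same config (matching UID).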
+func (m *Manager) removeConfig(cfg confgroup.Config) {
+	m.retryingTasks.remove(cfg)
+
+	scfg, ok := m.seenConfigs.lookup(cfg)
+	if !ok {
+		return
+	}
+	m.seenConfigs.remove(cfg)
+
+	ecfg, ok := m.exposedConfigs.lookup(cfg)
+	if !ok || scfg.cfg.UID() != ecfg.cfg.UID() {
+		return
+	}
+
+	m.exposedConfigs.remove(cfg)
+	m.stopRunningJob(cfg.FullName())
+	m.FileLock.Unlock(cfg.FullName())
+	m.FileStatus.Remove(cfg)
+
+	if !isStock(cfg) || ecfg.status == dyncfgRunning {
+		m.dyncfgJobRemove(cfg)
+	}
+}
+
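+// runNotifyRunningJobs propagates a once-per-second clock tick to every
+// running job.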
+func (m *Manager) runNotifyRunningJobs() {
+	tk := ticker.New(time.Second)
+	defer tk.Stop()
+
+	for {
+		select {
+		case <-m.ctx.Done():
+			return
+		case clock := <-tk.C:
+			m.runningJobs.lock()
+			m.runningJobs.forEach(func(_ string, job *module.Job) { job.Tick(clock) })
+			m.runningJobs.unlock()
+		}
+	}
+}
+
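+// startRunningJob starts the job and registers it under its full name,
+// stopping any job previously registered under that name.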
+func (m *Manager) startRunningJob(job *module.Job) {
+	m.runningJobs.lock()
+	defer m.runningJobs.unlock()
+
+	if prev, ok := m.runningJobs.lookup(job.FullName()); ok {
+		prev.Stop()
+	}
+
+	go job.Start()
+	m.runningJobs.add(job.FullName(), job)
+}
+
+func (m *Manager) stopRunningJob(name string) {
+	m.runningJobs.lock()
+	defer m.runningJobs.unlock()
+
+	if job, ok := m.runningJobs.lookup(name); ok {
+		job.Stop()
+		m.runningJobs.remove(name)
+	}
+}
+
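+// cleanup unregisters the dyncfg "config" function and stops all running jobs.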
+func (m *Manager) cleanup() {
+	m.FnReg.Unregister("config")
+
+	m.runningJobs.lock()
+	defer m.runningJobs.unlock()
+
+	m.runningJobs.forEach(func(key string, job *module.Job) {
+		job.Stop()
+	})
+}
+
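+// createCollectorJob instantiates the module referenced by cfg, applies the
+// config to it and wraps the result into a module.Job, resolving the vnode if
+// one is set.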
+func (m *Manager) createCollectorJob(cfg confgroup.Config) (*module.Job, error) {
+	creator, ok := m.Modules[cfg.Module()]
+	if !ok {
+		return nil, fmt.Errorf("cannot find %s module", cfg.Module())
+	}
+
+	var vnode struct {
+		guid     string
+		hostname string
+		labels   map[string]string
+	}
+
+	if cfg.Vnode() != "" {
+		n, ok := m.Vnodes.Lookup(cfg.Vnode())
+		if !ok {
+			return nil, fmt.Errorf("vnode '%s' not found", cfg.Vnode())
+		}
+
+		vnode.guid = n.GUID
+		vnode.hostname = n.Hostname
+		vnode.labels = n.Labels
+	}
+
+	m.Debugf("creating %s[%s] job, config: %v", cfg.Module(), cfg.Name(), cfg)
+
+	mod := creator.Create()
+
+	if err := applyConfig(cfg, mod); err != nil {
+		return nil, err
+	}
+
+	jobCfg := module.JobConfig{
+		PluginName:      m.PluginName,
+		Name:            cfg.Name(),
+		ModuleName:      cfg.Module(),
+		FullName:        cfg.FullName(),
+		UpdateEvery:     cfg.UpdateEvery(),
+		AutoDetectEvery: cfg.AutoDetectionRetry(),
+		Priority:        cfg.Priority(),
+		Labels:          makeLabels(cfg),
+		IsStock:         isStock(cfg),
+		Module:          mod,
+		Out:             m.Out,
+		VnodeGUID:       vnode.guid,
+		VnodeHostname:   vnode.hostname,
+		VnodeLabels:     vnode.labels,
+	}
+
+	job := module.NewJob(jobCfg)
+
+	return job, nil
+}
+
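+// runRetryTask waits out the config's auto-detection retry interval and then
+// re-sends the config for another detection attempt, unless ctx is canceled
+// first.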
+func runRetryTask(ctx context.Context, out chan<- confgroup.Config, cfg confgroup.Config) {
+	t := time.NewTimer(time.Second * time.Duration(cfg.AutoDetectionRetry()))
+	defer t.Stop()
+
+	select {
+	case <-ctx.Done():
+	case <-t.C:
+		sendConfigs(ctx, out, cfg)
+	}
+}
+
+func sendConfigs(ctx context.Context, out chan<- confgroup.Config, cfgs ...confgroup.Config) {
+	for _, cfg := range cfgs {
+		select {
+		case <-ctx.Done():
+			return
+		case out <- cfg:
+		}
+	}
+}
+
+func isStock(cfg confgroup.Config) bool {
+	return cfg.SourceType() == confgroup.TypeStock
+}
+
+func isDyncfg(cfg confgroup.Config) bool {
+	return cfg.SourceType() == confgroup.TypeDyncfg
+}
+
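+// applyConfig populates the module's own config fields by round-tripping cfg
+// through YAML.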
+func applyConfig(cfg confgroup.Config, mod any) error {
+	bs, err := yaml.Marshal(cfg)
+	if err != nil {
+		return err
+	}
+	return yaml.Unmarshal(bs, mod)
+}
+
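+// makeLabels converts the config labels to map[string]string, skipping any
+// entry whose key or value is not a string.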
+func makeLabels(cfg confgroup.Config) map[string]string {
+	labels := make(map[string]string)
+	for name, value := range cfg.Labels() {
+		n, ok1 := name.(string)
+		v, ok2 := value.(string)
+		if ok1 && ok2 {
+			labels[n] = v
+		}
+	}
+	return labels
+}