diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/plugin/go.d/agent/jobmgr/manager.go | 370 |
1 files changed, 370 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/jobmgr/manager.go b/src/go/plugin/go.d/agent/jobmgr/manager.go new file mode 100644 index 000000000..59947be77 --- /dev/null +++ b/src/go/plugin/go.d/agent/jobmgr/manager.go @@ -0,0 +1,370 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package jobmgr + +import ( + "context" + "fmt" + "io" + "log/slog" + "os" + "sync" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/ticker" + + "github.com/mattn/go-isatty" + "gopkg.in/yaml.v2" +) + +var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd()) + +func New() *Manager { + mgr := &Manager{ + Logger: logger.New().With( + slog.String("component", "job manager"), + ), + Out: io.Discard, + FileLock: noop{}, + FileStatus: noop{}, + FileStatusStore: noop{}, + Vnodes: noop{}, + FnReg: noop{}, + + discoveredConfigs: newDiscoveredConfigsCache(), + seenConfigs: newSeenConfigCache(), + exposedConfigs: newExposedConfigCache(), + runningJobs: newRunningJobsCache(), + retryingTasks: newRetryingTasksCache(), + + started: make(chan struct{}), + api: netdataapi.New(safewriter.Stdout), + addCh: make(chan confgroup.Config), + rmCh: make(chan confgroup.Config), + dyncfgCh: make(chan functions.Function), + } + + return mgr +} + +type Manager struct { + *logger.Logger + + PluginName string + Out io.Writer + Modules module.Registry + ConfigDefaults confgroup.Registry + + FileLock FileLocker + FileStatus FileStatus + FileStatusStore FileStatusStore + Vnodes Vnodes + FnReg FunctionRegistry + + discoveredConfigs *discoveredConfigs + seenConfigs *seenConfigs + exposedConfigs *exposedConfigs + retryingTasks *retryingTasks + runningJobs *runningJobs + + ctx context.Context + started chan struct{} + api dyncfgAPI + addCh chan confgroup.Config + rmCh chan confgroup.Config + dyncfgCh chan functions.Function + + waitCfgOnOff string // block processing of discovered configs until "enable"/"disable" is received from Netdata +} + +func (m *Manager) Run(ctx context.Context, in chan []*confgroup.Group) { + m.Info("instance is started") + defer func() { m.cleanup(); m.Info("instance is stopped") }() + m.ctx = ctx + + m.FnReg.Register("config", m.dyncfgConfig) + + for name := range m.Modules { + m.dyncfgModuleCreate(name) + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { defer wg.Done(); m.runProcessConfGroups(in) }() + + wg.Add(1) + go func() { defer wg.Done(); m.run() }() + + wg.Add(1) + go func() { defer wg.Done(); m.runNotifyRunningJobs() }() + + close(m.started) + + wg.Wait() + <-m.ctx.Done() +} + +func (m *Manager) runProcessConfGroups(in chan []*confgroup.Group) { + for { + select { + case <-m.ctx.Done(): + return + case groups := <-in: + for _, gr := range groups { + a, r := m.discoveredConfigs.add(gr) + m.Debugf("received configs: %d/+%d/-%d ('%s')", len(gr.Configs), len(a), len(r), gr.Source) + sendConfigs(m.ctx, m.rmCh, r...) + sendConfigs(m.ctx, m.addCh, a...) + } + } + } +} + +func (m *Manager) run() { + for { + if m.waitCfgOnOff != "" { + select { + case <-m.ctx.Done(): + return + case fn := <-m.dyncfgCh: + m.dyncfgConfigExec(fn) + } + } else { + select { + case <-m.ctx.Done(): + return + case cfg := <-m.addCh: + m.addConfig(cfg) + case cfg := <-m.rmCh: + m.removeConfig(cfg) + case fn := <-m.dyncfgCh: + m.dyncfgConfigExec(fn) + } + } + } +} + +func (m *Manager) addConfig(cfg confgroup.Config) { + if _, ok := m.Modules.Lookup(cfg.Module()); !ok { + return + } + + m.retryingTasks.remove(cfg) + + scfg, ok := m.seenConfigs.lookup(cfg) + if !ok { + scfg = &seenConfig{cfg: cfg} + m.seenConfigs.add(scfg) + } + + ecfg, ok := m.exposedConfigs.lookup(cfg) + if !ok { + scfg.status = dyncfgAccepted + ecfg = scfg + m.exposedConfigs.add(ecfg) + } else { + sp, ep := scfg.cfg.SourceTypePriority(), ecfg.cfg.SourceTypePriority() + if ep > sp || (ep == sp && ecfg.status == dyncfgRunning) { + m.retryingTasks.remove(cfg) + return + } + if ecfg.status == dyncfgRunning { + m.stopRunningJob(ecfg.cfg.FullName()) + m.FileLock.Unlock(ecfg.cfg.FullName()) + m.FileStatus.Remove(ecfg.cfg) + } + scfg.status = dyncfgAccepted + m.exposedConfigs.add(scfg) // replace existing exposed + ecfg = scfg + } + + m.dyncfgJobCreate(ecfg.cfg, ecfg.status) + + if isTerminal || m.PluginName == "nodyncfg" { // FIXME: quick fix of TestAgent_Run (agent_test.go) + m.dyncfgConfigEnable(functions.Function{Args: []string{dyncfgJobID(ecfg.cfg), "enable"}}) + } else { + m.waitCfgOnOff = ecfg.cfg.FullName() + } +} + +func (m *Manager) removeConfig(cfg confgroup.Config) { + m.retryingTasks.remove(cfg) + + scfg, ok := m.seenConfigs.lookup(cfg) + if !ok { + return + } + m.seenConfigs.remove(cfg) + + ecfg, ok := m.exposedConfigs.lookup(cfg) + if !ok || scfg.cfg.UID() != ecfg.cfg.UID() { + return + } + + m.exposedConfigs.remove(cfg) + m.stopRunningJob(cfg.FullName()) + m.FileLock.Unlock(cfg.FullName()) + m.FileStatus.Remove(cfg) + + if !isStock(cfg) || ecfg.status == dyncfgRunning { + m.dyncfgJobRemove(cfg) + } +} + +func (m *Manager) runNotifyRunningJobs() { + tk := ticker.New(time.Second) + defer tk.Stop() + + for { + select { + case <-m.ctx.Done(): + return + case clock := <-tk.C: + m.runningJobs.lock() + m.runningJobs.forEach(func(_ string, job *module.Job) { job.Tick(clock) }) + m.runningJobs.unlock() + } + } +} + +func (m *Manager) startRunningJob(job *module.Job) { + m.runningJobs.lock() + defer m.runningJobs.unlock() + + if job, ok := m.runningJobs.lookup(job.FullName()); ok { + job.Stop() + } + + go job.Start() + m.runningJobs.add(job.FullName(), job) +} + +func (m *Manager) stopRunningJob(name string) { + m.runningJobs.lock() + defer m.runningJobs.unlock() + + if job, ok := m.runningJobs.lookup(name); ok { + job.Stop() + m.runningJobs.remove(name) + } +} + +func (m *Manager) cleanup() { + m.FnReg.Unregister("config") + + m.runningJobs.lock() + defer m.runningJobs.unlock() + + m.runningJobs.forEach(func(key string, job *module.Job) { + job.Stop() + }) +} + +func (m *Manager) createCollectorJob(cfg confgroup.Config) (*module.Job, error) { + creator, ok := m.Modules[cfg.Module()] + if !ok { + return nil, fmt.Errorf("can not find %s module", cfg.Module()) + } + + var vnode struct { + guid string + hostname string + labels map[string]string + } + + if cfg.Vnode() != "" { + n, ok := m.Vnodes.Lookup(cfg.Vnode()) + if !ok { + return nil, fmt.Errorf("vnode '%s' is not found", cfg.Vnode()) + } + + vnode.guid = n.GUID + vnode.hostname = n.Hostname + vnode.labels = n.Labels + } + + m.Debugf("creating %s[%s] job, config: %v", cfg.Module(), cfg.Name(), cfg) + + mod := creator.Create() + + if err := applyConfig(cfg, mod); err != nil { + return nil, err + } + + jobCfg := module.JobConfig{ + PluginName: m.PluginName, + Name: cfg.Name(), + ModuleName: cfg.Module(), + FullName: cfg.FullName(), + UpdateEvery: cfg.UpdateEvery(), + AutoDetectEvery: cfg.AutoDetectionRetry(), + Priority: cfg.Priority(), + Labels: makeLabels(cfg), + IsStock: cfg.SourceType() == "stock", + Module: mod, + Out: m.Out, + VnodeGUID: vnode.guid, + VnodeHostname: vnode.hostname, + VnodeLabels: vnode.labels, + } + + job := module.NewJob(jobCfg) + + return job, nil +} + +func runRetryTask(ctx context.Context, out chan<- confgroup.Config, cfg confgroup.Config) { + t := time.NewTimer(time.Second * time.Duration(cfg.AutoDetectionRetry())) + defer t.Stop() + + select { + case <-ctx.Done(): + case <-t.C: + sendConfigs(ctx, out, cfg) + } +} + +func sendConfigs(ctx context.Context, out chan<- confgroup.Config, cfgs ...confgroup.Config) { + for _, cfg := range cfgs { + select { + case <-ctx.Done(): + return + case out <- cfg: + } + } +} + +func isStock(cfg confgroup.Config) bool { + return cfg.SourceType() == confgroup.TypeStock +} + +func isDyncfg(cfg confgroup.Config) bool { + return cfg.SourceType() == confgroup.TypeDyncfg +} + +func applyConfig(cfg confgroup.Config, module any) error { + bs, err := yaml.Marshal(cfg) + if err != nil { + return err + } + return yaml.Unmarshal(bs, module) +} + +func makeLabels(cfg confgroup.Config) map[string]string { + labels := make(map[string]string) + for name, value := range cfg.Labels() { + n, ok1 := name.(string) + v, ok2 := value.(string) + if ok1 && ok2 { + labels[n] = v + } + } + return labels +} |