summaryrefslogtreecommitdiffstats
path: root/src/go/plugin/go.d/agent/jobmgr
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-26 08:15:24 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-26 08:15:35 +0000
commitf09848204fa5283d21ea43e262ee41aa578e1808 (patch)
treec62385d7adf209fa6a798635954d887f718fb3fb /src/go/plugin/go.d/agent/jobmgr
parentReleasing debian version 1.46.3-2. (diff)
downloadnetdata-f09848204fa5283d21ea43e262ee41aa578e1808.tar.xz
netdata-f09848204fa5283d21ea43e262ee41aa578e1808.zip
Merging upstream version 1.47.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/go/plugin/go.d/agent/jobmgr')
-rw-r--r--src/go/plugin/go.d/agent/jobmgr/cache.go181
-rw-r--r--src/go/plugin/go.d/agent/jobmgr/di.go39
-rw-r--r--src/go/plugin/go.d/agent/jobmgr/dyncfg.go852
-rw-r--r--src/go/plugin/go.d/agent/jobmgr/manager.go370
-rw-r--r--src/go/plugin/go.d/agent/jobmgr/manager_test.go1892
-rw-r--r--src/go/plugin/go.d/agent/jobmgr/noop.go21
-rw-r--r--src/go/plugin/go.d/agent/jobmgr/sim_test.go152
7 files changed, 3507 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/agent/jobmgr/cache.go b/src/go/plugin/go.d/agent/jobmgr/cache.go
new file mode 100644
index 000000000..8ea16ce96
--- /dev/null
+++ b/src/go/plugin/go.d/agent/jobmgr/cache.go
@@ -0,0 +1,181 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package jobmgr
+
+import (
+ "context"
+ "sync"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module"
+)
+
+// Cache constructors. Each returns an empty, ready-to-use cache.
+// Only runningJobs carries a mutex; the other caches appear to be
+// accessed from a single goroutine (assumption — confirm against the
+// Manager run loop).
+func newDiscoveredConfigsCache() *discoveredConfigs {
+	return &discoveredConfigs{
+		items: make(map[string]map[uint64]confgroup.Config),
+	}
+}
+
+func newSeenConfigCache() *seenConfigs {
+	return &seenConfigs{
+		items: make(map[string]*seenConfig),
+	}
+}
+
+func newExposedConfigCache() *exposedConfigs {
+	return &exposedConfigs{
+		items: make(map[string]*seenConfig),
+	}
+}
+
+func newRunningJobsCache() *runningJobs {
+	return &runningJobs{
+		mux:   sync.Mutex{},
+		items: make(map[string]*module.Job),
+	}
+}
+
+func newRetryingTasksCache() *retryingTasks {
+	return &retryingTasks{
+		items: make(map[string]*retryTask),
+	}
+}
+
+// Cache types used by the Manager to track a config's lifecycle:
+// discovered -> seen -> exposed -> running, plus pending retry tasks.
+type (
+	discoveredConfigs struct {
+		// [Source][Hash]
+		items map[string]map[uint64]confgroup.Config
+	}
+
+	seenConfigs struct {
+		// [cfg.UID()]
+		items map[string]*seenConfig
+	}
+	exposedConfigs struct {
+		// [cfg.FullName()]
+		items map[string]*seenConfig
+	}
+	// seenConfig pairs a config with its current dyncfg status.
+	seenConfig struct {
+		cfg    confgroup.Config
+		status dyncfgStatus
+	}
+
+	runningJobs struct {
+		mux sync.Mutex
+		// [cfg.FullName()]
+		items map[string]*module.Job
+	}
+
+	retryingTasks struct {
+		// [cfg.UID()]
+		items map[string]*retryTask
+	}
+	// retryTask holds the cancel func of a scheduled auto-detection retry.
+	retryTask struct {
+		cancel context.CancelFunc
+	}
+)
+
+// add merges a discovery group into the cache and reports the delta:
+// configs present in the group but not yet cached are returned as
+// added; cached configs absent from the group are dropped and returned
+// as removed. An empty group for an unknown source is a no-op.
+func (c *discoveredConfigs) add(group *confgroup.Group) (added, removed []confgroup.Config) {
+	cfgs, ok := c.items[group.Source]
+	if !ok {
+		if len(group.Configs) == 0 {
+			return nil, nil
+		}
+		cfgs = make(map[uint64]confgroup.Config)
+		c.items[group.Source] = cfgs
+	}
+
+	// Hashes present in this group; used below to detect removals.
+	seen := make(map[uint64]bool)
+
+	for _, cfg := range group.Configs {
+		hash := cfg.Hash()
+		seen[hash] = true
+
+		if _, ok := cfgs[hash]; ok {
+			continue
+		}
+
+		cfgs[hash] = cfg
+		added = append(added, cfg)
+	}
+
+	for hash, cfg := range cfgs {
+		if !seen[hash] {
+			delete(cfgs, hash)
+			removed = append(removed, cfg)
+		}
+	}
+
+	// Drop the source entirely once it has no configs left.
+	if len(cfgs) == 0 {
+		delete(c.items, group.Source)
+	}
+
+	return added, removed
+}
+
+// seenConfigs accessors; entries are keyed by cfg.UID().
+func (c *seenConfigs) add(sj *seenConfig) {
+	c.items[sj.cfg.UID()] = sj
+}
+func (c *seenConfigs) remove(cfg confgroup.Config) {
+	delete(c.items, cfg.UID())
+}
+func (c *seenConfigs) lookup(cfg confgroup.Config) (*seenConfig, bool) {
+	v, ok := c.items[cfg.UID()]
+	return v, ok
+}
+
+// exposedConfigs accessors; entries are keyed by cfg.FullName().
+func (c *exposedConfigs) add(sj *seenConfig) {
+	c.items[sj.cfg.FullName()] = sj
+}
+func (c *exposedConfigs) remove(cfg confgroup.Config) {
+	delete(c.items, cfg.FullName())
+}
+func (c *exposedConfigs) lookup(cfg confgroup.Config) (*seenConfig, bool) {
+	v, ok := c.items[cfg.FullName()]
+	return v, ok
+}
+
+// lookupByName resolves an exposed config from module and job name.
+// The key mirrors the FullName convention: "<module>_<job>", collapsed
+// to just the name when module == job (assumption based on the key
+// shape — confirm against confgroup.Config.FullName).
+func (c *exposedConfigs) lookupByName(module, job string) (*seenConfig, bool) {
+	key := module + "_" + job
+	if module == job {
+		key = job
+	}
+	v, ok := c.items[key]
+	return v, ok
+}
+
+// runningJobs accessors. Callers are responsible for holding the mutex
+// (lock/unlock) around compound operations.
+func (c *runningJobs) lock() {
+	c.mux.Lock()
+}
+func (c *runningJobs) unlock() {
+	c.mux.Unlock()
+}
+func (c *runningJobs) add(fullName string, job *module.Job) {
+	c.items[fullName] = job
+}
+func (c *runningJobs) remove(fullName string) {
+	delete(c.items, fullName)
+}
+func (c *runningJobs) lookup(fullName string) (*module.Job, bool) {
+	j, ok := c.items[fullName]
+	return j, ok
+}
+// forEach applies fn to every running job (map iteration order is random).
+func (c *runningJobs) forEach(fn func(fullName string, job *module.Job)) {
+	for k, j := range c.items {
+		fn(k, j)
+	}
+}
+
+// retryingTasks accessors; remove also cancels the pending retry, if any.
+func (c *retryingTasks) add(cfg confgroup.Config, retry *retryTask) {
+	c.items[cfg.UID()] = retry
+}
+func (c *retryingTasks) remove(cfg confgroup.Config) {
+	if v, ok := c.lookup(cfg); ok {
+		v.cancel()
+	}
+	delete(c.items, cfg.UID())
+}
+func (c *retryingTasks) lookup(cfg confgroup.Config) (*retryTask, bool) {
+	v, ok := c.items[cfg.UID()]
+	return v, ok
+}
diff --git a/src/go/plugin/go.d/agent/jobmgr/di.go b/src/go/plugin/go.d/agent/jobmgr/di.go
new file mode 100644
index 000000000..466fcdf90
--- /dev/null
+++ b/src/go/plugin/go.d/agent/jobmgr/di.go
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package jobmgr
+
+import (
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/vnodes"
+)
+
+// FileLocker acquires and releases named locks (file-based, per the
+// name) used to guarantee job exclusivity.
+type FileLocker interface {
+	Lock(name string) (bool, error)
+	Unlock(name string)
+}
+
+// FileStatus persists and removes a config's state string.
+type FileStatus interface {
+	Save(cfg confgroup.Config, state string)
+	Remove(cfg confgroup.Config)
+}
+
+// FileStatusStore reports whether a config was stored in any of the
+// given states.
+type FileStatusStore interface {
+	Contains(cfg confgroup.Config, states ...string) bool
+}
+
+// Vnodes resolves virtual node references used in job configs.
+type Vnodes interface {
+	Lookup(key string) (*vnodes.VirtualNode, bool)
+}
+
+// FunctionRegistry registers/unregisters handlers for agent function calls.
+type FunctionRegistry interface {
+	Register(name string, reg func(functions.Function))
+	Unregister(name string)
+}
+
+// dyncfgAPI is the subset of the netdata plugin API this package uses
+// to drive dynamic configuration.
+type dyncfgAPI interface {
+	CONFIGCREATE(id, status, configType, path, sourceType, source, supportedCommands string)
+	CONFIGDELETE(id string)
+	CONFIGSTATUS(id, status string)
+	FUNCRESULT(uid, contentType, payload, code, expireTimestamp string)
+}
diff --git a/src/go/plugin/go.d/agent/jobmgr/dyncfg.go b/src/go/plugin/go.d/agent/jobmgr/dyncfg.go
new file mode 100644
index 000000000..da6d67489
--- /dev/null
+++ b/src/go/plugin/go.d/agent/jobmgr/dyncfg.go
@@ -0,0 +1,852 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package jobmgr
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "log/slog"
+ "reflect"
+ "strconv"
+ "strings"
+ "time"
+ "unicode"
+
+ "github.com/netdata/netdata/go/plugins/logger"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions"
+
+ "gopkg.in/yaml.v2"
+)
+
+// dyncfgStatus is the lifecycle state of a dyncfg job as reported to
+// the agent; String gives the wire representation.
+type dyncfgStatus int
+
+const (
+	_ dyncfgStatus = iota
+	dyncfgAccepted
+	dyncfgRunning
+	dyncfgFailed
+	dyncfgIncomplete
+	dyncfgDisabled
+)
+
+// String returns the agent-facing name of the status.
+func (s dyncfgStatus) String() string {
+	switch s {
+	case dyncfgAccepted:
+		return "accepted"
+	case dyncfgRunning:
+		return "running"
+	case dyncfgFailed:
+		return "failed"
+	case dyncfgIncomplete:
+		return "incomplete"
+	case dyncfgDisabled:
+		return "disabled"
+	default:
+		return "unknown"
+	}
+}
+
+const (
+	dyncfgIDPrefix = "go.d:collector:"
+	dyncfgPath     = "/collectors/jobs"
+)
+
+// dyncfgModID builds the dyncfg ID of a module template.
+func dyncfgModID(name string) string {
+	return fmt.Sprintf("%s%s", dyncfgIDPrefix, name)
+}
+// dyncfgJobID builds the dyncfg ID of a job: "<prefix><module>:<job>".
+func dyncfgJobID(cfg confgroup.Config) string {
+	return fmt.Sprintf("%s%s:%s", dyncfgIDPrefix, cfg.Module(), cfg.Name())
+}
+
+// dyncfgModCmds lists the commands supported by module templates.
+func dyncfgModCmds() string {
+	return "add schema enable disable test userconfig"
+}
+// dyncfgJobCmds lists the commands supported by a job; only
+// dyncfg-created jobs additionally support "remove".
+func dyncfgJobCmds(cfg confgroup.Config) string {
+	cmds := "schema get enable disable update restart test userconfig"
+	if isDyncfg(cfg) {
+		cmds += " remove"
+	}
+	return cmds
+}
+
+// dyncfgModuleCreate registers a module template with the agent.
+func (m *Manager) dyncfgModuleCreate(name string) {
+	id := dyncfgModID(name)
+	path := dyncfgPath
+	cmds := dyncfgModCmds()
+	typ := "template"
+	src := "internal"
+	m.api.CONFIGCREATE(id, dyncfgAccepted.String(), typ, path, src, src, cmds)
+}
+
+// dyncfgJobCreate registers (or re-registers) a job with the agent.
+func (m *Manager) dyncfgJobCreate(cfg confgroup.Config, status dyncfgStatus) {
+	id := dyncfgJobID(cfg)
+	path := dyncfgPath
+	cmds := dyncfgJobCmds(cfg)
+	typ := "job"
+	m.api.CONFIGCREATE(id, status.String(), typ, path, cfg.SourceType(), cfg.Source(), cmds)
+}
+
+// dyncfgJobRemove deletes a job from the agent's dyncfg tree.
+func (m *Manager) dyncfgJobRemove(cfg confgroup.Config) {
+	m.api.CONFIGDELETE(dyncfgJobID(cfg))
+}
+
+// dyncfgJobStatus reports a job's current status to the agent.
+func (m *Manager) dyncfgJobStatus(cfg confgroup.Config, status dyncfgStatus) {
+	m.api.CONFIGSTATUS(dyncfgJobID(cfg), status.String())
+}
+
+// dyncfgConfig is the entry point for dyncfg function calls from the
+// agent. Read-only actions (userconfig/test/schema) are handled inline;
+// state-mutating actions are serialized through m.dyncfgCh so they run
+// on the manager goroutine (see dyncfgConfigExec).
+func (m *Manager) dyncfgConfig(fn functions.Function) {
+	if len(fn.Args) < 2 {
+		// Message fixed: the check and the response both require 2 args
+		// (previously the warning claimed "want 3").
+		m.Warningf("dyncfg: %s: missing required arguments, want at least 2 got %d", fn.Name, len(fn.Args))
+		m.dyncfgRespf(fn, 400, "Missing required arguments. Need at least 2, but got %d.", len(fn.Args))
+		return
+	}
+
+	// Refuse new work during shutdown. BUG FIX: previously this branch
+	// responded 503 and then fell through, still handling the request
+	// (and potentially sending a second response).
+	select {
+	case <-m.ctx.Done():
+		m.dyncfgRespf(fn, 503, "Job manager is shutting down.")
+		return
+	default:
+	}
+
+	action := strings.ToLower(fn.Args[1])
+
+	switch action {
+	case "userconfig":
+		m.dyncfgConfigUserconfig(fn)
+		return
+	case "test":
+		m.dyncfgConfigTest(fn)
+		return
+	case "schema":
+		m.dyncfgConfigSchema(fn)
+		return
+	}
+
+	// Queue state-changing actions; abort if shutdown begins first.
+	select {
+	case <-m.ctx.Done():
+		m.dyncfgRespf(fn, 503, "Job manager is shutting down.")
+	case m.dyncfgCh <- fn:
+	}
+}
+
+// dyncfgConfigExec dispatches a queued dyncfg command; runs on the
+// manager goroutine (see dyncfgConfig for the producer side).
+func (m *Manager) dyncfgConfigExec(fn functions.Function) {
+	action := strings.ToLower(fn.Args[1])
+
+	switch action {
+	case "test":
+		m.dyncfgConfigTest(fn)
+	case "schema":
+		m.dyncfgConfigSchema(fn)
+	case "get":
+		m.dyncfgConfigGet(fn)
+	case "restart":
+		m.dyncfgConfigRestart(fn)
+	case "enable":
+		m.dyncfgConfigEnable(fn)
+	case "disable":
+		m.dyncfgConfigDisable(fn)
+	case "add":
+		m.dyncfgConfigAdd(fn)
+	case "remove":
+		m.dyncfgConfigRemove(fn)
+	case "update":
+		m.dyncfgConfigUpdate(fn)
+	default:
+		m.Warningf("dyncfg: function '%s' not implemented", fn.String())
+		m.dyncfgRespf(fn, 501, "Function '%s' is not implemented.", fn.Name)
+	}
+}
+
+// dyncfgConfigUserconfig renders a module's configuration, overlaid
+// with the user-supplied payload, as a user-facing YAML snippet (a
+// "jobs" list with a single named entry) and sends it as the result.
+func (m *Manager) dyncfgConfigUserconfig(fn functions.Function) {
+	id := fn.Args[0]
+	// Job name is optional; default mirrors the action name.
+	jn := "test"
+	if len(fn.Args) > 2 {
+		jn = fn.Args[2]
+	}
+
+	mn, ok := extractModuleName(id)
+	if !ok {
+		m.Warningf("dyncfg: userconfig: could not extract module and job from id (%s)", id)
+		m.dyncfgRespf(fn, 400,
+			"Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	creator, ok := m.Modules.Lookup(mn)
+	if !ok {
+		m.Warningf("dyncfg: userconfig: module %s not found", mn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn)
+		return
+	}
+
+	if creator.Config == nil || creator.Config() == nil {
+		m.Warningf("dyncfg: userconfig: module %s: configuration not found", mn)
+		m.dyncfgRespf(fn, 500, "Module %s does not provide configuration.", mn)
+		return
+	}
+
+	bs, err := userConfigFromPayload(creator.Config(), jn, fn)
+	if err != nil {
+		m.Warningf("dyncfg: userconfig: module %s: failed to create config from payload: %v", mn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err)
+		// BUG FIX: previously fell through and also sent the nil payload
+		// as a second, successful (200) response.
+		return
+	}
+
+	m.dyncfgRespPayloadYAML(fn, string(bs))
+}
+
+// dyncfgConfigTest builds a throwaway job from the payload and runs
+// Init+Check against it; nothing is registered or started.
+func (m *Manager) dyncfgConfigTest(fn functions.Function) {
+	id := fn.Args[0]
+	mn, ok := extractModuleName(id)
+	if !ok {
+		m.Warningf("dyncfg: test: could not extract module and job from id (%s)", id)
+		m.dyncfgRespf(fn, 400,
+			"Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	// Job name is optional for "test"; default mirrors the action name.
+	jn := "test"
+	if len(fn.Args) > 2 {
+		jn = fn.Args[2]
+	}
+
+	if err := validateJobName(jn); err != nil {
+		m.Warningf("dyncfg: test: module %s: unacceptable job name '%s': %v", mn, jn, err)
+		m.dyncfgRespf(fn, 400, "Unacceptable job name '%s': %v.", jn, err)
+		return
+	}
+
+	creator, ok := m.Modules.Lookup(mn)
+	if !ok {
+		m.Warningf("dyncfg: test: module %s not found", mn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn)
+		return
+	}
+
+	cfg, err := configFromPayload(fn)
+	if err != nil {
+		m.Warningf("dyncfg: test: module %s: failed to create config from payload: %v", mn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err)
+		return
+	}
+
+	// A referenced vnode must exist before we bother building the job.
+	if cfg.Vnode() != "" {
+		if _, ok := m.Vnodes.Lookup(cfg.Vnode()); !ok {
+			m.Warningf("dyncfg: test: module %s: vnode %s not found", mn, cfg.Vnode())
+			m.dyncfgRespf(fn, 400, "The specified vnode '%s' is not registered.", cfg.Vnode())
+			return
+		}
+	}
+
+	cfg.SetModule(mn)
+	cfg.SetName(jn)
+
+	job := creator.Create()
+
+	if err := applyConfig(cfg, job); err != nil {
+		m.Warningf("dyncfg: test: module %s: failed to apply config: %v", mn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
+		return
+	}
+
+	// Give the probe job its own logger so its output is attributable.
+	job.GetBase().Logger = logger.New().With(
+		slog.String("collector", cfg.Module()),
+		slog.String("job", cfg.Name()),
+	)
+
+	defer job.Cleanup()
+
+	if err := job.Init(); err != nil {
+		m.dyncfgRespf(fn, 422, "Job initialization failed: %v", err)
+		return
+	}
+	if err := job.Check(); err != nil {
+		m.dyncfgRespf(fn, 422, "Job check failed: %v", err)
+		return
+	}
+
+	m.dyncfgRespf(fn, 200, "")
+}
+
+// dyncfgConfigSchema returns the registered module's job configuration
+// JSON schema as the function result.
+func (m *Manager) dyncfgConfigSchema(fn functions.Function) {
+	id := fn.Args[0]
+	mn, ok := extractModuleName(id)
+	if !ok {
+		m.Warningf("dyncfg: schema: could not extract module from id (%s)", id)
+		m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	mod, ok := m.Modules.Lookup(mn)
+	if !ok {
+		m.Warningf("dyncfg: schema: module %s not found", mn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn)
+		return
+	}
+
+	if mod.JobConfigSchema == "" {
+		m.Warningf("dyncfg: schema: module %s: schema not found", mn)
+		m.dyncfgRespf(fn, 500, "Module %s configuration schema not found.", mn)
+		return
+	}
+
+	m.dyncfgRespPayloadJSON(fn, mod.JobConfigSchema)
+}
+
+// dyncfgConfigGet returns the effective configuration of an exposed job
+// as JSON: the stored config is applied to a fresh module instance and
+// its Configuration() is serialized.
+func (m *Manager) dyncfgConfigGet(fn functions.Function) {
+	id := fn.Args[0]
+	mn, jn, ok := extractModuleJobName(id)
+	if !ok {
+		m.Warningf("dyncfg: get: could not extract module and job from id (%s)", id)
+		m.dyncfgRespf(fn, 400,
+			"Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	creator, ok := m.Modules.Lookup(mn)
+	if !ok {
+		m.Warningf("dyncfg: get: module %s not found", mn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn)
+		return
+	}
+
+	ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
+	if !ok {
+		m.Warningf("dyncfg: get: module %s job %s not found", mn, jn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
+		return
+	}
+
+	mod := creator.Create()
+
+	if err := applyConfig(ecfg.cfg, mod); err != nil {
+		m.Warningf("dyncfg: get: module %s job %s failed to apply config: %v", mn, jn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
+		return
+	}
+
+	conf := mod.Configuration()
+	if conf == nil {
+		m.Warningf("dyncfg: get: module %s: configuration not found", mn)
+		m.dyncfgRespf(fn, 500, "Module %s does not provide configuration.", mn)
+		return
+	}
+
+	bs, err := json.Marshal(conf)
+	if err != nil {
+		m.Warningf("dyncfg: get: module %s job %s failed to json marshal config: %v", mn, jn, err)
+		m.dyncfgRespf(fn, 500, "Failed to convert configuration into JSON: %v.", err)
+		return
+	}
+
+	m.dyncfgRespPayloadJSON(fn, string(bs))
+}
+
+// dyncfgConfigRestart rebuilds an exposed job from its stored config
+// and restarts data collection. Restarting is refused while the job is
+// in accepted or disabled state.
+func (m *Manager) dyncfgConfigRestart(fn functions.Function) {
+	id := fn.Args[0]
+	mn, jn, ok := extractModuleJobName(id)
+	if !ok {
+		m.Warningf("dyncfg: restart: could not extract module from id (%s)", id)
+		m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
+	if !ok {
+		m.Warningf("dyncfg: restart: module %s job %s not found", mn, jn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
+		return
+	}
+
+	// NOTE(review): the replacement job is built before the state check
+	// below, so in accepted/disabled state it is created and then
+	// silently discarded — confirm whether that ordering is intended.
+	job, err := m.createCollectorJob(ecfg.cfg)
+	if err != nil {
+		m.Warningf("dyncfg: restart: module %s job %s: failed to apply config: %v", mn, jn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	switch ecfg.status {
+	case dyncfgAccepted, dyncfgDisabled:
+		m.Warningf("dyncfg: restart: module %s job %s: restarting not allowed in '%s' state", mn, jn, ecfg.status)
+		m.dyncfgRespf(fn, 405, "Restarting data collection job is not allowed in '%s' state.", ecfg.status)
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	case dyncfgRunning:
+		// Tear down the currently running instance first.
+		m.FileStatus.Remove(ecfg.cfg)
+		m.FileLock.Unlock(ecfg.cfg.FullName())
+		m.stopRunningJob(ecfg.cfg.FullName())
+	default:
+	}
+
+	if err := job.AutoDetection(); err != nil {
+		job.Cleanup()
+		ecfg.status = dyncfgFailed
+		m.dyncfgRespf(fn, 422, "Job restart failed: %v", err)
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	// Only ok==false with a nil error counts as "lock held elsewhere";
+	// a lock error is deliberately not treated as fatal here.
+	if ok, err := m.FileLock.Lock(ecfg.cfg.FullName()); !ok && err == nil {
+		job.Cleanup()
+		ecfg.status = dyncfgFailed
+		m.dyncfgRespf(fn, 500, "Job restart failed: cannot filelock.")
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	ecfg.status = dyncfgRunning
+
+	if isDyncfg(ecfg.cfg) {
+		m.FileStatus.Save(ecfg.cfg, ecfg.status.String())
+	}
+	m.startRunningJob(job)
+	m.dyncfgRespf(fn, 200, "")
+	m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+}
+
+// dyncfgConfigEnable starts data collection for an exposed config.
+// Stock configs that fail detection are removed from view rather than
+// shown as failed; non-dyncfg configs may get a scheduled retry.
+func (m *Manager) dyncfgConfigEnable(fn functions.Function) {
+	id := fn.Args[0]
+	mn, jn, ok := extractModuleJobName(id)
+	if !ok {
+		m.Warningf("dyncfg: enable: could not extract module and job from id (%s)", id)
+		m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
+	if !ok {
+		m.Warningf("dyncfg: enable: module %s job %s not found", mn, jn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
+		return
+	}
+
+	// Clear the wait marker if this is the config being waited on.
+	if ecfg.cfg.FullName() == m.waitCfgOnOff {
+		m.waitCfgOnOff = ""
+	}
+
+	switch ecfg.status {
+	case dyncfgAccepted, dyncfgDisabled, dyncfgFailed:
+	case dyncfgRunning:
+		// non-dyncfg update triggers enable/disable
+		m.dyncfgRespf(fn, 200, "")
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	default:
+		m.Warningf("dyncfg: enable: module %s job %s: enabling not allowed in %s state", mn, jn, ecfg.status)
+		m.dyncfgRespf(fn, 405, "Enabling data collection job is not allowed in '%s' state.", ecfg.status)
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	job, err := m.createCollectorJob(ecfg.cfg)
+	if err != nil {
+		ecfg.status = dyncfgFailed
+		m.Warningf("dyncfg: enable: module %s job %s: failed to apply config: %v", mn, jn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	if err := job.AutoDetection(); err != nil {
+		job.Cleanup()
+		ecfg.status = dyncfgFailed
+		m.dyncfgRespf(fn, 200, "Job enable failed: %v.", err)
+
+		// Failed stock configs are hidden instead of reported as failed.
+		if isStock(ecfg.cfg) {
+			m.exposedConfigs.remove(ecfg.cfg)
+			m.dyncfgJobRemove(ecfg.cfg)
+		} else {
+			m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		}
+
+		if job.RetryAutoDetection() && !isDyncfg(ecfg.cfg) {
+			m.Infof("%s[%s] job detection failed, will retry in %d seconds",
+				ecfg.cfg.Module(), ecfg.cfg.Name(), job.AutoDetectionEvery())
+
+			// Schedule a retry that re-submits the config via addCh.
+			ctx, cancel := context.WithCancel(m.ctx)
+			m.retryingTasks.add(ecfg.cfg, &retryTask{cancel: cancel})
+			go runRetryTask(ctx, m.addCh, ecfg.cfg)
+		}
+		return
+	}
+
+	// Only ok==false with a nil error counts as "lock held elsewhere".
+	if ok, err := m.FileLock.Lock(ecfg.cfg.FullName()); !ok && err == nil {
+		job.Cleanup()
+		ecfg.status = dyncfgFailed
+		m.dyncfgRespf(fn, 500, "Job enable failed: can not filelock.")
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	ecfg.status = dyncfgRunning
+
+	if isDyncfg(ecfg.cfg) {
+		m.FileStatus.Save(ecfg.cfg, ecfg.status.String())
+	}
+
+	m.startRunningJob(job)
+	m.dyncfgRespf(fn, 200, "")
+	m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+
+}
+
+// dyncfgConfigDisable stops an exposed job (if running) and marks it
+// disabled.
+func (m *Manager) dyncfgConfigDisable(fn functions.Function) {
+	id := fn.Args[0]
+	mn, jn, ok := extractModuleJobName(id)
+	if !ok {
+		m.Warningf("dyncfg: disable: could not extract module from id (%s)", id)
+		m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
+	if !ok {
+		m.Warningf("dyncfg: disable: module %s job %s not found", mn, jn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
+		return
+	}
+
+	// Clear the wait marker if this is the config being waited on.
+	if ecfg.cfg.FullName() == m.waitCfgOnOff {
+		m.waitCfgOnOff = ""
+	}
+
+	switch ecfg.status {
+	case dyncfgDisabled:
+		// Already disabled: acknowledge without extra work.
+		m.dyncfgRespf(fn, 200, "")
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	case dyncfgRunning:
+		m.stopRunningJob(ecfg.cfg.FullName())
+		if isDyncfg(ecfg.cfg) {
+			m.FileStatus.Remove(ecfg.cfg)
+		}
+		m.FileLock.Unlock(ecfg.cfg.FullName())
+	default:
+	}
+
+	ecfg.status = dyncfgDisabled
+	m.dyncfgRespf(fn, 200, "")
+	m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+}
+
+// dyncfgConfigAdd creates a dyncfg job from the payload under the given
+// module, displacing any exposed config with the same full name. The
+// new config is registered in "accepted" state; a separate enable
+// starts it.
+func (m *Manager) dyncfgConfigAdd(fn functions.Function) {
+	if len(fn.Args) < 3 {
+		m.Warningf("dyncfg: add: missing required arguments, want 3 got %d", len(fn.Args))
+		m.dyncfgRespf(fn, 400, "Missing required arguments. Need at least 3, but got %d.", len(fn.Args))
+		return
+	}
+
+	id := fn.Args[0]
+	jn := fn.Args[2]
+	mn, ok := extractModuleName(id)
+	if !ok {
+		m.Warningf("dyncfg: add: could not extract module from id (%s)", id)
+		m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	if len(fn.Payload) == 0 {
+		m.Warningf("dyncfg: add: module %s job %s missing configuration payload.", mn, jn)
+		m.dyncfgRespf(fn, 400, "Missing configuration payload.")
+		return
+	}
+
+	if err := validateJobName(jn); err != nil {
+		m.Warningf("dyncfg: add: module %s: unacceptable job name '%s': %v", mn, jn, err)
+		m.dyncfgRespf(fn, 400, "Unacceptable job name '%s': %v.", jn, err)
+		return
+	}
+
+	cfg, err := configFromPayload(fn)
+	if err != nil {
+		m.Warningf("dyncfg: add: module %s job %s: failed to create config from payload: %v", mn, jn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err)
+		return
+	}
+
+	m.dyncfgSetConfigMeta(cfg, mn, jn)
+
+	// Validate the config by building (and discarding) a collector job.
+	if _, err := m.createCollectorJob(cfg); err != nil {
+		m.Warningf("dyncfg: add: module %s job %s: failed to apply config: %v", mn, jn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
+		return
+	}
+
+	// Displace an existing config with the same full name: drop its
+	// dyncfg-seen entry, unexpose it and stop its running job.
+	if ecfg, ok := m.exposedConfigs.lookup(cfg); ok {
+		if scfg, ok := m.seenConfigs.lookup(ecfg.cfg); ok && isDyncfg(scfg.cfg) {
+			m.seenConfigs.remove(ecfg.cfg)
+		}
+		m.exposedConfigs.remove(ecfg.cfg)
+		m.stopRunningJob(ecfg.cfg.FullName())
+	}
+
+	scfg := &seenConfig{cfg: cfg, status: dyncfgAccepted}
+	ecfg := scfg
+	m.seenConfigs.add(scfg)
+	m.exposedConfigs.add(ecfg)
+
+	m.dyncfgRespf(fn, 202, "")
+	m.dyncfgJobCreate(ecfg.cfg, ecfg.status)
+}
+
+// dyncfgConfigRemove deletes a dyncfg-created job: caches, running job,
+// file lock and status entry, then the dyncfg registration itself.
+// Jobs originating from other sources cannot be removed.
+func (m *Manager) dyncfgConfigRemove(fn functions.Function) {
+	id := fn.Args[0]
+	mn, jn, ok := extractModuleJobName(id)
+	if !ok {
+		m.Warningf("dyncfg: remove: could not extract module and job from id (%s)", id)
+		m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
+	if !ok {
+		m.Warningf("dyncfg: remove: module %s job %s not found", mn, jn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
+		return
+	}
+
+	if !isDyncfg(ecfg.cfg) {
+		m.Warningf("dyncfg: remove: module %s job %s: can not remove jobs of type %s", mn, jn, ecfg.cfg.SourceType())
+		m.dyncfgRespf(fn, 405, "Removing jobs of type '%s' is not supported. Only 'dyncfg' jobs can be removed.", ecfg.cfg.SourceType())
+		return
+	}
+
+	m.seenConfigs.remove(ecfg.cfg)
+	m.exposedConfigs.remove(ecfg.cfg)
+	m.stopRunningJob(ecfg.cfg.FullName())
+	m.FileLock.Unlock(ecfg.cfg.FullName())
+	m.FileStatus.Remove(ecfg.cfg)
+
+	m.dyncfgRespf(fn, 200, "")
+	m.dyncfgJobRemove(ecfg.cfg)
+}
+
+// dyncfgConfigUpdate replaces an exposed config with one built from the
+// payload, restarting collection unless the job was disabled.
+func (m *Manager) dyncfgConfigUpdate(fn functions.Function) {
+	id := fn.Args[0]
+	mn, jn, ok := extractModuleJobName(id)
+	if !ok {
+		m.Warningf("dyncfg: update: could not extract module from id (%s)", id)
+		m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id)
+		return
+	}
+
+	ecfg, ok := m.exposedConfigs.lookupByName(mn, jn)
+	if !ok {
+		m.Warningf("dyncfg: update: module %s job %s not found", mn, jn)
+		m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn)
+		return
+	}
+
+	cfg, err := configFromPayload(fn)
+	if err != nil {
+		m.Warningf("dyncfg: update: module %s: failed to create config from payload: %v", mn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err)
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	m.dyncfgSetConfigMeta(cfg, mn, jn)
+
+	// No-op update: same UID while running, nothing to restart.
+	if ecfg.status == dyncfgRunning && ecfg.cfg.UID() == cfg.UID() {
+		m.dyncfgRespf(fn, 200, "")
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	job, err := m.createCollectorJob(cfg)
+	if err != nil {
+		m.Warningf("dyncfg: update: module %s job %s: failed to apply config: %v", mn, jn, err)
+		m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err)
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	if ecfg.status == dyncfgAccepted {
+		m.Warningf("dyncfg: update: module %s job %s: updating not allowed in %s", mn, jn, ecfg.status)
+		m.dyncfgRespf(fn, 403, "Updating data collection job is not allowed in '%s' state.", ecfg.status)
+		m.dyncfgJobStatus(ecfg.cfg, ecfg.status)
+		return
+	}
+
+	m.exposedConfigs.remove(ecfg.cfg)
+	m.stopRunningJob(ecfg.cfg.FullName())
+
+	scfg := &seenConfig{cfg: cfg, status: dyncfgAccepted}
+	m.seenConfigs.add(scfg)
+	m.exposedConfigs.add(scfg)
+
+	if isDyncfg(ecfg.cfg) {
+		m.seenConfigs.remove(ecfg.cfg)
+	} else {
+		// needed to update meta. There is no other way, unfortunately, but to send "create"
+		defer m.dyncfgJobCreate(scfg.cfg, scfg.status)
+	}
+
+	// Preserve a disabled state across the update without starting the job.
+	if ecfg.status == dyncfgDisabled {
+		scfg.status = dyncfgDisabled
+		m.dyncfgRespf(fn, 200, "")
+		m.dyncfgJobStatus(cfg, scfg.status)
+		return
+	}
+
+	if err := job.AutoDetection(); err != nil {
+		job.Cleanup()
+		scfg.status = dyncfgFailed
+		m.dyncfgRespf(fn, 200, "Job update failed: %v", err)
+		m.dyncfgJobStatus(scfg.cfg, scfg.status)
+		return
+	}
+
+	// Only ok==false with a nil error counts as "lock held elsewhere".
+	if ok, err := m.FileLock.Lock(scfg.cfg.FullName()); !ok && err == nil {
+		job.Cleanup()
+		scfg.status = dyncfgFailed
+		m.dyncfgRespf(fn, 500, "Job update failed: cannot create file lock.")
+		m.dyncfgJobStatus(scfg.cfg, scfg.status)
+		return
+	}
+
+	scfg.status = dyncfgRunning
+	m.startRunningJob(job)
+	m.dyncfgRespf(fn, 200, "")
+	m.dyncfgJobStatus(scfg.cfg, scfg.status)
+}
+
+// dyncfgSetConfigMeta stamps a payload-derived config as dyncfg-owned
+// (provider/source/source type) and applies registered module defaults.
+func (m *Manager) dyncfgSetConfigMeta(cfg confgroup.Config, module, name string) {
+	cfg.SetProvider("dyncfg")
+	cfg.SetSource(fmt.Sprintf("type=dyncfg,module=%s,job=%s", module, name))
+	cfg.SetSourceType("dyncfg")
+	cfg.SetModule(module)
+	cfg.SetName(name)
+	if def, ok := m.ConfigDefaults.Lookup(module); ok {
+		cfg.ApplyDefaults(def)
+	}
+}
+
+func (m *Manager) dyncfgRespPayloadJSON(fn functions.Function, payload string) {
+	m.dyncfgRespPayload(fn, payload, "application/json")
+}
+
+func (m *Manager) dyncfgRespPayloadYAML(fn functions.Function, payload string) {
+	m.dyncfgRespPayload(fn, payload, "application/yaml")
+}
+
+// dyncfgRespPayload sends a successful (200) function result carrying
+// the given payload and content type.
+func (m *Manager) dyncfgRespPayload(fn functions.Function, payload string, contentType string) {
+	ts := strconv.FormatInt(time.Now().Unix(), 10)
+	m.api.FUNCRESULT(fn.UID, contentType, payload, "200", ts)
+}
+
+// dyncfgRespf sends a JSON {status,message} function result. An empty
+// UID means no response is expected, so it is a no-op.
+func (m *Manager) dyncfgRespf(fn functions.Function, code int, msgf string, a ...any) {
+	if fn.UID == "" {
+		return
+	}
+	bs, _ := json.Marshal(struct {
+		Status  int    `json:"status"`
+		Message string `json:"message"`
+	}{
+		Status:  code,
+		Message: fmt.Sprintf(msgf, a...),
+	})
+	ts := strconv.FormatInt(time.Now().Unix(), 10)
+	m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts)
+}
+
+// userConfigFromPayload unmarshals the function payload (JSON or YAML,
+// by fn.ContentType) into cfg, then re-renders it as a user-style YAML
+// document: a "jobs" list with a single entry whose "name" is jobName.
+// cfg must be a non-nil pointer (typically a module config struct).
+func userConfigFromPayload(cfg any, jobName string, fn functions.Function) ([]byte, error) {
+	if v := reflect.ValueOf(cfg); v.Kind() != reflect.Ptr || v.IsNil() {
+		return nil, fmt.Errorf("invalid config: expected a pointer to a struct, got a %s", v.Type())
+	}
+
+	if fn.ContentType == "application/json" {
+		if err := json.Unmarshal(fn.Payload, cfg); err != nil {
+			return nil, err
+		}
+	} else {
+		if err := yaml.Unmarshal(fn.Payload, cfg); err != nil {
+			return nil, err
+		}
+	}
+
+	bs, err := yaml.Marshal(cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	// Round-trip through MapSlice to keep field order and allow
+	// prepending the job name.
+	var yms yaml.MapSlice
+	if err := yaml.Unmarshal(bs, &yms); err != nil {
+		return nil, err
+	}
+
+	yms = append([]yaml.MapItem{{Key: "name", Value: jobName}}, yms...)
+
+	v := map[string]any{
+		"jobs": []any{yms},
+	}
+
+	bs, err = yaml.Marshal(v)
+	if err != nil {
+		return nil, err
+	}
+
+	return bs, nil
+}
+
+// configFromPayload decodes the function payload into a confgroup.Config,
+// choosing the decoder by fn.ContentType. The JSON path returns a clone
+// of the decoded config, the YAML path returns it directly
+// (NOTE(review): confirm this asymmetry is intentional).
+func configFromPayload(fn functions.Function) (confgroup.Config, error) {
+	var cfg confgroup.Config
+
+	if fn.ContentType == "application/json" {
+		if err := json.Unmarshal(fn.Payload, &cfg); err != nil {
+			return nil, err
+		}
+
+		return cfg.Clone()
+	}
+
+	if err := yaml.Unmarshal(fn.Payload, &cfg); err != nil {
+		return nil, err
+	}
+
+	return cfg, nil
+}
+
+// extractModuleJobName splits a dyncfg job ID into its module and job
+// name components.
+func extractModuleJobName(id string) (mn string, jn string, ok bool) {
+	if mn, ok = extractModuleName(id); !ok {
+		return "", "", false
+	}
+	if jn, ok = extractJobName(id); !ok {
+		return "", "", false
+	}
+	return mn, jn, true
+}
+
+// extractModuleName returns the module part of an ID: the text between
+// the dyncfg prefix and the first ':' (or the whole remainder when no
+// ':' is present; empty remainder means failure).
+func extractModuleName(id string) (string, bool) {
+	id = strings.TrimPrefix(id, dyncfgIDPrefix)
+	i := strings.IndexByte(id, ':')
+	if i == -1 {
+		return id, id != ""
+	}
+	return id[:i], true
+}
+
+// extractJobName returns the text after the last ':' in the ID; false
+// when the ID contains no ':'.
+func extractJobName(id string) (string, bool) {
+	i := strings.LastIndexByte(id, ':')
+	if i == -1 {
+		return "", false
+	}
+	return id[i+1:], true
+}
+
+// validateJobName rejects names containing whitespace, '.' or ':'
+// (':' is the separator inside dyncfg IDs).
+func validateJobName(jobName string) error {
+	for _, r := range jobName {
+		if unicode.IsSpace(r) {
+			return errors.New("contains spaces")
+		}
+		switch r {
+		case '.', ':':
+			return fmt.Errorf("contains '%c'", r)
+		}
+	}
+	return nil
+}
diff --git a/src/go/plugin/go.d/agent/jobmgr/manager.go b/src/go/plugin/go.d/agent/jobmgr/manager.go
new file mode 100644
index 000000000..59947be77
--- /dev/null
+++ b/src/go/plugin/go.d/agent/jobmgr/manager.go
@@ -0,0 +1,370 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package jobmgr
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "log/slog"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/netdata/netdata/go/plugins/logger"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/ticker"
+
+ "github.com/mattn/go-isatty"
+ "gopkg.in/yaml.v2"
+)
+
+var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd())
+
// New creates a Manager with no-op integrations (file locking, status
// persistence, vnodes, function registry) and discarded output; callers
// overwrite the exported fields they need before calling Run.
func New() *Manager {
	mgr := &Manager{
		Logger: logger.New().With(
			slog.String("component", "job manager"),
		),
		Out:             io.Discard,
		FileLock:        noop{},
		FileStatus:      noop{},
		FileStatusStore: noop{},
		Vnodes:          noop{},
		FnReg:           noop{},

		// Caches tracking the config lifecycle: discovered -> seen -> exposed -> running.
		discoveredConfigs: newDiscoveredConfigsCache(),
		seenConfigs:       newSeenConfigCache(),
		exposedConfigs:    newExposedConfigCache(),
		runningJobs:       newRunningJobsCache(),
		retryingTasks:     newRetryingTasksCache(),

		started:  make(chan struct{}),
		api:      netdataapi.New(safewriter.Stdout),
		addCh:    make(chan confgroup.Config),
		rmCh:     make(chan confgroup.Config),
		dyncfgCh: make(chan functions.Function),
	}

	return mgr
}
+
// Manager owns the lifecycle of collector jobs: it consumes discovered
// config groups, exposes them to Netdata via dynamic configuration, and
// starts/stops the corresponding collector jobs.
type Manager struct {
	*logger.Logger

	PluginName     string
	Out            io.Writer       // destination for collected metrics
	Modules        module.Registry // available collector modules
	ConfigDefaults confgroup.Registry

	FileLock        FileLocker
	FileStatus      FileStatus
	FileStatusStore FileStatusStore
	Vnodes          Vnodes
	FnReg           FunctionRegistry

	discoveredConfigs *discoveredConfigs // configs received from discovery, grouped by source
	seenConfigs       *seenConfigs       // every config currently known to the manager
	exposedConfigs    *exposedConfigs    // configs announced to Netdata via dyncfg
	retryingTasks     *retryingTasks     // pending auto-detection retry timers
	runningJobs       *runningJobs       // started collector jobs

	ctx      context.Context
	started  chan struct{} // closed once Run has launched its goroutines
	api      dyncfgAPI
	addCh    chan confgroup.Config   // discovered config additions
	rmCh     chan confgroup.Config   // discovered config removals
	dyncfgCh chan functions.Function // dyncfg function calls from Netdata

	waitCfgOnOff string // block processing of discovered configs until "enable"/"disable" is received from Netdata
}
+
// Run starts the manager: it registers the dyncfg "config" function
// handler, announces every registered module to Netdata, and runs the
// processing goroutines until ctx is cancelled. It blocks until then.
func (m *Manager) Run(ctx context.Context, in chan []*confgroup.Group) {
	m.Info("instance is started")
	defer func() { m.cleanup(); m.Info("instance is stopped") }()
	m.ctx = ctx

	m.FnReg.Register("config", m.dyncfgConfig)

	// Expose every known module through dynamic configuration.
	for name := range m.Modules {
		m.dyncfgModuleCreate(name)
	}

	var wg sync.WaitGroup

	wg.Add(1)
	go func() { defer wg.Done(); m.runProcessConfGroups(in) }()

	wg.Add(1)
	go func() { defer wg.Done(); m.run() }()

	wg.Add(1)
	go func() { defer wg.Done(); m.runNotifyRunningJobs() }()

	// Signal readiness only after all workers are launched.
	close(m.started)

	wg.Wait()
	<-m.ctx.Done()
}
+
+func (m *Manager) runProcessConfGroups(in chan []*confgroup.Group) {
+ for {
+ select {
+ case <-m.ctx.Done():
+ return
+ case groups := <-in:
+ for _, gr := range groups {
+ a, r := m.discoveredConfigs.add(gr)
+ m.Debugf("received configs: %d/+%d/-%d ('%s')", len(gr.Configs), len(a), len(r), gr.Source)
+ sendConfigs(m.ctx, m.rmCh, r...)
+ sendConfigs(m.ctx, m.addCh, a...)
+ }
+ }
+ }
+}
+
// run is the main processing loop. While waitCfgOnOff is set (a config
// was exposed and the manager is waiting for Netdata to answer with
// "enable"/"disable"), only dyncfg function calls are serviced — the
// discovered add/remove channels are deliberately not drained.
func (m *Manager) run() {
	for {
		if m.waitCfgOnOff != "" {
			select {
			case <-m.ctx.Done():
				return
			case fn := <-m.dyncfgCh:
				m.dyncfgConfigExec(fn)
			}
		} else {
			select {
			case <-m.ctx.Done():
				return
			case cfg := <-m.addCh:
				m.addConfig(cfg)
			case cfg := <-m.rmCh:
				m.removeConfig(cfg)
			case fn := <-m.dyncfgCh:
				m.dyncfgConfigExec(fn)
			}
		}
	}
}
+
+func (m *Manager) addConfig(cfg confgroup.Config) {
+ if _, ok := m.Modules.Lookup(cfg.Module()); !ok {
+ return
+ }
+
+ m.retryingTasks.remove(cfg)
+
+ scfg, ok := m.seenConfigs.lookup(cfg)
+ if !ok {
+ scfg = &seenConfig{cfg: cfg}
+ m.seenConfigs.add(scfg)
+ }
+
+ ecfg, ok := m.exposedConfigs.lookup(cfg)
+ if !ok {
+ scfg.status = dyncfgAccepted
+ ecfg = scfg
+ m.exposedConfigs.add(ecfg)
+ } else {
+ sp, ep := scfg.cfg.SourceTypePriority(), ecfg.cfg.SourceTypePriority()
+ if ep > sp || (ep == sp && ecfg.status == dyncfgRunning) {
+ m.retryingTasks.remove(cfg)
+ return
+ }
+ if ecfg.status == dyncfgRunning {
+ m.stopRunningJob(ecfg.cfg.FullName())
+ m.FileLock.Unlock(ecfg.cfg.FullName())
+ m.FileStatus.Remove(ecfg.cfg)
+ }
+ scfg.status = dyncfgAccepted
+ m.exposedConfigs.add(scfg) // replace existing exposed
+ ecfg = scfg
+ }
+
+ m.dyncfgJobCreate(ecfg.cfg, ecfg.status)
+
+ if isTerminal || m.PluginName == "nodyncfg" { // FIXME: quick fix of TestAgent_Run (agent_test.go)
+ m.dyncfgConfigEnable(functions.Function{Args: []string{dyncfgJobID(ecfg.cfg), "enable"}})
+ } else {
+ m.waitCfgOnOff = ecfg.cfg.FullName()
+ }
+}
+
// removeConfig handles a config disappearing from discovery: it drops it
// from the seen cache and, if this exact config instance (matched by
// UID) is the one currently exposed, stops its job, releases the file
// lock/status, and removes the dyncfg entity from Netdata.
func (m *Manager) removeConfig(cfg confgroup.Config) {
	m.retryingTasks.remove(cfg)

	scfg, ok := m.seenConfigs.lookup(cfg)
	if !ok {
		return
	}
	m.seenConfigs.remove(cfg)

	ecfg, ok := m.exposedConfigs.lookup(cfg)
	if !ok || scfg.cfg.UID() != ecfg.cfg.UID() {
		// A different config instance with the same name is exposed; leave it be.
		return
	}

	m.exposedConfigs.remove(cfg)
	m.stopRunningJob(cfg.FullName())
	m.FileLock.Unlock(cfg.FullName())
	m.FileStatus.Remove(cfg)

	// NOTE(review): non-running stock configs appear to have had their
	// dyncfg entity removed earlier (on enable failure) — confirm in dyncfg.go.
	if !isStock(cfg) || ecfg.status == dyncfgRunning {
		m.dyncfgJobRemove(cfg)
	}
}
+
+func (m *Manager) runNotifyRunningJobs() {
+ tk := ticker.New(time.Second)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-m.ctx.Done():
+ return
+ case clock := <-tk.C:
+ m.runningJobs.lock()
+ m.runningJobs.forEach(func(_ string, job *module.Job) { job.Tick(clock) })
+ m.runningJobs.unlock()
+ }
+ }
+}
+
+func (m *Manager) startRunningJob(job *module.Job) {
+ m.runningJobs.lock()
+ defer m.runningJobs.unlock()
+
+ if job, ok := m.runningJobs.lookup(job.FullName()); ok {
+ job.Stop()
+ }
+
+ go job.Start()
+ m.runningJobs.add(job.FullName(), job)
+}
+
+func (m *Manager) stopRunningJob(name string) {
+ m.runningJobs.lock()
+ defer m.runningJobs.unlock()
+
+ if job, ok := m.runningJobs.lookup(name); ok {
+ job.Stop()
+ m.runningJobs.remove(name)
+ }
+}
+
+func (m *Manager) cleanup() {
+ m.FnReg.Unregister("config")
+
+ m.runningJobs.lock()
+ defer m.runningJobs.unlock()
+
+ m.runningJobs.forEach(func(key string, job *module.Job) {
+ job.Stop()
+ })
+}
+
+func (m *Manager) createCollectorJob(cfg confgroup.Config) (*module.Job, error) {
+ creator, ok := m.Modules[cfg.Module()]
+ if !ok {
+ return nil, fmt.Errorf("can not find %s module", cfg.Module())
+ }
+
+ var vnode struct {
+ guid string
+ hostname string
+ labels map[string]string
+ }
+
+ if cfg.Vnode() != "" {
+ n, ok := m.Vnodes.Lookup(cfg.Vnode())
+ if !ok {
+ return nil, fmt.Errorf("vnode '%s' is not found", cfg.Vnode())
+ }
+
+ vnode.guid = n.GUID
+ vnode.hostname = n.Hostname
+ vnode.labels = n.Labels
+ }
+
+ m.Debugf("creating %s[%s] job, config: %v", cfg.Module(), cfg.Name(), cfg)
+
+ mod := creator.Create()
+
+ if err := applyConfig(cfg, mod); err != nil {
+ return nil, err
+ }
+
+ jobCfg := module.JobConfig{
+ PluginName: m.PluginName,
+ Name: cfg.Name(),
+ ModuleName: cfg.Module(),
+ FullName: cfg.FullName(),
+ UpdateEvery: cfg.UpdateEvery(),
+ AutoDetectEvery: cfg.AutoDetectionRetry(),
+ Priority: cfg.Priority(),
+ Labels: makeLabels(cfg),
+ IsStock: cfg.SourceType() == "stock",
+ Module: mod,
+ Out: m.Out,
+ VnodeGUID: vnode.guid,
+ VnodeHostname: vnode.hostname,
+ VnodeLabels: vnode.labels,
+ }
+
+ job := module.NewJob(jobCfg)
+
+ return job, nil
+}
+
+func runRetryTask(ctx context.Context, out chan<- confgroup.Config, cfg confgroup.Config) {
+ t := time.NewTimer(time.Second * time.Duration(cfg.AutoDetectionRetry()))
+ defer t.Stop()
+
+ select {
+ case <-ctx.Done():
+ case <-t.C:
+ sendConfigs(ctx, out, cfg)
+ }
+}
+
+func sendConfigs(ctx context.Context, out chan<- confgroup.Config, cfgs ...confgroup.Config) {
+ for _, cfg := range cfgs {
+ select {
+ case <-ctx.Done():
+ return
+ case out <- cfg:
+ }
+ }
+}
+
// isStock reports whether cfg originates from a stock (shipped) config file.
func isStock(cfg confgroup.Config) bool {
	return cfg.SourceType() == confgroup.TypeStock
}
+
// isDyncfg reports whether cfg was created via dynamic configuration.
func isDyncfg(cfg confgroup.Config) bool {
	return cfg.SourceType() == confgroup.TypeDyncfg
}
+
+func applyConfig(cfg confgroup.Config, module any) error {
+ bs, err := yaml.Marshal(cfg)
+ if err != nil {
+ return err
+ }
+ return yaml.Unmarshal(bs, module)
+}
+
+func makeLabels(cfg confgroup.Config) map[string]string {
+ labels := make(map[string]string)
+ for name, value := range cfg.Labels() {
+ n, ok1 := name.(string)
+ v, ok2 := value.(string)
+ if ok1 && ok2 {
+ labels[n] = v
+ }
+ }
+ return labels
+}
diff --git a/src/go/plugin/go.d/agent/jobmgr/manager_test.go b/src/go/plugin/go.d/agent/jobmgr/manager_test.go
new file mode 100644
index 000000000..1b55a8308
--- /dev/null
+++ b/src/go/plugin/go.d/agent/jobmgr/manager_test.go
@@ -0,0 +1,1892 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package jobmgr
+
+import (
+ "encoding/json"
+ "fmt"
+ "testing"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions"
+)
+
+func TestManager_Run(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() *runSim
+ }{
+ "stock => ok: add and remove": {
+ createSim: func() *runSim {
+ cfg := prepareStockCfg("success", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, cfg.Source(), cfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+
+ sendConfGroup(in, cfg.Source())
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:success:name create accepted job /collectors/jobs stock 'type=stock,module=success,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:name status running
+
+CONFIG go.d:collector:success:name delete
+`,
+ }
+ },
+ },
+ "stock => nok: add": {
+ createSim: func() *runSim {
+ cfg := prepareStockCfg("fail", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, cfg.Source(), cfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ },
+ wantDiscovered: []confgroup.Config{cfg},
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgFailed},
+ },
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs stock 'type=stock,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name delete
+`,
+ }
+ },
+ },
+ "stock => nok: add and remove": {
+ createSim: func() *runSim {
+ cfg := prepareStockCfg("fail", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, cfg.Source(), cfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+
+ sendConfGroup(in, cfg.Source())
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs stock 'type=stock,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name delete
+`,
+ }
+ },
+ },
+ "user => ok: add and remove": {
+ createSim: func() *runSim {
+ cfg := prepareUserCfg("success", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, cfg.Source(), cfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+
+ sendConfGroup(in, cfg.Source())
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:success:name create accepted job /collectors/jobs user 'type=user,module=success,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:name status running
+
+CONFIG go.d:collector:success:name delete
+ `,
+ }
+ },
+ },
+ "user => nok: add and remove": {
+ createSim: func() *runSim {
+ cfg := prepareUserCfg("fail", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, cfg.Source(), cfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+
+ sendConfGroup(in, cfg.Source())
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name status failed
+
+CONFIG go.d:collector:fail:name delete
+ `,
+ }
+ },
+ },
+ "disc => ok: add and remove": {
+ createSim: func() *runSim {
+ cfg := prepareDiscoveredCfg("success", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, cfg.Source(), cfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+
+ sendConfGroup(in, cfg.Source())
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:success:name create accepted job /collectors/jobs discovered 'type=discovered,module=success,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:name status running
+
+CONFIG go.d:collector:success:name delete
+ `,
+ }
+ },
+ },
+ "disc => nok: add and remove": {
+ createSim: func() *runSim {
+ cfg := prepareDiscoveredCfg("fail", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, cfg.Source(), cfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+
+ sendConfGroup(in, cfg.Source())
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs discovered 'type=discovered,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name status failed
+
+CONFIG go.d:collector:fail:name delete
+ `,
+ }
+ },
+ },
+ "non-dyncfg => nok: diff src, diff name: add": {
+ createSim: func() *runSim {
+ stockCfg := prepareStockCfg("fail", "stock")
+ discCfg := prepareDiscoveredCfg("fail", "discovered")
+ userCfg := prepareUserCfg("fail", "user")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, stockCfg.Source(), stockCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(stockCfg), "enable"},
+ })
+
+ sendConfGroup(in, discCfg.Source(), discCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(discCfg), "enable"},
+ })
+
+ sendConfGroup(in, userCfg.Source(), userCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-enable",
+ Args: []string{dyncfgJobID(userCfg), "enable"},
+ })
+ },
+ wantDiscovered: []confgroup.Config{
+ stockCfg,
+ userCfg,
+ discCfg,
+ },
+ wantSeen: []seenConfig{
+ {cfg: stockCfg, status: dyncfgFailed},
+ {cfg: discCfg, status: dyncfgFailed},
+ {cfg: userCfg, status: dyncfgFailed},
+ },
+ wantExposed: []seenConfig{
+ {cfg: discCfg, status: dyncfgFailed},
+ {cfg: userCfg, status: dyncfgFailed},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:fail:stock create accepted job /collectors/jobs stock 'type=stock,module=fail,job=stock' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:stock delete
+
+CONFIG go.d:collector:fail:discovered create accepted job /collectors/jobs discovered 'type=discovered,module=fail,job=discovered' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:discovered status failed
+
+CONFIG go.d:collector:fail:user create accepted job /collectors/jobs user 'type=user,module=fail,job=user' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 3-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:user status failed
+ `,
+ }
+ },
+ },
+ "non-dyncfg => nok: diff src,src prio asc,same name: add": {
+ createSim: func() *runSim {
+ stockCfg := prepareStockCfg("fail", "name")
+ discCfg := prepareDiscoveredCfg("fail", "name")
+ userCfg := prepareUserCfg("fail", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, stockCfg.Source(), stockCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(stockCfg), "enable"},
+ })
+
+ sendConfGroup(in, discCfg.Source(), discCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(discCfg), "enable"},
+ })
+
+ sendConfGroup(in, userCfg.Source(), userCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-enable",
+ Args: []string{dyncfgJobID(userCfg), "enable"},
+ })
+ },
+ wantDiscovered: []confgroup.Config{
+ stockCfg,
+ userCfg,
+ discCfg,
+ },
+ wantSeen: []seenConfig{
+ {cfg: stockCfg, status: dyncfgFailed},
+ {cfg: discCfg, status: dyncfgFailed},
+ {cfg: userCfg, status: dyncfgFailed},
+ },
+ wantExposed: []seenConfig{
+ {cfg: userCfg, status: dyncfgFailed},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs stock 'type=stock,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name delete
+
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs discovered 'type=discovered,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name status failed
+
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 3-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name status failed
+ `,
+ }
+ },
+ },
+ "non-dyncfg => nok: diff src,src prio asc,same name: add and remove": {
+ createSim: func() *runSim {
+ stockCfg := prepareStockCfg("fail", "name")
+ discCfg := prepareDiscoveredCfg("fail", "name")
+ userCfg := prepareUserCfg("fail", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, stockCfg.Source(), stockCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(stockCfg), "enable"},
+ })
+
+ sendConfGroup(in, discCfg.Source(), discCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(discCfg), "enable"},
+ })
+
+ sendConfGroup(in, userCfg.Source(), userCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-enable",
+ Args: []string{dyncfgJobID(userCfg), "enable"},
+ })
+
+ sendConfGroup(in, stockCfg.Source())
+ sendConfGroup(in, discCfg.Source())
+ sendConfGroup(in, userCfg.Source())
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs stock 'type=stock,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name delete
+
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs discovered 'type=discovered,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name status failed
+
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 3-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name status failed
+
+CONFIG go.d:collector:fail:name delete
+ `,
+ }
+ },
+ },
+ "non-dyncfg => nok: diff src,src prio desc,same name: add": {
+ createSim: func() *runSim {
+ userCfg := prepareUserCfg("fail", "name")
+ discCfg := prepareDiscoveredCfg("fail", "name")
+ stockCfg := prepareStockCfg("fail", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, userCfg.Source(), userCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(userCfg), "enable"},
+ })
+
+ sendConfGroup(in, discCfg.Source(), discCfg)
+ sendConfGroup(in, stockCfg.Source(), stockCfg)
+ },
+ wantDiscovered: []confgroup.Config{
+ stockCfg,
+ userCfg,
+ discCfg,
+ },
+ wantSeen: []seenConfig{
+ {cfg: userCfg, status: dyncfgFailed},
+ {cfg: discCfg},
+ {cfg: stockCfg},
+ },
+ wantExposed: []seenConfig{
+ {cfg: userCfg, status: dyncfgFailed},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name status failed
+ `,
+ }
+ },
+ },
+ "non-dyncfg => nok: diff src,src prio desc,same name: add and remove": {
+ createSim: func() *runSim {
+ userCfg := prepareUserCfg("fail", "name")
+ discCfg := prepareDiscoveredCfg("fail", "name")
+ stockCfg := prepareStockCfg("fail", "name")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, userCfg.Source(), userCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(userCfg), "enable"},
+ })
+
+ sendConfGroup(in, discCfg.Source(), discCfg)
+ sendConfGroup(in, stockCfg.Source(), stockCfg)
+
+ sendConfGroup(in, userCfg.Source())
+ sendConfGroup(in, discCfg.Source())
+ sendConfGroup(in, stockCfg.Source())
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:name status failed
+
+CONFIG go.d:collector:fail:name delete
+ `,
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func TestManager_Run_Dyncfg_Get(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() *runSim
+ }{
+ "[get] non-existing": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-get",
+ Args: []string{dyncfgJobID(cfg), "get"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-get 404 application/json
+{"status":404,"message":"The specified module 'success' job 'test' is not registered."}
+FUNCTION_RESULT_END
+`,
+ }
+ },
+ },
+ "[get] existing": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test").
+ Set("option_str", "1").
+ Set("option_int", 1)
+ bs, _ := json.Marshal(cfg)
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: bs,
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-get",
+ Args: []string{dyncfgJobID(cfg), "get"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-get 200 application/json
+{"option_str":"1","option_int":1}
+FUNCTION_RESULT_END
+`,
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
// TestManager_Run_Dyncfg_Userconfig verifies the dyncfg "userconfig"
// action: for a registered module it must render a YAML user-config
// snippet; for an unknown module it must return a 404 JSON result.
func TestManager_Run_Dyncfg_Userconfig(t *testing.T) {
	tests := map[string]struct {
		createSim func() *runSim
	}{
		"[userconfig] existing": {
			createSim: func() *runSim {
				cfg := prepareDyncfgCfg("success", "test")

				return &runSim{
					do: func(mgr *Manager, _ chan []*confgroup.Group) {
						mgr.dyncfgConfig(functions.Function{
							UID:  "1-userconfig",
							Args: []string{dyncfgJobID(cfg), "userconfig"},
						})
					},
					wantDiscovered: nil,
					wantSeen:       nil,
					wantExposed:    nil,
					wantRunning:    nil,
					wantDyncfg: `
FUNCTION_RESULT_BEGIN 1-userconfig 200 application/yaml
jobs:
- name: test
  option_one: one
  option_two: 2

FUNCTION_RESULT_END
`,
				}
			},
		},
		"[userconfig] non-existing": {
			createSim: func() *runSim {
				// "success!" is not a registered module name.
				cfg := prepareDyncfgCfg("success!", "test")

				return &runSim{
					do: func(mgr *Manager, _ chan []*confgroup.Group) {
						mgr.dyncfgConfig(functions.Function{
							UID:  "1-userconfig",
							Args: []string{dyncfgJobID(cfg), "userconfig"},
						})
					},
					wantDiscovered: nil,
					wantSeen:       nil,
					wantExposed:    nil,
					wantRunning:    nil,
					wantDyncfg: `
FUNCTION_RESULT_BEGIN 1-userconfig 404 application/json
{"status":404,"message":"The specified module 'success!' is not registered."}
FUNCTION_RESULT_END
`,
				}
			},
		},
	}

	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			sim := test.createSim()
			sim.run(t)
		})
	}
}
+
+func TestManager_Run_Dyncfg_Add(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() *runSim
+ }{
+ "[add] dyncfg:ok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+`,
+ }
+ },
+ },
+ "[add] dyncfg:nok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("fail", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+`,
+ }
+ },
+ },
+ "[add] dyncfg:ok twice": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+`,
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func TestManager_Run_Dyncfg_Enable(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() *runSim
+ }{
+ "[enable] non-existing": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-enable 404 application/json
+{"status":404,"message":"The specified module 'success' job 'test' is not registered."}
+FUNCTION_RESULT_END
+`,
+ }
+ },
+ },
+ "[enable] dyncfg:ok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgRunning},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgRunning},
+ },
+ wantRunning: []string{cfg.FullName()},
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+`,
+ }
+ },
+ },
+ "[enable] dyncfg:ok twice": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgRunning},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgRunning},
+ },
+ wantRunning: []string{cfg.FullName()},
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+
+FUNCTION_RESULT_BEGIN 3-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+`,
+ }
+ },
+ },
+ "[enable] dyncfg:nok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("fail", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgFailed},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgFailed},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test status failed
+`,
+ }
+ },
+ },
+ "[enable] dyncfg:nok twice": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("fail", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgFailed},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgFailed},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test status failed
+
+FUNCTION_RESULT_BEGIN 3-enable 200 application/json
+{"status":200,"message":"Job enable failed: mock failed init."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test status failed
+`,
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func TestManager_Run_Dyncfg_Disable(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() *runSim
+ }{
+ "[disable] non-existing": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-disable",
+ Args: []string{dyncfgJobID(cfg), "disable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-disable 404 application/json
+{"status":404,"message":"The specified module 'success' job 'test' is not registered."}
+FUNCTION_RESULT_END
+`,
+ }
+ },
+ },
+ "[disable] dyncfg:ok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-disable",
+ Args: []string{dyncfgJobID(cfg), "disable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-disable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status disabled
+`,
+ }
+ },
+ },
+ "[disable] dyncfg:ok twice": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-disable",
+ Args: []string{dyncfgJobID(cfg), "disable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-disable",
+ Args: []string{dyncfgJobID(cfg), "disable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-disable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status disabled
+
+FUNCTION_RESULT_BEGIN 3-disable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status disabled
+`,
+ }
+ },
+ },
+ "[disable] dyncfg:nok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("fail", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-disable",
+ Args: []string{dyncfgJobID(cfg), "disable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-disable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test status disabled
+`,
+ }
+ },
+ },
+ "[disable] dyncfg:nok twice": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("fail", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-disable",
+ Args: []string{dyncfgJobID(cfg), "disable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-disable",
+ Args: []string{dyncfgJobID(cfg), "disable"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-disable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test status disabled
+
+FUNCTION_RESULT_BEGIN 3-disable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:fail:test status disabled
+`,
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func TestManager_Run_Dyncfg_Restart(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() *runSim
+ }{
+ "[restart] non-existing": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-restart",
+ Args: []string{dyncfgJobID(cfg), "restart"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-restart 404 application/json
+{"status":404,"message":"The specified module 'success' job 'test' is not registered."}
+FUNCTION_RESULT_END
+`,
+ }
+ },
+ },
+ "[restart] not enabled dyncfg:ok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-restart",
+ Args: []string{dyncfgJobID(cfg), "restart"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgAccepted},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-restart 405 application/json
+{"status":405,"message":"Restarting data collection job is not allowed in 'accepted' state."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status accepted
+`,
+ }
+ },
+ },
+ "[restart] enabled dyncfg:ok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-restart",
+ Args: []string{dyncfgJobID(cfg), "restart"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgRunning},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgRunning},
+ },
+ wantRunning: []string{cfg.FullName()},
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+
+FUNCTION_RESULT_BEGIN 3-restart 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+`,
+ }
+ },
+ },
+ "[restart] disabled dyncfg:ok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-disable",
+ Args: []string{dyncfgJobID(cfg), "disable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-restart",
+ Args: []string{dyncfgJobID(cfg), "restart"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgDisabled},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-disable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status disabled
+
+FUNCTION_RESULT_BEGIN 3-restart 405 application/json
+{"status":405,"message":"Restarting data collection job is not allowed in 'disabled' state."}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status disabled
+`,
+ }
+ },
+ },
+ "[restart] enabled dyncfg:ok multiple times": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-restart",
+ Args: []string{dyncfgJobID(cfg), "restart"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "4-restart",
+ Args: []string{dyncfgJobID(cfg), "restart"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: cfg, status: dyncfgRunning},
+ },
+ wantExposed: []seenConfig{
+ {cfg: cfg, status: dyncfgRunning},
+ },
+ wantRunning: []string{cfg.FullName()},
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+
+FUNCTION_RESULT_BEGIN 3-restart 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+
+FUNCTION_RESULT_BEGIN 4-restart 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+`,
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func TestManager_Run_Dyncfg_Remove(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() *runSim
+ }{
+ "[remove] non-existing": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-remove",
+ Args: []string{dyncfgJobID(cfg), "remove"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-remove 404 application/json
+{"status":404,"message":"The specified module 'success' job 'test' is not registered."}
+FUNCTION_RESULT_END
+`,
+ }
+ },
+ },
+ "[remove] non-dyncfg": {
+ createSim: func() *runSim {
+ stockCfg := prepareStockCfg("success", "stock")
+ userCfg := prepareUserCfg("success", "user")
+ discCfg := prepareDiscoveredCfg("success", "discovered")
+
+ return &runSim{
+ do: func(mgr *Manager, in chan []*confgroup.Group) {
+ sendConfGroup(in, stockCfg.Source(), stockCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-enable",
+ Args: []string{dyncfgJobID(stockCfg), "enable"},
+ })
+
+ sendConfGroup(in, userCfg.Source(), userCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(userCfg), "enable"},
+ })
+
+ sendConfGroup(in, discCfg.Source(), discCfg)
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-enable",
+ Args: []string{dyncfgJobID(discCfg), "enable"},
+ })
+
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-remove",
+ Args: []string{dyncfgJobID(stockCfg), "remove"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-remove",
+ Args: []string{dyncfgJobID(userCfg), "remove"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-remove",
+ Args: []string{dyncfgJobID(discCfg), "remove"},
+ })
+ },
+ wantDiscovered: []confgroup.Config{
+ stockCfg,
+ userCfg,
+ discCfg,
+ },
+ wantSeen: []seenConfig{
+ {cfg: stockCfg, status: dyncfgRunning},
+ {cfg: userCfg, status: dyncfgRunning},
+ {cfg: discCfg, status: dyncfgRunning},
+ },
+ wantExposed: []seenConfig{
+ {cfg: stockCfg, status: dyncfgRunning},
+ {cfg: userCfg, status: dyncfgRunning},
+ {cfg: discCfg, status: dyncfgRunning},
+ },
+ wantRunning: []string{stockCfg.FullName(), userCfg.FullName(), discCfg.FullName()},
+ wantDyncfg: `
+CONFIG go.d:collector:success:stock create accepted job /collectors/jobs stock 'type=stock,module=success,job=stock' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 1-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:stock status running
+
+CONFIG go.d:collector:success:user create accepted job /collectors/jobs user 'type=user,module=success,job=user' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:user status running
+
+CONFIG go.d:collector:success:discovered create accepted job /collectors/jobs discovered 'type=discovered,module=success,job=discovered' 'schema get enable disable update restart test userconfig' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 3-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:discovered status running
+
+FUNCTION_RESULT_BEGIN 1-remove 405 application/json
+{"status":405,"message":"Removing jobs of type 'stock' is not supported. Only 'dyncfg' jobs can be removed."}
+FUNCTION_RESULT_END
+
+FUNCTION_RESULT_BEGIN 2-remove 405 application/json
+{"status":405,"message":"Removing jobs of type 'user' is not supported. Only 'dyncfg' jobs can be removed."}
+FUNCTION_RESULT_END
+
+FUNCTION_RESULT_BEGIN 3-remove 405 application/json
+{"status":405,"message":"Removing jobs of type 'discovered' is not supported. Only 'dyncfg' jobs can be removed."}
+FUNCTION_RESULT_END
+`,
+ }
+ },
+ },
+ "[remove] not enabled dyncfg:ok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-remove",
+ Args: []string{dyncfgJobID(cfg), "remove"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-remove 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test delete
+`,
+ }
+ },
+ },
+ "[remove] enabled dyncfg:ok": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()},
+ Payload: []byte("{}"),
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(cfg), "enable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-remove",
+ Args: []string{dyncfgJobID(cfg), "remove"},
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+
+FUNCTION_RESULT_BEGIN 3-remove 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test delete
+`,
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func TestManager_Run_Dyncfg_Update(t *testing.T) {
+ tests := map[string]struct {
+ createSim func() *runSim
+ }{
+ "[update] non-existing": {
+ createSim: func() *runSim {
+ cfg := prepareDyncfgCfg("success", "test")
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-update",
+ Args: []string{dyncfgJobID(cfg), "update"},
+ Payload: []byte("{}"),
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: nil,
+ wantExposed: nil,
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-update 404 application/json
+{"status":404,"message":"The specified module 'success' job 'test' is not registered."}
+FUNCTION_RESULT_END
+`,
+ }
+ },
+ },
+ "[update] enabled dyncfg:ok with dyncfg:ok": {
+ createSim: func() *runSim {
+ origCfg := prepareDyncfgCfg("success", "test").
+ Set("option_str", "1")
+ updCfg := prepareDyncfgCfg("success", "test").
+ Set("option_str", "2")
+ origBs, _ := json.Marshal(origCfg)
+ updBs, _ := json.Marshal(updCfg)
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(origCfg.Module()), "add", origCfg.Name()},
+ Payload: origBs,
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-enable",
+ Args: []string{dyncfgJobID(origCfg), "enable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-update",
+ Args: []string{dyncfgJobID(origCfg), "update"},
+ Payload: updBs,
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: updCfg, status: dyncfgRunning},
+ },
+ wantExposed: []seenConfig{
+ {cfg: updCfg, status: dyncfgRunning},
+ },
+ wantRunning: []string{updCfg.FullName()},
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-enable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+
+FUNCTION_RESULT_BEGIN 3-update 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status running
+`,
+ }
+ },
+ },
+ "[update] disabled dyncfg:ok with dyncfg:ok": {
+ createSim: func() *runSim {
+ origCfg := prepareDyncfgCfg("success", "test").
+ Set("option_str", "1")
+ updCfg := prepareDyncfgCfg("success", "test").
+ Set("option_str", "2")
+ origBs, _ := json.Marshal(origCfg)
+ updBs, _ := json.Marshal(updCfg)
+
+ return &runSim{
+ do: func(mgr *Manager, _ chan []*confgroup.Group) {
+ mgr.dyncfgConfig(functions.Function{
+ UID: "1-add",
+ Args: []string{dyncfgModID(origCfg.Module()), "add", origCfg.Name()},
+ Payload: origBs,
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "2-disable",
+ Args: []string{dyncfgJobID(origCfg), "disable"},
+ })
+ mgr.dyncfgConfig(functions.Function{
+ UID: "3-update",
+ Args: []string{dyncfgJobID(origCfg), "update"},
+ Payload: updBs,
+ })
+ },
+ wantDiscovered: nil,
+ wantSeen: []seenConfig{
+ {cfg: updCfg, status: dyncfgDisabled},
+ },
+ wantExposed: []seenConfig{
+ {cfg: updCfg, status: dyncfgDisabled},
+ },
+ wantRunning: nil,
+ wantDyncfg: `
+
+FUNCTION_RESULT_BEGIN 1-add 202 application/json
+{"status":202,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000
+
+FUNCTION_RESULT_BEGIN 2-disable 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status disabled
+
+FUNCTION_RESULT_BEGIN 3-update 200 application/json
+{"status":200,"message":""}
+FUNCTION_RESULT_END
+
+CONFIG go.d:collector:success:test status disabled
+`,
+ }
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ sim := test.createSim()
+ sim.run(t)
+ })
+ }
+}
+
+func sendConfGroup(in chan []*confgroup.Group, src string, configs ...confgroup.Config) {
+ in <- prepareCfgGroups(src, configs...)
+ in <- prepareCfgGroups("_")
+}
+
+func prepareCfgGroups(src string, configs ...confgroup.Config) []*confgroup.Group {
+ return []*confgroup.Group{{Configs: configs, Source: src}}
+}
+
+func prepareStockCfg(module, job string) confgroup.Config {
+ return confgroup.Config{}.
+ SetSourceType(confgroup.TypeStock).
+ SetProvider("test").
+ SetSource(fmt.Sprintf("type=stock,module=%s,job=%s", module, job)).
+ SetModule(module).
+ SetName(job)
+}
+
+func prepareUserCfg(module, job string) confgroup.Config {
+ return confgroup.Config{}.
+ SetSourceType(confgroup.TypeUser).
+ SetProvider("test").
+ SetSource(fmt.Sprintf("type=user,module=%s,job=%s", module, job)).
+ SetModule(module).
+ SetName(job)
+}
+
+func prepareDiscoveredCfg(module, job string) confgroup.Config {
+ return confgroup.Config{}.
+ SetSourceType(confgroup.TypeDiscovered).
+ SetProvider("test").
+ SetSource(fmt.Sprintf("type=discovered,module=%s,job=%s", module, job)).
+ SetModule(module).
+ SetName(job)
+}
+
+func prepareDyncfgCfg(module, job string) confgroup.Config {
+ return confgroup.Config{}.
+ SetSourceType(confgroup.TypeDyncfg).
+ SetProvider("dyncfg").
+ SetSource(fmt.Sprintf("type=dyncfg,module=%s,job=%s", module, job)).
+ SetModule(module).
+ SetName(job)
+}
diff --git a/src/go/plugin/go.d/agent/jobmgr/noop.go b/src/go/plugin/go.d/agent/jobmgr/noop.go
new file mode 100644
index 000000000..adeacf906
--- /dev/null
+++ b/src/go/plugin/go.d/agent/jobmgr/noop.go
@@ -0,0 +1,21 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package jobmgr
+
+import (
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/vnodes"
+)
+
+type noop struct{}
+
+func (n noop) Lock(string) (bool, error) { return true, nil }
+func (n noop) Unlock(string) {}
+func (n noop) Save(confgroup.Config, string) {}
+func (n noop) Remove(confgroup.Config) {}
+func (n noop) Contains(confgroup.Config, ...string) bool { return false }
+func (n noop) Lookup(string) (*vnodes.VirtualNode, bool) { return nil, false }
+func (n noop) Register(name string, reg func(functions.Function)) {}
+func (n noop) Unregister(name string) {}
diff --git a/src/go/plugin/go.d/agent/jobmgr/sim_test.go b/src/go/plugin/go.d/agent/jobmgr/sim_test.go
new file mode 100644
index 000000000..9fe67175a
--- /dev/null
+++ b/src/go/plugin/go.d/agent/jobmgr/sim_test.go
@@ -0,0 +1,152 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package jobmgr
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// runSim describes one Manager simulation scenario: a driver function that
+// feeds the running Manager, and the expected end state of its caches,
+// running jobs, and emitted dyncfg command stream.
+type runSim struct {
+	// do drives the scenario; it receives the running Manager and the
+	// channel the Manager consumes config groups from.
+	do func(mgr *Manager, in chan []*confgroup.Group)
+
+	// wantDiscovered is the expected content of mgr.discoveredConfigs.
+	wantDiscovered []confgroup.Config
+	// wantSeen is the expected content of mgr.seenConfigs (config + status).
+	wantSeen []seenConfig
+	// wantExposed is the expected content of mgr.exposedConfigs (config + status).
+	wantExposed []seenConfig
+	// wantRunning lists the job names expected in mgr.runningJobs.
+	wantRunning []string
+	// wantDyncfg is the expected normalized dyncfg API output
+	// (template CONFIG lines and timestamps stripped — see run()).
+	wantDyncfg string
+}
+
+// run executes the simulation: it starts a Manager wired to an in-memory
+// API writer and a mock module registry, invokes s.do to drive the
+// scenario, cancels the Manager, and then asserts the captured dyncfg
+// output and the final state of the discovered/seen/exposed/running caches
+// against the scenario's expectations.
+//
+// Fixes vs. the previous revision: the output-normalization loop variable
+// no longer shadows the method receiver s, and the commented-out debug
+// print was removed.
+func (s *runSim) run(t *testing.T) {
+	t.Helper()
+
+	require.NotNil(t, s.do, "s.do is nil")
+
+	var buf bytes.Buffer
+	mgr := New()
+	mgr.api = netdataapi.New(safewriter.New(&buf))
+	mgr.Modules = prepareMockRegistry()
+
+	done := make(chan struct{})
+	grpCh := make(chan []*confgroup.Group)
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go func() { defer close(done); defer close(grpCh); mgr.Run(ctx, grpCh) }()
+
+	timeout := time.Second * 5
+
+	select {
+	case <-mgr.started:
+	case <-time.After(timeout):
+		t.Errorf("failed to start work in %s", timeout)
+	}
+
+	s.do(mgr, grpCh)
+	cancel()
+
+	select {
+	case <-done:
+	case <-time.After(timeout):
+		t.Errorf("failed to finish work in %s", timeout)
+	}
+
+	// Normalize the captured API output: drop per-template CONFIG lines and
+	// strip the trailing timestamp from FUNCTION_RESULT_BEGIN lines so the
+	// comparison below is deterministic.
+	var lines []string
+	for _, line := range strings.Split(buf.String(), "\n") {
+		if strings.HasPrefix(line, "CONFIG") && strings.Contains(line, " template ") {
+			continue
+		}
+		if strings.HasPrefix(line, "FUNCTION_RESULT_BEGIN") {
+			parts := strings.Fields(line)
+			line = strings.Join(parts[:len(parts)-1], " ") // remove timestamp
+		}
+		lines = append(lines, line)
+	}
+	wantDyncfg, gotDyncfg := strings.TrimSpace(s.wantDyncfg), strings.TrimSpace(strings.Join(lines, "\n"))
+
+	assert.Equal(t, wantDyncfg, gotDyncfg, "dyncfg commands")
+
+	// discoveredConfigs is keyed by source, then by config hash.
+	var n int
+	for _, cfgs := range mgr.discoveredConfigs.items {
+		n += len(cfgs)
+	}
+
+	wantLen, gotLen := len(s.wantDiscovered), n
+	require.Equalf(t, wantLen, gotLen, "discoveredConfigs: different len (want %d got %d)", wantLen, gotLen)
+
+	for _, cfg := range s.wantDiscovered {
+		cfgs, ok := mgr.discoveredConfigs.items[cfg.Source()]
+		require.Truef(t, ok, "discoveredConfigs: source %s is not found", cfg.Source())
+		_, ok = cfgs[cfg.Hash()]
+		require.Truef(t, ok, "discoveredConfigs: source %s config %d is not found", cfg.Source(), cfg.Hash())
+	}
+
+	wantLen, gotLen = len(s.wantSeen), len(mgr.seenConfigs.items)
+	require.Equalf(t, wantLen, gotLen, "seenConfigs: different len (want %d got %d)", wantLen, gotLen)
+
+	for _, scfg := range s.wantSeen {
+		v, ok := mgr.seenConfigs.lookup(scfg.cfg)
+		require.Truef(t, ok, "seenConfigs: config '%s' is not found", scfg.cfg.UID())
+		require.Truef(t, scfg.status == v.status, "seenConfigs: wrong status, want %s got %s", scfg.status, v.status)
+	}
+
+	wantLen, gotLen = len(s.wantExposed), len(mgr.exposedConfigs.items)
+	require.Equalf(t, wantLen, gotLen, "exposedConfigs: different len (want %d got %d)", wantLen, gotLen)
+
+	for _, scfg := range s.wantExposed {
+		v, ok := mgr.exposedConfigs.lookup(scfg.cfg)
+		require.Truef(t, ok && scfg.cfg.UID() == v.cfg.UID(), "exposedConfigs: config '%s' is not found", scfg.cfg.UID())
+		require.Truef(t, scfg.status == v.status, "exposedConfigs: wrong status, want %s got %s", scfg.status, v.status)
+	}
+
+	wantLen, gotLen = len(s.wantRunning), len(mgr.runningJobs.items)
+	require.Equalf(t, wantLen, gotLen, "runningJobs: different len (want %d got %d)", wantLen, gotLen)
+	for _, name := range s.wantRunning {
+		_, ok := mgr.runningJobs.lookup(name)
+		require.Truef(t, ok, "runningJobs: job '%s' is not found", name)
+	}
+}
+
+// prepareMockRegistry returns a registry with two mock modules:
+// "success", whose jobs expose one chart and collect one metric, and
+// "fail", whose jobs always fail to initialize.
+func prepareMockRegistry() module.Registry {
+	type config struct {
+		OptionOne string `yaml:"option_one" json:"option_one"`
+		OptionTwo int64  `yaml:"option_two" json:"option_two"`
+	}
+
+	successCreator := module.Creator{
+		JobConfigSchema: module.MockConfigSchema,
+		Create: func() module.Module {
+			return &module.MockModule{
+				// A fresh Charts value is built on every call, matching the
+				// original behavior (no shared chart state between jobs).
+				ChartsFunc: func() *module.Charts {
+					return &module.Charts{&module.Chart{ID: "id", Title: "title", Units: "units", Dims: module.Dims{{ID: "id1"}}}}
+				},
+				CollectFunc: func() map[string]int64 { return map[string]int64{"id1": 1} },
+			}
+		},
+		Config: func() any {
+			return &config{OptionOne: "one", OptionTwo: 2}
+		},
+	}
+
+	failCreator := module.Creator{
+		Create: func() module.Module {
+			return &module.MockModule{
+				InitFunc: func() error { return errors.New("mock failed init") },
+			}
+		},
+	}
+
+	reg := module.Registry{}
+	reg.Register("success", successCreator)
+	reg.Register("fail", failCreator)
+
+	return reg
+}