From f09848204fa5283d21ea43e262ee41aa578e1808 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 26 Aug 2024 10:15:24 +0200 Subject: Merging upstream version 1.47.0. Signed-off-by: Daniel Baumann --- src/go/plugin/go.d/agent/jobmgr/cache.go | 181 +++ src/go/plugin/go.d/agent/jobmgr/di.go | 39 + src/go/plugin/go.d/agent/jobmgr/dyncfg.go | 852 ++++++++++ src/go/plugin/go.d/agent/jobmgr/manager.go | 370 +++++ src/go/plugin/go.d/agent/jobmgr/manager_test.go | 1892 +++++++++++++++++++++++ src/go/plugin/go.d/agent/jobmgr/noop.go | 21 + src/go/plugin/go.d/agent/jobmgr/sim_test.go | 152 ++ 7 files changed, 3507 insertions(+) create mode 100644 src/go/plugin/go.d/agent/jobmgr/cache.go create mode 100644 src/go/plugin/go.d/agent/jobmgr/di.go create mode 100644 src/go/plugin/go.d/agent/jobmgr/dyncfg.go create mode 100644 src/go/plugin/go.d/agent/jobmgr/manager.go create mode 100644 src/go/plugin/go.d/agent/jobmgr/manager_test.go create mode 100644 src/go/plugin/go.d/agent/jobmgr/noop.go create mode 100644 src/go/plugin/go.d/agent/jobmgr/sim_test.go (limited to 'src/go/plugin/go.d/agent/jobmgr') diff --git a/src/go/plugin/go.d/agent/jobmgr/cache.go b/src/go/plugin/go.d/agent/jobmgr/cache.go new file mode 100644 index 000000000..8ea16ce96 --- /dev/null +++ b/src/go/plugin/go.d/agent/jobmgr/cache.go @@ -0,0 +1,181 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package jobmgr + +import ( + "context" + "sync" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" +) + +func newDiscoveredConfigsCache() *discoveredConfigs { + return &discoveredConfigs{ + items: make(map[string]map[uint64]confgroup.Config), + } +} + +func newSeenConfigCache() *seenConfigs { + return &seenConfigs{ + items: make(map[string]*seenConfig), + } +} + +func newExposedConfigCache() *exposedConfigs { + return &exposedConfigs{ + items: make(map[string]*seenConfig), + } +} + +func newRunningJobsCache() *runningJobs { + return &runningJobs{ + mux: sync.Mutex{}, + items: make(map[string]*module.Job), + } +} + +func newRetryingTasksCache() *retryingTasks { + return &retryingTasks{ + items: make(map[string]*retryTask), + } +} + +type ( + discoveredConfigs struct { + // [Source][Hash] + items map[string]map[uint64]confgroup.Config + } + + seenConfigs struct { + // [cfg.UID()] + items map[string]*seenConfig + } + exposedConfigs struct { + // [cfg.FullName()] + items map[string]*seenConfig + } + seenConfig struct { + cfg confgroup.Config + status dyncfgStatus + } + + runningJobs struct { + mux sync.Mutex + // [cfg.FullName()] + items map[string]*module.Job + } + + retryingTasks struct { + // [cfg.UID()] + items map[string]*retryTask + } + retryTask struct { + cancel context.CancelFunc + } +) + +func (c *discoveredConfigs) add(group *confgroup.Group) (added, removed []confgroup.Config) { + cfgs, ok := c.items[group.Source] + if !ok { + if len(group.Configs) == 0 { + return nil, nil + } + cfgs = make(map[uint64]confgroup.Config) + c.items[group.Source] = cfgs + } + + seen := make(map[uint64]bool) + + for _, cfg := range group.Configs { + hash := cfg.Hash() + seen[hash] = true + + if _, ok := cfgs[hash]; ok { + continue + } + + cfgs[hash] = cfg + added = append(added, cfg) + } + + for hash, cfg := range cfgs { + if !seen[hash] { + delete(cfgs, hash) + removed = append(removed, cfg) + } + } + + if len(cfgs) == 0 { + delete(c.items, group.Source) + } + + return added, removed +} + +func (c *seenConfigs) add(sj *seenConfig) { + c.items[sj.cfg.UID()] = sj +} +func (c *seenConfigs) remove(cfg confgroup.Config) { + delete(c.items, cfg.UID()) +} +func (c *seenConfigs) lookup(cfg confgroup.Config) (*seenConfig, bool) { + v, ok := c.items[cfg.UID()] + return v, ok +} + +func (c *exposedConfigs) add(sj *seenConfig) { + c.items[sj.cfg.FullName()] = sj +} +func (c *exposedConfigs) remove(cfg confgroup.Config) { + delete(c.items, cfg.FullName()) +} +func (c *exposedConfigs) lookup(cfg confgroup.Config) (*seenConfig, bool) { + v, ok := c.items[cfg.FullName()] + return v, ok +} + +func (c *exposedConfigs) lookupByName(module, job string) (*seenConfig, bool) { + key := module + "_" + job + if module == job { + key = job + } + v, ok := c.items[key] + return v, ok +} + +func (c *runningJobs) lock() { + c.mux.Lock() +} +func (c *runningJobs) unlock() { + c.mux.Unlock() +} +func (c *runningJobs) add(fullName string, job *module.Job) { + c.items[fullName] = job +} +func (c *runningJobs) remove(fullName string) { + delete(c.items, fullName) +} +func (c *runningJobs) lookup(fullName string) (*module.Job, bool) { + j, ok := c.items[fullName] + return j, ok +} +func (c *runningJobs) forEach(fn func(fullName string, job *module.Job)) { + for k, j := range c.items { + fn(k, j) + } +} + +func (c *retryingTasks) add(cfg confgroup.Config, retry *retryTask) { + c.items[cfg.UID()] = retry +} +func (c *retryingTasks) remove(cfg confgroup.Config) { + if v, ok := c.lookup(cfg); ok { + v.cancel() + } + delete(c.items, cfg.UID()) +} +func (c *retryingTasks) lookup(cfg confgroup.Config) (*retryTask, bool) { + v, ok := c.items[cfg.UID()] + return v, ok +} diff --git a/src/go/plugin/go.d/agent/jobmgr/di.go b/src/go/plugin/go.d/agent/jobmgr/di.go new file mode 100644 index 000000000..466fcdf90 --- /dev/null +++ b/src/go/plugin/go.d/agent/jobmgr/di.go @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package jobmgr + +import ( + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/vnodes" +) + +type FileLocker interface { + Lock(name string) (bool, error) + Unlock(name string) +} + +type FileStatus interface { + Save(cfg confgroup.Config, state string) + Remove(cfg confgroup.Config) +} + +type FileStatusStore interface { + Contains(cfg confgroup.Config, states ...string) bool +} + +type Vnodes interface { + Lookup(key string) (*vnodes.VirtualNode, bool) +} + +type FunctionRegistry interface { + Register(name string, reg func(functions.Function)) + Unregister(name string) +} + +type dyncfgAPI interface { + CONFIGCREATE(id, status, configType, path, sourceType, source, supportedCommands string) + CONFIGDELETE(id string) + CONFIGSTATUS(id, status string) + FUNCRESULT(uid, contentType, payload, code, expireTimestamp string) +} diff --git a/src/go/plugin/go.d/agent/jobmgr/dyncfg.go b/src/go/plugin/go.d/agent/jobmgr/dyncfg.go new file mode 100644 index 000000000..da6d67489 --- /dev/null +++ b/src/go/plugin/go.d/agent/jobmgr/dyncfg.go @@ -0,0 +1,852 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package jobmgr + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "reflect" + "strconv" + "strings" + "time" + "unicode" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions" + + "gopkg.in/yaml.v2" +) + +type dyncfgStatus int + +const ( + _ dyncfgStatus = iota + dyncfgAccepted + dyncfgRunning + dyncfgFailed + dyncfgIncomplete + dyncfgDisabled +) + +func (s dyncfgStatus) String() string { + switch s { + case dyncfgAccepted: + return "accepted" + case dyncfgRunning: + return "running" + case dyncfgFailed: + return "failed" + case dyncfgIncomplete: + return "incomplete" + case dyncfgDisabled: + return "disabled" + default: + return "unknown" + } +} + +const ( + dyncfgIDPrefix = "go.d:collector:" + dyncfgPath = "/collectors/jobs" +) + +func dyncfgModID(name string) string { + return fmt.Sprintf("%s%s", dyncfgIDPrefix, name) +} +func dyncfgJobID(cfg confgroup.Config) string { + return fmt.Sprintf("%s%s:%s", dyncfgIDPrefix, cfg.Module(), cfg.Name()) +} + +func dyncfgModCmds() string { + return "add schema enable disable test userconfig" +} +func dyncfgJobCmds(cfg confgroup.Config) string { + cmds := "schema get enable disable update restart test userconfig" + if isDyncfg(cfg) { + cmds += " remove" + } + return cmds +} + +func (m *Manager) dyncfgModuleCreate(name string) { + id := dyncfgModID(name) + path := dyncfgPath + cmds := dyncfgModCmds() + typ := "template" + src := "internal" + m.api.CONFIGCREATE(id, dyncfgAccepted.String(), typ, path, src, src, cmds) +} + +func (m *Manager) dyncfgJobCreate(cfg confgroup.Config, status dyncfgStatus) { + id := dyncfgJobID(cfg) + path := dyncfgPath + cmds := dyncfgJobCmds(cfg) + typ := "job" + m.api.CONFIGCREATE(id, status.String(), typ, path, cfg.SourceType(), cfg.Source(), cmds) +} + +func (m *Manager) dyncfgJobRemove(cfg confgroup.Config) { + m.api.CONFIGDELETE(dyncfgJobID(cfg)) +} + +func (m *Manager) dyncfgJobStatus(cfg confgroup.Config, status dyncfgStatus) { + m.api.CONFIGSTATUS(dyncfgJobID(cfg), status.String()) +} + +func (m *Manager) dyncfgConfig(fn functions.Function) { + if len(fn.Args) < 2 { + m.Warningf("dyncfg: %s: missing required arguments, want 3 got %d", fn.Name, len(fn.Args)) + m.dyncfgRespf(fn, 400, "Missing required arguments. Need at least 2, but got %d.", len(fn.Args)) + return + } + + select { + case <-m.ctx.Done(): + m.dyncfgRespf(fn, 503, "Job manager is shutting down.") + default: + } + + //m.Infof("QQ FN: '%s'", fn) + + action := strings.ToLower(fn.Args[1]) + + switch action { + case "userconfig": + m.dyncfgConfigUserconfig(fn) + return + case "test": + m.dyncfgConfigTest(fn) + return + case "schema": + m.dyncfgConfigSchema(fn) + return + } + + select { + case <-m.ctx.Done(): + m.dyncfgRespf(fn, 503, "Job manager is shutting down.") + case m.dyncfgCh <- fn: + } +} + +func (m *Manager) dyncfgConfigExec(fn functions.Function) { + action := strings.ToLower(fn.Args[1]) + + switch action { + case "test": + m.dyncfgConfigTest(fn) + case "schema": + m.dyncfgConfigSchema(fn) + case "get": + m.dyncfgConfigGet(fn) + case "restart": + m.dyncfgConfigRestart(fn) + case "enable": + m.dyncfgConfigEnable(fn) + case "disable": + m.dyncfgConfigDisable(fn) + case "add": + m.dyncfgConfigAdd(fn) + case "remove": + m.dyncfgConfigRemove(fn) + case "update": + m.dyncfgConfigUpdate(fn) + default: + m.Warningf("dyncfg: function '%s' not implemented", fn.String()) + m.dyncfgRespf(fn, 501, "Function '%s' is not implemented.", fn.Name) + } +} + +func (m *Manager) dyncfgConfigUserconfig(fn functions.Function) { + id := fn.Args[0] + jn := "test" + if len(fn.Args) > 2 { + jn = fn.Args[2] + } + + mn, ok := extractModuleName(id) + if !ok { + m.Warningf("dyncfg: userconfig: could not extract module and job from id (%s)", id) + m.dyncfgRespf(fn, 400, + "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id) + return + } + + creator, ok := m.Modules.Lookup(mn) + if !ok { + m.Warningf("dyncfg: userconfig: module %s not found", mn) + m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn) + return + } + + if creator.Config == nil || creator.Config() == nil { + m.Warningf("dyncfg: userconfig: module %s: configuration not found", mn) + m.dyncfgRespf(fn, 500, "Module %s does not provide configuration.", mn) + return + } + + bs, err := userConfigFromPayload(creator.Config(), jn, fn) + if err != nil { + m.Warningf("dyncfg: userconfig: module %s: failed to create config from payload: %v", mn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err) + } + + m.dyncfgRespPayloadYAML(fn, string(bs)) +} + +func (m *Manager) dyncfgConfigTest(fn functions.Function) { + id := fn.Args[0] + mn, ok := extractModuleName(id) + if !ok { + m.Warningf("dyncfg: test: could not extract module and job from id (%s)", id) + m.dyncfgRespf(fn, 400, + "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id) + return + } + + jn := "test" + if len(fn.Args) > 2 { + jn = fn.Args[2] + } + + if err := validateJobName(jn); err != nil { + m.Warningf("dyncfg: test: module %s: unacceptable job name '%s': %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Unacceptable job name '%s': %v.", jn, err) + return + } + + creator, ok := m.Modules.Lookup(mn) + if !ok { + m.Warningf("dyncfg: test: module %s not found", mn) + m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn) + return + } + + cfg, err := configFromPayload(fn) + if err != nil { + m.Warningf("dyncfg: test: module %s: failed to create config from payload: %v", mn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err) + return + } + + if cfg.Vnode() != "" { + if _, ok := m.Vnodes.Lookup(cfg.Vnode()); !ok { + m.Warningf("dyncfg: test: module %s: vnode %s not found", mn, cfg.Vnode()) + m.dyncfgRespf(fn, 400, "The specified vnode '%s' is not registered.", cfg.Vnode()) + return + } + } + + cfg.SetModule(mn) + cfg.SetName(jn) + + job := creator.Create() + + if err := applyConfig(cfg, job); err != nil { + m.Warningf("dyncfg: test: module %s: failed to apply config: %v", mn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + return + } + + job.GetBase().Logger = logger.New().With( + slog.String("collector", cfg.Module()), + slog.String("job", cfg.Name()), + ) + + defer job.Cleanup() + + if err := job.Init(); err != nil { + m.dyncfgRespf(fn, 422, "Job initialization failed: %v", err) + return + } + if err := job.Check(); err != nil { + m.dyncfgRespf(fn, 422, "Job check failed: %v", err) + return + } + + m.dyncfgRespf(fn, 200, "") +} + +func (m *Manager) dyncfgConfigSchema(fn functions.Function) { + id := fn.Args[0] + mn, ok := extractModuleName(id) + if !ok { + m.Warningf("dyncfg: schema: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + mod, ok := m.Modules.Lookup(mn) + if !ok { + m.Warningf("dyncfg: schema: module %s not found", mn) + m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn) + return + } + + if mod.JobConfigSchema == "" { + m.Warningf("dyncfg: schema: module %s: schema not found", mn) + m.dyncfgRespf(fn, 500, "Module %s configuration schema not found.", mn) + return + } + + m.dyncfgRespPayloadJSON(fn, mod.JobConfigSchema) +} + +func (m *Manager) dyncfgConfigGet(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: get: could not extract module and job from id (%s)", id) + m.dyncfgRespf(fn, 400, + "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id) + return + } + + creator, ok := m.Modules.Lookup(mn) + if !ok { + m.Warningf("dyncfg: get: module %s not found", mn) + m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: get: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + mod := creator.Create() + + if err := applyConfig(ecfg.cfg, mod); err != nil { + m.Warningf("dyncfg: get: module %s job %s failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + return + } + + conf := mod.Configuration() + if conf == nil { + m.Warningf("dyncfg: get: module %s: configuration not found", mn) + m.dyncfgRespf(fn, 500, "Module %s does not provide configuration.", mn) + return + } + + bs, err := json.Marshal(conf) + if err != nil { + m.Warningf("dyncfg: get: module %s job %s failed to json marshal config: %v", mn, jn, err) + m.dyncfgRespf(fn, 500, "Failed to convert configuration into JSON: %v.", err) + return + } + + m.dyncfgRespPayloadJSON(fn, string(bs)) +} + +func (m *Manager) dyncfgConfigRestart(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: restart: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: restart: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + job, err := m.createCollectorJob(ecfg.cfg) + if err != nil { + m.Warningf("dyncfg: restart: module %s job %s: failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + switch ecfg.status { + case dyncfgAccepted, dyncfgDisabled: + m.Warningf("dyncfg: restart: module %s job %s: restarting not allowed in '%s' state", mn, jn, ecfg.status) + m.dyncfgRespf(fn, 405, "Restarting data collection job is not allowed in '%s' state.", ecfg.status) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + case dyncfgRunning: + m.FileStatus.Remove(ecfg.cfg) + m.FileLock.Unlock(ecfg.cfg.FullName()) + m.stopRunningJob(ecfg.cfg.FullName()) + default: + } + + if err := job.AutoDetection(); err != nil { + job.Cleanup() + ecfg.status = dyncfgFailed + m.dyncfgRespf(fn, 422, "Job restart failed: %v", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + if ok, err := m.FileLock.Lock(ecfg.cfg.FullName()); !ok && err == nil { + job.Cleanup() + ecfg.status = dyncfgFailed + m.dyncfgRespf(fn, 500, "Job restart failed: cannot filelock.") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + ecfg.status = dyncfgRunning + + if isDyncfg(ecfg.cfg) { + m.FileStatus.Save(ecfg.cfg, ecfg.status.String()) + } + m.startRunningJob(job) + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) +} + +func (m *Manager) dyncfgConfigEnable(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: enable: could not extract module and job from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: enable: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + if ecfg.cfg.FullName() == m.waitCfgOnOff { + m.waitCfgOnOff = "" + } + + switch ecfg.status { + case dyncfgAccepted, dyncfgDisabled, dyncfgFailed: + case dyncfgRunning: + // non-dyncfg update triggers enable/disable + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + default: + m.Warningf("dyncfg: enable: module %s job %s: enabling not allowed in %s state", mn, jn, ecfg.status) + m.dyncfgRespf(fn, 405, "Enabling data collection job is not allowed in '%s' state.", ecfg.status) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + job, err := m.createCollectorJob(ecfg.cfg) + if err != nil { + ecfg.status = dyncfgFailed + m.Warningf("dyncfg: enable: module %s job %s: failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + if err := job.AutoDetection(); err != nil { + job.Cleanup() + ecfg.status = dyncfgFailed + m.dyncfgRespf(fn, 200, "Job enable failed: %v.", err) + + if isStock(ecfg.cfg) { + m.exposedConfigs.remove(ecfg.cfg) + m.dyncfgJobRemove(ecfg.cfg) + } else { + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + } + + if job.RetryAutoDetection() && !isDyncfg(ecfg.cfg) { + m.Infof("%s[%s] job detection failed, will retry in %d seconds", + ecfg.cfg.Module(), ecfg.cfg.Name(), job.AutoDetectionEvery()) + + ctx, cancel := context.WithCancel(m.ctx) + m.retryingTasks.add(ecfg.cfg, &retryTask{cancel: cancel}) + go runRetryTask(ctx, m.addCh, ecfg.cfg) + } + return + } + + if ok, err := m.FileLock.Lock(ecfg.cfg.FullName()); !ok && err == nil { + job.Cleanup() + ecfg.status = dyncfgFailed + m.dyncfgRespf(fn, 500, "Job enable failed: can not filelock.") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + ecfg.status = dyncfgRunning + + if isDyncfg(ecfg.cfg) { + m.FileStatus.Save(ecfg.cfg, ecfg.status.String()) + } + + m.startRunningJob(job) + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + +} + +func (m *Manager) dyncfgConfigDisable(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: disable: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: disable: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + if ecfg.cfg.FullName() == m.waitCfgOnOff { + m.waitCfgOnOff = "" + } + + switch ecfg.status { + case dyncfgDisabled: + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + case dyncfgRunning: + m.stopRunningJob(ecfg.cfg.FullName()) + if isDyncfg(ecfg.cfg) { + m.FileStatus.Remove(ecfg.cfg) + } + m.FileLock.Unlock(ecfg.cfg.FullName()) + default: + } + + ecfg.status = dyncfgDisabled + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) +} + +func (m *Manager) dyncfgConfigAdd(fn functions.Function) { + if len(fn.Args) < 3 { + m.Warningf("dyncfg: add: missing required arguments, want 3 got %d", len(fn.Args)) + m.dyncfgRespf(fn, 400, "Missing required arguments. Need at least 3, but got %d.", len(fn.Args)) + return + } + + id := fn.Args[0] + jn := fn.Args[2] + mn, ok := extractModuleName(id) + if !ok { + m.Warningf("dyncfg: add: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + if len(fn.Payload) == 0 { + m.Warningf("dyncfg: add: module %s job %s missing configuration payload.", mn, jn) + m.dyncfgRespf(fn, 400, "Missing configuration payload.") + return + } + + if err := validateJobName(jn); err != nil { + m.Warningf("dyncfg: add: module %s: unacceptable job name '%s': %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Unacceptable job name '%s': %v.", jn, err) + return + } + + cfg, err := configFromPayload(fn) + if err != nil { + m.Warningf("dyncfg: add: module %s job %s: failed to create config from payload: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err) + return + } + + m.dyncfgSetConfigMeta(cfg, mn, jn) + + if _, err := m.createCollectorJob(cfg); err != nil { + m.Warningf("dyncfg: add: module %s job %s: failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + return + } + + if ecfg, ok := m.exposedConfigs.lookup(cfg); ok { + if scfg, ok := m.seenConfigs.lookup(ecfg.cfg); ok && isDyncfg(scfg.cfg) { + m.seenConfigs.remove(ecfg.cfg) + } + m.exposedConfigs.remove(ecfg.cfg) + m.stopRunningJob(ecfg.cfg.FullName()) + } + + scfg := &seenConfig{cfg: cfg, status: dyncfgAccepted} + ecfg := scfg + m.seenConfigs.add(scfg) + m.exposedConfigs.add(ecfg) + + m.dyncfgRespf(fn, 202, "") + m.dyncfgJobCreate(ecfg.cfg, ecfg.status) +} + +func (m *Manager) dyncfgConfigRemove(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: remove: could not extract module and job from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: remove: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + if !isDyncfg(ecfg.cfg) { + m.Warningf("dyncfg: remove: module %s job %s: can not remove jobs of type %s", mn, jn, ecfg.cfg.SourceType()) + m.dyncfgRespf(fn, 405, "Removing jobs of type '%s' is not supported. Only 'dyncfg' jobs can be removed.", ecfg.cfg.SourceType()) + return + } + + m.seenConfigs.remove(ecfg.cfg) + m.exposedConfigs.remove(ecfg.cfg) + m.stopRunningJob(ecfg.cfg.FullName()) + m.FileLock.Unlock(ecfg.cfg.FullName()) + m.FileStatus.Remove(ecfg.cfg) + + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobRemove(ecfg.cfg) +} + +func (m *Manager) dyncfgConfigUpdate(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: update: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: update: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + cfg, err := configFromPayload(fn) + if err != nil { + m.Warningf("dyncfg: update: module %s: failed to create config from payload: %v", mn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + m.dyncfgSetConfigMeta(cfg, mn, jn) + + if ecfg.status == dyncfgRunning && ecfg.cfg.UID() == cfg.UID() { + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + job, err := m.createCollectorJob(cfg) + if err != nil { + m.Warningf("dyncfg: update: module %s job %s: failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + if ecfg.status == dyncfgAccepted { + m.Warningf("dyncfg: update: module %s job %s: updating not allowed in %s", mn, jn, ecfg.status) + m.dyncfgRespf(fn, 403, "Updating data collection job is not allowed in '%s' state.", ecfg.status) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + m.exposedConfigs.remove(ecfg.cfg) + m.stopRunningJob(ecfg.cfg.FullName()) + + scfg := &seenConfig{cfg: cfg, status: dyncfgAccepted} + m.seenConfigs.add(scfg) + m.exposedConfigs.add(scfg) + + if isDyncfg(ecfg.cfg) { + m.seenConfigs.remove(ecfg.cfg) + } else { + // needed to update meta. There is no other way, unfortunately, but to send "create" + defer m.dyncfgJobCreate(scfg.cfg, scfg.status) + } + + if ecfg.status == dyncfgDisabled { + scfg.status = dyncfgDisabled + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(cfg, scfg.status) + return + } + + if err := job.AutoDetection(); err != nil { + job.Cleanup() + scfg.status = dyncfgFailed + m.dyncfgRespf(fn, 200, "Job update failed: %v", err) + m.dyncfgJobStatus(scfg.cfg, scfg.status) + return + } + + if ok, err := m.FileLock.Lock(scfg.cfg.FullName()); !ok && err == nil { + job.Cleanup() + scfg.status = dyncfgFailed + m.dyncfgRespf(fn, 500, "Job update failed: cannot create file lock.") + m.dyncfgJobStatus(scfg.cfg, scfg.status) + return + } + + scfg.status = dyncfgRunning + m.startRunningJob(job) + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(scfg.cfg, scfg.status) +} + +func (m *Manager) dyncfgSetConfigMeta(cfg confgroup.Config, module, name string) { + cfg.SetProvider("dyncfg") + cfg.SetSource(fmt.Sprintf("type=dyncfg,module=%s,job=%s", module, name)) + cfg.SetSourceType("dyncfg") + cfg.SetModule(module) + cfg.SetName(name) + if def, ok := m.ConfigDefaults.Lookup(module); ok { + cfg.ApplyDefaults(def) + } +} + +func (m *Manager) dyncfgRespPayloadJSON(fn functions.Function, payload string) { + m.dyncfgRespPayload(fn, payload, "application/json") +} + +func (m *Manager) dyncfgRespPayloadYAML(fn functions.Function, payload string) { + m.dyncfgRespPayload(fn, payload, "application/yaml") +} + +func (m *Manager) dyncfgRespPayload(fn functions.Function, payload string, contentType string) { + ts := strconv.FormatInt(time.Now().Unix(), 10) + m.api.FUNCRESULT(fn.UID, contentType, payload, "200", ts) +} + +func (m *Manager) dyncfgRespf(fn functions.Function, code int, msgf string, a ...any) { + if fn.UID == "" { + return + } + bs, _ := json.Marshal(struct { + Status int `json:"status"` + Message string `json:"message"` + }{ + Status: code, + Message: fmt.Sprintf(msgf, a...), + }) + ts := strconv.FormatInt(time.Now().Unix(), 10) + m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts) +} + +func userConfigFromPayload(cfg any, jobName string, fn functions.Function) ([]byte, error) { + if v := reflect.ValueOf(cfg); v.Kind() != reflect.Ptr || v.IsNil() { + return nil, fmt.Errorf("invalid config: expected a pointer to a struct, got a %s", v.Type()) + } + + if fn.ContentType == "application/json" { + if err := json.Unmarshal(fn.Payload, cfg); err != nil { + return nil, err + } + } else { + if err := yaml.Unmarshal(fn.Payload, cfg); err != nil { + return nil, err + } + } + + bs, err := yaml.Marshal(cfg) + if err != nil { + return nil, err + } + + var yms yaml.MapSlice + if err := yaml.Unmarshal(bs, &yms); err != nil { + return nil, err + } + + yms = append([]yaml.MapItem{{Key: "name", Value: jobName}}, yms...) + + v := map[string]any{ + "jobs": []any{yms}, + } + + bs, err = yaml.Marshal(v) + if err != nil { + return nil, err + } + + return bs, nil +} + +func configFromPayload(fn functions.Function) (confgroup.Config, error) { + var cfg confgroup.Config + + if fn.ContentType == "application/json" { + if err := json.Unmarshal(fn.Payload, &cfg); err != nil { + return nil, err + } + + return cfg.Clone() + } + + if err := yaml.Unmarshal(fn.Payload, &cfg); err != nil { + return nil, err + } + + return cfg, nil +} + +func extractModuleJobName(id string) (mn string, jn string, ok bool) { + if mn, ok = extractModuleName(id); !ok { + return "", "", false + } + if jn, ok = extractJobName(id); !ok { + return "", "", false + } + return mn, jn, true +} + +func extractModuleName(id string) (string, bool) { + id = strings.TrimPrefix(id, dyncfgIDPrefix) + i := strings.IndexByte(id, ':') + if i == -1 { + return id, id != "" + } + return id[:i], true +} + +func extractJobName(id string) (string, bool) { + i := strings.LastIndexByte(id, ':') + if i == -1 { + return "", false + } + return id[i+1:], true +} + +func validateJobName(jobName string) error { + for _, r := range jobName { + if unicode.IsSpace(r) { + return errors.New("contains spaces") + } + switch r { + case '.', ':': + return fmt.Errorf("contains '%c'", r) + } + } + return nil +} diff --git a/src/go/plugin/go.d/agent/jobmgr/manager.go b/src/go/plugin/go.d/agent/jobmgr/manager.go new file mode 100644 index 000000000..59947be77 --- /dev/null +++ b/src/go/plugin/go.d/agent/jobmgr/manager.go @@ -0,0 +1,370 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package jobmgr + +import ( + "context" + "fmt" + "io" + "log/slog" + "os" + "sync" + "time" + + "github.com/netdata/netdata/go/plugins/logger" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/ticker" + + "github.com/mattn/go-isatty" + "gopkg.in/yaml.v2" +) + +var isTerminal = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsTerminal(os.Stdin.Fd()) + +func New() *Manager { + mgr := &Manager{ + Logger: logger.New().With( + slog.String("component", "job manager"), + ), + Out: io.Discard, + FileLock: noop{}, + FileStatus: noop{}, + FileStatusStore: noop{}, + Vnodes: noop{}, + FnReg: noop{}, + + discoveredConfigs: newDiscoveredConfigsCache(), + seenConfigs: newSeenConfigCache(), + exposedConfigs: newExposedConfigCache(), + runningJobs: newRunningJobsCache(), + retryingTasks: newRetryingTasksCache(), + + started: make(chan struct{}), + api: netdataapi.New(safewriter.Stdout), + addCh: make(chan confgroup.Config), + rmCh: make(chan confgroup.Config), + dyncfgCh: make(chan functions.Function), + } + + return mgr +} + +type Manager struct { + *logger.Logger + + PluginName string + Out io.Writer + Modules module.Registry + ConfigDefaults confgroup.Registry + + FileLock FileLocker + FileStatus FileStatus + FileStatusStore FileStatusStore + Vnodes Vnodes + FnReg FunctionRegistry + + discoveredConfigs *discoveredConfigs + seenConfigs *seenConfigs + exposedConfigs *exposedConfigs + retryingTasks *retryingTasks + runningJobs *runningJobs + + ctx context.Context + started chan struct{} + api dyncfgAPI + addCh chan confgroup.Config + rmCh chan confgroup.Config + dyncfgCh chan functions.Function + + waitCfgOnOff string // block processing of discovered configs until "enable"/"disable" is received from Netdata +} + +func (m *Manager) Run(ctx context.Context, in chan []*confgroup.Group) { + m.Info("instance is started") + defer func() { m.cleanup(); m.Info("instance is stopped") }() + m.ctx = ctx + + m.FnReg.Register("config", m.dyncfgConfig) + + for name := range m.Modules { + m.dyncfgModuleCreate(name) + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { defer wg.Done(); m.runProcessConfGroups(in) }() + + wg.Add(1) + go func() { defer wg.Done(); m.run() }() + + wg.Add(1) + go func() { defer wg.Done(); m.runNotifyRunningJobs() }() + + close(m.started) + + wg.Wait() + <-m.ctx.Done() +} + +func (m *Manager) runProcessConfGroups(in chan []*confgroup.Group) { + for { + select { + case <-m.ctx.Done(): + return + case groups := <-in: + for _, gr := range groups { + a, r := m.discoveredConfigs.add(gr) + m.Debugf("received configs: %d/+%d/-%d ('%s')", len(gr.Configs), len(a), len(r), gr.Source) + sendConfigs(m.ctx, m.rmCh, r...) + sendConfigs(m.ctx, m.addCh, a...) + } + } + } +} + +func (m *Manager) run() { + for { + if m.waitCfgOnOff != "" { + select { + case <-m.ctx.Done(): + return + case fn := <-m.dyncfgCh: + m.dyncfgConfigExec(fn) + } + } else { + select { + case <-m.ctx.Done(): + return + case cfg := <-m.addCh: + m.addConfig(cfg) + case cfg := <-m.rmCh: + m.removeConfig(cfg) + case fn := <-m.dyncfgCh: + m.dyncfgConfigExec(fn) + } + } + } +} + +func (m *Manager) addConfig(cfg confgroup.Config) { + if _, ok := m.Modules.Lookup(cfg.Module()); !ok { + return + } + + m.retryingTasks.remove(cfg) + + scfg, ok := m.seenConfigs.lookup(cfg) + if !ok { + scfg = &seenConfig{cfg: cfg} + m.seenConfigs.add(scfg) + } + + ecfg, ok := m.exposedConfigs.lookup(cfg) + if !ok { + scfg.status = dyncfgAccepted + ecfg = scfg + m.exposedConfigs.add(ecfg) + } else { + sp, ep := scfg.cfg.SourceTypePriority(), ecfg.cfg.SourceTypePriority() + if ep > sp || (ep == sp && ecfg.status == dyncfgRunning) { + m.retryingTasks.remove(cfg) + return + } + if ecfg.status == dyncfgRunning { + m.stopRunningJob(ecfg.cfg.FullName()) + m.FileLock.Unlock(ecfg.cfg.FullName()) + m.FileStatus.Remove(ecfg.cfg) + } + scfg.status = dyncfgAccepted + m.exposedConfigs.add(scfg) // replace existing exposed + ecfg = scfg + } + + m.dyncfgJobCreate(ecfg.cfg, ecfg.status) + + if isTerminal || m.PluginName == "nodyncfg" { // FIXME: quick fix of TestAgent_Run (agent_test.go) + m.dyncfgConfigEnable(functions.Function{Args: []string{dyncfgJobID(ecfg.cfg), "enable"}}) + } else { + m.waitCfgOnOff = ecfg.cfg.FullName() + } +} + +func (m *Manager) removeConfig(cfg confgroup.Config) { + m.retryingTasks.remove(cfg) + + scfg, ok := m.seenConfigs.lookup(cfg) + if !ok { + return + } + m.seenConfigs.remove(cfg) + + ecfg, ok := m.exposedConfigs.lookup(cfg) + if !ok || scfg.cfg.UID() != ecfg.cfg.UID() { + return + } + + m.exposedConfigs.remove(cfg) + m.stopRunningJob(cfg.FullName()) + m.FileLock.Unlock(cfg.FullName()) + m.FileStatus.Remove(cfg) + + if !isStock(cfg) || ecfg.status == dyncfgRunning { + m.dyncfgJobRemove(cfg) + } +} + +func (m *Manager) runNotifyRunningJobs() { + tk := ticker.New(time.Second) + defer tk.Stop() + + for { + select { + case <-m.ctx.Done(): + return + case clock := <-tk.C: + m.runningJobs.lock() + m.runningJobs.forEach(func(_ string, job *module.Job) { job.Tick(clock) }) + m.runningJobs.unlock() + } + } +} + +func (m *Manager) startRunningJob(job *module.Job) { + m.runningJobs.lock() + defer m.runningJobs.unlock() + + if job, ok := m.runningJobs.lookup(job.FullName()); ok { + job.Stop() + } + + go job.Start() + m.runningJobs.add(job.FullName(), job) +} + +func (m *Manager) stopRunningJob(name string) { + m.runningJobs.lock() + defer m.runningJobs.unlock() + + if job, ok := m.runningJobs.lookup(name); ok { + job.Stop() + m.runningJobs.remove(name) + } +} + +func (m *Manager) cleanup() { + m.FnReg.Unregister("config") + + m.runningJobs.lock() + defer m.runningJobs.unlock() + + m.runningJobs.forEach(func(key string, job *module.Job) { + job.Stop() + }) +} + +func (m *Manager) createCollectorJob(cfg confgroup.Config) (*module.Job, error) { + creator, ok := m.Modules[cfg.Module()] + if !ok { + return nil, fmt.Errorf("can not find %s module", cfg.Module()) + } + + var vnode struct { + guid string + hostname string + labels map[string]string + } + + if cfg.Vnode() != "" { + n, ok := m.Vnodes.Lookup(cfg.Vnode()) + if !ok { + return nil, fmt.Errorf("vnode '%s' is not found", cfg.Vnode()) + } + + vnode.guid = n.GUID + vnode.hostname = n.Hostname + vnode.labels = n.Labels + } + + m.Debugf("creating %s[%s] job, config: %v", cfg.Module(), cfg.Name(), cfg) + + mod := creator.Create() + + if err := applyConfig(cfg, mod); err != nil { + return nil, err + } + + jobCfg := module.JobConfig{ + PluginName: m.PluginName, + Name: cfg.Name(), + ModuleName: cfg.Module(), + FullName: cfg.FullName(), + UpdateEvery: cfg.UpdateEvery(), + AutoDetectEvery: cfg.AutoDetectionRetry(), + Priority: cfg.Priority(), + Labels: makeLabels(cfg), + IsStock: cfg.SourceType() == "stock", + Module: mod, + Out: m.Out, + VnodeGUID: vnode.guid, + VnodeHostname: vnode.hostname, + VnodeLabels: vnode.labels, + } + + job := module.NewJob(jobCfg) + + return job, nil +} + +func runRetryTask(ctx context.Context, out chan<- confgroup.Config, cfg confgroup.Config) { + t := time.NewTimer(time.Second * time.Duration(cfg.AutoDetectionRetry())) + defer t.Stop() + + select { + case <-ctx.Done(): + case <-t.C: + sendConfigs(ctx, out, cfg) + } +} + +func sendConfigs(ctx context.Context, out chan<- confgroup.Config, cfgs ...confgroup.Config) { + for _, cfg := range cfgs { + select { + case <-ctx.Done(): + return + case out <- cfg: + } + } +} + +func isStock(cfg confgroup.Config) bool { + return cfg.SourceType() == confgroup.TypeStock +} + +func isDyncfg(cfg confgroup.Config) bool { + return cfg.SourceType() == confgroup.TypeDyncfg +} + +func applyConfig(cfg confgroup.Config, module any) error { + bs, err := yaml.Marshal(cfg) + if err != nil { + return err + } + return yaml.Unmarshal(bs, module) +} + +func makeLabels(cfg confgroup.Config) map[string]string { + labels := make(map[string]string) + for name, value := range cfg.Labels() { + n, ok1 := name.(string) + v, ok2 := value.(string) + if ok1 && ok2 { + labels[n] = v + } + } + return labels +} diff --git a/src/go/plugin/go.d/agent/jobmgr/manager_test.go b/src/go/plugin/go.d/agent/jobmgr/manager_test.go new file mode 100644 index 000000000..1b55a8308 --- /dev/null +++ b/src/go/plugin/go.d/agent/jobmgr/manager_test.go @@ -0,0 +1,1892 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package jobmgr + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions" +) + +func TestManager_Run(t *testing.T) { + tests := map[string]struct { + createSim func() *runSim + }{ + "stock => ok: add and remove": { + createSim: func() *runSim { + cfg := prepareStockCfg("success", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, cfg.Source(), cfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + + sendConfGroup(in, cfg.Source()) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:success:name create accepted job /collectors/jobs stock 'type=stock,module=success,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:name status running + +CONFIG go.d:collector:success:name delete +`, + } + }, + }, + "stock => nok: add": { + createSim: func() *runSim { + cfg := prepareStockCfg("fail", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, cfg.Source(), cfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + }, + wantDiscovered: []confgroup.Config{cfg}, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgFailed}, + }, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs stock 'type=stock,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name delete +`, + } + }, + }, + "stock => nok: add and remove": { + createSim: func() *runSim { + cfg := prepareStockCfg("fail", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, cfg.Source(), cfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + + sendConfGroup(in, cfg.Source()) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs stock 'type=stock,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name delete +`, + } + }, + }, + "user => ok: add and remove": { + createSim: func() *runSim { + cfg := prepareUserCfg("success", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, cfg.Source(), cfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + + sendConfGroup(in, cfg.Source()) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:success:name create accepted job /collectors/jobs user 'type=user,module=success,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:name status running + +CONFIG go.d:collector:success:name delete + `, + } + }, + }, + "user => nok: add and remove": { + createSim: func() *runSim { + cfg := prepareUserCfg("fail", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, cfg.Source(), cfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + + sendConfGroup(in, cfg.Source()) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name status failed + +CONFIG go.d:collector:fail:name delete + `, + } + }, + }, + "disc => ok: add and remove": { + createSim: func() *runSim { + cfg := prepareDiscoveredCfg("success", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, cfg.Source(), cfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + + sendConfGroup(in, cfg.Source()) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:success:name create accepted job /collectors/jobs discovered 'type=discovered,module=success,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:name status running + +CONFIG go.d:collector:success:name delete + `, + } + }, + }, + "disc => nok: add and remove": { + createSim: func() *runSim { + cfg := prepareDiscoveredCfg("fail", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, cfg.Source(), cfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + + sendConfGroup(in, cfg.Source()) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs discovered 'type=discovered,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name status failed + +CONFIG go.d:collector:fail:name delete + `, + } + }, + }, + "non-dyncfg => nok: diff src, diff name: add": { + createSim: func() *runSim { + stockCfg := prepareStockCfg("fail", "stock") + discCfg := prepareDiscoveredCfg("fail", "discovered") + userCfg := prepareUserCfg("fail", "user") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, stockCfg.Source(), stockCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(stockCfg), "enable"}, + }) + + sendConfGroup(in, discCfg.Source(), discCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(discCfg), "enable"}, + }) + + sendConfGroup(in, userCfg.Source(), userCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "3-enable", + Args: []string{dyncfgJobID(userCfg), "enable"}, + }) + }, + wantDiscovered: []confgroup.Config{ + stockCfg, + userCfg, + discCfg, + }, + wantSeen: []seenConfig{ + {cfg: stockCfg, status: dyncfgFailed}, + {cfg: discCfg, status: dyncfgFailed}, + {cfg: userCfg, status: dyncfgFailed}, + }, + wantExposed: []seenConfig{ + {cfg: discCfg, status: dyncfgFailed}, + {cfg: userCfg, status: dyncfgFailed}, + }, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:fail:stock create accepted job /collectors/jobs stock 'type=stock,module=fail,job=stock' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:stock delete + +CONFIG go.d:collector:fail:discovered create accepted job /collectors/jobs discovered 'type=discovered,module=fail,job=discovered' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:discovered status failed + +CONFIG go.d:collector:fail:user create accepted job /collectors/jobs user 'type=user,module=fail,job=user' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 3-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:user status failed + `, + } + }, + }, + "non-dyncfg => nok: diff src,src prio asc,same name: add": { + createSim: func() *runSim { + stockCfg := prepareStockCfg("fail", "name") + discCfg := prepareDiscoveredCfg("fail", "name") + userCfg := prepareUserCfg("fail", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, stockCfg.Source(), stockCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(stockCfg), "enable"}, + }) + + sendConfGroup(in, discCfg.Source(), discCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(discCfg), "enable"}, + }) + + sendConfGroup(in, userCfg.Source(), userCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "3-enable", + Args: []string{dyncfgJobID(userCfg), "enable"}, + }) + }, + wantDiscovered: []confgroup.Config{ + stockCfg, + userCfg, + discCfg, + }, + wantSeen: []seenConfig{ + {cfg: stockCfg, status: dyncfgFailed}, + {cfg: discCfg, status: dyncfgFailed}, + {cfg: userCfg, status: dyncfgFailed}, + }, + wantExposed: []seenConfig{ + {cfg: userCfg, status: dyncfgFailed}, + }, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs stock 'type=stock,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name delete + +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs discovered 'type=discovered,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name status failed + +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 3-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name status failed + `, + } + }, + }, + "non-dyncfg => nok: diff src,src prio asc,same name: add and remove": { + createSim: func() *runSim { + stockCfg := prepareStockCfg("fail", "name") + discCfg := prepareDiscoveredCfg("fail", "name") + userCfg := prepareUserCfg("fail", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, stockCfg.Source(), stockCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(stockCfg), "enable"}, + }) + + sendConfGroup(in, discCfg.Source(), discCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(discCfg), "enable"}, + }) + + sendConfGroup(in, userCfg.Source(), userCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "3-enable", + Args: []string{dyncfgJobID(userCfg), "enable"}, + }) + + sendConfGroup(in, stockCfg.Source()) + sendConfGroup(in, discCfg.Source()) + sendConfGroup(in, userCfg.Source()) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs stock 'type=stock,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name delete + +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs discovered 'type=discovered,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name status failed + +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 3-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name status failed + +CONFIG go.d:collector:fail:name delete + `, + } + }, + }, + "non-dyncfg => nok: diff src,src prio desc,same name: add": { + createSim: func() *runSim { + userCfg := prepareUserCfg("fail", "name") + discCfg := prepareDiscoveredCfg("fail", "name") + stockCfg := prepareStockCfg("fail", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, userCfg.Source(), userCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(userCfg), "enable"}, + }) + + sendConfGroup(in, discCfg.Source(), discCfg) + sendConfGroup(in, stockCfg.Source(), stockCfg) + }, + wantDiscovered: []confgroup.Config{ + stockCfg, + userCfg, + discCfg, + }, + wantSeen: []seenConfig{ + {cfg: userCfg, status: dyncfgFailed}, + {cfg: discCfg}, + {cfg: stockCfg}, + }, + wantExposed: []seenConfig{ + {cfg: userCfg, status: dyncfgFailed}, + }, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name status failed + `, + } + }, + }, + "non-dyncfg => nok: diff src,src prio desc,same name: add and remove": { + createSim: func() *runSim { + userCfg := prepareUserCfg("fail", "name") + discCfg := prepareDiscoveredCfg("fail", "name") + stockCfg := prepareStockCfg("fail", "name") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, userCfg.Source(), userCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(userCfg), "enable"}, + }) + + sendConfGroup(in, discCfg.Source(), discCfg) + sendConfGroup(in, stockCfg.Source(), stockCfg) + + sendConfGroup(in, userCfg.Source()) + sendConfGroup(in, discCfg.Source()) + sendConfGroup(in, stockCfg.Source()) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +CONFIG go.d:collector:fail:name create accepted job /collectors/jobs user 'type=user,module=fail,job=name' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:name status failed + +CONFIG go.d:collector:fail:name delete + `, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func TestManager_Run_Dyncfg_Get(t *testing.T) { + tests := map[string]struct { + createSim func() *runSim + }{ + "[get] non-existing": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-get", + Args: []string{dyncfgJobID(cfg), "get"}, + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-get 404 application/json +{"status":404,"message":"The specified module 'success' job 'test' is not registered."} +FUNCTION_RESULT_END +`, + } + }, + }, + "[get] existing": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test"). + Set("option_str", "1"). + Set("option_int", 1) + bs, _ := json.Marshal(cfg) + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: bs, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-get", + Args: []string{dyncfgJobID(cfg), "get"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-get 200 application/json +{"option_str":"1","option_int":1} +FUNCTION_RESULT_END +`, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func TestManager_Run_Dyncfg_Userconfig(t *testing.T) { + tests := map[string]struct { + createSim func() *runSim + }{ + "[userconfig] existing": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-userconfig", + Args: []string{dyncfgJobID(cfg), "userconfig"}, + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +FUNCTION_RESULT_BEGIN 1-userconfig 200 application/yaml +jobs: +- name: test + option_one: one + option_two: 2 + +FUNCTION_RESULT_END +`, + } + }, + }, + "[userconfig] non-existing": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success!", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-userconfig", + Args: []string{dyncfgJobID(cfg), "userconfig"}, + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` +FUNCTION_RESULT_BEGIN 1-userconfig 404 application/json +{"status":404,"message":"The specified module 'success!' is not registered."} +FUNCTION_RESULT_END +`, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func TestManager_Run_Dyncfg_Add(t *testing.T) { + tests := map[string]struct { + createSim func() *runSim + }{ + "[add] dyncfg:ok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 +`, + } + }, + }, + "[add] dyncfg:nok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("fail", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 +`, + } + }, + }, + "[add] dyncfg:ok twice": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 +`, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func TestManager_Run_Dyncfg_Enable(t *testing.T) { + tests := map[string]struct { + createSim func() *runSim + }{ + "[enable] non-existing": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-enable 404 application/json +{"status":404,"message":"The specified module 'success' job 'test' is not registered."} +FUNCTION_RESULT_END +`, + } + }, + }, + "[enable] dyncfg:ok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgRunning}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgRunning}, + }, + wantRunning: []string{cfg.FullName()}, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running +`, + } + }, + }, + "[enable] dyncfg:ok twice": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgRunning}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgRunning}, + }, + wantRunning: []string{cfg.FullName()}, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running + +FUNCTION_RESULT_BEGIN 3-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running +`, + } + }, + }, + "[enable] dyncfg:nok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("fail", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgFailed}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgFailed}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test status failed +`, + } + }, + }, + "[enable] dyncfg:nok twice": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("fail", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgFailed}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgFailed}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test status failed + +FUNCTION_RESULT_BEGIN 3-enable 200 application/json +{"status":200,"message":"Job enable failed: mock failed init."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test status failed +`, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func TestManager_Run_Dyncfg_Disable(t *testing.T) { + tests := map[string]struct { + createSim func() *runSim + }{ + "[disable] non-existing": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-disable", + Args: []string{dyncfgJobID(cfg), "disable"}, + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-disable 404 application/json +{"status":404,"message":"The specified module 'success' job 'test' is not registered."} +FUNCTION_RESULT_END +`, + } + }, + }, + "[disable] dyncfg:ok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-disable", + Args: []string{dyncfgJobID(cfg), "disable"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-disable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status disabled +`, + } + }, + }, + "[disable] dyncfg:ok twice": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-disable", + Args: []string{dyncfgJobID(cfg), "disable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-disable", + Args: []string{dyncfgJobID(cfg), "disable"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-disable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status disabled + +FUNCTION_RESULT_BEGIN 3-disable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status disabled +`, + } + }, + }, + "[disable] dyncfg:nok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("fail", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-disable", + Args: []string{dyncfgJobID(cfg), "disable"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-disable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test status disabled +`, + } + }, + }, + "[disable] dyncfg:nok twice": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("fail", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-disable", + Args: []string{dyncfgJobID(cfg), "disable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-disable", + Args: []string{dyncfgJobID(cfg), "disable"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=fail,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-disable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test status disabled + +FUNCTION_RESULT_BEGIN 3-disable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:fail:test status disabled +`, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func TestManager_Run_Dyncfg_Restart(t *testing.T) { + tests := map[string]struct { + createSim func() *runSim + }{ + "[restart] non-existing": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-restart", + Args: []string{dyncfgJobID(cfg), "restart"}, + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-restart 404 application/json +{"status":404,"message":"The specified module 'success' job 'test' is not registered."} +FUNCTION_RESULT_END +`, + } + }, + }, + "[restart] not enabled dyncfg:ok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-restart", + Args: []string{dyncfgJobID(cfg), "restart"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgAccepted}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-restart 405 application/json +{"status":405,"message":"Restarting data collection job is not allowed in 'accepted' state."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status accepted +`, + } + }, + }, + "[restart] enabled dyncfg:ok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-restart", + Args: []string{dyncfgJobID(cfg), "restart"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgRunning}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgRunning}, + }, + wantRunning: []string{cfg.FullName()}, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running + +FUNCTION_RESULT_BEGIN 3-restart 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running +`, + } + }, + }, + "[restart] disabled dyncfg:ok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-disable", + Args: []string{dyncfgJobID(cfg), "disable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-restart", + Args: []string{dyncfgJobID(cfg), "restart"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgDisabled}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-disable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status disabled + +FUNCTION_RESULT_BEGIN 3-restart 405 application/json +{"status":405,"message":"Restarting data collection job is not allowed in 'disabled' state."} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status disabled +`, + } + }, + }, + "[restart] enabled dyncfg:ok multiple times": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-restart", + Args: []string{dyncfgJobID(cfg), "restart"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "4-restart", + Args: []string{dyncfgJobID(cfg), "restart"}, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: cfg, status: dyncfgRunning}, + }, + wantExposed: []seenConfig{ + {cfg: cfg, status: dyncfgRunning}, + }, + wantRunning: []string{cfg.FullName()}, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running + +FUNCTION_RESULT_BEGIN 3-restart 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running + +FUNCTION_RESULT_BEGIN 4-restart 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running +`, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func TestManager_Run_Dyncfg_Remove(t *testing.T) { + tests := map[string]struct { + createSim func() *runSim + }{ + "[remove] non-existing": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-remove", + Args: []string{dyncfgJobID(cfg), "remove"}, + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-remove 404 application/json +{"status":404,"message":"The specified module 'success' job 'test' is not registered."} +FUNCTION_RESULT_END +`, + } + }, + }, + "[remove] non-dyncfg": { + createSim: func() *runSim { + stockCfg := prepareStockCfg("success", "stock") + userCfg := prepareUserCfg("success", "user") + discCfg := prepareDiscoveredCfg("success", "discovered") + + return &runSim{ + do: func(mgr *Manager, in chan []*confgroup.Group) { + sendConfGroup(in, stockCfg.Source(), stockCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "1-enable", + Args: []string{dyncfgJobID(stockCfg), "enable"}, + }) + + sendConfGroup(in, userCfg.Source(), userCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(userCfg), "enable"}, + }) + + sendConfGroup(in, discCfg.Source(), discCfg) + mgr.dyncfgConfig(functions.Function{ + UID: "3-enable", + Args: []string{dyncfgJobID(discCfg), "enable"}, + }) + + mgr.dyncfgConfig(functions.Function{ + UID: "1-remove", + Args: []string{dyncfgJobID(stockCfg), "remove"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-remove", + Args: []string{dyncfgJobID(userCfg), "remove"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-remove", + Args: []string{dyncfgJobID(discCfg), "remove"}, + }) + }, + wantDiscovered: []confgroup.Config{ + stockCfg, + userCfg, + discCfg, + }, + wantSeen: []seenConfig{ + {cfg: stockCfg, status: dyncfgRunning}, + {cfg: userCfg, status: dyncfgRunning}, + {cfg: discCfg, status: dyncfgRunning}, + }, + wantExposed: []seenConfig{ + {cfg: stockCfg, status: dyncfgRunning}, + {cfg: userCfg, status: dyncfgRunning}, + {cfg: discCfg, status: dyncfgRunning}, + }, + wantRunning: []string{stockCfg.FullName(), userCfg.FullName(), discCfg.FullName()}, + wantDyncfg: ` +CONFIG go.d:collector:success:stock create accepted job /collectors/jobs stock 'type=stock,module=success,job=stock' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 1-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:stock status running + +CONFIG go.d:collector:success:user create accepted job /collectors/jobs user 'type=user,module=success,job=user' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:user status running + +CONFIG go.d:collector:success:discovered create accepted job /collectors/jobs discovered 'type=discovered,module=success,job=discovered' 'schema get enable disable update restart test userconfig' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 3-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:discovered status running + +FUNCTION_RESULT_BEGIN 1-remove 405 application/json +{"status":405,"message":"Removing jobs of type 'stock' is not supported. Only 'dyncfg' jobs can be removed."} +FUNCTION_RESULT_END + +FUNCTION_RESULT_BEGIN 2-remove 405 application/json +{"status":405,"message":"Removing jobs of type 'user' is not supported. Only 'dyncfg' jobs can be removed."} +FUNCTION_RESULT_END + +FUNCTION_RESULT_BEGIN 3-remove 405 application/json +{"status":405,"message":"Removing jobs of type 'discovered' is not supported. Only 'dyncfg' jobs can be removed."} +FUNCTION_RESULT_END +`, + } + }, + }, + "[remove] not enabled dyncfg:ok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-remove", + Args: []string{dyncfgJobID(cfg), "remove"}, + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-remove 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test delete +`, + } + }, + }, + "[remove] enabled dyncfg:ok": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(cfg.Module()), "add", cfg.Name()}, + Payload: []byte("{}"), + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(cfg), "enable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-remove", + Args: []string{dyncfgJobID(cfg), "remove"}, + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running + +FUNCTION_RESULT_BEGIN 3-remove 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test delete +`, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func TestManager_Run_Dyncfg_Update(t *testing.T) { + tests := map[string]struct { + createSim func() *runSim + }{ + "[update] non-existing": { + createSim: func() *runSim { + cfg := prepareDyncfgCfg("success", "test") + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-update", + Args: []string{dyncfgJobID(cfg), "update"}, + Payload: []byte("{}"), + }) + }, + wantDiscovered: nil, + wantSeen: nil, + wantExposed: nil, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-update 404 application/json +{"status":404,"message":"The specified module 'success' job 'test' is not registered."} +FUNCTION_RESULT_END +`, + } + }, + }, + "[update] enabled dyncfg:ok with dyncfg:ok": { + createSim: func() *runSim { + origCfg := prepareDyncfgCfg("success", "test"). + Set("option_str", "1") + updCfg := prepareDyncfgCfg("success", "test"). + Set("option_str", "2") + origBs, _ := json.Marshal(origCfg) + updBs, _ := json.Marshal(updCfg) + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(origCfg.Module()), "add", origCfg.Name()}, + Payload: origBs, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-enable", + Args: []string{dyncfgJobID(origCfg), "enable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-update", + Args: []string{dyncfgJobID(origCfg), "update"}, + Payload: updBs, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: updCfg, status: dyncfgRunning}, + }, + wantExposed: []seenConfig{ + {cfg: updCfg, status: dyncfgRunning}, + }, + wantRunning: []string{updCfg.FullName()}, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-enable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running + +FUNCTION_RESULT_BEGIN 3-update 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status running +`, + } + }, + }, + "[update] disabled dyncfg:ok with dyncfg:ok": { + createSim: func() *runSim { + origCfg := prepareDyncfgCfg("success", "test"). + Set("option_str", "1") + updCfg := prepareDyncfgCfg("success", "test"). + Set("option_str", "2") + origBs, _ := json.Marshal(origCfg) + updBs, _ := json.Marshal(updCfg) + + return &runSim{ + do: func(mgr *Manager, _ chan []*confgroup.Group) { + mgr.dyncfgConfig(functions.Function{ + UID: "1-add", + Args: []string{dyncfgModID(origCfg.Module()), "add", origCfg.Name()}, + Payload: origBs, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "2-disable", + Args: []string{dyncfgJobID(origCfg), "disable"}, + }) + mgr.dyncfgConfig(functions.Function{ + UID: "3-update", + Args: []string{dyncfgJobID(origCfg), "update"}, + Payload: updBs, + }) + }, + wantDiscovered: nil, + wantSeen: []seenConfig{ + {cfg: updCfg, status: dyncfgDisabled}, + }, + wantExposed: []seenConfig{ + {cfg: updCfg, status: dyncfgDisabled}, + }, + wantRunning: nil, + wantDyncfg: ` + +FUNCTION_RESULT_BEGIN 1-add 202 application/json +{"status":202,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test create accepted job /collectors/jobs dyncfg 'type=dyncfg,module=success,job=test' 'schema get enable disable update restart test userconfig remove' 0x0000 0x0000 + +FUNCTION_RESULT_BEGIN 2-disable 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status disabled + +FUNCTION_RESULT_BEGIN 3-update 200 application/json +{"status":200,"message":""} +FUNCTION_RESULT_END + +CONFIG go.d:collector:success:test status disabled +`, + } + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + sim := test.createSim() + sim.run(t) + }) + } +} + +func sendConfGroup(in chan []*confgroup.Group, src string, configs ...confgroup.Config) { + in <- prepareCfgGroups(src, configs...) + in <- prepareCfgGroups("_") +} + +func prepareCfgGroups(src string, configs ...confgroup.Config) []*confgroup.Group { + return []*confgroup.Group{{Configs: configs, Source: src}} +} + +func prepareStockCfg(module, job string) confgroup.Config { + return confgroup.Config{}. + SetSourceType(confgroup.TypeStock). + SetProvider("test"). + SetSource(fmt.Sprintf("type=stock,module=%s,job=%s", module, job)). + SetModule(module). + SetName(job) +} + +func prepareUserCfg(module, job string) confgroup.Config { + return confgroup.Config{}. + SetSourceType(confgroup.TypeUser). + SetProvider("test"). + SetSource(fmt.Sprintf("type=user,module=%s,job=%s", module, job)). + SetModule(module). + SetName(job) +} + +func prepareDiscoveredCfg(module, job string) confgroup.Config { + return confgroup.Config{}. + SetSourceType(confgroup.TypeDiscovered). + SetProvider("test"). + SetSource(fmt.Sprintf("type=discovered,module=%s,job=%s", module, job)). + SetModule(module). + SetName(job) +} + +func prepareDyncfgCfg(module, job string) confgroup.Config { + return confgroup.Config{}. + SetSourceType(confgroup.TypeDyncfg). + SetProvider("dyncfg"). + SetSource(fmt.Sprintf("type=dyncfg,module=%s,job=%s", module, job)). + SetModule(module). + SetName(job) +} diff --git a/src/go/plugin/go.d/agent/jobmgr/noop.go b/src/go/plugin/go.d/agent/jobmgr/noop.go new file mode 100644 index 000000000..adeacf906 --- /dev/null +++ b/src/go/plugin/go.d/agent/jobmgr/noop.go @@ -0,0 +1,21 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package jobmgr + +import ( + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/functions" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/vnodes" +) + +type noop struct{} + +func (n noop) Lock(string) (bool, error) { return true, nil } +func (n noop) Unlock(string) {} +func (n noop) Save(confgroup.Config, string) {} +func (n noop) Remove(confgroup.Config) {} +func (n noop) Contains(confgroup.Config, ...string) bool { return false } +func (n noop) Lookup(string) (*vnodes.VirtualNode, bool) { return nil, false } +func (n noop) Register(name string, reg func(functions.Function)) {} +func (n noop) Unregister(name string) {} diff --git a/src/go/plugin/go.d/agent/jobmgr/sim_test.go b/src/go/plugin/go.d/agent/jobmgr/sim_test.go new file mode 100644 index 000000000..9fe67175a --- /dev/null +++ b/src/go/plugin/go.d/agent/jobmgr/sim_test.go @@ -0,0 +1,152 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package jobmgr + +import ( + "bytes" + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/confgroup" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/netdataapi" + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/safewriter" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type runSim struct { + do func(mgr *Manager, in chan []*confgroup.Group) + + wantDiscovered []confgroup.Config + wantSeen []seenConfig + wantExposed []seenConfig + wantRunning []string + wantDyncfg string +} + +func (s *runSim) run(t *testing.T) { + t.Helper() + + require.NotNil(t, s.do, "s.do is nil") + + var buf bytes.Buffer + mgr := New() + mgr.api = netdataapi.New(safewriter.New(&buf)) + mgr.Modules = prepareMockRegistry() + + done := make(chan struct{}) + grpCh := make(chan []*confgroup.Group) + ctx, cancel := context.WithCancel(context.Background()) + + go func() { defer close(done); defer close(grpCh); mgr.Run(ctx, grpCh) }() + + timeout := time.Second * 5 + + select { + case <-mgr.started: + case <-time.After(timeout): + t.Errorf("failed to start work in %s", timeout) + } + + s.do(mgr, grpCh) + cancel() + + select { + case <-done: + case <-time.After(timeout): + t.Errorf("failed to finish work in %s", timeout) + } + + var lines []string + for _, s := range strings.Split(buf.String(), "\n") { + if strings.HasPrefix(s, "CONFIG") && strings.Contains(s, " template ") { + continue + } + if strings.HasPrefix(s, "FUNCTION_RESULT_BEGIN") { + parts := strings.Fields(s) + s = strings.Join(parts[:len(parts)-1], " ") // remove timestamp + } + lines = append(lines, s) + } + wantDyncfg, gotDyncfg := strings.TrimSpace(s.wantDyncfg), strings.TrimSpace(strings.Join(lines, "\n")) + + //fmt.Println(gotDyncfg) + + assert.Equal(t, wantDyncfg, gotDyncfg, "dyncfg commands") + + var n int + for _, cfgs := range mgr.discoveredConfigs.items { + n += len(cfgs) + } + + wantLen, gotLen := len(s.wantDiscovered), n + require.Equalf(t, wantLen, gotLen, "discoveredConfigs: different len (want %d got %d)", wantLen, gotLen) + + for _, cfg := range s.wantDiscovered { + cfgs, ok := mgr.discoveredConfigs.items[cfg.Source()] + require.Truef(t, ok, "discoveredConfigs: source %s is not found", cfg.Source()) + _, ok = cfgs[cfg.Hash()] + require.Truef(t, ok, "discoveredConfigs: source %s config %d is not found", cfg.Source(), cfg.Hash()) + } + + wantLen, gotLen = len(s.wantSeen), len(mgr.seenConfigs.items) + require.Equalf(t, wantLen, gotLen, "seenConfigs: different len (want %d got %d)", wantLen, gotLen) + + for _, scfg := range s.wantSeen { + v, ok := mgr.seenConfigs.lookup(scfg.cfg) + require.Truef(t, ok, "seenConfigs: config '%s' is not found", scfg.cfg.UID()) + require.Truef(t, scfg.status == v.status, "seenConfigs: wrong status, want %s got %s", scfg.status, v.status) + } + + wantLen, gotLen = len(s.wantExposed), len(mgr.exposedConfigs.items) + require.Equalf(t, wantLen, gotLen, "exposedConfigs: different len (want %d got %d)", wantLen, gotLen) + + for _, scfg := range s.wantExposed { + v, ok := mgr.exposedConfigs.lookup(scfg.cfg) + require.Truef(t, ok && scfg.cfg.UID() == v.cfg.UID(), "exposedConfigs: config '%s' is not found", scfg.cfg.UID()) + require.Truef(t, scfg.status == v.status, "exposedConfigs: wrong status, want %s got %s", scfg.status, v.status) + } + + wantLen, gotLen = len(s.wantRunning), len(mgr.runningJobs.items) + require.Equalf(t, wantLen, gotLen, "runningJobs: different len (want %d got %d)", wantLen, gotLen) + for _, name := range s.wantRunning { + _, ok := mgr.runningJobs.lookup(name) + require.Truef(t, ok, "runningJobs: job '%s' is not found", name) + } +} + +func prepareMockRegistry() module.Registry { + reg := module.Registry{} + type config struct { + OptionOne string `yaml:"option_one" json:"option_one"` + OptionTwo int64 `yaml:"option_two" json:"option_two"` + } + + reg.Register("success", module.Creator{ + JobConfigSchema: module.MockConfigSchema, + Create: func() module.Module { + return &module.MockModule{ + ChartsFunc: func() *module.Charts { + return &module.Charts{&module.Chart{ID: "id", Title: "title", Units: "units", Dims: module.Dims{{ID: "id1"}}}} + }, + CollectFunc: func() map[string]int64 { return map[string]int64{"id1": 1} }, + } + }, + Config: func() any { + return &config{OptionOne: "one", OptionTwo: 2} + }, + }) + reg.Register("fail", module.Creator{ + Create: func() module.Module { + return &module.MockModule{ + InitFunc: func() error { return errors.New("mock failed init") }, + } + }, + }) + + return reg +} -- cgit v1.2.3