diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/jobmgr/dyncfg.go | 718 |
1 files changed, 718 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/jobmgr/dyncfg.go b/src/go/collectors/go.d.plugin/agent/jobmgr/dyncfg.go new file mode 100644 index 000000000..b74754638 --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/jobmgr/dyncfg.go @@ -0,0 +1,718 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package jobmgr + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strconv" + "strings" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/agent/functions" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "gopkg.in/yaml.v2" +) + +type dyncfgStatus int + +const ( + _ dyncfgStatus = iota + dyncfgAccepted + dyncfgRunning + dyncfgFailed + dyncfgIncomplete + dyncfgDisabled +) + +func (s dyncfgStatus) String() string { + switch s { + case dyncfgAccepted: + return "accepted" + case dyncfgRunning: + return "running" + case dyncfgFailed: + return "failed" + case dyncfgIncomplete: + return "incomplete" + case dyncfgDisabled: + return "disabled" + default: + return "unknown" + } +} + +const ( + dyncfgIDPrefix = "go.d:collector:" + dyncfgPath = "/collectors/jobs" +) + +func dyncfgModID(name string) string { + return fmt.Sprintf("%s%s", dyncfgIDPrefix, name) +} +func dyncfgJobID(cfg confgroup.Config) string { + return fmt.Sprintf("%s%s:%s", dyncfgIDPrefix, cfg.Module(), cfg.Name()) +} + +func dyncfgModCmds() string { + return "add schema enable disable test" +} +func dyncfgJobCmds(cfg confgroup.Config) string { + cmds := "schema get enable disable update restart test" + if isDyncfg(cfg) { + cmds += " remove" + } + return cmds +} + +func (m *Manager) dyncfgModuleCreate(name string) { + id := dyncfgModID(name) + path := dyncfgPath + cmds := dyncfgModCmds() + typ := "template" + src := "internal" + m.api.CONFIGCREATE(id, dyncfgAccepted.String(), typ, path, src, src, cmds) +} + +func (m *Manager) dyncfgJobCreate(cfg confgroup.Config, status dyncfgStatus) { + id := dyncfgJobID(cfg) + path := dyncfgPath + cmds := dyncfgJobCmds(cfg) + typ := "job" + m.api.CONFIGCREATE(id, status.String(), typ, path, cfg.SourceType(), cfg.Source(), cmds) +} + +func (m *Manager) dyncfgJobRemove(cfg confgroup.Config) { + m.api.CONFIGDELETE(dyncfgJobID(cfg)) +} + +func (m *Manager) dyncfgJobStatus(cfg confgroup.Config, status dyncfgStatus) { + m.api.CONFIGSTATUS(dyncfgJobID(cfg), status.String()) +} + +func (m *Manager) dyncfgConfig(fn functions.Function) { + if len(fn.Args) < 2 { + m.Warningf("dyncfg: %s: missing required arguments, want 3 got %d", fn.Name, len(fn.Args)) + m.dyncfgRespf(fn, 400, "Missing required arguments. Need at least 2, but got %d.", len(fn.Args)) + return + } + + select { + case <-m.ctx.Done(): + m.dyncfgRespf(fn, 503, "Job manager is shutting down.") + default: + } + + action := strings.ToLower(fn.Args[1]) + switch action { + case "test": + m.dyncfgConfigTest(fn) + return + case "schema": + m.dyncfgConfigSchema(fn) + return + } + + select { + case <-m.ctx.Done(): + m.dyncfgRespf(fn, 503, "Job manager is shutting down.") + case m.dyncfgCh <- fn: + } +} + +func (m *Manager) dyncfgConfigExec(fn functions.Function) { + action := strings.ToLower(fn.Args[1]) + + //m.Infof("QQ FN(%s): '%s'", action, fn) + + switch action { + case "test": + m.dyncfgConfigTest(fn) + case "schema": + m.dyncfgConfigSchema(fn) + case "get": + m.dyncfgConfigGet(fn) + case "restart": + m.dyncfgConfigRestart(fn) + case "enable": + m.dyncfgConfigEnable(fn) + case "disable": + m.dyncfgConfigDisable(fn) + case "add": + m.dyncfgConfigAdd(fn) + case "remove": + m.dyncfgConfigRemove(fn) + case "update": + m.dyncfgConfigUpdate(fn) + default: + m.Warningf("dyncfg: function '%s' not implemented", fn.String()) + m.dyncfgRespf(fn, 501, "Function '%s' is not implemented.", fn.Name) + } +} + +func (m *Manager) dyncfgConfigTest(fn functions.Function) { + id := fn.Args[0] + mn, ok := extractModuleName(id) + if !ok { + m.Warningf("dyncfg: test: could not extract module and job from id (%s)", id) + m.dyncfgRespf(fn, 400, + "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id) + return + } + + creator, ok := m.Modules.Lookup(mn) + if !ok { + m.Warningf("dyncfg: test: module %s not found", mn) + m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn) + return + } + + cfg, err := configFromPayload(fn) + if err != nil { + m.Warningf("dyncfg: test: module %s: failed to create config from payload: %v", mn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err) + return + } + + cfg.SetModule(mn) + cfg.SetName("test") + + job := creator.Create() + + if err := applyConfig(cfg, job); err != nil { + m.Warningf("dyncfg: test: module %s: failed to apply config: %v", mn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + return + } + + job.GetBase().Logger = logger.New().With( + slog.String("collector", cfg.Module()), + slog.String("job", cfg.Name()), + ) + + defer job.Cleanup() + + if err := job.Init(); err != nil { + m.dyncfgRespf(fn, 422, "Job initialization failed: %v", err) + return + } + if err := job.Check(); err != nil { + m.dyncfgRespf(fn, 422, "Job check failed: %v", err) + return + } + + m.dyncfgRespf(fn, 200, "") +} + +func (m *Manager) dyncfgConfigSchema(fn functions.Function) { + id := fn.Args[0] + mn, ok := extractModuleName(id) + if !ok { + m.Warningf("dyncfg: schema: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + mod, ok := m.Modules.Lookup(mn) + if !ok { + m.Warningf("dyncfg: schema: module %s not found", mn) + m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn) + return + } + + if mod.JobConfigSchema == "" { + m.Warningf("dyncfg: schema: module %s: schema not found", mn) + m.dyncfgRespf(fn, 500, "Module %s configuration schema not found.", mn) + return + } + + m.dyncfgRespPayload(fn, mod.JobConfigSchema) +} + +func (m *Manager) dyncfgConfigGet(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: get: could not extract module and job from id (%s)", id) + m.dyncfgRespf(fn, 400, + "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id) + return + } + + creator, ok := m.Modules.Lookup(mn) + if !ok { + m.Warningf("dyncfg: get: module %s not found", mn) + m.dyncfgRespf(fn, 404, "The specified module '%s' is not registered.", mn) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: get: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + mod := creator.Create() + + if err := applyConfig(ecfg.cfg, mod); err != nil { + m.Warningf("dyncfg: get: module %s job %s failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + return + } + + conf := mod.Configuration() + if conf == nil { + m.Warningf("dyncfg: get: module %s: configuration not found", mn) + m.dyncfgRespf(fn, 500, "Module %s does not provide configuration.", mn) + return + } + + bs, err := json.Marshal(conf) + if err != nil { + m.Warningf("dyncfg: get: module %s job %s failed to json marshal config: %v", mn, jn, err) + m.dyncfgRespf(fn, 500, "Failed to convert configuration into JSON: %v.", err) + return + } + + m.dyncfgRespPayload(fn, string(bs)) +} + +func (m *Manager) dyncfgConfigRestart(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: restart: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: restart: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + job, err := m.createCollectorJob(ecfg.cfg) + if err != nil { + m.Warningf("dyncfg: restart: module %s job %s: failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + switch ecfg.status { + case dyncfgAccepted, dyncfgDisabled: + m.Warningf("dyncfg: restart: module %s job %s: restarting not allowed in '%s' state", mn, jn, ecfg.status) + m.dyncfgRespf(fn, 405, "Restarting data collection job is not allowed in '%s' state.", ecfg.status) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + case dyncfgRunning: + m.FileStatus.Remove(ecfg.cfg) + m.FileLock.Unlock(ecfg.cfg.FullName()) + m.stopRunningJob(ecfg.cfg.FullName()) + default: + } + + if err := job.AutoDetection(); err != nil { + job.Cleanup() + ecfg.status = dyncfgFailed + m.dyncfgRespf(fn, 422, "Job restart failed: %v", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + if ok, err := m.FileLock.Lock(ecfg.cfg.FullName()); !ok && err == nil { + job.Cleanup() + ecfg.status = dyncfgFailed + m.dyncfgRespf(fn, 500, "Job restart failed: cannot filelock.") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + ecfg.status = dyncfgRunning + + if isDyncfg(ecfg.cfg) { + m.FileStatus.Save(ecfg.cfg, ecfg.status.String()) + } + m.startRunningJob(job) + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) +} + +func (m *Manager) dyncfgConfigEnable(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: enable: could not extract module and job from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: enable: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + if ecfg.cfg.FullName() == m.waitCfgOnOff { + m.waitCfgOnOff = "" + } + + switch ecfg.status { + case dyncfgAccepted, dyncfgDisabled, dyncfgFailed: + default: + m.Warningf("dyncfg: enable: module %s job %s: enabling not allowed in %s state", mn, jn, ecfg.status) + m.dyncfgRespf(fn, 405, "Enabling data collection job is not allowed in '%s' state.", ecfg.status) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + job, err := m.createCollectorJob(ecfg.cfg) + if err != nil { + ecfg.status = dyncfgFailed + m.Warningf("dyncfg: enable: module %s job %s: failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + if err := job.AutoDetection(); err != nil { + job.Cleanup() + ecfg.status = dyncfgFailed + m.dyncfgRespf(fn, 200, "Job enable failed: %v.", err) + + if isStock(ecfg.cfg) { + m.exposedConfigs.remove(ecfg.cfg) + m.dyncfgJobRemove(ecfg.cfg) + } else { + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + } + + if job.RetryAutoDetection() && !isDyncfg(ecfg.cfg) { + m.Infof("%s[%s] job detection failed, will retry in %d seconds", + ecfg.cfg.Module(), ecfg.cfg.Name(), job.AutoDetectionEvery()) + + ctx, cancel := context.WithCancel(m.ctx) + m.retryingTasks.add(ecfg.cfg, &retryTask{cancel: cancel}) + go runRetryTask(ctx, m.addCh, ecfg.cfg) + } + return + } + + if ok, err := m.FileLock.Lock(ecfg.cfg.FullName()); !ok && err == nil { + job.Cleanup() + ecfg.status = dyncfgFailed + m.dyncfgRespf(fn, 500, "Job enable failed: can not filelock.") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + ecfg.status = dyncfgRunning + + if isDyncfg(ecfg.cfg) { + m.FileStatus.Save(ecfg.cfg, ecfg.status.String()) + } + + m.startRunningJob(job) + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + +} + +func (m *Manager) dyncfgConfigDisable(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: disable: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: disable: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + if ecfg.cfg.FullName() == m.waitCfgOnOff { + m.waitCfgOnOff = "" + } + + switch ecfg.status { + case dyncfgDisabled: + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + case dyncfgRunning: + m.stopRunningJob(ecfg.cfg.FullName()) + if isDyncfg(ecfg.cfg) { + m.FileStatus.Remove(ecfg.cfg) + } + m.FileLock.Unlock(ecfg.cfg.FullName()) + default: + } + + ecfg.status = dyncfgDisabled + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) +} + +func (m *Manager) dyncfgConfigAdd(fn functions.Function) { + if len(fn.Args) < 3 { + m.Warningf("dyncfg: add: missing required arguments, want 3 got %d", len(fn.Args)) + m.dyncfgRespf(fn, 400, "Missing required arguments. Need at least 3, but got %d.", len(fn.Args)) + return + } + + id := fn.Args[0] + jn := fn.Args[2] + mn, ok := extractModuleName(id) + if !ok { + m.Warningf("dyncfg: add: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + if len(fn.Payload) == 0 { + m.Warningf("dyncfg: add: module %s job %s missing configuration payload.", mn, jn) + m.dyncfgRespf(fn, 400, "Missing configuration payload.") + return + } + + cfg, err := configFromPayload(fn) + if err != nil { + m.Warningf("dyncfg: add: module %s job %s: failed to create config from payload: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err) + return + } + + m.dyncfgSetConfigMeta(cfg, mn, jn) + + if _, err := m.createCollectorJob(cfg); err != nil { + m.Warningf("dyncfg: add: module %s job %s: failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + return + } + + if ecfg, ok := m.exposedConfigs.lookup(cfg); ok { + if scfg, ok := m.seenConfigs.lookup(ecfg.cfg); ok && isDyncfg(scfg.cfg) { + m.seenConfigs.remove(ecfg.cfg) + } + m.exposedConfigs.remove(ecfg.cfg) + m.stopRunningJob(ecfg.cfg.FullName()) + } + + scfg := &seenConfig{cfg: cfg, status: dyncfgAccepted} + ecfg := scfg + m.seenConfigs.add(scfg) + m.exposedConfigs.add(ecfg) + + m.dyncfgRespf(fn, 202, "") + m.dyncfgJobCreate(ecfg.cfg, ecfg.status) +} + +func (m *Manager) dyncfgConfigRemove(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: remove: could not extract module and job from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module and job name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: remove: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + if !isDyncfg(ecfg.cfg) { + m.Warningf("dyncfg: remove: module %s job %s: can not remove jobs of type %s", mn, jn, ecfg.cfg.SourceType()) + m.dyncfgRespf(fn, 405, "Removing jobs of type '%s' is not supported. Only 'dyncfg' jobs can be removed.", ecfg.cfg.SourceType()) + return + } + + m.seenConfigs.remove(ecfg.cfg) + m.exposedConfigs.remove(ecfg.cfg) + m.stopRunningJob(ecfg.cfg.FullName()) + m.FileLock.Unlock(ecfg.cfg.FullName()) + m.FileStatus.Remove(ecfg.cfg) + + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobRemove(ecfg.cfg) +} + +func (m *Manager) dyncfgConfigUpdate(fn functions.Function) { + id := fn.Args[0] + mn, jn, ok := extractModuleJobName(id) + if !ok { + m.Warningf("dyncfg: update: could not extract module from id (%s)", id) + m.dyncfgRespf(fn, 400, "Invalid ID format. Could not extract module name from ID. Provided ID: %s.", id) + return + } + + ecfg, ok := m.exposedConfigs.lookupByName(mn, jn) + if !ok { + m.Warningf("dyncfg: update: module %s job %s not found", mn, jn) + m.dyncfgRespf(fn, 404, "The specified module '%s' job '%s' is not registered.", mn, jn) + return + } + + cfg, err := configFromPayload(fn) + if err != nil { + m.Warningf("dyncfg: update: module %s: failed to create config from payload: %v", mn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration format. Failed to create configuration from payload: %v.", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + m.dyncfgSetConfigMeta(cfg, mn, jn) + + if ecfg.status == dyncfgRunning && ecfg.cfg.UID() == cfg.UID() { + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + job, err := m.createCollectorJob(cfg) + if err != nil { + m.Warningf("dyncfg: update: module %s job %s: failed to apply config: %v", mn, jn, err) + m.dyncfgRespf(fn, 400, "Invalid configuration. Failed to apply configuration: %v.", err) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + if ecfg.status == dyncfgAccepted { + m.Warningf("dyncfg: update: module %s job %s: updating not allowed in %s", mn, jn, ecfg.status) + m.dyncfgRespf(fn, 403, "Updating data collection job is not allowed in '%s' state.", ecfg.status) + m.dyncfgJobStatus(ecfg.cfg, ecfg.status) + return + } + + m.exposedConfigs.remove(ecfg.cfg) + m.stopRunningJob(ecfg.cfg.FullName()) + + scfg := &seenConfig{cfg: cfg, status: dyncfgAccepted} + m.seenConfigs.add(scfg) + m.exposedConfigs.add(scfg) + + if isDyncfg(ecfg.cfg) { + m.seenConfigs.remove(ecfg.cfg) + } else { + // needed to update meta. There is no other way, unfortunately, but to send "create" + defer m.dyncfgJobCreate(scfg.cfg, scfg.status) + } + + if ecfg.status == dyncfgDisabled { + scfg.status = dyncfgDisabled + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(cfg, scfg.status) + return + } + + if err := job.AutoDetection(); err != nil { + job.Cleanup() + scfg.status = dyncfgFailed + m.dyncfgRespf(fn, 200, "Job update failed: %v", err) + m.dyncfgJobStatus(scfg.cfg, scfg.status) + return + } + + if ok, err := m.FileLock.Lock(scfg.cfg.FullName()); !ok && err == nil { + job.Cleanup() + scfg.status = dyncfgFailed + m.dyncfgRespf(fn, 500, "Job update failed: cannot create file lock.") + m.dyncfgJobStatus(scfg.cfg, scfg.status) + return + } + + scfg.status = dyncfgRunning + m.startRunningJob(job) + m.dyncfgRespf(fn, 200, "") + m.dyncfgJobStatus(scfg.cfg, scfg.status) +} + +func (m *Manager) dyncfgSetConfigMeta(cfg confgroup.Config, module, name string) { + cfg.SetProvider("dyncfg") + cfg.SetSource(fmt.Sprintf("type=dyncfg,module=%s,job=%s", module, name)) + cfg.SetSourceType("dyncfg") + cfg.SetModule(module) + cfg.SetName(name) + if def, ok := m.ConfigDefaults.Lookup(module); ok { + cfg.ApplyDefaults(def) + } +} + +func (m *Manager) dyncfgRespPayload(fn functions.Function, payload string) { + ts := strconv.FormatInt(time.Now().Unix(), 10) + m.api.FUNCRESULT(fn.UID, "application/json", payload, "200", ts) +} + +func (m *Manager) dyncfgRespf(fn functions.Function, code int, msgf string, a ...any) { + if fn.UID == "" { + return + } + bs, _ := json.Marshal(struct { + Status int `json:"status"` + Message string `json:"message"` + }{ + Status: code, + Message: fmt.Sprintf(msgf, a...), + }) + ts := strconv.FormatInt(time.Now().Unix(), 10) + m.api.FUNCRESULT(fn.UID, "application/json", string(bs), strconv.Itoa(code), ts) +} + +func configFromPayload(fn functions.Function) (confgroup.Config, error) { + var cfg confgroup.Config + + if fn.ContentType == "application/json" { + if err := json.Unmarshal(fn.Payload, &cfg); err != nil { + return nil, err + } + + return cfg.Clone() + } + + if err := yaml.Unmarshal(fn.Payload, &cfg); err != nil { + return nil, err + } + + return cfg, nil +} + +func extractModuleJobName(id string) (mn string, jn string, ok bool) { + if mn, ok = extractModuleName(id); !ok { + return "", "", false + } + if jn, ok = extractJobName(id); !ok { + return "", "", false + } + return mn, jn, true +} + +func extractModuleName(id string) (string, bool) { + id = strings.TrimPrefix(id, dyncfgIDPrefix) + i := strings.IndexByte(id, ':') + if i == -1 { + return id, id != "" + } + return id[:i], true +} + +func extractJobName(id string) (string, bool) { + i := strings.LastIndexByte(id, ':') + if i == -1 { + return "", false + } + return id[i+1:], true +} |