diff options
Diffstat (limited to 'src/go/collectors/go.d.plugin/agent/module/job.go')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/module/job.go | 645 |
1 files changed, 645 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/module/job.go b/src/go/collectors/go.d.plugin/agent/module/job.go new file mode 100644 index 000000000..cb15fdc2e --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/module/job.go @@ -0,0 +1,645 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package module + +import ( + "bytes" + "errors" + "fmt" + "io" + "log/slog" + "os" + "regexp" + "runtime/debug" + "strings" + "sync" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/netdataapi" + "github.com/netdata/netdata/go/go.d.plugin/agent/vnodes" + "github.com/netdata/netdata/go/go.d.plugin/logger" +) + +var obsoleteLock = &sync.Mutex{} +var obsoleteCharts = true + +func DontObsoleteCharts() { + obsoleteLock.Lock() + obsoleteCharts = false + obsoleteLock.Unlock() +} + +func shouldObsoleteCharts() bool { + obsoleteLock.Lock() + defer obsoleteLock.Unlock() + return obsoleteCharts +} + +var reSpace = regexp.MustCompile(`\s+`) + +var ndInternalMonitoringDisabled = os.Getenv("NETDATA_INTERNALS_MONITORING") == "NO" + +func newRuntimeChart(pluginName string) *Chart { + // this is needed to keep the same name as we had before https://github.com/netdata/netdata/go/go.d.plugin/issues/650 + ctxName := pluginName + if ctxName == "go.d" { + ctxName = "go" + } + ctxName = reSpace.ReplaceAllString(ctxName, "_") + return &Chart{ + typ: "netdata", + Title: "Execution time", + Units: "ms", + Fam: pluginName, + Ctx: fmt.Sprintf("netdata.%s_plugin_execution_time", ctxName), + Priority: 145000, + Dims: Dims{ + {ID: "time"}, + }, + } +} + +type JobConfig struct { + PluginName string + Name string + ModuleName string + FullName string + Module Module + Labels map[string]string + Out io.Writer + UpdateEvery int + AutoDetectEvery int + Priority int + IsStock bool + + VnodeGUID string + VnodeHostname string + VnodeLabels map[string]string +} + +const ( + penaltyStep = 5 + maxPenalty = 600 + infTries = -1 +) + +func NewJob(cfg JobConfig) *Job { + var buf bytes.Buffer + + if cfg.UpdateEvery == 0 { + cfg.UpdateEvery = 1 + } + + j := &Job{ + AutoDetectEvery: cfg.AutoDetectEvery, + AutoDetectTries: infTries, + + pluginName: cfg.PluginName, + name: cfg.Name, + moduleName: cfg.ModuleName, + fullName: cfg.FullName, + updateEvery: cfg.UpdateEvery, + priority: cfg.Priority, + isStock: cfg.IsStock, + module: cfg.Module, + labels: cfg.Labels, + out: cfg.Out, + runChart: newRuntimeChart(cfg.PluginName), + stop: make(chan struct{}), + tick: make(chan int), + buf: &buf, + api: netdataapi.New(&buf), + + vnodeGUID: cfg.VnodeGUID, + vnodeHostname: cfg.VnodeHostname, + vnodeLabels: cfg.VnodeLabels, + } + + log := logger.New().With( + slog.String("collector", j.ModuleName()), + slog.String("job", j.Name()), + ) + + j.Logger = log + if j.module != nil { + j.module.GetBase().Logger = log + } + + return j +} + +// Job represents a job. It's a module wrapper. +type Job struct { + pluginName string + name string + moduleName string + fullName string + + updateEvery int + AutoDetectEvery int + AutoDetectTries int + priority int + labels map[string]string + + *logger.Logger + + isStock bool + + module Module + + initialized bool + panicked bool + + runChart *Chart + charts *Charts + tick chan int + out io.Writer + buf *bytes.Buffer + api *netdataapi.API + + retries int + prevRun time.Time + + stop chan struct{} + + vnodeCreated bool + vnodeGUID string + vnodeHostname string + vnodeLabels map[string]string +} + +// NetdataChartIDMaxLength is the chart ID max length. See RRD_ID_LENGTH_MAX in the netdata source code. +const NetdataChartIDMaxLength = 1200 + +// FullName returns job full name. +func (j *Job) FullName() string { + return j.fullName +} + +// ModuleName returns job module name. +func (j *Job) ModuleName() string { + return j.moduleName +} + +// Name returns job name. +func (j *Job) Name() string { + return j.name +} + +// Panicked returns 'panicked' flag value. +func (j *Job) Panicked() bool { + return j.panicked +} + +// AutoDetectionEvery returns value of AutoDetectEvery. +func (j *Job) AutoDetectionEvery() int { + return j.AutoDetectEvery +} + +// RetryAutoDetection returns whether it is needed to retry autodetection. +func (j *Job) RetryAutoDetection() bool { + return j.AutoDetectEvery > 0 && (j.AutoDetectTries == infTries || j.AutoDetectTries > 0) +} + +func (j *Job) Configuration() any { + return j.module.Configuration() +} + +// AutoDetection invokes init, check and postCheck. It handles panic. +func (j *Job) AutoDetection() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic %v", err) + j.panicked = true + j.disableAutoDetection() + + j.Errorf("PANIC %v", r) + if logger.Level.Enabled(slog.LevelDebug) { + j.Errorf("STACK: %s", debug.Stack()) + } + } + if err != nil { + j.module.Cleanup() + } + }() + + if j.isStock { + j.Mute() + } + + if err = j.init(); err != nil { + j.Error("init failed") + j.Unmute() + j.disableAutoDetection() + return err + } + + if err = j.check(); err != nil { + j.Error("check failed") + j.Unmute() + return err + } + + j.Unmute() + j.Info("check success") + + if err = j.postCheck(); err != nil { + j.Error("postCheck failed") + j.disableAutoDetection() + return err + } + + return nil +} + +// Tick Tick. +func (j *Job) Tick(clock int) { + select { + case j.tick <- clock: + default: + j.Debug("skip the tick due to previous run hasn't been finished") + } +} + +// Start starts job main loop. +func (j *Job) Start() { + j.Infof("started, data collection interval %ds", j.updateEvery) + defer func() { j.Info("stopped") }() + +LOOP: + for { + select { + case <-j.stop: + break LOOP + case t := <-j.tick: + if t%(j.updateEvery+j.penalty()) == 0 { + j.runOnce() + } + } + } + j.module.Cleanup() + j.Cleanup() + j.stop <- struct{}{} +} + +// Stop stops job main loop. It blocks until the job is stopped. +func (j *Job) Stop() { + // TODO: should have blocking and non blocking stop + j.stop <- struct{}{} + <-j.stop +} + +func (j *Job) disableAutoDetection() { + j.AutoDetectEvery = 0 +} + +func (j *Job) Cleanup() { + j.buf.Reset() + if !shouldObsoleteCharts() { + return + } + + if !vnodes.Disabled { + if !j.vnodeCreated && j.vnodeGUID != "" { + _ = j.api.HOSTINFO(j.vnodeGUID, j.vnodeHostname, j.vnodeLabels) + j.vnodeCreated = true + } + _ = j.api.HOST(j.vnodeGUID) + } + + if j.runChart.created { + j.runChart.MarkRemove() + j.createChart(j.runChart) + } + if j.charts != nil { + for _, chart := range *j.charts { + if chart.created { + chart.MarkRemove() + j.createChart(chart) + } + } + } + + if j.buf.Len() > 0 { + _, _ = io.Copy(j.out, j.buf) + } +} + +func (j *Job) init() error { + if j.initialized { + return nil + } + + if err := j.module.Init(); err != nil { + return err + } + + j.initialized = true + + return nil +} + +func (j *Job) check() error { + if err := j.module.Check(); err != nil { + if j.AutoDetectTries != infTries { + j.AutoDetectTries-- + } + return err + } + return nil +} + +func (j *Job) postCheck() error { + if j.charts = j.module.Charts(); j.charts == nil { + j.Error("nil charts") + return errors.New("nil charts") + } + if err := checkCharts(*j.charts...); err != nil { + j.Errorf("charts check: %v", err) + return err + } + return nil +} + +func (j *Job) runOnce() { + curTime := time.Now() + sinceLastRun := calcSinceLastRun(curTime, j.prevRun) + j.prevRun = curTime + + metrics := j.collect() + + if j.panicked { + return + } + + if j.processMetrics(metrics, curTime, sinceLastRun) { + j.retries = 0 + } else { + j.retries++ + } + + _, _ = io.Copy(j.out, j.buf) + j.buf.Reset() +} + +func (j *Job) collect() (result map[string]int64) { + j.panicked = false + defer func() { + if r := recover(); r != nil { + j.panicked = true + j.Errorf("PANIC: %v", r) + if logger.Level.Enabled(slog.LevelDebug) { + j.Errorf("STACK: %s", debug.Stack()) + } + } + }() + return j.module.Collect() +} + +func (j *Job) processMetrics(metrics map[string]int64, startTime time.Time, sinceLastRun int) bool { + if !vnodes.Disabled { + if !j.vnodeCreated && j.vnodeGUID != "" { + _ = j.api.HOSTINFO(j.vnodeGUID, j.vnodeHostname, j.vnodeLabels) + j.vnodeCreated = true + } + + _ = j.api.HOST(j.vnodeGUID) + } + + if !ndInternalMonitoringDisabled && !j.runChart.created { + j.runChart.ID = fmt.Sprintf("execution_time_of_%s", j.FullName()) + j.createChart(j.runChart) + } + + elapsed := int64(durationTo(time.Since(startTime), time.Millisecond)) + + var i, updated int + for _, chart := range *j.charts { + if !chart.created { + typeID := fmt.Sprintf("%s.%s", j.FullName(), chart.ID) + if len(typeID) >= NetdataChartIDMaxLength { + j.Warningf("chart 'type.id' length (%d) >= max allowed (%d), the chart is ignored (%s)", + len(typeID), NetdataChartIDMaxLength, typeID) + chart.ignore = true + } + j.createChart(chart) + } + if chart.remove { + continue + } + (*j.charts)[i] = chart + i++ + if len(metrics) == 0 || chart.Obsolete { + continue + } + if j.updateChart(chart, metrics, sinceLastRun) { + updated++ + } + } + *j.charts = (*j.charts)[:i] + + if updated == 0 { + return false + } + if !ndInternalMonitoringDisabled { + j.updateChart(j.runChart, map[string]int64{"time": elapsed}, sinceLastRun) + } + + return true +} + +func (j *Job) createChart(chart *Chart) { + defer func() { chart.created = true }() + if chart.ignore { + return + } + + if chart.Priority == 0 { + chart.Priority = j.priority + j.priority++ + } + _ = j.api.CHART( + getChartType(chart, j), + getChartID(chart), + chart.OverID, + chart.Title, + chart.Units, + chart.Fam, + chart.Ctx, + chart.Type.String(), + chart.Priority, + j.updateEvery, + chart.Opts.String(), + j.pluginName, + j.moduleName, + ) + + if chart.Obsolete { + _ = j.api.EMPTYLINE() + return + } + + seen := make(map[string]bool) + for _, l := range chart.Labels { + if l.Key != "" { + seen[l.Key] = true + ls := l.Source + // the default should be auto + // https://github.com/netdata/netdata/blob/cc2586de697702f86a3c34e60e23652dd4ddcb42/database/rrd.h#L205 + if ls == 0 { + ls = LabelSourceAuto + } + _ = j.api.CLABEL(l.Key, l.Value, ls) + } + } + for k, v := range j.labels { + if !seen[k] { + _ = j.api.CLABEL(k, v, LabelSourceConf) + } + } + _ = j.api.CLABEL("_collect_job", j.Name(), LabelSourceAuto) + _ = j.api.CLABELCOMMIT() + + for _, dim := range chart.Dims { + _ = j.api.DIMENSION( + firstNotEmpty(dim.Name, dim.ID), + dim.Name, + dim.Algo.String(), + handleZero(dim.Mul), + handleZero(dim.Div), + dim.DimOpts.String(), + ) + } + for _, v := range chart.Vars { + if v.Name != "" { + _ = j.api.VARIABLE(v.Name, v.Value) + } else { + _ = j.api.VARIABLE(v.ID, v.Value) + } + } + _ = j.api.EMPTYLINE() +} + +func (j *Job) updateChart(chart *Chart, collected map[string]int64, sinceLastRun int) bool { + if chart.ignore { + dims := chart.Dims[:0] + for _, dim := range chart.Dims { + if !dim.remove { + dims = append(dims, dim) + } + } + chart.Dims = dims + return false + } + + if !chart.updated { + sinceLastRun = 0 + } + + _ = j.api.BEGIN( + getChartType(chart, j), + getChartID(chart), + sinceLastRun, + ) + var i, updated int + for _, dim := range chart.Dims { + if dim.remove { + continue + } + chart.Dims[i] = dim + i++ + if v, ok := collected[dim.ID]; !ok { + _ = j.api.SETEMPTY(firstNotEmpty(dim.Name, dim.ID)) + } else { + _ = j.api.SET(firstNotEmpty(dim.Name, dim.ID), v) + updated++ + } + } + chart.Dims = chart.Dims[:i] + + for _, vr := range chart.Vars { + if v, ok := collected[vr.ID]; ok { + if vr.Name != "" { + _ = j.api.VARIABLE(vr.Name, v) + } else { + _ = j.api.VARIABLE(vr.ID, v) + } + } + + } + _ = j.api.END() + + if chart.updated = updated > 0; chart.updated { + chart.Retries = 0 + } else { + chart.Retries++ + } + return chart.updated +} + +func (j *Job) penalty() int { + v := j.retries / penaltyStep * penaltyStep * j.updateEvery / 2 + if v > maxPenalty { + return maxPenalty + } + return v +} + +func getChartType(chart *Chart, j *Job) string { + if chart.typ != "" { + return chart.typ + } + if !chart.IDSep { + chart.typ = j.FullName() + } else if i := strings.IndexByte(chart.ID, '.'); i != -1 { + chart.typ = j.FullName() + "_" + chart.ID[:i] + } else { + chart.typ = j.FullName() + } + if chart.OverModule != "" { + if v := strings.TrimPrefix(chart.typ, j.ModuleName()); v != chart.typ { + chart.typ = chart.OverModule + v + } + } + return chart.typ +} + +func getChartID(chart *Chart) string { + if chart.id != "" { + return chart.id + } + if !chart.IDSep { + return chart.ID + } + if i := strings.IndexByte(chart.ID, '.'); i != -1 { + chart.id = chart.ID[i+1:] + } else { + chart.id = chart.ID + } + return chart.id +} + +func calcSinceLastRun(curTime, prevRun time.Time) int { + if prevRun.IsZero() { + return 0 + } + return int((curTime.UnixNano() - prevRun.UnixNano()) / 1000) +} + +func durationTo(duration time.Duration, to time.Duration) int { + return int(int64(duration) / (int64(to) / int64(time.Nanosecond))) +} + +func firstNotEmpty(val1, val2 string) string { + if val1 != "" { + return val1 + } + return val2 +} + +func handleZero(v int) int { + if v == 0 { + return 1 + } + return v +} |