diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:36:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 12:36:04 +0000 |
commit | b09c6d56832eb1718c07d74abf3bc6ae3fe4e030 (patch) | |
tree | d2caec2610d4ea887803ec9e9c3cd77136c448ba /pkg/icingaredis/telemetry/heartbeat.go | |
parent | Initial commit. (diff) | |
download | icingadb-b09c6d56832eb1718c07d74abf3bc6ae3fe4e030.tar.xz icingadb-b09c6d56832eb1718c07d74abf3bc6ae3fe4e030.zip |
Adding upstream version 1.1.0.upstream/1.1.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'pkg/icingaredis/telemetry/heartbeat.go')
-rw-r--r-- | pkg/icingaredis/telemetry/heartbeat.go | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go new file mode 100644 index 0000000..ee476a1 --- /dev/null +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -0,0 +1,203 @@ +package telemetry + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "regexp" + "runtime/metrics" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +// ha represents icingadb.HA to avoid import cycles. +type ha interface { + State() (weResponsibleMilli int64, weResponsible, otherResponsible bool) +} + +type SuccessfulSync struct { + FinishMilli int64 + DurationMilli int64 +} + +// currentDbConnErr stores ongoing errors from database connections. +var currentDbConnErr struct { + mu sync.Mutex + message string + sinceMilli int64 +} + +// UpdateCurrentDbConnErr updates the current error information stored in currentDbConnErr. +func UpdateCurrentDbConnErr(err error) { + now := time.Now().UnixMilli() + + currentDbConnErr.mu.Lock() + defer currentDbConnErr.mu.Unlock() + + if currentDbConnErr.sinceMilli >= now { + // Already updated with a more recent error, ignore this one. + return + } + + message := "" + if err != nil { + message = err.Error() + } + + if currentDbConnErr.message == message { + // Error stayed the same, no update needed, keeping the old timestamp. + return + } + + if currentDbConnErr.message == "" || message == "" { + // Either first error or recovery from an error, update timestamp. + currentDbConnErr.sinceMilli = now + } + + currentDbConnErr.message = message +} + +// GetCurrentDbConnErr returns the last error message (or the empty string if not in an error state) and a timestamp in +// milliseconds of the last change from OK to error or from error to OK. +func GetCurrentDbConnErr() (string, int64) { + currentDbConnErr.mu.Lock() + defer currentDbConnErr.mu.Unlock() + + return currentDbConnErr.message, currentDbConnErr.sinceMilli +} + +// OngoingSyncStartMilli is to be updated by the main() function. +var OngoingSyncStartMilli int64 + +// LastSuccessfulSync is to be updated by the main() function. +var LastSuccessfulSync com.Atomic[SuccessfulSync] + +var boolToStr = map[bool]string{false: "0", true: "1"} +var startTime = time.Now().UnixMilli() + +// StartHeartbeat periodically writes heartbeats to Redis for being monitored by Icinga 2. +func StartHeartbeat( + ctx context.Context, client *icingaredis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat, +) { + goMetrics := NewGoMetrics() + + const interval = time.Second + + var lastErr string + var silenceUntil time.Time + + periodic.Start(ctx, interval, func(tick periodic.Tick) { + heartbeat := heartbeat.LastReceived() + responsibleTsMilli, responsible, otherResponsible := ha.State() + ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli) + sync, _ := LastSuccessfulSync.Load() + dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr() + now := time.Now() + + values := map[string]string{ + "version": internal.Version.Version, + "time": strconv.FormatInt(now.UnixMilli(), 10), + "start-time": strconv.FormatInt(startTime, 10), + "error": dbConnErr, + "error-since": strconv.FormatInt(dbConnErrSinceMilli, 10), + "performance-data": goMetrics.PerformanceData(), + "last-heartbeat-received": strconv.FormatInt(heartbeat, 10), + "ha-responsible": boolToStr[responsible], + "ha-responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), + "ha-other-responsible": boolToStr[otherResponsible], + "sync-ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), + "sync-success-finish": strconv.FormatInt(sync.FinishMilli, 10), + "sync-success-duration": strconv.FormatInt(sync.DurationMilli, 10), + } + + ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval)) + defer cancel() + + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:heartbeat", + MaxLen: 1, + Values: values, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) && !errors.Is(err, context.DeadlineExceeded) { + logw := logger.Debugw + currentErr := err.Error() + + if currentErr != lastErr || now.After(silenceUntil) { + logw = logger.Warnw + lastErr = currentErr + silenceUntil = now.Add(time.Minute) + } + + logw("Can't update own heartbeat", zap.Error(icingaredis.WrapCmdErr(cmd))) + } else { + lastErr = "" + silenceUntil = time.Time{} + } + }) +} + +type goMetrics struct { + names []string + units []string + samples []metrics.Sample +} + +func NewGoMetrics() *goMetrics { + m := &goMetrics{} + + forbiddenRe := regexp.MustCompile(`\W`) + + for _, d := range metrics.All() { + switch d.Kind { + case metrics.KindUint64, metrics.KindFloat64: + name := "go_" + strings.TrimLeft(forbiddenRe.ReplaceAllString(d.Name, "_"), "_") + + unit := "" + if strings.HasSuffix(d.Name, ":bytes") { + unit = "B" + } else if strings.HasSuffix(d.Name, ":seconds") { + unit = "s" + } else if d.Cumulative { + unit = "c" + } + + m.names = append(m.names, name) + m.units = append(m.units, unit) + m.samples = append(m.samples, metrics.Sample{Name: d.Name}) + } + } + + return m +} + +func (g *goMetrics) PerformanceData() string { + metrics.Read(g.samples) + + var buf strings.Builder + + for i, sample := range g.samples { + if i > 0 { + buf.WriteByte(' ') + } + + switch sample.Value.Kind() { + case metrics.KindUint64: + _, _ = fmt.Fprintf(&buf, "%s=%d%s", g.names[i], sample.Value.Uint64(), g.units[i]) + case metrics.KindFloat64: + _, _ = fmt.Fprintf(&buf, "%s=%f%s", g.names[i], sample.Value.Float64(), g.units[i]) + } + } + + return buf.String() +} |