diff options
Diffstat (limited to '')
-rw-r--r-- | pkg/icingaredis/heartbeat.go | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go new file mode 100644 index 0000000..9a8ebad --- /dev/null +++ b/pkg/icingaredis/heartbeat.go @@ -0,0 +1,221 @@ +package icingaredis + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + v1 "github.com/icinga/icingadb/pkg/icingaredis/v1" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "sync" + "sync/atomic" + "time" +) + +// timeout defines how long a heartbeat may be absent if a heartbeat has already been received. +// After this time, a heartbeat loss is propagated. +var timeout = 60 * time.Second + +// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received. +// Also signals on if the heartbeat is Lost. +type Heartbeat struct { + active bool + events chan *HeartbeatMessage + lastReceivedMs int64 + cancelCtx context.CancelFunc + client *Client + done chan struct{} + errMu sync.Mutex + err error + logger *logging.Logger +} + +// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. +func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat { + ctx, cancelCtx := context.WithCancel(ctx) + + heartbeat := &Heartbeat{ + events: make(chan *HeartbeatMessage, 1), + cancelCtx: cancelCtx, + client: client, + done: make(chan struct{}), + logger: logger, + } + + go heartbeat.controller(ctx) + + return heartbeat +} + +// Events returns a channel that is sent to on heartbeat events. +// +// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss. +func (h *Heartbeat) Events() <-chan *HeartbeatMessage { + return h.events +} + +// LastReceived returns the last heartbeat's receive time in ms. +func (h *Heartbeat) LastReceived() int64 { + return atomic.LoadInt64(&h.lastReceivedMs) +} + +// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (h *Heartbeat) Close() error { + h.cancelCtx() + <-h.Done() + + return h.Err() +} + +// Done returns a channel that will be closed when the heartbeat controller loop has ended. +func (h *Heartbeat) Done() <-chan struct{} { + return h.done +} + +// Err returns an error if Done has been closed and there is an error. Otherwise returns nil. +func (h *Heartbeat) Err() error { + h.errMu.Lock() + defer h.errMu.Unlock() + + return h.err +} + +// controller loop. +func (h *Heartbeat) controller(ctx context.Context) { + defer close(h.done) + + messages := make(chan *HeartbeatMessage) + defer close(messages) + + g, ctx := errgroup.WithContext(ctx) + + // Message producer loop. + g.Go(func() error { + // We expect heartbeats every second but only read them every 3 seconds. + throttle := time.NewTicker(time.Second * 3) + defer throttle.Stop() + + for id := "$"; ; { + streams, err := h.client.XReadUntilResult(ctx, &redis.XReadArgs{ + Streams: []string{"icinga:stats", id}, + }) + if err != nil { + return errors.Wrap(err, "can't read Icinga heartbeat") + } + + m := &HeartbeatMessage{ + received: time.Now(), + stats: streams[0].Messages[0].Values, + } + + select { + case messages <- m: + case <-ctx.Done(): + return ctx.Err() + } + + id = streams[0].Messages[0].ID + + <-throttle.C + } + }) + + // State loop. + g.Go(func() error { + for { + select { + case m := <-messages: + if !h.active { + envId, err := m.EnvironmentID() + if err != nil { + return err + } + h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String())) + h.active = true + } + + atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) + h.sendEvent(m) + case <-time.After(timeout): + if h.active { + h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout)) + h.sendEvent(nil) + h.active = false + } else { + h.logger.Warn("Waiting for Icinga heartbeat") + } + + atomic.StoreInt64(&h.lastReceivedMs, 0) + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + // Since the goroutines of the group actually run endlessly, + // we wait here forever, unless an error occurs. + if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) { + // Do not propagate any context-canceled errors here, + // as this is to be expected when calling Close or + // when the parent context is canceled. + h.setError(err) + } +} + +func (h *Heartbeat) setError(err error) { + h.errMu.Lock() + defer h.errMu.Unlock() + + h.err = errors.Wrap(err, "heartbeat failed") +} + +func (h *Heartbeat) sendEvent(m *HeartbeatMessage) { + // Remove any not yet delivered event + select { + case old := <-h.events: + if old != nil { + kv := []any{zap.Time("previous", old.received)} + if m != nil { + kv = append(kv, zap.Time("current", m.received)) + } + + h.logger.Debugw("Previous heartbeat not read from channel", kv...) + } else { + h.logger.Debug("Previous heartbeat loss event not read from channel") + } + default: + } + + h.events <- m +} + +// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received. +type HeartbeatMessage struct { + received time.Time + stats v1.StatsMessage +} + +// Stats returns the underlying heartbeat message from the icinga:stats stream. +func (m *HeartbeatMessage) Stats() *v1.StatsMessage { + return &m.stats +} + +// EnvironmentID returns the Icinga DB environment ID stored in the heartbeat message. +func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) { + var id types.Binary + err := internal.UnmarshalJSON([]byte(m.stats["icingadb_environment"].(string)), &id) + if err != nil { + return nil, err + } + return id, nil +} + +// ExpiryTime returns the timestamp when the heartbeat expires. +func (m *HeartbeatMessage) ExpiryTime() time.Time { + return m.received.Add(timeout) +} |