summaryrefslogtreecommitdiffstats
path: root/pkg/icingaredis/heartbeat.go
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 11:40:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 11:40:59 +0000
commitbc4e624732bd51c0dd1e9529cf228e8c23127732 (patch)
treed95dab8960e9d02d3b95f8653074ad2e54ca207c /pkg/icingaredis/heartbeat.go
parentInitial commit. (diff)
downloadicingadb-bc4e624732bd51c0dd1e9529cf228e8c23127732.tar.xz
icingadb-bc4e624732bd51c0dd1e9529cf228e8c23127732.zip
Adding upstream version 1.1.1.upstream/1.1.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'pkg/icingaredis/heartbeat.go')
-rw-r--r--pkg/icingaredis/heartbeat.go221
1 files changed, 221 insertions, 0 deletions
diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go
new file mode 100644
index 0000000..9a8ebad
--- /dev/null
+++ b/pkg/icingaredis/heartbeat.go
@@ -0,0 +1,221 @@
+package icingaredis
+
+import (
+ "context"
+ "github.com/go-redis/redis/v8"
+ "github.com/icinga/icingadb/internal"
+ v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
+ "github.com/icinga/icingadb/pkg/logging"
+ "github.com/icinga/icingadb/pkg/types"
+ "github.com/icinga/icingadb/pkg/utils"
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+ "golang.org/x/sync/errgroup"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
+// After this time, a heartbeat loss is propagated.
+var timeout = 60 * time.Second
+
+// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received.
+// Also signals on if the heartbeat is Lost.
+type Heartbeat struct {
+ active bool
+ events chan *HeartbeatMessage
+ lastReceivedMs int64
+ cancelCtx context.CancelFunc
+ client *Client
+ done chan struct{}
+ errMu sync.Mutex
+ err error
+ logger *logging.Logger
+}
+
+// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
+func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat {
+ ctx, cancelCtx := context.WithCancel(ctx)
+
+ heartbeat := &Heartbeat{
+ events: make(chan *HeartbeatMessage, 1),
+ cancelCtx: cancelCtx,
+ client: client,
+ done: make(chan struct{}),
+ logger: logger,
+ }
+
+ go heartbeat.controller(ctx)
+
+ return heartbeat
+}
+
+// Events returns a channel that is sent to on heartbeat events.
+//
+// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss.
+func (h *Heartbeat) Events() <-chan *HeartbeatMessage {
+ return h.events
+}
+
+// LastReceived returns the last heartbeat's receive time in ms.
+func (h *Heartbeat) LastReceived() int64 {
+ return atomic.LoadInt64(&h.lastReceivedMs)
+}
+
+// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
+// Implements the io.Closer interface.
+func (h *Heartbeat) Close() error {
+ h.cancelCtx()
+ <-h.Done()
+
+ return h.Err()
+}
+
+// Done returns a channel that will be closed when the heartbeat controller loop has ended.
+func (h *Heartbeat) Done() <-chan struct{} {
+ return h.done
+}
+
+// Err returns an error if Done has been closed and there is an error. Otherwise returns nil.
+func (h *Heartbeat) Err() error {
+ h.errMu.Lock()
+ defer h.errMu.Unlock()
+
+ return h.err
+}
+
+// controller loop.
+func (h *Heartbeat) controller(ctx context.Context) {
+ defer close(h.done)
+
+ messages := make(chan *HeartbeatMessage)
+ defer close(messages)
+
+ g, ctx := errgroup.WithContext(ctx)
+
+ // Message producer loop.
+ g.Go(func() error {
+ // We expect heartbeats every second but only read them every 3 seconds.
+ throttle := time.NewTicker(time.Second * 3)
+ defer throttle.Stop()
+
+ for id := "$"; ; {
+ streams, err := h.client.XReadUntilResult(ctx, &redis.XReadArgs{
+ Streams: []string{"icinga:stats", id},
+ })
+ if err != nil {
+ return errors.Wrap(err, "can't read Icinga heartbeat")
+ }
+
+ m := &HeartbeatMessage{
+ received: time.Now(),
+ stats: streams[0].Messages[0].Values,
+ }
+
+ select {
+ case messages <- m:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+
+ id = streams[0].Messages[0].ID
+
+ <-throttle.C
+ }
+ })
+
+ // State loop.
+ g.Go(func() error {
+ for {
+ select {
+ case m := <-messages:
+ if !h.active {
+ envId, err := m.EnvironmentID()
+ if err != nil {
+ return err
+ }
+ h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String()))
+ h.active = true
+ }
+
+ atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())
+ h.sendEvent(m)
+ case <-time.After(timeout):
+ if h.active {
+ h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout))
+ h.sendEvent(nil)
+ h.active = false
+ } else {
+ h.logger.Warn("Waiting for Icinga heartbeat")
+ }
+
+ atomic.StoreInt64(&h.lastReceivedMs, 0)
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ })
+
+ // Since the goroutines of the group actually run endlessly,
+ // we wait here forever, unless an error occurs.
+ if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) {
+ // Do not propagate any context-canceled errors here,
+ // as this is to be expected when calling Close or
+ // when the parent context is canceled.
+ h.setError(err)
+ }
+}
+
+func (h *Heartbeat) setError(err error) {
+ h.errMu.Lock()
+ defer h.errMu.Unlock()
+
+ h.err = errors.Wrap(err, "heartbeat failed")
+}
+
+func (h *Heartbeat) sendEvent(m *HeartbeatMessage) {
+ // Remove any not yet delivered event
+ select {
+ case old := <-h.events:
+ if old != nil {
+ kv := []any{zap.Time("previous", old.received)}
+ if m != nil {
+ kv = append(kv, zap.Time("current", m.received))
+ }
+
+ h.logger.Debugw("Previous heartbeat not read from channel", kv...)
+ } else {
+ h.logger.Debug("Previous heartbeat loss event not read from channel")
+ }
+ default:
+ }
+
+ h.events <- m
+}
+
+// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received.
+type HeartbeatMessage struct {
+ received time.Time
+ stats v1.StatsMessage
+}
+
+// Stats returns the underlying heartbeat message from the icinga:stats stream.
+func (m *HeartbeatMessage) Stats() *v1.StatsMessage {
+ return &m.stats
+}
+
+// EnvironmentID returns the Icinga DB environment ID stored in the heartbeat message.
+func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) {
+ var id types.Binary
+ err := internal.UnmarshalJSON([]byte(m.stats["icingadb_environment"].(string)), &id)
+ if err != nil {
+ return nil, err
+ }
+ return id, nil
+}
+
+// ExpiryTime returns the timestamp when the heartbeat expires.
+func (m *HeartbeatMessage) ExpiryTime() time.Time {
+ return m.received.Add(timeout)
+}