summaryrefslogtreecommitdiffstats
path: root/pkg/icingaredis/heartbeat.go
blob: cb34010879d767cdb5e625617f504bf56ffaf9ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package icingaredis

import (
	"context"
	"github.com/icinga/icingadb/internal"
	v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
	"github.com/icinga/icingadb/pkg/logging"
	"github.com/icinga/icingadb/pkg/types"
	"github.com/icinga/icingadb/pkg/utils"
	"github.com/pkg/errors"
	"github.com/redis/go-redis/v9"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"sync"
	"sync/atomic"
	"time"
)

// Timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
// After this time, a heartbeat loss is propagated.
const Timeout = time.Minute

// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received.
// Also signals on if the heartbeat is Lost.
type Heartbeat struct {
	active         bool
	events         chan *HeartbeatMessage
	lastReceivedMs int64
	cancelCtx      context.CancelFunc
	client         *Client
	done           chan struct{}
	errMu          sync.Mutex
	err            error
	logger         *logging.Logger
}

// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat {
	ctx, cancelCtx := context.WithCancel(ctx)

	heartbeat := &Heartbeat{
		events:    make(chan *HeartbeatMessage, 1),
		cancelCtx: cancelCtx,
		client:    client,
		done:      make(chan struct{}),
		logger:    logger,
	}

	go heartbeat.controller(ctx)

	return heartbeat
}

// Events returns a channel that is sent to on heartbeat events.
//
// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss.
func (h *Heartbeat) Events() <-chan *HeartbeatMessage {
	return h.events
}

// LastReceived returns the last heartbeat's receive time in ms.
func (h *Heartbeat) LastReceived() int64 {
	return atomic.LoadInt64(&h.lastReceivedMs)
}

// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
	h.cancelCtx()
	<-h.Done()

	return h.Err()
}

// Done returns a channel that will be closed when the heartbeat controller loop has ended.
func (h *Heartbeat) Done() <-chan struct{} {
	return h.done
}

// Err returns an error if Done has been closed and there is an error. Otherwise returns nil.
func (h *Heartbeat) Err() error {
	h.errMu.Lock()
	defer h.errMu.Unlock()

	return h.err
}

// controller loop.
func (h *Heartbeat) controller(ctx context.Context) {
	defer close(h.done)

	messages := make(chan *HeartbeatMessage)
	defer close(messages)

	g, ctx := errgroup.WithContext(ctx)

	// Message producer loop.
	g.Go(func() error {
		// We expect heartbeats every second but only read them every 3 seconds.
		throttle := time.NewTicker(3 * time.Second)
		defer throttle.Stop()

		for id := "$"; ; {
			streams, err := h.client.XReadUntilResult(ctx, &redis.XReadArgs{
				Streams: []string{"icinga:stats", id},
			})
			if err != nil {
				return errors.Wrap(err, "can't read Icinga heartbeat")
			}

			m := &HeartbeatMessage{
				received: time.Now(),
				stats:    streams[0].Messages[0].Values,
			}

			select {
			case messages <- m:
			case <-ctx.Done():
				return ctx.Err()
			}

			id = streams[0].Messages[0].ID

			<-throttle.C
		}
	})

	// State loop.
	g.Go(func() error {
		for {
			select {
			case m := <-messages:
				if !h.active {
					envId, err := m.EnvironmentID()
					if err != nil {
						return err
					}
					h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String()))
					h.active = true
				}

				atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())
				h.sendEvent(m)
			case <-time.After(Timeout):
				if h.active {
					h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", Timeout))
					h.sendEvent(nil)
					h.active = false
				} else {
					h.logger.Warn("Waiting for Icinga heartbeat")
				}

				atomic.StoreInt64(&h.lastReceivedMs, 0)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Since the goroutines of the group actually run endlessly,
	// we wait here forever, unless an error occurs.
	if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) {
		// Do not propagate any context-canceled errors here,
		// as this is to be expected when calling Close or
		// when the parent context is canceled.
		h.setError(err)
	}
}

func (h *Heartbeat) setError(err error) {
	h.errMu.Lock()
	defer h.errMu.Unlock()

	h.err = errors.Wrap(err, "heartbeat failed")
}

func (h *Heartbeat) sendEvent(m *HeartbeatMessage) {
	// Remove any not yet delivered event
	select {
	case old := <-h.events:
		if old != nil {
			kv := []any{zap.Time("previous", old.received)}
			if m != nil {
				kv = append(kv, zap.Time("current", m.received))
			}

			h.logger.Debugw("Previous heartbeat not read from channel", kv...)
		} else {
			h.logger.Debug("Previous heartbeat loss event not read from channel")
		}
	default:
	}

	h.events <- m
}

// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received.
type HeartbeatMessage struct {
	received time.Time
	stats    v1.StatsMessage
}

// Stats returns the underlying heartbeat message from the icinga:stats stream.
func (m *HeartbeatMessage) Stats() *v1.StatsMessage {
	return &m.stats
}

// EnvironmentID returns the Icinga DB environment ID stored in the heartbeat message.
func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) {
	var id types.Binary
	err := internal.UnmarshalJSON([]byte(m.stats["icingadb_environment"].(string)), &id)
	if err != nil {
		return nil, err
	}
	return id, nil
}

// ExpiryTime returns the timestamp when the heartbeat expires.
func (m *HeartbeatMessage) ExpiryTime() time.Time {
	return m.received.Add(Timeout)
}