summaryrefslogtreecommitdiffstats
path: root/pkg/icingaredis
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/icingaredis')
-rw-r--r--pkg/icingaredis/client.go243
-rw-r--r--pkg/icingaredis/heartbeat.go221
-rw-r--r--pkg/icingaredis/telemetry/heartbeat.go203
-rw-r--r--pkg/icingaredis/telemetry/stats.go51
-rw-r--r--pkg/icingaredis/utils.go128
-rw-r--r--pkg/icingaredis/v1/icinga_status.go21
-rw-r--r--pkg/icingaredis/v1/stats_message.go51
7 files changed, 918 insertions, 0 deletions
diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go
new file mode 100644
index 0000000..d42713c
--- /dev/null
+++ b/pkg/icingaredis/client.go
@@ -0,0 +1,243 @@
+package icingaredis
+
+import (
+ "context"
+ "github.com/go-redis/redis/v8"
+ "github.com/icinga/icingadb/pkg/com"
+ "github.com/icinga/icingadb/pkg/common"
+ "github.com/icinga/icingadb/pkg/contracts"
+ "github.com/icinga/icingadb/pkg/logging"
+ "github.com/icinga/icingadb/pkg/periodic"
+ "github.com/icinga/icingadb/pkg/utils"
+ "github.com/pkg/errors"
+ "golang.org/x/sync/errgroup"
+ "golang.org/x/sync/semaphore"
+ "runtime"
+ "time"
+)
+
// Client is a wrapper around redis.Client with
// streaming and logging capabilities.
type Client struct {
	// Embedded so every redis.Client command is available on Client directly.
	*redis.Client

	// Options holds the user-configurable tuning knobs (batch sizes, timeouts).
	Options *Options

	// logger is used for periodic progress/debug output; see log().
	logger *logging.Logger
}
+
+// Options define user configurable Redis options.
+type Options struct {
+ BlockTimeout time.Duration `yaml:"block_timeout" default:"1s"`
+ HMGetCount int `yaml:"hmget_count" default:"4096"`
+ HScanCount int `yaml:"hscan_count" default:"4096"`
+ MaxHMGetConnections int `yaml:"max_hmget_connections" default:"8"`
+ Timeout time.Duration `yaml:"timeout" default:"30s"`
+ XReadCount int `yaml:"xread_count" default:"4096"`
+}
+
+// Validate checks constraints in the supplied Redis options and returns an error if they are violated.
+func (o *Options) Validate() error {
+ if o.BlockTimeout <= 0 {
+ return errors.New("block_timeout must be positive")
+ }
+ if o.HMGetCount < 1 {
+ return errors.New("hmget_count must be at least 1")
+ }
+ if o.HScanCount < 1 {
+ return errors.New("hscan_count must be at least 1")
+ }
+ if o.MaxHMGetConnections < 1 {
+ return errors.New("max_hmget_connections must be at least 1")
+ }
+ if o.Timeout == 0 {
+ return errors.New("timeout cannot be 0. Configure a value greater than zero, or use -1 for no timeout")
+ }
+ if o.XReadCount < 1 {
+ return errors.New("xread_count must be at least 1")
+ }
+
+ return nil
+}
+
+// NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client.
+func NewClient(client *redis.Client, logger *logging.Logger, options *Options) *Client {
+ return &Client{Client: client, logger: logger, Options: options}
+}
+
// HPair defines Redis hashes field-value pairs.
type HPair struct {
	// Field is the hash field name.
	Field string
	// Value is the raw string value stored under Field.
	Value string
}
+
// HYield yields HPair field-value pairs for all fields in the hash stored at key.
//
// The returned pairs channel is buffered with Options.HScanCount entries and is
// closed once scanning finishes or fails; failures are delivered on the
// returned error channel (via com.WaitAsync).
func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error) {
	pairs := make(chan HPair, c.Options.HScanCount)

	return pairs, com.WaitAsync(contracts.WaiterFunc(func() error {
		var counter com.Counter
		// Log fetch progress periodically and a summary once scanning stops.
		defer c.log(ctx, key, &counter).Stop()
		defer close(pairs)

		// HSCAN may return the same field more than once during a full
		// iteration; remember fields already emitted to avoid duplicates.
		seen := make(map[string]struct{})

		var cursor uint64
		var err error
		var page []string

		for {
			cmd := c.HScan(ctx, key, cursor, "", int64(c.Options.HScanCount))
			page, cursor, err = cmd.Result()

			if err != nil {
				return WrapCmdErr(cmd)
			}

			// page is a flat list of alternating field and value entries.
			for i := 0; i < len(page); i += 2 {
				if _, ok := seen[page[i]]; ok {
					// Ignore duplicate returned by HSCAN.
					continue
				}

				seen[page[i]] = struct{}{}

				select {
				case pairs <- HPair{
					Field: page[i],
					Value: page[i+1],
				}:
					counter.Inc()
				case <-ctx.Done():
					return ctx.Err()
				}
			}

			// Cursor 0 marks the end of a full HSCAN iteration.
			if cursor == 0 {
				break
			}
		}

		return nil
	}))
}
+
// HMYield yields HPair field-value pairs for the specified fields in the hash stored at key.
//
// The fields are fetched in batches of Options.HMGetCount via HMGET, with at
// most Options.MaxHMGetConnections batches in flight concurrently. Fields that
// do not exist in the hash are logged as warnings and skipped. The pairs
// channel is closed once all batches have completed.
func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-chan HPair, <-chan error) {
	pairs := make(chan HPair)

	return pairs, com.WaitAsync(contracts.WaiterFunc(func() error {
		var counter com.Counter
		defer c.log(ctx, key, &counter).Stop()

		g, ctx := errgroup.WithContext(ctx)

		defer func() {
			// Wait until the group is done so that we can safely close the pairs channel,
			// because on error, sem.Acquire will return before calling g.Wait(),
			// which can result in goroutines working on a closed channel.
			_ = g.Wait()
			close(pairs)
		}()

		// Use context from group.
		batches := utils.BatchSliceOfStrings(ctx, fields, c.Options.HMGetCount)

		// Bound the number of concurrent HMGET commands.
		sem := semaphore.NewWeighted(int64(c.Options.MaxHMGetConnections))

		for batch := range batches {
			if err := sem.Acquire(ctx, 1); err != nil {
				return errors.Wrap(err, "can't acquire semaphore")
			}

			batch := batch // capture a per-iteration copy for the goroutine below
			g.Go(func() error {
				defer sem.Release(1)

				cmd := c.HMGet(ctx, key, batch...)
				vals, err := cmd.Result()

				if err != nil {
					return WrapCmdErr(cmd)
				}

				// vals[i] corresponds to batch[i]; nil means the field is absent.
				for i, v := range vals {
					if v == nil {
						c.logger.Warnf("HMGET %s: field %#v missing", key, batch[i])
						continue
					}

					select {
					case pairs <- HPair{
						Field: batch[i],
						Value: v.(string),
					}:
						counter.Inc()
					case <-ctx.Done():
						return ctx.Err()
					}
				}

				return nil
			})
		}

		return g.Wait()
	}))
}
+
+// XReadUntilResult (repeatedly) calls XREAD with the specified arguments until a result is returned.
+// Each call blocks at most for the duration specified in Options.BlockTimeout until data
+// is available before it times out and the next call is made.
+// This also means that an already set block timeout is overridden.
+func (c *Client) XReadUntilResult(ctx context.Context, a *redis.XReadArgs) ([]redis.XStream, error) {
+ a.Block = c.Options.BlockTimeout
+
+ for {
+ cmd := c.XRead(ctx, a)
+ streams, err := cmd.Result()
+ if err != nil {
+ if errors.Is(err, redis.Nil) {
+ continue
+ }
+
+ return streams, WrapCmdErr(cmd)
+ }
+
+ return streams, nil
+ }
+}
+
+// YieldAll yields all entities from Redis that belong to the specified SyncSubject.
+func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) {
+ key := utils.Key(utils.Name(subject.Entity()), ':')
+ if subject.WithChecksum() {
+ key = "icinga:checksum:" + key
+ } else {
+ key = "icinga:" + key
+ }
+
+ pairs, errs := c.HYield(ctx, key)
+ g, ctx := errgroup.WithContext(ctx)
+ // Let errors from HYield cancel the group.
+ com.ErrgroupReceive(g, errs)
+
+ desired, errs := CreateEntities(ctx, subject.FactoryForDelta(), pairs, runtime.NumCPU())
+ // Let errors from CreateEntities cancel the group.
+ com.ErrgroupReceive(g, errs)
+
+ return desired, com.WaitAsync(g)
+}
+
// log starts periodic debug logging of fetch progress for key and returns the
// periodic.Stopper that ends it. counter is read (and reset) on every tick and
// its grand total is reported once when the stopper is stopped.
func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stopper {
	return periodic.Start(ctx, c.logger.Interval(), func(tick periodic.Tick) {
		// We may never get to progress logging here,
		// as fetching should be completed before the interval expires,
		// but if it does, it is good to have this log message.
		if count := counter.Reset(); count > 0 {
			c.logger.Debugf("Fetched %d items from %s", count, key)
		}
	}, periodic.OnStop(func(tick periodic.Tick) {
		c.logger.Debugf("Finished fetching from %s with %d items in %s", key, counter.Total(), tick.Elapsed)
	}))
}
diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go
new file mode 100644
index 0000000..9a8ebad
--- /dev/null
+++ b/pkg/icingaredis/heartbeat.go
@@ -0,0 +1,221 @@
+package icingaredis
+
+import (
+ "context"
+ "github.com/go-redis/redis/v8"
+ "github.com/icinga/icingadb/internal"
+ v1 "github.com/icinga/icingadb/pkg/icingaredis/v1"
+ "github.com/icinga/icingadb/pkg/logging"
+ "github.com/icinga/icingadb/pkg/types"
+ "github.com/icinga/icingadb/pkg/utils"
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+ "golang.org/x/sync/errgroup"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
// timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
// After this time, a heartbeat loss is propagated.
// NOTE(review): declared as a var rather than a const — presumably so tests can shorten it; confirm.
var timeout = 60 * time.Second
+
// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received.
// Also signals on if the heartbeat is Lost.
type Heartbeat struct {
	// active is true once a heartbeat was received and no loss has been signaled since.
	// It is only accessed from the controller's state-loop goroutine.
	active bool
	// events delivers received heartbeats (or nil on loss); buffered with size 1,
	// sendEvent drops an undelivered event in favor of the newest one.
	events chan *HeartbeatMessage
	// lastReceivedMs is the latest heartbeat's receive time in Unix ms; accessed atomically.
	lastReceivedMs int64
	// cancelCtx stops the controller loop (see Close).
	cancelCtx context.CancelFunc
	client    *Client
	// done is closed when the controller loop has ended.
	done chan struct{}
	// errMu guards err.
	errMu  sync.Mutex
	err    error
	logger *logging.Logger
}
+
+// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop.
+func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat {
+ ctx, cancelCtx := context.WithCancel(ctx)
+
+ heartbeat := &Heartbeat{
+ events: make(chan *HeartbeatMessage, 1),
+ cancelCtx: cancelCtx,
+ client: client,
+ done: make(chan struct{}),
+ logger: logger,
+ }
+
+ go heartbeat.controller(ctx)
+
+ return heartbeat
+}
+
// Events returns a channel that is sent to on heartbeat events.
//
// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss.
// The channel is buffered with one element; see sendEvent for how stale, undelivered events are replaced.
func (h *Heartbeat) Events() <-chan *HeartbeatMessage {
	return h.events
}
+
// LastReceived returns the last heartbeat's receive time in ms.
// It is 0 before the first heartbeat arrives and after a heartbeat loss (see controller).
func (h *Heartbeat) LastReceived() int64 {
	return atomic.LoadInt64(&h.lastReceivedMs)
}
+
// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
	// Cancel the controller's context, then wait for the loop to wind down.
	h.cancelCtx()
	<-h.Done()

	return h.Err()
}
+
// Done returns a channel that will be closed when the heartbeat controller loop has ended.
func (h *Heartbeat) Done() <-chan struct{} {
	return h.done
}
+
+// Err returns an error if Done has been closed and there is an error. Otherwise returns nil.
+func (h *Heartbeat) Err() error {
+ h.errMu.Lock()
+ defer h.errMu.Unlock()
+
+ return h.err
+}
+
// controller runs the two heartbeat loops until ctx is canceled: a producer
// reading the icinga:stats stream and a state loop tracking presence/loss.
// Any non-cancellation error is recorded via setError before done is closed.
func (h *Heartbeat) controller(ctx context.Context) {
	defer close(h.done)

	messages := make(chan *HeartbeatMessage)
	defer close(messages)

	g, ctx := errgroup.WithContext(ctx)

	// Message producer loop.
	g.Go(func() error {
		// We expect heartbeats every second but only read them every 3 seconds.
		throttle := time.NewTicker(time.Second * 3)
		defer throttle.Stop()

		// "$" starts reading with new stream entries only, skipping history.
		for id := "$"; ; {
			streams, err := h.client.XReadUntilResult(ctx, &redis.XReadArgs{
				Streams: []string{"icinga:stats", id},
			})
			if err != nil {
				return errors.Wrap(err, "can't read Icinga heartbeat")
			}

			m := &HeartbeatMessage{
				received: time.Now(),
				stats:    streams[0].Messages[0].Values,
			}

			select {
			case messages <- m:
			case <-ctx.Done():
				return ctx.Err()
			}

			// Continue reading after the entry just processed.
			id = streams[0].Messages[0].ID

			<-throttle.C
		}
	})

	// State loop.
	g.Go(func() error {
		for {
			select {
			case m := <-messages:
				if !h.active {
					envId, err := m.EnvironmentID()
					if err != nil {
						return err
					}
					h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String()))
					h.active = true
				}

				atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())
				h.sendEvent(m)
			case <-time.After(timeout):
				// No heartbeat within the whole timeout window.
				if h.active {
					h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout))
					h.sendEvent(nil)
					h.active = false
				} else {
					h.logger.Warn("Waiting for Icinga heartbeat")
				}

				atomic.StoreInt64(&h.lastReceivedMs, 0)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Since the goroutines of the group actually run endlessly,
	// we wait here forever, unless an error occurs.
	if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) {
		// Do not propagate any context-canceled errors here,
		// as this is to be expected when calling Close or
		// when the parent context is canceled.
		h.setError(err)
	}
}
+
// setError records err (wrapped with context) so Err can report it after Done is closed.
func (h *Heartbeat) setError(err error) {
	h.errMu.Lock()
	defer h.errMu.Unlock()

	h.err = errors.Wrap(err, "heartbeat failed")
}
+
// sendEvent delivers m (nil signals heartbeat loss) on the events channel.
//
// events is buffered with one element: any event not yet picked up by a
// consumer is drained (and logged) first, so the newest state always wins and
// the final send cannot block indefinitely.
func (h *Heartbeat) sendEvent(m *HeartbeatMessage) {
	// Remove any not yet delivered event
	select {
	case old := <-h.events:
		if old != nil {
			kv := []any{zap.Time("previous", old.received)}
			if m != nil {
				kv = append(kv, zap.Time("current", m.received))
			}

			h.logger.Debugw("Previous heartbeat not read from channel", kv...)
		} else {
			h.logger.Debug("Previous heartbeat loss event not read from channel")
		}
	default:
	}

	h.events <- m
}
+
// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received.
type HeartbeatMessage struct {
	// received is the local time the message was read from the stream.
	received time.Time
	// stats is the raw icinga:stats payload of the heartbeat.
	stats v1.StatsMessage
}
+
// Stats returns the underlying heartbeat message from the icinga:stats stream.
// The returned pointer aliases the message's internal state; callers should treat it as read-only.
func (m *HeartbeatMessage) Stats() *v1.StatsMessage {
	return &m.stats
}
+
+// EnvironmentID returns the Icinga DB environment ID stored in the heartbeat message.
+func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) {
+ var id types.Binary
+ err := internal.UnmarshalJSON([]byte(m.stats["icingadb_environment"].(string)), &id)
+ if err != nil {
+ return nil, err
+ }
+ return id, nil
+}
+
// ExpiryTime returns the timestamp when the heartbeat expires,
// i.e. its receive time plus the package-level heartbeat timeout.
func (m *HeartbeatMessage) ExpiryTime() time.Time {
	return m.received.Add(timeout)
}
diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go
new file mode 100644
index 0000000..ee476a1
--- /dev/null
+++ b/pkg/icingaredis/telemetry/heartbeat.go
@@ -0,0 +1,203 @@
+package telemetry
+
+import (
+ "context"
+ "fmt"
+ "github.com/go-redis/redis/v8"
+ "github.com/icinga/icingadb/internal"
+ "github.com/icinga/icingadb/pkg/com"
+ "github.com/icinga/icingadb/pkg/icingaredis"
+ "github.com/icinga/icingadb/pkg/logging"
+ "github.com/icinga/icingadb/pkg/periodic"
+ "github.com/icinga/icingadb/pkg/utils"
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+ "regexp"
+ "runtime/metrics"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
// ha represents icingadb.HA to avoid import cycles.
type ha interface {
	// State reports the timestamp (Unix ms) of this instance's responsibility state,
	// whether this instance is responsible, and whether the other instance is responsible.
	State() (weResponsibleMilli int64, weResponsible, otherResponsible bool)
}
+
// SuccessfulSync describes a successfully finished sync (timestamps in Unix ms).
type SuccessfulSync struct {
	// FinishMilli is the finish time of the sync.
	FinishMilli int64
	// DurationMilli is how long the sync took.
	DurationMilli int64
}
+
// currentDbConnErr stores ongoing errors from database connections.
var currentDbConnErr struct {
	mu         sync.Mutex
	message    string
	sinceMilli int64
}

// UpdateCurrentDbConnErr updates the current error information stored in currentDbConnErr.
//
// A nil err signals a healthy connection. The sinceMilli timestamp only moves
// on a transition between OK and error — replacing one error message with a
// different one keeps the original timestamp.
func UpdateCurrentDbConnErr(err error) {
	var message string
	if err != nil {
		message = err.Error()
	}

	now := time.Now().UnixMilli()

	currentDbConnErr.mu.Lock()
	defer currentDbConnErr.mu.Unlock()

	switch {
	case currentDbConnErr.sinceMilli >= now:
		// Already updated with a more recent error, ignore this one.
		return
	case currentDbConnErr.message == message:
		// Error stayed the same, no update needed, keeping the old timestamp.
		return
	case currentDbConnErr.message == "" || message == "":
		// Either first error or recovery from an error, update timestamp.
		currentDbConnErr.sinceMilli = now
	}

	currentDbConnErr.message = message
}

// GetCurrentDbConnErr returns the last error message (or the empty string if not in an error state) and a timestamp in
// milliseconds of the last change from OK to error or from error to OK.
func GetCurrentDbConnErr() (string, int64) {
	currentDbConnErr.mu.Lock()
	message, since := currentDbConnErr.message, currentDbConnErr.sinceMilli
	currentDbConnErr.mu.Unlock()

	return message, since
}
+
// OngoingSyncStartMilli is to be updated by the main() function.
var OngoingSyncStartMilli int64

// LastSuccessfulSync is to be updated by the main() function.
var LastSuccessfulSync com.Atomic[SuccessfulSync]

// boolToStr maps booleans to the "0"/"1" representation used in the heartbeat stream values.
var boolToStr = map[bool]string{false: "0", true: "1"}

// startTime is the process start in Unix ms, captured at package initialization.
var startTime = time.Now().UnixMilli()
+
// StartHeartbeat periodically writes heartbeats to Redis for being monitored by Icinga 2.
//
// Every second a snapshot of process and telemetry state is XADDed to the
// icingadb:telemetry:heartbeat stream (MaxLen 1, so only the latest entry is
// kept). Write failures are logged; warnings are rate-limited to once per
// minute per distinct error message, with repeats downgraded to debug level.
func StartHeartbeat(
	ctx context.Context, client *icingaredis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat,
) {
	goMetrics := NewGoMetrics()

	const interval = time.Second

	var lastErr string
	var silenceUntil time.Time

	periodic.Start(ctx, interval, func(tick periodic.Tick) {
		// Note: deliberately shadows the outer heartbeat with its last-received Unix ms timestamp.
		heartbeat := heartbeat.LastReceived()
		responsibleTsMilli, responsible, otherResponsible := ha.State()
		ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli)
		// Note: local "sync" shadows the sync package inside this closure.
		sync, _ := LastSuccessfulSync.Load()
		dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr()
		now := time.Now()

		values := map[string]string{
			"version":                 internal.Version.Version,
			"time":                    strconv.FormatInt(now.UnixMilli(), 10),
			"start-time":              strconv.FormatInt(startTime, 10),
			"error":                   dbConnErr,
			"error-since":             strconv.FormatInt(dbConnErrSinceMilli, 10),
			"performance-data":        goMetrics.PerformanceData(),
			"last-heartbeat-received": strconv.FormatInt(heartbeat, 10),
			"ha-responsible":          boolToStr[responsible],
			"ha-responsible-ts":       strconv.FormatInt(responsibleTsMilli, 10),
			"ha-other-responsible":    boolToStr[otherResponsible],
			"sync-ongoing-since":      strconv.FormatInt(ongoingSyncStart, 10),
			"sync-success-finish":     strconv.FormatInt(sync.FinishMilli, 10),
			"sync-success-duration":   strconv.FormatInt(sync.DurationMilli, 10),
		}

		// Give the XADD at most one tick interval; deliberately shadows ctx.
		ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval))
		defer cancel()

		cmd := client.XAdd(ctx, &redis.XAddArgs{
			Stream: "icingadb:telemetry:heartbeat",
			MaxLen: 1,
			Values: values,
		})
		if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) && !errors.Is(err, context.DeadlineExceeded) {
			logw := logger.Debugw
			currentErr := err.Error()

			// Escalate to a warning only for a new error or after the silence window expired.
			if currentErr != lastErr || now.After(silenceUntil) {
				logw = logger.Warnw
				lastErr = currentErr
				silenceUntil = now.Add(time.Minute)
			}

			logw("Can't update own heartbeat", zap.Error(icingaredis.WrapCmdErr(cmd)))
		} else {
			lastErr = ""
			silenceUntil = time.Time{}
		}
	})
}
+
// goMetrics collects the subset of runtime/metrics samples (uint64 and float64
// kinds) that can be rendered as Icinga plugin performance data.
type goMetrics struct {
	names   []string
	units   []string
	samples []metrics.Sample
}

// NewGoMetrics discovers all supported runtime metrics once and returns a
// collector whose PerformanceData method reads fresh sample values.
func NewGoMetrics() *goMetrics {
	g := &goMetrics{}

	// Performance data labels may only contain word characters.
	nonWord := regexp.MustCompile(`\W`)

	for _, desc := range metrics.All() {
		if desc.Kind != metrics.KindUint64 && desc.Kind != metrics.KindFloat64 {
			continue
		}

		label := "go_" + strings.TrimLeft(nonWord.ReplaceAllString(desc.Name, "_"), "_")

		// Derive the perfdata unit of measurement from the metric's name suffix or kind.
		var unit string
		switch {
		case strings.HasSuffix(desc.Name, ":bytes"):
			unit = "B"
		case strings.HasSuffix(desc.Name, ":seconds"):
			unit = "s"
		case desc.Cumulative:
			unit = "c"
		}

		g.names = append(g.names, label)
		g.units = append(g.units, unit)
		g.samples = append(g.samples, metrics.Sample{Name: desc.Name})
	}

	return g
}

// PerformanceData reads current values for all collected metrics and formats
// them as a single space-separated perfdata string.
func (g *goMetrics) PerformanceData() string {
	metrics.Read(g.samples)

	var out strings.Builder

	for i := range g.samples {
		if i > 0 {
			out.WriteByte(' ')
		}

		switch value := g.samples[i].Value; value.Kind() {
		case metrics.KindUint64:
			_, _ = fmt.Fprintf(&out, "%s=%d%s", g.names[i], value.Uint64(), g.units[i])
		case metrics.KindFloat64:
			_, _ = fmt.Fprintf(&out, "%s=%f%s", g.names[i], value.Float64(), g.units[i])
		}
	}

	return out.String()
}
diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go
new file mode 100644
index 0000000..86db0b3
--- /dev/null
+++ b/pkg/icingaredis/telemetry/stats.go
@@ -0,0 +1,51 @@
+package telemetry
+
+import (
+ "context"
+ "github.com/go-redis/redis/v8"
+ "github.com/icinga/icingadb/pkg/com"
+ "github.com/icinga/icingadb/pkg/icingaredis"
+ "github.com/icinga/icingadb/pkg/logging"
+ "github.com/icinga/icingadb/pkg/periodic"
+ "github.com/icinga/icingadb/pkg/utils"
+ "go.uber.org/zap"
+ "strconv"
+ "time"
+)
+
// Stats groups the telemetry counters that WriteStats periodically flushes to Redis.
var Stats struct {
	// Config & co. are to be increased by the T sync once for every T object synced.
	Config, State, History, Overdue, HistoryCleanup com.Counter
}
+
// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2.
//
// Once per second, every non-zero counter is reset and its value appended to
// the icingadb:telemetry:stats stream, which is capped (approximately) at
// 15*60 entries — roughly the last 15 minutes at one entry per second.
func WriteStats(ctx context.Context, client *icingaredis.Client, logger *logging.Logger) {
	counters := map[string]*com.Counter{
		"config_sync":     &Stats.Config,
		"state_sync":      &Stats.State,
		"history_sync":    &Stats.History,
		"overdue_sync":    &Stats.Overdue,
		"history_cleanup": &Stats.HistoryCleanup,
	}

	periodic.Start(ctx, time.Second, func(_ periodic.Tick) {
		// Collect only counters that changed since the last tick; Reset
		// returns the current value and zeroes the counter.
		var data []string
		for kind, counter := range counters {
			if cnt := counter.Reset(); cnt > 0 {
				data = append(data, kind, strconv.FormatUint(cnt, 10))
			}
		}

		// Skip the XADD entirely when nothing changed.
		if data != nil {
			cmd := client.XAdd(ctx, &redis.XAddArgs{
				Stream: "icingadb:telemetry:stats",
				MaxLen: 15 * 60,
				Approx: true,
				Values: data,
			})
			if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) {
				logger.Warnw("Can't update own stats", zap.Error(icingaredis.WrapCmdErr(cmd)))
			}
		}
	})
}
diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go
new file mode 100644
index 0000000..9176dba
--- /dev/null
+++ b/pkg/icingaredis/utils.go
@@ -0,0 +1,128 @@
+package icingaredis
+
+import (
+ "context"
+ "github.com/go-redis/redis/v8"
+ "github.com/icinga/icingadb/internal"
+ "github.com/icinga/icingadb/pkg/com"
+ "github.com/icinga/icingadb/pkg/contracts"
+ "github.com/icinga/icingadb/pkg/types"
+ "github.com/icinga/icingadb/pkg/utils"
+ "github.com/pkg/errors"
+ "golang.org/x/sync/errgroup"
+)
+
// Streams represents a Redis stream key to ID mapping.
type Streams map[string]string

// Option returns the Redis stream key to ID mapping
// as a slice of stream keys followed by their IDs
// that is compatible for the Redis STREAMS option.
func (s Streams) Option() []string {
	keys := make([]string, 0, len(s))
	for key := range s {
		keys = append(keys, key)
	}

	// XREAD expects all keys first, followed by all IDs in the same order.
	option := make([]string, 0, 2*len(s))
	option = append(option, keys...)
	for _, key := range keys {
		option = append(option, s[key])
	}

	return option
}
+
// CreateEntities streams and creates entities from the
// given Redis field value pairs using the specified factory function,
// and streams them on a returned channel.
//
// concurrent worker goroutines consume pairs in parallel: the pair's field is
// parsed as the entity ID and its value is JSON-unmarshalled into a fresh
// entity from factoryFunc. The entities channel is closed when all workers
// finish; errors are delivered on the returned error channel.
func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) {
	entities := make(chan contracts.Entity)
	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		defer close(entities)

		// Inner group so that entities is only closed after all workers returned.
		g, ctx := errgroup.WithContext(ctx)

		for i := 0; i < concurrent; i++ {
			g.Go(func() error {
				for pair := range pairs {
					var id types.Binary

					// The hash field is the entity's (hex) ID.
					if err := id.UnmarshalText([]byte(pair.Field)); err != nil {
						return errors.Wrapf(err, "can't create ID from value %#v", pair.Field)
					}

					e := factoryFunc()
					if err := internal.UnmarshalJSON([]byte(pair.Value), e); err != nil {
						return err
					}
					e.SetID(id)

					select {
					case entities <- e:
					case <-ctx.Done():
						return ctx.Err()
					}
				}

				return nil
			})
		}

		return g.Wait()
	})

	return entities, com.WaitAsync(g)
}
+
// SetChecksums concurrently streams from the given entities and
// sets their checksums using the specified map and
// streams the results on a returned channel.
//
// An entity without a matching entry in checksums is an error. Both the
// entities and the checksum values are asserted to contracts.Checksumer;
// NOTE(review): a non-conforming value would panic here — presumably callers
// guarantee this invariant; confirm.
func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) {
	entitiesWithChecksum := make(chan contracts.Entity)
	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		defer close(entitiesWithChecksum)

		// Inner group so the output channel is only closed after all workers returned.
		g, ctx := errgroup.WithContext(ctx)

		for i := 0; i < concurrent; i++ {
			g.Go(func() error {
				for entity := range entities {
					if checksumer, ok := checksums[entity.ID().String()]; ok {
						entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum())
					} else {
						return errors.Errorf("no checksum for %#v", entity)
					}

					select {
					case entitiesWithChecksum <- entity:
					case <-ctx.Done():
						return ctx.Err()
					}
				}

				return nil
			})
		}

		return g.Wait()
	})

	return entitiesWithChecksum, com.WaitAsync(g)
}
+
+// WrapCmdErr adds the command itself and
+// the stack of the current goroutine to the command's error if any.
+func WrapCmdErr(cmd redis.Cmder) error {
+ err := cmd.Err()
+ if err != nil {
+ err = errors.Wrapf(err, "can't perform %q", utils.Ellipsize(
+ redis.NewCmd(context.Background(), cmd.Args()).String(), // Omits error in opposite to cmd.String()
+ 100,
+ ))
+ }
+
+ return err
+}
diff --git a/pkg/icingaredis/v1/icinga_status.go b/pkg/icingaredis/v1/icinga_status.go
new file mode 100644
index 0000000..d94d3d6
--- /dev/null
+++ b/pkg/icingaredis/v1/icinga_status.go
@@ -0,0 +1,21 @@
+package v1
+
+import (
+ "github.com/icinga/icingadb/pkg/types"
+)
+
// IcingaStatus defines Icinga status information.
// It is unmarshalled from the IcingaApplication part of an icinga:stats message
// (see StatsMessage.IcingaStatus).
type IcingaStatus struct {
	// Note: Icinga2Environment is not related to the environment_id used throughout Icinga DB.
	Icinga2Environment         string          `json:"environment"`
	NodeName                   string          `json:"node_name"`
	Version                    string          `json:"version"`
	ProgramStart               types.UnixMilli `json:"program_start"`
	EndpointId                 types.Binary    `json:"endpoint_id"`
	NotificationsEnabled       types.Bool      `json:"enable_notifications"`
	ActiveServiceChecksEnabled types.Bool      `json:"enable_service_checks"`
	ActiveHostChecksEnabled    types.Bool      `json:"enable_host_checks"`
	EventHandlersEnabled       types.Bool      `json:"enable_event_handlers"`
	FlapDetectionEnabled       types.Bool      `json:"enable_flapping"`
	PerformanceDataEnabled     types.Bool      `json:"enable_perfdata"`
}
diff --git a/pkg/icingaredis/v1/stats_message.go b/pkg/icingaredis/v1/stats_message.go
new file mode 100644
index 0000000..5b04629
--- /dev/null
+++ b/pkg/icingaredis/v1/stats_message.go
@@ -0,0 +1,51 @@
+package v1
+
+import (
+ "github.com/icinga/icingadb/internal"
+ "github.com/icinga/icingadb/pkg/types"
+ "github.com/pkg/errors"
+)
+
// StatsMessage represents a message from the Redis stream icinga:stats.
type StatsMessage map[string]interface{}

// Raw returns the key-value pairs of the message.
// The returned map is the message itself, not a copy.
func (msg StatsMessage) Raw() map[string]interface{} {
	return msg
}
+
+// IcingaStatus extracts Icinga status information from the message into IcingaStatus and returns it.
+func (m StatsMessage) IcingaStatus() (*IcingaStatus, error) {
+ if s, ok := m["IcingaApplication"].(string); ok {
+ var envelope struct {
+ Status struct {
+ IcingaApplication struct {
+ IcingaStatus `json:"app"`
+ } `json:"icingaapplication"`
+ } `json:"status"`
+ }
+
+ if err := internal.UnmarshalJSON([]byte(s), &envelope); err != nil {
+ return nil, err
+ }
+
+ return &envelope.Status.IcingaApplication.IcingaStatus, nil
+ }
+
+ return nil, errors.Errorf(`bad message %#v. "IcingaApplication" missing`, m)
+}
+
+// Time extracts the timestamp of the message into types.UnixMilli and returns it.
+func (m StatsMessage) Time() (*types.UnixMilli, error) {
+ if s, ok := m["timestamp"].(string); ok {
+ var t types.UnixMilli
+
+ if err := internal.UnmarshalJSON([]byte(s), &t); err != nil {
+ return nil, err
+ }
+
+ return &t, nil
+ }
+
+ return nil, errors.Errorf(`bad message %#v. "timestamp" missing`, m)
+}