diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:40:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:40:59 +0000 |
commit | bc4e624732bd51c0dd1e9529cf228e8c23127732 (patch) | |
tree | d95dab8960e9d02d3b95f8653074ad2e54ca207c /pkg/icingaredis | |
parent | Initial commit. (diff) | |
download | icingadb-bc4e624732bd51c0dd1e9529cf228e8c23127732.tar.xz icingadb-bc4e624732bd51c0dd1e9529cf228e8c23127732.zip |
Adding upstream version 1.1.1.upstream/1.1.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | pkg/icingaredis/client.go | 243 | ||||
-rw-r--r-- | pkg/icingaredis/heartbeat.go | 221 | ||||
-rw-r--r-- | pkg/icingaredis/telemetry/heartbeat.go | 203 | ||||
-rw-r--r-- | pkg/icingaredis/telemetry/stats.go | 51 | ||||
-rw-r--r-- | pkg/icingaredis/utils.go | 128 | ||||
-rw-r--r-- | pkg/icingaredis/v1/icinga_status.go | 21 | ||||
-rw-r--r-- | pkg/icingaredis/v1/stats_message.go | 51 |
7 files changed, 918 insertions, 0 deletions
diff --git a/pkg/icingaredis/client.go b/pkg/icingaredis/client.go new file mode 100644 index 0000000..d42713c --- /dev/null +++ b/pkg/icingaredis/client.go @@ -0,0 +1,243 @@ +package icingaredis + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" + "runtime" + "time" +) + +// Client is a wrapper around redis.Client with +// streaming and logging capabilities. +type Client struct { + *redis.Client + + Options *Options + + logger *logging.Logger +} + +// Options define user configurable Redis options. +type Options struct { + BlockTimeout time.Duration `yaml:"block_timeout" default:"1s"` + HMGetCount int `yaml:"hmget_count" default:"4096"` + HScanCount int `yaml:"hscan_count" default:"4096"` + MaxHMGetConnections int `yaml:"max_hmget_connections" default:"8"` + Timeout time.Duration `yaml:"timeout" default:"30s"` + XReadCount int `yaml:"xread_count" default:"4096"` +} + +// Validate checks constraints in the supplied Redis options and returns an error if they are violated. +func (o *Options) Validate() error { + if o.BlockTimeout <= 0 { + return errors.New("block_timeout must be positive") + } + if o.HMGetCount < 1 { + return errors.New("hmget_count must be at least 1") + } + if o.HScanCount < 1 { + return errors.New("hscan_count must be at least 1") + } + if o.MaxHMGetConnections < 1 { + return errors.New("max_hmget_connections must be at least 1") + } + if o.Timeout == 0 { + return errors.New("timeout cannot be 0. Configure a value greater than zero, or use -1 for no timeout") + } + if o.XReadCount < 1 { + return errors.New("xread_count must be at least 1") + } + + return nil +} + +// NewClient returns a new icingaredis.Client wrapper for a pre-existing *redis.Client. +func NewClient(client *redis.Client, logger *logging.Logger, options *Options) *Client { + return &Client{Client: client, logger: logger, Options: options} +} + +// HPair defines Redis hashes field-value pairs. +type HPair struct { + Field string + Value string +} + +// HYield yields HPair field-value pairs for all fields in the hash stored at key. +func (c *Client) HYield(ctx context.Context, key string) (<-chan HPair, <-chan error) { + pairs := make(chan HPair, c.Options.HScanCount) + + return pairs, com.WaitAsync(contracts.WaiterFunc(func() error { + var counter com.Counter + defer c.log(ctx, key, &counter).Stop() + defer close(pairs) + + seen := make(map[string]struct{}) + + var cursor uint64 + var err error + var page []string + + for { + cmd := c.HScan(ctx, key, cursor, "", int64(c.Options.HScanCount)) + page, cursor, err = cmd.Result() + + if err != nil { + return WrapCmdErr(cmd) + } + + for i := 0; i < len(page); i += 2 { + if _, ok := seen[page[i]]; ok { + // Ignore duplicate returned by HSCAN. + continue + } + + seen[page[i]] = struct{}{} + + select { + case pairs <- HPair{ + Field: page[i], + Value: page[i+1], + }: + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } + + if cursor == 0 { + break + } + } + + return nil + })) +} + +// HMYield yields HPair field-value pairs for the specified fields in the hash stored at key. +func (c *Client) HMYield(ctx context.Context, key string, fields ...string) (<-chan HPair, <-chan error) { + pairs := make(chan HPair) + + return pairs, com.WaitAsync(contracts.WaiterFunc(func() error { + var counter com.Counter + defer c.log(ctx, key, &counter).Stop() + + g, ctx := errgroup.WithContext(ctx) + + defer func() { + // Wait until the group is done so that we can safely close the pairs channel, + // because on error, sem.Acquire will return before calling g.Wait(), + // which can result in goroutines working on a closed channel. + _ = g.Wait() + close(pairs) + }() + + // Use context from group. + batches := utils.BatchSliceOfStrings(ctx, fields, c.Options.HMGetCount) + + sem := semaphore.NewWeighted(int64(c.Options.MaxHMGetConnections)) + + for batch := range batches { + if err := sem.Acquire(ctx, 1); err != nil { + return errors.Wrap(err, "can't acquire semaphore") + } + + batch := batch + g.Go(func() error { + defer sem.Release(1) + + cmd := c.HMGet(ctx, key, batch...) + vals, err := cmd.Result() + + if err != nil { + return WrapCmdErr(cmd) + } + + for i, v := range vals { + if v == nil { + c.logger.Warnf("HMGET %s: field %#v missing", key, batch[i]) + continue + } + + select { + case pairs <- HPair{ + Field: batch[i], + Value: v.(string), + }: + counter.Inc() + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + })) +} + +// XReadUntilResult (repeatedly) calls XREAD with the specified arguments until a result is returned. +// Each call blocks at most for the duration specified in Options.BlockTimeout until data +// is available before it times out and the next call is made. +// This also means that an already set block timeout is overridden. +func (c *Client) XReadUntilResult(ctx context.Context, a *redis.XReadArgs) ([]redis.XStream, error) { + a.Block = c.Options.BlockTimeout + + for { + cmd := c.XRead(ctx, a) + streams, err := cmd.Result() + if err != nil { + if errors.Is(err, redis.Nil) { + continue + } + + return streams, WrapCmdErr(cmd) + } + + return streams, nil + } +} + +// YieldAll yields all entities from Redis that belong to the specified SyncSubject. +func (c Client) YieldAll(ctx context.Context, subject *common.SyncSubject) (<-chan contracts.Entity, <-chan error) { + key := utils.Key(utils.Name(subject.Entity()), ':') + if subject.WithChecksum() { + key = "icinga:checksum:" + key + } else { + key = "icinga:" + key + } + + pairs, errs := c.HYield(ctx, key) + g, ctx := errgroup.WithContext(ctx) + // Let errors from HYield cancel the group. + com.ErrgroupReceive(g, errs) + + desired, errs := CreateEntities(ctx, subject.FactoryForDelta(), pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel the group. + com.ErrgroupReceive(g, errs) + + return desired, com.WaitAsync(g) +} + +func (c *Client) log(ctx context.Context, key string, counter *com.Counter) periodic.Stopper { + return periodic.Start(ctx, c.logger.Interval(), func(tick periodic.Tick) { + // We may never get to progress logging here, + // as fetching should be completed before the interval expires, + // but if it does, it is good to have this log message. + if count := counter.Reset(); count > 0 { + c.logger.Debugf("Fetched %d items from %s", count, key) + } + }, periodic.OnStop(func(tick periodic.Tick) { + c.logger.Debugf("Finished fetching from %s with %d items in %s", key, counter.Total(), tick.Elapsed) + })) +} diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go new file mode 100644 index 0000000..9a8ebad --- /dev/null +++ b/pkg/icingaredis/heartbeat.go @@ -0,0 +1,221 @@ +package icingaredis + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + v1 "github.com/icinga/icingadb/pkg/icingaredis/v1" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "sync" + "sync/atomic" + "time" +) + +// timeout defines how long a heartbeat may be absent if a heartbeat has already been received. +// After this time, a heartbeat loss is propagated. +var timeout = 60 * time.Second + +// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received. +// Also signals on if the heartbeat is Lost. +type Heartbeat struct { + active bool + events chan *HeartbeatMessage + lastReceivedMs int64 + cancelCtx context.CancelFunc + client *Client + done chan struct{} + errMu sync.Mutex + err error + logger *logging.Logger +} + +// NewHeartbeat returns a new Heartbeat and starts the heartbeat controller loop. +func NewHeartbeat(ctx context.Context, client *Client, logger *logging.Logger) *Heartbeat { + ctx, cancelCtx := context.WithCancel(ctx) + + heartbeat := &Heartbeat{ + events: make(chan *HeartbeatMessage, 1), + cancelCtx: cancelCtx, + client: client, + done: make(chan struct{}), + logger: logger, + } + + go heartbeat.controller(ctx) + + return heartbeat +} + +// Events returns a channel that is sent to on heartbeat events. +// +// A non-nil pointer signals that a heartbeat was received from Icinga 2 whereas a nil pointer signals a heartbeat loss. +func (h *Heartbeat) Events() <-chan *HeartbeatMessage { + return h.events +} + +// LastReceived returns the last heartbeat's receive time in ms. +func (h *Heartbeat) LastReceived() int64 { + return atomic.LoadInt64(&h.lastReceivedMs) +} + +// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. +// Implements the io.Closer interface. +func (h *Heartbeat) Close() error { + h.cancelCtx() + <-h.Done() + + return h.Err() +} + +// Done returns a channel that will be closed when the heartbeat controller loop has ended. +func (h *Heartbeat) Done() <-chan struct{} { + return h.done +} + +// Err returns an error if Done has been closed and there is an error. Otherwise returns nil. +func (h *Heartbeat) Err() error { + h.errMu.Lock() + defer h.errMu.Unlock() + + return h.err +} + +// controller loop. +func (h *Heartbeat) controller(ctx context.Context) { + defer close(h.done) + + messages := make(chan *HeartbeatMessage) + defer close(messages) + + g, ctx := errgroup.WithContext(ctx) + + // Message producer loop. + g.Go(func() error { + // We expect heartbeats every second but only read them every 3 seconds. + throttle := time.NewTicker(time.Second * 3) + defer throttle.Stop() + + for id := "$"; ; { + streams, err := h.client.XReadUntilResult(ctx, &redis.XReadArgs{ + Streams: []string{"icinga:stats", id}, + }) + if err != nil { + return errors.Wrap(err, "can't read Icinga heartbeat") + } + + m := &HeartbeatMessage{ + received: time.Now(), + stats: streams[0].Messages[0].Values, + } + + select { + case messages <- m: + case <-ctx.Done(): + return ctx.Err() + } + + id = streams[0].Messages[0].ID + + <-throttle.C + } + }) + + // State loop. + g.Go(func() error { + for { + select { + case m := <-messages: + if !h.active { + envId, err := m.EnvironmentID() + if err != nil { + return err + } + h.logger.Infow("Received Icinga heartbeat", zap.String("environment", envId.String())) + h.active = true + } + + atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) + h.sendEvent(m) + case <-time.After(timeout): + if h.active { + h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout)) + h.sendEvent(nil) + h.active = false + } else { + h.logger.Warn("Waiting for Icinga heartbeat") + } + + atomic.StoreInt64(&h.lastReceivedMs, 0) + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + // Since the goroutines of the group actually run endlessly, + // we wait here forever, unless an error occurs. + if err := g.Wait(); err != nil && !utils.IsContextCanceled(err) { + // Do not propagate any context-canceled errors here, + // as this is to be expected when calling Close or + // when the parent context is canceled. + h.setError(err) + } +} + +func (h *Heartbeat) setError(err error) { + h.errMu.Lock() + defer h.errMu.Unlock() + + h.err = errors.Wrap(err, "heartbeat failed") +} + +func (h *Heartbeat) sendEvent(m *HeartbeatMessage) { + // Remove any not yet delivered event + select { + case old := <-h.events: + if old != nil { + kv := []any{zap.Time("previous", old.received)} + if m != nil { + kv = append(kv, zap.Time("current", m.received)) + } + + h.logger.Debugw("Previous heartbeat not read from channel", kv...) + } else { + h.logger.Debug("Previous heartbeat loss event not read from channel") + } + default: + } + + h.events <- m +} + +// HeartbeatMessage represents a heartbeat received from Icinga 2 together with a timestamp when it was received. +type HeartbeatMessage struct { + received time.Time + stats v1.StatsMessage +} + +// Stats returns the underlying heartbeat message from the icinga:stats stream. +func (m *HeartbeatMessage) Stats() *v1.StatsMessage { + return &m.stats +} + +// EnvironmentID returns the Icinga DB environment ID stored in the heartbeat message. +func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) { + var id types.Binary + err := internal.UnmarshalJSON([]byte(m.stats["icingadb_environment"].(string)), &id) + if err != nil { + return nil, err + } + return id, nil +} + +// ExpiryTime returns the timestamp when the heartbeat expires. +func (m *HeartbeatMessage) ExpiryTime() time.Time { + return m.received.Add(timeout) +} diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go new file mode 100644 index 0000000..ee476a1 --- /dev/null +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -0,0 +1,203 @@ +package telemetry + +import ( + "context" + "fmt" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "regexp" + "runtime/metrics" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +// ha represents icingadb.HA to avoid import cycles. +type ha interface { + State() (weResponsibleMilli int64, weResponsible, otherResponsible bool) +} + +type SuccessfulSync struct { + FinishMilli int64 + DurationMilli int64 +} + +// currentDbConnErr stores ongoing errors from database connections. +var currentDbConnErr struct { + mu sync.Mutex + message string + sinceMilli int64 +} + +// UpdateCurrentDbConnErr updates the current error information stored in currentDbConnErr. +func UpdateCurrentDbConnErr(err error) { + now := time.Now().UnixMilli() + + currentDbConnErr.mu.Lock() + defer currentDbConnErr.mu.Unlock() + + if currentDbConnErr.sinceMilli >= now { + // Already updated with a more recent error, ignore this one. + return + } + + message := "" + if err != nil { + message = err.Error() + } + + if currentDbConnErr.message == message { + // Error stayed the same, no update needed, keeping the old timestamp. + return + } + + if currentDbConnErr.message == "" || message == "" { + // Either first error or recovery from an error, update timestamp. + currentDbConnErr.sinceMilli = now + } + + currentDbConnErr.message = message +} + +// GetCurrentDbConnErr returns the last error message (or the empty string if not in an error state) and a timestamp in +// milliseconds of the last change from OK to error or from error to OK. +func GetCurrentDbConnErr() (string, int64) { + currentDbConnErr.mu.Lock() + defer currentDbConnErr.mu.Unlock() + + return currentDbConnErr.message, currentDbConnErr.sinceMilli +} + +// OngoingSyncStartMilli is to be updated by the main() function. +var OngoingSyncStartMilli int64 + +// LastSuccessfulSync is to be updated by the main() function. +var LastSuccessfulSync com.Atomic[SuccessfulSync] + +var boolToStr = map[bool]string{false: "0", true: "1"} +var startTime = time.Now().UnixMilli() + +// StartHeartbeat periodically writes heartbeats to Redis for being monitored by Icinga 2. +func StartHeartbeat( + ctx context.Context, client *icingaredis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat, +) { + goMetrics := NewGoMetrics() + + const interval = time.Second + + var lastErr string + var silenceUntil time.Time + + periodic.Start(ctx, interval, func(tick periodic.Tick) { + heartbeat := heartbeat.LastReceived() + responsibleTsMilli, responsible, otherResponsible := ha.State() + ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli) + sync, _ := LastSuccessfulSync.Load() + dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr() + now := time.Now() + + values := map[string]string{ + "version": internal.Version.Version, + "time": strconv.FormatInt(now.UnixMilli(), 10), + "start-time": strconv.FormatInt(startTime, 10), + "error": dbConnErr, + "error-since": strconv.FormatInt(dbConnErrSinceMilli, 10), + "performance-data": goMetrics.PerformanceData(), + "last-heartbeat-received": strconv.FormatInt(heartbeat, 10), + "ha-responsible": boolToStr[responsible], + "ha-responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), + "ha-other-responsible": boolToStr[otherResponsible], + "sync-ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), + "sync-success-finish": strconv.FormatInt(sync.FinishMilli, 10), + "sync-success-duration": strconv.FormatInt(sync.DurationMilli, 10), + } + + ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval)) + defer cancel() + + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:heartbeat", + MaxLen: 1, + Values: values, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) && !errors.Is(err, context.DeadlineExceeded) { + logw := logger.Debugw + currentErr := err.Error() + + if currentErr != lastErr || now.After(silenceUntil) { + logw = logger.Warnw + lastErr = currentErr + silenceUntil = now.Add(time.Minute) + } + + logw("Can't update own heartbeat", zap.Error(icingaredis.WrapCmdErr(cmd))) + } else { + lastErr = "" + silenceUntil = time.Time{} + } + }) +} + +type goMetrics struct { + names []string + units []string + samples []metrics.Sample +} + +func NewGoMetrics() *goMetrics { + m := &goMetrics{} + + forbiddenRe := regexp.MustCompile(`\W`) + + for _, d := range metrics.All() { + switch d.Kind { + case metrics.KindUint64, metrics.KindFloat64: + name := "go_" + strings.TrimLeft(forbiddenRe.ReplaceAllString(d.Name, "_"), "_") + + unit := "" + if strings.HasSuffix(d.Name, ":bytes") { + unit = "B" + } else if strings.HasSuffix(d.Name, ":seconds") { + unit = "s" + } else if d.Cumulative { + unit = "c" + } + + m.names = append(m.names, name) + m.units = append(m.units, unit) + m.samples = append(m.samples, metrics.Sample{Name: d.Name}) + } + } + + return m +} + +func (g *goMetrics) PerformanceData() string { + metrics.Read(g.samples) + + var buf strings.Builder + + for i, sample := range g.samples { + if i > 0 { + buf.WriteByte(' ') + } + + switch sample.Value.Kind() { + case metrics.KindUint64: + _, _ = fmt.Fprintf(&buf, "%s=%d%s", g.names[i], sample.Value.Uint64(), g.units[i]) + case metrics.KindFloat64: + _, _ = fmt.Fprintf(&buf, "%s=%f%s", g.names[i], sample.Value.Float64(), g.units[i]) + } + } + + return buf.String() +} diff --git a/pkg/icingaredis/telemetry/stats.go b/pkg/icingaredis/telemetry/stats.go new file mode 100644 index 0000000..86db0b3 --- /dev/null +++ b/pkg/icingaredis/telemetry/stats.go @@ -0,0 +1,51 @@ +package telemetry + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/periodic" + "github.com/icinga/icingadb/pkg/utils" + "go.uber.org/zap" + "strconv" + "time" +) + +var Stats struct { + // Config & co. are to be increased by the T sync once for every T object synced. + Config, State, History, Overdue, HistoryCleanup com.Counter +} + +// WriteStats periodically forwards Stats to Redis for being monitored by Icinga 2. +func WriteStats(ctx context.Context, client *icingaredis.Client, logger *logging.Logger) { + counters := map[string]*com.Counter{ + "config_sync": &Stats.Config, + "state_sync": &Stats.State, + "history_sync": &Stats.History, + "overdue_sync": &Stats.Overdue, + "history_cleanup": &Stats.HistoryCleanup, + } + + periodic.Start(ctx, time.Second, func(_ periodic.Tick) { + var data []string + for kind, counter := range counters { + if cnt := counter.Reset(); cnt > 0 { + data = append(data, kind, strconv.FormatUint(cnt, 10)) + } + } + + if data != nil { + cmd := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "icingadb:telemetry:stats", + MaxLen: 15 * 60, + Approx: true, + Values: data, + }) + if err := cmd.Err(); err != nil && !utils.IsContextCanceled(err) { + logger.Warnw("Can't update own stats", zap.Error(icingaredis.WrapCmdErr(cmd))) + } + } + }) +} diff --git a/pkg/icingaredis/utils.go b/pkg/icingaredis/utils.go new file mode 100644 index 0000000..9176dba --- /dev/null +++ b/pkg/icingaredis/utils.go @@ -0,0 +1,128 @@ +package icingaredis + +import ( + "context" + "github.com/go-redis/redis/v8" + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" +) + +// Streams represents a Redis stream key to ID mapping. +type Streams map[string]string + +// Option returns the Redis stream key to ID mapping +// as a slice of stream keys followed by their IDs +// that is compatible for the Redis STREAMS option. +func (s Streams) Option() []string { + // len*2 because we're appending the IDs later. + streams := make([]string, 0, len(s)*2) + ids := make([]string, 0, len(s)) + + for key, id := range s { + streams = append(streams, key) + ids = append(ids, id) + } + + return append(streams, ids...) +} + +// CreateEntities streams and creates entities from the +// given Redis field value pairs using the specified factory function, +// and streams them on a returned channel. +func CreateEntities(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, pairs <-chan HPair, concurrent int) (<-chan contracts.Entity, <-chan error) { + entities := make(chan contracts.Entity) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entities) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < concurrent; i++ { + g.Go(func() error { + for pair := range pairs { + var id types.Binary + + if err := id.UnmarshalText([]byte(pair.Field)); err != nil { + return errors.Wrapf(err, "can't create ID from value %#v", pair.Field) + } + + e := factoryFunc() + if err := internal.UnmarshalJSON([]byte(pair.Value), e); err != nil { + return err + } + e.SetID(id) + + select { + case entities <- e: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + }) + + return entities, com.WaitAsync(g) +} + +// SetChecksums concurrently streams from the given entities and +// sets their checksums using the specified map and +// streams the results on a returned channel. +func SetChecksums(ctx context.Context, entities <-chan contracts.Entity, checksums map[string]contracts.Entity, concurrent int) (<-chan contracts.Entity, <-chan error) { + entitiesWithChecksum := make(chan contracts.Entity) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entitiesWithChecksum) + + g, ctx := errgroup.WithContext(ctx) + + for i := 0; i < concurrent; i++ { + g.Go(func() error { + for entity := range entities { + if checksumer, ok := checksums[entity.ID().String()]; ok { + entity.(contracts.Checksumer).SetChecksum(checksumer.(contracts.Checksumer).Checksum()) + } else { + return errors.Errorf("no checksum for %#v", entity) + } + + select { + case entitiesWithChecksum <- entity: + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + } + + return g.Wait() + }) + + return entitiesWithChecksum, com.WaitAsync(g) +} + +// WrapCmdErr adds the command itself and +// the stack of the current goroutine to the command's error if any. +func WrapCmdErr(cmd redis.Cmder) error { + err := cmd.Err() + if err != nil { + err = errors.Wrapf(err, "can't perform %q", utils.Ellipsize( + redis.NewCmd(context.Background(), cmd.Args()).String(), // Omits error in opposite to cmd.String() + 100, + )) + } + + return err +} diff --git a/pkg/icingaredis/v1/icinga_status.go b/pkg/icingaredis/v1/icinga_status.go new file mode 100644 index 0000000..d94d3d6 --- /dev/null +++ b/pkg/icingaredis/v1/icinga_status.go @@ -0,0 +1,21 @@ +package v1 + +import ( + "github.com/icinga/icingadb/pkg/types" +) + +// IcingaStatus defines Icinga status information. +type IcingaStatus struct { + // Note: Icinga2Environment is not related to the environment_id used throughout Icinga DB. + Icinga2Environment string `json:"environment"` + NodeName string `json:"node_name"` + Version string `json:"version"` + ProgramStart types.UnixMilli `json:"program_start"` + EndpointId types.Binary `json:"endpoint_id"` + NotificationsEnabled types.Bool `json:"enable_notifications"` + ActiveServiceChecksEnabled types.Bool `json:"enable_service_checks"` + ActiveHostChecksEnabled types.Bool `json:"enable_host_checks"` + EventHandlersEnabled types.Bool `json:"enable_event_handlers"` + FlapDetectionEnabled types.Bool `json:"enable_flapping"` + PerformanceDataEnabled types.Bool `json:"enable_perfdata"` +} diff --git a/pkg/icingaredis/v1/stats_message.go b/pkg/icingaredis/v1/stats_message.go new file mode 100644 index 0000000..5b04629 --- /dev/null +++ b/pkg/icingaredis/v1/stats_message.go @@ -0,0 +1,51 @@ +package v1 + +import ( + "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/types" + "github.com/pkg/errors" +) + +// StatsMessage represents a message from the Redis stream icinga:stats. +type StatsMessage map[string]interface{} + +// Raw returns the key-value pairs of the message. +func (m StatsMessage) Raw() map[string]interface{} { + return m +} + +// IcingaStatus extracts Icinga status information from the message into IcingaStatus and returns it. +func (m StatsMessage) IcingaStatus() (*IcingaStatus, error) { + if s, ok := m["IcingaApplication"].(string); ok { + var envelope struct { + Status struct { + IcingaApplication struct { + IcingaStatus `json:"app"` + } `json:"icingaapplication"` + } `json:"status"` + } + + if err := internal.UnmarshalJSON([]byte(s), &envelope); err != nil { + return nil, err + } + + return &envelope.Status.IcingaApplication.IcingaStatus, nil + } + + return nil, errors.Errorf(`bad message %#v. "IcingaApplication" missing`, m) +} + +// Time extracts the timestamp of the message into types.UnixMilli and returns it. +func (m StatsMessage) Time() (*types.UnixMilli, error) { + if s, ok := m["timestamp"].(string); ok { + var t types.UnixMilli + + if err := internal.UnmarshalJSON([]byte(s), &t); err != nil { + return nil, err + } + + return &t, nil + } + + return nil, errors.Errorf(`bad message %#v. "timestamp" missing`, m) +} |