Diffstat (limited to 'pkg/icingadb/history/sync.go')
-rw-r--r-- | pkg/icingadb/history/sync.go | 383
1 file changed, 383 insertions, 0 deletions
diff --git a/pkg/icingadb/history/sync.go b/pkg/icingadb/history/sync.go
new file mode 100644
index 0000000..dc8bc61
--- /dev/null
+++ b/pkg/icingadb/history/sync.go
@@ -0,0 +1,383 @@
+package history
+
+import (
+	"context"
+	"github.com/go-redis/redis/v8"
+	"github.com/icinga/icingadb/internal"
+	"github.com/icinga/icingadb/pkg/com"
+	"github.com/icinga/icingadb/pkg/contracts"
+	"github.com/icinga/icingadb/pkg/icingadb"
+	v1types "github.com/icinga/icingadb/pkg/icingadb/v1"
+	v1 "github.com/icinga/icingadb/pkg/icingadb/v1/history"
+	"github.com/icinga/icingadb/pkg/icingaredis"
+	"github.com/icinga/icingadb/pkg/icingaredis/telemetry"
+	"github.com/icinga/icingadb/pkg/logging"
+	"github.com/icinga/icingadb/pkg/periodic"
+	"github.com/icinga/icingadb/pkg/structify"
+	"github.com/icinga/icingadb/pkg/types"
+	"github.com/icinga/icingadb/pkg/utils"
+	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
+	"reflect"
+	"sync"
+)
+
+// Sync specifies the source and destination of a history sync.
+type Sync struct {
+	db     *icingadb.DB
+	redis  *icingaredis.Client
+	logger *logging.Logger
+}
+
+// NewSync creates a new Sync.
+func NewSync(db *icingadb.DB, redis *icingaredis.Client, logger *logging.Logger) *Sync {
+	return &Sync{
+		db:     db,
+		redis:  redis,
+		logger: logger,
+	}
+}
+
+// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
+func (s Sync) Sync(ctx context.Context) error {
+	g, ctx := errgroup.WithContext(ctx)
+
+	for key, pipeline := range syncPipelines {
+		key := key
+		pipeline := pipeline
+
+		s.logger.Debugf("Starting %s history sync", key)
+
+		// The pipeline consists of n+2 stages connected sequentially using n+1 channels of type chan redis.XMessage,
+		// where n = len(pipeline), i.e. the number of actual sync stages. So the resulting pipeline looks like this:
+		//
+		//   readFromRedis()    Reads from Redis and sends the history entries to the next stage.
+		//       ↓ ch[0]
+		//   pipeline[0]()      First actual sync stage; receives history items from the previous stage, syncs them
+		//                      and, once completed, sends them off to the next stage.
+		//       ↓ ch[1]
+		//   ...                There may be a different number of pipeline stages in between.
+		//       ↓ ch[n-1]
+		//   pipeline[n-1]()    Last actual sync stage; once it's done, it sends the history item to the final stage.
+		//       ↓ ch[n]
+		//   deleteFromRedis()  After all stages have processed a message successfully, this final stage deletes
+		//                      the history entry from the Redis stream, as it is now persisted in the database.
+		//
+		// Each history entry is processed by at most one stage at a time. Each stage must forward the entry after
+		// it has processed it, even if the stage itself does not do anything with this specific entry. It should only
+		// forward the entry after it has completed its own sync so that later stages can rely on previous stages having
+		// executed successfully.
+
+		ch := make([]chan redis.XMessage, len(pipeline)+1)
+		for i := range ch {
+			if i == 0 {
+				// Make the first channel buffered so that all items of one read iteration fit into the channel.
+				// This allows starting the next Redis XREAD right after the previous one has finished.
+				ch[i] = make(chan redis.XMessage, s.redis.Options.XReadCount)
+			} else {
+				ch[i] = make(chan redis.XMessage)
+			}
+		}
+
+		g.Go(func() error {
+			return s.readFromRedis(ctx, key, ch[0])
+		})
+
+		for i, stage := range pipeline {
+			i := i
+			stage := stage
+
+			g.Go(func() error {
+				return stage(ctx, s, key, ch[i], ch[i+1])
+			})
+		}
+
+		g.Go(func() error {
+			return s.deleteFromRedis(ctx, key, ch[len(pipeline)])
+		})
+	}
+
+	return g.Wait()
+}
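// Editor's aside (illustration only, not part of this patch): the stage wiring above is the
// common errgroup fan-through pattern. Below is a minimal, self-contained sketch of the same
// shape — a buffered producer channel like ch[0], one middle stage that must forward every
// item, and a sink — with ints standing in for redis.XMessage. All names are hypothetical.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	src := make(chan int, 4) // buffered like ch[0], so one read batch fits without blocking
	mid := make(chan int)

	g.Go(func() error { // producer: plays the role of readFromRedis
		defer close(src)
		for i := 0; i < 8; i++ {
			select {
			case src <- i:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	g.Go(func() error { // stage: must forward every item, like a stageFunc
		defer close(mid)
		for v := range src {
			select {
			case mid <- v * v: // "process", then forward
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	g.Go(func() error { // sink: plays the role of deleteFromRedis
		for v := range mid {
			fmt.Println(v)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("pipeline failed:", err)
	}
}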
+// readFromRedis is the first stage of the history sync pipeline. It reads the history stream from Redis
+// and feeds the history entries into the next stage.
+func (s Sync) readFromRedis(ctx context.Context, key string, output chan<- redis.XMessage) error {
+	defer close(output)
+
+	xra := &redis.XReadArgs{
+		Streams: []string{"icinga:history:stream:" + key, "0-0"},
+		Count:   int64(s.redis.Options.XReadCount),
+	}
+
+	for {
+		streams, err := s.redis.XReadUntilResult(ctx, xra)
+		if err != nil {
+			return errors.Wrap(err, "can't read history")
+		}
+
+		for _, stream := range streams {
+			for _, message := range stream.Messages {
+				xra.Streams[1] = message.ID
+
+				select {
+				case output <- message:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		}
+	}
+}
+
+// deleteFromRedis is the last stage of the history sync pipeline. It receives history entries from the second-to-last
+// pipeline stage and then deletes the stream entry from Redis, as all pipeline stages have successfully processed it.
+func (s Sync) deleteFromRedis(ctx context.Context, key string, input <-chan redis.XMessage) error {
+	var counter com.Counter
+	defer periodic.Start(ctx, s.logger.Interval(), func(_ periodic.Tick) {
+		if count := counter.Reset(); count > 0 {
+			s.logger.Infof("Synced %d %s history items", count, key)
+		}
+	}).Stop()
+
+	bulks := com.Bulk(ctx, input, s.redis.Options.HScanCount, com.NeverSplit[redis.XMessage])
+	stream := "icinga:history:stream:" + key
+	for {
+		select {
+		case bulk := <-bulks:
+			ids := make([]string, len(bulk))
+			for i := range bulk {
+				ids[i] = bulk[i].ID
+			}
+
+			cmd := s.redis.XDel(ctx, stream, ids...)
+			if _, err := cmd.Result(); err != nil {
+				return icingaredis.WrapCmdErr(cmd)
+			}
+
+			counter.Add(uint64(len(ids)))
+			telemetry.Stats.History.Add(uint64(len(ids)))
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+// stageFunc is a function type that represents a sync pipeline stage. It is called with a context (it should stop
+// once that context is canceled), the Sync instance (for access to Redis, SQL database, logging), the key (information
+// about which pipeline this function is running in, e.g. "notification"), an in channel for the stage to read history
+// events from, and an out channel to forward history entries to after processing them successfully. A stage function
+// is supposed to forward each message from in to out, even if the event is not relevant for the current stage. On
+// error conditions, the message must not be forwarded to the next stage, so that the event is not deleted from Redis
+// and can be processed at a later time.
+type stageFunc func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error
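// Editor's aside (illustration only, not part of this patch): the smallest stageFunc that
// honors the contract described above is a pure pass-through. It forwards every message,
// closes its out channel when the upstream closes, and stops on cancellation. A hypothetical
// sketch, assuming this package's context:

func noopStage(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
	defer close(out) // lets the next stage terminate once this one is done
	for {
		select {
		case m, ok := <-in:
			if !ok {
				return nil // upstream closed: shut down cleanly
			}
			// A real stage would sync m to the database here and forward it
			// only afterwards, so the Redis deletion in the final stage stays safe.
			select {
			case out <- m:
			case <-ctx.Done():
				return ctx.Err()
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}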
+// writeOneEntityStage creates a stageFunc from a pointer to a struct implementing the v1.UpserterEntity interface.
+// For each history event it receives, it parses that event into a new instance of that entity type and writes it to
+// the database. It writes exactly one entity to the database for each history event.
+func writeOneEntityStage(structPtr interface{}) stageFunc {
+	structifier := structify.MakeMapStructifier(reflect.TypeOf(structPtr).Elem(), "json")
+
+	return writeMultiEntityStage(func(entry redis.XMessage) ([]v1.UpserterEntity, error) {
+		ptr, err := structifier(entry.Values)
+		if err != nil {
+			return nil, errors.Wrapf(err, "can't structify values %#v", entry.Values)
+		}
+		return []v1.UpserterEntity{ptr.(v1.UpserterEntity)}, nil
+	})
+}
+
+// writeMultiEntityStage creates a stageFunc from a function that takes a history event as an input and returns a
+// (potentially empty) slice of v1.UpserterEntity instances that it then inserts into the database.
+func writeMultiEntityStage(entryToEntities func(entry redis.XMessage) ([]v1.UpserterEntity, error)) stageFunc {
+	return func(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
+		type State struct {
+			Message redis.XMessage // Original event from Redis.
+			Pending int            // Number of pending entities. When reaching 0, the message is forwarded to out.
+		}
+
+		bufSize := s.db.Options.MaxPlaceholdersPerStatement
+		insert := make(chan contracts.Entity, bufSize) // Events sent to the database for insertion.
+		inserted := make(chan contracts.Entity)        // Events returned by the database after successful insertion.
+		skipped := make(chan redis.XMessage)           // Events skipping insert/inserted (no entities generated).
+		state := make(map[contracts.Entity]*State)     // Shared state between all entities created by one event.
+		var stateMu sync.Mutex                         // Synchronizes concurrent access to state.
+
+		g, ctx := errgroup.WithContext(ctx)
+
+		g.Go(func() error {
+			defer close(insert)
+			defer close(skipped)
+
+			for {
+				select {
+				case e, ok := <-in:
+					if !ok {
+						return nil
+					}
+
+					entities, err := entryToEntities(e)
+					if err != nil {
+						return err
+					}
+
+					if len(entities) == 0 {
+						skipped <- e
+					} else {
+						st := &State{
+							Message: e,
+							Pending: len(entities),
+						}
+
+						stateMu.Lock()
+						for _, entity := range entities {
+							state[entity] = st
+						}
+						stateMu.Unlock()
+
+						for _, entity := range entities {
+							select {
+							case insert <- entity:
+							case <-ctx.Done():
+								return ctx.Err()
+							}
+						}
+					}
+
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		})
+
+		g.Go(func() error {
+			defer close(inserted)
+
+			return s.db.UpsertStreamed(ctx, insert, icingadb.OnSuccessSendTo[contracts.Entity](inserted))
+		})
+
+		g.Go(func() error {
+			defer close(out)
+
+			for {
+				select {
+				case e, ok := <-inserted:
+					if !ok {
+						return nil
+					}
+
+					stateMu.Lock()
+					st := state[e]
+					delete(state, e)
+					stateMu.Unlock()
+
+					st.Pending--
+					if st.Pending == 0 {
+						select {
+						case out <- st.Message:
+						case <-ctx.Done():
+							return ctx.Err()
+						}
+					}
+
+				case m, ok := <-skipped:
+					if !ok {
+						return nil
+					}
+
+					select {
+					case out <- m:
+					case <-ctx.Done():
+						return ctx.Err()
+					}
+
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		})
+
+		return g.Wait()
+	}
}
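// Editor's aside (illustration only, not part of this patch): the structify pattern used by
// writeOneEntityStage, isolated into a hypothetical decoder. MakeMapStructifier builds a
// closure that maps a stream entry's Values onto a fresh instance of the given struct type,
// matching fields by the named tag. exampleRow and decodeExample are invented for illustration;
// only calls already shown in this file are used.

type exampleRow struct {
	Id   types.Binary `json:"id"`
	Name types.String `json:"name"`
}

func decodeExample(entry redis.XMessage) (*exampleRow, error) {
	structifier := structify.MakeMapStructifier(reflect.TypeOf((*exampleRow)(nil)).Elem(), "json")

	ptr, err := structifier(entry.Values) // fills a new exampleRow from the stream entry's fields
	if err != nil {
		return nil, err
	}
	return ptr.(*exampleRow), nil
}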
+// userNotificationStage is a specialized stageFunc that populates the user_notification_history table. It is executed
+// on the notification history stream and uses the users_notified_ids attribute to create an entry in the
+// user_notification_history relation table for each user ID.
+func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
+	type NotificationHistory struct {
+		Id            types.Binary `structify:"id"`
+		EnvironmentId types.Binary `structify:"environment_id"`
+		EndpointId    types.Binary `structify:"endpoint_id"`
+		UserIds       types.String `structify:"users_notified_ids"`
+	}
+
+	structifier := structify.MakeMapStructifier(reflect.TypeOf((*NotificationHistory)(nil)).Elem(), "structify")
+
+	return writeMultiEntityStage(func(entry redis.XMessage) ([]v1.UpserterEntity, error) {
+		rawNotificationHistory, err := structifier(entry.Values)
+		if err != nil {
+			return nil, err
+		}
+		notificationHistory := rawNotificationHistory.(*NotificationHistory)
+
+		if !notificationHistory.UserIds.Valid {
+			return nil, nil
+		}
+
+		var users []types.Binary
+		err = internal.UnmarshalJSON([]byte(notificationHistory.UserIds.String), &users)
+		if err != nil {
+			return nil, err
+		}
+
+		var userNotifications []v1.UpserterEntity
+
+		for _, user := range users {
+			userNotifications = append(userNotifications, &v1.UserNotificationHistory{
+				EntityWithoutChecksum: v1types.EntityWithoutChecksum{
+					IdMeta: v1types.IdMeta{
+						Id: utils.Checksum(append(append([]byte(nil), notificationHistory.Id...), user...)),
+					},
+				},
+				EnvironmentMeta: v1types.EnvironmentMeta{
+					EnvironmentId: notificationHistory.EnvironmentId,
+				},
+				NotificationHistoryId: notificationHistory.Id,
+				UserId:                user,
+			})
+		}
+
+		return userNotifications, nil
+	})(ctx, s, key, in, out)
+}
+
+var syncPipelines = map[string][]stageFunc{
+	"notification": {
+		writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
+		userNotificationStage,                               // user_notification_history (depends on notification_history)
+		writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
+	},
+	"state": {
+		writeOneEntityStage((*v1.StateHistory)(nil)),   // state_history
+		writeOneEntityStage((*v1.HistoryState)(nil)),   // history (depends on state_history)
+		writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
+	},
+	"downtime": {
+		writeOneEntityStage((*v1.DowntimeHistory)(nil)),    // downtime_history
+		writeOneEntityStage((*v1.HistoryDowntime)(nil)),    // history (depends on downtime_history)
+		writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime
+	},
+	"comment": {
+		writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
+		writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
+	},
+	"flapping": {
+		writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history
+		writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history)
+	},
+	"acknowledgement": {
+		writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
+		writeOneEntityStage((*v1.HistoryAck)(nil)),             // history (depends on acknowledgement_history)
+	},
+}
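// Editor's aside (illustration only, not part of this patch): a hypothetical call site tying it
// together. Construction of the database client, Redis client, and logger is elided here;
// NewSync and Sync are the functions defined in the patch above.

func runHistorySync(ctx context.Context, db *icingadb.DB, rc *icingaredis.Client, logger *logging.Logger) error {
	// Runs all history pipelines until ctx is canceled or any stage returns an error.
	return NewSync(db, rc, logger).Sync(ctx)
}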