diff options
Diffstat (limited to 'pkg/icingadb/sync.go')
-rw-r--r-- | pkg/icingadb/sync.go | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go new file mode 100644 index 0000000..790f11e --- /dev/null +++ b/pkg/icingadb/sync.go @@ -0,0 +1,221 @@ +package icingadb + +import ( + "context" + "fmt" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/common" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingaredis" + "github.com/icinga/icingadb/pkg/icingaredis/telemetry" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "runtime" + "time" +) + +// Sync implements a rendezvous point for Icinga DB and Redis to synchronize their entities. +type Sync struct { + db *DB + redis *icingaredis.Client + logger *logging.Logger +} + +// NewSync returns a new Sync. +func NewSync(db *DB, redis *icingaredis.Client, logger *logging.Logger) *Sync { + return &Sync{ + db: db, + redis: redis, + logger: logger, + } +} + +// SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the given +// sync subject using the Sync function. +func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, dump *DumpSignals) error { + typeName := utils.Name(subject.Entity()) + key := "icinga:" + utils.Key(typeName, ':') + + startTime := time.Now() + logTicker := time.NewTicker(s.logger.Interval()) + defer logTicker.Stop() + loggedWaiting := false + + for { + select { + case <-logTicker.C: + s.logger.Infow("Waiting for dump done signal", + zap.String("type", typeName), + zap.String("key", key), + zap.Duration("duration", time.Since(startTime))) + loggedWaiting = true + case <-dump.Done(key): + logFn := s.logger.Debugw + if loggedWaiting { + logFn = s.logger.Infow + } + logFn("Starting sync", + zap.String("type", typeName), + zap.String("key", key), + zap.Duration("waited", time.Since(startTime))) + return s.Sync(ctx, subject) + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject. +// This function does not respect dump signals. For this, use SyncAfterDump. +func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { + g, ctx := errgroup.WithContext(ctx) + + desired, redisErrs := s.redis.YieldAll(ctx, subject) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, redisErrs) + + e, ok := v1.EnvironmentFromContext(ctx) + if !ok { + return errors.New("can't get environment from context") + } + + actual, dbErrs := s.db.YieldAll( + ctx, subject.FactoryForDelta(), + s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()), e.Meta(), + ) + // Let errors from DB cancel our group. + com.ErrgroupReceive(g, dbErrs) + + g.Go(func() error { + return s.ApplyDelta(ctx, NewDelta(ctx, actual, desired, subject, s.logger)) + }) + + return g.Wait() +} + +// ApplyDelta applies all changes from Delta to the database. +func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { + if err := delta.Wait(); err != nil { + return errors.Wrap(err, "can't calculate delta") + } + + g, ctx := errgroup.WithContext(ctx) + stat := getCounterForEntity(delta.Subject.Entity()) + + // Create + if len(delta.Create) > 0 { + s.logger.Infof("Inserting %d items of type %s", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + var entities <-chan contracts.Entity + if delta.Subject.WithChecksum() { + pairs, errs := s.redis.HMYield( + ctx, + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), + delta.Create.Keys()...) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, errs) + + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel our group. + com.ErrgroupReceive(g, errs) + entities, errs = icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Create, runtime.NumCPU()) + // Let errors from SetChecksums cancel our group. + com.ErrgroupReceive(g, errs) + } else { + entities = delta.Create.Entities(ctx) + } + + g.Go(func() error { + return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) + }) + } + + // Update + if len(delta.Update) > 0 { + s.logger.Infof("Updating %d items of type %s", len(delta.Update), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + pairs, errs := s.redis.HMYield( + ctx, + fmt.Sprintf("icinga:%s", utils.Key(utils.Name(delta.Subject.Entity()), ':')), + delta.Update.Keys()...) + // Let errors from Redis cancel our group. + com.ErrgroupReceive(g, errs) + + entitiesWithoutChecksum, errs := icingaredis.CreateEntities(ctx, delta.Subject.Factory(), pairs, runtime.NumCPU()) + // Let errors from CreateEntities cancel our group. + com.ErrgroupReceive(g, errs) + entities, errs := icingaredis.SetChecksums(ctx, entitiesWithoutChecksum, delta.Update, runtime.NumCPU()) + // Let errors from SetChecksums cancel our group. + com.ErrgroupReceive(g, errs) + + g.Go(func() error { + // Using upsert here on purpose as this is the fastest way to do bulk updates. + // However, there is a risk that errors in the sync implementation could silently insert new rows. + return s.db.UpsertStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) + }) + } + + // Delete + if len(delta.Delete) > 0 { + s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + g.Go(func() error { + return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat)) + }) + } + + return g.Wait() +} + +// SyncCustomvars synchronizes customvar and customvar_flat. +func (s Sync) SyncCustomvars(ctx context.Context) error { + e, ok := v1.EnvironmentFromContext(ctx) + if !ok { + return errors.New("can't get environment from context") + } + + g, ctx := errgroup.WithContext(ctx) + + cv := common.NewSyncSubject(v1.NewCustomvar) + + cvs, errs := s.redis.YieldAll(ctx, cv) + com.ErrgroupReceive(g, errs) + + desiredCvs, desiredFlatCvs, errs := v1.ExpandCustomvars(ctx, cvs) + com.ErrgroupReceive(g, errs) + + actualCvs, errs := s.db.YieldAll( + ctx, cv.FactoryForDelta(), + s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()), e.Meta(), + ) + com.ErrgroupReceive(g, errs) + + g.Go(func() error { + return s.ApplyDelta(ctx, NewDelta(ctx, actualCvs, desiredCvs, cv, s.logger)) + }) + + flatCv := common.NewSyncSubject(v1.NewCustomvarFlat) + + actualFlatCvs, errs := s.db.YieldAll( + ctx, flatCv.FactoryForDelta(), + s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()), e.Meta(), + ) + com.ErrgroupReceive(g, errs) + + g.Go(func() error { + return s.ApplyDelta(ctx, NewDelta(ctx, actualFlatCvs, desiredFlatCvs, flatCv, s.logger)) + }) + + return g.Wait() +} + +// getCounterForEntity returns the appropriate counter (config/state) from telemetry.Stats for e. +func getCounterForEntity(e contracts.Entity) *com.Counter { + switch e.(type) { + case *v1.HostState, *v1.ServiceState: + return &telemetry.Stats.State + default: + return &telemetry.Stats.Config + } +} |