summaryrefslogtreecommitdiffstats
path: root/pkg/icingadb/delta.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/icingadb/delta.go')
-rw-r--r--pkg/icingadb/delta.go124
1 files changed, 124 insertions, 0 deletions
diff --git a/pkg/icingadb/delta.go b/pkg/icingadb/delta.go
new file mode 100644
index 0000000..4f6d098
--- /dev/null
+++ b/pkg/icingadb/delta.go
@@ -0,0 +1,124 @@
+package icingadb
+
+import (
+ "context"
+ "fmt"
+ "github.com/icinga/icingadb/pkg/common"
+ "github.com/icinga/icingadb/pkg/contracts"
+ "github.com/icinga/icingadb/pkg/logging"
+ "github.com/icinga/icingadb/pkg/utils"
+ "go.uber.org/zap"
+ "time"
+)
+
+// Delta calculates the delta of actual and desired entities, and stores which entities need to be created, updated, and deleted.
+type Delta struct {
+ Create EntitiesById
+ Update EntitiesById
+ Delete EntitiesById
+ Subject *common.SyncSubject
+ done chan error
+ logger *logging.Logger
+}
+
+// NewDelta creates a new Delta and starts calculating it. The caller must ensure
+// that no duplicate entities are sent to the same stream.
+func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) *Delta {
+ delta := &Delta{
+ Subject: subject,
+ done: make(chan error, 1),
+ logger: logger,
+ }
+
+ go delta.run(ctx, actual, desired)
+
+ return delta
+}
+
+// Wait waits for the delta calculation to complete and returns an error, if any.
+func (delta *Delta) Wait() error {
+ return <-delta.done
+}
+
+func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
+ defer close(delta.done)
+
+ start := time.Now()
+ var endActual, endDesired time.Time
+ var numActual, numDesired uint64
+
+ actual := EntitiesById{} // only read from actualCh (so far)
+ desired := EntitiesById{} // only read from desiredCh (so far)
+
+ var update EntitiesById
+ if delta.Subject.WithChecksum() {
+ update = EntitiesById{} // read from actualCh and desiredCh with mismatching checksums
+ }
+
+ for actualCh != nil || desiredCh != nil {
+ select {
+ case actualValue, ok := <-actualCh:
+ if !ok {
+ endActual = time.Now()
+ actualCh = nil // Done reading all actual entities, disable this case.
+ break
+ }
+ numActual++
+
+ id := actualValue.ID().String()
+ if desiredValue, ok := desired[id]; ok {
+ delete(desired, id)
+ if update != nil && !checksumsMatch(actualValue, desiredValue) {
+ update[id] = desiredValue
+ }
+ } else {
+ actual[id] = actualValue
+ }
+
+ case desiredValue, ok := <-desiredCh:
+ if !ok {
+ endDesired = time.Now()
+ desiredCh = nil // Done reading all desired entities, disable this case.
+ break
+ }
+ numDesired++
+
+ id := desiredValue.ID().String()
+ if actualValue, ok := actual[id]; ok {
+ delete(actual, id)
+ if update != nil && !checksumsMatch(actualValue, desiredValue) {
+ update[id] = desiredValue
+ }
+ } else {
+ desired[id] = desiredValue
+ }
+
+ case <-ctx.Done():
+ delta.done <- ctx.Err()
+ return
+ }
+ }
+
+ delta.Create = desired
+ delta.Update = update
+ delta.Delete = actual
+
+ delta.logger.Debugw(fmt.Sprintf("Finished %s delta", utils.Name(delta.Subject.Entity())),
+ zap.String("subject", utils.Name(delta.Subject.Entity())),
+ zap.Duration("time_total", time.Since(start)),
+ zap.Duration("time_actual", endActual.Sub(start)),
+ zap.Duration("time_desired", endDesired.Sub(start)),
+ zap.Uint64("num_actual", numActual),
+ zap.Uint64("num_desired", numDesired),
+ zap.Int("create", len(delta.Create)),
+ zap.Int("update", len(delta.Update)),
+ zap.Int("delete", len(delta.Delete)))
+}
+
+// checksumsMatch returns whether the checksums of two entities are the same.
+// Both entities must implement contracts.Checksumer.
+func checksumsMatch(a, b contracts.Entity) bool {
+ c1 := a.(contracts.Checksumer).Checksum()
+ c2 := b.(contracts.Checksumer).Checksum()
+ return c1.Equal(c2)
+}