1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
package icingadb
import (
"context"
"fmt"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"go.uber.org/zap"
"time"
)
// Delta calculates the delta of actual and desired entities, and stores which entities need to be created, updated, and deleted.
// The calculation is started by NewDelta and runs in its own goroutine. Create, Update and Delete are only
// assigned after both input streams have been read to the end, so call Wait before reading them.
type Delta struct {
// Create holds the entities that were received on the desired stream only.
Create EntitiesById
// Update holds the desired version of every entity that was received on both streams with differing checksums.
// It stays nil when Subject.WithChecksum() reports false, as checksums are not compared in that case.
Update EntitiesById
// Delete holds the entities that were received on the actual stream only.
Delete EntitiesById
// Subject is the sync subject the delta is calculated for. The calculation consults it to decide
// whether checksums are compared and to name the entity type in its log message.
Subject *common.SyncSubject
// done carries the context error if the calculation was cancelled and is closed once the calculation returns.
done chan error
// logger receives the debug summary written when the calculation has finished.
logger *logging.Logger
}
// NewDelta builds a Delta for the given subject and immediately starts calculating it
// in a separate goroutine that consumes the actual and desired streams.
// Use Wait to block until that calculation has finished.
// It is the caller's responsibility that neither stream carries the same entity more than once.
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) *Delta {
	d := new(Delta)
	d.Subject = subject
	d.logger = logger
	// One slot of buffer lets the calculation report a context error without a receiver being ready.
	d.done = make(chan error, 1)

	go d.run(ctx, actual, desired)

	return d
}
// Wait blocks until the delta calculation has finished.
// It returns the context error if the calculation was cancelled and nil otherwise.
// Only the first call observes such an error; once it has been received,
// further calls read nil from the closed channel.
func (delta *Delta) Wait() error {
	err := <-delta.done

	return err
}
// run reads actualCh and desiredCh until both are closed and pairs up the received entities by ID.
// Entities seen on only one stream end up in Delete (actual) or Create (desired). If the subject
// supports checksums, entities seen on both streams with differing checksums end up in Update.
// When ctx is done before both streams are exhausted, the context error is sent to delta.done and
// Create, Update and Delete are left unassigned. delta.done is closed in either case.
func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
	defer close(delta.done)

	startedAt := time.Now()

	var (
		actualDoneAt, desiredDoneAt time.Time
		actualCount, desiredCount   uint64
	)

	// Entities that have been received on exactly one of the two streams so far.
	pendingActual := EntitiesById{}
	pendingDesired := EntitiesById{}

	// Stays nil if the subject has no checksums, which turns off update detection altogether.
	var changed EntitiesById
	if delta.Subject.WithChecksum() {
		changed = EntitiesById{}
	}

	// differs reports whether update detection is enabled and the checksums of both entities mismatch.
	differs := func(have, want contracts.Entity) bool {
		return changed != nil && !checksumsMatch(have, want)
	}

	for !(actualCh == nil && desiredCh == nil) {
		select {
		case <-ctx.Done():
			delta.done <- ctx.Err()

			return
		case entity, open := <-actualCh:
			if !open {
				// All actual entities have been read. Receiving from a nil channel
				// blocks forever, so this case is never selected again.
				actualDoneAt = time.Now()
				actualCh = nil

				continue
			}

			actualCount++

			key := entity.ID().String()
			counterpart, paired := pendingDesired[key]
			if !paired {
				// No desired entity with this ID yet, keep it until one shows up.
				pendingActual[key] = entity

				continue
			}

			delete(pendingDesired, key)
			if differs(entity, counterpart) {
				changed[key] = counterpart
			}
		case entity, open := <-desiredCh:
			if !open {
				// All desired entities have been read. Receiving from a nil channel
				// blocks forever, so this case is never selected again.
				desiredDoneAt = time.Now()
				desiredCh = nil

				continue
			}

			desiredCount++

			key := entity.ID().String()
			counterpart, paired := pendingActual[key]
			if !paired {
				// No actual entity with this ID yet, keep it until one shows up.
				pendingDesired[key] = entity

				continue
			}

			delete(pendingActual, key)
			if differs(counterpart, entity) {
				changed[key] = entity
			}
		}
	}

	// Whatever is still unpaired exists on one side only.
	delta.Create, delta.Update, delta.Delete = pendingDesired, changed, pendingActual

	delta.logger.Debugw(
		fmt.Sprintf("Finished %s delta", utils.Name(delta.Subject.Entity())),
		zap.String("subject", utils.Name(delta.Subject.Entity())),
		zap.Duration("time_total", time.Since(startedAt)),
		zap.Duration("time_actual", actualDoneAt.Sub(startedAt)),
		zap.Duration("time_desired", desiredDoneAt.Sub(startedAt)),
		zap.Uint64("num_actual", actualCount),
		zap.Uint64("num_desired", desiredCount),
		zap.Int("create", len(delta.Create)),
		zap.Int("update", len(delta.Update)),
		zap.Int("delete", len(delta.Delete)),
	)
}
// checksumsMatch reports whether a and b carry equal checksums.
// Both arguments are asserted to contracts.Checksumer without a check,
// so passing an entity that does not implement it panics.
func checksumsMatch(a, b contracts.Entity) bool {
	var (
		first  = a.(contracts.Checksumer).Checksum()
		second = b.(contracts.Checksumer).Checksum()
	)

	return first.Equal(second)
}
|