summaryrefslogtreecommitdiffstats
path: root/pkg/icingadb/delta.go
blob: 4f6d0989412bf9898c6a9d20e3c16517cce80e5d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package icingadb

import (
	"context"
	"fmt"
	"github.com/icinga/icingadb/pkg/common"
	"github.com/icinga/icingadb/pkg/contracts"
	"github.com/icinga/icingadb/pkg/logging"
	"github.com/icinga/icingadb/pkg/utils"
	"go.uber.org/zap"
	"time"
)

// Delta calculates the delta of actual and desired entities, and stores which entities need to be created, updated, and deleted.
type Delta struct {
	Create  EntitiesById
	Update  EntitiesById
	Delete  EntitiesById
	Subject *common.SyncSubject
	done    chan error
	logger  *logging.Logger
}

// NewDelta creates a new Delta and starts calculating it. The caller must ensure
// that no duplicate entities are sent to the same stream.
func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) *Delta {
	delta := &Delta{
		Subject: subject,
		done:    make(chan error, 1),
		logger:  logger,
	}

	go delta.run(ctx, actual, desired)

	return delta
}

// Wait waits for the delta calculation to complete and returns an error, if any.
func (delta *Delta) Wait() error {
	return <-delta.done
}

func (delta *Delta) run(ctx context.Context, actualCh, desiredCh <-chan contracts.Entity) {
	defer close(delta.done)

	start := time.Now()
	var endActual, endDesired time.Time
	var numActual, numDesired uint64

	actual := EntitiesById{}  // only read from actualCh (so far)
	desired := EntitiesById{} // only read from desiredCh (so far)

	var update EntitiesById
	if delta.Subject.WithChecksum() {
		update = EntitiesById{} // read from actualCh and desiredCh with mismatching checksums
	}

	for actualCh != nil || desiredCh != nil {
		select {
		case actualValue, ok := <-actualCh:
			if !ok {
				endActual = time.Now()
				actualCh = nil // Done reading all actual entities, disable this case.
				break
			}
			numActual++

			id := actualValue.ID().String()
			if desiredValue, ok := desired[id]; ok {
				delete(desired, id)
				if update != nil && !checksumsMatch(actualValue, desiredValue) {
					update[id] = desiredValue
				}
			} else {
				actual[id] = actualValue
			}

		case desiredValue, ok := <-desiredCh:
			if !ok {
				endDesired = time.Now()
				desiredCh = nil // Done reading all desired entities, disable this case.
				break
			}
			numDesired++

			id := desiredValue.ID().String()
			if actualValue, ok := actual[id]; ok {
				delete(actual, id)
				if update != nil && !checksumsMatch(actualValue, desiredValue) {
					update[id] = desiredValue
				}
			} else {
				desired[id] = desiredValue
			}

		case <-ctx.Done():
			delta.done <- ctx.Err()
			return
		}
	}

	delta.Create = desired
	delta.Update = update
	delta.Delete = actual

	delta.logger.Debugw(fmt.Sprintf("Finished %s delta", utils.Name(delta.Subject.Entity())),
		zap.String("subject", utils.Name(delta.Subject.Entity())),
		zap.Duration("time_total", time.Since(start)),
		zap.Duration("time_actual", endActual.Sub(start)),
		zap.Duration("time_desired", endDesired.Sub(start)),
		zap.Uint64("num_actual", numActual),
		zap.Uint64("num_desired", numDesired),
		zap.Int("create", len(delta.Create)),
		zap.Int("update", len(delta.Update)),
		zap.Int("delete", len(delta.Delete)))
}

// checksumsMatch returns whether the checksums of two entities are the same.
// Both entities must implement contracts.Checksumer.
func checksumsMatch(a, b contracts.Entity) bool {
	c1 := a.(contracts.Checksumer).Checksum()
	c2 := b.(contracts.Checksumer).Checksum()
	return c1.Equal(c2)
}