1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
|
package main
import (
"database/sql"
_ "embed"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"math"
"strings"
"time"
)
//go:embed embed/event_time_cache_schema.sql
var eventTimeCacheSchema string
//go:embed embed/previous_hard_state_cache_schema.sql
var previousHardStateCacheSchema string
// buildEventTimeCache rationale:
//
// Icinga DB's flapping_history#id always needs start_time. flapping_end rows would need an IDO subquery for that.
// That would make the IDO reading even slower than the Icinga DB writing.
// Therefore: Stream IDO's icinga_flappinghistory once, compute flapping_history#start_time
// and cache it into an SQLite database. Then steam from that database and the IDO.
//
// Similar for acknowledgements. (On non-recoverable errors the whole program exits.)
func buildEventTimeCache(ht *historyType, idoColumns []string) {
type row = struct {
Id uint64
EventTime int64
EventTimeUsec uint32
EventIsStart uint8
ObjectId uint64
}
chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
var checkpoint struct {
Cnt int64
MaxId sql.NullInt64
}
cacheGet(*tx, &checkpoint, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time")
ht.bar.SetCurrent(checkpoint.Cnt * 2)
// Stream source data...
sliceIdoHistory(
ht,
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
// For actual migration icinga_objects will be joined anyway,
// so it makes no sense to take vanished objects into account.
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
ht.idoIdColumn+" <= :toid AND xh."+
ht.idoIdColumn+" > :checkpoint ORDER BY xh."+ht.idoIdColumn+" LIMIT :bulk",
nil, checkpoint.MaxId.Int64, // ... since we were interrupted:
func(idoRows []row) (checkpoint interface{}) {
for _, idoRow := range idoRows {
if idoRow.EventIsStart == 0 {
// Ack/flapping end event. Get the start event time:
var lst []struct {
EventTime int64
EventTimeUsec uint32
}
cacheSelect(
*tx, &lst, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?",
idoRow.ObjectId,
)
// If we have that, ...
if len(lst) > 0 {
// ... save the start event time for the actual migration:
cacheExec(
*tx,
"INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)",
idoRow.Id, lst[0].EventTime, lst[0].EventTimeUsec,
)
// This previously queried info isn't needed anymore.
cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
}
} else {
// Ack/flapping start event directly after another start event (per checkable).
// The old one won't have (but the new one will) an end event (which will need its time).
cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
// An ack/flapping start event. The following end event (per checkable) will need its time.
cacheExec(
*tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)",
idoRow.ObjectId, idoRow.EventTime, idoRow.EventTimeUsec,
)
}
commitPeriodically()
checkpoint = idoRow.Id
}
ht.bar.IncrBy(len(idoRows))
return
},
)
// This never queried info isn't needed anymore.
cacheExec(*tx, "DELETE FROM last_start_time")
})
ht.bar.SetTotal(ht.bar.Current(), true)
}
// buildPreviousHardStateCache rationale:
//
// Icinga DB's state_history#previous_hard_state would need a subquery.
// That make the IDO reading even slower than the Icinga DB writing.
// Therefore: Stream IDO's icinga_statehistory once, compute state_history#previous_hard_state
// and cache it into an SQLite database. Then steam from that database and the IDO.
//
// Similar for notifications. (On non-recoverable errors the whole program exits.)
func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
type row = struct {
Id uint64
ObjectId uint64
LastHardState uint8
}
chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
var nextIds struct {
Cnt int64
MinId sql.NullInt64
}
cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids")
var previousHardStateCnt int64
cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state")
var checkpoint int64
if nextIds.MinId.Valid { // there are next_ids
checkpoint = nextIds.MinId.Int64 // this kind of caches is filled descending
} else { // there aren't any next_ids
// next_ids contains the most recently processed IDs and is only empty if...
if previousHardStateCnt == 0 {
// ... we didn't actually start yet...
checkpoint = math.MaxInt64 // start from the largest (possible) ID
} else {
// ... or we've already finished.
checkpoint = 0 // make following query no-op
}
}
ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt)
// We continue where we finished before. As we build the cache in reverse chronological order:
// 1. If the history grows between two migration trials, we won't migrate the difference. Workarounds:
// a. Start migration after Icinga DB is up and running.
// b. Remove the cache before the next migration trial.
// 2. If the history gets cleaned up between two migration trials,
// the difference either just doesn't appear in the cache or - if already there - will be ignored later.
// Stream source data...
sliceIdoHistory(
ht,
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
// For actual migration icinga_objects will be joined anyway,
// so it makes no sense to take vanished objects into account.
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
ht.idoIdColumn+" <= :toid AND xh."+
ht.idoIdColumn+" < :checkpoint ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT :bulk",
nil, checkpoint, // ... since we were interrupted:
func(idoRows []row) (checkpoint interface{}) {
for _, idoRow := range idoRows {
var nhs []struct{ NextHardState uint8 }
cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
if len(nhs) < 1 { // we just started (per checkable)
// At the moment (we're "travelling back in time") that's the checkable's hard state:
cacheExec(
*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
idoRow.ObjectId, idoRow.LastHardState,
)
// But for the current time point the previous hard state isn't known, yet:
cacheExec(
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
idoRow.Id, idoRow.ObjectId,
)
} else if idoRow.LastHardState == nhs[0].NextHardState {
// The hard state didn't change yet (per checkable),
// so this time point also awaits the previous hard state.
cacheExec(
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
idoRow.Id, idoRow.ObjectId,
)
} else { // the hard state changed (per checkable)
// That past hard state is now available for the processed future time points:
cacheExec(
*tx,
"INSERT INTO previous_hard_state(history_id, previous_hard_state) "+
"SELECT history_id, ? FROM next_ids WHERE object_id=?",
idoRow.LastHardState, idoRow.ObjectId,
)
// Now they have what they wanted:
cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", idoRow.ObjectId)
// That's done.
// Now do the same thing as in the "we just started" case above, for the same reason:
cacheExec(
*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
idoRow.ObjectId, idoRow.LastHardState,
)
cacheExec(
*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
idoRow.Id, idoRow.ObjectId,
)
}
commitPeriodically()
checkpoint = idoRow.Id
}
ht.bar.IncrBy(len(idoRows))
return
},
)
// No past hard state is available for the processed future time points, assuming pending:
cacheExec(
*tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids",
)
// Now they should have what they wanted:
cacheExec(*tx, "DELETE FROM next_hard_state")
cacheExec(*tx, "DELETE FROM next_ids")
})
ht.bar.SetTotal(ht.bar.Current(), true)
}
// chunkCacheTx rationale: during do operate on cache via *tx. After every completed operation call commitPeriodically()
// which periodically commits *tx and starts a new tx. (That's why tx is a **, not just a *.)
// (On non-recoverable errors the whole program exits.)
func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) {
logger := log.With("backend", "cache")
tx, err := cache.Beginx()
if err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
}
const commitInterval = 5 * time.Minute
nextCommit := time.Now().Add(commitInterval)
do(&tx, func() { // commitPeriodically
if now := time.Now(); now.After(nextCommit) {
if err := tx.Commit(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
}
var err error
tx, err = cache.Beginx()
if err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
}
nextCommit = nextCommit.Add(commitInterval)
}
})
if err := tx.Commit(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
}
}
// cacheGet does cache.Get(dest, query, args...). (On non-recoverable errors the whole program exits.)
func cacheGet(cache interface {
Get(dest interface{}, query string, args ...interface{}) error
}, dest interface{}, query string, args ...interface{}) {
if err := cache.Get(dest, query, args...); err != nil {
log.With("backend", "cache", "query", query, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}
// cacheSelect does cacheTx.Select(dest, query, args...). (On non-recoverable errors the whole program exits.)
func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) {
if err := cacheTx.Select(dest, query, args...); err != nil {
log.With("backend", "cache", "query", query, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}
// cacheExec does cacheTx.Exec(dml, args...). On non-recoverable errors the whole program exits.
func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) {
if _, err := cacheTx.Exec(dml, args...); err != nil {
log.With("backend", "cache", "dml", dml, "args", args).Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}
|