summaryrefslogtreecommitdiffstats
path: root/cmd/icingadb-migrate/cache.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/icingadb-migrate/cache.go')
-rw-r--r--cmd/icingadb-migrate/cache.go298
1 files changed, 298 insertions, 0 deletions
diff --git a/cmd/icingadb-migrate/cache.go b/cmd/icingadb-migrate/cache.go
new file mode 100644
index 0000000..d1854c9
--- /dev/null
+++ b/cmd/icingadb-migrate/cache.go
@@ -0,0 +1,298 @@
+package main
+
+import (
+ "database/sql"
+ _ "embed"
+ "github.com/jmoiron/sqlx"
+ "github.com/pkg/errors"
+ "math"
+ "strings"
+ "time"
+)
+
+//go:embed embed/event_time_cache_schema.sql
+var eventTimeCacheSchema string
+
+//go:embed embed/previous_hard_state_cache_schema.sql
+var previousHardStateCacheSchema string
+
+// buildEventTimeCache rationale:
+//
+// Icinga DB's flapping_history#id always needs start_time. flapping_end rows would need an IDO subquery for that.
+// That would make the IDO reading even slower than the Icinga DB writing.
+// Therefore: Stream IDO's icinga_flappinghistory once, compute flapping_history#start_time
+// and cache it into an SQLite database. Then steam from that database and the IDO.
+//
+// Similar for acknowledgements. (On non-recoverable errors the whole program exits.)
+func buildEventTimeCache(ht *historyType, idoColumns []string) {
+ type row = struct {
+ Id uint64
+ EventTime int64
+ EventTimeUsec uint32
+ EventIsStart uint8
+ ObjectId uint64
+ }
+
+ chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
+ var checkpoint struct {
+ Cnt int64
+ MaxId sql.NullInt64
+ }
+ cacheGet(*tx, &checkpoint, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time")
+
+ ht.bar.SetCurrent(checkpoint.Cnt * 2)
+
+ // Stream source data...
+ sliceIdoHistory(
+ ht,
+ "SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
+ // For actual migration icinga_objects will be joined anyway,
+ // so it makes no sense to take vanished objects into account.
+ " xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
+ ht.idoIdColumn+" <= :toid AND xh."+
+ ht.idoIdColumn+" > :checkpoint ORDER BY xh."+ht.idoIdColumn+" LIMIT :bulk",
+ nil, checkpoint.MaxId.Int64, // ... since we were interrupted:
+ func(idoRows []row) (checkpoint interface{}) {
+ for _, idoRow := range idoRows {
+ if idoRow.EventIsStart == 0 {
+ // Ack/flapping end event. Get the start event time:
+ var lst []struct {
+ EventTime int64
+ EventTimeUsec uint32
+ }
+ cacheSelect(
+ *tx, &lst, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?",
+ idoRow.ObjectId,
+ )
+
+ // If we have that, ...
+ if len(lst) > 0 {
+ // ... save the start event time for the actual migration:
+ cacheExec(
+ *tx,
+ "INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)",
+ idoRow.Id, lst[0].EventTime, lst[0].EventTimeUsec,
+ )
+
+ // This previously queried info isn't needed anymore.
+ cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
+ }
+ } else {
+ // Ack/flapping start event directly after another start event (per checkable).
+ // The old one won't have (but the new one will) an end event (which will need its time).
+ cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
+
+ // An ack/flapping start event. The following end event (per checkable) will need its time.
+ cacheExec(
+ *tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)",
+ idoRow.ObjectId, idoRow.EventTime, idoRow.EventTimeUsec,
+ )
+ }
+
+ commitPeriodically()
+ checkpoint = idoRow.Id
+ }
+
+ ht.bar.IncrBy(len(idoRows))
+ return
+ },
+ )
+
+ // This never queried info isn't needed anymore.
+ cacheExec(*tx, "DELETE FROM last_start_time")
+ })
+
+ ht.bar.SetTotal(ht.bar.Current(), true)
+}
+
+// buildPreviousHardStateCache rationale:
+//
+// Icinga DB's state_history#previous_hard_state would need a subquery.
+// That make the IDO reading even slower than the Icinga DB writing.
+// Therefore: Stream IDO's icinga_statehistory once, compute state_history#previous_hard_state
+// and cache it into an SQLite database. Then steam from that database and the IDO.
+//
+// Similar for notifications. (On non-recoverable errors the whole program exits.)
+func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
+ type row = struct {
+ Id uint64
+ ObjectId uint64
+ LastHardState uint8
+ }
+
+ chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
+ var nextIds struct {
+ Cnt int64
+ MinId sql.NullInt64
+ }
+ cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids")
+
+ var previousHardStateCnt int64
+ cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state")
+
+ var checkpoint int64
+ if nextIds.MinId.Valid { // there are next_ids
+ checkpoint = nextIds.MinId.Int64 // this kind of caches is filled descending
+ } else { // there aren't any next_ids
+ // next_ids contains the most recently processed IDs and is only empty if...
+ if previousHardStateCnt == 0 {
+ // ... we didn't actually start yet...
+ checkpoint = math.MaxInt64 // start from the largest (possible) ID
+ } else {
+ // ... or we've already finished.
+ checkpoint = 0 // make following query no-op
+ }
+ }
+
+ ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt)
+
+ // We continue where we finished before. As we build the cache in reverse chronological order:
+ // 1. If the history grows between two migration trials, we won't migrate the difference. Workarounds:
+ // a. Start migration after Icinga DB is up and running.
+ // b. Remove the cache before the next migration trial.
+ // 2. If the history gets cleaned up between two migration trials,
+ // the difference either just doesn't appear in the cache or - if already there - will be ignored later.
+
+ // Stream source data...
+ sliceIdoHistory(
+ ht,
+ "SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
+ // For actual migration icinga_objects will be joined anyway,
+ // so it makes no sense to take vanished objects into account.
+ " xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
+ ht.idoIdColumn+" <= :toid AND xh."+
+ ht.idoIdColumn+" < :checkpoint ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT :bulk",
+ nil, checkpoint, // ... since we were interrupted:
+ func(idoRows []row) (checkpoint interface{}) {
+ for _, idoRow := range idoRows {
+ var nhs []struct{ NextHardState uint8 }
+ cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
+
+ if len(nhs) < 1 { // we just started (per checkable)
+ // At the moment (we're "travelling back in time") that's the checkable's hard state:
+ cacheExec(
+ *tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
+ idoRow.ObjectId, idoRow.LastHardState,
+ )
+
+ // But for the current time point the previous hard state isn't known, yet:
+ cacheExec(
+ *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
+ idoRow.Id, idoRow.ObjectId,
+ )
+ } else if idoRow.LastHardState == nhs[0].NextHardState {
+ // The hard state didn't change yet (per checkable),
+ // so this time point also awaits the previous hard state.
+ cacheExec(
+ *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
+ idoRow.Id, idoRow.ObjectId,
+ )
+ } else { // the hard state changed (per checkable)
+ // That past hard state is now available for the processed future time points:
+ cacheExec(
+ *tx,
+ "INSERT INTO previous_hard_state(history_id, previous_hard_state) "+
+ "SELECT history_id, ? FROM next_ids WHERE object_id=?",
+ idoRow.LastHardState, idoRow.ObjectId,
+ )
+
+ // Now they have what they wanted:
+ cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
+ cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", idoRow.ObjectId)
+
+ // That's done.
+ // Now do the same thing as in the "we just started" case above, for the same reason:
+
+ cacheExec(
+ *tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
+ idoRow.ObjectId, idoRow.LastHardState,
+ )
+
+ cacheExec(
+ *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
+ idoRow.Id, idoRow.ObjectId,
+ )
+ }
+
+ commitPeriodically()
+ checkpoint = idoRow.Id
+ }
+
+ ht.bar.IncrBy(len(idoRows))
+ return
+ },
+ )
+
+ // No past hard state is available for the processed future time points, assuming pending:
+ cacheExec(
+ *tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids",
+ )
+
+ // Now they should have what they wanted:
+ cacheExec(*tx, "DELETE FROM next_hard_state")
+ cacheExec(*tx, "DELETE FROM next_ids")
+ })
+
+ ht.bar.SetTotal(ht.bar.Current(), true)
+}
+
+// chunkCacheTx rationale: during do operate on cache via *tx. After every completed operation call commitPeriodically()
+// which periodically commits *tx and starts a new tx. (That's why tx is a **, not just a *.)
+// (On non-recoverable errors the whole program exits.)
+func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) {
+ logger := log.With("backend", "cache")
+
+ tx, err := cache.Beginx()
+ if err != nil {
+ logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
+ }
+
+ const commitInterval = 5 * time.Minute
+ nextCommit := time.Now().Add(commitInterval)
+
+ do(&tx, func() { // commitPeriodically
+ if now := time.Now(); now.After(nextCommit) {
+ if err := tx.Commit(); err != nil {
+ logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
+ }
+
+ var err error
+
+ tx, err = cache.Beginx()
+ if err != nil {
+ logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
+ }
+
+ nextCommit = nextCommit.Add(commitInterval)
+ }
+ })
+
+ if err := tx.Commit(); err != nil {
+ logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
+ }
+}
+
+// cacheGet does cache.Get(dest, query, args...). (On non-recoverable errors the whole program exits.)
+func cacheGet(cache interface {
+ Get(dest interface{}, query string, args ...interface{}) error
+}, dest interface{}, query string, args ...interface{}) {
+ if err := cache.Get(dest, query, args...); err != nil {
+ log.With("backend", "cache", "query", query, "args", args).
+ Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+}
+
+// cacheSelect does cacheTx.Select(dest, query, args...). (On non-recoverable errors the whole program exits.)
+func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) {
+ if err := cacheTx.Select(dest, query, args...); err != nil {
+ log.With("backend", "cache", "query", query, "args", args).
+ Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+}
+
+// cacheExec does cacheTx.Exec(dml, args...). On non-recoverable errors the whole program exits.
+func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) {
+ if _, err := cacheTx.Exec(dml, args...); err != nil {
+ log.With("backend", "cache", "dml", dml, "args", args).Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
+ }
+}