summaryrefslogtreecommitdiffstats
path: root/cmd/icingadb-migrate
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/icingadb-migrate')
-rw-r--r--cmd/icingadb-migrate/cache.go298
-rw-r--r--cmd/icingadb-migrate/convert.go865
-rw-r--r--cmd/icingadb-migrate/embed/comment_query.sql11
-rw-r--r--cmd/icingadb-migrate/embed/downtime_query.sql14
-rw-r--r--cmd/icingadb-migrate/embed/event_time_cache_schema.sql15
-rw-r--r--cmd/icingadb-migrate/embed/flapping_query.sql9
-rw-r--r--cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql9
-rw-r--r--cmd/icingadb-migrate/embed/notification_query.sql9
-rw-r--r--cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql22
-rw-r--r--cmd/icingadb-migrate/embed/state_query.sql9
-rw-r--r--cmd/icingadb-migrate/main.go488
-rw-r--r--cmd/icingadb-migrate/misc.go321
12 files changed, 2070 insertions, 0 deletions
diff --git a/cmd/icingadb-migrate/cache.go b/cmd/icingadb-migrate/cache.go
new file mode 100644
index 0000000..d1854c9
--- /dev/null
+++ b/cmd/icingadb-migrate/cache.go
@@ -0,0 +1,298 @@
+package main
+
+import (
+ "database/sql"
+ _ "embed"
+ "github.com/jmoiron/sqlx"
+ "github.com/pkg/errors"
+ "math"
+ "strings"
+ "time"
+)
+
+//go:embed embed/event_time_cache_schema.sql
+var eventTimeCacheSchema string
+
+//go:embed embed/previous_hard_state_cache_schema.sql
+var previousHardStateCacheSchema string
+
+// buildEventTimeCache rationale:
+//
+// Icinga DB's flapping_history#id always needs start_time. flapping_end rows would need an IDO subquery for that.
+// That would make the IDO reading even slower than the Icinga DB writing.
+// Therefore: Stream IDO's icinga_flappinghistory once, compute flapping_history#start_time
+// and cache it into an SQLite database. Then steam from that database and the IDO.
+//
+// Similar for acknowledgements. (On non-recoverable errors the whole program exits.)
+func buildEventTimeCache(ht *historyType, idoColumns []string) {
+ type row = struct {
+ Id uint64
+ EventTime int64
+ EventTimeUsec uint32
+ EventIsStart uint8
+ ObjectId uint64
+ }
+
+ chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
+ var checkpoint struct {
+ Cnt int64
+ MaxId sql.NullInt64
+ }
+ cacheGet(*tx, &checkpoint, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time")
+
+ ht.bar.SetCurrent(checkpoint.Cnt * 2)
+
+ // Stream source data...
+ sliceIdoHistory(
+ ht,
+ "SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
+ // For actual migration icinga_objects will be joined anyway,
+ // so it makes no sense to take vanished objects into account.
+ " xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
+ ht.idoIdColumn+" <= :toid AND xh."+
+ ht.idoIdColumn+" > :checkpoint ORDER BY xh."+ht.idoIdColumn+" LIMIT :bulk",
+ nil, checkpoint.MaxId.Int64, // ... since we were interrupted:
+ func(idoRows []row) (checkpoint interface{}) {
+ for _, idoRow := range idoRows {
+ if idoRow.EventIsStart == 0 {
+ // Ack/flapping end event. Get the start event time:
+ var lst []struct {
+ EventTime int64
+ EventTimeUsec uint32
+ }
+ cacheSelect(
+ *tx, &lst, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?",
+ idoRow.ObjectId,
+ )
+
+ // If we have that, ...
+ if len(lst) > 0 {
+ // ... save the start event time for the actual migration:
+ cacheExec(
+ *tx,
+ "INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)",
+ idoRow.Id, lst[0].EventTime, lst[0].EventTimeUsec,
+ )
+
+ // This previously queried info isn't needed anymore.
+ cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
+ }
+ } else {
+ // Ack/flapping start event directly after another start event (per checkable).
+ // The old one won't have (but the new one will) an end event (which will need its time).
+ cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId)
+
+ // An ack/flapping start event. The following end event (per checkable) will need its time.
+ cacheExec(
+ *tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)",
+ idoRow.ObjectId, idoRow.EventTime, idoRow.EventTimeUsec,
+ )
+ }
+
+ commitPeriodically()
+ checkpoint = idoRow.Id
+ }
+
+ ht.bar.IncrBy(len(idoRows))
+ return
+ },
+ )
+
+ // This never queried info isn't needed anymore.
+ cacheExec(*tx, "DELETE FROM last_start_time")
+ })
+
+ ht.bar.SetTotal(ht.bar.Current(), true)
+}
+
+// buildPreviousHardStateCache rationale:
+//
+// Icinga DB's state_history#previous_hard_state would need a subquery.
+// That make the IDO reading even slower than the Icinga DB writing.
+// Therefore: Stream IDO's icinga_statehistory once, compute state_history#previous_hard_state
+// and cache it into an SQLite database. Then steam from that database and the IDO.
+//
+// Similar for notifications. (On non-recoverable errors the whole program exits.)
+func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
+ type row = struct {
+ Id uint64
+ ObjectId uint64
+ LastHardState uint8
+ }
+
+ chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
+ var nextIds struct {
+ Cnt int64
+ MinId sql.NullInt64
+ }
+ cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids")
+
+ var previousHardStateCnt int64
+ cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state")
+
+ var checkpoint int64
+ if nextIds.MinId.Valid { // there are next_ids
+ checkpoint = nextIds.MinId.Int64 // this kind of caches is filled descending
+ } else { // there aren't any next_ids
+ // next_ids contains the most recently processed IDs and is only empty if...
+ if previousHardStateCnt == 0 {
+ // ... we didn't actually start yet...
+ checkpoint = math.MaxInt64 // start from the largest (possible) ID
+ } else {
+ // ... or we've already finished.
+ checkpoint = 0 // make following query no-op
+ }
+ }
+
+ ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt)
+
+ // We continue where we finished before. As we build the cache in reverse chronological order:
+ // 1. If the history grows between two migration trials, we won't migrate the difference. Workarounds:
+ // a. Start migration after Icinga DB is up and running.
+ // b. Remove the cache before the next migration trial.
+ // 2. If the history gets cleaned up between two migration trials,
+ // the difference either just doesn't appear in the cache or - if already there - will be ignored later.
+
+ // Stream source data...
+ sliceIdoHistory(
+ ht,
+ "SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
+ // For actual migration icinga_objects will be joined anyway,
+ // so it makes no sense to take vanished objects into account.
+ " xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
+ ht.idoIdColumn+" <= :toid AND xh."+
+ ht.idoIdColumn+" < :checkpoint ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT :bulk",
+ nil, checkpoint, // ... since we were interrupted:
+ func(idoRows []row) (checkpoint interface{}) {
+ for _, idoRow := range idoRows {
+ var nhs []struct{ NextHardState uint8 }
+ cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
+
+ if len(nhs) < 1 { // we just started (per checkable)
+ // At the moment (we're "travelling back in time") that's the checkable's hard state:
+ cacheExec(
+ *tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
+ idoRow.ObjectId, idoRow.LastHardState,
+ )
+
+ // But for the current time point the previous hard state isn't known, yet:
+ cacheExec(
+ *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
+ idoRow.Id, idoRow.ObjectId,
+ )
+ } else if idoRow.LastHardState == nhs[0].NextHardState {
+ // The hard state didn't change yet (per checkable),
+ // so this time point also awaits the previous hard state.
+ cacheExec(
+ *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
+ idoRow.Id, idoRow.ObjectId,
+ )
+ } else { // the hard state changed (per checkable)
+ // That past hard state is now available for the processed future time points:
+ cacheExec(
+ *tx,
+ "INSERT INTO previous_hard_state(history_id, previous_hard_state) "+
+ "SELECT history_id, ? FROM next_ids WHERE object_id=?",
+ idoRow.LastHardState, idoRow.ObjectId,
+ )
+
+ // Now they have what they wanted:
+ cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", idoRow.ObjectId)
+ cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", idoRow.ObjectId)
+
+ // That's done.
+ // Now do the same thing as in the "we just started" case above, for the same reason:
+
+ cacheExec(
+ *tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
+ idoRow.ObjectId, idoRow.LastHardState,
+ )
+
+ cacheExec(
+ *tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
+ idoRow.Id, idoRow.ObjectId,
+ )
+ }
+
+ commitPeriodically()
+ checkpoint = idoRow.Id
+ }
+
+ ht.bar.IncrBy(len(idoRows))
+ return
+ },
+ )
+
+ // No past hard state is available for the processed future time points, assuming pending:
+ cacheExec(
+ *tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids",
+ )
+
+ // Now they should have what they wanted:
+ cacheExec(*tx, "DELETE FROM next_hard_state")
+ cacheExec(*tx, "DELETE FROM next_ids")
+ })
+
+ ht.bar.SetTotal(ht.bar.Current(), true)
+}
+
+// chunkCacheTx rationale: during do operate on cache via *tx. After every completed operation call commitPeriodically()
+// which periodically commits *tx and starts a new tx. (That's why tx is a **, not just a *.)
+// (On non-recoverable errors the whole program exits.)
+func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) {
+ logger := log.With("backend", "cache")
+
+ tx, err := cache.Beginx()
+ if err != nil {
+ logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
+ }
+
+ const commitInterval = 5 * time.Minute
+ nextCommit := time.Now().Add(commitInterval)
+
+ do(&tx, func() { // commitPeriodically
+ if now := time.Now(); now.After(nextCommit) {
+ if err := tx.Commit(); err != nil {
+ logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
+ }
+
+ var err error
+
+ tx, err = cache.Beginx()
+ if err != nil {
+ logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
+ }
+
+ nextCommit = nextCommit.Add(commitInterval)
+ }
+ })
+
+ if err := tx.Commit(); err != nil {
+ logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
+ }
+}
+
+// cacheGet does cache.Get(dest, query, args...). (On non-recoverable errors the whole program exits.)
+func cacheGet(cache interface {
+ Get(dest interface{}, query string, args ...interface{}) error
+}, dest interface{}, query string, args ...interface{}) {
+ if err := cache.Get(dest, query, args...); err != nil {
+ log.With("backend", "cache", "query", query, "args", args).
+ Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+}
+
+// cacheSelect does cacheTx.Select(dest, query, args...). (On non-recoverable errors the whole program exits.)
+func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) {
+ if err := cacheTx.Select(dest, query, args...); err != nil {
+ log.With("backend", "cache", "query", query, "args", args).
+ Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+}
+
+// cacheExec does cacheTx.Exec(dml, args...). On non-recoverable errors the whole program exits.
+func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) {
+ if _, err := cacheTx.Exec(dml, args...); err != nil {
+ log.With("backend", "cache", "dml", dml, "args", args).Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
+ }
+}
diff --git a/cmd/icingadb-migrate/convert.go b/cmd/icingadb-migrate/convert.go
new file mode 100644
index 0000000..e14746e
--- /dev/null
+++ b/cmd/icingadb-migrate/convert.go
@@ -0,0 +1,865 @@
+package main
+
+import (
+ "database/sql"
+ _ "embed"
+ "github.com/icinga/icingadb/pkg/contracts"
+ v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
+ "github.com/icinga/icingadb/pkg/icingadb/v1/history"
+ icingadbTypes "github.com/icinga/icingadb/pkg/types"
+ "github.com/icinga/icingadb/pkg/utils"
+ "github.com/jmoiron/sqlx"
+ "github.com/pkg/errors"
+ "strconv"
+ "strings"
+ "time"
+)
+
+//go:embed embed/comment_query.sql
+var commentMigrationQuery string
+
+//go:embed embed/downtime_query.sql
+var downtimeMigrationQuery string
+
+//go:embed embed/flapping_query.sql
+var flappingMigrationQuery string
+
+//go:embed embed/notification_query.sql
+var notificationMigrationQuery string
+
+//go:embed embed/state_query.sql
+var stateMigrationQuery string
+
+type commentRow = struct {
+ CommenthistoryId uint64
+ EntryTime sql.NullInt64
+ EntryTimeUsec uint32
+ EntryType uint8
+ AuthorName string
+ CommentData string
+ IsPersistent uint8
+ ExpirationTime int64
+ DeletionTime int64
+ DeletionTimeUsec uint32
+ Name string
+ ObjecttypeId uint8
+ Name1 string
+ Name2 string
+}
+
+func convertCommentRows(
+ env string, envId icingadbTypes.Binary,
+ _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow,
+) (stages []icingaDbOutputStage, checkpoint any) {
+ var commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck []contracts.Entity
+
+ for _, row := range idoRows {
+ checkpoint = row.CommenthistoryId
+
+ if !row.EntryTime.Valid {
+ continue
+ }
+
+ typ := objectTypes[row.ObjecttypeId]
+ hostId := calcObjectId(env, row.Name1)
+ serviceId := calcServiceId(env, row.Name1, row.Name2)
+
+ switch row.EntryType {
+ case 1: // user
+ id := calcObjectId(env, row.Name)
+ entryTime := convertTime(row.EntryTime.Int64, row.EntryTimeUsec)
+ removeTime := convertTime(row.DeletionTime, row.DeletionTimeUsec)
+ expireTime := convertTime(row.ExpirationTime, 0)
+
+ commentHistory = append(commentHistory, &history.CommentHistory{
+ CommentHistoryEntity: history.CommentHistoryEntity{CommentId: id},
+ HistoryTableMeta: history.HistoryTableMeta{
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ },
+ CommentHistoryUpserter: history.CommentHistoryUpserter{
+ RemoveTime: removeTime,
+ HasBeenRemoved: icingadbTypes.Bool{Bool: !removeTime.Time().IsZero(), Valid: true},
+ },
+ EntryTime: entryTime,
+ Author: row.AuthorName,
+ Comment: row.CommentData,
+ EntryType: icingadbTypes.CommentType(row.EntryType),
+ IsPersistent: icingadbTypes.Bool{Bool: row.IsPersistent != 0, Valid: true},
+ IsSticky: icingadbTypes.Bool{Bool: false, Valid: true},
+ ExpireTime: expireTime,
+ })
+
+ h1 := &history.HistoryComment{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "comment_add", row.Name})},
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "comment_add",
+ },
+ CommentHistoryId: id,
+ EntryTime: entryTime,
+ }
+
+ h1.EventTime.History = h1
+ allHistoryComment = append(allHistoryComment, h1)
+
+ if !removeTime.Time().IsZero() { // remove
+ h2 := &history.HistoryComment{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "comment_remove", row.Name})},
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "comment_remove",
+ },
+ CommentHistoryId: id,
+ EntryTime: entryTime,
+ RemoveTime: removeTime,
+ ExpireTime: expireTime,
+ }
+
+ h2.EventTime.History = h2
+ allHistoryComment = append(allHistoryComment, h2)
+ }
+ case 4: // ack
+ name := row.Name1
+ if row.Name2 != "" {
+ name += "!" + row.Name2
+ }
+
+ setTime := convertTime(row.EntryTime.Int64, row.EntryTimeUsec)
+ setTs := float64(setTime.Time().UnixMilli())
+ clearTime := convertTime(row.DeletionTime, row.DeletionTimeUsec)
+ acknowledgementHistoryId := hashAny([]any{env, name, setTs})
+
+ acknowledgementHistory = append(acknowledgementHistory, &history.AcknowledgementHistory{
+ EntityWithoutChecksum: v1.EntityWithoutChecksum{
+ IdMeta: v1.IdMeta{Id: acknowledgementHistoryId},
+ },
+ HistoryTableMeta: history.HistoryTableMeta{
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ },
+ AckHistoryUpserter: history.AckHistoryUpserter{ClearTime: clearTime},
+ SetTime: setTime,
+ Author: icingadbTypes.String{
+ NullString: sql.NullString{
+ String: row.AuthorName,
+ Valid: true,
+ },
+ },
+ Comment: icingadbTypes.String{
+ NullString: sql.NullString{
+ String: row.CommentData,
+ Valid: true,
+ },
+ },
+ ExpireTime: convertTime(row.ExpirationTime, 0),
+ IsPersistent: icingadbTypes.Bool{
+ Bool: row.IsPersistent != 0,
+ Valid: true,
+ },
+ })
+
+ h1 := &history.HistoryAck{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{
+ Id: hashAny([]any{env, "ack_set", name, setTs}),
+ },
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "ack_set",
+ },
+ AcknowledgementHistoryId: acknowledgementHistoryId,
+ SetTime: setTime,
+ ClearTime: clearTime,
+ }
+
+ h1.EventTime.History = h1
+ allHistoryAck = append(allHistoryAck, h1)
+
+ if !clearTime.Time().IsZero() {
+ h2 := &history.HistoryAck{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{
+ Id: hashAny([]any{env, "ack_clear", name, setTs}),
+ },
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "ack_clear",
+ },
+ AcknowledgementHistoryId: acknowledgementHistoryId,
+ SetTime: setTime,
+ ClearTime: clearTime,
+ }
+
+ h2.EventTime.History = h2
+ allHistoryAck = append(allHistoryAck, h2)
+ }
+ }
+ }
+
+ stages = []icingaDbOutputStage{
+ {insert: commentHistory},
+ {insert: acknowledgementHistory},
+ {insert: allHistoryComment},
+ {insert: allHistoryAck},
+ }
+ return
+}
+
+type downtimeRow = struct {
+ DowntimehistoryId uint64
+ EntryTime int64
+ AuthorName string
+ CommentData string
+ IsFixed uint8
+ Duration int64
+ ScheduledStartTime sql.NullInt64
+ ScheduledEndTime int64
+ WasStarted uint8
+ ActualStartTime int64
+ ActualStartTimeUsec uint32
+ ActualEndTime int64
+ ActualEndTimeUsec uint32
+ WasCancelled uint8
+ TriggerTime int64
+ Name string
+ ObjecttypeId uint8
+ Name1 string
+ Name2 string
+ TriggeredBy string
+}
+
+func convertDowntimeRows(
+ env string, envId icingadbTypes.Binary,
+ _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow,
+) (stages []icingaDbOutputStage, checkpoint any) {
+ var downtimeHistory, allHistory, sla []contracts.Entity
+
+ for _, row := range idoRows {
+ checkpoint = row.DowntimehistoryId
+
+ if !row.ScheduledStartTime.Valid || row.WasStarted == 0 {
+ continue
+ }
+
+ id := calcObjectId(env, row.Name)
+ typ := objectTypes[row.ObjecttypeId]
+ hostId := calcObjectId(env, row.Name1)
+ serviceId := calcServiceId(env, row.Name1, row.Name2)
+ scheduledStart := convertTime(row.ScheduledStartTime.Int64, 0)
+ scheduledEnd := convertTime(row.ScheduledEndTime, 0)
+ triggerTime := convertTime(row.TriggerTime, 0)
+ actualStart := convertTime(row.ActualStartTime, row.ActualStartTimeUsec)
+ actualEnd := convertTime(row.ActualEndTime, row.ActualEndTimeUsec)
+ var startTime, endTime, cancelTime icingadbTypes.UnixMilli
+
+ if scheduledEnd.Time().IsZero() {
+ scheduledEnd = icingadbTypes.UnixMilli(scheduledStart.Time().Add(time.Duration(row.Duration) * time.Second))
+ }
+
+ if actualStart.Time().IsZero() {
+ startTime = scheduledStart
+ } else {
+ startTime = actualStart
+ }
+
+ if actualEnd.Time().IsZero() {
+ endTime = scheduledEnd
+ } else {
+ endTime = actualEnd
+ }
+
+ if triggerTime.Time().IsZero() {
+ triggerTime = startTime
+ }
+
+ if row.WasCancelled != 0 {
+ cancelTime = actualEnd
+ }
+
+ downtimeHistory = append(downtimeHistory, &history.DowntimeHistory{
+ DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id},
+ HistoryTableMeta: history.HistoryTableMeta{
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ },
+ DowntimeHistoryUpserter: history.DowntimeHistoryUpserter{
+ HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true},
+ CancelTime: cancelTime,
+ },
+ TriggeredById: calcObjectId(env, row.TriggeredBy),
+ EntryTime: convertTime(row.EntryTime, 0),
+ Author: row.AuthorName,
+ Comment: row.CommentData,
+ IsFlexible: icingadbTypes.Bool{Bool: row.IsFixed == 0, Valid: true},
+ FlexibleDuration: uint64(row.Duration) * 1000,
+ ScheduledStartTime: scheduledStart,
+ ScheduledEndTime: scheduledEnd,
+ StartTime: startTime,
+ EndTime: endTime,
+ TriggerTime: triggerTime,
+ })
+
+ h1 := &history.HistoryDowntime{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "downtime_start", row.Name})},
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "downtime_start",
+ },
+ DowntimeHistoryId: id,
+ StartTime: startTime,
+ }
+
+ h1.EventTime.History = h1
+ allHistory = append(allHistory, h1)
+
+ if !actualEnd.Time().IsZero() { // remove
+ h2 := &history.HistoryDowntime{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, "downtime_end", row.Name})},
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "downtime_end",
+ },
+ DowntimeHistoryId: id,
+ StartTime: startTime,
+ CancelTime: cancelTime,
+ EndTime: endTime,
+ HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true},
+ }
+
+ h2.EventTime.History = h2
+ allHistory = append(allHistory, h2)
+ }
+
+ s := &history.SlaHistoryDowntime{
+ DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id},
+ HistoryTableMeta: history.HistoryTableMeta{
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ },
+ DowntimeStart: startTime,
+ HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true},
+ CancelTime: cancelTime,
+ EndTime: endTime,
+ }
+
+ s.DowntimeEnd.History = s
+ sla = append(sla, s)
+ }
+
+ stages = []icingaDbOutputStage{
+ {insert: downtimeHistory},
+ {insert: allHistory},
+ {insert: sla},
+ }
+ return
+}
+
+type flappingRow = struct {
+ FlappinghistoryId uint64
+ EventTime sql.NullInt64
+ EventTimeUsec uint32
+ EventType uint16
+ PercentStateChange sql.NullFloat64
+ LowThreshold float64
+ HighThreshold float64
+ ObjecttypeId uint8
+ Name1 string
+ Name2 string
+}
+
+func convertFlappingRows(
+ env string, envId icingadbTypes.Binary,
+ selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow,
+) (stages []icingaDbOutputStage, checkpoint any) {
+ if len(idoRows) < 1 {
+ return
+ }
+
+ var cached []struct {
+ HistoryId uint64
+ EventTime int64
+ EventTimeUsec uint32
+ }
+ selectCache(
+ &cached, "SELECT history_id, event_time, event_time_usec FROM end_start_time WHERE history_id BETWEEN ? AND ?",
+ idoRows[0].FlappinghistoryId, idoRows[len(idoRows)-1].FlappinghistoryId,
+ )
+
+ // Needed for start time (see below).
+ cachedById := make(map[uint64]icingadbTypes.UnixMilli, len(cached))
+ for _, c := range cached {
+ cachedById[c.HistoryId] = convertTime(c.EventTime, c.EventTimeUsec)
+ }
+
+ var flappingHistory, flappingHistoryUpserts, allHistory []contracts.Entity
+ for _, row := range idoRows {
+ checkpoint = row.FlappinghistoryId
+
+ if !row.EventTime.Valid {
+ continue
+ }
+
+ ts := convertTime(row.EventTime.Int64, row.EventTimeUsec)
+
+ // Needed for ID (see below).
+ var start icingadbTypes.UnixMilli
+ if row.EventType == 1001 { // end
+ var ok bool
+ start, ok = cachedById[row.FlappinghistoryId]
+
+ if !ok {
+ continue
+ }
+ } else {
+ start = ts
+ }
+
+ name := row.Name1
+ if row.Name2 != "" {
+ name += "!" + row.Name2
+ }
+
+ typ := objectTypes[row.ObjecttypeId]
+ hostId := calcObjectId(env, row.Name1)
+ serviceId := calcServiceId(env, row.Name1, row.Name2)
+ startTime := float64(start.Time().UnixMilli())
+ flappingHistoryId := hashAny([]interface{}{env, name, startTime})
+
+ if row.EventType == 1001 { // end
+ // The start counterpart should already have been inserted.
+ flappingHistoryUpserts = append(flappingHistoryUpserts, &history.FlappingHistory{
+ EntityWithoutChecksum: v1.EntityWithoutChecksum{
+ IdMeta: v1.IdMeta{Id: flappingHistoryId},
+ },
+ HistoryTableMeta: history.HistoryTableMeta{
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ },
+ FlappingHistoryUpserter: history.FlappingHistoryUpserter{
+ EndTime: ts,
+ PercentStateChangeEnd: icingadbTypes.Float{NullFloat64: row.PercentStateChange},
+ FlappingThresholdLow: float32(row.LowThreshold),
+ FlappingThresholdHigh: float32(row.HighThreshold),
+ },
+ StartTime: start,
+ })
+
+ h := &history.HistoryFlapping{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{
+ Id: hashAny([]interface{}{env, "flapping_end", name, startTime}),
+ },
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "flapping_end",
+ },
+ FlappingHistoryId: flappingHistoryId,
+ StartTime: start,
+ EndTime: ts,
+ }
+
+ h.EventTime.History = h
+ allHistory = append(allHistory, h)
+ } else {
+ flappingHistory = append(flappingHistory, &history.FlappingHistory{
+ EntityWithoutChecksum: v1.EntityWithoutChecksum{
+ IdMeta: v1.IdMeta{Id: flappingHistoryId},
+ },
+ HistoryTableMeta: history.HistoryTableMeta{
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ },
+ FlappingHistoryUpserter: history.FlappingHistoryUpserter{
+ FlappingThresholdLow: float32(row.LowThreshold),
+ FlappingThresholdHigh: float32(row.HighThreshold),
+ },
+ StartTime: start,
+ PercentStateChangeStart: icingadbTypes.Float{NullFloat64: row.PercentStateChange},
+ })
+
+ h := &history.HistoryFlapping{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{
+ Id: hashAny([]interface{}{env, "flapping_start", name, startTime}),
+ },
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "flapping_start",
+ },
+ FlappingHistoryId: flappingHistoryId,
+ StartTime: start,
+ }
+
+ h.EventTime.History = h
+ allHistory = append(allHistory, h)
+ }
+ }
+
+ stages = []icingaDbOutputStage{
+ {insert: flappingHistory},
+ {upsert: flappingHistoryUpserts},
+ {insert: allHistory},
+ }
+ return
+}
+
+type notificationRow = struct {
+ NotificationId uint64
+ NotificationReason uint8
+ EndTime sql.NullInt64
+ EndTimeUsec uint32
+ State uint8
+ Output string
+ LongOutput sql.NullString
+ ContactsNotified uint16
+ ObjecttypeId uint8
+ Name1 string
+ Name2 string
+}
+
+func convertNotificationRows(
+ env string, envId icingadbTypes.Binary,
+ selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow,
+) (stages []icingaDbOutputStage, checkpoint any) {
+ if len(idoRows) < 1 {
+ return
+ }
+
+ var cached []struct {
+ HistoryId uint64
+ PreviousHardState uint8
+ }
+ selectCache(
+ &cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?",
+ idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId,
+ )
+
+ cachedById := make(map[uint64]uint8, len(cached))
+ for _, c := range cached {
+ cachedById[c.HistoryId] = c.PreviousHardState
+ }
+
+ var contacts []struct {
+ NotificationId uint64
+ Name1 string
+ }
+
+ {
+ var query = ido.Rebind(
+ "SELECT c.notification_id, o.name1 FROM icinga_contactnotifications c " +
+ "INNER JOIN icinga_objects o ON o.object_id=c.contact_object_id WHERE c.notification_id BETWEEN ? AND ?",
+ )
+
+ err := ido.Select(&contacts, query, idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId)
+ if err != nil {
+ log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+ }
+
+ contactsById := map[uint64]map[string]struct{}{}
+ for _, contact := range contacts {
+ perId, ok := contactsById[contact.NotificationId]
+ if !ok {
+ perId = map[string]struct{}{}
+ contactsById[contact.NotificationId] = perId
+ }
+
+ perId[contact.Name1] = struct{}{}
+ }
+
+ var notificationHistory, userNotificationHistory, allHistory []contracts.Entity
+ for _, row := range idoRows {
+ checkpoint = row.NotificationId
+
+ if !row.EndTime.Valid {
+ continue
+ }
+
+ previousHardState, ok := cachedById[row.NotificationId]
+ if !ok {
+ continue
+ }
+
+ // The IDO tracks only sent notifications, but not notification config objects, nor even their names.
+ // We have to improvise. By the way we avoid unwanted collisions between synced and migrated data via "ID"
+ // instead of "HOST[!SERVICE]!NOTIFICATION" (ok as this name won't be parsed, but only hashed) and between
+ // migrated data itself via the history ID as object name, i.e. one "virtual object" per sent notification.
+ name := strconv.FormatUint(row.NotificationId, 10)
+
+ nt := convertNotificationType(row.NotificationReason, row.State)
+
+ ntEnum, err := nt.Value()
+ if err != nil {
+ continue
+ }
+
+ ts := convertTime(row.EndTime.Int64, row.EndTimeUsec)
+ tsMilli := float64(ts.Time().UnixMilli())
+ notificationHistoryId := hashAny([]interface{}{env, name, ntEnum, tsMilli})
+ id := hashAny([]interface{}{env, "notification", name, ntEnum, tsMilli})
+ typ := objectTypes[row.ObjecttypeId]
+ hostId := calcObjectId(env, row.Name1)
+ serviceId := calcServiceId(env, row.Name1, row.Name2)
+
+ text := row.Output
+ if row.LongOutput.Valid {
+ text += "\n\n" + row.LongOutput.String
+ }
+
+ notificationHistory = append(notificationHistory, &history.NotificationHistory{
+ HistoryTableEntity: history.HistoryTableEntity{
+ EntityWithoutChecksum: v1.EntityWithoutChecksum{
+ IdMeta: v1.IdMeta{Id: notificationHistoryId},
+ },
+ },
+ HistoryTableMeta: history.HistoryTableMeta{
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ },
+ NotificationId: calcObjectId(env, name),
+ Type: nt,
+ SendTime: ts,
+ State: row.State,
+ PreviousHardState: previousHardState,
+ Text: icingadbTypes.String{
+ NullString: sql.NullString{
+ String: text,
+ Valid: true,
+ },
+ },
+ UsersNotified: row.ContactsNotified,
+ })
+
+ allHistory = append(allHistory, &history.HistoryNotification{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{Id: id},
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "notification",
+ },
+ NotificationHistoryId: notificationHistoryId,
+ EventTime: ts,
+ })
+
+ for contact := range contactsById[row.NotificationId] {
+ userId := calcObjectId(env, contact)
+
+ userNotificationHistory = append(userNotificationHistory, &history.UserNotificationHistory{
+ EntityWithoutChecksum: v1.EntityWithoutChecksum{
+ IdMeta: v1.IdMeta{
+ Id: utils.Checksum(append(append([]byte(nil), notificationHistoryId...), userId...)),
+ },
+ },
+ EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: envId},
+ NotificationHistoryId: notificationHistoryId,
+ UserId: userId,
+ })
+ }
+ }
+
+ stages = []icingaDbOutputStage{
+ {insert: notificationHistory},
+ {insert: userNotificationHistory},
+ {insert: allHistory},
+ }
+ return
+}
+
+// convertNotificationType maps IDO values[1] to Icinga DB ones[2].
+//
+// [1]: https://github.com/Icinga/icinga2/blob/32c7f7730db154ba0dff5856a8985d125791c/lib/db_ido/dbevents.cpp#L1507-L1524
+// [2]: https://github.com/Icinga/icingadb/blob/8f31ac143875498797725adb9bfacf3d4/pkg/types/notification_type.go#L53-L61
+func convertNotificationType(notificationReason, state uint8) icingadbTypes.NotificationType {
+ switch notificationReason {
+ case 0: // state
+ if state == 0 {
+ return 64 // recovery
+ } else {
+ return 32 // problem
+ }
+ case 1: // acknowledgement
+ return 16
+ case 2: // flapping start
+ return 128
+ case 3: // flapping end
+ return 256
+ case 5: // downtime start
+ return 1
+ case 6: // downtime end
+ return 2
+ case 7: // downtime removed
+ return 4
+ case 8: // custom
+ return 8
+ default: // bad notification type
+ return 0
+ }
+}
+
+type stateRow = struct {
+ StatehistoryId uint64
+ StateTime sql.NullInt64
+ StateTimeUsec uint32
+ State uint8
+ StateType uint8
+ CurrentCheckAttempt uint16
+ MaxCheckAttempts uint16
+ LastState uint8
+ LastHardState uint8
+ Output sql.NullString
+ LongOutput sql.NullString
+ CheckSource sql.NullString
+ ObjecttypeId uint8
+ Name1 string
+ Name2 string
+}
+
+func convertStateRows(
+ env string, envId icingadbTypes.Binary,
+ selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow,
+) (stages []icingaDbOutputStage, checkpoint any) {
+ if len(idoRows) < 1 {
+ return
+ }
+
+ var cached []struct {
+ HistoryId uint64
+ PreviousHardState uint8
+ }
+ selectCache(
+ &cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?",
+ idoRows[0].StatehistoryId, idoRows[len(idoRows)-1].StatehistoryId,
+ )
+
+ cachedById := make(map[uint64]uint8, len(cached))
+ for _, c := range cached {
+ cachedById[c.HistoryId] = c.PreviousHardState
+ }
+
+ var stateHistory, allHistory, sla []contracts.Entity
+ for _, row := range idoRows {
+ checkpoint = row.StatehistoryId
+
+ if !row.StateTime.Valid {
+ continue
+ }
+
+ previousHardState, ok := cachedById[row.StatehistoryId]
+ if !ok {
+ continue
+ }
+
+ name := strings.Join([]string{row.Name1, row.Name2}, "!")
+ ts := convertTime(row.StateTime.Int64, row.StateTimeUsec)
+ tsMilli := float64(ts.Time().UnixMilli())
+ stateHistoryId := hashAny([]interface{}{env, name, tsMilli})
+ id := hashAny([]interface{}{env, "state_change", name, tsMilli})
+ typ := objectTypes[row.ObjecttypeId]
+ hostId := calcObjectId(env, row.Name1)
+ serviceId := calcServiceId(env, row.Name1, row.Name2)
+
+ stateHistory = append(stateHistory, &history.StateHistory{
+ HistoryTableEntity: history.HistoryTableEntity{
+ EntityWithoutChecksum: v1.EntityWithoutChecksum{
+ IdMeta: v1.IdMeta{Id: stateHistoryId},
+ },
+ },
+ HistoryTableMeta: history.HistoryTableMeta{
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ },
+ EventTime: ts,
+ StateType: icingadbTypes.StateType(row.StateType),
+ SoftState: row.State,
+ HardState: row.LastHardState,
+ PreviousSoftState: row.LastState,
+ PreviousHardState: previousHardState,
+ CheckAttempt: uint8(row.CurrentCheckAttempt),
+ Output: icingadbTypes.String{NullString: row.Output},
+ LongOutput: icingadbTypes.String{NullString: row.LongOutput},
+ MaxCheckAttempts: uint32(row.MaxCheckAttempts),
+ CheckSource: icingadbTypes.String{NullString: row.CheckSource},
+ })
+
+ allHistory = append(allHistory, &history.HistoryState{
+ HistoryMeta: history.HistoryMeta{
+ HistoryEntity: history.HistoryEntity{Id: id},
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ EventType: "state_change",
+ },
+ StateHistoryId: stateHistoryId,
+ EventTime: ts,
+ })
+
+ if icingadbTypes.StateType(row.StateType) == icingadbTypes.StateHard {
+ // only hard state changes are relevant for SLA history, discard all others
+
+ sla = append(sla, &history.SlaHistoryState{
+ HistoryTableEntity: history.HistoryTableEntity{
+ EntityWithoutChecksum: v1.EntityWithoutChecksum{
+ IdMeta: v1.IdMeta{Id: stateHistoryId},
+ },
+ },
+ HistoryTableMeta: history.HistoryTableMeta{
+ EnvironmentId: envId,
+ ObjectType: typ,
+ HostId: hostId,
+ ServiceId: serviceId,
+ },
+ EventTime: ts,
+ StateType: icingadbTypes.StateType(row.StateType),
+ HardState: row.LastHardState,
+ PreviousHardState: previousHardState,
+ })
+ }
+ }
+
+ stages = []icingaDbOutputStage{
+ {insert: stateHistory},
+ {insert: allHistory},
+ {insert: sla},
+ }
+ return
+}
diff --git a/cmd/icingadb-migrate/embed/comment_query.sql b/cmd/icingadb-migrate/embed/comment_query.sql
new file mode 100644
index 0000000..774ccf9
--- /dev/null
+++ b/cmd/icingadb-migrate/embed/comment_query.sql
@@ -0,0 +1,11 @@
+SELECT ch.commenthistory_id, UNIX_TIMESTAMP(ch.entry_time) entry_time,
+ ch.entry_time_usec, ch.entry_type, ch.author_name, ch.comment_data, ch.is_persistent,
+ COALESCE(UNIX_TIMESTAMP(ch.expiration_time), 0) expiration_time,
+ COALESCE(UNIX_TIMESTAMP(ch.deletion_time), 0) deletion_time,
+ ch.deletion_time_usec, ch.name, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2
+FROM icinga_commenthistory ch USE INDEX (PRIMARY)
+INNER JOIN icinga_objects o ON o.object_id=ch.object_id
+WHERE ch.commenthistory_id BETWEEN :fromid AND :toid
+AND ch.commenthistory_id > :checkpoint -- where we were interrupted
+ORDER BY ch.commenthistory_id -- this way we know what has already been migrated from just the last row's ID
+LIMIT :bulk
diff --git a/cmd/icingadb-migrate/embed/downtime_query.sql b/cmd/icingadb-migrate/embed/downtime_query.sql
new file mode 100644
index 0000000..467dfca
--- /dev/null
+++ b/cmd/icingadb-migrate/embed/downtime_query.sql
@@ -0,0 +1,14 @@
+SELECT dh.downtimehistory_id, UNIX_TIMESTAMP(dh.entry_time) entry_time, dh.author_name, dh.comment_data,
+ dh.is_fixed, dh.duration, UNIX_TIMESTAMP(dh.scheduled_start_time) scheduled_start_time,
+ COALESCE(UNIX_TIMESTAMP(dh.scheduled_end_time), 0) scheduled_end_time, dh.was_started,
+ COALESCE(UNIX_TIMESTAMP(dh.actual_start_time), 0) actual_start_time, dh.actual_start_time_usec,
+ COALESCE(UNIX_TIMESTAMP(dh.actual_end_time), 0) actual_end_time, dh.actual_end_time_usec, dh.was_cancelled,
+ COALESCE(UNIX_TIMESTAMP(dh.trigger_time), 0) trigger_time, dh.name, o.objecttype_id,
+ o.name1, COALESCE(o.name2, '') name2, COALESCE(sd.name, '') triggered_by
+FROM icinga_downtimehistory dh USE INDEX (PRIMARY)
+INNER JOIN icinga_objects o ON o.object_id=dh.object_id
+LEFT JOIN icinga_scheduleddowntime sd ON sd.scheduleddowntime_id=dh.triggered_by_id
+WHERE dh.downtimehistory_id BETWEEN :fromid AND :toid
+AND dh.downtimehistory_id > :checkpoint -- where we were interrupted
+ORDER BY dh.downtimehistory_id -- this way we know what has already been migrated from just the last row's ID
+LIMIT :bulk
diff --git a/cmd/icingadb-migrate/embed/event_time_cache_schema.sql b/cmd/icingadb-migrate/embed/event_time_cache_schema.sql
new file mode 100644
index 0000000..5940754
--- /dev/null
+++ b/cmd/icingadb-migrate/embed/event_time_cache_schema.sql
@@ -0,0 +1,15 @@
+PRAGMA main.auto_vacuum = 1;
+
+-- Icinga DB's flapping_history#start_time per flapping_end row (IDO's icinga_flappinghistory#flappinghistory_id).
+CREATE TABLE IF NOT EXISTS end_start_time (
+ history_id INT PRIMARY KEY,
+ event_time INT NOT NULL,
+ event_time_usec INT NOT NULL
+);
+
+-- Helper table, the last start_time per icinga_statehistory#object_id.
+CREATE TABLE IF NOT EXISTS last_start_time (
+ object_id INT PRIMARY KEY,
+ event_time INT NOT NULL,
+ event_time_usec INT NOT NULL
+);
diff --git a/cmd/icingadb-migrate/embed/flapping_query.sql b/cmd/icingadb-migrate/embed/flapping_query.sql
new file mode 100644
index 0000000..5e25bde
--- /dev/null
+++ b/cmd/icingadb-migrate/embed/flapping_query.sql
@@ -0,0 +1,9 @@
+SELECT fh.flappinghistory_id, UNIX_TIMESTAMP(fh.event_time) event_time,
+ fh.event_time_usec, fh.event_type, fh.percent_state_change, fh.low_threshold,
+ fh.high_threshold, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2
+FROM icinga_flappinghistory fh USE INDEX (PRIMARY)
+INNER JOIN icinga_objects o ON o.object_id=fh.object_id
+WHERE fh.flappinghistory_id BETWEEN :fromid AND :toid
+AND fh.flappinghistory_id > :checkpoint -- where we were interrupted
+ORDER BY fh.flappinghistory_id -- this way we know what has already been migrated from just the last row's ID
+LIMIT :bulk
diff --git a/cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql b/cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql
new file mode 100644
index 0000000..54c1c00
--- /dev/null
+++ b/cmd/icingadb-migrate/embed/ido_migration_progress_schema.sql
@@ -0,0 +1,9 @@
+CREATE TABLE IF NOT EXISTS ido_migration_progress (
+ environment_id CHAR(40) NOT NULL, -- Hex SHA1. Rationale: CHAR(40) is not RDBMS-specific
+ history_type VARCHAR(63) NOT NULL,
+ from_ts BIGINT NOT NULL,
+ to_ts BIGINT NOT NULL,
+ last_ido_id BIGINT NOT NULL,
+
+ CONSTRAINT pk_ido_migration_progress PRIMARY KEY (environment_id, history_type, from_ts, to_ts)
+);
diff --git a/cmd/icingadb-migrate/embed/notification_query.sql b/cmd/icingadb-migrate/embed/notification_query.sql
new file mode 100644
index 0000000..f963b2e
--- /dev/null
+++ b/cmd/icingadb-migrate/embed/notification_query.sql
@@ -0,0 +1,9 @@
+SELECT n.notification_id, n.notification_reason, UNIX_TIMESTAMP(n.end_time) end_time,
+ n.end_time_usec, n.state, COALESCE(n.output, '') output, n.long_output,
+ n.contacts_notified, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2
+FROM icinga_notifications n USE INDEX (PRIMARY)
+INNER JOIN icinga_objects o ON o.object_id=n.object_id
+WHERE n.notification_id BETWEEN :fromid AND :toid
+AND n.notification_id <= :cache_limit AND n.notification_id > :checkpoint -- where we were interrupted
+ORDER BY n.notification_id -- this way we know what has already been migrated from just the last row's ID
+LIMIT :bulk
diff --git a/cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql b/cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql
new file mode 100644
index 0000000..315f22d
--- /dev/null
+++ b/cmd/icingadb-migrate/embed/previous_hard_state_cache_schema.sql
@@ -0,0 +1,22 @@
+PRAGMA main.auto_vacuum = 1;
+
+-- Icinga DB's state_history#previous_hard_state per IDO's icinga_statehistory#statehistory_id.
+CREATE TABLE IF NOT EXISTS previous_hard_state (
+ history_id INT PRIMARY KEY,
+ previous_hard_state INT NOT NULL
+);
+
+-- Helper table, the current last_hard_state per icinga_statehistory#object_id.
+CREATE TABLE IF NOT EXISTS next_hard_state (
+ object_id INT PRIMARY KEY,
+ next_hard_state INT NOT NULL
+);
+
+-- Helper table for stashing icinga_statehistory#statehistory_id until last_hard_state changes.
+CREATE TABLE IF NOT EXISTS next_ids (
+ object_id INT NOT NULL,
+ history_id INT NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS next_ids_object_id ON next_ids (object_id);
+CREATE INDEX IF NOT EXISTS next_ids_history_id ON next_ids (history_id);
diff --git a/cmd/icingadb-migrate/embed/state_query.sql b/cmd/icingadb-migrate/embed/state_query.sql
new file mode 100644
index 0000000..3e95d48
--- /dev/null
+++ b/cmd/icingadb-migrate/embed/state_query.sql
@@ -0,0 +1,9 @@
+SELECT sh.statehistory_id, UNIX_TIMESTAMP(sh.state_time) state_time, sh.state_time_usec, sh.state,
+ sh.state_type, sh.current_check_attempt, sh.max_check_attempts, sh.last_state, sh.last_hard_state,
+ sh.output, sh.long_output, sh.check_source, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2
+FROM icinga_statehistory sh USE INDEX (PRIMARY)
+INNER JOIN icinga_objects o ON o.object_id=sh.object_id
+WHERE sh.statehistory_id BETWEEN :fromid AND :toid
+AND sh.statehistory_id <= :cache_limit AND sh.statehistory_id > :checkpoint -- where we were interrupted
+ORDER BY sh.statehistory_id -- this way we know what has already been migrated from just the last row's ID
+LIMIT :bulk
diff --git a/cmd/icingadb-migrate/main.go b/cmd/icingadb-migrate/main.go
new file mode 100644
index 0000000..c089e78
--- /dev/null
+++ b/cmd/icingadb-migrate/main.go
@@ -0,0 +1,488 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ _ "embed"
+ "encoding/hex"
+ "fmt"
+ "github.com/creasty/defaults"
+ "github.com/goccy/go-yaml"
+ "github.com/icinga/icingadb/pkg/config"
+ "github.com/icinga/icingadb/pkg/icingadb"
+ "github.com/icinga/icingadb/pkg/logging"
+ icingadbTypes "github.com/icinga/icingadb/pkg/types"
+ "github.com/icinga/icingadb/pkg/utils"
+ "github.com/jessevdk/go-flags"
+ "github.com/jmoiron/sqlx"
+ "github.com/jmoiron/sqlx/reflectx"
+ _ "github.com/mattn/go-sqlite3"
+ "github.com/pkg/errors"
+ "github.com/vbauerster/mpb/v6"
+ "go.uber.org/zap"
+ "golang.org/x/sync/errgroup"
+ "math"
+ "os"
+ "path"
+ "path/filepath"
+ "regexp"
+ "strings"
+ "time"
+)
+
+// Flags defines the CLI flags.
+type Flags struct {
+ // Config is the path to the config file.
+ Config string `short:"c" long:"config" description:"path to config file" required:"true"`
+ // Cache is a (not necessarily yet existing) directory for caching.
+ Cache string `short:"t" long:"cache" description:"path for caching" required:"true"`
+}
+
+// Config defines the YAML config structure.
+type Config struct {
+ IDO struct {
+ config.Database `yaml:"-,inline"`
+ From int32 `yaml:"from"`
+ To int32 `yaml:"to" default:"2147483647"`
+ } `yaml:"ido"`
+ IcingaDB config.Database `yaml:"icingadb"`
+ // Icinga2 specifies information the IDO doesn't provide.
+ Icinga2 struct {
+ // Env specifies the environment ID, hex.
+ Env string `yaml:"env"`
+ } `yaml:"icinga2"`
+}
+
+// main validates the CLI, parses the config and migrates history from IDO to Icinga DB (see comments below).
+// Most of the called functions exit the whole program by themselves on non-recoverable errors.
+func main() {
+ f := &Flags{}
+ if _, err := flags.NewParser(f, flags.Default).Parse(); err != nil {
+ os.Exit(2)
+ }
+
+ c, ex := parseConfig(f)
+ if c == nil {
+ os.Exit(ex)
+ }
+
+ envId, err := hex.DecodeString(c.Icinga2.Env)
+ if err != nil {
+ _, _ = fmt.Fprintf(os.Stderr, "bad env ID: %s\n", err.Error())
+ os.Exit(2)
+ }
+
+ defer func() { _ = log.Sync() }()
+
+ log.Info("Starting IDO to Icinga DB history migration")
+
+ ido, idb := connectAll(c)
+
+ if err := idb.CheckSchema(context.Background()); err != nil {
+ log.Fatalf("%+v", err)
+ }
+
+ // Start repeatable-read-isolated transactions (consistent SELECTs)
+ // not to have to care for IDO data changes during migration.
+ startIdoTx(ido)
+
+ // Prepare the directory structure the following fillCache() will need later.
+ mkCache(f, c, idb.Mapper)
+
+ log.Info("Computing progress")
+
+ // Convert Config#IDO.From and .To to IDs to restrict data by PK.
+ computeIdRange(c)
+
+ // computeProgress figures out which data has already been migrated
+ // not to start from the beginning every time in the following migrate().
+ computeProgress(c, idb, envId)
+
+ // On rationale read buildEventTimeCache() and buildPreviousHardStateCache() docs.
+ log.Info("Filling cache")
+ fillCache()
+
+ log.Info("Actually migrating")
+ migrate(c, idb, envId)
+
+ log.Info("Cleaning up cache")
+ cleanupCache(f)
+}
+
+// parseConfig validates the f.Config file and returns the config and -1 or - on failure - nil and an exit code.
+func parseConfig(f *Flags) (_ *Config, exit int) {
+ cf, err := os.Open(f.Config)
+ if err != nil {
+ _, _ = fmt.Fprintf(os.Stderr, "can't open config file: %s\n", err.Error())
+ return nil, 2
+ }
+ defer func() { _ = cf.Close() }()
+
+ c := &Config{}
+ if err := defaults.Set(c); err != nil {
+ _, _ = fmt.Fprintf(os.Stderr, "can't set config defaults: %s\n", err.Error())
+ return nil, 2
+ }
+
+ if err := yaml.NewDecoder(cf, yaml.DisallowUnknownField()).Decode(c); err != nil {
+ _, _ = fmt.Fprintf(os.Stderr, "can't parse config file: %s\n", err.Error())
+ return nil, 2
+ }
+
+ return c, -1
+}
+
+var nonWords = regexp.MustCompile(`\W+`)
+
+// mkCache ensures <f.Cache>/<history type>.sqlite3 files are present and contain their schema
+// and initializes types[*].cache. (On non-recoverable errors the whole program exits.)
+func mkCache(f *Flags, c *Config, mapper *reflectx.Mapper) {
+ log.Info("Preparing cache")
+
+ if err := os.MkdirAll(f.Cache, 0700); err != nil {
+ log.With("dir", f.Cache).Fatalf("%+v", errors.Wrap(err, "can't create directory"))
+ }
+
+ types.forEach(func(ht *historyType) {
+ if ht.cacheSchema == "" {
+ return
+ }
+
+ file := path.Join(f.Cache, fmt.Sprintf(
+ "%s_%d-%d.sqlite3", nonWords.ReplaceAllLiteralString(ht.name, "_"), c.IDO.From, c.IDO.To,
+ ))
+
+ var err error
+
+ ht.cache, err = sqlx.Open("sqlite3", "file:"+file)
+ if err != nil {
+ log.With("file", file).Fatalf("%+v", errors.Wrap(err, "can't open SQLite database"))
+ }
+
+ ht.cacheFile = file
+ ht.cache.Mapper = mapper
+
+ if _, err := ht.cache.Exec(ht.cacheSchema); err != nil {
+ log.With("file", file, "ddl", ht.cacheSchema).
+ Fatalf("%+v", errors.Wrap(err, "can't import schema into SQLite database"))
+ }
+ })
+}
+
+// connectAll connects to ido and idb (Icinga DB) as c specifies. (On non-recoverable errors the whole program exits.)
+func connectAll(c *Config) (ido, idb *icingadb.DB) {
+ log.Info("Connecting to databases")
+ eg, _ := errgroup.WithContext(context.Background())
+
+ eg.Go(func() error {
+ ido = connect("IDO", &c.IDO.Database)
+ return nil
+ })
+
+ eg.Go(func() error {
+ idb = connect("Icinga DB", &c.IcingaDB)
+ return nil
+ })
+
+ _ = eg.Wait()
+ return
+}
+
+// connect connects to which DB as cfg specifies. (On non-recoverable errors the whole program exits.)
+func connect(which string, cfg *config.Database) *icingadb.DB {
+ db, err := cfg.Open(logging.NewLogger(zap.NewNop().Sugar(), 20*time.Second))
+ if err != nil {
+ log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database"))
+ }
+
+ if err := db.Ping(); err != nil {
+ log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database"))
+ }
+
+ return db
+}
+
+// startIdoTx initializes types[*].snapshot with new repeatable-read-isolated ido transactions.
+// (On non-recoverable errors the whole program exits.)
+func startIdoTx(ido *icingadb.DB) {
+ types.forEach(func(ht *historyType) {
+ tx, err := ido.BeginTxx(context.Background(), &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
+ if err != nil {
+ log.Fatalf("%+v", errors.Wrap(err, "can't begin snapshot transaction"))
+ }
+
+ ht.snapshot = tx
+ })
+}
+
+// computeIdRange initializes types[*].fromId and types[*].toId.
+// (On non-recoverable errors the whole program exits.)
+func computeIdRange(c *Config) {
+ types.forEach(func(ht *historyType) {
+ getBorderId := func(id *uint64, timeColumns []string, compOperator string, borderTime int32, sortOrder string) {
+ deZeroFied := make([]string, 0, len(timeColumns))
+ for _, column := range timeColumns {
+ deZeroFied = append(deZeroFied, fmt.Sprintf(
+ "CASE WHEN %[1]s < '1970-01-03 00:00:00' THEN NULL ELSE %[1]s END", column,
+ ))
+ }
+
+ var timeExpr string
+ if len(deZeroFied) > 1 {
+ timeExpr = "COALESCE(" + strings.Join(deZeroFied, ",") + ")"
+ } else {
+ timeExpr = deZeroFied[0]
+ }
+
+ query := ht.snapshot.Rebind(
+ "SELECT " + ht.idoIdColumn + " FROM " + ht.idoTable + " WHERE " + timeExpr + " " + compOperator +
+ " FROM_UNIXTIME(?) ORDER BY " + ht.idoIdColumn + " " + sortOrder + " LIMIT 1",
+ )
+
+ switch err := ht.snapshot.Get(id, query, borderTime); err {
+ case nil, sql.ErrNoRows:
+ default:
+ log.With("backend", "IDO", "query", query, "args", []any{borderTime}).
+ Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+ }
+
+ ht.fromId = math.MaxInt64
+
+ getBorderId(&ht.fromId, ht.idoEndColumns, ">=", c.IDO.From, "ASC")
+ getBorderId(&ht.toId, ht.idoStartColumns, "<=", c.IDO.To, "DESC")
+ })
+}
+
+//go:embed embed/ido_migration_progress_schema.sql
+var idoMigrationProgressSchema string
+
+// computeProgress initializes types[*].lastId, types[*].total and types[*].done.
+// (On non-recoverable errors the whole program exits.)
+func computeProgress(c *Config, idb *icingadb.DB, envId []byte) {
+ if _, err := idb.Exec(idoMigrationProgressSchema); err != nil {
+ log.Fatalf("%+v", errors.Wrap(err, "can't create table ido_migration_progress"))
+ }
+
+ envIdHex := hex.EncodeToString(envId)
+ types.forEach(func(ht *historyType) {
+ var query = idb.Rebind(
+ "SELECT last_ido_id FROM ido_migration_progress" +
+ " WHERE environment_id=? AND history_type=? AND from_ts=? AND to_ts=?",
+ )
+
+ args := []any{envIdHex, ht.name, c.IDO.From, c.IDO.To}
+
+ if err := idb.Get(&ht.lastId, query, args...); err != nil && err != sql.ErrNoRows {
+ log.With("backend", "Icinga DB", "query", query, "args", args).
+ Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+ })
+
+ types.forEach(func(ht *historyType) {
+ if ht.cacheFiller != nil {
+ err := ht.snapshot.Get(
+ &ht.cacheTotal,
+ ht.snapshot.Rebind(
+ // For actual migration icinga_objects will be joined anyway,
+ // so it makes no sense to take vanished objects into account.
+ "SELECT COUNT(*) FROM "+ht.idoTable+
+ " xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ht.idoIdColumn+" <= ?",
+ ),
+ ht.toId,
+ )
+ if err != nil {
+ log.Fatalf("%+v", errors.Wrap(err, "can't count query"))
+ }
+ }
+ })
+
+ types.forEach(func(ht *historyType) {
+ var rows []struct {
+ Migrated uint8
+ Cnt int64
+ }
+
+ err := ht.snapshot.Select(
+ &rows,
+ ht.snapshot.Rebind(
+ // For actual migration icinga_objects will be joined anyway,
+ // so it makes no sense to take vanished objects into account.
+ "SELECT CASE WHEN xh."+ht.idoIdColumn+"<=? THEN 1 ELSE 0 END migrated, COUNT(*) cnt FROM "+
+ ht.idoTable+" xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
+ ht.idoIdColumn+" BETWEEN ? AND ? GROUP BY migrated",
+ ),
+ ht.lastId, ht.fromId, ht.toId,
+ )
+ if err != nil {
+ log.Fatalf("%+v", errors.Wrap(err, "can't count query"))
+ }
+
+ for _, row := range rows {
+ ht.total += row.Cnt
+
+ if row.Migrated == 1 {
+ ht.done = row.Cnt
+ }
+ }
+
+ log.Infow("Counted migrated IDO events", "type", ht.name, "migrated", ht.done, "total", ht.total)
+ })
+}
+
+// fillCache fills <f.Cache>/<history type>.sqlite3 (actually types[*].cacheFiller does).
+func fillCache() {
+ progress := mpb.New()
+ for _, ht := range types {
+ if ht.cacheFiller != nil {
+ ht.setupBar(progress, ht.cacheTotal)
+ }
+ }
+
+ types.forEach(func(ht *historyType) {
+ if ht.cacheFiller != nil {
+ ht.cacheFiller(ht)
+ }
+ })
+
+ progress.Wait()
+}
+
+// migrate does the actual migration.
+func migrate(c *Config, idb *icingadb.DB, envId []byte) {
+ progress := mpb.New()
+ for _, ht := range types {
+ ht.setupBar(progress, ht.total)
+ }
+
+ types.forEach(func(ht *historyType) {
+ ht.migrate(c, idb, envId, ht)
+ })
+
+ progress.Wait()
+}
+
+// migrate does the actual migration for one history type.
+func migrateOneType[IdoRow any](
+ c *Config, idb *icingadb.DB, envId []byte, ht *historyType,
+ convertRows func(env string, envId icingadbTypes.Binary,
+ selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx,
+ idoRows []IdoRow) (stages []icingaDbOutputStage, checkpoint any),
+) {
+ var lastQuery string
+ var lastStmt *sqlx.Stmt
+
+ defer func() {
+ if lastStmt != nil {
+ _ = lastStmt.Close()
+ }
+ }()
+
+ selectCache := func(dest interface{}, query string, args ...interface{}) {
+ // Prepare new one, if old one doesn't fit anymore.
+ if query != lastQuery {
+ if lastStmt != nil {
+ _ = lastStmt.Close()
+ }
+
+ var err error
+
+ lastStmt, err = ht.cache.Preparex(query)
+ if err != nil {
+ log.With("backend", "cache", "query", query).
+ Fatalf("%+v", errors.Wrap(err, "can't prepare query"))
+ }
+
+ lastQuery = query
+ }
+
+ if err := lastStmt.Select(dest, args...); err != nil {
+ log.With("backend", "cache", "query", query, "args", args).
+ Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+ }
+
+ var args map[string]interface{}
+
+ // For the case that the cache was older that the IDO,
+ // but ht.cacheFiller couldn't update it, limit (WHERE) our source data set.
+ if ht.cacheLimitQuery != "" {
+ var limit sql.NullInt64
+ cacheGet(ht.cache, &limit, ht.cacheLimitQuery)
+ args = map[string]interface{}{"cache_limit": limit.Int64}
+ }
+
+ upsertProgress, _ := idb.BuildUpsertStmt(&IdoMigrationProgress{})
+ envIdHex := hex.EncodeToString(envId)
+
+ ht.bar.SetCurrent(ht.done)
+
+ // Stream IDO rows, ...
+ sliceIdoHistory(
+ ht, ht.migrationQuery, args, ht.lastId,
+ func(idoRows []IdoRow) (checkpoint interface{}) {
+ // ... convert them, ...
+ stages, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows)
+
+ // ... and insert them:
+
+ for _, stage := range stages {
+ if len(stage.insert) > 0 {
+ ch := utils.ChanFromSlice(stage.insert)
+
+ if err := idb.CreateIgnoreStreamed(context.Background(), ch); err != nil {
+ log.With("backend", "Icinga DB", "op", "INSERT IGNORE", "table", utils.TableName(stage.insert[0])).
+ Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
+ }
+ }
+
+ if len(stage.upsert) > 0 {
+ ch := utils.ChanFromSlice(stage.upsert)
+
+ if err := idb.UpsertStreamed(context.Background(), ch); err != nil {
+ log.With("backend", "Icinga DB", "op", "UPSERT", "table", utils.TableName(stage.upsert[0])).
+ Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
+ }
+ }
+ }
+
+ if lastIdoId != nil {
+ args := map[string]interface{}{"history_type": ht.name, "last_ido_id": lastIdoId}
+
+ _, err := idb.NamedExec(upsertProgress, &IdoMigrationProgress{
+ IdoMigrationProgressUpserter{lastIdoId}, envIdHex, ht.name, c.IDO.From, c.IDO.To,
+ })
+ if err != nil {
+ log.With("backend", "Icinga DB", "dml", upsertProgress, "args", args).
+ Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
+ }
+ }
+
+ ht.bar.IncrBy(len(idoRows))
+ return lastIdoId
+ },
+ )
+
+ ht.bar.SetTotal(ht.bar.Current(), true)
+}
+
+// cleanupCache removes <f.Cache>/<history type>.sqlite3 files.
+func cleanupCache(f *Flags) {
+ types.forEach(func(ht *historyType) {
+ if ht.cacheFile != "" {
+ if err := ht.cache.Close(); err != nil {
+ log.With("file", ht.cacheFile).Warnf("%+v", errors.Wrap(err, "can't close SQLite database"))
+ }
+ }
+ })
+
+ if matches, err := filepath.Glob(path.Join(f.Cache, "*.sqlite3")); err == nil {
+ for _, match := range matches {
+ if err := os.Remove(match); err != nil {
+ log.With("file", match).Warnf("%+v", errors.Wrap(err, "can't remove SQLite database"))
+ }
+ }
+ } else {
+ log.With("dir", f.Cache).Warnf("%+v", errors.Wrap(err, "can't list SQLite databases"))
+ }
+}
diff --git a/cmd/icingadb-migrate/misc.go b/cmd/icingadb-migrate/misc.go
new file mode 100644
index 0000000..f1db20c
--- /dev/null
+++ b/cmd/icingadb-migrate/misc.go
@@ -0,0 +1,321 @@
+package main
+
+import (
+ "context"
+ "crypto/sha1"
+ "github.com/icinga/icingadb/pkg/contracts"
+ "github.com/icinga/icingadb/pkg/driver"
+ "github.com/icinga/icingadb/pkg/icingadb"
+ "github.com/icinga/icingadb/pkg/icingadb/objectpacker"
+ icingadbTypes "github.com/icinga/icingadb/pkg/types"
+ "github.com/jmoiron/sqlx"
+ "github.com/pkg/errors"
+ "github.com/vbauerster/mpb/v6"
+ "github.com/vbauerster/mpb/v6/decor"
+ "go.uber.org/zap"
+ "golang.org/x/sync/errgroup"
+ "strings"
+ "time"
+)
+
+type IdoMigrationProgressUpserter struct {
+ LastIdoId any `json:"last_ido_id"`
+}
+
+// Upsert implements the contracts.Upserter interface.
+func (impu *IdoMigrationProgressUpserter) Upsert() interface{} {
+ return impu
+}
+
+type IdoMigrationProgress struct {
+ IdoMigrationProgressUpserter `json:",inline"`
+ EnvironmentId string `json:"environment_id"`
+ HistoryType string `json:"history_type"`
+ FromTs int32 `json:"from_ts"`
+ ToTs int32 `json:"to_ts"`
+}
+
+// Assert interface compliance.
+var (
+ _ contracts.Upserter = (*IdoMigrationProgressUpserter)(nil)
+ _ contracts.Upserter = (*IdoMigrationProgress)(nil)
+)
+
+// log is the root logger.
+var log = func() *zap.SugaredLogger {
+ logger, err := zap.NewDevelopmentConfig().Build()
+ if err != nil {
+ panic(err)
+ }
+
+ return logger.Sugar()
+}()
+
+// objectTypes maps IDO values to Icinga DB ones.
+var objectTypes = map[uint8]string{1: "host", 2: "service"}
+
+// hashAny combines objectpacker.PackAny and SHA1 hashing.
+func hashAny(in interface{}) []byte {
+ hash := sha1.New()
+ if err := objectpacker.PackAny(in, hash); err != nil {
+ panic(err)
+ }
+
+ return hash.Sum(nil)
+}
+
+// convertTime converts *nix timestamps from the IDO for Icinga DB.
+func convertTime(ts int64, tsUs uint32) icingadbTypes.UnixMilli {
+ if ts == 0 && tsUs == 0 {
+ return icingadbTypes.UnixMilli{}
+ }
+
+ return icingadbTypes.UnixMilli(time.Unix(ts, int64(tsUs)*int64(time.Microsecond/time.Nanosecond)))
+}
+
+// calcObjectId calculates the ID of the config object named name1 for Icinga DB.
+func calcObjectId(env, name1 string) []byte {
+ if name1 == "" {
+ return nil
+ }
+
+ return hashAny([2]string{env, name1})
+}
+
+// calcServiceId calculates the ID of the service name2 of the host name1 for Icinga DB.
+func calcServiceId(env, name1, name2 string) []byte {
+ if name2 == "" {
+ return nil
+ }
+
+ return hashAny([2]string{env, name1 + "!" + name2})
+}
+
+// sliceIdoHistory performs query with args+fromid,toid,checkpoint,bulk on ht.snapshot
+// and passes the results to onRows until either an empty result set or onRows() returns nil.
+// Rationale: split the likely large result set of a query by adding a WHERE condition and a LIMIT,
+// both with :named placeholders (:checkpoint, :bulk).
+// checkpoint is the initial value for the WHERE condition, onRows() returns follow-up ones.
+// (On non-recoverable errors the whole program exits.)
+func sliceIdoHistory[Row any](
+ ht *historyType, query string, args map[string]any,
+ checkpoint interface{}, onRows func([]Row) (checkpoint interface{}),
+) {
+ if args == nil {
+ args = map[string]interface{}{}
+ }
+
+ args["fromid"] = ht.fromId
+ args["toid"] = ht.toId
+ args["checkpoint"] = checkpoint
+ args["bulk"] = 20000
+
+ if ht.snapshot.DriverName() != driver.MySQL {
+ query = strings.ReplaceAll(query, " USE INDEX (PRIMARY)", "")
+ }
+
+ for {
+ // TODO: use Tx#SelectNamed() one nice day (https://github.com/jmoiron/sqlx/issues/779)
+ stmt, err := ht.snapshot.PrepareNamed(query)
+ if err != nil {
+ log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't prepare query"))
+ }
+
+ var rows []Row
+ if err := stmt.Select(&rows, args); err != nil {
+ log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query"))
+ }
+
+ _ = stmt.Close()
+
+ if len(rows) < 1 {
+ break
+ }
+
+ if checkpoint = onRows(rows); checkpoint == nil {
+ break
+ }
+
+ args["checkpoint"] = checkpoint
+ }
+}
+
+type progressBar struct {
+ *mpb.Bar
+
+ lastUpdate time.Time
+}
+
+// IncrBy does pb.Bar.DecoratorEwmaUpdate() automatically.
+func (pb *progressBar) IncrBy(n int) {
+ pb.Bar.IncrBy(n)
+
+ now := time.Now()
+
+ if !pb.lastUpdate.IsZero() {
+ pb.Bar.DecoratorEwmaUpdate(now.Sub(pb.lastUpdate))
+ }
+
+ pb.lastUpdate = now
+}
+
+// historyType specifies a history data type.
+type historyType struct {
+ // name is a human-readable common name.
+ name string
+ // idoTable specifies the source table.
+ idoTable string
+ // idoIdColumn specifies idoTable's primary key.
+ idoIdColumn string
+ // idoStartColumns specifies idoTable's event start time locations. (First non-NULL is used.)
+ idoStartColumns []string
+ // idoEndColumns specifies idoTable's event end time locations. (First non-NULL is used.)
+ idoEndColumns []string
+ // cacheSchema specifies <name>.sqlite3's structure.
+ cacheSchema string
+ // cacheFiller fills cache from snapshot.
+ cacheFiller func(*historyType)
+ // cacheLimitQuery rationale: see migrate().
+ cacheLimitQuery string
+ // migrationQuery SELECTs source data for actual migration.
+ migrationQuery string
+ // migrate does the actual migration.
+ migrate func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType)
+
+ // cacheFile locates <name>.sqlite3.
+ cacheFile string
+ // cache represents <cacheFile>.
+ cache *sqlx.DB
+ // snapshot represents the data source.
+ snapshot *sqlx.Tx
+ // fromId is the first IDO row ID to migrate.
+ fromId uint64
+ // toId is the last IDO row ID to migrate.
+ toId uint64
+ // total summarizes the source data.
+ total int64
+ // cacheTotal summarizes the cache source data.
+ cacheTotal int64
+ // done summarizes the migrated data.
+ done int64
+ // bar represents the current progress bar.
+ bar *progressBar
+ // lastId is the last already migrated ID.
+ lastId uint64
+}
+
+// setupBar (re-)initializes ht.bar.
+func (ht *historyType) setupBar(progress *mpb.Progress, total int64) {
+ ht.bar = &progressBar{Bar: progress.AddBar(
+ total,
+ mpb.BarFillerClearOnComplete(),
+ mpb.PrependDecorators(
+ decor.Name(ht.name, decor.WC{W: len(ht.name) + 1, C: decor.DidentRight}),
+ decor.Percentage(decor.WC{W: 5}),
+ ),
+ mpb.AppendDecorators(
+ decor.EwmaETA(decor.ET_STYLE_GO, 0, decor.WC{W: 4}),
+ decor.Name(" "),
+ decor.EwmaSpeed(0, "%.0f/s", 0, decor.WC{W: 4}),
+ ),
+ )}
+}
+
+type historyTypes []*historyType
+
+// forEach performs f per hts in parallel.
+func (hts historyTypes) forEach(f func(*historyType)) {
+ eg, _ := errgroup.WithContext(context.Background())
+ for _, ht := range hts {
+ ht := ht
+ eg.Go(func() error {
+ f(ht)
+ return nil
+ })
+ }
+
+ _ = eg.Wait()
+}
+
+type icingaDbOutputStage struct {
+ insert, upsert []contracts.Entity
+}
+
+var types = historyTypes{
+ {
+ name: "ack & comment",
+ idoTable: "icinga_commenthistory",
+ idoIdColumn: "commenthistory_id",
+ idoStartColumns: []string{"entry_time"},
+ // Manual deletion time wins vs. time of expiration which never happens due to manual deletion.
+ idoEndColumns: []string{"deletion_time", "expiration_time"},
+ migrationQuery: commentMigrationQuery,
+ migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
+ migrateOneType(c, idb, envId, ht, convertCommentRows)
+ },
+ },
+ {
+ name: "downtime",
+ idoTable: "icinga_downtimehistory",
+ idoIdColumn: "downtimehistory_id",
+ // Fall back to scheduled time if actual time is missing.
+ idoStartColumns: []string{"actual_start_time", "scheduled_start_time"},
+ idoEndColumns: []string{"actual_end_time", "scheduled_end_time"},
+ migrationQuery: downtimeMigrationQuery,
+ migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
+ migrateOneType(c, idb, envId, ht, convertDowntimeRows)
+ },
+ },
+ {
+ name: "flapping",
+ idoTable: "icinga_flappinghistory",
+ idoIdColumn: "flappinghistory_id",
+ idoStartColumns: []string{"event_time"},
+ idoEndColumns: []string{"event_time"},
+ cacheSchema: eventTimeCacheSchema,
+ cacheFiller: func(ht *historyType) {
+ buildEventTimeCache(ht, []string{
+ "xh.flappinghistory_id id", "UNIX_TIMESTAMP(xh.event_time) event_time",
+ "xh.event_time_usec", "1001-xh.event_type event_is_start", "xh.object_id",
+ })
+ },
+ migrationQuery: flappingMigrationQuery,
+ migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
+ migrateOneType(c, idb, envId, ht, convertFlappingRows)
+ },
+ },
+ {
+ name: "notification",
+ idoTable: "icinga_notifications",
+ idoIdColumn: "notification_id",
+ idoStartColumns: []string{"start_time"},
+ idoEndColumns: []string{"end_time"},
+ cacheSchema: previousHardStateCacheSchema,
+ cacheFiller: func(ht *historyType) {
+ buildPreviousHardStateCache(ht, []string{
+ "xh.notification_id id", "xh.object_id", "xh.state last_hard_state",
+ })
+ },
+ cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
+ migrationQuery: notificationMigrationQuery,
+ migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
+ migrateOneType(c, idb, envId, ht, convertNotificationRows)
+ },
+ },
+ {
+ name: "state",
+ idoTable: "icinga_statehistory",
+ idoIdColumn: "statehistory_id",
+ idoStartColumns: []string{"state_time"},
+ idoEndColumns: []string{"state_time"},
+ cacheSchema: previousHardStateCacheSchema,
+ cacheFiller: func(ht *historyType) {
+ buildPreviousHardStateCache(ht, []string{"xh.statehistory_id id", "xh.object_id", "xh.last_hard_state"})
+ },
+ cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
+ migrationQuery: stateMigrationQuery,
+ migrate: func(c *Config, idb *icingadb.DB, envId []byte, ht *historyType) {
+ migrateOneType(c, idb, envId, ht, convertStateRows)
+ },
+ },
+}